[ https://issues.apache.org/jira/browse/BEAM-9116?focusedWorklogId=373002&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-373002 ]
ASF GitHub Bot logged work on BEAM-9116:
----------------------------------------

Author: ASF GitHub Bot
Created on: 16/Jan/20 13:24
Start Date: 16/Jan/20 13:24
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #10580: [BEAM-9116] Limit the number of past invocations stored in JobService
URL: https://github.com/apache/beam/pull/10580#discussion_r367413905

##########
File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobServiceTest.java
##########
@@ -186,7 +193,59 @@ public void testJobSubmissionUsesJobInvokerAndIsSuccess() throws Exception {
     verify(invocation, times(1)).start();
   }
+  @Test
+  public void testInvocationCleanup() {
+    final JobApi.GetJobsRequest getJobsRequest = JobApi.GetJobsRequest.newBuilder().build();
+
+    final int maxRunningJobs = maxInvocationHistory + new Random().nextInt(50);
+    // Store state listeners to be able to complete the invocations
+    final List<Consumer<JobApi.JobStateEvent>> stateListeners = new ArrayList<>(maxRunningJobs);
+
+    for (int i = 0; i < maxRunningJobs; i++) {
+      when(invocation.getId()).thenReturn(String.valueOf(i));
+      prepareAndRunJob();
+
+      // Retrieve the state listener for this invocation
+      ArgumentCaptor<Consumer<JobApi.JobStateEvent>> stateListener =
+          ArgumentCaptor.forClass(Consumer.class);
+      verify(invocation, times(i + 1)).addStateListener(stateListener.capture());
+      stateListeners.add(stateListener.getValue());
+
+      // Retrieve current list of jobs
+      RecordingObserver<JobApi.GetJobsResponse> recorder = new RecordingObserver<>();
+      service.getJobs(getJobsRequest, recorder);
+      assertThat(recorder.isSuccessful(), is(true));
+
+      // All running invocations must be available and never be discarded
+      assertThat(recorder.getValue().getJobInfoCount(), is(i + 1));
+    }
+
+    // Complete the invocations one by one and check invocation history
+    JobApi.JobStateEvent terminalEvent =
+        JobApi.JobStateEvent.newBuilder().setState(JobApi.JobState.Enum.DONE).build();
+
+    for (int i = 0; i < maxRunningJobs; i++) {
+      // finish invocation
+      stateListeners.get(i).accept(terminalEvent);
+
+      RecordingObserver<JobApi.GetJobsResponse> recorder = new RecordingObserver<>();
+      service.getJobs(getJobsRequest, recorder);
+
+      // All running invocations must never be discarded but we keep a maximum number of completed
+      // invocations
+      int jobInfoCount = recorder.getValue().getJobInfoCount();
+      if (i / maxInvocationHistory == 0) {
+        // Less than maxInvocationHistory invocations have finished
+        assertThat(jobInfoCount, is(maxRunningJobs));
+      } else {
+        // We must start to discard completed invocations to not exceed maxInvocationHistory
+        assertThat(jobInfoCount, is(maxInvocationHistory + maxRunningJobs - i - 1));

Review comment:
Sounds reasonable. Please have a look at the updated PR.

Issue Time Tracking
-------------------

Worklog Id: (was: 373002)
Time Spent: 2h 10m (was: 2h)

> Limit the number of past invocations stored in the job service
> --------------------------------------------------------------
>
> Key: BEAM-9116
> URL: https://issues.apache.org/jira/browse/BEAM-9116
> Project: Beam
> Issue Type: Bug
> Components: jobserver
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Priority: Major
> Fix For: 2.19.0
>
> Time Spent: 2h 10m
> Remaining Estimate: 0h
>
> The {{InMemoryJobService}} stores an unbounded number of past job
> invocations. When this job server is long-running this can cause memory
> issues, as seen with our test setup for running Python tests.
> We should limit the number of past job invocations via a flag in the job
> server with a sensible default.
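The retention rule that `testInvocationCleanup` checks is that running invocations are never discarded, while completed ones are kept only up to `maxInvocationHistory`, with the oldest evicted first. The Java code below is a minimal sketch of that rule, inferred from the test's assertions. It is not the actual `InMemoryJobService` change from PR #10580. `InvocationHistoryDemo`, `BoundedInvocationHistory`, its `start`/`complete`/`jobCount` methods, the simplified `State` enum, and the string ids are all hypothetical names chosen for illustration. The `main` method prints the job count after each completion next to the count the test expects: `maxRunningJobs` while at most `maxInvocationHistory` invocations have finished, and `maxInvocationHistory + maxRunningJobs - i - 1` after that.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

public class InvocationHistoryDemo {

  // Hypothetical, simplified job states; the real Job API defines more.
  enum State { RUNNING, DONE }

  /**
   * Hypothetical registry that keeps every running invocation but at most
   * maxInvocationHistory completed ones. Not Beam's InMemoryJobService code.
   */
  static final class BoundedInvocationHistory {
    private final int maxInvocationHistory;
    // All retained invocations (running or completed), keyed by invocation id.
    private final Map<String, State> invocations = new LinkedHashMap<>();
    // Ids of completed invocations, oldest first, used as a FIFO eviction queue.
    private final Deque<String> completedIds = new ArrayDeque<>();

    BoundedInvocationHistory(int maxInvocationHistory) {
      if (maxInvocationHistory < 0) {
        throw new IllegalArgumentException("maxInvocationHistory must be >= 0");
      }
      this.maxInvocationHistory = maxInvocationHistory;
    }

    void start(String id) {
      // Reject reused ids so a completed entry can never be "restarted" and later evicted.
      if (invocations.putIfAbsent(id, State.RUNNING) != null) {
        throw new IllegalArgumentException("duplicate invocation id: " + id);
      }
    }

    void complete(String id) {
      // Ignore unknown ids and repeated terminal events for the same invocation.
      if (invocations.get(id) != State.RUNNING) {
        return;
      }
      invocations.put(id, State.DONE);
      completedIds.addLast(id);
      // Drop the oldest completed invocations; running ones are never evicted.
      while (completedIds.size() > maxInvocationHistory) {
        invocations.remove(completedIds.removeFirst());
      }
    }

    int jobCount() {
      return invocations.size();
    }
  }

  public static void main(String[] args) {
    int maxInvocationHistory = 3;
    int maxRunningJobs = 5;
    BoundedInvocationHistory history = new BoundedInvocationHistory(maxInvocationHistory);
    for (int i = 0; i < maxRunningJobs; i++) {
      history.start(String.valueOf(i));
    }
    // Complete the jobs one by one and compare with the count the test expects.
    for (int i = 0; i < maxRunningJobs; i++) {
      history.complete(String.valueOf(i));
      int expected =
          i < maxInvocationHistory
              ? maxRunningJobs
              : maxInvocationHistory + maxRunningJobs - i - 1;
      System.out.println(
          "completed=" + (i + 1) + " jobs=" + history.jobCount() + " expected=" + expected);
    }
  }
}
```

With `maxInvocationHistory = 3` and `maxRunningJobs = 5`, the printed `jobs` values are 5, 5, 5, 4, 3, and each one matches `expected`. A FIFO deque of completed ids keeps eviction cheap on each completion. Because only completed ids are ever queued, running invocations cannot be evicted, and that is the property the test asserts in its first loop.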