[ 
https://issues.apache.org/jira/browse/BEAM-9116?focusedWorklogId=373002&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-373002
 ]

ASF GitHub Bot logged work on BEAM-9116:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 16/Jan/20 13:24
            Start Date: 16/Jan/20 13:24
    Worklog Time Spent: 10m 
      Work Description: mxm commented on pull request #10580: [BEAM-9116] Limit the number of past invocations stored in JobService
URL: https://github.com/apache/beam/pull/10580#discussion_r367413905
 
 

 ##########
 File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobServiceTest.java
 ##########
 @@ -186,7 +193,59 @@ public void testJobSubmissionUsesJobInvokerAndIsSuccess() throws Exception {
     verify(invocation, times(1)).start();
   }
 
+  @Test
+  public void testInvocationCleanup() {
+    final JobApi.GetJobsRequest getJobsRequest = JobApi.GetJobsRequest.newBuilder().build();
+
+    final int maxRunningJobs = maxInvocationHistory + new Random().nextInt(50);
+    // Store state listeners to be able to complete the invocations
+    final List<Consumer<JobApi.JobStateEvent>> stateListeners = new ArrayList<>(maxRunningJobs);
+
+    for (int i = 0; i < maxRunningJobs; i++) {
+      when(invocation.getId()).thenReturn(String.valueOf(i));
+      prepareAndRunJob();
+
+      // Retrieve the state listener for this invocation
+      ArgumentCaptor<Consumer<JobApi.JobStateEvent>> stateListener =
+          ArgumentCaptor.forClass(Consumer.class);
+      verify(invocation, times(i + 1)).addStateListener(stateListener.capture());
+      stateListeners.add(stateListener.getValue());
+
+      // Retrieve current list of jobs
+      RecordingObserver<JobApi.GetJobsResponse> recorder = new RecordingObserver<>();
+      service.getJobs(getJobsRequest, recorder);
+      assertThat(recorder.isSuccessful(), is(true));
+
+      // All running invocations must be available and never be discarded
+      assertThat(recorder.getValue().getJobInfoCount(), is(i + 1));
+    }
+
+    // Complete the invocations one by one and check invocation history
+    JobApi.JobStateEvent terminalEvent =
+        JobApi.JobStateEvent.newBuilder().setState(JobApi.JobState.Enum.DONE).build();
+
+    for (int i = 0; i < maxRunningJobs; i++) {
+      // finish invocation
+      stateListeners.get(i).accept(terminalEvent);
+
+      RecordingObserver<JobApi.GetJobsResponse> recorder = new RecordingObserver<>();
+      service.getJobs(getJobsRequest, recorder);
+
+      // All running invocations must never be discarded but we keep a maximum number of completed
+      // invocations
+      int jobInfoCount = recorder.getValue().getJobInfoCount();
+      if (i / maxInvocationHistory == 0) {
+        // Less than maxInvocationHistory invocations have finished
+        assertThat(jobInfoCount, is(maxRunningJobs));
+      } else {
+        // We must start to discard completed invocations to not exceed maxInvocationHistory
+        assertThat(jobInfoCount, is(maxInvocationHistory + maxRunningJobs - i - 1));
 
 Review comment:
   Sounds reasonable. Please have a look at the updated PR.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 373002)
    Time Spent: 2h 10m  (was: 2h)

> Limit the number of past invocations stored in the job service
> --------------------------------------------------------------
>
>                 Key: BEAM-9116
>                 URL: https://issues.apache.org/jira/browse/BEAM-9116
>             Project: Beam
>          Issue Type: Bug
>          Components: jobserver
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Major
>             Fix For: 2.19.0
>
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> The {{InMemoryJobService}} stores an unbounded number of past job 
> invocations. When this job server is long-running this can cause memory 
> issues, as seen with our test setup for running Python tests.
> We should limit the number of past job invocations via a flag in the job 
> server with a sensible default.
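
For illustration only, the behaviour requested above (and exercised by the test in the quoted diff) could be sketched roughly as follows. This is a minimal sketch under assumptions, not the code from the PR: the class BoundedInvocationHistory and its start/complete/size methods are hypothetical names, and only the maxInvocationHistory name is borrowed from the test. Running invocations are never evicted; completed ones are kept in completion order and the oldest are dropped once the limit is exceeded.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical sketch; not the actual InMemoryJobService implementation. */
final class BoundedInvocationHistory {
  private final int maxInvocationHistory;
  // Ids of invocations that are still running; these are never discarded.
  private final Set<String> running = new LinkedHashSet<>();
  // Ids of completed invocations in completion order, oldest first.
  private final Deque<String> completed = new ArrayDeque<>();

  BoundedInvocationHistory(int maxInvocationHistory) {
    if (maxInvocationHistory < 0) {
      throw new IllegalArgumentException("maxInvocationHistory must be >= 0");
    }
    this.maxInvocationHistory = maxInvocationHistory;
  }

  /** Registers a newly started invocation. */
  synchronized void start(String invocationId) {
    running.add(invocationId);
  }

  /** Moves an invocation to the completed history and evicts the oldest entries if needed. */
  synchronized void complete(String invocationId) {
    if (!running.remove(invocationId)) {
      return; // Unknown or already completed invocation; nothing to do.
    }
    completed.addLast(invocationId);
    while (completed.size() > maxInvocationHistory) {
      completed.removeFirst();
    }
  }

  /** Number of invocations currently retained (running plus completed). */
  synchronized int size() {
    return running.size() + completed.size();
  }
}
```

With this shape, after i + 1 of maxRunningJobs invocations have completed, the retained count is maxRunningJobs while i + 1 <= maxInvocationHistory, and maxInvocationHistory + maxRunningJobs - i - 1 afterwards, which matches the two assertions in the test above.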



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
