[ https://issues.apache.org/jira/browse/BEAM-9116?focusedWorklogId=373002&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-373002 ]
ASF GitHub Bot logged work on BEAM-9116:
----------------------------------------
Author: ASF GitHub Bot
Created on: 16/Jan/20 13:24
Start Date: 16/Jan/20 13:24
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #10580: [BEAM-9116] Limit the number of past invocations stored in JobService
URL: https://github.com/apache/beam/pull/10580#discussion_r367413905
##########
File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobServiceTest.java
##########
@@ -186,7 +193,59 @@ public void testJobSubmissionUsesJobInvokerAndIsSuccess() throws Exception {
     verify(invocation, times(1)).start();
   }
+  @Test
+  public void testInvocationCleanup() {
+    final JobApi.GetJobsRequest getJobsRequest = JobApi.GetJobsRequest.newBuilder().build();
+
+    final int maxRunningJobs = maxInvocationHistory + new Random().nextInt(50);
+    // Store state listeners to be able to complete the invocations
+    final List<Consumer<JobApi.JobStateEvent>> stateListeners = new ArrayList<>(maxRunningJobs);
+
+    for (int i = 0; i < maxRunningJobs; i++) {
+      when(invocation.getId()).thenReturn(String.valueOf(i));
+      prepareAndRunJob();
+
+      // Retrieve the state listener for this invocation
+      ArgumentCaptor<Consumer<JobApi.JobStateEvent>> stateListener =
+          ArgumentCaptor.forClass(Consumer.class);
+      verify(invocation, times(i + 1)).addStateListener(stateListener.capture());
+      stateListeners.add(stateListener.getValue());
+
+      // Retrieve current list of jobs
+      RecordingObserver<JobApi.GetJobsResponse> recorder = new RecordingObserver<>();
+      service.getJobs(getJobsRequest, recorder);
+      assertThat(recorder.isSuccessful(), is(true));
+
+      // All running invocations must be available and never be discarded
+      assertThat(recorder.getValue().getJobInfoCount(), is(i + 1));
+    }
+
+    // Complete the invocations one by one and check invocation history
+    JobApi.JobStateEvent terminalEvent =
+        JobApi.JobStateEvent.newBuilder().setState(JobApi.JobState.Enum.DONE).build();
+
+    for (int i = 0; i < maxRunningJobs; i++) {
+      // finish invocation
+      stateListeners.get(i).accept(terminalEvent);
+
+      RecordingObserver<JobApi.GetJobsResponse> recorder = new RecordingObserver<>();
+      service.getJobs(getJobsRequest, recorder);
+
+      // All running invocations must never be discarded but we keep a maximum number of completed
+      // invocations
+      int jobInfoCount = recorder.getValue().getJobInfoCount();
+      if (i / maxInvocationHistory == 0) {
+        // Less than maxInvocationHistory invocations have finished
+        assertThat(jobInfoCount, is(maxRunningJobs));
+      } else {
+        // We must start to discard completed invocations to not exceed maxInvocationHistory
+        assertThat(jobInfoCount, is(maxInvocationHistory + maxRunningJobs - i - 1));
Review comment:
Sounds reasonable. Please have a look at the updated PR.
Issue Time Tracking
-------------------
Worklog Id: (was: 373002)
Time Spent: 2h 10m (was: 2h)
> Limit the number of past invocations stored in the job service
> --------------------------------------------------------------
>
> Key: BEAM-9116
> URL: https://issues.apache.org/jira/browse/BEAM-9116
> Project: Beam
> Issue Type: Bug
> Components: jobserver
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Priority: Major
> Fix For: 2.19.0
>
> Time Spent: 2h 10m
> Remaining Estimate: 0h
>
> The {{InMemoryJobService}} stores an unbounded number of past job
> invocations. When the job server is long-running, this can cause memory
> issues, as seen with our test setup for running Python tests.
> We should limit the number of past job invocations via a flag in the job
> server with a sensible default.
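
The limit described above can be illustrated with a minimal sketch. It assumes that running invocations are never discarded and that completed ones are evicted oldest-first once their number exceeds the configured maximum. The class `BoundedInvocationHistory` and its methods are hypothetical names chosen for illustration; this is not the code from the pull request or Beam's `InMemoryJobService`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical model of a job history that bounds only completed invocations. */
class BoundedInvocationHistory {
  private final int maxCompleted;
  // Ids of invocations that are still running; these are never evicted.
  private final Set<String> running = new LinkedHashSet<>();
  // Ids of completed invocations, oldest first.
  private final Deque<String> completed = new ArrayDeque<>();

  BoundedInvocationHistory(int maxCompleted) {
    if (maxCompleted < 0) {
      throw new IllegalArgumentException("maxCompleted must be >= 0");
    }
    this.maxCompleted = maxCompleted;
  }

  /** Registers a newly started invocation. */
  void started(String id) {
    running.add(id);
  }

  /** Moves an invocation to the completed history and evicts the oldest entries if needed. */
  void finished(String id) {
    if (!running.remove(id)) {
      return; // Unknown or already completed invocation: nothing to do.
    }
    completed.addLast(id);
    while (completed.size() > maxCompleted) {
      completed.removeFirst();
    }
  }

  /** Number of invocations currently visible (running plus retained completed ones). */
  int size() {
    return running.size() + completed.size();
  }
}
```

With a limit of 2 and five started invocations, completing them one by one leaves 5, 5, 4, 3 and then 2 visible invocations. This agrees with the value `maxInvocationHistory + maxRunningJobs - i - 1` that the test above expects once `i` reaches `maxInvocationHistory`.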