cameronlee314 commented on a change in pull request #1344: SAMZA-2510: Incorrect shutdown status due to race between runloop and process callback thread
URL: https://github.com/apache/samza/pull/1344#discussion_r408445170
 
 

 ##########
 File path: samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
 ##########
 @@ -780,4 +782,27 @@ public void testProcessBehaviourWhenAsyncCommitIsEnabled() throws InterruptedExc
 
     commitLatch.await();
   }
+
+  @Test(expected = SamzaException.class)
+  public void testExceptionIsPropagatedAfterShutdown() {
+    SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);
+    when(consumerMultiplexer.pollIntervalMs()).thenReturn(10);
+    OffsetManager offsetManager = mock(OffsetManager.class);
+
+    TestTask task0 = new TestTask(false, false, false, null);
+    TaskInstance t0 = createTaskInstance(task0, taskName0, ssp0, offsetManager, consumerMultiplexer);
+
+    Map<TaskName, TaskInstance> tasks = ImmutableMap.of(taskName0, t0);
+
+    int maxMessagesInFlight = 2;
+    RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
+        callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
+        () -> 0L, false);
+    when(consumerMultiplexer.choose(false))
+        .thenReturn(envelope0)
+        .thenReturn(ssp0EndOfStream)
 
 Review comment:
   Will this test only trigger the `throwable != null` condition (and not the 
`shutdownNow` condition) for exiting the main while loop? `envelope0` gets 
submitted and processing starts right away, so `throwable` could be set before 
`ssp0EndOfStream` is even seen. Which condition fires seems to depend on when 
the context switch happens.
   If it is non-deterministic which case the test actually covers, then maybe 
just turn this into a `testExceptionIsPropagated` test (I think this class 
might need a test for that anyway).
   If you think it's worth maintaining a test for this race, maybe you could 
put a latch into `TestTask` processing somewhere, so that `shutdownNow` is 
guaranteed to be set (you could use `RunLoop.shutdown` to set the flag) before 
`throwable` gets set.
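
   To illustrate the latch idea, here is a minimal, self-contained sketch of 
the ordering it is meant to enforce. It does not use Samza classes: `FakeLoop`, 
`shutdownSetBeforeFailure`, and the 5-second timeouts are hypothetical 
stand-ins chosen for this example, and the real `TestTask`/`RunLoop` wiring 
would look different. The point is only that the "processing" thread blocks on 
a `CountDownLatch` until the test thread has set the shutdown flag, so the 
flag is always written before the failure is recorded.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class LatchedShutdownSketch {
  // Hypothetical stand-in for the run loop's two exit flags; this is NOT Samza's RunLoop.
  static class FakeLoop {
    volatile boolean shutdownNow;
    volatile Throwable throwable;

    void shutdown() {
      shutdownNow = true;
    }
  }

  // Returns true if shutdownNow was already set when the task recorded its failure.
  static boolean shutdownSetBeforeFailure() {
    FakeLoop loop = new FakeLoop();
    CountDownLatch shutdownRequested = new CountDownLatch(1);
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      Future<Boolean> sawShutdown = executor.submit(() -> {
        // "Processing" blocks here until the test thread has requested shutdown.
        if (!shutdownRequested.await(5, TimeUnit.SECONDS)) {
          throw new IllegalStateException("latch was never released");
        }
        boolean flagAlreadySet = loop.shutdownNow;
        // Simulate the process callback reporting a failure.
        loop.throwable = new RuntimeException("simulated process failure");
        return flagAlreadySet;
      });

      loop.shutdown();               // set the flag first
      shutdownRequested.countDown(); // only then let the task fail

      return sawShutdown.get(5, TimeUnit.SECONDS) && loop.throwable != null;
    } catch (Exception e) {
      throw new IllegalStateException("sketch did not complete", e);
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println("shutdownNow set before throwable: " + shutdownSetBeforeFailure());
  }
}
```

   Because `countDown()` happens-before a successful return from `await()`, 
the task thread is guaranteed to observe the flag as set, regardless of how 
the threads are scheduled. In the real test, the latch would presumably be 
awaited inside `TestTask` processing and released after calling 
`RunLoop.shutdown`.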
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
