cameronlee314 commented on a change in pull request #1366:
URL: https://github.com/apache/samza/pull/1366#discussion_r431474125



##########
File path: samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -181,22 +181,10 @@ class TaskInstance(
       trace("Processing incoming message envelope for taskName and SSP: %s, %s"
         format (taskName, incomingMessageSsp))
 
-      if (isAsyncTask) {

Review comment:
       Just double checking here too: Is this check no longer necessary?

##########
File path: samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
##########
@@ -52,18 +50,6 @@ public static Runnable createRunLoop(scala.collection.immutable.Map<TaskName, Ta
 
     log.info("Got commit milliseconds: {}.", taskCommitMs);
 
-    int asyncTaskCount = taskInstances.values().count(new AbstractFunction1<TaskInstance, Object>() {
-      @Override
-      public Boolean apply(TaskInstance t) {
-        return t.isAsyncTask();
-      }
-    });
-
-    // asyncTaskCount should be either 0 or the number of all taskInstances
-    if (asyncTaskCount > 0 && asyncTaskCount < taskInstances.size()) {
-      throw new SamzaException("Mixing StreamTask and AsyncStreamTask is not supported");
-    }

Review comment:
       Was this validation moved somewhere else? Or is it no longer necessary?
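
For reference, here is a minimal sketch of the invariant the removed block enforced: the async task count must be either 0 or equal to the total number of tasks. It uses plain Java collections rather than the Scala map in `RunLoopFactory`. The names `MixedTaskCheck`, `Task`, `isMixed`, and `validateNotMixed` are hypothetical, and `IllegalStateException` stands in for `SamzaException` so the snippet has no dependencies. This is not the code from the PR.

```java
import java.util.Arrays;
import java.util.List;

class MixedTaskCheck {
  // Hypothetical stand-in for TaskInstance; only the async flag matters here.
  interface Task {
    boolean isAsyncTask();
  }

  // Returns true when some, but not all, of the tasks are async.
  static boolean isMixed(List<? extends Task> tasks) {
    long asyncTaskCount = tasks.stream().filter(Task::isAsyncTask).count();
    // asyncTaskCount should be either 0 or the number of all tasks
    return asyncTaskCount > 0 && asyncTaskCount < tasks.size();
  }

  // Throws if sync and async tasks are mixed in the same container.
  static void validateNotMixed(List<? extends Task> tasks) {
    if (isMixed(tasks)) {
      // Assumption: the original threw SamzaException; a JDK exception is used here instead.
      throw new IllegalStateException("Mixing StreamTask and AsyncStreamTask is not supported");
    }
  }

  public static void main(String[] args) {
    Task sync = () -> false;
    Task async = () -> true;

    validateNotMixed(Arrays.asList(sync, sync));   // all sync: accepted
    validateNotMixed(Arrays.asList(async, async)); // all async: accepted
    try {
      validateNotMixed(Arrays.asList(sync, async)); // mixed: rejected
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

If the check is still needed, something of this shape could live wherever the tasks are now assembled. If it is no longer needed, a short note in the PR description explaining why would help.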

##########
File path: samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
##########
@@ -231,14 +251,12 @@ public void testProcessMultipleTasks() throws Exception {
     when(consumerMultiplexer.pollIntervalMs()).thenReturn(10);

Review comment:
       Is this unused now? Could you please check the other tests to see if there are other unused variables too?

##########
File path: samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
##########
@@ -390,23 +401,21 @@ public void testCommitSingleTask() throws Exception {
     when(consumerMultiplexer.pollIntervalMs()).thenReturn(10);
     OffsetManager offsetManager = mock(OffsetManager.class);
 
-    TestTask task0 = new TestTask(true, true, false, task0ProcessedMessagesLatch);
+    TestTask task0 = spy(createTestTask(true, true, false, task0ProcessedMessagesLatch, 0, taskName0, ssp0, offsetManager));
     task0.setCommitRequest(TaskCoordinator.RequestScope.CURRENT_TASK);
-    TestTask task1 = new TestTask(true, false, true, task1ProcessedMessagesLatch);
-    TaskInstance t0 = createTaskInstance(task0, taskName0, ssp0, offsetManager, consumerMultiplexer);
-    TaskInstance t1 = createTaskInstance(task1, taskName1, ssp1, offsetManager, consumerMultiplexer);
+    TestTask task1 = spy(createTestTask(true, false, true, task1ProcessedMessagesLatch, 0, taskName1, ssp1, offsetManager));
 
-    Map<TaskName, TaskInstance> tasks = new HashMap<>();
-    tasks.put(taskName0, t0);
-    tasks.put(taskName1, t1);
+    Map<TaskName, RunLoopTask> tasks = new HashMap<>();
+    tasks.put(taskName0, task0);
+    tasks.put(taskName1, task1);
 
     int maxMessagesInFlight = 1;
     RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
                                             callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, () -> 0L, false);
     //have a null message in between to make sure task0 finishes processing and invoke the commit
     when(consumerMultiplexer.choose(false)).thenReturn(envelope0)
         .thenAnswer(x -> {
-            task0ProcessedMessagesLatch.await();
+//            task0ProcessedMessagesLatch.await();

Review comment:
       Was this intended to be removed?

##########
File path: samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
##########
@@ -538,6 +533,9 @@ public void testEndOfStreamWithMultipleTasks() throws Exception {
     task0ProcessedMessagesLatch.await();
     task1ProcessedMessagesLatch.await();
 
+    verify(task0, times(1)).endOfStream(any());

Review comment:
       `times(1)` is the default for `verify`, so you don't need to pass it as an argument.





