This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 78e4232  Changed StreamOperatorTask to throw appropriate Exception 
when InputOperator is not found for a SystemStream in the OperatorGraph (#1483)
78e4232 is described below

commit 78e42325410c795482176c7e8d2effabd24de227
Author: ajo thomas <[email protected]>
AuthorDate: Tue Mar 30 17:12:13 2021 -0700

    Changed StreamOperatorTask to throw appropriate Exception when 
InputOperator is not found for a SystemStream in the OperatorGraph (#1483)
    
    Symptom:
    Samza job fails with a TaskCallbackTimeoutException if InputOperator is not 
found in OperatorGraph for a SystemStream
    
    Cause:
    Currently, the code does not handle the case where there is no 
InputOperator for a SystemStream in the OperatorGraph. The pipeline fails 
silently, and the job eventually throws an exception due to the async task 
callback timeout (TaskCallbackTimeoutException).
    One scenario that causes this is non-determinism between the plan seen by 
the containers and the plan seen by the AM. This can happen whenever the 
transforms/operators generate their names non-deterministically; e.g., we 
noticed a Beam transform that appended a hashcode.
    
    Changes:
    Call the failure method of the TaskCallback with a SamzaException that 
carries an appropriate error message.
---
 .../org/apache/samza/task/StreamOperatorTask.java  | 17 +++++++++
 .../apache/samza/task/TestStreamOperatorTask.java  | 40 ++++++++++++++++++++--
 2 files changed, 55 insertions(+), 2 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java 
b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index 5a474cd..0079fab 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.task;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
@@ -146,6 +147,14 @@ public class StreamOperatorTask implements 
AsyncStreamTask, InitableTask, Window
               callback.complete();
             }
           });
+        } else {
+          // If InputOperator is not found in the operator graph for a given 
SystemStream, throw an exception else the
+          // job will timeout due to async task callback timeout 
(TaskCallbackTimeoutException)
+          final String errMessage = String.format("InputOperator not found in 
OperatorGraph for %s. The available input"
+              + " operators are: %s. Please check SystemStream configuration 
for the `SystemConsumer` and/or task.inputs"
+              + " task configuration.", systemStream, 
operatorImplGraph.getAllInputOperators());
+          LOG.error(errMessage);
+          callback.failure(new SamzaException(errMessage));
         }
       } catch (Exception e) {
         LOG.error("Failed to process the incoming message due to ", e);
@@ -184,6 +193,14 @@ public class StreamOperatorTask implements 
AsyncStreamTask, InitableTask, Window
     this.taskThreadPool = taskThreadPool;
   }
 
+  /**
+   * Package private setter for private var operatorImplGraph to be used in 
TestStreamOperatorTask tests.
+   * */
+  @VisibleForTesting
+  void setOperatorImplGraph(OperatorImplGraph operatorImplGraph) {
+    this.operatorImplGraph = operatorImplGraph;
+  }
+
   /* package private for testing */
   OperatorImplGraph getOperatorImplGraph() {
     return this.operatorImplGraph;
diff --git 
a/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java 
b/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
index 2d43c63..2ad9f7a 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
@@ -22,17 +22,27 @@ package org.apache.samza.task;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import org.apache.samza.SamzaException;
 import org.apache.samza.context.Context;
 import org.apache.samza.context.JobContext;
 import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.impl.OperatorImplGraph;
 import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
 import org.apache.samza.util.Clock;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.*;
-
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.only;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 
 public class TestStreamOperatorTask {
   public static OperatorImplGraph getOperatorImplGraph(StreamOperatorTask 
task) {
@@ -80,4 +90,30 @@ public class TestStreamOperatorTask {
         mockTaskCoordinator, mockTaskCallback);
     failureLatch.await();
   }
+
+  /**
+   * Tests if the appropriate SamzaException is propagated to the TaskCallback 
if there is no InputOperator for a given
+   * SystemStream in the OperatorGraph.
+   * */
+  @Test
+  public void testExceptionIfInputOperatorMissing() throws 
NoSuchFieldException, IllegalAccessException {
+    IncomingMessageEnvelope mockIme = mock(IncomingMessageEnvelope.class, 
RETURNS_DEEP_STUBS);
+    SystemStream testSystemStream = new SystemStream("foo", "bar");
+    
when(mockIme.getSystemStreamPartition().getSystemStream()).thenReturn(testSystemStream);
+
+    OperatorImplGraph mockOperatorImplGraph = mock(OperatorImplGraph.class);
+    when(mockOperatorImplGraph.getInputOperator(anyObject())).thenReturn(null);
+    StreamOperatorTask operatorTask = new 
StreamOperatorTask(mock(OperatorSpecGraph.class));
+    operatorTask.setOperatorImplGraph(mockOperatorImplGraph);
+    TaskCallback mockTaskCallback = mock(TaskCallback.class);
+    operatorTask.processAsync(mockIme, mock(MessageCollector.class), 
mock(TaskCoordinator.class), mockTaskCallback);
+
+    ArgumentCaptor<Throwable> throwableCaptor = 
ArgumentCaptor.forClass(Throwable.class);
+    verify(mockTaskCallback, only()).failure(throwableCaptor.capture());
+    assertEquals(throwableCaptor.getValue().getClass(), SamzaException.class);
+    String expectedErrMessage = String.format("InputOperator not found in 
OperatorGraph for %s. The available input"
+        + " operators are: %s. Please check SystemStream configuration for the 
`SystemConsumer` and/or task.inputs"
+        + " task configuration.", testSystemStream, 
mockOperatorImplGraph.getAllInputOperators());
+    assertEquals(throwableCaptor.getValue().getMessage(), expectedErrMessage);
+  }
 }

Reply via email to