This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 78e4232 Changed StreamOperatorTask to throw appropriate Exception
when InputOperator is not found for a SystemStream in the OperatorGraph (#1483)
78e4232 is described below
commit 78e42325410c795482176c7e8d2effabd24de227
Author: ajo thomas <[email protected]>
AuthorDate: Tue Mar 30 17:12:13 2021 -0700
Changed StreamOperatorTask to throw appropriate Exception when
InputOperator is not found for a SystemStream in the OperatorGraph (#1483)
Symptom:
A Samza job fails with a TaskCallbackTimeoutException if no InputOperator is
found in the OperatorGraph for a SystemStream.
Cause:
Currently, the code does not handle the case where there is no InputOperator
for a SystemStream in the OperatorGraph: the TaskCallback is never completed or
failed, so the pipeline fails silently and the job eventually throws an async
task callback timeout exception (TaskCallbackTimeoutException).
One scenario that causes this is non-determinism between the plan seen by the
containers and the plan seen by the AM. This can happen whenever
transforms/operators generate their names non-deterministically; e.g., we
noticed a Beam transform appending a hash code to its name.
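Changes:
Call the TaskCallback's failure method with a SamzaException that carries a
descriptive error message (the missing SystemStream and the available input
operators).
Add a package-private, @VisibleForTesting setter for operatorImplGraph and a
unit test verifying that the SamzaException is propagated to the TaskCallback.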
---
.../org/apache/samza/task/StreamOperatorTask.java | 17 +++++++++
.../apache/samza/task/TestStreamOperatorTask.java | 40 ++++++++++++++++++++--
2 files changed, 55 insertions(+), 2 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index 5a474cd..0079fab 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.task;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
@@ -146,6 +147,14 @@ public class StreamOperatorTask implements AsyncStreamTask, InitableTask, Window
callback.complete();
}
});
+ } else {
+ // If InputOperator is not found in the operator graph for a given
SystemStream, throw an exception else the
+ // job will timeout due to async task callback timeout
(TaskCallbackTimeoutException)
+ final String errMessage = String.format("InputOperator not found in
OperatorGraph for %s. The available input"
+ + " operators are: %s. Please check SystemStream configuration
for the `SystemConsumer` and/or task.inputs"
+ + " task configuration.", systemStream,
operatorImplGraph.getAllInputOperators());
+ LOG.error(errMessage);
+ callback.failure(new SamzaException(errMessage));
}
} catch (Exception e) {
LOG.error("Failed to process the incoming message due to ", e);
@@ -184,6 +193,14 @@ public class StreamOperatorTask implements AsyncStreamTask, InitableTask, Window
this.taskThreadPool = taskThreadPool;
}
+ /**
+ * Package private setter for private var operatorImplGraph to be used in
TestStreamOperatorTask tests.
+ * */
+ @VisibleForTesting
+ void setOperatorImplGraph(OperatorImplGraph operatorImplGraph) {
+ this.operatorImplGraph = operatorImplGraph;
+ }
+
/* package private for testing */
OperatorImplGraph getOperatorImplGraph() {
return this.operatorImplGraph;
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java b/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
index 2d43c63..2ad9f7a 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
@@ -22,17 +22,27 @@ package org.apache.samza.task;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import org.apache.samza.SamzaException;
import org.apache.samza.context.Context;
import org.apache.samza.context.JobContext;
import org.apache.samza.operators.OperatorSpecGraph;
import org.apache.samza.operators.impl.OperatorImplGraph;
import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
import org.apache.samza.util.Clock;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-import static org.mockito.Mockito.*;
-
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.only;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
public class TestStreamOperatorTask {
public static OperatorImplGraph getOperatorImplGraph(StreamOperatorTask
task) {
@@ -80,4 +90,30 @@ public class TestStreamOperatorTask {
mockTaskCoordinator, mockTaskCallback);
failureLatch.await();
}
+
+ /**
+ * Tests if the appropriate SamzaException is propagated to the TaskCallback
if there is no InputOperator for a given
+ * SystemStream in the OperatorGraph.
+ * */
+ @Test
+ public void testExceptionIfInputOperatorMissing() throws
NoSuchFieldException, IllegalAccessException {
+ IncomingMessageEnvelope mockIme = mock(IncomingMessageEnvelope.class,
RETURNS_DEEP_STUBS);
+ SystemStream testSystemStream = new SystemStream("foo", "bar");
+
when(mockIme.getSystemStreamPartition().getSystemStream()).thenReturn(testSystemStream);
+
+ OperatorImplGraph mockOperatorImplGraph = mock(OperatorImplGraph.class);
+ when(mockOperatorImplGraph.getInputOperator(anyObject())).thenReturn(null);
+ StreamOperatorTask operatorTask = new
StreamOperatorTask(mock(OperatorSpecGraph.class));
+ operatorTask.setOperatorImplGraph(mockOperatorImplGraph);
+ TaskCallback mockTaskCallback = mock(TaskCallback.class);
+ operatorTask.processAsync(mockIme, mock(MessageCollector.class),
mock(TaskCoordinator.class), mockTaskCallback);
+
+ ArgumentCaptor<Throwable> throwableCaptor =
ArgumentCaptor.forClass(Throwable.class);
+ verify(mockTaskCallback, only()).failure(throwableCaptor.capture());
+ assertEquals(throwableCaptor.getValue().getClass(), SamzaException.class);
+ String expectedErrMessage = String.format("InputOperator not found in
OperatorGraph for %s. The available input"
+ + " operators are: %s. Please check SystemStream configuration for the
`SystemConsumer` and/or task.inputs"
+ + " task configuration.", testSystemStream,
mockOperatorImplGraph.getAllInputOperators());
+ assertEquals(throwableCaptor.getValue().getMessage(), expectedErrMessage);
+ }
}