ajothomas commented on a change in pull request #1483:
URL: https://github.com/apache/samza/pull/1483#discussion_r603602715
##########
File path:
samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
##########
@@ -146,6 +146,11 @@ public final void processAsync(IncomingMessageEnvelope ime, MessageCollector col
callback.complete();
}
});
+ } else {
+ // If InputOperator is not found in the operator graph for a given SystemStream, throw an exception else the
+ // job will timeout due to async task callback timeout (TaskCallbackTimeoutException)
+ callback.failure(new SamzaException(
+ String.format("InputOperator not found in OperatorGraph for %s", systemStream.toString())));
Review comment:
Added appropriate exceptions and corrected the description of the PR.
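
For readers of the thread, the reasoning in the code comment above can be sketched in isolation: an async callback has to be completed or failed on every branch, otherwise the caller only finds out through a timeout. This is a minimal illustration, not the Samza code; `Callback`, `RecordingCallback`, the handler map, and `IllegalStateException` are hypothetical stand-ins for `TaskCallback`, the operator graph lookup, and `SamzaException`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

public class CallbackBranchDemo {
    // Hypothetical callback, loosely modelled on the complete()/failure() pair in the diff.
    interface Callback {
        void complete();
        void failure(Throwable t);
    }

    // Records which callback method was invoked, so the outcome can be inspected.
    static final class RecordingCallback implements Callback {
        boolean completed;
        Throwable error;

        @Override
        public void complete() {
            completed = true;
        }

        @Override
        public void failure(Throwable t) {
            error = t;
        }
    }

    // Hypothetical registry standing in for the operator graph lookup.
    private final Map<String, Consumer<String>> handlers = new HashMap<>();

    void register(String stream, Consumer<String> handler) {
        handlers.put(stream, handler);
    }

    void processAsync(String stream, String message, Callback callback) {
        Consumer<String> handler = handlers.get(stream);
        if (handler != null) {
            handler.accept(message);
            callback.complete();
        } else {
            // Without this branch the callback is never invoked and the caller
            // can only detect the problem when its own timeout fires.
            callback.failure(new IllegalStateException(
                String.format("No handler found for %s", stream)));
        }
    }

    public static void main(String[] args) {
        CallbackBranchDemo demo = new CallbackBranchDemo();
        RecordingCallback callback = new RecordingCallback();
        demo.processAsync("missing", "payload", callback);
        System.out.println(callback.error.getMessage());
    }
}
```

Failing the callback right away surfaces the missing lookup with a specific message instead of a generic timeout.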
##########
File path:
samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
##########
@@ -146,6 +146,11 @@ public final void processAsync(IncomingMessageEnvelope ime, MessageCollector col
callback.complete();
}
});
+ } else {
+ // If InputOperator is not found in the operator graph for a given SystemStream, throw an exception else the
+ // job will timeout due to async task callback timeout (TaskCallbackTimeoutException)
+ callback.failure(new SamzaException(
+ String.format("InputOperator not found in OperatorGraph for %s", systemStream.toString())));
Review comment:
Right, removed `toString()`
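
As a small illustration of why the explicit call is redundant: for a `%s` conversion, `String.format` already calls `toString()` on a non-null argument (unless the argument implements `Formattable`) and renders a null argument as `null`. The sketch below uses a hypothetical `StreamName` class in place of `SystemStream`, and its `toString()` output is made up for the example.

```java
public class FormatToStringDemo {
    // Hypothetical stand-in for a type such as SystemStream; not the Samza class.
    static final class StreamName {
        private final String system;
        private final String stream;

        StreamName(String system, String stream) {
            this.system = system;
            this.stream = stream;
        }

        @Override
        public String toString() {
            return "StreamName [system=" + system + ", stream=" + stream + "]";
        }
    }

    // Explicit toString(): throws NullPointerException if name is null.
    static String withExplicitToString(StreamName name) {
        return String.format("InputOperator not found in OperatorGraph for %s", name.toString());
    }

    // Implicit conversion: %s calls toString() itself and prints "null" for a null argument.
    static String withImplicitToString(StreamName name) {
        return String.format("InputOperator not found in OperatorGraph for %s", name);
    }

    public static void main(String[] args) {
        StreamName name = new StreamName("foo", "bar");
        // Both variants produce the same message for a non-null argument.
        System.out.println(withExplicitToString(name).equals(withImplicitToString(name)));
        // Only the implicit variant tolerates null.
        System.out.println(withImplicitToString(null));
    }
}
```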
##########
File path:
samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
##########
@@ -80,4 +84,34 @@ public void testExceptionsInProcessInvokesTaskCallback() throws InterruptedExcep
mockTaskCoordinator, mockTaskCallback);
failureLatch.await();
}
+
+ /**
+ * Tests if the appropriate SamzaException is propagated to the TaskCallback if there is no InputOperator for a given
+ * SystemStream in the OperatorGraph.
+ * */
+ @Test
+ public void testExceptionIfInputOperatorMissing() throws NoSuchFieldException, IllegalAccessException {
+ IncomingMessageEnvelope mockIme = mock(IncomingMessageEnvelope.class, RETURNS_DEEP_STUBS);
+ SystemStream testSystemStream = new SystemStream("foo", "bar");
+ when(mockIme.getSystemStreamPartition().getSystemStream()).thenReturn(testSystemStream);
+
+ OperatorImplGraph mockOperatorImplGraph = mock(OperatorImplGraph.class);
+ when(mockOperatorImplGraph.getInputOperator(anyObject())).thenReturn(null);
+ // OperatorImplGraph is a private field ('operatorImplGraph') in StreamOperatorTask and need to be set with
+ // the mocked value reflectively
+ StreamOperatorTask operatorTask = new StreamOperatorTask(mock(OperatorSpecGraph.class));
+ Field operatorImplGraphField = operatorTask.getClass().getDeclaredField("operatorImplGraph");
+ operatorImplGraphField.setAccessible(true);
+ operatorImplGraphField.set(operatorTask, mockOperatorImplGraph);
+
+ TaskCallback mockTaskCallback = mock(TaskCallback.class);
+ operatorTask.processAsync(mockIme, mock(MessageCollector.class), mock(TaskCoordinator.class), mockTaskCallback);
+ // ArgumentCaptor for capturing the SamzaException passed to the callback
+ ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
+ verify(mockTaskCallback, only()).failure(throwableCaptor.capture());
+ assertEquals(throwableCaptor.getValue().getClass(), SamzaException.class);
+ assertEquals(
+ throwableCaptor.getValue().getMessage(),
+ String.format("InputOperator not found in OperatorGraph for %s", testSystemStream.toString()));
Review comment:
Done.
##########
File path:
samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
##########
@@ -146,6 +146,11 @@ public final void processAsync(IncomingMessageEnvelope ime, MessageCollector col
callback.complete();
}
});
+ } else {
+ // If InputOperator is not found in the operator graph for a given SystemStream, throw an exception else the
+ // job will timeout due to async task callback timeout (TaskCallbackTimeoutException)
+ callback.failure(new SamzaException(
Review comment:
Makes sense, done.