mynameborat commented on a change in pull request #1483:
URL: https://github.com/apache/samza/pull/1483#discussion_r602463185
##########
File path:
samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
##########
@@ -19,26 +19,30 @@
package org.apache.samza.task;
+import java.lang.reflect.Field;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import org.apache.samza.SamzaException;
import org.apache.samza.context.Context;
import org.apache.samza.context.JobContext;
import org.apache.samza.operators.OperatorSpecGraph;
import org.apache.samza.operators.impl.OperatorImplGraph;
import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
import org.apache.samza.util.Clock;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
Review comment:
The code currently mixes static star imports with explicit ones. We have
settled on no star imports, so would you mind adding explicit imports
instead? Maybe clean up the Mockito ones in the file too :-)
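For illustration, a minimal sketch of the explicit style using only JDK classes. In this test file the equivalent would be one line per member actually used, for example `import static org.junit.Assert.assertEquals;` and `import static org.junit.Assert.fail;`. The class and method names below are hypothetical and not part of the PR.

```java
// Explicit static imports: every statically imported name is listed once,
// so a reader can see where it comes from without IDE support.
import static java.lang.Math.max;
import static java.util.Objects.requireNonNull;

// Hypothetical class, used only to demonstrate the import style.
public class ExplicitStaticImportSketch {

    // Returns the length of the longer of the two strings.
    static int longest(String a, String b) {
        requireNonNull(a, "a");
        requireNonNull(b, "b");
        return max(a.length(), b.length());
    }

    public static void main(String[] args) {
        System.out.println(longest("foo", "quux"));
    }
}
```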
##########
File path:
samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
##########
@@ -146,6 +146,11 @@ public final void processAsync(IncomingMessageEnvelope ime, MessageCollector col
callback.complete();
}
});
+ } else {
+ // If InputOperator is not found in the operator graph for a given SystemStream, throw an exception else the
+ // job will timeout due to async task callback timeout (TaskCallbackTimeoutException)
+ callback.failure(new SamzaException(
+ String.format("InputOperator not found in OperatorGraph for %s", systemStream.toString())));
Review comment:
nit: `systemStream.toString()` seems redundant. Passing `systemStream`
should be sufficient, since `%s` already formats the argument via its
`toString()`.
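A small sketch of why the explicit call is redundant. `StreamId` is a hypothetical stand-in for `SystemStream`, and its `toString()` format is made up for the example. Both helpers produce the same message, and the implicit form also tolerates a null argument, where the explicit call would throw a `NullPointerException`.

```java
// Hypothetical demo class; not part of the PR.
public class FormatToStringSketch {

    // Hypothetical stand-in for org.apache.samza.system.SystemStream.
    static final class StreamId {
        private final String system;
        private final String stream;

        StreamId(String system, String stream) {
            this.system = system;
            this.stream = stream;
        }

        @Override
        public String toString() {
            // Illustrative format only; the real class may print differently.
            return "StreamId [system=" + system + ", stream=" + stream + "]";
        }
    }

    // Explicit toString(): throws NullPointerException if id is null.
    static String withExplicitToString(StreamId id) {
        return String.format("InputOperator not found in OperatorGraph for %s", id.toString());
    }

    // Implicit: %s formats the argument itself and renders null as "null".
    static String withImplicitToString(StreamId id) {
        return String.format("InputOperator not found in OperatorGraph for %s", id);
    }

    public static void main(String[] args) {
        StreamId id = new StreamId("foo", "bar");
        System.out.println(withExplicitToString(id).equals(withImplicitToString(id)));
    }
}
```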
##########
File path:
samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
##########
@@ -80,4 +84,34 @@ public void testExceptionsInProcessInvokesTaskCallback() throws InterruptedExcep
mockTaskCoordinator, mockTaskCallback);
failureLatch.await();
}
+
+ /**
+ * Tests if the appropriate SamzaException is propagated to the TaskCallback if there is no InputOperator for a given
+ * SystemStream in the OperatorGraph.
+ * */
+ @Test
+ public void testExceptionIfInputOperatorMissing() throws NoSuchFieldException, IllegalAccessException {
+ IncomingMessageEnvelope mockIme = mock(IncomingMessageEnvelope.class, RETURNS_DEEP_STUBS);
+ SystemStream testSystemStream = new SystemStream("foo", "bar");
+ when(mockIme.getSystemStreamPartition().getSystemStream()).thenReturn(testSystemStream);
+
+ OperatorImplGraph mockOperatorImplGraph = mock(OperatorImplGraph.class);
+ when(mockOperatorImplGraph.getInputOperator(anyObject())).thenReturn(null);
+ // OperatorImplGraph is a private field ('operatorImplGraph') in StreamOperatorTask and need to be set with
+ // the mocked value reflectively
+ StreamOperatorTask operatorTask = new StreamOperatorTask(mock(OperatorSpecGraph.class));
+ Field operatorImplGraphField = operatorTask.getClass().getDeclaredField("operatorImplGraph");
+ operatorImplGraphField.setAccessible(true);
+ operatorImplGraphField.set(operatorTask, mockOperatorImplGraph);
Review comment:
Would it be simpler to add a setter for the field and make it
package-private? Future tests could then use the setter instead of
repeating this reflection boilerplate.
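A rough sketch of the two approaches side by side. `Task`, `Graph`, and `setGraph` are hypothetical stand-ins for `StreamOperatorTask`, `OperatorImplGraph`, and the suggested setter; none of these names come from the PR.

```java
import java.lang.reflect.Field;

// Hypothetical demo class; not part of the PR.
public class SetterVersusReflectionSketch {

    // Hypothetical stand-in for a collaborator such as OperatorImplGraph.
    interface Graph {
        String lookup(String key);
    }

    // Hypothetical stand-in for StreamOperatorTask.
    static class Task {
        private Graph graph;

        // Package-private setter (no access modifier), intended for tests
        // that live in the same package.
        void setGraph(Graph graph) {
            this.graph = graph;
        }

        String process(String key) {
            String operator = graph.lookup(key);
            return operator == null ? "missing:" + key : operator;
        }
    }

    public static void main(String[] args) throws Exception {
        // Reflection: depends on the field name as a string and needs
        // checked-exception handling in every test that does this.
        Task viaReflection = new Task();
        Field field = Task.class.getDeclaredField("graph");
        field.setAccessible(true);
        field.set(viaReflection, (Graph) key -> null);

        // Setter: a single call that the compiler checks.
        Task viaSetter = new Task();
        viaSetter.setGraph(key -> null);

        System.out.println(viaReflection.process("foo.bar"));
        System.out.println(viaSetter.process("foo.bar"));
    }
}
```

With the setter, renaming the field becomes a compile-time concern rather than a `NoSuchFieldException` at test time.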
##########
File path:
samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
##########
@@ -146,6 +146,11 @@ public final void processAsync(IncomingMessageEnvelope ime, MessageCollector col
callback.complete();
}
});
+ } else {
+ // If InputOperator is not found in the operator graph for a given SystemStream, throw an exception else the
+ // job will timeout due to async task callback timeout (TaskCallbackTimeoutException)
+ callback.failure(new SamzaException(
Review comment:
Can we also log the error before invoking `callback.failure(...)`,
following the pattern in the rest of the class? Callback failures are
handled asynchronously and other exceptions can occur in the meantime, so a
log entry helps reconstruct the timeline during debugging.
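Something along these lines, as a sketch only. It uses `java.util.logging` so it runs without dependencies; the logger the class actually uses is not visible in this diff. `Callback` and `failWithLog` are hypothetical names, and `IllegalStateException` stands in for `SamzaException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical demo class; not part of the PR.
public class LogBeforeFailureSketch {
    private static final Logger LOG = Logger.getLogger(LogBeforeFailureSketch.class.getName());

    // Hypothetical stand-in for the failure half of a task callback.
    interface Callback {
        void failure(Throwable t);
    }

    // Logs the error first, then hands the same exception to the callback,
    // so the log records when the failure was raised.
    static void failWithLog(String streamName, Callback callback) {
        String message = String.format("InputOperator not found in OperatorGraph for %s", streamName);
        RuntimeException error = new IllegalStateException(message);
        LOG.log(Level.SEVERE, message, error);
        callback.failure(error);
    }

    public static void main(String[] args) {
        List<Throwable> received = new ArrayList<>();
        failWithLog("foo.bar", received::add);
        System.out.println(received.get(0).getMessage());
    }
}
```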
##########
File path:
samza-core/src/test/java/org/apache/samza/task/TestStreamOperatorTask.java
##########
@@ -80,4 +84,34 @@ public void testExceptionsInProcessInvokesTaskCallback() throws InterruptedExcep
mockTaskCoordinator, mockTaskCallback);
failureLatch.await();
}
+
+ /**
+ * Tests if the appropriate SamzaException is propagated to the TaskCallback if there is no InputOperator for a given
+ * SystemStream in the OperatorGraph.
+ * */
+ @Test
+ public void testExceptionIfInputOperatorMissing() throws NoSuchFieldException, IllegalAccessException {
+ IncomingMessageEnvelope mockIme = mock(IncomingMessageEnvelope.class, RETURNS_DEEP_STUBS);
+ SystemStream testSystemStream = new SystemStream("foo", "bar");
+ when(mockIme.getSystemStreamPartition().getSystemStream()).thenReturn(testSystemStream);
+
+ OperatorImplGraph mockOperatorImplGraph = mock(OperatorImplGraph.class);
+ when(mockOperatorImplGraph.getInputOperator(anyObject())).thenReturn(null);
+ // OperatorImplGraph is a private field ('operatorImplGraph') in StreamOperatorTask and need to be set with
+ // the mocked value reflectively
+ StreamOperatorTask operatorTask = new StreamOperatorTask(mock(OperatorSpecGraph.class));
+ Field operatorImplGraphField = operatorTask.getClass().getDeclaredField("operatorImplGraph");
+ operatorImplGraphField.setAccessible(true);
+ operatorImplGraphField.set(operatorTask, mockOperatorImplGraph);
+
+ TaskCallback mockTaskCallback = mock(TaskCallback.class);
+ operatorTask.processAsync(mockIme, mock(MessageCollector.class), mock(TaskCoordinator.class), mockTaskCallback);
+ // ArgumentCaptor for capturing the SamzaException passed to the callback
+ ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
+ verify(mockTaskCallback, only()).failure(throwableCaptor.capture());
+ assertEquals(throwableCaptor.getValue().getClass(), SamzaException.class);
+ assertEquals(
+ throwableCaptor.getValue().getMessage(),
+ String.format("InputOperator not found in OperatorGraph for %s", testSystemStream.toString()));
Review comment:
Same as above: passing `testSystemStream` directly should be sufficient for
the `String.format(...)` call.