pnowojski commented on a change in pull request #8858: [hotfix][tests] Change
some StreamTask tests to create a test task in the task's thread
URL: https://github.com/apache/flink/pull/8858#discussion_r297158095
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -311,71 +310,45 @@ public void testCancellationFailsWithBlockingLock()
throws Exception {
@Test
public void testFailingCheckpointStreamOperator() throws Exception {
- final long checkpointId = 42L;
- final long timestamp = 1L;
-
- TaskInfo mockTaskInfo = mock(TaskInfo.class);
-
when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
- when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
- Environment mockEnvironment = new
MockEnvironmentBuilder().build();
-
- StreamTask<?, ?> streamTask = new
EmptyStreamTask(mockEnvironment);
- CheckpointMetaData checkpointMetaData = new
CheckpointMetaData(checkpointId, timestamp);
-
- // mock the operators
- StreamOperator<?> streamOperator1 = mock(StreamOperator.class);
- StreamOperator<?> streamOperator2 = mock(StreamOperator.class);
- StreamOperator<?> streamOperator3 = mock(StreamOperator.class);
// mock the returned snapshots
OperatorSnapshotFutures operatorSnapshotResult1 =
mock(OperatorSnapshotFutures.class);
OperatorSnapshotFutures operatorSnapshotResult2 =
mock(OperatorSnapshotFutures.class);
final Exception testException = new Exception("Test exception");
- when(streamOperator1.snapshotState(anyLong(), anyLong(),
any(CheckpointOptions.class),
any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult1);
- when(streamOperator2.snapshotState(anyLong(), anyLong(),
any(CheckpointOptions.class),
any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult2);
- when(streamOperator3.snapshotState(anyLong(), anyLong(),
any(CheckpointOptions.class),
any(CheckpointStreamFactory.class))).thenThrow(testException);
-
- OperatorID operatorID1 = new OperatorID();
- OperatorID operatorID2 = new OperatorID();
- OperatorID operatorID3 = new OperatorID();
- when(streamOperator1.getOperatorID()).thenReturn(operatorID1);
- when(streamOperator2.getOperatorID()).thenReturn(operatorID2);
- when(streamOperator3.getOperatorID()).thenReturn(operatorID3);
-
- // set up the task
-
- StreamOperator<?>[] streamOperators = {streamOperator1,
streamOperator2, streamOperator3};
+ LinkedBlockingQueue<Event> eventQueue = new
LinkedBlockingQueue<>();
- OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain
= mock(OperatorChain.class);
-
when(operatorChain.getAllOperators()).thenReturn(streamOperators);
+ try (MockEnvironment mockEnvironment = new
MockEnvironmentBuilder().build()) {
+ RunningTask<EmptyStreamTask> task = runTask(() -> new
EmptyStreamTask(
+ mockEnvironment,
+ eventQueue,
+ operatorChain(
+
streamOperatorWithSnapshot(operatorSnapshotResult1),
+
streamOperatorWithSnapshot(operatorSnapshotResult2),
+
streamOperatorWithSnapshotException(testException))));
+ EmptyStreamTask streamTask = task.streamTask;
- Whitebox.setInternalState(streamTask, "isRunning", true);
- Whitebox.setInternalState(streamTask, "lock", new Object());
- Whitebox.setInternalState(streamTask, "operatorChain",
operatorChain);
- Whitebox.setInternalState(streamTask, "cancelables", new
CloseableRegistry());
- Whitebox.setInternalState(streamTask, "configuration", new
StreamConfig(new Configuration()));
- Whitebox.setInternalState(streamTask, "checkpointStorage", new
MemoryBackendCheckpointStorage(new JobID(), null, null, Integer.MAX_VALUE));
+ assertThat(eventQueue.take(),
is(Event.TASK_IS_RUNNING));
+ assertTrue(eventQueue.isEmpty());
- CheckpointExceptionHandlerFactory
checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
- CheckpointExceptionHandler checkpointExceptionHandler =
-
checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true,
mockEnvironment);
- Whitebox.setInternalState(streamTask,
"synchronousCheckpointExceptionHandler", checkpointExceptionHandler);
+ try {
+ streamTask.triggerCheckpoint(
+ new CheckpointMetaData(42L, 1L),
+
CheckpointOptions.forCheckpointWithDefaultLocation(),
+ false);
+
+ fail("Expected test exception here.");
+ } catch (Exception e) {
+ assertThat(e.getCause(), is(testException));
+ }
- StreamTask.AsyncCheckpointExceptionHandler
asyncCheckpointExceptionHandler =
- new
StreamTask.AsyncCheckpointExceptionHandler(streamTask);
- Whitebox.setInternalState(streamTask,
"asynchronousCheckpointExceptionHandler", asyncCheckpointExceptionHandler);
+ verify(operatorSnapshotResult1).cancel();
+ verify(operatorSnapshotResult2).cancel();
- try {
- streamTask.triggerCheckpoint(checkpointMetaData,
CheckpointOptions.forCheckpointWithDefaultLocation(), false);
- fail("Expected test exception here.");
- } catch (Exception e) {
- assertEquals(testException, e.getCause());
+ task.streamTask.setSourceFinished();
Review comment:
Why are you calling `setSourceFinished` only 2 times? In other places task
is failing exceptionally/being cancelled?
Also I think better name would be "input" not "source".
If so maybe rename the method to `finishInput()`? `finishTaskGracefully()`?
Or at least add a java doc explaining what this method is doing.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services