pnowojski commented on a change in pull request #8858: [hotfix][tests] Change
some StreamTask tests to create a test task in the task's thread
URL: https://github.com/apache/flink/pull/8858#discussion_r297132959
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -311,71 +310,45 @@ public void testCancellationFailsWithBlockingLock()
throws Exception {
@Test
public void testFailingCheckpointStreamOperator() throws Exception {
- final long checkpointId = 42L;
- final long timestamp = 1L;
-
- TaskInfo mockTaskInfo = mock(TaskInfo.class);
-
when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
- when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
- Environment mockEnvironment = new
MockEnvironmentBuilder().build();
-
- StreamTask<?, ?> streamTask = new
EmptyStreamTask(mockEnvironment);
- CheckpointMetaData checkpointMetaData = new
CheckpointMetaData(checkpointId, timestamp);
-
- // mock the operators
- StreamOperator<?> streamOperator1 = mock(StreamOperator.class);
- StreamOperator<?> streamOperator2 = mock(StreamOperator.class);
- StreamOperator<?> streamOperator3 = mock(StreamOperator.class);
// mock the returned snapshots
OperatorSnapshotFutures operatorSnapshotResult1 =
mock(OperatorSnapshotFutures.class);
OperatorSnapshotFutures operatorSnapshotResult2 =
mock(OperatorSnapshotFutures.class);
final Exception testException = new Exception("Test exception");
- when(streamOperator1.snapshotState(anyLong(), anyLong(),
any(CheckpointOptions.class),
any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult1);
- when(streamOperator2.snapshotState(anyLong(), anyLong(),
any(CheckpointOptions.class),
any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult2);
- when(streamOperator3.snapshotState(anyLong(), anyLong(),
any(CheckpointOptions.class),
any(CheckpointStreamFactory.class))).thenThrow(testException);
-
- OperatorID operatorID1 = new OperatorID();
- OperatorID operatorID2 = new OperatorID();
- OperatorID operatorID3 = new OperatorID();
- when(streamOperator1.getOperatorID()).thenReturn(operatorID1);
- when(streamOperator2.getOperatorID()).thenReturn(operatorID2);
- when(streamOperator3.getOperatorID()).thenReturn(operatorID3);
-
- // set up the task
-
- StreamOperator<?>[] streamOperators = {streamOperator1,
streamOperator2, streamOperator3};
+ LinkedBlockingQueue<Event> eventQueue = new
LinkedBlockingQueue<>();
- OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain
= mock(OperatorChain.class);
-
when(operatorChain.getAllOperators()).thenReturn(streamOperators);
+ try (MockEnvironment mockEnvironment = new
MockEnvironmentBuilder().build()) {
+ RunningTask<EmptyStreamTask> task = runTask(() -> new
EmptyStreamTask(
+ mockEnvironment,
+ eventQueue,
+ operatorChain(
+
streamOperatorWithSnapshot(operatorSnapshotResult1),
+
streamOperatorWithSnapshot(operatorSnapshotResult2),
+
streamOperatorWithSnapshotException(testException))));
+ EmptyStreamTask streamTask = task.streamTask;
- Whitebox.setInternalState(streamTask, "isRunning", true);
- Whitebox.setInternalState(streamTask, "lock", new Object());
- Whitebox.setInternalState(streamTask, "operatorChain",
operatorChain);
- Whitebox.setInternalState(streamTask, "cancelables", new
CloseableRegistry());
- Whitebox.setInternalState(streamTask, "configuration", new
StreamConfig(new Configuration()));
- Whitebox.setInternalState(streamTask, "checkpointStorage", new
MemoryBackendCheckpointStorage(new JobID(), null, null, Integer.MAX_VALUE));
+ assertThat(eventQueue.take(),
is(Event.TASK_IS_RUNNING));
+ assertTrue(eventQueue.isEmpty());
- CheckpointExceptionHandlerFactory
checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
- CheckpointExceptionHandler checkpointExceptionHandler =
-
checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true,
mockEnvironment);
- Whitebox.setInternalState(streamTask,
"synchronousCheckpointExceptionHandler", checkpointExceptionHandler);
+ try {
+ streamTask.triggerCheckpoint(
+ new CheckpointMetaData(42L, 1L),
+
CheckpointOptions.forCheckpointWithDefaultLocation(),
+ false);
+
+ fail("Expected test exception here.");
+ } catch (Exception e) {
+ assertThat(e.getCause(), is(testException));
Review comment:
(optional) maybe ditto about `ExceptionUtils.findCause`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services