pnowojski commented on a change in pull request #8858: [hotfix][tests] Change 
some StreamTask tests to create a test task in the task's thread
URL: https://github.com/apache/flink/pull/8858#discussion_r297132959
 
 

 ##########
 File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 ##########
 @@ -311,71 +310,45 @@ public void testCancellationFailsWithBlockingLock() 
throws Exception {
 
        @Test
        public void testFailingCheckpointStreamOperator() throws Exception {
-               final long checkpointId = 42L;
-               final long timestamp = 1L;
-
-               TaskInfo mockTaskInfo = mock(TaskInfo.class);
-               
when(mockTaskInfo.getTaskNameWithSubtasks()).thenReturn("foobar");
-               when(mockTaskInfo.getIndexOfThisSubtask()).thenReturn(0);
-               Environment mockEnvironment = new 
MockEnvironmentBuilder().build();
-
-               StreamTask<?, ?> streamTask = new 
EmptyStreamTask(mockEnvironment);
-               CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, timestamp);
-
-               // mock the operators
-               StreamOperator<?> streamOperator1 = mock(StreamOperator.class);
-               StreamOperator<?> streamOperator2 = mock(StreamOperator.class);
-               StreamOperator<?> streamOperator3 = mock(StreamOperator.class);
 
                // mock the returned snapshots
                OperatorSnapshotFutures operatorSnapshotResult1 = 
mock(OperatorSnapshotFutures.class);
                OperatorSnapshotFutures operatorSnapshotResult2 = 
mock(OperatorSnapshotFutures.class);
 
                final Exception testException = new Exception("Test exception");
 
-               when(streamOperator1.snapshotState(anyLong(), anyLong(), 
any(CheckpointOptions.class), 
any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult1);
-               when(streamOperator2.snapshotState(anyLong(), anyLong(), 
any(CheckpointOptions.class), 
any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult2);
-               when(streamOperator3.snapshotState(anyLong(), anyLong(), 
any(CheckpointOptions.class), 
any(CheckpointStreamFactory.class))).thenThrow(testException);
-
-               OperatorID operatorID1 = new OperatorID();
-               OperatorID operatorID2 = new OperatorID();
-               OperatorID operatorID3 = new OperatorID();
-               when(streamOperator1.getOperatorID()).thenReturn(operatorID1);
-               when(streamOperator2.getOperatorID()).thenReturn(operatorID2);
-               when(streamOperator3.getOperatorID()).thenReturn(operatorID3);
-
-               // set up the task
-
-               StreamOperator<?>[] streamOperators = {streamOperator1, 
streamOperator2, streamOperator3};
+               LinkedBlockingQueue<Event> eventQueue = new 
LinkedBlockingQueue<>();
 
-               OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain 
= mock(OperatorChain.class);
-               
when(operatorChain.getAllOperators()).thenReturn(streamOperators);
+               try (MockEnvironment mockEnvironment = new 
MockEnvironmentBuilder().build()) {
+                       RunningTask<EmptyStreamTask> task = runTask(() -> new 
EmptyStreamTask(
+                               mockEnvironment,
+                               eventQueue,
+                               operatorChain(
+                                       
streamOperatorWithSnapshot(operatorSnapshotResult1),
+                                       
streamOperatorWithSnapshot(operatorSnapshotResult2),
+                                       
streamOperatorWithSnapshotException(testException))));
+                       EmptyStreamTask streamTask = task.streamTask;
 
-               Whitebox.setInternalState(streamTask, "isRunning", true);
-               Whitebox.setInternalState(streamTask, "lock", new Object());
-               Whitebox.setInternalState(streamTask, "operatorChain", 
operatorChain);
-               Whitebox.setInternalState(streamTask, "cancelables", new 
CloseableRegistry());
-               Whitebox.setInternalState(streamTask, "configuration", new 
StreamConfig(new Configuration()));
-               Whitebox.setInternalState(streamTask, "checkpointStorage", new 
MemoryBackendCheckpointStorage(new JobID(), null, null, Integer.MAX_VALUE));
+                       assertThat(eventQueue.take(), 
is(Event.TASK_IS_RUNNING));
+                       assertTrue(eventQueue.isEmpty());
 
-               CheckpointExceptionHandlerFactory 
checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
-               CheckpointExceptionHandler checkpointExceptionHandler =
-                       
checkpointExceptionHandlerFactory.createCheckpointExceptionHandler(true, 
mockEnvironment);
-               Whitebox.setInternalState(streamTask, 
"synchronousCheckpointExceptionHandler", checkpointExceptionHandler);
+                       try {
+                               streamTask.triggerCheckpoint(
+                                       new CheckpointMetaData(42L, 1L),
+                                       
CheckpointOptions.forCheckpointWithDefaultLocation(),
+                                       false);
+
+                               fail("Expected test exception here.");
+                       } catch (Exception e) {
+                               assertThat(e.getCause(), is(testException));
 
 Review comment:
   (optional) Maybe use `ExceptionUtils.findCause` here as well, instead of asserting on `e.getCause()` directly?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to