akalash commented on a change in pull request #17187:
URL: https://github.com/apache/flink/pull/17187#discussion_r710184536



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -1318,58 +1320,94 @@ public void testThreadInvariants() throws Throwable {
         }
     }
 
-    /**
-     * This test ensures that {@link RecordWriter} is correctly closed even if we fail to construct
-     * {@link OperatorChain}, for example because of user class deserialization error.
-     */
     @Test
-    public void testRecordWriterClosedOnStreamOperatorFactoryDeserializationError()
+    public void testRecordWriterClosedOnTransitDeployingStateError() throws Exception {
+        testRecordWriterClosedOnTransitStateError(ExecutionState.DEPLOYING);
+    }
+
+    @Test
+    public void testRecordWriterClosedOnTransitInitializingStateError() throws Exception {
+        testRecordWriterClosedOnTransitStateError(ExecutionState.INITIALIZING);
+    }
+
+    @Test
+    public void testRecordWriterClosedOnTransitRunningStateError() throws Exception {
+        testRecordWriterClosedOnTransitStateError(ExecutionState.RUNNING);
+    }
+
+    private void testRecordWriterClosedOnTransitStateError(ExecutionState executionState)
+            throws Exception {
+        // Throw the exception when the state is updated to the expected one.
+        NoOpTaskManagerActions taskManagerActions =
+                new NoOpTaskManagerActions() {
+                    @Override
+                    public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
+                        if (taskExecutionState.getExecutionState() == executionState) {
+                            throw new ExpectedTestException();
+                        }
+                    }
+                };
+
+        testRecordWriterClosedOnError(
+                env ->
+                        taskBuilderWithConfiguredRecordWriter(env)
+                                .setTaskManagerActions(taskManagerActions)
+                                .build());
+    }
+
+    private void testRecordWriterClosedOnError(
+            FunctionWithException<NettyShuffleEnvironment, Task, Exception> taskProvider)
             throws Exception {
+        try (NettyShuffleEnvironment shuffleEnvironment =
+                new NettyShuffleEnvironmentBuilder().build()) {
+            Task task = taskProvider.apply(shuffleEnvironment);
+
+            task.startTaskThread();
+            task.getExecutingThread().join();
+
+            assertEquals(ExecutionState.FAILED, task.getExecutionState());
+            for (Thread thread : Thread.getAllStackTraces().keySet()) {
+                assertThat(
+                        thread.getName(),
+                        CoreMatchers.is(not(containsString(DEFAULT_OUTPUT_FLUSH_THREAD_NAME))));
+            }
+        }
+    }
+
+    private TestTaskBuilder taskBuilderWithConfiguredRecordWriter(
+            NettyShuffleEnvironment shuffleEnvironment) {
         Configuration taskConfiguration = new Configuration();
+        outputEdgeConfiguration(taskConfiguration);
+
+        ResultPartitionDeploymentDescriptor descriptor =
+                new ResultPartitionDeploymentDescriptor(
+                        PartitionDescriptorBuilder.newBuilder().build(),
+                        NettyShuffleDescriptorBuilder.newBuilder().buildLocal(),
+                        1,
+                        false);
+        return new TestTaskBuilder(shuffleEnvironment)
+                .setInvokable(NoOpStreamTask.class)
+                .setTaskConfig(taskConfiguration)
+                .setResultPartitions(singletonList(descriptor));
+    }
+
+    /**
+     * Make sure that there is some output edge in the config so that some RecordWriter is created.
+     */
+    private void outputEdgeConfiguration(Configuration taskConfiguration) {
         StreamConfig streamConfig = new StreamConfig(taskConfiguration);
         streamConfig.setStreamOperatorFactory(new UnusedOperatorFactory());
 
-        // Make sure that there is some output edge in the config so that some RecordWriter is
-        // created
         StreamConfigChainer cfg =
                 new StreamConfigChainer(new OperatorID(42, 42), streamConfig, this, 1);
+        cfg.setBufferTimeout(1);

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to