akalash commented on a change in pull request #17187:
URL: https://github.com/apache/flink/pull/17187#discussion_r710184536
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -1318,58 +1320,94 @@ public void testThreadInvariants() throws Throwable {
}
}
- /**
- * This test ensures that {@link RecordWriter} is correctly closed even if
we fail to construct
- * {@link OperatorChain}, for example because of user class
deserialization error.
- */
@Test
- public void
testRecordWriterClosedOnStreamOperatorFactoryDeserializationError()
+ public void testRecordWriterClosedOnTransitDeployingStateError() throws
Exception {
+ testRecordWriterClosedOnTransitStateError(ExecutionState.DEPLOYING);
+ }
+
+ @Test
+ public void testRecordWriterClosedOnTransitInitializingStateError() throws
Exception {
+ testRecordWriterClosedOnTransitStateError(ExecutionState.INITIALIZING);
+ }
+
+ @Test
+ public void testRecordWriterClosedOnTransitRunningStateError() throws
Exception {
+ testRecordWriterClosedOnTransitStateError(ExecutionState.RUNNING);
+ }
+
+ private void testRecordWriterClosedOnTransitStateError(ExecutionState
executionState)
+ throws Exception {
+ // Throw the exception when the state is updated to the expected one.
+ NoOpTaskManagerActions taskManagerActions =
+ new NoOpTaskManagerActions() {
+ @Override
+ public void updateTaskExecutionState(TaskExecutionState
taskExecutionState) {
+ if (taskExecutionState.getExecutionState() ==
executionState) {
+ throw new ExpectedTestException();
+ }
+ }
+ };
+
+ testRecordWriterClosedOnError(
+ env ->
+ taskBuilderWithConfiguredRecordWriter(env)
+ .setTaskManagerActions(taskManagerActions)
+ .build());
+ }
+
+ private void testRecordWriterClosedOnError(
+ FunctionWithException<NettyShuffleEnvironment, Task, Exception>
taskProvider)
throws Exception {
+ try (NettyShuffleEnvironment shuffleEnvironment =
+ new NettyShuffleEnvironmentBuilder().build()) {
+ Task task = taskProvider.apply(shuffleEnvironment);
+
+ task.startTaskThread();
+ task.getExecutingThread().join();
+
+ assertEquals(ExecutionState.FAILED, task.getExecutionState());
+ for (Thread thread : Thread.getAllStackTraces().keySet()) {
+ assertThat(
+ thread.getName(),
+
CoreMatchers.is(not(containsString(DEFAULT_OUTPUT_FLUSH_THREAD_NAME))));
+ }
+ }
+ }
+
+ private TestTaskBuilder taskBuilderWithConfiguredRecordWriter(
+ NettyShuffleEnvironment shuffleEnvironment) {
Configuration taskConfiguration = new Configuration();
+ outputEdgeConfiguration(taskConfiguration);
+
+ ResultPartitionDeploymentDescriptor descriptor =
+ new ResultPartitionDeploymentDescriptor(
+ PartitionDescriptorBuilder.newBuilder().build(),
+
NettyShuffleDescriptorBuilder.newBuilder().buildLocal(),
+ 1,
+ false);
+ return new TestTaskBuilder(shuffleEnvironment)
+ .setInvokable(NoOpStreamTask.class)
+ .setTaskConfig(taskConfiguration)
+ .setResultPartitions(singletonList(descriptor));
+ }
+
+ /**
+ * Make sure that there is some output edge in the config so that some
RecordWriter is created.
+ */
+ private void outputEdgeConfiguration(Configuration taskConfiguration) {
StreamConfig streamConfig = new StreamConfig(taskConfiguration);
streamConfig.setStreamOperatorFactory(new UnusedOperatorFactory());
- // Make sure that there is some output edge in the config so that some
RecordWriter is
- // created
StreamConfigChainer cfg =
new StreamConfigChainer(new OperatorID(42, 42), streamConfig,
this, 1);
+ cfg.setBufferTimeout(1);
Review comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]