pnowojski commented on a change in pull request #15820:
URL: https://github.com/apache/flink/pull/15820#discussion_r624872007
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -1619,6 +1617,50 @@ public void testTaskAvoidHangingAfterSnapshotStateThrownException() throws Excep
         }
     }
+    @Test
+    public void testCleanUpResourcesWhenFailingDuringInit() {
+        // given: Configured SourceStreamTask with source which fails during initialization.
+        StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO);
+        try {
+            // when: The task initializing(restoring).
+            builder.setupOutputForSingletonOperatorChain(new InitFailOperator<>()).build();
+            fail("The task should fail during the restore");
+        } catch (Exception ignore) {
+            // ignore.
Review comment:
Shouldn't we assert on the exception here instead of ignoring it?
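
To illustrate what asserting on the exception could look like, here is a minimal, self-contained sketch. It does not use the Flink test harness: `expectFailure` is a hypothetical helper, the failing action is a stand-in for the `build()` call, and the value given to `INIT_FAILED_MESSAGE` is an assumption made only so the example runs.

```java
import java.util.concurrent.Callable;

public class AssertFailureSketch {
    // Hypothetical value; the real constant is defined in the test class under review.
    static final String INIT_FAILED_MESSAGE = "Init failed";

    /** Runs the action and returns the exception it throws; fails if nothing is thrown. */
    static Exception expectFailure(Callable<?> action) {
        try {
            action.call();
        } catch (Exception e) {
            return e;
        }
        throw new AssertionError("Expected the action to fail");
    }

    public static void main(String[] args) {
        // Stand-in for the harness build() call that fails during initialization.
        Exception e =
                expectFailure(
                        () -> {
                            throw new IllegalStateException(INIT_FAILED_MESSAGE);
                        });

        // Check that this is the failure we expected rather than swallowing it.
        if (!INIT_FAILED_MESSAGE.equals(e.getMessage())) {
            throw new AssertionError("Unexpected failure: " + e);
        }
        System.out.println("Caught the expected failure: " + e.getMessage());
    }
}
```

With a check like this, an unrelated failure (for example a misconfigured harness) would no longer make the test pass by accident.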
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -2508,4 +2550,27 @@ public void awaitRunning() throws InterruptedException {
             runningLatch.await();
         }
     }
+
+    static class InitFailOperator<T> extends AbstractStreamOperator<T>
Review comment:
Optional nit: maybe rename this to `OpenFailingOperator`?
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -1619,6 +1617,50 @@ public void testTaskAvoidHangingAfterSnapshotStateThrownException() throws Excep
         }
     }
+    @Test
+    public void testCleanUpResourcesWhenFailingDuringInit() {
+        // given: Configured SourceStreamTask with source which fails during initialization.
+        StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO);
+        try {
+            // when: The task initializing(restoring).
+            builder.setupOutputForSingletonOperatorChain(new InitFailOperator<>()).build();
+            fail("The task should fail during the restore");
+        } catch (Exception ignore) {
+            // ignore.
+        }
+
+        // then: The task should clean up all resources even when it failed on init.
+        assertTrue(InitFailOperator.wasClosed);
+    }
+
+    @Test
+    public void testRethrowExceptionFromRestoreInsideOfInvoke() {
+        // given: Configured SourceStreamTask with source which fails during initialization.
+        StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO);
+        try {
+            // when: The task invocation without preceded restoring.
+            StreamTaskMailboxTestHarness<Integer> harness =
+                    builder.setupOutputForSingletonOperatorChain(new InitFailOperator<>())
+                            .buildUnrestored();
+
+            harness.streamTask.invoke();
+
+            fail("The task should fail during the restore");
+        } catch (Exception ex) {
+            // then: The task should rethrow exception from initialization.
+            assertThat(ex.getMessage(), is(INIT_FAILED_MESSAGE));
Review comment:
Nit/question: I guess that by asserting on `ex.getMessage()` directly you are
freezing a contract that this exception is never wrapped, right? Is this
intentional? If not, our usual pattern is:
```
if (!ExceptionUtils.findThrowable(e, ExpectedTestException.class).isPresent()) {
    throw e;
}
```
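
For readers unfamiliar with that pattern, the sketch below shows the idea in a self-contained form: instead of comparing the top-level message, search the whole cause chain for the expected exception type, so the test keeps passing if the exception is later wrapped. The `findThrowable` method and the `ExpectedTestException` class here are local stand-ins written for this example, not the Flink classes themselves, and the sketch assumes an acyclic cause chain.

```java
import java.util.Optional;

public class FindThrowableSketch {
    /** Local stand-in for the expected test exception type (assumption for this sketch). */
    static class ExpectedTestException extends RuntimeException {
        ExpectedTestException(String message) {
            super(message);
        }
    }

    /** Local stand-in: returns the first throwable of the given type in the cause chain. */
    static <T extends Throwable> Optional<T> findThrowable(Throwable throwable, Class<T> type) {
        Throwable current = throwable;
        while (current != null) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
            current = current.getCause();
        }
        return Optional.empty();
    }

    public static void main(String[] args) throws Exception {
        // The expected exception is wrapped, so a check on the outer message would fail.
        Exception e = new RuntimeException("task failed", new ExpectedTestException("init failed"));

        // Rethrow anything that does not contain the expected exception.
        if (!findThrowable(e, ExpectedTestException.class).isPresent()) {
            throw e;
        }
        System.out.println("Found the expected exception in the cause chain");
    }
}
```

The trade-off is that this check is deliberately looser: it accepts any amount of wrapping, which is only appropriate if "never wrapped" is not meant to be part of the contract.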