pnowojski commented on a change in pull request #14814:
URL: https://github.com/apache/flink/pull/14814#discussion_r568753925
##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -184,13 +189,93 @@
@Rule public final Timeout timeoutPerTest = Timeout.seconds(30);
+ @Test
+ public void testSyncSavepointCompleted() throws Exception {
+ testSyncSavepointWithEndInput(StreamTask::notifyCheckpointCompleteAsync, false);
+ }
+
+ @Test
+ public void testSyncSavepointAborted() throws Exception {
+ testSyncSavepointWithEndInput(
+ (task, id) -> task.abortCheckpointOnBarrier(id, new RuntimeException()), true);
+ }
Review comment:
I think this is a bit fishy, as it encodes an incorrect contract.
`abortCheckpointOnBarrier` cannot happen after `triggerCheckpointOnBarrier`
in this scenario. Maybe it's better to drop this test?
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
##########
@@ -165,7 +165,14 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E
new CancelTaskException(sourceThreadThrowable));
} else if (!isFinished && sourceThreadThrowable != null) {
mailboxProcessor.reportThrowable(sourceThreadThrowable);
+ } else if (sourceThreadThrowable != null
+ || isCanceled()
+ || isFinished) {
Review comment:
`isFinished` is a very confusing name in this context (it is used to
differentiate whether to issue `endOfInput` or not).
Maybe `wasStoppedExternally`?
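To show how the rename would read, here is a minimal sketch that mirrors only the two conditions visible in the hunk above. The class `StopFlagNamingSketch`, the `Branch` enum, and the `select` method are hypothetical stand-ins, not Flink code. The preceding `CancelTaskException` branch is omitted because its condition is not shown, and what each branch does afterwards is deliberately left out.

```java
/** Hypothetical stand-in for the branch selection in the hunk; not Flink code. */
final class StopFlagNamingSketch {

    /** Labels for the branches; the names are illustrative only. */
    enum Branch { REPORT_THROWABLE, NEW_BRANCH, FALL_THROUGH }

    /**
     * Mirrors the two visible conditions, with isFinished renamed to
     * wasStoppedExternally. The parameter "canceled" stands in for isCanceled().
     */
    static Branch select(
            Throwable sourceThreadThrowable, boolean canceled, boolean wasStoppedExternally) {
        if (!wasStoppedExternally && sourceThreadThrowable != null) {
            return Branch.REPORT_THROWABLE;
        } else if (sourceThreadThrowable != null || canceled || wasStoppedExternally) {
            return Branch.NEW_BRANCH;
        }
        return Branch.FALL_THROUGH;
    }

    public static void main(String[] args) {
        // An external stop without any failure selects the newly added branch.
        System.out.println(select(null, false, true));
        // A failure without an external stop is reported as a throwable.
        System.out.println(select(new RuntimeException("boom"), false, false));
    }
}
```

With the new name, the second condition reads as "failed, canceled, or stopped externally", which is easier to relate to the `endOfInput` decision than `isFinished`.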
##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
##########
@@ -361,6 +374,171 @@ public void testTriggerSavepointWithCheckpointingDisabled() throws Exception {
}
}
+ static class BoundedPassThroughOperator<T> extends AbstractStreamOperator<T>
+ implements OneInputStreamOperator<T, T>, BoundedOneInput {
+ static volatile CountDownLatch progressLatch;
+ static volatile CountDownLatch snapshotLatch;
+ static volatile boolean inputEnded;
+
+ private transient boolean processed;
+
+ BoundedPassThroughOperator(ChainingStrategy chainingStrategy) {
+ this.chainingStrategy = chainingStrategy;
+ }
+
+ private static void allowSnapshots() {
+ snapshotLatch.countDown();
+ }
+
+ @Override
+ public void endInput() throws Exception {
+ inputEnded = true;
+ }
+
+ @Override
+ public void processElement(StreamRecord<T> element) throws Exception {
+ output.collect(element);
+ if (!processed) {
+ processed = true;
+ progressLatch.countDown();
+ }
+ }
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ snapshotLatch.await();
+ super.snapshotState(context);
+ }
+
+ // --------------------------------------------------------------------
+
+ static CountDownLatch getProgressLatch() {
+ return progressLatch;
+ }
+
+ static void resetForTest(int parallelism, boolean allowSnapshots) {
+ progressLatch = new CountDownLatch(parallelism);
+ snapshotLatch = new CountDownLatch(allowSnapshots ? 0 : parallelism);
+ inputEnded = false;
+ }
+ }
+
+ @Test
+ public void testStopSavepointWithBoundedInputConcurrently() throws Exception {
+ final int numTaskManagers = 2;
+ final int numSlotsPerTaskManager = 2;
+
+ while (true) {
+
+ final MiniClusterResourceFactory clusterFactory =
+ new MiniClusterResourceFactory(
+ numTaskManagers,
+ numSlotsPerTaskManager,
+ getFileBasedCheckpointsConfig());
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+ env.setParallelism(1);
+
+ // It's only possible to test this with chaining. Without it, JM fails the job before
+ // the downstream gets the abort notification
+ BoundedPassThroughOperator<Integer> operator =
+ new BoundedPassThroughOperator<>(ChainingStrategy.ALWAYS);
+ InfiniteTestSource source = new InfiniteTestSource();
+ DataStream<Integer> stream =
+ env.addSource(source)
+ .transform("pass-through", BasicTypeInfo.INT_TYPE_INFO, operator);
+
+ stream.addSink(new DiscardingSink<>());
+
+ final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+ final JobID jobId = jobGraph.getJobID();
+
+ MiniClusterWithClientResource cluster = clusterFactory.get();
+ cluster.before();
+ ClusterClient<?> client = cluster.getClusterClient();
+
+ try {
+ BoundedPassThroughOperator.resetForTest(1, false);
+ InfiniteTestSource.resetForTest();
+
+ client.submitJob(jobGraph).get();
+
+ BoundedPassThroughOperator.getProgressLatch().await();
+ InfiniteTestSource.suspendAll(); // prevent deadlock in cancelAllAndAwait
+ CompletableFuture<String> stop = client.stopWithSavepoint(jobId, false, null);
+ // await checkpoint start (not explicit signals to avoid deadlocks)
+ Thread.sleep(500);
+ InfiniteTestSource.cancelAllAndAwait(); // emulate end of input
Review comment:
Are you sure this is the right way to test it? Why do we need this
`sleep(500)`, and why can't we use latches instead? For example, by counting
down a latch in `BoundedPassThroughOperator#snapshotState`?
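A minimal sketch of the latch-based alternative, using only `java.util.concurrent` and a plain thread in place of the Flink task. The class `SnapshotLatchSketch` and the field `snapshotStartedLatch` are hypothetical names; `snapshotLatch` plays the same role as in the diff. The in-code comment in the diff says explicit signals were avoided to prevent deadlocks. This sketch does not model the task's mailbox thread, so it cannot show whether that concern applies.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for BoundedPassThroughOperator and its test; not Flink code. */
final class SnapshotLatchSketch {
    // Counted down as soon as snapshotState is entered (hypothetical field).
    static volatile CountDownLatch snapshotStartedLatch;
    // Same role as in the diff: holds the snapshot until the test releases it.
    static volatile CountDownLatch snapshotLatch;

    static void resetForTest(int parallelism) {
        snapshotStartedLatch = new CountDownLatch(parallelism);
        snapshotLatch = new CountDownLatch(parallelism);
    }

    /** Models snapshotState: signal that the snapshot started, then block. */
    static void snapshotState() throws InterruptedException {
        snapshotStartedLatch.countDown();
        snapshotLatch.await();
    }

    /** Returns true if the snapshot start was observed via the latch, without sleeping. */
    static boolean runScenario() {
        resetForTest(1);
        Thread task = new Thread(() -> {
            try {
                snapshotState();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        task.setDaemon(true);
        task.start();
        try {
            // Replaces Thread.sleep(500): wait until the snapshot has actually begun.
            boolean started = snapshotStartedLatch.await(10, TimeUnit.SECONDS);
            // At this point the real test would emulate end of input.
            snapshotLatch.countDown();
            task.join(10_000);
            return started && !task.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("snapshot start observed: " + runScenario());
    }
}
```

The bounded `await` keeps the test from hanging if the snapshot never starts, and it removes the dependence on a fixed delay being long enough.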