pnowojski commented on a change in pull request #14814:
URL: https://github.com/apache/flink/pull/14814#discussion_r568753925



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -184,13 +189,93 @@
 
     @Rule public final Timeout timeoutPerTest = Timeout.seconds(30);
 
+    @Test
+    public void testSyncSavepointCompleted() throws Exception {
+        testSyncSavepointWithEndInput(StreamTask::notifyCheckpointCompleteAsync, false);
+    }
+
+    @Test
+    public void testSyncSavepointAborted() throws Exception {
+        testSyncSavepointWithEndInput(
+                (task, id) -> task.abortCheckpointOnBarrier(id, new RuntimeException()), true);
+    }

Review comment:
       I think this is a bit fishy, as it encodes an incorrect contract: `abortCheckpointOnBarrier` cannot happen after `triggerCheckpointOnBarrier` in this scenario. Maybe it's better to drop this test?
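To make the contract concrete, here is a minimal model in plain Java. `BarrierContractModel` and its methods are hypothetical stand-ins, not Flink API; they only encode the rule stated above for this scenario, namely that once a checkpoint has been triggered on its barrier, the same checkpoint is not aborted on the barrier afterwards. Under that assumption, the trigger-then-abort order the new test exercises is rejected:

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical model (not Flink API) of the per-checkpoint rule discussed in the
 * review: after a checkpoint is triggered on its barrier, it cannot also be
 * aborted on the barrier.
 */
final class BarrierContractModel {
    private final Set<Long> triggeredOnBarrier = new HashSet<>();

    /** Stand-in for triggerCheckpointOnBarrier: remembers the checkpoint ID. */
    void triggerOnBarrier(long checkpointId) {
        triggeredOnBarrier.add(checkpointId);
    }

    /** Stand-in for abortCheckpointOnBarrier: rejects IDs that were already triggered. */
    void abortOnBarrier(long checkpointId) {
        if (triggeredOnBarrier.contains(checkpointId)) {
            throw new IllegalStateException(
                    "checkpoint " + checkpointId + " was already triggered on barrier");
        }
    }

    /** Replays trigger(id) followed by abort(id); false means the contract is violated. */
    static boolean triggerThenAbortIsValid(long checkpointId) {
        BarrierContractModel model = new BarrierContractModel();
        model.triggerOnBarrier(checkpointId);
        try {
            model.abortOnBarrier(checkpointId);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    /** Aborting on the barrier without a prior trigger is accepted by the model. */
    static boolean abortOnlyIsValid(long checkpointId) {
        BarrierContractModel model = new BarrierContractModel();
        try {
            model.abortOnBarrier(checkpointId);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(triggerThenAbortIsValid(1L)); // false
        System.out.println(abortOnlyIsValid(1L)); // true
    }
}
```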

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
##########
@@ -165,7 +165,14 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E
                                         new CancelTaskException(sourceThreadThrowable));
                             } else if (!isFinished && sourceThreadThrowable != null) {
                                 mailboxProcessor.reportThrowable(sourceThreadThrowable);
+                            } else if (sourceThreadThrowable != null
+                                    || isCanceled()
+                                    || isFinished) {

Review comment:
       `isFinished` is a very confusing name in this context, where it is used to decide whether to issue `endOfInput` or not.
   
   `wasStoppedExternally`? 
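A minimal sketch of the decision the flag drives, written with the suggested name. `SourceInputCompletion` and `shouldEndInput` are hypothetical, not the actual `SourceStreamTask` code, and the rule assumes that the (elided) body of the new `else if` branch is the path that skips `endOfInput`:

```java
/**
 * Hypothetical sketch (not the actual SourceStreamTask code) of the decision the
 * flag drives, using the suggested name wasStoppedExternally instead of isFinished.
 */
final class SourceInputCompletion {
    private SourceInputCompletion() {}

    /**
     * Assumed rule: end the input only when the source thread finished on its own,
     * i.e. it did not fail, was not canceled and was not stopped from outside.
     */
    static boolean shouldEndInput(
            Throwable sourceThreadThrowable, boolean canceled, boolean wasStoppedExternally) {
        return sourceThreadThrowable == null && !canceled && !wasStoppedExternally;
    }

    public static void main(String[] args) {
        System.out.println(shouldEndInput(null, false, false)); // true
        System.out.println(shouldEndInput(null, false, true)); // false
        System.out.println(shouldEndInput(new RuntimeException("boom"), false, false)); // false
    }
}
```

With this name, `!wasStoppedExternally` reads directly as a precondition for ending the input, whereas `isFinished == true` suggests the source completed normally, which is exactly the case where one would expect `endOfInput` to be issued.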

##########
File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
##########
@@ -361,6 +374,171 @@ public void testTriggerSavepointWithCheckpointingDisabled() throws Exception {
         }
     }
 
+    static class BoundedPassThroughOperator<T> extends AbstractStreamOperator<T>
+            implements OneInputStreamOperator<T, T>, BoundedOneInput {
+        static volatile CountDownLatch progressLatch;
+        static volatile CountDownLatch snapshotLatch;
+        static volatile boolean inputEnded;
+
+        private transient boolean processed;
+
+        BoundedPassThroughOperator(ChainingStrategy chainingStrategy) {
+            this.chainingStrategy = chainingStrategy;
+        }
+
+        private static void allowSnapshots() {
+            snapshotLatch.countDown();
+        }
+
+        @Override
+        public void endInput() throws Exception {
+            inputEnded = true;
+        }
+
+        @Override
+        public void processElement(StreamRecord<T> element) throws Exception {
+            output.collect(element);
+            if (!processed) {
+                processed = true;
+                progressLatch.countDown();
+            }
+        }
+
+        @Override
+        public void snapshotState(StateSnapshotContext context) throws Exception {
+            snapshotLatch.await();
+            super.snapshotState(context);
+        }
+
+        // --------------------------------------------------------------------
+
+        static CountDownLatch getProgressLatch() {
+            return progressLatch;
+        }
+
+        static void resetForTest(int parallelism, boolean allowSnapshots) {
+            progressLatch = new CountDownLatch(parallelism);
+            snapshotLatch = new CountDownLatch(allowSnapshots ? 0 : parallelism);
+            inputEnded = false;
+        }
+    }
+
+    @Test
+    public void testStopSavepointWithBoundedInputConcurrently() throws Exception {
+        final int numTaskManagers = 2;
+        final int numSlotsPerTaskManager = 2;
+
+        while (true) {
+
+            final MiniClusterResourceFactory clusterFactory =
+                    new MiniClusterResourceFactory(
+                            numTaskManagers,
+                            numSlotsPerTaskManager,
+                            getFileBasedCheckpointsConfig());
+
+            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+            env.setParallelism(1);
+
+            // It's only possible to test this with chaining. Without it, JM fails the job before
+            // the downstream gets the abort notification
+            BoundedPassThroughOperator<Integer> operator =
+                    new BoundedPassThroughOperator<>(ChainingStrategy.ALWAYS);
+            InfiniteTestSource source = new InfiniteTestSource();
+            DataStream<Integer> stream =
+                    env.addSource(source)
+                            .transform("pass-through", BasicTypeInfo.INT_TYPE_INFO, operator);
+
+            stream.addSink(new DiscardingSink<>());
+
+            final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+            final JobID jobId = jobGraph.getJobID();
+
+            MiniClusterWithClientResource cluster = clusterFactory.get();
+            cluster.before();
+            ClusterClient<?> client = cluster.getClusterClient();
+
+            try {
+                BoundedPassThroughOperator.resetForTest(1, false);
+                InfiniteTestSource.resetForTest();
+
+                client.submitJob(jobGraph).get();
+
+                BoundedPassThroughOperator.getProgressLatch().await();
+                InfiniteTestSource.suspendAll(); // prevent deadlock in cancelAllAndAwait
+                CompletableFuture<String> stop = client.stopWithSavepoint(jobId, false, null);
+                // await checkpoint start (not explicit signals to avoid deadlocks)
+                Thread.sleep(500);
+                InfiniteTestSource.cancelAllAndAwait(); // emulate end of input

Review comment:
       Are you sure that this is the right way to test it? Why do we need this `sleep(500)`, and why can't we use some latches instead? For example, counting down a latch in `BoundedPassThroughOperator#snapshotState`?
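A minimal, Flink-free sketch of the latch-based alternative, assuming a hypothetical `snapshotStartedLatch` that `snapshotState` counts down before blocking on the existing `snapshotLatch`. Plain threads stand in for the subtasks, and the `end-of-input` event stands in for `InfiniteTestSource.cancelAllAndAwait()`; `SnapshotStartedLatchSketch` and `run` are illustrative names only. Whether this also avoids the deadlock mentioned in the test's comment is not shown here:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Flink-free sketch of the suggested handshake. snapshotStartedLatch is hypothetical;
 * snapshotLatch mirrors BoundedPassThroughOperator.snapshotLatch.
 */
final class SnapshotStartedLatchSketch {
    static volatile CountDownLatch snapshotStartedLatch;
    static volatile CountDownLatch snapshotLatch;

    /** Stand-in for BoundedPassThroughOperator#snapshotState. */
    static void snapshotState() throws InterruptedException {
        snapshotStartedLatch.countDown(); // the checkpoint has reached this subtask
        snapshotLatch.await(); // hold the snapshot until the test allows it
    }

    /** Runs one handshake with the given number of subtask threads; returns the event order. */
    static List<String> run(int parallelism) {
        snapshotStartedLatch = new CountDownLatch(parallelism);
        snapshotLatch = new CountDownLatch(parallelism);
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        List<Thread> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            Thread subtask = new Thread(() -> {
                try {
                    snapshotState();
                    events.add("snapshot-completed");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            subtask.setDaemon(true); // never keep the JVM alive if the handshake fails
            subtasks.add(subtask);
            subtask.start();
        }
        try {
            // Replaces Thread.sleep(500): wait until every subtask is inside snapshotState.
            if (!snapshotStartedLatch.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("checkpoint did not start");
            }
            events.add("end-of-input"); // emulates InfiniteTestSource.cancelAllAndAwait()
            for (int i = 0; i < parallelism; i++) {
                snapshotLatch.countDown(); // emulates allowSnapshots()
            }
            for (Thread subtask : subtasks) {
                subtask.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(2)); // [end-of-input, snapshot-completed, snapshot-completed]
    }
}
```

Because the test thread only proceeds once every subtask has entered `snapshotState`, the ordering no longer depends on timing, and the bounded `await(10, TimeUnit.SECONDS)` turns a checkpoint that never starts into a failure instead of a hang.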




----------------------------------------------------------------
This is an automated message from the Apache Git Service.