This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new 54beaa3  [FLINK-21028][task] Do not interrupt the source thread on stop with savepoint
54beaa3 is described below

commit 54beaa3f891e6fabbd92693bbdc3d12d919b38c5
Author: Piotr Nowojski <[email protected]>
AuthorDate: Fri Feb 19 17:47:40 2021 +0100

    [FLINK-21028][task] Do not interrupt the source thread on stop with savepoint
    
    Currently stop with savepoint relies on the EndOfPartitionEvents propagation and performs
    clean shutdown after the stop with savepoint (which can produce some records to process after
    the savepoint while stopping). If we interrupt the source thread, we might leave the network
    stack in an inconsistent state. So, if we want to rely on the clean shutdown, we cannot
    interrupt the source thread.
---
 .../streaming/runtime/tasks/SourceStreamTask.java  | 27 ++++++--
 .../runtime/tasks/SourceStreamTaskTest.java        | 81 ++++++++++++++++++++++
 2 files changed, 101 insertions(+), 7 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 6c9bf23..53becae 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -174,13 +174,32 @@ public class SourceStreamTask<
 
     @Override
     protected void cancelTask() {
+        cancelTask(true);
+    }
+
+    @Override
+    protected void finishTask() {
+        wasStoppedExternally = true;
+        /*
+         * Currently stop with savepoint relies on the EndOfPartitionEvents propagation and performs
+         * clean shutdown after the stop with savepoint (which can produce some records to process
+         * after the savepoint while stopping). If we interrupt source thread, we might leave the
+         * network stack in an inconsistent state. So, if we want to rely on the clean shutdown, we
+         * cannot interrupt the source thread.
+         */
+        cancelTask(false);
+    }
+
+    private void cancelTask(boolean interrupt) {
         try {
             if (mainOperator != null) {
                 mainOperator.cancel();
             }
         } finally {
             if (sourceThread.isAlive()) {
-                sourceThread.interrupt();
+                if (interrupt) {
+                    sourceThread.interrupt();
+                }
             } else if (!sourceThread.getCompletionFuture().isDone()) {
                 // source thread didn't start
                 sourceThread.getCompletionFuture().complete(null);
@@ -189,12 +208,6 @@ public class SourceStreamTask<
     }
 
     @Override
-    protected void finishTask() throws Exception {
-        wasStoppedExternally = true;
-        cancelTask();
-    }
-
-    @Override
     protected CompletableFuture<Void> getCompletionFuture() {
         return sourceThread.getCompletionFuture();
     }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index fa95939..42f6d06 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -71,6 +72,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
@@ -569,6 +571,45 @@ public class SourceStreamTaskTest {
         harness.waitForTaskCompletion(Long.MAX_VALUE, true);
     }
 
+    @Test
+    public void testStopWithSavepointShouldNotInterruptTheSource() throws Exception {
+        long checkpointId = 1;
+        WasInterruptedTestingSource interruptedTestingSource = new WasInterruptedTestingSource();
+        try (StreamTaskMailboxTestHarness<String> harness =
+                new StreamTaskMailboxTestHarnessBuilder<>(SourceStreamTask::new, STRING_TYPE_INFO)
+                        .setupOutputForSingletonOperatorChain(
+                                new StreamSource<>(interruptedTestingSource))
+                        .build()) {
+
+            harness.processAll();
+
+            Future<Boolean> triggerFuture =
+                    harness.streamTask.triggerCheckpointAsync(
+                            new CheckpointMetaData(checkpointId, 1),
+                            new CheckpointOptions(SYNC_SAVEPOINT, getDefault()),
+                            false);
+            while (!triggerFuture.isDone()) {
+                harness.streamTask.runMailboxStep();
+            }
+            triggerFuture.get();
+
+            Future<Void> notifyFuture =
+                    harness.streamTask.notifyCheckpointCompleteAsync(checkpointId);
+            while (!notifyFuture.isDone()) {
+                harness.streamTask.runMailboxStep();
+            }
+            notifyFuture.get();
+
+            WasInterruptedTestingSource.allowExit();
+
+            harness.waitForTaskCompletion();
+            harness.finishProcessing();
+
+            assertTrue(notifyFuture.isDone());
+            assertFalse(WasInterruptedTestingSource.wasInterrupted());
+        }
+    }
+
     private static class MockSource
             implements SourceFunction<Tuple2<Long, Integer>>, ListCheckpointed<Serializable> {
         private static final long serialVersionUID = 1;
@@ -854,4 +895,44 @@ public class SourceStreamTaskTest {
             output.collect(new StreamRecord<>(record));
         }
     }
+
+    /**
+     * This source keeps running after cancellation until {@link #allowExit()} is called. It
+     * records whether it was interrupted by the {@link SourceStreamTask} or not.
+     */
+    private static class WasInterruptedTestingSource implements SourceFunction<String> {
+        private static final long serialVersionUID = 1L;
+
+        private static final OneShotLatch ALLOW_EXIT = new OneShotLatch();
+        private static final AtomicBoolean WAS_INTERRUPTED = new AtomicBoolean();
+
+        private volatile boolean running = true;
+
+        @Override
+        public void run(SourceContext<String> ctx) throws Exception {
+            ALLOW_EXIT.reset();
+            WAS_INTERRUPTED.set(false);
+
+            try {
+                while (running || !ALLOW_EXIT.isTriggered()) {
+                    Thread.sleep(1);
+                }
+            } catch (InterruptedException e) {
+                WAS_INTERRUPTED.set(true);
+            }
+        }
+
+        @Override
+        public void cancel() {
+            running = false;
+        }
+
+        public static boolean wasInterrupted() {
+            return WAS_INTERRUPTED.get();
+        }
+
+        public static void allowExit() {
+            ALLOW_EXIT.trigger();
+        }
+    }
 }

Reply via email to