This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2b167ae9764c02d4c77a41f2afd056ed8fd5f04f
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Tue Nov 30 14:47:36 2021 +0100

    [FLINK-23532] Remove unnecessary StreamTask#finishTask
---
 .../runtime/tasks/SourceOperatorStreamTask.java     |  5 -----
 .../streaming/runtime/tasks/SourceStreamTask.java   | 21 ++++-----------------
 .../flink/streaming/runtime/tasks/StreamTask.java   | 11 -----------
 .../jobmaster/JobMasterStopWithSavepointITCase.java | 13 -------------
 4 files changed, 4 insertions(+), 46 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
index b6b8c6c..f0715bc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
@@ -108,11 +108,6 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T,
     }
 
     @Override
-    protected void finishTask() throws Exception {
-        mailboxProcessor.allActionsCompleted();
-    }
-
-    @Override
     public CompletableFuture<Boolean> triggerCheckpointAsync(
             CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
         if (!isExternallyInducedSource) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 1a94c19..e10934c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -202,32 +202,19 @@ public class SourceStreamTask<
     @Override
     protected void cancelTask() {
         if (stopped.compareAndSet(false, true)) {
-            cancelOperator(true);
+            cancelOperator();
         }
     }
 
-    @Override
-    protected void finishTask() {
-        this.finishingReason = FinishingReason.STOP_WITH_SAVEPOINT_NO_DRAIN;
-        /**
-         * Currently stop with savepoint relies on the EndOfPartitionEvents propagation and performs
-         * clean shutdown after the stop with savepoint (which can produce some records to process
-         * after the savepoint while stopping). If we interrupt source thread, we might leave the
-         * network stack in an inconsistent state. So, if we want to relay on the clean shutdown, we
-         * can not interrupt the source thread.
-         */
-        cancelOperator(false);
-    }
-
-    private void cancelOperator(boolean interruptThread) {
+    private void cancelOperator() {
         try {
             if (mainOperator != null) {
                 mainOperator.cancel();
             }
         } finally {
-            if (sourceThread.isAlive() && interruptThread) {
+            if (sourceThread.isAlive()) {
                 interruptSourceThread();
-            } else if (!sourceThread.isAlive() && !sourceThread.getCompletionFuture().isDone()) {
+            } else if (!sourceThread.getCompletionFuture().isDone()) {
                 // sourceThread not alive and completion future not done means source thread
                 // didn't start and we need to manually complete the future
                 sourceThread.getCompletionFuture().complete(null);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 558a60f..1731214 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -609,17 +609,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
      */
     protected void advanceToEndOfEventTime() throws Exception {}
 
-    /**
-     * Instructs the task to go through its normal termination routine, i.e. exit the run-loop and
-     * call {@link StreamOperator#finish()} and {@link StreamOperator#close()} on its operators.
-     *
-     * <p>This is used by the source task to get out of the run-loop when the job is stopped with a
-     * savepoint.
-     *
-     * <p>For tasks other than the source task, this method does nothing.
-     */
-    protected void finishTask() throws Exception {}
-
     // ------------------------------------------------------------------------
     //  Core work methods of the Stream Task
     // ------------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java
index 1ecdce8..c9a1cb9 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java
@@ -338,11 +338,6 @@ public class JobMasterStopWithSavepointITCase extends AbstractTestBase {
                 long checkpointId, long latestCompletedCheckpointId) {
             return CompletableFuture.completedFuture(null);
         }
-
-        @Override
-        protected void finishTask() {
-            mailboxProcessor.allActionsCompleted();
-        }
     }
 
     /** A {@link StreamTask} that simply waits to be terminated normally. */
@@ -364,14 +359,6 @@ public class JobMasterStopWithSavepointITCase extends AbstractTestBase {
                 mailboxProcessor.suspend();
             }
         }
-
-        @Override
-        public void finishTask() throws Exception {
-            finishingLatch.await();
-            if (suspension != null) {
-                suspension.resume();
-            }
-        }
     }
 
     /**

Reply via email to