This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2b167ae9764c02d4c77a41f2afd056ed8fd5f04f Author: Dawid Wysakowicz <[email protected]> AuthorDate: Tue Nov 30 14:47:36 2021 +0100 [FLINK-23532] Remove unnecessary StreamTask#finishTask --- .../runtime/tasks/SourceOperatorStreamTask.java | 5 ----- .../streaming/runtime/tasks/SourceStreamTask.java | 21 ++++----------------- .../flink/streaming/runtime/tasks/StreamTask.java | 11 ----------- .../jobmaster/JobMasterStopWithSavepointITCase.java | 13 ------------- 4 files changed, 4 insertions(+), 46 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java index b6b8c6c..f0715bc 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java @@ -108,11 +108,6 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T, } @Override - protected void finishTask() throws Exception { - mailboxProcessor.allActionsCompleted(); - } - - @Override public CompletableFuture<Boolean> triggerCheckpointAsync( CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) { if (!isExternallyInducedSource) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java index 1a94c19..e10934c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java @@ -202,32 +202,19 @@ public class SourceStreamTask< @Override protected void cancelTask() { if (stopped.compareAndSet(false, true)) { - cancelOperator(true); + cancelOperator(); } } - @Override - protected void finishTask() { - this.finishingReason = FinishingReason.STOP_WITH_SAVEPOINT_NO_DRAIN; - /** - * Currently stop with savepoint relies on the EndOfPartitionEvents propagation and performs - * clean shutdown after the stop with savepoint (which can produce some records to process - * after the savepoint while stopping). If we interrupt source thread, we might leave the - * network stack in an inconsistent state. So, if we want to relay on the clean shutdown, we - * can not interrupt the source thread. - */ - cancelOperator(false); - } - - private void cancelOperator(boolean interruptThread) { + private void cancelOperator() { try { if (mainOperator != null) { mainOperator.cancel(); } } finally { - if (sourceThread.isAlive() && interruptThread) { + if (sourceThread.isAlive()) { interruptSourceThread(); - } else if (!sourceThread.isAlive() && !sourceThread.getCompletionFuture().isDone()) { + } else if (!sourceThread.getCompletionFuture().isDone()) { // sourceThread not alive and completion future not done means source thread // didn't start and we need to manually complete the future sourceThread.getCompletionFuture().complete(null); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 558a60f..1731214 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -609,17 +609,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> */ protected void advanceToEndOfEventTime() throws Exception {} - /** - * Instructs the task to go through its normal termination routine, i.e. exit the run-loop and - * call {@link StreamOperator#finish()} and {@link StreamOperator#close()} on its operators. - * - * <p>This is used by the source task to get out of the run-loop when the job is stopped with a - * savepoint. - * - * <p>For tasks other than the source task, this method does nothing. - */ - protected void finishTask() throws Exception {} - // ------------------------------------------------------------------------ // Core work methods of the Stream Task // ------------------------------------------------------------------------ diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java index 1ecdce8..c9a1cb9 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java @@ -338,11 +338,6 @@ public class JobMasterStopWithSavepointITCase extends AbstractTestBase { long checkpointId, long latestCompletedCheckpointId) { return CompletableFuture.completedFuture(null); } - - @Override - protected void finishTask() { - mailboxProcessor.allActionsCompleted(); - } } /** A {@link StreamTask} that simply waits to be terminated normally. */ @@ -364,14 +359,6 @@ public class JobMasterStopWithSavepointITCase extends AbstractTestBase { mailboxProcessor.suspend(); } } - - @Override - public void finishTask() throws Exception { - finishingLatch.await(); - if (suspension != null) { - suspension.resume(); - } - } } /**
