This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new ab196a6  [FLINK-24269][Runtime / Checkpointing] Rename methods around final checkpoints
ab196a6 is described below

commit ab196a69b4a8ae31f0fb36996602b0afc8f12059
Author: liliwei <[email protected]>
AuthorDate: Fri Sep 17 23:17:21 2021 +0800

    [FLINK-24269][Runtime / Checkpointing] Rename methods around final checkpoints
    
    This closes #17316
---
 .../api/runtime/SavepointTaskStateManager.java     |  2 +-
 .../runtime/checkpoint/PendingCheckpoint.java      |  9 +++--
 .../runtime/checkpoint/TaskStateSnapshot.java      | 40 +++++++++++-----------
 .../flink/runtime/state/TaskStateManager.java      |  2 +-
 .../flink/runtime/state/TaskStateManagerImpl.java  |  4 +--
 .../CheckpointCoordinatorRestoringTest.java        |  2 +-
 .../CheckpointCoordinatorTestingUtils.java         |  4 +--
 .../checkpoint/StateAssignmentOperationTest.java   |  4 +--
 .../runtime/state/TaskStateManagerImplTest.java    |  2 +-
 .../flink/runtime/state/TestTaskStateManager.java  |  4 +--
 .../runtime/io/StreamTwoInputProcessorFactory.java |  2 +-
 .../runtime/tasks/AsyncCheckpointRunnable.java     | 18 +++++-----
 .../runtime/tasks/FinishedOperatorChain.java       |  4 +--
 .../streaming/runtime/tasks/OperatorChain.java     |  6 ++--
 .../runtime/tasks/RegularOperatorChain.java        |  2 +-
 .../runtime/tasks/SourceOperatorStreamTask.java    |  2 +-
 .../streaming/runtime/tasks/SourceStreamTask.java  |  4 +--
 .../flink/streaming/runtime/tasks/StreamTask.java  |  2 +-
 .../tasks/SubtaskCheckpointCoordinator.java        |  2 +-
 .../tasks/SubtaskCheckpointCoordinatorImpl.java    | 14 ++++----
 .../runtime/tasks/AsyncCheckpointRunnableTest.java |  4 +--
 .../tasks/StreamTaskFinalCheckpointsTest.java      |  4 +--
 .../tasks/TestSubtaskCheckpointCoordinator.java    |  2 +-
 23 files changed, 69 insertions(+), 70 deletions(-)

diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
index ac4db13..b51a1ec 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
@@ -63,7 +63,7 @@ final class SavepointTaskStateManager implements 
TaskStateManager {
             CheckpointMetaData checkpointMetaData, CheckpointMetrics 
checkpointMetrics) {}
 
     @Override
-    public boolean isFinishedOnRestore() {
+    public boolean isTaskDeployedAsFinished() {
         return false;
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index be7fc5e..f64af16 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -399,16 +399,15 @@ public class PendingCheckpoint implements Checkpoint {
             }
 
             long ackTimestamp = System.currentTimeMillis();
-            if (operatorSubtaskStates != null && 
operatorSubtaskStates.isFinishedOnRestore()) {
+            if (operatorSubtaskStates != null && 
operatorSubtaskStates.isTaskDeployedAsFinished()) {
                 checkpointPlan.reportTaskFinishedOnRestore(vertex);
             } else {
                 List<OperatorIDPair> operatorIDs = 
vertex.getJobVertex().getOperatorIDs();
                 for (OperatorIDPair operatorID : operatorIDs) {
-                    updateNonFinishedOnRestoreOperatorState(
-                            vertex, operatorSubtaskStates, operatorID);
+                    updateOperatorState(vertex, operatorSubtaskStates, 
operatorID);
                 }
 
-                if (operatorSubtaskStates != null && 
operatorSubtaskStates.isOperatorsFinished()) {
+                if (operatorSubtaskStates != null && 
operatorSubtaskStates.isTaskFinished()) {
                     checkpointPlan.reportTaskHasFinishedOperators(vertex);
                 }
             }
@@ -454,7 +453,7 @@ public class PendingCheckpoint implements Checkpoint {
         }
     }
 
-    private void updateNonFinishedOnRestoreOperatorState(
+    private void updateOperatorState(
             ExecutionVertex vertex,
             TaskStateSnapshot operatorSubtaskStates,
             OperatorIDPair operatorID) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
index aa65c98..aa31dc9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
@@ -63,16 +63,16 @@ public class TaskStateSnapshot implements 
CompositeStateHandle {
     /** Mapping from an operator id to the state of one subtask of this 
operator. */
     private final Map<OperatorID, OperatorSubtaskState> 
subtaskStatesByOperatorID;
 
-    private final boolean isFinishedOnRestore;
+    private final boolean isTaskDeployedAsFinished;
 
-    private final boolean isOperatorsFinished;
+    private final boolean isTaskFinished;
 
     public TaskStateSnapshot() {
         this(10, false);
     }
 
-    public TaskStateSnapshot(int size, boolean isOperatorsFinished) {
-        this(new HashMap<>(size), false, isOperatorsFinished);
+    public TaskStateSnapshot(int size, boolean isTaskFinished) {
+        this(new HashMap<>(size), false, isTaskFinished);
     }
 
     public TaskStateSnapshot(Map<OperatorID, OperatorSubtaskState> 
subtaskStatesByOperatorID) {
@@ -81,21 +81,21 @@ public class TaskStateSnapshot implements 
CompositeStateHandle {
 
     private TaskStateSnapshot(
             Map<OperatorID, OperatorSubtaskState> subtaskStatesByOperatorID,
-            boolean isFinishedOnRestore,
-            boolean isOperatorsFinished) {
+            boolean isTaskDeployedAsFinished,
+            boolean isTaskFinished) {
         this.subtaskStatesByOperatorID = 
Preconditions.checkNotNull(subtaskStatesByOperatorID);
-        this.isFinishedOnRestore = isFinishedOnRestore;
-        this.isOperatorsFinished = isOperatorsFinished;
+        this.isTaskDeployedAsFinished = isTaskDeployedAsFinished;
+        this.isTaskFinished = isTaskFinished;
     }
 
     /** Returns whether all the operators of the task are already finished on 
restoring. */
-    public boolean isFinishedOnRestore() {
-        return isFinishedOnRestore;
+    public boolean isTaskDeployedAsFinished() {
+        return isTaskDeployedAsFinished;
     }
 
     /** Returns whether all the operators of the task have called finished 
methods. */
-    public boolean isOperatorsFinished() {
-        return isOperatorsFinished;
+    public boolean isTaskFinished() {
+        return isTaskFinished;
     }
 
     /** Returns the subtask state for the given operator id (or null if not 
contained). */
@@ -129,7 +129,7 @@ public class TaskStateSnapshot implements 
CompositeStateHandle {
                 return true;
             }
         }
-        return isFinishedOnRestore;
+        return isTaskDeployedAsFinished;
     }
 
     /**
@@ -187,13 +187,13 @@ public class TaskStateSnapshot implements 
CompositeStateHandle {
         TaskStateSnapshot that = (TaskStateSnapshot) o;
 
         return subtaskStatesByOperatorID.equals(that.subtaskStatesByOperatorID)
-                && isFinishedOnRestore == that.isFinishedOnRestore
-                && isOperatorsFinished == that.isOperatorsFinished;
+                && isTaskDeployedAsFinished == that.isTaskDeployedAsFinished
+                && isTaskFinished == that.isTaskFinished;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(subtaskStatesByOperatorID, isFinishedOnRestore, 
isOperatorsFinished);
+        return Objects.hash(subtaskStatesByOperatorID, 
isTaskDeployedAsFinished, isTaskFinished);
     }
 
     @Override
@@ -201,10 +201,10 @@ public class TaskStateSnapshot implements 
CompositeStateHandle {
         return "TaskOperatorSubtaskStates{"
                 + "subtaskStatesByOperatorID="
                 + subtaskStatesByOperatorID
-                + ", isFinished="
-                + isFinishedOnRestore
-                + ", isOperatorsFinished="
-                + isOperatorsFinished
+                + ", isTaskDeployedAsFinished="
+                + isTaskDeployedAsFinished
+                + ", isTaskFinished="
+                + isTaskFinished
                 + '}';
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
index f4dadf9..6004110 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
@@ -74,7 +74,7 @@ public interface TaskStateManager extends CheckpointListener, 
AutoCloseable {
             CheckpointMetaData checkpointMetaData, CheckpointMetrics 
checkpointMetrics);
 
     /** Whether all the operators of the task are finished on restore. */
-    boolean isFinishedOnRestore();
+    boolean isTaskDeployedAsFinished();
 
     /** Acquires the checkpoint id to restore from. */
     Optional<Long> getRestoreCheckpointId();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
index 1fbb81d..2d14d37 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
@@ -156,12 +156,12 @@ public class TaskStateManagerImpl implements 
TaskStateManager {
         return 
jobManagerTaskRestore.getTaskStateSnapshot().getOutputRescalingDescriptor();
     }
 
-    public boolean isFinishedOnRestore() {
+    public boolean isTaskDeployedAsFinished() {
         if (jobManagerTaskRestore == null) {
             return false;
         }
 
-        return 
jobManagerTaskRestore.getTaskStateSnapshot().isFinishedOnRestore();
+        return 
jobManagerTaskRestore.getTaskStateSnapshot().isTaskDeployedAsFinished();
     }
 
     @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
index 51579a0..398162d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
@@ -1092,7 +1092,7 @@ public class CheckpointCoordinatorRestoringTest extends 
TestLogger {
                         .getCurrentExecutionAttempt()
                         .getTaskRestore()
                         .getTaskStateSnapshot();
-        assertTrue(restoredState.isFinishedOnRestore());
+        assertTrue(restoredState.isTaskDeployedAsFinished());
     }
 
     @Test
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
index 35a2e72..a178a4c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
@@ -424,8 +424,8 @@ public class CheckpointCoordinatorTestingUtils {
     }
 
     public static TaskStateSnapshot createSnapshotWithUnionListState(
-            File stateFile, OperatorID operatorId, boolean 
isOperatorsFinished) throws IOException {
-        TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot(1, 
isOperatorsFinished);
+            File stateFile, OperatorID operatorId, boolean isTaskFinished) 
throws IOException {
+        TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot(1, 
isTaskFinished);
         taskStateSnapshot.putSubtaskStateByOperatorID(
                 operatorId, createSubtaskStateWithUnionListState(stateFile));
         return taskStateSnapshot;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
index 17f8b2a..629150c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
@@ -756,14 +756,14 @@ public class StateAssignmentOperationTest extends 
TestLogger {
         ExecutionJobVertex jobVertexWithFinishedOperator = 
vertices.get(operatorIds.get(0));
         for (ExecutionVertex task : 
jobVertexWithFinishedOperator.getTaskVertices()) {
             JobManagerTaskRestore taskRestore = 
task.getCurrentExecutionAttempt().getTaskRestore();
-            
Assert.assertTrue(taskRestore.getTaskStateSnapshot().isFinishedOnRestore());
+            
Assert.assertTrue(taskRestore.getTaskStateSnapshot().isTaskDeployedAsFinished());
         }
 
         // Check the job vertex without finished operator.
         ExecutionJobVertex jobVertexWithoutFinishedOperator = 
vertices.get(operatorIds.get(1));
         for (ExecutionVertex task : 
jobVertexWithoutFinishedOperator.getTaskVertices()) {
             JobManagerTaskRestore taskRestore = 
task.getCurrentExecutionAttempt().getTaskRestore();
-            
Assert.assertFalse(taskRestore.getTaskStateSnapshot().isFinishedOnRestore());
+            
Assert.assertFalse(taskRestore.getTaskStateSnapshot().isTaskDeployedAsFinished());
         }
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
index ed0a707..c487ea9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
@@ -281,7 +281,7 @@ public class TaskStateManagerImplTest extends TestLogger {
                         null,
                         jobManagerTaskRestore,
                         new TestCheckpointResponder());
-        Assert.assertTrue(stateManager.isFinishedOnRestore());
+        Assert.assertTrue(stateManager.isTaskDeployedAsFinished());
     }
 
     public void testAcquringRestoreCheckpointId() {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
index 22a6882..e75d4fa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
@@ -147,10 +147,10 @@ public class TestTaskStateManager implements 
TaskStateManager {
     }
 
     @Override
-    public boolean isFinishedOnRestore() {
+    public boolean isTaskDeployedAsFinished() {
         TaskStateSnapshot jmTaskStateSnapshot = 
getLastJobManagerTaskStateSnapshot();
         if (jmTaskStateSnapshot != null) {
-            return jmTaskStateSnapshot.isFinishedOnRestore();
+            return jmTaskStateSnapshot.isTaskDeployedAsFinished();
         }
 
         return false;
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
index 199f4b6..8b4330a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
@@ -174,7 +174,7 @@ public class StreamTwoInputProcessorFactory {
 
         @Nullable
         FinishedOnRestoreWatermarkBypass watermarkBypass =
-                operatorChain.isFinishedOnRestore()
+                operatorChain.isTaskDeployedAsFinished()
                         ? new 
FinishedOnRestoreWatermarkBypass(operatorChain.getStreamOutputs())
                         : null;
         StreamTaskNetworkOutput<IN1> output1 =
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
index c077914..0381f62 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
@@ -52,8 +52,8 @@ final class AsyncCheckpointRunnable implements Runnable, 
Closeable {
     public static final Logger LOG = 
LoggerFactory.getLogger(AsyncCheckpointRunnable.class);
     private final String taskName;
     private final Consumer<AsyncCheckpointRunnable> unregisterConsumer;
-    private final boolean isFinishedOnRestore;
-    private final boolean isOperatorsFinished;
+    private final boolean isTaskDeployedAsFinished;
+    private final boolean isTaskFinished;
     private final Supplier<Boolean> isTaskRunning;
     private final Environment taskEnvironment;
     private final CompletableFuture<Void> finishedFuture = new 
CompletableFuture<>();
@@ -85,8 +85,8 @@ final class AsyncCheckpointRunnable implements Runnable, 
Closeable {
             Consumer<AsyncCheckpointRunnable> unregister,
             Environment taskEnvironment,
             AsyncExceptionHandler asyncExceptionHandler,
-            boolean isFinishedOnRestore,
-            boolean isOperatorsFinished,
+            boolean isTaskDeployedAsFinished,
+            boolean isTaskFinished,
             Supplier<Boolean> isTaskRunning) {
 
         this.operatorSnapshotsInProgress = 
checkNotNull(operatorSnapshotsInProgress);
@@ -97,8 +97,8 @@ final class AsyncCheckpointRunnable implements Runnable, 
Closeable {
         this.unregisterConsumer = unregister;
         this.taskEnvironment = checkNotNull(taskEnvironment);
         this.asyncExceptionHandler = checkNotNull(asyncExceptionHandler);
-        this.isFinishedOnRestore = isFinishedOnRestore;
-        this.isOperatorsFinished = isOperatorsFinished;
+        this.isTaskDeployedAsFinished = isTaskDeployedAsFinished;
+        this.isTaskFinished = isTaskFinished;
         this.isTaskRunning = isTaskRunning;
     }
 
@@ -116,7 +116,7 @@ final class AsyncCheckpointRunnable implements Runnable, 
Closeable {
         try {
 
             SnapshotsFinalizeResult snapshotsFinalizeResult =
-                    isFinishedOnRestore
+                    isTaskDeployedAsFinished
                             ? new SnapshotsFinalizeResult(
                                     TaskStateSnapshot.FINISHED_ON_RESTORE,
                                     TaskStateSnapshot.FINISHED_ON_RESTORE,
@@ -162,9 +162,9 @@ final class AsyncCheckpointRunnable implements Runnable, 
Closeable {
 
     private SnapshotsFinalizeResult finalizeNonFinishedSnapshots() throws 
Exception {
         TaskStateSnapshot jobManagerTaskOperatorSubtaskStates =
-                new TaskStateSnapshot(operatorSnapshotsInProgress.size(), 
isOperatorsFinished);
+                new TaskStateSnapshot(operatorSnapshotsInProgress.size(), 
isTaskFinished);
         TaskStateSnapshot localTaskOperatorSubtaskStates =
-                new TaskStateSnapshot(operatorSnapshotsInProgress.size(), 
isOperatorsFinished);
+                new TaskStateSnapshot(operatorSnapshotsInProgress.size(), 
isTaskFinished);
 
         long bytesPersistedDuringAlignment = 0;
         for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry :
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/FinishedOperatorChain.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/FinishedOperatorChain.java
index 53dd7e8..f22086f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/FinishedOperatorChain.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/FinishedOperatorChain.java
@@ -38,7 +38,7 @@ import java.util.function.Supplier;
 
 /**
  * The {@link OperatorChain} that is used for restoring tasks that are {@link
- * TaskStateManager#isFinishedOnRestore()}.
+ * TaskStateManager#isTaskDeployedAsFinished()}.
  */
 @Internal
 public class FinishedOperatorChain<OUT, OP extends StreamOperator<OUT>>
@@ -51,7 +51,7 @@ public class FinishedOperatorChain<OUT, OP extends 
StreamOperator<OUT>>
     }
 
     @Override
-    public boolean isFinishedOnRestore() {
+    public boolean isTaskDeployedAsFinished() {
         return true;
     }
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 26380ab..a009452 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -165,7 +165,7 @@ public abstract class OperatorChain<OUT, OP extends 
StreamOperator<OUT>>
                 new HashMap<>(outEdgesInOrder.size());
         this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];
         this.finishedOnRestoreInput =
-                this.isFinishedOnRestore()
+                this.isTaskDeployedAsFinished()
                         ? new FinishedOnRestoreInput(
                                 streamOutputs, 
configuration.getInputs(userCodeClassloader).length)
                         : null;
@@ -273,7 +273,7 @@ public abstract class OperatorChain<OUT, OP extends 
StreamOperator<OUT>>
         firstOperatorWrapper = linkOperatorWrappers(allOperatorWrappers);
     }
 
-    public abstract boolean isFinishedOnRestore();
+    public abstract boolean isTaskDeployedAsFinished();
 
     public abstract void dispatchOperatorEvent(
             OperatorID operator, SerializedValue<OperatorEvent> event) throws 
FlinkException;
@@ -564,7 +564,7 @@ public abstract class OperatorChain<OUT, OP extends 
StreamOperator<OUT>>
                     sourceInput,
                     new ChainedSource(
                             chainedSourceOutput,
-                            this.isFinishedOnRestore()
+                            this.isTaskDeployedAsFinished()
                                     ? new 
StreamTaskFinishedOnRestoreSourceInput<>(
                                             sourceOperator, 
sourceInputGateIndex++, inputId)
                                     : new StreamTaskSourceInput<>(
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/RegularOperatorChain.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/RegularOperatorChain.java
index 56ccbac..7a267a7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/RegularOperatorChain.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/RegularOperatorChain.java
@@ -73,7 +73,7 @@ public class RegularOperatorChain<OUT, OP extends 
StreamOperator<OUT>>
     }
 
     @Override
-    public boolean isFinishedOnRestore() {
+    public boolean isTaskDeployedAsFinished() {
         return false;
     }
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
index ce9de79..8d0bf97 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
@@ -73,7 +73,7 @@ public class SourceOperatorStreamTask<T> extends 
StreamTask<T, SourceOperator<T,
         final StreamTaskInput<T> input;
 
         // TODO: should the input be constructed inside the `OperatorChain` 
class?
-        if (operatorChain.isFinishedOnRestore()) {
+        if (operatorChain.isTaskDeployedAsFinished()) {
             input = new 
StreamTaskFinishedOnRestoreSourceInput<>(sourceOperator, 0, 0);
         } else if (sourceReader instanceof ExternallyInducedSourceReader) {
             isExternallyInducedSource = true;
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index e4908ae..d0436ec 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -231,7 +231,7 @@ public class SourceStreamTask<
 
     private void interruptSourceThread(boolean interrupt) {
         // Nothing need to do if the source is finished on restore
-        if (operatorChain != null && operatorChain.isFinishedOnRestore()) {
+        if (operatorChain != null && operatorChain.isTaskDeployedAsFinished()) 
{
             return;
         }
 
@@ -316,7 +316,7 @@ public class SourceStreamTask<
         @Override
         public void run() {
             try {
-                if (!operatorChain.isFinishedOnRestore()) {
+                if (!operatorChain.isTaskDeployedAsFinished()) {
                     LOG.debug(
                             "Legacy source {} skip execution since the task is 
finished on restore",
                             getTaskNameWithSubtaskAndId());
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 2872bee..bd8ece4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -663,7 +663,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
         LOG.debug("Initializing {}.", getName());
 
         operatorChain =
-                getEnvironment().getTaskStateManager().isFinishedOnRestore()
+                
getEnvironment().getTaskStateManager().isTaskDeployedAsFinished()
                         ? new FinishedOperatorChain<>(this, recordWriter)
                         : new RegularOperatorChain<>(this, recordWriter);
         mainOperator = operatorChain.getMainOperator();
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java
index c786f0a..33bc1cc 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java
@@ -61,7 +61,7 @@ public interface SubtaskCheckpointCoordinator extends 
Closeable {
             CheckpointOptions checkpointOptions,
             CheckpointMetricsBuilder checkpointMetrics,
             OperatorChain<?, ?> operatorChain,
-            boolean isOperatorsFinished,
+            boolean isTaskFinished,
             Supplier<Boolean> isRunning)
             throws Exception;
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index fb0342f..e06bb3a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -245,7 +245,7 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
             CheckpointOptions options,
             CheckpointMetricsBuilder metrics,
             OperatorChain<?, ?> operatorChain,
-            boolean isOperatorsFinished,
+            boolean isTaskFinished,
             Supplier<Boolean> isRunning)
             throws Exception {
 
@@ -318,8 +318,8 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
                         snapshotFutures,
                         metadata,
                         metrics,
-                        operatorChain.isFinishedOnRestore(),
-                        isOperatorsFinished,
+                        operatorChain.isTaskDeployedAsFinished(),
+                        isTaskFinished,
                         isRunning);
             } else {
                 cleanup(snapshotFutures, metadata, metrics, new 
Exception("Checkpoint declined"));
@@ -545,8 +545,8 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
             Map<OperatorID, OperatorSnapshotFutures> snapshotFutures,
             CheckpointMetaData metadata,
             CheckpointMetricsBuilder metrics,
-            boolean isFinishedOnRestore,
-            boolean isOperatorsFinished,
+            boolean isTaskDeployedAsFinished,
+            boolean isTaskFinished,
             Supplier<Boolean> isRunning)
             throws IOException {
         AsyncCheckpointRunnable asyncCheckpointRunnable =
@@ -559,8 +559,8 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
                         unregisterConsumer(),
                         env,
                         asyncExceptionHandler,
-                        isFinishedOnRestore,
-                        isOperatorsFinished,
+                        isTaskDeployedAsFinished,
+                        isTaskFinished,
                         isRunning);
 
         registerAsyncCheckpointRunnable(
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
index 8725b65..ca61ed0 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
@@ -152,7 +152,7 @@ public class AsyncCheckpointRunnableTest {
     private AsyncCheckpointRunnable createAsyncRunnable(
             Map<OperatorID, OperatorSnapshotFutures> snapshotsInProgress,
             TestEnvironment environment,
-            boolean isFinishedOnRestore,
+            boolean isTaskDeployedAsFinished,
             boolean isTaskRunning) {
         return new AsyncCheckpointRunnable(
                 snapshotsInProgress,
@@ -165,7 +165,7 @@ public class AsyncCheckpointRunnableTest {
                 r -> {},
                 environment,
                 (msg, ex) -> {},
-                isFinishedOnRestore,
+                isTaskDeployedAsFinished,
                 false,
                 () -> isTaskRunning);
     }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
index 5a60391..efd0ea1 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
@@ -753,7 +753,7 @@ public class StreamTaskFinalCheckpointsTest {
                                 .getTaskStateManager()
                                 
.getJobManagerTaskStateSnapshotsByCheckpointId()
                                 .get(2L)
-                                .isOperatorsFinished());
+                                .isTaskFinished());
 
                 // Trigger the first checkpoint after we call operators' 
finish method.
                 // The checkpoint is added to the mailbox and will be 
processed in the
@@ -776,7 +776,7 @@ public class StreamTaskFinalCheckpointsTest {
                                 .getTaskStateManager()
                                 
.getJobManagerTaskStateSnapshotsByCheckpointId()
                                 .get(4L)
-                                .isOperatorsFinished());
+                                .isTaskFinished());
             }
 
         } finally {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java
index dc83d84..c6012ca 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java
@@ -79,7 +79,7 @@ public class TestSubtaskCheckpointCoordinator implements 
SubtaskCheckpointCoordi
             CheckpointOptions checkpointOptions,
             CheckpointMetricsBuilder checkpointMetrics,
             OperatorChain<?, ?> operatorChain,
-            boolean isOperatorsFinished,
+            boolean isTaskFinished,
             Supplier<Boolean> isRunning) {}
 
     @Override

Reply via email to