This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new ab196a6 [FLINK-24269][Runtime / Checkpointing] Rename methods around
final checkpoints
ab196a6 is described below
commit ab196a69b4a8ae31f0fb36996602b0afc8f12059
Author: liliwei <[email protected]>
AuthorDate: Fri Sep 17 23:17:21 2021 +0800
[FLINK-24269][Runtime / Checkpointing] Rename methods around final
checkpoints
This closes #17316
---
.../api/runtime/SavepointTaskStateManager.java | 2 +-
.../runtime/checkpoint/PendingCheckpoint.java | 9 +++--
.../runtime/checkpoint/TaskStateSnapshot.java | 40 +++++++++++-----------
.../flink/runtime/state/TaskStateManager.java | 2 +-
.../flink/runtime/state/TaskStateManagerImpl.java | 4 +--
.../CheckpointCoordinatorRestoringTest.java | 2 +-
.../CheckpointCoordinatorTestingUtils.java | 4 +--
.../checkpoint/StateAssignmentOperationTest.java | 4 +--
.../runtime/state/TaskStateManagerImplTest.java | 2 +-
.../flink/runtime/state/TestTaskStateManager.java | 4 +--
.../runtime/io/StreamTwoInputProcessorFactory.java | 2 +-
.../runtime/tasks/AsyncCheckpointRunnable.java | 18 +++++-----
.../runtime/tasks/FinishedOperatorChain.java | 4 +--
.../streaming/runtime/tasks/OperatorChain.java | 6 ++--
.../runtime/tasks/RegularOperatorChain.java | 2 +-
.../runtime/tasks/SourceOperatorStreamTask.java | 2 +-
.../streaming/runtime/tasks/SourceStreamTask.java | 4 +--
.../flink/streaming/runtime/tasks/StreamTask.java | 2 +-
.../tasks/SubtaskCheckpointCoordinator.java | 2 +-
.../tasks/SubtaskCheckpointCoordinatorImpl.java | 14 ++++----
.../runtime/tasks/AsyncCheckpointRunnableTest.java | 4 +--
.../tasks/StreamTaskFinalCheckpointsTest.java | 4 +--
.../tasks/TestSubtaskCheckpointCoordinator.java | 2 +-
23 files changed, 69 insertions(+), 70 deletions(-)
diff --git
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
index ac4db13..b51a1ec 100644
---
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
+++
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
@@ -63,7 +63,7 @@ final class SavepointTaskStateManager implements
TaskStateManager {
CheckpointMetaData checkpointMetaData, CheckpointMetrics
checkpointMetrics) {}
@Override
- public boolean isFinishedOnRestore() {
+ public boolean isTaskDeployedAsFinished() {
return false;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index be7fc5e..f64af16 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -399,16 +399,15 @@ public class PendingCheckpoint implements Checkpoint {
}
long ackTimestamp = System.currentTimeMillis();
- if (operatorSubtaskStates != null &&
operatorSubtaskStates.isFinishedOnRestore()) {
+ if (operatorSubtaskStates != null &&
operatorSubtaskStates.isTaskDeployedAsFinished()) {
checkpointPlan.reportTaskFinishedOnRestore(vertex);
} else {
List<OperatorIDPair> operatorIDs =
vertex.getJobVertex().getOperatorIDs();
for (OperatorIDPair operatorID : operatorIDs) {
- updateNonFinishedOnRestoreOperatorState(
- vertex, operatorSubtaskStates, operatorID);
+ updateOperatorState(vertex, operatorSubtaskStates,
operatorID);
}
- if (operatorSubtaskStates != null &&
operatorSubtaskStates.isOperatorsFinished()) {
+ if (operatorSubtaskStates != null &&
operatorSubtaskStates.isTaskFinished()) {
checkpointPlan.reportTaskHasFinishedOperators(vertex);
}
}
@@ -454,7 +453,7 @@ public class PendingCheckpoint implements Checkpoint {
}
}
- private void updateNonFinishedOnRestoreOperatorState(
+ private void updateOperatorState(
ExecutionVertex vertex,
TaskStateSnapshot operatorSubtaskStates,
OperatorIDPair operatorID) {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
index aa65c98..aa31dc9 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
@@ -63,16 +63,16 @@ public class TaskStateSnapshot implements
CompositeStateHandle {
/** Mapping from an operator id to the state of one subtask of this
operator. */
private final Map<OperatorID, OperatorSubtaskState>
subtaskStatesByOperatorID;
- private final boolean isFinishedOnRestore;
+ private final boolean isTaskDeployedAsFinished;
- private final boolean isOperatorsFinished;
+ private final boolean isTaskFinished;
public TaskStateSnapshot() {
this(10, false);
}
- public TaskStateSnapshot(int size, boolean isOperatorsFinished) {
- this(new HashMap<>(size), false, isOperatorsFinished);
+ public TaskStateSnapshot(int size, boolean isTaskFinished) {
+ this(new HashMap<>(size), false, isTaskFinished);
}
public TaskStateSnapshot(Map<OperatorID, OperatorSubtaskState>
subtaskStatesByOperatorID) {
@@ -81,21 +81,21 @@ public class TaskStateSnapshot implements
CompositeStateHandle {
private TaskStateSnapshot(
Map<OperatorID, OperatorSubtaskState> subtaskStatesByOperatorID,
- boolean isFinishedOnRestore,
- boolean isOperatorsFinished) {
+ boolean isTaskDeployedAsFinished,
+ boolean isTaskFinished) {
this.subtaskStatesByOperatorID =
Preconditions.checkNotNull(subtaskStatesByOperatorID);
- this.isFinishedOnRestore = isFinishedOnRestore;
- this.isOperatorsFinished = isOperatorsFinished;
+ this.isTaskDeployedAsFinished = isTaskDeployedAsFinished;
+ this.isTaskFinished = isTaskFinished;
}
/** Returns whether all the operators of the task are already finished on
restoring. */
- public boolean isFinishedOnRestore() {
- return isFinishedOnRestore;
+ public boolean isTaskDeployedAsFinished() {
+ return isTaskDeployedAsFinished;
}
/** Returns whether all the operators of the task have called finished
methods. */
- public boolean isOperatorsFinished() {
- return isOperatorsFinished;
+ public boolean isTaskFinished() {
+ return isTaskFinished;
}
/** Returns the subtask state for the given operator id (or null if not
contained). */
@@ -129,7 +129,7 @@ public class TaskStateSnapshot implements
CompositeStateHandle {
return true;
}
}
- return isFinishedOnRestore;
+ return isTaskDeployedAsFinished;
}
/**
@@ -187,13 +187,13 @@ public class TaskStateSnapshot implements
CompositeStateHandle {
TaskStateSnapshot that = (TaskStateSnapshot) o;
return subtaskStatesByOperatorID.equals(that.subtaskStatesByOperatorID)
- && isFinishedOnRestore == that.isFinishedOnRestore
- && isOperatorsFinished == that.isOperatorsFinished;
+ && isTaskDeployedAsFinished == that.isTaskDeployedAsFinished
+ && isTaskFinished == that.isTaskFinished;
}
@Override
public int hashCode() {
- return Objects.hash(subtaskStatesByOperatorID, isFinishedOnRestore,
isOperatorsFinished);
+ return Objects.hash(subtaskStatesByOperatorID,
isTaskDeployedAsFinished, isTaskFinished);
}
@Override
@@ -201,10 +201,10 @@ public class TaskStateSnapshot implements
CompositeStateHandle {
return "TaskOperatorSubtaskStates{"
+ "subtaskStatesByOperatorID="
+ subtaskStatesByOperatorID
- + ", isFinished="
- + isFinishedOnRestore
- + ", isOperatorsFinished="
- + isOperatorsFinished
+ + ", isTaskDeployedAsFinished="
+ + isTaskDeployedAsFinished
+ + ", isTaskFinished="
+ + isTaskFinished
+ '}';
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
index f4dadf9..6004110 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
@@ -74,7 +74,7 @@ public interface TaskStateManager extends CheckpointListener,
AutoCloseable {
CheckpointMetaData checkpointMetaData, CheckpointMetrics
checkpointMetrics);
/** Whether all the operators of the task are finished on restore. */
- boolean isFinishedOnRestore();
+ boolean isTaskDeployedAsFinished();
/** Acquires the checkpoint id to restore from. */
Optional<Long> getRestoreCheckpointId();
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
index 1fbb81d..2d14d37 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
@@ -156,12 +156,12 @@ public class TaskStateManagerImpl implements
TaskStateManager {
return
jobManagerTaskRestore.getTaskStateSnapshot().getOutputRescalingDescriptor();
}
- public boolean isFinishedOnRestore() {
+ public boolean isTaskDeployedAsFinished() {
if (jobManagerTaskRestore == null) {
return false;
}
- return
jobManagerTaskRestore.getTaskStateSnapshot().isFinishedOnRestore();
+ return
jobManagerTaskRestore.getTaskStateSnapshot().isTaskDeployedAsFinished();
}
@Override
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
index 51579a0..398162d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
@@ -1092,7 +1092,7 @@ public class CheckpointCoordinatorRestoringTest extends
TestLogger {
.getCurrentExecutionAttempt()
.getTaskRestore()
.getTaskStateSnapshot();
- assertTrue(restoredState.isFinishedOnRestore());
+ assertTrue(restoredState.isTaskDeployedAsFinished());
}
@Test
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
index 35a2e72..a178a4c 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
@@ -424,8 +424,8 @@ public class CheckpointCoordinatorTestingUtils {
}
public static TaskStateSnapshot createSnapshotWithUnionListState(
- File stateFile, OperatorID operatorId, boolean
isOperatorsFinished) throws IOException {
- TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot(1,
isOperatorsFinished);
+ File stateFile, OperatorID operatorId, boolean isTaskFinished)
throws IOException {
+ TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot(1,
isTaskFinished);
taskStateSnapshot.putSubtaskStateByOperatorID(
operatorId, createSubtaskStateWithUnionListState(stateFile));
return taskStateSnapshot;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
index 17f8b2a..629150c 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
@@ -756,14 +756,14 @@ public class StateAssignmentOperationTest extends
TestLogger {
ExecutionJobVertex jobVertexWithFinishedOperator =
vertices.get(operatorIds.get(0));
for (ExecutionVertex task :
jobVertexWithFinishedOperator.getTaskVertices()) {
JobManagerTaskRestore taskRestore =
task.getCurrentExecutionAttempt().getTaskRestore();
-
Assert.assertTrue(taskRestore.getTaskStateSnapshot().isFinishedOnRestore());
+
Assert.assertTrue(taskRestore.getTaskStateSnapshot().isTaskDeployedAsFinished());
}
// Check the job vertex without finished operator.
ExecutionJobVertex jobVertexWithoutFinishedOperator =
vertices.get(operatorIds.get(1));
for (ExecutionVertex task :
jobVertexWithoutFinishedOperator.getTaskVertices()) {
JobManagerTaskRestore taskRestore =
task.getCurrentExecutionAttempt().getTaskRestore();
-
Assert.assertFalse(taskRestore.getTaskStateSnapshot().isFinishedOnRestore());
+
Assert.assertFalse(taskRestore.getTaskStateSnapshot().isTaskDeployedAsFinished());
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
index ed0a707..c487ea9 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
@@ -281,7 +281,7 @@ public class TaskStateManagerImplTest extends TestLogger {
null,
jobManagerTaskRestore,
new TestCheckpointResponder());
- Assert.assertTrue(stateManager.isFinishedOnRestore());
+ Assert.assertTrue(stateManager.isTaskDeployedAsFinished());
}
public void testAcquringRestoreCheckpointId() {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
index 22a6882..e75d4fa 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
@@ -147,10 +147,10 @@ public class TestTaskStateManager implements
TaskStateManager {
}
@Override
- public boolean isFinishedOnRestore() {
+ public boolean isTaskDeployedAsFinished() {
TaskStateSnapshot jmTaskStateSnapshot =
getLastJobManagerTaskStateSnapshot();
if (jmTaskStateSnapshot != null) {
- return jmTaskStateSnapshot.isFinishedOnRestore();
+ return jmTaskStateSnapshot.isTaskDeployedAsFinished();
}
return false;
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
index 199f4b6..8b4330a 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessorFactory.java
@@ -174,7 +174,7 @@ public class StreamTwoInputProcessorFactory {
@Nullable
FinishedOnRestoreWatermarkBypass watermarkBypass =
- operatorChain.isFinishedOnRestore()
+ operatorChain.isTaskDeployedAsFinished()
? new
FinishedOnRestoreWatermarkBypass(operatorChain.getStreamOutputs())
: null;
StreamTaskNetworkOutput<IN1> output1 =
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
index c077914..0381f62 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
@@ -52,8 +52,8 @@ final class AsyncCheckpointRunnable implements Runnable,
Closeable {
public static final Logger LOG =
LoggerFactory.getLogger(AsyncCheckpointRunnable.class);
private final String taskName;
private final Consumer<AsyncCheckpointRunnable> unregisterConsumer;
- private final boolean isFinishedOnRestore;
- private final boolean isOperatorsFinished;
+ private final boolean isTaskDeployedAsFinished;
+ private final boolean isTaskFinished;
private final Supplier<Boolean> isTaskRunning;
private final Environment taskEnvironment;
private final CompletableFuture<Void> finishedFuture = new
CompletableFuture<>();
@@ -85,8 +85,8 @@ final class AsyncCheckpointRunnable implements Runnable,
Closeable {
Consumer<AsyncCheckpointRunnable> unregister,
Environment taskEnvironment,
AsyncExceptionHandler asyncExceptionHandler,
- boolean isFinishedOnRestore,
- boolean isOperatorsFinished,
+ boolean isTaskDeployedAsFinished,
+ boolean isTaskFinished,
Supplier<Boolean> isTaskRunning) {
this.operatorSnapshotsInProgress =
checkNotNull(operatorSnapshotsInProgress);
@@ -97,8 +97,8 @@ final class AsyncCheckpointRunnable implements Runnable,
Closeable {
this.unregisterConsumer = unregister;
this.taskEnvironment = checkNotNull(taskEnvironment);
this.asyncExceptionHandler = checkNotNull(asyncExceptionHandler);
- this.isFinishedOnRestore = isFinishedOnRestore;
- this.isOperatorsFinished = isOperatorsFinished;
+ this.isTaskDeployedAsFinished = isTaskDeployedAsFinished;
+ this.isTaskFinished = isTaskFinished;
this.isTaskRunning = isTaskRunning;
}
@@ -116,7 +116,7 @@ final class AsyncCheckpointRunnable implements Runnable,
Closeable {
try {
SnapshotsFinalizeResult snapshotsFinalizeResult =
- isFinishedOnRestore
+ isTaskDeployedAsFinished
? new SnapshotsFinalizeResult(
TaskStateSnapshot.FINISHED_ON_RESTORE,
TaskStateSnapshot.FINISHED_ON_RESTORE,
@@ -162,9 +162,9 @@ final class AsyncCheckpointRunnable implements Runnable,
Closeable {
private SnapshotsFinalizeResult finalizeNonFinishedSnapshots() throws
Exception {
TaskStateSnapshot jobManagerTaskOperatorSubtaskStates =
- new TaskStateSnapshot(operatorSnapshotsInProgress.size(),
isOperatorsFinished);
+ new TaskStateSnapshot(operatorSnapshotsInProgress.size(),
isTaskFinished);
TaskStateSnapshot localTaskOperatorSubtaskStates =
- new TaskStateSnapshot(operatorSnapshotsInProgress.size(),
isOperatorsFinished);
+ new TaskStateSnapshot(operatorSnapshotsInProgress.size(),
isTaskFinished);
long bytesPersistedDuringAlignment = 0;
for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry :
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/FinishedOperatorChain.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/FinishedOperatorChain.java
index 53dd7e8..f22086f 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/FinishedOperatorChain.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/FinishedOperatorChain.java
@@ -38,7 +38,7 @@ import java.util.function.Supplier;
/**
* The {@link OperatorChain} that is used for restoring tasks that are {@link
- * TaskStateManager#isFinishedOnRestore()}.
+ * TaskStateManager#isTaskDeployedAsFinished()}.
*/
@Internal
public class FinishedOperatorChain<OUT, OP extends StreamOperator<OUT>>
@@ -51,7 +51,7 @@ public class FinishedOperatorChain<OUT, OP extends
StreamOperator<OUT>>
}
@Override
- public boolean isFinishedOnRestore() {
+ public boolean isTaskDeployedAsFinished() {
return true;
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 26380ab..a009452 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -165,7 +165,7 @@ public abstract class OperatorChain<OUT, OP extends
StreamOperator<OUT>>
new HashMap<>(outEdgesInOrder.size());
this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];
this.finishedOnRestoreInput =
- this.isFinishedOnRestore()
+ this.isTaskDeployedAsFinished()
? new FinishedOnRestoreInput(
streamOutputs,
configuration.getInputs(userCodeClassloader).length)
: null;
@@ -273,7 +273,7 @@ public abstract class OperatorChain<OUT, OP extends
StreamOperator<OUT>>
firstOperatorWrapper = linkOperatorWrappers(allOperatorWrappers);
}
- public abstract boolean isFinishedOnRestore();
+ public abstract boolean isTaskDeployedAsFinished();
public abstract void dispatchOperatorEvent(
OperatorID operator, SerializedValue<OperatorEvent> event) throws
FlinkException;
@@ -564,7 +564,7 @@ public abstract class OperatorChain<OUT, OP extends
StreamOperator<OUT>>
sourceInput,
new ChainedSource(
chainedSourceOutput,
- this.isFinishedOnRestore()
+ this.isTaskDeployedAsFinished()
? new
StreamTaskFinishedOnRestoreSourceInput<>(
sourceOperator,
sourceInputGateIndex++, inputId)
: new StreamTaskSourceInput<>(
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/RegularOperatorChain.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/RegularOperatorChain.java
index 56ccbac..7a267a7 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/RegularOperatorChain.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/RegularOperatorChain.java
@@ -73,7 +73,7 @@ public class RegularOperatorChain<OUT, OP extends
StreamOperator<OUT>>
}
@Override
- public boolean isFinishedOnRestore() {
+ public boolean isTaskDeployedAsFinished() {
return false;
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
index ce9de79..8d0bf97 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
@@ -73,7 +73,7 @@ public class SourceOperatorStreamTask<T> extends
StreamTask<T, SourceOperator<T,
final StreamTaskInput<T> input;
// TODO: should the input be constructed inside the `OperatorChain`
class?
- if (operatorChain.isFinishedOnRestore()) {
+ if (operatorChain.isTaskDeployedAsFinished()) {
input = new
StreamTaskFinishedOnRestoreSourceInput<>(sourceOperator, 0, 0);
} else if (sourceReader instanceof ExternallyInducedSourceReader) {
isExternallyInducedSource = true;
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index e4908ae..d0436ec 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -231,7 +231,7 @@ public class SourceStreamTask<
private void interruptSourceThread(boolean interrupt) {
// Nothing need to do if the source is finished on restore
- if (operatorChain != null && operatorChain.isFinishedOnRestore()) {
+ if (operatorChain != null && operatorChain.isTaskDeployedAsFinished())
{
return;
}
@@ -316,7 +316,7 @@ public class SourceStreamTask<
@Override
public void run() {
try {
- if (!operatorChain.isFinishedOnRestore()) {
+ if (!operatorChain.isTaskDeployedAsFinished()) {
LOG.debug(
"Legacy source {} skip execution since the task is
finished on restore",
getTaskNameWithSubtaskAndId());
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 2872bee..bd8ece4 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -663,7 +663,7 @@ public abstract class StreamTask<OUT, OP extends
StreamOperator<OUT>>
LOG.debug("Initializing {}.", getName());
operatorChain =
- getEnvironment().getTaskStateManager().isFinishedOnRestore()
+
getEnvironment().getTaskStateManager().isTaskDeployedAsFinished()
? new FinishedOperatorChain<>(this, recordWriter)
: new RegularOperatorChain<>(this, recordWriter);
mainOperator = operatorChain.getMainOperator();
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java
index c786f0a..33bc1cc 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinator.java
@@ -61,7 +61,7 @@ public interface SubtaskCheckpointCoordinator extends
Closeable {
CheckpointOptions checkpointOptions,
CheckpointMetricsBuilder checkpointMetrics,
OperatorChain<?, ?> operatorChain,
- boolean isOperatorsFinished,
+ boolean isTaskFinished,
Supplier<Boolean> isRunning)
throws Exception;
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index fb0342f..e06bb3a 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -245,7 +245,7 @@ class SubtaskCheckpointCoordinatorImpl implements
SubtaskCheckpointCoordinator {
CheckpointOptions options,
CheckpointMetricsBuilder metrics,
OperatorChain<?, ?> operatorChain,
- boolean isOperatorsFinished,
+ boolean isTaskFinished,
Supplier<Boolean> isRunning)
throws Exception {
@@ -318,8 +318,8 @@ class SubtaskCheckpointCoordinatorImpl implements
SubtaskCheckpointCoordinator {
snapshotFutures,
metadata,
metrics,
- operatorChain.isFinishedOnRestore(),
- isOperatorsFinished,
+ operatorChain.isTaskDeployedAsFinished(),
+ isTaskFinished,
isRunning);
} else {
cleanup(snapshotFutures, metadata, metrics, new
Exception("Checkpoint declined"));
@@ -545,8 +545,8 @@ class SubtaskCheckpointCoordinatorImpl implements
SubtaskCheckpointCoordinator {
Map<OperatorID, OperatorSnapshotFutures> snapshotFutures,
CheckpointMetaData metadata,
CheckpointMetricsBuilder metrics,
- boolean isFinishedOnRestore,
- boolean isOperatorsFinished,
+ boolean isTaskDeployedAsFinished,
+ boolean isTaskFinished,
Supplier<Boolean> isRunning)
throws IOException {
AsyncCheckpointRunnable asyncCheckpointRunnable =
@@ -559,8 +559,8 @@ class SubtaskCheckpointCoordinatorImpl implements
SubtaskCheckpointCoordinator {
unregisterConsumer(),
env,
asyncExceptionHandler,
- isFinishedOnRestore,
- isOperatorsFinished,
+ isTaskDeployedAsFinished,
+ isTaskFinished,
isRunning);
registerAsyncCheckpointRunnable(
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
index 8725b65..ca61ed0 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
@@ -152,7 +152,7 @@ public class AsyncCheckpointRunnableTest {
private AsyncCheckpointRunnable createAsyncRunnable(
Map<OperatorID, OperatorSnapshotFutures> snapshotsInProgress,
TestEnvironment environment,
- boolean isFinishedOnRestore,
+ boolean isTaskDeployedAsFinished,
boolean isTaskRunning) {
return new AsyncCheckpointRunnable(
snapshotsInProgress,
@@ -165,7 +165,7 @@ public class AsyncCheckpointRunnableTest {
r -> {},
environment,
(msg, ex) -> {},
- isFinishedOnRestore,
+ isTaskDeployedAsFinished,
false,
() -> isTaskRunning);
}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
index 5a60391..efd0ea1 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
@@ -753,7 +753,7 @@ public class StreamTaskFinalCheckpointsTest {
.getTaskStateManager()
.getJobManagerTaskStateSnapshotsByCheckpointId()
.get(2L)
- .isOperatorsFinished());
+ .isTaskFinished());
// Trigger the first checkpoint after we call operators'
finish method.
// The checkpoint is added to the mailbox and will be
processed in the
@@ -776,7 +776,7 @@ public class StreamTaskFinalCheckpointsTest {
.getTaskStateManager()
.getJobManagerTaskStateSnapshotsByCheckpointId()
.get(4L)
- .isOperatorsFinished());
+ .isTaskFinished());
}
} finally {
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java
index dc83d84..c6012ca 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java
@@ -79,7 +79,7 @@ public class TestSubtaskCheckpointCoordinator implements
SubtaskCheckpointCoordi
CheckpointOptions checkpointOptions,
CheckpointMetricsBuilder checkpointMetrics,
OperatorChain<?, ?> operatorChain,
- boolean isOperatorsFinished,
+ boolean isTaskFinished,
Supplier<Boolean> isRunning) {}
@Override