GitHub user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4353#discussion_r129277176
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
---
@@ -867,81 +845,60 @@ public String toString() {
AsyncCheckpointRunnable(
StreamTask<?, ?> owner,
- List<StreamStateHandle>
nonPartitionedStateHandles,
- List<OperatorSnapshotResult>
snapshotInProgressList,
+ Map<OperatorID, StreamStateHandle>
nonPartitionedStateHandles,
+ Map<OperatorID, OperatorSnapshotResult>
operatorSnapshotsInProgress,
CheckpointMetaData checkpointMetaData,
CheckpointMetrics checkpointMetrics,
long asyncStartNanos) {
this.owner = Preconditions.checkNotNull(owner);
- this.snapshotInProgressList =
Preconditions.checkNotNull(snapshotInProgressList);
+ this.operatorSnapshotsInProgress =
Preconditions.checkNotNull(operatorSnapshotsInProgress);
this.checkpointMetaData =
Preconditions.checkNotNull(checkpointMetaData);
this.checkpointMetrics =
Preconditions.checkNotNull(checkpointMetrics);
this.nonPartitionedStateHandles =
nonPartitionedStateHandles;
this.asyncStartNanos = asyncStartNanos;
-
- if (!snapshotInProgressList.isEmpty()) {
- // TODO Currently only the head operator of a
chain can have keyed state, so simply access it directly.
- int headIndex = snapshotInProgressList.size() -
1;
- OperatorSnapshotResult snapshotInProgress =
snapshotInProgressList.get(headIndex);
- if (null != snapshotInProgress) {
- this.futureKeyedBackendStateHandles =
snapshotInProgress.getKeyedStateManagedFuture();
- this.futureKeyedStreamStateHandles =
snapshotInProgress.getKeyedStateRawFuture();
- }
- }
}
@Override
public void run() {
FileSystemSafetyNet.initializeSafetyNetForThread();
try {
- // Keyed state handle future, currently only
one (the head) operator can have this
- KeyedStateHandle keyedStateHandleBackend =
FutureUtil.runIfNotDoneAndGet(futureKeyedBackendStateHandles);
- KeyedStateHandle keyedStateHandleStream =
FutureUtil.runIfNotDoneAndGet(futureKeyedStreamStateHandles);
-
- List<OperatorStateHandle> operatorStatesBackend
= new ArrayList<>(snapshotInProgressList.size());
- List<OperatorStateHandle> operatorStatesStream
= new ArrayList<>(snapshotInProgressList.size());
-
- for (OperatorSnapshotResult snapshotInProgress
: snapshotInProgressList) {
- if (null != snapshotInProgress) {
- operatorStatesBackend.add(
-
FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture()));
- operatorStatesStream.add(
-
FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateRawFuture()));
- } else {
- operatorStatesBackend.add(null);
- operatorStatesStream.add(null);
- }
- }
+ boolean hasState = false;
+ final TaskStateSnapshot
taskOperatorSubtaskStates =
+ new
TaskStateSnapshot(operatorSnapshotsInProgress.size());
- final long asyncEndNanos = System.nanoTime();
- final long asyncDurationMillis = (asyncEndNanos
- asyncStartNanos) / 1_000_000;
+ for (Map.Entry<OperatorID,
OperatorSnapshotResult> entry : operatorSnapshotsInProgress.entrySet()) {
-
checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);
+ OperatorID operatorID = entry.getKey();
+ OperatorSnapshotResult
snapshotInProgress = entry.getValue();
- ChainedStateHandle<StreamStateHandle>
chainedNonPartitionedOperatorsState =
- new
ChainedStateHandle<>(nonPartitionedStateHandles);
+ OperatorSubtaskState
operatorSubtaskState = new OperatorSubtaskState(
+
nonPartitionedStateHandles.get(operatorID),
+
FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture()),
+
FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateRawFuture()),
+
FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getKeyedStateManagedFuture()),
+
FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getKeyedStateRawFuture())
+ );
- ChainedStateHandle<OperatorStateHandle>
chainedOperatorStateBackend =
- new
ChainedStateHandle<>(operatorStatesBackend);
+ hasState |=
operatorSubtaskState.hasState();
+
taskOperatorSubtaskStates.putSubtaskStateByOperatorID(operatorID,
operatorSubtaskState);
+ }
- ChainedStateHandle<OperatorStateHandle>
chainedOperatorStateStream =
- new
ChainedStateHandle<>(operatorStatesStream);
+ final long asyncEndNanos = System.nanoTime();
+ final long asyncDurationMillis = (asyncEndNanos
- asyncStartNanos) / 1_000_000;
- SubtaskState subtaskState =
createSubtaskStateFromSnapshotStateHandles(
-
chainedNonPartitionedOperatorsState,
- chainedOperatorStateBackend,
- chainedOperatorStateStream,
- keyedStateHandleBackend,
- keyedStateHandleStream);
+
checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);
if
(asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING,
CheckpointingOperation.AsynCheckpointState.COMPLETED)) {
+ // we signal a stateless task by
reporting null, so that there are no attempts to assign empty state
+ // to stateless tasks on restore. This
enables simple job modifications that only concern
+ // stateless without the need to assign
them uids to match their (always empty) states.
--- End diff --
Nit: this should read "stateless **tasks**"; the noun is missing after "stateless" in the new comment.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---