Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4353#discussion_r129277176
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
    @@ -867,81 +845,60 @@ public String toString() {
     
                AsyncCheckpointRunnable(
                                StreamTask<?, ?> owner,
    -                           List<StreamStateHandle> 
nonPartitionedStateHandles,
    -                           List<OperatorSnapshotResult> 
snapshotInProgressList,
    +                           Map<OperatorID, StreamStateHandle> 
nonPartitionedStateHandles,
    +                           Map<OperatorID, OperatorSnapshotResult> 
operatorSnapshotsInProgress,
                                CheckpointMetaData checkpointMetaData,
                                CheckpointMetrics checkpointMetrics,
                                long asyncStartNanos) {
     
                        this.owner = Preconditions.checkNotNull(owner);
    -                   this.snapshotInProgressList = 
Preconditions.checkNotNull(snapshotInProgressList);
    +                   this.operatorSnapshotsInProgress = 
Preconditions.checkNotNull(operatorSnapshotsInProgress);
                        this.checkpointMetaData = 
Preconditions.checkNotNull(checkpointMetaData);
                        this.checkpointMetrics = 
Preconditions.checkNotNull(checkpointMetrics);
                        this.nonPartitionedStateHandles = 
nonPartitionedStateHandles;
                        this.asyncStartNanos = asyncStartNanos;
    -
    -                   if (!snapshotInProgressList.isEmpty()) {
    -                           // TODO Currently only the head operator of a 
chain can have keyed state, so simply access it directly.
    -                           int headIndex = snapshotInProgressList.size() - 
1;
    -                           OperatorSnapshotResult snapshotInProgress = 
snapshotInProgressList.get(headIndex);
    -                           if (null != snapshotInProgress) {
    -                                   this.futureKeyedBackendStateHandles = 
snapshotInProgress.getKeyedStateManagedFuture();
    -                                   this.futureKeyedStreamStateHandles = 
snapshotInProgress.getKeyedStateRawFuture();
    -                           }
    -                   }
                }
     
                @Override
                public void run() {
                        FileSystemSafetyNet.initializeSafetyNetForThread();
                        try {
    -                           // Keyed state handle future, currently only 
one (the head) operator can have this
    -                           KeyedStateHandle keyedStateHandleBackend = 
FutureUtil.runIfNotDoneAndGet(futureKeyedBackendStateHandles);
    -                           KeyedStateHandle keyedStateHandleStream = 
FutureUtil.runIfNotDoneAndGet(futureKeyedStreamStateHandles);
    -
    -                           List<OperatorStateHandle> operatorStatesBackend 
= new ArrayList<>(snapshotInProgressList.size());
    -                           List<OperatorStateHandle> operatorStatesStream 
= new ArrayList<>(snapshotInProgressList.size());
    -
    -                           for (OperatorSnapshotResult snapshotInProgress 
: snapshotInProgressList) {
    -                                   if (null != snapshotInProgress) {
    -                                           operatorStatesBackend.add(
    -                                                           
FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture()));
    -                                           operatorStatesStream.add(
    -                                                           
FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateRawFuture()));
    -                                   } else {
    -                                           operatorStatesBackend.add(null);
    -                                           operatorStatesStream.add(null);
    -                                   }
    -                           }
    +                           boolean hasState = false;
    +                           final TaskStateSnapshot 
taskOperatorSubtaskStates =
    +                                   new 
TaskStateSnapshot(operatorSnapshotsInProgress.size());
     
    -                           final long asyncEndNanos = System.nanoTime();
    -                           final long asyncDurationMillis = (asyncEndNanos 
- asyncStartNanos) / 1_000_000;
    +                           for (Map.Entry<OperatorID, 
OperatorSnapshotResult> entry : operatorSnapshotsInProgress.entrySet()) {
     
    -                           
checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);
    +                                   OperatorID operatorID = entry.getKey();
    +                                   OperatorSnapshotResult 
snapshotInProgress = entry.getValue();
     
    -                           ChainedStateHandle<StreamStateHandle> 
chainedNonPartitionedOperatorsState =
    -                                           new 
ChainedStateHandle<>(nonPartitionedStateHandles);
    +                                   OperatorSubtaskState 
operatorSubtaskState = new OperatorSubtaskState(
    +                                           
nonPartitionedStateHandles.get(operatorID),
    +                                           
FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture()),
    +                                           
FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateRawFuture()),
    +                                           
FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getKeyedStateManagedFuture()),
    +                                           
FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getKeyedStateRawFuture())
    +                                   );
     
    -                           ChainedStateHandle<OperatorStateHandle> 
chainedOperatorStateBackend =
    -                                           new 
ChainedStateHandle<>(operatorStatesBackend);
    +                                   hasState |= 
operatorSubtaskState.hasState();
    +                                   
taskOperatorSubtaskStates.putSubtaskStateByOperatorID(operatorID, 
operatorSubtaskState);
    +                           }
     
    -                           ChainedStateHandle<OperatorStateHandle> 
chainedOperatorStateStream =
    -                                           new 
ChainedStateHandle<>(operatorStatesStream);
    +                           final long asyncEndNanos = System.nanoTime();
    +                           final long asyncDurationMillis = (asyncEndNanos 
- asyncStartNanos) / 1_000_000;
     
    -                           SubtaskState subtaskState = 
createSubtaskStateFromSnapshotStateHandles(
    -                                           
chainedNonPartitionedOperatorsState,
    -                                           chainedOperatorStateBackend,
    -                                           chainedOperatorStateStream,
    -                                           keyedStateHandleBackend,
    -                                           keyedStateHandleStream);
    +                           
checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);
     
                                if 
(asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING,
                                                
CheckpointingOperation.AsynCheckpointState.COMPLETED)) {
     
    +                                   // we signal a stateless task by 
reporting null, so that there are no attempts to assign empty state
    +                                   // to stateless tasks on restore. This 
enables simple job modifications that only concern
    +                                   // stateless without the need to assign 
them uids to match their (always empty) states.
    --- End diff --
    
    stateless **tasks**


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to