Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5239#discussion_r168169207
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
    @@ -859,56 +861,77 @@ public void run() {
                                if 
(asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING,
                                        
CheckpointingOperation.AsynCheckpointState.COMPLETED)) {
     
    -                                   TaskStateSnapshot acknowledgedState = 
hasState ? taskOperatorSubtaskStates : null;
    -
    -                                   TaskStateManager taskStateManager = 
owner.getEnvironment().getTaskStateManager();
    -
    -                                   // we signal stateless tasks by 
reporting null, so that there are no attempts to assign empty state
    -                                   // to stateless tasks on restore. This 
enables simple job modifications that only concern
    -                                   // stateless without the need to assign 
them uids to match their (always empty) states.
    -                                   
taskStateManager.reportTaskStateSnapshot(
    -                                           checkpointMetaData,
    -                                           checkpointMetrics,
    -                                           acknowledgedState);
    -
    -                                   LOG.debug("{} - finished asynchronous 
part of checkpoint {}. Asynchronous duration: {} ms",
    -                                           owner.getName(), 
checkpointMetaData.getCheckpointId(), asyncDurationMillis);
    -
    -                                   LOG.trace("{} - reported the following 
states in snapshot for checkpoint {}: {}.",
    -                                           owner.getName(), 
checkpointMetaData.getCheckpointId(), acknowledgedState);
    +                                   reportCompletedSnapshotStates(
    +                                           
jobManagerTaskOperatorSubtaskStates,
    +                                           localTaskOperatorSubtaskStates,
    +                                           asyncDurationMillis);
     
                                } else {
                                        LOG.debug("{} - asynchronous part of 
checkpoint {} could not be completed because it was closed before.",
                                                owner.getName(),
                                                
checkpointMetaData.getCheckpointId());
                                }
                        } catch (Exception e) {
    -                           // the state is completed if an exception 
occurred in the acknowledgeCheckpoint call
    -                           // in order to clean up, we have to set it to 
RUNNING again.
    -                           asyncCheckpointState.compareAndSet(
    -                                   
CheckpointingOperation.AsynCheckpointState.COMPLETED,
    -                                   
CheckpointingOperation.AsynCheckpointState.RUNNING);
    -
    -                           try {
    -                                   cleanup();
    -                           } catch (Exception cleanupException) {
    -                                   e.addSuppressed(cleanupException);
    -                           }
    -
    -                           Exception checkpointException = new Exception(
    -                                   "Could not materialize checkpoint " + 
checkpointId + " for operator " +
    -                                           owner.getName() + '.',
    -                                   e);
    -
    -                           
owner.asynchronousCheckpointExceptionHandler.tryHandleCheckpointException(
    -                                   checkpointMetaData,
    -                                   checkpointException);
    +                           handleExecutionException(e);
                        } finally {
                                owner.cancelables.unregisterCloseable(this);
                                
FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
                        }
                }
     
    +           private void reportCompletedSnapshotStates(
    +                   TaskStateSnapshot acknowledgedTaskStateSnapshot,
    +                   TaskStateSnapshot localTaskStateSnapshot,
    +                   long asyncDurationMillis) {
    +
    +                   TaskStateManager taskStateManager = 
owner.getEnvironment().getTaskStateManager();
    +
    +                   boolean hasAckState = 
acknowledgedTaskStateSnapshot.hasState();
    +                   boolean hasLocalState = 
localTaskStateSnapshot.hasState();
    +
    +                   Preconditions.checkState(hasAckState || !hasLocalState,
    +                           "Found cached state but no corresponding 
primary state is reported to the job " +
    +                                   "manager. This indicates a problem.");
    +
    +                   // we signal stateless tasks by reporting null, so that 
there are no attempts to assign empty state
    +                   // to stateless tasks on restore. This enables simple 
job modifications that only concern
    +                   // stateless without the need to assign them uids to 
match their (always empty) states.
    +                   taskStateManager.reportTaskStateSnapshots(
    +                           checkpointMetaData,
    +                           checkpointMetrics,
    +                           hasAckState ? acknowledgedTaskStateSnapshot : 
null,
    +                           hasLocalState ? localTaskStateSnapshot : null);
    +
    +                   LOG.debug("{} - finished asynchronous part of 
checkpoint {}. Asynchronous duration: {} ms",
    +                           owner.getName(), 
checkpointMetaData.getCheckpointId(), asyncDurationMillis);
    +
    +                   LOG.trace("{} - reported the following states in 
snapshot for checkpoint {}: {}.",
    +                           owner.getName(), 
checkpointMetaData.getCheckpointId(), acknowledgedTaskStateSnapshot);
    +           }
    +
    +           private void handleExecutionException(Exception e) {
    +                   // the state is completed if an exception occurred in 
the acknowledgeCheckpoint call
    --- End diff --
    
    👍 


---

Reply via email to