Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r168169207 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java --- @@ -859,56 +861,77 @@ public void run() { if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.COMPLETED)) { - TaskStateSnapshot acknowledgedState = hasState ? taskOperatorSubtaskStates : null; - - TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager(); - - // we signal stateless tasks by reporting null, so that there are no attempts to assign empty state - // to stateless tasks on restore. This enables simple job modifications that only concern - // stateless without the need to assign them uids to match their (always empty) states. - taskStateManager.reportTaskStateSnapshot( - checkpointMetaData, - checkpointMetrics, - acknowledgedState); - - LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms", - owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis); - - LOG.trace("{} - reported the following states in snapshot for checkpoint {}: {}.", - owner.getName(), checkpointMetaData.getCheckpointId(), acknowledgedState); + reportCompletedSnapshotStates( + jobManagerTaskOperatorSubtaskStates, + localTaskOperatorSubtaskStates, + asyncDurationMillis); } else { LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.", owner.getName(), checkpointMetaData.getCheckpointId()); } } catch (Exception e) { - // the state is completed if an exception occurred in the acknowledgeCheckpoint call - // in order to clean up, we have to set it to RUNNING again. 
- asyncCheckpointState.compareAndSet( - CheckpointingOperation.AsynCheckpointState.COMPLETED, - CheckpointingOperation.AsynCheckpointState.RUNNING); - - try { - cleanup(); - } catch (Exception cleanupException) { - e.addSuppressed(cleanupException); - } - - Exception checkpointException = new Exception( - "Could not materialize checkpoint " + checkpointId + " for operator " + - owner.getName() + '.', - e); - - owner.asynchronousCheckpointExceptionHandler.tryHandleCheckpointException( - checkpointMetaData, - checkpointException); + handleExecutionException(e); } finally { owner.cancelables.unregisterCloseable(this); FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread(); } } + private void reportCompletedSnapshotStates( + TaskStateSnapshot acknowledgedTaskStateSnapshot, + TaskStateSnapshot localTaskStateSnapshot, + long asyncDurationMillis) { + + TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager(); + + boolean hasAckState = acknowledgedTaskStateSnapshot.hasState(); + boolean hasLocalState = localTaskStateSnapshot.hasState(); + + Preconditions.checkState(hasAckState || !hasLocalState, + "Found cached state but no corresponding primary state is reported to the job " + + "manager. This indicates a problem."); + + // we signal stateless tasks by reporting null, so that there are no attempts to assign empty state + // to stateless tasks on restore. This enables simple job modifications that only concern + // stateless without the need to assign them uids to match their (always empty) states. + taskStateManager.reportTaskStateSnapshots( + checkpointMetaData, + checkpointMetrics, + hasAckState ? acknowledgedTaskStateSnapshot : null, + hasLocalState ? localTaskStateSnapshot : null); + + LOG.debug("{} - finished asynchronous part of checkpoint {}. 
Asynchronous duration: {} ms", + owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis); + + LOG.trace("{} - reported the following states in snapshot for checkpoint {}: {}.", + owner.getName(), checkpointMetaData.getCheckpointId(), acknowledgedTaskStateSnapshot); + } + + private void handleExecutionException(Exception e) { + // the state is completed if an exception occurred in the acknowledgeCheckpoint call --- End diff -- 👍
---