Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5239#discussion_r165315625
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
---
@@ -859,56 +861,77 @@ public void run() {
if
(asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING,
CheckpointingOperation.AsynCheckpointState.COMPLETED)) {
- TaskStateSnapshot acknowledgedState =
hasState ? taskOperatorSubtaskStates : null;
-
- TaskStateManager taskStateManager =
owner.getEnvironment().getTaskStateManager();
-
- // we signal stateless tasks by
reporting null, so that there are no attempts to assign empty state
- // to stateless tasks on restore. This
enables simple job modifications that only concern
- // stateless without the need to assign
them uids to match their (always empty) states.
-
taskStateManager.reportTaskStateSnapshot(
- checkpointMetaData,
- checkpointMetrics,
- acknowledgedState);
-
- LOG.debug("{} - finished asynchronous
part of checkpoint {}. Asynchronous duration: {} ms",
- owner.getName(),
checkpointMetaData.getCheckpointId(), asyncDurationMillis);
-
- LOG.trace("{} - reported the following
states in snapshot for checkpoint {}: {}.",
- owner.getName(),
checkpointMetaData.getCheckpointId(), acknowledgedState);
+ reportCompletedSnapshotStates(
+
jobManagerTaskOperatorSubtaskStates,
+ localTaskOperatorSubtaskStates,
+ asyncDurationMillis);
} else {
LOG.debug("{} - asynchronous part of
checkpoint {} could not be completed because it was closed before.",
owner.getName(),
checkpointMetaData.getCheckpointId());
}
} catch (Exception e) {
- // the state is completed if an exception
occurred in the acknowledgeCheckpoint call
- // in order to clean up, we have to set it to
RUNNING again.
- asyncCheckpointState.compareAndSet(
-
CheckpointingOperation.AsynCheckpointState.COMPLETED,
-
CheckpointingOperation.AsynCheckpointState.RUNNING);
-
- try {
- cleanup();
- } catch (Exception cleanupException) {
- e.addSuppressed(cleanupException);
- }
-
- Exception checkpointException = new Exception(
- "Could not materialize checkpoint " +
checkpointId + " for operator " +
- owner.getName() + '.',
- e);
-
-
owner.asynchronousCheckpointExceptionHandler.tryHandleCheckpointException(
- checkpointMetaData,
- checkpointException);
+ handleExecutionException(e);
} finally {
owner.cancelables.unregisterCloseable(this);
FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
}
}
+ private void reportCompletedSnapshotStates(
+ TaskStateSnapshot acknowledgedTaskStateSnapshot,
+ TaskStateSnapshot localTaskStateSnapshot,
+ long asyncDurationMillis) {
+
+ TaskStateManager taskStateManager =
owner.getEnvironment().getTaskStateManager();
+
+ boolean hasAckState =
acknowledgedTaskStateSnapshot.hasState();
+ boolean hasLocalState =
localTaskStateSnapshot.hasState();
+
+ Preconditions.checkState(hasAckState || !hasLocalState,
+ "Found cached state but no corresponding
primary state is reported to the job " +
+ "manager. This indicates a problem.");
+
+ // we signal stateless tasks by reporting null, so that
there are no attempts to assign empty state
+ // to stateless tasks on restore. This enables simple
job modifications that only concern
+ // stateless without the need to assign them uids to
match their (always empty) states.
+ taskStateManager.reportTaskStateSnapshots(
+ checkpointMetaData,
+ checkpointMetrics,
+ hasAckState ? acknowledgedTaskStateSnapshot :
null,
+ hasLocalState ? localTaskStateSnapshot : null);
+
+ LOG.debug("{} - finished asynchronous part of
checkpoint {}. Asynchronous duration: {} ms",
+ owner.getName(),
checkpointMetaData.getCheckpointId(), asyncDurationMillis);
+
+ LOG.trace("{} - reported the following states in
snapshot for checkpoint {}: {}.",
+ owner.getName(),
checkpointMetaData.getCheckpointId(), acknowledgedTaskStateSnapshot);
+ }
+
+ private void handleExecutionException(Exception e) {
+ // the state is completed if an exception occurred in
the acknowledgeCheckpoint call
--- End diff --
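The code comment above needs to be adapted: it still refers to the `acknowledgeCheckpoint` call, whereas in this diff the state is reported via `reportCompletedSnapshotStates` (which calls `taskStateManager.reportTaskStateSnapshots`).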
---