dawidwys commented on a change in pull request #16238:
URL: https://github.com/apache/flink/pull/16238#discussion_r671234850
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
##########
@@ -109,52 +112,25 @@ public void run() {
FileSystemSafetyNet.initializeSafetyNetForThread();
try {
- TaskStateSnapshot jobManagerTaskOperatorSubtaskStates =
- new TaskStateSnapshot(operatorSnapshotsInProgress.size());
- TaskStateSnapshot localTaskOperatorSubtaskStates =
- new TaskStateSnapshot(operatorSnapshotsInProgress.size());
-
- long bytesPersistedDuringAlignment = 0;
- for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry :
- operatorSnapshotsInProgress.entrySet()) {
-
- OperatorID operatorID = entry.getKey();
- OperatorSnapshotFutures snapshotInProgress = entry.getValue();
-
- // finalize the async part of all by executing all snapshot runnables
- OperatorSnapshotFinalizer finalizedSnapshots =
- new OperatorSnapshotFinalizer(snapshotInProgress);
-
- jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
- operatorID, finalizedSnapshots.getJobManagerOwnedState());
-
- localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
- operatorID, finalizedSnapshots.getTaskLocalState());
-
- bytesPersistedDuringAlignment +=
- finalizedSnapshots
- .getJobManagerOwnedState()
- .getResultSubpartitionState()
- .getStateSize();
- bytesPersistedDuringAlignment +=
- finalizedSnapshots
- .getJobManagerOwnedState()
- .getInputChannelState()
- .getStateSize();
- }
+ SnapshotsFinalizeResult snapshotsFinalizeResult =
Review comment:
Nice! I really like extracting such parts of the code into separate
methods. It makes it so much easier to read!
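
For readers without the full diff, here is a rough sketch of the kind of extraction being praised: the removed loop moves into one helper that returns a small result object. Only the name `SnapshotsFinalizeResult` comes from the diff. The class `FinalizeSketch`, the `FinalizedSnapshot` stand-in, the `String` operator ids, the field names, and the method `finalizeSnapshots` are all assumptions made for illustration, not the actual Flink code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins only; none of these are the real Flink classes.
final class FinalizeSketch {

    /** Stand-in for one operator's finalized snapshot; it only carries state sizes. */
    static final class FinalizedSnapshot {
        final long resultSubpartitionStateSize;
        final long inputChannelStateSize;

        FinalizedSnapshot(long resultSubpartitionStateSize, long inputChannelStateSize) {
            this.resultSubpartitionStateSize = resultSubpartitionStateSize;
            this.inputChannelStateSize = inputChannelStateSize;
        }
    }

    /** Result holder named after the type in the diff; its fields are assumed. */
    static final class SnapshotsFinalizeResult {
        final Map<String, FinalizedSnapshot> jobManagerStates;
        final long bytesPersistedDuringAlignment;

        SnapshotsFinalizeResult(
                Map<String, FinalizedSnapshot> jobManagerStates,
                long bytesPersistedDuringAlignment) {
            this.jobManagerStates = jobManagerStates;
            this.bytesPersistedDuringAlignment = bytesPersistedDuringAlignment;
        }
    }

    /** The extracted method: collects per-operator state and sums the persisted bytes. */
    static SnapshotsFinalizeResult finalizeSnapshots(Map<String, FinalizedSnapshot> inProgress) {
        Map<String, FinalizedSnapshot> states = new LinkedHashMap<>();
        long bytesPersistedDuringAlignment = 0;
        for (Map.Entry<String, FinalizedSnapshot> entry : inProgress.entrySet()) {
            FinalizedSnapshot snapshot = entry.getValue();
            states.put(entry.getKey(), snapshot);
            // Same two additions as in the removed loop body.
            bytesPersistedDuringAlignment += snapshot.resultSubpartitionStateSize;
            bytesPersistedDuringAlignment += snapshot.inputChannelStateSize;
        }
        return new SnapshotsFinalizeResult(states, bytesPersistedDuringAlignment);
    }
}
```

With a helper like this, the body of `run()` would shrink to a single call plus reads of the result's fields, which is presumably what the `+` line in the hunk begins.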