This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8c4e9a5 [FLINK-25816][state] Remove checkpoint abortion notification
of notify backend
8c4e9a5 is described below
commit 8c4e9a5540e468e92829be32de41545eab7a8cba
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed Jan 26 10:30:49 2022 +0100
[FLINK-25816][state] Remove checkpoint abortion notification of notify
backend
The notification currently causes an exception and adds complexity.
It's also not necessary, unlikely to be delivered (because of the
difference in checkpoint/materialization intervals) and unlikely to be
utilized (it will arrive only after the nested snapshot has completed
and most likely do the same GC as in completion notification).
---
.../changelog/ChangelogKeyedStateBackend.java | 51 ++--------------------
1 file changed, 4 insertions(+), 47 deletions(-)
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
index c746b2b..6bb710d 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
@@ -78,13 +78,11 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.Optional;
-import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -209,19 +207,6 @@ public class ChangelogKeyedStateBackend<K>
/** Checkpoint ID mapped to Materialization ID - used to notify nested
backend of completion. */
private final NavigableMap<Long, Long> materializationIdByCheckpointId =
new TreeMap<>();
- /**
- * Materialization ID mapped to Checkpoint IDs - used to notify nested
backend of abortion.
- * Entry is removed when:
- *
- * <ol>
- * <li>some checkpoint of a Set completes (in which case {@link
#keyedStateBackend} is {@link
- * CheckpointListener#notifyCheckpointComplete(long) notified of
completion}.
- * <li>a newer checkpoint completes
- * <li>all checkpoints of a Set are aborted (in which case {@link
#keyedStateBackend} is
- * {@link CheckpointListener#notifyCheckpointAborted(long) notified
of abortion}.
- * </ol>
- */
- private final Map<Long, Set<Long>> pendingMaterializationConfirmations =
new HashMap<>();
private long lastConfirmedMaterializationId = -1L;
@@ -400,15 +385,8 @@ public class ChangelogKeyedStateBackend<K>
ChangelogSnapshotState changelogStateBackendStateCopy =
changelogSnapshotState;
- if (changelogStateBackendStateCopy.materializationID >
lastConfirmedMaterializationId) {
- materializationIdByCheckpointId.put(
- checkpointId,
changelogStateBackendStateCopy.materializationID);
- pendingMaterializationConfirmations
- .computeIfAbsent(
- changelogStateBackendStateCopy.materializationID,
- ign -> new HashSet<>())
- .add(checkpointId);
- }
+ materializationIdByCheckpointId.put(
+ checkpointId,
changelogStateBackendStateCopy.materializationID);
return toRunnableFuture(
stateChangelogWriter
@@ -511,14 +489,8 @@ public class ChangelogKeyedStateBackend<K>
keyedStateBackend.notifyCheckpointComplete(materializationID);
lastConfirmedMaterializationId = materializationID;
}
- pendingMaterializationConfirmations.remove(materializationID);
}
- // there is a chance that nested backend will miss the abort
notification
- // but there is no other way to cleanup this map
- Map<Long, Long> olderCheckpoints =
- materializationIdByCheckpointId.headMap(checkpointId, true);
-
olderCheckpoints.values().forEach(pendingMaterializationConfirmations::remove);
- olderCheckpoints.clear();
+ materializationIdByCheckpointId.headMap(checkpointId, true).clear();
}
@Override
@@ -530,22 +502,7 @@ public class ChangelogKeyedStateBackend<K>
// This might change if the log ownership changes (the method
won't likely be needed).
stateChangelogWriter.reset(lastUploadedFrom, lastUploadedTo);
}
- Long materializationID =
materializationIdByCheckpointId.remove(checkpointId);
- if (materializationID != null) {
- Set<Long> checkpoints =
pendingMaterializationConfirmations.get(materializationID);
- checkpoints.remove(checkpointId);
- if (checkpoints.isEmpty()) {
- if (materializationID <
changelogSnapshotState.materializationID) {
- // Notification is not strictly required and will arrive
only after the nested
- // snapshot has completed. It's also unlikely to be sent
because of the
- // difference in checkpoint/materialization intervals. But
it can still be
- // useful
- // for some backends.
-
keyedStateBackend.notifyCheckpointAborted(materializationID);
- }
- pendingMaterializationConfirmations.remove(materializationID);
- }
- }
+ // TODO: Consider notifying nested state backend about checkpoint
abortion (FLINK-25850)
}
// -------- Methods not simply delegating to wrapped state backend
---------