This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c4e9a5  [FLINK-25816][state] Remove checkpoint abortion notification of notify backend
8c4e9a5 is described below

commit 8c4e9a5540e468e92829be32de41545eab7a8cba
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed Jan 26 10:30:49 2022 +0100

    [FLINK-25816][state] Remove checkpoint abortion notification of notify backend
    
    The notification currently causes an exception and adds complexity.
    It's also not necessary, unlikely to be delivered (because of the
    difference in checkpoint/materialization intervals), and unlikely to be
    utilized (it will arrive only after the nested snapshot has completed
    and will most likely do the same GC as the completion notification).
---
 .../changelog/ChangelogKeyedStateBackend.java      | 51 ++--------------------
 1 file changed, 4 insertions(+), 47 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
index c746b2b..6bb710d 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
@@ -78,13 +78,11 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.NoSuchElementException;
 import java.util.Optional;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -209,19 +207,6 @@ public class ChangelogKeyedStateBackend<K>
 
     /** Checkpoint ID mapped to Materialization ID - used to notify nested 
backend of completion. */
     private final NavigableMap<Long, Long> materializationIdByCheckpointId = 
new TreeMap<>();
-    /**
-     * Materialization ID mapped to Checkpoint IDs - used to notify nested 
backend of abortion.
-     * Entry is removed when:
-     *
-     * <ol>
-     *   <li>some checkpoint of a Set completes (in which case {@link 
#keyedStateBackend} is {@link
-     *       CheckpointListener#notifyCheckpointComplete(long) notified of 
completion}.
-     *   <li>a newer checkpoint completes
-     *   <li>all checkpoints of a Set are aborted (in which case {@link 
#keyedStateBackend} is
-     *       {@link CheckpointListener#notifyCheckpointAborted(long) notified 
of abortion}.
-     * </ol>
-     */
-    private final Map<Long, Set<Long>> pendingMaterializationConfirmations = 
new HashMap<>();
 
     private long lastConfirmedMaterializationId = -1L;
 
@@ -400,15 +385,8 @@ public class ChangelogKeyedStateBackend<K>
 
         ChangelogSnapshotState changelogStateBackendStateCopy = 
changelogSnapshotState;
 
-        if (changelogStateBackendStateCopy.materializationID > 
lastConfirmedMaterializationId) {
-            materializationIdByCheckpointId.put(
-                    checkpointId, 
changelogStateBackendStateCopy.materializationID);
-            pendingMaterializationConfirmations
-                    .computeIfAbsent(
-                            changelogStateBackendStateCopy.materializationID,
-                            ign -> new HashSet<>())
-                    .add(checkpointId);
-        }
+        materializationIdByCheckpointId.put(
+                checkpointId, 
changelogStateBackendStateCopy.materializationID);
 
         return toRunnableFuture(
                 stateChangelogWriter
@@ -511,14 +489,8 @@ public class ChangelogKeyedStateBackend<K>
                 keyedStateBackend.notifyCheckpointComplete(materializationID);
                 lastConfirmedMaterializationId = materializationID;
             }
-            pendingMaterializationConfirmations.remove(materializationID);
         }
-        // there is a chance that nested backend will miss the abort 
notification
-        // but there is no other way to cleanup this map
-        Map<Long, Long> olderCheckpoints =
-                materializationIdByCheckpointId.headMap(checkpointId, true);
-        
olderCheckpoints.values().forEach(pendingMaterializationConfirmations::remove);
-        olderCheckpoints.clear();
+        materializationIdByCheckpointId.headMap(checkpointId, true).clear();
     }
 
     @Override
@@ -530,22 +502,7 @@ public class ChangelogKeyedStateBackend<K>
             // This might change if the log ownership changes (the method 
won't likely be needed).
             stateChangelogWriter.reset(lastUploadedFrom, lastUploadedTo);
         }
-        Long materializationID = 
materializationIdByCheckpointId.remove(checkpointId);
-        if (materializationID != null) {
-            Set<Long> checkpoints = 
pendingMaterializationConfirmations.get(materializationID);
-            checkpoints.remove(checkpointId);
-            if (checkpoints.isEmpty()) {
-                if (materializationID < 
changelogSnapshotState.materializationID) {
-                    // Notification is not strictly required and will arrive 
only after the nested
-                    // snapshot has completed. It's also unlikely to be sent 
because of the
-                    // difference in checkpoint/materialization intervals. But 
it can still be
-                    // useful
-                    // for some backends.
-                    
keyedStateBackend.notifyCheckpointAborted(materializationID);
-                }
-                pendingMaterializationConfirmations.remove(materializationID);
-            }
-        }
+        // TODO: Consider notifying nested state backend about checkpoint 
abortion (FLINK-25850)
     }
 
     // -------- Methods not simply delegating to wrapped state backend 
---------

Reply via email to