This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 52519a8eb695c9523c546439c66910b15f19be20
Author: fredia <[email protected]>
AuthorDate: Tue Jun 14 12:28:25 2022 +0800

    [FLINK-27692][changelog] Refactor ChangelogSnapshotState
---
 .../changelog/ChangelogKeyedStateBackend.java      | 140 ++++++++++++++++-----
 .../changelog/ChangelogKeyedStateBackendTest.java  |   2 +-
 2 files changed, 111 insertions(+), 31 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
index ba0fb9427c7..1935600fd4e 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
@@ -92,7 +92,6 @@ import java.util.stream.Stream;
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
-import static java.util.Collections.unmodifiableList;
 import static org.apache.flink.state.changelog.PeriodicMaterializationManager.MaterializationRunnable;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -405,10 +404,12 @@ public class ChangelogKeyedStateBackend<K>
                         .whenComplete(
                                 (snapshotResult, throwable) ->
                                         metrics.reportSnapshotResult(snapshotResult))
-                        .thenApply(
-                                snapshotResult ->
-                                        SnapshotResult.of(
-                                                snapshotResult.getJobManagerOwnedSnapshot())));
+                        .thenApply(this::castSnapshotResult));
+    }
+
+    @SuppressWarnings("unchecked")
+    private SnapshotResult<KeyedStateHandle> castSnapshotResult(SnapshotResult<?> snapshotResult) {
+        return (SnapshotResult<KeyedStateHandle>) snapshotResult;
     }
 
     private SnapshotResult<ChangelogStateBackendHandle> buildSnapshotResult(
@@ -428,6 +429,26 @@ public class ChangelogKeyedStateBackend<K>
         if (prevDeltaCopy.isEmpty()
                 && changelogStateBackendStateCopy.getMaterializedSnapshot().isEmpty()) {
             return SnapshotResult.empty();
+        } else if (!changelogStateBackendStateCopy.getLocalMaterializedSnapshot().isEmpty()) {
+            return SnapshotResult.withLocalState(
+                    new ChangelogStateBackendHandleImpl(
+                            changelogStateBackendStateCopy.getMaterializedSnapshot(),
+                            prevDeltaCopy,
+                            getKeyGroupRange(),
+                            checkpointId,
+                            changelogStateBackendStateCopy.materializationID,
+                            persistedSizeOfThisCheckpoint),
+                    new ChangelogStateBackendHandleImpl(
+                            changelogStateBackendStateCopy.getLocalMaterializedSnapshot(),
+                            // TODO: Restore ChangelogStateHandles from remote temporarily, because
+                            // ChangelogStateHandles are small(about 10MB).
+                            //  In the future, the double-stream option may be implemented according
+                            // to the test results.
+                            prevDeltaCopy,
+                            getKeyGroupRange(),
+                            checkpointId,
+                            changelogStateBackendStateCopy.materializationID,
+                            persistedSizeOfThisCheckpoint));
         } else {
             return SnapshotResult.of(
                     new ChangelogStateBackendHandleImpl(
@@ -618,7 +639,7 @@ public class ChangelogKeyedStateBackend<K>
             }
         }
         this.materializedId = materializationId + 1;
-
+        // Todo: distinguish whether the handle is local or remote
         return new ChangelogSnapshotState(
                 materialized,
                 restoredNonMaterialized,
@@ -693,11 +714,20 @@ public class ChangelogKeyedStateBackend<K>
                 upTo,
                 materializedSnapshot);
         changelogSnapshotState =
-                new ChangelogSnapshotState(
-                        getMaterializedResult(materializedSnapshot),
-                        Collections.emptyList(),
-                        upTo,
-                        materializationID);
+                materializedSnapshot.getTaskLocalSnapshot() == null
+                        ? new ChangelogSnapshotState(
+                                getMaterializedResult(materializedSnapshot),
+                                Collections.emptyList(),
+                                upTo,
+                                materializationID)
+                        : new ChangelogSnapshotState(
+                                getMaterializedResult(materializedSnapshot),
+                                getLocalMaterializedResult(materializedSnapshot),
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                upTo,
+                                materializationID);
+
         changelogTruncateHelper.materialized(upTo);
     }
 
@@ -708,6 +738,12 @@ public class ChangelogKeyedStateBackend<K>
         return jobManagerOwned == null ? emptyList() : singletonList(jobManagerOwned);
     }
 
+    private List<KeyedStateHandle> getLocalMaterializedResult(
+            @Nonnull SnapshotResult<KeyedStateHandle> materializedSnapshot) {
+        KeyedStateHandle taskLocalSnapshot = materializedSnapshot.getTaskLocalSnapshot();
+        return taskLocalSnapshot == null ? emptyList() : singletonList(taskLocalSnapshot);
+    }
+
     @Override
     public KeyedStateBackend<K> getDelegatedKeyedStateBackend(boolean recursive) {
         return keyedStateBackend.getDelegatedKeyedStateBackend(recursive);
@@ -791,18 +827,16 @@ public class ChangelogKeyedStateBackend<K>
     }
 
     /**
-     * Snapshot State for ChangelogKeyedStatebackend.
+     * Snapshot State for ChangelogKeyedStatebackend, a wrapper over {@link SnapshotResult}.
      *
      * <p>It includes three parts: - materialized snapshot from the underlying delegated state
      * backend - non-materialized part in the current changelog - non-materialized changelog, from
      * previous logs (before failover or rescaling)
      */
-    private static class ChangelogSnapshotState {
-        /**
-         * Materialized snapshot from the underlying delegated state backend. Set initially on
-         * restore and later upon materialization.
-         */
-        private final List<KeyedStateHandle> materializedSnapshot;
+    private class ChangelogSnapshotState {
+
+        /** Set initially on restore and later upon materialization. */
+        private final SnapshotResult<ChangelogStateBackendHandle> changelogSnapshot;
 
         /**
          * The {@link SequenceNumber} up to which the state is materialized, exclusive. This
@@ -810,12 +844,6 @@ public class ChangelogKeyedStateBackend<K>
          */
         private final SequenceNumber materializedTo;
 
-        /**
-         * Non-materialized changelog, from previous logs. Set initially on restore and later
-         * cleared upon materialization.
-         */
-        private final List<ChangelogStateHandle> restoredNonMaterialized;
-
         /** ID of this materialization corresponding to the nested backend checkpoint ID. */
         private final long materializationID;
 
@@ -824,22 +852,74 @@ public class ChangelogKeyedStateBackend<K>
                 List<ChangelogStateHandle> restoredNonMaterialized,
                 SequenceNumber materializedTo,
                 long materializationID) {
-            this.materializedSnapshot = unmodifiableList((materializedSnapshot));
-            this.restoredNonMaterialized = unmodifiableList(restoredNonMaterialized);
+            this.changelogSnapshot =
+                    SnapshotResult.of(
+                            new ChangelogStateBackendHandleImpl(
+                                    materializedSnapshot,
+                                    restoredNonMaterialized,
+                                    getKeyGroupRange(),
+                                    lastCheckpointId,
+                                    materializationID,
+                                    0L));
+            this.materializedTo = materializedTo;
+            this.materializationID = materializationID;
+        }
+
+        public ChangelogSnapshotState(
+                List<KeyedStateHandle> materializedSnapshot,
+                List<KeyedStateHandle> localMaterializedSnapshot,
+                List<ChangelogStateHandle> restoredNonMaterialized,
+                List<ChangelogStateHandle> localRestoredNonMaterialized,
+                SequenceNumber materializedTo,
+                long materializationID) {
+            this.changelogSnapshot =
+                    SnapshotResult.withLocalState(
+                            new ChangelogStateBackendHandleImpl(
+                                    materializedSnapshot,
+                                    restoredNonMaterialized,
+                                    getKeyGroupRange(),
+                                    lastCheckpointId,
+                                    materializationID,
+                                    0L),
+                            new ChangelogStateBackendHandleImpl(
+                                    localMaterializedSnapshot,
+                                    localRestoredNonMaterialized,
+                                    getKeyGroupRange(),
+                                    lastCheckpointId,
+                                    materializationID,
+                                    0L));
             this.materializedTo = materializedTo;
             this.materializationID = materializationID;
         }
 
         public List<KeyedStateHandle> getMaterializedSnapshot() {
-            return materializedSnapshot;
+            return changelogSnapshot.getJobManagerOwnedSnapshot() != null
+                    ? changelogSnapshot.getJobManagerOwnedSnapshot().getMaterializedStateHandles()
+                    : Collections.emptyList();
         }
 
-        public SequenceNumber lastMaterializedTo() {
-            return materializedTo;
+        public List<KeyedStateHandle> getLocalMaterializedSnapshot() {
+            return changelogSnapshot.getTaskLocalSnapshot() != null
+                    ? changelogSnapshot.getTaskLocalSnapshot().getMaterializedStateHandles()
+                    : Collections.emptyList();
         }
 
         public List<ChangelogStateHandle> getRestoredNonMaterialized() {
-            return restoredNonMaterialized;
+            return changelogSnapshot.getJobManagerOwnedSnapshot() != null
+                    ? changelogSnapshot
+                            .getJobManagerOwnedSnapshot()
+                            .getNonMaterializedStateHandles()
+                    : Collections.emptyList();
+        }
+
+        public List<ChangelogStateHandle> getLocalRestoredNonMaterialized() {
+            return changelogSnapshot.getTaskLocalSnapshot() != null
+                    ? changelogSnapshot.getTaskLocalSnapshot().getNonMaterializedStateHandles()
+                    : Collections.emptyList();
+        }
+
+        public SequenceNumber lastMaterializedTo() {
+            return materializedTo;
         }
 
         public long getMaterializationID() {
diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java
index fa15341322e..5bd491743b0 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java
@@ -91,7 +91,7 @@ public class ChangelogKeyedStateBackendTest {
                         IntSerializer.INSTANCE,
                         getClass().getClassLoader(),
                         1,
-                        KeyGroupRange.EMPTY_KEY_GROUP_RANGE,
+                        KeyGroupRange.of(0, 0),
                         new ExecutionConfig(),
                         TtlTimeProvider.DEFAULT,
                         LatencyTrackingStateConfig.disabled(),

Reply via email to