rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r716640216



##########
File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -531,18 +539,76 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception {
         return is;
     }
 
-    private void completeRestore(Collection<ChangelogStateBackendHandle> stateHandles) {
-        if (!stateHandles.isEmpty()) {
-            synchronized (materialized) { // ensure visibility
-                for (ChangelogStateBackendHandle h : stateHandles) {
-                    if (h != null) {
-                        materialized.addAll(h.getMaterializedStateHandles());
-                        restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles());
-                    }
-                }
+    private ChangelogSnapshotState completeRestore(
+            Collection<ChangelogStateBackendHandle> stateHandles) {
+
+        List<KeyedStateHandle> materialized = new ArrayList<>();
+        List<ChangelogStateHandle> restoredNonMaterialized = new ArrayList<>();
+
+        for (ChangelogStateBackendHandle h : stateHandles) {
+            if (h != null) {
+                materialized.addAll(h.getMaterializedStateHandles());
+                restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles());
             }
         }
+
         changelogStates.clear();
+        return new ChangelogSnapshotState(
+                materialized,
+                restoredNonMaterialized,
+                stateChangelogWriter.initialSequenceNumber());
+    }
+
+    /**
+     * Initialize state materialization so that materialized data can be persisted durably and
+     * included into the checkpoint.
+     *
+     * <p>This method is not thread safe. It should be called either under a lock or through task
+     * mailbox executor.
+     *
+     * @return a tuple of - future snapshot result from the underlying state backend - a {@link
+     *     SequenceNumber} identifying the latest change in the changelog
+     */
+    public Optional<MaterializationRunnable> initMaterialization() throws Exception {
+        SequenceNumber upTo = stateChangelogWriter.lastAppendedSequenceNumber();
+
+        if (upTo.compareTo(changelogSnapshotState.lastMaterializedTo()) > 0) {
+            return Optional.of(
+                    new MaterializationRunnable(
+                            keyedStateBackend.snapshot(
+                                    // This ID is not needed for materialization;
+                                    // But since we are re-using the streamFactory
+                                    // that is designed for state backend snapshot,
+                                    // which requires unique checkpoint ID.
+                                    // A faked materialized Id is provided here.
+                                    // TODO: implement its own streamFactory.
+                                    materializedId++,
+                                    System.currentTimeMillis(),
+                                    streamFactory,
+                                    CHECKPOINT_OPTIONS),
+                            // TODO: add metadata to log FLINK-23170
+                            upTo));
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    /**
+     * This method is not thread safe. It should be called either under a lock or through task
+     * mailbox executor.
+     */
+    public void updateChangelogSnapshotState(
+            SnapshotResult<KeyedStateHandle> materializedSnapshot, SequenceNumber upTo) {
+        changelogSnapshotState =
+                new ChangelogSnapshotState(
+                        getMaterializedResult(materializedSnapshot), new ArrayList<>(), upTo);

Review comment:
       I mean a static factory method like this in the `ChangelogSnapshotState` class:
   ```
           public static ChangelogSnapshotState materialized(
                   SnapshotResult<KeyedStateHandle> snapshot, SequenceNumber upTo) {
               return new ChangelogSnapshotState(
                       getMaterializedResult(snapshot), Collections.emptyList(), upTo);
           }
   ```
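
   To illustrate the pattern in isolation, here is a minimal self-contained sketch. Everything in it is a stand-in: the class name `SnapshotStateSketch`, the `String` handles and the `long` sequence number are assumptions for illustration only, not the real Flink types.

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;

   // Hypothetical stand-in for ChangelogSnapshotState; handle and sequence-number
   // types are simplified to String and long for illustration.
   final class SnapshotStateSketch {
       private final List<String> materialized;
       private final List<String> nonMaterialized;
       private final long materializedTo;

       SnapshotStateSketch(
               List<String> materialized, List<String> nonMaterialized, long materializedTo) {
           // Defensive copies so the state object cannot be mutated from outside.
           this.materialized = Collections.unmodifiableList(new ArrayList<>(materialized));
           this.nonMaterialized = Collections.unmodifiableList(new ArrayList<>(nonMaterialized));
           this.materializedTo = materializedTo;
       }

       // Named factory for the "just materialized" case: the non-materialized
       // part is empty by construction, so callers cannot get it wrong.
       static SnapshotStateSketch materialized(List<String> snapshotHandles, long upTo) {
           return new SnapshotStateSketch(snapshotHandles, Collections.emptyList(), upTo);
       }

       List<String> getMaterialized() {
           return materialized;
       }

       List<String> getNonMaterialized() {
           return nonMaterialized;
       }

       long getMaterializedTo() {
           return materializedTo;
       }
   }
   ```

   With such a factory, the body of `updateChangelogSnapshotState` would presumably reduce to a single call like `changelogSnapshotState = ChangelogSnapshotState.materialized(materializedSnapshot, upTo);`, and the empty list would no longer need to be allocated at the call site.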



