rkhachatryan commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r703330721



##########
File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -329,37 +332,47 @@ public boolean deregisterKeySelectionListener(KeySelectionListener<K> listener)
         // materialization may truncate only a part of the previous result and the backend would
         // have to split it somehow for the former option, so the latter is used.
         lastCheckpointId = checkpointId;
-        lastUploadedFrom = materializedTo;
+        lastUploadedFrom = periodicMaterializer.getMaterializedState().lastMaterializedTo();
         lastUploadedTo = stateChangelogWriter.lastAppendedSequenceNumber().next();
 
         LOG.debug(
                 "snapshot for checkpoint {}, change range: {}..{}",
                 checkpointId,
                 lastUploadedFrom,
                 lastUploadedTo);
+
+        MaterializedState materializedStateCopy = periodicMaterializer.getMaterializedState();
+
         return toRunnableFuture(
                 stateChangelogWriter
                         .persist(lastUploadedFrom)
-                        .thenApply(this::buildSnapshotResult));
+                        .thenApply(delta -> buildSnapshotResult(delta, materializedStateCopy)));
     }
 
-    private SnapshotResult<KeyedStateHandle> buildSnapshotResult(ChangelogStateHandle delta) {
-        // Can be called by either task thread during the sync checkpoint phase (if persist future
-        // was already completed); or by the writer thread otherwise. So need to synchronize.
-        // todo: revisit after FLINK-21357 - use mailbox action?
-        synchronized (materialized) {
-            // collections don't change once started and handles are immutable
-            List<ChangelogStateHandle> prevDeltaCopy = new ArrayList<>(restoredNonMaterialized);
-            if (delta != null && delta.getStateSize() > 0) {
-                prevDeltaCopy.add(delta);
-            }
-            if (prevDeltaCopy.isEmpty() && materialized.isEmpty()) {
-                return SnapshotResult.empty();
-            } else {
-                return SnapshotResult.of(
-                        new ChangelogStateBackendHandleImpl(
-                                materialized, prevDeltaCopy, getKeyGroupRange()));
-            }
+    @Override
+    @VisibleForTesting
+    public void triggerMaterialization() {
+        periodicMaterializer.triggerMaterialization();
+    }

Review comment:
   As we discussed offline, this only works because a direct executor is used when creating the materializer (by "works" I mean that the call waits for the materialization to finish).
   Please add a comment at the very least.
   
   However, I still think the structure of either the test or the production code is wrong. With a proper separation between the materializer and the backend, we should be able to listen for materialization completion and take a snapshot only after it completes (instead of relying on internal materializer behavior).




-- 
This is an automated message from the Apache Git Service.