Myasuka commented on a change in pull request #18382:
URL: https://github.com/apache/flink/pull/18382#discussion_r787749960



##########
File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -453,7 +465,10 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception {
             // This might change if the log ownership changes (the method won't likely be needed).
             stateChangelogWriter.reset(lastUploadedFrom, lastUploadedTo);
         }
-        keyedStateBackend.notifyCheckpointAborted(checkpointId);
+        Long materializationID = materializationIdByCheckpointId.remove(checkpointId);
+        if (materializationID != null) {

Review comment:
       I do not think this logic is correct. Take materialization ID 1 as an 
example: chk-11 may fail to complete, but chk-12 could still complete and 
include materialization-1. If we abort materialization-1 when chk-11 is 
aborted, the RocksDB state backend would have to re-upload at the next 
materialization.
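
       One way to picture the concern is a small tracker that aborts a 
materialization only once no pending checkpoint references it any more. This is 
a minimal sketch, not code from the PR: `MaterializationRefTracker`, 
`register` and `shouldAbortMaterialization` are hypothetical names, and it only 
covers the case where chk-12 was already registered when chk-11 is aborted. 
Whether a checkpoint that has not been triggered yet can still pick up the 
materialization is not settled by this sketch.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper, not part of Flink. */
final class MaterializationRefTracker {
    // materialization ID -> pending checkpoints that include it
    private final Map<Long, Set<Long>> pendingByMaterialization = new HashMap<>();
    // checkpoint ID -> materialization ID it includes
    private final Map<Long, Long> materializationByCheckpoint = new HashMap<>();

    /** Records that a pending checkpoint includes the given materialization. */
    void register(long checkpointId, long materializationId) {
        materializationByCheckpoint.put(checkpointId, materializationId);
        pendingByMaterialization
                .computeIfAbsent(materializationId, k -> new HashSet<>())
                .add(checkpointId);
    }

    /**
     * Called when a checkpoint is aborted. Returns true only if no other
     * registered checkpoint still references the same materialization.
     */
    boolean shouldAbortMaterialization(long abortedCheckpointId) {
        Long materializationId = materializationByCheckpoint.remove(abortedCheckpointId);
        if (materializationId == null) {
            return false; // unknown checkpoint, nothing to abort
        }
        Set<Long> pending = pendingByMaterialization.get(materializationId);
        pending.remove(abortedCheckpointId);
        if (pending.isEmpty()) {
            pendingByMaterialization.remove(materializationId);
            return true;
        }
        return false;
    }
}
```

       With chk-11 and chk-12 both registered against materialization-1, 
aborting chk-11 returns false here, so the materialization stays usable for 
chk-12.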

##########
File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
##########
@@ -272,7 +273,16 @@ public StateBackend configure(ReadableConfig config, ClassLoader classLoader)
                                         : new ChangelogStateBackendHandleImpl(
                                                 singletonList(keyedStateHandle),
                                                 emptyList(),
-                                                keyedStateHandle.getKeyGroupRange()))
+                                                keyedStateHandle.getKeyGroupRange(),
+                                                getMaterializationID(keyedStateHandle)))
                 .collect(Collectors.toList());
     }
+
+    private long getMaterializationID(KeyedStateHandle keyedStateHandle) {
+        if (keyedStateHandle instanceof IncrementalKeyedStateHandle) {
+            return ((IncrementalKeyedStateHandle) keyedStateHandle).getCheckpointId();

Review comment:
       I think we can introduce a new interface that declares 
`getCheckpointId()` instead of relying on `IncrementalKeyedStateHandle`, so 
that any third-party state handle can implement that interface to be 
compatible with the changelog state backend.
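
       A rough sketch of what I mean, under stated assumptions: 
`CheckpointIdAwareStateHandle`, `ThirdPartyStateHandle`, `MaterializationIds` 
and the `fallbackId` parameter are hypothetical names for illustration, and 
`KeyedStateHandle` below is a local stand-in so the snippet compiles on its 
own, not Flink's real interface.

```java
/** Local stand-in for Flink's KeyedStateHandle so the sketch is self-contained. */
interface KeyedStateHandle {}

/** Hypothetical interface: any handle that can report the checkpoint it belongs to. */
interface CheckpointIdAwareStateHandle extends KeyedStateHandle {
    long getCheckpointId();
}

/** Example of a third-party handle opting in by implementing the interface. */
final class ThirdPartyStateHandle implements CheckpointIdAwareStateHandle {
    private final long checkpointId;

    ThirdPartyStateHandle(long checkpointId) {
        this.checkpointId = checkpointId;
    }

    @Override
    public long getCheckpointId() {
        return checkpointId;
    }
}

final class MaterializationIds {
    private MaterializationIds() {}

    /**
     * Checks against the interface rather than a concrete handle class.
     * The fallbackId parameter is an assumption made for this sketch only.
     */
    static long getMaterializationID(KeyedStateHandle handle, long fallbackId) {
        if (handle instanceof CheckpointIdAwareStateHandle) {
            return ((CheckpointIdAwareStateHandle) handle).getCheckpointId();
        }
        return fallbackId;
    }
}
```

       The `instanceof` check then no longer depends on one concrete handle 
type, which is the point of the suggestion.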

##########
File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -586,19 +606,20 @@ private ChangelogSnapshotState completeRestore(
 
             LOG.info("Starting materialization from {} : {}", lastMaterializedTo, upTo);
 
+            // This ID is not needed for materialization; But since we are re-using the
+            // streamFactory that is designed for state backend snapshot, which requires unique
+            // checkpoint ID. A faked materialized Id is provided here.
+            // TODO: implement its own streamFactory.

Review comment:
       I think this TODO about a dedicated stream factory belongs next to the 
`streamFactory` itself.

##########
File path: flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackendTest.java
##########
@@ -0,0 +1,124 @@
+package org.apache.flink.state.changelog;

Review comment:
       In general, every source file should start with the license header.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

