rkhachatryan commented on a change in pull request #18324:
URL: https://github.com/apache/flink/pull/18324#discussion_r785523680



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java
##########
@@ -204,6 +203,21 @@ public long getStateSize() {
         return size;
     }
 
+    @Override
+    public long getIncrementalStateSize() {
+        long size = StateUtil.getStateSize(metaStateHandle);
+
+        for (StreamStateHandle sharedStateHandle : sharedState.values()) {
+            size += sharedStateHandle.getIncrementalStateSize();

Review comment:
   I mean `IncrementalRemoteKeyedStateHandle` could have a final field `incrementalSize` and return it from `getIncrementalStateSize`.
   To create a handle:
   - On the TM, it would be computed by `RocksIncrementalSnapshotStrategy`.
   - On the JM, it would be read from checkpoint metadata by `MetadataV2V3SerializerBase#deserializeKeyedStateHandle` (and written by `serializeKeyedStateHandle`).

   It shouldn't change after the creation of `IncrementalRemoteKeyedStateHandle`, right?
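
   A minimal sketch of that suggestion, under stated assumptions: `IncrementalSizeSketch`, `Handle`, `createOnTaskManager` and `createFromMetadata` are hypothetical stand-ins, not Flink classes or methods, and file sizes are passed as plain numbers instead of real state handles. It only illustrates that the incremental size is fixed once at construction and returned as-is afterwards.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; none of these types are Flink's real classes.
class IncrementalSizeSketch {

    /** Stand-in for the keyed state handle: both sizes are fixed at construction. */
    static final class Handle {
        private final long fullSize;
        private final long incrementalSize;

        Handle(long fullSize, long incrementalSize) {
            this.fullSize = fullSize;
            this.incrementalSize = incrementalSize;
        }

        long getStateSize() {
            return fullSize;
        }

        // No iteration over shared state here: the stored value is returned.
        long getIncrementalStateSize() {
            return incrementalSize;
        }
    }

    /** TM side (assumption): the snapshot strategy knows which files it just uploaded. */
    static Handle createOnTaskManager(List<Long> reusedFileSizes, List<Long> newFileSizes) {
        long incremental = sum(newFileSizes);
        return new Handle(sum(reusedFileSizes) + incremental, incremental);
    }

    /** JM side (assumption): both values were read back from checkpoint metadata. */
    static Handle createFromMetadata(long fullSize, long incrementalSize) {
        return new Handle(fullSize, incrementalSize);
    }

    private static long sum(List<Long> sizes) {
        long total = 0L;
        for (long size : sizes) {
            total += size;
        }
        return total;
    }

    public static void main(String[] args) {
        Handle onTm = createOnTaskManager(Arrays.asList(100L, 200L), Arrays.asList(30L));
        Handle onJm = createFromMetadata(onTm.getStateSize(), onTm.getIncrementalStateSize());
        System.out.println(onTm.getIncrementalStateSize());
        System.out.println(onJm.getStateSize());
    }
}
```

   With this shape the JM never has to walk `sharedState` to recover the number; it just carries whatever the TM computed.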

##########
File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -368,19 +372,34 @@ public boolean deregisterKeySelectionListener(KeySelectionListener<K> listener)
         // collections don't change once started and handles are immutable
         List<ChangelogStateHandle> prevDeltaCopy =
                 new ArrayList<>(changelogStateBackendStateCopy.getRestoredNonMaterialized());
+        long incrementalMaterializeSize = 0L;
         if (delta != null && delta.getStateSize() > 0) {
             prevDeltaCopy.add(delta);
+            incrementalMaterializeSize += delta.getIncrementalStateSize();
         }
 
         if (prevDeltaCopy.isEmpty()
                 && changelogStateBackendStateCopy.getMaterializedSnapshot().isEmpty()) {
             return SnapshotResult.empty();
         } else {
+            List<KeyedStateHandle> materializedSnapshot =
+                    changelogStateBackendStateCopy.getMaterializedSnapshot();
+            for (KeyedStateHandle keyedStateHandle : materializedSnapshot) {
+                if (!lastCompletedHandles.contains(keyedStateHandle)) {
+                    incrementalMaterializeSize += keyedStateHandle.getStateSize();

Review comment:
   Let me expand on my concerns directly in this comment:
   1. If sending the handles fails (or is aborted), the UI will show inconsistent sizes (the aborted info is missing).
   2. It requires handles to properly implement `hashCode`/`equals`. Otherwise, the wrapped backend or the writer can construct a new handle pointing to the same files; I think this is currently the case.
   3. If two checkpoints finish out of order and call `buildSnapshotResult`, the size will be incorrect.
   4. The incremental size is computed in two places: inside `getIncrementalStateSize`/the backend and in `buildSnapshotResult`, which seems excessive and fragile.
   5. After materialization, the incremental size will be much bigger and the async duration smaller. Wouldn't that be confusing for users?
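
   To illustrate point 2, here is a small sketch of how a `contains` check behaves with and without value-based equality. Everything in it is hypothetical: `HandleEqualitySketch`, `IdentityHandle`, `ValueHandle`, `sizeIfNew` and the `chk/file-1` path are made up for the example and are not Flink types or real paths.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical sketch; these are not Flink's handle classes.
class HandleEqualitySketch {

    /** No equals/hashCode override, so two handles for the same file are never equal. */
    static final class IdentityHandle {
        final String path;
        final long size;

        IdentityHandle(String path, long size) {
            this.path = path;
            this.size = size;
        }
    }

    /** Value-based equality on the file path and size. */
    static final class ValueHandle {
        final String path;
        final long size;

        ValueHandle(String path, long size) {
            this.path = path;
            this.size = size;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof ValueHandle)) {
                return false;
            }
            ValueHandle other = (ValueHandle) o;
            return size == other.size && path.equals(other.path);
        }

        @Override
        public int hashCode() {
            return Objects.hash(path, size);
        }
    }

    /** Mirrors the contains() check in the diff: count the size only if the handle looks new. */
    static <H> long sizeIfNew(Set<H> lastCompleted, H current, long currentSize) {
        return lastCompleted.contains(current) ? 0L : currentSize;
    }

    public static void main(String[] args) {
        // Same file, but a freshly constructed handle object on the second snapshot.
        Set<IdentityHandle> doneById = new HashSet<>();
        doneById.add(new IdentityHandle("chk/file-1", 100L));
        System.out.println(sizeIfNew(doneById, new IdentityHandle("chk/file-1", 100L), 100L));

        Set<ValueHandle> doneByValue = new HashSet<>();
        doneByValue.add(new ValueHandle("chk/file-1", 100L));
        System.out.println(sizeIfNew(doneByValue, new ValueHandle("chk/file-1", 100L), 100L));
    }
}
```

   In the identity-based case the already-sent file is counted again as incremental, which is the kind of over-reporting this concern is about.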

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java
##########
@@ -85,6 +88,7 @@
         this.subtaskIndex = subtaskIndex;
         checkArgument(stateSize >= 0, "Negative state size");
         this.stateSize = stateSize;
+        this.incrementalStateSize = incrementalStateSize;

Review comment:
       Thanks :) 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
