rkhachatryan commented on a change in pull request #18324:
URL: https://github.com/apache/flink/pull/18324#discussion_r784301442



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
##########
@@ -50,17 +50,28 @@
 
     class ChangelogStateBackendHandleImpl implements ChangelogStateBackendHandle {
         private static final long serialVersionUID = 1L;
+        private static final long undefinedIncrementalMaterializeSize = -1L;

Review comment:
       Why don't we store this size in the checkpoint metadata, so that we can get rid of the unknown size and show the correct size after recovery?
   
   Nit: `UNDEFINED_INCREMENTAL_MATERIALIZE_SIZE` ?

##########
File path: 
flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.html
##########
@@ -45,12 +45,13 @@
   >
     <thead>
       <tr>
-        <th nzWidth="60px"></th>
+        <th nzWidth="70px"></th>
         <th><strong>Name</strong></th>
         <th><strong>Acknowledged</strong></th>
         <th><strong>Latest Acknowledgment</strong></th>
         <th><strong>End to End Duration</strong></th>
-        <th><strong>Checkpointed Data Size</strong></th>
+        <th><strong>Incremental Checkpoint Data Size</strong></th>
+        <th><strong>Full Checkpoint Data Size</strong></th>

Review comment:
       WDYT about adding a tooltip here and/or in the other added tags (maybe copying the javadoc)?
   I think the `title` attribute should work.

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -368,19 +372,34 @@ public boolean deregisterKeySelectionListener(KeySelectionListener<K> listener)
         // collections don't change once started and handles are immutable
         List<ChangelogStateHandle> prevDeltaCopy =
                 new ArrayList<>(changelogStateBackendStateCopy.getRestoredNonMaterialized());
+        long incrementalMaterializeSize = 0L;
         if (delta != null && delta.getStateSize() > 0) {
             prevDeltaCopy.add(delta);
+            incrementalMaterializeSize += delta.getIncrementalStateSize();

Review comment:
       :+1: 
   We should **not** count `prevDeltaCopy.getIncrementalStateSize()` in 
`incrementalMaterializeSize`.
   
   Could you add a comment that it's intentional?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
##########
@@ -113,6 +124,19 @@ public long getStateSize() {
                     + nonMaterialized.stream().mapToLong(StateObject::getStateSize).sum();
         }
 
+        @Override
+        public long getIncrementalStateSize() {
+            long incrementalStateSize =
+                    incrementalMaterializeSize == undefinedIncrementalMaterializeSize
+                            ? materialized.stream()
+                                    .mapToLong(StateObject::getIncrementalStateSize)
+                                    .sum()
+                            : incrementalMaterializeSize;

Review comment:
       Depending on how we define "incremental state size", the materialized part should or should not be included:
   1. if it's everything that was uploaded for **this** checkpoint, then it should
   2. if it's the difference from the previous checkpoint, it should **not** be included
   Right?
   
   And it's problematic to find out what exactly was uploaded for **this** checkpoint, because multiple checkpoints will likely include the same materialized state and would therefore report the same incremental state multiple times.
   Besides that, the 2nd option seems more intuitive to me personally.
   
   WDYT?
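   
   A minimal sketch of the two definitions above, assuming a simplified handle. `IncrementalSizeSketch`, `Handle`, and both method names are hypothetical and are not classes from this PR; the sketch only illustrates why option 1 reports a shared materialization more than once.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative only: none of these types are Flink classes. */
class IncrementalSizeSketch {

    /** Hypothetical, simplified view of one checkpoint's changelog handle. */
    static final class Handle {
        private final List<Long> materializedSizes; // may be shared with earlier checkpoints
        private final List<Long> newDeltaSizes;     // changelog parts first uploaded for this checkpoint

        Handle(List<Long> materializedSizes, List<Long> newDeltaSizes) {
            this.materializedSizes = materializedSizes;
            this.newDeltaSizes = newDeltaSizes;
        }

        private static long sum(List<Long> sizes) {
            return sizes.stream().mapToLong(Long::longValue).sum();
        }

        /** Option 1: count the materialized part as well. */
        long sizeIncludingMaterialized() {
            return sum(materializedSizes) + sum(newDeltaSizes);
        }

        /** Option 2: count only the difference from the previous checkpoint. */
        long sizeOfDeltaOnly() {
            return sum(newDeltaSizes);
        }
    }

    public static void main(String[] args) {
        // Assumption: both checkpoints reference the same materialization.
        List<Long> sharedMaterialized = Arrays.asList(100L);
        Handle first = new Handle(sharedMaterialized, Arrays.asList(10L));
        Handle second = new Handle(sharedMaterialized, Arrays.asList(5L));

        // Option 1 counts the shared 100 bytes once per checkpoint.
        System.out.println(first.sizeIncludingMaterialized() + second.sizeIncludingMaterialized());
        // Option 2 counts only what changed between checkpoints.
        System.out.println(first.sizeOfDeltaOnly() + second.sizeOfDeltaOnly());
    }
}
```

   With the second definition, the per-checkpoint numbers add up to the bytes that were actually new, which is the property argued for above.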

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -368,19 +372,34 @@ public boolean deregisterKeySelectionListener(KeySelectionListener<K> listener)
         // collections don't change once started and handles are immutable
         List<ChangelogStateHandle> prevDeltaCopy =
                 new ArrayList<>(changelogStateBackendStateCopy.getRestoredNonMaterialized());
+        long incrementalMaterializeSize = 0L;
         if (delta != null && delta.getStateSize() > 0) {
             prevDeltaCopy.add(delta);
+            incrementalMaterializeSize += delta.getIncrementalStateSize();
         }
 
         if (prevDeltaCopy.isEmpty()
                 && changelogStateBackendStateCopy.getMaterializedSnapshot().isEmpty()) {
             return SnapshotResult.empty();
         } else {
+            List<KeyedStateHandle> materializedSnapshot =
+                    changelogStateBackendStateCopy.getMaterializedSnapshot();
+            for (KeyedStateHandle keyedStateHandle : materializedSnapshot) {
+                if (!lastCompletedHandles.contains(keyedStateHandle)) {
+                    incrementalMaterializeSize += keyedStateHandle.getStateSize();

Review comment:
       (ditto 1st comment): I think we should NOT count the materialized state size in the incremental state size.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
##########
@@ -113,6 +124,19 @@ public long getStateSize() {
                     + nonMaterialized.stream().mapToLong(StateObject::getStateSize).sum();
         }
 
+        @Override
+        public long getIncrementalStateSize() {
+            long incrementalStateSize =
+                    incrementalMaterializeSize == undefinedIncrementalMaterializeSize
+                            ? materialized.stream()
+                                    .mapToLong(StateObject::getIncrementalStateSize)
+                                    .sum()
+                            : incrementalMaterializeSize;
+            incrementalStateSize +=
+                    nonMaterialized.stream().mapToLong(StateObject::getIncrementalStateSize).sum();
+            return incrementalStateSize;

Review comment:
       This method can be **potentially** called for every rendering (though I 
think currently it is cached on JM).
   Maybe we should we compute the value once in constructor?
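   
   A minimal sketch of computing the value once, assuming the handle is immutable after construction. `CachedSizeSketch` and `Handle` are hypothetical names, and the sum over `partSizes` only stands in for whatever the final definition of the incremental size turns out to be.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative only: a hypothetical immutable handle, not the PR's class. */
class CachedSizeSketch {

    static final class Handle {
        private final List<Long> partSizes;
        // Safe to precompute because the handle never changes after construction.
        private final long incrementalStateSize;

        Handle(List<Long> partSizes) {
            this.partSizes = partSizes;
            this.incrementalStateSize = partSizes.stream().mapToLong(Long::longValue).sum();
        }

        /** Returns the cached value, so repeated calls do not traverse the parts again. */
        long getIncrementalStateSize() {
            return incrementalStateSize;
        }
    }

    public static void main(String[] args) {
        Handle handle = new Handle(Arrays.asList(3L, 4L, 5L));
        System.out.println(handle.getIncrementalStateSize());
    }
}
```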

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java
##########
@@ -204,6 +203,21 @@ public long getStateSize() {
         return size;
     }
 
+    @Override
+    public long getIncrementalStateSize() {
+        long size = StateUtil.getStateSize(metaStateHandle);
+
+        for (StreamStateHandle sharedStateHandle : sharedState.values()) {
+            size += sharedStateHandle.getIncrementalStateSize();

Review comment:
       I guess this only works because `PlaceholderStreamStateHandle.getIncrementalStateSize` returns `0`.
   But the backend isn't required to return a placeholder; in fact, it currently doesn't without FLINK-25395 / #18297 (and I think the latter PR will quite likely be reverted in the future).
   
   WDYT about computing incremental state size externally (in 
`RocksIncrementalSnapshotStrategy`) and storing it in metadata?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java
##########
@@ -85,6 +88,7 @@
         this.subtaskIndex = subtaskIndex;
         checkArgument(stateSize >= 0, "Negative state size");
         this.stateSize = stateSize;
+        this.incrementalStateSize = incrementalStateSize;

Review comment:
       `checkArgument` similar to `stateSize`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
##########
@@ -63,4 +63,14 @@
      * @return Size of the state in bytes.
      */
     long getStateSize();
+
+    /**
+     * Returns the incremental state size in bytes. If the size is unknown, this method would return
+     * same result as {@link #getStateSize()}.
+     *
+     * @return Size of incremental state in bytes.
+     */
+    default long getIncrementalStateSize() {
+        return getStateSize();
+    }

Review comment:
       1. Conceptually, incremental state is only relevant to `CompositeStateHandle` (which defines the `registerSharedStates` method). WDYT about moving this method there?
   2. Then we could force a non-default implementation.
   3. In the javadoc, could you clarify what "incremental" means (please see the 1st comment)?
   4. In the javadoc, could you clarify the relation to channel state? Or maybe in some other place, like `OperatorSubtaskState`.
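   
   A rough sketch of points 1 and 2, assuming stand-in interfaces. The `*Sketch` types and `IncrementalHandle` are hypothetical and do not mirror Flink's actual signatures; the point is only that an abstract method on the composite interface forces every implementation to state what it reports.

```java
/** Illustrative only: the *Sketch types are hypothetical stand-ins, not Flink's interfaces. */
class CompositeHandleSketch {

    interface StateObjectSketch {
        long getStateSize();
    }

    /** Only composite handles, which can share state across checkpoints, expose an incremental size. */
    interface CompositeStateHandleSketch extends StateObjectSketch {
        // Deliberately abstract: no default that silently falls back to getStateSize().
        long getIncrementalStateSize();
    }

    static final class IncrementalHandle implements CompositeStateHandleSketch {
        private final long sharedBytes; // already referenced by an earlier checkpoint
        private final long newBytes;    // first uploaded for this checkpoint

        IncrementalHandle(long sharedBytes, long newBytes) {
            this.sharedBytes = sharedBytes;
            this.newBytes = newBytes;
        }

        @Override
        public long getStateSize() {
            return sharedBytes + newBytes;
        }

        @Override
        public long getIncrementalStateSize() {
            return newBytes;
        }
    }

    public static void main(String[] args) {
        IncrementalHandle handle = new IncrementalHandle(100L, 7L);
        System.out.println(handle.getStateSize());
        System.out.println(handle.getIncrementalStateSize());
    }
}
```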

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
##########
@@ -38,5 +38,5 @@
      * created.
      */
     @Nonnull
-    Set<StateHandleID> getSharedStateHandleIDs();
+    Map<StateHandleID, StreamStateHandle> getSharedStateHandleIDs();

Review comment:
       I guess `ID` has to be removed from the method name now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
