rkhachatryan commented on a change in pull request #18324:
URL: https://github.com/apache/flink/pull/18324#discussion_r784301442
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
##########
@@ -50,17 +50,28 @@
class ChangelogStateBackendHandleImpl implements ChangelogStateBackendHandle {
private static final long serialVersionUID = 1L;
+ private static final long undefinedIncrementalMaterializeSize = -1L;
Review comment:
Why don't we store this size in the checkpoint metadata? That way we could get
rid of the unknown size and show the correct size after recovery.
Nit: `UNDEFINED_INCREMENTAL_MATERIALIZE_SIZE`?
##########
File path:
flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.html
##########
@@ -45,12 +45,13 @@
>
<thead>
<tr>
- <th nzWidth="60px"></th>
+ <th nzWidth="70px"></th>
<th><strong>Name</strong></th>
<th><strong>Acknowledged</strong></th>
<th><strong>Latest Acknowledgment</strong></th>
<th><strong>End to End Duration</strong></th>
- <th><strong>Checkpointed Data Size</strong></th>
+ <th><strong>Incremental Checkpoint Data Size</strong></th>
+ <th><strong>Full Checkpoint Data Size</strong></th>
Review comment:
WDYT about adding a tooltip here and/or in the other added tags (maybe
copying the javadoc)?
I think the `title` attribute should work.
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -368,19 +372,34 @@ public boolean deregisterKeySelectionListener(KeySelectionListener<K> listener)
// collections don't change once started and handles are immutable
List<ChangelogStateHandle> prevDeltaCopy =
new ArrayList<>(changelogStateBackendStateCopy.getRestoredNonMaterialized());
+ long incrementalMaterializeSize = 0L;
if (delta != null && delta.getStateSize() > 0) {
prevDeltaCopy.add(delta);
+ incrementalMaterializeSize += delta.getIncrementalStateSize();
Review comment:
:+1:
We should **not** count `prevDeltaCopy.getIncrementalStateSize()` in
`incrementalMaterializeSize`.
Could you add a comment that it's intentional?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
##########
@@ -113,6 +124,19 @@ public long getStateSize() {
+ nonMaterialized.stream().mapToLong(StateObject::getStateSize).sum();
}
+ @Override
+ public long getIncrementalStateSize() {
+ long incrementalStateSize =
+ incrementalMaterializeSize == undefinedIncrementalMaterializeSize
+ ? materialized.stream()
+ .mapToLong(StateObject::getIncrementalStateSize)
+ .sum()
+ : incrementalMaterializeSize;
Review comment:
Depending on how we define "incremental state size", the materialized part
should or should not be included:
1. if it's everything that was uploaded for **this** checkpoint, then it
should be included
2. if it's the difference from the previous checkpoint, it should **not** be
included

Right?
And it's problematic to find out what exactly was uploaded for **this**
checkpoint because multiple checkpoints will likely include the same
materialized state, and therefore report the same incremental state multiple
times.
Besides that, the 2nd option seems more intuitive to me personally.
WDYT?
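
To make the double-counting concern concrete, here is a minimal sketch of the two definitions. It is not Flink code: `IncrementalSizeDefinitions`, `Checkpoint`, and both method names are hypothetical, and the byte counts are made up. It assumes three checkpoints taken between two materializations, so all of them reference the same materialized snapshot.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical model (not Flink API) of checkpoints that share one materialized snapshot. */
final class IncrementalSizeDefinitions {

    /** A checkpoint that references a materialized part and adds its own changelog delta. */
    static final class Checkpoint {
        final long materializedBytes;
        final long newDeltaBytes;

        Checkpoint(long materializedBytes, long newDeltaBytes) {
            this.materializedBytes = materializedBytes;
            this.newDeltaBytes = newDeltaBytes;
        }
    }

    /** Option 1: the materialized part is reported by every checkpoint that references it. */
    static long withMaterialized(Checkpoint cp) {
        return cp.materializedBytes + cp.newDeltaBytes;
    }

    /** Option 2: only the difference from the previous checkpoint, i.e. the new delta. */
    static long diffOnly(Checkpoint cp) {
        return cp.newDeltaBytes;
    }

    /** Sums the reported incremental size over all checkpoints under the chosen option. */
    static long total(List<Checkpoint> checkpoints, boolean includeMaterialized) {
        long sum = 0L;
        for (Checkpoint cp : checkpoints) {
            sum += includeMaterialized ? withMaterialized(cp) : diffOnly(cp);
        }
        return sum;
    }

    public static void main(String[] args) {
        // The same 1000-byte materialized snapshot is referenced by all three checkpoints.
        List<Checkpoint> checkpoints =
                Arrays.asList(
                        new Checkpoint(1000, 10),
                        new Checkpoint(1000, 20),
                        new Checkpoint(1000, 30));
        System.out.println("option 1 total: " + total(checkpoints, true)); // 3060
        System.out.println("option 2 total: " + total(checkpoints, false)); // 60
    }
}
```

Under option 1 the 1000 materialized bytes are reported three times although they were uploaded once; under option 2 only the 60 bytes of new deltas are reported.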
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -368,19 +372,34 @@ public boolean deregisterKeySelectionListener(KeySelectionListener<K> listener)
// collections don't change once started and handles are immutable
List<ChangelogStateHandle> prevDeltaCopy =
new ArrayList<>(changelogStateBackendStateCopy.getRestoredNonMaterialized());
+ long incrementalMaterializeSize = 0L;
if (delta != null && delta.getStateSize() > 0) {
prevDeltaCopy.add(delta);
+ incrementalMaterializeSize += delta.getIncrementalStateSize();
}
if (prevDeltaCopy.isEmpty()
&& changelogStateBackendStateCopy.getMaterializedSnapshot().isEmpty()) {
return SnapshotResult.empty();
} else {
+ List<KeyedStateHandle> materializedSnapshot =
+ changelogStateBackendStateCopy.getMaterializedSnapshot();
+ for (KeyedStateHandle keyedStateHandle : materializedSnapshot) {
+ if (!lastCompletedHandles.contains(keyedStateHandle)) {
+ incrementalMaterializeSize += keyedStateHandle.getStateSize();
Review comment:
(ditto 1st comment): I think we should NOT count materialized state size
in incremental state size
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
##########
@@ -113,6 +124,19 @@ public long getStateSize() {
+ nonMaterialized.stream().mapToLong(StateObject::getStateSize).sum();
}
+ @Override
+ public long getIncrementalStateSize() {
+ long incrementalStateSize =
+ incrementalMaterializeSize == undefinedIncrementalMaterializeSize
+ ? materialized.stream()
+ .mapToLong(StateObject::getIncrementalStateSize)
+ .sum()
+ : incrementalMaterializeSize;
+ incrementalStateSize +=
+ nonMaterialized.stream().mapToLong(StateObject::getIncrementalStateSize).sum();
+ return incrementalStateSize;
Review comment:
This method can be **potentially** called for every rendering (though I
think currently it is cached on JM).
Maybe we should compute the value once in the constructor?
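
A rough sketch of what computing it once could look like. `CachedSizeHandle` is a hypothetical stand-in, not the real `ChangelogStateBackendHandleImpl`, and plain `List<Long>` sizes stand in for the handle lists; it only works because the handles are immutable after construction.

```java
import java.util.List;

/** Hypothetical stand-in for the handle (not the Flink class); sizes replace real handles. */
final class CachedSizeHandle {
    // Assumed sentinel, mirroring the -1L constant in the diff.
    private static final long UNDEFINED_INCREMENTAL_MATERIALIZE_SIZE = -1L;

    // Computed once in the constructor; safe because the inputs never change afterwards.
    private final long incrementalStateSize;

    CachedSizeHandle(
            List<Long> materializedSizes,
            List<Long> nonMaterializedSizes,
            long incrementalMaterializeSize) {
        long materializedPart =
                incrementalMaterializeSize == UNDEFINED_INCREMENTAL_MATERIALIZE_SIZE
                        ? sum(materializedSizes)
                        : incrementalMaterializeSize;
        this.incrementalStateSize = materializedPart + sum(nonMaterializedSizes);
    }

    /** Returns the cached value; no stream traversal per call. */
    long getIncrementalStateSize() {
        return incrementalStateSize;
    }

    private static long sum(List<Long> sizes) {
        long total = 0L;
        for (long size : sizes) {
            total += size;
        }
        return total;
    }
}
```

The getter then costs a field read regardless of how often the UI or REST layer calls it.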
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java
##########
@@ -204,6 +203,21 @@ public long getStateSize() {
return size;
}
+ @Override
+ public long getIncrementalStateSize() {
+ long size = StateUtil.getStateSize(metaStateHandle);
+
+ for (StreamStateHandle sharedStateHandle : sharedState.values()) {
+ size += sharedStateHandle.getIncrementalStateSize();
Review comment:
I guess this only works because
`PlaceholderStreamStateHandle.getIncrementalStateSize` returns `0`.
But the backend isn't required to return a placeholder; in fact, it currently
doesn't without FLINK-25395 / #18297 (and I think the latter PR will quite
likely be reverted in the future).
WDYT about computing the incremental state size externally (in
`RocksIncrementalSnapshotStrategy`) and storing it in the metadata?
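
Roughly what I mean by "externally", as a sketch only. `ExternalIncrementalSize` and `newlyUploadedBytes` are hypothetical and this is not how `RocksIncrementalSnapshotStrategy` is structured; it assumes the snapshot code already knows which files were uploaded by a previous checkpoint and can therefore sum the new ones without relying on placeholders.

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical helper (not Flink API): size of files uploaded by the current snapshot only. */
final class ExternalIncrementalSize {

    /**
     * Sums the sizes of shared files that were not already uploaded by a previous checkpoint.
     *
     * @param sharedFileSizes file name to size in bytes, for all files in this snapshot
     * @param previouslyUploaded names of files reused from an earlier checkpoint
     */
    static long newlyUploadedBytes(
            Map<String, Long> sharedFileSizes, Set<String> previouslyUploaded) {
        long size = 0L;
        for (Map.Entry<String, Long> entry : sharedFileSizes.entrySet()) {
            if (!previouslyUploaded.contains(entry.getKey())) {
                size += entry.getValue();
            }
        }
        return size;
    }
}
```

The result would then be passed into the handle and persisted, so the handle does not depend on what kind of `StreamStateHandle` the backend returns for reused files.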
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java
##########
@@ -85,6 +88,7 @@
this.subtaskIndex = subtaskIndex;
checkArgument(stateSize >= 0, "Negative state size");
this.stateSize = stateSize;
+ this.incrementalStateSize = incrementalStateSize;
Review comment:
Add a `checkArgument` similar to the one for `stateSize`?
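
Something along these lines, as a sketch. `SubtaskStatsSketch` is a hypothetical cut-down class, the local `checkArgument` helper only stands in for the precondition utility already used for `stateSize`, and the second error message is my own wording.

```java
/** Hypothetical cut-down version of the stats class; only the two size fields are modeled. */
final class SubtaskStatsSketch {
    final long stateSize;
    final long incrementalStateSize;

    SubtaskStatsSketch(long stateSize, long incrementalStateSize) {
        checkArgument(stateSize >= 0, "Negative state size");
        // Same validation as for stateSize, so a bad value fails fast at construction.
        checkArgument(incrementalStateSize >= 0, "Negative incremental state size");
        this.stateSize = stateSize;
        this.incrementalStateSize = incrementalStateSize;
    }

    /** Local stand-in for a precondition helper; throws on a violated argument check. */
    private static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }
}
```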
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java
##########
@@ -63,4 +63,14 @@
* @return Size of the state in bytes.
*/
long getStateSize();
+
+ /**
+ * Returns the incremental state size in bytes. If the size is unknown, this method would return
+ * same result as {@link #getStateSize()}.
+ *
+ * @return Size of incremental state in bytes.
+ */
+ default long getIncrementalStateSize() {
+ return getStateSize();
+ }
Review comment:
1. Conceptually, incremental state is only relevant to
`CompositeStateHandle` (which defines `registerSharedStates` method). WDYT
about moving this method there?
2. Then we could force non-default implementation
3. In javadoc, could you clarify what "incremental" means (please see 1st
comment)
4. In javadoc, could you clarify the relation to channel state? Or maybe in
some other place, like `OperatorSubtaskState`
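
To illustrate points 1 and 2, a minimal sketch with hypothetical interfaces (the `Sketch` suffix marks them as not being the real Flink types, and the javadoc wording assumes the "difference from the previous checkpoint" definition):

```java
/** Hypothetical stand-in for StateObject: no notion of "incremental" at this level. */
interface StateObjectSketch {
    long getStateSize();
}

/** Hypothetical stand-in for CompositeStateHandle: the only place incremental size lives. */
interface CompositeStateHandleSketch extends StateObjectSketch {
    /**
     * Size in bytes of the state that differs from the previous checkpoint. Deliberately has no
     * default, so every implementation must state what it reports.
     */
    long getIncrementalStateSize();
}

/** Example implementation that is forced to provide both sizes explicitly. */
final class IncrementalHandleSketch implements CompositeStateHandleSketch {
    private final long fullSize;
    private final long incrementalSize;

    IncrementalHandleSketch(long fullSize, long incrementalSize) {
        this.fullSize = fullSize;
        this.incrementalSize = incrementalSize;
    }

    @Override
    public long getStateSize() {
        return fullSize;
    }

    @Override
    public long getIncrementalStateSize() {
        return incrementalSize;
    }
}
```

With no default method, a new composite handle that forgets to define its incremental size fails to compile instead of silently reporting the full size.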
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
##########
@@ -38,5 +38,5 @@
* created.
*/
@Nonnull
- Set<StateHandleID> getSharedStateHandleIDs();
+ Map<StateHandleID, StreamStateHandle> getSharedStateHandleIDs();
Review comment:
I guess `ID` has to be removed from the method name now..