rkhachatryan commented on a change in pull request #18324:
URL: https://github.com/apache/flink/pull/18324#discussion_r790604449
##########
File path:
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
##########
@@ -302,14 +302,16 @@ public void reset(SequenceNumber from, SequenceNumber to) {
     }
 
     private static ChangelogStateHandleStreamImpl buildHandle(
-            KeyGroupRange keyGroupRange, NavigableMap<SequenceNumber, UploadResult> results) {
+            KeyGroupRange keyGroupRange,
+            NavigableMap<SequenceNumber, UploadResult> results,
+            long incrementalSize) {
         List<Tuple2<StreamStateHandle, Long>> tuples = new ArrayList<>();
         long size = 0;
         for (UploadResult uploadResult : results.values()) {
             tuples.add(Tuple2.of(uploadResult.getStreamStateHandle(), uploadResult.getOffset()));
             size += uploadResult.getSize();
         }
-        return new ChangelogStateHandleStreamImpl(tuples, keyGroupRange, size);
+        return new ChangelogStateHandleStreamImpl(tuples, keyGroupRange, size, incrementalSize);
Review comment:
I'm afraid the `incrementalSize` computed here doesn't take pre-emptively
uploaded state into account; it only covers the size of the last portion
uploaded right before the checkpoint.
And in most cases, most of the state should be pre-emptively uploaded.
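To illustrate, a rough sketch of what I mean (`wasPersistedForThisCheckpoint` is a hypothetical predicate, just standing for "this result was uploaded for the current checkpoint, whether pre-emptively or at checkpoint time" -- not the actual API):
```java
private static ChangelogStateHandleStreamImpl buildHandle(
        KeyGroupRange keyGroupRange, NavigableMap<SequenceNumber, UploadResult> results) {
    List<Tuple2<StreamStateHandle, Long>> tuples = new ArrayList<>();
    long size = 0;
    long incrementalSize = 0;
    for (UploadResult uploadResult : results.values()) {
        tuples.add(Tuple2.of(uploadResult.getStreamStateHandle(), uploadResult.getOffset()));
        // full size: everything referenced by this handle
        size += uploadResult.getSize();
        // incremental size: everything actually persisted for this checkpoint,
        // including pre-emptively uploaded changes (hypothetical check)
        if (wasPersistedForThisCheckpoint(uploadResult)) {
            incrementalSize += uploadResult.getSize();
        }
    }
    return new ChangelogStateHandleStreamImpl(tuples, keyGroupRange, size, incrementalSize);
}
```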
##########
File path:
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
##########
@@ -205,7 +205,7 @@ public SequenceNumber lastAppendedSequenceNumber() {
         SequenceNumberRange range = SequenceNumberRange.generic(from, activeSequenceNumber);
         if (range.size() == readyToReturn.size()) {
             checkState(toUpload.isEmpty());
-            return completedFuture(buildHandle(keyGroupRange, readyToReturn));
+            return completedFuture(buildHandle(keyGroupRange, readyToReturn, 0L));
Review comment:
Similar problem: if **all** of the state was pre-uploaded, the reported
size will be zero.
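E.g. something along these lines instead of the `0L` constant (just a sketch, assuming the pre-uploaded data should be reported as persisted for this checkpoint):
```java
// Sketch: report the size of the pre-emptively uploaded changes instead of 0.
long preUploadedSize =
        readyToReturn.values().stream().mapToLong(UploadResult::getSize).sum();
return completedFuture(buildHandle(keyGroupRange, readyToReturn, preUploadedSize));
```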
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
##########
@@ -63,6 +93,7 @@
* @param totalSubtaskCount Total number of subtasks for the checkpoint.
* @param taskStats Task stats for each involved operator.
* @param numAcknowledgedSubtasks Number of acknowledged subtasks.
+ * @param checkpointedSize Total checkpointed size over all subtasks.
* @param stateSize Total checkpoint state size over all subtasks.
Review comment:
I think it makes sense to update the javadoc for `stateSize` (ditto
other stats classes).
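E.g. something along these lines (the wording is only a suggestion):
```java
 * @param checkpointedSize Size of the data persisted during this checkpoint over all subtasks
 *     (for incremental checkpoints, this is the incremental size).
 * @param stateSize Total size of the state referenced by this checkpoint over all subtasks,
 *     which may include data persisted by previous checkpoints.
```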
##########
File path:
flink-runtime-web/web-dashboard/src/app/pages/job/checkpoints/detail/job-checkpoints-detail.component.html
##########
@@ -81,6 +82,9 @@
           {{ checkPointDetail['tasks'][vertex.id]['end_to_end_duration'] | humanizeDuration }}
         </td>
         <td *ngIf="checkPointDetail['tasks'][vertex.id]['end_to_end_duration'] < 0">n/a</td>
+        <td>
+          {{ checkPointDetail['tasks'][vertex.id]['checkpointed_size'] | humanizeBytes }}
+        </td>
         <td>{{ checkPointDetail['tasks'][vertex.id]['state_size'] | humanizeBytes }}</td>
Review comment:
I think instead of adding a new column, we could show both the full and the
incremental size in a single cell, separated by `/` or with one of them in
parentheses (see the sketch below). That would leave a bit more room for future
columns, and **probably** be more readable.
WDYT?
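Something like this in place of the two separate cells (purely illustrative, not tested):
```html
<td>
  {{ checkPointDetail['tasks'][vertex.id]['checkpointed_size'] | humanizeBytes }} /
  {{ checkPointDetail['tasks'][vertex.id]['state_size'] | humanizeBytes }}
</td>
```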
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java
##########
@@ -50,4 +50,14 @@
      * @param stateRegistry The registry where shared states are registered.
      */
     void registerSharedStates(SharedStateRegistry stateRegistry, long checkpointID);
+
+    /**
+     * Returns the persisted data size during checkpoint execution in bytes. If incremental
+     * checkpoint is enabled, this value represents the incremental persisted data size, and
+     * ussually smaller than {@link #getStateSize()}. If the size is unknown, this method would
Review comment:
typo: "ussually" -> "usually"