rkhachatryan commented on a change in pull request #18391:
URL: https://github.com/apache/flink/pull/18391#discussion_r803516082
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
##########
@@ -322,19 +335,20 @@ void serializeKeyedStateHandle(KeyedStateHandle stateHandle, DataOutputStream do
serializeStreamStateHandleMap(incrementalKeyedStateHandle.getSharedState(), dos);
serializeStreamStateHandleMap(incrementalKeyedStateHandle.getPrivateState(), dos);
+
+ dos.writeUTF(incrementalKeyedStateHandle.getStateHandleId().toString());
Review comment:
Replace `toString` with `getKeyString`?
Ditto for all other places; maybe extract a method `writeStateHandleId`?
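A minimal sketch of the suggested `writeStateHandleId` helper, plus a matching reader. `StateHandleID` here is a simplified stand-in rather than Flink's class, and `StateHandleIdSerde` / `readStateHandleId` are hypothetical names. In the stand-in, `toString()` decorates the key, so writing it and reading it back would not reproduce the original key string, while writing `getKeyString()` does.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical stand-in for StateHandleID, reduced to what the sketch needs:
// a key string plus a toString() meant for humans.
final class StateHandleID {
    private final String keyString;

    StateHandleID(String keyString) {
        this.keyString = keyString;
    }

    String getKeyString() {
        return keyString;
    }

    @Override
    public String toString() {
        // Decorated on purpose: a toString() format is not a stable wire format.
        return "StateHandleID{" + keyString + "}";
    }
}

// Hypothetical helpers: the single place that defines how an ID goes on the wire.
final class StateHandleIdSerde {
    private StateHandleIdSerde() {}

    static void writeStateHandleId(StateHandleID id, DataOutputStream dos) throws IOException {
        dos.writeUTF(id.getKeyString());
    }

    // Exact mirror of writeStateHandleId.
    static StateHandleID readStateHandleId(DataInputStream dis) throws IOException {
        return new StateHandleID(dis.readUTF());
    }
}
```

With one helper pair, every call site that persists an ID uses the same encoding, so switching from `toString()` to `getKeyString()` is a one-line change.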
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
##########
@@ -298,19 +306,24 @@ void serializeKeyedStateHandle(KeyedStateHandle stateHandle, DataOutputStream do
if (stateHandle instanceof KeyGroupsSavepointStateHandle) {
dos.writeByte(SAVEPOINT_KEY_GROUPS_HANDLE);
} else {
- dos.writeByte(KEY_GROUPS_HANDLE);
+ dos.writeByte(KEY_GROUPS_HANDLE_V2);
}
dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getStartKeyGroup());
dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
for (int keyGroup : keyGroupsStateHandle.getKeyGroupRange()) {
dos.writeLong(keyGroupsStateHandle.getOffsetForKeyGroup(keyGroup));
}
serializeStreamStateHandle(keyGroupsStateHandle.getDelegateStateHandle(), dos);
+
+ // savepoint state handle would not need to persist state handle id out.
+ if (!(stateHandle instanceof KeyGroupsSavepointStateHandle)) {
+ dos.writeUTF(stateHandle.getStateHandleId().toString());
+ }
Review comment:
Could you please explain why `KeyGroupsSavepointStateHandle` is special?
Savepoints are becoming incremental with the latest efforts; I'm wondering
whether skipping the ID for them may pose any difficulties.
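To make the asymmetry concrete, here is a minimal sketch of a reader that mirrors this write path. The tag constants are placeholders with made-up values, not Flink's constants. The fallback for tags without a stored ID (a fresh random UUID, as the old-format branch of `deserializeIncrementalStateHandle` does further down) is an assumption, not necessarily what this PR does for savepoint handles.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.util.UUID;

// Hypothetical read-side mirror of the key-groups write path.
// Tag values are placeholders, not Flink's actual constants.
final class KeyGroupsHandleIdReader {
    static final byte KEY_GROUPS_HANDLE = 3;           // old format: no ID on the stream
    static final byte SAVEPOINT_KEY_GROUPS_HANDLE = 9; // savepoint: no ID on the stream
    static final byte KEY_GROUPS_HANDLE_V2 = 12;       // new format: ID follows the delegate handle

    private KeyGroupsHandleIdReader() {}

    // Only the V2 tag has an ID to read; for the other tags this sketch
    // assumes the reader invents a fresh one.
    static String readIdForTag(byte tag, DataInputStream dis) throws IOException {
        switch (tag) {
            case KEY_GROUPS_HANDLE_V2:
                return dis.readUTF();
            case KEY_GROUPS_HANDLE:
            case SAVEPOINT_KEY_GROUPS_HANDLE:
                return UUID.randomUUID().toString();
            default:
                throw new IllegalArgumentException("Unknown key-groups handle tag: " + tag);
        }
    }
}
```

Under that assumption, reading the same savepoint metadata twice yields two different IDs for the same handle, which is worth keeping in mind if incremental savepoints start relying on handle IDs.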
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
##########
@@ -415,7 +440,7 @@ KeyedStateHandle deserializeKeyedStateHandle(
int baseSize = dis.readInt();
List<KeyedStateHandle> base = new ArrayList<>(baseSize);
for (int i = 0; i < baseSize; i++) {
- base.add(deserializeKeyedStateHandle(dis, context));
+ base.add(Preconditions.checkNotNull(deserializeKeyedStateHandle(dis, context)));
Review comment:
Why was this check added?
`deserializeKeyedStateHandle` can clearly return null and is annotated as
`@Nullable`.
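A minimal sketch of the behavior change the added check introduces. `deserialize` stands in for a `@Nullable` `deserializeKeyedStateHandle` and returns null for a hypothetical `NULL_HANDLE` tag; `Objects.requireNonNull` is used in place of Flink's `Preconditions.checkNotNull`, which likewise throws a `NullPointerException` on null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical model of collecting base handles from a @Nullable deserializer.
final class NullableBaseHandles {
    static final byte NULL_HANDLE = 0; // hypothetical tag value

    private NullableBaseHandles() {}

    // Stand-in for a @Nullable deserializeKeyedStateHandle.
    static String deserialize(byte tag) {
        return tag == NULL_HANDLE ? null : "handle-" + tag;
    }

    // Before the change: nulls are kept, matching the @Nullable contract.
    static List<String> collectLenient(byte[] tags) {
        List<String> base = new ArrayList<>(tags.length);
        for (byte tag : tags) {
            base.add(deserialize(tag));
        }
        return base;
    }

    // After the change: a null entry aborts deserialization with an NPE.
    static List<String> collectStrict(byte[] tags) {
        List<String> base = new ArrayList<>(tags.length);
        for (byte tag : tags) {
            base.add(Objects.requireNonNull(deserialize(tag), "base handle"));
        }
        return base;
    }
}
```

With tags `{1, 0}`, `collectLenient` returns `[handle-1, null]` while `collectStrict` throws, so the check turns a null that the method's contract allows into a deserialization failure.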
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateHandleStreamImpl.java
##########
@@ -44,23 +46,42 @@
private final long size;
private final long incrementalSize;
+ private final StateHandleID stateHandleID;
public ChangelogStateHandleStreamImpl(
List<Tuple2<StreamStateHandle, Long>> handlesAndOffsets,
KeyGroupRange keyGroupRange,
- long size) {
- this(handlesAndOffsets, keyGroupRange, size, size);
+ long size,
+ long incrementalSize) {
+ this(
+ handlesAndOffsets,
+ keyGroupRange,
+ size,
+ incrementalSize,
+ new StateHandleID(UUID.randomUUID().toString()));
Review comment:
Creating a random ID here doesn't seem very straightforward to me when
looking at the call sites.
In particular, why doesn't `MetadataV2V3SerializerBase` store this ID? I
guess it's fine because currently it's only used inside this handle itself, but
if the ID is used for other purposes later, it may become an issue.
So I'd propose having an explicit constructor argument (at least for
production code) and storing this ID in the metadata.
WDYT?
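A minimal sketch of the proposal: the ID becomes an explicit constructor argument and travels through the metadata with the other fields, so a restored handle keeps its original ID. `ChangelogHandleSketch` is a hypothetical, heavily simplified stand-in (a plain `String` ID, no key-group range or handle list), and the field order on the stream is illustrative only.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Objects;

// Hypothetical stand-in for ChangelogStateHandleStreamImpl: the ID is passed in
// explicitly instead of being generated inside the constructor.
final class ChangelogHandleSketch {
    private final long size;
    private final long incrementalSize;
    private final String stateHandleId;

    ChangelogHandleSketch(long size, long incrementalSize, String stateHandleId) {
        this.size = size;
        this.incrementalSize = incrementalSize;
        this.stateHandleId = Objects.requireNonNull(stateHandleId, "stateHandleId");
    }

    long getSize() { return size; }

    long getIncrementalSize() { return incrementalSize; }

    String getStateHandleId() { return stateHandleId; }

    // Metadata writer: the ID is persisted next to the other fields...
    static void serialize(ChangelogHandleSketch handle, DataOutputStream dos) throws IOException {
        dos.writeLong(handle.size);
        dos.writeLong(handle.incrementalSize);
        dos.writeUTF(handle.stateHandleId);
    }

    // ...and the reader passes it back in instead of inventing a new one.
    static ChangelogHandleSketch deserialize(DataInputStream dis) throws IOException {
        long size = dis.readLong();
        long incrementalSize = dis.readLong();
        String stateHandleId = dis.readUTF();
        return new ChangelogHandleSketch(size, incrementalSize, stateHandleId);
    }
}
```

Requiring the ID in the constructor makes every call site decide where the ID comes from, which is exactly what the random default hides.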
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java
##########
@@ -320,7 +383,10 @@ public boolean equals(Object o) {
if (!getPrivateState().equals(that.getPrivateState())) {
return false;
}
- return getMetaStateHandle().equals(that.getMetaStateHandle());
+ if (!getMetaStateHandle().equals(that.getMetaStateHandle())) {
+ return false;
+ }
+ return getStateHandleId().equals(that.getStateHandleId());
Review comment:
nit: move this check to the beginning (it's likely much faster than the
other checks)
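A minimal sketch of the suggested ordering. `IncrementalHandleSketch` is a hypothetical, simplified handle with string-valued state maps. Because `equals` is a conjunction, moving the ID comparison first does not change the result; it only lets handles with different IDs be rejected before the state maps are walked.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified handle used only to show check ordering in equals().
final class IncrementalHandleSketch {
    private final String stateHandleId;
    private final Map<String, String> sharedState;
    private final Map<String, String> privateState;
    private final String metaStateHandle;

    IncrementalHandleSketch(
            String stateHandleId,
            Map<String, String> sharedState,
            Map<String, String> privateState,
            String metaStateHandle) {
        this.stateHandleId = Objects.requireNonNull(stateHandleId);
        this.sharedState = Objects.requireNonNull(sharedState);
        this.privateState = Objects.requireNonNull(privateState);
        this.metaStateHandle = Objects.requireNonNull(metaStateHandle);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        IncrementalHandleSketch that = (IncrementalHandleSketch) o;
        // Cheap single-string comparison first: handles with different IDs
        // are rejected without comparing the state maps.
        if (!stateHandleId.equals(that.stateHandleId)) {
            return false;
        }
        return sharedState.equals(that.sharedState)
                && privateState.equals(that.privateState)
                && metaStateHandle.equals(that.metaStateHandle);
    }

    @Override
    public int hashCode() {
        // Consistent with equals: equal handles hash identically.
        return Objects.hash(stateHandleId, sharedState, privateState, metaStateHandle);
    }
}
```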
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
##########
@@ -495,14 +526,19 @@ private IncrementalRemoteKeyedStateHandle deserializeIncrementalStateHandle(
uuid =
UUID.nameUUIDFromBytes(backendId.getBytes(StandardCharsets.UTF_8));
}
- return new IncrementalRemoteKeyedStateHandle(
+ StateHandleID stateHandleId =
+ isV2Format
+ ? new StateHandleID(dis.readUTF())
+ : new StateHandleID(UUID.randomUUID().toString());
Review comment:
Maybe move this to a static method in `StateHandleID`?
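A minimal sketch of that refactoring. `StateHandleID` is a simplified stand-in, and `randomStateHandleId` and `IncrementalHandleIdReader` are illustrative names, not confirmed API. The format check stays in the deserializer; only the "generate an ID for a handle stored without one" part moves into `StateHandleID`.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.util.UUID;

// Hypothetical stand-in for StateHandleID with the suggested static factory.
final class StateHandleID {
    private final String keyString;

    StateHandleID(String keyString) {
        this.keyString = keyString;
    }

    String getKeyString() {
        return keyString;
    }

    // Single home for creating an ID for a handle that was stored without one.
    static StateHandleID randomStateHandleId() {
        return new StateHandleID(UUID.randomUUID().toString());
    }
}

// Hypothetical deserializer call site: it still decides based on the format
// version, but delegates ID generation to StateHandleID.
final class IncrementalHandleIdReader {
    private IncrementalHandleIdReader() {}

    static StateHandleID readStateHandleId(DataInputStream dis, boolean isV2Format)
            throws IOException {
        return isV2Format
                ? new StateHandleID(dis.readUTF())
                : StateHandleID.randomStateHandleId();
    }
}
```

Keeping stream I/O out of `StateHandleID` leaves the ID class simple, and the same factory could also replace the inline `UUID.randomUUID()` call in the `ChangelogStateHandleStreamImpl` constructor.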
--
This is an automated message from the Apache Git Service.