rkhachatryan commented on code in PR #19679:
URL: https://github.com/apache/flink/pull/19679#discussion_r925990350
##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapAggregatingState.java:
##########
@@ -136,18 +144,29 @@ public ACC apply(ACC accumulator, IN value) {
}
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "rawtypes"})
static <T, K, N, SV, S extends State, IS extends S> IS create(
StateDescriptor<S, SV> stateDesc,
StateTable<K, N, SV> stateTable,
- TypeSerializer<K> keySerializer) {
- return (IS)
- new HeapAggregatingState<>(
- stateTable,
- keySerializer,
- stateTable.getStateSerializer(),
- stateTable.getNamespaceSerializer(),
- stateDesc.getDefaultValue(),
- ((AggregatingStateDescriptor<T, SV, ?>) stateDesc).getAggregateFunction());
+ TypeSerializer<K> keySerializer,
+ @Nullable IS existingState) {
+ return existingState == null
+ ? (IS)
+ new HeapAggregatingState<>(
Review Comment:
Do we actually need to combine `create` and `update` here?
It looks like a separate method would make both the caller and the callee cleaner.
If they do need to stay combined, should it be named `createOrUpdate`?
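A minimal sketch of the split suggested above. It uses a hypothetical `SimpleHeapState` class, not Flink's `HeapAggregatingState`, and a `String` stands in for the value serializer. `create` only builds a new object and `update` only mutates an existing one. The caller already knows whether the state exists, so it picks the path itself and no `@Nullable IS existingState` parameter is passed down.

```java
import java.util.Objects;

/** Hypothetical, simplified stand-in for a heap state object (not Flink's HeapAggregatingState). */
final class SimpleHeapState<V> {
    // A String stands in for the value TypeSerializer so the sketch stays self-contained.
    private String valueSerializerId;
    private final V defaultValue;

    private SimpleHeapState(String valueSerializerId, V defaultValue) {
        this.valueSerializerId = valueSerializerId;
        this.defaultValue = defaultValue;
    }

    /** Only creates: always returns a brand-new state object. */
    static <T> SimpleHeapState<T> create(String valueSerializerId, T defaultValue) {
        return new SimpleHeapState<>(valueSerializerId, defaultValue);
    }

    /** Only updates: mutates the given existing object in place and returns it. */
    static <T> SimpleHeapState<T> update(SimpleHeapState<T> existing, String newValueSerializerId) {
        Objects.requireNonNull(existing, "existing state must not be null");
        existing.valueSerializerId = newValueSerializerId;
        return existing;
    }

    String valueSerializerId() {
        return valueSerializerId;
    }

    V defaultValue() {
        return defaultValue;
    }
}

/** Hypothetical caller: it already knows whether a state exists, so it picks the path explicitly. */
final class StateRegistrar {
    static <T> SimpleHeapState<T> register(
            SimpleHeapState<T> existing, String valueSerializerId, T defaultValue) {
        return existing == null
                ? SimpleHeapState.create(valueSerializerId, defaultValue)
                : SimpleHeapState.update(existing, valueSerializerId);
    }
}
```

With this split, each factory method has a single responsibility. The only branch on "does the state already exist" lives in the caller.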
##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java:
##########
@@ -163,6 +169,34 @@ KeyGroupedInternalPriorityQueue<T> create(
return priorityQueuesManager.createOrUpdate(stateName, byteOrderedElementSerializer);
}
+ @Override
+ public <N, SV, SEV, S extends State, IS extends S> IS upgradeKeyedState(
+ TypeSerializer<N> namespaceSerializer,
+ StateDescriptor<S, SV> stateDescriptor,
+ @Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory)
+ throws Exception {
+ IS state =
+ createInternalState(namespaceSerializer, stateDescriptor, snapshotTransformFactory);
+ // rebuild metaInfo which created from previous snapshot to make it can continue to migrate
Review Comment:
How about
```
// Metadata was just created or updated, but its StateSerializerProvider will not allow
// further updates. So we replace it with a new one that contains a fresh
// LazilyRegisteredStateSerializerProvider.
```
?
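To illustrate why a fresh provider is needed, here is a hypothetical and heavily simplified model. It is not Flink's `StateSerializerProvider` hierarchy, whose exact rules may differ. A "lazy" provider restored from a snapshot accepts exactly one new serializer, and an "eager" one accepts none. Once the single registration is used up, the only way to allow another migration is to replace the provider with a new lazy one built from the current serializer. This is roughly what rebuilding the meta info from its snapshot achieves.

```java
/** Hypothetical, simplified model of a state serializer provider (not Flink's StateSerializerProvider). */
abstract class SerializerProviderModel {
    // A String stands in for a TypeSerializer.
    protected String currentSerializer;

    String currentSerializer() {
        return currentSerializer;
    }

    /** Registers the serializer requested for restored state; implementations may refuse. */
    abstract void registerNewSerializer(String newSerializer);

    /** Lazy flavour: built from a previous snapshot, accepts exactly one new serializer. */
    static SerializerProviderModel lazyFromSnapshot(String restoredSerializer) {
        return new Lazy(restoredSerializer);
    }

    /** Eager flavour: built with its final serializer, refuses any further registration. */
    static SerializerProviderModel eager(String serializer) {
        return new Eager(serializer);
    }

    /** Replaces a used-up provider with a fresh lazy one, so a later migration can register again. */
    static SerializerProviderModel refreshed(SerializerProviderModel used) {
        return lazyFromSnapshot(used.currentSerializer());
    }

    private static final class Lazy extends SerializerProviderModel {
        private boolean registered;

        Lazy(String restoredSerializer) {
            this.currentSerializer = restoredSerializer;
        }

        @Override
        void registerNewSerializer(String newSerializer) {
            if (registered) {
                throw new IllegalStateException("a new serializer was already registered");
            }
            registered = true;
            currentSerializer = newSerializer;
        }
    }

    private static final class Eager extends SerializerProviderModel {
        Eager(String serializer) {
            this.currentSerializer = serializer;
        }

        @Override
        void registerNewSerializer(String newSerializer) {
            throw new UnsupportedOperationException("eagerly registered provider cannot be updated");
        }
    }
}
```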
##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogMigrationRestoreTarget.java:
##########
@@ -231,8 +239,15 @@ public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
StateSnapshotTransformer.StateSnapshotTransformFactory<SEV>
snapshotTransformFactory)
throws Exception {
- return keyedStateBackend.createInternalState(
- namespaceSerializer, stateDesc, snapshotTransformFactory);
+ ChangelogState changelogState =
+ changelogStateFactory.getExistingState(
+ stateDesc.getName(),
+ StateMetaInfoSnapshot.BackendStateType.KEY_VALUE);
+ return changelogState == null
+ ? keyedStateBackend.createInternalState(
+ namespaceSerializer, stateDesc, snapshotTransformFactory)
+ : keyedStateBackend.upgradeKeyedState(
+ namespaceSerializer, stateDesc, snapshotTransformFactory);
Review Comment:
Could you explain why we need to call `upgradeKeyedState` here?
Isn't this already a call made by a user program (so no further metadata updates are expected)?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java:
##########
@@ -163,6 +169,34 @@ KeyGroupedInternalPriorityQueue<T> create(
return priorityQueuesManager.createOrUpdate(stateName, byteOrderedElementSerializer);
}
+ @Override
+ public <N, SV, SEV, S extends State, IS extends S> IS upgradeKeyedState(
+ TypeSerializer<N> namespaceSerializer,
+ StateDescriptor<S, SV> stateDescriptor,
+ @Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory)
+ throws Exception {
+ IS state =
+ createInternalState(namespaceSerializer, stateDescriptor, snapshotTransformFactory);
+ // rebuild metaInfo which created from previous snapshot to make it can continue to migrate
+ registeredKVStates.computeIfPresent(
+ stateDescriptor.getName(),
+ (stateName, stateTable) -> {
+ stateTable.setMetaInfo(
+ new RegisteredKeyValueStateBackendMetaInfo<>(
+ stateTable.getMetaInfo().snapshot()));
Review Comment:
I'd add an explicit method to update the serializer provider in the metadata, to make
the contract of using a Lazy serializer provider explicit.
Something like this:
```
stateTable.getMetaInfo().withSerializerUpgradesAllowed(); // create a new metadata object with Lazy serializer provider using existing one as
```
WDYT?
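A sketch of what such an explicit method could look like, on a hypothetical immutable `MetaInfoModel`. It is not Flink's `RegisteredKeyValueStateBackendMetaInfo`, and a boolean stands in for "has a lazy serializer provider". `withSerializerUpgradesAllowed()` returns a copy and leaves the original untouched. Because it returns a new object, the caller still has to store the result, for example via `stateTable.setMetaInfo(...)`.

```java
import java.util.Objects;

/** Hypothetical, simplified immutable metadata (not Flink's RegisteredKeyValueStateBackendMetaInfo). */
final class MetaInfoModel {
    private final String stateName;
    private final String serializer; // stands in for the state TypeSerializer
    private final boolean upgradesAllowed; // stands in for "uses a lazy serializer provider"

    MetaInfoModel(String stateName, String serializer, boolean upgradesAllowed) {
        this.stateName = Objects.requireNonNull(stateName);
        this.serializer = Objects.requireNonNull(serializer);
        this.upgradesAllowed = upgradesAllowed;
    }

    /** Returns a copy that accepts one more serializer upgrade; this object is unchanged. */
    MetaInfoModel withSerializerUpgradesAllowed() {
        return new MetaInfoModel(stateName, serializer, true);
    }

    /** Applies one upgrade; the result refuses further upgrades until re-enabled explicitly. */
    MetaInfoModel upgradeSerializer(String newSerializer) {
        if (!upgradesAllowed) {
            throw new IllegalStateException("serializer upgrades not allowed for " + stateName);
        }
        return new MetaInfoModel(stateName, newSerializer, false);
    }

    String serializer() {
        return serializer;
    }

    boolean upgradesAllowed() {
        return upgradesAllowed;
    }
}
```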
##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java:
##########
@@ -163,6 +169,34 @@ KeyGroupedInternalPriorityQueue<T> create(
return priorityQueuesManager.createOrUpdate(stateName, byteOrderedElementSerializer);
}
+ @Override
+ public <N, SV, SEV, S extends State, IS extends S> IS upgradeKeyedState(
Review Comment:
Looking at the final implementation of this method, I see it only differs
from `KeyedStateFactory.createInternalState`
by additionally setting the Serializer Provider to Lazy.
Setting it to Lazy seems hacky, because this requires a lot of assumptions about
`createInternalState` and the metadata.
WDYT about adding a new version of `createInternalState` with a boolean
argument `allowFutureMetadataUpdates` instead?
That flag can then be checked in `tryRegisterStateTable` or
`createInternalStateInternal` when creating/updating metadata.
By default, the new method could:
- `allowFutureMetadataUpdates=false`: call the existing `createInternalState` method
- `allowFutureMetadataUpdates=true`: throw `UnsupportedOperationException`
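The default behaviour described above maps naturally onto a Java `default` interface method. The sketch below uses a hypothetical `StateFactoryModel` interface with `String` in place of the real generic signature, so it is not Flink's `KeyedStateFactory`. Backends that do not override the new overload keep today's behaviour for `false` and reject `true`. A backend that opts in overrides it and decides the provider kind where metadata is created.

```java
/** Hypothetical, simplified factory interface (not Flink's KeyedStateFactory). */
interface StateFactoryModel {
    /** Existing entry point: creates (or updates) state; metadata stays locked afterwards. */
    String createInternalState(String stateName);

    /** Proposed overload: the default only supports allowFutureMetadataUpdates=false. */
    default String createInternalState(String stateName, boolean allowFutureMetadataUpdates) {
        if (allowFutureMetadataUpdates) {
            throw new UnsupportedOperationException("future metadata updates are not supported");
        }
        return createInternalState(stateName);
    }
}

/** A backend that has not opted in: only the one-argument method is implemented. */
final class PlainBackend implements StateFactoryModel {
    @Override
    public String createInternalState(String stateName) {
        return stateName + ":locked";
    }
}

/** A backend that opts in and checks the flag where metadata is created or updated. */
final class HeapLikeBackend implements StateFactoryModel {
    @Override
    public String createInternalState(String stateName) {
        return createInternalState(stateName, false);
    }

    @Override
    public String createInternalState(String stateName, boolean allowFutureMetadataUpdates) {
        // Stand-in for the check in tryRegisterStateTable: choose the provider kind from the flag.
        return stateName + (allowFutureMetadataUpdates ? ":lazy" : ":locked");
    }
}
```

This keeps the "lazy provider" decision in one place, driven by an explicit argument, instead of patching metadata after `createInternalState` returns.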
##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java:
##########
@@ -163,6 +169,34 @@ KeyGroupedInternalPriorityQueue<T> create(
return priorityQueuesManager.createOrUpdate(stateName, byteOrderedElementSerializer);
}
+ @Override
+ public <N, SV, SEV, S extends State, IS extends S> IS upgradeKeyedState(
+ TypeSerializer<N> namespaceSerializer,
+ StateDescriptor<S, SV> stateDescriptor,
+ @Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory)
+ throws Exception {
+ IS state =
+ createInternalState(namespaceSerializer, stateDescriptor, snapshotTransformFactory);
+ // rebuild metaInfo which created from previous snapshot to make it can continue to migrate
+ registeredKVStates.computeIfPresent(
Review Comment:
Here, we assume that the state was created or updated, but we still need to
update the metadata.
I'd make this assumption explicit to detect programming errors, i.e.
```
StateTable stateTable = checkNotNull(registeredKVStates.get(stateDescriptor.getName()));
stateTable.setMetaInfo(...);
```
That also simplifies the code.
WDYT?
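A small, hypothetical model of the difference. `computeIfPresent` silently does nothing when the state was never registered, so a programming error goes unnoticed. A null check on `get` fails immediately. The sketch uses `java.util.Objects.requireNonNull` to stay self-contained, whereas Flink code would normally use `Preconditions.checkNotNull`, as in the suggestion above. `MetaUpdateModel` and its `Table` are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical model: state name -> mutable table holding a metadata string. */
final class MetaUpdateModel {
    static final class Table {
        String metaInfo;

        Table(String metaInfo) {
            this.metaInfo = metaInfo;
        }
    }

    final Map<String, Table> registeredKVStates = new HashMap<>();

    /** Current style: a missing entry is silently ignored. */
    void refreshWithComputeIfPresent(String name) {
        registeredKVStates.computeIfPresent(name, (n, table) -> {
            table.metaInfo = table.metaInfo + "+lazy";
            return table;
        });
    }

    /** Suggested style: a missing entry is a programming error and fails fast. */
    void refreshWithCheck(String name) {
        Table table = Objects.requireNonNull(
                registeredKVStates.get(name), "state was not created: " + name);
        table.metaInfo = table.metaInfo + "+lazy";
    }
}
```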