rkhachatryan commented on code in PR #19679:
URL: https://github.com/apache/flink/pull/19679#discussion_r938686145
##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -782,21 +847,57 @@ public KeyGroupRange getKeyGroupRange() {
}
@Override
+ @SuppressWarnings("unchecked")
public <N, S extends State, V> S createKeyedState(
TypeSerializer<N> namespaceSerializer, StateDescriptor<S,
V> stateDescriptor)
throws Exception {
- return ChangelogKeyedStateBackend.this.getOrCreateKeyedState(
- namespaceSerializer, stateDescriptor);
+ InternalKvState<K, N, V> kvState =
+ keyedStateBackend.createOrUpdateInternalState(
+ namespaceSerializer, stateDescriptor,
noTransform(), true);
+ ChangelogState changelogState =
+ changelogStateFactory.getExistingState(
+ stateDescriptor.getName(),
+
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE);
+ if (changelogState == null) {
+ changelogState =
+ changelogStateFactory.create(
+ stateDescriptor,
+ kvState,
+ getKvStateChangeLogger(kvState,
stateDescriptor, noTransform()),
+ keyedStateBackend /* pass the nested
backend as key context so that it get key updates on recovery*/);
+ } else {
+ updateChangelogState(changelogState, kvState,
stateDescriptor, noTransform());
+ }
Review Comment:
I'm afraid the `else` branch (the `updateChangelogState` call) is not covered by tests.
`ChangelogStateBackendMigrationTest` performs only one metadata change and one
recovery, so it only triggers the `changelogState == null` branch.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java:
##########
@@ -259,25 +305,55 @@ public <N> Stream<Tuple2<K, N>>
getKeysAndNamespaces(String state) {
@Override
@Nonnull
- public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
+ public <N, SV, SEV, S extends State, IS extends S> IS
createOrUpdateInternalState(
@Nonnull TypeSerializer<N> namespaceSerializer,
@Nonnull StateDescriptor<S, SV> stateDesc,
@Nonnull StateSnapshotTransformFactory<SEV>
snapshotTransformFactory)
throws Exception {
- StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getType());
- if (stateFactory == null) {
- String message =
- String.format(
- "State %s is not supported by %s",
- stateDesc.getClass(), this.getClass());
- throw new FlinkRuntimeException(message);
- }
+ return createOrUpdateInternalState(
+ namespaceSerializer, stateDesc, snapshotTransformFactory,
false);
+ }
+
+ @Override
+ @Nonnull
+ public <N, SV, SEV, S extends State, IS extends S> IS
createOrUpdateInternalState(
+ @Nonnull TypeSerializer<N> namespaceSerializer,
+ @Nonnull StateDescriptor<S, SV> stateDesc,
+ @Nonnull StateSnapshotTransformFactory<SEV>
snapshotTransformFactory,
+ boolean allowFutureMetadataUpdates)
+ throws Exception {
StateTable<K, N, SV> stateTable =
tryRegisterStateTable(
namespaceSerializer,
stateDesc,
- getStateSnapshotTransformFactory(stateDesc,
snapshotTransformFactory));
- return stateFactory.createState(stateDesc, stateTable,
getKeySerializer());
+ getStateSnapshotTransformFactory(stateDesc,
snapshotTransformFactory),
+ allowFutureMetadataUpdates);
+
+ @SuppressWarnings("unchecked")
+ IS createdState = (IS) createdKVStates.get(stateDesc.getName());
+ if (createdState == null) {
+ StateCreateFactory stateCreateFactory =
STATE_CREATE_FACTORIES.get(stateDesc.getType());
+ if (stateCreateFactory == null) {
+ throw new
FlinkRuntimeException(stateNotSupportedMessage(stateDesc));
+ }
+ createdState =
+ stateCreateFactory.createState(stateDesc, stateTable,
getKeySerializer());
+ } else {
+ StateUpdateFactory stateUpdateFactory =
STATE_UPDATE_FACTORIES.get(stateDesc.getType());
+ if (stateUpdateFactory == null) {
+ throw new
FlinkRuntimeException(stateNotSupportedMessage(stateDesc));
+ }
+ createdState = stateUpdateFactory.updateState(stateDesc,
stateTable, createdState);
+ }
Review Comment:
The `else` branch (the `stateUpdateFactory.updateState` call) is also not covered by tests.
##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java:
##########
@@ -820,23 +862,69 @@ private static <UK> boolean
checkMapStateKeySchemaCompatibility(
@Override
@Nonnull
- public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
+ public <N, SV, SEV, S extends State, IS extends S> IS
createOrUpdateInternalState(
@Nonnull TypeSerializer<N> namespaceSerializer,
@Nonnull StateDescriptor<S, SV> stateDesc,
@Nonnull StateSnapshotTransformFactory<SEV>
snapshotTransformFactory)
throws Exception {
- StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getType());
- if (stateFactory == null) {
- String message =
- String.format(
- "State %s is not supported by %s",
- stateDesc.getClass(), this.getClass());
- throw new FlinkRuntimeException(message);
- }
+ return createOrUpdateInternalState(
+ namespaceSerializer, stateDesc, snapshotTransformFactory,
false);
+ }
+
+ @Nonnull
+ @Override
+ public <N, SV, SEV, S extends State, IS extends S> IS
createOrUpdateInternalState(
+ @Nonnull TypeSerializer<N> namespaceSerializer,
+ @Nonnull StateDescriptor<S, SV> stateDesc,
+ @Nonnull StateSnapshotTransformFactory<SEV>
snapshotTransformFactory,
+ boolean allowFutureMetadataUpdates)
+ throws Exception {
Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N,
SV>> registerResult =
tryRegisterKvStateInformation(
- stateDesc, namespaceSerializer,
snapshotTransformFactory);
- return stateFactory.createState(stateDesc, registerResult,
RocksDBKeyedStateBackend.this);
+ stateDesc,
+ namespaceSerializer,
+ snapshotTransformFactory,
+ allowFutureMetadataUpdates);
+ if (!allowFutureMetadataUpdates) {
+ // Config compact filter only when no future metadata updates
+ ttlCompactFiltersManager.configCompactFilter(
+ stateDesc, registerResult.f1.getStateSerializer());
+ }
+
+ return createState(stateDesc, registerResult);
+ }
+
+ private <N, SV, S extends State, IS extends S> IS createState(
+ StateDescriptor<S, SV> stateDesc,
+ Tuple2<ColumnFamilyHandle,
RegisteredKeyValueStateBackendMetaInfo<N, SV>>
+ registerResult)
+ throws Exception {
+ @SuppressWarnings("unchecked")
+ IS createdState = (IS) createdKVStates.get(stateDesc.getName());
+ if (createdState == null) {
+ StateCreateFactory stateCreateFactory =
STATE_CREATE_FACTORIES.get(stateDesc.getType());
+ if (stateCreateFactory == null) {
+ throw new
FlinkRuntimeException(stateNotSupportedMessage(stateDesc));
+ }
+ createdState =
+ stateCreateFactory.createState(
+ stateDesc, registerResult,
RocksDBKeyedStateBackend.this);
+ } else {
+ StateUpdateFactory stateUpdateFactory =
STATE_UPDATE_FACTORIES.get(stateDesc.getType());
+ if (stateUpdateFactory == null) {
+ throw new
FlinkRuntimeException(stateNotSupportedMessage(stateDesc));
+ }
+ createdState = stateUpdateFactory.updateState(stateDesc,
registerResult, createdState);
+ }
Review Comment:
The `else` branch (the `stateUpdateFactory.updateState` call) is also not covered by tests.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]