rkhachatryan commented on code in PR #19142:
URL: https://github.com/apache/flink/pull/19142#discussion_r854517171


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageView.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.io.AvailabilityProvider;
+
+/**
+ * A storage view for changelog. Could produce {@link StateChangelogHandleReader} for read. Please
+ * use {@link StateChangelogStorageLoader} to obtain an instance.
+ */
+@Internal
+public interface StateChangelogStorageView<Handle extends ChangelogStateHandle>
+        extends AutoCloseable {
+
+    StateChangelogHandleReader<Handle> createReader();
+
+    @Override
+    default void close() throws Exception {}
+
+    default AvailabilityProvider getAvailabilityProvider() {
+        return () -> AvailabilityProvider.AVAILABLE;
+    }

Review Comment:
   This method is only relevant for writing, so it can be pushed down to the `StateChangelogStorage` interface.
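
   To illustrate the suggestion, here is a minimal sketch using simplified stand-in interfaces. `StorageViewSketch`, `StorageSketch`, and `AvailabilitySketch` are hypothetical names, not the real Flink types, and the `String` return values are placeholders for the reader and writer types. The read-only view keeps only `createReader()`, while the availability hook lives on the read-write interface:

```java
import java.util.concurrent.CompletableFuture;

// Simplified stand-in for an availability provider; NOT the real Flink type.
interface AvailabilitySketch {
    CompletableFuture<?> getAvailableFuture();
}

// Read-only view: exposes only what recovery needs.
interface StorageViewSketch extends AutoCloseable {
    String createReader(); // placeholder for the handle reader type

    @Override
    default void close() throws Exception {}
}

// Read-write storage: availability only matters to writers, so it is declared here.
interface StorageSketch extends StorageViewSketch {
    String createWriter(); // placeholder for the writer type

    default AvailabilitySketch getAvailabilityProvider() {
        // Always available by default, mirroring the default in the diff above.
        return () -> CompletableFuture.completedFuture(null);
    }
}

class PushDownDemo {
    static StorageSketch inMemory() {
        return new StorageSketch() {
            @Override
            public String createReader() {
                return "reader";
            }

            @Override
            public String createWriter() {
                return "writer";
            }
        };
    }

    public static void main(String[] args) {
        StorageSketch storage = inMemory();
        StorageViewSketch view = storage; // readers only ever see the view
        System.out.println(view.createReader());
        System.out.println(storage.getAvailabilityProvider().getAvailableFuture().isDone());
    }
}
```

   With this split, code that only restores state cannot call `getAvailabilityProvider()` by accident, because the view type does not declare it.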



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java:
##########
@@ -358,26 +364,57 @@ public static boolean 
stateBackendFromApplicationOrConfigOrDefaultUseManagedMemo
         return false;
     }
 
+    /**
+     * Load state backend which may wrap the original state backend for recovery.
+     *
+     * @param originalStateBackend StateBackend loaded from application or config.
+     * @param classLoader User code classloader.
+     * @param keyedStateHandles The state handles for restore.
+     * @return Wrapped state backend for recovery.
+     * @throws DynamicCodeLoadingException Thrown if keyed state handles of wrapped state backend
+     *     are found and the class was not found or could not be instantiated.
+     */
+    public static StateBackend loadStateBackendFromKeyedStateHandles(
+            StateBackend originalStateBackend,
+            ClassLoader classLoader,
+            Collection<KeyedStateHandle> keyedStateHandles)
+            throws DynamicCodeLoadingException {
+        // Wrapping ChangelogStateBackend or ChangelogStateBackendHandle is not supported currently.
+        if (!isChangelogStateBackend(originalStateBackend)
+                && keyedStateHandles.stream()
+                        .anyMatch(
+                                stateHandle ->
+                                        stateHandle instanceof ChangelogStateBackendHandle)) {
+            return loadChangelogStateBackend(
+                    originalStateBackend, classLoader, CHANGELOG_STATE_BACKEND_FOR_RECOVERY);
+        }
+        return originalStateBackend;
+    }
+
+    private static boolean isChangelogStateBackend(StateBackend backend) {
+        return CHANGELOG_STATE_BACKEND.equals(backend.getClass().getName());
+    }
+
     private static StateBackend loadChangelogStateBackend(
-            StateBackend backend, ClassLoader classLoader) throws DynamicCodeLoadingException {
+            StateBackend backend, ClassLoader classLoader, String className)

Review Comment:
   NIT: the method is no longer specific to changelog (it now applies to `DelegatingStateBackend`), so it could be renamed to something like `wrapStateBackend`.



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogMigrationRestoreTarget.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SavepointResources;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.state.changelog.ChangelogKeyGroupedPriorityQueue;
+import org.apache.flink.state.changelog.ChangelogState;
+import org.apache.flink.state.changelog.ChangelogStateFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Stream;
+
+/** A {@link ChangelogRestoreTarget} supports to migrate to the delegated 
keyed state backend. */
+public class ChangelogMigrationRestoreTarget<K> implements 
ChangelogRestoreTarget<K> {
+
+    private final AbstractKeyedStateBackend<K> keyedStateBackend;
+
+    private final ChangelogStateFactory changelogStateFactory;
+
+    private final FunctionDelegationHelper functionDelegationHelper =
+            new FunctionDelegationHelper();
+
+    public ChangelogMigrationRestoreTarget(
+            AbstractKeyedStateBackend<K> keyedStateBackend,
+            ChangelogStateFactory changelogStateFactory) {
+        this.keyedStateBackend = keyedStateBackend;
+        this.changelogStateFactory = changelogStateFactory;
+    }
+
+    @Override
+    public KeyGroupRange getKeyGroupRange() {
+        return keyedStateBackend.getKeyGroupRange();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <N, S extends State, V> S createKeyedState(
+            TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> 
stateDescriptor)
+            throws Exception {
+        S keyedState =
+                keyedStateBackend.getOrCreateKeyedState(namespaceSerializer, 
stateDescriptor);
+        functionDelegationHelper.addOrUpdate(stateDescriptor);
+        final InternalKvState<K, N, V> kvState = (InternalKvState<K, N, V>) 
keyedState;
+        ChangelogState changelogState =
+                changelogStateFactory.create(stateDescriptor, kvState, 
keyedStateBackend);
+        return (S) changelogState;
+    }
+
+    @Nonnull
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> 
& Keyed<?>>
+            KeyGroupedInternalPriorityQueue<T> createPqState(
+                    @Nonnull String stateName,
+                    @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
+        ChangelogKeyGroupedPriorityQueue<T> queue =
+                (ChangelogKeyGroupedPriorityQueue<T>)
+                        changelogStateFactory.getExistingState(
+                                stateName, 
StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);
+        if (queue == null) {
+            KeyGroupedInternalPriorityQueue<T> internalPriorityQueue =
+                    keyedStateBackend.create(stateName, 
byteOrderedElementSerializer);
+            queue =
+                    changelogStateFactory.create(
+                            stateName, internalPriorityQueue, 
byteOrderedElementSerializer);
+        }
+        return queue;
+    }
+
+    @Override
+    public ChangelogState getExistingState(
+            String name, StateMetaInfoSnapshot.BackendStateType type) {
+        return changelogStateFactory.getExistingState(name, type);
+    }
+
+    @Override
+    public CheckpointableKeyedStateBackend<K> getRestoredKeyedStateBackend() {
+        // TODO: This inner class make the behaviour of the method of create consistent with
+        //  the method of getOrCreateKeyedState currently which could be removed
+        //  after we support state migration (in FLINK-23143).
+        //  It is also used to maintain FunctionDelegationHelper in the delegated state backend.
+        return new AbstractKeyedStateBackend<K>(keyedStateBackend) {
+

Review Comment:
   I'm wondering whether it makes sense to extract this constructor call into a static function; that would prevent `this` from leaking into the created object.
   WDYT?
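
   As a rough illustration of the difference (a sketch with hypothetical names such as `OwnerSketch`, not the actual restore target), an anonymous class created inside an instance method that touches the enclosing object's fields keeps an implicit reference to it, while one created in a static function captures only its explicit arguments:

```java
import java.lang.reflect.Field;
import java.util.function.Supplier;

// Hypothetical stand-in for the restore target; not Flink code.
class OwnerSketch {
    private final String name;

    OwnerSketch(String name) {
        this.name = name;
    }

    // Anonymous class created in an instance method: it reads a field of the
    // enclosing object, so it keeps an implicit reference to OwnerSketch.this.
    Supplier<String> createInline() {
        return new Supplier<String>() {
            @Override
            public String get() {
                return name;
            }
        };
    }

    // Same result, but built by a static function: only the explicit argument is captured.
    Supplier<String> createViaStatic() {
        return create(name);
    }

    private static Supplier<String> create(final String value) {
        return new Supplier<String>() {
            @Override
            public String get() {
                return value;
            }
        };
    }

    // Reports whether the object stores a (compiler-generated) field of type OwnerSketch.
    static boolean holdsOwner(Object o) {
        for (Field f : o.getClass().getDeclaredFields()) {
            if (f.getType() == OwnerSketch.class) {
                return true;
            }
        }
        return false;
    }
}

class LeakDemo {
    public static void main(String[] args) {
        OwnerSketch owner = new OwnerSketch("backend");
        System.out.println(OwnerSketch.holdsOwner(owner.createInline()));
        System.out.println(OwnerSketch.holdsOwner(owner.createViaStatic()));
    }
}
```

   The static variant also makes the dependencies of the created object explicit in the function signature, which is the main readability benefit here.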



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateFactory.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** Maintains the lifecycle of all {@link ChangelogState}s. */
+public class ChangelogStateFactory {
+
+    /**
+     * Unwrapped changelog states used for recovery (not wrapped into e.g. 
TTL, latency tracking).
+     */
+    private final Map<String, ChangelogState> changelogStates;
+
+    private final Map<String, ChangelogKeyGroupedPriorityQueue<?>> 
priorityQueueStatesByName;
+
+    public ChangelogStateFactory() {
+        this.changelogStates = new HashMap<>();
+        this.priorityQueueStatesByName = new HashMap<>();
+    }
+
+    private static final Map<StateDescriptor.Type, StateFactory> 
STATE_FACTORIES =
+            Stream.of(
+                            Tuple2.of(
+                                    StateDescriptor.Type.VALUE,
+                                    (StateFactory) 
ChangelogValueState::create),
+                            Tuple2.of(
+                                    StateDescriptor.Type.LIST,
+                                    (StateFactory) ChangelogListState::create),
+                            Tuple2.of(
+                                    StateDescriptor.Type.REDUCING,
+                                    (StateFactory) 
ChangelogReducingState::create),
+                            Tuple2.of(
+                                    StateDescriptor.Type.AGGREGATING,
+                                    (StateFactory) 
ChangelogAggregatingState::create),
+                            Tuple2.of(
+                                    StateDescriptor.Type.MAP,
+                                    (StateFactory) ChangelogMapState::create))
+                    .collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+    public <K, N, V, S extends State> ChangelogState create(
+            StateDescriptor<S, V> stateDescriptor,
+            InternalKvState<K, N, V> internalKvState,
+            InternalKeyContext<K> keyContext)
+            throws Exception {
+        return create(
+                stateDescriptor, internalKvState, 
VoidStateChangeLogger.getInstance(), keyContext);
+    }
+
+    public <K, N, V, S extends State> ChangelogState create(
+            StateDescriptor<S, V> stateDescriptor,
+            InternalKvState<K, N, V> internalKvState,
+            KvStateChangeLogger<V, N> kvStateChangeLogger,
+            InternalKeyContext<K> keyContext)
+            throws Exception {
+        ChangelogState changelogState =
+                getStateFactory(stateDescriptor)
+                        .create(internalKvState, kvStateChangeLogger, 
keyContext);
+        changelogStates.put(stateDescriptor.getName(), changelogState);
+        return changelogState;
+    }
+
+    public <T> ChangelogKeyGroupedPriorityQueue<T> create(
+            String stateName,
+            KeyGroupedInternalPriorityQueue<T> internalPriorityQueue,
+            TypeSerializer<T> serializer) {
+        return create(
+                stateName, internalPriorityQueue, 
VoidStateChangeLogger.getInstance(), serializer);
+    }
+
+    public <T> ChangelogKeyGroupedPriorityQueue<T> create(
+            String stateName,
+            KeyGroupedInternalPriorityQueue<T> internalPriorityQueue,
+            StateChangeLogger<T, Void> logger,
+            TypeSerializer<T> serializer) {
+        ChangelogKeyGroupedPriorityQueue<T> changelogKeyGroupedPriorityQueue =
+                new ChangelogKeyGroupedPriorityQueue<>(internalPriorityQueue, 
logger, serializer);
+        priorityQueueStatesByName.put(stateName, 
changelogKeyGroupedPriorityQueue);
+        return changelogKeyGroupedPriorityQueue;
+    }
+
+    /**
+     * @param name state name
+     * @param type state type (the only supported type currently are: {@link
+     *     StateMetaInfoSnapshot.BackendStateType#KEY_VALUE key value}, {@link
+     *     StateMetaInfoSnapshot.BackendStateType#PRIORITY_QUEUE priority 
queue})
+     * @return an existing state, i.e. the one that was already created. The 
returned state will not
+     *     apply TTL to the passed values, regardless of the TTL settings. 
This prevents double
+     *     applying of TTL (recovered values are TTL values if TTL was 
enabled). The state will,
+     *     however, use TTL serializer if TTL is enabled. WARN: only valid 
during the recovery.
+     * @throws UnsupportedOperationException if state type is not supported
+     */
+    public ChangelogState getExistingState(String name, 
StateMetaInfoSnapshot.BackendStateType type)
+            throws UnsupportedOperationException {
+        ChangelogState state;
+        switch (type) {
+            case KEY_VALUE:
+                state = changelogStates.get(name);
+                break;
+            case PRIORITY_QUEUE:
+                state = priorityQueueStatesByName.get(name);
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        String.format("Unknown state type %s (%s)", type, 
name));
+        }
+        return state;
+    }
+
+    public void resetAllWritingMetaFlags() {
+        for (ChangelogState changelogState : changelogStates.values()) {
+            changelogState.resetWritingMetaFlag();
+        }
+
+        for (ChangelogKeyGroupedPriorityQueue<?> priorityQueueState :
+                priorityQueueStatesByName.values()) {
+            priorityQueueState.resetWritingMetaFlag();
+        }
+    }
+
+    public void dispose() {
+        changelogStates.clear();
+        priorityQueueStatesByName.clear();
+    }
+
+    private <S extends State, V> StateFactory getStateFactory(
+            StateDescriptor<S, V> stateDescriptor) {
+        StateFactory stateFactory = 
STATE_FACTORIES.get(stateDescriptor.getType());
+        if (stateFactory == null) {
+            String message =
+                    String.format(
+                            "State %s is not supported by %s",
+                            stateDescriptor.getClass(), 
ChangelogKeyedStateBackend.class);
+            throw new FlinkRuntimeException(message);
+        }
+        return stateFactory;
+    }
+
+    private static class VoidStateChangeLogger<Value, Namespace>
+            implements KvStateChangeLogger<Value, Namespace>, StateChangeLogger<Value, Namespace> {

Review Comment:
   This logger is only used by `ChangelogMigrationRestoreTarget`, right?
   If so, I'd move it there and inline the methods that use it (`create`).



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogMigrationRestoreTarget.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SavepointResources;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.state.changelog.ChangelogKeyGroupedPriorityQueue;
+import org.apache.flink.state.changelog.ChangelogState;
+import org.apache.flink.state.changelog.ChangelogStateFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Stream;
+
+/** A {@link ChangelogRestoreTarget} supports to migrate to the delegated 
keyed state backend. */
+public class ChangelogMigrationRestoreTarget<K> implements 
ChangelogRestoreTarget<K> {
+
+    private final AbstractKeyedStateBackend<K> keyedStateBackend;
+
+    private final ChangelogStateFactory changelogStateFactory;
+
+    private final FunctionDelegationHelper functionDelegationHelper =
+            new FunctionDelegationHelper();
+
+    public ChangelogMigrationRestoreTarget(
+            AbstractKeyedStateBackend<K> keyedStateBackend,
+            ChangelogStateFactory changelogStateFactory) {
+        this.keyedStateBackend = keyedStateBackend;
+        this.changelogStateFactory = changelogStateFactory;
+    }
+
+    @Override
+    public KeyGroupRange getKeyGroupRange() {
+        return keyedStateBackend.getKeyGroupRange();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <N, S extends State, V> S createKeyedState(
+            TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> 
stateDescriptor)
+            throws Exception {
+        S keyedState =
+                keyedStateBackend.getOrCreateKeyedState(namespaceSerializer, 
stateDescriptor);
+        functionDelegationHelper.addOrUpdate(stateDescriptor);
+        final InternalKvState<K, N, V> kvState = (InternalKvState<K, N, V>) 
keyedState;
+        ChangelogState changelogState =
+                changelogStateFactory.create(stateDescriptor, kvState, 
keyedStateBackend);
+        return (S) changelogState;
+    }
+
+    @Nonnull
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> 
& Keyed<?>>
+            KeyGroupedInternalPriorityQueue<T> createPqState(
+                    @Nonnull String stateName,
+                    @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
+        ChangelogKeyGroupedPriorityQueue<T> queue =
+                (ChangelogKeyGroupedPriorityQueue<T>)
+                        changelogStateFactory.getExistingState(
+                                stateName, 
StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);
+        if (queue == null) {
+            KeyGroupedInternalPriorityQueue<T> internalPriorityQueue =
+                    keyedStateBackend.create(stateName, 
byteOrderedElementSerializer);
+            queue =
+                    changelogStateFactory.create(
+                            stateName, internalPriorityQueue, 
byteOrderedElementSerializer);
+        }
+        return queue;
+    }
+
+    @Override
+    public ChangelogState getExistingState(
+            String name, StateMetaInfoSnapshot.BackendStateType type) {
+        return changelogStateFactory.getExistingState(name, type);
+    }
+
+    @Override
+    public CheckpointableKeyedStateBackend<K> getRestoredKeyedStateBackend() {
+        // TODO: This inner class make the behaviour of the method of create 
consistent with
+        //  the method of getOrCreateKeyedState currently which could be 
removed
+        //  after we support state migration (in FLINK-23143).
+        //  It is also used to maintain FunctionDelegationHelper in the 
delegated state backend.
+        return new AbstractKeyedStateBackend<K>(keyedStateBackend) {
+
+            @Override
+            public void setCurrentKey(K newKey) {
+                keyedStateBackend.setCurrentKey(newKey);
+            }
+
+            @Override
+            public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
+                keyedStateBackend.notifyCheckpointComplete(checkpointId);
+            }
+
+            @Nonnull
+            @Override
+            public SavepointResources<K> savepoint() throws Exception {
+                return keyedStateBackend.savepoint();
+            }
+
+            @Override
+            public int numKeyValueStateEntries() {
+                return keyedStateBackend.numKeyValueStateEntries();
+            }
+
+            @Override
+            public <N> Stream<K> getKeys(String state, N namespace) {
+                return keyedStateBackend.getKeys(state, namespace);
+            }
+
+            @Override
+            public <N> Stream<Tuple2<K, N>> getKeysAndNamespaces(String state) 
{
+                return keyedStateBackend.getKeysAndNamespaces(state);
+            }
+
+            @Nonnull
+            @Override
+            public <N, SV, SEV, S extends State, IS extends S> IS 
createInternalState(
+                    @Nonnull TypeSerializer<N> namespaceSerializer,
+                    @Nonnull StateDescriptor<S, SV> stateDesc,
+                    @Nonnull
+                            
StateSnapshotTransformer.StateSnapshotTransformFactory<SEV>
+                                    snapshotTransformFactory)
+                    throws Exception {
+                return keyedStateBackend.createInternalState(
+                        namespaceSerializer, stateDesc, 
snapshotTransformFactory);
+            }
+
+            @Override
+            public <N, S extends State> S getPartitionedState(
+                    N namespace,
+                    TypeSerializer<N> namespaceSerializer,
+                    StateDescriptor<S, ?> stateDescriptor)
+                    throws Exception {
+                S partitionedState =
+                        keyedStateBackend.getPartitionedState(
+                                namespace, namespaceSerializer, 
stateDescriptor);
+                functionDelegationHelper.addOrUpdate(stateDescriptor);
+                return partitionedState;
+            }
+
+            @Override
+            public <N, S extends State, V> S getOrCreateKeyedState(
+                    TypeSerializer<N> namespaceSerializer, StateDescriptor<S, 
V> stateDescriptor)
+                    throws Exception {
+                S keyedState =
+                        keyedStateBackend.getOrCreateKeyedState(
+                                namespaceSerializer, stateDescriptor);
+                functionDelegationHelper.addOrUpdate(stateDescriptor);
+                return keyedState;
+            }
+
+            @Nonnull
+            @Override
+            @SuppressWarnings("unchecked")
+            public <T extends HeapPriorityQueueElement & PriorityComparable<? 
super T> & Keyed<?>>
+                    KeyGroupedInternalPriorityQueue<T> create(
+                            @Nonnull String stateName,
+                            @Nonnull TypeSerializer<T> 
byteOrderedElementSerializer) {
+                ChangelogKeyGroupedPriorityQueue<T> existingState =
+                        (ChangelogKeyGroupedPriorityQueue<T>)
+                                changelogStateFactory.getExistingState(
+                                        stateName,
+                                        
StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);
+                return existingState == null
+                        ? keyedStateBackend.create(stateName, 
byteOrderedElementSerializer)
+                        : existingState;
+            }
+
+            @Nonnull
+            @Override
+            public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
+                    long checkpointId,
+                    long timestamp,
+                    @Nonnull CheckpointStreamFactory streamFactory,
+                    @Nonnull CheckpointOptions checkpointOptions)
+                    throws Exception {
+                return keyedStateBackend.snapshot(
+                        checkpointId, timestamp, streamFactory, 
checkpointOptions);
+            }
+
+            @Override
+            public void dispose() {
+                super.dispose();
+                changelogStateFactory.dispose();

Review Comment:
   Why is this not `keyedStateBackend.dispose()`, by analogy with the other methods?
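
   For illustration only, and assuming the wrapper is expected to release the delegate's resources as well (I have not checked whether `super.dispose()` already covers that), the forwarding pattern I have in mind looks roughly like the sketch below. All names in it (`DisposableSketch`, `ForwardingBackendSketch`, and so on) are hypothetical stand-ins:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in; the real backend and factory have far larger interfaces.
interface DisposableSketch {
    void dispose();
}

// Records its label on dispose so the call order can be observed.
class RecordingResource implements DisposableSketch {
    private final String label;
    private final List<String> log;

    RecordingResource(String label, List<String> log) {
        this.label = label;
        this.log = log;
    }

    @Override
    public void dispose() {
        log.add(label);
    }
}

// Wrapper that forwards dispose() to the delegate, like the other forwarding methods,
// and then releases its own helper.
class ForwardingBackendSketch implements DisposableSketch {
    private final DisposableSketch delegate;
    private final DisposableSketch stateFactory;

    ForwardingBackendSketch(DisposableSketch delegate, DisposableSketch stateFactory) {
        this.delegate = delegate;
        this.stateFactory = stateFactory;
    }

    @Override
    public void dispose() {
        delegate.dispose();
        stateFactory.dispose();
    }
}

class DisposeDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        new ForwardingBackendSketch(
                        new RecordingResource("delegate", log),
                        new RecordingResource("factory", log))
                .dispose();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```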


