Myasuka commented on a change in pull request #14943:
URL: https://github.com/apache/flink/pull/14943#discussion_r592887784



##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SavepointResources;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.TestableKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.ttl.TtlStateFactory;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link KeyedStateBackend} that keeps state on the underlying delegated 
keyed state backend as
+ * well as on the state change log.
+ *
+ * @param <K> The key by which state is keyed.
+ */
+@Internal
+class ChangelogKeyedStateBackend<K>
+        implements CheckpointableKeyedStateBackend<K>,
+                CheckpointListener,
+                TestableKeyedStateBackend {
+
+    private static final Map<Class<? extends StateDescriptor>, StateFactory> 
STATE_FACTORIES =
+            Stream.of(
+                            Tuple2.of(
+                                    ValueStateDescriptor.class,
+                                    (StateFactory) 
ChangelogValueState::create),
+                            Tuple2.of(
+                                    ListStateDescriptor.class,
+                                    (StateFactory) ChangelogListState::create),
+                            Tuple2.of(
+                                    ReducingStateDescriptor.class,
+                                    (StateFactory) 
ChangelogReducingState::create),
+                            Tuple2.of(
+                                    AggregatingStateDescriptor.class,
+                                    (StateFactory) 
ChangelogAggregatingState::create),
+                            Tuple2.of(
+                                    MapStateDescriptor.class,
+                                    (StateFactory) ChangelogMapState::create))
+                    .collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+    /** delegated keyedStateBackend. */
+    private final AbstractKeyedStateBackend<K> keyedStateBackend;
+
+    /**
+     * This is the cache maintained by the DelegateKeyedStateBackend itself. 
It is not the same as
+     * the underlying delegated keyedStateBackend. InternalKvState is a 
delegated state.
+     */
+    private final HashMap<String, InternalKvState<K, ?, ?>> 
keyValueStatesByName;
+
+    private final ExecutionConfig executionConfig;
+
+    private final TtlTimeProvider ttlTimeProvider;
+
+    /** last accessed partitioned state. */
+    @SuppressWarnings("rawtypes")
+    private InternalKvState lastState;
+
+    /** For caching the last accessed partitioned state. */
+    private String lastName;
+
+    public ChangelogKeyedStateBackend(
+            AbstractKeyedStateBackend<K> keyedStateBackend,
+            ExecutionConfig executionConfig,
+            TtlTimeProvider ttlTimeProvider) {
+        this.keyedStateBackend = keyedStateBackend;
+        this.executionConfig = executionConfig;
+        this.ttlTimeProvider = ttlTimeProvider;
+        this.keyValueStatesByName = new HashMap<>();
+    }
+
+    // -------------------- CheckpointableKeyedStateBackend 
--------------------------------
+    @Override
+    public KeyGroupRange getKeyGroupRange() {
+        return keyedStateBackend.getKeyGroupRange();
+    }
+
+    @Override
+    public void close() throws IOException {
+        keyedStateBackend.close();
+    }
+
+    @Override
+    public void setCurrentKey(K newKey) {
+        keyedStateBackend.setCurrentKey(newKey);
+    }
+
+    @Override
+    public K getCurrentKey() {
+        return keyedStateBackend.getCurrentKey();
+    }
+
+    @Override
+    public TypeSerializer<K> getKeySerializer() {
+        return keyedStateBackend.getKeySerializer();
+    }
+
+    @Override
+    public <N> Stream<K> getKeys(String state, N namespace) {
+        return keyedStateBackend.getKeys(state, namespace);
+    }
+
+    @Override
+    public <N> Stream<Tuple2<K, N>> getKeysAndNamespaces(String state) {
+        return keyedStateBackend.getKeysAndNamespaces(state);
+    }
+
+    @Override
+    public void dispose() {
+        keyedStateBackend.dispose();
+        lastName = null;
+        lastState = null;
+        keyValueStatesByName.clear();
+    }
+
+    @Override
+    public void registerKeySelectionListener(KeySelectionListener<K> listener) 
{
+        keyedStateBackend.registerKeySelectionListener(listener);
+    }
+
+    @Override
+    public boolean deregisterKeySelectionListener(KeySelectionListener<K> 
listener) {
+        return keyedStateBackend.deregisterKeySelectionListener(listener);
+    }
+
+    @Override
+    public <N, S extends State, T> void applyToAllKeys(
+            N namespace,
+            TypeSerializer<N> namespaceSerializer,
+            StateDescriptor<S, T> stateDescriptor,
+            KeyedStateFunction<K, S> function)
+            throws Exception {
+
+        keyedStateBackend.applyToAllKeys(
+                namespace,
+                namespaceSerializer,
+                stateDescriptor,
+                function,
+                this::getPartitionedState);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <N, S extends State> S getPartitionedState(
+            N namespace,
+            TypeSerializer<N> namespaceSerializer,
+            StateDescriptor<S, ?> stateDescriptor)
+            throws Exception {
+
+        checkNotNull(namespace, "Namespace");
+
+        if (lastName != null && lastName.equals(stateDescriptor.getName())) {
+            lastState.setCurrentNamespace(namespace);
+            return (S) lastState;
+        }
+
+        final InternalKvState<K, ?, ?> previous =
+                keyValueStatesByName.get(stateDescriptor.getName());
+        if (previous != null) {
+            lastState = previous;
+            lastState.setCurrentNamespace(namespace);
+            lastName = stateDescriptor.getName();
+            return (S) previous;
+        }
+
+        final S state = getOrCreateKeyedState(namespaceSerializer, 
stateDescriptor);
+        final InternalKvState<K, N, ?> kvState = (InternalKvState<K, N, ?>) 
state;
+
+        lastName = stateDescriptor.getName();
+        lastState = kvState;
+        kvState.setCurrentNamespace(namespace);
+
+        return state;
+    }
+
+    @Nonnull
+    @Override
+    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
+            long checkpointId,
+            long timestamp,
+            @Nonnull CheckpointStreamFactory streamFactory,
+            @Nonnull CheckpointOptions checkpointOptions)
+            throws Exception {
+        return keyedStateBackend.snapshot(
+                checkpointId, timestamp, streamFactory, checkpointOptions);
+    }
+
+    @Nonnull
+    @Override
+    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> 
& Keyed<?>>
+            KeyGroupedInternalPriorityQueue<T> create(
+                    @Nonnull String stateName,
+                    @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
+        return keyedStateBackend.create(stateName, 
byteOrderedElementSerializer);
+    }
+
+    @VisibleForTesting
+    @Override
+    public int numKeyValueStateEntries() {
+        return keyedStateBackend.numKeyValueStateEntries();
+    }
+
+    @Override
+    public boolean isStateImmutableInStateBackend(CheckpointType 
checkpointOptions) {
+        return 
keyedStateBackend.isStateImmutableInStateBackend(checkpointOptions);
+    }
+
+    @Nonnull
+    @Override
+    public SavepointResources<K> savepoint() throws Exception {
+        return keyedStateBackend.savepoint();
+    }
+
+    // -------------------- CheckpointListener --------------------------------
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        keyedStateBackend.notifyCheckpointComplete(checkpointId);
+    }
+
+    @Override
+    public void notifyCheckpointAborted(long checkpointId) throws Exception {
+        keyedStateBackend.notifyCheckpointAborted(checkpointId);
+    }
+
+    // -------- Methods not simply delegating to wrapped state backend 
---------
+    @Override
+    @SuppressWarnings("unchecked")
+    public <N, S extends State, T> S getOrCreateKeyedState(

Review comment:
       Since we always use `#createInternalState` to create the wrapper of the 
changelog state, I think we do not need to re-implement 
`getOrCreateKeyedState()`; a simple delegation would suffice: 
   ~~~ java
   return keyedStateBackend.getOrCreateKeyedState(namespaceSerializer, 
stateDescriptor);
   ~~~
   If so, we no longer need the additional private fields such as 
`keyValueStatesByName`, `executionConfig` and `ttlTimeProvider`.

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.ConfigurableStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.delegate.DelegatingStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+
+/**
+ * This state backend holds the working state in the underlying 
delegatedStateBackend, and forwards
+ * state changes to State Changelog.
+ */
+public class ChangelogStateBackend implements DelegatingStateBackend, 
ConfigurableStateBackend {
+
+    private static final long serialVersionUID = 1000L;
+
+    private final StateBackend delegatedStateBackend;
+
+    public ChangelogStateBackend(StateBackend stateBackend) {
+        this.delegatedStateBackend = Preconditions.checkNotNull(stateBackend);
+
+        Preconditions.checkArgument(
+                !(stateBackend instanceof ChangelogStateBackend),
+                "Recursive Delegation on ChangelogStateBackend is not 
supported.");
+    }
+
+    @Override
+    public <K> ChangelogKeyedStateBackend<K> createKeyedStateBackend(

Review comment:
       I think this method could be simplified to 
   ~~~java
       @Override
       public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(
               Environment env,
               JobID jobID,
               String operatorIdentifier,
               TypeSerializer<K> keySerializer,
               int numberOfKeyGroups,
               KeyGroupRange keyGroupRange,
               TaskKvStateRegistry kvStateRegistry,
               TtlTimeProvider ttlTimeProvider,
               MetricGroup metricGroup,
               @Nonnull Collection<KeyedStateHandle> stateHandles,
               CloseableRegistry cancelStreamRegistry)
               throws Exception {
           return this.createKeyedStateBackend(
                   env,
                   jobID,
                   operatorIdentifier,
                   keySerializer,
                   numberOfKeyGroups,
                   keyGroupRange,
                   kvStateRegistry,
                   ttlTimeProvider,
                   metricGroup,
                   stateHandles,
                   cancelStreamRegistry,
                   1.0);
       }
   ~~~

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SavepointResources;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.TestableKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.ttl.TtlStateFactory;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link KeyedStateBackend} that keeps state on the underlying delegated 
keyed state backend as
+ * well as on the state change log.
+ *
+ * @param <K> The key by which state is keyed.
+ */
+@Internal
+class ChangelogKeyedStateBackend<K>
+        implements CheckpointableKeyedStateBackend<K>,
+                CheckpointListener,
+                TestableKeyedStateBackend {
+
+    private static final Map<Class<? extends StateDescriptor>, StateFactory> 
STATE_FACTORIES =
+            Stream.of(
+                            Tuple2.of(
+                                    ValueStateDescriptor.class,
+                                    (StateFactory) 
ChangelogValueState::create),
+                            Tuple2.of(
+                                    ListStateDescriptor.class,
+                                    (StateFactory) ChangelogListState::create),
+                            Tuple2.of(
+                                    ReducingStateDescriptor.class,
+                                    (StateFactory) 
ChangelogReducingState::create),
+                            Tuple2.of(
+                                    AggregatingStateDescriptor.class,
+                                    (StateFactory) 
ChangelogAggregatingState::create),
+                            Tuple2.of(
+                                    MapStateDescriptor.class,
+                                    (StateFactory) ChangelogMapState::create))
+                    .collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+    /** delegated keyedStateBackend. */
+    private final AbstractKeyedStateBackend<K> keyedStateBackend;
+
+    /**
+     * This is the cache maintained by the DelegateKeyedStateBackend itself. 
It is not the same as
+     * the underlying delegated keyedStateBackend. InternalKvState is a 
delegated state.
+     */
+    private final HashMap<String, InternalKvState<K, ?, ?>> 
keyValueStatesByName;
+
+    private final ExecutionConfig executionConfig;
+
+    private final TtlTimeProvider ttlTimeProvider;
+
+    /** last accessed partitioned state. */
+    @SuppressWarnings("rawtypes")
+    private InternalKvState lastState;
+
+    /** For caching the last accessed partitioned state. */
+    private String lastName;
+
+    public ChangelogKeyedStateBackend(
+            AbstractKeyedStateBackend<K> keyedStateBackend,
+            ExecutionConfig executionConfig,
+            TtlTimeProvider ttlTimeProvider) {
+        this.keyedStateBackend = keyedStateBackend;
+        this.executionConfig = executionConfig;
+        this.ttlTimeProvider = ttlTimeProvider;
+        this.keyValueStatesByName = new HashMap<>();
+    }
+
+    // -------------------- CheckpointableKeyedStateBackend 
--------------------------------
+    @Override
+    public KeyGroupRange getKeyGroupRange() {
+        return keyedStateBackend.getKeyGroupRange();
+    }
+
+    @Override
+    public void close() throws IOException {
+        keyedStateBackend.close();
+    }
+
+    @Override
+    public void setCurrentKey(K newKey) {
+        keyedStateBackend.setCurrentKey(newKey);
+    }
+
+    @Override
+    public K getCurrentKey() {
+        return keyedStateBackend.getCurrentKey();
+    }
+
+    @Override
+    public TypeSerializer<K> getKeySerializer() {
+        return keyedStateBackend.getKeySerializer();
+    }
+
+    @Override
+    public <N> Stream<K> getKeys(String state, N namespace) {
+        return keyedStateBackend.getKeys(state, namespace);
+    }
+
+    @Override
+    public <N> Stream<Tuple2<K, N>> getKeysAndNamespaces(String state) {
+        return keyedStateBackend.getKeysAndNamespaces(state);
+    }
+
+    @Override
+    public void dispose() {
+        keyedStateBackend.dispose();
+        lastName = null;
+        lastState = null;
+        keyValueStatesByName.clear();
+    }
+
+    @Override
+    public void registerKeySelectionListener(KeySelectionListener<K> listener) 
{
+        keyedStateBackend.registerKeySelectionListener(listener);
+    }
+
+    @Override
+    public boolean deregisterKeySelectionListener(KeySelectionListener<K> 
listener) {
+        return keyedStateBackend.deregisterKeySelectionListener(listener);
+    }
+
+    @Override
+    public <N, S extends State, T> void applyToAllKeys(
+            N namespace,
+            TypeSerializer<N> namespaceSerializer,
+            StateDescriptor<S, T> stateDescriptor,
+            KeyedStateFunction<K, S> function)
+            throws Exception {
+
+        keyedStateBackend.applyToAllKeys(
+                namespace,
+                namespaceSerializer,
+                stateDescriptor,
+                function,
+                this::getPartitionedState);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <N, S extends State> S getPartitionedState(

Review comment:
       Since we always use `#createInternalState` to create the wrapper of the 
changelog state, I think we do not need to re-implement `getPartitionedState()`; 
a simple delegation would suffice:
   ~~~java
   return keyedStateBackend.getPartitionedState(namespace, namespaceSerializer, 
stateDescriptor);
   ~~~
   If so, we no longer need private fields such as `lastName` and `lastState`.

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractChangelogState.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+
+/**
+ * Base class for changelog state wrappers of state objects.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <V> The type of values kept internally in state without changelog 
wrapper
+ * @param <S> Type of originally wrapped state object
+ */
+abstract class AbstractChangelogState<K, N, V, S extends InternalKvState<K, N, 
V>>
+        implements InternalKvState<K, N, V> {
+    protected final S delegatedState;
+
+    AbstractChangelogState(S state) {
+        this.delegatedState = state;
+    }
+
+    public S getDelegatedState() {
+        return delegatedState;
+    }
+

Review comment:
       We could add 
   ~~~java
       @Override
       public void clear() {
           this.delegatedState.clear();
       }
   ~~~
   so that subclasses do not each have to implement the `#clear()` method themselves.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
##########
@@ -249,6 +306,46 @@ public static StateBackend 
fromApplicationOrConfigOrDefault(
         return backend;
     }
 
+    /**
+     * This is the state backend loader that loads a {@link 
DelegatingStateBackend} wrapping the
+     * state backend loaded from {@link
+     * StateBackendLoader#loadFromApplicationOrConfigOrDefaultInternal} when 
delegation is enabled.
+     * If delegation is not enabled, the underlying wrapped state backend is 
returned instead.
+     *
+     * @param fromApplication StateBackend defined from application
+     * @param config The configuration to load the state backend from
+     * @param classLoader The class loader that should be used to load the 
state backend
+     * @param logger Optionally, a logger to log actions to (may be null)
+     * @return The instantiated state backend.
+     * @throws DynamicCodeLoadingException Thrown if a state backend (factory) 
is configured and the
+     *     (factory) class was not found or could not be instantiated
+     * @throws IllegalConfigurationException May be thrown by the 
StateBackendFactory when creating
+     *     / configuring the state backend in the factory
+     * @throws IOException May be thrown by the StateBackendFactory when 
instantiating the state
+     *     backend
+     */
+    public static StateBackend fromApplicationOrConfigOrDefault(
+            @Nullable StateBackend fromApplication,
+            Configuration config,
+            ClassLoader classLoader,
+            @Nullable Logger logger)
+            throws IllegalConfigurationException, DynamicCodeLoadingException, 
IOException {
+
+        final StateBackend backend =
+                loadFromApplicationOrConfigOrDefaultInternal(
+                        fromApplication, config, classLoader, logger);
+
+        if (config.get(CheckpointingOptions.ENABLE_STATE_CHANGE_LOG)
+                && !(fromApplication instanceof DelegatingStateBackend)) {
+            return loadChangelogStateBackend(backend, classLoader);
+        } else {
+            LOG.info(
+                    "Delegate State Backend is not used, and the State Backend 
is {}",

Review comment:
       This log message might not be accurate, as the user could pass a 
`ChangelogStateBackend` here.

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateStateTest.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateBackendTestBase;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
+import org.apache.flink.util.IOUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Supplier;
+
+import static 
org.apache.flink.state.changelog.ChangelogStateBackendTestUtils.createKeyedBackend;
+import static org.junit.Assert.assertSame;
+
+/** Tests for {@link ChangelogStateBackend} delegating state accesses. */
+@RunWith(Parameterized.class)
+public class ChangelogDelegateStateTest {
+    private MockEnvironment env;
+
+    @Parameterized.Parameters
+    public static List<Supplier<AbstractStateBackend>> delegatedStateBackend() 
{
+        return Arrays.asList(HashMapStateBackend::new, 
EmbeddedRocksDBStateBackend::new);
+    }
+
+    @Parameterized.Parameter public Supplier<AbstractStateBackend> backend;
+
+    @Before
+    public void before() {
+        env = MockEnvironment.builder().build();
+    }
+
+    @After
+    public void after() {
+        IOUtils.closeQuietly(env);
+    }
+
+    @Test
+    public void testDelegatingValueState() throws Exception {
+        testDelegatingState(
+                new ValueStateDescriptor<>("id", String.class), 
ChangelogValueState.class);
+    }
+
+    @Test
+    public void testDelegatingListState() throws Exception {
+        testDelegatingState(
+                new ListStateDescriptor<>("id", String.class), 
ChangelogListState.class);
+    }
+
+    @Test
+    public void testDelegatingMapState() throws Exception {
+        testDelegatingState(
+                new MapStateDescriptor<>("id", Integer.class, String.class),
+                ChangelogMapState.class);
+    }
+
+    @Test
+    public void testDelegatingReducingState() throws Exception {
+        testDelegatingState(
+                new ReducingStateDescriptor<>(
+                        "id", (value1, value2) -> value1 + "," + value2, 
String.class),
+                ChangelogReducingState.class);
+    }
+
+    @Test
+    public void testDelegatingAggregatingState() throws Exception {
+        testDelegatingState(
+                new AggregatingStateDescriptor<>(
+                        "my-state",
+                        new 
StateBackendTestBase.MutableAggregatingAddingFunction(),
+                        StateBackendTestBase.MutableLong.class),
+                ChangelogAggregatingState.class);
+    }
+
+    private void testDelegatingState(StateDescriptor descriptor, Class<?> 
stateClass)
+            throws Exception {
+        KeyedStateBackend<Integer> delegatedBackend = 
createKeyedBackend(backend.get(), env);
+        KeyedStateBackend<Integer> changelogBackend =
+                createKeyedBackend(new ChangelogStateBackend(backend.get()), 
env);
+
+        try {
+            State state =
+                    changelogBackend.getPartitionedState(
+                            VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, descriptor);
+
+            assertSame(state.getClass(), stateClass);
+            assertSame(
+                    ((AbstractChangelogState<?, ?, ?, ?>) 
state).getDelegatedState().getClass(),
+                    delegatedBackend
+                            .getPartitionedState(
+                                    VoidNamespace.INSTANCE,
+                                    VoidNamespaceSerializer.INSTANCE,
+                                    descriptor)
+                            .getClass());
+        } finally {
+            delegatedBackend.dispose();

Review comment:
       We'd better add a not-null check and then dispose of those keyed state 
backends.

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateStateTest.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateBackendTestBase;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
+import org.apache.flink.util.IOUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Supplier;
+
+import static 
org.apache.flink.state.changelog.ChangelogStateBackendTestUtils.createKeyedBackend;
+import static org.junit.Assert.assertSame;
+
+/** Tests for {@link ChangelogStateBackend} delegating state accesses. */
+@RunWith(Parameterized.class)
+public class ChangelogDelegateStateTest {
+    private MockEnvironment env;
+
+    @Parameterized.Parameters
+    public static List<Supplier<AbstractStateBackend>> delegatedStateBackend() 
{
+        return Arrays.asList(HashMapStateBackend::new, 
EmbeddedRocksDBStateBackend::new);
+    }
+
+    @Parameterized.Parameter public Supplier<AbstractStateBackend> backend;
+
+    @Before
+    public void before() {
+        env = MockEnvironment.builder().build();
+    }
+
+    @After
+    public void after() {
+        IOUtils.closeQuietly(env);
+    }
+
+    @Test
+    public void testDelegatingValueState() throws Exception {
+        testDelegatingState(
+                new ValueStateDescriptor<>("id", String.class), 
ChangelogValueState.class);
+    }
+
+    @Test
+    public void testDelegatingListState() throws Exception {
+        testDelegatingState(
+                new ListStateDescriptor<>("id", String.class), 
ChangelogListState.class);
+    }
+
+    @Test
+    public void testDelegatingMapState() throws Exception {
+        testDelegatingState(
+                new MapStateDescriptor<>("id", Integer.class, String.class),
+                ChangelogMapState.class);
+    }
+
+    @Test
+    public void testDelegatingReducingState() throws Exception {
+        testDelegatingState(
+                new ReducingStateDescriptor<>(
+                        "id", (value1, value2) -> value1 + "," + value2, 
String.class),
+                ChangelogReducingState.class);
+    }
+
+    @Test
+    public void testDelegatingAggregatingState() throws Exception {
+        testDelegatingState(
+                new AggregatingStateDescriptor<>(
+                        "my-state",
+                        new 
StateBackendTestBase.MutableAggregatingAddingFunction(),
+                        StateBackendTestBase.MutableLong.class),
+                ChangelogAggregatingState.class);
+    }
+
+    private void testDelegatingState(StateDescriptor descriptor, Class<?> 
stateClass)
+            throws Exception {
+        KeyedStateBackend<Integer> delegatedBackend = 
createKeyedBackend(backend.get(), env);
+        KeyedStateBackend<Integer> changelogBackend =
+                createKeyedBackend(new ChangelogStateBackend(backend.get()), 
env);

Review comment:
       Why do we have to create the `delegatedBackend` explicitly? Can we just get 
the delegated keyed backend from `changelogBackend` directly?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to