rkhachatryan commented on a change in pull request #14943:
URL: https://github.com/apache/flink/pull/14943#discussion_r583599986



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
##########
@@ -136,6 +138,15 @@
      */
     boolean deregisterKeySelectionListener(KeySelectionListener<K> listener);
 
+    /** Returns the total number of state entries across all keys/namespaces. */
+    @VisibleForTesting
+    int numKeyValueStateEntries();

Review comment:
       I'd like to avoid adding methods exclusively for testing to public production interfaces.
   Instead, it can be in `AbstractKeyedStateBackend`.
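A minimal sketch of the suggestion, using hypothetical simplified types (`SimpleKeyedBackend`, `AbstractSimpleKeyedBackend`, `HeapLikeBackend`) rather than Flink's real classes. The public interface keeps only production methods. The test-only entry count lives on the abstract base class, where tests that already hold the concrete backend can reach it.

```java
import java.util.HashMap;
import java.util.Map;

/** Production-facing interface with no test-only methods (hypothetical stand-in for KeyedStateBackend). */
interface SimpleKeyedBackend<K> {
    void setCurrentKey(K key);

    K getCurrentKey();
}

/** Base class that may carry test helpers (hypothetical stand-in for AbstractKeyedStateBackend). */
abstract class AbstractSimpleKeyedBackend<K> implements SimpleKeyedBackend<K> {
    private K currentKey;

    @Override
    public void setCurrentKey(K key) {
        this.currentKey = key;
    }

    @Override
    public K getCurrentKey() {
        return currentKey;
    }

    /** Test-only: total number of entries across all keys and states (not on the interface). */
    abstract int numKeyValueStateEntries();
}

/** Toy heap backend storing one value per (state name, key). */
class HeapLikeBackend<K> extends AbstractSimpleKeyedBackend<K> {
    private final Map<String, Map<K, Object>> states = new HashMap<>();

    void put(String stateName, Object value) {
        states.computeIfAbsent(stateName, n -> new HashMap<>()).put(getCurrentKey(), value);
    }

    @Override
    int numKeyValueStateEntries() {
        int count = 0;
        for (Map<K, Object> perKey : states.values()) {
            count += perKey.size();
        }
        return count;
    }
}
```

Production callers only see `SimpleKeyedBackend`, while a test that constructs `HeapLikeBackend` directly can still call `numKeyValueStateEntries()`.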

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/delegate/DelegateKeyedStateBackend.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.delegate;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+
+/**
+ * {@link DelegateKeyedStateBackend} wraps an underlying keyed state backend, and delegates its
+ * partitioned state access.
+ *
+ * @param <K> Type of the key by which states are keyed.
+ */
+public interface DelegateKeyedStateBackend<K>
+        extends CheckpointableKeyedStateBackend<K>, CheckpointListener {
+
+    AbstractKeyedStateBackend<K> getDelegatedKeyedStateBackend();

Review comment:
       1. As `StateConfigUtil.isStateImmutableInStateBackend` doesn't need to explicitly access the delegated backend, this interface becomes unnecessary.
   2. If it is still needed, I think it should be `@Experimental`.
   3. And `getDelegatedKeyedStateBackend` should return `KeyedStateBackend`.
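A small sketch of point 3 with hypothetical toy types (`KvBackend`, `PlainKvBackend`, `DelegatingKvBackend`), not Flink's API. Typing both the wrapped field and the accessor as the interface lets the wrapper hold any implementation, not only subclasses of an abstract base class.

```java
/** Hypothetical stand-in for KeyedStateBackend. */
interface KvBackend<K> {
    void setCurrentKey(K key);

    K getCurrentKey();
}

/** Any implementation of the interface; it does not extend an abstract base class. */
class PlainKvBackend<K> implements KvBackend<K> {
    private K currentKey;

    @Override
    public void setCurrentKey(K key) {
        this.currentKey = key;
    }

    @Override
    public K getCurrentKey() {
        return currentKey;
    }
}

/** Wrapper whose accessor exposes the interface type instead of an implementation class. */
class DelegatingKvBackend<K> implements KvBackend<K> {
    private final KvBackend<K> delegate;

    DelegatingKvBackend(KvBackend<K> delegate) {
        this.delegate = delegate;
    }

    /** Returning the interface keeps implementation classes out of callers' signatures. */
    KvBackend<K> getDelegatedBackend() {
        return delegate;
    }

    @Override
    public void setCurrentKey(K key) {
        delegate.setCurrentKey(key);
    }

    @Override
    public K getCurrentKey() {
        return delegate.getCurrentKey();
    }
}
```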

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/StateConfigUtil.java
##########
@@ -46,19 +46,24 @@ public static StateTtlConfig createTtlConfig(long retentionTime) {
         }
     }
 
-    public static boolean isStateImmutableInStateBackend(KeyedStateBackend<?> stateBackend) {
+    public static boolean isStateImmutableInStateBackend(KeyedStateBackend<?> keyedStateBackend) {
         // TODO: remove the hard code check once FLINK-21027 is supported
         // state key and value is immutable only when using rocksdb state backend and timer
+        KeyedStateBackend<?> rootKeyedStateBackend =
+                keyedStateBackend instanceof DelegateKeyedStateBackend
+                        ? ((DelegateKeyedStateBackend<?>) keyedStateBackend)
+                                .getDelegatedKeyedStateBackend()
+                        : keyedStateBackend;
+
         boolean isRocksDbState =
-                ROCKSDB_KEYED_STATE_BACKEND.equals(stateBackend.getClass().getCanonicalName());
-        boolean isHeapTimer = false;
-        if (stateBackend instanceof AbstractKeyedStateBackend) {
-            // currently, requiresLegacySynchronousTimerSnapshots()
-            // indicates the underlying uses heap-bsased timer
-            isHeapTimer =
-                    ((AbstractKeyedStateBackend<?>) stateBackend)
-                            .requiresLegacySynchronousTimerSnapshots(CheckpointType.CHECKPOINT);
-        }
+                ROCKSDB_KEYED_STATE_BACKEND.equals(
+                        rootKeyedStateBackend.getClass().getCanonicalName());

Review comment:
       I think neither this `getDelegatedKeyedStateBackend` call nor the pre-existing class check (is it RocksDB?) is necessary anymore, now that you've added `requiresLegacySynchronousTimerSnapshots` to `KeyedStateBackend`.
   
   So `isStateImmutableInStateBackend` can simply return `keyedStateBackend.requiresLegacySynchronousTimerSnapshots(CheckpointType.CHECKPOINT)`.
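To illustrate the direction, here is a toy sketch with hypothetical types (`QueryableBackend`, `ToyHeapBackend`, `ToyRocksBackend`, `ToyDelegatingBackend`, `StateConfigSketch`). A hypothetical `isStateImmutable()` query stands in for whichever `KeyedStateBackend` method ends up carrying this meaning. Once each backend answers the question itself, a delegating wrapper just forwards it, and the utility needs neither `instanceof` unwrapping nor a `getCanonicalName()` comparison. The toy answers assume immutability requires RocksDB-like state without heap-based timers, matching the `isRocksDbState`/`isHeapTimer` flags in the removed code.

```java
/** Hypothetical backend interface; isStateImmutable() stands in for the real query. */
interface QueryableBackend {
    boolean isStateImmutable();
}

/** Toy heap backend: never reports immutable state. */
class ToyHeapBackend implements QueryableBackend {
    @Override
    public boolean isStateImmutable() {
        return false;
    }
}

/** Toy RocksDB-like backend: immutable unless timers are kept on the heap. */
class ToyRocksBackend implements QueryableBackend {
    private final boolean heapTimers;

    ToyRocksBackend(boolean heapTimers) {
        this.heapTimers = heapTimers;
    }

    @Override
    public boolean isStateImmutable() {
        return !heapTimers;
    }
}

/** Wrapper: forwards the query instead of being unwrapped by callers. */
class ToyDelegatingBackend implements QueryableBackend {
    private final QueryableBackend delegate;

    ToyDelegatingBackend(QueryableBackend delegate) {
        this.delegate = delegate;
    }

    @Override
    public boolean isStateImmutable() {
        return delegate.isStateImmutable();
    }
}

final class StateConfigSketch {
    private StateConfigSketch() {}

    /** No instanceof and no class-name comparison: the backend answers for itself. */
    static boolean isStateImmutableInStateBackend(QueryableBackend backend) {
        return backend.isStateImmutable();
    }
}
```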

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
##########
@@ -249,6 +266,66 @@ public static StateBackend fromApplicationOrConfigOrDefault(
         return backend;
     }
 
+    /**
+     * This is the state backend loader that loads a {@link DelegateStateBackend} wrapping the state
+     * backend loaded from {@link StateBackendLoader#fromApplicationOrConfigOrDefault} when
+     * delegation is enabled. If delegation is not enabled, the underlying wrapped state backend is
+     * returned instead.
+     *
+     * <p>{@link DelegateStateBackend} can only be enabled through configuration.
+     *
+     * @param fromApplication StateBackend defined from application
+     * @param config The configuration to load the state backend from
+     * @param classLoader The class loader that should be used to load the state backend
+     * @param logger Optionally, a logger to log actions to (may be null)
+     * @return The instantiated state backend.
+     * @throws DynamicCodeLoadingException Thrown if a state backend (factory) is configured and the
+     *     (factory) class was not found or could not be instantiated
+     * @throws IllegalConfigurationException May be thrown by the StateBackendFactory when creating
+     *     / configuring the state backend in the factory
+     * @throws IOException May be thrown by the StateBackendFactory when instantiating the state
+     *     backend
+     */
+    public static StateBackend loadStateBackend(
+            @Nullable StateBackend fromApplication,
+            Configuration config,
+            ClassLoader classLoader,
+            @Nullable Logger logger)
+            throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {
+
+        final StateBackend backend =
+                fromApplicationOrConfigOrDefault(fromApplication, config, classLoader, logger);
+
+        if (config.get(CheckpointingOptions.ENABLE_STATE_CHANGE_LOG)) {
+            LOG.info(
+                    "Delegate State Backend is used, and the root State Backend is {}",
+                    backend.getClass().getSimpleName());
+
+            // ChangelogStateBackend resides in a separate module, load it using reflection
+            try {
+                Constructor<? extends DelegateStateBackend> constructor =
+                        Class.forName(CHANGELOG_STATE_BACKEND, false, classLoader)
+                                .asSubclass(DelegateStateBackend.class)
+                                .getConstructor(StateBackend.class);
+                return constructor.newInstance(backend);

Review comment:
       Should we check for double delegation here?
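A minimal sketch of such a check, assuming hypothetical `Backend`/`DelegatingBackend` types in place of `StateBackend`/`DelegateStateBackend`, a hypothetical `LoaderSketch.wrapWithChangelog` helper, and an `IllegalArgumentException` (the real loader might prefer `IllegalConfigurationException`). The loader refuses to wrap a backend that already delegates, which is the invariant stated in the `DelegateStateBackend` Javadoc.

```java
/** Hypothetical stand-in for StateBackend. */
interface Backend {}

/** Hypothetical stand-in for DelegateStateBackend. */
interface DelegatingBackend extends Backend {
    Backend getDelegate();
}

class RootBackend implements Backend {}

class ChangelogLikeBackend implements DelegatingBackend {
    private final Backend delegate;

    ChangelogLikeBackend(Backend delegate) {
        this.delegate = delegate;
    }

    @Override
    public Backend getDelegate() {
        return delegate;
    }
}

final class LoaderSketch {
    private LoaderSketch() {}

    /** Wraps the root backend when enabled, refusing to wrap a backend that already delegates. */
    static Backend wrapWithChangelog(Backend root, boolean changelogEnabled) {
        if (!changelogEnabled) {
            return root;
        }
        if (root instanceof DelegatingBackend) {
            throw new IllegalArgumentException(
                    "Cannot wrap "
                            + root.getClass().getSimpleName()
                            + ": it is already a delegating state backend");
        }
        return new ChangelogLikeBackend(root);
    }
}
```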

##########
File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogAggregatingState.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.changelog.StateChange;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+
+import java.util.Collection;
+
+/**
+ * Delegated partitioned {@link AggregatingState} that forwards changes to {@link StateChange} upon
+ * {@link AggregatingState} is updated.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <IN> The type of the value added to the state.
+ * @param <ACC> The type of the value stored in the state (the accumulator type).
+ * @param <OUT> The type of the value returned from the state.
+ */
+public class ChangelogAggregatingState<K, N, IN, ACC, OUT>

Review comment:
       Looks like it can be package-private.
   
   ditto other Changelog State classes

##########
File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.delegate.DelegateKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.ttl.TtlStateFactory;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link DelegateKeyedStateBackend} that keeps state on the underlying delegated keyed state
+ * backend as well as on the state change log.
+ *
+ * @param <K> The key by which state is keyed.
+ */
+public class ChangelogKeyedStateBackend<K> implements DelegateKeyedStateBackend<K> {
+
+    private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+            Stream.of(
+                            Tuple2.of(
+                                    ValueStateDescriptor.class,
+                                    (StateFactory) ChangelogValueState::create),
+                            Tuple2.of(
+                                    ListStateDescriptor.class,
+                                    (StateFactory) ChangelogListState::create),
+                            Tuple2.of(
+                                    ReducingStateDescriptor.class,
+                                    (StateFactory) ChangelogReducingState::create),
+                            Tuple2.of(
+                                    AggregatingStateDescriptor.class,
+                                    (StateFactory) ChangelogAggregatingState::create),
+                            Tuple2.of(
+                                    MapStateDescriptor.class,
+                                    (StateFactory) ChangelogMapState::create))
+                    .collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+    /** delegated keyedStateBackend. */
+    private final AbstractKeyedStateBackend<K> keyedStateBackend;
+
+    /**
+     * This is the cache maintained by the DelegateKeyedStateBackend itself. It is not the same as
+     * the underlying delegated keyedStateBackend. InternalKvState is a delegated state.
+     */
+    private final HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName;
+
+    private final ExecutionConfig executionConfig;
+
+    private final TtlTimeProvider ttlTimeProvider;
+
+    /** last accessed partitioned state. */
+    @SuppressWarnings("rawtypes")
+    private InternalKvState lastState;
+
+    /** For caching the last accessed partitioned state. */
+    private String lastName;
+
+    public ChangelogKeyedStateBackend(
+            AbstractKeyedStateBackend<K> keyedStateBackend,
+            ExecutionConfig executionConfig,
+            TtlTimeProvider ttlTimeProvider) {
+        this.keyedStateBackend = keyedStateBackend;
+        this.executionConfig = executionConfig;
+        this.ttlTimeProvider = ttlTimeProvider;
+        this.keyValueStatesByName = new HashMap<>();
+    }
+
+    // -------------------- CheckpointableKeyedStateBackend --------------------------------
+    @Override
+    public KeyGroupRange getKeyGroupRange() {
+        return keyedStateBackend.getKeyGroupRange();
+    }
+
+    @Override
+    public void close() throws IOException {
+        keyedStateBackend.close();
+    }
+
+    @Override
+    public void setCurrentKey(K newKey) {
+        keyedStateBackend.setCurrentKey(newKey);
+    }
+
+    @Override
+    public K getCurrentKey() {
+        return keyedStateBackend.getCurrentKey();
+    }
+
+    @Override
+    public TypeSerializer<K> getKeySerializer() {
+        return keyedStateBackend.getKeySerializer();
+    }
+
+    @Override
+    public <N> Stream<K> getKeys(String state, N namespace) {
+        return keyedStateBackend.getKeys(state, namespace);
+    }
+
+    @Override
+    public <N> Stream<Tuple2<K, N>> getKeysAndNamespaces(String state) {
+        return keyedStateBackend.getKeysAndNamespaces(state);
+    }
+
+    @Override
+    public void dispose() {
+        keyedStateBackend.dispose();
+        lastName = null;
+        lastState = null;
+        keyValueStatesByName.clear();
+    }
+
+    @Override
+    public void registerKeySelectionListener(KeySelectionListener<K> listener) {
+        keyedStateBackend.registerKeySelectionListener(listener);
+    }
+
+    @Override
+    public boolean deregisterKeySelectionListener(KeySelectionListener<K> listener) {
+        return keyedStateBackend.deregisterKeySelectionListener(listener);
+    }
+
+    @Override
+    public <N, S extends State, T> void applyToAllKeys(

Review comment:
       I think this method can be refactored to avoid code duplication by:
   1. Remove `AbstractKeyedStateBackend.supportConcurrentModification()`
   2. Add an abstract method to `AbstractKeyedStateBackend` to iterate over keys (and override it in the heap backend)
   3. Add an `AbstractKeyedStateBackend.applyToAllKeys(Function<..., S> stateFactory, ...)` method
   4. By default, use `AbstractKeyedStateBackend.getPartitionedState()`
   5. In this class, override it and pass `this::getPartitionedState`
   So here it would look like:
   ```
   public void applyToAllKeys(...) {
       keyedStateBackend.applyToAllKeys(
               namespace, namespaceSerializer, stateDescriptor, function, this::getPartitionedState);
   }
   ```
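A toy sketch of steps 1 to 5, using hypothetical simplified types (`ToyAbstractBackend`, `ToyHeapKeyedBackend`, `ToyChangelogBackend`, `ToyState`) instead of Flink's real signatures. The key-iteration loop lives once in the abstract base and takes the state factory as a `java.util.function.Function`. The changelog wrapper only swaps in its own `getPartitionedState`.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Function;

/** Toy state handle; producer records which backend created it. */
class ToyState {
    final String producer;

    ToyState(String producer) {
        this.producer = producer;
    }
}

/** Hypothetical stand-in for AbstractKeyedStateBackend. */
abstract class ToyAbstractBackend<K> {
    private K currentKey;

    void setCurrentKey(K key) {
        currentKey = key;
    }

    K getCurrentKey() {
        return currentKey;
    }

    /** Steps 1-2: each backend decides how to iterate keys safely (no capability flag). */
    abstract Iterable<K> keysOf(String stateName);

    ToyState getPartitionedState(String stateName) {
        return new ToyState("root");
    }

    /** Step 4: by default, states come from this backend's own getPartitionedState. */
    void applyToAllKeys(String stateName, BiConsumer<K, ToyState> function) {
        applyToAllKeys(stateName, function, this::getPartitionedState);
    }

    /** Step 3: the single shared loop, parameterized by the state factory. */
    void applyToAllKeys(
            String stateName,
            BiConsumer<K, ToyState> function,
            Function<String, ToyState> stateFactory) {
        ToyState state = stateFactory.apply(stateName);
        for (K key : keysOf(stateName)) {
            setCurrentKey(key);
            function.accept(key, state);
        }
    }
}

/** Toy heap backend: iterates over a copy so callbacks may modify state. */
class ToyHeapKeyedBackend<K> extends ToyAbstractBackend<K> {
    private final Set<K> keys = new LinkedHashSet<>();

    void addKey(K key) {
        keys.add(key);
    }

    @Override
    Iterable<K> keysOf(String stateName) {
        return new ArrayList<>(keys);
    }
}

/** Step 5: the wrapper reuses the loop and only passes its own state factory. */
class ToyChangelogBackend<K> {
    private final ToyAbstractBackend<K> delegate;

    ToyChangelogBackend(ToyAbstractBackend<K> delegate) {
        this.delegate = delegate;
    }

    ToyState getPartitionedState(String stateName) {
        return new ToyState("changelog");
    }

    void applyToAllKeys(String stateName, BiConsumer<K, ToyState> function) {
        delegate.applyToAllKeys(stateName, function, this::getPartitionedState);
    }
}
```

The wrapper contains no copy of the loop, so a fix to key iteration in the base class applies to both paths.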
   
   

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/delegate/DelegateStateBackend.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.delegate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.StateBackend;
+
+/**
+ * An interface for delegate state backend.
+ *
+ * <p>As its name, it should include a state backend to delegate. The delegated state backend itself
+ * can not be a {@link DelegateStateBackend}.
+ *
+ * <p>TODO: provide configurable way to verify whether a state backend can be delegated.
+ */
+@Internal
+public interface DelegateStateBackend extends StateBackend {

Review comment:
       `DelegateStateBackend` -> `DelegatingStateBackend`?
   Just "Delegate" is ambiguous to me.

##########
File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -0,0 +1,342 @@
+public class ChangelogKeyedStateBackend<K> implements DelegateKeyedStateBackend<K> {

Review comment:
       package-private and `@Internal`?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
##########
@@ -31,7 +31,7 @@
  * @param <N> The type of the namespace.
  * @param <V> The type of the value.
  */
-class HeapValueState<K, N, V> extends AbstractHeapState<K, N, V>
+public class HeapValueState<K, N, V> extends AbstractHeapState<K, N, V>

Review comment:
       Please revert the visibility if you adjust tests as I suggested.
   
   ditto: other state classes

##########
File path: flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogDelegateHashMapTest.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
+import org.apache.flink.runtime.state.HashMapStateBackendTest;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.heap.HeapAggregatingState;
+import org.apache.flink.runtime.state.heap.HeapListState;
+import org.apache.flink.runtime.state.heap.HeapMapState;
+import org.apache.flink.runtime.state.heap.HeapReducingState;
+import org.apache.flink.runtime.state.heap.HeapValueState;
+
+import org.junit.Test;
+
+import static org.apache.flink.state.changelog.ChangelogStateBackendTestUtils.assertAggregatingState;
+import static org.apache.flink.state.changelog.ChangelogStateBackendTestUtils.assertListState;
+import static org.apache.flink.state.changelog.ChangelogStateBackendTestUtils.assertMapState;
+import static org.apache.flink.state.changelog.ChangelogStateBackendTestUtils.assertReducingState;
+import static org.apache.flink.state.changelog.ChangelogStateBackendTestUtils.assertValueState;
+
+/** Tests for {@link ChangelogStateBackend} delegating {@link HashMapStateBackendTest}. */
+public class ChangelogDelegateHashMapTest extends HashMapStateBackendTest {
+
+    @Override
+    protected <K> CheckpointableKeyedStateBackend<K> createKeyedBackend(
+            TypeSerializer<K> keySerializer,
+            int numberOfKeyGroups,
+            KeyGroupRange keyGroupRange,
+            Environment env)
+            throws Exception {
+
+        return ChangelogStateBackendTestUtils.createKeyedBackend(
+                getStateBackend(), keySerializer, numberOfKeyGroups, keyGroupRange, env);
+    }
+
+    @Test
+    public void testDelegatedValueState() throws Exception {
+        assertValueState(createKeyedBackend(IntSerializer.INSTANCE), HeapValueState.class);

Review comment:
       I see some issues with ChangelogDelegate.*Tests:
   1. Production code is made public (e.g. `HeapValueState`, `RocksDBValueState`)
   2. Explicit assertions about each delegated backend's implementation details (so changing a RocksDB state class will break the tests)
   3. A lot of code duplication
   
   Why not have just one test class that asserts, for every state type, that the state wrapped by the changelog state has the same class as the state returned directly by the delegated backend:
   
   ```
   StateBackend delegatedBackend = new HashMapStateBackend(); // implementation doesn't matter
   State delegatedState = delegatedBackend.createKeyedStateBackend(...).getPartitionedState(...);
   ChangelogStateBackend changelogBackend = new ChangelogStateBackend(delegatedBackend);
   ChangelogValueState changeLogState =
           (ChangelogValueState) changelogBackend.createKeyedStateBackend(...).getPartitionedState(...);
   assertEquals(delegatedState.getClass(), changeLogState.getDelegatedValueState().getClass());
   ```
   
   (while still having almost empty subclasses to run `StateBackendTestBase` against different backend combinations)
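A self-contained sketch of that idea with hypothetical toy types (`Kind`, `ToyKvState`, `ToyStateBackend`, `ToyChangelogStateBackend`, `ChangelogWrapped`, `DelegationCheck`), not Flink's API. The check loops over every state kind and compares classes obtained at runtime, so it never names a concrete heap or RocksDB state class and needs no visibility changes in production code.

```java
/** Hypothetical state kinds standing in for value/list/map state descriptors. */
enum Kind {
    VALUE,
    LIST,
    MAP
}

interface ToyKvState {}

/** Whatever the delegated backend returns; the check below never names these classes. */
class HeapValue implements ToyKvState {}

class HeapList implements ToyKvState {}

class HeapMap implements ToyKvState {}

interface ToyStateBackend {
    ToyKvState createState(Kind kind);
}

class ToyHeapStateBackend implements ToyStateBackend {
    @Override
    public ToyKvState createState(Kind kind) {
        switch (kind) {
            case VALUE:
                return new HeapValue();
            case LIST:
                return new HeapList();
            default:
                return new HeapMap();
        }
    }
}

/** Changelog-style state that keeps a reference to the delegated state. */
class ChangelogWrapped implements ToyKvState {
    final ToyKvState delegated;

    ChangelogWrapped(ToyKvState delegated) {
        this.delegated = delegated;
    }
}

class ToyChangelogStateBackend implements ToyStateBackend {
    private final ToyStateBackend delegate;

    ToyChangelogStateBackend(ToyStateBackend delegate) {
        this.delegate = delegate;
    }

    @Override
    public ToyKvState createState(Kind kind) {
        return new ChangelogWrapped(delegate.createState(kind));
    }
}

final class DelegationCheck {
    private DelegationCheck() {}

    /** Backend-agnostic: works for any delegated backend without naming its state classes. */
    static void assertDelegatesAllKinds(ToyStateBackend delegatedBackend) {
        ToyStateBackend changelog = new ToyChangelogStateBackend(delegatedBackend);
        for (Kind kind : Kind.values()) {
            Class<?> expected = delegatedBackend.createState(kind).getClass();
            ToyKvState wrapped = changelog.createState(kind);
            if (!(wrapped instanceof ChangelogWrapped)) {
                throw new AssertionError(kind + ": not a changelog state");
            }
            Class<?> actual = ((ChangelogWrapped) wrapped).delegated.getClass();
            if (!expected.equals(actual)) {
                throw new AssertionError(kind + ": expected " + expected + " but got " + actual);
            }
        }
    }
}
```

Each backend-specific test subclass would then only supply its delegated backend, which is what keeps them nearly empty.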

##########
File path: flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.delegate.DelegateKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.ttl.TtlStateFactory;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link DelegateKeyedStateBackend} that keeps state on the underlying delegated keyed state
+ * backend as well as on the state change log.
+ *
+ * @param <K> The key by which state is keyed.
+ */
+public class ChangelogKeyedStateBackend<K> implements DelegateKeyedStateBackend<K> {
+
+    private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
+            Stream.of(
+                            Tuple2.of(
+                                    ValueStateDescriptor.class,
+                                    (StateFactory) ChangelogValueState::create),
+                            Tuple2.of(
+                                    ListStateDescriptor.class,
+                                    (StateFactory) ChangelogListState::create),
+                            Tuple2.of(
+                                    ReducingStateDescriptor.class,
+                                    (StateFactory) ChangelogReducingState::create),
+                            Tuple2.of(
+                                    AggregatingStateDescriptor.class,
+                                    (StateFactory) ChangelogAggregatingState::create),
+                            Tuple2.of(
+                                    MapStateDescriptor.class,
+                                    (StateFactory) ChangelogMapState::create))
+                    .collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+    /** delegated keyedStateBackend. */
+    private final AbstractKeyedStateBackend<K> keyedStateBackend;
+
+    /**
+     * This is the cache maintained by the DelegateKeyedStateBackend itself. It is not the same as
+     * the underlying delegated keyedStateBackend. InternalKvState is a delegated state.
+     */
+    private final HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName;
+
+    private final ExecutionConfig executionConfig;
+
+    private final TtlTimeProvider ttlTimeProvider;
+
+    /** last accessed partitioned state. */
+    @SuppressWarnings("rawtypes")
+    private InternalKvState lastState;
+
+    /** For caching the last accessed partitioned state. */
+    private String lastName;
+
+    public ChangelogKeyedStateBackend(
+            AbstractKeyedStateBackend<K> keyedStateBackend,
+            ExecutionConfig executionConfig,
+            TtlTimeProvider ttlTimeProvider) {
+        this.keyedStateBackend = keyedStateBackend;
+        this.executionConfig = executionConfig;
+        this.ttlTimeProvider = ttlTimeProvider;
+        this.keyValueStatesByName = new HashMap<>();
+    }
+
+    // -------------------- CheckpointableKeyedStateBackend --------------------------------
+    @Override
+    public KeyGroupRange getKeyGroupRange() {
+        return keyedStateBackend.getKeyGroupRange();
+    }
+
+    @Override
+    public void close() throws IOException {
+        keyedStateBackend.close();
+    }
+
+    @Override
+    public void setCurrentKey(K newKey) {
+        keyedStateBackend.setCurrentKey(newKey);
+    }
+
+    @Override
+    public K getCurrentKey() {
+        return keyedStateBackend.getCurrentKey();
+    }
+
+    @Override
+    public TypeSerializer<K> getKeySerializer() {
+        return keyedStateBackend.getKeySerializer();
+    }
+
+    @Override
+    public <N> Stream<K> getKeys(String state, N namespace) {
+        return keyedStateBackend.getKeys(state, namespace);
+    }
+
+    @Override
+    public <N> Stream<Tuple2<K, N>> getKeysAndNamespaces(String state) {
+        return keyedStateBackend.getKeysAndNamespaces(state);
+    }
+
+    @Override
+    public void dispose() {
+        keyedStateBackend.dispose();
+        lastName = null;
+        lastState = null;
+        keyValueStatesByName.clear();
+    }
+
+    @Override
+    public void registerKeySelectionListener(KeySelectionListener<K> listener) {
+        keyedStateBackend.registerKeySelectionListener(listener);
+    }
+
+    @Override
+    public boolean deregisterKeySelectionListener(KeySelectionListener<K> listener) {
+        return keyedStateBackend.deregisterKeySelectionListener(listener);
+    }
+
+    @Override
+    public <N, S extends State, T> void applyToAllKeys(
+            N namespace,
+            TypeSerializer<N> namespaceSerializer,
+            StateDescriptor<S, T> stateDescriptor,
+            KeyedStateFunction<K, S> function)
+            throws Exception {
+        try (Stream<K> keyStream = getKeys(stateDescriptor.getName(), namespace)) {
+
+            final S state = getPartitionedState(namespace, namespaceSerializer, stateDescriptor);
+
+            if (keyedStateBackend.supportConcurrentModification()) {
+                keyStream.forEach(
+                        (K key) -> {
+                            setCurrentKey(key);
+                            try {
+                                function.process(key, state);
+                            } catch (Throwable e) {
+                                // we wrap the checked exception in an unchecked
+                                // one and catch it (and re-throw it) later.
+                                throw new RuntimeException(e);
+                            }
+                        });
+            } else {
+                final List<K> keys = keyStream.collect(Collectors.toList());
+                for (K key : keys) {
+                    setCurrentKey(key);
+                    function.process(key, state);
+                }
+            }
+        }
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <N, S extends State> S getPartitionedState(
+            N namespace,
+            TypeSerializer<N> namespaceSerializer,
+            StateDescriptor<S, ?> stateDescriptor)
+            throws Exception {
+
+        checkNotNull(namespace, "Namespace");
+
+        if (lastName != null && lastName.equals(stateDescriptor.getName())) {
+            lastState.setCurrentNamespace(namespace);
+            return (S) lastState;
+        }
+
+        final InternalKvState<K, ?, ?> previous =
+                keyValueStatesByName.get(stateDescriptor.getName());
+        if (previous != null) {
+            lastState = previous;
+            lastState.setCurrentNamespace(namespace);
+            lastName = stateDescriptor.getName();
+            return (S) previous;
+        }
+
+        final S state = getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
+        final InternalKvState<K, N, ?> kvState = (InternalKvState<K, N, ?>) state;
+
+        lastName = stateDescriptor.getName();
+        lastState = kvState;
+        kvState.setCurrentNamespace(namespace);
+
+        return state;
+    }
+
+    @Nonnull
+    @Override
+    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
+            long checkpointId,
+            long timestamp,
+            @Nonnull CheckpointStreamFactory streamFactory,
+            @Nonnull CheckpointOptions checkpointOptions)
+            throws Exception {
+        return keyedStateBackend.snapshot(
+                checkpointId, timestamp, streamFactory, checkpointOptions);
+    }
+
+    @Nonnull
+    @Override
+    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
+            KeyGroupedInternalPriorityQueue<T> create(
+                    @Nonnull String stateName,
+                    @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
+        return keyedStateBackend.create(stateName, byteOrderedElementSerializer);
+    }
+
+    @Override
+    public int numKeyValueStateEntries() {
+        return keyedStateBackend.numKeyValueStateEntries();
+    }
+
+    @Override
+    public boolean requiresLegacySynchronousTimerSnapshots(CheckpointType checkpointOptions) {
+        return keyedStateBackend.requiresLegacySynchronousTimerSnapshots(checkpointOptions);
+    }
+
+    // -------------------- CheckpointListener --------------------------------
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        keyedStateBackend.notifyCheckpointComplete(checkpointId);
+    }
+
+    @Override
+    public void notifyCheckpointAborted(long checkpointId) throws Exception {
+        keyedStateBackend.notifyCheckpointAborted(checkpointId);
+    }
+
+    // -------------------- DelegateKeyedStateBackend --------------------------
+    public AbstractKeyedStateBackend<K> getDelegatedKeyedStateBackend() {
+        return keyedStateBackend;

Review comment:
       This method is no longer necessary if `isStateImmutableInStateBackend` is simplified as suggested above.
   If not, please add the `@Override` annotation.
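   To illustrate why the annotation matters for the second option, here is a minimal sketch. The types are hypothetical, heavily simplified stand-ins (not Flink's actual `DelegateKeyedStateBackend` or `AbstractKeyedStateBackend`), and it assumes the interface declares `getDelegatedKeyedStateBackend()`. If that method is later dropped from the interface (the first option), the `@Override`-annotated implementation fails to compile instead of lingering as dead code.

```java
import java.util.Objects;

/** Hypothetical stand-ins for the types in the diff; NOT Flink's real interfaces. */
public class OverrideSketch {

    /** Stand-in for the wrapped backend type (hypothetical). */
    public static class InnerBackend {
        public final String name;

        public InnerBackend(String name) {
            this.name = name;
        }
    }

    /** Stand-in for DelegateKeyedStateBackend, assuming it declares the getter. */
    public interface Delegating {
        InnerBackend getDelegatedKeyedStateBackend();
    }

    /** Stand-in for ChangelogKeyedStateBackend. */
    public static class ChangelogSketch implements Delegating {
        private final InnerBackend keyedStateBackend;

        public ChangelogSketch(InnerBackend keyedStateBackend) {
            this.keyedStateBackend = Objects.requireNonNull(keyedStateBackend);
        }

        // With @Override, removing getDelegatedKeyedStateBackend() from Delegating turns this
        // now-orphaned method into a compile error. Without it, the class would still compile
        // and keep an unused public method around.
        @Override
        public InnerBackend getDelegatedKeyedStateBackend() {
            return keyedStateBackend;
        }
    }

    public static void main(String[] args) {
        Delegating backend = new ChangelogSketch(new InnerBackend("heap"));
        System.out.println(backend.getDelegatedKeyedStateBackend().name); // prints "heap"
    }
}
```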

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.delegate.DelegateKeyedStateBackend;
+import org.apache.flink.runtime.state.delegate.DelegateStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+
+/**
+ * This state backend holds the working state in the underlying delegatedStateBackend, and forwards
+ * state changes to State Changelog.
+ */
+public class ChangelogStateBackend implements DelegateStateBackend {
+    private static final Logger LOG = LoggerFactory.getLogger(ChangelogStateBackend.class);
+
+    private static final long serialVersionUID = 1000L;
+
+    private final StateBackend delegatedStateBackend;
+
+    public ChangelogStateBackend(StateBackend stateBackend) {
+        this.delegatedStateBackend = stateBackend;
+    }
+
+    @Override
+    public <K> DelegateKeyedStateBackend<K> createKeyedStateBackend(
+            Environment env,
+            JobID jobID,
+            String operatorIdentifier,
+            TypeSerializer<K> keySerializer,
+            int numberOfKeyGroups,
+            KeyGroupRange keyGroupRange,
+            TaskKvStateRegistry kvStateRegistry,
+            TtlTimeProvider ttlTimeProvider,
+            MetricGroup metricGroup,
+            @Nonnull Collection<KeyedStateHandle> stateHandles,
+            CloseableRegistry cancelStreamRegistry)
+            throws Exception {
+        LOG.debug("ChangelogStateBackend creates its KeyedStateBackend.");

Review comment:
       I think this logging is unnecessary (ditto for other similar log statements).

##########
File path: flink-state-backends/flink-statebackend-changelog/pom.xml
##########
@@ -0,0 +1,91 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-state-backends</artifactId>
+               <version>1.13-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-statebackend-changelog_${scala.binary.version}</artifactId>
+       <name>Flink : State backends : Changelog</name>
+
+       <packaging>jar</packaging>
+
+       <dependencies>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-guava</artifactId>
+        </dependency>
+
+        <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils-junit</artifactId>

Review comment:
       test scope?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

