This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fe5df06c727138ef02359aa3b23ef0e4b80a57df
Author: Roman Khachatryan <[email protected]>
AuthorDate: Thu Jun 3 18:32:37 2021 +0200

    [FLINK-21356][state/changelog] Implement recovery using changelog
    
    Both materialized and non-materialized states are read for recovery.
    
    Materialization will be implemented in subsequent commits.
    For now, changelog grows indefinitely.
---
 .../runtime/state/AbstractKeyedStateBackend.java   |   8 +-
 .../flink/runtime/state/changelog/StateChange.java |   3 +-
 .../StateChangelogHandleStreamHandleReader.java    |   1 +
 .../state/metainfo/StateMetaInfoSnapshot.java      |  26 ++-
 .../state/changelog/AbstractChangelogState.java    |   2 +-
 .../state/changelog/AbstractStateChangeLogger.java |  79 +++------
 .../state/changelog/ChangelogAggregatingState.java |  22 ++-
 .../ChangelogKeyGroupedPriorityQueue.java          |  13 +-
 .../changelog/ChangelogKeyedStateBackend.java      | 110 ++++++++++---
 .../flink/state/changelog/ChangelogListState.java  |  21 ++-
 .../flink/state/changelog/ChangelogMapState.java   |  21 ++-
 .../state/changelog/ChangelogReducingState.java    |  21 ++-
 .../flink/state/changelog/ChangelogState.java      |  36 ++---
 .../state/changelog/ChangelogStateBackend.java     | 121 +++++++-------
 .../flink/state/changelog/ChangelogValueState.java |  22 ++-
 .../state/changelog/KvStateChangeLoggerImpl.java   |   2 +-
 .../state/changelog/StateChangeOperation.java      |  69 ++++++++
 .../restore/AggregatingStateChangeApplier.java     |  57 +++++++
 .../changelog/restore/ChangelogApplierFactory.java |  55 +++++++
 .../restore/ChangelogApplierFactoryImpl.java       |  69 ++++++++
 .../restore/ChangelogBackendLogApplier.java        | 176 +++++++++++++++++++++
 .../restore/ChangelogBackendRestoreOperation.java  | 107 +++++++++++++
 .../restore/FunctionDelegationHelper.java          | 149 +++++++++++++++++
 .../changelog/restore/KvStateChangeApplier.java    |  62 ++++++++
 .../changelog/restore/ListStateChangeApplier.java  |  69 ++++++++
 .../changelog/restore/MapStateChangeApplier.java   |  65 ++++++++
 .../restore/PriorityQueueStateChangeApplier.java   |  54 +++++++
 .../restore/ReducingStateChangeApplier.java        |  57 +++++++
 .../changelog/restore/StateChangeApplier.java      |  32 +---
 .../changelog/restore/ValueStateChangeApplier.java |  53 +++++++
 .../state/changelog/ChangelogListStateTest.java    |   7 +-
 .../state/changelog/ChangelogMapStateTest.java     |   7 +-
 .../changelog/KvStateChangeLoggerImplTest.java     |   3 +-
 .../PriorityQueueStateChangeLoggerImplTest.java    |   3 +-
 .../state/changelog/StateChangeLoggerTestBase.java |   3 +-
 35 files changed, 1392 insertions(+), 213 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 7f36c84..f62e6c9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -53,7 +53,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 public abstract class AbstractKeyedStateBackend<K>
         implements CheckpointableKeyedStateBackend<K>,
                 CheckpointListener,
-                TestableKeyedStateBackend<K> {
+                TestableKeyedStateBackend<K>,
+                InternalKeyContext<K> {
 
     /** The key serializer. */
     protected final TypeSerializer<K> keySerializer;
@@ -394,4 +395,9 @@ public abstract class AbstractKeyedStateBackend<K>
                 final StateDescriptor<S, ?> stateDescriptor)
                 throws Exception;
     }
+
+    @Override
+    public void setCurrentKeyGroupIndex(int currentKeyGroupIndex) {
+        keyContext.setCurrentKeyGroupIndex(currentKeyGroupIndex);
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java
index 7ae60f0..e777875 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java
@@ -28,7 +28,8 @@ public class StateChange {
     private final byte[] change;
 
     public StateChange(int keyGroup, byte[] change) {
-        Preconditions.checkArgument(keyGroup >= 0);
+        // todo: enable check in FLINK-23035
+        // Preconditions.checkArgument(keyGroup >= 0);
         this.keyGroup = keyGroup;
         this.change = Preconditions.checkNotNull(change);
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamHandleReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamHandleReader.java
index 482590c..e586d95 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamHandleReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamHandleReader.java
@@ -45,6 +45,7 @@ public class StateChangelogHandleStreamHandleReader
 
     /** Reads a stream of state changes starting from a specified offset. */
     public interface StateChangeIterator {
+        // todo: add implementation (FLINK-21353)
         CloseableIterator<StateChange> read(StreamStateHandle handle, long 
offset);
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java
index 6ade74b..72a3152 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java
@@ -38,10 +38,28 @@ public class StateMetaInfoSnapshot {
 
     /** Enum that defines the different types of state that live in Flink 
backends. */
     public enum BackendStateType {
-        KEY_VALUE,
-        OPERATOR,
-        BROADCAST,
-        PRIORITY_QUEUE
+        KEY_VALUE(0),
+        OPERATOR(1),
+        BROADCAST(2),
+        PRIORITY_QUEUE(3);
+        private final byte code;
+
+        BackendStateType(int code) {
+            this.code = (byte) code;
+        }
+
+        public byte getCode() {
+            return code;
+        }
+
+        public static BackendStateType byCode(int code) {
+            for (BackendStateType type : values()) {
+                if (type.code == code) {
+                    return type;
+                }
+            }
+            throw new IllegalArgumentException("Unknown BackendStateType: " + 
code);
+        }
     }
 
     /** Predefined keys for the most common options in the meta info. */
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractChangelogState.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractChangelogState.java
index ba6a95a..823f261 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractChangelogState.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractChangelogState.java
@@ -33,7 +33,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * @param <S> Type of originally wrapped state object
  */
 abstract class AbstractChangelogState<K, N, V, S extends InternalKvState<K, N, 
V>>
-        implements InternalKvState<K, N, V> {
+        implements InternalKvState<K, N, V>, ChangelogState {
 
     protected final S delegatedState;
     protected final KvStateChangeLogger<V, N> changeLogger;
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
index da106c8..5b21e15 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
@@ -18,9 +18,12 @@
 package org.apache.flink.state.changelog;
 
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import 
org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
 import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters;
 import org.apache.flink.util.function.ThrowingConsumer;
 
@@ -28,20 +31,18 @@ import javax.annotation.Nullable;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 
+import static 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot.BackendStateType.KEY_VALUE;
+import static 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE;
 import static 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters.CURRENT_STATE_META_INFO_SNAPSHOT_VERSION;
-import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.ADD;
-import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.ADD_ELEMENT;
-import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.ADD_OR_UPDATE_ELEMENT;
-import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.CLEAR;
-import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.METADATA;
-import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.REMOVE_ELEMENT;
-import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.SET;
-import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.SET_INTERNAL;
+import static org.apache.flink.state.changelog.StateChangeOperation.ADD;
+import static 
org.apache.flink.state.changelog.StateChangeOperation.ADD_ELEMENT;
+import static 
org.apache.flink.state.changelog.StateChangeOperation.ADD_OR_UPDATE_ELEMENT;
+import static org.apache.flink.state.changelog.StateChangeOperation.CLEAR;
+import static org.apache.flink.state.changelog.StateChangeOperation.METADATA;
+import static 
org.apache.flink.state.changelog.StateChangeOperation.REMOVE_ELEMENT;
+import static org.apache.flink.state.changelog.StateChangeOperation.SET;
+import static 
org.apache.flink.state.changelog.StateChangeOperation.SET_INTERNAL;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 abstract class AbstractStateChangeLogger<Key, Value, Ns> implements 
StateChangeLogger<Value, Ns> {
@@ -49,6 +50,7 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns> 
implements StateChangeL
     protected final StateChangelogWriter<?> stateChangelogWriter;
     protected final InternalKeyContext<Key> keyContext;
     protected final RegisteredStateMetaInfoBase metaInfo;
+    private final StateMetaInfoSnapshot.BackendStateType stateType;
     private boolean metaDataWritten = false;
 
     public AbstractStateChangeLogger(
@@ -58,6 +60,13 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns> 
implements StateChangeL
         this.stateChangelogWriter = checkNotNull(stateChangelogWriter);
         this.keyContext = checkNotNull(keyContext);
         this.metaInfo = checkNotNull(metaInfo);
+        if (metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo) {
+            this.stateType = KEY_VALUE;
+        } else if (metaInfo instanceof 
RegisteredPriorityQueueStateBackendMetaInfo) {
+            this.stateType = PRIORITY_QUEUE;
+        } else {
+            throw new IllegalArgumentException("Unsupported state type: " + 
metaInfo);
+        }
     }
 
     @Override
@@ -135,7 +144,7 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns> 
implements StateChangeL
                     COMMON_KEY_GROUP,
                     serializeRaw(
                             out -> {
-                                out.writeByte(METADATA.code);
+                                out.writeByte(METADATA.getCode());
                                 
out.writeInt(CURRENT_STATE_META_INFO_SNAPSHOT_VERSION);
                                 StateMetaInfoSnapshotReadersWriters.getWriter()
                                         
.writeStateMetaInfoSnapshot(metaInfo.snapshot(), out);
@@ -151,10 +160,11 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns> 
implements StateChangeL
             throws IOException {
         return serializeRaw(
                 wrapper -> {
-                    wrapper.writeByte(op.code);
+                    wrapper.writeByte(op.getCode());
                     // todo: optimize in FLINK-22944 by either writing short 
code or grouping and
                     // writing once (same for key, ns)
                     wrapper.writeUTF(metaInfo.getName());
+                    wrapper.writeByte(stateType.getCode());
                     serializeScope(ns, wrapper);
                     if (dataWriter != null) {
                         dataWriter.accept(wrapper);
@@ -168,50 +178,11 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns> 
implements StateChangeL
     private byte[] serializeRaw(
             ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
dataWriter)
             throws IOException {
+        // todo: optimize performance
         try (ByteArrayOutputStream out = new ByteArrayOutputStream();
                 DataOutputViewStreamWrapper wrapper = new 
DataOutputViewStreamWrapper(out)) {
             dataWriter.accept(wrapper);
             return out.toByteArray();
         }
     }
-
-    enum StateChangeOperation {
-        /** Scope: key + namespace. */
-        CLEAR((byte) 0),
-        /** Scope: key + namespace. */
-        SET((byte) 1),
-        /** Scope: key + namespace. */
-        SET_INTERNAL((byte) 2),
-        /** Scope: key + namespace. */
-        ADD((byte) 3),
-        /** Scope: key + namespace, also affecting other (source) namespaces. 
*/
-        MERGE_NS((byte) 4),
-        /** Scope: key + namespace + element (e.g. user list append). */
-        ADD_ELEMENT((byte) 5),
-        /** Scope: key + namespace + element (e.g. user map key put). */
-        ADD_OR_UPDATE_ELEMENT((byte) 6),
-        /** Scope: key + namespace + element (e.g. user map remove or iterator 
remove). */
-        REMOVE_ELEMENT((byte) 7),
-        /** Scope: key + namespace, first element (e.g. priority queue poll). 
*/
-        REMOVE_FIRST_ELEMENT((byte) 8),
-        /** State metadata (name, serializers, etc.). */
-        METADATA((byte) 9);
-        private final byte code;
-
-        StateChangeOperation(byte code) {
-            this.code = code;
-        }
-
-        private static final Map<Byte, 
KvStateChangeLoggerImpl.StateChangeOperation> BY_CODES =
-                
Arrays.stream(AbstractStateChangeLogger.StateChangeOperation.values())
-                        .collect(Collectors.toMap(o -> o.code, 
Function.identity()));
-
-        public static StateChangeOperation byCode(byte opCode) {
-            return checkNotNull(BY_CODES.get(opCode), Byte.toString(opCode));
-        }
-
-        public byte getCode() {
-            return code;
-        }
-    }
 }
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogAggregatingState.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogAggregatingState.java
index 3a1d9c8..2fa9712 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogAggregatingState.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogAggregatingState.java
@@ -21,8 +21,11 @@ package org.apache.flink.state.changelog;
 import org.apache.flink.api.common.state.AggregatingState;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.runtime.state.changelog.StateChange;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.state.changelog.restore.ChangelogApplierFactory;
+import org.apache.flink.state.changelog.restore.StateChangeApplier;
 import org.apache.flink.util.ExceptionUtils;
 
 import java.io.IOException;
@@ -42,10 +45,14 @@ class ChangelogAggregatingState<K, N, IN, ACC, OUT>
         extends AbstractChangelogState<K, N, ACC, InternalAggregatingState<K, 
N, IN, ACC, OUT>>
         implements InternalAggregatingState<K, N, IN, ACC, OUT> {
 
+    private final InternalKeyContext<K> keyContext;
+
     ChangelogAggregatingState(
             InternalAggregatingState<K, N, IN, ACC, OUT> delegatedState,
-            KvStateChangeLogger<ACC, N> changeLogger) {
+            KvStateChangeLogger<ACC, N> changeLogger,
+            InternalKeyContext<K> keyContext) {
         super(delegatedState, changeLogger);
+        this.keyContext = keyContext;
     }
 
     @Override
@@ -95,9 +102,18 @@ class ChangelogAggregatingState<K, N, IN, ACC, OUT>
 
     @SuppressWarnings("unchecked")
     static <T, K, N, SV, S extends State, IS extends S> IS create(
-            InternalKvState<K, N, SV> aggregatingState, 
KvStateChangeLogger<SV, N> changeLogger) {
+            InternalKvState<K, N, SV> aggregatingState,
+            KvStateChangeLogger<SV, N> changeLogger,
+            InternalKeyContext<K> keyContext) {
         return (IS)
                 new ChangelogAggregatingState<>(
-                        (InternalAggregatingState<K, N, T, SV, ?>) 
aggregatingState, changeLogger);
+                        (InternalAggregatingState<K, N, T, SV, ?>) 
aggregatingState,
+                        changeLogger,
+                        keyContext);
+    }
+
+    @Override
+    public StateChangeApplier getChangeApplier(ChangelogApplierFactory 
factory) {
+        return factory.forAggregating(delegatedState, keyContext);
     }
 }
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyGroupedPriorityQueue.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyGroupedPriorityQueue.java
index a864e14..eff82db 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyGroupedPriorityQueue.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyGroupedPriorityQueue.java
@@ -20,6 +20,8 @@ package org.apache.flink.state.changelog;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.state.changelog.restore.ChangelogApplierFactory;
+import org.apache.flink.state.changelog.restore.StateChangeApplier;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.ExceptionUtils;
 
@@ -37,7 +39,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * A {@link KeyGroupedInternalPriorityQueue} that keeps state on the 
underlying delegated {@link
  * KeyGroupedInternalPriorityQueue} as well as on the state change log.
  */
-public class ChangelogKeyGroupedPriorityQueue<T> implements 
KeyGroupedInternalPriorityQueue<T> {
+public class ChangelogKeyGroupedPriorityQueue<T>
+        implements KeyGroupedInternalPriorityQueue<T>, ChangelogState {
     private final KeyGroupedInternalPriorityQueue<T> delegatedPriorityQueue;
     private final PriorityQueueStateChangeLogger<T> logger;
     private final TypeSerializer<T> serializer;
@@ -109,9 +112,6 @@ public class ChangelogKeyGroupedPriorityQueue<T> implements 
KeyGroupedInternalPr
     }
 
     private void logAddition(Collection<? extends T> toAdd) {
-        if (toAdd == null) {
-            return;
-        }
         try {
             logger.valueElementAdded(
                     out -> {
@@ -133,4 +133,9 @@ public class ChangelogKeyGroupedPriorityQueue<T> implements 
KeyGroupedInternalPr
                 StateChangeLoggingIterator.create(
                         delegatedPriorityQueue.iterator(), logger, 
serializer::serialize, null));
     }
+
+    @Override
+    public StateChangeApplier getChangeApplier(ChangelogApplierFactory 
factory) {
+        return factory.forPriorityQueue(delegatedPriorityQueue, serializer);
+    }
 }
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
index 7293c7b..f7a83f0 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
@@ -44,15 +44,19 @@ import org.apache.flink.runtime.state.SavepointResources;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.StateSnapshotTransformer;
 import org.apache.flink.runtime.state.TestableKeyedStateBackend;
+import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
 import 
org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.runtime.state.changelog.StateChangelogHandle;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
 import org.apache.flink.runtime.state.internal.InternalKvState;
+import 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot.BackendStateType;
 import org.apache.flink.runtime.state.metrics.LatencyTrackingStateFactory;
 import org.apache.flink.runtime.state.ttl.TtlStateFactory;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.state.changelog.restore.FunctionDelegationHelper;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import org.slf4j.Logger;
@@ -64,9 +68,11 @@ import javax.annotation.concurrent.GuardedBy;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RunnableFuture;
@@ -84,7 +90,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * @param <K> The key by which state is keyed.
  */
 @Internal
-class ChangelogKeyedStateBackend<K>
+public class ChangelogKeyedStateBackend<K>
         implements CheckpointableKeyedStateBackend<K>,
                 CheckpointListener,
                 TestableKeyedStateBackend<K> {
@@ -118,11 +124,13 @@ class ChangelogKeyedStateBackend<K>
      */
     private final HashMap<String, InternalKvState<K, ?, ?>> 
keyValueStatesByName;
 
+    private final HashMap<String, ChangelogKeyGroupedPriorityQueue<?>> 
priorityQueueStatesByName;
+
     private final ExecutionConfig executionConfig;
 
     private final TtlTimeProvider ttlTimeProvider;
 
-    private final StateChangelogWriter<?> stateChangelogWriter;
+    private final StateChangelogWriter<StateChangelogHandle> 
stateChangelogWriter;
 
     private long lastCheckpointId = -1L;
 
@@ -133,13 +141,16 @@ class ChangelogKeyedStateBackend<K>
     /** For caching the last accessed partitioned state. */
     private String lastName;
 
+    private final FunctionDelegationHelper functionDelegationHelper =
+            new FunctionDelegationHelper();
+
     /** Updated initially on restore and later upon materialization (in 
FLINK-21357). */
     @GuardedBy("materialized")
     private final List<KeyedStateHandle> materialized = new ArrayList<>();
 
     /** Updated initially on restore and later cleared upon materialization 
(in FLINK-21357). */
     @GuardedBy("materialized")
-    private final List<StateChangelogHandle> nonMaterialized = new 
ArrayList<>();
+    private final List<StateChangelogHandle> restoredNonMaterialized = new 
ArrayList<>();
 
     /**
      * {@link SequenceNumber} denoting last upload range <b>start</b>, 
inclusive. Updated to {@link
@@ -170,13 +181,16 @@ class ChangelogKeyedStateBackend<K>
             AbstractKeyedStateBackend<K> keyedStateBackend,
             ExecutionConfig executionConfig,
             TtlTimeProvider ttlTimeProvider,
-            StateChangelogWriter<?> stateChangelogWriter) {
+            StateChangelogWriter<StateChangelogHandle> stateChangelogWriter,
+            Collection<ChangelogStateBackendHandle> initialState) {
         this.keyedStateBackend = keyedStateBackend;
         this.executionConfig = executionConfig;
         this.ttlTimeProvider = ttlTimeProvider;
         this.keyValueStatesByName = new HashMap<>();
+        this.priorityQueueStatesByName = new HashMap<>();
         this.stateChangelogWriter = stateChangelogWriter;
         this.materializedTo = stateChangelogWriter.initialSequenceNumber();
+        this.completeRestore(initialState);
     }
 
     // -------------------- CheckpointableKeyedStateBackend 
--------------------------------
@@ -316,7 +330,7 @@ class ChangelogKeyedStateBackend<K>
         // todo: revisit after FLINK-21357 - use mailbox action?
         synchronized (materialized) {
             // collections don't change once started and handles are immutable
-            List<StateChangelogHandle> prevDeltaCopy = new 
ArrayList<>(nonMaterialized);
+            List<StateChangelogHandle> prevDeltaCopy = new 
ArrayList<>(restoredNonMaterialized);
             if (delta != null && delta.getStateSize() > 0) {
                 prevDeltaCopy.add(delta);
             }
@@ -332,21 +346,29 @@ class ChangelogKeyedStateBackend<K>
 
     @Nonnull
     @Override
+    @SuppressWarnings("unchecked")
     public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> 
& Keyed<?>>
             KeyGroupedInternalPriorityQueue<T> create(
                     @Nonnull String stateName,
                     @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
-        PriorityQueueStateChangeLoggerImpl<K, T> 
priorityQueueStateChangeLogger =
-                new PriorityQueueStateChangeLoggerImpl<>(
-                        byteOrderedElementSerializer,
-                        keyedStateBackend.getKeyContext(),
-                        stateChangelogWriter,
-                        new RegisteredPriorityQueueStateBackendMetaInfo<>(
-                                stateName, byteOrderedElementSerializer));
-        return new ChangelogKeyGroupedPriorityQueue<>(
-                keyedStateBackend.create(stateName, 
byteOrderedElementSerializer),
-                priorityQueueStateChangeLogger,
-                byteOrderedElementSerializer);
+        ChangelogKeyGroupedPriorityQueue<T> queue =
+                (ChangelogKeyGroupedPriorityQueue<T>) 
priorityQueueStatesByName.get(stateName);
+        if (queue == null) {
+            PriorityQueueStateChangeLoggerImpl<K, T> 
priorityQueueStateChangeLogger =
+                    new PriorityQueueStateChangeLoggerImpl<>(
+                            byteOrderedElementSerializer,
+                            keyedStateBackend.getKeyContext(),
+                            stateChangelogWriter,
+                            new RegisteredPriorityQueueStateBackendMetaInfo<>(
+                                    stateName, byteOrderedElementSerializer));
+            queue =
+                    new ChangelogKeyGroupedPriorityQueue<>(
+                            keyedStateBackend.create(stateName, 
byteOrderedElementSerializer),
+                            priorityQueueStateChangeLogger,
+                            byteOrderedElementSerializer);
+            priorityQueueStatesByName.put(stateName, queue);
+        }
+        return queue;
     }
 
     @VisibleForTesting
@@ -405,6 +427,10 @@ class ChangelogKeyedStateBackend<K>
                         + "This operation cannot use partitioned state.");
 
         InternalKvState<K, ?, ?> kvState = 
keyValueStatesByName.get(stateDescriptor.getName());
+        // todo: support state migration (in FLINK-23143)
+        //     This method is currently called both on recovery and on user 
access.
+        //     So keyValueStatesByName may contain an entry for user-requested 
state which will
+        //     prevent state migration (in contrast to other backends).
         if (kvState == null) {
             if (!stateDescriptor.isSerializerInitialized()) {
                 stateDescriptor.initializeSerializerUnlessSet(executionConfig);
@@ -418,6 +444,7 @@ class ChangelogKeyedStateBackend<K>
             keyValueStatesByName.put(stateDescriptor.getName(), kvState);
             keyedStateBackend.publishQueryableStateIfEnabled(stateDescriptor, 
kvState);
         }
+        functionDelegationHelper.addOrUpdate(stateDescriptor);
         return (S) kvState;
     }
 
@@ -459,7 +486,23 @@ class ChangelogKeyedStateBackend<K>
                         keyedStateBackend.getKeyContext(),
                         stateChangelogWriter,
                         meta);
-        return stateFactory.create(state, kvStateChangeLogger);
+        return stateFactory.create(
+                state,
+                kvStateChangeLogger,
+                keyedStateBackend /* pass the nested backend as key context so 
that it get key updates on recovery*/);
+    }
+
+    private void completeRestore(Collection<ChangelogStateBackendHandle> 
stateHandles) {
+        if (!stateHandles.isEmpty()) {
+            synchronized (materialized) { // ensure visibility
+                for (ChangelogStateBackendHandle h : stateHandles) {
+                    if (h != null) {
+                        materialized.addAll(h.getMaterializedStateHandles());
+                        
restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles());
+                    }
+                }
+            }
+        }
     }
 
     @Override
@@ -470,10 +513,41 @@ class ChangelogKeyedStateBackend<K>
     // Factory function interface
     private interface StateFactory {
         <K, N, SV, S extends State, IS extends S> IS create(
-                InternalKvState<K, N, SV> kvState, KvStateChangeLogger<SV, N> 
changeLogger)
+                InternalKvState<K, N, SV> kvState,
+                KvStateChangeLogger<SV, N> changeLogger,
+                InternalKeyContext<K> keyContext)
                 throws Exception;
     }
 
+    /**
+     * @param name state name
+     * @param type state type (the only supported type currently are: {@link
+     *     BackendStateType#KEY_VALUE key value}, {@link 
BackendStateType#PRIORITY_QUEUE priority
+     *     queue})
+     * @return an existing state, i.e. the one that was already created
+     * @throws NoSuchElementException if the state wasn't created
+     * @throws UnsupportedOperationException if state type is not supported
+     */
+    public ChangelogState getExistingState(String name, BackendStateType type)
+            throws NoSuchElementException, UnsupportedOperationException {
+        ChangelogState state;
+        switch (type) {
+            case KEY_VALUE:
+                state = (ChangelogState) keyValueStatesByName.get(name);
+                break;
+            case PRIORITY_QUEUE:
+                state = priorityQueueStatesByName.get(name);
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        String.format("Unknown state type %s (%s)", type, 
name));
+        }
+        if (state == null) {
+            throw new NoSuchElementException(String.format("%s state %s not 
found", type, name));
+        }
+        return state;
+    }
+
     private static <T> RunnableFuture<T> toRunnableFuture(CompletableFuture<T> 
f) {
         return new RunnableFuture<T>() {
             @Override
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogListState.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogListState.java
index 6d6ee69..9d07797 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogListState.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogListState.java
@@ -22,8 +22,11 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.runtime.state.changelog.StateChange;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.state.changelog.restore.ChangelogApplierFactory;
+import org.apache.flink.state.changelog.restore.StateChangeApplier;
 import org.apache.flink.util.ExceptionUtils;
 
 import java.io.IOException;
@@ -42,10 +45,14 @@ class ChangelogListState<K, N, V>
         extends AbstractChangelogState<K, N, List<V>, InternalListState<K, N, 
V>>
         implements InternalListState<K, N, V> {
 
+    private final InternalKeyContext<K> keyContext;
+
     ChangelogListState(
             InternalListState<K, N, V> delegatedState,
-            KvStateChangeLogger<List<V>, N> changeLogger) {
+            KvStateChangeLogger<List<V>, N> changeLogger,
+            InternalKeyContext<K> keyContext) {
         super(delegatedState, changeLogger);
+        this.keyContext = keyContext;
     }
 
     @Override
@@ -111,10 +118,18 @@ class ChangelogListState<K, N, V>
 
     @SuppressWarnings("unchecked")
     static <K, N, SV, S extends State, IS extends S> IS create(
-            InternalKvState<K, N, SV> listState, KvStateChangeLogger<SV, N> 
changeLogger) {
+            InternalKvState<K, N, SV> listState,
+            KvStateChangeLogger<SV, N> changeLogger,
+            InternalKeyContext<K> keyContext) {
         return (IS)
                 new ChangelogListState<>(
                         (InternalListState<K, N, SV>) listState,
-                        (KvStateChangeLogger<List<SV>, N>) changeLogger);
+                        (KvStateChangeLogger<List<SV>, N>) changeLogger,
+                        keyContext);
+    }
+
+    @Override
+    public StateChangeApplier getChangeApplier(ChangelogApplierFactory 
factory) {
+        return factory.forList(delegatedState, keyContext);
     }
 }
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java
index 25cd0a5..3b35dcd 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java
@@ -23,8 +23,11 @@ import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.typeutils.base.MapSerializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.changelog.StateChange;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.state.changelog.restore.ChangelogApplierFactory;
+import org.apache.flink.state.changelog.restore.StateChangeApplier;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.function.ThrowingConsumer;
 
@@ -45,10 +48,14 @@ class ChangelogMapState<K, N, UK, UV>
         extends AbstractChangelogState<K, N, Map<UK, UV>, InternalMapState<K, 
N, UK, UV>>
         implements InternalMapState<K, N, UK, UV> {
 
+    private final InternalKeyContext<K> keyContext;
+
     ChangelogMapState(
             InternalMapState<K, N, UK, UV> delegatedState,
-            KvStateChangeLogger<Map<UK, UV>, N> changeLogger) {
+            KvStateChangeLogger<Map<UK, UV>, N> changeLogger,
+            InternalKeyContext<K> keyContext) {
         super(delegatedState, changeLogger);
+        this.keyContext = keyContext;
     }
 
     private Map.Entry<UK, UV> loggingMapEntry(
@@ -210,10 +217,18 @@ class ChangelogMapState<K, N, UK, UV>
 
     @SuppressWarnings("unchecked")
     static <UK, UV, K, N, SV, S extends State, IS extends S> IS create(
-            InternalKvState<K, N, SV> mapState, KvStateChangeLogger<SV, N> 
changeLogger) {
+            InternalKvState<K, N, SV> mapState,
+            KvStateChangeLogger<SV, N> changeLogger,
+            InternalKeyContext<K> keyContext) {
         return (IS)
                 new ChangelogMapState<>(
                         (InternalMapState<K, N, UK, UV>) mapState,
-                        (KvStateChangeLogger<Map<UK, UV>, N>) changeLogger);
+                        (KvStateChangeLogger<Map<UK, UV>, N>) changeLogger,
+                        keyContext);
+    }
+
+    @Override
+    public StateChangeApplier getChangeApplier(ChangelogApplierFactory 
factory) {
+        return factory.forMap(delegatedState, keyContext);
     }
 }
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogReducingState.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogReducingState.java
index 10d7d8b..068754d 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogReducingState.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogReducingState.java
@@ -21,8 +21,11 @@ package org.apache.flink.state.changelog;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.runtime.state.changelog.StateChange;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
+import org.apache.flink.state.changelog.restore.ChangelogApplierFactory;
+import org.apache.flink.state.changelog.restore.StateChangeApplier;
 import org.apache.flink.util.ExceptionUtils;
 
 import java.io.IOException;
@@ -40,9 +43,14 @@ class ChangelogReducingState<K, N, V>
         extends AbstractChangelogState<K, N, V, InternalReducingState<K, N, V>>
         implements InternalReducingState<K, N, V> {
 
+    private final InternalKeyContext<K> keyContext;
+
     ChangelogReducingState(
-            InternalReducingState<K, N, V> delegatedState, 
KvStateChangeLogger<V, N> changeLogger) {
+            InternalReducingState<K, N, V> delegatedState,
+            KvStateChangeLogger<V, N> changeLogger,
+            InternalKeyContext<K> keyContext) {
         super(delegatedState, changeLogger);
+        this.keyContext = keyContext;
     }
 
     @Override
@@ -94,9 +102,16 @@ class ChangelogReducingState<K, N, V>
 
     @SuppressWarnings("unchecked")
     static <K, N, SV, S extends State, IS extends S> IS create(
-            InternalKvState<K, N, SV> reducingState, KvStateChangeLogger<SV, 
N> changeLogger) {
+            InternalKvState<K, N, SV> reducingState,
+            KvStateChangeLogger<SV, N> changeLogger,
+            InternalKeyContext<K> keyContext) {
         return (IS)
                 new ChangelogReducingState<>(
-                        (InternalReducingState<K, N, SV>) reducingState, 
changeLogger);
+                        (InternalReducingState<K, N, SV>) reducingState, 
changeLogger, keyContext);
+    }
+
+    @Override
+    public StateChangeApplier getChangeApplier(ChangelogApplierFactory 
factory) {
+        return factory.forReducing(delegatedState, keyContext);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogState.java
similarity index 53%
copy from 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java
copy to 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogState.java
index 7ae60f0..fc77ae0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogState.java
@@ -1,3 +1,4 @@
+package org.apache.flink.state.changelog;
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -15,34 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.state.changelog;
-
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.state.changelog.restore.ChangelogApplierFactory;
+import org.apache.flink.state.changelog.restore.StateChangeApplier;
 
-/** Change of state of a keyed operator. Used for generic incremental 
checkpoints. */
+/**
+ * State used by {@link ChangelogKeyedStateBackend}. Allows replaying recorded 
changes on recovery
+ * using {@link StateChangeApplier}
+ */
 @Internal
-public class StateChange {
-
-    private final int keyGroup;
-    private final byte[] change;
-
-    public StateChange(int keyGroup, byte[] change) {
-        Preconditions.checkArgument(keyGroup >= 0);
-        this.keyGroup = keyGroup;
-        this.change = Preconditions.checkNotNull(change);
-    }
-
-    @Override
-    public String toString() {
-        return String.format("keyGroup=%d, dataSize=%d", keyGroup, 
change.length);
-    }
-
-    public int getKeyGroup() {
-        return keyGroup;
-    }
-
-    public byte[] getChange() {
-        return change;
-    }
+public interface ChangelogState {
+    StateChangeApplier getChangeApplier(ChangelogApplierFactory factory);
 }
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
index d354203..4346370f 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
@@ -39,6 +39,8 @@ import 
org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
 import 
org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogStorage;
 import org.apache.flink.runtime.state.delegate.DelegatingStateBackend;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import 
org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation;
+import 
org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.BaseBackendBuilder;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
@@ -47,7 +49,6 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nonnull;
 
 import java.util.Collection;
-import java.util.Objects;
 import java.util.stream.Collectors;
 
 /**
@@ -96,28 +97,26 @@ public class ChangelogStateBackend implements 
DelegatingStateBackend, Configurab
             @Nonnull Collection<KeyedStateHandle> stateHandles,
             CloseableRegistry cancelStreamRegistry)
             throws Exception {
-        AbstractKeyedStateBackend<K> keyedStateBackend =
-                (AbstractKeyedStateBackend<K>)
-                        delegatedStateBackend.createKeyedStateBackend(
-                                env,
-                                jobID,
-                                operatorIdentifier,
-                                keySerializer,
-                                numberOfKeyGroups,
-                                keyGroupRange,
-                                kvStateRegistry,
-                                ttlTimeProvider,
-                                metricGroup,
-                                extractMaterializedState(stateHandles),
-                                cancelStreamRegistry);
-        // todo: FLINK-21804 get from Environment.getTaskStateManager
-        InMemoryStateChangelogStorage changelogWriterFactory = new 
InMemoryStateChangelogStorage();
-        // todo: apply state changes from non-materialized part of stateHandles
-        return new ChangelogKeyedStateBackend<>(
-                keyedStateBackend,
-                env.getExecutionConfig(),
+        return restore(
+                env,
+                operatorIdentifier,
+                keyGroupRange,
                 ttlTimeProvider,
-                changelogWriterFactory.createWriter(operatorIdentifier, 
keyGroupRange));
+                stateHandles,
+                baseHandles ->
+                        (AbstractKeyedStateBackend<K>)
+                                delegatedStateBackend.createKeyedStateBackend(
+                                        env,
+                                        jobID,
+                                        operatorIdentifier,
+                                        keySerializer,
+                                        numberOfKeyGroups,
+                                        keyGroupRange,
+                                        kvStateRegistry,
+                                        ttlTimeProvider,
+                                        metricGroup,
+                                        baseHandles,
+                                        cancelStreamRegistry));
     }
 
     @Override
@@ -135,31 +134,27 @@ public class ChangelogStateBackend implements 
DelegatingStateBackend, Configurab
             CloseableRegistry cancelStreamRegistry,
             double managedMemoryFraction)
             throws Exception {
-
-        AbstractKeyedStateBackend<K> keyedStateBackend =
-                (AbstractKeyedStateBackend<K>)
-                        delegatedStateBackend.createKeyedStateBackend(
-                                env,
-                                jobID,
-                                operatorIdentifier,
-                                keySerializer,
-                                numberOfKeyGroups,
-                                keyGroupRange,
-                                kvStateRegistry,
-                                ttlTimeProvider,
-                                metricGroup,
-                                extractMaterializedState(stateHandles),
-                                cancelStreamRegistry,
-                                managedMemoryFraction);
-
-        // todo: FLINK-21804 get from Environment.getTaskStateManager
-        InMemoryStateChangelogStorage changelogWriterFactory = new 
InMemoryStateChangelogStorage();
-        // todo: apply state changes from non-materialized part of stateHandles
-        return new ChangelogKeyedStateBackend<>(
-                keyedStateBackend,
-                env.getExecutionConfig(),
+        return restore(
+                env,
+                operatorIdentifier,
+                keyGroupRange,
                 ttlTimeProvider,
-                changelogWriterFactory.createWriter(operatorIdentifier, 
keyGroupRange));
+                stateHandles,
+                baseHandles ->
+                        (AbstractKeyedStateBackend<K>)
+                                delegatedStateBackend.createKeyedStateBackend(
+                                        env,
+                                        jobID,
+                                        operatorIdentifier,
+                                        keySerializer,
+                                        numberOfKeyGroups,
+                                        keyGroupRange,
+                                        kvStateRegistry,
+                                        ttlTimeProvider,
+                                        metricGroup,
+                                        baseHandles,
+                                        cancelStreamRegistry,
+                                        managedMemoryFraction));
     }
 
     @Override
@@ -196,15 +191,35 @@ public class ChangelogStateBackend implements 
DelegatingStateBackend, Configurab
         return this;
     }
 
-    private static Collection<KeyedStateHandle> extractMaterializedState(
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private <K> ChangelogKeyedStateBackend<K> restore(
+            Environment env,
+            String operatorIdentifier,
+            KeyGroupRange keyGroupRange,
+            TtlTimeProvider ttlTimeProvider,
+            Collection<KeyedStateHandle> stateHandles,
+            BaseBackendBuilder<K> baseBackendBuilder)
+            throws Exception {
+        // todo: FLINK-21804 get from Environment.getTaskStateManager
+        InMemoryStateChangelogStorage changelogStorage = new 
InMemoryStateChangelogStorage();
+        return ChangelogBackendRestoreOperation.restore(
+                changelogStorage.createReader(),
+                env.getUserCodeClassLoader().asClassLoader(),
+                castHandles(stateHandles),
+                baseBackendBuilder,
+                (baseBackend, baseState) ->
+                        new ChangelogKeyedStateBackend(
+                                baseBackend,
+                                env.getExecutionConfig(),
+                                ttlTimeProvider,
+                                
changelogStorage.createWriter(operatorIdentifier, keyGroupRange),
+                                baseState));
+    }
+
+    private Collection<ChangelogStateBackendHandle> castHandles(
             Collection<KeyedStateHandle> stateHandles) {
         return stateHandles.stream()
-                .flatMap(
-                        keyedStateHandle ->
-                                
((ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl)
-                                                keyedStateHandle)
-                                        
.getMaterializedStateHandles().stream())
-                .filter(Objects::nonNull)
+                .map(keyedStateHandle -> (ChangelogStateBackendHandle) 
keyedStateHandle)
                 .collect(Collectors.toList());
     }
 }
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogValueState.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogValueState.java
index 0530170..d37026b 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogValueState.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogValueState.java
@@ -21,8 +21,11 @@ package org.apache.flink.state.changelog;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.runtime.state.changelog.StateChange;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.state.changelog.restore.ChangelogApplierFactory;
+import org.apache.flink.state.changelog.restore.StateChangeApplier;
 import org.apache.flink.util.ExceptionUtils;
 
 import java.io.IOException;
@@ -39,9 +42,14 @@ class ChangelogValueState<K, N, V>
         extends AbstractChangelogState<K, N, V, InternalValueState<K, N, V>>
         implements InternalValueState<K, N, V> {
 
+    private final InternalKeyContext<K> keyContext;
+
     ChangelogValueState(
-            InternalValueState<K, N, V> delegatedState, KvStateChangeLogger<V, 
N> changeLogger) {
+            InternalValueState<K, N, V> delegatedState,
+            KvStateChangeLogger<V, N> changeLogger,
+            InternalKeyContext<K> keyContext) {
         super(delegatedState, changeLogger);
+        this.keyContext = keyContext;
     }
 
     @Override
@@ -67,8 +75,16 @@ class ChangelogValueState<K, N, V>
 
     @SuppressWarnings("unchecked")
     static <K, N, SV, S extends State, IS extends S> IS create(
-            InternalKvState<K, N, SV> valueState, KvStateChangeLogger<SV, N> 
changeLogger) {
+            InternalKvState<K, N, SV> valueState,
+            KvStateChangeLogger<SV, N> changeLogger,
+            InternalKeyContext<K> keyContext) {
         return (IS)
-                new ChangelogValueState<>((InternalValueState<K, N, SV>) 
valueState, changeLogger);
+                new ChangelogValueState<>(
+                        (InternalValueState<K, N, SV>) valueState, 
changeLogger, keyContext);
+    }
+
+    @Override
+    public StateChangeApplier getChangeApplier(ChangelogApplierFactory 
factory) {
+        return factory.forValue(delegatedState, keyContext);
     }
 }
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java
index 5615ce7..b4b2350 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java
@@ -28,7 +28,7 @@ import javax.annotation.concurrent.NotThreadSafe;
 import java.io.IOException;
 import java.util.Collection;
 
-import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.MERGE_NS;
+import static org.apache.flink.state.changelog.StateChangeOperation.MERGE_NS;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 @NotThreadSafe
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeOperation.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeOperation.java
new file mode 100644
index 0000000..c0a75bd
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeOperation.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The operation applied to {@link ChangelogState}. */
+@Internal
+public enum StateChangeOperation {
+    /** Scope: key + namespace. */
+    CLEAR((byte) 0),
+    /** Scope: key + namespace. */
+    SET((byte) 1),
+    /** Scope: key + namespace. */
+    SET_INTERNAL((byte) 2),
+    /** Scope: key + namespace. */
+    ADD((byte) 3),
+    /** Scope: key + namespace, also affecting other (source) namespaces. */
+    MERGE_NS((byte) 4),
+    /** Scope: key + namespace + element (e.g. user list append). */
+    ADD_ELEMENT((byte) 5),
+    /** Scope: key + namespace + element (e.g. user map key put). */
+    ADD_OR_UPDATE_ELEMENT((byte) 6),
+    /** Scope: key + namespace + element (e.g. user map remove or iterator 
remove). */
+    REMOVE_ELEMENT((byte) 7),
+    /** Scope: key + namespace, first element (e.g. priority queue poll). */
+    REMOVE_FIRST_ELEMENT((byte) 8),
+    /** State metadata (name, serializers, etc.). */
+    METADATA((byte) 9);
+    private final byte code;
+
+    StateChangeOperation(byte code) {
+        this.code = code;
+    }
+
+    private static final Map<Byte, StateChangeOperation> BY_CODES =
+            Arrays.stream(StateChangeOperation.values())
+                    .collect(Collectors.toMap(o -> o.code, 
Function.identity()));
+
+    public static StateChangeOperation byCode(byte opCode) {
+        return checkNotNull(BY_CODES.get(opCode), Byte.toString(opCode));
+    }
+
+    public byte getCode() {
+        return code;
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/AggregatingStateChangeApplier.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/AggregatingStateChangeApplier.java
new file mode 100644
index 0000000..aabe8cd
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/AggregatingStateChangeApplier.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.state.changelog.StateChangeOperation;
+
+class AggregatingStateChangeApplier<K, N, IN, SV, OUT> extends 
KvStateChangeApplier<K, N> {
+    private final InternalAggregatingState<K, N, IN, SV, OUT> state;
+
+    protected AggregatingStateChangeApplier(
+            InternalKeyContext<K> keyContext, InternalAggregatingState<K, N, 
IN, SV, OUT> state) {
+        super(keyContext);
+        this.state = state;
+    }
+
+    @Override
+    protected InternalKvState<K, N, ?> getState() {
+        return state;
+    }
+
+    protected void applyInternal(StateChangeOperation operation, DataInputView 
in)
+            throws Exception {
+        switch (operation) {
+            case SET:
+            case SET_INTERNAL:
+                
state.updateInternal(state.getValueSerializer().deserialize(in));
+                break;
+            case MERGE_NS:
+                applyMergeNamespaces(state, in);
+                break;
+            case CLEAR:
+                state.clear();
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown state change 
operation: " + operation);
+        }
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogApplierFactory.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogApplierFactory.java
new file mode 100644
index 0000000..7c6c059
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogApplierFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.runtime.state.internal.InternalReducingState;
+import org.apache.flink.runtime.state.internal.InternalValueState;
+
+/**
+ * {@link StateChangeApplier} factory. It's purpose is to decouple 
restore/apply logic from state
+ * logic.
+ */
+@Internal
+public interface ChangelogApplierFactory {
+
+    <K, N, UK, UV> KvStateChangeApplier<K, N> forMap(
+            InternalMapState<K, N, UK, UV> map, InternalKeyContext<K> 
keyContext);
+
+    <K, N, T> KvStateChangeApplier<K, N> forValue(
+            InternalValueState<K, N, T> value, InternalKeyContext<K> 
keyContext);
+
+    <K, N, T> KvStateChangeApplier<K, N> forList(
+            InternalListState<K, N, T> list, InternalKeyContext<K> keyContext);
+
+    <K, N, T> KvStateChangeApplier<K, N> forReducing(
+            InternalReducingState<K, N, T> reducing, InternalKeyContext<K> 
keyContext);
+
+    <K, N, IN, SV, OUT> KvStateChangeApplier<K, N> forAggregating(
+            InternalAggregatingState<K, N, IN, SV, OUT> aggregating,
+            InternalKeyContext<K> keyContext);
+
+    <T> StateChangeApplier forPriorityQueue(
+            KeyGroupedInternalPriorityQueue<T> priorityQueue, 
TypeSerializer<T> serializer);
+}
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogApplierFactoryImpl.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogApplierFactoryImpl.java
new file mode 100644
index 0000000..b11df22
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogApplierFactoryImpl.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.runtime.state.internal.InternalReducingState;
+import org.apache.flink.runtime.state.internal.InternalValueState;
+
+class ChangelogApplierFactoryImpl implements ChangelogApplierFactory {
+    public static final ChangelogApplierFactoryImpl INSTANCE = new 
ChangelogApplierFactoryImpl();
+
+    private ChangelogApplierFactoryImpl() {}
+
+    @Override
+    public <K, N, UK, UV> KvStateChangeApplier<K, N> forMap(
+            InternalMapState<K, N, UK, UV> map, InternalKeyContext<K> 
keyContext) {
+        return new MapStateChangeApplier<>(map, keyContext);
+    }
+
+    @Override
+    public <K, N, T> KvStateChangeApplier<K, N> forList(
+            InternalListState<K, N, T> state, InternalKeyContext<K> 
keyContext) {
+        return new ListStateChangeApplier<>(keyContext, state);
+    }
+
+    @Override
+    public <K, N, T> KvStateChangeApplier<K, N> forReducing(
+            InternalReducingState<K, N, T> state, InternalKeyContext<K> 
keyContext) {
+        return new ReducingStateChangeApplier<>(keyContext, state);
+    }
+
+    @Override
+    public <K, N, IN, SV, OUT> KvStateChangeApplier<K, N> forAggregating(
+            InternalAggregatingState<K, N, IN, SV, OUT> state, 
InternalKeyContext<K> keyContext) {
+        return new AggregatingStateChangeApplier<>(keyContext, state);
+    }
+
+    @Override
+    public <K, N, T> KvStateChangeApplier<K, N> forValue(
+            InternalValueState<K, N, T> state, InternalKeyContext<K> 
keyContext) {
+        return new ValueStateChangeApplier<>(keyContext, state);
+    }
+
+    @Override
+    public <T> StateChangeApplier forPriorityQueue(
+            KeyGroupedInternalPriorityQueue<T> queue, TypeSerializer<T> 
serializer) {
+        return new PriorityQueueStateChangeApplier<>(queue, serializer);
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendLogApplier.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendLogApplier.java
new file mode 100644
index 0000000..bd26dbb
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendLogApplier.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import 
org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.changelog.StateChange;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoReader;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot.BackendStateType;
+import 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters;
+import org.apache.flink.state.changelog.ChangelogKeyedStateBackend;
+import org.apache.flink.state.changelog.ChangelogState;
+import org.apache.flink.state.changelog.StateChangeOperation;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+import static 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters.StateTypeHint.KEYED_STATE;
+import static org.apache.flink.state.changelog.StateChangeOperation.METADATA;
+import static 
org.apache.flink.state.changelog.restore.FunctionDelegationHelper.delegateAggregateFunction;
+import static 
org.apache.flink.state.changelog.restore.FunctionDelegationHelper.delegateReduceFunction;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Applies {@link StateChange}s to a {@link ChangelogKeyedStateBackend}. */
+@SuppressWarnings({"rawtypes", "unchecked"})
+class ChangelogBackendLogApplier {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ChangelogBackendLogApplier.class);
+
+    public static void apply(
+            StateChange stateChange,
+            ChangelogKeyedStateBackend<?> changelogBackend,
+            ClassLoader classLoader)
+            throws Exception {
+        DataInputViewStreamWrapper in =
+                new DataInputViewStreamWrapper(new 
ByteArrayInputStream(stateChange.getChange()));
+        applyOperation(
+                StateChangeOperation.byCode(in.readByte()),
+                stateChange.getKeyGroup(),
+                changelogBackend,
+                in,
+                classLoader,
+                ChangelogApplierFactoryImpl.INSTANCE);
+    }
+
+    private static void applyOperation(
+            StateChangeOperation operation,
+            int keyGroup,
+            ChangelogKeyedStateBackend<?> backend,
+            DataInputView in,
+            ClassLoader classLoader,
+            ChangelogApplierFactory factory)
+            throws Exception {
+        LOG.debug("apply {} in key group {}", operation, keyGroup);
+        if (operation == METADATA) {
+            applyMetaDataChange(in, backend, classLoader);
+        } else if (backend.getKeyGroupRange().contains(keyGroup)) {
+            applyDataChange(in, factory, backend, operation);
+        }
+    }
+
+    private static void applyMetaDataChange(
+            DataInputView in, ChangelogKeyedStateBackend<?> backend, 
ClassLoader classLoader)
+            throws Exception {
+        StateMetaInfoSnapshot snapshot = readStateMetaInfoSnapshot(in, 
classLoader);
+        switch (snapshot.getBackendStateType()) {
+            case KEY_VALUE:
+                restoreKvMetaData(backend, snapshot);
+                return;
+            case PRIORITY_QUEUE:
+                restorePqMetaData(backend, snapshot);
+                return;
+            default:
+                throw new RuntimeException(
+                        "Unsupported state type: "
+                                + snapshot.getBackendStateType()
+                                + ", sate: "
+                                + snapshot.getName());
+        }
+    }
+
+    private static void restoreKvMetaData(
+            ChangelogKeyedStateBackend<?> backend, StateMetaInfoSnapshot 
snapshot)
+            throws Exception {
+        RegisteredKeyValueStateBackendMetaInfo meta =
+                new RegisteredKeyValueStateBackendMetaInfo(snapshot);
+        // Use regular API to create states in both changelog and the base 
backends the metadata is
+        // persisted in log before data changes.
+        // An alternative solution to load metadata "natively" by the base 
backends would require
+        // base state to be always present, i.e. the 1st checkpoint would have 
to be "full" always.
+        backend.getOrCreateKeyedState(meta.getNamespaceSerializer(), 
toStateDescriptor(meta));
+    }
+
+    private static StateDescriptor 
toStateDescriptor(RegisteredKeyValueStateBackendMetaInfo meta) {
+        switch (meta.getStateType()) {
+            case VALUE:
+                return new ValueStateDescriptor(meta.getName(), 
meta.getStateSerializer());
+            case MAP:
+                MapSerializer mapSerializer = (MapSerializer) 
meta.getStateSerializer();
+                return new MapStateDescriptor(
+                        meta.getName(),
+                        mapSerializer.getKeySerializer(),
+                        mapSerializer.getValueSerializer());
+            case LIST:
+                return new ListStateDescriptor(
+                        meta.getName(),
+                        ((ListSerializer) 
meta.getStateSerializer()).getElementSerializer());
+            case AGGREGATING:
+                return new AggregatingStateDescriptor(
+                        meta.getName(), delegateAggregateFunction(), 
meta.getStateSerializer());
+            case REDUCING:
+                return new ReducingStateDescriptor(
+                        meta.getName(), delegateReduceFunction(), 
meta.getStateSerializer());
+            default:
+                throw new 
IllegalArgumentException(meta.getStateType().toString());
+        }
+    }
+
+    private static void restorePqMetaData(
+            ChangelogKeyedStateBackend<?> backend, StateMetaInfoSnapshot 
snapshot) {
+        RegisteredPriorityQueueStateBackendMetaInfo meta =
+                new RegisteredPriorityQueueStateBackendMetaInfo(snapshot);
+        backend.create(meta.getName(), meta.getElementSerializer());
+    }
+
+    private static StateMetaInfoSnapshot readStateMetaInfoSnapshot(
+            DataInputView in, ClassLoader classLoader) throws IOException {
+        int version = in.readInt();
+        StateMetaInfoReader reader =
+                StateMetaInfoSnapshotReadersWriters.getReader(version, 
KEYED_STATE);
+        return reader.readStateMetaInfoSnapshot(in, classLoader);
+    }
+
+    private static void applyDataChange(
+            DataInputView in,
+            ChangelogApplierFactory factory,
+            ChangelogKeyedStateBackend<?> backend,
+            StateChangeOperation operation)
+            throws Exception {
+        String name = checkNotNull(in.readUTF());
+        BackendStateType type = BackendStateType.byCode(in.readByte());
+        ChangelogState state = backend.getExistingState(name, type);
+        StateChangeApplier changeApplier = state.getChangeApplier(factory);
+        changeApplier.apply(operation, in);
+    }
+
+    private ChangelogBackendLogApplier() {}
+}
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation.java
new file mode 100644
index 0000000..528e0ec
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
+import org.apache.flink.runtime.state.changelog.StateChange;
+import org.apache.flink.runtime.state.changelog.StateChangelogHandle;
+import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
+import org.apache.flink.state.changelog.ChangelogKeyedStateBackend;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.BiFunctionWithException;
+import org.apache.flink.util.function.FunctionWithException;
+
+import java.util.Collection;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Restores {@link ChangelogKeyedStateBackend} from the provided {@link 
ChangelogStateBackendHandle
+ * handles}.
+ */
+@Internal
+public class ChangelogBackendRestoreOperation {
+    /** Builds base backend for {@link ChangelogKeyedStateBackend} from state. 
*/
+    @FunctionalInterface
+    public interface BaseBackendBuilder<K>
+            extends FunctionWithException<
+                    Collection<KeyedStateHandle>, 
AbstractKeyedStateBackend<K>, Exception> {}
+
+    /** Builds {@link ChangelogKeyedStateBackend} from the base backend and 
state. */
+    @FunctionalInterface
+    public interface DeltaBackendBuilder<K>
+            extends BiFunctionWithException<
+                    AbstractKeyedStateBackend<K>,
+                    Collection<ChangelogStateBackendHandle>,
+                    ChangelogKeyedStateBackend<K>,
+                    Exception> {}
+
+    public static <K, T extends StateChangelogHandle> 
ChangelogKeyedStateBackend<K> restore(
+            StateChangelogHandleReader<T> changelogHandleReader,
+            ClassLoader classLoader,
+            Collection<ChangelogStateBackendHandle> stateHandles,
+            BaseBackendBuilder<K> baseBackendBuilder,
+            DeltaBackendBuilder<K> changelogBackendBuilder)
+            throws Exception {
+        Collection<KeyedStateHandle> baseState = 
extractBaseState(stateHandles);
+        AbstractKeyedStateBackend<K> baseBackend = 
baseBackendBuilder.apply(baseState);
+        ChangelogKeyedStateBackend<K> changelogBackend =
+                changelogBackendBuilder.apply(baseBackend, stateHandles);
+
+        for (ChangelogStateBackendHandle handle : stateHandles) {
+            if (handle != null) { // null is empty state (no change)
+                readBackendHandle(changelogBackend, handle, 
changelogHandleReader, classLoader);
+            }
+        }
+        return changelogBackend;
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T extends StateChangelogHandle> void readBackendHandle(
+            ChangelogKeyedStateBackend<?> backend,
+            ChangelogStateBackendHandle backendHandle,
+            StateChangelogHandleReader<T> changelogHandleReader,
+            ClassLoader classLoader)
+            throws Exception {
+        for (StateChangelogHandle changelogHandle :
+                backendHandle.getNonMaterializedStateHandles()) {
+            try (CloseableIterator<StateChange> changes =
+                    changelogHandleReader.getChanges((T) changelogHandle)) {
+                while (changes.hasNext()) {
+                    ChangelogBackendLogApplier.apply(changes.next(), backend, 
classLoader);
+                }
+            }
+        }
+    }
+
+    private static Collection<KeyedStateHandle> extractBaseState(
+            Collection<ChangelogStateBackendHandle> stateHandles) {
+        Preconditions.checkNotNull(stateHandles);
+        return stateHandles.stream()
+                .filter(Objects::nonNull)
+                .map(ChangelogStateBackendHandle::getMaterializedStateHandles)
+                .flatMap(Collection::stream)
+                .collect(Collectors.toList());
+    }
+
+    private ChangelogBackendRestoreOperation() {}
+}
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/FunctionDelegationHelper.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/FunctionDelegationHelper.java
new file mode 100644
index 0000000..4714e6c
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/FunctionDelegationHelper.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * {@link DelegatingFunction Delegating functions} are used to create metadata 
on recovery when the
+ * actual function code is not known yet. Once the actual function is known, 
backend updates the
+ * delegate which starts receiving the calls.
+ */
+@SuppressWarnings({"unchecked", "rawtypes"})
+@Internal
+public class FunctionDelegationHelper {
+    private static final Logger LOG = 
LoggerFactory.getLogger(FunctionDelegationHelper.class);
+
+    public static <T> ReduceFunction<T> delegateReduceFunction() {
+        return new DelegatingReduceFunction<>();
+    }
+
+    public static <IN, ACC, OUT> AggregateFunction<IN, ACC, OUT> 
delegateAggregateFunction() {
+        return new DelegatingAggregateFunction<>();
+    }
+
+    private interface DelegatingFunction<F> extends Function {
+        void delegateIfNeeded(F delegated);
+    }
+
+    private final Map<String, DelegatingFunction> delegatingFunctions = new 
HashMap<>();
+
+    public <T, S extends State, F> void addOrUpdate(StateDescriptor<S, T> 
stateDescriptor) {
+        F function = tryGetFunction(stateDescriptor);
+        String name = stateDescriptor.getName();
+        if (function instanceof DelegatingFunction) {
+            LOG.debug("add delegate: {}", name);
+            delegatingFunctions.putIfAbsent(name, (DelegatingFunction<?>) 
function);
+        } else {
+            DelegatingFunction<F> delegating = delegatingFunctions.get(name);
+            if (delegating != null) {
+                LOG.debug("update delegate: {}", name);
+                checkState(function != null, "unable to extract function for 
state " + name);
+                delegating.delegateIfNeeded(function);
+            }
+        }
+    }
+
+    @Nullable
+    private static <F extends Function> F tryGetFunction(StateDescriptor<?, ?> 
stateDescriptor) {
+        if (stateDescriptor instanceof ReducingStateDescriptor) {
+            return (F) ((ReducingStateDescriptor) 
stateDescriptor).getReduceFunction();
+        } else if (stateDescriptor instanceof AggregatingStateDescriptor) {
+            return (F) ((AggregatingStateDescriptor) 
stateDescriptor).getAggregateFunction();
+        } else {
+            return null;
+        }
+    }
+
+    static class DelegatingAggregateFunction<IN, ACC, OUT>
+            implements AggregateFunction<IN, ACC, OUT>,
+                    DelegatingFunction<AggregateFunction<IN, ACC, OUT>> {
+        private static final long serialVersionUID = 1L;
+
+        @Nullable private AggregateFunction<IN, ACC, OUT> delegated;
+
+        @Override
+        public void delegateIfNeeded(AggregateFunction<IN, ACC, OUT> 
delegated) {
+            if (this.delegated == null) {
+                this.delegated = checkNotNull(delegated);
+            }
+        }
+
+        @Override
+        public ACC createAccumulator() {
+            checkNotNull(delegated);
+            return delegated.createAccumulator();
+        }
+
+        @Override
+        public ACC add(IN value, ACC accumulator) {
+            checkNotNull(delegated);
+            return delegated.add(value, accumulator);
+        }
+
+        @Override
+        public OUT getResult(ACC accumulator) {
+            checkNotNull(delegated);
+            return delegated.getResult(accumulator);
+        }
+
+        @Override
+        public ACC merge(ACC a, ACC b) {
+            checkNotNull(delegated);
+            return delegated.merge(a, b);
+        }
+    }
+
+    static class DelegatingReduceFunction<T>
+            implements ReduceFunction<T>, 
DelegatingFunction<ReduceFunction<T>> {
+        private static final long serialVersionUID = 1L;
+
+        @Nullable private ReduceFunction<T> delegated;
+
+        @Override
+        public T reduce(T left, T right) throws Exception {
+            checkNotNull(delegated);
+            return delegated.reduce(left, right);
+        }
+
+        @Override
+        public void delegateIfNeeded(ReduceFunction<T> delegated) {
+            if (this.delegated == null) {
+                this.delegated = checkNotNull(delegated);
+            }
+        }
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/KvStateChangeApplier.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/KvStateChangeApplier.java
new file mode 100644
index 0000000..0fe8b99
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/KvStateChangeApplier.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.internal.InternalMergingState;
+import org.apache.flink.state.changelog.StateChangeOperation;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+abstract class KvStateChangeApplier<K, N> implements StateChangeApplier {
+    private final InternalKeyContext<K> keyContext;
+
+    protected abstract InternalKvState<K, N, ?> getState();
+
+    protected KvStateChangeApplier(InternalKeyContext<K> keyContext) {
+        this.keyContext = keyContext;
+    }
+
+    @Override
+    public void apply(StateChangeOperation operation, DataInputView in) throws 
Exception {
+        K key = getState().getKeySerializer().deserialize(in);
+        keyContext.setCurrentKey(key);
+        keyContext.setCurrentKeyGroupIndex(
+                KeyGroupRangeAssignment.assignToKeyGroup(key, 
keyContext.getNumberOfKeyGroups()));
+        
getState().setCurrentNamespace(getState().getNamespaceSerializer().deserialize(in));
+        applyInternal(operation, in);
+    }
+
+    protected abstract void applyInternal(StateChangeOperation operation, 
DataInputView in)
+            throws Exception;
+
+    protected static <K, N, T> void applyMergeNamespaces(
+            InternalMergingState<K, N, T, ?, ?> state, DataInputView in) 
throws Exception {
+        N target = state.getNamespaceSerializer().deserialize(in);
+        int sourcesSize = in.readInt();
+        Collection<N> sources = new ArrayList<>(sourcesSize);
+        for (int i = 0; i < sourcesSize; i++) {
+            sources.add(state.getNamespaceSerializer().deserialize(in));
+        }
+        state.mergeNamespaces(target, sources);
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ListStateChangeApplier.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ListStateChangeApplier.java
new file mode 100644
index 0000000..1f7b05e
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ListStateChangeApplier.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.state.changelog.StateChangeOperation;
+
+class ListStateChangeApplier<K, N, T> extends KvStateChangeApplier<K, N> {
+    private final InternalListState<K, N, T> state;
+
+    protected ListStateChangeApplier(
+            InternalKeyContext<K> keyContext, InternalListState<K, N, T> 
state) {
+        super(keyContext);
+        this.state = state;
+    }
+
+    @Override
+    protected InternalKvState<K, N, ?> getState() {
+        return state;
+    }
+
+    protected void applyInternal(StateChangeOperation operation, DataInputView 
in)
+            throws Exception {
+        switch (operation) {
+            case ADD:
+                state.addAll(state.getValueSerializer().deserialize(in));
+                break;
+            case ADD_ELEMENT:
+                state.add(
+                        ((ListSerializer<T>) state.getValueSerializer())
+                                .getElementSerializer()
+                                .deserialize(in));
+                break;
+            case SET:
+                state.update(state.getValueSerializer().deserialize(in));
+                break;
+            case SET_INTERNAL:
+                
state.updateInternal(state.getValueSerializer().deserialize(in));
+                break;
+            case MERGE_NS:
+                applyMergeNamespaces(state, in);
+                break;
+            case CLEAR:
+                state.clear();
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown state change 
operation: " + operation);
+        }
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/MapStateChangeApplier.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/MapStateChangeApplier.java
new file mode 100644
index 0000000..b498329
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/MapStateChangeApplier.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.state.changelog.StateChangeOperation;
+
+class MapStateChangeApplier<K, N, UK, UV> extends KvStateChangeApplier<K, N> {
+    private final InternalMapState<K, N, UK, UV> mapState;
+    private final MapSerializer<UK, UV> mapSerializer;
+
+    protected MapStateChangeApplier(
+            InternalMapState<K, N, UK, UV> mapState, InternalKeyContext<K> 
keyContext) {
+        super(keyContext);
+        this.mapState = mapState;
+        this.mapSerializer = (MapSerializer<UK, UV>) 
mapState.getValueSerializer();
+    }
+
+    @Override
+    protected InternalKvState<K, N, ?> getState() {
+        return mapState;
+    }
+
+    protected void applyInternal(StateChangeOperation operation, DataInputView 
in)
+            throws Exception {
+        switch (operation) {
+            case ADD:
+                mapState.putAll(mapSerializer.deserialize(in));
+                break;
+            case ADD_ELEMENT:
+            case ADD_OR_UPDATE_ELEMENT:
+                mapState.put(
+                        mapSerializer.getKeySerializer().deserialize(in),
+                        mapSerializer.getValueSerializer().deserialize(in));
+                break;
+            case REMOVE_ELEMENT:
+                
mapState.remove(mapSerializer.getKeySerializer().deserialize(in));
+                break;
+            case CLEAR:
+                mapState.clear();
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown state change 
operation: " + operation);
+        }
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/PriorityQueueStateChangeApplier.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/PriorityQueueStateChangeApplier.java
new file mode 100644
index 0000000..b3a5894
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/PriorityQueueStateChangeApplier.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.state.changelog.StateChangeOperation;
+
+class PriorityQueueStateChangeApplier<T> implements StateChangeApplier {
+    private final KeyGroupedInternalPriorityQueue<T> queue;
+    private final TypeSerializer<T> serializer;
+
+    public PriorityQueueStateChangeApplier(
+            KeyGroupedInternalPriorityQueue<T> queue, TypeSerializer<T> 
serializer) {
+        this.queue = queue;
+        this.serializer = serializer;
+    }
+
+    @Override
+    public void apply(StateChangeOperation operation, DataInputView in) throws 
Exception {
+        switch (operation) {
+            case REMOVE_FIRST_ELEMENT:
+                queue.poll();
+                break;
+            case ADD_ELEMENT:
+                int numElements = in.readInt();
+                for (int i = 0; i < numElements; i++) {
+                    queue.add(serializer.deserialize(in));
+                }
+                break;
+            case REMOVE_ELEMENT:
+                queue.remove(serializer.deserialize(in));
+                break;
+            default:
+                throw new UnsupportedOperationException(operation.name());
+        }
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ReducingStateChangeApplier.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ReducingStateChangeApplier.java
new file mode 100644
index 0000000..80aeaf9
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ReducingStateChangeApplier.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.internal.InternalReducingState;
+import org.apache.flink.state.changelog.StateChangeOperation;
+
+class ReducingStateChangeApplier<K, N, T> extends KvStateChangeApplier<K, N> {
+    private final InternalReducingState<K, N, T> state;
+
+    protected ReducingStateChangeApplier(
+            InternalKeyContext<K> keyContext, InternalReducingState<K, N, T> 
state) {
+        super(keyContext);
+        this.state = state;
+    }
+
+    @Override
+    protected InternalKvState<K, N, ?> getState() {
+        return state;
+    }
+
+    protected void applyInternal(StateChangeOperation operation, DataInputView 
in)
+            throws Exception {
+        switch (operation) {
+            case SET:
+            case SET_INTERNAL:
+                
state.updateInternal(state.getValueSerializer().deserialize(in));
+                break;
+            case CLEAR:
+                state.clear();
+                break;
+            case MERGE_NS:
+                applyMergeNamespaces(state, in);
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown state change 
operation: " + operation);
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/StateChangeApplier.java
similarity index 53%
copy from 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java
copy to 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/StateChangeApplier.java
index 7ae60f0..3b120ba 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/StateChangeApplier.java
@@ -15,34 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.state.changelog;
+package org.apache.flink.state.changelog.restore;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.state.changelog.StateChangeOperation;
 
-/** Change of state of a keyed operator. Used for generic incremental 
checkpoints. */
+/** Applies state data change to some state. */
 @Internal
-public class StateChange {
-
-    private final int keyGroup;
-    private final byte[] change;
-
-    public StateChange(int keyGroup, byte[] change) {
-        Preconditions.checkArgument(keyGroup >= 0);
-        this.keyGroup = keyGroup;
-        this.change = Preconditions.checkNotNull(change);
-    }
-
-    @Override
-    public String toString() {
-        return String.format("keyGroup=%d, dataSize=%d", keyGroup, 
change.length);
-    }
-
-    public int getKeyGroup() {
-        return keyGroup;
-    }
-
-    public byte[] getChange() {
-        return change;
-    }
+public interface StateChangeApplier {
+    void apply(StateChangeOperation operation, DataInputView in) throws 
Exception;
 }
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ValueStateChangeApplier.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ValueStateChangeApplier.java
new file mode 100644
index 0000000..c32db7d
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ValueStateChangeApplier.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.restore;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.state.changelog.StateChangeOperation;
+
+class ValueStateChangeApplier<K, N, T> extends KvStateChangeApplier<K, N> {
+    private final InternalValueState<K, N, T> state;
+
+    protected ValueStateChangeApplier(
+            InternalKeyContext<K> keyContext, InternalValueState<K, N, T> 
state) {
+        super(keyContext);
+        this.state = state;
+    }
+
+    @Override
+    protected InternalKvState<K, N, ?> getState() {
+        return state;
+    }
+
+    protected void applyInternal(StateChangeOperation operation, DataInputView 
in)
+            throws Exception {
+        switch (operation) {
+            case SET:
+                state.update(state.getValueSerializer().deserialize(in));
+                break;
+            case CLEAR:
+                state.clear();
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown state change 
operation: " + operation);
+        }
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogListStateTest.java
 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogListStateTest.java
index 950546c..9a7420b 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogListStateTest.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogListStateTest.java
@@ -21,6 +21,8 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.util.function.FunctionWithException;
 import org.apache.flink.util.function.ThrowingConsumer;
@@ -202,7 +204,10 @@ public class ChangelogListStateTest {
 
     private static ChangelogListState createState(List<String> data, 
TestChangeLoggerKv logger) {
         ChangelogListState state =
-                new ChangelogListState<>(new TestingInternalListState(data), 
logger);
+                new ChangelogListState<>(
+                        new TestingInternalListState(data),
+                        logger,
+                        new 
InternalKeyContextImpl<>(KeyGroupRange.EMPTY_KEY_GROUP_RANGE, 0));
         state.setCurrentNamespace("ns0");
         return state;
     }
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogMapStateTest.java
 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogMapStateTest.java
index c6bf06d..f28befc 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogMapStateTest.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogMapStateTest.java
@@ -21,6 +21,8 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.util.function.FunctionWithException;
 import org.apache.flink.util.function.ThrowingConsumer;
@@ -246,7 +248,10 @@ public class ChangelogMapStateTest {
     private static ChangelogMapState createState(
             Map<String, String> data, TestChangeLoggerKv logger) {
         ChangelogMapState state =
-                new ChangelogMapState<>(new TestingInternalMapState(data), 
logger);
+                new ChangelogMapState<>(
+                        new TestingInternalMapState(data),
+                        logger,
+                        new 
InternalKeyContextImpl<>(KeyGroupRange.EMPTY_KEY_GROUP_RANGE, 0));
         state.setCurrentNamespace("ns0");
         return state;
     }
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/KvStateChangeLoggerImplTest.java
 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/KvStateChangeLoggerImplTest.java
index bfea119..c52a424 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/KvStateChangeLoggerImplTest.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/KvStateChangeLoggerImplTest.java
@@ -21,14 +21,13 @@ import 
org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
-import 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation;
 
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Optional;
 
 import static org.apache.flink.api.common.state.StateDescriptor.Type.VALUE;
-import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.MERGE_NS;
+import static org.apache.flink.state.changelog.StateChangeOperation.MERGE_NS;
 
 /** {@link KvStateChangeLoggerImpl} test. */
 public class KvStateChangeLoggerImplTest extends 
StateChangeLoggerTestBase<String> {
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java
 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java
index b6bf280..672682d 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java
@@ -21,12 +21,11 @@ import 
org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import 
org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
-import 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation;
 
 import java.io.IOException;
 import java.util.Optional;
 
-import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.REMOVE_FIRST_ELEMENT;
+import static 
org.apache.flink.state.changelog.StateChangeOperation.REMOVE_FIRST_ELEMENT;
 
 /** {@link PriorityQueueStateChangeLoggerImpl} test. */
 public class PriorityQueueStateChangeLoggerImplTest extends 
StateChangeLoggerTestBase<Void> {
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
index 1d2572c..ae87745 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
@@ -22,7 +22,6 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
 import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
-import 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation;
 
 import org.junit.Test;
 
@@ -33,7 +32,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.COMMON_KEY_GROUP;
-import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.METADATA;
+import static org.apache.flink.state.changelog.StateChangeOperation.METADATA;
 import static org.junit.Assert.assertEquals;
 
 abstract class StateChangeLoggerTestBase<Namespace> {

Reply via email to