This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 48270e3e2d7d48bdfce4be1caadda1e7f85000a4
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed Jun 2 17:52:12 2021 +0200

    [FLINK-21356][state/changelog] Implement checkpointing using changelog
    
    Both materialized and non-materialized state are checkpointed.
    However, materialization and recovery from the changelog will be implemented in
    subsequent commits. For now:
    - the changelog grows indefinitely (it is not truncated);
    - state changes are not applied on recovery, so tests fail with incorrect
      results.
---
 .../metadata/MetadataV2V3SerializerBase.java       |  38 ++++++
 .../changelog/ChangelogStateBackendHandle.java     | 143 +++++++++++++++++++++
 .../inmemory/InMemoryStateChangelogHandle.java     |   2 +-
 .../inmemory/InMemoryStateChangelogWriter.java     |  11 +-
 .../changelog/ChangelogKeyedStateBackend.java      | 140 +++++++++++++++++++-
 .../state/changelog/ChangelogStateBackend.java     |  21 ++-
 6 files changed, 343 insertions(+), 12 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
index 0ea7337..1be1fa5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
@@ -38,7 +38,9 @@ import 
org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
 import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
 import org.apache.flink.runtime.state.changelog.StateChange;
+import org.apache.flink.runtime.state.changelog.StateChangelogHandle;
 import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamImpl;
 import 
org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogHandle;
 import 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
@@ -99,6 +101,7 @@ public abstract class MetadataV2V3SerializerBase {
     private static final byte INCREMENTAL_KEY_GROUPS_HANDLE = 5;
     private static final byte RELATIVE_STREAM_STATE_HANDLE = 6;
     private static final byte SAVEPOINT_KEY_GROUPS_HANDLE = 7;
+    private static final byte CHANGELOG_HANDLE = 8;
     private static final byte CHANGELOG_BYTE_INCREMENT_HANDLE = 9;
     private static final byte CHANGELOG_FILE_INCREMENT_HANDLE = 10;
 
@@ -316,6 +319,23 @@ public abstract class MetadataV2V3SerializerBase {
 
             
serializeStreamStateHandleMap(incrementalKeyedStateHandle.getSharedState(), 
dos);
             
serializeStreamStateHandleMap(incrementalKeyedStateHandle.getPrivateState(), 
dos);
+        } else if (stateHandle instanceof ChangelogStateBackendHandle) {
+            ChangelogStateBackendHandle handle = (ChangelogStateBackendHandle) 
stateHandle;
+
+            dos.writeByte(CHANGELOG_HANDLE);
+
+            dos.writeInt(handle.getKeyGroupRange().getStartKeyGroup());
+            dos.writeInt(handle.getKeyGroupRange().getNumberOfKeyGroups());
+
+            dos.writeInt(handle.getMaterializedStateHandles().size());
+            for (KeyedStateHandle k : handle.getMaterializedStateHandles()) {
+                serializeKeyedStateHandle(k, dos);
+            }
+
+            dos.writeInt(handle.getNonMaterializedStateHandles().size());
+            for (KeyedStateHandle k : handle.getNonMaterializedStateHandles()) 
{
+                serializeKeyedStateHandle(k, dos);
+            }
 
         } else if (stateHandle instanceof InMemoryStateChangelogHandle) {
             InMemoryStateChangelogHandle handle = 
(InMemoryStateChangelogHandle) stateHandle;
@@ -406,6 +426,24 @@ public abstract class MetadataV2V3SerializerBase {
                     sharedStates,
                     privateStates,
                     metaDataStateHandle);
+        } else if (CHANGELOG_HANDLE == type) {
+
+            int startKeyGroup = dis.readInt();
+            int numKeyGroups = dis.readInt();
+            KeyGroupRange keyGroupRange =
+                    KeyGroupRange.of(startKeyGroup, startKeyGroup + 
numKeyGroups - 1);
+            int baseSize = dis.readInt();
+            List<KeyedStateHandle> base = new ArrayList<>(baseSize);
+            for (int i = 0; i < baseSize; i++) {
+                base.add(deserializeKeyedStateHandle(dis, context));
+            }
+            int deltaSize = dis.readInt();
+            List<StateChangelogHandle> delta = new ArrayList<>(deltaSize);
+            for (int i = 0; i < deltaSize; i++) {
+                delta.add((StateChangelogHandle) 
deserializeKeyedStateHandle(dis, context));
+            }
+            return new 
ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl(
+                    base, delta, keyGroupRange);
 
         } else if (CHANGELOG_BYTE_INCREMENT_HANDLE == type) {
             int start = dis.readInt();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
new file mode 100644
index 0000000..7940210
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.io.Closer;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.unmodifiableList;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A handle to ChangelogStateBackend state. Consists of the base and delta 
parts. Base part
+ * references materialized state (e.g. SST files), while delta part references 
state changes that
+ * were not not materialized at the time of the snapshot. Both are potentially 
empty lists as there
+ * can be no state or multiple states (e.g. after rescaling).
+ */
+@Internal
+public interface ChangelogStateBackendHandle extends KeyedStateHandle {
+    List<KeyedStateHandle> getMaterializedStateHandles();
+
+    List<StateChangelogHandle> getNonMaterializedStateHandles();
+
+    class ChangelogStateBackendHandleImpl implements 
ChangelogStateBackendHandle {
+        private static final long serialVersionUID = 1L;
+        private final List<KeyedStateHandle> materialized;
+        private final List<StateChangelogHandle> nonMaterialized;
+        private final KeyGroupRange keyGroupRange;
+
+        public ChangelogStateBackendHandleImpl(
+                List<KeyedStateHandle> materialized,
+                List<StateChangelogHandle> nonMaterialized,
+                KeyGroupRange keyGroupRange) {
+            this.materialized = unmodifiableList(materialized);
+            this.nonMaterialized = unmodifiableList(nonMaterialized);
+            this.keyGroupRange = keyGroupRange;
+            checkArgument(keyGroupRange.getNumberOfKeyGroups() > 0);
+        }
+
+        @Override
+        public void registerSharedStates(SharedStateRegistry stateRegistry) {
+            stateRegistry.registerAll(materialized);
+            stateRegistry.registerAll(nonMaterialized);
+        }
+
+        @Override
+        public void discardState() throws Exception {
+            try (Closer closer = Closer.create()) {
+                materialized.forEach(h -> closer.register(asCloseable(h)));
+                nonMaterialized.forEach(h -> closer.register(asCloseable(h)));
+            }
+        }
+
+        @Override
+        public KeyGroupRange getKeyGroupRange() {
+            return keyGroupRange;
+        }
+
+        @Nullable
+        @Override
+        public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+            // todo: revisit/review
+            KeyGroupRange intersection = 
this.keyGroupRange.getIntersection(keyGroupRange);
+            if (intersection.getNumberOfKeyGroups() == 0) {
+                return null;
+            }
+            List<KeyedStateHandle> basePart =
+                    this.materialized.stream()
+                            .map(handle -> 
handle.getIntersection(keyGroupRange))
+                            .filter(Objects::nonNull)
+                            .collect(Collectors.toList());
+            List<StateChangelogHandle> deltaPart =
+                    this.nonMaterialized.stream()
+                            .map(
+                                    handle ->
+                                            (StateChangelogHandle)
+                                                    
handle.getIntersection(keyGroupRange))
+                            .filter(Objects::nonNull)
+                            .collect(Collectors.toList());
+            return new ChangelogStateBackendHandleImpl(basePart, deltaPart, 
intersection);
+        }
+
+        @Override
+        public long getStateSize() {
+            return 
materialized.stream().mapToLong(StateObject::getStateSize).sum()
+                    + 
nonMaterialized.stream().mapToLong(StateObject::getStateSize).sum();
+        }
+
+        @Override
+        public List<KeyedStateHandle> getMaterializedStateHandles() {
+            return materialized;
+        }
+
+        @Override
+        public List<StateChangelogHandle> getNonMaterializedStateHandles() {
+            return nonMaterialized;
+        }
+
+        @Override
+        public String toString() {
+            return String.format(
+                    "keyGroupRange=%s, basePartSize=%d, deltaPartSize=%d",
+                    keyGroupRange, materialized.size(), 
nonMaterialized.size());
+        }
+
+        private static Closeable asCloseable(KeyedStateHandle h) {
+            return () -> {
+                try {
+                    h.discardState();
+                } catch (Exception e) {
+                    ExceptionUtils.rethrowIOException(e);
+                }
+            };
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java
index f492cad..8735f98 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java
@@ -84,7 +84,7 @@ public class InMemoryStateChangelogHandle implements 
StateChangelogHandle {
 
     @Override
     public void registerSharedStates(SharedStateRegistry stateRegistry) {
-        throw new UnsupportedOperationException();
+        // do nothing
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
index c63fb93..3c15984 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
@@ -104,16 +104,13 @@ class InMemoryStateChangelogWriter implements 
StateChangelogWriter<InMemoryState
 
     @Override
     public void truncate(SequenceNumber before) {
-        changesByKeyGroup.forEach((k, v) -> {});
+        changesByKeyGroup.forEach(
+                (kg, changesBySqn) -> changesBySqn.headMap(before, 
false).clear());
     }
 
     @Override
-    public void confirm(SequenceNumber from, SequenceNumber to) {
-        throw new UnsupportedOperationException();
-    }
+    public void confirm(SequenceNumber from, SequenceNumber to) {}
 
     @Override
-    public void reset(SequenceNumber from, SequenceNumber to) {
-        throw new UnsupportedOperationException();
-    }
+    public void reset(SequenceNumber from, SequenceNumber to) {}
 }
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
index abe7568..7293c7b 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
@@ -44,6 +44,9 @@ import org.apache.flink.runtime.state.SavepointResources;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.StateSnapshotTransformer;
 import org.apache.flink.runtime.state.TestableKeyedStateBackend;
+import 
org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChangelogHandle;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
 import org.apache.flink.runtime.state.internal.InternalKvState;
@@ -52,12 +55,23 @@ import org.apache.flink.runtime.state.ttl.TtlStateFactory;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.FlinkRuntimeException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -74,6 +88,7 @@ class ChangelogKeyedStateBackend<K>
         implements CheckpointableKeyedStateBackend<K>,
                 CheckpointListener,
                 TestableKeyedStateBackend<K> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ChangelogKeyedStateBackend.class);
 
     private static final Map<StateDescriptor.Type, StateFactory> 
STATE_FACTORIES =
             Stream.of(
@@ -109,6 +124,8 @@ class ChangelogKeyedStateBackend<K>
 
     private final StateChangelogWriter<?> stateChangelogWriter;
 
+    private long lastCheckpointId = -1L;
+
     /** last accessed partitioned state. */
     @SuppressWarnings("rawtypes")
     private InternalKvState lastState;
@@ -116,6 +133,39 @@ class ChangelogKeyedStateBackend<K>
     /** For caching the last accessed partitioned state. */
     private String lastName;
 
+    /** Updated initially on restore and later upon materialization (in 
FLINK-21357). */
+    @GuardedBy("materialized")
+    private final List<KeyedStateHandle> materialized = new ArrayList<>();
+
+    /** Updated initially on restore and later cleared upon materialization 
(in FLINK-21357). */
+    @GuardedBy("materialized")
+    private final List<StateChangelogHandle> nonMaterialized = new 
ArrayList<>();
+
+    /**
+     * {@link SequenceNumber} denoting last upload range <b>start</b>, 
inclusive. Updated to {@link
+     * #materializedTo} when {@link #snapshot(long, long, 
CheckpointStreamFactory,
+     * CheckpointOptions) starting snapshot}. Used to notify {@link 
#stateChangelogWriter} about
+     * changelog ranges that were confirmed or aborted by JM.
+     */
+    @Nullable private SequenceNumber lastUploadedFrom;
+    /**
+     * {@link SequenceNumber} denoting last upload range <b>end</b>, 
exclusive. Updated to {@link
+     * 
org.apache.flink.runtime.state.changelog.StateChangelogWriter#lastAppendedSequenceNumber}
+     * when {@link #snapshot(long, long, CheckpointStreamFactory, 
CheckpointOptions) starting
+     * snapshot}. Used to notify {@link #stateChangelogWriter} about changelog 
ranges that were
+     * confirmed or aborted by JM.
+     */
+    @Nullable private SequenceNumber lastUploadedTo;
+    /**
+     * The {@link SequenceNumber} up to which the state is materialized, 
exclusive. The log should
+     * be truncated accordingly.
+     *
+     * <p>WARN: currently not updated - to be changed in FLINK-21357.
+     *
+     * <p>WARN: this value needs to be updated for benchmarking, e.g. in 
notifyCheckpointComplete.
+     */
+    private final SequenceNumber materializedTo;
+
     public ChangelogKeyedStateBackend(
             AbstractKeyedStateBackend<K> keyedStateBackend,
             ExecutionConfig executionConfig,
@@ -126,6 +176,7 @@ class ChangelogKeyedStateBackend<K>
         this.ttlTimeProvider = ttlTimeProvider;
         this.keyValueStatesByName = new HashMap<>();
         this.stateChangelogWriter = stateChangelogWriter;
+        this.materializedTo = stateChangelogWriter.initialSequenceNumber();
     }
 
     // -------------------- CheckpointableKeyedStateBackend 
--------------------------------
@@ -240,8 +291,43 @@ class ChangelogKeyedStateBackend<K>
             @Nonnull CheckpointStreamFactory streamFactory,
             @Nonnull CheckpointOptions checkpointOptions)
             throws Exception {
-        return keyedStateBackend.snapshot(
-                checkpointId, timestamp, streamFactory, checkpointOptions);
+        // The range to upload may overlap with the previous one(s). To reuse 
them, we could store
+        // the previous results either here in the backend or in the writer. 
However,
+        // materialization may truncate only a part of the previous result and 
the backend would
+        // have to split it somehow for the former option, so the latter is 
used.
+        lastCheckpointId = checkpointId;
+        lastUploadedFrom = materializedTo;
+        lastUploadedTo = 
stateChangelogWriter.lastAppendedSequenceNumber().next();
+
+        LOG.debug(
+                "snapshot for checkpoint {}, change range: {}..{}",
+                checkpointId,
+                lastUploadedFrom,
+                lastUploadedTo);
+        return toRunnableFuture(
+                stateChangelogWriter
+                        .persist(lastUploadedFrom)
+                        .thenApply(this::buildSnapshotResult));
+    }
+
+    private SnapshotResult<KeyedStateHandle> 
buildSnapshotResult(StateChangelogHandle delta) {
+        // Can be called by either task thread during the sync checkpoint 
phase (if persist future
+        // was already completed); or by the writer thread otherwise. So need 
to synchronize.
+        // todo: revisit after FLINK-21357 - use mailbox action?
+        synchronized (materialized) {
+            // collections don't change once started and handles are immutable
+            List<StateChangelogHandle> prevDeltaCopy = new 
ArrayList<>(nonMaterialized);
+            if (delta != null && delta.getStateSize() > 0) {
+                prevDeltaCopy.add(delta);
+            }
+            if (prevDeltaCopy.isEmpty() && materialized.isEmpty()) {
+                return SnapshotResult.empty();
+            } else {
+                return SnapshotResult.of(
+                        new ChangelogStateBackendHandleImpl(
+                                materialized, prevDeltaCopy, 
getKeyGroupRange()));
+            }
+        }
     }
 
     @Nonnull
@@ -283,11 +369,26 @@ class ChangelogKeyedStateBackend<K>
     // -------------------- CheckpointListener --------------------------------
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        if (lastCheckpointId == checkpointId) {
+            // Notify the writer so that it can re-use the previous uploads. 
Do NOT notify it about
+            // a range status change if it is not relevant anymore. Otherwise, 
it could CONFIRM a
+            // newer upload instead of the previous one. This newer upload 
could then be re-used
+            // while in fact JM has discarded its results.
+            // This might change if the log ownership changes (the method 
won't likely be needed).
+            stateChangelogWriter.confirm(lastUploadedFrom, lastUploadedTo);
+        }
         keyedStateBackend.notifyCheckpointComplete(checkpointId);
     }
 
     @Override
     public void notifyCheckpointAborted(long checkpointId) throws Exception {
+        if (lastCheckpointId == checkpointId) {
+            // Notify the writer so that it can clean up. Do NOT notify it 
about a range status
+            // change if it is not relevant anymore. Otherwise, it could 
DISCARD a newer upload
+            // instead of the previous one. Rely on truncation for the cleanup 
in this case.
+            // This might change if the log ownership changes (the method 
won't likely be needed).
+            stateChangelogWriter.reset(lastUploadedFrom, lastUploadedTo);
+        }
         keyedStateBackend.notifyCheckpointAborted(checkpointId);
     }
 
@@ -372,4 +473,39 @@ class ChangelogKeyedStateBackend<K>
                 InternalKvState<K, N, SV> kvState, KvStateChangeLogger<SV, N> 
changeLogger)
                 throws Exception;
     }
+
+    private static <T> RunnableFuture<T> toRunnableFuture(CompletableFuture<T> 
f) {
+        return new RunnableFuture<T>() {
+            @Override
+            public void run() {
+                f.join();
+            }
+
+            @Override
+            public boolean cancel(boolean mayInterruptIfRunning) {
+                return f.cancel(mayInterruptIfRunning);
+            }
+
+            @Override
+            public boolean isCancelled() {
+                return f.isCancelled();
+            }
+
+            @Override
+            public boolean isDone() {
+                return f.isDone();
+            }
+
+            @Override
+            public T get() throws InterruptedException, ExecutionException {
+                return f.get();
+            }
+
+            @Override
+            public T get(long timeout, TimeUnit unit)
+                    throws InterruptedException, ExecutionException, 
TimeoutException {
+                return f.get(timeout, unit);
+            }
+        };
+    }
 }
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
index 36a7a59..d354203 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
 import 
org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogStorage;
 import org.apache.flink.runtime.state.delegate.DelegatingStateBackend;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
@@ -46,6 +47,8 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nonnull;
 
 import java.util.Collection;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * This state backend holds the working state in the underlying 
delegatedStateBackend, and forwards
@@ -105,10 +108,11 @@ public class ChangelogStateBackend implements 
DelegatingStateBackend, Configurab
                                 kvStateRegistry,
                                 ttlTimeProvider,
                                 metricGroup,
-                                stateHandles,
+                                extractMaterializedState(stateHandles),
                                 cancelStreamRegistry);
         // todo: FLINK-21804 get from Environment.getTaskStateManager
         InMemoryStateChangelogStorage changelogWriterFactory = new 
InMemoryStateChangelogStorage();
+        // todo: apply state changes from non-materialized part of stateHandles
         return new ChangelogKeyedStateBackend<>(
                 keyedStateBackend,
                 env.getExecutionConfig(),
@@ -144,12 +148,13 @@ public class ChangelogStateBackend implements 
DelegatingStateBackend, Configurab
                                 kvStateRegistry,
                                 ttlTimeProvider,
                                 metricGroup,
-                                stateHandles,
+                                extractMaterializedState(stateHandles),
                                 cancelStreamRegistry,
                                 managedMemoryFraction);
 
         // todo: FLINK-21804 get from Environment.getTaskStateManager
         InMemoryStateChangelogStorage changelogWriterFactory = new 
InMemoryStateChangelogStorage();
+        // todo: apply state changes from non-materialized part of stateHandles
         return new ChangelogKeyedStateBackend<>(
                 keyedStateBackend,
                 env.getExecutionConfig(),
@@ -190,4 +195,16 @@ public class ChangelogStateBackend implements 
DelegatingStateBackend, Configurab
 
         return this;
     }
+
+    private static Collection<KeyedStateHandle> extractMaterializedState(
+            Collection<KeyedStateHandle> stateHandles) {
+        return stateHandles.stream()
+                .flatMap(
+                        keyedStateHandle ->
+                                
((ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl)
+                                                keyedStateHandle)
+                                        
.getMaterializedStateHandles().stream())
+                .filter(Objects::nonNull)
+                .collect(Collectors.toList());
+    }
 }

Reply via email to