curcur commented on a change in pull request #15200:
URL: https://github.com/apache/flink/pull/15200#discussion_r603752889



##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractChangelogState.java
##########
@@ -20,8 +20,10 @@
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;

Review comment:
       unused import

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLogger.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.function.BiConsumerWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+interface StateChangeLogger<State, Namespace> {
+    static <Namespace, State, StateElement> Iterator<StateElement> 
loggingIterator(
+            @Nullable Iterator<StateElement> iterator,
+            StateChangeLogger<State, Namespace> changeLogger,
+            BiConsumerWithException<StateElement, DataOutputViewStreamWrapper, 
IOException>
+                    changeWriter,
+            Namespace ns) {
+        if (iterator == null) {
+            return null;
+        }
+        return new Iterator<StateElement>() {
+
+            @Nullable private StateElement lastReturned;
+
+            @Override
+            public boolean hasNext() {
+                return iterator.hasNext();
+            }
+
+            @Override
+            public StateElement next() {
+                return lastReturned = iterator.next();
+            }
+
+            @Override
+            public void remove() {
+                try {
+                    changeLogger.stateElementRemoved(
+                            out -> changeWriter.accept(lastReturned, out), ns);
+                } catch (IOException e) {
+                    ExceptionUtils.rethrow(e);
+                }
+                iterator.remove();
+            }
+        };
+    }
+
+    static <Namespace, State, StateElement> Iterable<StateElement> 
loggingIterable(
+            @Nullable Iterable<StateElement> iterable,
+            KvStateChangeLogger<State, Namespace> changeLogger,
+            BiConsumerWithException<StateElement, DataOutputViewStreamWrapper, 
IOException>
+                    changeWriter,
+            Namespace ns) {
+        if (iterable == null) {
+            return null;
+        }
+        return () -> loggingIterator(iterable.iterator(), changeLogger, 
changeWriter, ns);
+    }
+
+    static <UK, UV, State, Namespace> Map.Entry<UK, UV> loggingMapEntry(
+            Map.Entry<UK, UV> entry,
+            KvStateChangeLogger<State, Namespace> changeLogger,
+            BiConsumerWithException<Map.Entry<UK, UV>, 
DataOutputViewStreamWrapper, IOException>
+                    changeWriter,
+            Namespace ns) {
+        return new Map.Entry<UK, UV>() {
+            @Override
+            public UK getKey() {
+                return entry.getKey();
+            }
+
+            @Override
+            public UV getValue() {
+                return entry.getValue();
+            }
+
+            @Override
+            public UV setValue(UV value) {
+                try {
+                    changeLogger.stateElementChanged(out -> 
changeWriter.accept(entry, out), ns);
+                } catch (IOException e) {
+                    ExceptionUtils.rethrow(e);
+                }
+                return entry.setValue(value);
+            }
+        };
+    }
+
+    void stateUpdated(State newState, Namespace ns) throws IOException;
+
+    void stateAdded(State addedState, Namespace ns) throws IOException;
+
+    void stateCleared(Namespace ns);

Review comment:
       Would you please document here why this interface method cannot throw 
IOException and why the implementation has to rethrow the IOException as a 
RuntimeException?
   
   If it is because the @PublicEvolving interface in `State` does not throw 
IOException, such a comment would help with clean-up and improvement when refactoring.

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractChangelogState.java
##########
@@ -83,4 +90,8 @@ public void setCurrentNamespace(N namespace) {
             int recommendedMaxNumberOfReturnedRecords) {
         return 
delegatedState.getStateIncrementalVisitor(recommendedMaxNumberOfReturnedRecords);
     }
+
+    protected N getCurrentNamespace() throws NullPointerException {
+        return checkNotNull(currentNamespace);
+    }

Review comment:
       that's good

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.ADD;
+import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.CHANGE_ELEMENT;
+import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.CLEAR;
+import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.REMOVE_ELEMENT;
+import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.SET;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+abstract class AbstractStateChangeLogger<Key, State, Ns> implements 
StateChangeLogger<State, Ns> {
+    protected final StateChangelogWriter<?> stateChangelogWriter;
+    protected final InternalKeyContext<Key> keyContext;
+
+    public AbstractStateChangeLogger(
+            StateChangelogWriter<?> stateChangelogWriter, 
InternalKeyContext<Key> keyContext) {
+        this.stateChangelogWriter = checkNotNull(stateChangelogWriter);
+        this.keyContext = checkNotNull(keyContext);
+    }
+
+    @Override
+    public void stateUpdated(State newState, Ns ns) throws IOException {
+        if (newState == null) {
+            stateCleared(ns);
+        } else {
+            log(SET, out -> serializeState(newState, out), ns);
+        }
+    }
+
+    protected abstract void serializeState(State state, 
DataOutputViewStreamWrapper out)
+            throws IOException;
+
+    @Override
+    public void stateAdded(State addedState, Ns ns) throws IOException {
+        log(ADD, out -> serializeState(addedState, out), ns);
+    }
+
+    @Override
+    public void stateCleared(Ns ns) {
+        try {
+            log(CLEAR, out -> {}, ns);
+        } catch (IOException e) {
+            ExceptionUtils.rethrow(e);
+        }
+    }
+
+    @Override
+    public void stateElementChanged(
+            ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
dataSerializer, Ns ns)
+            throws IOException {
+        log(CHANGE_ELEMENT, dataSerializer, ns);
+    }
+
+    @Override
+    public void stateElementRemoved(
+            ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
dataSerializer, Ns ns)
+            throws IOException {
+        log(REMOVE_ELEMENT, dataSerializer, ns);
+    }
+
+    protected void log(
+            StateChangeOperation op,
+            ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
dataWriter,
+            Ns ns)
+            throws IOException {
+        stateChangelogWriter.append(
+                keyContext.getCurrentKeyGroupIndex(), serialize(op, ns, 
dataWriter));
+    }
+
+    private byte[] serialize(
+            StateChangeOperation op,
+            Ns ns,
+            ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
dataWriter)
+            throws IOException {
+        return serializeRaw(
+                wrapper -> {
+                    wrapper.writeByte(op.code);
+                    serializeScope(ns, wrapper);
+                    dataWriter.accept(wrapper);
+                });
+    }
+
+    protected abstract void serializeScope(Ns ns, DataOutputViewStreamWrapper 
out)
+            throws IOException;
+
+    private byte[] serializeRaw(
+            ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
dataWriter)
+            throws IOException {
+        try (ByteArrayOutputStream out = new ByteArrayOutputStream();
+                DataOutputViewStreamWrapper wrapper = new 
DataOutputViewStreamWrapper(out)) {
+            dataWriter.accept(wrapper);
+            return out.toByteArray();
+        }
+    }
+
+    enum StateChangeOperation {
+        /** Scope: key + namespace. */
+        CLEAR((byte) 0),
+        /** Scope: key + namespace. */
+        SET((byte) 1),
+        /** Scope: key + namespace. */
+        ADD((byte) 2),
+        /** Scope: key + namespace, also affecting other (source) namespaces. 
*/
+        MERGE_NS((byte) 3),
+        /** Scope: key + namespace + element (e.g. user map key put or list 
append). */
+        CHANGE_ELEMENT((byte) 4),
+        /** Scope: key + namespace + element (e.g. user map remove or iterator 
remove). */
+        REMOVE_ELEMENT((byte) 5),
+        /** Scope: key + namespace, last element. */
+        POLL_ELEMENT((byte) 6);

Review comment:
       could you please add an "e.g., " here, similar to "CHANGE_ELEMENT and 
REMOVE_ELEMENT"

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java
##########
@@ -71,22 +94,67 @@ public boolean contains(UK key) throws Exception {
 
     @Override
     public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
-        return delegatedState.entries();
+        Iterator<Map.Entry<UK, UV>> iterator = 
delegatedState.entries().iterator();
+        final N currentNamespace = getCurrentNamespace();
+        return () ->
+                loggingIterator(
+                        new Iterator<Map.Entry<UK, UV>>() {
+                            @Override
+                            public Map.Entry<UK, UV> next() {
+                                return loggingMapEntry(
+                                        iterator.next(),
+                                        changeLogger,
+                                        changeWriter,
+                                        currentNamespace);
+                            }
+
+                            @Override
+                            public boolean hasNext() {
+                                return iterator.hasNext();
+                            }
+
+                            @Override
+                            public void remove() {
+                                iterator.remove();
+                            }
+                        },
+                        changeLogger,
+                        changeWriter,
+                        currentNamespace);
     }
 
     @Override
     public Iterable<UK> keys() throws Exception {
-        return delegatedState.keys();
+        return loggingIterable(
+                delegatedState.keys(), changeLogger, this::serializeKey, 
getCurrentNamespace());

Review comment:
       Is it possible to remove a key from a map without removing its value?
   I ask because the loggingIterable is mainly there to log remove changes.

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLogger.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.function.BiConsumerWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+interface StateChangeLogger<State, Namespace> {
+    static <Namespace, State, StateElement> Iterator<StateElement> 
loggingIterator(

Review comment:
       1. I am not sure whether these static implementations/methods should be 
put in the (base) interface.
   
   2. Especially `loggingMapEntry` and `loggingIterable` are only used for 
`MapState`
   
   3. Also, why are such iterators needed? Is it because iterator `remove` is 
supported in some state backends?

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PqStateChangeLogger.java
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import java.io.IOException;
+
+interface PqStateChangeLogger<T> extends StateChangeLogger<T, Void> {

Review comment:
       If I guess correctly, Pq stands for priority queue; would it be fine to 
make the name explicit? It is difficult to guess what Pq stands for unless one is 
familiar with the implementation.

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogAggregatingState.java
##########
@@ -67,18 +71,20 @@ public OUT get() throws Exception {
     @Override
     public void add(IN value) throws Exception {
         delegatedState.add(value);
+        changeLogger.stateUpdated(delegatedState.getInternal(), 
getCurrentNamespace());

Review comment:
       That's interesting. Using `update` to simulate `add` could work, but I 
guess this is something different than what we want?
   
   It may well be the case that the "add" is small, but the "update" is huge.

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.MERGE_NS;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+@NotThreadSafe
+class KvStateChangeLoggerImpl<Key, State, Ns> extends 
AbstractStateChangeLogger<Key, State, Ns>
+        implements KvStateChangeLogger<State, Ns> {
+
+    private final TypeSerializer<Ns> namespaceSerializer;
+    protected final TypeSerializer<Key> keySerializer;
+    private final TypeSerializer<State> valueSerializer;
+
+    KvStateChangeLoggerImpl(
+            TypeSerializer<Key> keySerializer,
+            TypeSerializer<Ns> namespaceSerializer,
+            TypeSerializer<State> valueSerializer,

Review comment:
       Wondering whether it makes sense to change `<State>` to `<Value>` here and 
in other related places. I feel "state" in a state backend means something slightly 
different from how "state" is used here.
   
   If you look at `InternalKvState` which is a state, it contains
   
       /** Returns the {@link TypeSerializer} for the type of key this state is 
associated to. */
       TypeSerializer<K> getKeySerializer();
   
       /** Returns the {@link TypeSerializer} for the type of namespace this 
state is associated to. */
       TypeSerializer<N> getNamespaceSerializer();
   
       /** Returns the {@link TypeSerializer} for the type of value this state 
holds. */
       TypeSerializer<V> getValueSerializer();
   
   And I think the <State> you are using here more or less means 
"Value" instead?
   
   In my understanding, state is generally not nullable, but a value can be. That 
confused me at the beginning: in some places state is not 
allowed to be null, but we still have a CLEAR op that allows the state (value) to be null.

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java
##########
@@ -71,22 +94,67 @@ public boolean contains(UK key) throws Exception {
 
     @Override
     public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
-        return delegatedState.entries();
+        Iterator<Map.Entry<UK, UV>> iterator = 
delegatedState.entries().iterator();
+        final N currentNamespace = getCurrentNamespace();
+        return () ->
+                loggingIterator(
+                        new Iterator<Map.Entry<UK, UV>>() {
+                            @Override
+                            public Map.Entry<UK, UV> next() {
+                                return loggingMapEntry(
+                                        iterator.next(),
+                                        changeLogger,
+                                        changeWriter,
+                                        currentNamespace);
+                            }
+
+                            @Override
+                            public boolean hasNext() {
+                                return iterator.hasNext();
+                            }
+
+                            @Override
+                            public void remove() {
+                                iterator.remove();
+                            }

Review comment:
       would it be possible to remove from the entries here? 
   
   should we have delegated stateElementRemoved here as well?

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogAggregatingState.java
##########
@@ -67,18 +71,20 @@ public OUT get() throws Exception {
     @Override
     public void add(IN value) throws Exception {
         delegatedState.add(value);
+        changeLogger.stateUpdated(delegatedState.getInternal(), 
getCurrentNamespace());

Review comment:
       I guess here we have to have both IN serializer and ACC serializer.

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyGroupedPriorityQueue.java
##########
@@ -18,24 +18,36 @@
 
 package org.apache.flink.state.changelog;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
 import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.ExceptionUtils;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Set;
 
+import static 
org.apache.flink.state.changelog.StateChangeLogger.loggingIterator;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A {@link KeyGroupedInternalPriorityQueue} that keeps state on the 
underlying delegated {@link
  * KeyGroupedInternalPriorityQueue} as well as on the state change log.
  */
 public class ChangelogKeyGroupedPriorityQueue<T> implements 
KeyGroupedInternalPriorityQueue<T> {
     private final KeyGroupedInternalPriorityQueue<T> delegatedPriorityQueue;
+    private final PqStateChangeLogger<T> logger;

Review comment:
       Are these updates written to the same log? I mean, they are using 
different writers, but might they still write to the same log?
   
   If the scope is not specified, how can we differentiate these PQ changes from 
the normal changelog entries?
   
   I understand that normal changes can be differentiated using the key context, 
but what about this one? 

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java
##########
@@ -71,22 +94,67 @@ public boolean contains(UK key) throws Exception {
 
     @Override
     public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
-        return delegatedState.entries();
+        Iterator<Map.Entry<UK, UV>> iterator = 
delegatedState.entries().iterator();
+        final N currentNamespace = getCurrentNamespace();
+        return () ->
+                loggingIterator(
+                        new Iterator<Map.Entry<UK, UV>>() {
+                            @Override
+                            public Map.Entry<UK, UV> next() {
+                                return loggingMapEntry(
+                                        iterator.next(),
+                                        changeLogger,
+                                        changeWriter,
+                                        currentNamespace);
+                            }
+
+                            @Override
+                            public boolean hasNext() {
+                                return iterator.hasNext();
+                            }
+
+                            @Override
+                            public void remove() {
+                                iterator.remove();
+                            }
+                        },
+                        changeLogger,
+                        changeWriter,
+                        currentNamespace);
     }
 
     @Override
     public Iterable<UK> keys() throws Exception {
-        return delegatedState.keys();
+        return loggingIterable(
+                delegatedState.keys(), changeLogger, this::serializeKey, 
getCurrentNamespace());
     }
 
     @Override
     public Iterable<UV> values() throws Exception {
-        return delegatedState.values();
+        Iterator<Map.Entry<UK, UV>> iterator = entries().iterator();
+        return () ->
+                new Iterator<UV>() {
+                    @Override
+                    public boolean hasNext() {
+                        return iterator.hasNext();
+                    }
+
+                    @Override
+                    public UV next() {
+                        return iterator.next().getValue();
+                    }
+
+                    @Override
+                    public void remove() {
+                        iterator.remove();
+                    }

Review comment:
       same question as entries() since it is using entries iterator.

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogListState.java
##########
@@ -39,53 +42,79 @@
         extends AbstractChangelogState<K, N, List<V>, InternalListState<K, N, 
V>>
         implements InternalListState<K, N, V> {
 
-    ChangelogListState(InternalListState<K, N, V> delegatedState) {
-        super(delegatedState);
+    ChangelogListState(
+            InternalListState<K, N, V> delegatedState,
+            KvStateChangeLogger<List<V>, N> changeLogger) {
+        super(delegatedState, changeLogger);
     }
 
     @Override
     public void update(List<V> values) throws Exception {
+        changeLogger.stateUpdated(values, getCurrentNamespace());
         delegatedState.update(values);
     }
 
     @Override
     public void addAll(List<V> values) throws Exception {
+        changeLogger.stateAdded(values, getCurrentNamespace());
         delegatedState.addAll(values);
     }
 
     @Override
     public void updateInternal(List<V> valueToStore) throws Exception {
+        changeLogger.stateUpdated(valueToStore, getCurrentNamespace());
         delegatedState.updateInternal(valueToStore);
     }
 
     @Override
     public void add(V value) throws Exception {
+        if (getValueSerializer() instanceof ListSerializer) {

Review comment:
       Is it possible that getValueSerializer() is an instance of ListSerializer, 
but the list is null?
   
   That would be handled during deserialization I guess.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
##########
@@ -391,8 +393,8 @@ private static StateBackend loadChangelogStateBackend(
             Constructor<? extends DelegatingStateBackend> constructor =
                     Class.forName(CHANGELOG_STATE_BACKEND, false, classLoader)
                             .asSubclass(DelegatingStateBackend.class)
-                            .getConstructor(StateBackend.class);
-            return constructor.newInstance(backend);
+                            .getConstructor(StateBackend.class, 
StateChangelogWriterFactory.class);
+            return constructor.newInstance(backend, new 
InMemoryStateChangelogWriterFactory());

Review comment:
       This... I am not sure of this. It's a bit too hacky for prod code. But 
given the limited time, maybe it is fine?
   
   @pnowojski what do you think?

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.ADD;
+import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.CHANGE_ELEMENT;
+import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.CLEAR;
+import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.REMOVE_ELEMENT;
+import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.SET;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+abstract class AbstractStateChangeLogger<Key, State, Ns> implements 
StateChangeLogger<State, Ns> {
+    protected final StateChangelogWriter<?> stateChangelogWriter;
+    protected final InternalKeyContext<Key> keyContext;
+
+    public AbstractStateChangeLogger(
+            StateChangelogWriter<?> stateChangelogWriter, 
InternalKeyContext<Key> keyContext) {
+        this.stateChangelogWriter = checkNotNull(stateChangelogWriter);
+        this.keyContext = checkNotNull(keyContext);
+    }
+
+    @Override
+    public void stateUpdated(State newState, Ns ns) throws IOException {
+        if (newState == null) {
+            stateCleared(ns);
+        } else {
+            log(SET, out -> serializeState(newState, out), ns);
+        }
+    }
+
+    protected abstract void serializeState(State state, 
DataOutputViewStreamWrapper out)
+            throws IOException;
+
+    @Override
+    public void stateAdded(State addedState, Ns ns) throws IOException {
+        log(ADD, out -> serializeState(addedState, out), ns);
+    }
+
+    @Override
+    public void stateCleared(Ns ns) {
+        try {
+            log(CLEAR, out -> {}, ns);
+        } catch (IOException e) {
+            ExceptionUtils.rethrow(e);
+        }
+    }
+
+    @Override
+    public void stateElementChanged(
+            ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
dataSerializer, Ns ns)
+            throws IOException {
+        log(CHANGE_ELEMENT, dataSerializer, ns);
+    }
+
+    @Override
+    public void stateElementRemoved(
+            ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
dataSerializer, Ns ns)
+            throws IOException {
+        log(REMOVE_ELEMENT, dataSerializer, ns);
+    }
+
+    protected void log(
+            StateChangeOperation op,
+            ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
dataWriter,
+            Ns ns)
+            throws IOException {
+        stateChangelogWriter.append(
+                keyContext.getCurrentKeyGroupIndex(), serialize(op, ns, 
dataWriter));
+    }
+
+    private byte[] serialize(
+            StateChangeOperation op,
+            Ns ns,
+            ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
dataWriter)
+            throws IOException {
+        return serializeRaw(
+                wrapper -> {
+                    wrapper.writeByte(op.code);
+                    serializeScope(ns, wrapper);
+                    dataWriter.accept(wrapper);
+                });
+    }
+
+    protected abstract void serializeScope(Ns ns, DataOutputViewStreamWrapper 
out)
+            throws IOException;
+
+    private byte[] serializeRaw(
+            ThrowingConsumer<DataOutputViewStreamWrapper, IOException> 
dataWriter)
+            throws IOException {
+        try (ByteArrayOutputStream out = new ByteArrayOutputStream();
+                DataOutputViewStreamWrapper wrapper = new 
DataOutputViewStreamWrapper(out)) {
+            dataWriter.accept(wrapper);
+            return out.toByteArray();
+        }
+    }
+
+    enum StateChangeOperation {
+        /** Scope: key + namespace. */
+        CLEAR((byte) 0),
+        /** Scope: key + namespace. */
+        SET((byte) 1),
+        /** Scope: key + namespace. */
+        ADD((byte) 2),
+        /** Scope: key + namespace, also affecting other (source) namespaces. 
*/
+        MERGE_NS((byte) 3),
+        /** Scope: key + namespace + element (e.g. user map key put or list 
append). */
+        CHANGE_ELEMENT((byte) 4),
+        /** Scope: key + namespace + element (e.g. user map remove or iterator 
remove). */
+        REMOVE_ELEMENT((byte) 5),
+        /** Scope: key + namespace, last element. */
+        POLL_ELEMENT((byte) 6);
+        private final byte code;
+
+        StateChangeOperation(byte code) {
+            this.code = code;
+        }
+
+        static Map<Byte, KvStateChangeLoggerImpl.StateChangeOperation> byCodes 
=

Review comment:
       Usually a static constant is named in upper case, and I guess you want it 
to be `final` as well? Maybe `CODE_MAP`?

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLogger.java
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import java.io.IOException;
+import java.util.Collection;
+
+interface KvStateChangeLogger<State, Namespace> extends 
StateChangeLogger<State, Namespace> {
+
+    void stateMerged(Namespace target, Collection<Namespace> sources) throws 
IOException;

Review comment:
       Rename to `mergeNamespaces`?

##########
File path: 
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java
##########
@@ -71,22 +94,67 @@ public boolean contains(UK key) throws Exception {
 
     @Override
     public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
-        return delegatedState.entries();
+        Iterator<Map.Entry<UK, UV>> iterator = 
delegatedState.entries().iterator();
+        final N currentNamespace = getCurrentNamespace();
+        return () ->
+                loggingIterator(
+                        new Iterator<Map.Entry<UK, UV>>() {
+                            @Override
+                            public Map.Entry<UK, UV> next() {
+                                return loggingMapEntry(
+                                        iterator.next(),
+                                        changeLogger,
+                                        changeWriter,
+                                        currentNamespace);
+                            }
+
+                            @Override
+                            public boolean hasNext() {
+                                return iterator.hasNext();
+                            }
+
+                            @Override
+                            public void remove() {
+                                iterator.remove();
+                            }
+                        },
+                        changeLogger,
+                        changeWriter,
+                        currentNamespace);
     }
 
     @Override
     public Iterable<UK> keys() throws Exception {
-        return delegatedState.keys();
+        return loggingIterable(
+                delegatedState.keys(), changeLogger, this::serializeKey, 
getCurrentNamespace());
     }
 
     @Override
     public Iterable<UV> values() throws Exception {
-        return delegatedState.values();
+        Iterator<Map.Entry<UK, UV>> iterator = entries().iterator();
+        return () ->
+                new Iterator<UV>() {
+                    @Override
+                    public boolean hasNext() {
+                        return iterator.hasNext();
+                    }
+
+                    @Override
+                    public UV next() {
+                        return iterator.next().getValue();
+                    }
+
+                    @Override
+                    public void remove() {
+                        iterator.remove();
+                    }
+                };
     }
 
     @Override
     public Iterator<Map.Entry<UK, UV>> iterator() throws Exception {
-        return delegatedState.iterator();
+        return loggingIterator(
+                delegatedState.iterator(), changeLogger, changeWriter, 
getCurrentNamespace());
     }

Review comment:
       Would it be possible to change the value (not add a value) here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to