This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b31f4bd  [FLINK-22808][state/changelog] Log metadata
b31f4bd is described below

commit b31f4bde84caa4664cfa305f7890c9a35ff43821
Author: Roman Khachatryan <[email protected]>
AuthorDate: Mon Mar 29 13:28:58 2021 +0200

    [FLINK-22808][state/changelog] Log metadata
---
 .../state/changelog/AbstractStateChangeLogger.java |  44 ++++++-
 .../changelog/ChangelogKeyedStateBackend.java      |  17 ++-
 .../state/changelog/KvStateChangeLoggerImpl.java   |   6 +-
 .../PriorityQueueStateChangeLoggerImpl.java        |   6 +-
 .../changelog/KvStateChangeLoggerImplTest.java     |  70 +++++++++++
 .../PriorityQueueStateChangeLoggerImplTest.java    |  64 ++++++++++
 .../state/changelog/StateChangeLoggerTestBase.java | 133 +++++++++++++++++++++
 7 files changed, 330 insertions(+), 10 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
index 55ceaa7..da106c8 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
@@ -18,8 +18,10 @@
 package org.apache.flink.state.changelog;
 
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
 import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters;
 import org.apache.flink.util.function.ThrowingConsumer;
 
 import javax.annotation.Nullable;
@@ -31,23 +33,31 @@ import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters.CURRENT_STATE_META_INFO_SNAPSHOT_VERSION;
 import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.ADD;
 import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.ADD_ELEMENT;
 import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.ADD_OR_UPDATE_ELEMENT;
 import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.CLEAR;
+import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.METADATA;
 import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.REMOVE_ELEMENT;
 import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.SET;
 import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.SET_INTERNAL;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 abstract class AbstractStateChangeLogger<Key, Value, Ns> implements 
StateChangeLogger<Value, Ns> {
+    static final int COMMON_KEY_GROUP = -1;
     protected final StateChangelogWriter<?> stateChangelogWriter;
     protected final InternalKeyContext<Key> keyContext;
+    protected final RegisteredStateMetaInfoBase metaInfo;
+    private boolean metaDataWritten = false;
 
     public AbstractStateChangeLogger(
-            StateChangelogWriter<?> stateChangelogWriter, 
InternalKeyContext<Key> keyContext) {
+            StateChangelogWriter<?> stateChangelogWriter,
+            InternalKeyContext<Key> keyContext,
+            RegisteredStateMetaInfoBase metaInfo) {
         this.stateChangelogWriter = checkNotNull(stateChangelogWriter);
         this.keyContext = checkNotNull(keyContext);
+        this.metaInfo = checkNotNull(metaInfo);
     }
 
     @Override
@@ -103,7 +113,7 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns> 
implements StateChangeL
     }
 
     protected void log(StateChangeOperation op, Ns ns) throws IOException {
-        // todo: log metadata (FLINK-22808)
+        logMetaIfNeeded();
         stateChangelogWriter.append(keyContext.getCurrentKeyGroupIndex(), 
serialize(op, ns, null));
     }
 
@@ -112,11 +122,28 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns> 
implements StateChangeL
             @Nullable ThrowingConsumer<DataOutputViewStreamWrapper, 
IOException> dataWriter,
             Ns ns)
             throws IOException {
-        // todo: log metadata (FLINK-22808)
+        logMetaIfNeeded();
         stateChangelogWriter.append(
                 keyContext.getCurrentKeyGroupIndex(), serialize(op, ns, 
dataWriter));
     }
 
+    private void logMetaIfNeeded() throws IOException {
+        if (!metaDataWritten) {
+            // todo: add StateChangelogWriter.append() version without a 
keygroup
+            //     when all callers and implementers are merged (FLINK-21356 
or later)
+            stateChangelogWriter.append(
+                    COMMON_KEY_GROUP,
+                    serializeRaw(
+                            out -> {
+                                out.writeByte(METADATA.code);
+                                
out.writeInt(CURRENT_STATE_META_INFO_SNAPSHOT_VERSION);
+                                StateMetaInfoSnapshotReadersWriters.getWriter()
+                                        
.writeStateMetaInfoSnapshot(metaInfo.snapshot(), out);
+                            }));
+            metaDataWritten = true;
+        }
+    }
+
     private byte[] serialize(
             StateChangeOperation op,
             Ns ns,
@@ -125,6 +152,9 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns> 
implements StateChangeL
         return serializeRaw(
                 wrapper -> {
                     wrapper.writeByte(op.code);
+                    // todo: optimize in FLINK-22944 by either writing short 
code or grouping and
+                    // writing once (same for key, ns)
+                    wrapper.writeUTF(metaInfo.getName());
                     serializeScope(ns, wrapper);
                     if (dataWriter != null) {
                         dataWriter.accept(wrapper);
@@ -163,7 +193,9 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns> 
implements StateChangeL
         /** Scope: key + namespace + element (e.g. user map remove or iterator 
remove). */
         REMOVE_ELEMENT((byte) 7),
         /** Scope: key + namespace, first element (e.g. priority queue poll). 
*/
-        REMOVE_FIRST_ELEMENT((byte) 8);
+        REMOVE_FIRST_ELEMENT((byte) 8),
+        /** State metadata (name, serializers, etc.). */
+        METADATA((byte) 9);
         private final byte code;
 
         StateChangeOperation(byte code) {
@@ -177,5 +209,9 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns> 
implements StateChangeL
         public static StateChangeOperation byCode(byte opCode) {
             return checkNotNull(BY_CODES.get(opCode), Byte.toString(opCode));
         }
+
+        public byte getCode() {
+            return code;
+        }
     }
 }
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
index 805ae95..e234e97 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
@@ -43,6 +43,8 @@ import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.KeyedStateFunction;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import 
org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.SavepointResources;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.StateSnapshotTransformer;
@@ -257,7 +259,9 @@ class ChangelogKeyedStateBackend<K>
                 new PriorityQueueStateChangeLoggerImpl<>(
                         byteOrderedElementSerializer,
                         keyedStateBackend.getKeyContext(),
-                        stateChangelogWriter);
+                        stateChangelogWriter,
+                        new RegisteredPriorityQueueStateBackendMetaInfo<>(
+                                stateName, byteOrderedElementSerializer));
         return new ChangelogKeyGroupedPriorityQueue<>(
                 keyedStateBackend.create(stateName, 
byteOrderedElementSerializer),
                 priorityQueueStateChangeLogger,
@@ -339,6 +343,14 @@ class ChangelogKeyedStateBackend<K>
                             stateDesc.getClass(), this.getClass());
             throw new FlinkRuntimeException(message);
         }
+        RegisteredKeyValueStateBackendMetaInfo<N, SV> meta =
+                new RegisteredKeyValueStateBackendMetaInfo<>(
+                        stateDesc.getType(),
+                        stateDesc.getName(),
+                        namespaceSerializer,
+                        stateDesc.getSerializer(),
+                        
(StateSnapshotTransformer.StateSnapshotTransformFactory<SV>)
+                                snapshotTransformFactory);
 
         InternalKvState<K, N, SV> state =
                 keyedStateBackend.createInternalState(
@@ -349,7 +361,8 @@ class ChangelogKeyedStateBackend<K>
                         state.getNamespaceSerializer(),
                         state.getValueSerializer(),
                         keyedStateBackend.getKeyContext(),
-                        stateChangelogWriter);
+                        stateChangelogWriter,
+                        meta);
         return stateFactory.create(state, kvStateChangeLogger);
     }
 
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java
index ffbb670..5615ce7 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java
@@ -19,6 +19,7 @@ package org.apache.flink.state.changelog;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
 import org.apache.flink.runtime.state.heap.InternalKeyContext;
 
@@ -43,8 +44,9 @@ class KvStateChangeLoggerImpl<Key, Value, Ns> extends 
AbstractStateChangeLogger<
             TypeSerializer<Ns> namespaceSerializer,
             TypeSerializer<Value> valueSerializer,
             InternalKeyContext<Key> keyContext,
-            StateChangelogWriter<?> stateChangelogWriter) {
-        super(stateChangelogWriter, keyContext);
+            StateChangelogWriter<?> stateChangelogWriter,
+            RegisteredStateMetaInfoBase metaInfo) {
+        super(stateChangelogWriter, keyContext, metaInfo);
         this.keySerializer = checkNotNull(keySerializer);
         this.valueSerializer = checkNotNull(valueSerializer);
         this.namespaceSerializer = checkNotNull(namespaceSerializer);
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java
index bc793c8..c800d7a 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java
@@ -19,6 +19,7 @@ package org.apache.flink.state.changelog;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import 
org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
 import org.apache.flink.runtime.state.heap.InternalKeyContext;
 
@@ -33,8 +34,9 @@ class PriorityQueueStateChangeLoggerImpl<K, T> extends 
AbstractStateChangeLogger
     PriorityQueueStateChangeLoggerImpl(
             TypeSerializer<T> serializer,
             InternalKeyContext<K> keyContext,
-            StateChangelogWriter<?> stateChangelogWriter) {
-        super(stateChangelogWriter, keyContext);
+            StateChangelogWriter<?> stateChangelogWriter,
+            RegisteredPriorityQueueStateBackendMetaInfo<T> meta) {
+        super(stateChangelogWriter, keyContext, meta);
         this.serializer = checkNotNull(serializer);
     }
 
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/KvStateChangeLoggerImplTest.java
 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/KvStateChangeLoggerImplTest.java
new file mode 100644
index 0000000..bfea119
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/KvStateChangeLoggerImplTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
+import 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.apache.flink.api.common.state.StateDescriptor.Type.VALUE;
+import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.MERGE_NS;
+
+/** {@link KvStateChangeLoggerImpl} test. */
+public class KvStateChangeLoggerImplTest extends 
StateChangeLoggerTestBase<String> {
+
+    @Override
+    protected StateChangeLogger<String, String> getLogger(
+            TestingStateChangelogWriter writer, InternalKeyContextImpl<String> 
keyContext) {
+        StringSerializer keySerializer = new StringSerializer();
+        StringSerializer nsSerializer = new StringSerializer();
+        StringSerializer valueSerializer = new StringSerializer();
+        RegisteredKeyValueStateBackendMetaInfo<String, String> metaInfo =
+                new RegisteredKeyValueStateBackendMetaInfo<>(
+                        VALUE, "test", nsSerializer, valueSerializer);
+        return new KvStateChangeLoggerImpl<>(
+                keySerializer, nsSerializer, valueSerializer, keyContext, 
writer, metaInfo);
+    }
+
+    @Override
+    protected String getNamespace(String element) {
+        return element;
+    }
+
+    @Override
+    protected Optional<Tuple2<Integer, StateChangeOperation>> log(
+            StateChangeOperation op,
+            String element,
+            StateChangeLogger<String, String> logger,
+            InternalKeyContextImpl<String> keyContext)
+            throws IOException {
+        if (op == MERGE_NS) {
+            keyContext.setCurrentKey(element);
+            ((KvStateChangeLogger<String, String>) logger)
+                    .namespacesMerged(element, Collections.emptyList());
+            return Optional.of(Tuple2.of(keyContext.getCurrentKeyGroupIndex(), 
op));
+        } else {
+            return super.log(op, element, logger, keyContext);
+        }
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java
 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java
new file mode 100644
index 0000000..b6bf280
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import 
org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
+import 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.REMOVE_FIRST_ELEMENT;
+
+/** {@link PriorityQueueStateChangeLoggerImpl} test. */
+public class PriorityQueueStateChangeLoggerImplTest extends 
StateChangeLoggerTestBase<Void> {
+
+    @Override
+    protected StateChangeLogger<String, Void> getLogger(
+            TestingStateChangelogWriter writer, InternalKeyContextImpl<String> 
keyContext) {
+        StringSerializer valueSerializer = new StringSerializer();
+        RegisteredPriorityQueueStateBackendMetaInfo<String> metaInfo =
+                new RegisteredPriorityQueueStateBackendMetaInfo<>("test", 
valueSerializer);
+        return new PriorityQueueStateChangeLoggerImpl<>(
+                valueSerializer, keyContext, writer, metaInfo);
+    }
+
+    @Override
+    protected Optional<Tuple2<Integer, StateChangeOperation>> log(
+            StateChangeOperation op,
+            String element,
+            StateChangeLogger<String, Void> logger,
+            InternalKeyContextImpl<String> keyContext)
+            throws IOException {
+        if (op == REMOVE_FIRST_ELEMENT) {
+            keyContext.setCurrentKey(element);
+            ((PriorityQueueStateChangeLogger<String>) 
logger).stateElementPolled();
+            return Optional.of(Tuple2.of(keyContext.getCurrentKeyGroupIndex(), 
op));
+        } else {
+            return super.log(op, element, logger, keyContext);
+        }
+    }
+
+    @Override
+    protected Void getNamespace(String element) {
+        return null;
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
new file mode 100644
index 0000000..fd79d86
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
+import 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.COMMON_KEY_GROUP;
+import static 
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.METADATA;
+import static org.junit.Assert.assertEquals;
+
+abstract class StateChangeLoggerTestBase<Namespace> {
+    /** A basic test for appending the metadata on first state access. */
+    @Test
+    public void testMetadataOperationLogged() throws IOException {
+        TestingStateChangelogWriter writer = new TestingStateChangelogWriter();
+        InternalKeyContextImpl<String> keyContext =
+                new InternalKeyContextImpl<>(KeyGroupRange.of(1, 1000), 1000);
+        StateChangeLogger<String, Namespace> logger = getLogger(writer, 
keyContext);
+
+        List<Tuple2<Integer, StateChangeOperation>> expectedAppends = new 
ArrayList<>();
+        expectedAppends.add(Tuple2.of(COMMON_KEY_GROUP, METADATA));
+
+        // log every applicable operations, several times each
+        int numOpTypes = StateChangeOperation.values().length;
+        for (int i = 0; i < numOpTypes * 7; i++) {
+            String element = Integer.toString(i);
+            StateChangeOperation operation = 
StateChangeOperation.byCode((byte) (i % numOpTypes));
+            log(operation, element, logger, 
keyContext).ifPresent(expectedAppends::add);
+        }
+        assertEquals(expectedAppends, writer.appends);
+    }
+
+    protected abstract StateChangeLogger<String, Namespace> getLogger(
+            TestingStateChangelogWriter writer, InternalKeyContextImpl<String> 
keyContext);
+
+    protected Optional<Tuple2<Integer, StateChangeOperation>> log(
+            StateChangeOperation op,
+            String element,
+            StateChangeLogger<String, Namespace> logger,
+            InternalKeyContextImpl<String> keyContext)
+            throws IOException {
+        keyContext.setCurrentKey(element);
+        Namespace namespace = getNamespace(element);
+        switch (op) {
+            case ADD:
+                logger.valueAdded(element, namespace);
+                break;
+            case ADD_ELEMENT:
+                logger.valueElementAdded(w -> {}, namespace);
+                break;
+            case REMOVE_ELEMENT:
+                logger.valueElementRemoved(w -> {}, namespace);
+                break;
+            case CLEAR:
+                logger.valueCleared(namespace);
+                break;
+            case SET:
+                logger.valueUpdated(element, namespace);
+                break;
+            case SET_INTERNAL:
+                logger.valueUpdatedInternal(element, namespace);
+                break;
+            case ADD_OR_UPDATE_ELEMENT:
+                logger.valueElementAddedOrUpdated(w -> {}, namespace);
+                break;
+            default:
+                return Optional.empty();
+        }
+        return Optional.of(Tuple2.of(keyContext.getCurrentKeyGroupIndex(), 
op));
+    }
+
+    protected abstract Namespace getNamespace(String element);
+
+    @SuppressWarnings("rawtypes")
+    protected static class TestingStateChangelogWriter implements 
StateChangelogWriter {
+        private final List<Tuple2<Integer, StateChangeOperation>> appends = 
new ArrayList<>();
+
+        @Override
+        public void append(int keyGroup, byte[] value) {
+            appends.add(Tuple2.of(keyGroup, 
StateChangeOperation.byCode(value[0])));
+        }
+
+        @Override
+        public SequenceNumber lastAppendedSequenceNumber() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CompletableFuture<?> persist(SequenceNumber from) throws 
IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void truncate(SequenceNumber to) {}
+
+        @Override
+        public void confirm(SequenceNumber from, SequenceNumber to) {}
+
+        @Override
+        public void reset(SequenceNumber from, SequenceNumber to) {}
+
+        @Override
+        public void close() {}
+    }
+}

Reply via email to