This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 04ccb61cfc7b27c675d3b3198f63b2fb114d054a
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed Jun 2 17:51:18 2021 +0200

    [hotfix][state/changelog] Introduce StateChangelogHandleReader
    
    Invert control when reading StateChangelogHandles to avoid
    embedding logic into handles and to make the reader creation explicit
    and symmetric to the writer.
    
    The reader is now obtained from StateChangelogWriterFactory, which is
    renamed in a subsequent commit.
---
 .../metadata/MetadataV2V3SerializerBase.java       | 10 +--
 .../state/changelog/StateChangelogHandle.java      | 14 +---
 ...Handle.java => StateChangelogHandleReader.java} | 12 +--
 .../StateChangelogHandleStreamHandleReader.java    | 97 ++++++++++++++++++++++
 .../changelog/StateChangelogHandleStreamImpl.java  | 53 +-----------
 .../state/changelog/StateChangelogWriter.java      |  6 +-
 .../changelog/StateChangelogWriterFactory.java     |  8 +-
 .../inmemory/InMemoryStateChangelogHandle.java     |  9 +-
 .../inmemory/InMemoryStateChangelogWriter.java     | 18 ++--
 .../InMemoryStateChangelogWriterFactory.java       |  7 ++
 .../inmemory/StateChangelogWriterFactoryTest.java  | 41 ++++-----
 .../state/changelog/StateChangeLoggerTestBase.java |  5 ++
 12 files changed, 169 insertions(+), 111 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
index 96961ed..0ea7337 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
@@ -64,6 +64,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * Base (De)serializer for checkpoint metadata format version 2 and 3.
  *
@@ -322,10 +324,8 @@ public abstract class MetadataV2V3SerializerBase {
             dos.writeInt(handle.getKeyGroupRange().getNumberOfKeyGroups());
             dos.writeLong(handle.getFrom());
             dos.writeLong(handle.getTo());
-            List<StateChange> list = new ArrayList<>();
-            handle.getChanges(null).forEachRemaining(list::add);
-            dos.writeInt(list.size());
-            for (StateChange change : list) {
+            dos.writeInt(handle.getChanges().size());
+            for (StateChange change : handle.getChanges()) {
                 dos.writeInt(change.getKeyGroup());
                 dos.writeInt(change.getChange().length);
                 dos.write(change.getChange());
@@ -419,7 +419,7 @@ public abstract class MetadataV2V3SerializerBase {
                 int keyGroup = dis.readInt();
                 int bytesSize = dis.readInt();
                 byte[] bytes = new byte[bytesSize];
-                dis.read(bytes);
+                checkState(bytesSize == dis.read(bytes));
                 changes.add(new StateChange(keyGroup, bytes));
             }
             return new InMemoryStateChangelogHandle(changes, from, to, 
keyGroupRange);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandle.java
index a4a8656..c200b28 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandle.java
@@ -19,17 +19,7 @@ package org.apache.flink.runtime.state.changelog;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.util.CloseableIterator;
 
-import java.io.IOException;
-
-/**
- * A handle to saved {@link StateChange state changes}.
- *
- * @param <ReaderContext> type of context used while reading (on TM).
- */
+/** A handle to saved {@link StateChange state changes}. */
 @Internal
-public interface StateChangelogHandle<ReaderContext> extends KeyedStateHandle {
-
-    CloseableIterator<StateChange> getChanges(ReaderContext context) throws 
IOException;
-}
+public interface StateChangelogHandle extends KeyedStateHandle {}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleReader.java
similarity index 73%
copy from 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandle.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleReader.java
index a4a8656..9edd7e8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleReader.java
@@ -18,18 +18,12 @@
 package org.apache.flink.runtime.state.changelog;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.util.CloseableIterator;
 
 import java.io.IOException;
 
-/**
- * A handle to saved {@link StateChange state changes}.
- *
- * @param <ReaderContext> type of context used while reading (on TM).
- */
+/** Allows to read state changelog referenced by the provided {@link 
StateChangelogHandle}. */
 @Internal
-public interface StateChangelogHandle<ReaderContext> extends KeyedStateHandle {
-
-    CloseableIterator<StateChange> getChanges(ReaderContext context) throws 
IOException;
+public interface StateChangelogHandleReader<Handle extends 
StateChangelogHandle> {
+    CloseableIterator<StateChange> getChanges(Handle handle) throws 
IOException;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamHandleReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamHandleReader.java
new file mode 100644
index 0000000..482590c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamHandleReader.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * A reader for {@link StateChangelogHandleStreamImpl} that iterates over its 
underlying {@link
+ * StreamStateHandle stream handles} and offsets. Starting from each offset, 
it enumerates the
+ * {@link StateChange state changes} using the provided {@link 
StateChangeIterator}. Different
+ * {@link StateChangelogStorage} implementations may have different 
<b>iterator</b> implementations.
+ * Using a different {@link StateChangelogHandle} (and reader) is problematic 
as it needs to be
+ * serialized.
+ */
+@Internal
+public class StateChangelogHandleStreamHandleReader
+        implements StateChangelogHandleReader<StateChangelogHandleStreamImpl> {
+    private static final Logger LOG =
+            
LoggerFactory.getLogger(StateChangelogHandleStreamHandleReader.class);
+
+    /** Reads a stream of state changes starting from a specified offset. */
+    public interface StateChangeIterator {
+        CloseableIterator<StateChange> read(StreamStateHandle handle, long 
offset);
+    }
+
+    private final StateChangeIterator changeIterator;
+
+    public StateChangelogHandleStreamHandleReader(StateChangeIterator 
changeIterator) {
+        this.changeIterator = changeIterator;
+    }
+
+    @Override
+    public CloseableIterator<StateChange> 
getChanges(StateChangelogHandleStreamImpl handle)
+            throws IOException {
+        return new CloseableIterator<StateChange>() {
+            private final Iterator<Tuple2<StreamStateHandle, Long>> 
handleIterator =
+                    handle.getHandlesAndOffsets().iterator();
+
+            private CloseableIterator<StateChange> current = 
CloseableIterator.empty();
+
+            @Override
+            public boolean hasNext() {
+                advance();
+                return current.hasNext();
+            }
+
+            @Override
+            public StateChange next() {
+                advance();
+                return current.next();
+            }
+
+            private void advance() {
+                while (!current.hasNext() && handleIterator.hasNext()) {
+                    try {
+                        current.close();
+                        Tuple2<StreamStateHandle, Long> tuple2 = 
handleIterator.next();
+                        LOG.debug("read at {} from {}", tuple2.f1, tuple2.f0);
+                        current = changeIterator.read(tuple2.f0, tuple2.f1);
+                    } catch (Exception e) {
+                        ExceptionUtils.rethrow(e);
+                    }
+                }
+            }
+
+            @Override
+            public void close() throws Exception {
+                current.close();
+            }
+        };
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamImpl.java
index 16a4e60..24c1405 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamImpl.java
@@ -28,24 +28,19 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.util.CloseableIterator;
-import org.apache.flink.util.ExceptionUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.util.Iterator;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
 /** {@link StateChangelogHandle} implementation based on {@link 
StreamStateHandle}. */
 @Internal
-public final class StateChangelogHandleStreamImpl
-        implements 
StateChangelogHandle<StateChangelogHandleStreamImpl.StateChangeStreamReader> {
+public final class StateChangelogHandleStreamImpl implements 
StateChangelogHandle {
+
     private static final long serialVersionUID = -8070326169926626355L;
-    private static final Logger LOG = 
LoggerFactory.getLogger(StateChangelogHandleStreamImpl.class);
 
     private final KeyGroupRange keyGroupRange;
     /** NOTE: order is important as it reflects the order of changes. */
@@ -96,46 +91,6 @@ public final class StateChangelogHandleStreamImpl
     }
 
     @Override
-    public CloseableIterator<StateChange> getChanges(StateChangeStreamReader 
reader) {
-        return new CloseableIterator<StateChange>() {
-            private final Iterator<Tuple2<StreamStateHandle, Long>> 
handleIterator =
-                    handlesAndOffsets.iterator();
-
-            private CloseableIterator<StateChange> current = 
CloseableIterator.empty();
-
-            @Override
-            public boolean hasNext() {
-                advance();
-                return current.hasNext();
-            }
-
-            @Override
-            public StateChange next() {
-                advance();
-                return current.next();
-            }
-
-            private void advance() {
-                while (!current.hasNext() && handleIterator.hasNext()) {
-                    try {
-                        current.close();
-                        Tuple2<StreamStateHandle, Long> tuple2 = 
handleIterator.next();
-                        LOG.debug("read at {} from {}", tuple2.f1, tuple2.f0);
-                        current = reader.read(tuple2.f0, tuple2.f1);
-                    } catch (Exception e) {
-                        ExceptionUtils.rethrow(e);
-                    }
-                }
-            }
-
-            @Override
-            public void close() throws Exception {
-                current.close();
-            }
-        };
-    }
-
-    @Override
     public void discardState() {
         handlesAndOffsets.forEach(
                 handleAndOffset -> 
stateRegistry.unregisterReference(getKey(handleAndOffset.f0)));
@@ -168,6 +123,6 @@ public final class StateChangelogHandleStreamImpl
     }
 
     public List<Tuple2<StreamStateHandle, Long>> getHandlesAndOffsets() {
-        return handlesAndOffsets;
+        return Collections.unmodifiableList(handlesAndOffsets);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
index a206d3e..48cf9c9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
@@ -24,8 +24,10 @@ import java.util.concurrent.CompletableFuture;
 
 /** Allows to write data to the log. Scoped to a single writer (e.g. state 
backend). */
 @Internal
-public interface StateChangelogWriter<Handle extends StateChangelogHandle<?>>
-        extends AutoCloseable {
+public interface StateChangelogWriter<Handle extends StateChangelogHandle> 
extends AutoCloseable {
+
+    /** Get the initial {@link SequenceNumber} that is used for the first 
element. */
+    SequenceNumber initialSequenceNumber();
 
     /**
      * Get {@link SequenceNumber} of the last element added by {@link 
#append(int, byte[]) append}.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriterFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriterFactory.java
index e5f1100..072cd15 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriterFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriterFactory.java
@@ -22,15 +22,17 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.state.KeyGroupRange;
 
 /**
- * {@link StateChangelogWriter} factory. Scoped to a single entity (e.g. a 
SubTask or
- * OperatorCoordinator). Please use {@link StateChangelogWriterFactoryLoader} 
to obtain an instance.
+ * A factory for {@link StateChangelogWriter} and {@link 
StateChangelogHandleReader}. Please use
+ * {@link StateChangelogWriterFactoryLoader} to obtain an instance.
  */
 @Internal
-public interface StateChangelogWriterFactory<Handle extends 
StateChangelogHandle<?>>
+public interface StateChangelogWriterFactory<Handle extends 
StateChangelogHandle>
         extends AutoCloseable {
 
     StateChangelogWriter<Handle> createWriter(String operatorID, KeyGroupRange 
keyGroupRange);
 
+    StateChangelogHandleReader<Handle> createReader();
+
     @Override
     default void close() throws Exception {}
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java
index 0896dc7..f492cad 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java
@@ -24,15 +24,15 @@ import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.runtime.state.changelog.StateChange;
 import org.apache.flink.runtime.state.changelog.StateChangelogHandle;
-import org.apache.flink.util.CloseableIterator;
 
 import javax.annotation.Nullable;
 
+import java.util.Collections;
 import java.util.List;
 
 /** In-memory {@link StateChangelogHandle}. */
 @Internal
-public class InMemoryStateChangelogHandle implements 
StateChangelogHandle<Void> {
+public class InMemoryStateChangelogHandle implements StateChangelogHandle {
 
     private static final long serialVersionUID = 1L;
 
@@ -65,9 +65,8 @@ public class InMemoryStateChangelogHandle implements 
StateChangelogHandle<Void>
         return changes.stream().mapToLong(change -> 
change.getChange().length).sum();
     }
 
-    @Override
-    public CloseableIterator<StateChange> getChanges(Void unused) {
-        return CloseableIterator.fromList(changes, change -> {});
+    public List<StateChange> getChanges() {
+        return Collections.unmodifiableList(changes);
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
index fa99f69..c63fb93 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
@@ -44,11 +44,12 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 @NotThreadSafe
 class InMemoryStateChangelogWriter implements 
StateChangelogWriter<InMemoryStateChangelogHandle> {
     private static final Logger LOG = 
LoggerFactory.getLogger(InMemoryStateChangelogWriter.class);
+    private static final SequenceNumber INITIAL_SQN = SequenceNumber.of(0L);
 
     private final Map<Integer, NavigableMap<SequenceNumber, byte[]>> 
changesByKeyGroup =
             new HashMap<>();
     private final KeyGroupRange keyGroupRange;
-    private long sqn = 0L;
+    private SequenceNumber sqn = INITIAL_SQN;
     private boolean closed;
 
     public InMemoryStateChangelogWriter(KeyGroupRange keyGroupRange) {
@@ -59,14 +60,18 @@ class InMemoryStateChangelogWriter implements 
StateChangelogWriter<InMemoryState
     public void append(int keyGroup, byte[] value) {
         Preconditions.checkState(!closed, "LogWriter is closed");
         LOG.trace("append, keyGroup={}, {} bytes", keyGroup, value.length);
-        changesByKeyGroup
-                .computeIfAbsent(keyGroup, unused -> new TreeMap<>())
-                .put(SequenceNumber.of(++sqn), value);
+        sqn = sqn.next();
+        changesByKeyGroup.computeIfAbsent(keyGroup, unused -> new 
TreeMap<>()).put(sqn, value);
+    }
+
+    @Override
+    public SequenceNumber initialSequenceNumber() {
+        return INITIAL_SQN;
     }
 
     @Override
     public SequenceNumber lastAppendedSequenceNumber() {
-        return SequenceNumber.of(sqn);
+        return sqn;
     }
 
     @Override
@@ -74,8 +79,7 @@ class InMemoryStateChangelogWriter implements 
StateChangelogWriter<InMemoryState
         LOG.debug("Persist after {}", from);
         Preconditions.checkNotNull(from);
         return completedFuture(
-                new InMemoryStateChangelogHandle(
-                        collectChanges(from), from, SequenceNumber.of(sqn), 
keyGroupRange));
+                new InMemoryStateChangelogHandle(collectChanges(from), from, 
sqn, keyGroupRange));
     }
 
     private List<StateChange> collectChanges(SequenceNumber after) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriterFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriterFactory.java
index 255de2a..9383c26 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriterFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriterFactory.java
@@ -18,7 +18,9 @@
 package org.apache.flink.runtime.state.changelog.inmemory;
 
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory;
+import org.apache.flink.util.CloseableIterator;
 
 /** An in-memory (non-production) implementation of {@link 
StateChangelogWriterFactory}. */
 public class InMemoryStateChangelogWriterFactory
@@ -29,4 +31,9 @@ public class InMemoryStateChangelogWriterFactory
             String operatorID, KeyGroupRange keyGroupRange) {
         return new InMemoryStateChangelogWriter(keyGroupRange);
     }
+
+    @Override
+    public StateChangelogHandleReader<InMemoryStateChangelogHandle> 
createReader() {
+        return handle -> CloseableIterator.fromList(handle.getChanges(), 
change -> {});
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogWriterFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogWriterFactoryTest.java
index 126783e..01cc322 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogWriterFactoryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogWriterFactoryTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.runtime.state.changelog.StateChange;
 import org.apache.flink.runtime.state.changelog.StateChangelogHandle;
+import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory;
 import org.apache.flink.util.CloseableIterator;
@@ -30,6 +31,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -47,14 +49,14 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 
 /** {@link InMemoryStateChangelogWriterFactory} test. */
-public class StateChangelogWriterFactoryTest {
+public class StateChangelogWriterFactoryTest<T extends StateChangelogHandle> {
 
     private final Random random = new Random();
 
     @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
     @Test(expected = IllegalStateException.class)
-    public void testNoAppendAfterClose() {
+    public void testNoAppendAfterClose() throws IOException {
         StateChangelogWriter<?> writer =
                 getFactory().createWriter(new OperatorID().toString(), 
KeyGroupRange.of(0, 0));
         writer.close();
@@ -66,16 +68,22 @@ public class StateChangelogWriterFactoryTest {
         KeyGroupRange kgRange = KeyGroupRange.of(0, 5);
         Map<Integer, List<byte[]>> appendsByKeyGroup = 
generateAppends(kgRange, 10, 20);
 
-        try (StateChangelogWriterFactory<?> client = getFactory();
-                StateChangelogWriter<?> writer =
+        try (StateChangelogWriterFactory<T> client = getFactory();
+                StateChangelogWriter<T> writer =
                         client.createWriter(new OperatorID().toString(), 
kgRange)) {
-            SequenceNumber prev = writer.lastAppendedSequenceNumber();
-            appendsByKeyGroup.forEach(
-                    (group, appends) -> appends.forEach(bytes -> 
writer.append(group, bytes)));
+            SequenceNumber prev = writer.initialSequenceNumber();
+            for (Map.Entry<Integer, List<byte[]>> entry : 
appendsByKeyGroup.entrySet()) {
+                Integer group = entry.getKey();
+                List<byte[]> appends = entry.getValue();
+                for (byte[] bytes : appends) {
+                    writer.append(group, bytes);
+                }
+            }
 
-            StateChangelogHandle<?> handle = writer.persist(prev.next()).get();
+            T handle = writer.persist(prev).get();
+            StateChangelogHandleReader<T> reader = client.createReader();
 
-            assertByteMapsEqual(appendsByKeyGroup, extract(handle));
+            assertByteMapsEqual(appendsByKeyGroup, extract(handle, reader));
         }
     }
 
@@ -94,11 +102,10 @@ public class StateChangelogWriterFactoryTest {
         }
     }
 
-    private Map<Integer, List<byte[]>> extract(StateChangelogHandle<?> handle) 
throws Exception {
+    private Map<Integer, List<byte[]>> extract(T handle, 
StateChangelogHandleReader<T> reader)
+            throws Exception {
         Map<Integer, List<byte[]>> changes = new HashMap<>();
-        //noinspection unchecked
-        StateChangelogHandle<Object> objHandle = 
(StateChangelogHandle<Object>) handle;
-        try (CloseableIterator<StateChange> it = 
objHandle.getChanges(getContext())) {
+        try (CloseableIterator<StateChange> it = reader.getChanges(handle)) {
             while (it.hasNext()) {
                 StateChange change = it.next();
                 changes.computeIfAbsent(change.getKeyGroup(), k -> new 
ArrayList<>())
@@ -126,11 +133,7 @@ public class StateChangelogWriterFactoryTest {
         return bytes;
     }
 
-    private InMemoryStateChangelogWriterFactory getFactory() {
-        return new InMemoryStateChangelogWriterFactory();
-    }
-
-    private Object getContext() {
-        return null;
+    protected StateChangelogWriterFactory<T> getFactory() {
+        return (StateChangelogWriterFactory<T>) new 
InMemoryStateChangelogWriterFactory();
     }
 }
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
index fd79d86..1d2572c 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
@@ -109,6 +109,11 @@ abstract class StateChangeLoggerTestBase<Namespace> {
         }
 
         @Override
+        public SequenceNumber initialSequenceNumber() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
         public SequenceNumber lastAppendedSequenceNumber() {
             throw new UnsupportedOperationException();
         }

Reply via email to