This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 04ccb61cfc7b27c675d3b3198f63b2fb114d054a Author: Roman Khachatryan <[email protected]> AuthorDate: Wed Jun 2 17:51:18 2021 +0200 [hotfix][state/changelog] Introduce StateChangelogHandleReader Inverse control when reading StateChangelogHandles to avoid embedding logic into handles and make the reader creation explicit and symmetric to writer. The reader is now obtained from StateChangelogWriterFactory which is renamed in a subsequent commit. --- .../metadata/MetadataV2V3SerializerBase.java | 10 +-- .../state/changelog/StateChangelogHandle.java | 14 +--- ...Handle.java => StateChangelogHandleReader.java} | 12 +-- .../StateChangelogHandleStreamHandleReader.java | 97 ++++++++++++++++++++++ .../changelog/StateChangelogHandleStreamImpl.java | 53 +----------- .../state/changelog/StateChangelogWriter.java | 6 +- .../changelog/StateChangelogWriterFactory.java | 8 +- .../inmemory/InMemoryStateChangelogHandle.java | 9 +- .../inmemory/InMemoryStateChangelogWriter.java | 18 ++-- .../InMemoryStateChangelogWriterFactory.java | 7 ++ .../inmemory/StateChangelogWriterFactoryTest.java | 41 ++++----- .../state/changelog/StateChangeLoggerTestBase.java | 5 ++ 12 files changed, 169 insertions(+), 111 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java index 96961ed..0ea7337 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java @@ -64,6 +64,8 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import static org.apache.flink.util.Preconditions.checkState; + /** * Base (De)serializer for checkpoint metadata format version 2 and 3. * @@ -322,10 +324,8 @@ public abstract class MetadataV2V3SerializerBase { dos.writeInt(handle.getKeyGroupRange().getNumberOfKeyGroups()); dos.writeLong(handle.getFrom()); dos.writeLong(handle.getTo()); - List<StateChange> list = new ArrayList<>(); - handle.getChanges(null).forEachRemaining(list::add); - dos.writeInt(list.size()); - for (StateChange change : list) { + dos.writeInt(handle.getChanges().size()); + for (StateChange change : handle.getChanges()) { dos.writeInt(change.getKeyGroup()); dos.writeInt(change.getChange().length); dos.write(change.getChange()); @@ -419,7 +419,7 @@ public abstract class MetadataV2V3SerializerBase { int keyGroup = dis.readInt(); int bytesSize = dis.readInt(); byte[] bytes = new byte[bytesSize]; - dis.read(bytes); + checkState(bytesSize == dis.read(bytes)); changes.add(new StateChange(keyGroup, bytes)); } return new InMemoryStateChangelogHandle(changes, from, to, keyGroupRange); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandle.java index a4a8656..c200b28 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandle.java @@ -19,17 +19,7 @@ package org.apache.flink.runtime.state.changelog; import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.state.KeyedStateHandle; -import org.apache.flink.util.CloseableIterator; -import java.io.IOException; - -/** - * A handle to saved {@link StateChange state changes}. - * - * @param <ReaderContext> type of context used while reading (on TM). - */ +/** A handle to saved {@link StateChange state changes}. */ @Internal -public interface StateChangelogHandle<ReaderContext> extends KeyedStateHandle { - - CloseableIterator<StateChange> getChanges(ReaderContext context) throws IOException; -} +public interface StateChangelogHandle extends KeyedStateHandle {} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleReader.java similarity index 73% copy from flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandle.java copy to flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleReader.java index a4a8656..9edd7e8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleReader.java @@ -18,18 +18,12 @@ package org.apache.flink.runtime.state.changelog; import org.apache.flink.annotation.Internal; -import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.util.CloseableIterator; import java.io.IOException; -/** - * A handle to saved {@link StateChange state changes}. - * - * @param <ReaderContext> type of context used while reading (on TM). - */ +/** Allows to read state changelog referenced by the provided {@link StateChangelogHandle}. */ @Internal -public interface StateChangelogHandle<ReaderContext> extends KeyedStateHandle { - - CloseableIterator<StateChange> getChanges(ReaderContext context) throws IOException; +public interface StateChangelogHandleReader<Handle extends StateChangelogHandle> { + CloseableIterator<StateChange> getChanges(Handle handle) throws IOException; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamHandleReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamHandleReader.java new file mode 100644 index 0000000..482590c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamHandleReader.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.changelog; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.ExceptionUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Iterator; + +/** + * A reader for {@link StateChangelogHandleStreamImpl} that iterates over its underlying {@link + * StreamStateHandle stream handles} and offsets. Starting from each offset, it enumerates the + * {@link StateChange state changes} using the provided {@link StateChangeIterator}. Different + * {@link StateChangelogStorage} implementations may have different <b>iterator</b> implementations. + * Using a different {@link StateChangelogHandle} (and reader) is problematic as it needs to be + * serialized. + */ +@Internal +public class StateChangelogHandleStreamHandleReader + implements StateChangelogHandleReader<StateChangelogHandleStreamImpl> { + private static final Logger LOG = + LoggerFactory.getLogger(StateChangelogHandleStreamHandleReader.class); + + /** Reads a stream of state changes starting from a specified offset. */ + public interface StateChangeIterator { + CloseableIterator<StateChange> read(StreamStateHandle handle, long offset); + } + + private final StateChangeIterator changeIterator; + + public StateChangelogHandleStreamHandleReader(StateChangeIterator changeIterator) { + this.changeIterator = changeIterator; + } + + @Override + public CloseableIterator<StateChange> getChanges(StateChangelogHandleStreamImpl handle) + throws IOException { + return new CloseableIterator<StateChange>() { + private final Iterator<Tuple2<StreamStateHandle, Long>> handleIterator = + handle.getHandlesAndOffsets().iterator(); + + private CloseableIterator<StateChange> current = CloseableIterator.empty(); + + @Override + public boolean hasNext() { + advance(); + return current.hasNext(); + } + + @Override + public StateChange next() { + advance(); + return current.next(); + } + + private void advance() { + while (!current.hasNext() && handleIterator.hasNext()) { + try { + current.close(); + Tuple2<StreamStateHandle, Long> tuple2 = handleIterator.next(); + LOG.debug("read at {} from {}", tuple2.f1, tuple2.f0); + current = changeIterator.read(tuple2.f0, tuple2.f1); + } catch (Exception e) { + ExceptionUtils.rethrow(e); + } + } + } + + @Override + public void close() throws Exception { + current.close(); + } + }; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamImpl.java index 16a4e60..24c1405 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamImpl.java @@ -28,24 +28,19 @@ import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import org.apache.flink.util.CloseableIterator; -import org.apache.flink.util.ExceptionUtils; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.io.IOException; -import java.util.Iterator; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; /** {@link StateChangelogHandle} implementation based on {@link StreamStateHandle}. */ @Internal -public final class StateChangelogHandleStreamImpl - implements StateChangelogHandle<StateChangelogHandleStreamImpl.StateChangeStreamReader> { +public final class StateChangelogHandleStreamImpl implements StateChangelogHandle { + private static final long serialVersionUID = -8070326169926626355L; - private static final Logger LOG = LoggerFactory.getLogger(StateChangelogHandleStreamImpl.class); private final KeyGroupRange keyGroupRange; /** NOTE: order is important as it reflects the order of changes. */ @@ -96,46 +91,6 @@ public final class StateChangelogHandleStreamImpl } @Override - public CloseableIterator<StateChange> getChanges(StateChangeStreamReader reader) { - return new CloseableIterator<StateChange>() { - private final Iterator<Tuple2<StreamStateHandle, Long>> handleIterator = - handlesAndOffsets.iterator(); - - private CloseableIterator<StateChange> current = CloseableIterator.empty(); - - @Override - public boolean hasNext() { - advance(); - return current.hasNext(); - } - - @Override - public StateChange next() { - advance(); - return current.next(); - } - - private void advance() { - while (!current.hasNext() && handleIterator.hasNext()) { - try { - current.close(); - Tuple2<StreamStateHandle, Long> tuple2 = handleIterator.next(); - LOG.debug("read at {} from {}", tuple2.f1, tuple2.f0); - current = reader.read(tuple2.f0, tuple2.f1); - } catch (Exception e) { - ExceptionUtils.rethrow(e); - } - } - } - - @Override - public void close() throws Exception { - current.close(); - } - }; - } - - @Override public void discardState() { handlesAndOffsets.forEach( handleAndOffset -> stateRegistry.unregisterReference(getKey(handleAndOffset.f0))); @@ -168,6 +123,6 @@ public final class StateChangelogHandleStreamImpl } public List<Tuple2<StreamStateHandle, Long>> getHandlesAndOffsets() { - return handlesAndOffsets; + return Collections.unmodifiableList(handlesAndOffsets); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java index a206d3e..48cf9c9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java @@ -24,8 +24,10 @@ import java.util.concurrent.CompletableFuture; /** Allows to write data to the log. Scoped to a single writer (e.g. state backend). */ @Internal -public interface StateChangelogWriter<Handle extends StateChangelogHandle<?>> - extends AutoCloseable { +public interface StateChangelogWriter<Handle extends StateChangelogHandle> extends AutoCloseable { + + /** Get the initial {@link SequenceNumber} that is used for the first element. */ + SequenceNumber initialSequenceNumber(); /** * Get {@link SequenceNumber} of the last element added by {@link #append(int, byte[]) append}. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriterFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriterFactory.java index e5f1100..072cd15 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriterFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriterFactory.java @@ -22,15 +22,17 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.state.KeyGroupRange; /** - * {@link StateChangelogWriter} factory. Scoped to a single entity (e.g. a SubTask or - * OperatorCoordinator). Please use {@link StateChangelogWriterFactoryLoader} to obtain an instance. + * A factory for {@link StateChangelogWriter} and {@link StateChangelogHandleReader}. Please use + * {@link StateChangelogWriterFactoryLoader} to obtain an instance. */ @Internal -public interface StateChangelogWriterFactory<Handle extends StateChangelogHandle<?>> +public interface StateChangelogWriterFactory<Handle extends StateChangelogHandle> extends AutoCloseable { StateChangelogWriter<Handle> createWriter(String operatorID, KeyGroupRange keyGroupRange); + StateChangelogHandleReader<Handle> createReader(); + @Override default void close() throws Exception {} } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java index 0896dc7..f492cad 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java @@ -24,15 +24,15 @@ import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.changelog.SequenceNumber; import org.apache.flink.runtime.state.changelog.StateChange; import org.apache.flink.runtime.state.changelog.StateChangelogHandle; -import org.apache.flink.util.CloseableIterator; import javax.annotation.Nullable; +import java.util.Collections; import java.util.List; /** In-memory {@link StateChangelogHandle}. */ @Internal -public class InMemoryStateChangelogHandle implements StateChangelogHandle<Void> { +public class InMemoryStateChangelogHandle implements StateChangelogHandle { private static final long serialVersionUID = 1L; @@ -65,9 +65,8 @@ public class InMemoryStateChangelogHandle implements StateChangelogHandle<Void> return changes.stream().mapToLong(change -> change.getChange().length).sum(); } - @Override - public CloseableIterator<StateChange> getChanges(Void unused) { - return CloseableIterator.fromList(changes, change -> {}); + public List<StateChange> getChanges() { + return Collections.unmodifiableList(changes); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java index fa99f69..c63fb93 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java @@ -44,11 +44,12 @@ import static java.util.concurrent.CompletableFuture.completedFuture; @NotThreadSafe class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryStateChangelogHandle> { private static final Logger LOG = LoggerFactory.getLogger(InMemoryStateChangelogWriter.class); + private static final SequenceNumber INITIAL_SQN = SequenceNumber.of(0L); private final Map<Integer, NavigableMap<SequenceNumber, byte[]>> changesByKeyGroup = new HashMap<>(); private final KeyGroupRange keyGroupRange; - private long sqn = 0L; + private SequenceNumber sqn = INITIAL_SQN; private boolean closed; public InMemoryStateChangelogWriter(KeyGroupRange keyGroupRange) { @@ -59,14 +60,18 @@ class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryState public void append(int keyGroup, byte[] value) { Preconditions.checkState(!closed, "LogWriter is closed"); LOG.trace("append, keyGroup={}, {} bytes", keyGroup, value.length); - changesByKeyGroup - .computeIfAbsent(keyGroup, unused -> new TreeMap<>()) - .put(SequenceNumber.of(++sqn), value); + sqn = sqn.next(); + changesByKeyGroup.computeIfAbsent(keyGroup, unused -> new TreeMap<>()).put(sqn, value); + } + + @Override + public SequenceNumber initialSequenceNumber() { + return INITIAL_SQN; } @Override public SequenceNumber lastAppendedSequenceNumber() { - return SequenceNumber.of(sqn); + return sqn; } @Override @@ -74,8 +79,7 @@ class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryState LOG.debug("Persist after {}", from); Preconditions.checkNotNull(from); return completedFuture( - new InMemoryStateChangelogHandle( - collectChanges(from), from, SequenceNumber.of(sqn), keyGroupRange)); + new InMemoryStateChangelogHandle(collectChanges(from), from, sqn, keyGroupRange)); } private List<StateChange> collectChanges(SequenceNumber after) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriterFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriterFactory.java index 255de2a..9383c26 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriterFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriterFactory.java @@ -18,7 +18,9 @@ package org.apache.flink.runtime.state.changelog.inmemory; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader; import org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory; +import org.apache.flink.util.CloseableIterator; /** An in-memory (non-production) implementation of {@link StateChangelogWriterFactory}. */ public class InMemoryStateChangelogWriterFactory @@ -29,4 +31,9 @@ public class InMemoryStateChangelogWriterFactory String operatorID, KeyGroupRange keyGroupRange) { return new InMemoryStateChangelogWriter(keyGroupRange); } + + @Override + public StateChangelogHandleReader<InMemoryStateChangelogHandle> createReader() { + return handle -> CloseableIterator.fromList(handle.getChanges(), change -> {}); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogWriterFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogWriterFactoryTest.java index 126783e..01cc322 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogWriterFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogWriterFactoryTest.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.changelog.SequenceNumber; import org.apache.flink.runtime.state.changelog.StateChange; import org.apache.flink.runtime.state.changelog.StateChangelogHandle; +import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader; import org.apache.flink.runtime.state.changelog.StateChangelogWriter; import org.apache.flink.runtime.state.changelog.StateChangelogWriterFactory; import org.apache.flink.util.CloseableIterator; @@ -30,6 +31,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -47,14 +49,14 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; /** {@link InMemoryStateChangelogWriterFactory} test. */ -public class StateChangelogWriterFactoryTest { +public class StateChangelogWriterFactoryTest<T extends StateChangelogHandle> { private final Random random = new Random(); @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @Test(expected = IllegalStateException.class) - public void testNoAppendAfterClose() { + public void testNoAppendAfterClose() throws IOException { StateChangelogWriter<?> writer = getFactory().createWriter(new OperatorID().toString(), KeyGroupRange.of(0, 0)); writer.close(); @@ -66,16 +68,22 @@ public class StateChangelogWriterFactoryTest { KeyGroupRange kgRange = KeyGroupRange.of(0, 5); Map<Integer, List<byte[]>> appendsByKeyGroup = generateAppends(kgRange, 10, 20); - try (StateChangelogWriterFactory<?> client = getFactory(); - StateChangelogWriter<?> writer = + try (StateChangelogWriterFactory<T> client = getFactory(); + StateChangelogWriter<T> writer = client.createWriter(new OperatorID().toString(), kgRange)) { - SequenceNumber prev = writer.lastAppendedSequenceNumber(); - appendsByKeyGroup.forEach( - (group, appends) -> appends.forEach(bytes -> writer.append(group, bytes))); + SequenceNumber prev = writer.initialSequenceNumber(); + for (Map.Entry<Integer, List<byte[]>> entry : appendsByKeyGroup.entrySet()) { + Integer group = entry.getKey(); + List<byte[]> appends = entry.getValue(); + for (byte[] bytes : appends) { + writer.append(group, bytes); + } + } - StateChangelogHandle<?> handle = writer.persist(prev.next()).get(); + T handle = writer.persist(prev).get(); + StateChangelogHandleReader<T> reader = client.createReader(); - assertByteMapsEqual(appendsByKeyGroup, extract(handle)); + assertByteMapsEqual(appendsByKeyGroup, extract(handle, reader)); } } @@ -94,11 +102,10 @@ public class StateChangelogWriterFactoryTest { } } - private Map<Integer, List<byte[]>> extract(StateChangelogHandle<?> handle) throws Exception { + private Map<Integer, List<byte[]>> extract(T handle, StateChangelogHandleReader<T> reader) + throws Exception { Map<Integer, List<byte[]>> changes = new HashMap<>(); - //noinspection unchecked - StateChangelogHandle<Object> objHandle = (StateChangelogHandle<Object>) handle; - try (CloseableIterator<StateChange> it = objHandle.getChanges(getContext())) { + try (CloseableIterator<StateChange> it = reader.getChanges(handle)) { while (it.hasNext()) { StateChange change = it.next(); changes.computeIfAbsent(change.getKeyGroup(), k -> new ArrayList<>()) @@ -126,11 +133,7 @@ public class StateChangelogWriterFactoryTest { return bytes; } - private InMemoryStateChangelogWriterFactory getFactory() { - return new InMemoryStateChangelogWriterFactory(); - } - - private Object getContext() { - return null; + protected StateChangelogWriterFactory<T> getFactory() { + return (StateChangelogWriterFactory<T>) new InMemoryStateChangelogWriterFactory(); } } diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java index fd79d86..1d2572c 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java @@ -109,6 +109,11 @@ abstract class StateChangeLoggerTestBase<Namespace> { } @Override + public SequenceNumber initialSequenceNumber() { + throw new UnsupportedOperationException(); + } + + @Override public SequenceNumber lastAppendedSequenceNumber() { throw new UnsupportedOperationException(); }
