This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7cf378347fceb222e9175abe8f51742332063bf1 Author: Roman Khachatryan <[email protected]> AuthorDate: Mon Jun 21 18:08:53 2021 +0200 [hotfix][state/changelog] Rename StateChangelogHandle to ChangelogStateHandle --- .../metadata/MetadataV2V3SerializerBase.java | 22 +++++++++++----------- .../changelog/ChangelogStateBackendHandle.java | 12 ++++++------ ...ngelogHandle.java => ChangelogStateHandle.java} | 2 +- ...pl.java => ChangelogStateHandleStreamImpl.java} | 10 +++++----- .../changelog/StateChangelogHandleReader.java | 4 ++-- .../StateChangelogHandleStreamHandleReader.java | 8 ++++---- .../state/changelog/StateChangelogStorage.java | 2 +- .../state/changelog/StateChangelogWriter.java | 2 +- ...ndle.java => InMemoryChangelogStateHandle.java} | 10 +++++----- .../inmemory/InMemoryStateChangelogStorage.java | 4 ++-- .../inmemory/InMemoryStateChangelogWriter.java | 6 +++--- .../inmemory/StateChangelogStorageTest.java | 4 ++-- .../changelog/ChangelogKeyedStateBackend.java | 12 ++++++------ .../restore/ChangelogBackendRestoreOperation.java | 8 ++++---- 14 files changed, 53 insertions(+), 53 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java index 1be1fa5..b772232 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java @@ -39,10 +39,10 @@ import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl; import org.apache.flink.runtime.state.changelog.StateChange; -import org.apache.flink.runtime.state.changelog.StateChangelogHandle; -import org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamImpl; -import org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogHandle; +import org.apache.flink.runtime.state.changelog.inmemory.InMemoryChangelogStateHandle; import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess; import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.apache.flink.runtime.state.filesystem.RelativeFileStateHandle; @@ -337,8 +337,8 @@ public abstract class MetadataV2V3SerializerBase { serializeKeyedStateHandle(k, dos); } - } else if (stateHandle instanceof InMemoryStateChangelogHandle) { - InMemoryStateChangelogHandle handle = (InMemoryStateChangelogHandle) stateHandle; + } else if (stateHandle instanceof InMemoryChangelogStateHandle) { + InMemoryChangelogStateHandle handle = (InMemoryChangelogStateHandle) stateHandle; dos.writeByte(CHANGELOG_BYTE_INCREMENT_HANDLE); dos.writeInt(handle.getKeyGroupRange().getStartKeyGroup()); dos.writeInt(handle.getKeyGroupRange().getNumberOfKeyGroups()); @@ -351,8 +351,8 @@ public abstract class MetadataV2V3SerializerBase { dos.write(change.getChange()); } - } else if (stateHandle instanceof StateChangelogHandleStreamImpl) { - StateChangelogHandleStreamImpl handle = (StateChangelogHandleStreamImpl) stateHandle; + } else if (stateHandle instanceof ChangelogStateHandleStreamImpl) { + ChangelogStateHandleStreamImpl handle = (ChangelogStateHandleStreamImpl) stateHandle; dos.writeByte(CHANGELOG_FILE_INCREMENT_HANDLE); dos.writeInt(handle.getKeyGroupRange().getStartKeyGroup()); dos.writeInt(handle.getKeyGroupRange().getNumberOfKeyGroups()); @@ -438,9 +438,9 @@ public abstract class MetadataV2V3SerializerBase { base.add(deserializeKeyedStateHandle(dis, context)); } int deltaSize = dis.readInt(); - List<StateChangelogHandle> delta = new ArrayList<>(deltaSize); + List<ChangelogStateHandle> delta = new ArrayList<>(deltaSize); for (int i = 0; i < deltaSize; i++) { - delta.add((StateChangelogHandle) deserializeKeyedStateHandle(dis, context)); + delta.add((ChangelogStateHandle) deserializeKeyedStateHandle(dis, context)); } return new ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl( base, delta, keyGroupRange); @@ -460,7 +460,7 @@ public abstract class MetadataV2V3SerializerBase { checkState(bytesSize == dis.read(bytes)); changes.add(new StateChange(keyGroup, bytes)); } - return new InMemoryStateChangelogHandle(changes, from, to, keyGroupRange); + return new InMemoryChangelogStateHandle(changes, from, to, keyGroupRange); } else if (CHANGELOG_FILE_INCREMENT_HANDLE == type) { int start = dis.readInt(); @@ -475,7 +475,7 @@ public abstract class MetadataV2V3SerializerBase { streamHandleAndOffset.add(Tuple2.of(h, o)); } long size = dis.readLong(); - return new StateChangelogHandleStreamImpl(streamHandleAndOffset, keyGroupRange, size); + return new ChangelogStateHandleStreamImpl(streamHandleAndOffset, keyGroupRange, size); } else { throw new IllegalStateException("Reading invalid KeyedStateHandle, type: " + type); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java index 7940210..1046fa0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java @@ -46,17 +46,17 @@ import static org.apache.flink.util.Preconditions.checkArgument; public interface ChangelogStateBackendHandle extends KeyedStateHandle { List<KeyedStateHandle> getMaterializedStateHandles(); - List<StateChangelogHandle> getNonMaterializedStateHandles(); + List<ChangelogStateHandle> getNonMaterializedStateHandles(); class ChangelogStateBackendHandleImpl implements ChangelogStateBackendHandle { private static final long serialVersionUID = 1L; private final List<KeyedStateHandle> materialized; - private final List<StateChangelogHandle> nonMaterialized; + private final List<ChangelogStateHandle> nonMaterialized; private final KeyGroupRange keyGroupRange; public ChangelogStateBackendHandleImpl( List<KeyedStateHandle> materialized, - List<StateChangelogHandle> nonMaterialized, + List<ChangelogStateHandle> nonMaterialized, KeyGroupRange keyGroupRange) { this.materialized = unmodifiableList(materialized); this.nonMaterialized = unmodifiableList(nonMaterialized); @@ -96,11 +96,11 @@ public interface ChangelogStateBackendHandle extends KeyedStateHandle { .map(handle -> handle.getIntersection(keyGroupRange)) .filter(Objects::nonNull) .collect(Collectors.toList()); - List<StateChangelogHandle> deltaPart = + List<ChangelogStateHandle> deltaPart = this.nonMaterialized.stream() .map( handle -> - (StateChangelogHandle) + (ChangelogStateHandle) handle.getIntersection(keyGroupRange)) .filter(Objects::nonNull) .collect(Collectors.toList()); @@ -119,7 +119,7 @@ public interface ChangelogStateBackendHandle extends KeyedStateHandle { } @Override - public List<StateChangelogHandle> getNonMaterializedStateHandles() { + public List<ChangelogStateHandle> getNonMaterializedStateHandles() { return nonMaterialized; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateHandle.java similarity index 94% rename from flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandle.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateHandle.java index c200b28..070afeb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateHandle.java @@ -22,4 +22,4 @@ import org.apache.flink.runtime.state.KeyedStateHandle; /** A handle to saved {@link StateChange state changes}. */ @Internal -public interface StateChangelogHandle extends KeyedStateHandle {} +public interface ChangelogStateHandle extends KeyedStateHandle {} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateHandleStreamImpl.java similarity index 94% rename from flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamImpl.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateHandleStreamImpl.java index 24c1405..d36b8ad 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateHandleStreamImpl.java @@ -36,9 +36,9 @@ import java.util.Collections; import java.util.List; import java.util.stream.Collectors; -/** {@link StateChangelogHandle} implementation based on {@link StreamStateHandle}. */ +/** {@link ChangelogStateHandle} implementation based on {@link StreamStateHandle}. */ @Internal -public final class StateChangelogHandleStreamImpl implements StateChangelogHandle { +public final class ChangelogStateHandleStreamImpl implements ChangelogStateHandle { private static final long serialVersionUID = -8070326169926626355L; @@ -49,7 +49,7 @@ public final class StateChangelogHandleStreamImpl implements StateChangelogHandl private transient SharedStateRegistry stateRegistry; private final long size; - public StateChangelogHandleStreamImpl( + public ChangelogStateHandleStreamImpl( List<Tuple2<StreamStateHandle, Long>> handlesAndOffsets, KeyGroupRange keyGroupRange, long size) { @@ -58,7 +58,7 @@ public final class StateChangelogHandleStreamImpl implements StateChangelogHandl this.size = size; } - public StateChangelogHandleStreamImpl( + public ChangelogStateHandleStreamImpl( List<Tuple3<StreamStateHandle, Long, Long>> sorted, KeyGroupRange keyGroupRange) { this( sorted.stream().map(t -> Tuple2.of(t.f0, t.f1)).collect(Collectors.toList()), @@ -87,7 +87,7 @@ public final class StateChangelogHandleStreamImpl implements StateChangelogHandl if (offsets.getNumberOfKeyGroups() == 0) { return null; } - return new StateChangelogHandleStreamImpl(handlesAndOffsets, offsets, 0L /* unknown */); + return new ChangelogStateHandleStreamImpl(handlesAndOffsets, offsets, 0L /* unknown */); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleReader.java index 9edd7e8..7c71ac1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleReader.java @@ -22,8 +22,8 @@ import org.apache.flink.util.CloseableIterator; import java.io.IOException; -/** Allows to read state changelog referenced by the provided {@link StateChangelogHandle}. */ +/** Allows to read state changelog referenced by the provided {@link ChangelogStateHandle}. */ @Internal -public interface StateChangelogHandleReader<Handle extends StateChangelogHandle> { +public interface StateChangelogHandleReader<Handle extends ChangelogStateHandle> { CloseableIterator<StateChange> getChanges(Handle handle) throws IOException; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamHandleReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamHandleReader.java index e586d95..900d781 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamHandleReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamHandleReader.java @@ -30,16 +30,16 @@ import java.io.IOException; import java.util.Iterator; /** - * A reader for {@link StateChangelogHandleStreamImpl} that iterates over its underlying {@link + * A reader for {@link ChangelogStateHandleStreamImpl} that iterates over its underlying {@link * StreamStateHandle stream handles} and offsets. Starting from each offset, it enumerates the * {@link StateChange state changes} using the provided {@link StateChangeIterator}. Different * {@link StateChangelogStorage} implementations may have different <b>iterator</b> implementations. - * Using a different {@link StateChangelogHandle} (and reader) is problematic as it needs to be + * Using a different {@link ChangelogStateHandle} (and reader) is problematic as it needs to be * serialized. */ @Internal public class StateChangelogHandleStreamHandleReader - implements StateChangelogHandleReader<StateChangelogHandleStreamImpl> { + implements StateChangelogHandleReader<ChangelogStateHandleStreamImpl> { private static final Logger LOG = LoggerFactory.getLogger(StateChangelogHandleStreamHandleReader.class); @@ -56,7 +56,7 @@ public class StateChangelogHandleStreamHandleReader } @Override - public CloseableIterator<StateChange> getChanges(StateChangelogHandleStreamImpl handle) + public CloseableIterator<StateChange> getChanges(ChangelogStateHandleStreamImpl handle) throws IOException { return new CloseableIterator<StateChange>() { private final Iterator<Tuple2<StreamStateHandle, Long>> handleIterator = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorage.java index 699d585..1cc08d3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorage.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorage.java @@ -26,7 +26,7 @@ import org.apache.flink.runtime.state.KeyGroupRange; * {@link StateChangelogStorageLoader} to obtain an instance. */ @Internal -public interface StateChangelogStorage<Handle extends StateChangelogHandle> extends AutoCloseable { +public interface StateChangelogStorage<Handle extends ChangelogStateHandle> extends AutoCloseable { StateChangelogWriter<Handle> createWriter(String operatorID, KeyGroupRange keyGroupRange); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java index 48cf9c9..694973d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java @@ -24,7 +24,7 @@ import java.util.concurrent.CompletableFuture; /** Allows to write data to the log. Scoped to a single writer (e.g. state backend). */ @Internal -public interface StateChangelogWriter<Handle extends StateChangelogHandle> extends AutoCloseable { +public interface StateChangelogWriter<Handle extends ChangelogStateHandle> extends AutoCloseable { /** Get the initial {@link SequenceNumber} that is used for the first element. */ SequenceNumber initialSequenceNumber(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryChangelogStateHandle.java similarity index 92% rename from flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryChangelogStateHandle.java index 8735f98..7f5695b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryChangelogStateHandle.java @@ -21,18 +21,18 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; import org.apache.flink.runtime.state.changelog.SequenceNumber; import org.apache.flink.runtime.state.changelog.StateChange; -import org.apache.flink.runtime.state.changelog.StateChangelogHandle; import javax.annotation.Nullable; import java.util.Collections; import java.util.List; -/** In-memory {@link StateChangelogHandle}. */ +/** In-memory {@link ChangelogStateHandle}. */ @Internal -public class InMemoryStateChangelogHandle implements StateChangelogHandle { +public class InMemoryChangelogStateHandle implements ChangelogStateHandle { private static final long serialVersionUID = 1L; @@ -41,12 +41,12 @@ public class InMemoryStateChangelogHandle implements StateChangelogHandle { private final SequenceNumber to; // for debug purposes private final KeyGroupRange keyGroupRange; - public InMemoryStateChangelogHandle( + public InMemoryChangelogStateHandle( List<StateChange> changes, long from, long to, KeyGroupRange keyGroupRange) { this(changes, SequenceNumber.of(from), SequenceNumber.of(to), keyGroupRange); } - public InMemoryStateChangelogHandle( + public InMemoryChangelogStateHandle( List<StateChange> changes, SequenceNumber from, SequenceNumber to, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorage.java index 2a751165..c9bfdfb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorage.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorage.java @@ -24,7 +24,7 @@ import org.apache.flink.util.CloseableIterator; /** An in-memory (non-production) implementation of {@link StateChangelogStorage}. */ public class InMemoryStateChangelogStorage - implements StateChangelogStorage<InMemoryStateChangelogHandle> { + implements StateChangelogStorage<InMemoryChangelogStateHandle> { @Override public InMemoryStateChangelogWriter createWriter( @@ -33,7 +33,7 @@ public class InMemoryStateChangelogStorage } @Override - public StateChangelogHandleReader<InMemoryStateChangelogHandle> createReader() { + public StateChangelogHandleReader<InMemoryChangelogStateHandle> createReader() { return handle -> CloseableIterator.fromList(handle.getChanges(), change -> {}); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java index 3c15984..cc12045 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java @@ -42,7 +42,7 @@ import java.util.stream.Stream; import static java.util.concurrent.CompletableFuture.completedFuture; @NotThreadSafe -class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryStateChangelogHandle> { +class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryChangelogStateHandle> { private static final Logger LOG = LoggerFactory.getLogger(InMemoryStateChangelogWriter.class); private static final SequenceNumber INITIAL_SQN = SequenceNumber.of(0L); @@ -75,11 +75,11 @@ class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryState } @Override - public CompletableFuture<InMemoryStateChangelogHandle> persist(SequenceNumber from) { + public CompletableFuture<InMemoryChangelogStateHandle> persist(SequenceNumber from) { LOG.debug("Persist after {}", from); Preconditions.checkNotNull(from); return completedFuture( - new InMemoryStateChangelogHandle(collectChanges(from), from, sqn, keyGroupRange)); + new InMemoryChangelogStateHandle(collectChanges(from), from, sqn, keyGroupRange)); } private List<StateChange> collectChanges(SequenceNumber after) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java index cb31c38..a0dfa8b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java @@ -19,9 +19,9 @@ package org.apache.flink.runtime.state.changelog.inmemory; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; import org.apache.flink.runtime.state.changelog.SequenceNumber; import org.apache.flink.runtime.state.changelog.StateChange; -import org.apache.flink.runtime.state.changelog.StateChangelogHandle; import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader; import org.apache.flink.runtime.state.changelog.StateChangelogStorage; import org.apache.flink.runtime.state.changelog.StateChangelogWriter; @@ -49,7 +49,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; /** {@link InMemoryStateChangelogStorage} test. */ -public class StateChangelogStorageTest<T extends StateChangelogHandle> { +public class StateChangelogStorageTest<T extends ChangelogStateHandle> { private final Random random = new Random(); diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java index f7a83f0..375ced2 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java @@ -46,8 +46,8 @@ import org.apache.flink.runtime.state.StateSnapshotTransformer; import org.apache.flink.runtime.state.TestableKeyedStateBackend; import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle; import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; import org.apache.flink.runtime.state.changelog.SequenceNumber; -import org.apache.flink.runtime.state.changelog.StateChangelogHandle; import org.apache.flink.runtime.state.changelog.StateChangelogWriter; import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; import org.apache.flink.runtime.state.heap.InternalKeyContext; @@ -130,7 +130,7 @@ public class ChangelogKeyedStateBackend<K> private final TtlTimeProvider ttlTimeProvider; - private final StateChangelogWriter<StateChangelogHandle> stateChangelogWriter; + private final StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter; private long lastCheckpointId = -1L; @@ -150,7 +150,7 @@ public class ChangelogKeyedStateBackend<K> /** Updated initially on restore and later cleared upon materialization (in FLINK-21357). */ @GuardedBy("materialized") - private final List<StateChangelogHandle> restoredNonMaterialized = new ArrayList<>(); + private final List<ChangelogStateHandle> restoredNonMaterialized = new ArrayList<>(); /** * {@link SequenceNumber} denoting last upload range <b>start</b>, inclusive. Updated to {@link @@ -181,7 +181,7 @@ public class ChangelogKeyedStateBackend<K> AbstractKeyedStateBackend<K> keyedStateBackend, ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, - StateChangelogWriter<StateChangelogHandle> stateChangelogWriter, + StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter, Collection<ChangelogStateBackendHandle> initialState) { this.keyedStateBackend = keyedStateBackend; this.executionConfig = executionConfig; @@ -324,13 +324,13 @@ public class ChangelogKeyedStateBackend<K> .thenApply(this::buildSnapshotResult)); } - private SnapshotResult<KeyedStateHandle> buildSnapshotResult(StateChangelogHandle delta) { + private SnapshotResult<KeyedStateHandle> buildSnapshotResult(ChangelogStateHandle delta) { // Can be called by either task thread during the sync checkpoint phase (if persist future // was already completed); or by the writer thread otherwise. So need to synchronize. // todo: revisit after FLINK-21357 - use mailbox action? synchronized (materialized) { // collections don't change once started and handles are immutable - List<StateChangelogHandle> prevDeltaCopy = new ArrayList<>(restoredNonMaterialized); + List<ChangelogStateHandle> prevDeltaCopy = new ArrayList<>(restoredNonMaterialized); if (delta != null && delta.getStateSize() > 0) { prevDeltaCopy.add(delta); } diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation.java index 528e0ec..4ab881c 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation.java @@ -21,8 +21,8 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle; +import org.apache.flink.runtime.state.changelog.ChangelogStateHandle; import org.apache.flink.runtime.state.changelog.StateChange; -import org.apache.flink.runtime.state.changelog.StateChangelogHandle; import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader; import org.apache.flink.state.changelog.ChangelogKeyedStateBackend; import org.apache.flink.util.CloseableIterator; @@ -55,7 +55,7 @@ public class ChangelogBackendRestoreOperation { ChangelogKeyedStateBackend<K>, Exception> {} - public static <K, T extends StateChangelogHandle> ChangelogKeyedStateBackend<K> restore( + public static <K, T extends ChangelogStateHandle> ChangelogKeyedStateBackend<K> restore( StateChangelogHandleReader<T> changelogHandleReader, ClassLoader classLoader, Collection<ChangelogStateBackendHandle> stateHandles, @@ -76,13 +76,13 @@ public class ChangelogBackendRestoreOperation { } @SuppressWarnings("unchecked") - private static <T extends StateChangelogHandle> void readBackendHandle( + private static <T extends ChangelogStateHandle> void readBackendHandle( ChangelogKeyedStateBackend<?> backend, ChangelogStateBackendHandle backendHandle, StateChangelogHandleReader<T> changelogHandleReader, ClassLoader classLoader) throws Exception { - for (StateChangelogHandle changelogHandle : + for (ChangelogStateHandle changelogHandle : backendHandle.getNonMaterializedStateHandles()) { try (CloseableIterator<StateChange> changes = changelogHandleReader.getChanges((T) changelogHandle)) {
