This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit fe5df06c727138ef02359aa3b23ef0e4b80a57df Author: Roman Khachatryan <[email protected]> AuthorDate: Thu Jun 3 18:32:37 2021 +0200 [FLINK-21356][state/changelog] Implement recovery using changelog Both materialized and non-materialized states are read for recovery. Materialization will be implemented in subsequent commits. For now, changelog grows indefinitely. --- .../runtime/state/AbstractKeyedStateBackend.java | 8 +- .../flink/runtime/state/changelog/StateChange.java | 3 +- .../StateChangelogHandleStreamHandleReader.java | 1 + .../state/metainfo/StateMetaInfoSnapshot.java | 26 ++- .../state/changelog/AbstractChangelogState.java | 2 +- .../state/changelog/AbstractStateChangeLogger.java | 79 +++------ .../state/changelog/ChangelogAggregatingState.java | 22 ++- .../ChangelogKeyGroupedPriorityQueue.java | 13 +- .../changelog/ChangelogKeyedStateBackend.java | 110 ++++++++++--- .../flink/state/changelog/ChangelogListState.java | 21 ++- .../flink/state/changelog/ChangelogMapState.java | 21 ++- .../state/changelog/ChangelogReducingState.java | 21 ++- .../flink/state/changelog/ChangelogState.java | 36 ++--- .../state/changelog/ChangelogStateBackend.java | 121 +++++++------- .../flink/state/changelog/ChangelogValueState.java | 22 ++- .../state/changelog/KvStateChangeLoggerImpl.java | 2 +- .../state/changelog/StateChangeOperation.java | 69 ++++++++ .../restore/AggregatingStateChangeApplier.java | 57 +++++++ .../changelog/restore/ChangelogApplierFactory.java | 55 +++++++ .../restore/ChangelogApplierFactoryImpl.java | 69 ++++++++ .../restore/ChangelogBackendLogApplier.java | 176 +++++++++++++++++++++ .../restore/ChangelogBackendRestoreOperation.java | 107 +++++++++++++ .../restore/FunctionDelegationHelper.java | 149 +++++++++++++++++ .../changelog/restore/KvStateChangeApplier.java | 62 ++++++++ .../changelog/restore/ListStateChangeApplier.java | 69 ++++++++ .../changelog/restore/MapStateChangeApplier.java | 65 ++++++++ .../restore/PriorityQueueStateChangeApplier.java | 
54 +++++++ .../restore/ReducingStateChangeApplier.java | 57 +++++++ .../changelog/restore/StateChangeApplier.java | 32 +--- .../changelog/restore/ValueStateChangeApplier.java | 53 +++++++ .../state/changelog/ChangelogListStateTest.java | 7 +- .../state/changelog/ChangelogMapStateTest.java | 7 +- .../changelog/KvStateChangeLoggerImplTest.java | 3 +- .../PriorityQueueStateChangeLoggerImplTest.java | 3 +- .../state/changelog/StateChangeLoggerTestBase.java | 3 +- 35 files changed, 1392 insertions(+), 213 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java index 7f36c84..f62e6c9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java @@ -53,7 +53,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; public abstract class AbstractKeyedStateBackend<K> implements CheckpointableKeyedStateBackend<K>, CheckpointListener, - TestableKeyedStateBackend<K> { + TestableKeyedStateBackend<K>, + InternalKeyContext<K> { /** The key serializer. 
*/ protected final TypeSerializer<K> keySerializer; @@ -394,4 +395,9 @@ public abstract class AbstractKeyedStateBackend<K> final StateDescriptor<S, ?> stateDescriptor) throws Exception; } + + @Override + public void setCurrentKeyGroupIndex(int currentKeyGroupIndex) { + keyContext.setCurrentKeyGroupIndex(currentKeyGroupIndex); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java index 7ae60f0..e777875 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java @@ -28,7 +28,8 @@ public class StateChange { private final byte[] change; public StateChange(int keyGroup, byte[] change) { - Preconditions.checkArgument(keyGroup >= 0); + // todo: enable check in FLINK-23035 + // Preconditions.checkArgument(keyGroup >= 0); this.keyGroup = keyGroup; this.change = Preconditions.checkNotNull(change); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamHandleReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamHandleReader.java index 482590c..e586d95 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamHandleReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamHandleReader.java @@ -45,6 +45,7 @@ public class StateChangelogHandleStreamHandleReader /** Reads a stream of state changes starting from a specified offset. 
*/ public interface StateChangeIterator { + // todo: add implementation (FLINK-21353) CloseableIterator<StateChange> read(StreamStateHandle handle, long offset); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java index 6ade74b..72a3152 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshot.java @@ -38,10 +38,28 @@ public class StateMetaInfoSnapshot { /** Enum that defines the different types of state that live in Flink backends. */ public enum BackendStateType { - KEY_VALUE, - OPERATOR, - BROADCAST, - PRIORITY_QUEUE + KEY_VALUE(0), + OPERATOR(1), + BROADCAST(2), + PRIORITY_QUEUE(3); + private final byte code; + + BackendStateType(int code) { + this.code = (byte) code; + } + + public byte getCode() { + return code; + } + + public static BackendStateType byCode(int code) { + for (BackendStateType type : values()) { + if (type.code == code) { + return type; + } + } + throw new IllegalArgumentException("Unknown BackendStateType: " + code); + } } /** Predefined keys for the most common options in the meta info. 
*/ diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractChangelogState.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractChangelogState.java index ba6a95a..823f261 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractChangelogState.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractChangelogState.java @@ -33,7 +33,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * @param <S> Type of originally wrapped state object */ abstract class AbstractChangelogState<K, N, V, S extends InternalKvState<K, N, V>> - implements InternalKvState<K, N, V> { + implements InternalKvState<K, N, V>, ChangelogState { protected final S delegatedState; protected final KvStateChangeLogger<V, N> changeLogger; diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java index da106c8..5b21e15 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java @@ -18,9 +18,12 @@ package org.apache.flink.state.changelog; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo; +import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo; import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; import org.apache.flink.runtime.state.changelog.StateChangelogWriter; import 
org.apache.flink.runtime.state.heap.InternalKeyContext; +import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters; import org.apache.flink.util.function.ThrowingConsumer; @@ -28,20 +31,18 @@ import javax.annotation.Nullable; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.util.Arrays; -import java.util.Map; -import java.util.function.Function; -import java.util.stream.Collectors; +import static org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot.BackendStateType.KEY_VALUE; +import static org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE; import static org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters.CURRENT_STATE_META_INFO_SNAPSHOT_VERSION; -import static org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.ADD; -import static org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.ADD_ELEMENT; -import static org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.ADD_OR_UPDATE_ELEMENT; -import static org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.CLEAR; -import static org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.METADATA; -import static org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.REMOVE_ELEMENT; -import static org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.SET; -import static org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.SET_INTERNAL; +import static org.apache.flink.state.changelog.StateChangeOperation.ADD; +import static org.apache.flink.state.changelog.StateChangeOperation.ADD_ELEMENT; +import static org.apache.flink.state.changelog.StateChangeOperation.ADD_OR_UPDATE_ELEMENT; +import static 
org.apache.flink.state.changelog.StateChangeOperation.CLEAR; +import static org.apache.flink.state.changelog.StateChangeOperation.METADATA; +import static org.apache.flink.state.changelog.StateChangeOperation.REMOVE_ELEMENT; +import static org.apache.flink.state.changelog.StateChangeOperation.SET; +import static org.apache.flink.state.changelog.StateChangeOperation.SET_INTERNAL; import static org.apache.flink.util.Preconditions.checkNotNull; abstract class AbstractStateChangeLogger<Key, Value, Ns> implements StateChangeLogger<Value, Ns> { @@ -49,6 +50,7 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns> implements StateChangeL protected final StateChangelogWriter<?> stateChangelogWriter; protected final InternalKeyContext<Key> keyContext; protected final RegisteredStateMetaInfoBase metaInfo; + private final StateMetaInfoSnapshot.BackendStateType stateType; private boolean metaDataWritten = false; public AbstractStateChangeLogger( @@ -58,6 +60,13 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns> implements StateChangeL this.stateChangelogWriter = checkNotNull(stateChangelogWriter); this.keyContext = checkNotNull(keyContext); this.metaInfo = checkNotNull(metaInfo); + if (metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo) { + this.stateType = KEY_VALUE; + } else if (metaInfo instanceof RegisteredPriorityQueueStateBackendMetaInfo) { + this.stateType = PRIORITY_QUEUE; + } else { + throw new IllegalArgumentException("Unsupported state type: " + metaInfo); + } } @Override @@ -135,7 +144,7 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns> implements StateChangeL COMMON_KEY_GROUP, serializeRaw( out -> { - out.writeByte(METADATA.code); + out.writeByte(METADATA.getCode()); out.writeInt(CURRENT_STATE_META_INFO_SNAPSHOT_VERSION); StateMetaInfoSnapshotReadersWriters.getWriter() .writeStateMetaInfoSnapshot(metaInfo.snapshot(), out); @@ -151,10 +160,11 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns> implements StateChangeL throws 
IOException { return serializeRaw( wrapper -> { - wrapper.writeByte(op.code); + wrapper.writeByte(op.getCode()); // todo: optimize in FLINK-22944 by either writing short code or grouping and // writing once (same for key, ns) wrapper.writeUTF(metaInfo.getName()); + wrapper.writeByte(stateType.getCode()); serializeScope(ns, wrapper); if (dataWriter != null) { dataWriter.accept(wrapper); @@ -168,50 +178,11 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns> implements StateChangeL private byte[] serializeRaw( ThrowingConsumer<DataOutputViewStreamWrapper, IOException> dataWriter) throws IOException { + // todo: optimize performance try (ByteArrayOutputStream out = new ByteArrayOutputStream(); DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(out)) { dataWriter.accept(wrapper); return out.toByteArray(); } } - - enum StateChangeOperation { - /** Scope: key + namespace. */ - CLEAR((byte) 0), - /** Scope: key + namespace. */ - SET((byte) 1), - /** Scope: key + namespace. */ - SET_INTERNAL((byte) 2), - /** Scope: key + namespace. */ - ADD((byte) 3), - /** Scope: key + namespace, also affecting other (source) namespaces. */ - MERGE_NS((byte) 4), - /** Scope: key + namespace + element (e.g. user list append). */ - ADD_ELEMENT((byte) 5), - /** Scope: key + namespace + element (e.g. user map key put). */ - ADD_OR_UPDATE_ELEMENT((byte) 6), - /** Scope: key + namespace + element (e.g. user map remove or iterator remove). */ - REMOVE_ELEMENT((byte) 7), - /** Scope: key + namespace, first element (e.g. priority queue poll). */ - REMOVE_FIRST_ELEMENT((byte) 8), - /** State metadata (name, serializers, etc.). 
*/ - METADATA((byte) 9); - private final byte code; - - StateChangeOperation(byte code) { - this.code = code; - } - - private static final Map<Byte, KvStateChangeLoggerImpl.StateChangeOperation> BY_CODES = - Arrays.stream(AbstractStateChangeLogger.StateChangeOperation.values()) - .collect(Collectors.toMap(o -> o.code, Function.identity())); - - public static StateChangeOperation byCode(byte opCode) { - return checkNotNull(BY_CODES.get(opCode), Byte.toString(opCode)); - } - - public byte getCode() { - return code; - } - } } diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogAggregatingState.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogAggregatingState.java index 3a1d9c8..2fa9712 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogAggregatingState.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogAggregatingState.java @@ -21,8 +21,11 @@ package org.apache.flink.state.changelog; import org.apache.flink.api.common.state.AggregatingState; import org.apache.flink.api.common.state.State; import org.apache.flink.runtime.state.changelog.StateChange; +import org.apache.flink.runtime.state.heap.InternalKeyContext; import org.apache.flink.runtime.state.internal.InternalAggregatingState; import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.state.changelog.restore.ChangelogApplierFactory; +import org.apache.flink.state.changelog.restore.StateChangeApplier; import org.apache.flink.util.ExceptionUtils; import java.io.IOException; @@ -42,10 +45,14 @@ class ChangelogAggregatingState<K, N, IN, ACC, OUT> extends AbstractChangelogState<K, N, ACC, InternalAggregatingState<K, N, IN, ACC, OUT>> implements InternalAggregatingState<K, N, IN, ACC, OUT> { + private final InternalKeyContext<K> 
keyContext; + ChangelogAggregatingState( InternalAggregatingState<K, N, IN, ACC, OUT> delegatedState, - KvStateChangeLogger<ACC, N> changeLogger) { + KvStateChangeLogger<ACC, N> changeLogger, + InternalKeyContext<K> keyContext) { super(delegatedState, changeLogger); + this.keyContext = keyContext; } @Override @@ -95,9 +102,18 @@ class ChangelogAggregatingState<K, N, IN, ACC, OUT> @SuppressWarnings("unchecked") static <T, K, N, SV, S extends State, IS extends S> IS create( - InternalKvState<K, N, SV> aggregatingState, KvStateChangeLogger<SV, N> changeLogger) { + InternalKvState<K, N, SV> aggregatingState, + KvStateChangeLogger<SV, N> changeLogger, + InternalKeyContext<K> keyContext) { return (IS) new ChangelogAggregatingState<>( - (InternalAggregatingState<K, N, T, SV, ?>) aggregatingState, changeLogger); + (InternalAggregatingState<K, N, T, SV, ?>) aggregatingState, + changeLogger, + keyContext); + } + + @Override + public StateChangeApplier getChangeApplier(ChangelogApplierFactory factory) { + return factory.forAggregating(delegatedState, keyContext); } } diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyGroupedPriorityQueue.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyGroupedPriorityQueue.java index a864e14..eff82db 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyGroupedPriorityQueue.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyGroupedPriorityQueue.java @@ -20,6 +20,8 @@ package org.apache.flink.state.changelog; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; +import org.apache.flink.state.changelog.restore.ChangelogApplierFactory; +import 
org.apache.flink.state.changelog.restore.StateChangeApplier; import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.ExceptionUtils; @@ -37,7 +39,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * A {@link KeyGroupedInternalPriorityQueue} that keeps state on the underlying delegated {@link * KeyGroupedInternalPriorityQueue} as well as on the state change log. */ -public class ChangelogKeyGroupedPriorityQueue<T> implements KeyGroupedInternalPriorityQueue<T> { +public class ChangelogKeyGroupedPriorityQueue<T> + implements KeyGroupedInternalPriorityQueue<T>, ChangelogState { private final KeyGroupedInternalPriorityQueue<T> delegatedPriorityQueue; private final PriorityQueueStateChangeLogger<T> logger; private final TypeSerializer<T> serializer; @@ -109,9 +112,6 @@ public class ChangelogKeyGroupedPriorityQueue<T> implements KeyGroupedInternalPr } private void logAddition(Collection<? extends T> toAdd) { - if (toAdd == null) { - return; - } try { logger.valueElementAdded( out -> { @@ -133,4 +133,9 @@ public class ChangelogKeyGroupedPriorityQueue<T> implements KeyGroupedInternalPr StateChangeLoggingIterator.create( delegatedPriorityQueue.iterator(), logger, serializer::serialize, null)); } + + @Override + public StateChangeApplier getChangeApplier(ChangelogApplierFactory factory) { + return factory.forPriorityQueue(delegatedPriorityQueue, serializer); + } } diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java index 7293c7b..f7a83f0 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java @@ -44,15 
+44,19 @@ import org.apache.flink.runtime.state.SavepointResources; import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.runtime.state.StateSnapshotTransformer; import org.apache.flink.runtime.state.TestableKeyedStateBackend; +import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle; import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl; import org.apache.flink.runtime.state.changelog.SequenceNumber; import org.apache.flink.runtime.state.changelog.StateChangelogHandle; import org.apache.flink.runtime.state.changelog.StateChangelogWriter; import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; +import org.apache.flink.runtime.state.heap.InternalKeyContext; import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot.BackendStateType; import org.apache.flink.runtime.state.metrics.LatencyTrackingStateFactory; import org.apache.flink.runtime.state.ttl.TtlStateFactory; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.state.changelog.restore.FunctionDelegationHelper; import org.apache.flink.util.FlinkRuntimeException; import org.slf4j.Logger; @@ -64,9 +68,11 @@ import javax.annotation.concurrent.GuardedBy; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.RunnableFuture; @@ -84,7 +90,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * @param <K> The key by which state is keyed. 
*/ @Internal -class ChangelogKeyedStateBackend<K> +public class ChangelogKeyedStateBackend<K> implements CheckpointableKeyedStateBackend<K>, CheckpointListener, TestableKeyedStateBackend<K> { @@ -118,11 +124,13 @@ class ChangelogKeyedStateBackend<K> */ private final HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName; + private final HashMap<String, ChangelogKeyGroupedPriorityQueue<?>> priorityQueueStatesByName; + private final ExecutionConfig executionConfig; private final TtlTimeProvider ttlTimeProvider; - private final StateChangelogWriter<?> stateChangelogWriter; + private final StateChangelogWriter<StateChangelogHandle> stateChangelogWriter; private long lastCheckpointId = -1L; @@ -133,13 +141,16 @@ class ChangelogKeyedStateBackend<K> /** For caching the last accessed partitioned state. */ private String lastName; + private final FunctionDelegationHelper functionDelegationHelper = + new FunctionDelegationHelper(); + /** Updated initially on restore and later upon materialization (in FLINK-21357). */ @GuardedBy("materialized") private final List<KeyedStateHandle> materialized = new ArrayList<>(); /** Updated initially on restore and later cleared upon materialization (in FLINK-21357). */ @GuardedBy("materialized") - private final List<StateChangelogHandle> nonMaterialized = new ArrayList<>(); + private final List<StateChangelogHandle> restoredNonMaterialized = new ArrayList<>(); /** * {@link SequenceNumber} denoting last upload range <b>start</b>, inclusive. 
Updated to {@link @@ -170,13 +181,16 @@ class ChangelogKeyedStateBackend<K> AbstractKeyedStateBackend<K> keyedStateBackend, ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, - StateChangelogWriter<?> stateChangelogWriter) { + StateChangelogWriter<StateChangelogHandle> stateChangelogWriter, + Collection<ChangelogStateBackendHandle> initialState) { this.keyedStateBackend = keyedStateBackend; this.executionConfig = executionConfig; this.ttlTimeProvider = ttlTimeProvider; this.keyValueStatesByName = new HashMap<>(); + this.priorityQueueStatesByName = new HashMap<>(); this.stateChangelogWriter = stateChangelogWriter; this.materializedTo = stateChangelogWriter.initialSequenceNumber(); + this.completeRestore(initialState); } // -------------------- CheckpointableKeyedStateBackend -------------------------------- @@ -316,7 +330,7 @@ class ChangelogKeyedStateBackend<K> // todo: revisit after FLINK-21357 - use mailbox action? synchronized (materialized) { // collections don't change once started and handles are immutable - List<StateChangelogHandle> prevDeltaCopy = new ArrayList<>(nonMaterialized); + List<StateChangelogHandle> prevDeltaCopy = new ArrayList<>(restoredNonMaterialized); if (delta != null && delta.getStateSize() > 0) { prevDeltaCopy.add(delta); } @@ -332,21 +346,29 @@ class ChangelogKeyedStateBackend<K> @Nonnull @Override + @SuppressWarnings("unchecked") public <T extends HeapPriorityQueueElement & PriorityComparable<? 
super T> & Keyed<?>> KeyGroupedInternalPriorityQueue<T> create( @Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer) { - PriorityQueueStateChangeLoggerImpl<K, T> priorityQueueStateChangeLogger = - new PriorityQueueStateChangeLoggerImpl<>( - byteOrderedElementSerializer, - keyedStateBackend.getKeyContext(), - stateChangelogWriter, - new RegisteredPriorityQueueStateBackendMetaInfo<>( - stateName, byteOrderedElementSerializer)); - return new ChangelogKeyGroupedPriorityQueue<>( - keyedStateBackend.create(stateName, byteOrderedElementSerializer), - priorityQueueStateChangeLogger, - byteOrderedElementSerializer); + ChangelogKeyGroupedPriorityQueue<T> queue = + (ChangelogKeyGroupedPriorityQueue<T>) priorityQueueStatesByName.get(stateName); + if (queue == null) { + PriorityQueueStateChangeLoggerImpl<K, T> priorityQueueStateChangeLogger = + new PriorityQueueStateChangeLoggerImpl<>( + byteOrderedElementSerializer, + keyedStateBackend.getKeyContext(), + stateChangelogWriter, + new RegisteredPriorityQueueStateBackendMetaInfo<>( + stateName, byteOrderedElementSerializer)); + queue = + new ChangelogKeyGroupedPriorityQueue<>( + keyedStateBackend.create(stateName, byteOrderedElementSerializer), + priorityQueueStateChangeLogger, + byteOrderedElementSerializer); + priorityQueueStatesByName.put(stateName, queue); + } + return queue; } @VisibleForTesting @@ -405,6 +427,10 @@ class ChangelogKeyedStateBackend<K> + "This operation cannot use partitioned state."); InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName()); + // todo: support state migration (in FLINK-23143) + // This method is currently called both on recovery and on user access. + // So keyValueStatesByName may contain an entry for user-requested state which will + // prevent state migration (in contrast to other backends). 
if (kvState == null) { if (!stateDescriptor.isSerializerInitialized()) { stateDescriptor.initializeSerializerUnlessSet(executionConfig); @@ -418,6 +444,7 @@ class ChangelogKeyedStateBackend<K> keyValueStatesByName.put(stateDescriptor.getName(), kvState); keyedStateBackend.publishQueryableStateIfEnabled(stateDescriptor, kvState); } + functionDelegationHelper.addOrUpdate(stateDescriptor); return (S) kvState; } @@ -459,7 +486,23 @@ class ChangelogKeyedStateBackend<K> keyedStateBackend.getKeyContext(), stateChangelogWriter, meta); - return stateFactory.create(state, kvStateChangeLogger); + return stateFactory.create( + state, + kvStateChangeLogger, + keyedStateBackend /* pass the nested backend as key context so that it get key updates on recovery*/); + } + + private void completeRestore(Collection<ChangelogStateBackendHandle> stateHandles) { + if (!stateHandles.isEmpty()) { + synchronized (materialized) { // ensure visibility + for (ChangelogStateBackendHandle h : stateHandles) { + if (h != null) { + materialized.addAll(h.getMaterializedStateHandles()); + restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles()); + } + } + } + } } @Override @@ -470,10 +513,41 @@ class ChangelogKeyedStateBackend<K> // Factory function interface private interface StateFactory { <K, N, SV, S extends State, IS extends S> IS create( - InternalKvState<K, N, SV> kvState, KvStateChangeLogger<SV, N> changeLogger) + InternalKvState<K, N, SV> kvState, + KvStateChangeLogger<SV, N> changeLogger, + InternalKeyContext<K> keyContext) throws Exception; } + /** + * @param name state name + * @param type state type (the only supported type currently are: {@link + * BackendStateType#KEY_VALUE key value}, {@link BackendStateType#PRIORITY_QUEUE priority + * queue}) + * @return an existing state, i.e. 
the one that was already created + * @throws NoSuchElementException if the state wasn't created + * @throws UnsupportedOperationException if state type is not supported + */ + public ChangelogState getExistingState(String name, BackendStateType type) + throws NoSuchElementException, UnsupportedOperationException { + ChangelogState state; + switch (type) { + case KEY_VALUE: + state = (ChangelogState) keyValueStatesByName.get(name); + break; + case PRIORITY_QUEUE: + state = priorityQueueStatesByName.get(name); + break; + default: + throw new UnsupportedOperationException( + String.format("Unknown state type %s (%s)", type, name)); + } + if (state == null) { + throw new NoSuchElementException(String.format("%s state %s not found", type, name)); + } + return state; + } + private static <T> RunnableFuture<T> toRunnableFuture(CompletableFuture<T> f) { return new RunnableFuture<T>() { @Override diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogListState.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogListState.java index 6d6ee69..9d07797 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogListState.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogListState.java @@ -22,8 +22,11 @@ import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.typeutils.base.ListSerializer; import org.apache.flink.runtime.state.changelog.StateChange; +import org.apache.flink.runtime.state.heap.InternalKeyContext; import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.state.changelog.restore.ChangelogApplierFactory; +import 
org.apache.flink.state.changelog.restore.StateChangeApplier; import org.apache.flink.util.ExceptionUtils; import java.io.IOException; @@ -42,10 +45,14 @@ class ChangelogListState<K, N, V> extends AbstractChangelogState<K, N, List<V>, InternalListState<K, N, V>> implements InternalListState<K, N, V> { + private final InternalKeyContext<K> keyContext; + ChangelogListState( InternalListState<K, N, V> delegatedState, - KvStateChangeLogger<List<V>, N> changeLogger) { + KvStateChangeLogger<List<V>, N> changeLogger, + InternalKeyContext<K> keyContext) { super(delegatedState, changeLogger); + this.keyContext = keyContext; } @Override @@ -111,10 +118,18 @@ class ChangelogListState<K, N, V> @SuppressWarnings("unchecked") static <K, N, SV, S extends State, IS extends S> IS create( - InternalKvState<K, N, SV> listState, KvStateChangeLogger<SV, N> changeLogger) { + InternalKvState<K, N, SV> listState, + KvStateChangeLogger<SV, N> changeLogger, + InternalKeyContext<K> keyContext) { return (IS) new ChangelogListState<>( (InternalListState<K, N, SV>) listState, - (KvStateChangeLogger<List<SV>, N>) changeLogger); + (KvStateChangeLogger<List<SV>, N>) changeLogger, + keyContext); + } + + @Override + public StateChangeApplier getChangeApplier(ChangelogApplierFactory factory) { + return factory.forList(delegatedState, keyContext); } } diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java index 25cd0a5..3b35dcd 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java @@ -23,8 +23,11 @@ import org.apache.flink.api.common.state.State; import 
org.apache.flink.api.common.typeutils.base.MapSerializer; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.state.changelog.StateChange; +import org.apache.flink.runtime.state.heap.InternalKeyContext; import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.state.changelog.restore.ChangelogApplierFactory; +import org.apache.flink.state.changelog.restore.StateChangeApplier; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.function.ThrowingConsumer; @@ -45,10 +48,14 @@ class ChangelogMapState<K, N, UK, UV> extends AbstractChangelogState<K, N, Map<UK, UV>, InternalMapState<K, N, UK, UV>> implements InternalMapState<K, N, UK, UV> { + private final InternalKeyContext<K> keyContext; + ChangelogMapState( InternalMapState<K, N, UK, UV> delegatedState, - KvStateChangeLogger<Map<UK, UV>, N> changeLogger) { + KvStateChangeLogger<Map<UK, UV>, N> changeLogger, + InternalKeyContext<K> keyContext) { super(delegatedState, changeLogger); + this.keyContext = keyContext; } private Map.Entry<UK, UV> loggingMapEntry( @@ -210,10 +217,18 @@ class ChangelogMapState<K, N, UK, UV> @SuppressWarnings("unchecked") static <UK, UV, K, N, SV, S extends State, IS extends S> IS create( - InternalKvState<K, N, SV> mapState, KvStateChangeLogger<SV, N> changeLogger) { + InternalKvState<K, N, SV> mapState, + KvStateChangeLogger<SV, N> changeLogger, + InternalKeyContext<K> keyContext) { return (IS) new ChangelogMapState<>( (InternalMapState<K, N, UK, UV>) mapState, - (KvStateChangeLogger<Map<UK, UV>, N>) changeLogger); + (KvStateChangeLogger<Map<UK, UV>, N>) changeLogger, + keyContext); + } + + @Override + public StateChangeApplier getChangeApplier(ChangelogApplierFactory factory) { + return factory.forMap(delegatedState, keyContext); } } diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogReducingState.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogReducingState.java index 10d7d8b..068754d 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogReducingState.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogReducingState.java @@ -21,8 +21,11 @@ package org.apache.flink.state.changelog; import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.State; import org.apache.flink.runtime.state.changelog.StateChange; +import org.apache.flink.runtime.state.heap.InternalKeyContext; import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.runtime.state.internal.InternalReducingState; +import org.apache.flink.state.changelog.restore.ChangelogApplierFactory; +import org.apache.flink.state.changelog.restore.StateChangeApplier; import org.apache.flink.util.ExceptionUtils; import java.io.IOException; @@ -40,9 +43,14 @@ class ChangelogReducingState<K, N, V> extends AbstractChangelogState<K, N, V, InternalReducingState<K, N, V>> implements InternalReducingState<K, N, V> { + private final InternalKeyContext<K> keyContext; + ChangelogReducingState( - InternalReducingState<K, N, V> delegatedState, KvStateChangeLogger<V, N> changeLogger) { + InternalReducingState<K, N, V> delegatedState, + KvStateChangeLogger<V, N> changeLogger, + InternalKeyContext<K> keyContext) { super(delegatedState, changeLogger); + this.keyContext = keyContext; } @Override @@ -94,9 +102,16 @@ class ChangelogReducingState<K, N, V> @SuppressWarnings("unchecked") static <K, N, SV, S extends State, IS extends S> IS create( - InternalKvState<K, N, SV> reducingState, KvStateChangeLogger<SV, N> changeLogger) { + InternalKvState<K, N, SV> 
reducingState, + KvStateChangeLogger<SV, N> changeLogger, + InternalKeyContext<K> keyContext) { return (IS) new ChangelogReducingState<>( - (InternalReducingState<K, N, SV>) reducingState, changeLogger); + (InternalReducingState<K, N, SV>) reducingState, changeLogger, keyContext); + } + + @Override + public StateChangeApplier getChangeApplier(ChangelogApplierFactory factory) { + return factory.forReducing(delegatedState, keyContext); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogState.java similarity index 53% copy from flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java copy to flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogState.java index 7ae60f0..fc77ae0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogState.java @@ -1,3 +1,4 @@ +package org.apache.flink.state.changelog; /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -15,34 +16,15 @@ * limitations under the License. */ -package org.apache.flink.runtime.state.changelog; - import org.apache.flink.annotation.Internal; -import org.apache.flink.util.Preconditions; +import org.apache.flink.state.changelog.restore.ChangelogApplierFactory; +import org.apache.flink.state.changelog.restore.StateChangeApplier; -/** Change of state of a keyed operator. Used for generic incremental checkpoints. */ +/** + * State used by {@link ChangelogKeyedStateBackend}. 
Allows replaying recorded changes on recovery + * using {@link StateChangeApplier} + */ @Internal -public class StateChange { - - private final int keyGroup; - private final byte[] change; - - public StateChange(int keyGroup, byte[] change) { - Preconditions.checkArgument(keyGroup >= 0); - this.keyGroup = keyGroup; - this.change = Preconditions.checkNotNull(change); - } - - @Override - public String toString() { - return String.format("keyGroup=%d, dataSize=%d", keyGroup, change.length); - } - - public int getKeyGroup() { - return keyGroup; - } - - public byte[] getChange() { - return change; - } +public interface ChangelogState { + StateChangeApplier getChangeApplier(ChangelogApplierFactory factory); } diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java index d354203..4346370f 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java @@ -39,6 +39,8 @@ import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle; import org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogStorage; import org.apache.flink.runtime.state.delegate.DelegatingStateBackend; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation; +import org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.BaseBackendBuilder; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -47,7 +49,6 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; import java.util.Collection; -import java.util.Objects; import 
java.util.stream.Collectors; /** @@ -96,28 +97,26 @@ public class ChangelogStateBackend implements DelegatingStateBackend, Configurab @Nonnull Collection<KeyedStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) throws Exception { - AbstractKeyedStateBackend<K> keyedStateBackend = - (AbstractKeyedStateBackend<K>) - delegatedStateBackend.createKeyedStateBackend( - env, - jobID, - operatorIdentifier, - keySerializer, - numberOfKeyGroups, - keyGroupRange, - kvStateRegistry, - ttlTimeProvider, - metricGroup, - extractMaterializedState(stateHandles), - cancelStreamRegistry); - // todo: FLINK-21804 get from Environment.getTaskStateManager - InMemoryStateChangelogStorage changelogWriterFactory = new InMemoryStateChangelogStorage(); - // todo: apply state changes from non-materialized part of stateHandles - return new ChangelogKeyedStateBackend<>( - keyedStateBackend, - env.getExecutionConfig(), + return restore( + env, + operatorIdentifier, + keyGroupRange, ttlTimeProvider, - changelogWriterFactory.createWriter(operatorIdentifier, keyGroupRange)); + stateHandles, + baseHandles -> + (AbstractKeyedStateBackend<K>) + delegatedStateBackend.createKeyedStateBackend( + env, + jobID, + operatorIdentifier, + keySerializer, + numberOfKeyGroups, + keyGroupRange, + kvStateRegistry, + ttlTimeProvider, + metricGroup, + baseHandles, + cancelStreamRegistry)); } @Override @@ -135,31 +134,27 @@ public class ChangelogStateBackend implements DelegatingStateBackend, Configurab CloseableRegistry cancelStreamRegistry, double managedMemoryFraction) throws Exception { - - AbstractKeyedStateBackend<K> keyedStateBackend = - (AbstractKeyedStateBackend<K>) - delegatedStateBackend.createKeyedStateBackend( - env, - jobID, - operatorIdentifier, - keySerializer, - numberOfKeyGroups, - keyGroupRange, - kvStateRegistry, - ttlTimeProvider, - metricGroup, - extractMaterializedState(stateHandles), - cancelStreamRegistry, - managedMemoryFraction); - - // todo: FLINK-21804 get from 
Environment.getTaskStateManager - InMemoryStateChangelogStorage changelogWriterFactory = new InMemoryStateChangelogStorage(); - // todo: apply state changes from non-materialized part of stateHandles - return new ChangelogKeyedStateBackend<>( - keyedStateBackend, - env.getExecutionConfig(), + return restore( + env, + operatorIdentifier, + keyGroupRange, ttlTimeProvider, - changelogWriterFactory.createWriter(operatorIdentifier, keyGroupRange)); + stateHandles, + baseHandles -> + (AbstractKeyedStateBackend<K>) + delegatedStateBackend.createKeyedStateBackend( + env, + jobID, + operatorIdentifier, + keySerializer, + numberOfKeyGroups, + keyGroupRange, + kvStateRegistry, + ttlTimeProvider, + metricGroup, + baseHandles, + cancelStreamRegistry, + managedMemoryFraction)); } @Override @@ -196,15 +191,35 @@ public class ChangelogStateBackend implements DelegatingStateBackend, Configurab return this; } - private static Collection<KeyedStateHandle> extractMaterializedState( + @SuppressWarnings({"unchecked", "rawtypes"}) + private <K> ChangelogKeyedStateBackend<K> restore( + Environment env, + String operatorIdentifier, + KeyGroupRange keyGroupRange, + TtlTimeProvider ttlTimeProvider, + Collection<KeyedStateHandle> stateHandles, + BaseBackendBuilder<K> baseBackendBuilder) + throws Exception { + // todo: FLINK-21804 get from Environment.getTaskStateManager + InMemoryStateChangelogStorage changelogStorage = new InMemoryStateChangelogStorage(); + return ChangelogBackendRestoreOperation.restore( + changelogStorage.createReader(), + env.getUserCodeClassLoader().asClassLoader(), + castHandles(stateHandles), + baseBackendBuilder, + (baseBackend, baseState) -> + new ChangelogKeyedStateBackend( + baseBackend, + env.getExecutionConfig(), + ttlTimeProvider, + changelogStorage.createWriter(operatorIdentifier, keyGroupRange), + baseState)); + } + + private Collection<ChangelogStateBackendHandle> castHandles( Collection<KeyedStateHandle> stateHandles) { return stateHandles.stream() - 
.flatMap( - keyedStateHandle -> - ((ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl) - keyedStateHandle) - .getMaterializedStateHandles().stream()) - .filter(Objects::nonNull) + .map(keyedStateHandle -> (ChangelogStateBackendHandle) keyedStateHandle) .collect(Collectors.toList()); } } diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogValueState.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogValueState.java index 0530170..d37026b 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogValueState.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogValueState.java @@ -21,8 +21,11 @@ package org.apache.flink.state.changelog; import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.runtime.state.changelog.StateChange; +import org.apache.flink.runtime.state.heap.InternalKeyContext; import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.runtime.state.internal.InternalValueState; +import org.apache.flink.state.changelog.restore.ChangelogApplierFactory; +import org.apache.flink.state.changelog.restore.StateChangeApplier; import org.apache.flink.util.ExceptionUtils; import java.io.IOException; @@ -39,9 +42,14 @@ class ChangelogValueState<K, N, V> extends AbstractChangelogState<K, N, V, InternalValueState<K, N, V>> implements InternalValueState<K, N, V> { + private final InternalKeyContext<K> keyContext; + ChangelogValueState( - InternalValueState<K, N, V> delegatedState, KvStateChangeLogger<V, N> changeLogger) { + InternalValueState<K, N, V> delegatedState, + KvStateChangeLogger<V, N> changeLogger, + InternalKeyContext<K> keyContext) { super(delegatedState, changeLogger); + this.keyContext = keyContext; } 
@Override @@ -67,8 +75,16 @@ class ChangelogValueState<K, N, V> @SuppressWarnings("unchecked") static <K, N, SV, S extends State, IS extends S> IS create( - InternalKvState<K, N, SV> valueState, KvStateChangeLogger<SV, N> changeLogger) { + InternalKvState<K, N, SV> valueState, + KvStateChangeLogger<SV, N> changeLogger, + InternalKeyContext<K> keyContext) { return (IS) - new ChangelogValueState<>((InternalValueState<K, N, SV>) valueState, changeLogger); + new ChangelogValueState<>( + (InternalValueState<K, N, SV>) valueState, changeLogger, keyContext); + } + + @Override + public StateChangeApplier getChangeApplier(ChangelogApplierFactory factory) { + return factory.forValue(delegatedState, keyContext); } } diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java index 5615ce7..b4b2350 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java @@ -28,7 +28,7 @@ import javax.annotation.concurrent.NotThreadSafe; import java.io.IOException; import java.util.Collection; -import static org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.MERGE_NS; +import static org.apache.flink.state.changelog.StateChangeOperation.MERGE_NS; import static org.apache.flink.util.Preconditions.checkNotNull; @NotThreadSafe diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeOperation.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeOperation.java new file mode 100644 index 0000000..c0a75bd --- /dev/null +++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeOperation.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.changelog; + +import org.apache.flink.annotation.Internal; + +import java.util.Arrays; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** The operation applied to {@link ChangelogState}. */ +@Internal +public enum StateChangeOperation { + /** Scope: key + namespace. */ + CLEAR((byte) 0), + /** Scope: key + namespace. */ + SET((byte) 1), + /** Scope: key + namespace. */ + SET_INTERNAL((byte) 2), + /** Scope: key + namespace. */ + ADD((byte) 3), + /** Scope: key + namespace, also affecting other (source) namespaces. */ + MERGE_NS((byte) 4), + /** Scope: key + namespace + element (e.g. user list append). */ + ADD_ELEMENT((byte) 5), + /** Scope: key + namespace + element (e.g. user map key put). */ + ADD_OR_UPDATE_ELEMENT((byte) 6), + /** Scope: key + namespace + element (e.g. user map remove or iterator remove). 
*/ + REMOVE_ELEMENT((byte) 7), + /** Scope: key + namespace, first element (e.g. priority queue poll). */ + REMOVE_FIRST_ELEMENT((byte) 8), + /** State metadata (name, serializers, etc.). */ + METADATA((byte) 9); + private final byte code; + + StateChangeOperation(byte code) { + this.code = code; + } + + private static final Map<Byte, StateChangeOperation> BY_CODES = + Arrays.stream(StateChangeOperation.values()) + .collect(Collectors.toMap(o -> o.code, Function.identity())); + + public static StateChangeOperation byCode(byte opCode) { + return checkNotNull(BY_CODES.get(opCode), Byte.toString(opCode)); + } + + public byte getCode() { + return code; + } +} diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/AggregatingStateChangeApplier.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/AggregatingStateChangeApplier.java new file mode 100644 index 0000000..aabe8cd --- /dev/null +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/AggregatingStateChangeApplier.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.changelog.restore; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.runtime.state.heap.InternalKeyContext; +import org.apache.flink.runtime.state.internal.InternalAggregatingState; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.state.changelog.StateChangeOperation; + +class AggregatingStateChangeApplier<K, N, IN, SV, OUT> extends KvStateChangeApplier<K, N> { + private final InternalAggregatingState<K, N, IN, SV, OUT> state; + + protected AggregatingStateChangeApplier( + InternalKeyContext<K> keyContext, InternalAggregatingState<K, N, IN, SV, OUT> state) { + super(keyContext); + this.state = state; + } + + @Override + protected InternalKvState<K, N, ?> getState() { + return state; + } + + protected void applyInternal(StateChangeOperation operation, DataInputView in) + throws Exception { + switch (operation) { + case SET: + case SET_INTERNAL: + state.updateInternal(state.getValueSerializer().deserialize(in)); + break; + case MERGE_NS: + applyMergeNamespaces(state, in); + break; + case CLEAR: + state.clear(); + break; + default: + throw new IllegalArgumentException("Unknown state change operation: " + operation); + } + } +} diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogApplierFactory.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogApplierFactory.java new file mode 100644 index 0000000..7c6c059 --- /dev/null +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogApplierFactory.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.changelog.restore; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; +import org.apache.flink.runtime.state.heap.InternalKeyContext; +import org.apache.flink.runtime.state.internal.InternalAggregatingState; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.runtime.state.internal.InternalReducingState; +import org.apache.flink.runtime.state.internal.InternalValueState; + +/** + * {@link StateChangeApplier} factory. Its purpose is to decouple restore/apply logic from state + * logic.
+ */ +@Internal +public interface ChangelogApplierFactory { + + <K, N, UK, UV> KvStateChangeApplier<K, N> forMap( + InternalMapState<K, N, UK, UV> map, InternalKeyContext<K> keyContext); + + <K, N, T> KvStateChangeApplier<K, N> forValue( + InternalValueState<K, N, T> value, InternalKeyContext<K> keyContext); + + <K, N, T> KvStateChangeApplier<K, N> forList( + InternalListState<K, N, T> list, InternalKeyContext<K> keyContext); + + <K, N, T> KvStateChangeApplier<K, N> forReducing( + InternalReducingState<K, N, T> reducing, InternalKeyContext<K> keyContext); + + <K, N, IN, SV, OUT> KvStateChangeApplier<K, N> forAggregating( + InternalAggregatingState<K, N, IN, SV, OUT> aggregating, + InternalKeyContext<K> keyContext); + + <T> StateChangeApplier forPriorityQueue( + KeyGroupedInternalPriorityQueue<T> priorityQueue, TypeSerializer<T> serializer); +} diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogApplierFactoryImpl.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogApplierFactoryImpl.java new file mode 100644 index 0000000..b11df22 --- /dev/null +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogApplierFactoryImpl.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.changelog.restore; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; +import org.apache.flink.runtime.state.heap.InternalKeyContext; +import org.apache.flink.runtime.state.internal.InternalAggregatingState; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.runtime.state.internal.InternalReducingState; +import org.apache.flink.runtime.state.internal.InternalValueState; + +class ChangelogApplierFactoryImpl implements ChangelogApplierFactory { + public static final ChangelogApplierFactoryImpl INSTANCE = new ChangelogApplierFactoryImpl(); + + private ChangelogApplierFactoryImpl() {} + + @Override + public <K, N, UK, UV> KvStateChangeApplier<K, N> forMap( + InternalMapState<K, N, UK, UV> map, InternalKeyContext<K> keyContext) { + return new MapStateChangeApplier<>(map, keyContext); + } + + @Override + public <K, N, T> KvStateChangeApplier<K, N> forList( + InternalListState<K, N, T> state, InternalKeyContext<K> keyContext) { + return new ListStateChangeApplier<>(keyContext, state); + } + + @Override + public <K, N, T> KvStateChangeApplier<K, N> forReducing( + InternalReducingState<K, N, T> state, InternalKeyContext<K> keyContext) { + return new ReducingStateChangeApplier<>(keyContext, state); + } + + @Override + public <K, N, IN, SV, OUT> KvStateChangeApplier<K, N> forAggregating( + InternalAggregatingState<K, N, IN, SV, OUT> 
state, InternalKeyContext<K> keyContext) { + return new AggregatingStateChangeApplier<>(keyContext, state); + } + + @Override + public <K, N, T> KvStateChangeApplier<K, N> forValue( + InternalValueState<K, N, T> state, InternalKeyContext<K> keyContext) { + return new ValueStateChangeApplier<>(keyContext, state); + } + + @Override + public <T> StateChangeApplier forPriorityQueue( + KeyGroupedInternalPriorityQueue<T> queue, TypeSerializer<T> serializer) { + return new PriorityQueueStateChangeApplier<>(queue, serializer); + } +} diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendLogApplier.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendLogApplier.java new file mode 100644 index 0000000..bd26dbb --- /dev/null +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendLogApplier.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.changelog.restore; + +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.ListSerializer; +import org.apache.flink.api.common.typeutils.base.MapSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo; +import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo; +import org.apache.flink.runtime.state.changelog.StateChange; +import org.apache.flink.runtime.state.metainfo.StateMetaInfoReader; +import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; +import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot.BackendStateType; +import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters; +import org.apache.flink.state.changelog.ChangelogKeyedStateBackend; +import org.apache.flink.state.changelog.ChangelogState; +import org.apache.flink.state.changelog.StateChangeOperation; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.IOException; + +import static org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters.StateTypeHint.KEYED_STATE; +import static org.apache.flink.state.changelog.StateChangeOperation.METADATA; +import static org.apache.flink.state.changelog.restore.FunctionDelegationHelper.delegateAggregateFunction; +import static org.apache.flink.state.changelog.restore.FunctionDelegationHelper.delegateReduceFunction; +import static 
org.apache.flink.util.Preconditions.checkNotNull; + +/** Applies {@link StateChange}s to a {@link ChangelogKeyedStateBackend}. */ +@SuppressWarnings({"rawtypes", "unchecked"}) +class ChangelogBackendLogApplier { + private static final Logger LOG = LoggerFactory.getLogger(ChangelogBackendLogApplier.class); + + public static void apply( + StateChange stateChange, + ChangelogKeyedStateBackend<?> changelogBackend, + ClassLoader classLoader) + throws Exception { + DataInputViewStreamWrapper in = + new DataInputViewStreamWrapper(new ByteArrayInputStream(stateChange.getChange())); + applyOperation( + StateChangeOperation.byCode(in.readByte()), + stateChange.getKeyGroup(), + changelogBackend, + in, + classLoader, + ChangelogApplierFactoryImpl.INSTANCE); + } + + private static void applyOperation( + StateChangeOperation operation, + int keyGroup, + ChangelogKeyedStateBackend<?> backend, + DataInputView in, + ClassLoader classLoader, + ChangelogApplierFactory factory) + throws Exception { + LOG.debug("apply {} in key group {}", operation, keyGroup); + if (operation == METADATA) { + applyMetaDataChange(in, backend, classLoader); + } else if (backend.getKeyGroupRange().contains(keyGroup)) { + applyDataChange(in, factory, backend, operation); + } + } + + private static void applyMetaDataChange( + DataInputView in, ChangelogKeyedStateBackend<?> backend, ClassLoader classLoader) + throws Exception { + StateMetaInfoSnapshot snapshot = readStateMetaInfoSnapshot(in, classLoader); + switch (snapshot.getBackendStateType()) { + case KEY_VALUE: + restoreKvMetaData(backend, snapshot); + return; + case PRIORITY_QUEUE: + restorePqMetaData(backend, snapshot); + return; + default: + throw new RuntimeException( + "Unsupported state type: " + + snapshot.getBackendStateType() + + ", sate: " + + snapshot.getName()); + } + } + + private static void restoreKvMetaData( + ChangelogKeyedStateBackend<?> backend, StateMetaInfoSnapshot snapshot) + throws Exception { + 
RegisteredKeyValueStateBackendMetaInfo meta = + new RegisteredKeyValueStateBackendMetaInfo(snapshot); + // Use regular API to create states in both changelog and the base backends; the metadata + // is persisted in the log before data changes. + // An alternative solution to load metadata "natively" by the base backends would require + // base state to be always present, i.e. the 1st checkpoint would have to be "full" always. + backend.getOrCreateKeyedState(meta.getNamespaceSerializer(), toStateDescriptor(meta)); + } + + private static StateDescriptor toStateDescriptor(RegisteredKeyValueStateBackendMetaInfo meta) { + switch (meta.getStateType()) { + case VALUE: + return new ValueStateDescriptor(meta.getName(), meta.getStateSerializer()); + case MAP: + MapSerializer mapSerializer = (MapSerializer) meta.getStateSerializer(); + return new MapStateDescriptor( + meta.getName(), + mapSerializer.getKeySerializer(), + mapSerializer.getValueSerializer()); + case LIST: + return new ListStateDescriptor( + meta.getName(), + ((ListSerializer) meta.getStateSerializer()).getElementSerializer()); + case AGGREGATING: + return new AggregatingStateDescriptor( + meta.getName(), delegateAggregateFunction(), meta.getStateSerializer()); + case REDUCING: + return new ReducingStateDescriptor( + meta.getName(), delegateReduceFunction(), meta.getStateSerializer()); + default: + throw new IllegalArgumentException(meta.getStateType().toString()); + } + } + + private static void restorePqMetaData( + ChangelogKeyedStateBackend<?> backend, StateMetaInfoSnapshot snapshot) { + RegisteredPriorityQueueStateBackendMetaInfo meta = + new RegisteredPriorityQueueStateBackendMetaInfo(snapshot); + backend.create(meta.getName(), meta.getElementSerializer()); + } + + private static StateMetaInfoSnapshot readStateMetaInfoSnapshot( + DataInputView in, ClassLoader classLoader) throws IOException { + int version = in.readInt(); + StateMetaInfoReader reader = + StateMetaInfoSnapshotReadersWriters.getReader(version, 
KEYED_STATE); + return reader.readStateMetaInfoSnapshot(in, classLoader); + } + + private static void applyDataChange( + DataInputView in, + ChangelogApplierFactory factory, + ChangelogKeyedStateBackend<?> backend, + StateChangeOperation operation) + throws Exception { + String name = checkNotNull(in.readUTF()); + BackendStateType type = BackendStateType.byCode(in.readByte()); + ChangelogState state = backend.getExistingState(name, type); + StateChangeApplier changeApplier = state.getChangeApplier(factory); + changeApplier.apply(operation, in); + } + + private ChangelogBackendLogApplier() {} +} diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation.java new file mode 100644 index 0000000..528e0ec --- /dev/null +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ChangelogBackendRestoreOperation.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.changelog.restore; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle; +import org.apache.flink.runtime.state.changelog.StateChange; +import org.apache.flink.runtime.state.changelog.StateChangelogHandle; +import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader; +import org.apache.flink.state.changelog.ChangelogKeyedStateBackend; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.BiFunctionWithException; +import org.apache.flink.util.function.FunctionWithException; + +import java.util.Collection; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * Restores {@link ChangelogKeyedStateBackend} from the provided {@link ChangelogStateBackendHandle + * handles}. + */ +@Internal +public class ChangelogBackendRestoreOperation { + /** Builds base backend for {@link ChangelogKeyedStateBackend} from state. */ + @FunctionalInterface + public interface BaseBackendBuilder<K> + extends FunctionWithException< + Collection<KeyedStateHandle>, AbstractKeyedStateBackend<K>, Exception> {} + + /** Builds {@link ChangelogKeyedStateBackend} from the base backend and state. 
*/ + @FunctionalInterface + public interface DeltaBackendBuilder<K> + extends BiFunctionWithException< + AbstractKeyedStateBackend<K>, + Collection<ChangelogStateBackendHandle>, + ChangelogKeyedStateBackend<K>, + Exception> {} + + public static <K, T extends StateChangelogHandle> ChangelogKeyedStateBackend<K> restore( + StateChangelogHandleReader<T> changelogHandleReader, + ClassLoader classLoader, + Collection<ChangelogStateBackendHandle> stateHandles, + BaseBackendBuilder<K> baseBackendBuilder, + DeltaBackendBuilder<K> changelogBackendBuilder) + throws Exception { + Collection<KeyedStateHandle> baseState = extractBaseState(stateHandles); + AbstractKeyedStateBackend<K> baseBackend = baseBackendBuilder.apply(baseState); + ChangelogKeyedStateBackend<K> changelogBackend = + changelogBackendBuilder.apply(baseBackend, stateHandles); + + for (ChangelogStateBackendHandle handle : stateHandles) { + if (handle != null) { // null is empty state (no change) + readBackendHandle(changelogBackend, handle, changelogHandleReader, classLoader); + } + } + return changelogBackend; + } + + @SuppressWarnings("unchecked") + private static <T extends StateChangelogHandle> void readBackendHandle( + ChangelogKeyedStateBackend<?> backend, + ChangelogStateBackendHandle backendHandle, + StateChangelogHandleReader<T> changelogHandleReader, + ClassLoader classLoader) + throws Exception { + for (StateChangelogHandle changelogHandle : + backendHandle.getNonMaterializedStateHandles()) { + try (CloseableIterator<StateChange> changes = + changelogHandleReader.getChanges((T) changelogHandle)) { + while (changes.hasNext()) { + ChangelogBackendLogApplier.apply(changes.next(), backend, classLoader); + } + } + } + } + + private static Collection<KeyedStateHandle> extractBaseState( + Collection<ChangelogStateBackendHandle> stateHandles) { + Preconditions.checkNotNull(stateHandles); + return stateHandles.stream() + .filter(Objects::nonNull) + .map(ChangelogStateBackendHandle::getMaterializedStateHandles) 
+ .flatMap(Collection::stream) + .collect(Collectors.toList()); + } + + private ChangelogBackendRestoreOperation() {} +} diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/FunctionDelegationHelper.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/FunctionDelegationHelper.java new file mode 100644 index 0000000..4714e6c --- /dev/null +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/FunctionDelegationHelper.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.changelog.restore; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * {@link DelegatingFunction Delegating functions} are used to create metadata on recovery when the + * actual function code is not known yet. Once the actual function is known, backend updates the + * delegate which starts receiving the calls. 
+ */ +@SuppressWarnings({"unchecked", "rawtypes"}) +@Internal +public class FunctionDelegationHelper { + private static final Logger LOG = LoggerFactory.getLogger(FunctionDelegationHelper.class); + + public static <T> ReduceFunction<T> delegateReduceFunction() { + return new DelegatingReduceFunction<>(); + } + + public static <IN, ACC, OUT> AggregateFunction<IN, ACC, OUT> delegateAggregateFunction() { + return new DelegatingAggregateFunction<>(); + } + + private interface DelegatingFunction<F> extends Function { + void delegateIfNeeded(F delegated); + } + + private final Map<String, DelegatingFunction> delegatingFunctions = new HashMap<>(); + + public <T, S extends State, F> void addOrUpdate(StateDescriptor<S, T> stateDescriptor) { + F function = tryGetFunction(stateDescriptor); + String name = stateDescriptor.getName(); + if (function instanceof DelegatingFunction) { + LOG.debug("add delegate: {}", name); + delegatingFunctions.putIfAbsent(name, (DelegatingFunction<?>) function); + } else { + DelegatingFunction<F> delegating = delegatingFunctions.get(name); + if (delegating != null) { + LOG.debug("update delegate: {}", name); + checkState(function != null, "unable to extract function for state " + name); + delegating.delegateIfNeeded(function); + } + } + } + + @Nullable + private static <F extends Function> F tryGetFunction(StateDescriptor<?, ?> stateDescriptor) { + if (stateDescriptor instanceof ReducingStateDescriptor) { + return (F) ((ReducingStateDescriptor) stateDescriptor).getReduceFunction(); + } else if (stateDescriptor instanceof AggregatingStateDescriptor) { + return (F) ((AggregatingStateDescriptor) stateDescriptor).getAggregateFunction(); + } else { + return null; + } + } + + static class DelegatingAggregateFunction<IN, ACC, OUT> + implements AggregateFunction<IN, ACC, OUT>, + DelegatingFunction<AggregateFunction<IN, ACC, OUT>> { + private static final long serialVersionUID = 1L; + + @Nullable private AggregateFunction<IN, ACC, OUT> delegated; + + 
@Override + public void delegateIfNeeded(AggregateFunction<IN, ACC, OUT> delegated) { + if (this.delegated == null) { + this.delegated = checkNotNull(delegated); + } + } + + @Override + public ACC createAccumulator() { + checkNotNull(delegated); + return delegated.createAccumulator(); + } + + @Override + public ACC add(IN value, ACC accumulator) { + checkNotNull(delegated); + return delegated.add(value, accumulator); + } + + @Override + public OUT getResult(ACC accumulator) { + checkNotNull(delegated); + return delegated.getResult(accumulator); + } + + @Override + public ACC merge(ACC a, ACC b) { + checkNotNull(delegated); + return delegated.merge(a, b); + } + } + + static class DelegatingReduceFunction<T> + implements ReduceFunction<T>, DelegatingFunction<ReduceFunction<T>> { + private static final long serialVersionUID = 1L; + + @Nullable private ReduceFunction<T> delegated; + + @Override + public T reduce(T left, T right) throws Exception { + checkNotNull(delegated); + return delegated.reduce(left, right); + } + + @Override + public void delegateIfNeeded(ReduceFunction<T> delegated) { + if (this.delegated == null) { + this.delegated = checkNotNull(delegated); + } + } + } +} diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/KvStateChangeApplier.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/KvStateChangeApplier.java new file mode 100644 index 0000000..0fe8b99 --- /dev/null +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/KvStateChangeApplier.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.changelog.restore; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.state.heap.InternalKeyContext; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.runtime.state.internal.InternalMergingState; +import org.apache.flink.state.changelog.StateChangeOperation; + +import java.util.ArrayList; +import java.util.Collection; + +abstract class KvStateChangeApplier<K, N> implements StateChangeApplier { + private final InternalKeyContext<K> keyContext; + + protected abstract InternalKvState<K, N, ?> getState(); + + protected KvStateChangeApplier(InternalKeyContext<K> keyContext) { + this.keyContext = keyContext; + } + + @Override + public void apply(StateChangeOperation operation, DataInputView in) throws Exception { + K key = getState().getKeySerializer().deserialize(in); + keyContext.setCurrentKey(key); + keyContext.setCurrentKeyGroupIndex( + KeyGroupRangeAssignment.assignToKeyGroup(key, keyContext.getNumberOfKeyGroups())); + getState().setCurrentNamespace(getState().getNamespaceSerializer().deserialize(in)); + applyInternal(operation, in); + } + + protected abstract void applyInternal(StateChangeOperation operation, DataInputView in) + throws Exception; + + protected static <K, N, T> void applyMergeNamespaces( + 
InternalMergingState<K, N, T, ?, ?> state, DataInputView in) throws Exception { + N target = state.getNamespaceSerializer().deserialize(in); + int sourcesSize = in.readInt(); + Collection<N> sources = new ArrayList<>(sourcesSize); + for (int i = 0; i < sourcesSize; i++) { + sources.add(state.getNamespaceSerializer().deserialize(in)); + } + state.mergeNamespaces(target, sources); + } +} diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ListStateChangeApplier.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ListStateChangeApplier.java new file mode 100644 index 0000000..1f7b05e --- /dev/null +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ListStateChangeApplier.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.changelog.restore; + +import org.apache.flink.api.common.typeutils.base.ListSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.runtime.state.heap.InternalKeyContext; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.state.changelog.StateChangeOperation; + +class ListStateChangeApplier<K, N, T> extends KvStateChangeApplier<K, N> { + private final InternalListState<K, N, T> state; + + protected ListStateChangeApplier( + InternalKeyContext<K> keyContext, InternalListState<K, N, T> state) { + super(keyContext); + this.state = state; + } + + @Override + protected InternalKvState<K, N, ?> getState() { + return state; + } + + protected void applyInternal(StateChangeOperation operation, DataInputView in) + throws Exception { + switch (operation) { + case ADD: + state.addAll(state.getValueSerializer().deserialize(in)); + break; + case ADD_ELEMENT: + state.add( + ((ListSerializer<T>) state.getValueSerializer()) + .getElementSerializer() + .deserialize(in)); + break; + case SET: + state.update(state.getValueSerializer().deserialize(in)); + break; + case SET_INTERNAL: + state.updateInternal(state.getValueSerializer().deserialize(in)); + break; + case MERGE_NS: + applyMergeNamespaces(state, in); + break; + case CLEAR: + state.clear(); + break; + default: + throw new IllegalArgumentException("Unknown state change operation: " + operation); + } + } +} diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/MapStateChangeApplier.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/MapStateChangeApplier.java new file mode 100644 index 0000000..b498329 --- /dev/null +++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/MapStateChangeApplier.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.changelog.restore; + +import org.apache.flink.api.common.typeutils.base.MapSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.runtime.state.heap.InternalKeyContext; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.state.changelog.StateChangeOperation; + +class MapStateChangeApplier<K, N, UK, UV> extends KvStateChangeApplier<K, N> { + private final InternalMapState<K, N, UK, UV> mapState; + private final MapSerializer<UK, UV> mapSerializer; + + protected MapStateChangeApplier( + InternalMapState<K, N, UK, UV> mapState, InternalKeyContext<K> keyContext) { + super(keyContext); + this.mapState = mapState; + this.mapSerializer = (MapSerializer<UK, UV>) mapState.getValueSerializer(); + } + + @Override + protected InternalKvState<K, N, ?> getState() { + return mapState; + } + + protected void applyInternal(StateChangeOperation operation, DataInputView 
in) + throws Exception { + switch (operation) { + case ADD: + mapState.putAll(mapSerializer.deserialize(in)); + break; + case ADD_ELEMENT: + case ADD_OR_UPDATE_ELEMENT: + mapState.put( + mapSerializer.getKeySerializer().deserialize(in), + mapSerializer.getValueSerializer().deserialize(in)); + break; + case REMOVE_ELEMENT: + mapState.remove(mapSerializer.getKeySerializer().deserialize(in)); + break; + case CLEAR: + mapState.clear(); + break; + default: + throw new IllegalArgumentException("Unknown state change operation: " + operation); + } + } +} diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/PriorityQueueStateChangeApplier.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/PriorityQueueStateChangeApplier.java new file mode 100644 index 0000000..b3a5894 --- /dev/null +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/PriorityQueueStateChangeApplier.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.changelog.restore; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; +import org.apache.flink.state.changelog.StateChangeOperation; + +class PriorityQueueStateChangeApplier<T> implements StateChangeApplier { + private final KeyGroupedInternalPriorityQueue<T> queue; + private final TypeSerializer<T> serializer; + + public PriorityQueueStateChangeApplier( + KeyGroupedInternalPriorityQueue<T> queue, TypeSerializer<T> serializer) { + this.queue = queue; + this.serializer = serializer; + } + + @Override + public void apply(StateChangeOperation operation, DataInputView in) throws Exception { + switch (operation) { + case REMOVE_FIRST_ELEMENT: + queue.poll(); + break; + case ADD_ELEMENT: + int numElements = in.readInt(); + for (int i = 0; i < numElements; i++) { + queue.add(serializer.deserialize(in)); + } + break; + case REMOVE_ELEMENT: + queue.remove(serializer.deserialize(in)); + break; + default: + throw new UnsupportedOperationException(operation.name()); + } + } +} diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ReducingStateChangeApplier.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ReducingStateChangeApplier.java new file mode 100644 index 0000000..80aeaf9 --- /dev/null +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ReducingStateChangeApplier.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.changelog.restore; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.runtime.state.heap.InternalKeyContext; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.runtime.state.internal.InternalReducingState; +import org.apache.flink.state.changelog.StateChangeOperation; + +class ReducingStateChangeApplier<K, N, T> extends KvStateChangeApplier<K, N> { + private final InternalReducingState<K, N, T> state; + + protected ReducingStateChangeApplier( + InternalKeyContext<K> keyContext, InternalReducingState<K, N, T> state) { + super(keyContext); + this.state = state; + } + + @Override + protected InternalKvState<K, N, ?> getState() { + return state; + } + + protected void applyInternal(StateChangeOperation operation, DataInputView in) + throws Exception { + switch (operation) { + case SET: + case SET_INTERNAL: + state.updateInternal(state.getValueSerializer().deserialize(in)); + break; + case CLEAR: + state.clear(); + break; + case MERGE_NS: + applyMergeNamespaces(state, in); + break; + default: + throw new IllegalArgumentException("Unknown state change operation: " + operation); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/StateChangeApplier.java similarity index 53% copy from flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java copy to flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/StateChangeApplier.java index 7ae60f0..3b120ba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/StateChangeApplier.java @@ -15,34 +15,14 @@ * limitations under the License. */ -package org.apache.flink.runtime.state.changelog; +package org.apache.flink.state.changelog.restore; import org.apache.flink.annotation.Internal; -import org.apache.flink.util.Preconditions; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.state.changelog.StateChangeOperation; -/** Change of state of a keyed operator. Used for generic incremental checkpoints. */ +/** Applies state data change to some state. 
*/ @Internal -public class StateChange { - - private final int keyGroup; - private final byte[] change; - - public StateChange(int keyGroup, byte[] change) { - Preconditions.checkArgument(keyGroup >= 0); - this.keyGroup = keyGroup; - this.change = Preconditions.checkNotNull(change); - } - - @Override - public String toString() { - return String.format("keyGroup=%d, dataSize=%d", keyGroup, change.length); - } - - public int getKeyGroup() { - return keyGroup; - } - - public byte[] getChange() { - return change; - } +public interface StateChangeApplier { + void apply(StateChangeOperation operation, DataInputView in) throws Exception; } diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ValueStateChangeApplier.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ValueStateChangeApplier.java new file mode 100644 index 0000000..c32db7d --- /dev/null +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/ValueStateChangeApplier.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.changelog.restore; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.runtime.state.heap.InternalKeyContext; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.runtime.state.internal.InternalValueState; +import org.apache.flink.state.changelog.StateChangeOperation; + +class ValueStateChangeApplier<K, N, T> extends KvStateChangeApplier<K, N> { + private final InternalValueState<K, N, T> state; + + protected ValueStateChangeApplier( + InternalKeyContext<K> keyContext, InternalValueState<K, N, T> state) { + super(keyContext); + this.state = state; + } + + @Override + protected InternalKvState<K, N, ?> getState() { + return state; + } + + protected void applyInternal(StateChangeOperation operation, DataInputView in) + throws Exception { + switch (operation) { + case SET: + state.update(state.getValueSerializer().deserialize(in)); + break; + case CLEAR: + state.clear(); + break; + default: + throw new IllegalArgumentException("Unknown state change operation: " + operation); + } + } +} diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogListStateTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogListStateTest.java index 950546c..9a7420b 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogListStateTest.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogListStateTest.java @@ -21,6 +21,8 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.ListSerializer; +import org.apache.flink.runtime.state.KeyGroupRange; +import 
org.apache.flink.runtime.state.heap.InternalKeyContextImpl; import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.util.function.FunctionWithException; import org.apache.flink.util.function.ThrowingConsumer; @@ -202,7 +204,10 @@ public class ChangelogListStateTest { private static ChangelogListState createState(List<String> data, TestChangeLoggerKv logger) { ChangelogListState state = - new ChangelogListState<>(new TestingInternalListState(data), logger); + new ChangelogListState<>( + new TestingInternalListState(data), + logger, + new InternalKeyContextImpl<>(KeyGroupRange.EMPTY_KEY_GROUP_RANGE, 0)); state.setCurrentNamespace("ns0"); return state; } diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogMapStateTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogMapStateTest.java index c6bf06d..f28befc 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogMapStateTest.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogMapStateTest.java @@ -21,6 +21,8 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.MapSerializer; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.heap.InternalKeyContextImpl; import org.apache.flink.runtime.state.internal.InternalMapState; import org.apache.flink.util.function.FunctionWithException; import org.apache.flink.util.function.ThrowingConsumer; @@ -246,7 +248,10 @@ public class ChangelogMapStateTest { private static ChangelogMapState createState( Map<String, String> data, TestChangeLoggerKv logger) { ChangelogMapState state = - new 
ChangelogMapState<>(new TestingInternalMapState(data), logger); + new ChangelogMapState<>( + new TestingInternalMapState(data), + logger, + new InternalKeyContextImpl<>(KeyGroupRange.EMPTY_KEY_GROUP_RANGE, 0)); state.setCurrentNamespace("ns0"); return state; } diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/KvStateChangeLoggerImplTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/KvStateChangeLoggerImplTest.java index bfea119..c52a424 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/KvStateChangeLoggerImplTest.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/KvStateChangeLoggerImplTest.java @@ -21,14 +21,13 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo; import org.apache.flink.runtime.state.heap.InternalKeyContextImpl; -import org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation; import java.io.IOException; import java.util.Collections; import java.util.Optional; import static org.apache.flink.api.common.state.StateDescriptor.Type.VALUE; -import static org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.MERGE_NS; +import static org.apache.flink.state.changelog.StateChangeOperation.MERGE_NS; /** {@link KvStateChangeLoggerImpl} test. 
*/ public class KvStateChangeLoggerImplTest extends StateChangeLoggerTestBase<String> { diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java index b6bf280..672682d 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java @@ -21,12 +21,11 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo; import org.apache.flink.runtime.state.heap.InternalKeyContextImpl; -import org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation; import java.io.IOException; import java.util.Optional; -import static org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.REMOVE_FIRST_ELEMENT; +import static org.apache.flink.state.changelog.StateChangeOperation.REMOVE_FIRST_ELEMENT; /** {@link PriorityQueueStateChangeLoggerImpl} test. 
*/ public class PriorityQueueStateChangeLoggerImplTest extends StateChangeLoggerTestBase<Void> { diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java index 1d2572c..ae87745 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java @@ -22,7 +22,6 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.changelog.SequenceNumber; import org.apache.flink.runtime.state.changelog.StateChangelogWriter; import org.apache.flink.runtime.state.heap.InternalKeyContextImpl; -import org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation; import org.junit.Test; @@ -33,7 +32,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import static org.apache.flink.state.changelog.AbstractStateChangeLogger.COMMON_KEY_GROUP; -import static org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.METADATA; +import static org.apache.flink.state.changelog.StateChangeOperation.METADATA; import static org.junit.Assert.assertEquals; abstract class StateChangeLoggerTestBase<Namespace> {
