This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new dbfc83d1cfd  [FLINK-35268][state] Add ttl interface for Async State API && implement TtlListStateV2/TtlValueStateV2 (#25515)
dbfc83d1cfd is described below

commit dbfc83d1cfd16bb464fe8d2329b4b13cd440820d
Author: Yanfei Lei <fredia...@gmail.com>
AuthorDate: Wed Oct 30 11:35:47 2024 +0800

    [FLINK-35268][state] Add ttl interface for Async State API && implement TtlListStateV2/TtlValueStateV2 (#25515)
---
 .../asyncprocessing/AbstractStateIterator.java     |   5 +-
 .../runtime/asyncprocessing/StateRequestType.java  |   6 -
 .../runtime/state/AsyncKeyedStateBackend.java      |  19 +
 .../runtime/state/ttl/AbstractTtlDecorator.java    |  41 +-
 .../flink/runtime/state/ttl/TtlReduceFunction.java |   4 +-
 .../flink/runtime/state/ttl/TtlStateContext.java   |  14 +-
 .../apache/flink/runtime/state/ttl/TtlUtils.java   |   9 +-
 .../runtime/state/v2/AbstractAggregatingState.java |  55 +--
 .../runtime/state/v2/AbstractReducingState.java    |  52 ++-
 .../flink/runtime/state/v2/MapStateDescriptor.java |   8 +
 .../state/v2/adaptor/AggregatingStateAdaptor.java  |  41 ++
 .../v2/adaptor/AsyncKeyedStateBackendAdaptor.java  |  12 +
 .../state/v2/adaptor/CompleteStateIterator.java    |   2 +-
 .../state/v2/adaptor/ReducingStateAdaptor.java     |  39 +-
 .../v2/internal/InternalAggregatingState.java      |   4 +-
 .../state/v2/internal/InternalAppendingState.java  |  31 +-
 .../runtime/state/v2/ttl/AbstractTtlState.java     |  57 +++
 .../runtime/state/v2/ttl/TtlAggregateFunction.java |  84 ++++
 .../runtime/state/v2/ttl/TtlAggregatingState.java  | 104 +++++
 .../flink/runtime/state/v2/ttl/TtlListState.java   | 219 +++++++++++
 .../flink/runtime/state/v2/ttl/TtlMapState.java    | 281 +++++++++++++
 .../runtime/state/v2/ttl/TtlReducingState.java     |  94 +++++
 .../runtime/state/v2/ttl/TtlStateFactory.java      | 433 +++++++++++++++++++++
 .../flink/runtime/state/v2/ttl/TtlValueState.java  |  62 +++
 .../flink/runtime/state/StateBackendTestUtils.java |  11 +
 .../state/v2/AbstractAggregatingStateTest.java     |  12 +-
 .../state/v2/AbstractKeyedStateTestBase.java       |  11 +
 .../state/v2/AbstractReducingStateTest.java        |  14 +-
 .../runtime/state/v2/StateBackendTestV2Base.java   |  62 +++
 .../flink/state/forst/ForStAggregatingState.java   |   5 +-
 .../forst/ForStDBTtlCompactFiltersManager.java     |  24 +-
 .../flink/state/forst/ForStKeyedStateBackend.java  |  34 +-
 .../state/forst/ForStKeyedStateBackendBuilder.java |  14 +-
 .../flink/state/forst/ForStOperationUtils.java     |  31 +-
 .../flink/state/forst/ForStReducingState.java      |   4 +-
 .../flink/state/forst/ForStStateBackend.java       |   1 +
 .../completeness/TypeInfoTestCoverageTest.java     |   4 +-
 .../TypeSerializerTestCoverageTest.java            |   2 +
 38 files changed, 1800 insertions(+), 105 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIterator.java
index 12ffdaae427..7683fc6a482 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIterator.java
@@ -106,7 +106,10 @@ public abstract class AbstractStateIterator<T> implements StateIterator<T> {
         Collection<StateFuture<? extends U>> resultFutures = new ArrayList<>();
         for (T item : cache) {
-            resultFutures.add(iterating.apply(item));
+            StateFuture<?
extends U> resultFuture = iterating.apply(item); + if (resultFuture != null) { + resultFutures.add(resultFuture); + } } if (hasNext()) { return StateFutureUtils.combineAll(resultFutures) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestType.java index 8d449692105..504115a48fa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestType.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestType.java @@ -108,15 +108,9 @@ public enum StateRequestType { /** Add element into reducing state, {@link ReducingState#asyncAdd(Object)}. */ REDUCING_ADD, - /** Remove element from reducing state. */ - REDUCING_REMOVE, - /** Get value from aggregating state by {@link AggregatingState#asyncGet()}. */ AGGREGATING_GET, - /** Remove element from aggregate state. */ - AGGREGATING_REMOVE, - /** Add element to aggregating state by {@link AggregatingState#asyncAdd(Object)}. */ AGGREGATING_ADD } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java index aebd8af24dc..d6e606529aa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsyncKeyedStateBackend.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.asyncprocessing.StateExecutor; import org.apache.flink.runtime.asyncprocessing.StateRequestHandler; import org.apache.flink.runtime.checkpoint.SnapshotType; import org.apache.flink.runtime.state.v2.StateDescriptor; +import org.apache.flink.runtime.state.v2.internal.InternalKeyedState; import org.apache.flink.util.Disposable; import javax.annotation.Nonnull; @@ -72,6 +73,24 @@ public interface AsyncKeyedStateBackend<K> @Nonnull StateDescriptor<SV> stateDesc) throws Exception; + /** + * Creates and returns a new state for internal usage. + * + * @param <N> the type of namespace for partitioning. + * @param <S> The type of the public API state. + * @param <SV> The type of the stored state value. + * @param defaultNamespace the default namespace for this state. + * @param namespaceSerializer the serializer for namespace. + * @param stateDesc The {@code StateDescriptor} that contains the name of the state. + * @throws Exception Exceptions may occur during initialization of the state. + */ + @Nonnull + <N, S extends InternalKeyedState, SV> S createStateInternal( + @Nonnull N defaultNamespace, + @Nonnull TypeSerializer<N> namespaceSerializer, + @Nonnull StateDescriptor<SV> stateDesc) + throws Exception; + /** * Creates a {@code StateExecutor} which supports to execute a batch of state requests * asynchronously. 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java index cd08292ec9c..ac42b72d009 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java @@ -29,24 +29,25 @@ import org.apache.flink.util.function.ThrowingRunnable; * * @param <T> Type of originally wrapped object */ -abstract class AbstractTtlDecorator<T> { +public abstract class AbstractTtlDecorator<T> { /** Wrapped original state handler. */ - final T original; + protected final T original; - final StateTtlConfig config; + protected final StateTtlConfig config; - final TtlTimeProvider timeProvider; + protected final TtlTimeProvider timeProvider; /** Whether to renew expiration timestamp on state read access. */ - final boolean updateTsOnRead; + protected final boolean updateTsOnRead; /** Whether to renew expiration timestamp on state read access. */ - final boolean returnExpired; + protected final boolean returnExpired; /** State value time to live in milliseconds. */ - final long ttl; + protected final long ttl; - AbstractTtlDecorator(T original, StateTtlConfig config, TtlTimeProvider timeProvider) { + protected AbstractTtlDecorator( + T original, StateTtlConfig config, TtlTimeProvider timeProvider) { Preconditions.checkNotNull(original); Preconditions.checkNotNull(config); Preconditions.checkNotNull(timeProvider); @@ -60,25 +61,25 @@ abstract class AbstractTtlDecorator<T> { this.ttl = config.getTimeToLive().toMillis(); } - <V> V getUnexpired(TtlValue<V> ttlValue) { + public <V> V getUnexpired(TtlValue<V> ttlValue) { return ttlValue == null || (!returnExpired && expired(ttlValue)) ? null : ttlValue.getUserValue(); } - <V> boolean expired(TtlValue<V> ttlValue) { + public <V> boolean expired(TtlValue<V> ttlValue) { return TtlUtils.expired(ttlValue, ttl, timeProvider); } - <V> TtlValue<V> wrapWithTs(V value) { + public <V> TtlValue<V> wrapWithTs(V value) { return TtlUtils.wrapWithTs(value, timeProvider.currentTimestamp()); } - <V> TtlValue<V> rewrapWithNewTs(TtlValue<V> ttlValue) { + public <V> TtlValue<V> rewrapWithNewTs(TtlValue<V> ttlValue) { return wrapWithTs(ttlValue.getUserValue()); } - <SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> + public <SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> V getWithTtlCheckAndUpdate( SupplierWithException<TtlValue<V>, SE> getter, ThrowingConsumer<TtlValue<V>, CE> updater, @@ -88,7 +89,7 @@ abstract class AbstractTtlDecorator<T> { return ttlValue == null ? null : ttlValue.getUserValue(); } - <SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> + public <SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> TtlValue<V> getWrappedWithTtlCheckAndUpdate( SupplierWithException<TtlValue<V>, SE> getter, ThrowingConsumer<TtlValue<V>, CE> updater, @@ -107,4 +108,16 @@ abstract class AbstractTtlDecorator<T> { } return ttlValue; } + + protected <T> T getElementWithTtlCheck(TtlValue<T> ttlValue) { + if (ttlValue == null) { + return null; + } else if (expired(ttlValue)) { + // don't clear state here cause forst is LSM-tree based. 
+ if (!returnExpired) { + return null; + } + } + return ttlValue.getUserValue(); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReduceFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReduceFunction.java index 3d4ef4cc62e..eb30f0c70f6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReduceFunction.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReduceFunction.java @@ -26,10 +26,10 @@ import org.apache.flink.api.common.state.StateTtlConfig; * * @param <T> Type of the user value of state with TTL */ -class TtlReduceFunction<T> extends AbstractTtlDecorator<ReduceFunction<T>> +public class TtlReduceFunction<T> extends AbstractTtlDecorator<ReduceFunction<T>> implements ReduceFunction<TtlValue<T>> { - TtlReduceFunction( + public TtlReduceFunction( ReduceFunction<T> originalReduceFunction, StateTtlConfig config, TtlTimeProvider timeProvider) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateContext.java index c6f629c326c..994b68c9fa7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateContext.java @@ -21,20 +21,20 @@ package org.apache.flink.runtime.state.ttl; import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; -class TtlStateContext<T, SV> { +public class TtlStateContext<T, SV> { /** Wrapped original state handler. */ - final T original; + public final T original; - final StateTtlConfig config; - final TtlTimeProvider timeProvider; + public final StateTtlConfig config; + public final TtlTimeProvider timeProvider; /** Serializer of original user stored value without timestamp. */ - final TypeSerializer<SV> valueSerializer; + public final TypeSerializer<SV> valueSerializer; /** This registered callback is to be called whenever state is accessed for read or write. */ - final Runnable accessCallback; + public final Runnable accessCallback; - TtlStateContext( + public TtlStateContext( T original, StateTtlConfig config, TtlTimeProvider timeProvider, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java index dbe937647ca..e4ce8927eb9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java @@ -22,17 +22,18 @@ import javax.annotation.Nullable; /** Common functions related to State TTL. 
*/ public class TtlUtils { - static <V> boolean expired( + public static <V> boolean expired( @Nullable TtlValue<V> ttlValue, long ttl, TtlTimeProvider timeProvider) { return expired(ttlValue, ttl, timeProvider.currentTimestamp()); } - static <V> boolean expired(@Nullable TtlValue<V> ttlValue, long ttl, long currentTimestamp) { + public static <V> boolean expired( + @Nullable TtlValue<V> ttlValue, long ttl, long currentTimestamp) { return ttlValue != null && expired(ttlValue.getLastAccessTimestamp(), ttl, currentTimestamp); } - static boolean expired(long ts, long ttl, TtlTimeProvider timeProvider) { + public static boolean expired(long ts, long ttl, TtlTimeProvider timeProvider) { return expired(ts, ttl, timeProvider.currentTimestamp()); } @@ -45,7 +46,7 @@ public class TtlUtils { return ts + ttlWithoutOverflow; } - static <V> TtlValue<V> wrapWithTs(V value, long ts) { + public static <V> TtlValue<V> wrapWithTs(V value, long ts) { return new TtlValue<>(value, ts); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractAggregatingState.java index 899aeec1c48..95850d928ce 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractAggregatingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractAggregatingState.java @@ -58,49 +58,53 @@ public class AbstractAggregatingState<K, N, IN, ACC, OUT> extends AbstractKeyedS this.aggregateFunction = stateDescriptor.getAggregateFunction(); } - protected StateFuture<ACC> asyncGetAccumulator() { - return handleRequest(StateRequestType.AGGREGATING_GET, null); - } - @Override public StateFuture<OUT> asyncGet() { - return asyncGetAccumulator() + return asyncGetInternal() .thenApply(acc -> (acc == null) ? null : this.aggregateFunction.getResult(acc)); } @Override public StateFuture<Void> asyncAdd(IN value) { - return asyncGetAccumulator() + return asyncGetInternal() .thenAccept( acc -> { final ACC safeAcc = (acc == null) ? this.aggregateFunction.createAccumulator() : acc; - handleRequest( - StateRequestType.AGGREGATING_ADD, - this.aggregateFunction.add(value, safeAcc)); + asyncUpdateInternal(this.aggregateFunction.add(value, safeAcc)); }); } + @Override + public StateFuture<ACC> asyncGetInternal() { + return handleRequest(StateRequestType.AGGREGATING_GET, null); + } + + @Override + public StateFuture<Void> asyncUpdateInternal(ACC valueToStore) { + return handleRequest(StateRequestType.AGGREGATING_ADD, valueToStore); + } + @Override public OUT get() { - return handleRequestSync(StateRequestType.AGGREGATING_GET, null); + ACC acc = getInternal(); + return acc == null ? null : this.aggregateFunction.getResult(acc); } @Override public void add(IN value) { - ACC acc = handleRequestSync(StateRequestType.AGGREGATING_GET, null); + ACC acc = getInternal(); try { ACC newValue = acc == null ? 
this.aggregateFunction.createAccumulator() : this.aggregateFunction.add(value, acc); - handleRequestSync(StateRequestType.AGGREGATING_ADD, newValue); + updateInternal(newValue); } catch (Exception e) { throw new RuntimeException(e); } - handleRequestSync(StateRequestType.AGGREGATING_ADD, value); } @Override @@ -113,16 +117,16 @@ public class AbstractAggregatingState<K, N, IN, ACC, OUT> extends AbstractKeyedS for (N source : sources) { if (source != null) { setCurrentNamespace(source); - futures.add(handleRequest(StateRequestType.AGGREGATING_GET, null)); + futures.add(asyncGetInternal()); } } setCurrentNamespace(target); - futures.add(handleRequest(StateRequestType.AGGREGATING_GET, null)); + futures.add(asyncGetInternal()); // phase 2: merge the sources to the target return StateFutureUtils.combineAll(futures) .thenCompose( values -> { - List<StateFuture<ACC>> updateFutures = + List<StateFuture<Void>> updateFutures = new ArrayList<>(sources.size() + 1); ACC current = null; Iterator<ACC> valueIterator = values.iterator(); @@ -130,9 +134,7 @@ public class AbstractAggregatingState<K, N, IN, ACC, OUT> extends AbstractKeyedS ACC value = valueIterator.next(); if (value != null) { setCurrentNamespace(source); - updateFutures.add( - handleRequest( - StateRequestType.AGGREGATING_REMOVE, null)); + updateFutures.add(asyncUpdateInternal(null)); if (current == null) { current = value; } else { @@ -146,8 +148,7 @@ public class AbstractAggregatingState<K, N, IN, ACC, OUT> extends AbstractKeyedS current = aggregateFunction.merge(current, targetValue); } setCurrentNamespace(target); - updateFutures.add( - handleRequest(StateRequestType.AGGREGATING_ADD, current)); + updateFutures.add(asyncUpdateInternal(current)); } return StateFutureUtils.combineAll(updateFutures) .thenAccept(ignores -> {}); @@ -168,7 +169,7 @@ public class AbstractAggregatingState<K, N, IN, ACC, OUT> extends AbstractKeyedS ACC oldValue = handleRequestSync(StateRequestType.AGGREGATING_GET, null); if (oldValue != null) { - handleRequestSync(StateRequestType.AGGREGATING_REMOVE, null); + handleRequestSync(StateRequestType.AGGREGATING_ADD, null); if (current != null) { current = aggregateFunction.merge(current, oldValue); @@ -194,4 +195,14 @@ public class AbstractAggregatingState<K, N, IN, ACC, OUT> extends AbstractKeyedS throw new RuntimeException("merge namespace fail.", e); } } + + @Override + public ACC getInternal() { + return handleRequestSync(StateRequestType.AGGREGATING_GET, null); + } + + @Override + public void updateInternal(ACC valueToStore) { + handleRequestSync(StateRequestType.AGGREGATING_ADD, valueToStore); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractReducingState.java index de7a19fd157..4fb7967bd6c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractReducingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/AbstractReducingState.java @@ -50,33 +50,33 @@ public class AbstractReducingState<K, N, V> extends AbstractKeyedState<K, N, V> @Override public StateFuture<V> asyncGet() { - return handleRequest(StateRequestType.REDUCING_GET, null); + return asyncGetInternal(); } @Override public StateFuture<Void> asyncAdd(V value) { - return handleRequest(StateRequestType.REDUCING_GET, null) + return asyncGetInternal() .thenAccept( oldValue -> { V newValue = oldValue == null ? 
value : reduceFunction.reduce((V) oldValue, value); - handleRequest(StateRequestType.REDUCING_ADD, newValue); + asyncUpdateInternal(newValue); }); } @Override public V get() { - return handleRequestSync(StateRequestType.REDUCING_GET, null); + return getInternal(); } @Override public void add(V value) { - V oldValue = handleRequestSync(StateRequestType.REDUCING_GET, null); + V oldValue = getInternal(); try { V newValue = oldValue == null ? value : reduceFunction.reduce(oldValue, value); - handleRequestSync(StateRequestType.REDUCING_ADD, newValue); + updateInternal(newValue); } catch (Exception e) { throw new RuntimeException(e); } @@ -92,16 +92,16 @@ public class AbstractReducingState<K, N, V> extends AbstractKeyedState<K, N, V> for (N source : sources) { if (source != null) { setCurrentNamespace(source); - futures.add(handleRequest(StateRequestType.REDUCING_GET, null)); + futures.add(asyncGetInternal()); } } setCurrentNamespace(target); - futures.add(handleRequest(StateRequestType.REDUCING_GET, null)); + futures.add(asyncGetInternal()); // phase 2: merge the sources to the target return StateFutureUtils.combineAll(futures) .thenCompose( values -> { - List<StateFuture<V>> updateFutures = + List<StateFuture<Void>> updateFutures = new ArrayList<>(sources.size() + 1); V current = null; Iterator<V> valueIterator = values.iterator(); @@ -109,8 +109,7 @@ public class AbstractReducingState<K, N, V> extends AbstractKeyedState<K, N, V> V value = valueIterator.next(); if (value != null) { setCurrentNamespace(source); - updateFutures.add( - handleRequest(StateRequestType.REDUCING_REMOVE, null)); + updateFutures.add(asyncUpdateInternal(null)); if (current != null) { current = reduceFunction.reduce(current, value); } else { @@ -124,8 +123,7 @@ public class AbstractReducingState<K, N, V> extends AbstractKeyedState<K, N, V> current = reduceFunction.reduce(current, targetValue); } setCurrentNamespace(target); - updateFutures.add( - handleRequest(StateRequestType.REDUCING_ADD, current)); + updateFutures.add(asyncUpdateInternal(current)); } return StateFutureUtils.combineAll(updateFutures) .thenAccept(ignores -> {}); @@ -143,10 +141,10 @@ public class AbstractReducingState<K, N, V> extends AbstractKeyedState<K, N, V> for (N source : sources) { if (source != null) { setCurrentNamespace(source); - V oldValue = handleRequestSync(StateRequestType.REDUCING_GET, null); + V oldValue = getInternal(); if (oldValue != null) { - handleRequestSync(StateRequestType.REDUCING_REMOVE, null); + updateInternal(null); if (current != null) { current = reduceFunction.reduce(current, oldValue); @@ -161,15 +159,35 @@ public class AbstractReducingState<K, N, V> extends AbstractKeyedState<K, N, V> if (current != null) { // create the target full-binary-key setCurrentNamespace(target); - V targetValue = handleRequestSync(StateRequestType.REDUCING_GET, null); + V targetValue = getInternal(); if (targetValue != null) { current = reduceFunction.reduce(current, targetValue); } - handleRequestSync(StateRequestType.REDUCING_ADD, current); + updateInternal(current); } } catch (Exception e) { throw new RuntimeException("merge namespace fail.", e); } } + + @Override + public StateFuture<V> asyncGetInternal() { + return handleRequest(StateRequestType.REDUCING_GET, null); + } + + @Override + public StateFuture<Void> asyncUpdateInternal(V valueToStore) { + return handleRequest(StateRequestType.REDUCING_ADD, valueToStore); + } + + @Override + public V getInternal() { + return handleRequestSync(StateRequestType.REDUCING_GET, null); + } + + 
@Override + public void updateInternal(V valueToStore) { + handleRequestSync(StateRequestType.REDUCING_ADD, valueToStore); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/MapStateDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/MapStateDescriptor.java index bae29db5502..64138e340dd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/MapStateDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/MapStateDescriptor.java @@ -34,6 +34,8 @@ import javax.annotation.Nonnull; * @param <UV> The type of the values that the map state can hold. */ public class MapStateDescriptor<UK, UV> extends StateDescriptor<UV> { + /** The type of the user key in the state. */ + @Nonnull private final TypeInformation<UK> userKeyTypeInfo; /** The serializer for the user key. */ @Nonnull private final TypeSerializer<UK> userKeySerializer; @@ -67,9 +69,15 @@ public class MapStateDescriptor<UK, UV> extends StateDescriptor<UV> { TypeInformation<UV> userValueTypeInfo, SerializerConfig serializerConfig) { super(stateId, userValueTypeInfo, serializerConfig); + this.userKeyTypeInfo = userKeyTypeInfo; this.userKeySerializer = userKeyTypeInfo.createSerializer(serializerConfig); } + @Nonnull + public TypeInformation<UK> getUserKeyType() { + return userKeyTypeInfo; + } + @Nonnull public TypeSerializer<UK> getUserKeySerializer() { return userKeySerializer.duplicate(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AggregatingStateAdaptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AggregatingStateAdaptor.java index 1b7c90be71b..08c158e08e6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AggregatingStateAdaptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AggregatingStateAdaptor.java @@ -45,4 +45,45 @@ public class AggregatingStateAdaptor<K, N, IN, ACC, OUT> throw new RuntimeException("Error while get value from raw AggregatingState", e); } } + + @Override + public StateFuture<ACC> asyncGetInternal() { + try { + return StateFutureUtils.completedFuture(delegatedState.getInternal()); + } catch (Exception e) { + throw new RuntimeException( + "Error while get internal value from raw AggregatingState", e); + } + } + + @Override + public StateFuture<Void> asyncUpdateInternal(ACC valueToStore) { + try { + delegatedState.updateInternal(valueToStore); + return StateFutureUtils.completedVoidFuture(); + } catch (Exception e) { + throw new RuntimeException( + "Error while update internal value to raw AggregatingState", e); + } + } + + @Override + public ACC getInternal() { + try { + return delegatedState.getInternal(); + } catch (Exception e) { + throw new RuntimeException( + "Error while get internal value from raw AggregatingState", e); + } + } + + @Override + public void updateInternal(ACC valueToStore) { + try { + delegatedState.updateInternal(valueToStore); + } catch (Exception e) { + throw new RuntimeException( + "Error while update internal value to raw AggregatingState", e); + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java index bf48415801f..28c6993c9c4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/AsyncKeyedStateBackendAdaptor.java @@ -46,6 +46,7 @@ import org.apache.flink.runtime.state.internal.InternalReducingState; import org.apache.flink.runtime.state.internal.InternalValueState; import org.apache.flink.runtime.state.v2.StateDescriptor; import org.apache.flink.runtime.state.v2.StateDescriptorUtils; +import org.apache.flink.runtime.state.v2.internal.InternalKeyedState; import javax.annotation.Nonnull; @@ -75,6 +76,17 @@ public class AsyncKeyedStateBackendAdaptor<K> implements AsyncKeyedStateBackend< @Nonnull TypeSerializer<N> namespaceSerializer, @Nonnull StateDescriptor<SV> stateDesc) throws Exception { + return createStateInternal(defaultNamespace, namespaceSerializer, stateDesc); + } + + @Nonnull + @Override + @SuppressWarnings({"rawtypes", "unchecked"}) + public <N, S extends InternalKeyedState, SV> S createStateInternal( + @Nonnull N defaultNamespace, + @Nonnull TypeSerializer<N> namespaceSerializer, + @Nonnull StateDescriptor<SV> stateDesc) + throws Exception { org.apache.flink.api.common.state.StateDescriptor rawStateDesc = StateDescriptorUtils.transformFromV2ToV1(stateDesc); org.apache.flink.api.common.state.State rawState = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/CompleteStateIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/CompleteStateIterator.java index fc121a70da7..bf2f3f23a09 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/CompleteStateIterator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/CompleteStateIterator.java @@ -35,7 +35,7 @@ public class CompleteStateIterator<T> implements StateIterator<T> { final Iterator<T> iterator; final boolean empty; - CompleteStateIterator(Iterable<T> iterable) { + public CompleteStateIterator(Iterable<T> iterable) { this.iterator = iterable.iterator(); this.empty = !iterator.hasNext(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/ReducingStateAdaptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/ReducingStateAdaptor.java index 276cef760ff..37a0f4d98b8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/ReducingStateAdaptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/adaptor/ReducingStateAdaptor.java @@ -39,7 +39,44 @@ public class ReducingStateAdaptor<K, N, V> extends MergingStateAdaptor<K, N, V, try { return StateFutureUtils.completedFuture(delegatedState.get()); } catch (Exception e) { - throw new RuntimeException("Error while get value from raw AggregatingState", e); + throw new RuntimeException("Error while get value from raw ReducingState", e); + } + } + + @Override + public StateFuture<V> asyncGetInternal() { + try { + return StateFutureUtils.completedFuture(delegatedState.getInternal()); + } catch (Exception e) { + throw new RuntimeException("Error while get value from raw ReducingState", e); + } + } + + @Override + public StateFuture<Void> asyncUpdateInternal(V valueToStore) { + try { + delegatedState.updateInternal(valueToStore); + return StateFutureUtils.completedVoidFuture(); + } catch (Exception e) { + throw new RuntimeException("Error while update value to raw ReducingState", e); + } + } + + @Override + public V getInternal() { + try { + return delegatedState.getInternal(); + } catch (Exception e) { + throw new RuntimeException("Error while get internal value from raw 
ReducingState", e); + } + } + + @Override + public void updateInternal(V valueToStore) { + try { + delegatedState.updateInternal(valueToStore); + } catch (Exception e) { + throw new RuntimeException("Error while update internal value to raw ReducingState", e); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalAggregatingState.java index 8b5d7b2dfa2..bc469825d85 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalAggregatingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalAggregatingState.java @@ -29,4 +29,6 @@ import org.apache.flink.api.common.state.v2.AggregatingState; * @param <OUT> The type of the values that are returned from the state. */ public interface InternalAggregatingState<K, N, IN, ACC, OUT> - extends InternalMergingState<K, N, IN, ACC, OUT, OUT>, AggregatingState<IN, OUT> {} + extends InternalMergingState<K, N, IN, ACC, OUT, OUT>, + AggregatingState<IN, OUT>, + InternalKeyedState<K, N, ACC> {} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalAppendingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalAppendingState.java index 6b9b323d8fe..7e938c6fbe9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalAppendingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/internal/InternalAppendingState.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.state.v2.internal; import org.apache.flink.api.common.state.v2.AppendingState; +import org.apache.flink.api.common.state.v2.StateFuture; /** * This class defines the internal interface for appending state. @@ -30,4 +31,32 @@ import org.apache.flink.api.common.state.v2.AppendingState; * @param <SYNCOUT> Type of the value that can be retrieved from the state by synchronous interface. */ public interface InternalAppendingState<K, N, IN, SV, OUT, SYNCOUT> - extends InternalKeyedState<K, N, SV>, AppendingState<IN, OUT, SYNCOUT> {} + extends InternalKeyedState<K, N, SV>, AppendingState<IN, OUT, SYNCOUT> { + /** + * Get internally stored value. + * + * @return internally stored value. + */ + StateFuture<SV> asyncGetInternal(); + + /** + * Update internally stored value. + * + * @param valueToStore new value to store. + */ + StateFuture<Void> asyncUpdateInternal(SV valueToStore); + + /** + * Get internally stored value. + * + * @return internally stored value. + */ + SV getInternal(); + + /** + * Update internally stored value. + * + * @param valueToStore new value to store. + */ + void updateInternal(SV valueToStore); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/AbstractTtlState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/AbstractTtlState.java new file mode 100644 index 00000000000..5c06a739844 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/AbstractTtlState.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.v2.ttl; + +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.runtime.state.ttl.AbstractTtlDecorator; +import org.apache.flink.runtime.state.ttl.TtlStateContext; +import org.apache.flink.runtime.state.v2.internal.InternalKeyedState; + +/** + * Base class for TTL logic wrappers of state objects. state V2 does not support + * FULL_STATE_SCAN_SNAPSHOT and INCREMENTAL_CLEANUP, only supports ROCKSDB_COMPACTION_FILTER. + * UpdateType#OnReadAndWrite is also not supported in state V2. + * + * @param <K> The type of key the state is associated to + * @param <N> The type of the namespace + * @param <SV> The type of values kept internally in state without TTL + * @param <TTLSV> The type of values kept internally in state with TTL + * @param <S> Type of originally wrapped state object + */ +abstract class AbstractTtlState<K, N, SV, TTLSV, S extends InternalKeyedState<K, N, TTLSV>> + extends AbstractTtlDecorator<S> implements InternalKeyedState<K, N, SV> { + /** This registered callback is to be called whenever state is accessed for read or write. */ + protected AbstractTtlState(TtlStateContext<S, SV> ttlStateContext) { + super(ttlStateContext.original, ttlStateContext.config, ttlStateContext.timeProvider); + } + + @Override + public StateFuture<Void> asyncClear() { + return original.asyncClear(); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public void setCurrentNamespace(N namespace) { + original.setCurrentNamespace(namespace); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlAggregateFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlAggregateFunction.java new file mode 100644 index 00000000000..45f235758e5 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlAggregateFunction.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.v2.ttl; + +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.state.StateTtlConfig; +import org.apache.flink.runtime.state.ttl.AbstractTtlDecorator; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.runtime.state.ttl.TtlValue; +import org.apache.flink.util.FlinkRuntimeException; + +/** + * This class wraps aggregating function with TTL logic. + * + * @param <IN> The type of the values that are aggregated (input values) + * @param <ACC> The type of the accumulator (intermediate aggregate state). + * @param <OUT> The type of the aggregated result + */ +public class TtlAggregateFunction<IN, ACC, OUT> + extends AbstractTtlDecorator<AggregateFunction<IN, ACC, OUT>> + implements AggregateFunction<IN, TtlValue<ACC>, OUT> { + + public TtlAggregateFunction( + AggregateFunction<IN, ACC, OUT> aggFunction, + StateTtlConfig config, + TtlTimeProvider timeProvider) { + super(aggFunction, config, timeProvider); + } + + @Override + public TtlValue<ACC> createAccumulator() { + return wrapWithTs(original.createAccumulator()); + } + + @Override + public TtlValue<ACC> add(IN value, TtlValue<ACC> accumulator) { + ACC userAcc = getUnexpired(accumulator); + userAcc = userAcc == null ? original.createAccumulator() : userAcc; + return wrapWithTs(original.add(value, userAcc)); + } + + @Override + public OUT getResult(TtlValue<ACC> accumulator) { + ACC userAcc; + try { + userAcc = getElementWithTtlCheck(accumulator); + } catch (Exception e) { + throw new FlinkRuntimeException( + "Failed to retrieve original internal aggregating state", e); + } + return userAcc == null ? null : original.getResult(userAcc); + } + + @Override + public TtlValue<ACC> merge(TtlValue<ACC> a, TtlValue<ACC> b) { + ACC userA = getUnexpired(a); + ACC userB = getUnexpired(b); + if (userA != null && userB != null) { + return wrapWithTs(original.merge(userA, userB)); + } else if (userA != null) { + return rewrapWithNewTs(a); + } else if (userB != null) { + return rewrapWithNewTs(b); + } else { + return null; + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlAggregatingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlAggregatingState.java new file mode 100644 index 00000000000..78c88cc4dfe --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlAggregatingState.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.v2.ttl; + +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.runtime.state.ttl.TtlStateContext; +import org.apache.flink.runtime.state.ttl.TtlValue; +import org.apache.flink.runtime.state.v2.internal.InternalAggregatingState; + +import java.util.Collection; + +/** + * This class wraps aggregating state with TTL logic. + * + * @param <K> The type of key the state is associated to + * @param <N> The type of the namespace + * @param <IN> Type of the value added to the state + * @param <ACC> The type of the accumulator (intermediate aggregate state). + * @param <OUT> Type of the value extracted from the state + */ +class TtlAggregatingState<K, N, IN, ACC, OUT> + extends AbstractTtlState< + K, N, ACC, TtlValue<ACC>, InternalAggregatingState<K, N, IN, TtlValue<ACC>, OUT>> + implements InternalAggregatingState<K, N, IN, ACC, OUT> { + + TtlAggregatingState( + TtlStateContext<InternalAggregatingState<K, N, IN, TtlValue<ACC>, OUT>, ACC> + ttlStateContext, + TtlAggregateFunction<IN, ACC, OUT> aggregateFunction) { + super(ttlStateContext); + } + + @Override + public StateFuture<Void> asyncMergeNamespaces(N target, Collection<N> sources) { + return original.asyncMergeNamespaces(target, sources); + } + + @Override + public void mergeNamespaces(N target, Collection<N> sources) { + original.mergeNamespaces(target, sources); + } + + @Override + public StateFuture<OUT> asyncGet() { + return original.asyncGet(); + } + + @Override + public StateFuture<Void> asyncAdd(IN value) { + return original.asyncAdd(value); + } + + @Override + public OUT get() { + return original.get(); + } + + @Override + public void add(IN value) { + original.add(value); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public StateFuture<ACC> asyncGetInternal() { + return original.asyncGetInternal().thenApply(ttlValue -> getElementWithTtlCheck(ttlValue)); + } + + @Override + public StateFuture<Void> asyncUpdateInternal(ACC valueToStore) { + return original.asyncUpdateInternal(wrapWithTs(valueToStore)); + } + + @Override + public ACC getInternal() { + TtlValue<ACC> ttlValue = original.getInternal(); + return getElementWithTtlCheck(ttlValue); + } + + @Override + public void updateInternal(ACC valueToStore) { + original.updateInternal(wrapWithTs(valueToStore)); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlListState.java new file mode 100644 index 00000000000..7cd52632866 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlListState.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.v2.ttl; + +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.api.common.state.v2.StateIterator; +import org.apache.flink.runtime.state.ttl.TtlStateContext; +import org.apache.flink.runtime.state.ttl.TtlUtils; +import org.apache.flink.runtime.state.ttl.TtlValue; +import org.apache.flink.runtime.state.v2.internal.InternalListState; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * This class wraps list state with TTL logic. + * + * @param <K> The type of key the state is associated to + * @param <N> The type of the namespace + * @param <T> Type of the user entry value of state with TTL + */ +class TtlListState<K, N, T> + extends AbstractTtlState<K, N, T, TtlValue<T>, InternalListState<K, N, TtlValue<T>>> + implements InternalListState<K, N, T> { + + protected TtlListState( + TtlStateContext<InternalListState<K, N, TtlValue<T>>, T> ttlStateContext) { + super(ttlStateContext); + } + + @Override + public StateFuture<Void> asyncUpdate(List<T> values) { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + return original.asyncUpdate(withTs(values)); + } + + @Override + public StateFuture<Void> asyncAddAll(List<T> values) { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + return original.asyncAddAll(withTs(values)); + } + + @Override + public StateFuture<StateIterator<T>> asyncGet() { + // 1. The timestamp of elements in list state isn't updated when get even if updateTsOnRead + // is true. + // 2. we don't clear state here cause forst is LSM-tree based. + return original.asyncGet().thenApply(stateIter -> new AsyncIteratorWrapper(stateIter)); + } + + @Override + public StateFuture<Void> asyncAdd(T value) { + return original.asyncAdd(value == null ? null : wrapWithTs(value)); + } + + @Override + public Iterable<T> get() { + Iterable<TtlValue<T>> ttlValue = original.get(); + ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue; + final Iterable<TtlValue<T>> finalResult = ttlValue; + return () -> new IteratorWithCleanup(finalResult.iterator()); + } + + @Override + public void add(T value) { + original.add(value == null ? 
null : wrapWithTs(value)); + } + + @Override + public void update(List<T> values) { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.update(withTs(values)); + } + + @Override + public void addAll(List<T> values) { + Preconditions.checkNotNull(values, "List of values to add cannot be null."); + original.addAll(withTs(values)); + } + + private <E> List<E> collect(Iterable<E> iterable) { + if (iterable instanceof List) { + return (List<E>) iterable; + } else { + List<E> list = new ArrayList<>(); + for (E element : iterable) { + list.add(element); + } + return list; + } + } + + private List<TtlValue<T>> withTs(List<T> values) { + long currentTimestamp = timeProvider.currentTimestamp(); + List<TtlValue<T>> withTs = new ArrayList<>(values.size()); + for (T value : values) { + Preconditions.checkNotNull(value, "You cannot have null element in a ListState."); + withTs.add(TtlUtils.wrapWithTs(value, currentTimestamp)); + } + return withTs; + } + + private class IteratorWithCleanup implements Iterator<T> { + private final Iterator<TtlValue<T>> originalIterator; + private boolean anyUnexpired = false; + private boolean uncleared = true; + private T nextUnexpired = null; + + private IteratorWithCleanup(Iterator<TtlValue<T>> ttlIterator) { + this.originalIterator = ttlIterator; + } + + @Override + public boolean hasNext() { + findNextUnexpired(); + cleanupIfEmpty(); + return nextUnexpired != null; + } + + private void cleanupIfEmpty() { + boolean endOfIter = !originalIterator.hasNext() && nextUnexpired == null; + if (uncleared && !anyUnexpired && endOfIter) { + original.clear(); + uncleared = false; + } + } + + @Override + public T next() { + if (hasNext()) { + T result = nextUnexpired; + nextUnexpired = null; + return result; + } + throw new NoSuchElementException(); + } + + // Once a null element is encountered, the subsequent elements will no longer be returned. + private void findNextUnexpired() { + while (nextUnexpired == null && originalIterator.hasNext()) { + TtlValue<T> ttlValue = originalIterator.next(); + if (ttlValue == null) { + break; + } + boolean unexpired = !expired(ttlValue); + if (unexpired) { + anyUnexpired = true; + } + if (unexpired || returnExpired) { + nextUnexpired = ttlValue.getUserValue(); + } + } + } + } + + private class AsyncIteratorWrapper implements StateIterator<T> { + + private final StateIterator<TtlValue<T>> originalIterator; + + public AsyncIteratorWrapper(StateIterator<TtlValue<T>> originalIterator) { + this.originalIterator = originalIterator; + } + + @Override + public <U> StateFuture<Collection<U>> onNext( + Function<T, StateFuture<? extends U>> iterating) { + Function<TtlValue<T>, StateFuture<? 
extends U>> ttlIterating = + (item) -> { + T element = getElementWithTtlCheck(item); + if (element != null) { + return iterating.apply(element); + } else { + return null; + } + }; + return originalIterator.onNext(ttlIterating); + } + + @Override + public StateFuture<Void> onNext(Consumer<T> iterating) { + Consumer<TtlValue<T>> ttlIterating = + (item) -> { + T element = getElementWithTtlCheck(item); + if (element != null) { + iterating.accept(element); + } + }; + return originalIterator.onNext(ttlIterating); + } + + @Override + public boolean isEmpty() { + return originalIterator.isEmpty(); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlMapState.java new file mode 100644 index 00000000000..15b5e751aea --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlMapState.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.v2.ttl; + +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.api.common.state.v2.StateIterator; +import org.apache.flink.runtime.state.ttl.TtlStateContext; +import org.apache.flink.runtime.state.ttl.TtlUtils; +import org.apache.flink.runtime.state.ttl.TtlValue; +import org.apache.flink.runtime.state.v2.internal.InternalMapState; + +import javax.annotation.Nonnull; + +import java.util.AbstractMap; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * This class wraps map state with TTL logic. + * + * @param <K> The type of key the state is associated to + * @param <N> The type of the namespace + * @param <UK> Type of the user entry key of state with TTL + * @param <UV> Type of the user entry value of state with TTL + */ +class TtlMapState<K, N, UK, UV> + extends AbstractTtlState<K, N, UV, TtlValue<UV>, InternalMapState<K, N, UK, TtlValue<UV>>> + implements InternalMapState<K, N, UK, UV> { + + protected TtlMapState( + TtlStateContext<InternalMapState<K, N, UK, TtlValue<UV>>, UV> ttlStateContext) { + super(ttlStateContext); + } + + @Override + public void clear() { + original.clear(); + } + + @Override + public StateFuture<UV> asyncGet(UK key) { + return original.asyncGet(key).thenApply(ttlValue -> getElementWithTtlCheck(ttlValue)); + } + + @Override + public StateFuture<Void> asyncPut(UK key, UV value) { + return original.asyncPut(key, value == null ? 
null : wrapWithTs(value)); + } + + @Override + public StateFuture<Void> asyncPutAll(Map<UK, UV> map) { + Map<UK, TtlValue<UV>> withTs = new HashMap(); + for (Map.Entry<UK, UV> entry : map.entrySet()) { + withTs.put( + entry.getKey(), entry.getValue() == null ? null : wrapWithTs(entry.getValue())); + } + return original.asyncPutAll(withTs); + } + + @Override + public StateFuture<Void> asyncRemove(UK key) { + return original.asyncRemove(key); + } + + @Override + public StateFuture<Boolean> asyncContains(UK key) { + return original.asyncGet(key) + .thenApply(ttlValue -> getElementWithTtlCheck(ttlValue) != null); + } + + @Override + public StateFuture<StateIterator<Map.Entry<UK, UV>>> asyncEntries() { + return original.asyncEntries().thenApply(iter -> new AsyncEntriesIterator<>(iter, e -> e)); + } + + @Override + public StateFuture<StateIterator<UK>> asyncKeys() { + return original.asyncEntries() + .thenApply(iter -> new AsyncEntriesIterator<>(iter, e -> e.getKey())); + } + + @Override + public StateFuture<StateIterator<UV>> asyncValues() { + return original.asyncEntries() + .thenApply(iter -> new AsyncEntriesIterator<>(iter, e -> e.getValue())); + } + + @Override + public StateFuture<Boolean> asyncIsEmpty() { + // the result may be wrong if state is expired. + return original.asyncIsEmpty(); + } + + @Override + public UV get(UK key) { + return getElementWithTtlCheck(original.get(key)); + } + + @Override + public void put(UK key, UV value) { + original.put(key, value == null ? null : wrapWithTs(value)); + } + + @Override + public void putAll(Map<UK, UV> map) { + Map<UK, TtlValue<UV>> withTs = new HashMap(); + long currentTimestamp = timeProvider.currentTimestamp(); + for (Map.Entry<UK, UV> entry : map.entrySet()) { + withTs.put( + entry.getKey(), + entry.getValue() == null + ? null + : TtlUtils.wrapWithTs(entry.getValue(), currentTimestamp)); + } + original.putAll(withTs); + } + + @Override + public void remove(UK key) { + original.remove(key); + } + + @Override + public boolean contains(UK key) { + return getElementWithTtlCheck(original.get(key)) != null; + } + + @Override + public Iterable<Map.Entry<UK, UV>> entries() { + return entries(e -> e); + } + + @Override + public Iterable<UK> keys() { + return entries(e -> e.getKey()); + } + + @Override + public Iterable<UV> values() { + return entries(e -> e.getValue()); + } + + private <R> Iterable<R> entries(Function<Map.Entry<UK, UV>, R> resultMapper) { + Iterable<Map.Entry<UK, TtlValue<UV>>> withTs = original.entries(); + return () -> + new EntriesIterator<>( + withTs == null ? Collections.emptyList() : withTs, resultMapper); + } + + @Override + public Iterator<Map.Entry<UK, UV>> iterator() { + return entries().iterator(); + } + + @Override + public boolean isEmpty() { + // todo: poor performance, if return `original.isEmpty()` directly, the result may be wrong. 
+ return iterator().hasNext(); + } + + private class EntriesIterator<R> implements Iterator<R> { + private final Iterator<Map.Entry<UK, TtlValue<UV>>> originalIterator; + private final Function<Map.Entry<UK, UV>, R> resultMapper; + private Map.Entry<UK, UV> nextUnexpired = null; + private boolean rightAfterNextIsCalled = false; + + private EntriesIterator( + @Nonnull Iterable<Map.Entry<UK, TtlValue<UV>>> withTs, + @Nonnull Function<Map.Entry<UK, UV>, R> resultMapper) { + this.originalIterator = withTs.iterator(); + this.resultMapper = resultMapper; + } + + @Override + public boolean hasNext() { + rightAfterNextIsCalled = false; + while (nextUnexpired == null && originalIterator.hasNext()) { + Map.Entry<UK, TtlValue<UV>> ttlEntry = originalIterator.next(); + UV value = getElementWithTtlCheck(ttlEntry.getValue()); + nextUnexpired = + value == null + ? null + : new AbstractMap.SimpleEntry<>(ttlEntry.getKey(), value); + } + return nextUnexpired != null; + } + + @Override + public R next() { + if (hasNext()) { + rightAfterNextIsCalled = true; + R result = resultMapper.apply(nextUnexpired); + nextUnexpired = null; + return result; + } + throw new NoSuchElementException(); + } + + @Override + public void remove() { + if (rightAfterNextIsCalled) { + originalIterator.remove(); + } else { + throw new IllegalStateException( + "next() has not been called or hasNext() has been called afterwards," + + " remove() is supported only right after calling next()"); + } + } + } + + private class AsyncEntriesIterator<R> implements StateIterator<R> { + private final StateIterator<Map.Entry<UK, TtlValue<UV>>> originalIterator; + private final Function<Map.Entry<UK, UV>, R> resultMapper; + + public AsyncEntriesIterator( + @Nonnull StateIterator<Map.Entry<UK, TtlValue<UV>>> originalIterator, + @Nonnull Function<Map.Entry<UK, UV>, R> resultMapper) { + this.originalIterator = originalIterator; + this.resultMapper = resultMapper; + } + + @Override + public <U> StateFuture<Collection<U>> onNext( + Function<R, StateFuture<? extends U>> iterating) { + Function<Map.Entry<UK, TtlValue<UV>>, StateFuture<? extends U>> ttlIterating = + (item) -> { + UV value = getElementWithTtlCheck(item.getValue()); + if (value == null) { + return null; + } + R result = + resultMapper.apply( + new AbstractMap.SimpleEntry<>(item.getKey(), value)); + return iterating.apply(result); + }; + return originalIterator.onNext(ttlIterating); + } + + @Override + public StateFuture<Void> onNext(Consumer<R> iterating) { + Consumer<Map.Entry<UK, TtlValue<UV>>> ttlIterating = + (item) -> { + UV value = getElementWithTtlCheck(item.getValue()); + if (value == null) { + return; + } + iterating.accept( + resultMapper.apply( + new AbstractMap.SimpleEntry<>(item.getKey(), value))); + }; + return originalIterator.onNext(ttlIterating); + } + + @Override + public boolean isEmpty() { + return false; + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlReducingState.java new file mode 100644 index 00000000000..31549ecb99a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlReducingState.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.v2.ttl; + +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.runtime.state.ttl.TtlStateContext; +import org.apache.flink.runtime.state.ttl.TtlValue; +import org.apache.flink.runtime.state.v2.internal.InternalReducingState; + +import java.util.Collection; + +/** + * This class wraps reducing state with TTL logic. + * + * @param <K> The type of key the state is associated to + * @param <N> The type of the namespace + * @param <T> Type of the user value of state with TTL + */ +class TtlReducingState<K, N, T> + extends AbstractTtlState<K, N, T, TtlValue<T>, InternalReducingState<K, N, TtlValue<T>>> + implements InternalReducingState<K, N, T> { + + protected TtlReducingState( + TtlStateContext<InternalReducingState<K, N, TtlValue<T>>, T> ttlStateContext) { + super(ttlStateContext); + } + + @Override + public StateFuture<Void> asyncMergeNamespaces(N target, Collection<N> sources) { + return original.asyncMergeNamespaces(target, sources); + } + + @Override + public void mergeNamespaces(N target, Collection<N> sources) { + original.mergeNamespaces(target, sources); + } + + @Override + public StateFuture<T> asyncGet() { + return asyncGetInternal(); + } + + @Override + public StateFuture<Void> asyncAdd(T value) { + return asyncUpdateInternal(value); + } + + @Override + public T get() { + return getInternal(); + } + + @Override + public void add(T value) { + original.add(wrapWithTs(value)); + } + + @Override + public StateFuture<T> asyncGetInternal() { + return original.asyncGetInternal().thenApply(ttlValue -> getElementWithTtlCheck(ttlValue)); + } + + @Override + public StateFuture<Void> asyncUpdateInternal(T valueToStore) { + return original.asyncUpdateInternal(wrapWithTs(valueToStore)); + } + + @Override + public T getInternal() { + TtlValue<T> ttlValue = original.getInternal(); + return getElementWithTtlCheck(ttlValue); + } + + @Override + public void updateInternal(T valueToStore) { + original.updateInternal(wrapWithTs(valueToStore)); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlStateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlStateFactory.java new file mode 100644 index 00000000000..1f3050734fb --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlStateFactory.java @@ -0,0 +1,433 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.v2.ttl; + +import org.apache.flink.api.common.serialization.SerializerConfig; +import org.apache.flink.api.common.state.StateTtlConfig; +import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.AsyncKeyedStateBackend; +import org.apache.flink.runtime.state.ttl.TtlReduceFunction; +import org.apache.flink.runtime.state.ttl.TtlStateContext; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.runtime.state.ttl.TtlValue; +import org.apache.flink.runtime.state.v2.AggregatingStateDescriptor; +import org.apache.flink.runtime.state.v2.ListStateDescriptor; +import org.apache.flink.runtime.state.v2.MapStateDescriptor; +import org.apache.flink.runtime.state.v2.ReducingStateDescriptor; +import org.apache.flink.runtime.state.v2.StateDescriptor; +import org.apache.flink.runtime.state.v2.ValueStateDescriptor; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; + +import javax.annotation.Nonnull; + +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.util.Preconditions.checkArgument; + +public class TtlStateFactory<K, N, SV, TTLSV, S extends State, IS> { + public static <K, N, SV, TTLSV, S extends State, IS extends S> + IS createStateAndWrapWithTtlIfEnabled( + N defaultNamespace, + TypeSerializer<N> namespaceSerializer, + StateDescriptor<SV> stateDesc, + AsyncKeyedStateBackend<K> stateBackend, + TtlTimeProvider timeProvider) + throws Exception { + Preconditions.checkNotNull(namespaceSerializer); + Preconditions.checkNotNull(stateDesc); + Preconditions.checkNotNull(stateBackend); + Preconditions.checkNotNull(timeProvider); + if (stateDesc.getTtlConfig().isEnabled()) { + if (!stateDesc.getTtlConfig().getCleanupStrategies().inRocksdbCompactFilter()) { + throw new UnsupportedOperationException( + "Only ROCKSDB_COMPACTION_FILTER strategy is supported in state V2."); + } + if (stateDesc + .getTtlConfig() + .getUpdateType() + .equals(StateTtlConfig.UpdateType.OnReadAndWrite)) { + throw new UnsupportedOperationException( + "OnReadAndWrite update type is not supported in state V2."); + } + } + return stateDesc.getTtlConfig().isEnabled() + ? 
new TtlStateFactory<K, N, SV, TTLSV, S, IS>( + defaultNamespace, + namespaceSerializer, + stateDesc, + stateBackend, + timeProvider) + .createState() + : stateBackend.createStateInternal( + defaultNamespace, namespaceSerializer, stateDesc); + } + + public static boolean isTtlStateSerializer(TypeSerializer<?> typeSerializer) { + // element's serializer in state descriptor. + boolean ttlSerializer = typeSerializer instanceof TtlSerializer; + return ttlSerializer; + } + + private final Map<StateDescriptor.Type, SupplierWithException<IS, Exception>> stateFactories; + private N defaultNamespace; + private TypeSerializer<N> namespaceSerializer; + private StateDescriptor<SV> stateDesc; + private AsyncKeyedStateBackend<K> stateBackend; + private TtlTimeProvider timeProvider; + + @Nonnull private final StateTtlConfig ttlConfig; + private final long ttl; + + private TtlStateFactory( + N defaultNamespace, + TypeSerializer<N> namespaceSerializer, + StateDescriptor<SV> stateDesc, + AsyncKeyedStateBackend<K> stateBackend, + TtlTimeProvider timeProvider) { + this.defaultNamespace = defaultNamespace; + this.namespaceSerializer = namespaceSerializer; + this.stateDesc = stateDesc; + this.stateBackend = stateBackend; + this.ttlConfig = stateDesc.getTtlConfig(); + this.timeProvider = timeProvider; + this.ttl = ttlConfig.getTimeToLive().toMillis(); + this.stateFactories = createStateFactories(); + } + + private Map<StateDescriptor.Type, SupplierWithException<IS, Exception>> createStateFactories() { + return Stream.of( + Tuple2.of( + StateDescriptor.Type.VALUE, + (SupplierWithException<IS, Exception>) this::createValueState), + Tuple2.of( + StateDescriptor.Type.LIST, + (SupplierWithException<IS, Exception>) this::createListState), + Tuple2.of( + StateDescriptor.Type.MAP, + (SupplierWithException<IS, Exception>) this::createMapState), + Tuple2.of( + StateDescriptor.Type.REDUCING, + (SupplierWithException<IS, Exception>) this::createReducingState), + Tuple2.of( + StateDescriptor.Type.AGGREGATING, + (SupplierWithException<IS, Exception>) + this::createAggregatingState)) + .collect(Collectors.toMap(t -> t.f0, t -> t.f1)); + } + + @SuppressWarnings("unchecked") + private IS createState() throws Exception { + SupplierWithException<IS, Exception> stateFactory = stateFactories.get(stateDesc.getType()); + if (stateFactory == null) { + String message = + String.format( + "State type: %s is not supported by %s", + stateDesc.getType(), TtlStateFactory.class); + throw new FlinkRuntimeException(message); + } + return stateFactory.get(); + } + + @SuppressWarnings("unchecked") + private IS createValueState() throws Exception { + ValueStateDescriptor<TtlValue<SV>> ttlDescriptor = + stateDesc.getSerializer() instanceof TtlSerializer + ? (ValueStateDescriptor<TtlValue<SV>>) stateDesc + : new ValueStateDescriptor<>( + stateDesc.getStateId(), + new TtlTypeInformation<>( + new TtlSerializer<>( + LongSerializer.INSTANCE, + stateDesc.getSerializer()))); + return (IS) new TtlValueState<>(createTtlStateContext(ttlDescriptor)); + } + + @SuppressWarnings("unchecked") + private <T> IS createListState() throws Exception { + ListStateDescriptor<T> listStateDesc = (ListStateDescriptor<T>) stateDesc; + ListStateDescriptor<TtlValue<T>> ttlDescriptor = + listStateDesc.getSerializer() instanceof TtlSerializer + ? 
(ListStateDescriptor<TtlValue<T>>) stateDesc + : new ListStateDescriptor<>( + stateDesc.getStateId(), + new TtlTypeInformation<>( + new TtlSerializer<>( + LongSerializer.INSTANCE, + listStateDesc.getSerializer()))); + return (IS) new TtlListState<>(createTtlStateContext(ttlDescriptor)); + } + + @SuppressWarnings("unchecked") + private <UK, UV> IS createMapState() throws Exception { + MapStateDescriptor<UK, UV> mapStateDesc = (MapStateDescriptor<UK, UV>) stateDesc; + MapStateDescriptor<UK, TtlValue<UV>> ttlDescriptor = + mapStateDesc.getSerializer() instanceof TtlSerializer + ? (MapStateDescriptor<UK, TtlValue<UV>>) stateDesc + : new MapStateDescriptor<>( + stateDesc.getStateId(), + mapStateDesc.getUserKeyType(), + new TtlTypeInformation<>( + new TtlSerializer<>( + LongSerializer.INSTANCE, + mapStateDesc.getSerializer()))); + return (IS) new TtlMapState<>(createTtlStateContext(ttlDescriptor)); + } + + @SuppressWarnings("unchecked") + private IS createReducingState() throws Exception { + ReducingStateDescriptor<SV> reducingStateDesc = (ReducingStateDescriptor<SV>) stateDesc; + ReducingStateDescriptor<TtlValue<SV>> ttlDescriptor = + stateDesc.getSerializer() instanceof TtlSerializer + ? (ReducingStateDescriptor<TtlValue<SV>>) stateDesc + : new ReducingStateDescriptor<>( + stateDesc.getStateId(), + new TtlReduceFunction<>( + reducingStateDesc.getReduceFunction(), + ttlConfig, + timeProvider), + new TtlTypeInformation<>( + new TtlSerializer<>( + LongSerializer.INSTANCE, + stateDesc.getSerializer()))); + return (IS) new TtlReducingState<>(createTtlStateContext(ttlDescriptor)); + } + + @SuppressWarnings("unchecked") + private <IN, OUT> IS createAggregatingState() throws Exception { + AggregatingStateDescriptor<IN, SV, OUT> aggregatingStateDescriptor = + (AggregatingStateDescriptor<IN, SV, OUT>) stateDesc; + TtlAggregateFunction<IN, SV, OUT> ttlAggregateFunction = + new TtlAggregateFunction<>( + aggregatingStateDescriptor.getAggregateFunction(), ttlConfig, timeProvider); + AggregatingStateDescriptor<IN, TtlValue<SV>, OUT> ttlDescriptor = + stateDesc.getSerializer() instanceof TtlSerializer + ? 
(AggregatingStateDescriptor<IN, TtlValue<SV>, OUT>) stateDesc + : new AggregatingStateDescriptor<>( + stateDesc.getStateId(), + ttlAggregateFunction, + new TtlTypeInformation<>( + new TtlSerializer<>( + LongSerializer.INSTANCE, + stateDesc.getSerializer()))); + return (IS) + new TtlAggregatingState<>( + createTtlStateContext(ttlDescriptor), ttlAggregateFunction); + } + + @SuppressWarnings("unchecked") + private <OIS extends State, TTLS extends State, V, TTLV> + TtlStateContext<OIS, V> createTtlStateContext(StateDescriptor<TTLV> ttlDescriptor) + throws Exception { + + ttlDescriptor.enableTimeToLive( + stateDesc.getTtlConfig()); // also used by RocksDB backend for TTL compaction filter + // config + OIS originalState = + (OIS) + stateBackend.createStateInternal( + defaultNamespace, namespaceSerializer, ttlDescriptor); + return new TtlStateContext<>( + originalState, + ttlConfig, + timeProvider, + (TypeSerializer<V>) stateDesc.getSerializer(), + () -> {}); + } + + public static class TtlTypeInformation<T> extends TypeInformation<TtlValue<T>> { + + Class<?> typeClass; + + TypeSerializer<TtlValue<T>> typeSerializer; + + TtlTypeInformation(TypeSerializer<TtlValue<T>> typeSerializer) { + this.typeSerializer = typeSerializer; + typeClass = TtlValue.class; + } + + @Override + public boolean isBasicType() { + return false; + } + + @Override + public boolean isTupleType() { + return false; + } + + @Override + public int getArity() { + return 2; + } + + @Override + public int getTotalFields() { + return 2; + } + + @Override + public Class<TtlValue<T>> getTypeClass() { + return (Class<TtlValue<T>>) typeClass; + } + + @Override + public boolean isKeyType() { + return false; + } + + @Override + public TypeSerializer<TtlValue<T>> createSerializer(SerializerConfig config) { + return typeSerializer; + } + + @Override + public String toString() { + return "TtlTypeInformation{}"; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + return typeSerializer.equals(((TtlTypeInformation<T>) obj).typeSerializer); + } + + @Override + public int hashCode() { + return typeSerializer.hashCode(); + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof TtlTypeInformation; + } + } + + /** + * Serializer for user state value with TTL. Visibility is public for usage with external tools. + */ + public static class TtlSerializer<T> extends CompositeSerializer<TtlValue<T>> { + private static final long serialVersionUID = 131020282727167064L; + + @SuppressWarnings("WeakerAccess") + public TtlSerializer( + TypeSerializer<Long> timestampSerializer, TypeSerializer<T> userValueSerializer) { + super(true, timestampSerializer, userValueSerializer); + checkArgument(!(userValueSerializer instanceof TtlSerializer)); + } + + @SuppressWarnings("WeakerAccess") + public TtlSerializer( + PrecomputedParameters precomputed, TypeSerializer<?>... fieldSerializers) { + super(precomputed, fieldSerializers); + } + + @SuppressWarnings("unchecked") + @Override + public TtlValue<T> createInstance(@Nonnull Object... values) { + Preconditions.checkArgument(values.length == 2); + return new TtlValue<>((T) values[1], (long) values[0]); + } + + @Override + protected void setField(@Nonnull TtlValue<T> v, int index, Object fieldValue) { + throw new UnsupportedOperationException("TtlValue is immutable"); + } + + @Override + protected Object getField(@Nonnull TtlValue<T> v, int index) { + return index == 0 ? 
v.getLastAccessTimestamp() : v.getUserValue(); + } + + @SuppressWarnings("unchecked") + @Override + protected CompositeSerializer<TtlValue<T>> createSerializerInstance( + PrecomputedParameters precomputed, TypeSerializer<?>... originalSerializers) { + Preconditions.checkNotNull(originalSerializers); + Preconditions.checkArgument(originalSerializers.length == 2); + return new TtlSerializer<>(precomputed, originalSerializers); + } + + @SuppressWarnings("unchecked") + TypeSerializer<Long> getTimestampSerializer() { + return (TypeSerializer<Long>) (TypeSerializer<?>) fieldSerializers[0]; + } + + @SuppressWarnings("unchecked") + TypeSerializer<T> getValueSerializer() { + return (TypeSerializer<T>) fieldSerializers[1]; + } + + @Override + public TypeSerializerSnapshot<TtlValue<T>> snapshotConfiguration() { + return new TtlSerializerSnapshot<>(this); + } + } + + /** A {@link TypeSerializerSnapshot} for TtlSerializer. */ + public static final class TtlSerializerSnapshot<T> + extends CompositeTypeSerializerSnapshot<TtlValue<T>, TtlSerializer<T>> { + + private static final int VERSION = 2; + + @SuppressWarnings({"WeakerAccess", "unused"}) + public TtlSerializerSnapshot() {} + + TtlSerializerSnapshot(TtlSerializer<T> serializerInstance) { + super(serializerInstance); + } + + @Override + protected int getCurrentOuterSnapshotVersion() { + return VERSION; + } + + @Override + protected TypeSerializer<?>[] getNestedSerializers(TtlSerializer<T> outerSerializer) { + return new TypeSerializer[] { + outerSerializer.getTimestampSerializer(), outerSerializer.getValueSerializer() + }; + } + + @Override + @SuppressWarnings("unchecked") + protected TtlSerializer<T> createOuterSerializerWithNestedSerializers( + TypeSerializer<?>[] nestedSerializers) { + TypeSerializer<Long> timestampSerializer = (TypeSerializer<Long>) nestedSerializers[0]; + TypeSerializer<T> valueSerializer = (TypeSerializer<T>) nestedSerializers[1]; + + return new TtlSerializer<>(timestampSerializer, valueSerializer); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlValueState.java new file mode 100644 index 00000000000..b5878c7a9ff --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/v2/ttl/TtlValueState.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.v2.ttl; + +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.runtime.state.ttl.TtlStateContext; +import org.apache.flink.runtime.state.ttl.TtlValue; +import org.apache.flink.runtime.state.v2.internal.InternalValueState; + +/** + * This class wraps value state with TTL logic. 
+ * + * @param <K> The type of key the state is associated to + * @param <N> The type of the namespace + * @param <T> Type of the user value of state with TTL + */ +class TtlValueState<K, N, T> + extends AbstractTtlState<K, N, T, TtlValue<T>, InternalValueState<K, N, TtlValue<T>>> + implements InternalValueState<K, N, T> { + + protected TtlValueState( + TtlStateContext<InternalValueState<K, N, TtlValue<T>>, T> ttlStateContext) { + super(ttlStateContext); + } + + @Override + public StateFuture<T> asyncValue() { + return original.asyncValue().thenApply((ttlValue) -> getElementWithTtlCheck(ttlValue)); + } + + @Override + public StateFuture<Void> asyncUpdate(T value) { + return original.asyncUpdate(value == null ? null : wrapWithTs(value)); + } + + @Override + public T value() { + TtlValue<T> ttlValue = original.value(); + return getElementWithTtlCheck(ttlValue); + } + + @Override + public void update(T value) { + original.update(value == null ? null : wrapWithTs(value)); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java index ff01605809d..eca13bc180f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestUtils.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; +import org.apache.flink.runtime.state.v2.internal.InternalKeyedState; import org.apache.flink.util.function.FunctionWithException; import javax.annotation.Nonnull; @@ -140,6 +141,16 @@ public class StateBackendTestUtils { return (S) innerStateSupplier.get(); } + @Nonnull + @Override + public <N, S extends InternalKeyedState, SV> S createStateInternal( + @Nonnull N defaultNamespace, + @Nonnull TypeSerializer<N> namespaceSerializer, + @Nonnull org.apache.flink.runtime.state.v2.StateDescriptor<SV> stateDesc) + throws Exception { + return (S) innerStateSupplier.get(); + } + @Nonnull @Override public StateExecutor createStateExecutor() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.java index bea212db17c..0426d43e519 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.java @@ -233,14 +233,16 @@ class AbstractAggregatingStateTest extends AbstractKeyedStateTestBase { String key = (String) stateRequest.getRecordContext().getKey(); String namespace = (String) stateRequest.getNamespace(); if (stateRequest.getRequestType() == StateRequestType.AGGREGATING_ADD) { - hashMap.put(Tuple2.of(key, namespace), (Integer) stateRequest.getPayload()); - stateRequest.getFuture().complete(null); + if (stateRequest.getPayload() == null) { + hashMap.remove(Tuple2.of(key, namespace)); + stateRequest.getFuture().complete(null); + } else { + hashMap.put(Tuple2.of(key, namespace), (Integer) stateRequest.getPayload()); + stateRequest.getFuture().complete(null); + } } else if (stateRequest.getRequestType() == StateRequestType.AGGREGATING_GET) { Integer val = 
hashMap.get(Tuple2.of(key, namespace)); stateRequest.getFuture().complete(val); - } else if (stateRequest.getRequestType() == StateRequestType.AGGREGATING_REMOVE) { - hashMap.remove(Tuple2.of(key, namespace)); - stateRequest.getFuture().complete(null); } else { throw new UnsupportedOperationException("Unsupported type"); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java index bb09983de6f..56ae6c13470 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java @@ -40,6 +40,7 @@ import org.apache.flink.runtime.state.PriorityComparable; import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; +import org.apache.flink.runtime.state.v2.internal.InternalKeyedState; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -182,6 +183,16 @@ public class AbstractKeyedStateTestBase { return null; } + @Nonnull + @Override + public <N, S extends InternalKeyedState, SV> S createStateInternal( + @Nonnull N defaultNamespace, + @Nonnull TypeSerializer<N> namespaceSerializer, + @Nonnull StateDescriptor<SV> stateDesc) + throws Exception { + return null; + } + @Override public StateExecutor createStateExecutor() { return new TestStateExecutor(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractReducingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractReducingStateTest.java index ac0c6785061..1c8bfa276f1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractReducingStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractReducingStateTest.java @@ -163,13 +163,13 @@ public class AbstractReducingStateTest extends AbstractKeyedStateTestBase { } else if (request.getRequestType() == StateRequestType.REDUCING_ADD) { String key = (String) request.getRecordContext().getKey(); String namespace = (String) request.getNamespace(); - hashMap.put(Tuple2.of(key, namespace), (Integer) request.getPayload()); - request.getFuture().complete(null); - } else if (request.getRequestType() == StateRequestType.REDUCING_REMOVE) { - String key = (String) request.getRecordContext().getKey(); - String namespace = (String) request.getNamespace(); - hashMap.remove(Tuple2.of(key, namespace)); - request.getFuture().complete(null); + if (request.getPayload() == null) { + hashMap.remove(Tuple2.of(key, namespace)); + request.getFuture().complete(null); + } else { + hashMap.put(Tuple2.of(key, namespace), (Integer) request.getPayload()); + request.getFuture().complete(null); + } } else { throw new UnsupportedOperationException("Unsupported request type"); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java index b28ca4d7fa2..321e57efed1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java @@ -19,10 +19,13 @@ package org.apache.flink.runtime.state.v2; import org.apache.flink.api.common.JobID; 
+import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.state.v2.ValueState; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.state.StateFutureImpl; import org.apache.flink.metrics.MetricGroup; @@ -59,6 +62,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -405,6 +409,64 @@ public abstract class StateBackendTestV2Base<B extends AbstractStateBackend> { } } + void testValueStateWorkWithTtl() throws Exception { + TestAsyncFrameworkExceptionHandler testExceptionHandler = + new TestAsyncFrameworkExceptionHandler(); + AsyncKeyedStateBackend<Long> backend = + createAsyncKeyedBackend(LongSerializer.INSTANCE, 128, env); + AsyncExecutionController<Long> aec = + new AsyncExecutionController<>( + new SyncMailboxExecutor(), + testExceptionHandler, + backend.createStateExecutor(), + 128, + 1, + -1, + 1, + null); + backend.setup(aec); + try { + ValueStateDescriptor<Long> kvId = + new ValueStateDescriptor<>("id", TypeInformation.of(Long.class)); + kvId.enableTimeToLive(StateTtlConfig.newBuilder(Duration.ofSeconds(1)).build()); + + ValueState<Long> state = + backend.createState( + VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + RecordContext recordContext = aec.buildContext("record-1", 1L); + recordContext.retain(); + aec.setCurrentContext(recordContext); + state.update(1L); + assertThat(state.value()).isEqualTo(1L); + Thread.sleep(1000); + assertThat(state.value()).isNull(); + recordContext.release(); + + RecordContext recordContext1 = aec.buildContext("record-2", 2L); + aec.setCurrentContext(recordContext1); + state.asyncUpdate(2L) + .thenAccept( + (val) -> { + state.asyncValue() + .thenAccept( + (val1) -> { + assertThat(val1).isEqualTo(2); + Thread.sleep(1000); + state.asyncValue() + .thenAccept( + (val2) -> { + assertThat(val2).isNull(); + }); + }); + }); + Thread.sleep(3000); + recordContext1.release(); + } finally { + IOUtils.closeQuietly(backend); + backend.dispose(); + } + } + static class TestAsyncFrameworkExceptionHandler implements StateFutureImpl.AsyncFrameworkExceptionHandler { String message = null; diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStAggregatingState.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStAggregatingState.java index 0843317cc83..abec5d6c83c 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStAggregatingState.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStAggregatingState.java @@ -148,16 +148,13 @@ public class ForStAggregatingState<K, N, IN, ACC, OUT> public ForStDBPutRequest<?, ?, ?> buildDBPutRequest(StateRequest<?, ?, ?, ?> stateRequest) { Preconditions.checkArgument( stateRequest.getRequestType() == StateRequestType.AGGREGATING_ADD - || stateRequest.getRequestType() == StateRequestType.AGGREGATING_REMOVE || stateRequest.getRequestType() == StateRequestType.CLEAR); ContextKey<K, 
N> contextKey = new ContextKey<>( (RecordContext<K>) stateRequest.getRecordContext(), (N) stateRequest.getNamespace()); ACC aggregate = - (stateRequest.getRequestType() == StateRequestType.CLEAR - || stateRequest.getRequestType() - == StateRequestType.AGGREGATING_REMOVE) + stateRequest.getRequestType() == StateRequestType.CLEAR ? null : (ACC) stateRequest.getPayload(); return ForStDBPutRequest.of( diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBTtlCompactFiltersManager.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBTtlCompactFiltersManager.java index 4e39492ae61..a1bc9ef2f92 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBTtlCompactFiltersManager.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStDBTtlCompactFiltersManager.java @@ -27,10 +27,10 @@ import org.apache.flink.api.common.typeutils.base.ListSerializer; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo; import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; -import org.apache.flink.runtime.state.ttl.TtlStateFactory; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.runtime.state.ttl.TtlUtils; import org.apache.flink.runtime.state.ttl.TtlValue; +import org.apache.flink.runtime.state.v2.ttl.TtlStateFactory; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; @@ -92,8 +92,26 @@ public class ForStDBTtlCompactFiltersManager { if (metaInfoBase instanceof RegisteredKeyValueStateBackendMetaInfo) { RegisteredKeyValueStateBackendMetaInfo kvMetaInfoBase = (RegisteredKeyValueStateBackendMetaInfo) metaInfoBase; - if (TtlStateFactory.TtlSerializer.isTtlStateSerializer( - kvMetaInfoBase.getStateSerializer())) { + if (org.apache.flink.runtime.state.ttl.TtlStateFactory.TtlSerializer + .isTtlStateSerializer(kvMetaInfoBase.getStateSerializer())) { + createAndSetCompactFilterFactory(metaInfoBase.getName(), options); + } + } + } + + public void setAndRegisterCompactFilterIfStateTtlV2( + @Nonnull RegisteredStateMetaInfoBase metaInfoBase, + @Nonnull ColumnFamilyOptions options) { + + if (metaInfoBase + instanceof + org.apache.flink.runtime.state.v2.RegisteredKeyValueStateBackendMetaInfo) { + org.apache.flink.runtime.state.v2.RegisteredKeyValueStateBackendMetaInfo + kvMetaInfoBase = + (org.apache.flink.runtime.state.v2 + .RegisteredKeyValueStateBackendMetaInfo) + metaInfoBase; + if (TtlStateFactory.isTtlStateSerializer(kvMetaInfoBase.getStateSerializer())) { createAndSetCompactFilterFactory(metaInfoBase.getName(), options); } } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java index 4d3c82acc4a..61c62956313 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java @@ -47,12 +47,15 @@ import org.apache.flink.runtime.state.StateSnapshotTransformer; import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; import 
org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.runtime.state.v2.AggregatingStateDescriptor; import org.apache.flink.runtime.state.v2.ListStateDescriptor; import org.apache.flink.runtime.state.v2.ReducingStateDescriptor; import org.apache.flink.runtime.state.v2.RegisteredKeyValueStateBackendMetaInfo; import org.apache.flink.runtime.state.v2.StateDescriptor; import org.apache.flink.runtime.state.v2.ValueStateDescriptor; +import org.apache.flink.runtime.state.v2.internal.InternalKeyedState; +import org.apache.flink.runtime.state.v2.ttl.TtlStateFactory; import org.apache.flink.state.forst.snapshot.ForStSnapshotStrategyBase; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.IOUtils; @@ -95,6 +98,8 @@ public class ForStKeyedStateBackend<K> implements AsyncKeyedStateBackend<K> { /** The key groups which this state backend is responsible for. */ private final KeyGroupRange keyGroupRange; + protected final TtlTimeProvider ttlTimeProvider; + /** The key serializer. */ protected final TypeSerializer<K> keySerializer; @@ -166,6 +171,8 @@ public class ForStKeyedStateBackend<K> implements AsyncKeyedStateBackend<K> { @GuardedBy("lock") private boolean disposed = false; + private final ForStDBTtlCompactFiltersManager ttlCompactFiltersManager; + public ForStKeyedStateBackend( UUID backendUID, ForStResourceContainer optionsContainer, @@ -183,7 +190,9 @@ public class ForStKeyedStateBackend<K> implements AsyncKeyedStateBackend<K> { PriorityQueueSetFactory priorityQueueFactory, CloseableRegistry cancelStreamRegistry, ForStNativeMetricMonitor nativeMetricMonitor, - InternalKeyContext<K> keyContext) { + InternalKeyContext<K> keyContext, + TtlTimeProvider ttlTimeProvider, + ForStDBTtlCompactFiltersManager ttlCompactFiltersManager) { this.backendUID = backendUID; this.optionsContainer = Preconditions.checkNotNull(optionsContainer); this.keyGroupPrefixBytes = keyGroupPrefixBytes; @@ -199,6 +208,8 @@ public class ForStKeyedStateBackend<K> implements AsyncKeyedStateBackend<K> { this.snapshotStrategy = snapshotStrategy; this.cancelStreamRegistry = cancelStreamRegistry; this.nativeMetricMonitor = nativeMetricMonitor; + this.ttlTimeProvider = ttlTimeProvider; + this.ttlCompactFiltersManager = ttlCompactFiltersManager; this.managedStateExecutors = new HashSet<>(1); this.priorityQueueFactory = priorityQueueFactory; if (priorityQueueFactory instanceof HeapPriorityQueueSetFactory) { @@ -226,6 +237,18 @@ public class ForStKeyedStateBackend<K> implements AsyncKeyedStateBackend<K> { @Nonnull TypeSerializer<N> namespaceSerializer, @Nonnull StateDescriptor<SV> stateDesc) throws Exception { + return TtlStateFactory.createStateAndWrapWithTtlIfEnabled( + defaultNamespace, namespaceSerializer, stateDesc, this, ttlTimeProvider); + } + + @Nonnull + @Override + @SuppressWarnings("unchecked") + public <N, S extends InternalKeyedState, SV> S createStateInternal( + @Nonnull N defaultNamespace, + @Nonnull TypeSerializer<N> namespaceSerializer, + @Nonnull StateDescriptor<SV> stateDesc) + throws Exception { Preconditions.checkNotNull( stateRequestHandler, "A non-null stateRequestHandler must be setup before createState"); @@ -247,6 +270,7 @@ public class ForStKeyedStateBackend<K> implements AsyncKeyedStateBackend<K> { namespaceSerializer::duplicate, valueSerializerView, valueDeserializerView); + case LIST: return (S) new 
ForStListState<>( @@ -335,8 +359,12 @@ public class ForStKeyedStateBackend<K> implements AsyncKeyedStateBackend<K> { StateSnapshotTransformer.StateSnapshotTransformFactory.noTransform()); newStateInfo = - ForStOperationUtils.createStateInfo( - newMetaInfo, db, columnFamilyOptionsFactory); + ForStOperationUtils.createAsyncStateInfo( + newMetaInfo, + db, + columnFamilyOptionsFactory, + ttlCompactFiltersManager, + optionsContainer.getWriteBufferManagerCapacity()); ForStOperationUtils.registerKvStateInformation( this.kvStateInformation, this.nativeMetricMonitor, diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java index db57bfc20d0..7568dd637f8 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.state.StateBackendBuilder; import org.apache.flink.runtime.state.StateSerializerProvider; import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.state.forst.fs.ForStFlinkFileSystem; import org.apache.flink.state.forst.restore.ForStHeapTimersFullRestoreOperation; import org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation; @@ -101,6 +102,7 @@ public class ForStKeyedStateBackendBuilder<K> private final int numberOfKeyGroups; private final KeyGroupRange keyGroupRange; + private final TtlTimeProvider ttlTimeProvider; private final Collection<KeyedStateHandle> restoreStateHandles; @@ -128,6 +130,7 @@ public class ForStKeyedStateBackendBuilder<K> int numberOfKeyGroups, KeyGroupRange keyGroupRange, ForStPriorityQueueConfig priorityQueueConfig, + TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, StateBackend.CustomInitializationMetrics customInitializationMetrics, @Nonnull Collection<KeyedStateHandle> stateHandles, @@ -141,6 +144,7 @@ public class ForStKeyedStateBackendBuilder<K> this.numberOfKeyGroups = numberOfKeyGroups; this.keyGroupRange = keyGroupRange; this.priorityQueueConfig = priorityQueueConfig; + this.ttlTimeProvider = ttlTimeProvider; this.metricGroup = metricGroup; this.customInitializationMetrics = customInitializationMetrics; this.restoreStateHandles = stateHandles; @@ -172,6 +176,12 @@ public class ForStKeyedStateBackendBuilder<K> LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates = new LinkedHashMap<>(); + ForStDBTtlCompactFiltersManager ttlCompactFiltersManager = + new ForStDBTtlCompactFiltersManager( + ttlTimeProvider, + optionsContainer.getQueryTimeAfterNumEntries(), + optionsContainer.getPeriodicCompactionTime()); + RocksDB db = null; ForStRestoreOperation restoreOperation = null; // Number of bytes required to prefix the key groups. 
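For orientation between the two hunks of this builder: the changes thread a TtlTimeProvider and a ForStDBTtlCompactFiltersManager through to the ForSt keyed backend, so column families created for TTL state get a compaction filter registered. On the descriptor side, a state opts into this path through its TTL config; the sketch below is illustrative only (the class name TtlV2DescriptorSketch, the method name, the "counter" state id and the 10-minute TTL are not part of this commit), and it reflects the two restrictions enforced by TtlStateFactory.createStateAndWrapWithTtlIfEnabled above: only the ROCKSDB_COMPACTION_FILTER cleanup strategy is accepted, and the OnReadAndWrite update type is rejected.

    import java.time.Duration;

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.runtime.state.v2.ValueStateDescriptor;

    class TtlV2DescriptorSketch {
        static ValueStateDescriptor<Long> ttlCounterDescriptor() {
            StateTtlConfig ttlConfig =
                    StateTtlConfig.newBuilder(Duration.ofMinutes(10))
                            // OnReadAndWrite is rejected by the V2 factory; keep the default update type.
                            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                            // the only cleanup strategy the V2 factory accepts; re-query TTL every 1000 entries
                            .cleanupInRocksdbCompactFilter(1000L)
                            .build();
            ValueStateDescriptor<Long> descriptor =
                    new ValueStateDescriptor<>("counter", TypeInformation.of(Long.class));
            descriptor.enableTimeToLive(ttlConfig);
            return descriptor;
        }
    }

Note that the test added in StateBackendTestV2Base above passes the factory's check with a plain StateTtlConfig.newBuilder(Duration.ofSeconds(1)).build(), so the default background cleanup appears to satisfy the compaction-filter requirement already; the explicit cleanupInRocksdbCompactFilter call here only makes that choice, and its query interval, visible.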
@@ -277,7 +287,9 @@ public class ForStKeyedStateBackendBuilder<K> priorityQueueFactory, cancelStreamRegistryForBackend, nativeMetricMonitor, - keyContext); + keyContext, + ttlTimeProvider, + ttlCompactFiltersManager); } private ForStRestoreOperation getForStRestoreOperation( diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOperationUtils.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOperationUtils.java index f852a0bb8de..f69bb7ec73a 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOperationUtils.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOperationUtils.java @@ -21,6 +21,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.core.fs.ICloseableRegistry; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.memory.OpaqueMemoryResource; +import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo; import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; import org.apache.flink.state.forst.ForStKeyedStateBackend.ForStKvStateInfo; import org.apache.flink.state.forst.sync.ForStIteratorWrapper; @@ -277,7 +278,13 @@ public class ForStOperationUtils { createColumnFamilyOptions(columnFamilyOptionsFactory, metaInfoBase.getName()); if (ttlCompactFiltersManager != null) { - ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(metaInfoBase, options); + if (metaInfoBase instanceof RegisteredKeyValueStateBackendMetaInfo) { + ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtlV2( + metaInfoBase, options); + } else { + ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtlV2( + metaInfoBase, options); + } } if (writeBufferManagerCapacity != null) { @@ -378,6 +385,28 @@ public class ForStOperationUtils { return new ForStKvStateInfo(columnFamilyHandle, metaInfoBase); } + public static ForStKvStateInfo createAsyncStateInfo( + RegisteredStateMetaInfoBase metaInfoBase, + RocksDB db, + Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory, + @Nullable ForStDBTtlCompactFiltersManager ttlCompactFiltersManager, + @Nullable Long writeBufferManagerCapacity) { + + ColumnFamilyDescriptor columnFamilyDescriptor = + createColumnFamilyDescriptor( + metaInfoBase, + columnFamilyOptionsFactory, + ttlCompactFiltersManager, + writeBufferManagerCapacity); + try { + ColumnFamilyHandle columnFamilyHandle = createColumnFamily(columnFamilyDescriptor, db); + return new ForStKvStateInfo(columnFamilyHandle, metaInfoBase); + } catch (Exception ex) { + IOUtils.closeQuietly(columnFamilyDescriptor.getOptions()); + throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", ex); + } + } + private static void throwExceptionIfPathLengthExceededOnWindows(String path, Exception cause) throws IOException { // max directory path length on Windows is 247. 
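The wrapper classes in this commit lean on wrapWithTs and getElementWithTtlCheck inherited from AbstractTtlState / AbstractTtlDecorator, whose bodies are not shown in this diff. The sketch below is an approximation of what those two calls amount to, inferred from the TtlValue and TtlUtils usages visible above; the class name TtlWrapSketch is invented, and overflow guarding as well as any last-access write-back are omitted, so do not read it as the actual implementation.

    import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
    import org.apache.flink.runtime.state.ttl.TtlValue;

    final class TtlWrapSketch<V> {
        private final TtlTimeProvider timeProvider;
        private final long ttl; // ttlConfig.getTimeToLive().toMillis(), as in TtlStateFactory above

        TtlWrapSketch(TtlTimeProvider timeProvider, long ttl) {
            this.timeProvider = timeProvider;
            this.ttl = ttl;
        }

        // Roughly what wrapWithTs(value) does: stamp the user value with the current access time.
        TtlValue<V> wrapWithTs(V value) {
            return new TtlValue<>(value, timeProvider.currentTimestamp());
        }

        // Roughly what getElementWithTtlCheck(ttlValue) does: unwrap, surfacing expired values as null.
        V getElementWithTtlCheck(TtlValue<V> ttlValue) {
            if (ttlValue == null) {
                return null;
            }
            long expirationTime = ttlValue.getLastAccessTimestamp() + ttl; // real code guards against overflow
            return expirationTime <= timeProvider.currentTimestamp() ? null : ttlValue.getUserValue();
        }
    }

This is why TtlMapState.asyncContains and TtlReducingState.get can simply unwrap and null-check: an expired TtlValue surfaces as null, exactly as if the entry were absent.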
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStReducingState.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStReducingState.java index 5043b483588..da6e1dfbb7c 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStReducingState.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStReducingState.java @@ -139,15 +139,13 @@ public class ForStReducingState<K, N, V> extends AbstractReducingState<K, N, V> public ForStDBPutRequest<K, N, V> buildDBPutRequest(StateRequest<?, ?, ?, ?> stateRequest) { Preconditions.checkArgument( stateRequest.getRequestType() == StateRequestType.REDUCING_ADD - || stateRequest.getRequestType() == StateRequestType.REDUCING_REMOVE || stateRequest.getRequestType() == StateRequestType.CLEAR); ContextKey<K, N> contextKey = new ContextKey<>( (RecordContext<K>) stateRequest.getRecordContext(), (N) stateRequest.getNamespace()); V value = - (stateRequest.getRequestType() == StateRequestType.REDUCING_REMOVE - || stateRequest.getRequestType() == StateRequestType.CLEAR) + stateRequest.getRequestType() == StateRequestType.CLEAR ? null // "Delete(key)" is equivalent to "Put(key, null)" : (V) stateRequest.getPayload(); return ForStDBPutRequest.of( diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java index 6e61ecd3920..0826b29ca21 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java @@ -363,6 +363,7 @@ public class ForStStateBackend extends AbstractManagedMemoryStateBackend parameters.getNumberOfKeyGroups(), parameters.getKeyGroupRange(), priorityQueueConfig, + parameters.getTtlTimeProvider(), parameters.getMetricGroup(), parameters.getCustomInitializationMetrics(), parameters.getStateHandles(), diff --git a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeInfoTestCoverageTest.java b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeInfoTestCoverageTest.java index db768bcc18f..b9d08f504d3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeInfoTestCoverageTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeInfoTestCoverageTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeInformationTestBase; import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.runtime.state.v2.ttl.TtlStateFactory; import org.apache.flink.table.dataview.ListViewTypeInfo; import org.apache.flink.table.dataview.MapViewTypeInfo; import org.apache.flink.table.runtime.typeutils.BigDecimalTypeInfo; @@ -82,7 +83,8 @@ public class TypeInfoTestCoverageTest extends TestLogger { BigDecimalTypeInfo.class.getName(), DecimalDataTypeInfo.class.getName(), GenericRecordAvroTypeInfo.class.getName(), - AvroTypeInfo.class.getName()); + AvroTypeInfo.class.getName(), + TtlStateFactory.TtlTypeInformation.class.getName()); // check if a test exists for each type information for 
(Class<? extends TypeInformation> typeInfo : typeInfos) { diff --git a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java index f4d1b53a486..494ee2e827e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/completeness/TypeSerializerTestCoverageTest.java @@ -145,6 +145,8 @@ public class TypeSerializerTestCoverageTest extends TestLogger { CoGroupedStreams.UnionSerializer.class.getName(), TtlStateFactory.TtlSerializer.class.getName(), TtlAwareSerializer.class.getName(), + org.apache.flink.runtime.state.v2.ttl.TtlStateFactory.TtlSerializer.class + .getName(), TimeWindow.Serializer.class.getName(), InternalTimersSnapshotReaderWriters.LegacyTimerSerializer.class.getName(), TwoPhaseCommitSinkFunction.StateSerializer.class.getName(),
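For a caller, the practical effect of the new TtlMapState is that expired entries simply disappear from iteration, while isEmpty()/asyncIsEmpty() forwarded to the underlying state may still count entries that have already expired, as the inline comments above note. A minimal sketch against the synchronous part of the V2 MapState API follows; the class name, method name and the userAccess handle are illustrative only, and obtaining the handle from the runtime context is outside the scope of this commit.

    import java.util.Map;

    import org.apache.flink.api.common.state.v2.MapState;

    class TtlMapStateUsageSketch {
        static void printLiveEntries(MapState<String, Long> userAccess) {
            // Expired entries are skipped by TtlMapState's EntriesIterator, so only live entries show up here.
            for (Map.Entry<String, Long> entry : userAccess.entries()) {
                System.out.println(entry.getKey() + " -> " + entry.getValue());
            }
        }
    }

The asynchronous path behaves the same way: AsyncEntriesIterator drops expired entries before handing them to the callback passed to StateIterator.onNext.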