[FLINK-9701] Introduce TTL configuration in state descriptors. This closes #6313.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f45b7f7f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f45b7f7f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f45b7f7f Branch: refs/heads/master Commit: f45b7f7ff27df019e9045895e718fa112d12139c Parents: b407ba7 Author: Andrey Zagrebin <azagre...@gmail.com> Authored: Tue Jul 3 19:23:41 2018 +0200 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Thu Jul 12 20:18:23 2018 +0200 ---------------------------------------------------------------------- .../flink/api/common/state/StateDescriptor.java | 28 ++ .../api/common/state/StateTtlConfiguration.java | 3 - .../common/typeutils/CompositeSerializer.java | 56 ++- .../KVStateRequestSerializerRocksDBTest.java | 9 +- .../network/KvStateRequestSerializerTest.java | 9 +- .../network/KvStateServerHandlerTest.java | 4 +- .../state/AbstractKeyedStateBackend.java | 47 ++- .../runtime/state/AbstractStateBackend.java | 16 +- .../flink/runtime/state/KeyedStateFactory.java | 2 +- .../flink/runtime/state/StateBackend.java | 39 ++- .../state/filesystem/FsStateBackend.java | 21 +- .../state/heap/HeapKeyedStateBackend.java | 9 +- .../state/memory/MemoryStateBackend.java | 7 +- .../runtime/state/ttl/AbstractTtlDecorator.java | 2 - .../flink/runtime/state/ttl/TtlMapState.java | 97 ++++-- .../runtime/state/ttl/TtlReducingState.java | 2 +- .../runtime/state/ttl/TtlStateFactory.java | 22 +- .../runtime/state/ttl/TtlTimeProvider.java | 4 +- .../CheckpointSettingsSerializableTest.java | 4 +- ...HeapKeyedStateBackendAsyncByDefaultTest.java | 4 +- .../runtime/state/StateBackendTestBase.java | 15 +- .../state/StateSnapshotCompressionTest.java | 17 +- ...pKeyedStateBackendSnapshotMigrationTest.java | 4 +- .../state/heap/HeapStateBackendTestBase.java | 4 +- .../runtime/state/ttl/HeapTtlStateTest.java | 35 ++ .../runtime/state/ttl/MockTimeProvider.java | 28 -- 
.../runtime/state/ttl/MockTtlStateTest.java | 35 ++ .../runtime/state/ttl/MockTtlTimeProvider.java | 28 ++ .../state/ttl/StateBackendTestContext.java | 125 +++++++ .../state/ttl/TtlAggregatingStateTest.java | 90 ----- .../ttl/TtlAggregatingStateTestContext.java | 103 ++++++ .../runtime/state/ttl/TtlFoldingStateTest.java | 54 --- .../state/ttl/TtlFoldingStateTestContext.java | 67 ++++ .../runtime/state/ttl/TtlListStateTest.java | 75 ---- .../state/ttl/TtlListStateTestContext.java | 87 +++++ .../ttl/TtlMapStateAllEntriesTestContext.java | 66 ++++ .../state/ttl/TtlMapStatePerElementTest.java | 42 --- .../ttl/TtlMapStatePerElementTestContext.java | 53 +++ .../runtime/state/ttl/TtlMapStateTest.java | 55 --- .../runtime/state/ttl/TtlMapStateTestBase.java | 33 -- .../state/ttl/TtlMapStateTestContext.java | 35 ++ .../runtime/state/ttl/TtlMergingStateBase.java | 126 ------- .../state/ttl/TtlMergingStateTestContext.java | 99 ++++++ .../runtime/state/ttl/TtlReducingStateTest.java | 71 ---- .../state/ttl/TtlReducingStateTestContext.java | 84 +++++ .../runtime/state/ttl/TtlStateTestBase.java | 340 +++++++++++++++---- .../state/ttl/TtlStateTestContextBase.java | 52 +++ .../runtime/state/ttl/TtlValueStateTest.java | 51 --- .../state/ttl/TtlValueStateTestContext.java | 64 ++++ .../state/ttl/mock/MockInternalKvState.java | 29 +- .../state/ttl/mock/MockInternalMapState.java | 10 +- .../state/ttl/mock/MockKeyedStateBackend.java | 218 ++++++++++++ .../state/ttl/mock/MockKeyedStateFactory.java | 64 ---- .../state/ttl/mock/MockStateBackend.java | 111 ++++++ .../state/RocksDBKeyedStateBackend.java | 9 +- .../streaming/state/RocksDBStateBackend.java | 7 +- .../state/RocksDBStateBackendTest.java | 4 +- .../streaming/state/RocksDBTtlStateTest.java | 62 ++++ .../StreamTaskStateInitializerImpl.java | 4 +- .../StreamTaskStateInitializerImplTest.java | 4 +- .../tasks/StreamTaskTerminationTest.java | 4 +- .../tasks/TestSpyWrapperStateBackend.java | 7 +- 
.../streaming/runtime/StateBackendITCase.java | 16 +- 63 files changed, 1933 insertions(+), 939 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java index 6c54e71..956fd05 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java @@ -18,6 +18,7 @@ package org.apache.flink.api.common.state; +import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -92,6 +93,10 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl @Nullable private String queryableStateName; + /** State time-to-live (TTL) configuration for state created from this StateDescriptor; {@code null} if TTL has not been enabled. */ + @Nullable + private StateTtlConfiguration ttlConfig; + /** The default value returned by the state when no other value is bound to a key. 
*/ @Nullable protected transient T defaultValue; @@ -203,6 +208,8 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl * @throws IllegalStateException If queryable state name already set */ public void setQueryable(String queryableStateName) { + Preconditions.checkArgument(ttlConfig == null, + "Queryable state is currently not supported with TTL"); if (this.queryableStateName == null) { this.queryableStateName = Preconditions.checkNotNull(queryableStateName, "Registration name"); } else { @@ -230,6 +237,27 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl return queryableStateName != null; } + /** + * Configures optional activation of state time-to-live (TTL). + * + * <p>State user value will expire, become unavailable and be cleaned up in storage + * depending on configured {@link StateTtlConfiguration}. + * + * @param ttlConfig configuration of state TTL + */ + public void enableTimeToLive(StateTtlConfiguration ttlConfig) { + Preconditions.checkNotNull(ttlConfig); + Preconditions.checkArgument(queryableStateName == null, + "Queryable state is currently not supported with TTL"); + this.ttlConfig = ttlConfig; + } + + @Nullable + @Internal + public StateTtlConfiguration getTtlConfig() { + return ttlConfig; + } + // ------------------------------------------------------------------------ /** http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java index 8ef2046..9bd8b15 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java @@ 
-27,15 +27,12 @@ import static org.apache.flink.api.common.state.StateTtlConfiguration.TtlUpdateT /** * Configuration of state TTL logic. - * TODO: builder */ public class StateTtlConfiguration { /** * This option value configures when to update last access timestamp which prolongs state TTL. */ public enum TtlUpdateType { - /** TTL is disabled. State does not expire. */ - Disabled, /** Last access timestamp is initialised when state is created and updated on every write operation. */ OnCreateAndWrite, /** The same as <code>OnCreateAndWrite</code> but also updated on read. */ http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java index 43c5533..2db7a30 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java @@ -50,7 +50,7 @@ public abstract class CompositeSerializer<T> extends TypeSerializer<T> { @SuppressWarnings("unchecked") protected CompositeSerializer(boolean immutableTargetType, TypeSerializer<?> ... 
fieldSerializers) { this( - new PrecomputedParameters(immutableTargetType, (TypeSerializer<Object>[]) fieldSerializers), + PrecomputedParameters.precompute(immutableTargetType, (TypeSerializer<Object>[]) fieldSerializers), fieldSerializers); } @@ -187,6 +187,7 @@ public abstract class CompositeSerializer<T> extends TypeSerializer<T> { return 31 * Boolean.hashCode(precomputed.immutableTargetType) + Arrays.hashCode(fieldSerializers); } + @SuppressWarnings("EqualsWhichDoesntCheckParameterClass") @Override public boolean equals(Object obj) { if (canEqual(obj)) { @@ -205,17 +206,12 @@ public abstract class CompositeSerializer<T> extends TypeSerializer<T> { @Override public TypeSerializerConfigSnapshot snapshotConfiguration() { - return new CompositeTypeSerializerConfigSnapshot(fieldSerializers) { - @Override - public int getVersion() { - return 0; - } - }; + return new ConfigSnapshot(fieldSerializers); } @Override public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - if (configSnapshot instanceof CompositeTypeSerializerConfigSnapshot) { + if (configSnapshot instanceof ConfigSnapshot) { List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousSerializersAndConfigs = ((CompositeTypeSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs(); if (previousSerializersAndConfigs.size() == fieldSerializers.length) { @@ -242,11 +238,7 @@ public abstract class CompositeSerializer<T> extends TypeSerializer<T> { } } } - PrecomputedParameters precomputed = - new PrecomputedParameters(this.precomputed.immutableTargetType, convertSerializers); - return requiresMigration ? - CompatibilityResult.requiresMigration(createSerializerInstance(precomputed, convertSerializers)) : - CompatibilityResult.compatible(); + return requiresMigration ? 
createMigrationCompatResult(convertSerializers) : CompatibilityResult.compatible(); } private CompatibilityResult<Object> resolveFieldCompatibility( @@ -256,6 +248,12 @@ public abstract class CompositeSerializer<T> extends TypeSerializer<T> { previousSerializersAndConfigs.get(index).f1, fieldSerializers[index]); } + private CompatibilityResult<T> createMigrationCompatResult(TypeSerializer<Object>[] convertSerializers) { + PrecomputedParameters precomputed = + PrecomputedParameters.precompute(this.precomputed.immutableTargetType, convertSerializers); + return CompatibilityResult.requiresMigration(createSerializerInstance(precomputed, convertSerializers)); + } + /** This class holds composite serializer parameters which can be precomputed in advanced for better performance. */ protected static class PrecomputedParameters implements Serializable { private static final long serialVersionUID = 1L; @@ -272,7 +270,14 @@ public abstract class CompositeSerializer<T> extends TypeSerializer<T> { /** Whether any field serializer is stateful. */ final boolean stateful; - PrecomputedParameters( + private PrecomputedParameters(boolean immutableTargetType, boolean immutable, int length, boolean stateful) { + this.immutableTargetType = immutableTargetType; + this.immutable = immutable; + this.length = length; + this.stateful = stateful; + } + + static PrecomputedParameters precompute( boolean immutableTargetType, TypeSerializer<Object>[] fieldSerializers) { Preconditions.checkNotNull(fieldSerializers); @@ -292,11 +297,26 @@ public abstract class CompositeSerializer<T> extends TypeSerializer<T> { } totalLength = totalLength >= 0 ? 
totalLength + fieldSerializer.getLength() : totalLength; } + return new PrecomputedParameters(immutableTargetType, fieldsImmutable, totalLength, stateful); + } + } - this.immutableTargetType = immutableTargetType; - this.immutable = immutableTargetType && fieldsImmutable; - this.length = totalLength; - this.stateful = stateful; + /** Snapshot field serializers of composite type. */ + public static class ConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { + private static final int VERSION = 0; + + /** This empty nullary constructor is required for deserializing the configuration. */ + @SuppressWarnings("unused") + public ConfigSnapshot() { + } + + ConfigSnapshot(@Nonnull TypeSerializer<?>... nestedSerializers) { + super(nestedSerializers); + } + + @Override + public int getVersion() { + return VERSION; } } } http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java index 9ea3198..176e55a 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.TestLocalRecoveryConfig; import org.apache.flink.runtime.state.internal.InternalListState; import 
org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.junit.Rule; import org.junit.Test; @@ -80,12 +81,13 @@ public final class KVStateRequestSerializerRocksDBTest { new ExecutionConfig(), false, TestLocalRecoveryConfig.disabled(), - RocksDBStateBackend.PriorityQueueStateType.HEAP + RocksDBStateBackend.PriorityQueueStateType.HEAP, + TtlTimeProvider.DEFAULT ); longHeapKeyedStateBackend.restore(null); longHeapKeyedStateBackend.setCurrentKey(key); - final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createState(VoidNamespaceSerializer.INSTANCE, + final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createInternalState(VoidNamespaceSerializer.INSTANCE, new ListStateDescriptor<>("test", LongSerializer.INSTANCE)); KvStateRequestSerializerTest.testListSerialization(key, listState); @@ -121,7 +123,8 @@ public final class KVStateRequestSerializerRocksDBTest { new ExecutionConfig(), false, TestLocalRecoveryConfig.disabled(), - RocksDBStateBackend.PriorityQueueStateType.HEAP); + RocksDBStateBackend.PriorityQueueStateType.HEAP, + TtlTimeProvider.DEFAULT); longHeapKeyedStateBackend.restore(null); longHeapKeyedStateBackend.setCurrentKey(key); http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java index 73f8831..d539066 100644 --- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.junit.Test; import org.junit.runner.RunWith; @@ -198,11 +199,12 @@ public class KvStateRequestSerializerTest { async, new ExecutionConfig(), TestLocalRecoveryConfig.disabled(), - new HeapPriorityQueueSetFactory(keyGroupRange, keyGroupRange.getNumberOfKeyGroups(), 128) + new HeapPriorityQueueSetFactory(keyGroupRange, keyGroupRange.getNumberOfKeyGroups(), 128), + TtlTimeProvider.DEFAULT ); longHeapKeyedStateBackend.setCurrentKey(key); - final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createState( + final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createInternalState( VoidNamespaceSerializer.INSTANCE, new ListStateDescriptor<>("test", LongSerializer.INSTANCE)); @@ -306,7 +308,8 @@ public class KvStateRequestSerializerTest { async, new ExecutionConfig(), TestLocalRecoveryConfig.disabled(), - new HeapPriorityQueueSetFactory(keyGroupRange, keyGroupRange.getNumberOfKeyGroups(), 128) + new HeapPriorityQueueSetFactory(keyGroupRange, keyGroupRange.getNumberOfKeyGroups(), 128), + TtlTimeProvider.DEFAULT ); longHeapKeyedStateBackend.setCurrentKey(key); http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java 
---------------------------------------------------------------------- diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java index 9947dac..adcf3ae 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java @@ -52,6 +52,7 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; @@ -761,6 +762,7 @@ public class KvStateServerHandlerTest extends TestLogger { IntSerializer.INSTANCE, numKeyGroups, new KeyGroupRange(0, 0), - registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId())); + registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()), + TtlTimeProvider.DEFAULT); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java index 8ce25b6..c7f1bd9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java @@ -27,6 +27,8 @@ import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.runtime.state.ttl.TtlStateFactory; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; @@ -84,6 +86,8 @@ public abstract class AbstractKeyedStateBackend<K> implements private final ExecutionConfig executionConfig; + private final TtlTimeProvider ttlTimeProvider; + /** Decorates the input and output streams to write key-groups compressed. */ protected final StreamCompressionDecorator keyGroupCompressionDecorator; @@ -93,7 +97,8 @@ public abstract class AbstractKeyedStateBackend<K> implements ClassLoader userCodeClassLoader, int numberOfKeyGroups, KeyGroupRange keyGroupRange, - ExecutionConfig executionConfig) { + ExecutionConfig executionConfig, + TtlTimeProvider ttlTimeProvider) { this.kvStateRegistry = kvStateRegistry; this.keySerializer = Preconditions.checkNotNull(keySerializer); @@ -104,6 +109,7 @@ public abstract class AbstractKeyedStateBackend<K> implements this.keyValueStatesByName = new HashMap<>(); this.executionConfig = executionConfig; this.keyGroupCompressionDecorator = determineStreamCompression(executionConfig); + this.ttlTimeProvider = Preconditions.checkNotNull(ttlTimeProvider); } private StreamCompressionDecorator determineStreamCompression(ExecutionConfig executionConfig) { @@ -220,40 +226,33 @@ public abstract class AbstractKeyedStateBackend<K> implements public <N, S extends State, V> S getOrCreateKeyedState( final TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> stateDescriptor) throws Exception { - checkNotNull(namespaceSerializer, "Namespace serializer"); + 
checkNotNull(keySerializer, "State key serializer has not been configured in the config. " + + "This operation cannot use partitioned state."); - if (keySerializer == null) { - throw new UnsupportedOperationException( - "State key serializer has not been configured in the config. " + - "This operation cannot use partitioned state."); - } - - if (!stateDescriptor.isSerializerInitialized()) { - stateDescriptor.initializeSerializerUnlessSet(executionConfig); - } - - InternalKvState<K, ?, ?> existing = keyValueStatesByName.get(stateDescriptor.getName()); - if (existing != null) { - @SuppressWarnings("unchecked") - S typedState = (S) existing; - return typedState; + InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName()); + if (kvState == null) { + if (!stateDescriptor.isSerializerInitialized()) { + stateDescriptor.initializeSerializerUnlessSet(executionConfig); + } + kvState = TtlStateFactory.createStateAndWrapWithTtlIfEnabled( + namespaceSerializer, stateDescriptor, this, ttlTimeProvider); + keyValueStatesByName.put(stateDescriptor.getName(), kvState); + publishQueryableStateIfEnabled(stateDescriptor, kvState); } + return (S) kvState; + } - InternalKvState<K, N, ?> kvState = createState(namespaceSerializer, stateDescriptor); - keyValueStatesByName.put(stateDescriptor.getName(), kvState); - - // Publish queryable state + private void publishQueryableStateIfEnabled( + StateDescriptor<?, ?> stateDescriptor, + InternalKvState<?, ?, ?> kvState) { if (stateDescriptor.isQueryable()) { if (kvStateRegistry == null) { throw new IllegalStateException("State backend has not been initialized for job."); } - String name = stateDescriptor.getQueryableStateName(); kvStateRegistry.registerKvState(keyGroupRange, name, kvState); } - - return (S) kvState; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java index 7e9c357..d397a88 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import java.io.IOException; @@ -42,13 +43,14 @@ public abstract class AbstractStateBackend implements StateBackend, java.io.Seri @Override public abstract <K> AbstractKeyedStateBackend<K> createKeyedStateBackend( - Environment env, - JobID jobID, - String operatorIdentifier, - TypeSerializer<K> keySerializer, - int numberOfKeyGroups, - KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry) throws IOException; + Environment env, + JobID jobID, + String operatorIdentifier, + TypeSerializer<K> keySerializer, + int numberOfKeyGroups, + KeyGroupRange keyGroupRange, + TaskKvStateRegistry kvStateRegistry, + TtlTimeProvider ttlTimeProvider) throws IOException; @Override public abstract OperatorStateBackend createOperatorStateBackend( http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFactory.java index dd251bd..de35979 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFactory.java @@ -36,7 +36,7 @@ public interface KeyedStateFactory { * @param <S> The type of the public API state. * @param <IS> The type of internal state. */ - <N, SV, S extends State, IS extends S> IS createState( + <N, SV, S extends State, IS extends S> IS createInternalState( TypeSerializer<N> namespaceSerializer, StateDescriptor<S, SV> stateDesc) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java index f34cd9b..2775b71 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import java.io.IOException; @@ -117,7 +118,7 @@ public interface StateBackend extends java.io.Serializable { /** * Creates a new {@link AbstractKeyedStateBackend} that is responsible for holding <b>keyed state</b> - * and checkpointing it. + * and checkpointing it. Uses default TTL time provider. * * <p><i>Keyed State</i> is state where each value is bound to a key. * @@ -127,14 +128,46 @@ public interface StateBackend extends java.io.Serializable { * * @throws Exception This method may forward all exceptions that occur while instantiating the backend. 
*/ - <K> AbstractKeyedStateBackend<K> createKeyedStateBackend( + default <K> AbstractKeyedStateBackend<K> createKeyedStateBackend( Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry) throws Exception; + TaskKvStateRegistry kvStateRegistry) throws Exception { + return createKeyedStateBackend( + env, + jobID, + operatorIdentifier, + keySerializer, + numberOfKeyGroups, + keyGroupRange, + kvStateRegistry, + TtlTimeProvider.DEFAULT); + } + + /** + * Creates a new {@link AbstractKeyedStateBackend} that is responsible for holding <b>keyed state</b> + * and checkpointing it. + * + * <p><i>Keyed State</i> is state where each value is bound to a key. + * + * @param <K> The type of the keys by which the state is organized. + * + * @return The Keyed State Backend for the given job, operator, and key group range. + * + * @throws Exception This method may forward all exceptions that occur while instantiating the backend. + */ + <K> AbstractKeyedStateBackend<K> createKeyedStateBackend( + Environment env, + JobID jobID, + String operatorIdentifier, + TypeSerializer<K> keySerializer, + int numberOfKeyGroups, + KeyGroupRange keyGroupRange, + TaskKvStateRegistry kvStateRegistry, + TtlTimeProvider ttlTimeProvider) throws Exception; /** * Creates a new {@link OperatorStateBackend} that can be used for storing operator state. 
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java index ad1581b..f5a86e1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.TaskStateManager; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.TernaryBoolean; import org.slf4j.LoggerFactory; @@ -93,7 +94,7 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur private static final long serialVersionUID = -8191916350224044011L; /** Maximum size of state that is stored with the metadata, rather than in files (1 MiByte). 
*/ - public static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024; + private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024; // ------------------------------------------------------------------------ @@ -448,13 +449,14 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur @Override public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend( - Environment env, - JobID jobID, - String operatorIdentifier, - TypeSerializer<K> keySerializer, - int numberOfKeyGroups, - KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry) { + Environment env, + JobID jobID, + String operatorIdentifier, + TypeSerializer<K> keySerializer, + int numberOfKeyGroups, + KeyGroupRange keyGroupRange, + TaskKvStateRegistry kvStateRegistry, + TtlTimeProvider ttlTimeProvider) { TaskStateManager taskStateManager = env.getTaskStateManager(); LocalRecoveryConfig localRecoveryConfig = taskStateManager.createLocalRecoveryConfig(); @@ -470,7 +472,8 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur isUsingAsynchronousSnapshots(), env.getExecutionConfig(), localRecoveryConfig, - priorityQueueSetFactory); + priorityQueueSetFactory, + ttlTimeProvider); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index 562c93d..495dfe0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -63,6 +63,7 @@ import org.apache.flink.runtime.state.StateSnapshot; import 
org.apache.flink.runtime.state.StreamCompressionDecorator; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; @@ -171,9 +172,11 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { boolean asynchronousSnapshots, ExecutionConfig executionConfig, LocalRecoveryConfig localRecoveryConfig, - PriorityQueueSetFactory priorityQueueSetFactory) { + PriorityQueueSetFactory priorityQueueSetFactory, + TtlTimeProvider ttlTimeProvider) { - super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig); + super(kvStateRegistry, keySerializer, userCodeClassLoader, + numberOfKeyGroups, keyGroupRange, executionConfig, ttlTimeProvider); this.localRecoveryConfig = Preconditions.checkNotNull(localRecoveryConfig); SnapshotStrategySynchronicityBehavior<K> synchronicityTrait = asynchronousSnapshots ? 
@@ -241,7 +244,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } @Override - public <N, SV, S extends State, IS extends S> IS createState( + public <N, SV, S extends State, IS extends S> IS createInternalState( TypeSerializer<N> namespaceSerializer, StateDescriptor<S, SV> stateDesc) throws Exception { StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass()); http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java index d78944c..1c464d7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.state.TaskStateManager; import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.TernaryBoolean; import javax.annotation.Nullable; @@ -307,7 +308,8 @@ public class MemoryStateBackend extends AbstractFileStateBackend implements Conf TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry) { + TaskKvStateRegistry kvStateRegistry, + TtlTimeProvider ttlTimeProvider) { TaskStateManager taskStateManager = env.getTaskStateManager(); HeapPriorityQueueSetFactory priorityQueueSetFactory = @@ -321,7 +323,8 @@ public class MemoryStateBackend extends AbstractFileStateBackend implements Conf 
isUsingAsynchronousSnapshots(), env.getExecutionConfig(), taskStateManager.createLocalRecoveryConfig(), - priorityQueueSetFactory); + priorityQueueSetFactory, + ttlTimeProvider); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java index 29a575a..1b72c54 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java @@ -55,8 +55,6 @@ abstract class AbstractTtlDecorator<T> { Preconditions.checkNotNull(original); Preconditions.checkNotNull(config); Preconditions.checkNotNull(timeProvider); - Preconditions.checkArgument(config.getTtlUpdateType() != StateTtlConfiguration.TtlUpdateType.Disabled, - "State does not need to be wrapped with TTL if it is configured as disabled."); this.original = original; this.config = config; this.timeProvider = timeProvider; http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java index 98d7c52..21145e5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java @@ -23,13 +23,15 @@ import 
org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.state.internal.InternalMapState; import org.apache.flink.util.FlinkRuntimeException; +import javax.annotation.Nonnull; + import java.util.AbstractMap; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; +import java.util.NoSuchElementException; +import java.util.function.Function; /** * This class wraps map state with TTL logic. @@ -84,52 +86,89 @@ class TtlMapState<K, N, UK, UV> @Override public Iterable<Map.Entry<UK, UV>> entries() throws Exception { - return entriesStream()::iterator; + return entries(e -> e); } - private Stream<Map.Entry<UK, UV>> entriesStream() throws Exception { + private <R> Iterable<R> entries( + Function<Map.Entry<UK, UV>, R> resultMapper) throws Exception { Iterable<Map.Entry<UK, TtlValue<UV>>> withTs = original.entries(); - withTs = withTs == null ? Collections.emptyList() : withTs; - return StreamSupport - .stream(withTs.spliterator(), false) - .filter(this::unexpiredAndUpdateOrCleanup) - .map(TtlMapState::unwrapWithoutTs); - } - - private boolean unexpiredAndUpdateOrCleanup(Map.Entry<UK, TtlValue<UV>> e) { - UV unexpiredValue; - try { - unexpiredValue = getWithTtlCheckAndUpdate( - e::getValue, - v -> original.put(e.getKey(), v), - () -> original.remove(e.getKey())); - } catch (Exception ex) { - throw new FlinkRuntimeException(ex); - } - return unexpiredValue != null; - } - - private static <UK, UV> Map.Entry<UK, UV> unwrapWithoutTs(Map.Entry<UK, TtlValue<UV>> e) { - return new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue().getUserValue()); + return () -> new EntriesIterator<>(withTs == null ? 
Collections.emptyList() : withTs, resultMapper); } @Override public Iterable<UK> keys() throws Exception { - return entriesStream().map(Map.Entry::getKey)::iterator; + return entries(Map.Entry::getKey); } @Override public Iterable<UV> values() throws Exception { - return entriesStream().map(Map.Entry::getValue)::iterator; + return entries(Map.Entry::getValue); } @Override public Iterator<Map.Entry<UK, UV>> iterator() throws Exception { - return entriesStream().iterator(); + return entries().iterator(); } @Override public void clear() { original.clear(); } + + private class EntriesIterator<R> implements Iterator<R> { + private final Iterator<Map.Entry<UK, TtlValue<UV>>> originalIterator; + private final Function<Map.Entry<UK, UV>, R> resultMapper; + private Map.Entry<UK, UV> nextUnexpired = null; + private boolean rightAfterNextIsCalled = false; + + private EntriesIterator( + @Nonnull Iterable<Map.Entry<UK, TtlValue<UV>>> withTs, + @Nonnull Function<Map.Entry<UK, UV>, R> resultMapper) { + this.originalIterator = withTs.iterator(); + this.resultMapper = resultMapper; + } + + @Override + public boolean hasNext() { + rightAfterNextIsCalled = false; + while (nextUnexpired == null && originalIterator.hasNext()) { + nextUnexpired = getUnexpiredAndUpdateOrCleanup(originalIterator.next()); + } + return nextUnexpired != null; + } + + @Override + public R next() { + if (hasNext()) { + rightAfterNextIsCalled = true; + R result = resultMapper.apply(nextUnexpired); + nextUnexpired = null; + return result; + } + throw new NoSuchElementException(); + } + + @Override + public void remove() { + if (rightAfterNextIsCalled) { + originalIterator.remove(); + } else { + throw new IllegalStateException("next() has not been called or hasNext() has been called afterwards," + + " remove() is supported only right after calling next()"); + } + } + + private Map.Entry<UK, UV> getUnexpiredAndUpdateOrCleanup(Map.Entry<UK, TtlValue<UV>> e) { + UV unexpiredValue; + try { + unexpiredValue = 
getWithTtlCheckAndUpdate( + e::getValue, + v -> original.put(e.getKey(), v), + originalIterator::remove); + } catch (Exception ex) { + throw new FlinkRuntimeException(ex); + } + return unexpiredValue == null ? null : new AbstractMap.SimpleEntry<>(e.getKey(), unexpiredValue); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java index 01e4be9..c0aa465 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java @@ -49,7 +49,7 @@ class TtlReducingState<K, N, T> @Override public void add(T value) throws Exception { - original.add(wrapWithTs(value, Long.MAX_VALUE)); + original.add(wrapWithTs(value)); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java index 82096a6..5909ac7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java @@ -49,16 +49,14 @@ public class TtlStateFactory { TypeSerializer<N> namespaceSerializer, StateDescriptor<S, SV> stateDesc, KeyedStateFactory originalStateFactory, - StateTtlConfiguration ttlConfig, TtlTimeProvider timeProvider) throws Exception { 
Preconditions.checkNotNull(namespaceSerializer); Preconditions.checkNotNull(stateDesc); Preconditions.checkNotNull(originalStateFactory); - Preconditions.checkNotNull(ttlConfig); Preconditions.checkNotNull(timeProvider); - return ttlConfig.getTtlUpdateType() == StateTtlConfiguration.TtlUpdateType.Disabled ? - originalStateFactory.createState(namespaceSerializer, stateDesc) : - new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider) + return stateDesc.getTtlConfig() == null ? + originalStateFactory.createInternalState(namespaceSerializer, stateDesc) : + new TtlStateFactory(originalStateFactory, stateDesc.getTtlConfig(), timeProvider) .createState(namespaceSerializer, stateDesc); } @@ -96,7 +94,7 @@ public class TtlStateFactory { stateDesc.getClass(), TtlStateFactory.class); throw new FlinkRuntimeException(message); } - return stateFactory.createState(namespaceSerializer, stateDesc); + return stateFactory.createInternalState(namespaceSerializer, stateDesc); } @SuppressWarnings("unchecked") @@ -106,7 +104,7 @@ public class TtlStateFactory { ValueStateDescriptor<TtlValue<SV>> ttlDescriptor = new ValueStateDescriptor<>( stateDesc.getName(), new TtlSerializer<>(stateDesc.getSerializer())); return (IS) new TtlValueState<>( - originalStateFactory.createState(namespaceSerializer, ttlDescriptor), + originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor), ttlConfig, timeProvider, stateDesc.getSerializer()); } @@ -118,7 +116,7 @@ public class TtlStateFactory { ListStateDescriptor<TtlValue<T>> ttlDescriptor = new ListStateDescriptor<>( stateDesc.getName(), new TtlSerializer<>(listStateDesc.getElementSerializer())); return (IS) new TtlListState<>( - originalStateFactory.createState(namespaceSerializer, ttlDescriptor), + originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor), ttlConfig, timeProvider, listStateDesc.getSerializer()); } @@ -132,7 +130,7 @@ public class TtlStateFactory { mapStateDesc.getKeySerializer(), new 
TtlSerializer<>(mapStateDesc.getValueSerializer())); return (IS) new TtlMapState<>( - originalStateFactory.createState(namespaceSerializer, ttlDescriptor), + originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor), ttlConfig, timeProvider, mapStateDesc.getSerializer()); } @@ -146,7 +144,7 @@ public class TtlStateFactory { new TtlReduceFunction<>(reducingStateDesc.getReduceFunction(), ttlConfig, timeProvider), new TtlSerializer<>(stateDesc.getSerializer())); return (IS) new TtlReducingState<>( - originalStateFactory.createState(namespaceSerializer, ttlDescriptor), + originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor), ttlConfig, timeProvider, stateDesc.getSerializer()); } @@ -161,7 +159,7 @@ public class TtlStateFactory { AggregatingStateDescriptor<IN, TtlValue<SV>, OUT> ttlDescriptor = new AggregatingStateDescriptor<>( stateDesc.getName(), ttlAggregateFunction, new TtlSerializer<>(stateDesc.getSerializer())); return (IS) new TtlAggregatingState<>( - originalStateFactory.createState(namespaceSerializer, ttlDescriptor), + originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor), ttlConfig, timeProvider, stateDesc.getSerializer(), ttlAggregateFunction); } @@ -178,7 +176,7 @@ public class TtlStateFactory { new TtlFoldFunction<>(foldingStateDescriptor.getFoldFunction(), ttlConfig, timeProvider, initAcc), new TtlSerializer<>(stateDesc.getSerializer())); return (IS) new TtlFoldingState<>( - originalStateFactory.createState(namespaceSerializer, ttlDescriptor), + originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor), ttlConfig, timeProvider, stateDesc.getSerializer()); } http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeProvider.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeProvider.java index bac9d36..84809cc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeProvider.java @@ -21,6 +21,8 @@ package org.apache.flink.runtime.state.ttl; /** * Provides time to TTL logic to judge about state expiration. */ -interface TtlTimeProvider { +public interface TtlTimeProvider { + TtlTimeProvider DEFAULT = System::currentTimeMillis; + long currentTimestamp(); } http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java index 9456f10..ffebc52 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java @@ -41,6 +41,7 @@ import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; @@ -168,7 +169,8 @@ public class CheckpointSettingsSerializableTest extends TestLogger { TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry) throws Exception { + TaskKvStateRegistry 
kvStateRegistry, + TtlTimeProvider ttlTimeProvider) throws Exception { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java index 1325431..6424a7a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.IOUtils; import org.junit.Rule; @@ -70,7 +71,8 @@ public class HeapKeyedStateBackendAsyncByDefaultTest { IntSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), - null + null, + TtlTimeProvider.DEFAULT ); assertTrue(keyedStateBackend.supportsAsynchronousSnapshots()); http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 3c5756b..bfdc05d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -73,6 +73,7 @@ import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.runtime.state.internal.InternalReducingState; import org.apache.flink.runtime.state.internal.InternalValueState; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory; import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader; import org.apache.flink.types.IntValue; @@ -182,7 +183,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten keySerializer, numberOfKeyGroups, keyGroupRange, - env.getTaskKvStateRegistry()); + env.getTaskKvStateRegistry(), + TtlTimeProvider.DEFAULT); backend.restore(null); @@ -219,7 +221,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten keySerializer, numberOfKeyGroups, keyGroupRange, - env.getTaskKvStateRegistry()); + env.getTaskKvStateRegistry(), + TtlTimeProvider.DEFAULT); backend.restore(new StateObjectCollection<>(state)); @@ -3545,7 +3548,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten } // insert some data to the backend. 
- InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createState( + InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createInternalState( VoidNamespaceSerializer.INSTANCE, new ValueStateDescriptor<>("test", IntSerializer.INSTANCE)); @@ -3602,7 +3605,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten try { backend = createKeyedBackend(IntSerializer.INSTANCE); - InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createState( + InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createInternalState( VoidNamespaceSerializer.INSTANCE, new ValueStateDescriptor<>("test", IntSerializer.INSTANCE)); @@ -3649,7 +3652,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten try { backend = restoreKeyedBackend(IntSerializer.INSTANCE, stateHandle); - InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createState( + InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createInternalState( VoidNamespaceSerializer.INSTANCE, new ValueStateDescriptor<>("test", IntSerializer.INSTANCE)); @@ -3791,7 +3794,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten return; } - InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createState( + InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createInternalState( VoidNamespaceSerializer.INSTANCE, new ValueStateDescriptor<>("test", IntSerializer.INSTANCE)); http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java index 
dfcdffc..558f629 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; import org.apache.flink.runtime.state.internal.InternalValueState; import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.TestLogger; import org.apache.commons.io.IOUtils; @@ -54,7 +55,8 @@ public class StateSnapshotCompressionTest extends TestLogger { true, executionConfig, TestLocalRecoveryConfig.disabled(), - mock(PriorityQueueSetFactory.class)); + mock(PriorityQueueSetFactory.class), + TtlTimeProvider.DEFAULT); try { Assert.assertTrue( @@ -77,7 +79,8 @@ public class StateSnapshotCompressionTest extends TestLogger { true, executionConfig, TestLocalRecoveryConfig.disabled(), - mock(PriorityQueueSetFactory.class)); + mock(PriorityQueueSetFactory.class), + TtlTimeProvider.DEFAULT); try { Assert.assertTrue( @@ -118,12 +121,13 @@ public class StateSnapshotCompressionTest extends TestLogger { true, executionConfig, TestLocalRecoveryConfig.disabled(), - mock(PriorityQueueSetFactory.class)); + mock(PriorityQueueSetFactory.class), + TtlTimeProvider.DEFAULT); try { InternalValueState<String, VoidNamespace, String> state = - stateBackend.createState(new VoidNamespaceSerializer(), stateDescriptor); + stateBackend.createInternalState(new VoidNamespaceSerializer(), stateDescriptor); stateBackend.setCurrentKey("A"); state.setCurrentNamespace(VoidNamespace.INSTANCE); @@ -160,12 +164,13 @@ public class StateSnapshotCompressionTest extends TestLogger { true, executionConfig, TestLocalRecoveryConfig.disabled(), - mock(PriorityQueueSetFactory.class)); + mock(PriorityQueueSetFactory.class), + TtlTimeProvider.DEFAULT); 
try { stateBackend.restore(StateObjectCollection.singleton(stateHandle)); - InternalValueState<String, VoidNamespace, String> state = stateBackend.createState( + InternalValueState<String, VoidNamespace, String> state = stateBackend.createInternalState( new VoidNamespaceSerializer(), stateDescriptor); http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java index 249d0c3..7b8d69f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java @@ -73,7 +73,7 @@ public class HeapKeyedStateBackendSnapshotMigrationTest extends HeapStateBackend keyedBackend.restore(StateObjectCollection.singleton(stateHandles.getJobManagerOwnedSnapshot())); - InternalMapState<String, Integer, Long, Long> state = keyedBackend.createState(IntSerializer.INSTANCE, stateDescr); + InternalMapState<String, Integer, Long, Long> state = keyedBackend.createInternalState(IntSerializer.INSTANCE, stateDescr); keyedBackend.setCurrentKey("abc"); state.setCurrentNamespace(namespace1); @@ -233,7 +233,7 @@ public class HeapKeyedStateBackendSnapshotMigrationTest extends HeapStateBackend final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class); stateDescr.initializeSerializerUnlessSet(new ExecutionConfig()); - InternalListState<String, Integer, Long> state = keyedBackend.createState(IntSerializer.INSTANCE, stateDescr); + InternalListState<String, Integer, Long> state = 
keyedBackend.createInternalState(IntSerializer.INSTANCE, stateDescr); assertEquals(7, keyedBackend.numStateEntries()); http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java index cf6aef4..0eddf3c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.TestLocalRecoveryConfig; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -61,6 +62,7 @@ public abstract class HeapStateBackendTestBase { async, new ExecutionConfig(), TestLocalRecoveryConfig.disabled(), - new HeapPriorityQueueSetFactory(keyGroupRange, numKeyGroups, 128)); + new HeapPriorityQueueSetFactory(keyGroupRange, numKeyGroups, 128), + TtlTimeProvider.DEFAULT); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/HeapTtlStateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/HeapTtlStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/HeapTtlStateTest.java new file mode 100644 index 0000000..06de4be --- /dev/null +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/HeapTtlStateTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; + +/** Test suite for heap state TTL. 
*/ +public class HeapTtlStateTest extends TtlStateTestBase { + @Override + protected StateBackendTestContext createStateBackendTestContext(TtlTimeProvider timeProvider) { + return new StateBackendTestContext(timeProvider) { + @Override + protected StateBackend createStateBackend() { + return new MemoryStateBackend(false); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTimeProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTimeProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTimeProvider.java deleted file mode 100644 index e14c3f8..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTimeProvider.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.state.ttl; - -class MockTimeProvider implements TtlTimeProvider { - long time = 0; - - @Override - public long currentTimestamp() { - return time; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlStateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlStateTest.java new file mode 100644 index 0000000..392bdba --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlStateTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.ttl.mock.MockStateBackend; + +/** Test suite for mock state TTL. 
*/ +public class MockTtlStateTest extends TtlStateTestBase { + @Override + protected StateBackendTestContext createStateBackendTestContext(TtlTimeProvider timeProvider) { + return new StateBackendTestContext(timeProvider) { + @Override + protected StateBackend createStateBackend() { + return new MockStateBackend(); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlTimeProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlTimeProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlTimeProvider.java new file mode 100644 index 0000000..f980043 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlTimeProvider.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.ttl; +
+/** {@link TtlTimeProvider} for tests that reports whatever value is currently stored in {@link #time}. */
+class MockTtlTimeProvider implements TtlTimeProvider {
+	/** Timestamp returned by {@link #currentTimestamp()}; package-visible and mutable. */
+	long time = 0L;
+
+	@Override
+	public long currentTimestamp() {
+		return this.time;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java new file mode 100644 index 0000000..eaec234 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointStorageLocation; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.RunnableFuture; + +/** Base class for state backend test context. 
*/
+public abstract class StateBackendTestContext {
+	private final StateBackend stateBackend;
+	private final CheckpointStorageLocation checkpointStorageLocation;
+	private final TtlTimeProvider timeProvider;
+
+	/**
+	 * Keyed backend under test. It is {@code null} until
+	 * {@link #createAndRestoreKeyedStateBackend()} has run and again after
+	 * {@link #disposeKeyedStateBackend()}.
+	 */
+	private AbstractKeyedStateBackend<String> keyedStateBackend;
+
+	/**
+	 * Creates the context and eagerly builds the state backend and a checkpoint storage location.
+	 *
+	 * <p>{@link #createStateBackend()} is called from this constructor, i.e. before any subclass
+	 * field initialisation, so implementations must not rely on subclass state.
+	 *
+	 * @param timeProvider time source passed to every keyed backend created by this context
+	 * @throws NullPointerException if {@code timeProvider} or the created state backend is null
+	 */
+	protected StateBackendTestContext(TtlTimeProvider timeProvider) {
+		this.timeProvider = Preconditions.checkNotNull(timeProvider);
+		this.stateBackend = Preconditions.checkNotNull(createStateBackend());
+		this.checkpointStorageLocation = createCheckpointStorageLocation();
+	}
+
+	/** Supplies the state backend to test; invoked exactly once, from the constructor. */
+	protected abstract StateBackend createStateBackend();
+
+	/** Initialises the storage location used by {@link #takeSnapshot()}; I/O failures are rethrown unchecked. */
+	private CheckpointStorageLocation createCheckpointStorageLocation() {
+		try {
+			return stateBackend
+				.createCheckpointStorage(new JobID())
+				.initializeLocationForCheckpoint(2L);
+		} catch (IOException e) {
+			// Keep the cause so that a failing test shows why the location could not be created.
+			throw new RuntimeException("unexpected", e);
+		}
+	}
+
+	/**
+	 * Disposes the current keyed backend, if any, and creates a new one with 10 key groups
+	 * (range 0..9) whose current key is set to {@code "defaultKey"}.
+	 */
+	void createAndRestoreKeyedStateBackend() {
+		Environment env = new DummyEnvironment();
+		try {
+			disposeKeyedStateBackend();
+			keyedStateBackend = stateBackend.createKeyedStateBackend(
+				env, new JobID(), "test", StringSerializer.INSTANCE, 10,
+				new KeyGroupRange(0, 9), env.getTaskKvStateRegistry(), timeProvider);
+			keyedStateBackend.setCurrentKey("defaultKey");
+		} catch (Exception e) {
+			// Keep the cause so that a failing test shows why the backend could not be created.
+			throw new RuntimeException("unexpected", e);
+		}
+	}
+
+	/** Disposes the keyed backend and clears the reference; does nothing if none exists. */
+	void disposeKeyedStateBackend() {
+		if (keyedStateBackend != null) {
+			keyedStateBackend.dispose();
+			keyedStateBackend = null;
+		}
+	}
+
+	/**
+	 * Snapshots the keyed backend with fixed test arguments, running the returned future on the
+	 * calling thread if it is not done yet.
+	 *
+	 * @return the job-manager-owned part of the snapshot result
+	 * @throws NullPointerException if no keyed backend has been created
+	 */
+	@Nonnull
+	KeyedStateHandle takeSnapshot() throws Exception {
+		Preconditions.checkNotNull(keyedStateBackend, "keyed backend is not initialised");
+		RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture =
+			keyedStateBackend.snapshot(682375462392L, 10L,
+				checkpointStorageLocation, CheckpointOptions.forCheckpointWithDefaultLocation());
+		if (!snapshotRunnableFuture.isDone()) {
+			snapshotRunnableFuture.run();
+		}
+		return snapshotRunnableFuture.get().getJobManagerOwnedSnapshot();
+	}
+
+	/**
+	 * Restores the keyed backend from {@code snapshot} and discards the snapshot afterwards.
+	 *
+	 * @param snapshot handle to restore from; {@code null} passes a {@code null} collection to
+	 *                 the backend's {@code restore} call
+	 * @throws NullPointerException if no keyed backend has been created
+	 */
+	void restoreSnapshot(@Nullable KeyedStateHandle snapshot) throws Exception {
+		Preconditions.checkNotNull(keyedStateBackend, "keyed backend is not initialised");
+		Collection<KeyedStateHandle> restoreState =
+			snapshot == null ? null : new StateObjectCollection<>(Collections.singleton(snapshot));
+		keyedStateBackend.restore(restoreState);
+		if (snapshot != null) {
+			snapshot.discardState();
+		}
+	}
+
+	/** Sets the current key of the keyed backend; fails if no keyed backend has been created. */
+	void setCurrentKey(String key) {
+		Preconditions.checkNotNull(keyedStateBackend, "keyed backend is not initialised");
+		keyedStateBackend.setCurrentKey(key);
+	}
+
+	/**
+	 * Gets or creates the keyed state for {@code stateDescriptor} and sets its current namespace.
+	 *
+	 * @param stateDescriptor descriptor of the state to create
+	 * @param defaultNamespace namespace set on the returned state; the state is cast to
+	 *                         {@link InternalKvState} for this, unchecked
+	 * @return the state object returned by the keyed backend
+	 * @throws NullPointerException if no keyed backend has been created
+	 */
+	@SuppressWarnings("unchecked")
+	<N, S extends State, V> S createState(
+		StateDescriptor<S, V> stateDescriptor,
+		@SuppressWarnings("SameParameterValue") N defaultNamespace) throws Exception {
+		Preconditions.checkNotNull(keyedStateBackend, "keyed backend is not initialised");
+		S state = keyedStateBackend.getOrCreateKeyedState(StringSerializer.INSTANCE, stateDescriptor);
+		((InternalKvState<?, N, ?>) state).setCurrentNamespace(defaultNamespace);
+		return state;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTest.java deleted file mode 100644 index 5d9c682..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTest.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state.ttl; - -import org.apache.flink.api.common.functions.AggregateFunction; -import org.apache.flink.api.common.state.AggregatingStateDescriptor; -import org.apache.flink.api.common.typeutils.base.LongSerializer; -import org.apache.flink.api.java.tuple.Tuple2; - -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -/** Test suite for {@link TtlAggregatingState}. */ -public class TtlAggregatingStateTest - extends TtlMergingStateBase.TtlIntegerMergingStateBase<TtlAggregatingState<?, String, Integer, Long, String>, Integer, String> { - private static final long DEFAULT_ACCUMULATOR = 3L; - - @Override - void initTestValues() { - updater = v -> ttlState.add(v); - getter = () -> ttlState.get(); - originalGetter = () -> ttlState.original.get(); - - updateEmpty = 5; - updateUnexpired = 7; - updateExpired = 6; - - getUpdateEmpty = "8"; - getUnexpired = "15"; - getUpdateExpired = "9"; - } - - @Override - TtlAggregatingState<?, String, Integer, Long, String> createState() { - AggregatingStateDescriptor<Integer, Long, String> aggregatingStateDes = - new AggregatingStateDescriptor<>("TtlTestAggregatingState", AGGREGATE, LongSerializer.INSTANCE); - return (TtlAggregatingState<?, String, Integer, Long, String>) wrapMockState(aggregatingStateDes); - } - - @Override - String getMergeResult( - List<Tuple2<String, Integer>> unexpiredUpdatesToMerge, - List<Tuple2<String, Integer>> finalUpdatesToMerge) { - Set<String> namespaces = new HashSet<>(); - unexpiredUpdatesToMerge.forEach(t -> namespaces.add(t.f0)); 
- finalUpdatesToMerge.forEach(t -> namespaces.add(t.f0)); - return Integer.toString(getIntegerMergeResult(unexpiredUpdatesToMerge, finalUpdatesToMerge) + - namespaces.size() * (int) DEFAULT_ACCUMULATOR); - } - - private static final AggregateFunction<Integer, Long, String> AGGREGATE = - new AggregateFunction<Integer, Long, String>() { - @Override - public Long createAccumulator() { - return DEFAULT_ACCUMULATOR; - } - - @Override - public Long add(Integer value, Long accumulator) { - return accumulator + value; - } - - @Override - public String getResult(Long accumulator) { - return accumulator.toString(); - } - - @Override - public Long merge(Long a, Long b) { - return a + b; - } - }; -} http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTestContext.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTestContext.java new file mode 100644 index 0000000..b19391a --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTestContext.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +
+/** Test context for {@link TtlAggregatingState}: test values, state descriptor and state accessors. */
+class TtlAggregatingStateTestContext
+	extends TtlMergingStateTestContext.TtlIntegerMergingStateTestContext<TtlAggregatingState<?, String, Integer, Long, String>, Integer, String> {
+	/** Value every accumulator created by {@link #AGGREGATE} starts from. */
+	private static final long DEFAULT_ACCUMULATOR = 3L;
+
+	/** Adds integers on top of {@link #DEFAULT_ACCUMULATOR} and reports the sum as a decimal string. */
+	private static final AggregateFunction<Integer, Long, String> AGGREGATE =
+		new AggregateFunction<Integer, Long, String>() {
+			@Override
+			public Long createAccumulator() {
+				return DEFAULT_ACCUMULATOR;
+			}
+
+			@Override
+			public Long add(Integer value, Long accumulator) {
+				return accumulator + value;
+			}
+
+			@Override
+			public String getResult(Long accumulator) {
+				return Long.toString(accumulator);
+			}
+
+			@Override
+			public Long merge(Long a, Long b) {
+				return a + b;
+			}
+		};
+
+	/**
+	 * Sets the update values and the results expected after them. With {@link #AGGREGATE} the
+	 * numbers work out as 3 + 5 = 8, 8 + 7 = 15 and 3 + 6 = 9.
+	 */
+	@Override
+	void initTestValues() {
+		updateEmpty = 5;
+		getUpdateEmpty = "8";
+
+		updateUnexpired = 7;
+		getUnexpired = "15";
+
+		updateExpired = 6;
+		getUpdateExpired = "9";
+	}
+
+	/** Builds the descriptor "TtlTestAggregatingState" with a {@link Long} accumulator. */
+	@SuppressWarnings("unchecked")
+	@Override
+	<US extends State, SV> StateDescriptor<US, SV> createStateDescriptor() {
+		AggregatingStateDescriptor<Integer, Long, String> descriptor = new AggregatingStateDescriptor<>(
+			"TtlTestAggregatingState", AGGREGATE, LongSerializer.INSTANCE);
+		return (StateDescriptor<US, SV>) descriptor;
+	}
+
+	/** Adds {@code value} to the TTL state. */
+	@Override
+	void update(Integer value) throws Exception {
+		ttlState.add(value);
+	}
+
+	/** Reads the aggregate through the TTL state. */
+	@Override
+	String get() throws Exception {
+		return ttlState.get();
+	}
+
+	/** Reads the aggregate from the state wrapped by the TTL state. */
+	@Override
+	Object getOriginal() throws Exception {
+		return ttlState.original.get();
+	}
+
+	/**
+	 * Computes the expected merge result: the integer merge result plus one
+	 * {@link #DEFAULT_ACCUMULATOR} for every distinct namespace ({@code f0}) in either list.
+	 */
+	@Override
+	String getMergeResult(
+		List<Tuple2<String, Integer>> unexpiredUpdatesToMerge,
+		List<Tuple2<String, Integer>> finalUpdatesToMerge) {
+		Set<String> namespaces = new HashSet<>();
+		for (Tuple2<String, Integer> update : unexpiredUpdatesToMerge) {
+			namespaces.add(update.f0);
+		}
+		for (Tuple2<String, Integer> update : finalUpdatesToMerge) {
+			namespaces.add(update.f0);
+		}
+		int accumulatorShare = namespaces.size() * (int) DEFAULT_ACCUMULATOR;
+		return Integer.toString(
+			getIntegerMergeResult(unexpiredUpdatesToMerge, finalUpdatesToMerge) + accumulatorShare);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTest.java deleted file mode 100644 index 8dac8ca..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTest.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state.ttl; - -import org.apache.flink.api.common.functions.FoldFunction; -import org.apache.flink.api.common.state.FoldingStateDescriptor; -import org.apache.flink.api.common.typeutils.base.StringSerializer; - -/** Test suite for {@link TtlFoldingState}. */ -@SuppressWarnings("deprecation") -public class TtlFoldingStateTest extends TtlStateTestBase<TtlFoldingState<?, String, Long, String>, Long, String> { - @Override - void initTestValues() { - updater = v -> ttlState.add(v); - getter = () -> ttlState.get(); - originalGetter = () -> ttlState.original.get(); - - updateEmpty = 5L; - updateUnexpired = 7L; - updateExpired = 6L; - - getUpdateEmpty = "6"; - getUnexpired = "13"; - getUpdateExpired = "7"; - } - - @Override - TtlFoldingState<?, String, Long, String> createState() { - FoldingStateDescriptor<Long, String> foldingStateDesc = - new FoldingStateDescriptor<>("TtlTestFoldingState", "1", FOLD, StringSerializer.INSTANCE); - return (TtlFoldingState<?, String, Long, String>) wrapMockState(foldingStateDesc); - } - - private static final FoldFunction<Long, String> FOLD = (acc, val) -> { - long lacc = acc == null ? 0 : Long.parseLong(acc); - return Long.toString(val == null ? 
lacc : lacc + val); - }; -} http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTestContext.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTestContext.java new file mode 100644 index 0000000..2b072b9 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTestContext.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; + +/** Test suite for {@link TtlFoldingState}. 
*/
+@SuppressWarnings("deprecation")
+class TtlFoldingStateTestContext extends TtlStateTestContextBase<TtlFoldingState<?, String, Long, String>, Long, String> {
+	/**
+	 * Parses the accumulator as a long (a {@code null} accumulator counts as 0), adds the value
+	 * unless it is {@code null}, and returns the sum as a decimal string.
+	 */
+	private static final FoldFunction<Long, String> FOLD = (acc, val) -> {
+		long folded = acc == null ? 0 : Long.parseLong(acc);
+		if (val != null) {
+			folded += val;
+		}
+		return Long.toString(folded);
+	};
+
+	/**
+	 * Sets the update values and the results expected after them. With the descriptor's initial
+	 * value "1" and {@link #FOLD} the numbers work out as 1 + 5 = 6, 6 + 7 = 13 and 1 + 6 = 7.
+	 */
+	@Override
+	void initTestValues() {
+		updateEmpty = 5L;
+		getUpdateEmpty = "6";
+
+		updateUnexpired = 7L;
+		getUnexpired = "13";
+
+		updateExpired = 6L;
+		getUpdateExpired = "7";
+	}
+
+	/** Folds {@code value} into the TTL state. */
+	@Override
+	void update(Long value) throws Exception {
+		ttlState.add(value);
+	}
+
+	/** Reads the folded value through the TTL state. */
+	@Override
+	String get() throws Exception {
+		return ttlState.get();
+	}
+
+	/** Reads the folded value from the state wrapped by the TTL state. */
+	@Override
+	Object getOriginal() throws Exception {
+		return ttlState.original.get();
+	}
+
+	/** Builds the descriptor "TtlTestFoldingState" with initial value "1" and {@link #FOLD}. */
+	@SuppressWarnings("unchecked")
+	@Override
+	<US extends State, SV> StateDescriptor<US, SV> createStateDescriptor() {
+		FoldingStateDescriptor<Long, String> descriptor = new FoldingStateDescriptor<>(
+			"TtlTestFoldingState", "1", FOLD, StringSerializer.INSTANCE);
+		return (StateDescriptor<US, SV>) descriptor;
+	}
+}