This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new f10a7d8 [FLINK-11833] [State Backends] Cleanup unnecessary createKeyedStateBackend methods in StateBackend f10a7d8 is described below commit f10a7d8b5da5d9836a8fb3b2f38e099152f1d75f Author: Yu Li <l...@apache.org> AuthorDate: Wed Mar 6 17:26:58 2019 +0800 [FLINK-11833] [State Backends] Cleanup unnecessary createKeyedStateBackend methods in StateBackend This closes #7909. --- .../flink/queryablestate/network/ClientTest.java | 21 ++-- .../queryablestate/network/KvStateServerTest.java | 9 +- .../apache/flink/runtime/state/StateBackend.java | 119 +++------------------ .../HeapKeyedStateBackendAsyncByDefaultTest.java | 9 +- .../state/StateBackendMigrationTestBase.java | 15 ++- .../flink/runtime/state/StateBackendTestBase.java | 25 +++-- .../runtime/state/ttl/StateBackendTestContext.java | 15 ++- .../streaming/state/RocksDBAsyncSnapshotTest.java | 10 +- .../state/RocksDBRocksStateKeysIteratorTest.java | 10 +- .../state/RocksDBStateBackendConfigTest.java | 69 +++++++----- .../api/operators/StreamingRuntimeContextTest.java | 39 ++++--- .../operators/windowing/TriggerTestHarness.java | 23 ++-- 12 files changed, 185 insertions(+), 179 deletions(-) diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java index bceb361..85b29fa 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java @@ -22,6 +22,8 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.queryablestate.client.VoidNamespace; import org.apache.flink.queryablestate.client.VoidNamespaceSerializer; @@ -40,6 +42,7 @@ import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.NetUtils; @@ -630,13 +633,17 @@ public class ClientTest { dummyEnv.setKvStateRegistry(dummyRegistry); AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend( - dummyEnv, - new JobID(), - "test_op", - IntSerializer.INSTANCE, - numKeyGroups, - new KeyGroupRange(0, 0), - dummyRegistry.createTaskRegistry(new JobID(), new JobVertexID())); + dummyEnv, + new JobID(), + "test_op", + IntSerializer.INSTANCE, + numKeyGroups, + new KeyGroupRange(0, 0), + dummyRegistry.createTaskRegistry(new JobID(), new JobVertexID()), + TtlTimeProvider.DEFAULT, + new UnregisteredMetricsGroup(), + Collections.emptyList(), + new CloseableRegistry()); final FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS); diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java index 79c23ad..6aace48 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java @@ -22,6 +22,8 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.queryablestate.client.VoidNamespace; import org.apache.flink.queryablestate.client.VoidNamespaceSerializer; import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer; @@ -39,6 +41,7 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap; import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; @@ -117,7 +120,11 @@ public class KvStateServerTest { IntSerializer.INSTANCE, numKeyGroups, new KeyGroupRange(0, 0), - registry.createTaskRegistry(jobId, new JobVertexID())); + registry.createTaskRegistry(jobId, new JobVertexID()), + TtlTimeProvider.DEFAULT, + new UnregisteredMetricsGroup(), + Collections.emptyList(), + new CloseableRegistry()); final KvStateServerHandlerTest.TestRegistryListener registryListener = new KvStateServerHandlerTest.TestRegistryListener(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java index 0fccd60..bd3cee8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; @@ -32,7 +31,6 @@ import javax.annotation.Nonnull; import java.io.IOException; import java.util.Collection; -import java.util.Collections; /** * A <b>State Backend</b> defines how the state of a streaming application is stored and @@ -122,117 +120,24 @@ public interface StateBackend extends java.io.Serializable { // ------------------------------------------------------------------------ // Structure Backends // ------------------------------------------------------------------------ - - /** - * Creates a new {@link AbstractKeyedStateBackend} that is responsible for holding <b>keyed state</b> - * and checkpointing it. Uses default TTL time provider. - * - * <p><i>Keyed State</i> is state where each value is bound to a key. - * - * @param <K> The type of the keys by which the state is organized. - * - * @return The Keyed State Backend for the given job, operator, and key group range. - * - * @throws Exception This method may forward all exceptions that occur while instantiating the backend. - */ - default <K> AbstractKeyedStateBackend<K> createKeyedStateBackend( - Environment env, - JobID jobID, - String operatorIdentifier, - TypeSerializer<K> keySerializer, - int numberOfKeyGroups, - KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry) throws Exception { - return createKeyedStateBackend( - env, - jobID, - operatorIdentifier, - keySerializer, - numberOfKeyGroups, - keyGroupRange, - kvStateRegistry, - TtlTimeProvider.DEFAULT - ); - } - - /** - * Creates a new {@link AbstractKeyedStateBackend} that is responsible for holding <b>keyed state</b> - * and checkpointing it. - * - * <p><i>Keyed State</i> is state where each value is bound to a key. - * - * @param <K> The type of the keys by which the state is organized. - * - * @return The Keyed State Backend for the given job, operator, and key group range. - * - * @throws Exception This method may forward all exceptions that occur while instantiating the backend. - */ - default <K> AbstractKeyedStateBackend<K> createKeyedStateBackend( - Environment env, - JobID jobID, - String operatorIdentifier, - TypeSerializer<K> keySerializer, - int numberOfKeyGroups, - KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry, - TtlTimeProvider ttlTimeProvider - ) throws Exception { - return createKeyedStateBackend( - env, - jobID, - operatorIdentifier, - keySerializer, - numberOfKeyGroups, - keyGroupRange, - kvStateRegistry, - ttlTimeProvider, - new UnregisteredMetricsGroup(), - Collections.emptyList(), - new CloseableRegistry()); - } - - /** - * Creates a new {@link AbstractKeyedStateBackend} that is responsible for holding <b>keyed state</b> - * and checkpointing it. - * - * <p><i>Keyed State</i> is state where each value is bound to a key. - * - * @param <K> The type of the keys by which the state is organized. - * @return The Keyed State Backend for the given job, operator, and key group range. - * @throws Exception This method may forward all exceptions that occur while instantiating the backend. - */ - default <K> AbstractKeyedStateBackend<K> createKeyedStateBackend( - Environment env, - JobID jobID, - String operatorIdentifier, - TypeSerializer<K> keySerializer, - int numberOfKeyGroups, - KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry, - TtlTimeProvider ttlTimeProvider, - Collection<KeyedStateHandle> stateHandles - ) throws Exception { - return createKeyedStateBackend( - env, - jobID, - operatorIdentifier, - keySerializer, - numberOfKeyGroups, - keyGroupRange, - kvStateRegistry, - ttlTimeProvider, - new UnregisteredMetricsGroup(), - stateHandles, - new CloseableRegistry()); - } - /** * Creates a new {@link AbstractKeyedStateBackend} that is responsible for holding <b>keyed state</b> * and checkpointing it. * * <p><i>Keyed State</i> is state where each value is bound to a key. * - * @param <K> The type of the keys by which the state is organized. + * @param env The environment of the task. + * @param jobID The ID of the job that the task belongs to. + * @param operatorIdentifier The identifier text of the operator. + * @param keySerializer The key-serializer for the operator. + * @param numberOfKeyGroups The number of key-groups aka max parallelism. + * @param keyGroupRange Range of key-groups for which the to-be-created backend is responsible. + * @param kvStateRegistry KvStateRegistry helper for this task. + * @param ttlTimeProvider Provider for TTL logic to judge about state expiration. + * @param metricGroup The parent metric group for all state backend metrics. + * @param stateHandles The state handles for restore. + * @param cancelStreamRegistry The registry to which created closeable objects will be registered during restore. + * @param <K> The type of the keys by which the state is organized. * * @return The Keyed State Backend for the given job, operator, and key group range. * diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java index 6424a7a..4075ad0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java @@ -21,6 +21,8 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; @@ -31,6 +33,8 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import java.util.Collections; + import static org.junit.Assert.assertTrue; /** @@ -72,7 +76,10 @@ public class HeapKeyedStateBackendAsyncByDefaultTest { 1, new KeyGroupRange(0, 0), null, - TtlTimeProvider.DEFAULT + TtlTimeProvider.DEFAULT, + new UnregisteredMetricsGroup(), + Collections.emptyList(), + new CloseableRegistry() ); assertTrue(keyedStateBackend.supportsAsynchronousSnapshots()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java index 90f9db1..3ba8876 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java @@ -31,6 +31,7 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.StateObjectCollection; import org.apache.flink.runtime.execution.Environment; @@ -1001,7 +1002,11 @@ public abstract class StateBackendMigrationTestBase<B extends AbstractStateBacke keySerializer, numberOfKeyGroups, keyGroupRange, - env.getTaskKvStateRegistry()); + env.getTaskKvStateRegistry(), + TtlTimeProvider.DEFAULT, + new UnregisteredMetricsGroup(), + Collections.emptyList(), + new CloseableRegistry()); return backend; } @@ -1034,9 +1039,11 @@ public abstract class StateBackendMigrationTestBase<B extends AbstractStateBacke keySerializer, numberOfKeyGroups, keyGroupRange, - env.getTaskKvStateRegistry() - , TtlTimeProvider.DEFAULT, - state); + env.getTaskKvStateRegistry(), + TtlTimeProvider.DEFAULT, + new UnregisteredMetricsGroup(), + state, + new CloseableRegistry()); return backend; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 0d0c7a4..08cc6ac 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -53,9 +53,11 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.runtime.PojoSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer; import org.apache.flink.runtime.checkpoint.CheckpointOptions; @@ -172,14 +174,17 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten Environment env) throws Exception { AbstractKeyedStateBackend<K> backend = getStateBackend().createKeyedStateBackend( - env, - new JobID(), - "test_op", - keySerializer, - numberOfKeyGroups, - keyGroupRange, - env.getTaskKvStateRegistry(), - TtlTimeProvider.DEFAULT); + env, + new JobID(), + "test_op", + keySerializer, + numberOfKeyGroups, + keyGroupRange, + env.getTaskKvStateRegistry(), + TtlTimeProvider.DEFAULT, + new UnregisteredMetricsGroup(), + Collections.emptyList(), + new CloseableRegistry()); return backend; } @@ -216,7 +221,9 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten keyGroupRange, env.getTaskKvStateRegistry(), TtlTimeProvider.DEFAULT, - state); + new UnregisteredMetricsGroup(), + state, + new CloseableRegistry()); return backend; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java index 3fdad0a..887dedf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java @@ -22,6 +22,8 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; @@ -92,8 +94,17 @@ public abstract class StateBackendTestContext { try { disposeKeyedStateBackend(); keyedStateBackend = stateBackend.createKeyedStateBackend( - env, new JobID(), "test", StringSerializer.INSTANCE, numberOfKeyGroups, - new KeyGroupRange(0, numberOfKeyGroups - 1), env.getTaskKvStateRegistry(), timeProvider, stateHandles); + env, + new JobID(), + "test", + StringSerializer.INSTANCE, + numberOfKeyGroups, + new KeyGroupRange(0, numberOfKeyGroups - 1), + env.getTaskKvStateRegistry(), + timeProvider, + new UnregisteredMetricsGroup(), + stateHandles, + new CloseableRegistry()); } catch (Exception e) { throw new RuntimeException("unexpected", e); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java index d0b69fa..90d91b5 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java @@ -25,7 +25,9 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.common.typeutils.base.VoidSerializer; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.CheckpointOptions; @@ -56,6 +58,7 @@ import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.state.testutils.BackendForTestStream; import org.apache.flink.runtime.state.testutils.BackendForTestStream.StreamFactory; import org.apache.flink.runtime.state.testutils.TestCheckpointStreamFactory; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory; import org.apache.flink.runtime.util.BlockingCheckpointOutputStream; @@ -84,6 +87,7 @@ import java.io.File; import java.io.IOException; import java.lang.reflect.Field; import java.util.Arrays; +import java.util.Collections; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; @@ -396,7 +400,11 @@ public class RocksDBAsyncSnapshotTest extends TestLogger { VoidSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), - null); + null, + TtlTimeProvider.DEFAULT, + new UnregisteredMetricsGroup(), + Collections.emptyList(), + new CloseableRegistry()); try { // register a state so that the state backend has to checkpoint something diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java index 87e06b6..a0e78fc 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java @@ -24,12 +24,15 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.contrib.streaming.state.iterator.RocksStateKeysIterator; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.junit.Assert; import org.junit.Rule; @@ -38,6 +41,7 @@ import org.junit.rules.TemporaryFolder; import org.rocksdb.ColumnFamilyHandle; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.function.Function; @@ -90,7 +94,11 @@ public class RocksDBRocksStateKeysIteratorTest { keySerializer, maxKeyGroupNumber, new KeyGroupRange(0, maxKeyGroupNumber - 1), - mock(TaskKvStateRegistry.class)); + mock(TaskKvStateRegistry.class), + TtlTimeProvider.DEFAULT, + new UnregisteredMetricsGroup(), + Collections.emptyList(), + new CloseableRegistry()); try { ValueState<String> testState = keyedStateBackend.getPartitionedState( diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index 8044a3a..c00ffdb 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -25,8 +25,10 @@ import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -38,6 +40,7 @@ import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.util.IOUtils; @@ -54,6 +57,7 @@ import org.rocksdb.util.SizeUnit; import java.io.File; import java.io.IOException; +import java.util.Collections; import static org.hamcrest.CoreMatchers.anyOf; import static org.hamcrest.MatcherAssert.assertThat; @@ -276,14 +280,18 @@ public class RocksDBStateBackendConfigTest { Environment env = getMockEnvironment(dir1, dir2); RocksDBKeyedStateBackend<Integer> keyedBackend = (RocksDBKeyedStateBackend<Integer>) rocksDbBackend. - createKeyedStateBackend( - env, - env.getJobID(), - "test_op", - IntSerializer.INSTANCE, - 1, - new KeyGroupRange(0, 0), - env.getTaskKvStateRegistry()); + createKeyedStateBackend( + env, + env.getJobID(), + "test_op", + IntSerializer.INSTANCE, + 1, + new KeyGroupRange(0, 0), + env.getTaskKvStateRegistry(), + TtlTimeProvider.DEFAULT, + new UnregisteredMetricsGroup(), + Collections.emptyList(), + new CloseableRegistry()); try { File instanceBasePath = keyedBackend.getInstanceBasePath(); @@ -316,13 +324,17 @@ public class RocksDBStateBackendConfigTest { try { Environment env = getMockEnvironment(tempFolder.newFolder()); rocksDbBackend.createKeyedStateBackend( - env, - env.getJobID(), - "foobar", - IntSerializer.INSTANCE, - 1, - new KeyGroupRange(0, 0), - new KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID())); + env, + env.getJobID(), + "foobar", + IntSerializer.INSTANCE, + 1, + new KeyGroupRange(0, 0), + new KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID()), + TtlTimeProvider.DEFAULT, + new UnregisteredMetricsGroup(), + Collections.emptyList(), + new CloseableRegistry()); } catch (Exception e) { assertTrue(e.getMessage().contains("No local storage directories available")); @@ -363,7 +375,11 @@ public class RocksDBStateBackendConfigTest { IntSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), - new KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID())); + new KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID()), + TtlTimeProvider.DEFAULT, + new UnregisteredMetricsGroup(), + Collections.emptyList(), + new CloseableRegistry()); IOUtils.closeQuietly(keyedStateBackend); keyedStateBackend.dispose(); @@ -633,15 +649,18 @@ public class RocksDBStateBackendConfigTest { static RocksDBKeyedStateBackend<Integer> createKeyedStateBackend( RocksDBStateBackend rocksDbBackend, Environment env) throws Exception { - return (RocksDBKeyedStateBackend<Integer>) rocksDbBackend. - createKeyedStateBackend( - env, - env.getJobID(), - "test_op", - IntSerializer.INSTANCE, - 1, - new KeyGroupRange(0, 0), - env.getTaskKvStateRegistry()); + return (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.createKeyedStateBackend( + env, + env.getJobID(), + "test_op", + IntSerializer.INSTANCE, + 1, + new KeyGroupRange(0, 0), + env.getTaskKvStateRegistry(), + TtlTimeProvider.DEFAULT, + new UnregisteredMetricsGroup(), + Collections.emptyList(), + new CloseableRegistry()); } static Environment getMockEnvironment(File... tempDirs) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java index e04cedd..7b678c3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java @@ -38,7 +38,9 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.ListSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.Path; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; @@ -52,6 +54,7 @@ import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.junit.Test; import org.mockito.Matchers; @@ -322,13 +325,17 @@ public class StreamingRuntimeContextTest { (ListStateDescriptor<String>) invocationOnMock.getArguments()[2]; AbstractKeyedStateBackend<Integer> backend = new MemoryStateBackend().createKeyedStateBackend( - new DummyEnvironment("test_task", 1, 0), - new JobID(), - "test_op", - IntSerializer.INSTANCE, - 1, - new KeyGroupRange(0, 0), - new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID())); + new DummyEnvironment("test_task", 1, 0), + new JobID(), + "test_op", + IntSerializer.INSTANCE, + 1, + new KeyGroupRange(0, 0), + new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()), + TtlTimeProvider.DEFAULT, + new UnregisteredMetricsGroup(), + Collections.emptyList(), + new CloseableRegistry()); backend.setCurrentKey(0); return backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descr); } @@ -359,13 +366,17 @@ public class StreamingRuntimeContextTest { (MapStateDescriptor<Integer, String>) invocationOnMock.getArguments()[2]; AbstractKeyedStateBackend<Integer> backend = new MemoryStateBackend().createKeyedStateBackend( - new DummyEnvironment("test_task", 1, 0), - new JobID(), - "test_op", - IntSerializer.INSTANCE, - 1, - new KeyGroupRange(0, 0), - new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID())); + new DummyEnvironment("test_task", 1, 0), + new JobID(), + "test_op", + IntSerializer.INSTANCE, + 1, + new KeyGroupRange(0, 0), + new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()), + TtlTimeProvider.DEFAULT, + new UnregisteredMetricsGroup(), + Collections.emptyList(), + new CloseableRegistry()); backend.setCurrentKey(0); return backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descr); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java index bc5bb1b..50abe75 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java @@ -28,7 +28,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.query.KvStateRegistry; @@ -37,6 +39,7 @@ import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; import org.apache.flink.runtime.state.internal.InternalMergingState; import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.KeyContext; import org.apache.flink.streaming.api.operators.TestInternalTimerService; @@ -48,6 +51,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; /** * Utility for testing {@link Trigger} behaviour. @@ -74,13 +78,18 @@ public class TriggerTestHarness<T, W extends Window> { MemoryStateBackend backend = new MemoryStateBackend(); @SuppressWarnings("unchecked") - HeapKeyedStateBackend<Integer> stateBackend = (HeapKeyedStateBackend<Integer>) backend.createKeyedStateBackend(dummyEnv, - new JobID(), - "test_op", - IntSerializer.INSTANCE, - 1, - new KeyGroupRange(0, 0), - new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID())); + HeapKeyedStateBackend<Integer> stateBackend = (HeapKeyedStateBackend<Integer>) backend.createKeyedStateBackend( + dummyEnv, + new JobID(), + "test_op", + IntSerializer.INSTANCE, + 1, + new KeyGroupRange(0, 0), + new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()), + TtlTimeProvider.DEFAULT, + new UnregisteredMetricsGroup(), + Collections.emptyList(), + new CloseableRegistry()); this.stateBackend = stateBackend; this.stateBackend.setCurrentKey(KEY);