This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f10a7d8  [FLINK-11833] [State Backends] Cleanup unnecessary 
createKeyedStateBackend methods in StateBackend
f10a7d8 is described below

commit f10a7d8b5da5d9836a8fb3b2f38e099152f1d75f
Author: Yu Li <l...@apache.org>
AuthorDate: Wed Mar 6 17:26:58 2019 +0800

    [FLINK-11833] [State Backends] Cleanup unnecessary createKeyedStateBackend 
methods in StateBackend
    
    This closes #7909.
---
 .../flink/queryablestate/network/ClientTest.java   |  21 ++--
 .../queryablestate/network/KvStateServerTest.java  |   9 +-
 .../apache/flink/runtime/state/StateBackend.java   | 119 +++------------------
 .../HeapKeyedStateBackendAsyncByDefaultTest.java   |   9 +-
 .../state/StateBackendMigrationTestBase.java       |  15 ++-
 .../flink/runtime/state/StateBackendTestBase.java  |  25 +++--
 .../runtime/state/ttl/StateBackendTestContext.java |  15 ++-
 .../streaming/state/RocksDBAsyncSnapshotTest.java  |  10 +-
 .../state/RocksDBRocksStateKeysIteratorTest.java   |  10 +-
 .../state/RocksDBStateBackendConfigTest.java       |  69 +++++++-----
 .../api/operators/StreamingRuntimeContextTest.java |  39 ++++---
 .../operators/windowing/TriggerTestHarness.java    |  23 ++--
 12 files changed, 185 insertions(+), 179 deletions(-)

diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
index bceb361..85b29fa 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.queryablestate.client.VoidNamespace;
 import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
@@ -40,6 +42,7 @@ import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.NetUtils;
 
@@ -630,13 +633,17 @@ public class ClientTest {
                dummyEnv.setKvStateRegistry(dummyRegistry);
 
                AbstractKeyedStateBackend<Integer> backend = 
abstractBackend.createKeyedStateBackend(
-                               dummyEnv,
-                               new JobID(),
-                               "test_op",
-                               IntSerializer.INSTANCE,
-                               numKeyGroups,
-                               new KeyGroupRange(0, 0),
-                               dummyRegistry.createTaskRegistry(new JobID(), 
new JobVertexID()));
+                       dummyEnv,
+                       new JobID(),
+                       "test_op",
+                       IntSerializer.INSTANCE,
+                       numKeyGroups,
+                       new KeyGroupRange(0, 0),
+                       dummyRegistry.createTaskRegistry(new JobID(), new 
JobVertexID()),
+                       TtlTimeProvider.DEFAULT,
+                       new UnregisteredMetricsGroup(),
+                       Collections.emptyList(),
+                       new CloseableRegistry());
 
                final FiniteDuration timeout = new FiniteDuration(10, 
TimeUnit.SECONDS);
 
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
index 79c23ad..6aace48 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.queryablestate.client.VoidNamespace;
 import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
 import 
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
@@ -39,6 +41,7 @@ import 
org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 
 import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
@@ -117,7 +120,11 @@ public class KvStateServerTest {
                                IntSerializer.INSTANCE,
                                numKeyGroups,
                                new KeyGroupRange(0, 0),
-                               registry.createTaskRegistry(jobId, new 
JobVertexID()));
+                               registry.createTaskRegistry(jobId, new 
JobVertexID()),
+                               TtlTimeProvider.DEFAULT,
+                               new UnregisteredMetricsGroup(),
+                               Collections.emptyList(),
+                               new CloseableRegistry());
 
                        final KvStateServerHandlerTest.TestRegistryListener 
registryListener =
                                        new 
KvStateServerHandlerTest.TestRegistryListener();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
index 0fccd60..bd3cee8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
@@ -32,7 +31,6 @@ import javax.annotation.Nonnull;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Collections;
 
 /**
  * A <b>State Backend</b> defines how the state of a streaming application is 
stored and
@@ -122,117 +120,24 @@ public interface StateBackend extends 
java.io.Serializable {
        // 
------------------------------------------------------------------------
        //  Structure Backends 
        // 
------------------------------------------------------------------------
-
-       /**
-        * Creates a new {@link AbstractKeyedStateBackend} that is responsible 
for holding <b>keyed state</b>
-        * and checkpointing it. Uses default TTL time provider.
-        *
-        * <p><i>Keyed State</i> is state where each value is bound to a key.
-        *
-        * @param <K> The type of the keys by which the state is organized.
-        *
-        * @return The Keyed State Backend for the given job, operator, and key 
group range.
-        *
-        * @throws Exception This method may forward all exceptions that occur 
while instantiating the backend.
-        */
-       default <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
-                       Environment env,
-                       JobID jobID,
-                       String operatorIdentifier,
-                       TypeSerializer<K> keySerializer,
-                       int numberOfKeyGroups,
-                       KeyGroupRange keyGroupRange,
-                       TaskKvStateRegistry kvStateRegistry) throws Exception {
-               return createKeyedStateBackend(
-                       env,
-                       jobID,
-                       operatorIdentifier,
-                       keySerializer,
-                       numberOfKeyGroups,
-                       keyGroupRange,
-                       kvStateRegistry,
-                       TtlTimeProvider.DEFAULT
-               );
-       }
-
-       /**
-        * Creates a new {@link AbstractKeyedStateBackend} that is responsible 
for holding <b>keyed state</b>
-        * and checkpointing it.
-        *
-        * <p><i>Keyed State</i> is state where each value is bound to a key.
-        *
-        * @param <K> The type of the keys by which the state is organized.
-        *
-        * @return The Keyed State Backend for the given job, operator, and key 
group range.
-        *
-        * @throws Exception This method may forward all exceptions that occur 
while instantiating the backend.
-        */
-       default <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
-               Environment env,
-               JobID jobID,
-               String operatorIdentifier,
-               TypeSerializer<K> keySerializer,
-               int numberOfKeyGroups,
-               KeyGroupRange keyGroupRange,
-               TaskKvStateRegistry kvStateRegistry,
-               TtlTimeProvider ttlTimeProvider
-       ) throws Exception {
-               return createKeyedStateBackend(
-                       env,
-                       jobID,
-                       operatorIdentifier,
-                       keySerializer,
-                       numberOfKeyGroups,
-                       keyGroupRange,
-                       kvStateRegistry,
-                       ttlTimeProvider,
-                       new UnregisteredMetricsGroup(),
-                       Collections.emptyList(),
-                       new CloseableRegistry());
-       }
-
-       /**
-        * Creates a new {@link AbstractKeyedStateBackend} that is responsible 
for holding <b>keyed state</b>
-        * and checkpointing it.
-        *
-        * <p><i>Keyed State</i> is state where each value is bound to a key.
-        *
-        * @param <K> The type of the keys by which the state is organized.
-        * @return The Keyed State Backend for the given job, operator, and key 
group range.
-        * @throws Exception This method may forward all exceptions that occur 
while instantiating the backend.
-        */
-       default <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
-               Environment env,
-               JobID jobID,
-               String operatorIdentifier,
-               TypeSerializer<K> keySerializer,
-               int numberOfKeyGroups,
-               KeyGroupRange keyGroupRange,
-               TaskKvStateRegistry kvStateRegistry,
-               TtlTimeProvider ttlTimeProvider,
-               Collection<KeyedStateHandle> stateHandles
-       ) throws Exception {
-               return createKeyedStateBackend(
-                       env,
-                       jobID,
-                       operatorIdentifier,
-                       keySerializer,
-                       numberOfKeyGroups,
-                       keyGroupRange,
-                       kvStateRegistry,
-                       ttlTimeProvider,
-                       new UnregisteredMetricsGroup(),
-                       stateHandles,
-                       new CloseableRegistry());
-       }
-
        /**
         * Creates a new {@link AbstractKeyedStateBackend} that is responsible 
for holding <b>keyed state</b>
         * and checkpointing it.
         *
         * <p><i>Keyed State</i> is state where each value is bound to a key.
         *
-        * @param <K> The type of the keys by which the state is organized.
+        * @param env                  The environment of the task.
+        * @param jobID                The ID of the job that the task belongs 
to.
+        * @param operatorIdentifier   The identifier text of the operator.
+        * @param keySerializer        The key-serializer for the operator.
+        * @param numberOfKeyGroups    The number of key-groups aka max 
parallelism.
+        * @param keyGroupRange        Range of key-groups for which the 
to-be-created backend is responsible.
+        * @param kvStateRegistry      KvStateRegistry helper for this task.
+        * @param ttlTimeProvider      Provider for TTL logic to judge about 
state expiration.
+        * @param metricGroup          The parent metric group for all state 
backend metrics.
+        * @param stateHandles         The state handles for restore.
+        * @param cancelStreamRegistry The registry to which created closeable 
objects will be registered during restore.
+        * @param <K>                  The type of the keys by which the state 
is organized.
         *
         * @return The Keyed State Backend for the given job, operator, and key 
group range.
         *
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java
index 6424a7a..4075ad0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -31,6 +33,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.util.Collections;
+
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -72,7 +76,10 @@ public class HeapKeyedStateBackendAsyncByDefaultTest {
                        1,
                        new KeyGroupRange(0, 0),
                        null,
-                       TtlTimeProvider.DEFAULT
+                       TtlTimeProvider.DEFAULT,
+                       new UnregisteredMetricsGroup(),
+                       Collections.emptyList(),
+                       new CloseableRegistry()
                );
 
                assertTrue(keyedStateBackend.supportsAsynchronousSnapshots());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
index 90f9db1..3ba8876 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.StateObjectCollection;
 import org.apache.flink.runtime.execution.Environment;
@@ -1001,7 +1002,11 @@ public abstract class StateBackendMigrationTestBase<B 
extends AbstractStateBacke
                        keySerializer,
                        numberOfKeyGroups,
                        keyGroupRange,
-                       env.getTaskKvStateRegistry());
+                       env.getTaskKvStateRegistry(),
+                       TtlTimeProvider.DEFAULT,
+                       new UnregisteredMetricsGroup(),
+                       Collections.emptyList(),
+                       new CloseableRegistry());
                return backend;
        }
 
@@ -1034,9 +1039,11 @@ public abstract class StateBackendMigrationTestBase<B 
extends AbstractStateBacke
                        keySerializer,
                        numberOfKeyGroups,
                        keyGroupRange,
-                       env.getTaskKvStateRegistry()
-                       , TtlTimeProvider.DEFAULT,
-                       state);
+                       env.getTaskKvStateRegistry(),
+                       TtlTimeProvider.DEFAULT,
+                       new UnregisteredMetricsGroup(),
+                       state,
+                       new CloseableRegistry());
                return backend;
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 0d0c7a4..08cc6ac 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -53,9 +53,11 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.queryablestate.KvStateID;
 import 
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -172,14 +174,17 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        Environment env) throws Exception {
 
                AbstractKeyedStateBackend<K> backend = 
getStateBackend().createKeyedStateBackend(
-                               env,
-                               new JobID(),
-                               "test_op",
-                               keySerializer,
-                               numberOfKeyGroups,
-                               keyGroupRange,
-                               env.getTaskKvStateRegistry(),
-                               TtlTimeProvider.DEFAULT);
+                       env,
+                       new JobID(),
+                       "test_op",
+                       keySerializer,
+                       numberOfKeyGroups,
+                       keyGroupRange,
+                       env.getTaskKvStateRegistry(),
+                       TtlTimeProvider.DEFAULT,
+                       new UnregisteredMetricsGroup(),
+                       Collections.emptyList(),
+                       new CloseableRegistry());
 
                return backend;
        }
@@ -216,7 +221,9 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        keyGroupRange,
                        env.getTaskKvStateRegistry(),
                        TtlTimeProvider.DEFAULT,
-                       state);
+                       new UnregisteredMetricsGroup(),
+                       state,
+                       new CloseableRegistry());
 
                return backend;
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
index 3fdad0a..887dedf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -92,8 +94,17 @@ public abstract class StateBackendTestContext {
                try {
                        disposeKeyedStateBackend();
                        keyedStateBackend = 
stateBackend.createKeyedStateBackend(
-                               env, new JobID(), "test", 
StringSerializer.INSTANCE, numberOfKeyGroups,
-                               new KeyGroupRange(0, numberOfKeyGroups - 1), 
env.getTaskKvStateRegistry(), timeProvider, stateHandles);
+                               env,
+                               new JobID(),
+                               "test",
+                               StringSerializer.INSTANCE,
+                               numberOfKeyGroups,
+                               new KeyGroupRange(0, numberOfKeyGroups - 1),
+                               env.getTaskKvStateRegistry(),
+                               timeProvider,
+                               new UnregisteredMetricsGroup(),
+                               stateHandles,
+                               new CloseableRegistry());
                } catch (Exception e) {
                        throw new RuntimeException("unexpected", e);
                }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index d0b69fa..90d91b5 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -25,7 +25,9 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -56,6 +58,7 @@ import 
org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.state.testutils.BackendForTestStream;
 import 
org.apache.flink.runtime.state.testutils.BackendForTestStream.StreamFactory;
 import org.apache.flink.runtime.state.testutils.TestCheckpointStreamFactory;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.runtime.util.BlockingCheckpointOutputStream;
@@ -84,6 +87,7 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
@@ -396,7 +400,11 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
                        VoidSerializer.INSTANCE,
                        1,
                        new KeyGroupRange(0, 0),
-                       null);
+                       null,
+                       TtlTimeProvider.DEFAULT,
+                       new UnregisteredMetricsGroup(),
+                       Collections.emptyList(),
+                       new CloseableRegistry());
 
                try {
                        // register a state so that the state backend has to 
checkpoint something
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
index 87e06b6..a0e78fc 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksStateKeysIteratorTest.java
@@ -24,12 +24,15 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import 
org.apache.flink.contrib.streaming.state.iterator.RocksStateKeysIterator;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 
 import org.junit.Assert;
 import org.junit.Rule;
@@ -38,6 +41,7 @@ import org.junit.rules.TemporaryFolder;
 import org.rocksdb.ColumnFamilyHandle;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.function.Function;
@@ -90,7 +94,11 @@ public class RocksDBRocksStateKeysIteratorTest {
                        keySerializer,
                        maxKeyGroupNumber,
                        new KeyGroupRange(0, maxKeyGroupNumber - 1),
-                       mock(TaskKvStateRegistry.class));
+                       mock(TaskKvStateRegistry.class),
+                       TtlTimeProvider.DEFAULT,
+                       new UnregisteredMetricsGroup(),
+                       Collections.emptyList(),
+                       new CloseableRegistry());
 
                try {
                        ValueState<String> testState = 
keyedStateBackend.getPartitionedState(
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 8044a3a..c00ffdb 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -25,8 +25,10 @@ import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -38,6 +40,7 @@ import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.util.IOUtils;
@@ -54,6 +57,7 @@ import org.rocksdb.util.SizeUnit;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 
 import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -276,14 +280,18 @@ public class RocksDBStateBackendConfigTest {
 
                Environment env = getMockEnvironment(dir1, dir2);
                RocksDBKeyedStateBackend<Integer> keyedBackend = 
(RocksDBKeyedStateBackend<Integer>) rocksDbBackend.
-                               createKeyedStateBackend(
-                                               env,
-                                               env.getJobID(),
-                                               "test_op",
-                                               IntSerializer.INSTANCE,
-                                               1,
-                                               new KeyGroupRange(0, 0),
-                                               env.getTaskKvStateRegistry());
+                       createKeyedStateBackend(
+                               env,
+                               env.getJobID(),
+                               "test_op",
+                               IntSerializer.INSTANCE,
+                               1,
+                               new KeyGroupRange(0, 0),
+                               env.getTaskKvStateRegistry(),
+                               TtlTimeProvider.DEFAULT,
+                               new UnregisteredMetricsGroup(),
+                               Collections.emptyList(),
+                               new CloseableRegistry());
 
                try {
                        File instanceBasePath = 
keyedBackend.getInstanceBasePath();
@@ -316,13 +324,17 @@ public class RocksDBStateBackendConfigTest {
                        try {
                                Environment env = 
getMockEnvironment(tempFolder.newFolder());
                                rocksDbBackend.createKeyedStateBackend(
-                                               env,
-                                               env.getJobID(),
-                                               "foobar",
-                                               IntSerializer.INSTANCE,
-                                               1,
-                                               new KeyGroupRange(0, 0),
-                                               new 
KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID()));
+                                       env,
+                                       env.getJobID(),
+                                       "foobar",
+                                       IntSerializer.INSTANCE,
+                                       1,
+                                       new KeyGroupRange(0, 0),
+                                       new 
KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID()),
+                                       TtlTimeProvider.DEFAULT,
+                                       new UnregisteredMetricsGroup(),
+                                       Collections.emptyList(),
+                                       new CloseableRegistry());
                        }
                        catch (Exception e) {
                                assertTrue(e.getMessage().contains("No local 
storage directories available"));
@@ -363,7 +375,11 @@ public class RocksDBStateBackendConfigTest {
                                        IntSerializer.INSTANCE,
                                        1,
                                        new KeyGroupRange(0, 0),
-                                       new 
KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID()));
+                                       new 
KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID()),
+                                       TtlTimeProvider.DEFAULT,
+                                       new UnregisteredMetricsGroup(),
+                                       Collections.emptyList(),
+                                       new CloseableRegistry());
 
                                IOUtils.closeQuietly(keyedStateBackend);
                                keyedStateBackend.dispose();
@@ -633,15 +649,18 @@ public class RocksDBStateBackendConfigTest {
        static RocksDBKeyedStateBackend<Integer> createKeyedStateBackend(
                        RocksDBStateBackend rocksDbBackend, Environment env) 
throws Exception {
 
-               return (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.
-                               createKeyedStateBackend(
-                                               env,
-                                               env.getJobID(),
-                                               "test_op",
-                                               IntSerializer.INSTANCE,
-                                               1,
-                                               new KeyGroupRange(0, 0),
-                                               env.getTaskKvStateRegistry());
+               return (RocksDBKeyedStateBackend<Integer>) 
rocksDbBackend.createKeyedStateBackend(
+                       env,
+                       env.getJobID(),
+                       "test_op",
+                       IntSerializer.INSTANCE,
+                       1,
+                       new KeyGroupRange(0, 0),
+                       env.getTaskKvStateRegistry(),
+                       TtlTimeProvider.DEFAULT,
+                       new UnregisteredMetricsGroup(),
+                       Collections.emptyList(),
+                       new CloseableRegistry());
        }
 
        static Environment getMockEnvironment(File... tempDirs) {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
index e04cedd..7b678c3 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java
@@ -38,7 +38,9 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -52,6 +54,7 @@ import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 
 import org.junit.Test;
 import org.mockito.Matchers;
@@ -322,13 +325,17 @@ public class StreamingRuntimeContextTest {
                                                (ListStateDescriptor<String>) 
invocationOnMock.getArguments()[2];
 
                                AbstractKeyedStateBackend<Integer> backend = 
new MemoryStateBackend().createKeyedStateBackend(
-                                               new 
DummyEnvironment("test_task", 1, 0),
-                                               new JobID(),
-                                               "test_op",
-                                               IntSerializer.INSTANCE,
-                                               1,
-                                               new KeyGroupRange(0, 0),
-                                               new 
KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));
+                                       new DummyEnvironment("test_task", 1, 0),
+                                       new JobID(),
+                                       "test_op",
+                                       IntSerializer.INSTANCE,
+                                       1,
+                                       new KeyGroupRange(0, 0),
+                                       new 
KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()),
+                                       TtlTimeProvider.DEFAULT,
+                                       new UnregisteredMetricsGroup(),
+                                       Collections.emptyList(),
+                                       new CloseableRegistry());
                                backend.setCurrentKey(0);
                                return 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, descr);
                        }
@@ -359,13 +366,17 @@ public class StreamingRuntimeContextTest {
                                                (MapStateDescriptor<Integer, 
String>) invocationOnMock.getArguments()[2];
 
                                AbstractKeyedStateBackend<Integer> backend = 
new MemoryStateBackend().createKeyedStateBackend(
-                                               new 
DummyEnvironment("test_task", 1, 0),
-                                               new JobID(),
-                                               "test_op",
-                                               IntSerializer.INSTANCE,
-                                               1,
-                                               new KeyGroupRange(0, 0),
-                                               new 
KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));
+                                       new DummyEnvironment("test_task", 1, 0),
+                                       new JobID(),
+                                       "test_op",
+                                       IntSerializer.INSTANCE,
+                                       1,
+                                       new KeyGroupRange(0, 0),
+                                       new 
KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()),
+                                       TtlTimeProvider.DEFAULT,
+                                       new UnregisteredMetricsGroup(),
+                                       Collections.emptyList(),
+                                       new CloseableRegistry());
                                backend.setCurrentKey(0);
                                return 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, descr);
                        }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
index bc5bb1b..50abe75 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TriggerTestHarness.java
@@ -28,7 +28,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.query.KvStateRegistry;
@@ -37,6 +39,7 @@ import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.runtime.state.internal.InternalMergingState;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.KeyContext;
 import org.apache.flink.streaming.api.operators.TestInternalTimerService;
@@ -48,6 +51,7 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 
 /**
  * Utility for testing {@link Trigger} behaviour.
@@ -74,13 +78,18 @@ public class TriggerTestHarness<T, W extends Window> {
                MemoryStateBackend backend = new MemoryStateBackend();
 
                @SuppressWarnings("unchecked")
-               HeapKeyedStateBackend<Integer> stateBackend = 
(HeapKeyedStateBackend<Integer>) backend.createKeyedStateBackend(dummyEnv,
-                               new JobID(),
-                               "test_op",
-                               IntSerializer.INSTANCE,
-                               1,
-                               new KeyGroupRange(0, 0),
-                               new KvStateRegistry().createTaskRegistry(new 
JobID(), new JobVertexID()));
+               HeapKeyedStateBackend<Integer> stateBackend = 
(HeapKeyedStateBackend<Integer>) backend.createKeyedStateBackend(
+                       dummyEnv,
+                       new JobID(),
+                       "test_op",
+                       IntSerializer.INSTANCE,
+                       1,
+                       new KeyGroupRange(0, 0),
+                       new KvStateRegistry().createTaskRegistry(new JobID(), 
new JobVertexID()),
+                       TtlTimeProvider.DEFAULT,
+                       new UnregisteredMetricsGroup(),
+                       Collections.emptyList(),
+                       new CloseableRegistry());
                this.stateBackend = stateBackend;
 
                this.stateBackend.setCurrentKey(KEY);

Reply via email to