http://git-wip-us.apache.org/repos/asf/flink/blob/1619fa8a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/IncrementalRocksStreamOperatorSnapshotRestoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/IncrementalRocksStreamOperatorSnapshotRestoreTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/IncrementalRocksStreamOperatorSnapshotRestoreTest.java
new file mode 100644
index 0000000..3753633
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/IncrementalRocksStreamOperatorSnapshotRestoreTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import 
org.apache.flink.streaming.api.operators.StreamOperatorSnapshotRestoreTest;
+
+import java.io.IOException;
+
+/**
+ * Tests snapshot/restore of stream operators with the RocksDB state backend (incremental snapshots).
+ */
+public class IncrementalRocksStreamOperatorSnapshotRestoreTest extends 
StreamOperatorSnapshotRestoreTest {
+
+       @Override
+       protected StateBackend createStateBackend() throws IOException {
+               FsStateBackend stateBackend = createStateBackendInternal(); // helper is not defined in this class; presumably inherited from the base test
+               return new RocksDBStateBackend(stateBackend, true); // wraps the FS backend; 'true' presumably selects incremental snapshots - confirm against the constructor
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1619fa8a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java
index 98f937a..0cdda4b 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java
@@ -120,13 +120,14 @@ public class RocksDBRocksIteratorWrapperTest {
                        try (
                                ColumnFamilyHandle handle = 
keyedStateBackend.getColumnFamilyHandle(testStateName);
                                RocksIterator iterator = 
keyedStateBackend.db.newIterator(handle);
-                               
RocksDBKeyedStateBackend.RocksIteratorWrapper<K> iteratorWrapper = new 
RocksDBKeyedStateBackend.RocksIteratorWrapper(
-                               iterator,
-                               testStateName,
-                               keySerializer,
-                               keyedStateBackend.getKeyGroupPrefixBytes(),
-                               ambiguousKeyPossible,
-                               nameSpaceBytes)) {
+                               
RocksDBKeyedStateBackend.RocksIteratorForKeysWrapper<K> iteratorWrapper =
+                                       new 
RocksDBKeyedStateBackend.RocksIteratorForKeysWrapper<>(
+                                               iterator,
+                                               testStateName,
+                                               keySerializer,
+                                               
keyedStateBackend.getKeyGroupPrefixBytes(),
+                                               ambiguousKeyPossible,
+                                               nameSpaceBytes)) {
 
                                iterator.seekToFirst();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1619fa8a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksStreamOperatorSnapshotRestoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksStreamOperatorSnapshotRestoreTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksStreamOperatorSnapshotRestoreTest.java
new file mode 100644
index 0000000..0ff3211
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksStreamOperatorSnapshotRestoreTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import 
org.apache.flink.streaming.api.operators.StreamOperatorSnapshotRestoreTest;
+
+import java.io.IOException;
+
+/**
+ * Tests snapshot/restore of stream operators with the RocksDB state backend (full snapshots).
+ */
+public class RocksStreamOperatorSnapshotRestoreTest extends 
StreamOperatorSnapshotRestoreTest {
+
+       @Override
+       protected StateBackend createStateBackend() throws IOException {
+               FsStateBackend stateBackend = createStateBackendInternal(); // helper is not defined in this class; presumably inherited from the base test
+               return new RocksDBStateBackend(stateBackend, false); // wraps the FS backend; 'false' presumably selects full (non-incremental) snapshots - confirm against the constructor
+       }
+}

Reply via email to