Repository: flink
Updated Branches:
  refs/heads/release-1.5 a8ef87510 -> c75c15298


[FLINK-9373][statebackend] Introduce RocksIteratorWrapper to wrap `Seek(), 
Next(), SeekToFirst(), SeekToLast(), SeekForPrev(), and Prev()` to check the 
iterator status.

This closes #6020.

(cherry picked from commit 105b306)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c75c1529
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c75c1529
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c75c1529

Branch: refs/heads/release-1.5
Commit: c75c152984983cad05c609dfbd99c392f9fbcec0
Parents: a8ef875
Author: sihuazhou <summerle...@163.com>
Authored: Wed May 16 09:47:05 2018 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Thu May 17 15:16:09 2018 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |  68 ++++++---
 .../streaming/state/RocksDBMapState.java        |   3 +-
 .../streaming/state/RocksIteratorWrapper.java   | 103 +++++++++++++
 .../state/RocksDBMergeIteratorTest.java         |   8 +-
 .../RocksDBRocksIteratorForKeysWrapperTest.java | 152 ++++++++++++++++++
 .../state/RocksDBRocksIteratorWrapperTest.java  | 153 -------------------
 .../state/benchmark/RocksDBPerformanceTest.java |   4 +-
 7 files changed, 304 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c75c1529/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 0ec2ef0..0de16c2 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -99,7 +99,6 @@ import org.rocksdb.DBOptions;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
 import org.rocksdb.Snapshot;
 import org.rocksdb.WriteOptions;
 import org.slf4j.Logger;
@@ -322,7 +321,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        throw new FlinkRuntimeException("Failed to get keys 
from RocksDB state backend.", ex);
                }
 
-               RocksIterator iterator = db.newIterator(columnInfo.f0);
+               RocksIteratorWrapper iterator = getRocksIterator(db, 
columnInfo.f0);
                iterator.seekToFirst();
 
                final RocksIteratorForKeysWrapper<K> iteratorWrapper = new 
RocksIteratorForKeysWrapper<>(iterator, state, keySerializer, 
keyGroupPrefixBytes,
@@ -734,7 +733,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                        
restoreInstance((IncrementalLocalKeyedStateHandle) rawStateHandle);
                                } else {
                                        throw new 
IllegalStateException("Unexpected state handle type, " +
-                                               "expected " + 
IncrementalKeyedStateHandle.class +
+                                               "expected " + 
IncrementalKeyedStateHandle.class + " or " + 
IncrementalLocalKeyedStateHandle.class +
                                                ", but found " + 
rawStateHandle.getClass());
                                }
                        }
@@ -1079,7 +1078,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                                                ColumnFamilyHandle 
targetColumnFamilyHandle = registeredStateMetaInfoEntry.f0;
 
-                                               try (RocksIterator iterator = 
restoreDb.newIterator(columnFamilyHandle)) {
+                                               try (RocksIteratorWrapper 
iterator = getRocksIterator(restoreDb, columnFamilyHandle)) {
 
                                                        int startKeyGroup = 
stateBackend.getKeyGroupRange().getStartKeyGroup();
                                                        byte[] 
startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
@@ -1309,7 +1308,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                int count = 0;
 
                for (Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> column : 
kvStateInformation.values()) {
-                       try (RocksIterator rocksIterator = 
db.newIterator(column.f0)) {
+                       try (RocksIteratorWrapper rocksIterator = 
getRocksIterator(db, column.f0)) {
                                rocksIterator.seekToFirst();
 
                                while (rocksIterator.isValid()) {
@@ -1357,7 +1356,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        }
                }
 
-               RocksDBMergeIterator(List<Tuple2<RocksIterator, Integer>> 
kvStateIterators, final int keyGroupPrefixByteCount) {
+               RocksDBMergeIterator(List<Tuple2<RocksIteratorWrapper, 
Integer>> kvStateIterators, final int keyGroupPrefixByteCount) throws 
RocksDBException {
                        Preconditions.checkNotNull(kvStateIterators);
                        Preconditions.checkArgument(keyGroupPrefixByteCount >= 
1);
 
@@ -1369,8 +1368,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                PriorityQueue<MergeIterator> 
iteratorPriorityQueue =
                                        new 
PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
 
-                               for (Tuple2<RocksIterator, Integer> 
rocksIteratorWithKVStateId : kvStateIterators) {
-                                       final RocksIterator rocksIterator = 
rocksIteratorWithKVStateId.f0;
+                               for (Tuple2<RocksIteratorWrapper, Integer> 
rocksIteratorWithKVStateId : kvStateIterators) {
+                                       final RocksIteratorWrapper 
rocksIterator = rocksIteratorWithKVStateId.f0;
                                        rocksIterator.seekToFirst();
                                        if (rocksIterator.isValid()) {
                                                iteratorPriorityQueue.offer(new 
MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
@@ -1398,15 +1397,16 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                 * Advance the iterator. Should only be called if {@link #isValid()} returned true.
                 * Valid can only change after calls to {@link #next()}.
                 */
-               public void next() {
+               public void next() throws RocksDBException {
                        newKeyGroup = false;
                        newKVState = false;
 
-                       final RocksIterator rocksIterator = 
currentSubIterator.getIterator();
+                       final RocksIteratorWrapper rocksIterator = 
currentSubIterator.getIterator();
                        rocksIterator.next();
 
                        byte[] oldKey = currentSubIterator.getCurrentKey();
                        if (rocksIterator.isValid()) {
+
                                currentSubIterator.currentKey = 
rocksIterator.key();
 
                                if (isDifferentKeyGroup(oldKey, 
currentSubIterator.getCurrentKey())) {
@@ -1523,13 +1523,13 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                 * @param iterator  The #RocksIterator to wrap .
                 * @param kvStateId Id of the K/V state to which this iterator 
belongs.
                 */
-               MergeIterator(RocksIterator iterator, int kvStateId) {
+               MergeIterator(RocksIteratorWrapper iterator, int kvStateId) {
                        this.iterator = Preconditions.checkNotNull(iterator);
                        this.currentKey = iterator.key();
                        this.kvStateId = kvStateId;
                }
 
-               private final RocksIterator iterator;
+               private final RocksIteratorWrapper iterator;
                private byte[] currentKey;
                private final int kvStateId;
 
@@ -1541,7 +1541,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        this.currentKey = currentKey;
                }
 
-               public RocksIterator getIterator() {
+               public RocksIteratorWrapper getIterator() {
                        return iterator;
                }
 
@@ -1556,13 +1556,13 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        }
 
        /**
-        * Adapter class to bridge between {@link RocksIterator} and {@link 
Iterator} to iterate over the keys. This class
+        * Adapter class to bridge between {@link RocksIteratorWrapper} and 
{@link Iterator} to iterate over the keys. This class
         * is not thread safe.
         *
         * @param <K> the type of the iterated objects, which are keys in 
RocksDB.
         */
        static class RocksIteratorForKeysWrapper<K> implements Iterator<K>, 
AutoCloseable {
-               private final RocksIterator iterator;
+               private final RocksIteratorWrapper iterator;
                private final String state;
                private final TypeSerializer<K> keySerializer;
                private final int keyGroupPrefixBytes;
@@ -1571,7 +1571,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                private K nextKey;
 
                RocksIteratorForKeysWrapper(
-                       RocksIterator iterator,
+                       RocksIteratorWrapper iterator,
                        String state,
                        TypeSerializer<K> keySerializer,
                        int keyGroupPrefixBytes,
@@ -1588,8 +1588,9 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                @Override
                public boolean hasNext() {
-                       while (nextKey == null && iterator.isValid()) {
-                               try {
+                       try {
+                               while (nextKey == null && iterator.isValid()) {
+
                                        byte[] key = iterator.key();
                                        if (isMatchingNameSpace(key)) {
                                                ByteArrayInputStreamWithPos 
inputStream =
@@ -1603,9 +1604,9 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                                nextKey = value;
                                        }
                                        iterator.next();
-                               } catch (IOException e) {
-                                       throw new FlinkRuntimeException("Failed 
to access state [" + state + "]", e);
                                }
+                       } catch (Exception e) {
+                               throw new FlinkRuntimeException("Failed to 
access state [" + state + "]", e);
                        }
                        return nextKey != null;
                }
@@ -1869,7 +1870,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                 */
                private List<ColumnFamilyHandle> copiedColumnFamilyHandles;
 
-               private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
+               private List<Tuple2<RocksIteratorWrapper, Integer>> 
kvStateIterators;
 
                private CheckpointStreamWithResultProvider 
checkpointStreamWithResultProvider;
                private DataOutputView outputView;
@@ -1928,7 +1929,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                 *
                 * @throws IOException
                 */
-               public void writeDBSnapshot() throws IOException, 
InterruptedException {
+               public void writeDBSnapshot() throws IOException, 
InterruptedException, RocksDBException {
 
                        if (null == snapshot) {
                                throw new IOException("No snapshot available. 
Might be released due to cancellation.");
@@ -1966,7 +1967,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        checkpointStreamWithResultProvider = null;
 
                        if (null != kvStateIterators) {
-                               for (Tuple2<RocksIterator, Integer> 
kvStateIterator : kvStateIterators) {
+                               for (Tuple2<RocksIteratorWrapper, Integer> 
kvStateIterator : kvStateIterators) {
                                        
IOUtils.closeQuietly(kvStateIterator.f0);
                                }
                                kvStateIterators = null;
@@ -2001,7 +2002,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        for (ColumnFamilyHandle columnFamilyHandle : 
copiedColumnFamilyHandles) {
 
                                kvStateIterators.add(
-                                       new 
Tuple2<>(stateBackend.db.newIterator(columnFamilyHandle, readOptions), 
kvStateId));
+                                       new 
Tuple2<>(getRocksIterator(stateBackend.db, columnFamilyHandle, readOptions), 
kvStateId));
 
                                ++kvStateId;
                        }
@@ -2019,7 +2020,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        serializationProxy.write(outputView);
                }
 
-               private void writeKVStateData() throws IOException, 
InterruptedException {
+               private void writeKVStateData() throws IOException, 
InterruptedException, RocksDBException {
 
                        byte[] previousKey = null;
                        byte[] previousValue = null;
@@ -2519,4 +2520,21 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        }
                }
        }
+
	/**
	 * Creates a {@link RocksIteratorWrapper} around a new iterator over the
	 * database's default column family. The caller owns the returned wrapper
	 * and must close it.
	 */
	public static RocksIteratorWrapper getRocksIterator(RocksDB db) {
		return new RocksIteratorWrapper(db.newIterator());
	}
+
	/**
	 * Creates a {@link RocksIteratorWrapper} around a new iterator over the given
	 * column family. The caller owns the returned wrapper and must close it.
	 */
	public static RocksIteratorWrapper getRocksIterator(
		RocksDB db,
		ColumnFamilyHandle columnFamilyHandle) {
		return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle));
	}
+
	/**
	 * Creates a {@link RocksIteratorWrapper} around a new iterator over the given
	 * column family, using the provided read options (e.g. for snapshot reads).
	 * The caller owns the returned wrapper and must close it; note that the
	 * {@code readOptions} are not closed by the wrapper.
	 */
	public static RocksIteratorWrapper getRocksIterator(
		RocksDB db,
		ColumnFamilyHandle columnFamilyHandle,
		ReadOptions readOptions) {
		return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle, readOptions));
	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c75c1529/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 219f3ae..b84b7a2 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -34,7 +34,6 @@ import org.apache.flink.util.Preconditions;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -549,7 +548,7 @@ public class RocksDBMapState<K, N, UK, UV>
 
                        // use try-with-resources to ensure RocksIterator can 
be release even some runtime exception
                        // occurred in the below code block.
-                       try (RocksIterator iterator = 
db.newIterator(columnFamily)) {
+                       try (RocksIteratorWrapper iterator = 
RocksDBKeyedStateBackend.getRocksIterator(db, columnFamily)) {
 
                                /*
                                 * The iteration starts from the prefix bytes 
at the first loading. The cache then is

http://git-wip-us.apache.org/repos/asf/flink/blob/c75c1529/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksIteratorWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksIteratorWrapper.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksIteratorWrapper.java
new file mode 100644
index 0000000..f98e3f5
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksIteratorWrapper.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.RocksIteratorInterface;
+
+import javax.annotation.Nonnull;
+
+import java.io.Closeable;
+
+/**
+ * This is a wrapper around {@link RocksIterator} to check the iterator status 
for all the methods mentioned
+ * to require this check in the wiki documentation: seek, next, seekToFirst, 
seekToLast, seekForPrev, and prev.
+ * This is required because the iterator may pass the blocks or files it had 
difficulties in reading (because
+ * of IO error, data corruption or other issues) and continue with the next 
available keys. The status flag may not be
+ * OK, even if the iterator is valid. More information can be found
+ * <a 
href="https://github.com/facebook/rocksdb/wiki/Iterator#error-handling";>here</a>.
+ */
+public class RocksIteratorWrapper implements RocksIteratorInterface, Closeable 
{
+
+       private RocksIterator iterator;
+
+       public RocksIteratorWrapper(@Nonnull RocksIterator iterator) {
+               this.iterator = iterator;
+       }
+
+       @Override
+       public boolean isValid() {
+               return this.iterator.isValid();
+       }
+
+       @Override
+       public void seekToFirst() {
+               iterator.seekToFirst();
+               status();
+       }
+
+       @Override
+       public void seekToLast() {
+               iterator.seekToFirst();
+               status();
+       }
+
+       @Override
+       public void seek(byte[] target) {
+               iterator.seek(target);
+               status();
+       }
+
+       @Override
+       public void next() {
+               iterator.next();
+               status();
+       }
+
+       @Override
+       public void prev() {
+               iterator.prev();
+               status();
+       }
+
+       @Override
+       public void status() {
+               try {
+                       iterator.status();
+               } catch (RocksDBException ex) {
+                       throw new FlinkRuntimeException("Internal exception 
found in RocksDB", ex);
+               }
+       }
+
+       public byte[] key() {
+               return iterator.key();
+       }
+
+       public byte[] value() {
+               return iterator.value();
+       }
+
+       @Override
+       public void close() {
+               iterator.close();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c75c1529/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
index 19f49f8..cb2b202 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
@@ -30,10 +30,8 @@ import org.junit.rules.TemporaryFolder;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDB;
-import org.rocksdb.RocksIterator;
 
 import java.io.DataOutputStream;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -52,7 +50,7 @@ public class RocksDBMergeIteratorTest {
        public TemporaryFolder tempFolder = new TemporaryFolder();
 
        @Test
-       public void testEmptyMergeIterator() throws IOException {
+       public void testEmptyMergeIterator() throws Exception {
                RocksDBKeyedStateBackend.RocksDBMergeIterator emptyIterator =
                                new 
RocksDBKeyedStateBackend.RocksDBMergeIterator(Collections.emptyList(), 2);
                Assert.assertFalse(emptyIterator.isValid());
@@ -76,7 +74,7 @@ public class RocksDBMergeIteratorTest {
                Random random = new Random(1234);
 
                try (RocksDB rocksDB = 
RocksDB.open(tempFolder.getRoot().getAbsolutePath())) {
-                       List<Tuple2<RocksIterator, Integer>> 
rocksIteratorsWithKVStateId = new ArrayList<>();
+                       List<Tuple2<RocksIteratorWrapper, Integer>> 
rocksIteratorsWithKVStateId = new ArrayList<>();
                        List<Tuple2<ColumnFamilyHandle, Integer>> 
columnFamilyHandlesWithKeyCount = new ArrayList<>();
 
                        int totalKeysExpected = 0;
@@ -109,7 +107,7 @@ public class RocksDBMergeIteratorTest {
 
                        int id = 0;
                        for (Tuple2<ColumnFamilyHandle, Integer> 
columnFamilyHandle : columnFamilyHandlesWithKeyCount) {
-                               rocksIteratorsWithKVStateId.add(new 
Tuple2<>(rocksDB.newIterator(columnFamilyHandle.f0), id));
+                               rocksIteratorsWithKVStateId.add(new 
Tuple2<>(RocksDBKeyedStateBackend.getRocksIterator(rocksDB, 
columnFamilyHandle.f0), id));
                                ++id;
                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c75c1529/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorForKeysWrapperTest.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorForKeysWrapperTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorForKeysWrapperTest.java
new file mode 100644
index 0000000..f560998
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorForKeysWrapperTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyHandle;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.mockito.Mockito.mock;
+
/**
 * Tests for {@link RocksDBKeyedStateBackend.RocksIteratorForKeysWrapper}, which adapts a
 * {@link RocksIteratorWrapper} to a plain key {@link java.util.Iterator}.
 */
public class RocksDBRocksIteratorForKeysWrapperTest {

	@Rule
	public final TemporaryFolder tmp = new TemporaryFolder();

	@Test
	public void testIterator() throws Exception{

		// test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == false
		testIteratorHelper(IntSerializer.INSTANCE, StringSerializer.INSTANCE, 128, i -> i);

		// test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == true
		testIteratorHelper(StringSerializer.INSTANCE, StringSerializer.INSTANCE, 128, i -> String.valueOf(i));

		// test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == false
		testIteratorHelper(IntSerializer.INSTANCE, StringSerializer.INSTANCE, 256, i -> i);

		// test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == true
		testIteratorHelper(StringSerializer.INSTANCE, StringSerializer.INSTANCE, 256, i -> String.valueOf(i));
	}

	/**
	 * Inserts 1000 keyed entries into a fresh RocksDB keyed state backend and verifies that
	 * iterating through a {@link RocksDBKeyedStateBackend.RocksIteratorForKeysWrapper} yields
	 * exactly the inserted keys.
	 *
	 * @param keySerializer serializer for the key type under test
	 * @param namespaceSerializer serializer for the namespace (raw type, as in the call sites)
	 * @param maxKeyGroupNumber number of key groups; presumably 128 yields a 1-byte and 256 a
	 *        2-byte key-group prefix — see the comments in {@link #testIterator()}
	 * @param getKeyFunc maps an int to a key of type {@code K}
	 */
	<K> void testIteratorHelper(
		TypeSerializer<K> keySerializer,
		TypeSerializer namespaceSerializer,
		int maxKeyGroupNumber,
		Function<Integer, K> getKeyFunc) throws Exception {

		String testStateName = "aha";
		String namespace = "ns";

		// Back the keyed state backend with a temporary RocksDB instance and checkpoint dir.
		String dbPath = tmp.newFolder().getAbsolutePath();
		String checkpointPath = tmp.newFolder().toURI().toString();
		RocksDBStateBackend backend = new RocksDBStateBackend(new FsStateBackend(checkpointPath), true);
		backend.setDbStoragePath(dbPath);

		Environment env = new DummyEnvironment("TestTask", 1, 0);
		RocksDBKeyedStateBackend<K> keyedStateBackend = (RocksDBKeyedStateBackend<K>) backend.createKeyedStateBackend(
			env,
			new JobID(),
			"Test",
			keySerializer,
			maxKeyGroupNumber,
			new KeyGroupRange(0, maxKeyGroupNumber - 1),
			mock(TaskKvStateRegistry.class));

		try {
			// Empty restore initializes the backend's RocksDB instance.
			keyedStateBackend.restore(null);
			ValueState<String> testState = keyedStateBackend.getPartitionedState(
				namespace,
				namespaceSerializer,
				new ValueStateDescriptor<String>(testStateName, String.class));

			// insert record
			for (int i = 0; i < 1000; ++i) {
				keyedStateBackend.setCurrentKey(getKeyFunc.apply(i));
				testState.update(String.valueOf(i));
			}

			// Serialize the namespace so the wrapper can filter keys by namespace bytes.
			ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(8);
			boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer);
			RocksDBKeySerializationUtils.writeNameSpace(
				namespace,
				namespaceSerializer,
				outputStream,
				new DataOutputViewStreamWrapper(outputStream),
				ambiguousKeyPossible);

			byte[] nameSpaceBytes = outputStream.toByteArray();

			// try-with-resources: both the raw iterator and the keys wrapper are AutoCloseable.
			try (
				ColumnFamilyHandle handle = keyedStateBackend.getColumnFamilyHandle(testStateName);
				RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(keyedStateBackend.db, handle);
				RocksDBKeyedStateBackend.RocksIteratorForKeysWrapper<K> iteratorWrapper =
					new RocksDBKeyedStateBackend.RocksIteratorForKeysWrapper<>(
						iterator,
						testStateName,
						keySerializer,
						keyedStateBackend.getKeyGroupPrefixBytes(),
						ambiguousKeyPossible,
						nameSpaceBytes)) {

				iterator.seekToFirst();

				// valid record
				List<Integer> fetchedKeys = new ArrayList<>(1000);
				while (iteratorWrapper.hasNext()) {
					fetchedKeys.add(Integer.parseInt(iteratorWrapper.next().toString()));
				}

				// Keys come back in RocksDB (key-group) order, so sort before comparing.
				fetchedKeys.sort(Comparator.comparingInt(a -> a));
				Assert.assertEquals(1000, fetchedKeys.size());

				for (int i = 0; i < 1000; ++i) {
					Assert.assertEquals(i, fetchedKeys.get(i).intValue());
				}
			}
		} finally {
			// Always release the backend's native resources, even if assertions fail.
			if (keyedStateBackend != null) {
				keyedStateBackend.dispose();
			}
		}
	}
}

http://git-wip-us.apache.org/repos/asf/flink/blob/c75c1529/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java
deleted file mode 100644
index 0cdda4b..0000000
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.RocksIterator;
-
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.function.Function;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the RocksIteratorWrapper.
- */
-public class RocksDBRocksIteratorWrapperTest {
-
-       @Rule
-       public final TemporaryFolder tmp = new TemporaryFolder();
-
-       @Test
-       public void testIterator() throws Exception{
-
-               // test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == 
false
-               testIteratorHelper(IntSerializer.INSTANCE, 
StringSerializer.INSTANCE, 128, i -> i);
-
-               // test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == 
true
-               testIteratorHelper(StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 128, i -> String.valueOf(i));
-
-               // test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == 
false
-               testIteratorHelper(IntSerializer.INSTANCE, 
StringSerializer.INSTANCE, 256, i -> i);
-
-               // test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == 
true
-               testIteratorHelper(StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 256, i -> String.valueOf(i));
-       }
-
-       <K> void testIteratorHelper(
-               TypeSerializer<K> keySerializer,
-               TypeSerializer namespaceSerializer,
-               int maxKeyGroupNumber,
-               Function<Integer, K> getKeyFunc) throws Exception {
-
-               String testStateName = "aha";
-               String namespace = "ns";
-
-               String dbPath = tmp.newFolder().getAbsolutePath();
-               String checkpointPath = tmp.newFolder().toURI().toString();
-               RocksDBStateBackend backend = new RocksDBStateBackend(new 
FsStateBackend(checkpointPath), true);
-               backend.setDbStoragePath(dbPath);
-
-               Environment env = new DummyEnvironment("TestTask", 1, 0);
-               RocksDBKeyedStateBackend<K> keyedStateBackend = 
(RocksDBKeyedStateBackend<K>) backend.createKeyedStateBackend(
-                       env,
-                       new JobID(),
-                       "Test",
-                       keySerializer,
-                       maxKeyGroupNumber,
-                       new KeyGroupRange(0, maxKeyGroupNumber - 1),
-                       mock(TaskKvStateRegistry.class));
-
-               try {
-                       keyedStateBackend.restore(null);
-                       ValueState<String> testState = 
keyedStateBackend.getPartitionedState(
-                               namespace,
-                               namespaceSerializer,
-                               new ValueStateDescriptor<String>(testStateName, 
String.class));
-
-                       // insert record
-                       for (int i = 0; i < 1000; ++i) {
-                               
keyedStateBackend.setCurrentKey(getKeyFunc.apply(i));
-                               testState.update(String.valueOf(i));
-                       }
-
-                       ByteArrayOutputStreamWithPos outputStream = new 
ByteArrayOutputStreamWithPos(8);
-                       boolean ambiguousKeyPossible = 
RocksDBKeySerializationUtils.isAmbiguousKeyPossible(keySerializer, 
namespaceSerializer);
-                       RocksDBKeySerializationUtils.writeNameSpace(
-                               namespace,
-                               namespaceSerializer,
-                               outputStream,
-                               new DataOutputViewStreamWrapper(outputStream),
-                               ambiguousKeyPossible);
-
-                       byte[] nameSpaceBytes = outputStream.toByteArray();
-
-                       try (
-                               ColumnFamilyHandle handle = 
keyedStateBackend.getColumnFamilyHandle(testStateName);
-                               RocksIterator iterator = 
keyedStateBackend.db.newIterator(handle);
-                               
RocksDBKeyedStateBackend.RocksIteratorForKeysWrapper<K> iteratorWrapper =
-                                       new 
RocksDBKeyedStateBackend.RocksIteratorForKeysWrapper<>(
-                                               iterator,
-                                               testStateName,
-                                               keySerializer,
-                                               
keyedStateBackend.getKeyGroupPrefixBytes(),
-                                               ambiguousKeyPossible,
-                                               nameSpaceBytes)) {
-
-                               iterator.seekToFirst();
-
-                               // valid record
-                               List<Integer> fetchedKeys = new 
ArrayList<>(1000);
-                               while (iteratorWrapper.hasNext()) {
-                                       
fetchedKeys.add(Integer.parseInt(iteratorWrapper.next().toString()));
-                               }
-
-                               fetchedKeys.sort(Comparator.comparingInt(a -> 
a));
-                               Assert.assertEquals(1000, fetchedKeys.size());
-
-                               for (int i = 0; i < 1000; ++i) {
-                                       Assert.assertEquals(i, 
fetchedKeys.get(i).intValue());
-                               }
-                       }
-               } finally {
-                       if (keyedStateBackend != null) {
-                               keyedStateBackend.dispose();
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c75c1529/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
index e05e7ae..8724615 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.contrib.streaming.state.benchmark;
 
 import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
 import org.apache.flink.core.memory.MemoryUtils;
 import org.apache.flink.testutils.junit.RetryOnFailure;
 import org.apache.flink.testutils.junit.RetryRule;
@@ -33,7 +34,6 @@ import org.rocksdb.CompactionStyle;
 import org.rocksdb.NativeLibraryLoader;
 import org.rocksdb.Options;
 import org.rocksdb.RocksDB;
-import org.rocksdb.RocksIterator;
 import org.rocksdb.WriteOptions;
 import sun.misc.Unsafe;
 
@@ -166,7 +166,7 @@ public class RocksDBPerformanceTest extends TestLogger {
 
                        int pos = 0;
 
-                       try (final RocksIterator iterator = 
rocksDB.newIterator()) {
+                       try (final RocksIteratorWrapper iterator = 
RocksDBKeyedStateBackend.getRocksIterator(rocksDB)) {
                                // seek to start
                                unsafe.putInt(keyTemplate, offset, 0);
                                iterator.seek(keyTemplate);

Reply via email to