Repository: flink Updated Branches: refs/heads/release-1.5 a8ef87510 -> c75c15298
[FLINK-9373][statebackend] Introduce RocksIteratorWrapper to wrap `Seek(), Next(), SeekToFirst(), SeekToLast(), SeekForPrev(), and Prev()` to check the iterator status. This closes #6020. (cherry picked from commit 105b306) Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c75c1529 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c75c1529 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c75c1529 Branch: refs/heads/release-1.5 Commit: c75c152984983cad05c609dfbd99c392f9fbcec0 Parents: a8ef875 Author: sihuazhou <summerle...@163.com> Authored: Wed May 16 09:47:05 2018 +0200 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Thu May 17 15:16:09 2018 +0200 ---------------------------------------------------------------------- .../state/RocksDBKeyedStateBackend.java | 68 ++++++--- .../streaming/state/RocksDBMapState.java | 3 +- .../streaming/state/RocksIteratorWrapper.java | 103 +++++++++++++ .../state/RocksDBMergeIteratorTest.java | 8 +- .../RocksDBRocksIteratorForKeysWrapperTest.java | 152 ++++++++++++++++++ .../state/RocksDBRocksIteratorWrapperTest.java | 153 ------------------- .../state/benchmark/RocksDBPerformanceTest.java | 4 +- 7 files changed, 304 insertions(+), 187 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c75c1529/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 0ec2ef0..0de16c2 100644 --- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -99,7 +99,6 @@ import org.rocksdb.DBOptions; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; -import org.rocksdb.RocksIterator; import org.rocksdb.Snapshot; import org.rocksdb.WriteOptions; import org.slf4j.Logger; @@ -322,7 +321,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex); } - RocksIterator iterator = db.newIterator(columnInfo.f0); + RocksIteratorWrapper iterator = getRocksIterator(db, columnInfo.f0); iterator.seekToFirst(); final RocksIteratorForKeysWrapper<K> iteratorWrapper = new RocksIteratorForKeysWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes, @@ -734,7 +733,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { restoreInstance((IncrementalLocalKeyedStateHandle) rawStateHandle); } else { throw new IllegalStateException("Unexpected state handle type, " + - "expected " + IncrementalKeyedStateHandle.class + + "expected " + IncrementalKeyedStateHandle.class + " or " + IncrementalLocalKeyedStateHandle.class + ", but found " + rawStateHandle.getClass()); } } @@ -1079,7 +1078,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { ColumnFamilyHandle targetColumnFamilyHandle = registeredStateMetaInfoEntry.f0; - try (RocksIterator iterator = restoreDb.newIterator(columnFamilyHandle)) { + try (RocksIteratorWrapper iterator = getRocksIterator(restoreDb, columnFamilyHandle)) { int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup(); byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes]; @@ -1309,7 +1308,7 @@ public 
class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { int count = 0; for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> column : kvStateInformation.values()) { - try (RocksIterator rocksIterator = db.newIterator(column.f0)) { + try (RocksIteratorWrapper rocksIterator = getRocksIterator(db, column.f0)) { rocksIterator.seekToFirst(); while (rocksIterator.isValid()) { @@ -1357,7 +1356,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } } - RocksDBMergeIterator(List<Tuple2<RocksIterator, Integer>> kvStateIterators, final int keyGroupPrefixByteCount) { + RocksDBMergeIterator(List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators, final int keyGroupPrefixByteCount) throws RocksDBException { Preconditions.checkNotNull(kvStateIterators); Preconditions.checkArgument(keyGroupPrefixByteCount >= 1); @@ -1369,8 +1368,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { PriorityQueue<MergeIterator> iteratorPriorityQueue = new PriorityQueue<>(kvStateIterators.size(), iteratorComparator); - for (Tuple2<RocksIterator, Integer> rocksIteratorWithKVStateId : kvStateIterators) { - final RocksIterator rocksIterator = rocksIteratorWithKVStateId.f0; + for (Tuple2<RocksIteratorWrapper, Integer> rocksIteratorWithKVStateId : kvStateIterators) { + final RocksIteratorWrapper rocksIterator = rocksIteratorWithKVStateId.f0; rocksIterator.seekToFirst(); if (rocksIterator.isValid()) { iteratorPriorityQueue.offer(new MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1)); @@ -1398,15 +1397,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { * Advance the iterator. Should only be called if {@link #isValid()} returned true. Valid can only chance after * calls to {@link #next()}. 
*/ - public void next() { + public void next() throws RocksDBException { newKeyGroup = false; newKVState = false; - final RocksIterator rocksIterator = currentSubIterator.getIterator(); + final RocksIteratorWrapper rocksIterator = currentSubIterator.getIterator(); rocksIterator.next(); byte[] oldKey = currentSubIterator.getCurrentKey(); if (rocksIterator.isValid()) { + currentSubIterator.currentKey = rocksIterator.key(); if (isDifferentKeyGroup(oldKey, currentSubIterator.getCurrentKey())) { @@ -1523,13 +1523,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { * @param iterator The #RocksIterator to wrap . * @param kvStateId Id of the K/V state to which this iterator belongs. */ - MergeIterator(RocksIterator iterator, int kvStateId) { + MergeIterator(RocksIteratorWrapper iterator, int kvStateId) { this.iterator = Preconditions.checkNotNull(iterator); this.currentKey = iterator.key(); this.kvStateId = kvStateId; } - private final RocksIterator iterator; + private final RocksIteratorWrapper iterator; private byte[] currentKey; private final int kvStateId; @@ -1541,7 +1541,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { this.currentKey = currentKey; } - public RocksIterator getIterator() { + public RocksIteratorWrapper getIterator() { return iterator; } @@ -1556,13 +1556,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } /** - * Adapter class to bridge between {@link RocksIterator} and {@link Iterator} to iterate over the keys. This class + * Adapter class to bridge between {@link RocksIteratorWrapper} and {@link Iterator} to iterate over the keys. This class * is not thread safe. * * @param <K> the type of the iterated objects, which are keys in RocksDB. 
*/ static class RocksIteratorForKeysWrapper<K> implements Iterator<K>, AutoCloseable { - private final RocksIterator iterator; + private final RocksIteratorWrapper iterator; private final String state; private final TypeSerializer<K> keySerializer; private final int keyGroupPrefixBytes; @@ -1571,7 +1571,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private K nextKey; RocksIteratorForKeysWrapper( - RocksIterator iterator, + RocksIteratorWrapper iterator, String state, TypeSerializer<K> keySerializer, int keyGroupPrefixBytes, @@ -1588,8 +1588,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { @Override public boolean hasNext() { - while (nextKey == null && iterator.isValid()) { - try { + try { + while (nextKey == null && iterator.isValid()) { + byte[] key = iterator.key(); if (isMatchingNameSpace(key)) { ByteArrayInputStreamWithPos inputStream = @@ -1603,9 +1604,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { nextKey = value; } iterator.next(); - } catch (IOException e) { - throw new FlinkRuntimeException("Failed to access state [" + state + "]", e); } + } catch (Exception e) { + throw new FlinkRuntimeException("Failed to access state [" + state + "]", e); } return nextKey != null; } @@ -1869,7 +1870,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { */ private List<ColumnFamilyHandle> copiedColumnFamilyHandles; - private List<Tuple2<RocksIterator, Integer>> kvStateIterators; + private List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators; private CheckpointStreamWithResultProvider checkpointStreamWithResultProvider; private DataOutputView outputView; @@ -1928,7 +1929,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { * * @throws IOException */ - public void writeDBSnapshot() throws IOException, InterruptedException { + public void writeDBSnapshot() throws IOException, 
InterruptedException, RocksDBException { if (null == snapshot) { throw new IOException("No snapshot available. Might be released due to cancellation."); @@ -1966,7 +1967,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { checkpointStreamWithResultProvider = null; if (null != kvStateIterators) { - for (Tuple2<RocksIterator, Integer> kvStateIterator : kvStateIterators) { + for (Tuple2<RocksIteratorWrapper, Integer> kvStateIterator : kvStateIterators) { IOUtils.closeQuietly(kvStateIterator.f0); } kvStateIterators = null; @@ -2001,7 +2002,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { for (ColumnFamilyHandle columnFamilyHandle : copiedColumnFamilyHandles) { kvStateIterators.add( - new Tuple2<>(stateBackend.db.newIterator(columnFamilyHandle, readOptions), kvStateId)); + new Tuple2<>(getRocksIterator(stateBackend.db, columnFamilyHandle, readOptions), kvStateId)); ++kvStateId; } @@ -2019,7 +2020,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { serializationProxy.write(outputView); } - private void writeKVStateData() throws IOException, InterruptedException { + private void writeKVStateData() throws IOException, InterruptedException, RocksDBException { byte[] previousKey = null; byte[] previousValue = null; @@ -2519,4 +2520,21 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } } } + + public static RocksIteratorWrapper getRocksIterator(RocksDB db) { + return new RocksIteratorWrapper(db.newIterator()); + } + + public static RocksIteratorWrapper getRocksIterator( + RocksDB db, + ColumnFamilyHandle columnFamilyHandle) { + return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle)); + } + + public static RocksIteratorWrapper getRocksIterator( + RocksDB db, + ColumnFamilyHandle columnFamilyHandle, + ReadOptions readOptions) { + return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle, readOptions)); + } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/c75c1529/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java index 219f3ae..b84b7a2 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java @@ -34,7 +34,6 @@ import org.apache.flink.util.Preconditions; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; -import org.rocksdb.RocksIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -549,7 +548,7 @@ public class RocksDBMapState<K, N, UK, UV> // use try-with-resources to ensure RocksIterator can be release even some runtime exception // occurred in the below code block. - try (RocksIterator iterator = db.newIterator(columnFamily)) { + try (RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(db, columnFamily)) { /* * The iteration starts from the prefix bytes at the first loading. 
The cache then is http://git-wip-us.apache.org/repos/asf/flink/blob/c75c1529/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksIteratorWrapper.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksIteratorWrapper.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksIteratorWrapper.java new file mode 100644 index 0000000..f98e3f5 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksIteratorWrapper.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.util.FlinkRuntimeException; + +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.RocksIteratorInterface; + +import javax.annotation.Nonnull; + +import java.io.Closeable; + +/** + * This is a wrapper around {@link RocksIterator} to check the iterator status for all the methods mentioned + * to require this check in the wiki documentation: seek, next, seekToFirst, seekToLast, seekForPrev, and prev. + * This is required because the iterator may pass the blocks or files it had difficulties in reading (because + * of IO error, data corruption or other issues) and continue with the next available keys. The status flag may not be + * OK, even if the iterator is valid. More information can be found + * <a href="https://github.com/facebook/rocksdb/wiki/Iterator#error-handling">here</a>. + */ +public class RocksIteratorWrapper implements RocksIteratorInterface, Closeable { + + private RocksIterator iterator; + + public RocksIteratorWrapper(@Nonnull RocksIterator iterator) { + this.iterator = iterator; + } + + @Override + public boolean isValid() { + return this.iterator.isValid(); + } + + @Override + public void seekToFirst() { + iterator.seekToFirst(); + status(); + } + + @Override + public void seekToLast() { + iterator.seekToLast(); + status(); + } + + @Override + public void seek(byte[] target) { + iterator.seek(target); + status(); + } + + @Override + public void next() { + iterator.next(); + status(); + } + + @Override + public void prev() { + iterator.prev(); + status(); + } + + @Override + public void status() { + try { + iterator.status(); + } catch (RocksDBException ex) { + throw new FlinkRuntimeException("Internal exception found in RocksDB", ex); + } + } + + public byte[] key() { + return iterator.key(); + } + + public byte[] value() { + return iterator.value(); + } + + @Override + public void close() { + iterator.close(); + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/c75c1529/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java index 19f49f8..cb2b202 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java @@ -30,10 +30,8 @@ import org.junit.rules.TemporaryFolder; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDB; -import org.rocksdb.RocksIterator; import java.io.DataOutputStream; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; @@ -52,7 +50,7 @@ public class RocksDBMergeIteratorTest { public TemporaryFolder tempFolder = new TemporaryFolder(); @Test - public void testEmptyMergeIterator() throws IOException { + public void testEmptyMergeIterator() throws Exception { RocksDBKeyedStateBackend.RocksDBMergeIterator emptyIterator = new RocksDBKeyedStateBackend.RocksDBMergeIterator(Collections.emptyList(), 2); Assert.assertFalse(emptyIterator.isValid()); @@ -76,7 +74,7 @@ public class RocksDBMergeIteratorTest { Random random = new Random(1234); try (RocksDB rocksDB = RocksDB.open(tempFolder.getRoot().getAbsolutePath())) { - List<Tuple2<RocksIterator, Integer>> rocksIteratorsWithKVStateId = new ArrayList<>(); + List<Tuple2<RocksIteratorWrapper, Integer>> rocksIteratorsWithKVStateId = new ArrayList<>(); 
List<Tuple2<ColumnFamilyHandle, Integer>> columnFamilyHandlesWithKeyCount = new ArrayList<>(); int totalKeysExpected = 0; @@ -109,7 +107,7 @@ public class RocksDBMergeIteratorTest { int id = 0; for (Tuple2<ColumnFamilyHandle, Integer> columnFamilyHandle : columnFamilyHandlesWithKeyCount) { - rocksIteratorsWithKVStateId.add(new Tuple2<>(rocksDB.newIterator(columnFamilyHandle.f0), id)); + rocksIteratorsWithKVStateId.add(new Tuple2<>(RocksDBKeyedStateBackend.getRocksIterator(rocksDB, columnFamilyHandle.f0), id)); ++id; } http://git-wip-us.apache.org/repos/asf/flink/blob/c75c1529/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorForKeysWrapperTest.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorForKeysWrapperTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorForKeysWrapperTest.java new file mode 100644 index 0000000..f560998 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorForKeysWrapperTest.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.rocksdb.ColumnFamilyHandle; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.function.Function; + +import static org.mockito.Mockito.mock; + +/** + * Tests for the RocksIteratorWrapper. 
+ */ +public class RocksDBRocksIteratorForKeysWrapperTest { + + @Rule + public final TemporaryFolder tmp = new TemporaryFolder(); + + @Test + public void testIterator() throws Exception{ + + // test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == false + testIteratorHelper(IntSerializer.INSTANCE, StringSerializer.INSTANCE, 128, i -> i); + + // test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == true + testIteratorHelper(StringSerializer.INSTANCE, StringSerializer.INSTANCE, 128, i -> String.valueOf(i)); + + // test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == false + testIteratorHelper(IntSerializer.INSTANCE, StringSerializer.INSTANCE, 256, i -> i); + + // test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == true + testIteratorHelper(StringSerializer.INSTANCE, StringSerializer.INSTANCE, 256, i -> String.valueOf(i)); + } + + <K> void testIteratorHelper( + TypeSerializer<K> keySerializer, + TypeSerializer namespaceSerializer, + int maxKeyGroupNumber, + Function<Integer, K> getKeyFunc) throws Exception { + + String testStateName = "aha"; + String namespace = "ns"; + + String dbPath = tmp.newFolder().getAbsolutePath(); + String checkpointPath = tmp.newFolder().toURI().toString(); + RocksDBStateBackend backend = new RocksDBStateBackend(new FsStateBackend(checkpointPath), true); + backend.setDbStoragePath(dbPath); + + Environment env = new DummyEnvironment("TestTask", 1, 0); + RocksDBKeyedStateBackend<K> keyedStateBackend = (RocksDBKeyedStateBackend<K>) backend.createKeyedStateBackend( + env, + new JobID(), + "Test", + keySerializer, + maxKeyGroupNumber, + new KeyGroupRange(0, maxKeyGroupNumber - 1), + mock(TaskKvStateRegistry.class)); + + try { + keyedStateBackend.restore(null); + ValueState<String> testState = keyedStateBackend.getPartitionedState( + namespace, + namespaceSerializer, + new ValueStateDescriptor<String>(testStateName, String.class)); + + // insert record + for (int i = 0; i < 1000; ++i) { + 
keyedStateBackend.setCurrentKey(getKeyFunc.apply(i)); + testState.update(String.valueOf(i)); + } + + ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(8); + boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer); + RocksDBKeySerializationUtils.writeNameSpace( + namespace, + namespaceSerializer, + outputStream, + new DataOutputViewStreamWrapper(outputStream), + ambiguousKeyPossible); + + byte[] nameSpaceBytes = outputStream.toByteArray(); + + try ( + ColumnFamilyHandle handle = keyedStateBackend.getColumnFamilyHandle(testStateName); + RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(keyedStateBackend.db, handle); + RocksDBKeyedStateBackend.RocksIteratorForKeysWrapper<K> iteratorWrapper = + new RocksDBKeyedStateBackend.RocksIteratorForKeysWrapper<>( + iterator, + testStateName, + keySerializer, + keyedStateBackend.getKeyGroupPrefixBytes(), + ambiguousKeyPossible, + nameSpaceBytes)) { + + iterator.seekToFirst(); + + // valid record + List<Integer> fetchedKeys = new ArrayList<>(1000); + while (iteratorWrapper.hasNext()) { + fetchedKeys.add(Integer.parseInt(iteratorWrapper.next().toString())); + } + + fetchedKeys.sort(Comparator.comparingInt(a -> a)); + Assert.assertEquals(1000, fetchedKeys.size()); + + for (int i = 0; i < 1000; ++i) { + Assert.assertEquals(i, fetchedKeys.get(i).intValue()); + } + } + } finally { + if (keyedStateBackend != null) { + keyedStateBackend.dispose(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c75c1529/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java deleted file mode 100644 index 0cdda4b..0000000 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.contrib.streaming.state; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.runtime.operators.testutils.DummyEnvironment; -import org.apache.flink.runtime.query.TaskKvStateRegistry; -import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; - -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.rocksdb.ColumnFamilyHandle; -import org.rocksdb.RocksIterator; - -import java.util.ArrayList; -import java.util.Comparator; -import java.util.List; -import java.util.function.Function; - -import static org.mockito.Mockito.mock; - -/** - * Tests for the RocksIteratorWrapper. 
- */ -public class RocksDBRocksIteratorWrapperTest { - - @Rule - public final TemporaryFolder tmp = new TemporaryFolder(); - - @Test - public void testIterator() throws Exception{ - - // test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == false - testIteratorHelper(IntSerializer.INSTANCE, StringSerializer.INSTANCE, 128, i -> i); - - // test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == true - testIteratorHelper(StringSerializer.INSTANCE, StringSerializer.INSTANCE, 128, i -> String.valueOf(i)); - - // test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == false - testIteratorHelper(IntSerializer.INSTANCE, StringSerializer.INSTANCE, 256, i -> i); - - // test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == true - testIteratorHelper(StringSerializer.INSTANCE, StringSerializer.INSTANCE, 256, i -> String.valueOf(i)); - } - - <K> void testIteratorHelper( - TypeSerializer<K> keySerializer, - TypeSerializer namespaceSerializer, - int maxKeyGroupNumber, - Function<Integer, K> getKeyFunc) throws Exception { - - String testStateName = "aha"; - String namespace = "ns"; - - String dbPath = tmp.newFolder().getAbsolutePath(); - String checkpointPath = tmp.newFolder().toURI().toString(); - RocksDBStateBackend backend = new RocksDBStateBackend(new FsStateBackend(checkpointPath), true); - backend.setDbStoragePath(dbPath); - - Environment env = new DummyEnvironment("TestTask", 1, 0); - RocksDBKeyedStateBackend<K> keyedStateBackend = (RocksDBKeyedStateBackend<K>) backend.createKeyedStateBackend( - env, - new JobID(), - "Test", - keySerializer, - maxKeyGroupNumber, - new KeyGroupRange(0, maxKeyGroupNumber - 1), - mock(TaskKvStateRegistry.class)); - - try { - keyedStateBackend.restore(null); - ValueState<String> testState = keyedStateBackend.getPartitionedState( - namespace, - namespaceSerializer, - new ValueStateDescriptor<String>(testStateName, String.class)); - - // insert record - for (int i = 0; i < 1000; ++i) { - 
keyedStateBackend.setCurrentKey(getKeyFunc.apply(i)); - testState.update(String.valueOf(i)); - } - - ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(8); - boolean ambiguousKeyPossible = RocksDBKeySerializationUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer); - RocksDBKeySerializationUtils.writeNameSpace( - namespace, - namespaceSerializer, - outputStream, - new DataOutputViewStreamWrapper(outputStream), - ambiguousKeyPossible); - - byte[] nameSpaceBytes = outputStream.toByteArray(); - - try ( - ColumnFamilyHandle handle = keyedStateBackend.getColumnFamilyHandle(testStateName); - RocksIterator iterator = keyedStateBackend.db.newIterator(handle); - RocksDBKeyedStateBackend.RocksIteratorForKeysWrapper<K> iteratorWrapper = - new RocksDBKeyedStateBackend.RocksIteratorForKeysWrapper<>( - iterator, - testStateName, - keySerializer, - keyedStateBackend.getKeyGroupPrefixBytes(), - ambiguousKeyPossible, - nameSpaceBytes)) { - - iterator.seekToFirst(); - - // valid record - List<Integer> fetchedKeys = new ArrayList<>(1000); - while (iteratorWrapper.hasNext()) { - fetchedKeys.add(Integer.parseInt(iteratorWrapper.next().toString())); - } - - fetchedKeys.sort(Comparator.comparingInt(a -> a)); - Assert.assertEquals(1000, fetchedKeys.size()); - - for (int i = 0; i < 1000; ++i) { - Assert.assertEquals(i, fetchedKeys.get(i).intValue()); - } - } - } finally { - if (keyedStateBackend != null) { - keyedStateBackend.dispose(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c75c1529/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java index e05e7ae..8724615 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java @@ -19,6 +19,7 @@ package org.apache.flink.contrib.streaming.state.benchmark; import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend; +import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper; import org.apache.flink.core.memory.MemoryUtils; import org.apache.flink.testutils.junit.RetryOnFailure; import org.apache.flink.testutils.junit.RetryRule; @@ -33,7 +34,6 @@ import org.rocksdb.CompactionStyle; import org.rocksdb.NativeLibraryLoader; import org.rocksdb.Options; import org.rocksdb.RocksDB; -import org.rocksdb.RocksIterator; import org.rocksdb.WriteOptions; import sun.misc.Unsafe; @@ -166,7 +166,7 @@ public class RocksDBPerformanceTest extends TestLogger { int pos = 0; - try (final RocksIterator iterator = rocksDB.newIterator()) { + try (final RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(rocksDB)) { // seek to start unsafe.putInt(keyTemplate, offset, 0); iterator.seek(keyTemplate);