[hotfix] [rocksdb] Convert performance benchmarks to unit tests
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/05065451 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/05065451 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/05065451 Branch: refs/heads/master Commit: 05065451abf5917f796b00692f43f0a2d2f3ed48 Parents: ea054a7 Author: Stephan Ewen <[email protected]> Authored: Fri Apr 21 12:19:04 2017 +0200 Committer: Stephan Ewen <[email protected]> Committed: Fri Apr 21 12:19:04 2017 +0200 ---------------------------------------------------------------------- .../ListViaMergeSpeedMiniBenchmark.java | 111 ---------- .../ListViaRangeSpeedMiniBenchmark.java | 132 ------------ .../state/benchmark/RocksDBPerformanceTest.java | 204 +++++++++++++++++++ 3 files changed, 204 insertions(+), 243 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/05065451/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java deleted file mode 100644 index f3e084f..0000000 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaMergeSpeedMiniBenchmark.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.contrib.streaming.state.benchmark; - -import org.apache.flink.util.FileUtils; -import org.rocksdb.CompactionStyle; -import org.rocksdb.Options; -import org.rocksdb.RocksDB; -import org.rocksdb.StringAppendOperator; -import org.rocksdb.WriteOptions; - -import java.io.File; -import java.nio.charset.StandardCharsets; - -public class ListViaMergeSpeedMiniBenchmark { - - public static void main(String[] args) throws Exception { - final File rocksDir = new File("/tmp/rdb"); - FileUtils.deleteDirectory(rocksDir); - - final Options options = new Options() - .setCompactionStyle(CompactionStyle.LEVEL) - .setLevelCompactionDynamicLevelBytes(true) - .setIncreaseParallelism(4) - .setUseFsync(false) - .setMaxOpenFiles(-1) - .setDisableDataSync(true) - .setCreateIfMissing(true) - .setMergeOperator(new StringAppendOperator()); - - final WriteOptions write_options = new WriteOptions() - .setSync(false) - .setDisableWAL(true); - - final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath()); - - final String key = "key"; - final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"; - - try { - final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); - final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8); - - final int num = 50000; - - // ----- insert ----- - System.out.println("begin insert"); - - final long beginInsert = System.nanoTime(); - for (int i = 0; i < num; i++) { - rocksDB.merge(write_options, keyBytes, valueBytes); - } - final long endInsert = System.nanoTime(); - System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms"); - - // ----- read (attempt 1) ----- - - final byte[] resultHolder = new byte[num * (valueBytes.length + 2)]; - final long beginGet1 = System.nanoTime(); - rocksDB.get(keyBytes, resultHolder); - final long endGet1 = System.nanoTime(); - - System.out.println("end get - duration: " + ((endGet1 - beginGet1) / 1_000_000) + " ms"); - - // ----- read (attempt 2) ----- - - final long beginGet2 = System.nanoTime(); - rocksDB.get(keyBytes, resultHolder); - final long endGet2 = System.nanoTime(); - - System.out.println("end get - duration: " + ((endGet2 - beginGet2) / 1_000_000) + " ms"); - - // ----- compact ----- - System.out.println("compacting..."); - final long beginCompact = System.nanoTime(); - rocksDB.compactRange(); - final long endCompact = System.nanoTime(); - - System.out.println("end compaction - duration: " + ((endCompact - beginCompact) / 1_000_000) + " ms"); - - // ----- read (attempt 3) ----- - - final long beginGet3 = System.nanoTime(); - rocksDB.get(keyBytes, resultHolder); - final long endGet3 = System.nanoTime(); - - System.out.println("end get - duration: " + ((endGet3 - beginGet3) / 1_000_000) + " ms"); - } finally { - rocksDB.close(); - options.close(); - write_options.close(); - FileUtils.deleteDirectory(rocksDir); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/05065451/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java deleted file mode 100644 index f46e2cd..0000000 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/ListViaRangeSpeedMiniBenchmark.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.contrib.streaming.state.benchmark; - -import org.apache.flink.core.memory.MemoryUtils; -import org.apache.flink.util.FileUtils; -import org.rocksdb.CompactionStyle; -import org.rocksdb.Options; -import org.rocksdb.RocksDB; -import org.rocksdb.RocksIterator; -import org.rocksdb.StringAppendOperator; -import org.rocksdb.WriteOptions; -import sun.misc.Unsafe; - -import java.io.File; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; - -public class ListViaRangeSpeedMiniBenchmark { - - public static void main(String[] args) throws Exception { - final File rocksDir = new File("/tmp/rdb"); - FileUtils.deleteDirectory(rocksDir); - - final Options options = new Options() - .setCompactionStyle(CompactionStyle.LEVEL) - .setLevelCompactionDynamicLevelBytes(true) - .setIncreaseParallelism(4) - .setUseFsync(false) - .setMaxOpenFiles(-1) - .setDisableDataSync(true) - .setCreateIfMissing(true) - .setMergeOperator(new StringAppendOperator()); - - final WriteOptions write_options = new WriteOptions() - .setSync(false) - .setDisableWAL(true); - - final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath()); - - final String key = "key"; - final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"; - - try { - - final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); - final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8); - - final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4); - - final Unsafe unsafe = MemoryUtils.UNSAFE; - final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4; - - final int num = 50000; - System.out.println("begin insert"); - - final long beginInsert = System.nanoTime(); - for (int i = 0; i < num; i++) { - unsafe.putInt(keyTemplate, offset, i); - rocksDB.put(write_options, keyTemplate, valueBytes); - } - final long endInsert = System.nanoTime(); - System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms"); - - final byte[] resultHolder = new byte[num * valueBytes.length]; - - final long beginGet = System.nanoTime(); - - final RocksIterator iterator = rocksDB.newIterator(); - int pos = 0; - - try { - // seek to start - unsafe.putInt(keyTemplate, offset, 0); - iterator.seek(keyTemplate); - - // mark end - unsafe.putInt(keyTemplate, offset, -1); - - // iterate - while (iterator.isValid()) { - byte[] currKey = iterator.key(); - if (samePrefix(keyBytes, currKey)) { - byte[] currValue = iterator.value(); - System.arraycopy(currValue, 0, resultHolder, pos, currValue.length); - pos += currValue.length; - iterator.next(); - } else { - break; - } - } - }finally { - iterator.close(); - } - - final long endGet = System.nanoTime(); - - System.out.println("end get - duration: " + ((endGet - beginGet) / 1_000_000) + " ms"); - } finally { - rocksDB.close(); - options.close(); - write_options.close(); - FileUtils.deleteDirectory(rocksDir); - } - } - - private static boolean samePrefix(byte[] prefix, byte[] key) { - for (int i = 0; i < prefix.length; i++) { - if (prefix[i] != key [i]) { - return false; - } - } - - return true; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/05065451/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java new file mode 100644 index 0000000..011703e --- /dev/null +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state.benchmark; + +import org.apache.flink.core.memory.MemoryUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import org.rocksdb.CompactionStyle; +import org.rocksdb.Options; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksIterator; +import org.rocksdb.StringAppendOperator; +import org.rocksdb.WriteOptions; +import sun.misc.Unsafe; + +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +/** + * Test that validates that the performance of RocksDB is as expected. + * This test guards against the bug filed as 'FLINK-5756' + */ +public class RocksDBPerformanceTest extends TestLogger { + + @Rule + public final TemporaryFolder TMP = new TemporaryFolder(); + + @Test(timeout = 2000) + public void testRocksDbMergePerformance() throws Exception { + final File rocksDir = TMP.newFolder("rdb"); + + final String key = "key"; + final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"; + + final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); + final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8); + + final int num = 50000; + + try ( + final Options options = new Options() + .setCompactionStyle(CompactionStyle.LEVEL) + .setLevelCompactionDynamicLevelBytes(true) + .setIncreaseParallelism(4) + .setUseFsync(false) + .setMaxOpenFiles(-1) + .setDisableDataSync(true) + .setCreateIfMissing(true) + .setMergeOperator(new StringAppendOperator()); + + final WriteOptions write_options = new WriteOptions() + .setSync(false) + .setDisableWAL(true); + + final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath())) + { + // ----- insert ----- + log.info("begin insert"); + + final long beginInsert = System.nanoTime(); + for (int i = 0; i < num; i++) { + rocksDB.merge(write_options, keyBytes, valueBytes); + } + final long endInsert = System.nanoTime(); + log.info("end insert - duration: {} ms", (endInsert - beginInsert) / 1_000_000); + + // ----- read (attempt 1) ----- + + final byte[] resultHolder = new byte[num * (valueBytes.length + 2)]; + final long beginGet1 = System.nanoTime(); + rocksDB.get(keyBytes, resultHolder); + final long endGet1 = System.nanoTime(); + + log.info("end get - duration: {} ms", (endGet1 - beginGet1) / 1_000_000); + + // ----- read (attempt 2) ----- + + final long beginGet2 = System.nanoTime(); + rocksDB.get(keyBytes, resultHolder); + final long endGet2 = System.nanoTime(); + + log.info("end get - duration: {} ms", (endGet2 - beginGet2) / 1_000_000); + + // ----- compact ----- + log.info("compacting..."); + final long beginCompact = System.nanoTime(); + rocksDB.compactRange(); + final long endCompact = System.nanoTime(); + + log.info("end compaction - duration: {} ms", (endCompact - beginCompact) / 1_000_000); + + // ----- read (attempt 3) ----- + + final long beginGet3 = System.nanoTime(); + rocksDB.get(keyBytes, resultHolder); + final long endGet3 = System.nanoTime(); + + log.info("end get - duration: {} ms", (endGet3 - beginGet3) / 1_000_000); + } + } + + @Test(timeout = 2000) + public void testRocksDbRangeGetPerformance() throws Exception { + final File rocksDir = TMP.newFolder("rdb"); + + final String key = "key"; + final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"; + + final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); + final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8); + + final int num = 50000; + + try ( + final Options options = new Options() + .setCompactionStyle(CompactionStyle.LEVEL) + .setLevelCompactionDynamicLevelBytes(true) + .setIncreaseParallelism(4) + .setUseFsync(false) + .setMaxOpenFiles(-1) + .setDisableDataSync(true) + .setCreateIfMissing(true) + .setMergeOperator(new StringAppendOperator()); + + final WriteOptions write_options = new WriteOptions() + .setSync(false) + .setDisableWAL(true); + + final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath())) + { + final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4); + + final Unsafe unsafe = MemoryUtils.UNSAFE; + final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4; + + log.info("begin insert"); + + final long beginInsert = System.nanoTime(); + for (int i = 0; i < num; i++) { + unsafe.putInt(keyTemplate, offset, i); + rocksDB.put(write_options, keyTemplate, valueBytes); + } + final long endInsert = System.nanoTime(); + log.info("end insert - duration: {} ms", (endInsert - beginInsert) / 1_000_000); + + @SuppressWarnings("MismatchedReadAndWriteOfArray") + final byte[] resultHolder = new byte[num * valueBytes.length]; + + final long beginGet = System.nanoTime(); + + int pos = 0; + + try (final RocksIterator iterator = rocksDB.newIterator()) { + // seek to start + unsafe.putInt(keyTemplate, offset, 0); + iterator.seek(keyTemplate); + + // iterate + while (iterator.isValid() && samePrefix(keyBytes, iterator.key())) { + byte[] currValue = iterator.value(); + System.arraycopy(currValue, 0, resultHolder, pos, currValue.length); + pos += currValue.length; + iterator.next(); + } + } + + final long endGet = System.nanoTime(); + + log.info("end get - duration: {} ms", (endGet - beginGet) / 1_000_000); + } + } + + + private static boolean samePrefix(byte[] prefix, byte[] key) { + for (int i = 0; i < prefix.length; i++) { + if (prefix[i] != key [i]) { + return false; + } + } + + return true; + } +}
