[FLINK-8695] [rocksdb] Move flink-statebackend-rocksdb from 'flink-contrib' to 'flink-state-backends'.
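Note: this is a source-tree move only. The package paths in the diffstat below (org/apache/flink/contrib/streaming/state) are unchanged and the module appears to keep its artifact name, so user programs should keep working without modification. A minimal usage sketch (checkpoint URI illustrative):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RocksDBBackendUsage {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Same artifact and package as before the move; only the module's
            // location in the source tree changed.
            env.setStateBackend(new RocksDBStateBackend("file:///tmp/checkpoints"));
            // ... define the streaming job and call env.execute(...) as usual ...
        }
    }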
This closes #5523 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2f7392d7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2f7392d7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2f7392d7 Branch: refs/heads/master Commit: 2f7392d77d85823d3db1f1e5a8d4f6c94358d773 Parents: 3d52f52 Author: Stephan Ewen <se...@apache.org> Authored: Mon Feb 19 10:31:44 2018 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Mon Feb 19 18:37:49 2018 +0100 ---------------------------------------------------------------------- .../flink-statebackend-rocksdb/pom.xml | 88 - .../streaming/state/AbstractRocksDBState.java | 267 --- .../contrib/streaming/state/OptionsFactory.java | 74 - .../streaming/state/PredefinedOptions.java | 209 -- .../state/RocksDBAggregatingState.java | 206 -- .../streaming/state/RocksDBFoldingState.java | 122 -- .../state/RocksDBKeyedStateBackend.java | 2033 ------------------ .../streaming/state/RocksDBListState.java | 230 -- .../streaming/state/RocksDBMapState.java | 532 ----- .../streaming/state/RocksDBReducingState.java | 189 -- .../streaming/state/RocksDBStateBackend.java | 691 ------ .../state/RocksDBStateBackendFactory.java | 49 - .../streaming/state/RocksDBValueState.java | 106 - .../state/RocksDBAsyncSnapshotTest.java | 505 ----- .../streaming/state/RocksDBInitResetTest.java | 32 - .../state/RocksDBMergeIteratorTest.java | 152 -- .../state/RocksDBStateBackendConfigTest.java | 416 ---- .../state/RocksDBStateBackendFactoryTest.java | 176 -- .../state/RocksDBStateBackendTest.java | 526 ----- .../state/RocksDbMultiClassLoaderTest.java | 73 - .../RocksDBListStatePerformanceTest.java | 168 -- .../state/benchmark/RocksDBPerformanceTest.java | 204 -- .../src/test/resources/log4j-test.properties | 27 - flink-contrib/pom.xml | 1 - .../flink-statebackend-rocksdb/pom.xml | 94 + .../streaming/state/AbstractRocksDBState.java | 267 +++ .../contrib/streaming/state/OptionsFactory.java | 74 + .../streaming/state/PredefinedOptions.java | 209 ++ .../state/RocksDBAggregatingState.java | 206 ++ .../streaming/state/RocksDBFoldingState.java | 122 ++ .../state/RocksDBKeyedStateBackend.java | 2033 ++++++++++++++++++ .../streaming/state/RocksDBListState.java | 230 ++ .../streaming/state/RocksDBMapState.java | 532 +++++ .../streaming/state/RocksDBReducingState.java | 189 ++ .../streaming/state/RocksDBStateBackend.java | 691 ++++++ .../state/RocksDBStateBackendFactory.java | 49 + .../streaming/state/RocksDBValueState.java | 106 + .../state/RocksDBAsyncSnapshotTest.java | 505 +++++ .../streaming/state/RocksDBInitResetTest.java | 32 + .../state/RocksDBMergeIteratorTest.java | 152 ++ .../state/RocksDBStateBackendConfigTest.java | 416 ++++ .../state/RocksDBStateBackendFactoryTest.java | 176 ++ .../state/RocksDBStateBackendTest.java | 526 +++++ .../state/RocksDbMultiClassLoaderTest.java | 73 + .../RocksDBListStatePerformanceTest.java | 168 ++ .../state/benchmark/RocksDBPerformanceTest.java | 204 ++ .../src/test/resources/log4j-test.properties | 27 + flink-state-backends/pom.xml | 42 + pom.xml | 1 + tools/travis_mvn_watchdog.sh | 2 +- 50 files changed, 7125 insertions(+), 7077 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/pom.xml ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/pom.xml 
b/flink-contrib/flink-statebackend-rocksdb/pom.xml deleted file mode 100644 index f9a4910..0000000 --- a/flink-contrib/flink-statebackend-rocksdb/pom.xml +++ /dev/null @@ -1,88 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. - ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-contrib</artifactId> - <version>1.5-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId> - <name>flink-statebackend-rocksdb</name> - - <packaging>jar</packaging> - - <dependencies> - - <!-- core dependencies --> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-clients_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.rocksdb</groupId> - <artifactId>rocksdbjni</artifactId> - <version>5.6.1</version> - </dependency> - - <!-- test dependencies --> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java deleted file mode 100644 index 969a1fc..0000000 --- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java +++ /dev/null @@ -1,267 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.contrib.streaming.state; - -import org.apache.flink.api.common.state.State; -import org.apache.flink.api.common.state.StateDescriptor; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; -import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer; -import org.apache.flink.runtime.state.KeyGroupRangeAssignment; -import org.apache.flink.runtime.state.internal.InternalKvState; -import org.apache.flink.util.Preconditions; - -import org.rocksdb.ColumnFamilyHandle; -import org.rocksdb.RocksDBException; -import org.rocksdb.WriteOptions; - -import java.io.IOException; - -/** - * Base class for {@link State} implementations that store state in a RocksDB database. - * - * <p>State is not stored in this class but in the {@link org.rocksdb.RocksDB} instance that - * the {@link RocksDBStateBackend} manages and checkpoints. - * - * @param <K> The type of the key. - * @param <N> The type of the namespace. - * @param <S> The type of {@link State}. - * @param <SD> The type of {@link StateDescriptor}. - * @param <V> The type of the values in the state. - */ -public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, V>, V> - implements InternalKvState<N>, State { - - /** Serializer for the namespace. */ - final TypeSerializer<N> namespaceSerializer; - - /** The current namespace, which the next value methods will refer to. */ - private N currentNamespace; - - /** Backend that holds the actual RocksDB instance where we store state. */ - protected RocksDBKeyedStateBackend<K> backend; - - /** The column family of this particular instance of state. */ - protected ColumnFamilyHandle columnFamily; - - /** State descriptor from which to create this state instance. */ - protected final SD stateDesc; - - /** - * We disable writes to the write-ahead-log here. - */ - private final WriteOptions writeOptions; - - protected final ByteArrayOutputStreamWithPos keySerializationStream; - protected final DataOutputView keySerializationDataOutputView; - - private final boolean ambiguousKeyPossible; - - /** - * Creates a new RocksDB backed state. - * - * @param columnFamily The RocksDB column family that this state is associated to. - * @param namespaceSerializer The serializer for the namespace. - * @param stateDesc The state descriptor for this state. - * @param backend The backend that holds the RocksDB instance in which the state is stored.
- */ - protected AbstractRocksDBState( - ColumnFamilyHandle columnFamily, - TypeSerializer<N> namespaceSerializer, - SD stateDesc, - RocksDBKeyedStateBackend<K> backend) { - - this.namespaceSerializer = namespaceSerializer; - this.backend = backend; - - this.columnFamily = columnFamily; - - writeOptions = new WriteOptions(); - writeOptions.setDisableWAL(true); - this.stateDesc = Preconditions.checkNotNull(stateDesc, "State Descriptor"); - - this.keySerializationStream = new ByteArrayOutputStreamWithPos(128); - this.keySerializationDataOutputView = new DataOutputViewStreamWrapper(keySerializationStream); - this.ambiguousKeyPossible = (backend.getKeySerializer().getLength() < 0) - && (namespaceSerializer.getLength() < 0); - } - - // ------------------------------------------------------------------------ - - @Override - public void clear() { - try { - writeCurrentKeyWithGroupAndNamespace(); - byte[] key = keySerializationStream.toByteArray(); - backend.db.remove(columnFamily, writeOptions, key); - } catch (IOException | RocksDBException e) { - throw new RuntimeException("Error while removing entry from RocksDB", e); - } - } - - @Override - public void setCurrentNamespace(N namespace) { - this.currentNamespace = Preconditions.checkNotNull(namespace, "Namespace"); - } - - @Override - @SuppressWarnings("unchecked") - public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception { - Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace"); - - //TODO make KvStateSerializer key-group aware to save this round trip and key-group computation - Tuple2<K, N> des = KvStateSerializer.<K, N>deserializeKeyAndNamespace( - serializedKeyAndNamespace, - backend.getKeySerializer(), - namespaceSerializer); - - int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups()); - - // we cannot reuse the keySerializationStream member since this method - // is called concurrently with the other methods and it may thus contain garbage - ByteArrayOutputStreamWithPos tmpKeySerializationStream = new ByteArrayOutputStreamWithPos(128); - DataOutputViewStreamWrapper tmpKeySerializationDataOutputView = new DataOutputViewStreamWrapper(tmpKeySerializationStream); - - writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1, - tmpKeySerializationStream, tmpKeySerializationDataOutputView); - - return backend.db.get(columnFamily, tmpKeySerializationStream.toByteArray()); - } - - protected void writeCurrentKeyWithGroupAndNamespace() throws IOException { - writeKeyWithGroupAndNamespace( - backend.getCurrentKeyGroupIndex(), - backend.getCurrentKey(), - currentNamespace, - keySerializationStream, - keySerializationDataOutputView); - } - - protected void writeKeyWithGroupAndNamespace( - int keyGroup, K key, N namespace, - ByteArrayOutputStreamWithPos keySerializationStream, - DataOutputView keySerializationDataOutputView) throws IOException { - - Preconditions.checkNotNull(key, "No key set.
This method should not be called outside of a keyed context."); - - keySerializationStream.reset(); - writeKeyGroup(keyGroup, keySerializationDataOutputView); - writeKey(key, keySerializationStream, keySerializationDataOutputView); - writeNameSpace(namespace, keySerializationStream, keySerializationDataOutputView); - } - - private void writeKeyGroup( - int keyGroup, - DataOutputView keySerializationDataOutputView) throws IOException { - for (int i = backend.getKeyGroupPrefixBytes(); --i >= 0;) { - keySerializationDataOutputView.writeByte(keyGroup >>> (i << 3)); - } - } - - private void writeKey( - K key, - ByteArrayOutputStreamWithPos keySerializationStream, - DataOutputView keySerializationDataOutputView) throws IOException { - //write key - int beforeWrite = keySerializationStream.getPosition(); - backend.getKeySerializer().serialize(key, keySerializationDataOutputView); - - if (ambiguousKeyPossible) { - //write size of key - writeLengthFrom(beforeWrite, keySerializationStream, - keySerializationDataOutputView); - } - } - - private void writeNameSpace( - N namespace, - ByteArrayOutputStreamWithPos keySerializationStream, - DataOutputView keySerializationDataOutputView) throws IOException { - int beforeWrite = keySerializationStream.getPosition(); - namespaceSerializer.serialize(namespace, keySerializationDataOutputView); - - if (ambiguousKeyPossible) { - //write length of namespace - writeLengthFrom(beforeWrite, keySerializationStream, - keySerializationDataOutputView); - } - } - - private static void writeLengthFrom( - int fromPosition, - ByteArrayOutputStreamWithPos keySerializationStream, - DataOutputView keySerializationDataOutputView) throws IOException { - int length = keySerializationStream.getPosition() - fromPosition; - writeVariableIntBytes(length, keySerializationDataOutputView); - } - - private static void writeVariableIntBytes( - int value, - DataOutputView keySerializationDataOutputView) - throws IOException { - do { - keySerializationDataOutputView.writeByte(value); - value >>>= 8; - } while (value != 0); - } - - protected Tuple3<Integer, K, N> readKeyWithGroupAndNamespace(ByteArrayInputStreamWithPos inputStream, DataInputView inputView) throws IOException { - int keyGroup = readKeyGroup(inputView); - K key = readKey(inputStream, inputView); - N namespace = readNamespace(inputStream, inputView); - - return new Tuple3<>(keyGroup, key, namespace); - } - - private int readKeyGroup(DataInputView inputView) throws IOException { - int keyGroup = 0; - for (int i = 0; i < backend.getKeyGroupPrefixBytes(); ++i) { - keyGroup <<= 8; - keyGroup |= (inputView.readByte() & 0xFF); - } - return keyGroup; - } - - private K readKey(ByteArrayInputStreamWithPos inputStream, DataInputView inputView) throws IOException { - int beforeRead = inputStream.getPosition(); - K key = backend.getKeySerializer().deserialize(inputView); - if (ambiguousKeyPossible) { - int length = inputStream.getPosition() - beforeRead; - readVariableIntBytes(inputView, length); - } - return key; - } - - private N readNamespace(ByteArrayInputStreamWithPos inputStream, DataInputView inputView) throws IOException { - int beforeRead = inputStream.getPosition(); - N namespace = namespaceSerializer.deserialize(inputView); - if (ambiguousKeyPossible) { - int length = inputStream.getPosition() - beforeRead; - readVariableIntBytes(inputView, length); - } - return namespace; - } - - private void readVariableIntBytes(DataInputView inputView, int value) throws IOException { - do { - inputView.readByte(); -
value >>>= 8; - } while (value != 0); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java deleted file mode 100644 index 34f7f62..0000000 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.contrib.streaming.state; - -import org.rocksdb.ColumnFamilyOptions; -import org.rocksdb.DBOptions; - -/** - * A factory for {@link DBOptions} and {@link ColumnFamilyOptions} to be passed to the - * {@link RocksDBStateBackend}. Options have to be created lazily by this factory, because the - * {@code Options} class is not serializable and holds pointers to native code. - * - * <p>A typical pattern to use this OptionsFactory is as follows (the interface has two - * methods, so it cannot be implemented as a lambda): - * - * <pre>{@code - * rocksDbBackend.setOptions(new OptionsFactory() { - * - * public DBOptions createDBOptions(DBOptions currentOptions) { - * return currentOptions.setMaxOpenFiles(1024); - * } - * - * public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) { - * return currentOptions; - * } - * }); - * }</pre> - */ -public interface OptionsFactory extends java.io.Serializable { - - /** - * This method should set the additional options on top of the current options object. - * The current options object may contain pre-defined options based on flags that have - * been configured on the state backend. - * - * <p>It is important to set the options on the current object and return the result from - * the setter methods, otherwise the pre-defined options may get lost. - * - * @param currentOptions The options object with the pre-defined options. - * @return The options object on which the additional options are set. - */ - DBOptions createDBOptions(DBOptions currentOptions); - - /** - * This method should set the additional options on top of the current options object. - * The current options object may contain pre-defined options based on flags that have - * been configured on the state backend. - * - * <p>It is important to set the options on the current object and return the result from - * the setter methods, otherwise the pre-defined options may get lost. - * - * @param currentOptions The options object with the pre-defined options.
- * @return The options object on which the additional options are set. - */ - ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions); - -} http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java deleted file mode 100644 index 73dc0be..0000000 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java +++ /dev/null @@ -1,209 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.contrib.streaming.state; - -import org.rocksdb.BlockBasedTableConfig; -import org.rocksdb.BloomFilter; -import org.rocksdb.ColumnFamilyOptions; -import org.rocksdb.CompactionStyle; -import org.rocksdb.DBOptions; - -/** - * The {@code PredefinedOptions} are configuration settings for the {@link RocksDBStateBackend}. - * The various pre-defined choices are configurations that have been empirically - * determined to be beneficial for performance under different settings. - * - * <p>Some of these settings are based on experiments by the Flink community, some follow - * guides from the RocksDB project. - */ -public enum PredefinedOptions { - - /** - * Default options for all settings, except that writes are not forced to the - * disk. - * - * <p>Note: Because Flink does not rely on RocksDB data on disk for recovery, - * there is no need to sync data to stable storage. - */ - DEFAULT { - - @Override - public DBOptions createDBOptions() { - return new DBOptions() - .setUseFsync(false); - } - - @Override - public ColumnFamilyOptions createColumnOptions() { - return new ColumnFamilyOptions(); - } - - }, - - /** - * Pre-defined options for regular spinning hard disks. - * - * <p>This constant configures RocksDB with some options that lead empirically - * to better performance when the machines executing the system use - * regular spinning hard disks. - * - * <p>The following options are set: - * <ul> - * <li>setCompactionStyle(CompactionStyle.LEVEL)</li> - * <li>setLevelCompactionDynamicLevelBytes(true)</li> - * <li>setIncreaseParallelism(4)</li> - * <li>setUseFsync(false)</li> - * <li>setMaxOpenFiles(-1)</li> - * </ul> - * - * <p>Note: Because Flink does not rely on RocksDB data on disk for recovery, - * there is no need to sync data to stable storage.
- */ - SPINNING_DISK_OPTIMIZED { - - @Override - public DBOptions createDBOptions() { - - return new DBOptions() - .setIncreaseParallelism(4) - .setUseFsync(false) - .setMaxOpenFiles(-1); - } - - @Override - public ColumnFamilyOptions createColumnOptions() { - return new ColumnFamilyOptions() - .setCompactionStyle(CompactionStyle.LEVEL) - .setLevelCompactionDynamicLevelBytes(true); - } - }, - - /** - * Pre-defined options for better performance on regular spinning hard disks, - * at the cost of a higher memory consumption. - * - * <p><b>NOTE: These settings will cause RocksDB to consume a lot of memory for - * block caching and compactions. If you experience out-of-memory problems related to - * RocksDB, consider switching back to {@link #SPINNING_DISK_OPTIMIZED}.</b></p> - * - * <p>The following options are set: - * <ul> - * <li>setLevelCompactionDynamicLevelBytes(true)</li> - * <li>setTargetFileSizeBase(256 MBytes)</li> - * <li>setMaxBytesForLevelBase(1 GByte)</li> - * <li>setWriteBufferSize(64 MBytes)</li> - * <li>setIncreaseParallelism(4)</li> - * <li>setMinWriteBufferNumberToMerge(3)</li> - * <li>setMaxWriteBufferNumber(4)</li> - * <li>setUseFsync(false)</li> - * <li>setMaxOpenFiles(-1)</li> - * <li>BlockBasedTableConfig.setBlockCacheSize(256 MBytes)</li> - * <li>BlockBasedTableConfig.setBlockSize(128 KBytes)</li> - * </ul> - * - * <p>Note: Because Flink does not rely on RocksDB data on disk for recovery, - * there is no need to sync data to stable storage. - */ - SPINNING_DISK_OPTIMIZED_HIGH_MEM { - - @Override - public DBOptions createDBOptions() { - - return new DBOptions() - .setIncreaseParallelism(4) - .setUseFsync(false) - .setMaxOpenFiles(-1); - } - - @Override - public ColumnFamilyOptions createColumnOptions() { - - final long blockCacheSize = 256 * 1024 * 1024; - final long blockSize = 128 * 1024; - final long targetFileSize = 256 * 1024 * 1024; - final long writeBufferSize = 64 * 1024 * 1024; - - return new ColumnFamilyOptions() - .setCompactionStyle(CompactionStyle.LEVEL) - .setLevelCompactionDynamicLevelBytes(true) - .setTargetFileSizeBase(targetFileSize) - .setMaxBytesForLevelBase(4 * targetFileSize) - .setWriteBufferSize(writeBufferSize) - .setMinWriteBufferNumberToMerge(3) - .setMaxWriteBufferNumber(4) - .setTableFormatConfig( - new BlockBasedTableConfig() - .setBlockCacheSize(blockCacheSize) - .setBlockSize(blockSize) - .setFilter(new BloomFilter()) - ); - } - }, - - /** - * Pre-defined options for Flash SSDs. - * - * <p>This constant configures RocksDB with some options that lead empirically - * to better performance when the machines executing the system use SSDs. - * - * <p>The following options are set: - * <ul> - * <li>setIncreaseParallelism(4)</li> - * <li>setUseFsync(false)</li> - * <li>setMaxOpenFiles(-1)</li> - * </ul> - * - * <p>Note: Because Flink does not rely on RocksDB data on disk for recovery, - * there is no need to sync data to stable storage. - */ - FLASH_SSD_OPTIMIZED { - - @Override - public DBOptions createDBOptions() { - return new DBOptions() - .setIncreaseParallelism(4) - .setUseFsync(false) - .setMaxOpenFiles(-1); - } - - @Override - public ColumnFamilyOptions createColumnOptions() { - return new ColumnFamilyOptions(); - } - }; - - // ------------------------------------------------------------------------ - - /** - * Creates the {@link DBOptions} for this pre-defined setting. - * - * @return The pre-defined options object.
- */ - public abstract DBOptions createDBOptions(); - - /** - * Creates the {@link org.rocksdb.ColumnFamilyOptions} for this pre-defined setting. - * - * @return The pre-defined options object. - */ - public abstract ColumnFamilyOptions createColumnOptions(); - -} http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java deleted file mode 100644 index 2c07814..0000000 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.contrib.streaming.state; - -import org.apache.flink.api.common.functions.AggregateFunction; -import org.apache.flink.api.common.state.AggregatingState; -import org.apache.flink.api.common.state.AggregatingStateDescriptor; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.runtime.state.internal.InternalAggregatingState; - -import org.rocksdb.ColumnFamilyHandle; -import org.rocksdb.RocksDBException; -import org.rocksdb.WriteOptions; - -import java.io.IOException; -import java.util.Collection; - -/** - * An {@link AggregatingState} implementation that stores state in RocksDB. - * - * @param <K> The type of the key - * @param <N> The type of the namespace - * @param <T> The type of the values that are aggregated into the state - * @param <ACC> The type of the value stored in the state (the accumulator type) - * @param <R> The type of the value returned from the state - */ -public class RocksDBAggregatingState<K, N, T, ACC, R> - extends AbstractRocksDBState<K, N, AggregatingState<T, R>, AggregatingStateDescriptor<T, ACC, R>, ACC> - implements InternalAggregatingState<N, T, R> { - - /** Serializer for the values. */ - private final TypeSerializer<ACC> valueSerializer; - - /** User-specified aggregation function. */ - private final AggregateFunction<T, ACC, R> aggFunction; - - /** - * We disable writes to the write-ahead-log here. We can't have these in the base class - * because JNI segfaults for some reason if they are.
- */ - private final WriteOptions writeOptions; - - /** - * Creates a new {@code RocksDBAggregatingState}. - * - * @param namespaceSerializer - * The serializer for the namespace. - * @param stateDesc - * The state identifier for the state. This contains the state name and aggregation function. - */ - public RocksDBAggregatingState( - ColumnFamilyHandle columnFamily, - TypeSerializer<N> namespaceSerializer, - AggregatingStateDescriptor<T, ACC, R> stateDesc, - RocksDBKeyedStateBackend<K> backend) { - - super(columnFamily, namespaceSerializer, stateDesc, backend); - - this.valueSerializer = stateDesc.getSerializer(); - this.aggFunction = stateDesc.getAggregateFunction(); - - writeOptions = new WriteOptions(); - writeOptions.setDisableWAL(true); - } - - @Override - public R get() throws IOException { - try { - // prepare the current key and namespace for RocksDB lookup - writeCurrentKeyWithGroupAndNamespace(); - final byte[] key = keySerializationStream.toByteArray(); - - // get the current value - final byte[] valueBytes = backend.db.get(columnFamily, key); - - if (valueBytes == null) { - return null; - } - - ACC accumulator = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes))); - return aggFunction.getResult(accumulator); - } - catch (IOException | RocksDBException e) { - throw new IOException("Error while retrieving value from RocksDB", e); - } - } - - @Override - public void add(T value) throws IOException { - try { - // prepare the current key and namespace for RocksDB lookup - writeCurrentKeyWithGroupAndNamespace(); - final byte[] key = keySerializationStream.toByteArray(); - keySerializationStream.reset(); - - // get the current value - final byte[] valueBytes = backend.db.get(columnFamily, key); - - // deserialize the current accumulator, or create a blank one - ACC accumulator = valueBytes == null ?
- aggFunction.createAccumulator() : - valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes))); - - // aggregate the value into the accumulator - accumulator = aggFunction.add(value, accumulator); - - // serialize the new accumulator - final DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream); - valueSerializer.serialize(accumulator, out); - - // write the new value to RocksDB - backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray()); - } - catch (IOException | RocksDBException e) { - throw new IOException("Error while adding value to RocksDB", e); - } - } - - @Override - public void mergeNamespaces(N target, Collection<N> sources) throws Exception { - if (sources == null || sources.isEmpty()) { - return; - } - - // cache key and namespace - final K key = backend.getCurrentKey(); - final int keyGroup = backend.getCurrentKeyGroupIndex(); - - try { - ACC current = null; - - // merge the sources to the target - for (N source : sources) { - if (source != null) { - writeKeyWithGroupAndNamespace( - keyGroup, key, source, - keySerializationStream, keySerializationDataOutputView); - - final byte[] sourceKey = keySerializationStream.toByteArray(); - final byte[] valueBytes = backend.db.get(columnFamily, sourceKey); - backend.db.delete(columnFamily, sourceKey); - - if (valueBytes != null) { - ACC value = valueSerializer.deserialize( - new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes))); - - if (current != null) { - current = aggFunction.merge(current, value); - } - else { - current = value; - } - } - } - } - - // if something came out of merging the sources, merge it or write it to the target - if (current != null) { - // create the target full-binary-key - writeKeyWithGroupAndNamespace( - keyGroup, key, target, - keySerializationStream, keySerializationDataOutputView); - - final byte[] targetKey = keySerializationStream.toByteArray(); - final byte[] targetValueBytes = backend.db.get(columnFamily, targetKey); - - if (targetValueBytes != null) { - // target also had a value, merge - ACC value = valueSerializer.deserialize( - new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(targetValueBytes))); - - current = aggFunction.merge(current, value); - } - - // serialize the resulting value - keySerializationStream.reset(); - valueSerializer.serialize(current, keySerializationDataOutputView); - - // write the resulting value - backend.db.put(columnFamily, writeOptions, targetKey, keySerializationStream.toByteArray()); - } - } - catch (Exception e) { - throw new Exception("Error while merging state in RocksDB", e); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java deleted file mode 100644 index 479565e..0000000 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.contrib.streaming.state; - -import org.apache.flink.api.common.functions.FoldFunction; -import org.apache.flink.api.common.state.FoldingState; -import org.apache.flink.api.common.state.FoldingStateDescriptor; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.runtime.state.internal.InternalFoldingState; -
-import org.rocksdb.ColumnFamilyHandle; -import org.rocksdb.RocksDBException; -import org.rocksdb.WriteOptions; - -import java.io.IOException; - -/** - * {@link FoldingState} implementation that stores state in RocksDB. - * - * @param <K> The type of the key. - * @param <N> The type of the namespace. - * @param <T> The type of the values that can be folded into the state. - * @param <ACC> The type of the value in the folding state. - * - * @deprecated will be removed in a future version - */ -@Deprecated -public class RocksDBFoldingState<K, N, T, ACC> - extends AbstractRocksDBState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, ACC> - implements InternalFoldingState<N, T, ACC> { - - /** Serializer for the values. */ - private final TypeSerializer<ACC> valueSerializer; - - /** User-specified fold function. */ - private final FoldFunction<T, ACC> foldFunction; - - /** - * We disable writes to the write-ahead-log here. We can't have these in the base class - * because JNI segfaults for some reason if they are. - */ - private final WriteOptions writeOptions; - - /** - * Creates a new {@code RocksDBFoldingState}. - * - * @param namespaceSerializer The serializer for the namespace. - * @param stateDesc The state identifier for the state. This contains the name - * and can create a default state value.
- */ - public RocksDBFoldingState(ColumnFamilyHandle columnFamily, - TypeSerializer<N> namespaceSerializer, - FoldingStateDescriptor<T, ACC> stateDesc, - RocksDBKeyedStateBackend<K> backend) { - - super(columnFamily, namespaceSerializer, stateDesc, backend); - - this.valueSerializer = stateDesc.getSerializer(); - this.foldFunction = stateDesc.getFoldFunction(); - - writeOptions = new WriteOptions(); - writeOptions.setDisableWAL(true); - } - - @Override - public ACC get() { - try { - writeCurrentKeyWithGroupAndNamespace(); - byte[] key = keySerializationStream.toByteArray(); - byte[] valueBytes = backend.db.get(columnFamily, key); - if (valueBytes == null) { - return null; - } - return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes))); - } catch (IOException | RocksDBException e) { - throw new RuntimeException("Error while retrieving data from RocksDB", e); - } - } - - @Override - public void add(T value) throws IOException { - try { - writeCurrentKeyWithGroupAndNamespace(); - byte[] key = keySerializationStream.toByteArray(); - byte[] valueBytes = backend.db.get(columnFamily, key); - DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream); - if (valueBytes == null) { - keySerializationStream.reset(); - valueSerializer.serialize(foldFunction.fold(stateDesc.getDefaultValue(), value), out); - backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray()); - } else { - ACC oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes))); - ACC newValue = foldFunction.fold(oldValue, value); - keySerializationStream.reset(); - valueSerializer.serialize(newValue, out); - backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray()); - } - } catch (Exception e) { - throw new RuntimeException("Error while adding data to RocksDB", e); - } - } - -}
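----------------------------------------------------------------------
For reference, AbstractRocksDBState above writes RocksDB keys as a big-endian key-group prefix followed by the serialized key and serialized namespace, appending variable-int lengths only when both the key and namespace serializers are variable-length ("ambiguous keys"). A minimal sketch of the unambiguous fixed-length layout, with illustrative names:

    import java.io.ByteArrayOutputStream;

    /**
     * Minimal sketch of the composite key layout written by
     * AbstractRocksDBState#writeKeyWithGroupAndNamespace for the case
     * of fixed-length key and namespace serializers (no length suffixes).
     */
    final class CompositeKeySketch {

        static byte[] compositeKey(int keyGroup, int keyGroupPrefixBytes,
                byte[] serializedKey, byte[] serializedNamespace) {

            ByteArrayOutputStream out = new ByteArrayOutputStream(128);

            // key-group prefix, big-endian, as in writeKeyGroup()
            for (int i = keyGroupPrefixBytes; --i >= 0;) {
                out.write(keyGroup >>> (i << 3));
            }

            out.write(serializedKey, 0, serializedKey.length);             // key bytes
            out.write(serializedNamespace, 0, serializedNamespace.length); // namespace bytes

            return out.toByteArray();
        }
    }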
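----------------------------------------------------------------------
An OptionsFactory implementation, per the interface above, sets its options on the objects passed in and returns them, so that the pre-defined options are preserved; a minimal sketch:

    import org.apache.flink.contrib.streaming.state.OptionsFactory;
    import org.rocksdb.ColumnFamilyOptions;
    import org.rocksdb.DBOptions;

    /** Minimal OptionsFactory sketch: tweaks the passed-in options and returns them. */
    class MaxOpenFilesOptionsFactory implements OptionsFactory {

        @Override
        public DBOptions createDBOptions(DBOptions currentOptions) {
            // set on the current object, so pre-defined options are not lost
            return currentOptions.setMaxOpenFiles(1024);
        }

        @Override
        public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
            return currentOptions; // no column-family tweaks in this sketch
        }
    }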
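----------------------------------------------------------------------
A PredefinedOptions profile and an OptionsFactory are layered onto the backend through its setters; a minimal sketch, assuming the backend's setPredefinedOptions and setOptions methods from this module (checkpoint URI illustrative):

    import org.apache.flink.contrib.streaming.state.PredefinedOptions;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;

    class ConfiguredRocksDBBackend {

        static RocksDBStateBackend create() throws Exception {
            RocksDBStateBackend backend = new RocksDBStateBackend("file:///tmp/checkpoints");
            // start from a pre-defined profile, then layer custom options on top
            backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
            backend.setOptions(new MaxOpenFilesOptionsFactory()); // sketch above
            return backend;
        }
    }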
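----------------------------------------------------------------------
RocksDBAggregatingState serves the user-facing AggregatingState API; the accumulator it stores serialized under the composite key comes from an AggregateFunction such as this minimal average sketch (names illustrative):

    import org.apache.flink.api.common.functions.AggregateFunction;

    /** Minimal AggregateFunction sketch: running average as a {sum, count} accumulator. */
    class AverageAggregate implements AggregateFunction<Long, long[], Double> {

        @Override
        public long[] createAccumulator() {
            return new long[] {0L, 0L}; // {sum, count}
        }

        @Override
        public long[] add(Long value, long[] acc) {
            acc[0] += value;
            acc[1] += 1;
            return acc;
        }

        @Override
        public Double getResult(long[] acc) {
            return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
        }

        @Override
        public long[] merge(long[] a, long[] b) {
            a[0] += b[0];
            a[1] += b[1];
            return a;
        }
    }

A keyed function would then obtain the state via, e.g., getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<>("avg", new AverageAggregate(), long[].class)).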