http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java new file mode 100644 index 0000000..73dc0be --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.rocksdb.BlockBasedTableConfig; +import org.rocksdb.BloomFilter; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.CompactionStyle; +import org.rocksdb.DBOptions; + +/** + * The {@code PredefinedOptions} are configuration settings for the {@link RocksDBStateBackend}. 
+ * The various pre-defined choices are configurations that have been empirically + * determined to be beneficial for performance under different settings. + * + * <p>Some of these settings are based on experiments by the Flink community, some follow + * guides from the RocksDB project. + */ +public enum PredefinedOptions { + + /** + * Default options for all settings, except that writes are not forced to the + * disk. + * + * <p>Note: Because Flink does not rely on RocksDB data on disk for recovery, + * there is no need to sync data to stable storage. + */ + DEFAULT { + + @Override + public DBOptions createDBOptions() { + return new DBOptions() + .setUseFsync(false); + } + + @Override + public ColumnFamilyOptions createColumnOptions() { + return new ColumnFamilyOptions(); + } + + }, + + /** + * Pre-defined options for regular spinning hard disks. + * + * <p>This constant configures RocksDB with some options that lead empirically + * to better performance when the machines executing the system use + * regular spinning hard disks. + * + * <p>The following options are set: + * <ul> + * <li>setCompactionStyle(CompactionStyle.LEVEL)</li> + * <li>setLevelCompactionDynamicLevelBytes(true)</li> + * <li>setIncreaseParallelism(4)</li> + * <li>setUseFsync(false)</li> + * <li>setDisableDataSync(true)</li> + * <li>setMaxOpenFiles(-1)</li> + * </ul> + * + * <p>Note: Because Flink does not rely on RocksDB data on disk for recovery, + * there is no need to sync data to stable storage. 
+ */ + SPINNING_DISK_OPTIMIZED { + + @Override + public DBOptions createDBOptions() { + + return new DBOptions() + .setIncreaseParallelism(4) + .setUseFsync(false) + .setMaxOpenFiles(-1); + } + + @Override + public ColumnFamilyOptions createColumnOptions() { + return new ColumnFamilyOptions() + .setCompactionStyle(CompactionStyle.LEVEL) + .setLevelCompactionDynamicLevelBytes(true); + } + }, + + /** + * Pre-defined options for better performance on regular spinning hard disks, + * at the cost of a higher memory consumption. + * + * <p><b>NOTE: These settings will cause RocksDB to consume a lot of memory for + * block caching and compactions. If you experience out-of-memory problems related to, + * RocksDB, consider switching back to {@link #SPINNING_DISK_OPTIMIZED}.</b></p> + * + * <p>The following options are set: + * <ul> + * <li>setLevelCompactionDynamicLevelBytes(true)</li> + * <li>setTargetFileSizeBase(256 MBytes)</li> + * <li>setMaxBytesForLevelBase(1 GByte)</li> + * <li>setWriteBufferSize(64 MBytes)</li> + * <li>setIncreaseParallelism(4)</li> + * <li>setMinWriteBufferNumberToMerge(3)</li> + * <li>setMaxWriteBufferNumber(4)</li> + * <li>setUseFsync(false)</li> + * <li>setMaxOpenFiles(-1)</li> + * <li>BlockBasedTableConfig.setBlockCacheSize(256 MBytes)</li> + * <li>BlockBasedTableConfigsetBlockSize(128 KBytes)</li> + * </ul> + * + * <p>Note: Because Flink does not rely on RocksDB data on disk for recovery, + * there is no need to sync data to stable storage. 
+ */ + SPINNING_DISK_OPTIMIZED_HIGH_MEM { + + @Override + public DBOptions createDBOptions() { + + return new DBOptions() + .setIncreaseParallelism(4) + .setUseFsync(false) + .setMaxOpenFiles(-1); + } + + @Override + public ColumnFamilyOptions createColumnOptions() { + + final long blockCacheSize = 256 * 1024 * 1024; + final long blockSize = 128 * 1024; + final long targetFileSize = 256 * 1024 * 1024; + final long writeBufferSize = 64 * 1024 * 1024; + + return new ColumnFamilyOptions() + .setCompactionStyle(CompactionStyle.LEVEL) + .setLevelCompactionDynamicLevelBytes(true) + .setTargetFileSizeBase(targetFileSize) + .setMaxBytesForLevelBase(4 * targetFileSize) + .setWriteBufferSize(writeBufferSize) + .setMinWriteBufferNumberToMerge(3) + .setMaxWriteBufferNumber(4) + .setTableFormatConfig( + new BlockBasedTableConfig() + .setBlockCacheSize(blockCacheSize) + .setBlockSize(blockSize) + .setFilter(new BloomFilter()) + ); + } + }, + + /** + * Pre-defined options for Flash SSDs. + * + * <p>This constant configures RocksDB with some options that lead empirically + * to better performance when the machines executing the system use SSDs. + * + * <p>The following options are set: + * <ul> + * <li>setIncreaseParallelism(4)</li> + * <li>setUseFsync(false)</li> + * <li>setDisableDataSync(true)</li> + * <li>setMaxOpenFiles(-1)</li> + * </ul> + * + * <p>Note: Because Flink does not rely on RocksDB data on disk for recovery, + * there is no need to sync data to stable storage. + */ + FLASH_SSD_OPTIMIZED { + + @Override + public DBOptions createDBOptions() { + return new DBOptions() + .setIncreaseParallelism(4) + .setUseFsync(false) + .setMaxOpenFiles(-1); + } + + @Override + public ColumnFamilyOptions createColumnOptions() { + return new ColumnFamilyOptions(); + } + }; + + // ------------------------------------------------------------------------ + + /** + * Creates the {@link DBOptions}for this pre-defined setting. + * + * @return The pre-defined options object. 
+ */ + public abstract DBOptions createDBOptions(); + + /** + * Creates the {@link org.rocksdb.ColumnFamilyOptions}for this pre-defined setting. + * + * @return The pre-defined options object. + */ + public abstract ColumnFamilyOptions createColumnOptions(); + +}
http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java new file mode 100644 index 0000000..2c07814 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBAggregatingState.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.state.AggregatingState; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.internal.InternalAggregatingState; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteOptions; + +import java.io.IOException; +import java.util.Collection; + +/** + * An {@link AggregatingState} implementation that stores state in RocksDB. + * + * @param <K> The type of the key + * @param <N> The type of the namespace + * @param <T> The type of the values that aggregated into the state + * @param <ACC> The type of the value stored in the state (the accumulator type) + * @param <R> The type of the value returned from the state + */ +public class RocksDBAggregatingState<K, N, T, ACC, R> + extends AbstractRocksDBState<K, N, AggregatingState<T, R>, AggregatingStateDescriptor<T, ACC, R>, ACC> + implements InternalAggregatingState<N, T, R> { + + /** Serializer for the values. */ + private final TypeSerializer<ACC> valueSerializer; + + /** User-specified aggregation function. */ + private final AggregateFunction<T, ACC, R> aggFunction; + + /** + * We disable writes to the write-ahead-log here. We can't have these in the base class + * because JNI segfaults for some reason if they are. + */ + private final WriteOptions writeOptions; + + /** + * Creates a new {@code RocksDBFoldingState}. + * + * @param namespaceSerializer + * The serializer for the namespace. + * @param stateDesc + * The state identifier for the state. This contains the state name and aggregation function. 
+ */ + public RocksDBAggregatingState( + ColumnFamilyHandle columnFamily, + TypeSerializer<N> namespaceSerializer, + AggregatingStateDescriptor<T, ACC, R> stateDesc, + RocksDBKeyedStateBackend<K> backend) { + + super(columnFamily, namespaceSerializer, stateDesc, backend); + + this.valueSerializer = stateDesc.getSerializer(); + this.aggFunction = stateDesc.getAggregateFunction(); + + writeOptions = new WriteOptions(); + writeOptions.setDisableWAL(true); + } + + @Override + public R get() throws IOException { + try { + // prepare the current key and namespace for RocksDB lookup + writeCurrentKeyWithGroupAndNamespace(); + final byte[] key = keySerializationStream.toByteArray(); + + // get the current value + final byte[] valueBytes = backend.db.get(columnFamily, key); + + if (valueBytes == null) { + return null; + } + + ACC accumulator = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes))); + return aggFunction.getResult(accumulator); + } + catch (IOException | RocksDBException e) { + throw new IOException("Error while retrieving value from RocksDB", e); + } + } + + @Override + public void add(T value) throws IOException { + try { + // prepare the current key and namespace for RocksDB lookup + writeCurrentKeyWithGroupAndNamespace(); + final byte[] key = keySerializationStream.toByteArray(); + keySerializationStream.reset(); + + // get the current value + final byte[] valueBytes = backend.db.get(columnFamily, key); + + // deserialize the current accumulator, or create a blank one + ACC accumulator = valueBytes == null ? 
+ aggFunction.createAccumulator() : + valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes))); + + // aggregate the value into the accumulator + accumulator = aggFunction.add(value, accumulator); + + // serialize the new accumulator + final DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream); + valueSerializer.serialize(accumulator, out); + + // write the new value to RocksDB + backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray()); + } + catch (IOException | RocksDBException e) { + throw new IOException("Error while adding value to RocksDB", e); + } + } + + @Override + public void mergeNamespaces(N target, Collection<N> sources) throws Exception { + if (sources == null || sources.isEmpty()) { + return; + } + + // cache key and namespace + final K key = backend.getCurrentKey(); + final int keyGroup = backend.getCurrentKeyGroupIndex(); + + try { + ACC current = null; + + // merge the sources to the target + for (N source : sources) { + if (source != null) { + writeKeyWithGroupAndNamespace( + keyGroup, key, source, + keySerializationStream, keySerializationDataOutputView); + + final byte[] sourceKey = keySerializationStream.toByteArray(); + final byte[] valueBytes = backend.db.get(columnFamily, sourceKey); + backend.db.delete(columnFamily, sourceKey); + + if (valueBytes != null) { + ACC value = valueSerializer.deserialize( + new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes))); + + if (current != null) { + current = aggFunction.merge(current, value); + } + else { + current = value; + } + } + } + } + + // if something came out of merging the sources, merge it or write it to the target + if (current != null) { + // create the target full-binary-key + writeKeyWithGroupAndNamespace( + keyGroup, key, target, + keySerializationStream, keySerializationDataOutputView); + + final byte[] targetKey = keySerializationStream.toByteArray(); 
+ final byte[] targetValueBytes = backend.db.get(columnFamily, targetKey); + + if (targetValueBytes != null) { + // target also had a value, merge + ACC value = valueSerializer.deserialize( + new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(targetValueBytes))); + + current = aggFunction.merge(current, value); + } + + // serialize the resulting value + keySerializationStream.reset(); + valueSerializer.serialize(current, keySerializationDataOutputView); + + // write the resulting value + backend.db.put(columnFamily, writeOptions, targetKey, keySerializationStream.toByteArray()); + } + } + catch (Exception e) { + throw new Exception("Error while merging state in RocksDB", e); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java new file mode 100644 index 0000000..479565e --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.contrib.streaming.state;

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.internal.InternalFoldingState;

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;

import java.io.IOException;

/**
 * A {@link FoldingState} whose folded value is kept, in serialized form, in RocksDB.
 *
 * @param <K> The type of the key.
 * @param <N> The type of the namespace.
 * @param <T> The type of the values that can be folded into the state.
 * @param <ACC> The type of the value in the folding state.
 *
 * @deprecated will be removed in a future version
 */
@Deprecated
public class RocksDBFoldingState<K, N, T, ACC>
    extends AbstractRocksDBState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, ACC>
    implements InternalFoldingState<N, T, ACC> {

    /** Serializer used to read and write the folded value. */
    private final TypeSerializer<ACC> valueSerializer;

    /** The fold function supplied through the state descriptor. */
    private final FoldFunction<T, ACC> foldFunction;

    /**
     * Write options with the write-ahead-log disabled. They live in this class rather than
     * the base class because JNI segfaults for some reason if they are kept there.
     */
    private final WriteOptions writeOptions;

    /**
     * Creates a new {@code RocksDBFoldingState}.
     *
     * @param columnFamily The column family that entries of this state are read from and written to.
     * @param namespaceSerializer The serializer for the namespace.
     * @param stateDesc The state identifier for the state. This contains name
     *                  and can create a default state value.
     * @param backend The backend that provides the RocksDB instance.
     */
    public RocksDBFoldingState(ColumnFamilyHandle columnFamily,
            TypeSerializer<N> namespaceSerializer,
            FoldingStateDescriptor<T, ACC> stateDesc,
            RocksDBKeyedStateBackend<K> backend) {

        super(columnFamily, namespaceSerializer, stateDesc, backend);

        this.valueSerializer = stateDesc.getSerializer();
        this.foldFunction = stateDesc.getFoldFunction();

        this.writeOptions = new WriteOptions();
        this.writeOptions.setDisableWAL(true);
    }

    /**
     * Looks up the folded value for the current key and namespace.
     *
     * @return The stored value, or {@code null} if RocksDB holds no entry for the key.
     */
    @Override
    public ACC get() {
        try {
            writeCurrentKeyWithGroupAndNamespace();
            final byte[] dbKey = keySerializationStream.toByteArray();
            final byte[] storedBytes = backend.db.get(columnFamily, dbKey);
            return storedBytes == null ? null : readValue(storedBytes);
        } catch (IOException | RocksDBException e) {
            throw new RuntimeException("Error while retrieving data from RocksDB", e);
        }
    }

    /**
     * Folds {@code value} into the stored state and writes the result back.
     *
     * <p>When no entry exists yet, folding starts from the descriptor's default value.
     * Any failure is rethrown wrapped in a {@link RuntimeException}.
     *
     * @param value The value to fold into the state.
     */
    @Override
    public void add(T value) throws IOException {
        try {
            writeCurrentKeyWithGroupAndNamespace();
            final byte[] dbKey = keySerializationStream.toByteArray();
            final byte[] storedBytes = backend.db.get(columnFamily, dbKey);

            // start from the default value when nothing is stored yet
            final ACC previous = (storedBytes == null)
                    ? stateDesc.getDefaultValue()
                    : readValue(storedBytes);
            final ACC folded = foldFunction.fold(previous, value);

            // reuse the key stream as the buffer for the serialized result
            keySerializationStream.reset();
            valueSerializer.serialize(folded, new DataOutputViewStreamWrapper(keySerializationStream));
            backend.db.put(columnFamily, writeOptions, dbKey, keySerializationStream.toByteArray());
        } catch (Exception e) {
            throw new RuntimeException("Error while adding data to RocksDB", e);
        }
    }

    /** Deserializes a state value from the raw bytes read from RocksDB. */
    private ACC readValue(byte[] storedBytes) throws IOException {
        return valueSerializer.deserialize(
                new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(storedBytes)));
    }

}