[FLINK-3389] [rocksdb] Add pre-defined option profiles.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/be72758d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/be72758d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/be72758d Branch: refs/heads/master Commit: be72758d1104400c8a48554d717c5b8cea5b3617 Parents: 82c7383 Author: Stephan Ewen <se...@apache.org> Authored: Thu Feb 11 15:30:56 2016 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Thu Feb 11 21:34:03 2016 +0100 ---------------------------------------------------------------------- .../contrib/streaming/state/OptionsFactory.java | 32 ++++++- .../streaming/state/PredefinedOptions.java | 91 ++++++++++++++++++++ .../streaming/state/RocksDBStateBackend.java | 76 ++++++++++++++-- 3 files changed, 190 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/be72758d/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java index 73b1e5d..3e52f1f 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java @@ -24,8 +24,36 @@ import org.rocksdb.Options; * A factory for {@link Options} to be passed to the {@link RocksDBStateBackend}. * Options have to be created lazily by this factory, because the {@code Options} * class is not serializable and holds pointers to native code. 
+ * + * <p>A typical pattern to use this OptionsFactory is as follows: + * + * <h3>Java 8:</h3> + * <pre>{@code + * rocksDbBackend.setOptions( (currentOptions) -> currentOptions.setMaxOpenFiles(1024) ); + * }</pre> + * + * <h3>Java 7:</h3> + * <pre>{@code + * rocksDbBackend.setOptions(new OptionsFactory() { + * + * public Options createOptions(Options currentOptions) { + * return currentOptions.setMaxOpenFiles(1024); + * } + * }); + * }</pre> */ public interface OptionsFactory extends java.io.Serializable { - - Options createOptions(); + + /** + * This method should set the additional options on top of the current options object. + * The current options object may contain pre-defined options based on flags that have + * been configured on the state backend. + * + * <p>It is important to set the options on the current object and return the result from + * the setter methods, otherwise the pre-defined options may get lost. + * + * @param currentOptions The options object with the pre-defined options. + * @return The options object on which the additional options are set. + */ + Options createOptions(Options currentOptions); } http://git-wip-us.apache.org/repos/asf/flink/blob/be72758d/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java new file mode 100644 index 0000000..383f043 --- /dev/null +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.rocksdb.CompactionStyle; +import org.rocksdb.Options; + +/** + * The {@code PredefinedOptions} are configuration settings for the {@link RocksDBStateBackend}. + * The various pre-defined choices are configurations that have been empirically + * determined to be beneficial for performance under different settings. + * + * <p>Some of these settings are based on experiments by the Flink community, some follow + * guides from the RocksDB project. + */ +public enum PredefinedOptions { + + /** + * Default options for all settings. + */ + DEFAULT { + + @Override + public Options createOptions() { + return new Options(); + } + }, + + /** + * Pre-defined options for regular spinning hard disks. + * + * <p>This constant configures RocksDB with some options that lead empirically + * to better performance when the machines executing the system use + * regular spinning hard disks. The following options are set: + * <ul> + * <li>Optimized level-style compactions</li> + * </ul> + */ + SPINNING_DISK_OPTIMIZED { + + @Override + public Options createOptions() { + return new Options() + .setCompactionStyle(CompactionStyle.LEVEL) + .optimizeLevelStyleCompaction(); + } + }, + + /** + * Pre-defined options for Flash SSDs. 
+ * + * <p>This constant configures RocksDB with some options that lead empirically + * to better performance when the machines executing the system use SSDs. + * The following options are set: + * <ul> + * <li>none</li> + * </ul> + */ + FLASH_SSD_OPTIMIZED { + + @Override + public Options createOptions() { + return new Options(); + } + }; + + // ------------------------------------------------------------------------ + + /** + * Creates the options for this pre-defined setting. + * + * @return The pre-defined options object. + */ + public abstract Options createOptions(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/be72758d/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 8c0171a..eddd8c0 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.api.common.state.StateBackend; + import org.rocksdb.Options; import org.rocksdb.StringAppendOperator; @@ -45,6 +46,10 @@ import static java.util.Objects.requireNonNull; * For persistence against loss of machines, checkpoints take a snapshot of the * RocksDB database, and persist that snapshot in a file system (by default) or * another configurable state backend. 
+ * + * <p>The behavior of the RocksDB instances can be parametrized by setting RocksDB Options + * using the methods {@link #setPredefinedOptions(PredefinedOptions)} and + * {@link #setOptions(OptionsFactory)}. */ public class RocksDBStateBackend extends AbstractStateBackend { private static final long serialVersionUID = 1L; @@ -62,6 +67,9 @@ public class RocksDBStateBackend extends AbstractStateBackend { private JobID jobId; private AbstractStateBackend backingStateBackend; + + /** The pre-configured option settings */ + private PredefinedOptions predefinedOptions = PredefinedOptions.DEFAULT; /** The options factory to create the RocksDB options in the cluster */ private OptionsFactory optionsFactory; @@ -69,6 +77,7 @@ public class RocksDBStateBackend extends AbstractStateBackend { /** The options from the options factory, cached */ private transient Options rocksDbOptions; + // ------------------------------------------------------------------------ public RocksDBStateBackend(String dbBasePath, String checkpointDirectory, AbstractStateBackend backingStateBackend) { this.dbBasePath = requireNonNull(dbBasePath); @@ -76,10 +85,14 @@ public class RocksDBStateBackend extends AbstractStateBackend { this.backingStateBackend = requireNonNull(backingStateBackend); } + // ------------------------------------------------------------------------ + @Override - public void initializeForJob(Environment env, - String operatorIdentifier, - TypeSerializer<?> keySerializer) throws Exception { + public void initializeForJob( + Environment env, + String operatorIdentifier, + TypeSerializer<?> keySerializer) throws Exception + { super.initializeForJob(env, operatorIdentifier, keySerializer); this.operatorIdentifier = operatorIdentifier.replace(" ", ""); backingStateBackend.initializeForJob(env, operatorIdentifier, keySerializer); @@ -106,6 +119,10 @@ public class RocksDBStateBackend extends AbstractStateBackend { return checkpointDirectory + "/" + jobId.toString() + "/" + 
operatorIdentifier + "/" + stateName; } + // ------------------------------------------------------------------------ + // State factories + // ------------------------------------------------------------------------ + @Override protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc) throws Exception { @@ -154,9 +171,42 @@ public class RocksDBStateBackend extends AbstractStateBackend { // ------------------------------------------------------------------------ /** - * Defines the {@link org.rocksdb.Options} for the RocksDB instances. + * Sets the predefined options for RocksDB. + * + * <p>If a user-defined options factory is set (via {@link #setOptions(OptionsFactory)}), + * then the options from the factory are applied on top of the here specified + * predefined options. + * + * @param options The options to set (must not be null). + */ + public void setPredefinedOptions(PredefinedOptions options) { + predefinedOptions = requireNonNull(options); + } + + /** + * Gets the currently set predefined options for RocksDB. + * The default options (if nothing was set via {@link #setPredefinedOptions(PredefinedOptions)}) + * are {@link PredefinedOptions#DEFAULT}. + * + * <p>If a user-defined options factory is set (via {@link #setOptions(OptionsFactory)}), + * then the options from the factory are applied on top of the predefined options. + * + * @return The currently set predefined options for RocksDB. + */ + public PredefinedOptions getPredefinedOptions() { + return predefinedOptions; + } + + /** + * Sets {@link org.rocksdb.Options} for the RocksDB instances. * Because the options are not serializable and hold native code references, - * they must be specified through a factory. + * they must be specified through a factory. + * + * <p>The options created by the factory here are applied on top of the pre-defined + * options profile selected via {@link #setPredefinedOptions(PredefinedOptions)}. 
+ * If the pre-defined options profile is the default + * ({@link PredefinedOptions#DEFAULT}), then the factory fully controls the RocksDB + * options. * * @param optionsFactory The options factory that lazily creates the RocksDB options. */ @@ -172,12 +222,24 @@ public class RocksDBStateBackend extends AbstractStateBackend { public OptionsFactory getOptions() { return optionsFactory; } - + + /** + * Gets the RocksDB Options to be used for all RocksDB instances. + */ Options getRocksDBOptions() { if (rocksDbOptions == null) { - Options opt = optionsFactory == null ? new Options() : optionsFactory.createOptions(); + // initial options from pre-defined profile + Options opt = predefinedOptions.createOptions(); + + // add user-defined options, if specified + if (optionsFactory != null) { + opt = optionsFactory.createOptions(opt); + } + + // add necessary default options opt = opt.setCreateIfMissing(true); opt = opt.setMergeOperator(new StringAppendOperator()); + rocksDbOptions = opt; } return rocksDbOptions;