Repository: flink
Updated Branches:
  refs/heads/master 28c6254ee -> 9ee167949


[FLINK-3355] [rocksdb backend] Allow passing options to the RocksDB backend.

This also cleans up the generics in the RocksDB state classes.

This closes #1608


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9ee16794
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9ee16794
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9ee16794

Branch: refs/heads/master
Commit: 9ee16794909d18aa84e8d0b738a6a447d11e6eeb
Parents: 28c6254
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Feb 8 19:55:29 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Feb 9 11:03:09 2016 +0100

----------------------------------------------------------------------
 .../streaming/state/AbstractRocksDBState.java   | 113 +++++++++----------
 .../contrib/streaming/state/OptionsFactory.java |  31 +++++
 .../streaming/state/RocksDBListState.java       |  68 ++++++-----
 .../streaming/state/RocksDBReducingState.java   |  86 +++++++-------
 .../streaming/state/RocksDBStateBackend.java    |  76 +++++++++++--
 .../streaming/state/RocksDBValueState.java      |  74 ++++++------
 6 files changed, 273 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9ee16794/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 783332c..05e15e8 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -1,36 +1,38 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.commons.io.FileUtils;
+
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.state.KvStateSnapshot;
 import org.apache.flink.util.HDFSCopyFromLocal;
 import org.apache.flink.util.HDFSCopyToLocal;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+
 import org.rocksdb.BackupEngine;
 import org.rocksdb.BackupableDBOptions;
 import org.rocksdb.Env;
@@ -38,7 +40,7 @@ import org.rocksdb.Options;
 import org.rocksdb.RestoreOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
-import org.rocksdb.StringAppendOperator;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,10 +62,9 @@ import static java.util.Objects.requireNonNull;
  * @param <N> The type of the namespace.
  * @param <S> The type of {@link State}.
  * @param <SD> The type of {@link StateDescriptor}.
- * @param <Backend> The type of the backend that snapshots this key/value 
state.
  */
-public abstract class AbstractRocksDBState<K, N, S extends State, SD extends 
StateDescriptor<S, ?>, Backend extends AbstractStateBackend>
-       implements KvState<K, N, S, SD, Backend>, State {
+public abstract class AbstractRocksDBState<K, N, S extends State, SD extends 
StateDescriptor<S, ?>>
+       implements KvState<K, N, S, SD, RocksDBStateBackend>, State {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(AbstractRocksDBState.class);
 
@@ -95,9 +96,11 @@ public abstract class AbstractRocksDBState<K, N, S extends 
State, SD extends Sta
         * @param dbPath The path on the local system where RocksDB data should 
be stored.
         */
        protected AbstractRocksDBState(TypeSerializer<K> keySerializer,
-               TypeSerializer<N> namespaceSerializer,
-               File dbPath,
-               String checkpointPath) {
+                       TypeSerializer<N> namespaceSerializer,
+                       File dbPath,
+                       String checkpointPath,
+                       Options options) {
+               
                this.keySerializer = requireNonNull(keySerializer);
                this.namespaceSerializer = namespaceSerializer;
                this.dbPath = dbPath;
@@ -105,9 +108,6 @@ public abstract class AbstractRocksDBState<K, N, S extends 
State, SD extends Sta
 
                RocksDB.loadLibrary();
 
-               Options options = new Options().setCreateIfMissing(true);
-               options.setMergeOperator(new StringAppendOperator());
-
                if (!dbPath.exists()) {
                        if (!dbPath.mkdirs()) {
                                throw new RuntimeException("Could not create 
RocksDB data directory.");
@@ -128,9 +128,6 @@ public abstract class AbstractRocksDBState<K, N, S extends 
State, SD extends Sta
                } catch (RocksDBException e) {
                        throw new RuntimeException("Error while opening RocksDB 
instance.", e);
                }
-
-               options.dispose();
-
        }
 
        /**
@@ -143,10 +140,11 @@ public abstract class AbstractRocksDBState<K, N, S 
extends State, SD extends Sta
         * @param restorePath The path to a backup directory from which to 
restore RocksDb database.
         */
        protected AbstractRocksDBState(TypeSerializer<K> keySerializer,
-               TypeSerializer<N> namespaceSerializer,
-               File dbPath,
-               String checkpointPath,
-               String restorePath) {
+                       TypeSerializer<N> namespaceSerializer,
+                       File dbPath,
+                       String checkpointPath,
+                       String restorePath,
+                       Options options) {
 
                RocksDB.loadLibrary();
 
@@ -162,9 +160,6 @@ public abstract class AbstractRocksDBState<K, N, S extends 
State, SD extends Sta
                this.dbPath = dbPath;
                this.checkpointPath = checkpointPath;
 
-               Options options = new Options().setCreateIfMissing(true);
-               options.setMergeOperator(new StringAppendOperator());
-
                if (!dbPath.exists()) {
                        if (!dbPath.mkdirs()) {
                                throw new RuntimeException("Could not create 
RocksDB data directory.");
@@ -176,8 +171,6 @@ public abstract class AbstractRocksDBState<K, N, S extends 
State, SD extends Sta
                } catch (RocksDBException e) {
                        throw new RuntimeException("Error while opening RocksDB 
instance.", e);
                }
-
-               options.dispose();
        }
 
        // 
------------------------------------------------------------------------
@@ -211,12 +204,10 @@ public abstract class AbstractRocksDBState<K, N, S 
extends State, SD extends Sta
                this.currentNamespace = namespace;
        }
 
-       protected abstract KvStateSnapshot<K, N, S, SD, Backend> 
createRocksDBSnapshot(URI backupUri, long checkpointId);
+       protected abstract AbstractRocksDBSnapshot<K, N, S, SD> 
createRocksDBSnapshot(URI backupUri, long checkpointId);
 
        @Override
-       final public KvStateSnapshot<K, N, S, SD, Backend> snapshot(
-               long checkpointId,
-               long timestamp) throws Exception {
+       public final AbstractRocksDBSnapshot<K, N, S, SD> snapshot(long 
checkpointId, long timestamp) throws Exception {
                boolean success = false;
 
                final File localBackupPath = new File(dbPath, "backup-" + 
checkpointId);
@@ -234,7 +225,7 @@ public abstract class AbstractRocksDBState<K, N, S extends 
State, SD extends Sta
                        }
 
                        HDFSCopyFromLocal.copyFromLocal(localBackupPath, 
backupUri);
-                       KvStateSnapshot<K, N, S, SD, Backend> result = 
createRocksDBSnapshot(backupUri, checkpointId);
+                       AbstractRocksDBSnapshot<K, N, S, SD> result = 
createRocksDBSnapshot(backupUri, checkpointId);
                        success = true;
                        return result;
                } finally {
@@ -256,7 +247,9 @@ public abstract class AbstractRocksDBState<K, N, S extends 
State, SD extends Sta
                }
        }
 
-       public static abstract class AbstractRocksDBSnapshot<K, N, S extends 
State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend> 
implements KvStateSnapshot<K, N, S, SD, Backend> {
+       public static abstract class AbstractRocksDBSnapshot<K, N, S extends 
State, SD extends StateDescriptor<S, ?>>
+                       implements KvStateSnapshot<K, N, S, SD, 
RocksDBStateBackend>
+       {
                private static final long serialVersionUID = 1L;
 
                private static final Logger LOG = 
LoggerFactory.getLogger(AbstractRocksDBSnapshot.class);
@@ -293,12 +286,13 @@ public abstract class AbstractRocksDBState<K, N, S 
extends State, SD extends Sta
                protected final SD stateDesc;
 
                public AbstractRocksDBSnapshot(File dbPath,
-                       String checkpointPath,
-                       URI backupUri,
-                       long checkpointId,
-                       TypeSerializer<K> keySerializer,
-                       TypeSerializer<N> namespaceSerializer,
-                       SD stateDesc) {
+                               String checkpointPath,
+                               URI backupUri,
+                               long checkpointId,
+                               TypeSerializer<K> keySerializer,
+                               TypeSerializer<N> namespaceSerializer,
+                               SD stateDesc) {
+                       
                        this.dbPath = dbPath;
                        this.checkpointPath = checkpointPath;
                        this.backupUri = backupUri;
@@ -309,19 +303,21 @@ public abstract class AbstractRocksDBState<K, N, S 
extends State, SD extends Sta
                        this.namespaceSerializer = namespaceSerializer;
                }
 
-               protected abstract KvState<K, N, S, SD, Backend> 
createRocksDBState(TypeSerializer<K> keySerializer,
-                       TypeSerializer<N> namespaceSerializer,
-                       SD stateDesc,
-                       File dbPath,
-                       String backupPath,
-                       String restorePath) throws Exception;
+               protected abstract KvState<K, N, S, SD, RocksDBStateBackend> 
createRocksDBState(
+                               TypeSerializer<K> keySerializer,
+                               TypeSerializer<N> namespaceSerializer,
+                               SD stateDesc,
+                               File dbPath,
+                               String backupPath,
+                               String restorePath,
+                               Options options) throws Exception;
 
                @Override
-               public final KvState<K, N, S, SD, Backend> restoreState(
-                       Backend stateBackend,
-                       TypeSerializer<K> keySerializer,
-                       ClassLoader classLoader,
-                       long recoveryTimestamp) throws Exception {
+               public final KvState<K, N, S, SD, RocksDBStateBackend> 
restoreState(
+                               RocksDBStateBackend stateBackend,
+                               TypeSerializer<K> keySerializer,
+                               ClassLoader classLoader,
+                               long recoveryTimestamp) throws Exception {
 
                        // validity checks
                        if (!this.keySerializer.equals(keySerializer)) {
@@ -352,7 +348,8 @@ public abstract class AbstractRocksDBState<K, N, S extends 
State, SD extends Sta
                        }
 
                        HDFSCopyToLocal.copyToLocal(backupUri, dbPath);
-                       return createRocksDBState(keySerializer, 
namespaceSerializer, stateDesc, dbPath, checkpointPath, 
localBackupPath.getAbsolutePath());
+                       return createRocksDBState(keySerializer, 
namespaceSerializer, stateDesc, dbPath, 
+                                       checkpointPath, 
localBackupPath.getAbsolutePath(), stateBackend.getRocksDBOptions());
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9ee16794/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
new file mode 100644
index 0000000..73b1e5d
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.rocksdb.Options;
+
+/**
+ * A factory for {@link Options} to be passed to the {@link RocksDBStateBackend}.
+ * Options have to be created lazily by this factory, because the {@code Options}
+ * class is not serializable and holds pointers to native code.
+ */
+public interface OptionsFactory extends java.io.Serializable {
+       
+       Options createOptions();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9ee16794/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index e97e65d..da07f75 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.state.ListState;
@@ -22,9 +23,9 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
+
+import org.rocksdb.Options;
 import org.rocksdb.RocksDBException;
 
 import java.io.ByteArrayInputStream;
@@ -44,10 +45,9 @@ import static java.util.Objects.requireNonNull;
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
  * @param <V> The type of the values in the list state.
- * @param <Backend> The type of the backend that snapshots this key/value 
state.
  */
-public class RocksDBListState<K, N, V, Backend extends AbstractStateBackend>
-       extends AbstractRocksDBState<K, N, ListState<V>, 
ListStateDescriptor<V>, Backend>
+public class RocksDBListState<K, N, V>
+       extends AbstractRocksDBState<K, N, ListState<V>, ListStateDescriptor<V>>
        implements ListState<V> {
 
        /** Serializer for the values */
@@ -66,11 +66,13 @@ public class RocksDBListState<K, N, V, Backend extends 
AbstractStateBackend>
         * @param dbPath The path on the local system where RocksDB data should 
be stored.
         */
        protected RocksDBListState(TypeSerializer<K> keySerializer,
-               TypeSerializer<N> namespaceSerializer,
-               ListStateDescriptor<V> stateDesc,
-               File dbPath,
-               String backupPath) {
-               super(keySerializer, namespaceSerializer, dbPath, backupPath);
+                       TypeSerializer<N> namespaceSerializer,
+                       ListStateDescriptor<V> stateDesc,
+                       File dbPath,
+                       String backupPath,
+                       Options options) {
+               
+               super(keySerializer, namespaceSerializer, dbPath, backupPath, 
options);
                this.stateDesc = requireNonNull(stateDesc);
                this.valueSerializer = stateDesc.getSerializer();
        }
@@ -85,12 +87,14 @@ public class RocksDBListState<K, N, V, Backend extends 
AbstractStateBackend>
         * @param dbPath The path on the local system where RocksDB data should 
be stored.
         */
        protected RocksDBListState(TypeSerializer<K> keySerializer,
-               TypeSerializer<N> namespaceSerializer,
-               ListStateDescriptor<V> stateDesc,
-               File dbPath,
-               String backupPath,
-               String restorePath) {
-               super(keySerializer, namespaceSerializer, dbPath, backupPath, 
restorePath);
+                       TypeSerializer<N> namespaceSerializer,
+                       ListStateDescriptor<V> stateDesc,
+                       File dbPath,
+                       String backupPath,
+                       String restorePath,
+                       Options options) {
+               
+               super(keySerializer, namespaceSerializer, dbPath, backupPath, 
restorePath, options);
                this.stateDesc = requireNonNull(stateDesc);
                this.valueSerializer = stateDesc.getSerializer();
        }
@@ -143,13 +147,16 @@ public class RocksDBListState<K, N, V, Backend extends 
AbstractStateBackend>
        }
 
        @Override
-       protected KvStateSnapshot<K, N, ListState<V>, ListStateDescriptor<V>, 
Backend> createRocksDBSnapshot(
-               URI backupUri,
-               long checkpointId) {
+       protected AbstractRocksDBSnapshot<K, N, ListState<V>, 
ListStateDescriptor<V>> createRocksDBSnapshot(
+                       URI backupUri,
+                       long checkpointId) {
+               
                return new Snapshot<>(dbPath, checkpointPath, backupUri, 
checkpointId, keySerializer, namespaceSerializer, stateDesc);
        }
 
-       private static class Snapshot<K, N, V, Backend extends 
AbstractStateBackend> extends AbstractRocksDBSnapshot<K, N, ListState<V>, 
ListStateDescriptor<V>, Backend> {
+       private static class Snapshot<K, N, V> extends 
+                       AbstractRocksDBSnapshot<K, N, ListState<V>, 
ListStateDescriptor<V>>
+       {
                private static final long serialVersionUID = 1L;
 
                public Snapshot(File dbPath,
@@ -169,14 +176,17 @@ public class RocksDBListState<K, N, V, Backend extends 
AbstractStateBackend>
                }
 
                @Override
-               protected KvState<K, N, ListState<V>, ListStateDescriptor<V>, 
Backend> createRocksDBState(
-                       TypeSerializer<K> keySerializer,
-                       TypeSerializer<N> namespaceSerializer,
-                       ListStateDescriptor<V> stateDesc,
-                       File dbPath,
-                       String backupPath,
-                       String restorePath) throws Exception {
-                       return new RocksDBListState<>(keySerializer, 
namespaceSerializer, stateDesc, dbPath, checkpointPath, restorePath);
+               protected KvState<K, N, ListState<V>, ListStateDescriptor<V>, 
RocksDBStateBackend> createRocksDBState(
+                               TypeSerializer<K> keySerializer,
+                               TypeSerializer<N> namespaceSerializer,
+                               ListStateDescriptor<V> stateDesc,
+                               File dbPath,
+                               String backupPath,
+                               String restorePath,
+                               Options options) throws Exception {
+                       
+                       return new RocksDBListState<>(keySerializer, 
namespaceSerializer, stateDesc, dbPath, 
+                                       checkpointPath, restorePath, options);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9ee16794/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index eb21c3b..81f9ffb 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -1,22 +1,3 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.contrib.streaming.state;
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -35,15 +16,17 @@ package org.apache.flink.contrib.streaming.state;
  * limitations under the License.
  */
 
+package org.apache.flink.contrib.streaming.state;
+
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
+
+import org.rocksdb.Options;
 import org.rocksdb.RocksDBException;
 
 import java.io.ByteArrayInputStream;
@@ -60,10 +43,9 @@ import static java.util.Objects.requireNonNull;
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
  * @param <V> The type of value that the state state stores.
- * @param <Backend> The type of the backend that snapshots this key/value 
state.
  */
-public class RocksDBReducingState<K, N, V, Backend extends 
AbstractStateBackend>
-       extends AbstractRocksDBState<K, N, ReducingState<V>, 
ReducingStateDescriptor<V>, Backend>
+public class RocksDBReducingState<K, N, V>
+       extends AbstractRocksDBState<K, N, ReducingState<V>, 
ReducingStateDescriptor<V>>
        implements ReducingState<V> {
 
        /** Serializer for the values */
@@ -85,23 +67,27 @@ public class RocksDBReducingState<K, N, V, Backend extends 
AbstractStateBackend>
         * @param dbPath The path on the local system where RocksDB data should 
be stored.
         */
        protected RocksDBReducingState(TypeSerializer<K> keySerializer,
-               TypeSerializer<N> namespaceSerializer,
-               ReducingStateDescriptor<V> stateDesc,
-               File dbPath,
-               String backupPath) {
-               super(keySerializer, namespaceSerializer, dbPath, backupPath);
+                       TypeSerializer<N> namespaceSerializer,
+                       ReducingStateDescriptor<V> stateDesc,
+                       File dbPath,
+                       String backupPath,
+                       Options options) {
+               
+               super(keySerializer, namespaceSerializer, dbPath, backupPath, 
options);
                this.stateDesc = requireNonNull(stateDesc);
                this.valueSerializer = stateDesc.getSerializer();
                this.reduceFunction = stateDesc.getReduceFunction();
        }
 
        protected RocksDBReducingState(TypeSerializer<K> keySerializer,
-               TypeSerializer<N> namespaceSerializer,
-               ReducingStateDescriptor<V> stateDesc,
-               File dbPath,
-               String backupPath,
-               String restorePath) {
-               super(keySerializer, namespaceSerializer, dbPath, backupPath, 
restorePath);
+                       TypeSerializer<N> namespaceSerializer,
+                       ReducingStateDescriptor<V> stateDesc,
+                       File dbPath,
+                       String backupPath,
+                       String restorePath,
+                       Options options) {
+               
+               super(keySerializer, namespaceSerializer, dbPath, backupPath, 
restorePath, options);
                this.stateDesc = stateDesc;
                this.valueSerializer = stateDesc.getSerializer();
                this.reduceFunction = stateDesc.getReduceFunction();
@@ -150,13 +136,16 @@ public class RocksDBReducingState<K, N, V, Backend 
extends AbstractStateBackend>
        }
 
        @Override
-       protected KvStateSnapshot<K, N, ReducingState<V>, 
ReducingStateDescriptor<V>, Backend> createRocksDBSnapshot(
-               URI backupUri,
-               long checkpointId) {
+       protected AbstractRocksDBSnapshot<K, N, ReducingState<V>, 
ReducingStateDescriptor<V>> createRocksDBSnapshot(
+                       URI backupUri,
+                       long checkpointId) {
+               
                return new Snapshot<>(dbPath, checkpointPath, backupUri, 
checkpointId, keySerializer, namespaceSerializer, stateDesc);
        }
 
-       private static class Snapshot<K, N, V, Backend extends 
AbstractStateBackend> extends AbstractRocksDBSnapshot<K, N, ReducingState<V>, 
ReducingStateDescriptor<V>, Backend> {
+       private static class Snapshot<K, N, V> extends 
+                       AbstractRocksDBSnapshot<K, N, ReducingState<V>, 
ReducingStateDescriptor<V>>
+       {
                private static final long serialVersionUID = 1L;
 
                public Snapshot(File dbPath,
@@ -176,14 +165,17 @@ public class RocksDBReducingState<K, N, V, Backend 
extends AbstractStateBackend>
                }
 
                @Override
-               protected KvState<K, N, ReducingState<V>, 
ReducingStateDescriptor<V>, Backend> createRocksDBState(
-                       TypeSerializer<K> keySerializer,
-                       TypeSerializer<N> namespaceSerializer,
-                       ReducingStateDescriptor<V> stateDesc,
-                       File dbPath,
-                       String backupPath,
-                       String restorePath) throws Exception {
-                       return new RocksDBReducingState<>(keySerializer, 
namespaceSerializer, stateDesc, dbPath, checkpointPath, restorePath);
+               protected KvState<K, N, ReducingState<V>, 
ReducingStateDescriptor<V>, RocksDBStateBackend> createRocksDBState(
+                               TypeSerializer<K> keySerializer,
+                               TypeSerializer<N> namespaceSerializer,
+                               ReducingStateDescriptor<V> stateDesc,
+                               File dbPath,
+                               String backupPath,
+                               String restorePath,
+                               Options options) throws Exception {
+                       
+                       return new RocksDBReducingState<>(keySerializer, 
namespaceSerializer, stateDesc, 
+                                       dbPath, checkpointPath, restorePath, 
options);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9ee16794/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index eefa4a9..8c0171a 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -31,11 +31,20 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.api.common.state.StateBackend;
+import org.rocksdb.Options;
+import org.rocksdb.StringAppendOperator;
 
 import static java.util.Objects.requireNonNull;
 
 /**
- *
+ * A {@link StateBackend} that stores its state in {@code RocksDB}. This state backend can
+ * store very large state that exceeds memory and spills to disk.
+ * 
+ * <p>All key/value state (including windows) is stored in the key/value index of RocksDB.
+ * For persistence against loss of machines, checkpoints take a snapshot of the
+ * RocksDB database, and persist that snapshot in a file system (by default) or
+ * another configurable state backend.
  */
 public class RocksDBStateBackend extends AbstractStateBackend {
        private static final long serialVersionUID = 1L;
@@ -53,6 +62,13 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
        private JobID jobId;
 
        private AbstractStateBackend backingStateBackend;
+       
+       /** The options factory to create the RocksDB options in the cluster */
+       private OptionsFactory optionsFactory;
+       
+       /** The options from the options factory, cached */
+       private transient Options rocksDbOptions;
+       
 
        public RocksDBStateBackend(String dbBasePath, String 
checkpointDirectory, AbstractStateBackend backingStateBackend) {
                this.dbBasePath = requireNonNull(dbBasePath);
@@ -71,13 +87,15 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
        }
 
        @Override
-       public void disposeAllStateForCurrentJob() throws Exception {
-
-       }
+       public void disposeAllStateForCurrentJob() throws Exception {}
 
        @Override
        public void close() throws Exception {
-
+               Options opt = this.rocksDbOptions;
+               if (opt != null) {
+                       opt.dispose();
+                       this.rocksDbOptions = null;
+               }
        }
 
        private File getDbPath(String stateName) {
@@ -93,7 +111,9 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
                ValueStateDescriptor<T> stateDesc) throws Exception {
                File dbPath = getDbPath(stateDesc.getName());
                String checkpointPath = getCheckpointPath(stateDesc.getName());
-               return new RocksDBValueState<>(keySerializer, 
namespaceSerializer, stateDesc, dbPath, checkpointPath);
+               
+               return new RocksDBValueState<>(keySerializer, 
namespaceSerializer, 
+                               stateDesc, dbPath, checkpointPath, 
getRocksDBOptions());
        }
 
        @Override
@@ -101,7 +121,9 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
                ListStateDescriptor<T> stateDesc) throws Exception {
                File dbPath = getDbPath(stateDesc.getName());
                String checkpointPath = getCheckpointPath(stateDesc.getName());
-               return new RocksDBListState<>(keySerializer, 
namespaceSerializer, stateDesc, dbPath, checkpointPath);
+               
+               return new RocksDBListState<>(keySerializer, 
namespaceSerializer, 
+                               stateDesc, dbPath, checkpointPath, 
getRocksDBOptions());
        }
 
        @Override
@@ -109,7 +131,9 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
                ReducingStateDescriptor<T> stateDesc) throws Exception {
                File dbPath = getDbPath(stateDesc.getName());
                String checkpointPath = getCheckpointPath(stateDesc.getName());
-               return new RocksDBReducingState<>(keySerializer, 
namespaceSerializer, stateDesc, dbPath, checkpointPath);
+               
+               return new RocksDBReducingState<>(keySerializer, 
namespaceSerializer, 
+                               stateDesc, dbPath, checkpointPath, 
getRocksDBOptions());
        }
 
        @Override
@@ -124,4 +148,38 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
                long timestamp) throws Exception {
                return backingStateBackend.checkpointStateSerializable(state, 
checkpointID, timestamp);
        }
+       
+       // ------------------------------------------------------------------------
+       //  Parametrize with Options
+       // ------------------------------------------------------------------------
+
+       /**
+        * Defines the {@link org.rocksdb.Options} for the RocksDB instances.
+        * Because the options are not serializable and hold native code references,
+        * they must be specified through a factory. 
+        * 
+        * @param optionsFactory The options factory that lazily creates the RocksDB options.
+        */
+       public void setOptions(OptionsFactory optionsFactory) {
+               this.optionsFactory = optionsFactory;
+       }
+
+       /**
+        * Gets the options factory that lazily creates the RocksDB options.
+        * 
+        * @return The options factory.
+        */
+       public OptionsFactory getOptions() {
+               return optionsFactory;
+       }
+       
+       Options getRocksDBOptions() {
+               if (rocksDbOptions == null) {
+                       Options opt = optionsFactory == null ? new Options() : 
optionsFactory.createOptions();
+                       opt = opt.setCreateIfMissing(true);
+                       opt = opt.setMergeOperator(new StringAppendOperator());
+                       rocksDbOptions = opt;
+               }
+               return rocksDbOptions;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9ee16794/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index f51e160..388f099 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,15 +6,16 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.state.ValueState;
@@ -22,9 +23,9 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
+
+import org.rocksdb.Options;
 import org.rocksdb.RocksDBException;
 
 import java.io.ByteArrayInputStream;
@@ -41,10 +42,9 @@ import static java.util.Objects.requireNonNull;
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
  * @param <V> The type of value that the state state stores.
- * @param <Backend> The type of the backend that snapshots this key/value state.
  */
-public class RocksDBValueState<K, N, V, Backend extends AbstractStateBackend>
-       extends AbstractRocksDBState<K, N, ValueState<V>, 
ValueStateDescriptor<V>, Backend>
+public class RocksDBValueState<K, N, V>
+       extends AbstractRocksDBState<K, N, ValueState<V>, 
ValueStateDescriptor<V>>
        implements ValueState<V> {
 
        /** Serializer for the values */
@@ -63,22 +63,26 @@ public class RocksDBValueState<K, N, V, Backend extends 
AbstractStateBackend>
         * @param dbPath The path on the local system where RocksDB data should 
be stored.
         */
        protected RocksDBValueState(TypeSerializer<K> keySerializer,
-               TypeSerializer<N> namespaceSerializer,
-               ValueStateDescriptor<V> stateDesc,
-               File dbPath,
-               String backupPath) {
-               super(keySerializer, namespaceSerializer, dbPath, backupPath);
+                       TypeSerializer<N> namespaceSerializer,
+                       ValueStateDescriptor<V> stateDesc,
+                       File dbPath,
+                       String backupPath,
+                       Options options) {
+               
+               super(keySerializer, namespaceSerializer, dbPath, backupPath, 
options);
                this.stateDesc = requireNonNull(stateDesc);
                this.valueSerializer = stateDesc.getSerializer();
        }
 
        protected RocksDBValueState(TypeSerializer<K> keySerializer,
-               TypeSerializer<N> namespaceSerializer,
-               ValueStateDescriptor<V> stateDesc,
-               File dbPath,
-               String backupPath,
-               String restorePath) {
-               super(keySerializer, namespaceSerializer, dbPath, backupPath, 
restorePath);
+                       TypeSerializer<N> namespaceSerializer,
+                       ValueStateDescriptor<V> stateDesc,
+                       File dbPath,
+                       String backupPath,
+                       String restorePath,
+                       Options options) {
+               
+               super(keySerializer, namespaceSerializer, dbPath, backupPath, 
restorePath, options);
                this.stateDesc = stateDesc;
                this.valueSerializer = stateDesc.getSerializer();
        }
@@ -120,13 +124,16 @@ public class RocksDBValueState<K, N, V, Backend extends 
AbstractStateBackend>
        }
 
        @Override
-       protected KvStateSnapshot<K, N, ValueState<V>, ValueStateDescriptor<V>, 
Backend> createRocksDBSnapshot(
-               URI backupUri,
-               long checkpointId) {
+       protected AbstractRocksDBSnapshot<K, N, ValueState<V>, 
ValueStateDescriptor<V>> createRocksDBSnapshot(
+                       URI backupUri,
+                       long checkpointId) {
+               
                return new Snapshot<>(dbPath, checkpointPath, backupUri, 
checkpointId, keySerializer, namespaceSerializer, stateDesc);
        }
 
-       private static class Snapshot<K, N, V, Backend extends 
AbstractStateBackend> extends AbstractRocksDBSnapshot<K, N, ValueState<V>, 
ValueStateDescriptor<V>, Backend> {
+       private static class Snapshot<K, N, V> 
+                       extends AbstractRocksDBSnapshot<K, N, ValueState<V>, 
ValueStateDescriptor<V>>
+       {
                private static final long serialVersionUID = 1L;
 
                public Snapshot(File dbPath,
@@ -146,14 +153,17 @@ public class RocksDBValueState<K, N, V, Backend extends 
AbstractStateBackend>
                }
 
                @Override
-               protected KvState<K, N, ValueState<V>, ValueStateDescriptor<V>, 
Backend> createRocksDBState(
-                       TypeSerializer<K> keySerializer,
-                       TypeSerializer<N> namespaceSerializer,
-                       ValueStateDescriptor<V> stateDesc,
-                       File dbPath,
-                       String backupPath,
-                       String restorePath) throws Exception {
-                       return new RocksDBValueState<>(keySerializer, 
namespaceSerializer, stateDesc, dbPath, checkpointPath, restorePath);
+               protected KvState<K, N, ValueState<V>, ValueStateDescriptor<V>, 
RocksDBStateBackend> createRocksDBState(
+                               TypeSerializer<K> keySerializer,
+                               TypeSerializer<N> namespaceSerializer,
+                               ValueStateDescriptor<V> stateDesc,
+                               File dbPath,
+                               String backupPath,
+                               String restorePath,
+                               Options options) throws Exception {
+                       
+                       return new RocksDBValueState<>(keySerializer, 
namespaceSerializer, stateDesc, dbPath, 
+                                       checkpointPath, restorePath, options);
                }
        }
 }

Reply via email to