Repository: flink
Updated Branches:
  refs/heads/master b6bfcf008 -> e927ec0be


[FLINK-2991] Adjust RocksDB Folding State to latest RocksDBStateBackend


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e927ec0b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e927ec0b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e927ec0b

Branch: refs/heads/master
Commit: e927ec0bee4cf951e49b838538efa679e1af13e2
Parents: 94cba89
Author: Stephan Ewen <[email protected]>
Authored: Fri Feb 12 15:35:08 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Fri Feb 12 18:51:01 2016 +0100

----------------------------------------------------------------------
 .../streaming/state/AbstractRocksDBState.java   |  6 +-
 .../streaming/state/RocksDBFoldingState.java    | 89 +++++++++++---------
 .../streaming/state/RocksDBStateBackend.java    |  3 +-
 .../runtime/state/AbstractStateBackend.java     |  2 +-
 4 files changed, 57 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e927ec0b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 05e15e8..7d3172a 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -95,7 +95,8 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
         * @param namespaceSerializer The serializer for the namespace.
         * @param dbPath The path on the local system where RocksDB data should be stored.
         */
-       protected AbstractRocksDBState(TypeSerializer<K> keySerializer,
+       protected AbstractRocksDBState(
+                       TypeSerializer<K> keySerializer,
                        TypeSerializer<N> namespaceSerializer,
                        File dbPath,
                        String checkpointPath,
@@ -139,7 +140,8 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
         * @param dbPath The path on the local system where RocksDB data should be stored.
         * @param restorePath The path to a backup directory from which to restore RocksDb database.
         */
-       protected AbstractRocksDBState(TypeSerializer<K> keySerializer,
+       protected AbstractRocksDBState(
+                       TypeSerializer<K> keySerializer,
                        TypeSerializer<N> namespaceSerializer,
                        File dbPath,
                        String checkpointPath,

http://git-wip-us.apache.org/repos/asf/flink/blob/e927ec0b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index 7e4e573..d7b75bd 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -21,13 +21,12 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
+
+import org.rocksdb.Options;
 import org.rocksdb.RocksDBException;
 
 import java.io.ByteArrayInputStream;
@@ -39,16 +38,15 @@ import java.net.URI;
 import static java.util.Objects.requireNonNull;
 
 /**
- * {@link ReducingState} implementation that stores state in RocksDB.
+ * {@link FoldingState} implementation that stores state in RocksDB.
  *
  * @param <K> The type of the key.
  * @param <N> The type of the namespace.
  * @param <T> The type of the values that can be folded into the state.
  * @param <ACC> The type of the value in the folding state.
- * @param <Backend> The type of the backend that snapshots this key/value state.
  */
-public class RocksDBFoldingState<K, N, T, ACC, Backend extends AbstractStateBackend>
-       extends AbstractRocksDBState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend>
+public class RocksDBFoldingState<K, N, T, ACC>
+       extends AbstractRocksDBState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>>
        implements FoldingState<T, ACC> {
 
        /** Serializer for the values */
@@ -70,12 +68,16 @@ public class RocksDBFoldingState<K, N, T, ACC, Backend extends AbstractStateBack
         * @param dbPath The path on the local system where RocksDB data should be stored.
         * @param backupPath The path where to store backups.
         */
-       protected RocksDBFoldingState(TypeSerializer<K> keySerializer,
-               TypeSerializer<N> namespaceSerializer,
-               FoldingStateDescriptor<T, ACC> stateDesc,
-               File dbPath,
-               String backupPath) {
-               super(keySerializer, namespaceSerializer, dbPath, backupPath);
+       protected RocksDBFoldingState(
+                       TypeSerializer<K> keySerializer,
+                       TypeSerializer<N> namespaceSerializer,
+                       FoldingStateDescriptor<T, ACC> stateDesc,
+                       File dbPath,
+                       String backupPath,
+                       Options options) {
+               
+               super(keySerializer, namespaceSerializer, dbPath, backupPath, options);
+               
                this.stateDesc = requireNonNull(stateDesc);
                this.valueSerializer = stateDesc.getSerializer();
                this.foldFunction = stateDesc.getFoldFunction();
@@ -92,13 +94,17 @@ public class RocksDBFoldingState<K, N, T, ACC, Backend extends AbstractStateBack
         * @param backupPath The path where to store backups.
         * @param restorePath The path on the local file system that we are restoring from.
         */
-       protected RocksDBFoldingState(TypeSerializer<K> keySerializer,
-               TypeSerializer<N> namespaceSerializer,
-               FoldingStateDescriptor<T, ACC> stateDesc,
-               File dbPath,
-               String backupPath,
-               String restorePath) {
-               super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath);
+       protected RocksDBFoldingState(
+                       TypeSerializer<K> keySerializer,
+                       TypeSerializer<N> namespaceSerializer,
+                       FoldingStateDescriptor<T, ACC> stateDesc,
+                       File dbPath,
+                       String backupPath,
+                       String restorePath,
+                       Options options) {
+               
+               super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath, options);
+               
                this.stateDesc = stateDesc;
                this.valueSerializer = stateDesc.getSerializer();
                this.foldFunction = stateDesc.getFoldFunction();
@@ -147,22 +153,24 @@ public class RocksDBFoldingState<K, N, T, ACC, Backend extends AbstractStateBack
        }
 
        @Override
-       protected KvStateSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> createRocksDBSnapshot(
-               URI backupUri,
-               long checkpointId) {
+       protected AbstractRocksDBSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>> createRocksDBSnapshot(
+                       URI backupUri, long checkpointId) {
+               
                return new Snapshot<>(dbPath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc);
        }
 
-       private static class Snapshot<K, N, T, ACC, Backend extends AbstractStateBackend> extends AbstractRocksDBSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> {
+       private static class Snapshot<K, N, T, ACC> extends AbstractRocksDBSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>> {
                private static final long serialVersionUID = 1L;
 
-               public Snapshot(File dbPath,
-                       String checkpointPath,
-                       URI backupUri,
-                       long checkpointId,
-                       TypeSerializer<K> keySerializer,
-                       TypeSerializer<N> namespaceSerializer,
-                       FoldingStateDescriptor<T, ACC> stateDesc) {
+               public Snapshot(
+                               File dbPath,
+                               String checkpointPath,
+                               URI backupUri,
+                               long checkpointId,
+                               TypeSerializer<K> keySerializer,
+                               TypeSerializer<N> namespaceSerializer,
+                               FoldingStateDescriptor<T, ACC> stateDesc) {
+                       
                        super(dbPath,
                                checkpointPath,
                                backupUri,
@@ -173,14 +181,17 @@ public class RocksDBFoldingState<K, N, T, ACC, Backend extends AbstractStateBack
                }
 
                @Override
-               protected KvState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> createRocksDBState(
-                       TypeSerializer<K> keySerializer,
-                       TypeSerializer<N> namespaceSerializer,
-                       FoldingStateDescriptor<T, ACC> stateDesc,
-                       File dbPath,
-                       String backupPath,
-                       String restorePath) throws Exception {
-                       return new RocksDBFoldingState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath, restorePath);
+               protected KvState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, RocksDBStateBackend> 
+                       createRocksDBState(
+                               TypeSerializer<K> keySerializer,
+                               TypeSerializer<N> namespaceSerializer,
+                               FoldingStateDescriptor<T, ACC> stateDesc,
+                               File dbPath,
+                               String backupPath,
+                               String restorePath,
+                               Options options) throws Exception {
+                       
+                       return new RocksDBFoldingState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath, restorePath, options);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e927ec0b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index b323c5e..04bb17c 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -281,7 +281,8 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 
                File dbPath = getDbPath(stateDesc.getName());
                String checkpointPath = getCheckpointPath(stateDesc.getName());
-               return new RocksDBFoldingState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath);
+               return new RocksDBFoldingState<>(keySerializer, namespaceSerializer,
+                               stateDesc, dbPath, checkpointPath, getRocksDBOptions());
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e927ec0b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index beccd86..3e99362 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -159,7 +159,7 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
         * @param <T> Type of the values folded into the state
         * @param <ACC> Type of the value in the state   *
         */
-       abstract protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
+       protected abstract <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
 
        /**
         * Sets the current key that is used for partitioned state.

Reply via email to