Repository: flink Updated Branches: refs/heads/master b6bfcf008 -> e927ec0be
[FLINK-2991] Adjust RocksDB Folding State to latest RocksDBStateBackend Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e927ec0b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e927ec0b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e927ec0b Branch: refs/heads/master Commit: e927ec0bee4cf951e49b838538efa679e1af13e2 Parents: 94cba89 Author: Stephan Ewen <[email protected]> Authored: Fri Feb 12 15:35:08 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Fri Feb 12 18:51:01 2016 +0100 ---------------------------------------------------------------------- .../streaming/state/AbstractRocksDBState.java | 6 +- .../streaming/state/RocksDBFoldingState.java | 89 +++++++++++--------- .../streaming/state/RocksDBStateBackend.java | 3 +- .../runtime/state/AbstractStateBackend.java | 2 +- 4 files changed, 57 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e927ec0b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java index 05e15e8..7d3172a 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java @@ -95,7 +95,8 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta * @param namespaceSerializer The serializer for the namespace. * @param dbPath The path on the local system where RocksDB data should be stored. */ - protected AbstractRocksDBState(TypeSerializer<K> keySerializer, + protected AbstractRocksDBState( + TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, File dbPath, String checkpointPath, @@ -139,7 +140,8 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta * @param dbPath The path on the local system where RocksDB data should be stored. * @param restorePath The path to a backup directory from which to restore RocksDb database. */ - protected AbstractRocksDBState(TypeSerializer<K> keySerializer, + protected AbstractRocksDBState( + TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, File dbPath, String checkpointPath, http://git-wip-us.apache.org/repos/asf/flink/blob/e927ec0b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java index 7e4e573..d7b75bd 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java @@ -21,13 +21,12 @@ package org.apache.flink.contrib.streaming.state; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.state.FoldingState; import org.apache.flink.api.common.state.FoldingStateDescriptor; -import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.KvState; -import org.apache.flink.runtime.state.KvStateSnapshot; + +import org.rocksdb.Options; import org.rocksdb.RocksDBException; import java.io.ByteArrayInputStream; @@ -39,16 +38,15 @@ import java.net.URI; import static java.util.Objects.requireNonNull; /** - * {@link ReducingState} implementation that stores state in RocksDB. + * {@link FoldingState} implementation that stores state in RocksDB. * * @param <K> The type of the key. * @param <N> The type of the namespace. * @param <T> The type of the values that can be folded into the state. * @param <ACC> The type of the value in the folding state. - * @param <Backend> The type of the backend that snapshots this key/value state. */ -public class RocksDBFoldingState<K, N, T, ACC, Backend extends AbstractStateBackend> - extends AbstractRocksDBState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> +public class RocksDBFoldingState<K, N, T, ACC> + extends AbstractRocksDBState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>> implements FoldingState<T, ACC> { /** Serializer for the values */ @@ -70,12 +68,16 @@ public class RocksDBFoldingState<K, N, T, ACC, Backend extends AbstractStateBack * @param dbPath The path on the local system where RocksDB data should be stored. * @param backupPath The path where to store backups. */ - protected RocksDBFoldingState(TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - FoldingStateDescriptor<T, ACC> stateDesc, - File dbPath, - String backupPath) { - super(keySerializer, namespaceSerializer, dbPath, backupPath); + protected RocksDBFoldingState( + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + FoldingStateDescriptor<T, ACC> stateDesc, + File dbPath, + String backupPath, + Options options) { + + super(keySerializer, namespaceSerializer, dbPath, backupPath, options); + this.stateDesc = requireNonNull(stateDesc); this.valueSerializer = stateDesc.getSerializer(); this.foldFunction = stateDesc.getFoldFunction(); @@ -92,13 +94,17 @@ public class RocksDBFoldingState<K, N, T, ACC, Backend extends AbstractStateBack * @param backupPath The path where to store backups. * @param restorePath The path on the local file system that we are restoring from. */ - protected RocksDBFoldingState(TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - FoldingStateDescriptor<T, ACC> stateDesc, - File dbPath, - String backupPath, - String restorePath) { - super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath); + protected RocksDBFoldingState( + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + FoldingStateDescriptor<T, ACC> stateDesc, + File dbPath, + String backupPath, + String restorePath, + Options options) { + + super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath, options); + this.stateDesc = stateDesc; this.valueSerializer = stateDesc.getSerializer(); this.foldFunction = stateDesc.getFoldFunction(); @@ -147,22 +153,24 @@ public class RocksDBFoldingState<K, N, T, ACC, Backend extends AbstractStateBack } @Override - protected KvStateSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> createRocksDBSnapshot( - URI backupUri, - long checkpointId) { + protected AbstractRocksDBSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>> createRocksDBSnapshot( + URI backupUri, long checkpointId) { + return new Snapshot<>(dbPath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc); } - private static class Snapshot<K, N, T, ACC, Backend extends AbstractStateBackend> extends AbstractRocksDBSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> { + private static class Snapshot<K, N, T, ACC> extends AbstractRocksDBSnapshot<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>> { private static final long serialVersionUID = 1L; - public Snapshot(File dbPath, - String checkpointPath, - URI backupUri, - long checkpointId, - TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - FoldingStateDescriptor<T, ACC> stateDesc) { + public Snapshot( + File dbPath, + String checkpointPath, + URI backupUri, + long checkpointId, + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + FoldingStateDescriptor<T, ACC> stateDesc) { + super(dbPath, checkpointPath, backupUri, @@ -173,14 +181,17 @@ public class RocksDBFoldingState<K, N, T, ACC, Backend extends AbstractStateBack } @Override - protected KvState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> createRocksDBState( - TypeSerializer<K> keySerializer, - TypeSerializer<N> namespaceSerializer, - FoldingStateDescriptor<T, ACC> stateDesc, - File dbPath, - String backupPath, - String restorePath) throws Exception { - return new RocksDBFoldingState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath, restorePath); + protected KvState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, RocksDBStateBackend> + createRocksDBState( + TypeSerializer<K> keySerializer, + TypeSerializer<N> namespaceSerializer, + FoldingStateDescriptor<T, ACC> stateDesc, + File dbPath, + String backupPath, + String restorePath, + Options options) throws Exception { + + return new RocksDBFoldingState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath, restorePath, options); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/e927ec0b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index b323c5e..04bb17c 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -281,7 +281,8 @@ public class RocksDBStateBackend extends AbstractStateBackend { File dbPath = getDbPath(stateDesc.getName()); String checkpointPath = getCheckpointPath(stateDesc.getName()); - return new RocksDBFoldingState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath); + return new RocksDBFoldingState<>(keySerializer, namespaceSerializer, + stateDesc, dbPath, checkpointPath, getRocksDBOptions()); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/e927ec0b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java index beccd86..3e99362 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java @@ -159,7 +159,7 @@ public abstract class AbstractStateBackend implements java.io.Serializable { * @param <T> Type of the values folded into the state * @param <ACC> Type of the value in the state * */ - abstract protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception; + protected abstract <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception; /** * Sets the current key that is used for partitioned state.
