[FLINK-3278] Add Partitioned State Backend Based on RocksDB

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/524e56bc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/524e56bc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/524e56bc

Branch: refs/heads/master
Commit: 524e56bcbf9faf4b7363abfefeb3b3fa53949bad
Parents: 67ca4a4
Author: Aljoscha Krettek <[email protected]>
Authored: Thu Jan 21 10:56:47 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Wed Feb 3 20:27:51 2016 +0100

----------------------------------------------------------------------
 .../flink-statebackend-rocksdb/pom.xml          |  71 ++++
 .../streaming/state/AbstractRocksDBState.java   | 372 +++++++++++++++++++
 .../streaming/state/RocksDBListState.java       | 183 +++++++++
 .../streaming/state/RocksDBReducingState.java   | 190 ++++++++++
 .../streaming/state/RocksDBStateBackend.java    | 127 +++++++
 .../streaming/state/RocksDBValueState.java      | 156 ++++++++
 .../state/RocksDBStateBackendTest.java          |  53 +++
 .../src/test/resources/log4j-test.properties    |  27 ++
 .../src/test/resources/log4j.properties         |  27 ++
 .../src/test/resources/logback-test.xml         |  30 ++
 flink-contrib/pom.xml                           |   1 +
 .../flink/util/ExternalProcessRunner.java       | 233 ++++++++++++
 .../apache/flink/util/HDFSCopyFromLocal.java    |  48 +++
 .../org/apache/flink/util/HDFSCopyToLocal.java  |  49 +++
 .../flink/util/ExternalProcessRunnerTest.java   |  98 +++++
 flink-tests/pom.xml                             |   7 +
 .../EventTimeWindowCheckpointingITCase.java     |   9 +
 17 files changed, 1681 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/flink-statebackend-rocksdb/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/pom.xml 
b/flink-contrib/flink-statebackend-rocksdb/pom.xml
new file mode 100644
index 0000000..999c496
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/pom.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-contrib</artifactId>
+               <version>1.0-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-statebackend-rocksdb_2.10</artifactId>
+       <name>flink-statebackend-rocksdb</name>
+
+       <packaging>jar</packaging>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_2.10</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-clients_2.10</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>com.google.guava</groupId>
+                       <artifactId>guava</artifactId>
+                       <version>${guava.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.rocksdb</groupId>
+                       <artifactId>rocksdbjni</artifactId>
+                       <version>4.1.0</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-runtime_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+       </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
new file mode 100644
index 0000000..6dbe16c
--- /dev/null
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.apache.flink.util.HDFSCopyFromLocal;
+import org.apache.flink.util.HDFSCopyToLocal;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.rocksdb.BackupEngine;
+import org.rocksdb.BackupableDBOptions;
+import org.rocksdb.Env;
+import org.rocksdb.Options;
+import org.rocksdb.RestoreOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.StringAppendOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Base class for {@link State} implementations that store state in a RocksDB 
database.
+ *
+ * <p>This base class is responsible for setting up the RocksDB database, for
+ * checkpointing/restoring the database and for disposal in the {@link 
#dispose()} method. The
+ * concrete subclasses just use the RocksDB handle to store/retrieve state.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <S> The type of {@link State}.
+ * @param <SD> The type of {@link StateDescriptor}.
+ * @param <Backend> The type of the backend that snapshots this key/value 
state.
+ */
+public abstract class AbstractRocksDBState<K, N, S extends State, SD extends 
StateDescriptor<S>, Backend extends AbstractStateBackend>
+       implements KvState<K, N, S, SD, Backend>, State {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(AbstractRocksDBState.class);
+
+       /** Serializer for the keys */
+       protected final TypeSerializer<K> keySerializer;
+
+       /** Serializer for the namespace */
+       protected final TypeSerializer<N> namespaceSerializer;
+
+       /** The current key, which the next value methods will refer to */
+       protected K currentKey;
+
+       /** The current namespace, which the next value methods will refer to */
+       protected N currentNamespace;
+
+       /** Store it so that we can clean up in dispose() */
+       protected final File dbPath;
+
+       /** Base location for checkpoints; {@code snapshot()} copies each backup to {@code checkpointPath + "/chk-" + checkpointId}. */
+       protected final String checkpointPath;
+
+       /** Our RocksDB instance */
+       protected final RocksDB db;
+
+       /**
+        * Creates a new RocksDB backed state.
+        *
+        * @param keySerializer The serializer for the keys.
+        * @param namespaceSerializer The serializer for the namespace.
+        * @param dbPath The path on the local system where RocksDB data should 
be stored.
+        */
+       protected AbstractRocksDBState(TypeSerializer<K> keySerializer,
+               TypeSerializer<N> namespaceSerializer,
+               File dbPath,
+               String checkpointPath) {
+               this.keySerializer = requireNonNull(keySerializer);
+               this.namespaceSerializer = namespaceSerializer;
+               this.dbPath = dbPath;
+               this.checkpointPath = checkpointPath;
+
+               RocksDB.loadLibrary();
+
+               Options options = new Options().setCreateIfMissing(true);
+               options.setMergeOperator(new StringAppendOperator());
+
+               if (!dbPath.exists()) {
+                       if (!dbPath.mkdirs()) {
+                               throw new RuntimeException("Could not create 
RocksDB data directory.");
+                       }
+               }
+
+               // clean it, this will remove the last part of the path but 
RocksDB will recreate it
+               try {
+                       File db = new File(dbPath, "db");
+                       LOG.warn("Deleting already existing db directory {}.", 
db);
+                       FileUtils.deleteDirectory(db);
+               } catch (IOException e) {
+                       throw new RuntimeException("Error cleaning RocksDB data 
directory.", e);
+               }
+
+               try {
+                       db = RocksDB.open(options, new File(dbPath, 
"db").getAbsolutePath());
+               } catch (RocksDBException e) {
+                       throw new RuntimeException("Error while opening RocksDB 
instance.", e);
+               }
+
+               options.dispose();
+
+       }
+
+       /**
+        * Creates a new RocksDB backed state and restores from the given 
backup directory. After
+        * restoring the backup directory is deleted.
+        *
+        * @param keySerializer The serializer for the keys.
+        * @param namespaceSerializer The serializer for the namespace.
+        * @param dbPath The path on the local system where RocksDB data should 
be stored.
+        * @param restorePath The path to a backup directory from which to 
restore RocksDb database.
+        */
+       protected AbstractRocksDBState(TypeSerializer<K> keySerializer,
+               TypeSerializer<N> namespaceSerializer,
+               File dbPath,
+               String checkpointPath,
+               String restorePath) {
+
+               RocksDB.loadLibrary();
+
+               try {
+                       BackupEngine backupEngine = 
BackupEngine.open(Env.getDefault(), new BackupableDBOptions(restorePath + "/"));
+                       backupEngine.restoreDbFromLatestBackup(new File(dbPath, 
"db").getAbsolutePath(), new File(dbPath, "db").getAbsolutePath(), new 
RestoreOptions(true));
+                       FileUtils.deleteDirectory(new File(restorePath));
+               } catch (RocksDBException|IOException|IllegalArgumentException 
e) {
+                       throw new RuntimeException("Error while restoring 
RocksDB state from " + restorePath, e);
+               }
+
+               this.keySerializer = requireNonNull(keySerializer);
+               this.namespaceSerializer = namespaceSerializer;
+               this.dbPath = dbPath;
+               this.checkpointPath = checkpointPath;
+
+               Options options = new Options().setCreateIfMissing(true);
+               options.setMergeOperator(new StringAppendOperator());
+
+               if (!dbPath.exists()) {
+                       if (!dbPath.mkdirs()) {
+                               throw new RuntimeException("Could not create 
RocksDB data directory.");
+                       }
+               }
+
+               try {
+                       db = RocksDB.open(options, new File(dbPath, 
"db").getAbsolutePath());
+               } catch (RocksDBException e) {
+                       throw new RuntimeException("Error while opening RocksDB 
instance.", e);
+               }
+
+               options.dispose();
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       final public void clear() {
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(baos);
+               try {
+                       writeKeyAndNamespace(out);
+                       byte[] key = baos.toByteArray();
+                       db.remove(key);
+               } catch (IOException|RocksDBException e) {
+                       throw new RuntimeException("Error while removing entry 
from RocksDB", e);
+               }
+       }
+
+       /**
+        * Writes the database key for the current key/namespace pair: the serialized
+        * current key, one separator byte (42), then the serialized current namespace.
+        *
+        * <p>NOTE(review): the constructors do not null-check {@code namespaceSerializer};
+        * a null serializer would fail here - confirm callers always pass one.
+        *
+        * @param out The view the key bytes are written to.
+        * @throws IOException if one of the serializers fails to write.
+        */
+       protected void writeKeyAndNamespace(DataOutputView out) throws 
IOException {
+               keySerializer.serialize(currentKey, out);
+               out.writeByte(42);
+               namespaceSerializer.serialize(currentNamespace, out);
+       }
+
+       /** Sets the key that {@code writeKeyAndNamespace} uses for subsequent accesses. */
+       @Override
+       final public void setCurrentKey(K currentKey) {
+               this.currentKey = currentKey;
+       }
+
+       /** Sets the namespace that {@code writeKeyAndNamespace} uses for subsequent accesses. */
+       @Override
+       final public void setCurrentNamespace(N namespace) {
+               this.currentNamespace = namespace;
+       }
+
+       /**
+        * Creates the snapshot handle for a backup; {@code snapshot()} calls this after the
+        * backup has been copied to {@code backupUri}.
+        *
+        * @param backupUri The location the backup was copied to.
+        * @param checkpointId The id of the checkpoint the backup belongs to.
+        * @return The snapshot handle describing the backup.
+        */
+       protected abstract KvStateSnapshot<K, N, S, SD, Backend> 
createRocksDBSnapshot(URI backupUri, long checkpointId);
+
+       /**
+        * Creates a RocksDB backup in the local directory {@code dbPath/backup-<checkpointId>},
+        * copies it to {@code checkpointPath/chk-<checkpointId>} and returns a snapshot handle
+        * for the copy. The local backup directory is always removed; the remote copy is removed
+        * again when the snapshot does not complete successfully.
+        *
+        * @param checkpointId The id of the checkpoint, used to derive the backup locations.
+        * @param timestamp The checkpoint timestamp (not used by this implementation).
+        * @return The snapshot handle created by {@code createRocksDBSnapshot}.
+        * @throws Exception if the backup, the copy or the cleanup fails.
+        */
+       @Override
+       final public KvStateSnapshot<K, N, S, SD, Backend> snapshot(
+               long checkpointId,
+               long timestamp) throws Exception {
+               boolean success = false;
+
+               final File localBackupPath = new File(dbPath, "backup-" + checkpointId);
+               final URI backupUri = new URI(checkpointPath + "/chk-" + checkpointId);
+
+               try {
+                       if (!localBackupPath.exists()) {
+                               if (!localBackupPath.mkdirs()) {
+                                       throw new RuntimeException("Could not create local backup path " + localBackupPath);
+                               }
+                       }
+
+                       // the backup options and engine wrap native handles, release them once the backup is written
+                       BackupableDBOptions backupOptions = new BackupableDBOptions(localBackupPath.getAbsolutePath());
+                       try {
+                               BackupEngine backupEngine = BackupEngine.open(Env.getDefault(), backupOptions);
+                               try {
+                                       backupEngine.createNewBackup(db);
+                               } finally {
+                                       backupEngine.dispose();
+                               }
+                       } finally {
+                               backupOptions.dispose();
+                       }
+
+                       HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri);
+                       KvStateSnapshot<K, N, S, SD, Backend> result = createRocksDBSnapshot(backupUri, checkpointId);
+                       success = true;
+                       return result;
+               } finally {
+                       try {
+                               FileUtils.deleteDirectory(localBackupPath);
+                       } finally {
+                               // still remove a partial remote copy when deleting the local backup fails
+                               if (!success) {
+                                       FileSystem fs = FileSystem.get(backupUri, new Configuration());
+                                       fs.delete(new Path(backupUri), true);
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Disposes the RocksDB instance and then deletes the local data directory
+        * {@code dbPath}.
+        *
+        * @throws RuntimeException if the data directory cannot be deleted.
+        */
+       @Override
+       final public void dispose() {
+               db.dispose();
+               try {
+                       FileUtils.deleteDirectory(dbPath);
+               } catch (IOException e) {
+                       throw new RuntimeException("Error disposing RocksDB 
data directory.", e);
+               }
+       }
+
+       public static abstract class AbstractRocksDBSnapshot<K, N, S extends 
State, SD extends StateDescriptor<S>, Backend extends AbstractStateBackend> 
implements KvStateSnapshot<K, N, S, SD, Backend> {
+               private static final long serialVersionUID = 1L;
+
+               private static final Logger LOG = 
LoggerFactory.getLogger(AbstractRocksDBSnapshot.class);
+
+               // 
------------------------------------------------------------------------
+               //  Ctor parameters for RocksDB state
+               // 
------------------------------------------------------------------------
+
+               /** Store it so that we can clean up in dispose() */
+               protected final File dbPath;
+
+               /** Where we should put RocksDB backups */
+               protected final String checkpointPath;
+
+               // 
------------------------------------------------------------------------
+               //  Info about this checkpoint
+               // 
------------------------------------------------------------------------
+
+               protected final URI backupUri;
+
+               protected long checkpointId;
+
+               // 
------------------------------------------------------------------------
+               //  For sanity checks
+               // 
------------------------------------------------------------------------
+
+               /** Key serializer */
+               protected final TypeSerializer<K> keySerializer;
+
+               /** Namespace serializer */
+               protected final TypeSerializer<N> namespaceSerializer;
+
+               /** The StateDescriptor of the snapshotted state; handed to the state created on restore */
+               protected final SD stateDesc;
+
+               /**
+                * Creates a snapshot handle that stores everything needed to re-create the state.
+                *
+                * @param dbPath The local path where RocksDB data should be stored.
+                * @param checkpointPath Where RocksDB backups should be put.
+                * @param backupUri The location of the backup this snapshot refers to.
+                * @param checkpointId The id of the checkpoint this snapshot belongs to.
+                * @param keySerializer The serializer for the keys.
+                * @param namespaceSerializer The serializer for the namespace.
+                * @param stateDesc The descriptor of the snapshotted state.
+                */
+               public AbstractRocksDBSnapshot(File dbPath,
+                       String checkpointPath,
+                       URI backupUri,
+                       long checkpointId,
+                       TypeSerializer<K> keySerializer,
+                       TypeSerializer<N> namespaceSerializer,
+                       SD stateDesc) {
+                       this.dbPath = dbPath;
+                       this.checkpointPath = checkpointPath;
+                       this.backupUri = backupUri;
+                       this.checkpointId = checkpointId;
+
+                       this.stateDesc = stateDesc;
+                       this.keySerializer = keySerializer;
+                       this.namespaceSerializer = namespaceSerializer;
+               }
+
+               /**
+                * Creates the concrete state object for a restore; {@code restoreState} calls this
+                * after the backup has been copied to the local file system.
+                *
+                * @param keySerializer The serializer for the keys.
+                * @param namespaceSerializer The serializer for the namespace.
+                * @param stateDesc The descriptor of the state to create.
+                * @param dbPath The local path where RocksDB data should be stored.
+                * @param backupPath The checkpoint path; {@code restoreState} passes {@code checkpointPath}.
+                * @param restorePath The local directory that holds the copied backup.
+                * @return The restored state.
+                * @throws Exception if the state cannot be created.
+                */
+               protected abstract KvState<K, N, S, SD, Backend> 
createRocksDBState(TypeSerializer<K> keySerializer,
+                       TypeSerializer<N> namespaceSerializer,
+                       SD stateDesc,
+                       File dbPath,
+                       String backupPath,
+                       String restorePath) throws Exception;
+
+               /**
+                * Copies the backup from {@code backupUri} into the local {@code dbPath} and creates
+                * a state that restores itself from that local copy.
+                *
+                * @param stateBackend The state backend (not used by this implementation).
+                * @param keySerializer The key serializer, must equal the one used for the snapshot.
+                * @param classLoader The user code class loader (not used by this implementation).
+                * @param recoveryTimestamp The recovery timestamp (not used by this implementation).
+                * @return The restored state, created by {@code createRocksDBState}.
+                * @throws IllegalArgumentException if the key serializer differs from the snapshot's.
+                * @throws Exception if the backup cannot be copied or the state cannot be created.
+                */
+               @Override
+               public final KvState<K, N, S, SD, Backend> restoreState(
+                       Backend stateBackend,
+                       TypeSerializer<K> keySerializer,
+                       ClassLoader classLoader,
+                       long recoveryTimestamp) throws Exception {
+
+                       // validity checks; report the snapshot's serializer first, then the one passed in
+                       if (!this.keySerializer.equals(keySerializer)) {
+                               throw new IllegalArgumentException(
+                                       "Cannot restore the state from the snapshot with the given serializers. " +
+                                               "State (K/V) was serialized with " +
+                                               "(" + this.keySerializer + ") " +
+                                               "now is (" + keySerializer + ")");
+                       }
+
+                       if (!dbPath.exists()) {
+                               if (!dbPath.mkdirs()) {
+                                       throw new RuntimeException("Could not create RocksDB base path " + dbPath);
+                               }
+                       }
+
+                       final File localBackupPath = new File(dbPath, "chk-" + checkpointId);
+
+                       // a stale local copy of this checkpoint must not be mixed with the fresh one
+                       if (localBackupPath.exists()) {
+                               try {
+                                       LOG.warn("Deleting already existing local backup directory {}.", localBackupPath);
+                                       FileUtils.deleteDirectory(localBackupPath);
+                               } catch (IOException e) {
+                                       throw new RuntimeException("Error cleaning RocksDB local backup directory.", e);
+                               }
+                       }
+
+                       HDFSCopyToLocal.copyToLocal(backupUri, dbPath);
+                       return createRocksDBState(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath, localBackupPath.getAbsolutePath());
+               }
+
+               /**
+                * Deletes the backup stored at {@code backupUri}.
+                *
+                * @throws Exception if the file system cannot be accessed or the delete fails.
+                */
+               @Override
+               public final void discardState() throws Exception {
+                       FileSystem fs = FileSystem.get(backupUri, new 
Configuration());
+                       fs.delete(new Path(backupUri), true);
+               }
+
+               /**
+                * Always returns 0; this implementation does not compute the size of the backup.
+                */
+               @Override
+               public final long getStateSize() throws Exception {
+                       return 0;
+               }
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
new file mode 100644
index 0000000..e97e65d
--- /dev/null
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.rocksdb.RocksDBException;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link ListState} implementation that stores state in RocksDB.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <V> The type of the values in the list state.
+ * @param <Backend> The type of the backend that snapshots this key/value 
state.
+ */
+public class RocksDBListState<K, N, V, Backend extends AbstractStateBackend>
+       extends AbstractRocksDBState<K, N, ListState<V>, 
ListStateDescriptor<V>, Backend>
+       implements ListState<V> {
+
+       /** Serializer for the values */
+       private final TypeSerializer<V> valueSerializer;
+
+       /** This holds the name of the state and can create an initial default 
value for the state. */
+       protected final ListStateDescriptor<V> stateDesc;
+
+       /**
+        * Creates a new {@code RocksDBListState}.
+        *
+        * @param keySerializer The serializer for the keys.
+        * @param namespaceSerializer The serializer for the namespace.
+        * @param stateDesc The state identifier for the state. This contains name
+        *                           and can create a default state value.
+        * @param dbPath The path on the local system where RocksDB data should be stored.
+        * @param backupPath The checkpoint path, passed on to the base class constructor.
+        */
+       protected RocksDBListState(TypeSerializer<K> keySerializer,
+               TypeSerializer<N> namespaceSerializer,
+               ListStateDescriptor<V> stateDesc,
+               File dbPath,
+               String backupPath) {
+               super(keySerializer, namespaceSerializer, dbPath, backupPath);
+               this.stateDesc = requireNonNull(stateDesc);
+               this.valueSerializer = stateDesc.getSerializer();
+       }
+
+       /**
+        * Creates a new {@code RocksDBListState} via the restoring base class constructor.
+        *
+        * @param keySerializer The serializer for the keys.
+        * @param namespaceSerializer The serializer for the namespace.
+        * @param stateDesc The state identifier for the state. This contains name
+        *                           and can create a default state value.
+        * @param dbPath The path on the local system where RocksDB data should be stored.
+        * @param backupPath The checkpoint path, passed on to the base class constructor.
+        * @param restorePath The backup directory, passed on to the base class constructor.
+        */
+       protected RocksDBListState(TypeSerializer<K> keySerializer,
+               TypeSerializer<N> namespaceSerializer,
+               ListStateDescriptor<V> stateDesc,
+               File dbPath,
+               String backupPath,
+               String restorePath) {
+               super(keySerializer, namespaceSerializer, dbPath, backupPath, 
restorePath);
+               this.stateDesc = requireNonNull(stateDesc);
+               this.valueSerializer = stateDesc.getSerializer();
+       }
+
+       /**
+        * Returns all values stored for the current key and namespace.
+        *
+        * @return The deserialized values, or an empty list when nothing is stored.
+        * @throws RuntimeException if reading from RocksDB or deserialization fails.
+        */
+       @Override
+       public Iterable<V> get() {
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(baos);
+               try {
+                       writeKeyAndNamespace(out);
+                       byte[] key = baos.toByteArray();
+                       byte[] valueBytes = db.get(key);
+
+                       if (valueBytes == null) {
+                               return Collections.emptyList();
+                       }
+
+                       ByteArrayInputStream bais = new 
ByteArrayInputStream(valueBytes);
+                       DataInputViewStreamWrapper in = new 
DataInputViewStreamWrapper(bais);
+
+                       List<V> result = new ArrayList<>();
+                       while (in.available() > 0) {
+                               result.add(valueSerializer.deserialize(in));
+                               // skip one byte between two values; presumably the delimiter added by the
+                               // merge operator when add() merges a value - TODO confirm
+                               if (in.available() > 0) {
+                                       in.readByte();
+                               }
+                       }
+                       return result;
+               } catch (IOException|RocksDBException e) {
+                       throw new RuntimeException("Error while retrieving data 
from RocksDB", e);
+               }
+       }
+
+       /**
+        * Appends the given value to the entry of the current key and namespace by
+        * issuing a RocksDB merge with the serialized value.
+        *
+        * <p>Although the signature declares {@code IOException}, every exception raised
+        * in the body is caught and rethrown as a {@code RuntimeException}.
+        *
+        * @param value The value to add.
+        * @throws RuntimeException if serialization or the merge fails.
+        */
+       @Override
+       public void add(V value) throws IOException {
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(baos);
+               try {
+                       writeKeyAndNamespace(out);
+                       byte[] key = baos.toByteArray();
+
+                       // reuse the same buffer for the value bytes
+                       baos.reset();
+
+                       valueSerializer.serialize(value, out);
+                       db.merge(key, baos.toByteArray());
+
+               } catch (Exception e) {
+                       throw new RuntimeException("Error while adding data to 
RocksDB", e);
+               }
+       }
+
+       /**
+        * Creates the list-state snapshot handle for the backup at {@code backupUri},
+        * carrying over this state's paths, serializers and descriptor.
+        */
+       @Override
+       protected KvStateSnapshot<K, N, ListState<V>, ListStateDescriptor<V>, 
Backend> createRocksDBSnapshot(
+               URI backupUri,
+               long checkpointId) {
+               return new Snapshot<>(dbPath, checkpointPath, backupUri, 
checkpointId, keySerializer, namespaceSerializer, stateDesc);
+       }
+
+       /**
+        * Snapshot handle for {@code RocksDBListState}; on restore it creates a new
+        * {@code RocksDBListState} through the restoring constructor.
+        */
+       private static class Snapshot<K, N, V, Backend extends 
AbstractStateBackend> extends AbstractRocksDBSnapshot<K, N, ListState<V>, 
ListStateDescriptor<V>, Backend> {
+               private static final long serialVersionUID = 1L;
+
+               /** Passes all arguments through to the base snapshot constructor. */
+               public Snapshot(File dbPath,
+                       String checkpointPath,
+                       URI backupUri,
+                       long checkpointId,
+                       TypeSerializer<K> keySerializer,
+                       TypeSerializer<N> namespaceSerializer,
+                       ListStateDescriptor<V> stateDesc) {
+                       super(dbPath,
+                               checkpointPath,
+                               backupUri,
+                               checkpointId,
+                               keySerializer,
+                               namespaceSerializer,
+                               stateDesc);
+               }
+
+               /**
+                * Creates the restored list state. Note that the {@code checkpointPath} field is
+                * passed on instead of the {@code backupPath} parameter, which is not used here.
+                */
+               @Override
+               protected KvState<K, N, ListState<V>, ListStateDescriptor<V>, 
Backend> createRocksDBState(
+                       TypeSerializer<K> keySerializer,
+                       TypeSerializer<N> namespaceSerializer,
+                       ListStateDescriptor<V> stateDesc,
+                       File dbPath,
+                       String backupPath,
+                       String restorePath) throws Exception {
+                       return new RocksDBListState<>(keySerializer, 
namespaceSerializer, stateDesc, dbPath, checkpointPath, restorePath);
+               }
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
new file mode 100644
index 0000000..eb21c3b
--- /dev/null
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.streaming.state;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.rocksdb.RocksDBException;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link ReducingState} implementation that stores state in RocksDB.
+ *
+ * <p>A single, already-reduced value is kept per key/namespace: {@link #add(Object)} reads the
+ * stored value, combines it with the new element through the user's {@link ReduceFunction} and
+ * writes the result back.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <V> The type of value that the state stores.
+ * @param <Backend> The type of the backend that snapshots this key/value state.
+ */
+public class RocksDBReducingState<K, N, V, Backend extends AbstractStateBackend>
+       extends AbstractRocksDBState<K, N, ReducingState<V>, ReducingStateDescriptor<V>, Backend>
+       implements ReducingState<V> {
+
+       /** Serializer for the values */
+       private final TypeSerializer<V> valueSerializer;
+
+       /** This holds the name of the state and can create an initial default value for the state. */
+       protected final ReducingStateDescriptor<V> stateDesc;
+
+       /** User-specified reduce function */
+       private final ReduceFunction<V> reduceFunction;
+
+       /**
+        * Creates a new {@code RocksDBReducingState}.
+        *
+        * @param keySerializer The serializer for the keys.
+        * @param namespaceSerializer The serializer for the namespace.
+        * @param stateDesc The state identifier for the state. This contains name
+        *                           and can create a default state value. Must not be {@code null}.
+        * @param dbPath The path on the local system where RocksDB data should be stored.
+        * @param backupPath Forwarded to the superclass constructor; presumably the location that
+        *                           RocksDB backups are written to (confirm against the superclass).
+        */
+       protected RocksDBReducingState(TypeSerializer<K> keySerializer,
+               TypeSerializer<N> namespaceSerializer,
+               ReducingStateDescriptor<V> stateDesc,
+               File dbPath,
+               String backupPath) {
+               super(keySerializer, namespaceSerializer, dbPath, backupPath);
+               this.stateDesc = requireNonNull(stateDesc);
+               this.valueSerializer = stateDesc.getSerializer();
+               this.reduceFunction = stateDesc.getReduceFunction();
+       }
+
+       /**
+        * Creates a new {@code RocksDBReducingState} through the superclass constructor that
+        * additionally takes a restore path.
+        *
+        * @param keySerializer The serializer for the keys.
+        * @param namespaceSerializer The serializer for the namespace.
+        * @param stateDesc The state identifier for the state. Must not be {@code null}.
+        * @param dbPath The path on the local system where RocksDB data should be stored.
+        * @param backupPath Forwarded to the superclass constructor (see the other constructor).
+        * @param restorePath Forwarded to the superclass constructor; presumably the location of the
+        *                           backup to restore from (confirm against the superclass).
+        */
+       protected RocksDBReducingState(TypeSerializer<K> keySerializer,
+               TypeSerializer<N> namespaceSerializer,
+               ReducingStateDescriptor<V> stateDesc,
+               File dbPath,
+               String backupPath,
+               String restorePath) {
+               super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath);
+               // Same null check as the primary constructor, so both fail with the same clear error.
+               this.stateDesc = requireNonNull(stateDesc);
+               this.valueSerializer = stateDesc.getSerializer();
+               this.reduceFunction = stateDesc.getReduceFunction();
+       }
+
+       /**
+        * Returns the reduced value stored for the current key and namespace.
+        *
+        * @return The deserialized value, or {@code null} if RocksDB holds no entry for the key.
+        * @throws RuntimeException if key serialization, the RocksDB lookup or deserialization fails.
+        */
+       @Override
+       public V get() {
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
+               try {
+                       writeKeyAndNamespace(out);
+                       byte[] key = baos.toByteArray();
+                       byte[] valueBytes = db.get(key);
+                       if (valueBytes == null) {
+                               return null;
+                       }
+                       return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
+               } catch (IOException|RocksDBException e) {
+                       throw new RuntimeException("Error while retrieving data from RocksDB", e);
+               }
+       }
+
+       /**
+        * Folds {@code value} into the state for the current key and namespace.
+        *
+        * <p>If no entry exists yet, {@code value} is stored as is; otherwise the stored value and
+        * {@code value} are combined with the reduce function and the result replaces the entry.
+        *
+        * @param value The element to add.
+        * @throws RuntimeException if serialization, the reduce function or RocksDB access fails.
+        */
+       @Override
+       public void add(V value) throws IOException {
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
+               try {
+                       writeKeyAndNamespace(out);
+                       byte[] key = baos.toByteArray();
+                       byte[] valueBytes = db.get(key);
+
+                       V newValue = value;
+                       if (valueBytes != null) {
+                               V oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
+                               newValue = reduceFunction.reduce(oldValue, value);
+                       }
+
+                       // The key bytes were copied out above, so the buffer can be reused for the value.
+                       baos.reset();
+                       valueSerializer.serialize(newValue, out);
+                       db.put(key, baos.toByteArray());
+               } catch (Exception e) {
+                       throw new RuntimeException("Error while adding data to RocksDB", e);
+               }
+       }
+
+       /**
+        * Creates the snapshot object for this state from its current paths, serializers and descriptor.
+        */
+       @Override
+       protected KvStateSnapshot<K, N, ReducingState<V>, ReducingStateDescriptor<V>, Backend> createRocksDBSnapshot(
+               URI backupUri,
+               long checkpointId) {
+               return new Snapshot<>(dbPath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc);
+       }
+
+       /**
+        * Snapshot of a {@link RocksDBReducingState}; re-creates the state object on restore.
+        */
+       private static class Snapshot<K, N, V, Backend extends AbstractStateBackend> extends AbstractRocksDBSnapshot<K, N, ReducingState<V>, ReducingStateDescriptor<V>, Backend> {
+               private static final long serialVersionUID = 1L;
+
+               public Snapshot(File dbPath,
+                       String checkpointPath,
+                       URI backupUri,
+                       long checkpointId,
+                       TypeSerializer<K> keySerializer,
+                       TypeSerializer<N> namespaceSerializer,
+                       ReducingStateDescriptor<V> stateDesc) {
+                       super(dbPath,
+                               checkpointPath,
+                               backupUri,
+                               checkpointId,
+                               keySerializer,
+                               namespaceSerializer,
+                               stateDesc);
+               }
+
+               // NOTE(review): the backupPath argument is not used; the checkpointPath visible in this
+               // scope is passed instead. Confirm this is intended.
+               @Override
+               protected KvState<K, N, ReducingState<V>, ReducingStateDescriptor<V>, Backend> createRocksDBState(
+                       TypeSerializer<K> keySerializer,
+                       TypeSerializer<N> namespaceSerializer,
+                       ReducingStateDescriptor<V> stateDesc,
+                       File dbPath,
+                       String backupPath,
+                       String restorePath) throws Exception {
+                       return new RocksDBReducingState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath, restorePath);
+               }
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
new file mode 100644
index 0000000..aaaeea4
--- /dev/null
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import java.io.File;
+import java.io.Serializable;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateHandle;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link AbstractStateBackend} whose partitioned value, list and reducing state objects are
+ * RocksDB-based ({@link RocksDBValueState}, {@link RocksDBListState}, {@link RocksDBReducingState}).
+ *
+ * <p>Each state gets its own local database path and checkpoint path, derived from the configured
+ * base paths, the job ID, the operator identifier and the state name. Checkpoint output streams
+ * and serializable (non-partitioned) state are delegated to a backing state backend.
+ */
+public class RocksDBStateBackend extends AbstractStateBackend {
+       private static final long serialVersionUID = 1L;
+
+       /** Base path for RocksDB directory. */
+       private final String dbBasePath;
+
+       /** The checkpoint directory that we snapshot RocksDB backups to. */
+       private final String checkpointDirectory;
+
+       /** Operator identifier (spaces removed) that is used to make the RocksDB storage path unique. */
+       private String operatorIdentifier;
+
+       /** JobID used to make the local database and checkpoint paths unique per job. */
+       private JobID jobId;
+
+       /** Backend that handles checkpoint streams and serializable state on behalf of this backend. */
+       private final AbstractStateBackend backingStateBackend;
+
+       /**
+        * Creates a new {@code RocksDBStateBackend}.
+        *
+        * @param dbBasePath Base directory on the local system under which database paths are created.
+        * @param checkpointDirectory Base location under which per-state checkpoint paths are created.
+        * @param backingStateBackend Backend that checkpoint streams and serializable state are delegated to.
+        */
+       public RocksDBStateBackend(String dbBasePath, String checkpointDirectory, AbstractStateBackend backingStateBackend) {
+               this.dbBasePath = requireNonNull(dbBasePath);
+               this.checkpointDirectory = requireNonNull(checkpointDirectory);
+               this.backingStateBackend = requireNonNull(backingStateBackend);
+       }
+
+       /**
+        * Initializes this backend and the backing backend for the given job and operator.
+        *
+        * <p>The operator identifier is stored with spaces removed because it becomes a path
+        * component; the backing backend receives the unmodified identifier.
+        */
+       @Override
+       public void initializeForJob(Environment env,
+               String operatorIdentifier,
+               TypeSerializer<?> keySerializer) throws Exception {
+               super.initializeForJob(env, operatorIdentifier, keySerializer);
+               this.operatorIdentifier = operatorIdentifier.replace(" ", "");
+               backingStateBackend.initializeForJob(env, operatorIdentifier, keySerializer);
+               this.jobId = env.getJobID();
+       }
+
+       /**
+        * Disposes the state held by the backing state backend.
+        *
+        * <p>NOTE(review): RocksDB directories created through this backend are not removed here.
+        */
+       @Override
+       public void disposeAllStateForCurrentJob() throws Exception {
+               backingStateBackend.disposeAllStateForCurrentJob();
+       }
+
+       /**
+        * Closes the backing state backend, which was initialized in {@link #initializeForJob}.
+        */
+       @Override
+       public void close() throws Exception {
+               backingStateBackend.close();
+       }
+
+       /** Returns {@code <dbBasePath>/<job>/<operator>/<stateName>} as a local file. */
+       private File getDbPath(String stateName) {
+               File jobDir = new File(new File(dbBasePath), jobId.toShortString());
+               File operatorDir = new File(jobDir, operatorIdentifier);
+               return new File(operatorDir, stateName);
+       }
+
+       /** Returns {@code <checkpointDirectory>/<job>/<operator>/<stateName>} as a string. */
+       private String getCheckpointPath(String stateName) {
+               return checkpointDirectory + "/" + jobId.toShortString() + "/" + operatorIdentifier + "/" + stateName;
+       }
+
+       @Override
+       protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer,
+               ValueStateDescriptor<T> stateDesc) throws Exception {
+               File dbPath = getDbPath(stateDesc.getName());
+               String checkpointPath = getCheckpointPath(stateDesc.getName());
+               return new RocksDBValueState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath);
+       }
+
+       @Override
+       protected <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer,
+               ListStateDescriptor<T> stateDesc) throws Exception {
+               File dbPath = getDbPath(stateDesc.getName());
+               String checkpointPath = getCheckpointPath(stateDesc.getName());
+               return new RocksDBListState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath);
+       }
+
+       @Override
+       protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer,
+               ReducingStateDescriptor<T> stateDesc) throws Exception {
+               File dbPath = getDbPath(stateDesc.getName());
+               String checkpointPath = getCheckpointPath(stateDesc.getName());
+               return new RocksDBReducingState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath);
+       }
+
+       /** Delegates to the backing state backend. */
+       @Override
+       public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID,
+               long timestamp) throws Exception {
+               return backingStateBackend.createCheckpointStateOutputStream(checkpointID, timestamp);
+       }
+
+       /** Delegates to the backing state backend. */
+       @Override
+       public <S extends Serializable> StateHandle<S> checkpointStateSerializable(S state,
+               long checkpointID,
+               long timestamp) throws Exception {
+               return backingStateBackend.checkpointStateSerializable(state, checkpointID, timestamp);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
new file mode 100644
index 0000000..8767a86
--- /dev/null
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.rocksdb.RocksDBException;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link ValueState} implementation that stores state in RocksDB.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <V> The type of value that the state stores.
+ * @param <Backend> The type of the backend that snapshots this key/value state.
+ */
+public class RocksDBValueState<K, N, V, Backend extends AbstractStateBackend>
+       extends AbstractRocksDBState<K, N, ValueState<V>, ValueStateDescriptor<V>, Backend>
+       implements ValueState<V> {
+
+       /** Serializer for the values */
+       private final TypeSerializer<V> valueSerializer;
+
+       /** This holds the name of the state and can create an initial default value for the state. */
+       protected final ValueStateDescriptor<V> stateDesc;
+
+       /**
+        * Creates a new {@code RocksDBValueState}.
+        *
+        * @param keySerializer The serializer for the keys.
+        * @param namespaceSerializer The serializer for the namespace.
+        * @param stateDesc The state identifier for the state. This contains name
+        *                           and can create a default state value. Must not be {@code null}.
+        * @param dbPath The path on the local system where RocksDB data should be stored.
+        * @param backupPath Forwarded to the superclass constructor; presumably the location that
+        *                           RocksDB backups are written to (confirm against the superclass).
+        */
+       protected RocksDBValueState(TypeSerializer<K> keySerializer,
+               TypeSerializer<N> namespaceSerializer,
+               ValueStateDescriptor<V> stateDesc,
+               File dbPath,
+               String backupPath) {
+               super(keySerializer, namespaceSerializer, dbPath, backupPath);
+               this.stateDesc = requireNonNull(stateDesc);
+               this.valueSerializer = stateDesc.getSerializer();
+       }
+
+       /**
+        * Creates a new {@code RocksDBValueState} through the superclass constructor that
+        * additionally takes a restore path.
+        *
+        * @param keySerializer The serializer for the keys.
+        * @param namespaceSerializer The serializer for the namespace.
+        * @param stateDesc The state identifier for the state. Must not be {@code null}.
+        * @param dbPath The path on the local system where RocksDB data should be stored.
+        * @param backupPath Forwarded to the superclass constructor (see the other constructor).
+        * @param restorePath Forwarded to the superclass constructor; presumably the location of the
+        *                           backup to restore from (confirm against the superclass).
+        */
+       protected RocksDBValueState(TypeSerializer<K> keySerializer,
+               TypeSerializer<N> namespaceSerializer,
+               ValueStateDescriptor<V> stateDesc,
+               File dbPath,
+               String backupPath,
+               String restorePath) {
+               super(keySerializer, namespaceSerializer, dbPath, backupPath, restorePath);
+               // Same null check as the primary constructor, so both fail with the same clear error.
+               this.stateDesc = requireNonNull(stateDesc);
+               this.valueSerializer = stateDesc.getSerializer();
+       }
+
+       /**
+        * Returns the value stored for the current key and namespace.
+        *
+        * @return The deserialized value, or the descriptor's default value if RocksDB holds no
+        *         entry for the key.
+        * @throws RuntimeException if key serialization, the RocksDB lookup or deserialization fails.
+        */
+       @Override
+       public V value() {
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
+               try {
+                       writeKeyAndNamespace(out);
+                       byte[] key = baos.toByteArray();
+                       byte[] valueBytes = db.get(key);
+                       if (valueBytes == null) {
+                               return stateDesc.getDefaultValue();
+                       }
+                       return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
+               } catch (IOException|RocksDBException e) {
+                       throw new RuntimeException("Error while retrieving data from RocksDB", e);
+               }
+       }
+
+       /**
+        * Stores {@code value} for the current key and namespace.
+        *
+        * <p>Updating with {@code null} removes the entry, so that the next {@link #value()} call
+        * yields the default value again, instead of attempting to serialize {@code null}.
+        *
+        * @param value The new value, or {@code null} to remove the state for the current key.
+        * @throws RuntimeException if serialization or the RocksDB write fails.
+        */
+       @Override
+       public void update(V value) throws IOException {
+               if (value == null) {
+                       clear();
+                       return;
+               }
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
+               try {
+                       writeKeyAndNamespace(out);
+                       byte[] key = baos.toByteArray();
+                       // The key bytes were copied out above, so the buffer can be reused for the value.
+                       baos.reset();
+                       valueSerializer.serialize(value, out);
+                       db.put(key, baos.toByteArray());
+               } catch (Exception e) {
+                       throw new RuntimeException("Error while adding data to RocksDB", e);
+               }
+       }
+
+       /**
+        * Creates the snapshot object for this state from its current paths, serializers and descriptor.
+        */
+       @Override
+       protected KvStateSnapshot<K, N, ValueState<V>, ValueStateDescriptor<V>, Backend> createRocksDBSnapshot(
+               URI backupUri,
+               long checkpointId) {
+               return new Snapshot<>(dbPath, checkpointPath, backupUri, checkpointId, keySerializer, namespaceSerializer, stateDesc);
+       }
+
+       /**
+        * Snapshot of a {@link RocksDBValueState}; re-creates the state object on restore.
+        */
+       private static class Snapshot<K, N, V, Backend extends AbstractStateBackend> extends AbstractRocksDBSnapshot<K, N, ValueState<V>, ValueStateDescriptor<V>, Backend> {
+               private static final long serialVersionUID = 1L;
+
+               public Snapshot(File dbPath,
+                       String checkpointPath,
+                       URI backupUri,
+                       long checkpointId,
+                       TypeSerializer<K> keySerializer,
+                       TypeSerializer<N> namespaceSerializer,
+                       ValueStateDescriptor<V> stateDesc) {
+                       super(dbPath,
+                               checkpointPath,
+                               backupUri,
+                               checkpointId,
+                               keySerializer,
+                               namespaceSerializer,
+                               stateDesc);
+               }
+
+               // NOTE(review): the backupPath argument is not used; the checkpointPath visible in this
+               // scope is passed instead. Confirm this is intended.
+               @Override
+               protected KvState<K, N, ValueState<V>, ValueStateDescriptor<V>, Backend> createRocksDBState(
+                       TypeSerializer<K> keySerializer,
+                       TypeSerializer<N> namespaceSerializer,
+                       ValueStateDescriptor<V> stateDesc,
+                       File dbPath,
+                       String backupPath,
+                       String restorePath) throws Exception {
+                       return new RocksDBValueState<>(keySerializer, namespaceSerializer, stateDesc, dbPath, checkpointPath, restorePath);
+               }
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
new file mode 100644
index 0000000..3b3ac31
--- /dev/null
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.state.StateBackendTestBase;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
+/**
+ * Tests for the partitioned state part of {@link RocksDBStateBackend}.
+ *
+ * <p>Each backend instance gets fresh, randomly named database and checkpoint directories under
+ * the default task manager temp path, and a {@link MemoryStateBackend} as backing backend.
+ */
+public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBackend> {
+
+       private File dbDir;
+       private File chkDir;
+
+       @Override
+       protected RocksDBStateBackend getStateBackend() throws IOException {
+               dbDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+               chkDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+
+               return new RocksDBStateBackend(dbDir.getAbsolutePath(), "file://" + chkDir.getAbsolutePath(), new MemoryStateBackend());
+       }
+
+       /**
+        * Best-effort removal of both test directories.
+        *
+        * <p>{@code deleteQuietly} never throws and accepts {@code null}, so a failure on the
+        * database directory no longer prevents the checkpoint directory from being removed, and
+        * calling this before any backend was created is harmless.
+        */
+       @Override
+       protected void cleanup() {
+               FileUtils.deleteQuietly(dbDir);
+               FileUtils.deleteQuietly(chkDir);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..0b686e5
--- /dev/null
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF and its only appender to A1.
+log4j.rootLogger=OFF, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j.properties 
b/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j.properties
new file mode 100644
index 0000000..ed2bbcb
--- /dev/null
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# This file ensures that tests executed from the IDE show log output
+
+log4j.rootLogger=OFF, console
+
+# Log to the console (System.err)
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target = System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x 
- %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/flink-statebackend-rocksdb/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/resources/logback-test.xml 
b/flink-contrib/flink-statebackend-rocksdb/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..4f56748
--- /dev/null
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} 
%X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    <logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/pom.xml b/flink-contrib/pom.xml
index 82b6211..76f0f88 100644
--- a/flink-contrib/pom.xml
+++ b/flink-contrib/pom.xml
@@ -43,5 +43,6 @@ under the License.
                <module>flink-tweet-inputformat</module>
                <module>flink-operator-stats</module>
                <module>flink-connector-wikiedits</module>
+               <module>flink-statebackend-rocksdb</module>
        </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-core/src/main/java/org/apache/flink/util/ExternalProcessRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/ExternalProcessRunner.java 
b/flink-core/src/main/java/org/apache/flink/util/ExternalProcessRunner.java
new file mode 100644
index 0000000..8e4725c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/ExternalProcessRunner.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Utility class for running a class in an external process. This will try to find the java
+ * executable in common places and will use the classpath of the current process as the
+ * classpath of the new process.
+ *
+ * <p>The external process is launched by the constructor; {@link #run()} only waits for it to
+ * finish. The stderr of the process is copied into a {@code StringWriter} by a daemon
+ * {@link PipeForwarder} thread. The stdout of the process is not read by this class.
+ *
+ * <p>Attention: The entry point class must be in the classpath of the currently running process,
+ * otherwise the newly spawned process will not find it and fail.
+ */
+public class ExternalProcessRunner {
+       private final String entryPointClassName; // class name handed to the java command
+
+       private final Process process; // already running once the constructor returns
+
+       final StringWriter errorOutput = new StringWriter(); // filled by the PipeForwarder thread
+
+       /**
+        * Creates a new {@code ExternalProcessRunner} and immediately starts an external JVM that
+        * runs the given class with the given parameters. The class must have a "main" method.
+        *
+        * <p>The command line consists of the result of {@link #getJavaCommandPath()},
+        * {@code -classpath}, the result of {@link #getCurrentClasspath()}, the class name and
+        * finally the parameters.
+        *
+        * <p>NOTE(review): {@link #getJavaCommandPath()} may return {@code null}; that value is
+        * added to the command list without a check - confirm how callers expect this to fail.
+        *
+        * @param entryPointClassName name of the class to run, passed to the java command
+        * @param parameters arguments appended to the command line after the class name
+        * @throws IOException if the external process cannot be started
+        */
+       public ExternalProcessRunner(String entryPointClassName, String[] 
parameters) throws IOException {
+               this.entryPointClassName = entryPointClassName;
+
+               String javaCommand = getJavaCommandPath();
+
+               List<String> commandList = new ArrayList<>();
+
+               commandList.add(javaCommand);
+               commandList.add("-classpath");
+               commandList.add(getCurrentClasspath());
+               commandList.add(entryPointClassName);
+
+               Collections.addAll(commandList, parameters);
+
+               process = new ProcessBuilder(commandList).start(); // the process starts here, not in run()
+
+               new PipeForwarder(process.getErrorStream(), errorOutput); // the forwarder starts itself in its constructor
+       }
+
+       /**
+        * Returns the writer into which the stderr of the process is copied.
+        *
+        * <p>NOTE(review): the writer is filled asynchronously by the {@link PipeForwarder} thread
+        * and {@link #run()} does not join that thread, so the content may presumably still be
+        * incomplete right after {@code run()} has returned - confirm.
+        *
+        * @return the writer holding the stderr output captured so far
+        */
+       public StringWriter getErrorOutput() {
+               return errorOutput;
+       }
+
+       /**
+        * Waits for the external process, which was already started by the constructor, to finish
+        * and returns the exit code of that process.
+        *
+        * <p>If the exit code is not 0 and the captured stderr contains the message
+        * {@code Error: Could not find or load main class} followed by the entry point class name,
+        * a {@code ClassNotFoundException} is thrown instead of returning the exit code.
+        *
+        * <p>If this method is interrupted it will destroy the external process and throw a new
+        * {@code InterruptedException}. The process is destroyed through {@code destroyForcibly},
+        * looked up reflectively on the process class, falling back to {@code destroy()} when that
+        * method does not exist. Other reflection failures propagate to the caller.
+        *
+        * @return the exit code of the external process
+        * @throws ClassNotFoundException if the process reported that the main class is missing
+        * @throws InterruptedException if the calling thread is interrupted while waiting
+        * @throws Exception if invoking {@code destroyForcibly} reflectively fails
+        */
+       public int run() throws Exception {
+               try {
+                       int returnCode = process.waitFor();
+
+                       if (returnCode != 0) {
+                               // determine whether we failed because of a 
ClassNotFoundException and forward that
+                               if 
(getErrorOutput().toString().contains("Error: Could not find or load main class 
" + entryPointClassName)) {
+                                       throw new 
ClassNotFoundException("Error: Could not find or load main class " + 
entryPointClassName);
+                               }
+
+                       }
+                       return returnCode;
+               } catch (InterruptedException e) {
+                       try {
+                               Class<?> processClass = process.getClass();
+                               Method destroyForcibly = 
processClass.getMethod("destroyForcibly");
+                               destroyForcibly.setAccessible(true);
+                               destroyForcibly.invoke(process);
+                       } catch (NoSuchMethodException ex) {
+                               // destroyForcibly is not available on this Process class, fall back to destroy()
+                               process.destroy();
+                       }
+                       throw new InterruptedException("Interrupted while 
waiting for external process.");
+               }
+       }
+
+       /**
+        * Tries to find a java executable command that can be run from the current process.
+        *
+        * <p>The candidates are probed in this order by running each of them with {@code -version}
+        * and accepting the first one whose process exits with code 0: {@code java},
+        * {@code java.exe}, then {@code java} directly in and in the {@code bin} directory of the
+        * {@code java.home} system property, then {@code java.exe} in those same two locations.
+        * Any {@code Throwable} raised while probing a candidate is ignored and the next candidate
+        * is tried.
+        *
+        * <p>Returns null, if the command could not be found.
+        *
+        * @return The java executable command, or {@code null} if no candidate could be run.
+        */
+       public static String getJavaCommandPath() {
+
+               try {
+                       ProcessBuilder bld = new ProcessBuilder("java", 
"-version");
+                       Process process = bld.start();
+                       if (process.waitFor() == 0) {
+                               return "java";
+                       }
+               }
+               catch (Throwable t) {
+                       // plain java could not be started, try java.exe next
+               }
+
+               try {
+                       ProcessBuilder bld = new ProcessBuilder("java.exe", 
"-version");
+                       Process process = bld.start();
+                       if (process.waitFor() == 0) {
+                               return "java.exe";
+                       }
+               }
+               catch (Throwable t) {
+                       // java.exe could not be started either, try the locations below java.home next
+               }
+
+               File javaHome = new File(System.getProperty("java.home"));
+
+               String path1 = new File(javaHome, "java").getAbsolutePath();
+               String path2 = new File(new File(javaHome, "bin"), 
"java").getAbsolutePath();
+
+               try {
+                       ProcessBuilder bld = new ProcessBuilder(path1, 
"-version");
+                       Process process = bld.start();
+                       if (process.waitFor() == 0) {
+                               return path1;
+                       }
+               }
+               catch (Throwable t) {
+                       // ignore and try the second path (path2)
+               }
+
+               try {
+                       ProcessBuilder bld = new ProcessBuilder(path2, 
"-version");
+                       Process process = bld.start();
+                       if (process.waitFor() == 0) {
+                               return path2;
+                       }
+               }
+               catch (Throwable tt) {
+                       // no luck, try the java.exe variants of both paths next
+               }
+
+               String path3 = new File(javaHome, "java.exe").getAbsolutePath();
+               String path4 = new File(new File(javaHome, "bin"), 
"java.exe").getAbsolutePath();
+
+               try {
+                       ProcessBuilder bld = new ProcessBuilder(path3, 
"-version");
+                       Process process = bld.start();
+                       if (process.waitFor() == 0) {
+                               return path3;
+                       }
+               }
+               catch (Throwable t) {
+                       // ignore and try the second path (path4)
+               }
+
+               try {
+                       ProcessBuilder bld = new ProcessBuilder(path4, 
"-version");
+                       Process process = bld.start();
+                       if (process.waitFor() == 0) {
+                               return path4;
+                       }
+               }
+               catch (Throwable tt) {
+                       // no luck, all candidates are exhausted
+               }
+               return null;
+       }
+
+       /**
+        * Gets the classpath of the current JVM as reported by its {@code RuntimeMXBean}.
+        *
+        * @return The classpath with which the current JVM was started.
+        */
+       public static String getCurrentClasspath() {
+               RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean();
+               return bean.getClassPath();
+       }
+
+       /**
+        * Utility class to read the output of a process stream and forward it into a StringWriter.
+        *
+        * <p>The thread is a daemon thread and starts itself from its constructor. It reads the
+        * source one byte at a time until the end of the stream and writes every byte to the
+        * target as a single character, so no charset decoding is applied. An {@code IOException}
+        * while reading silently ends the forwarding.
+        */
+       public static class PipeForwarder extends Thread {
+
+               private final StringWriter target;
+               private final InputStream source;
+
+               public PipeForwarder(InputStream source, StringWriter target) {
+                       super("Pipe Forwarder");
+                       setDaemon(true);
+
+                       this.source = source;
+                       this.target = target;
+
+                       start(); // callers only construct the forwarder, it launches itself
+               }
+
+               @Override
+               public void run() {
+                       try {
+                               int next;
+                               while ((next = source.read()) != -1) {
+                                       target.write(next);
+                               }
+                       }
+                       catch (IOException e) {
+                               // terminate: stop forwarding once the stream cannot be read anymore
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-core/src/main/java/org/apache/flink/util/HDFSCopyFromLocal.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/HDFSCopyFromLocal.java 
b/flink-core/src/main/java/org/apache/flink/util/HDFSCopyFromLocal.java
new file mode 100644
index 0000000..cf6780b
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/HDFSCopyFromLocal.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+import java.net.URI;
+
+/**
+ * Utility for copying from local file system to a HDFS {@link FileSystem} in an external process.
+ * This is required since {@code FileSystem.copyFromLocalFile} does not like being interrupted.
+ *
+ * <p>{@link #copyFromLocal(File, URI)} spawns a new JVM through {@link ExternalProcessRunner}
+ * with this class as entry point. {@link #main(String[])} then performs the copy inside that
+ * process; it takes the local path as first and the remote URI as second argument and obtains
+ * the file system for the remote URI with a newly created {@code Configuration}.
+ *
+ * <p>{@code copyFromLocal} throws a {@code RuntimeException} that carries the captured stderr of
+ * the external process when that process exits with a non-zero code.
+ */
+public class HDFSCopyFromLocal {
+       public static void main(String[] args) throws Exception { // runs inside the external process
+               String localBackupPath = args[0];
+               String backupUri = args[1];
+
+               FileSystem fs = FileSystem.get(new URI(backupUri), new 
Configuration());
+
+               fs.copyFromLocalFile(new Path(localBackupPath), new 
Path(backupUri));
+       }
+
+       public static void copyFromLocal(File localPath, URI remotePath) throws 
Exception { // runs in the calling process and blocks until the external copy has finished
+               ExternalProcessRunner processRunner = new 
ExternalProcessRunner(HDFSCopyFromLocal.class.getName(),
+                       new String[]{localPath.getAbsolutePath(), 
remotePath.toString()});
+               if (processRunner.run() != 0) { // a non-zero exit code is treated as a failed copy
+                       throw new  RuntimeException("Error while copying to 
remote FileSystem: " + processRunner.getErrorOutput());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-core/src/main/java/org/apache/flink/util/HDFSCopyToLocal.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/HDFSCopyToLocal.java 
b/flink-core/src/main/java/org/apache/flink/util/HDFSCopyToLocal.java
new file mode 100644
index 0000000..813f768
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/HDFSCopyToLocal.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+import java.net.URI;
+
+/**
+ * Utility for copying from a HDFS {@link FileSystem} to the local file system in an external
+ * process. This is required since {@code FileSystem.copyToLocalFile} does not like being
+ * interrupted.
+ *
+ * <p>{@link #copyToLocal(URI, File)} spawns a new JVM through {@link ExternalProcessRunner} with
+ * this class as entry point. {@link #main(String[])} then performs the copy inside that
+ * process; it takes the remote URI as first and the local path as second argument and obtains
+ * the file system for the remote URI with a newly created {@code Configuration}.
+ *
+ * <p>{@code copyToLocal} throws a {@code RuntimeException} that carries the captured stderr of
+ * the external process when that process exits with a non-zero code.
+ */
+public class HDFSCopyToLocal {
+       public static void main(String[] args) throws Exception { // runs inside the external process
+               String backupUri = args[0];
+               String dbPath = args[1];
+
+               FileSystem fs = FileSystem.get(new URI(backupUri), new 
Configuration());
+
+               fs.copyToLocalFile(new Path(backupUri), new Path(dbPath));
+       }
+
+       public static void copyToLocal(URI remotePath, File localPath) throws 
Exception { // runs in the calling process and blocks until the external copy has finished
+               ExternalProcessRunner processRunner = new 
ExternalProcessRunner(HDFSCopyToLocal.class.getName(),
+                       new String[]{remotePath.toString(), 
localPath.getAbsolutePath()});
+               if (processRunner.run() != 0) { // a non-zero exit code is treated as a failed copy
+                       throw new  RuntimeException("Error while copying from 
remote FileSystem: " + processRunner.getErrorOutput());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-core/src/test/java/org/apache/flink/util/ExternalProcessRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/util/ExternalProcessRunnerTest.java 
b/flink-core/src/test/java/org/apache/flink/util/ExternalProcessRunnerTest.java
new file mode 100644
index 0000000..5ebe772
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/util/ExternalProcessRunnerTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class ExternalProcessRunnerTest { // the nested classes at the bottom are the entry points that get run externally
+
+       @Test(expected = ClassNotFoundException.class)
+       public void testClassNotFound() throws Exception { // a missing entry point class must surface as ClassNotFoundException from run()
+               ExternalProcessRunner runner = new 
ExternalProcessRunner("MyClassThatDoesNotExist", new String[]{});
+               runner.run();
+       }
+
+       @Test
+       public void testInterrupting() throws Exception { // interrupting a thread blocked in run() must let that thread finish
+
+               final ExternalProcessRunner runner = new 
ExternalProcessRunner(InfiniteLoop.class.getName(), new String[]{});
+
+               Thread thread = new Thread() {
+                       @Override
+                       public void run() {
+                               try {
+                                       runner.run();
+                               } catch (InterruptedException e) {
+                                       // this is expected
+                               } catch (Exception e) {
+                                       fail("Other exception received " + e); // NOTE(review): raised on the helper thread, presumably not reported to the test runner - confirm
+                               }
+                       }
+               };
+
+               thread.start();
+               thread.interrupt();
+               thread.join(); // returns only after the helper thread has left runner.run()
+       }
+
+       @Test
+       public void testPrintToErr() throws Exception { // NOTE(review): the text assertion below passes (actual, expected) while JUnit declares (expected, actual) - confirm
+               final ExternalProcessRunner runner = new 
ExternalProcessRunner(PrintToError.class.getName(), new String[]{"hello42"});
+
+               int result = runner.run();
+
+               assertEquals(0, result);
+               assertEquals(runner.getErrorOutput().toString(), "Hello process 
hello42\n");
+       }
+
+       @Test
+       public void testFailing() throws Exception { // expects exit code 1 and the exact stack trace text on stderr
+               final ExternalProcessRunner runner = new 
ExternalProcessRunner(Failing.class.getName(), new String[]{});
+
+               int result = runner.run();
+
+               assertEquals(1, result);
+               // this needs to be adapted if the test changes because it 
contains the line number
+               assertEquals(runner.getErrorOutput().toString(), "Exception in 
thread \"main\" java.lang.RuntimeException: HEHE, I'm failing.\n" +
+                       "\tat 
org.apache.flink.util.ExternalProcessRunnerTest$Failing.main(ExternalProcessRunnerTest.java:94)\n");
+       }
+
+
+       public static class InfiniteLoop { // entry point that never terminates on its own
+               public static void main(String[] args) {
+                       while (true) {
+                       }
+               }
+       }
+
+       public static class PrintToError { // entry point that prints its first argument to stderr
+               public static void main(String[] args) {
+                       System.err.println("Hello process " + args[0]);
+               }
+       }
+
+       public static class Failing { // entry point that ends by throwing from main
+               public static void main(String[] args) {
+                       throw new RuntimeException("HEHE, I'm failing.");
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 996f5e0..73a3d66 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -175,6 +175,13 @@ under the License.
                        <version>${guava.version}</version>
                        <scope>test</scope>
                </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-statebackend-rocksdb_2.10</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
        </dependencies>
 
        <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/524e56bc/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index 5886982..9bc0040 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
@@ -112,6 +113,12 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
                                String backups = 
tempFolder.newFolder().getAbsolutePath();
                                this.stateBackend = new 
FsStateBackend("file://" + backups);
                                break;
+                       case ROCKSDB:
+                               String rocksDb = 
tempFolder.newFolder().getAbsolutePath();
+                               String rocksDbBackups = 
tempFolder.newFolder().getAbsolutePath();
+
+                               this.stateBackend = new 
RocksDBStateBackend(rocksDb, "file://" + rocksDbBackups, new 
MemoryStateBackend());
+                               break;
                }
        }
 
@@ -739,6 +746,8 @@ public class EventTimeWindowCheckpointingITCase extends 
TestLogger {
                return Arrays.asList(new Object[][] {
                                {StateBackendEnum.MEM},
                                {StateBackendEnum.FILE},
+//                             {StateBackendEnum.DB},
+                               {StateBackendEnum.ROCKSDB}
                        }
                );
        }

Reply via email to