http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
new file mode 100644
index 0000000..e8c34cc
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.util.Preconditions;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * {@link MapState} implementation that stores state in RocksDB.
+ *
+ * <p>{@link RocksDBStateBackend} must ensure that we set the
+ * {@link org.rocksdb.StringAppendOperator} on the column family that we use for our state since
+ * we use the {@code merge()} call.
+ *
+ * @param <K>  The type of the key.
+ * @param <N>  The type of the namespace.
+ * @param <UK> The type of the keys in the map state.
+ * @param <UV> The type of the values in the map state.
+ */
+public class RocksDBMapState<K, N, UK, UV>
+       extends AbstractRocksDBState<K, N, MapState<UK, UV>, MapStateDescriptor<UK, UV>, Map<UK, UV>>
+       implements InternalMapState<N, UK, UV> {
+
+       private static final Logger LOG = LoggerFactory.getLogger(RocksDBMapState.class);
+
+       /** Serializer for the keys and values. */
+       private final TypeSerializer<UK> userKeySerializer;
+       private final TypeSerializer<UV> userValueSerializer;
+
+       /**
+        * We disable writes to the write-ahead-log here. We can't have these in the base class
+        * because JNI segfaults for some reason if they are.
+        */
+       private final WriteOptions writeOptions;
+
+       /**
+        * Creates a new {@code RocksDBMapState}.
+        *
+        * @param namespaceSerializer The serializer for the namespace.
+        * @param stateDesc The state identifier for the state.
+        */
+       public RocksDBMapState(ColumnFamilyHandle columnFamily,
+                       TypeSerializer<N> namespaceSerializer,
+                       MapStateDescriptor<UK, UV> stateDesc,
+                       RocksDBKeyedStateBackend<K> backend) {
+
+               super(columnFamily, namespaceSerializer, stateDesc, backend);
+
+               this.userKeySerializer = stateDesc.getKeySerializer();
+               this.userValueSerializer = stateDesc.getValueSerializer();
+
+               writeOptions = new WriteOptions();
+               writeOptions.setDisableWAL(true);
+       }
+
+       // ------------------------------------------------------------------------
+       //  MapState Implementation
+       // ------------------------------------------------------------------------
+
+       @Override
+       public UV get(UK userKey) throws IOException, RocksDBException {
+               byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+               byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);
+
+               return (rawValueBytes == null ? null : deserializeUserValue(rawValueBytes));
+       }
+
+       @Override
+       public void put(UK userKey, UV userValue) throws IOException, RocksDBException {
+
+               byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+               byte[] rawValueBytes = serializeUserValue(userValue);
+
+               backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
+       }
+
+       @Override
+       public void putAll(Map<UK, UV> map) throws IOException, RocksDBException {
+               if (map == null) {
+                       return;
+               }
+
+               for (Map.Entry<UK, UV> entry : map.entrySet()) {
+                       put(entry.getKey(), entry.getValue());
+               }
+       }
+
+       @Override
+       public void remove(UK userKey) throws IOException, RocksDBException {
+               byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+
+               backend.db.remove(columnFamily, writeOptions, rawKeyBytes);
+       }
+
+       @Override
+       public boolean contains(UK userKey) throws IOException, RocksDBException {
+               byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
+               byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);
+
+               return (rawValueBytes != null);
+       }
+
+       @Override
+       public Iterable<Map.Entry<UK, UV>> entries() throws IOException, RocksDBException {
+               final Iterator<Map.Entry<UK, UV>> iterator = iterator();
+
+               // Return null to make the behavior consistent with other states.
+               if (!iterator.hasNext()) {
+                       return null;
+               } else {
+                       return new Iterable<Map.Entry<UK, UV>>() {
+                               @Override
+                               public Iterator<Map.Entry<UK, UV>> iterator() {
+                                       return iterator;
+                               }
+                       };
+               }
+       }
+
+       @Override
+       public Iterable<UK> keys() throws IOException, RocksDBException {
+               final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
+
+               return new Iterable<UK>() {
+                       @Override
+                       public Iterator<UK> iterator() {
+                               return new RocksDBMapIterator<UK>(backend.db, prefixBytes) {
+                                       @Override
+                                       public UK next() {
+                                               RocksDBMapEntry entry = nextEntry();
+                                               return (entry == null ? null : entry.getKey());
+                                       }
+                               };
+                       }
+               };
+       }
+
+       @Override
+       public Iterable<UV> values() throws IOException, RocksDBException {
+               final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
+
+               return new Iterable<UV>() {
+                       @Override
+                       public Iterator<UV> iterator() {
+                               return new RocksDBMapIterator<UV>(backend.db, prefixBytes) {
+                                       @Override
+                                       public UV next() {
+                                               RocksDBMapEntry entry = nextEntry();
+                                               return (entry == null ? null : entry.getValue());
+                                       }
+                               };
+                       }
+               };
+       }
+
+       @Override
+       public Iterator<Map.Entry<UK, UV>> iterator() throws IOException, RocksDBException {
+               final byte[] prefixBytes = serializeCurrentKeyAndNamespace();
+
+               return new RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db, prefixBytes) {
+                       @Override
+                       public Map.Entry<UK, UV> next() {
+                               return nextEntry();
+                       }
+               };
+       }
+
+       @Override
+       public void clear() {
+               try {
+                       Iterator<Map.Entry<UK, UV>> iterator = iterator();
+
+                       while (iterator.hasNext()) {
+                               iterator.next();
+                               iterator.remove();
+                       }
+               } catch (Exception e) {
+                       LOG.warn("Error while cleaning the state.", e);
+               }
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
+               Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
+
+               //TODO make KvStateSerializer key-group aware to save this round trip and key-group computation
+               Tuple2<K, N> des = KvStateSerializer.deserializeKeyAndNamespace(
+                               serializedKeyAndNamespace,
+                               backend.getKeySerializer(),
+                               namespaceSerializer);
+
+               int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());
+
+               ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(128);
+               DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream);
+               writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1, outputStream, outputView);
+               final byte[] keyPrefixBytes = outputStream.toByteArray();
+
+               final Iterator<Map.Entry<UK, UV>> iterator = new RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db, keyPrefixBytes) {
+                       @Override
+                       public Map.Entry<UK, UV> next() {
+                               return nextEntry();
+                       }
+               };
+
+               // Return null to make the behavior consistent with other backends
+               if (!iterator.hasNext()) {
+                       return null;
+               }
+
+               return KvStateSerializer.serializeMap(new Iterable<Map.Entry<UK, UV>>() {
+                       @Override
+                       public Iterator<Map.Entry<UK, UV>> iterator() {
+                               return iterator;
+                       }
+               }, userKeySerializer, userValueSerializer);
+       }
+
+       // ------------------------------------------------------------------------
+       //  Serialization Methods
+       // ------------------------------------------------------------------------
+
+       private byte[] serializeCurrentKeyAndNamespace() throws IOException {
+               writeCurrentKeyWithGroupAndNamespace();
+
+               return keySerializationStream.toByteArray();
+       }
+
+       private byte[] serializeUserKeyWithCurrentKeyAndNamespace(UK userKey) throws IOException {
+               writeCurrentKeyWithGroupAndNamespace();
+               userKeySerializer.serialize(userKey, keySerializationDataOutputView);
+
+               return keySerializationStream.toByteArray();
+       }
+
+       private byte[] serializeUserValue(UV userValue) throws IOException {
+               keySerializationStream.reset();
+
+               if (userValue == null) {
+                       keySerializationDataOutputView.writeBoolean(true);
+               } else {
+                       keySerializationDataOutputView.writeBoolean(false);
+                       userValueSerializer.serialize(userValue, keySerializationDataOutputView);
+               }
+
+               return keySerializationStream.toByteArray();
+       }
+
+       private UK deserializeUserKey(byte[] rawKeyBytes) throws IOException {
+               ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos(rawKeyBytes);
+               DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
+
+               readKeyWithGroupAndNamespace(bais, in);
+
+               return userKeySerializer.deserialize(in);
+       }
+
+       private UV deserializeUserValue(byte[] rawValueBytes) throws IOException {
+               ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos(rawValueBytes);
+               DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
+
+               boolean isNull = in.readBoolean();
+
+               return isNull ? null : userValueSerializer.deserialize(in);
+       }
+
+       // ------------------------------------------------------------------------
+       //  Internal Classes
+       // ------------------------------------------------------------------------
+
+       /** A map entry in RocksDBMapState. */
+       private class RocksDBMapEntry implements Map.Entry<UK, UV> {
+               private final RocksDB db;
+
+               /** The raw bytes of the key stored in RocksDB. Each user key is stored in RocksDB
+                * with the format #KeyGroup#Key#Namespace#UserKey. */
+               private final byte[] rawKeyBytes;
+
+               /** The raw bytes of the value stored in RocksDB. */
+               private byte[] rawValueBytes;
+
+               /** True if the entry has been deleted. */
+               private boolean deleted;
+
+               /** The user key and value. The deserialization is performed lazily, i.e. the key
+                * and the value are deserialized only when they are accessed. */
+               private UK userKey = null;
+               private UV userValue = null;
+
+               RocksDBMapEntry(final RocksDB db, final byte[] rawKeyBytes, final byte[] rawValueBytes) {
+                       this.db = db;
+
+                       this.rawKeyBytes = rawKeyBytes;
+                       this.rawValueBytes = rawValueBytes;
+                       this.deleted = false;
+               }
+
+               public void remove() {
+                       deleted = true;
+                       rawValueBytes = null;
+
+                       try {
+                               db.remove(columnFamily, writeOptions, rawKeyBytes);
+                       } catch (RocksDBException e) {
+                               throw new RuntimeException("Error while removing data from RocksDB.", e);
+                       }
+               }
+
+               @Override
+               public UK getKey() {
+                       if (userKey == null) {
+                               try {
+                                       userKey = deserializeUserKey(rawKeyBytes);
+                               } catch (IOException e) {
+                                       throw new RuntimeException("Error while deserializing the user key.", e);
+                               }
+                       }
+
+                       return userKey;
+               }
+
+               @Override
+               public UV getValue() {
+                       if (deleted) {
+                               return null;
+                       } else {
+                               if (userValue == null) {
+                                       try {
+                                               userValue = deserializeUserValue(rawValueBytes);
+                                       } catch (IOException e) {
+                                               throw new RuntimeException("Error while deserializing the user value.", e);
+                                       }
+                               }
+
+                               return userValue;
+                       }
+               }
+
+               @Override
+               public UV setValue(UV value) {
+                       if (deleted) {
+                               throw new IllegalStateException("The value has already been deleted.");
+                       }
+
+                       UV oldValue = getValue();
+
+                       try {
+                               userValue = value;
+                               rawValueBytes = serializeUserValue(value);
+
+                               db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
+                       } catch (IOException | RocksDBException e) {
+                               throw new RuntimeException("Error while putting data into RocksDB.", e);
+                       }
+
+                       return oldValue;
+               }
+       }
+
+       /** An auxiliary utility to scan all entries under the given key. */
+       private abstract class RocksDBMapIterator<T> implements Iterator<T> {
+
+               static final int CACHE_SIZE_BASE = 1;
+               static final int CACHE_SIZE_LIMIT = 128;
+
+               /** The db where data resides. */
+               private final RocksDB db;
+
+               /**
+                * The prefix bytes of the key being accessed. All entries under the same key
+                * have the same prefix, hence we can stop iterating once we come across an
+                * entry with a different prefix.
+                */
+               private final byte[] keyPrefixBytes;
+
+               /**
+                * True if all entries have been accessed or the iterator has come across an
+                * entry with a different prefix.
+                */
+               private boolean expired = false;
+
+               /** An in-memory cache for the entries in RocksDB. */
+               private ArrayList<RocksDBMapEntry> cacheEntries = new ArrayList<>();
+               private int cacheIndex = 0;
+
+               RocksDBMapIterator(final RocksDB db, final byte[] keyPrefixBytes) {
+                       this.db = db;
+                       this.keyPrefixBytes = keyPrefixBytes;
+               }
+
+               @Override
+               public boolean hasNext() {
+                       loadCache();
+
+                       return (cacheIndex < cacheEntries.size());
+               }
+
+               @Override
+               public void remove() {
+                       if (cacheIndex == 0 || cacheIndex > cacheEntries.size()) {
+                               throw new IllegalStateException("The remove operation must be called after a valid next operation.");
+                       }
+
+                       RocksDBMapEntry lastEntry = cacheEntries.get(cacheIndex - 1);
+                       lastEntry.remove();
+               }
+
+               final RocksDBMapEntry nextEntry() {
+                       loadCache();
+
+                       if (cacheIndex == cacheEntries.size()) {
+                               if (!expired) {
+                                       throw new IllegalStateException();
+                               }
+
+                               return null;
+                       }
+
+                       RocksDBMapEntry entry = cacheEntries.get(cacheIndex);
+                       cacheIndex++;
+
+                       return entry;
+               }
+
+               private void loadCache() {
+                       if (cacheIndex > cacheEntries.size()) {
+                               throw new IllegalStateException();
+                       }
+
+                       // Load cache entries only when the cache is empty and there still exist unread entries
+                       if (cacheIndex < cacheEntries.size() || expired) {
+                               return;
+                       }
+
+                       RocksIterator iterator = db.newIterator(columnFamily);
+
+                       /*
+                        * The iteration starts from the prefix bytes at the first loading. The cache then is
+                        * reloaded when the next entry to return is the last one in the cache. At that time,
+                        * we will start iterating from the last returned entry.
+                        */
+                       RocksDBMapEntry lastEntry = cacheEntries.size() == 0 ? null : cacheEntries.get(cacheEntries.size() - 1);
+                       byte[] startBytes = (lastEntry == null ? keyPrefixBytes : lastEntry.rawKeyBytes);
+                       int numEntries = (lastEntry == null ? CACHE_SIZE_BASE : Math.min(cacheEntries.size() * 2, CACHE_SIZE_LIMIT));
+
+                       cacheEntries.clear();
+                       cacheIndex = 0;
+
+                       iterator.seek(startBytes);
+
+                       /*
+                        * If the last returned entry is not deleted, it will be the first entry in the
+                        * iteration. Skip it to avoid redundant access in such cases.
+                        */
+                       if (lastEntry != null && !lastEntry.deleted) {
+                               iterator.next();
+                       }
+
+                       while (true) {
+                               if (!iterator.isValid() || !underSameKey(iterator.key())) {
+                                       expired = true;
+                                       break;
+                               }
+
+                               if (cacheEntries.size() >= numEntries) {
+                                       break;
+                               }
+
+                               RocksDBMapEntry entry = new RocksDBMapEntry(db, iterator.key(), iterator.value());
+                               cacheEntries.add(entry);
+
+                               iterator.next();
+                       }
+
+                       iterator.close();
+               }
+
+               private boolean underSameKey(byte[] rawKeyBytes) {
+                       if (rawKeyBytes.length < keyPrefixBytes.length) {
+                               return false;
+                       }
+
+                       for (int i = 0; i < keyPrefixBytes.length; ++i) {
+                               if (rawKeyBytes[i] != keyPrefixBytes[i]) {
+                                       return false;
+                               }
+                       }
+
+                       return true;
+               }
+       }
+}
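
For reference, here is a minimal sketch of user code exercising this MapState implementation (the function, state name and types below are illustrative, not part of this commit). Each user-map entry becomes its own RocksDB key/value under the #KeyGroup#Key#Namespace#UserKey layout described above, so get() and put() touch only the accessed entry instead of (de)serializing the whole map:

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    // Hypothetical keyed function: counts occurrences per word using MapState.
    public class WordCounter extends RichFlatMapFunction<String, Long> {

        private transient MapState<String, Long> counts;

        @Override
        public void open(Configuration parameters) {
            MapStateDescriptor<String, Long> descriptor =
                    new MapStateDescriptor<>("counts", String.class, Long.class);
            counts = getRuntimeContext().getMapState(descriptor);
        }

        @Override
        public void flatMap(String word, Collector<Long> out) throws Exception {
            Long current = counts.get(word);   // point lookup against RocksDB
            long updated = (current == null) ? 1L : current + 1L;
            counts.put(word, updated);         // single RocksDB put
            out.collect(updated);
        }
    }

To route this through RocksDBMapState, the job would set the backend, e.g. env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints")) with an illustrative checkpoint URI, and apply the function on a keyed stream.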

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
new file mode 100644
index 0000000..b4c3f51
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.internal.InternalReducingState;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteOptions;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * {@link ReducingState} implementation that stores state in RocksDB.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <V> The type of value that the state stores.
+ */
+public class RocksDBReducingState<K, N, V>
+       extends AbstractRocksDBState<K, N, ReducingState<V>, ReducingStateDescriptor<V>, V>
+       implements InternalReducingState<N, V> {
+
+       /** Serializer for the values. */
+       private final TypeSerializer<V> valueSerializer;
+
+       /** User-specified reduce function. */
+       private final ReduceFunction<V> reduceFunction;
+
+       /**
+        * We disable writes to the write-ahead-log here. We can't have these in the base class
+        * because JNI segfaults for some reason if they are.
+        */
+       private final WriteOptions writeOptions;
+
+       /**
+        * Creates a new {@code RocksDBReducingState}.
+        *
+        * @param namespaceSerializer The serializer for the namespace.
+        * @param stateDesc The state identifier for the state. This contains the name
+        *                     and can create a default state value.
+        */
+       public RocksDBReducingState(ColumnFamilyHandle columnFamily,
+                       TypeSerializer<N> namespaceSerializer,
+                       ReducingStateDescriptor<V> stateDesc,
+                       RocksDBKeyedStateBackend<K> backend) {
+
+               super(columnFamily, namespaceSerializer, stateDesc, backend);
+               this.valueSerializer = stateDesc.getSerializer();
+               this.reduceFunction = stateDesc.getReduceFunction();
+
+               writeOptions = new WriteOptions();
+               writeOptions.setDisableWAL(true);
+       }
+
+       @Override
+       public V get() {
+               try {
+                       writeCurrentKeyWithGroupAndNamespace();
+                       byte[] key = keySerializationStream.toByteArray();
+                       byte[] valueBytes = backend.db.get(columnFamily, key);
+                       if (valueBytes == null) {
+                               return null;
+                       }
+                       return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
+               } catch (IOException | RocksDBException e) {
+                       throw new RuntimeException("Error while retrieving data from RocksDB", e);
+               }
+       }
+
+       @Override
+       public void add(V value) throws IOException {
+               try {
+                       writeCurrentKeyWithGroupAndNamespace();
+                       byte[] key = keySerializationStream.toByteArray();
+                       byte[] valueBytes = backend.db.get(columnFamily, key);
+
+                       DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
+                       if (valueBytes == null) {
+                               keySerializationStream.reset();
+                               valueSerializer.serialize(value, out);
+                               backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
+                       } else {
+                               V oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
+                               V newValue = reduceFunction.reduce(oldValue, value);
+                               keySerializationStream.reset();
+                               valueSerializer.serialize(newValue, out);
+                               backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
+                       }
+               } catch (Exception e) {
+                       throw new RuntimeException("Error while adding data to RocksDB", e);
+               }
+       }
+
+       @Override
+       public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+               if (sources == null || sources.isEmpty()) {
+                       return;
+               }
+
+               // cache key and namespace
+               final K key = backend.getCurrentKey();
+               final int keyGroup = backend.getCurrentKeyGroupIndex();
+
+               try {
+                       V current = null;
+
+                       // merge the sources to the target
+                       for (N source : sources) {
+                               if (source != null) {
+
+                                       writeKeyWithGroupAndNamespace(
+                                                       keyGroup, key, source,
+                                                       keySerializationStream, keySerializationDataOutputView);
+
+                                       final byte[] sourceKey = keySerializationStream.toByteArray();
+                                       final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
+                                       backend.db.delete(columnFamily, sourceKey);
+
+                                       if (valueBytes != null) {
+                                               V value = valueSerializer.deserialize(
+                                                               new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
+
+                                               if (current != null) {
+                                                       current = reduceFunction.reduce(current, value);
+                                               }
+                                               else {
+                                                       current = value;
+                                               }
+                                       }
+                               }
+                       }
+
+                       // if something came out of merging the sources, merge it or write it to the target
+                       if (current != null) {
+                               // create the target full-binary-key
+                               writeKeyWithGroupAndNamespace(
+                                               keyGroup, key, target,
+                                               keySerializationStream, keySerializationDataOutputView);
+
+                               final byte[] targetKey = keySerializationStream.toByteArray();
+                               final byte[] targetValueBytes = backend.db.get(columnFamily, targetKey);
+
+                               if (targetValueBytes != null) {
+                                       // target also had a value, merge
+                                       V value = valueSerializer.deserialize(
+                                                       new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(targetValueBytes)));
+
+                                       current = reduceFunction.reduce(current, value);
+                               }
+
+                               // serialize the resulting value
+                               keySerializationStream.reset();
+                               valueSerializer.serialize(current, keySerializationDataOutputView);
+
+                               // write the resulting value
+                               backend.db.put(columnFamily, writeOptions, targetKey, keySerializationStream.toByteArray());
+                       }
+               }
+               catch (Exception e) {
+                       throw new Exception("Error while merging state in RocksDB", e);
+               }
+       }
+}
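
Again for reference, a short sketch of how user code would hit the add() path above (names and reduce function are illustrative). add() reads the previous aggregate, applies the user's ReduceFunction, and writes the result back as a single RocksDB value:

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.common.state.ReducingState;
    import org.apache.flink.api.common.state.ReducingStateDescriptor;
    import org.apache.flink.configuration.Configuration;

    // Hypothetical keyed function: keeps a running sum per key.
    public class RunningSum extends RichMapFunction<Long, Long> {

        private transient ReducingState<Long> sum;

        @Override
        public void open(Configuration parameters) {
            ReducingStateDescriptor<Long> descriptor = new ReducingStateDescriptor<>(
                    "sum",
                    new ReduceFunction<Long>() {
                        @Override
                        public Long reduce(Long a, Long b) {
                            return a + b;
                        }
                    },
                    Long.class);
            sum = getRuntimeContext().getReducingState(descriptor);
        }

        @Override
        public Long map(Long value) throws Exception {
            sum.add(value);    // get old sum -> reduce -> put, as in add() above
            return sum.get();
        }
    }

Note that mergeNamespaces() follows the same pattern across namespaces: it deletes each source entry, reduces the values together, and folds the result into the target entry.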

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
new file mode 100644
index 0000000..b7e4794
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -0,0 +1,691 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.ConfigurableStateBackend;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.util.AbstractID;
+
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.NativeLibraryLoader;
+import org.rocksdb.RocksDB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A State Backend that stores its state in {@code RocksDB}. This state backend can
+ * store very large state that exceeds memory and spills to disk.
+ *
+ * <p>All key/value state (including windows) is stored in the key/value index of RocksDB.
+ * For persistence against loss of machines, checkpoints take a snapshot of the
+ * RocksDB database, and persist that snapshot in a file system (by default) or
+ * another configurable state backend.
+ *
+ * <p>The behavior of the RocksDB instances can be parametrized by setting RocksDB Options
+ * using the methods {@link #setPredefinedOptions(PredefinedOptions)} and
+ * {@link #setOptions(OptionsFactory)}.
+ */
+public class RocksDBStateBackend extends AbstractStateBackend implements ConfigurableStateBackend {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = LoggerFactory.getLogger(RocksDBStateBackend.class);
+
+       /** The number of (re)tries for loading the RocksDB JNI library. */
+       private static final int ROCKSDB_LIB_LOADING_ATTEMPTS = 3;
+
+       private static boolean rocksDbInitialized = false;
+
+       // ------------------------------------------------------------------------
+
+       // -- configuration values, set in the application / configuration
+
+       /** The state backend that we use for creating checkpoint streams. */
+       private final StateBackend checkpointStreamBackend;
+
+       /** Base paths for RocksDB directory, as configured.
+        * Null if not yet set, in which case the configuration values will be used.
+        * The configuration defaults to the TaskManager's temp directories. */
+       @Nullable
+       private Path[] localRocksDbDirectories;
+
+       /** The pre-configured option settings. */
+       private PredefinedOptions predefinedOptions = PredefinedOptions.DEFAULT;
+
+       /** The options factory to create the RocksDB options in the cluster. */
+       @Nullable
+       private OptionsFactory optionsFactory;
+
+       /** True if incremental checkpointing is enabled.
+        * Null if not yet set, in which case the configuration values will be used. */
+       @Nullable
+       private Boolean enableIncrementalCheckpointing;
+
+       // -- runtime values, set on TaskManager when initializing / using the backend
+
+       /** Base paths for RocksDB directory, as initialized. */
+       private transient File[] initializedDbBasePaths;
+
+       /** JobID for uniquifying backup paths. */
+       private transient JobID jobId;
+
+       /** The index of the next directory to be used from {@link #initializedDbBasePaths}.*/
+       private transient int nextDirectory;
+
+       /** Whether we already lazily initialized our local storage directories. */
+       private transient boolean isInitialized;
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the
+        * file system and location defined by the given URI.
+        *
+        * <p>A state backend that stores checkpoints in HDFS or S3 must specify the file system
+        * host and port in the URI, or have the Hadoop configuration that describes the file system
+        * (host / high-availability group / possibly credentials) either referenced from the Flink
+        * config, or included in the classpath.
+        *
+        * @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data directory.
+        * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+        */
+       public RocksDBStateBackend(String checkpointDataUri) throws IOException {
+               this(new Path(checkpointDataUri).toUri());
+       }
+
+       /**
+        * Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the
+        * file system and location defined by the given URI.
+        *
+        * <p>A state backend that stores checkpoints in HDFS or S3 must specify the file system
+        * host and port in the URI, or have the Hadoop configuration that describes the file system
+        * (host / high-availability group / possibly credentials) either referenced from the Flink
+        * config, or included in the classpath.
+        *
+        * @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data directory.
+        * @param enableIncrementalCheckpointing True if incremental checkpointing is enabled.
+        * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+        */
+       public RocksDBStateBackend(String checkpointDataUri, boolean enableIncrementalCheckpointing) throws IOException {
+               this(new Path(checkpointDataUri).toUri(), enableIncrementalCheckpointing);
+       }
+
+       /**
+        * Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the
+        * file system and location defined by the given URI.
+        *
+        * <p>A state backend that stores checkpoints in HDFS or S3 must specify the file system
+        * host and port in the URI, or have the Hadoop configuration that describes the file system
+        * (host / high-availability group / possibly credentials) either referenced from the Flink
+        * config, or included in the classpath.
+        *
+        * @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data directory.
+        * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+        */
+       public RocksDBStateBackend(URI checkpointDataUri) throws IOException {
+               this(new FsStateBackend(checkpointDataUri));
+       }
+
+       /**
+        * Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the
+        * file system and location defined by the given URI.
+        *
+        * <p>A state backend that stores checkpoints in HDFS or S3 must specify the file system
+        * host and port in the URI, or have the Hadoop configuration that describes the file system
+        * (host / high-availability group / possibly credentials) either referenced from the Flink
+        * config, or included in the classpath.
+        *
+        * @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data directory.
+        * @param enableIncrementalCheckpointing True if incremental checkpointing is enabled.
+        * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+        */
+       public RocksDBStateBackend(URI checkpointDataUri, boolean enableIncrementalCheckpointing) throws IOException {
+               this(new FsStateBackend(checkpointDataUri), enableIncrementalCheckpointing);
+       }
+
+       /**
+        * Creates a new {@code RocksDBStateBackend} that uses the given state backend to store its
+        * checkpoint data streams. Typically, one would supply a filesystem or database state backend
+        * here where the snapshots from RocksDB would be stored.
+        *
+        * <p>The snapshots of the RocksDB state will be stored using the given backend's
+        * {@link StateBackend#createCheckpointStorage(JobID)}.
+        *
+        * @param checkpointStreamBackend The backend to write the checkpoint streams to.
+        */
+       public RocksDBStateBackend(StateBackend checkpointStreamBackend) {
+               this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend);
+       }
+
+       /**
+        * Creates a new {@code RocksDBStateBackend} that uses the given state backend to store its
+        * checkpoint data streams. Typically, one would supply a filesystem or database state backend
+        * here where the snapshots from RocksDB would be stored.
+        *
+        * <p>The snapshots of the RocksDB state will be stored using the given backend's
+        * {@link StateBackend#createCheckpointStorage(JobID)}.
+        *
+        * @param checkpointStreamBackend The backend to write the checkpoint streams to.
+        * @param enableIncrementalCheckpointing True if incremental checkpointing is enabled.
+        */
+       public RocksDBStateBackend(StateBackend checkpointStreamBackend, boolean enableIncrementalCheckpointing) {
+               this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend);
+               this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
+       }
+
+       /**
+        * @deprecated Use {@link #RocksDBStateBackend(StateBackend)} instead.
+        */
+       @Deprecated
+       public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend) {
+               this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend);
+       }
+
+       /**
+        * @deprecated Use {@link #RocksDBStateBackend(StateBackend, boolean)} instead.
+        */
+       @Deprecated
+       public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend, boolean enableIncrementalCheckpointing) {
+               this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend);
+               this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
+       }
+
+       /**
+        * Private constructor that creates a re-configured copy of the state backend.
+        *
+        * @param original The state backend to re-configure.
+        * @param config The configuration.
+        */
+       private RocksDBStateBackend(RocksDBStateBackend original, Configuration config) {
+               // reconfigure the state backend backing the streams
+               final StateBackend originalStreamBackend = original.checkpointStreamBackend;
+               this.checkpointStreamBackend = originalStreamBackend instanceof ConfigurableStateBackend ?
+                               ((ConfigurableStateBackend) originalStreamBackend).configure(config) :
+                               originalStreamBackend;
+
+               // configure incremental checkpoints
+               if (original.enableIncrementalCheckpointing != null) {
+                       this.enableIncrementalCheckpointing = original.enableIncrementalCheckpointing;
+               }
+               else {
+                       this.enableIncrementalCheckpointing =
+                                       config.getBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS);
+               }
+
+               // configure local directories
+               if (original.localRocksDbDirectories != null) {
+                       this.localRocksDbDirectories = original.localRocksDbDirectories;
+               }
+               else {
+                       final String rocksdbLocalPaths = config.getString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES);
+                       if (rocksdbLocalPaths != null) {
+                               String[] directories = rocksdbLocalPaths.split(",|" + File.pathSeparator);
+
+                               try {
+                                       setDbStoragePaths(directories);
+                               }
+                               catch (IllegalArgumentException e) {
+                                       throw new IllegalConfigurationException("Invalid configuration for RocksDB state " +
+                                                       "backend's local storage directories: " + e.getMessage(), e);
+                               }
+                       }
+               }
+
+               // copy remaining settings
+               this.predefinedOptions = original.predefinedOptions;
+               this.optionsFactory = original.optionsFactory;
+       }
+
+       // ------------------------------------------------------------------------
+       //  Reconfiguration
+       // ------------------------------------------------------------------------
+
+       /**
+        * Creates a copy of this state backend that uses the values defined in the configuration
+        * for fields that were not yet specified in this state backend.
+        *
+        * @param config the configuration
+        * @return The re-configured variant of the state backend
+        */
+       @Override
+       public RocksDBStateBackend configure(Configuration config) {
+               return new RocksDBStateBackend(this, config);
+       }
+
+       // ------------------------------------------------------------------------
+       //  State backend methods
+       // ------------------------------------------------------------------------
+
+       /**
+        * Gets the state backend that this RocksDB state backend uses to persist
+        * its bytes to.
+        *
+        * <p>This RocksDB state backend only implements the RocksDB-specific parts; it
+        * relies on the 'CheckpointBackend' to persist the checkpoint and savepoint byte
+        * streams.
+        */
+       public StateBackend getCheckpointBackend() {
+               return checkpointStreamBackend;
+       }
+
+       private void lazyInitializeForJob(
+                       Environment env,
+                       @SuppressWarnings("unused") String operatorIdentifier) throws IOException {
+
+               if (isInitialized) {
+                       return;
+               }
+
+               this.jobId = env.getJobID();
+
+               // initialize the paths where the local RocksDB files should be stored
+               if (localRocksDbDirectories == null) {
+                       // initialize from the temp directories
+                       initializedDbBasePaths = env.getIOManager().getSpillingDirectories();
+               }
+               else {
+                       List<File> dirs = new ArrayList<>(localRocksDbDirectories.length);
+                       String errorMessage = "";
+
+                       for (Path path : localRocksDbDirectories) {
+                               File f = new File(path.toUri().getPath());
+                               File testDir = new File(f, UUID.randomUUID().toString());
+                               if (!testDir.mkdirs()) {
+                                       String msg = "Local DB files directory '" + path
+                                                       + "' does not exist and cannot be created. ";
+                                       LOG.error(msg);
+                                       errorMessage += msg;
+                               } else {
+                                       dirs.add(f);
+                               }
+                               //noinspection ResultOfMethodCallIgnored
+                               testDir.delete();
+                       }
+
+                       if (dirs.isEmpty()) {
+                               throw new IOException("No local storage directories available. " + errorMessage);
+                       } else {
+                               initializedDbBasePaths = dirs.toArray(new File[dirs.size()]);
+                       }
+               }
+
+               nextDirectory = new Random().nextInt(initializedDbBasePaths.length);
+
+               isInitialized = true;
+       }
+
+       private File getNextStoragePath() {
+               int ni = nextDirectory + 1;
+               ni = ni >= initializedDbBasePaths.length ? 0 : ni;
+               nextDirectory = ni;
+
+               return initializedDbBasePaths[ni];
+       }
+
+       // ------------------------------------------------------------------------
+       //  Checkpoint initialization and persistent storage
+       // ------------------------------------------------------------------------
+
+       @Override
+       public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) throws IOException {
+               return checkpointStreamBackend.resolveCheckpoint(pointer);
+       }
+
+       @Override
+       public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
+               return checkpointStreamBackend.createCheckpointStorage(jobId);
+       }
+
+       // ------------------------------------------------------------------------
+       //  State holding data structures
+       // ------------------------------------------------------------------------
+
+       @Override
+       public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+                       Environment env,
+                       JobID jobID,
+                       String operatorIdentifier,
+                       TypeSerializer<K> keySerializer,
+                       int numberOfKeyGroups,
+                       KeyGroupRange keyGroupRange,
+                       TaskKvStateRegistry kvStateRegistry) throws IOException {
+
+               // first, make sure that the RocksDB JNI library is loaded
+               // we do this explicitly here to have better error handling
+               String tempDir = env.getTaskManagerInfo().getTmpDirectories()[0];
+               ensureRocksDBIsLoaded(tempDir);
+
+               lazyInitializeForJob(env, operatorIdentifier);
+
+               File instanceBasePath =
+                               new File(getNextStoragePath(), "job-" + jobId + "_op-" + operatorIdentifier + "_uuid-" + UUID.randomUUID());
+
+               return new RocksDBKeyedStateBackend<>(
+                               operatorIdentifier,
+                               env.getUserClassLoader(),
+                               instanceBasePath,
+                               getDbOptions(),
+                               getColumnOptions(),
+                               kvStateRegistry,
+                               keySerializer,
+                               numberOfKeyGroups,
+                               keyGroupRange,
+                               env.getExecutionConfig(),
+                               isIncrementalCheckpointsEnabled());
+       }
+
+       @Override
+       public OperatorStateBackend createOperatorStateBackend(
+                       Environment env,
+                       String operatorIdentifier) throws Exception {
+
+               //the default for RocksDB; eventually there can be an operator state backend based on RocksDB, too.
+               final boolean asyncSnapshots = true;
+               return new DefaultOperatorStateBackend(
+                               env.getUserClassLoader(),
+                               env.getExecutionConfig(),
+                               asyncSnapshots);
+       }
+
+       // ------------------------------------------------------------------------
+       //  Parameters
+       // ------------------------------------------------------------------------
+
+       /**
+        * Sets the path where the RocksDB local database files should be stored on the local
+        * file system. Setting this path overrides the default behavior, where the
+        * files are stored across the configured temp directories.
+        *
+        * <p>Passing {@code null} to this function restores the default behavior, where the configured
+        * temp directories will be used.
+        *
+        * @param path The path where the local RocksDB database files are stored.
+        */
+       public void setDbStoragePath(String path) {
+               setDbStoragePaths(path == null ? null : new String[] { path });
+       }
+
+       /**
+        * Sets the paths across which the local RocksDB database files are distributed on the local
+        * file system. Setting these paths overrides the default behavior, where the
+        * files are stored across the configured temp directories.
+        *
+        * <p>Each distinct state will be stored in one path, but when the state backend creates
+        * multiple states, they will store their files on different paths.
+        *
+        * <p>Passing {@code null} to this function restores the default behavior, where the configured
+        * temp directories will be used.
+        *
+        * @param paths The paths across which the local RocksDB database files will be spread.
+        */
+       public void setDbStoragePaths(String... paths) {
+               if (paths == null) {
+                       localRocksDbDirectories = null;
+               }
+               else if (paths.length == 0) {
+                       throw new IllegalArgumentException("empty paths");
+               }
+               else {
+                       Path[] pp = new Path[paths.length];
+
+                       for (int i = 0; i < paths.length; i++) {
+                               if (paths[i] == null) {
+                                       throw new IllegalArgumentException("null path");
+                               }
+
+                               pp[i] = new Path(paths[i]);
+                               String scheme = pp[i].toUri().getScheme();
+                               if (scheme != null && !scheme.equalsIgnoreCase("file")) {
+                                       throw new IllegalArgumentException("Path " + paths[i] + " has a non-local scheme");
+                               }
+                       }
+
+                       localRocksDbDirectories = pp;
+               }
+       }
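[Editor's note] As a usage illustration (the directory paths are placeholders, and the fragment assumes a context that handles the constructor's checked exception), spreading the local RocksDB working directories over two disks looks roughly like this:

RocksDBStateBackend backend = new RocksDBStateBackend("file:///checkpoints");
backend.setDbStoragePaths("/disk1/flink-rocksdb", "/disk2/flink-rocksdb");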
+
+       /**
+        * Gets the configured local database storage paths.
+        *
+        * @return The configured DB storage paths, or null, if none were configured.
+        */
+       public String[] getDbStoragePaths() {
+               if (localRocksDbDirectories == null) {
+                       return null;
+               } else {
+                       String[] paths = new String[localRocksDbDirectories.length];
+                       for (int i = 0; i < paths.length; i++) {
+                               paths[i] = localRocksDbDirectories[i].toString();
+                       }
+                       return paths;
+               }
+       }
+
+       /**
+        * Gets whether incremental checkpoints are enabled for this state backend.
+        */
+       public boolean isIncrementalCheckpointsEnabled() {
+               if (enableIncrementalCheckpointing != null) {
+                       return enableIncrementalCheckpointing;
+               }
+               else {
+                       return CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue();
+               }
+       }
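[Editor's note] In other words, an explicitly set flag wins, and only an unset flag falls back to the configuration default. A small sketch of the two cases, with the constructor signatures assumed from this diff and placeholder paths:

// explicit flag: isIncrementalCheckpointsEnabled() returns true regardless of the config default
RocksDBStateBackend explicit = new RocksDBStateBackend("file:///checkpoints", true);

// no flag given: the CheckpointingOptions.INCREMENTAL_CHECKPOINTS default applies
RocksDBStateBackend defaulted = new RocksDBStateBackend("file:///checkpoints");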
+
+       // ------------------------------------------------------------------------
+       //  Parametrize with RocksDB Options
+       // ------------------------------------------------------------------------
+
+       /**
+        * Sets the predefined options for RocksDB.
+        *
+        * <p>If a user-defined options factory is set (via {@link #setOptions(OptionsFactory)}),
+        * then the options from the factory are applied on top of the predefined options
+        * specified here.
+        *
+        * @param options The options to set (must not be null).
+        */
+       public void setPredefinedOptions(PredefinedOptions options) {
+               predefinedOptions = checkNotNull(options);
+       }
+
+       /**
+        * Gets the currently set predefined options for RocksDB.
+        * The default options (if nothing was set via {@link #setPredefinedOptions(PredefinedOptions)})
+        * are {@link PredefinedOptions#DEFAULT}.
+        *
+        * <p>If a user-defined options factory is set (via {@link #setOptions(OptionsFactory)}),
+        * then the options from the factory are applied on top of the predefined options.
+        *
+        * @return The currently set predefined options for RocksDB.
+        */
+       public PredefinedOptions getPredefinedOptions() {
+               return predefinedOptions;
+       }
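[Editor's note] For example, picking one of the shipped profiles is a one-liner; SPINNING_DISK_OPTIMIZED_HIGH_MEM is assumed here to be one of the available PredefinedOptions constants:

backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);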
+
+       /**
+        * Sets {@link org.rocksdb.Options} for the RocksDB instances.
+        * Because the options are not serializable and hold native code references,
+        * they must be specified through a factory.
+        *
+        * <p>The options created by the factory here are applied on top of the pre-defined
+        * options profile selected via {@link #setPredefinedOptions(PredefinedOptions)}.
+        * If the pre-defined options profile is the default
+        * ({@link PredefinedOptions#DEFAULT}), then the factory fully controls the RocksDB
+        * options.
+        *
+        * @param optionsFactory The options factory that lazily creates the RocksDB options.
+        */
+       public void setOptions(OptionsFactory optionsFactory) {
+               this.optionsFactory = optionsFactory;
+       }
+
+       /**
+        * Gets the options factory that lazily creates the RocksDB options.
+        *
+        * @return The options factory.
+        */
+       public OptionsFactory getOptions() {
+               return optionsFactory;
+       }
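[Editor's note] For reference, a minimal OptionsFactory might look like the following sketch; the two overridden methods match the getDbOptions()/getColumnOptions() calls below, and the specific tuning values are illustrative only:

backend.setOptions(new OptionsFactory() {
	@Override
	public DBOptions createDBOptions(DBOptions currentOptions) {
		// illustrative value; raises the RocksDB open-file limit
		return currentOptions.setMaxOpenFiles(1024);
	}

	@Override
	public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
		// illustrative 64 MB write buffer
		return currentOptions.setWriteBufferSize(64 * 1024 * 1024);
	}
});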
+
+       /**
+        * Gets the RocksDB {@link DBOptions} to be used for all RocksDB instances.
+        */
+       public DBOptions getDbOptions() {
+               // initial options from pre-defined profile
+               DBOptions opt = predefinedOptions.createDBOptions();
+
+               // add user-defined options, if specified
+               if (optionsFactory != null) {
+                       opt = optionsFactory.createDBOptions(opt);
+               }
+
+               // add necessary default options
+               opt = opt.setCreateIfMissing(true);
+
+               return opt;
+       }
+
+       /**
+        * Gets the RocksDB {@link ColumnFamilyOptions} to be used for all RocksDB instances.
+        */
+       public ColumnFamilyOptions getColumnOptions() {
+               // initial options from pre-defined profile
+               ColumnFamilyOptions opt = predefinedOptions.createColumnOptions();
+
+               // add user-defined options, if specified
+               if (optionsFactory != null) {
+                       opt = optionsFactory.createColumnOptions(opt);
+               }
+
+               return opt;
+       }
+
+       // ------------------------------------------------------------------------
+       //  utilities
+       // ------------------------------------------------------------------------
+
+       @Override
+       public String toString() {
+               return "RocksDBStateBackend{" +
+                               "checkpointStreamBackend=" + 
checkpointStreamBackend +
+                               ", localRocksDbDirectories=" + 
Arrays.toString(localRocksDbDirectories) +
+                               ", enableIncrementalCheckpointing=" + 
enableIncrementalCheckpointing +
+                               '}';
+       }
+
+       // ------------------------------------------------------------------------
+       //  static library loading utilities
+       // ------------------------------------------------------------------------
+
+       private void ensureRocksDBIsLoaded(String tempDirectory) throws IOException {
+               synchronized (RocksDBStateBackend.class) {
+                       if (!rocksDbInitialized) {
+
+                               final File tempDirParent = new File(tempDirectory).getAbsoluteFile();
+                               LOG.info("Attempting to load RocksDB native library and store it under '{}'", tempDirParent);
+
+                               Throwable lastException = null;
+                               for (int attempt = 1; attempt <= ROCKSDB_LIB_LOADING_ATTEMPTS; attempt++) {
+                                       try {
+                                               // when multiple instances of this class and RocksDB exist in different
+                                               // class loaders, then we can see the following exception:
+                                               // "java.lang.UnsatisfiedLinkError: Native Library /path/to/temp/dir/librocksdbjni-linux64.so
+                                               // already loaded in another class loader"
+
+                                               // to avoid that, we need to add a random element to the library file path
+                                               // (I know, seems like an unnecessary hack, since the JVM obviously can handle multiple
+                                               //  instances of the same JNI library being loaded in different class loaders, but
+                                               //  apparently not when coming from the same file path, so there we go)
+
+                                               final File rocksLibFolder = new File(tempDirParent, "rocksdb-lib-" + new AbstractID());
+
+                                               // make sure the temp path exists
+                                               LOG.debug("Attempting to create RocksDB native library folder {}", rocksLibFolder);
+                                               // noinspection ResultOfMethodCallIgnored
+                                               rocksLibFolder.mkdirs();
+
+                                               // explicitly load the JNI dependency if it has not been loaded before
+                                               NativeLibraryLoader.getInstance().loadLibrary(rocksLibFolder.getAbsolutePath());
+
+                                               // this initialization here should validate that the loading succeeded
+                                               RocksDB.loadLibrary();
+
+                                               // seems to have worked
+                                               LOG.info("Successfully loaded RocksDB native library");
+                                               rocksDbInitialized = true;
+                                               return;
+                                       }
+                                       catch (Throwable t) {
+                                               lastException = t;
+                                               LOG.debug("RocksDB JNI library loading attempt {} failed", attempt, t);
+
+                                               // try to force RocksDB to attempt reloading the library
+                                               try {
+                                                       resetRocksDBLoadedFlag();
+                                               } catch (Throwable tt) {
+                                                       LOG.debug("Failed to reset 'initialized' flag in RocksDB native code loader", tt);
+                                               }
+                                       }
+                               }
+
+                               throw new IOException("Could not load the native RocksDB library", lastException);
+                       }
+               }
+       }
+
+       @VisibleForTesting
+       static void resetRocksDBLoadedFlag() throws Exception {
+               final Field initField = org.rocksdb.NativeLibraryLoader.class.getDeclaredField("initialized");
+               initField.setAccessible(true);
+               initField.setBoolean(null, false);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
new file mode 100644
index 0000000..94e15fa
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.state.StateBackendFactory;
+
+import java.io.IOException;
+
+/**
+ * A factory that creates an {@link org.apache.flink.contrib.streaming.state.RocksDBStateBackend}
+ * from a configuration.
+ */
+public class RocksDBStateBackendFactory implements StateBackendFactory<RocksDBStateBackend> {
+
+       @Override
+       public RocksDBStateBackend createFromConfig(Configuration config)
+                       throws IllegalConfigurationException, IOException {
+
+               // we need to explicitly read the checkpoint directory here, because that
+               // is a required constructor parameter
+               final String checkpointDirURI = config.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY);
+               if (checkpointDirURI == null) {
+                       throw new IllegalConfigurationException(
+                               "Cannot create the RocksDB state backend: The configuration does not specify the " +
+                               "checkpoint directory '" + CheckpointingOptions.CHECKPOINTS_DIRECTORY.key() + '\'');
+               }
+
+               return new RocksDBStateBackend(checkpointDirURI).configure(config);
+       }
+}
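[Editor's note] Seen from the caller's side, the factory's contract is simply: provide the checkpoint directory key, get a configured backend. A hedged sketch, building the Configuration by hand with a placeholder URI:

Configuration config = new Configuration();
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "hdfs:///flink/checkpoints");
RocksDBStateBackend backend = new RocksDBStateBackendFactory().createFromConfig(config);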

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
new file mode 100644
index 0000000..da21e8a
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.internal.InternalValueState;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteOptions;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+/**
+ * {@link ValueState} implementation that stores state in RocksDB.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <V> The type of value that the state stores.
+ */
+public class RocksDBValueState<K, N, V>
+       extends AbstractRocksDBState<K, N, ValueState<V>, ValueStateDescriptor<V>, V>
+       implements InternalValueState<N, V> {
+
+       /** Serializer for the values. */
+       private final TypeSerializer<V> valueSerializer;
+
+       /**
+        * We disable writes to the write-ahead-log here. We can't have these in the base class
+        * because JNI segfaults for some reason if they are.
+        */
+       private final WriteOptions writeOptions;
+
+       /**
+        * Creates a new {@code RocksDBValueState}.
+        *
+        * @param namespaceSerializer The serializer for the namespace.
+        * @param stateDesc The state identifier for the state. This contains the name
+        *                  and can create a default state value.
+        */
+       public RocksDBValueState(ColumnFamilyHandle columnFamily,
+                       TypeSerializer<N> namespaceSerializer,
+                       ValueStateDescriptor<V> stateDesc,
+                       RocksDBKeyedStateBackend<K> backend) {
+
+               super(columnFamily, namespaceSerializer, stateDesc, backend);
+               this.valueSerializer = stateDesc.getSerializer();
+
+               writeOptions = new WriteOptions();
+               writeOptions.setDisableWAL(true);
+       }
+
+       @Override
+       public V value() {
+               try {
+                       writeCurrentKeyWithGroupAndNamespace();
+                       byte[] key = keySerializationStream.toByteArray();
+                       byte[] valueBytes = backend.db.get(columnFamily, key);
+                       if (valueBytes == null) {
+                               return stateDesc.getDefaultValue();
+                       }
+                       return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
+               } catch (IOException | RocksDBException e) {
+                       throw new RuntimeException("Error while retrieving data from RocksDB.", e);
+               }
+       }
+
+       @Override
+       public void update(V value) throws IOException {
+               if (value == null) {
+                       clear();
+                       return;
+               }
+               DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
+               try {
+                       writeCurrentKeyWithGroupAndNamespace();
+                       byte[] key = keySerializationStream.toByteArray();
+                       keySerializationStream.reset();
+                       valueSerializer.serialize(value, out);
+                       backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
+               } catch (Exception e) {
+                       throw new RuntimeException("Error while adding data to RocksDB", e);
+               }
+       }
+}
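[Editor's note] From the user's perspective this class sits behind the ordinary ValueState API; a sketch of how such state is declared and used in a keyed function (the class and field names here are invented for illustration):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountingFlatMap extends RichFlatMapFunction<String, Long> {
	private transient ValueState<Long> count;

	@Override
	public void open(Configuration parameters) {
		count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
	}

	@Override
	public void flatMap(String value, Collector<Long> out) throws Exception {
		Long current = count.value();   // no default configured, so null on first access
		long updated = (current == null) ? 1L : current + 1L;
		count.update(updated);          // served by RocksDBValueState.update() on this backend
		out.collect(updated);
	}
}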

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
new file mode 100644
index 0000000..bae1f81
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -0,0 +1,505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.common.typeutils.base.VoidSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.execution.CancelTaskException;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.testutils.BackendForTestStream;
+import org.apache.flink.runtime.state.testutils.BackendForTestStream.StreamFactory;
+import org.apache.flink.runtime.state.testutils.TestCheckpointStreamFactory;
+import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.FutureUtil;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for asynchronous RocksDB Key/Value state checkpoints.
+ */
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*", "org.apache.log4j.*"})
+@SuppressWarnings("serial")
+public class RocksDBAsyncSnapshotTest extends TestLogger {
+
+       /**
+        * Temporary folder for the test.
+        */
+       @Rule
+       public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       /**
+        * This ensures that asynchronous state handles are actually materialized asynchronously.
+        *
+        * <p>We use latches to block at various stages and see if the code still continues through
+        * the parts that are not asynchronous. If the checkpoint is not done asynchronously the
+        * test will simply lock forever.
+        */
+       @Test
+       public void testFullyAsyncSnapshot() throws Exception {
+
+               final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
+                               OneInputStreamTask::new,
+                               BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+               testHarness.setupOutputForSingletonOperatorChain();
+
+               testHarness.configureForKeyedStream(new KeySelector<String, String>() {
+                       @Override
+                       public String getKey(String value) throws Exception {
+                               return value;
+                       }
+               }, BasicTypeInfo.STRING_TYPE_INFO);
+
+               StreamConfig streamConfig = testHarness.getStreamConfig();
+
+               File dbDir = temporaryFolder.newFolder();
+
+               RocksDBStateBackend backend = new RocksDBStateBackend(new MemoryStateBackend());
+               backend.setDbStoragePath(dbDir.getAbsolutePath());
+
+               streamConfig.setStateBackend(backend);
+
+               streamConfig.setStreamOperator(new AsyncCheckpointOperator());
+               streamConfig.setOperatorID(new OperatorID());
+
+               final OneShotLatch delayCheckpointLatch = new OneShotLatch();
+               final OneShotLatch ensureCheckpointLatch = new OneShotLatch();
+
+               CheckpointResponder checkpointResponderMock = new CheckpointResponder() {
+
+                       @Override
+                       public void acknowledgeCheckpoint(
+                               JobID jobID,
+                               ExecutionAttemptID executionAttemptID,
+                               long checkpointId,
+                               CheckpointMetrics checkpointMetrics,
+                               TaskStateSnapshot subtaskState) {
+                               // block on the latch, to verify that triggerCheckpoint returns below,
+                               // even though the async checkpoint would not finish
+                               try {
+                                       delayCheckpointLatch.await();
+                               } catch (InterruptedException e) {
+                                       throw new RuntimeException(e);
+                               }
+
+                               boolean hasManagedKeyedState = false;
+                               for (Map.Entry<OperatorID, OperatorSubtaskState> entry : subtaskState.getSubtaskStateMappings()) {
+                                       OperatorSubtaskState state = entry.getValue();
+                                       if (state != null) {
+                                               hasManagedKeyedState |= state.getManagedKeyedState() != null;
+                                       }
+                               }
+
+                               // should be one k/v state
+                               assertTrue(hasManagedKeyedState);
+
+                               // we now know that the checkpoint went through
+                               ensureCheckpointLatch.trigger();
+                       }
+
+                       @Override
+                       public void declineCheckpoint(
+                               JobID jobID, ExecutionAttemptID 
executionAttemptID,
+                               long checkpointId, Throwable cause) {
+
+                       }
+               };
+
+               JobID jobID = new JobID();
+               ExecutionAttemptID executionAttemptID = new ExecutionAttemptID(0L, 0L);
+               TestTaskStateManager taskStateManagerTestMock = new TestTaskStateManager(
+                       jobID,
+                       executionAttemptID,
+                       checkpointResponderMock);
+
+               StreamMockEnvironment mockEnv = new StreamMockEnvironment(
+                       testHarness.jobConfig,
+                       testHarness.taskConfig,
+                       testHarness.memorySize,
+                       new MockInputSplitProvider(),
+                       testHarness.bufferSize,
+                       taskStateManagerTestMock);
+
+               testHarness.invoke(mockEnv);
+
+               final OneInputStreamTask<String, String> task = testHarness.getTask();
+
+               // wait for the task to be running
+               for (Field field: StreamTask.class.getDeclaredFields()) {
+                       if (field.getName().equals("isRunning")) {
+                               field.setAccessible(true);
+                               while (!field.getBoolean(task)) {
+                                       Thread.sleep(10);
+                               }
+                       }
+               }
+
+               task.triggerCheckpoint(new CheckpointMetaData(42, 17), CheckpointOptions.forCheckpointWithDefaultLocation());
+
+               testHarness.processElement(new StreamRecord<>("Wohoo", 0));
+
+               // now we allow the checkpoint
+               delayCheckpointLatch.trigger();
+
+               // wait for the checkpoint to go through
+               ensureCheckpointLatch.await();
+
+               testHarness.endInput();
+
+               ExecutorService threadPool = task.getAsyncOperationsThreadPool();
+               threadPool.shutdown();
+               Assert.assertTrue(threadPool.awaitTermination(60_000, TimeUnit.MILLISECONDS));
+
+               testHarness.waitForTaskCompletion();
+               if (mockEnv.wasFailedExternally()) {
+                       fail("Unexpected exception during execution.");
+               }
+       }
+
+       /**
+        * This test ensures that canceling asynchronous snapshots works as expected and does not block.
+        */
+       @Test
+       public void testCancelFullyAsyncCheckpoints() throws Exception {
+               final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
+                               OneInputStreamTask::new,
+                               BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+               testHarness.setupOutputForSingletonOperatorChain();
+
+               testHarness.configureForKeyedStream(value -> value, BasicTypeInfo.STRING_TYPE_INFO);
+
+               StreamConfig streamConfig = testHarness.getStreamConfig();
+
+               File dbDir = temporaryFolder.newFolder();
+
+               // this is the proper instance that we need to call.
+               BlockerCheckpointStreamFactory blockerCheckpointStreamFactory =
+                       new BlockerCheckpointStreamFactory(4 * 1024 * 1024) {
+
+                       int count = 1;
+
+                       @Override
+                       public CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws Exception {
+                               // we skip the first created stream, because it is used to checkpoint the timer service, which is
+                               // currently not asynchronous.
+                               if (count > 0) {
+                                       --count;
+                                       return new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize);
+                               } else {
+                                       return super.createCheckpointStateOutputStream(scope);
+                               }
+                       }
+               };
+
+               // to avoid serialization of the above factory instance, we need to pass it in
+               // through a static variable
+
+               StateBackend stateBackend = new BackendForTestStream(new StaticForwardFactory(blockerCheckpointStreamFactory));
+
+               RocksDBStateBackend backend = new RocksDBStateBackend(stateBackend);
+               backend.setDbStoragePath(dbDir.getAbsolutePath());
+
+               streamConfig.setStateBackend(backend);
+
+               streamConfig.setStreamOperator(new AsyncCheckpointOperator());
+               streamConfig.setOperatorID(new OperatorID());
+
+               TestTaskStateManager taskStateManagerTestMock = new TestTaskStateManager();
+
+               StreamMockEnvironment mockEnv = new StreamMockEnvironment(
+                               testHarness.jobConfig,
+                               testHarness.taskConfig,
+                               testHarness.memorySize,
+                               new MockInputSplitProvider(),
+                               testHarness.bufferSize,
+                               taskStateManagerTestMock);
+
+               blockerCheckpointStreamFactory.setBlockerLatch(new OneShotLatch());
+               blockerCheckpointStreamFactory.setWaiterLatch(new OneShotLatch());
+
+               testHarness.invoke(mockEnv);
+
+               final OneInputStreamTask<String, String> task = testHarness.getTask();
+
+               // wait for the task to be running
+               for (Field field: StreamTask.class.getDeclaredFields()) {
+                       if (field.getName().equals("isRunning")) {
+                               field.setAccessible(true);
+                               while (!field.getBoolean(task)) {
+                                       Thread.sleep(10);
+                               }
+                       }
+               }
+
+               task.triggerCheckpoint(
+                       new CheckpointMetaData(42, 17),
+                       CheckpointOptions.forCheckpointWithDefaultLocation());
+
+               testHarness.processElement(new StreamRecord<>("Wohoo", 0));
+               blockerCheckpointStreamFactory.getWaiterLatch().await();
+               task.cancel();
+               blockerCheckpointStreamFactory.getBlockerLatch().trigger();
+               testHarness.endInput();
+               Assert.assertTrue(blockerCheckpointStreamFactory.getLastCreatedStream().isClosed());
+
+               try {
+                       ExecutorService threadPool = task.getAsyncOperationsThreadPool();
+                       threadPool.shutdown();
+                       Assert.assertTrue(threadPool.awaitTermination(60_000, TimeUnit.MILLISECONDS));
+                       testHarness.waitForTaskCompletion();
+
+                       fail("Operation completed. Cancel failed.");
+               } catch (Exception expected) {
+
+                       Throwable cause = expected.getCause();
+
+                       if (!(cause instanceof CancelTaskException)) {
+                               fail("Unexpected exception: " + expected);
+                       }
+               }
+       }
+
+       /**
+        * Test that the snapshot files are cleaned up in case of a failure during the snapshot
+        * procedure.
+        */
+       @Test
+       public void testCleanupOfSnapshotsInFailureCase() throws Exception {
+               long checkpointId = 1L;
+               long timestamp = 42L;
+
+               Environment env = new DummyEnvironment("test task", 1, 0);
+
+               final IOException testException = new IOException("Test exception");
+               CheckpointStateOutputStream outputStream = spy(new FailingStream(testException));
+
+               RocksDBStateBackend backend = new RocksDBStateBackend((StateBackend) new MemoryStateBackend());
+
+               backend.setDbStoragePath(temporaryFolder.newFolder().toURI().toString());
+
+               AbstractKeyedStateBackend<Void> keyedStateBackend = backend.createKeyedStateBackend(
+                       env,
+                       new JobID(),
+                       "test operator",
+                       VoidSerializer.INSTANCE,
+                       1,
+                       new KeyGroupRange(0, 0),
+                       null);
+
+               try {
+
+                       keyedStateBackend.restore(null);
+
+                       // register a state so that the state backend has to checkpoint something
+                       keyedStateBackend.getPartitionedState(
+                               "namespace",
+                               StringSerializer.INSTANCE,
+                               new ValueStateDescriptor<>("foobar", 
String.class));
+
+                       RunnableFuture<KeyedStateHandle> snapshotFuture = keyedStateBackend.snapshot(
+                               checkpointId, timestamp,
+                               new TestCheckpointStreamFactory(() -> outputStream),
+                               CheckpointOptions.forCheckpointWithDefaultLocation());
+
+                       try {
+                               FutureUtil.runIfNotDoneAndGet(snapshotFuture);
+                               fail("Expected an exception to be thrown 
here.");
+                       } catch (ExecutionException e) {
+                               Assert.assertEquals(testException, 
e.getCause());
+                       }
+
+                       verify(outputStream).close();
+               } finally {
+                       IOUtils.closeQuietly(keyedStateBackend);
+                       keyedStateBackend.dispose();
+               }
+       }
+
+       @Test
+       public void testConsistentSnapshotSerializationFlagsAndMasks() {
+
+               Assert.assertEquals(0xFFFF, RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK);
+               Assert.assertEquals(0x80, RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
+
+               byte[] expectedKey = new byte[] {42, 42};
+               byte[] modKey = expectedKey.clone();
+
+               Assert.assertFalse(
+                       RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(modKey));
+
+               RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.setMetaDataFollowsFlagInKey(modKey);
+               Assert.assertTrue(RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(modKey));
+
+               RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(modKey);
+               Assert.assertFalse(
+                       RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(modKey));
+
+               Assert.assertTrue(Arrays.equals(expectedKey, modKey));
+       }
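[Editor's note] The flag under test lives in the first byte of the serialized key. As a plain-Java illustration of what setting and clearing amount to (a sketch of the bit manipulation, not the backend's actual helper methods):

byte[] key = {42, 42};
key[0] |= 0x80;                          // set the meta-data-follows bit
boolean flagged = (key[0] & 0x80) != 0;  // now true
key[0] &= ~0x80;                         // clear it; key is {42, 42} again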
+
+       // ------------------------------------------------------------------------
+
+       private static class AsyncCheckpointOperator
+               extends AbstractStreamOperator<String>
+               implements OneInputStreamOperator<String, String> {
+
+               @Override
+               public void open() throws Exception {
+                       super.open();
+
+                       // also get the state in open, this way we are sure that it was created before
+                       // we trigger the test checkpoint
+                       ValueState<String> state = getPartitionedState(
+                                       VoidNamespace.INSTANCE,
+                                       VoidNamespaceSerializer.INSTANCE,
+                                       new ValueStateDescriptor<>("count", 
StringSerializer.INSTANCE));
+
+               }
+
+               @Override
+               public void processElement(StreamRecord<String> element) throws Exception {
+                       // look the state up again and update it with the incoming element
+
+                       ValueState<String> state = getPartitionedState(
+                                       VoidNamespace.INSTANCE,
+                                       VoidNamespaceSerializer.INSTANCE,
+                                       new ValueStateDescriptor<>("count", 
StringSerializer.INSTANCE));
+
+                       state.update(element.getValue());
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  failing stream
+       // ------------------------------------------------------------------------
+
+       private static class StaticForwardFactory implements StreamFactory {
+
+               static CheckpointStreamFactory factory;
+
+               StaticForwardFactory(CheckpointStreamFactory factory) {
+                       StaticForwardFactory.factory = factory;
+               }
+
+               @Override
+               public CheckpointStateOutputStream get() throws Exception {
+                       return factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
+               }
+       }
+
+       private static class FailingStream extends CheckpointStateOutputStream {
+
+               private final IOException testException;
+
+               FailingStream(IOException testException) {
+                       this.testException = testException;
+               }
+
+               @Override
+               public StreamStateHandle closeAndGetHandle() throws IOException {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public long getPos() throws IOException {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public void write(int b) throws IOException {
+                       throw testException;
+               }
+
+               @Override
+               public void flush() throws IOException {
+                       throw testException;
+               }
+
+               @Override
+               public void sync() throws IOException {
+                       throw testException;
+               }
+
+               @Override
+               public void close() throws IOException {
+                       throw new UnsupportedOperationException();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java
new file mode 100644
index 0000000..565f27d
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.junit.Test;
+
+/**
+ * This test checks that the RocksDB native code loader still responds to resetting the init flag.
+ */
+public class RocksDBInitResetTest {
+
+       @Test
+       public void testResetInitFlag() throws Exception {
+               RocksDBStateBackend.resetRocksDBLoadedFlag();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
new file mode 100644
index 0000000..1d14f6e
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Tests for the RocksDBMergeIterator.
+ */
+public class RocksDBMergeIteratorTest {
+
+       private static final int NUM_KEY_VAL_STATES = 50;
+       private static final int MAX_NUM_KEYS = 20;
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Test
+       public void testEmptyMergeIterator() throws IOException {
+               RocksDBKeyedStateBackend.RocksDBMergeIterator emptyIterator =
+                               new RocksDBKeyedStateBackend.RocksDBMergeIterator(Collections.EMPTY_LIST, 2);
+               Assert.assertFalse(emptyIterator.isValid());
+       }
+
+       @Test
+       public void testMergeIteratorByte() throws Exception {
+               Assert.assertTrue(MAX_NUM_KEYS <= Byte.MAX_VALUE);
+
+               testMergeIterator(Byte.MAX_VALUE);
+       }
+
+       @Test
+       public void testMergeIteratorShort() throws Exception {
+               Assert.assertTrue(MAX_NUM_KEYS <= Short.MAX_VALUE);
+
+               testMergeIterator(Short.MAX_VALUE);
+       }
+
+       public void testMergeIterator(int maxParallelism) throws Exception {
+               Random random = new Random(1234);
+
+               RocksDB rocksDB = RocksDB.open(tempFolder.getRoot().getAbsolutePath());
+               try {
+                       List<Tuple2<RocksIterator, Integer>> rocksIteratorsWithKVStateId = new ArrayList<>();
+                       List<Tuple2<ColumnFamilyHandle, Integer>> columnFamilyHandlesWithKeyCount = new ArrayList<>();
+
+                       int totalKeysExpected = 0;
+
+                       for (int c = 0; c < NUM_KEY_VAL_STATES; ++c) {
+                               ColumnFamilyHandle handle = rocksDB.createColumnFamily(
+                                               new ColumnFamilyDescriptor(("column-" + c).getBytes(ConfigConstants.DEFAULT_CHARSET)));
+
+                               ByteArrayOutputStreamWithPos bos = new ByteArrayOutputStreamWithPos();
+                               DataOutputStream dos = new DataOutputStream(bos);
+
+                               int numKeys = random.nextInt(MAX_NUM_KEYS + 1);
+
+                               for (int i = 0; i < numKeys; ++i) {
+                                       if (maxParallelism <= Byte.MAX_VALUE) {
+                                               dos.writeByte(i);
+                                       } else {
+                                               dos.writeShort(i);
+                                       }
+                                       dos.writeInt(i);
+                                       byte[] key = bos.toByteArray();
+                                       byte[] val = new byte[]{42};
+                                       rocksDB.put(handle, key, val);
+
+                                       bos.reset();
+                               }
+                               columnFamilyHandlesWithKeyCount.add(new Tuple2<>(handle, numKeys));
+                               totalKeysExpected += numKeys;
+                       }
+
+                       int id = 0;
+                       for (Tuple2<ColumnFamilyHandle, Integer> columnFamilyHandle : columnFamilyHandlesWithKeyCount) {
+                               rocksIteratorsWithKVStateId.add(new Tuple2<>(rocksDB.newIterator(columnFamilyHandle.f0), id));
+                               ++id;
+                       }
+
+                       RocksDBKeyedStateBackend.RocksDBMergeIterator mergeIterator = new RocksDBKeyedStateBackend.RocksDBMergeIterator(
+                                       rocksIteratorsWithKVStateId, maxParallelism <= Byte.MAX_VALUE ? 1 : 2);
+
+                       int prevKVState = -1;
+                       int prevKey = -1;
+                       int prevKeyGroup = -1;
+                       int totalKeysActual = 0;
+
+                       while (mergeIterator.isValid()) {
+                               ByteBuffer bb = ByteBuffer.wrap(mergeIterator.key());
+
+                               int keyGroup = maxParallelism > Byte.MAX_VALUE ? bb.getShort() : bb.get();
+                               int key = bb.getInt();
+
+                               Assert.assertTrue(keyGroup >= prevKeyGroup);
+                               Assert.assertTrue(key >= prevKey);
+                               Assert.assertEquals(prevKeyGroup != keyGroup, mergeIterator.isNewKeyGroup());
+                               Assert.assertEquals(prevKVState != mergeIterator.kvStateId(), mergeIterator.isNewKeyValueState());
+
+                               prevKeyGroup = keyGroup;
+                               prevKVState = mergeIterator.kvStateId();
+
+                               mergeIterator.next();
+                               ++totalKeysActual;
+                       }
+
+                       Assert.assertEquals(totalKeysExpected, totalKeysActual);
+
+                       for (Tuple2<ColumnFamilyHandle, Integer> handleWithCount : columnFamilyHandlesWithKeyCount) {
+                               rocksDB.dropColumnFamily(handleWithCount.f0);
+                       }
+               } finally {
+                       rocksDB.close();
+               }
+       }
+
+}
