[FLINK-9486][state] Introduce InternalPriorityQueue as state in keyed state 
backends

This commit does not include the integration with checkpointing.

This closes #6276.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/79b38f8f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/79b38f8f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/79b38f8f

Branch: refs/heads/master
Commit: 79b38f8f9a79b917d525842cf46087c5b8c40f3d
Parents: b12acea
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Wed Jul 4 13:43:49 2018 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Mon Jul 9 16:12:51 2018 +0200

----------------------------------------------------------------------
 .../KVStateRequestSerializerRocksDBTest.java    |  16 +-
 .../network/KvStateRequestSerializerTest.java   |  19 +-
 .../runtime/state/InternalPriorityQueue.java    |  12 +
 .../state/KeyGroupedInternalPriorityQueue.java  |  38 ++++
 .../flink/runtime/state/KeyedStateBackend.java  |   3 +-
 .../flink/runtime/state/PriorityComparator.java |  42 ++++
 .../runtime/state/PriorityQueueSetFactory.java  |  46 ++++
 .../state/TieBreakingPriorityComparator.java    | 122 ++++++++++
 .../state/filesystem/FsStateBackend.java        |   6 +-
 .../heap/CachingInternalPriorityQueueSet.java   |  26 ++-
 .../state/heap/HeapKeyedStateBackend.java       |  28 ++-
 .../runtime/state/heap/HeapPriorityQueue.java   |  35 ++-
 .../state/heap/HeapPriorityQueueSet.java        |  46 ++--
 .../state/heap/HeapPriorityQueueSetFactory.java |  69 ++++++
 .../heap/KeyGroupPartitionedPriorityQueue.java  |  63 ++++--
 .../runtime/state/heap/TreeOrderedSetCache.java |   7 +
 .../state/memory/MemoryStateBackend.java        |   7 +-
 .../state/InternalPriorityQueueTestBase.java    |  12 +-
 .../state/StateSnapshotCompressionTest.java     |  12 +-
 .../state/heap/HeapPriorityQueueSetTest.java    |   2 +-
 .../state/heap/HeapPriorityQueueTest.java       |   2 +-
 .../state/heap/HeapStateBackendTestBase.java    |  10 +-
 .../KeyGroupPartitionedPriorityQueueTest.java   |   2 +-
 .../streaming/state/RockDBBackendOptions.java   |  38 ++++
 .../state/RocksDBKeyedStateBackend.java         | 171 +++++++++++++-
 .../streaming/state/RocksDBOrderedSetStore.java |  13 +-
 .../streaming/state/RocksDBStateBackend.java    |  22 +-
 ...nalPriorityQueueSetWithRocksDBStoreTest.java |   1 -
 .../state/RocksDBOrderedSetStoreTest.java       |   1 -
 .../state/RocksDBStateBackendTest.java          |   3 +-
 .../api/operators/AbstractStreamOperator.java   |   6 +-
 .../api/operators/HeapInternalTimerService.java |  85 +++----
 .../operators/InternalTimeServiceManager.java   |  80 ++++---
 .../streaming/api/operators/InternalTimer.java  |  22 ++
 .../InternalTimerServiceSerializationProxy.java |  65 +++---
 .../StreamTaskStateInitializerImpl.java         |   1 +
 .../api/operators/TimerHeapInternalTimer.java   |  23 --
 .../api/operators/TimerSerializer.java          | 222 +++++++++++++++++++
 .../operators/HeapInternalTimerServiceTest.java | 138 ++++++++----
 39 files changed, 1234 insertions(+), 282 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
index a49fdd2..9ea3198 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.contrib.streaming.state.PredefinedOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.queryablestate.client.VoidNamespace;
 import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -74,9 +75,12 @@ public final class KVStateRequestSerializerRocksDBTest {
                                columnFamilyOptions,
                                mock(TaskKvStateRegistry.class),
                                LongSerializer.INSTANCE,
-                               1, new KeyGroupRange(0, 0),
-                               new ExecutionConfig(), false,
-                               TestLocalRecoveryConfig.disabled()
+                               1,
+                               new KeyGroupRange(0, 0),
+                               new ExecutionConfig(),
+                               false,
+                               TestLocalRecoveryConfig.disabled(),
+                               RocksDBStateBackend.PriorityQueueStateType.HEAP
                        );
                longHeapKeyedStateBackend.restore(null);
                longHeapKeyedStateBackend.setCurrentKey(key);
@@ -112,10 +116,12 @@ public final class KVStateRequestSerializerRocksDBTest {
                                columnFamilyOptions,
                                mock(TaskKvStateRegistry.class),
                                LongSerializer.INSTANCE,
-                               1, new KeyGroupRange(0, 0),
+                               1,
+                               new KeyGroupRange(0, 0),
                                new ExecutionConfig(),
                                false,
-                               TestLocalRecoveryConfig.disabled());
+                               TestLocalRecoveryConfig.disabled(),
+                               
RocksDBStateBackend.PriorityQueueStateType.HEAP);
                longHeapKeyedStateBackend.restore(null);
                longHeapKeyedStateBackend.setCurrentKey(key);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
index 2ba7507..73f8831 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalMapState;
@@ -185,18 +186,19 @@ public class KvStateRequestSerializerTest {
        @Test
        public void testListSerialization() throws Exception {
                final long key = 0L;
-
+               final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
                // objects for heap state list serialisation
                final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend =
                        new HeapKeyedStateBackend<>(
                                mock(TaskKvStateRegistry.class),
                                LongSerializer.INSTANCE,
                                ClassLoader.getSystemClassLoader(),
-                               1,
-                               new KeyGroupRange(0, 0),
+                               keyGroupRange.getNumberOfKeyGroups(),
+                               keyGroupRange,
                                async,
                                new ExecutionConfig(),
-                               TestLocalRecoveryConfig.disabled()
+                               TestLocalRecoveryConfig.disabled(),
+                               new HeapPriorityQueueSetFactory(keyGroupRange, 
keyGroupRange.getNumberOfKeyGroups(), 128)
                        );
                longHeapKeyedStateBackend.setCurrentKey(key);
 
@@ -292,18 +294,19 @@ public class KvStateRequestSerializerTest {
        @Test
        public void testMapSerialization() throws Exception {
                final long key = 0L;
-
+               final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
                // objects for heap state list serialisation
                final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend =
                        new HeapKeyedStateBackend<>(
                                mock(TaskKvStateRegistry.class),
                                LongSerializer.INSTANCE,
                                ClassLoader.getSystemClassLoader(),
-                               1,
-                               new KeyGroupRange(0, 0),
+                               keyGroupRange.getNumberOfKeyGroups(),
+                               keyGroupRange,
                                async,
                                new ExecutionConfig(),
-                               TestLocalRecoveryConfig.disabled()
+                               TestLocalRecoveryConfig.disabled(),
+                               new HeapPriorityQueueSetFactory(keyGroupRange, 
keyGroupRange.getNumberOfKeyGroups(), 128)
                        );
                longHeapKeyedStateBackend.setCurrentKey(key);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java
index fb3ee82..dc46c8a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java
@@ -26,6 +26,8 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.Collection;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
 
 /**
  * Interface for collection that gives in order access to elements w.r.t their 
priority.
@@ -36,6 +38,16 @@ import java.util.Collection;
 public interface InternalPriorityQueue<T> {
 
        /**
+        * Polls from the top of the queue as long as the queue is not 
empty and passes the elements to
+        * {@link Consumer} until a {@link Predicate} rejects an offered 
element. The rejected element is not
+        * removed from the queue and becomes the new head.
+        *
+        * @param canConsume bulk polling ends once this returns false. The 
rejected element is neither removed nor consumed.
+        * @param consumer consumer function for elements accepted by 
canConsume.
+        */
+       void bulkPoll(@Nonnull Predicate<T> canConsume, @Nonnull Consumer<T> 
consumer);
+
+       /**
         * Retrieves and removes the first element (w.r.t. the order) of this 
set,
         * or returns {@code null} if this set is empty.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupedInternalPriorityQueue.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupedInternalPriorityQueue.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupedInternalPriorityQueue.java
new file mode 100644
index 0000000..68472e2
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupedInternalPriorityQueue.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nonnull;
+
+import java.util.Set;
+
+/**
+ * This interface exists as (temporary) adapter between the new {@link 
InternalPriorityQueue} and the old way in which
+ * timers are written in a snapshot. This interface can probably go away once 
timer state becomes part of the
+ * keyed state backend snapshot.
+ */
+public interface KeyGroupedInternalPriorityQueue<T> extends 
InternalPriorityQueue<T> {
+
+       /**
+        * Returns the subset of elements in the priority queue that belongs to 
the given key-group, within the operator's
+        * key-group range.
+        */
+       @Nonnull
+       Set<T> getSubsetForKeyGroup(int keyGroupId);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
index ad75a1f..7ba14b3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
@@ -31,7 +31,8 @@ import java.util.stream.Stream;
  *
  * @param <K> The key by which state is keyed.
  */
-public interface KeyedStateBackend<K> extends InternalKeyContext<K>, 
KeyedStateFactory, Disposable {
+public interface KeyedStateBackend<K>
+       extends InternalKeyContext<K>, KeyedStateFactory, 
PriorityQueueSetFactory, Disposable {
 
        /**
         * Sets the current key that is used for partitioned state.

http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparator.java
new file mode 100644
index 0000000..2f6f5a7
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityComparator.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+/**
+ * This interface works similarly to {@link Comparable} and is used to 
prioritize between two objects. The main difference
+ * between this interface and {@link Comparable} is that it is not required to 
follow the usual contract between
+ * {@link Comparable#compareTo(Object)} and {@link Object#equals(Object)}. The 
contract of this interface is:
+ * When two objects are equal, they indicate the same priority, but indicating 
the same priority does not require that
+ * both objects are equal.
+ *
+ * @param <T> type of the compared objects.
+ */
+@FunctionalInterface
+public interface PriorityComparator<T> {
+
+       /**
+        * Compares two objects for priority. Returns a negative integer, zero, 
or a positive integer as the first
+        * argument has lower, equal to, or higher priority than the second.
+        * @param left left operand in the comparison by priority.
+        * @param right right operand in the comparison by priority.
+        * @return a negative integer, zero, or a positive integer as the first 
argument has lower, equal to, or higher
+        * priority than the second.
+        */
+       int comparePriority(T left, T right);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
new file mode 100644
index 0000000..6f509c0
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Factory for {@link KeyGroupedInternalPriorityQueue} instances.
+ */
+public interface PriorityQueueSetFactory {
+
+       /**
+        * Creates a {@link KeyGroupedInternalPriorityQueue}.
+        *
+        * @param stateName                    unique name associated with 
this queue.
+        * @param byteOrderedElementSerializer a serializer with a format 
that is lexicographically ordered in
+        *                                     alignment with 
elementPriorityComparator.
+        * @param <T>                          type of the stored elements.
+        * @return the queue with the specified unique name.
+        */
+       @Nonnull
+       <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T> 
create(
+               @Nonnull String stateName,
+               @Nonnull TypeSerializer<T> byteOrderedElementSerializer,
+               @Nonnull PriorityComparator<T> elementPriorityComparator,
+               @Nonnull KeyExtractorFunction<T> keyExtractor);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java
new file mode 100644
index 0000000..4384eb7
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TieBreakingPriorityComparator.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Comparator;
+
+/**
+ * This class is an adapter between {@link PriorityComparator} and a full 
{@link Comparator} that respects the
+ * contract between {@link Comparator#compare(Object, Object)} and {@link 
Object#equals(Object)}. This is currently
+ * needed for implementations of
+ * {@link 
org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetCache}
 that are implemented
+ * on top of a data structure that relies on this contract, e.g. a tree 
set. We should replace this in the near
+ * future.
+ *
+ * @param <T> type of the compared elements.
+ */
+public class TieBreakingPriorityComparator<T> implements Comparator<T>, 
PriorityComparator<T> {
+
+       /** The {@link PriorityComparator} to which we delegate in a first 
step. */
+       @Nonnull
+       private final PriorityComparator<T> priorityComparator;
+
+       /** Serializer for instances of the compared objects. */
+       @Nonnull
+       private final TypeSerializer<T> serializer;
+
+       /** Stream that we use in serialization. */
+       @Nonnull
+       private final ByteArrayOutputStreamWithPos outStream;
+
+       /** {@link org.apache.flink.core.memory.DataOutputView} around 
outStream. */
+       @Nonnull
+       private final DataOutputViewStreamWrapper outView;
+
+       public TieBreakingPriorityComparator(
+               @Nonnull PriorityComparator<T> priorityComparator,
+               @Nonnull TypeSerializer<T> serializer,
+               @Nonnull ByteArrayOutputStreamWithPos outStream,
+               @Nonnull DataOutputViewStreamWrapper outView) {
+
+               this.priorityComparator = priorityComparator;
+               this.serializer = serializer;
+               this.outStream = outStream;
+               this.outView = outView;
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public int compare(T o1, T o2) {
+
+               // first we compare priority, this should be the most commonly 
hit case
+               int cmp = priorityComparator.comparePriority(o1, o2);
+
+               if (cmp != 0) {
+                       return cmp;
+               }
+
+               // here we start tie breaking and do our best to comply with 
the compareTo/equals contract, first we try
+               // to simply find an existing way to fully compare.
+               if (o1 instanceof Comparable && 
o1.getClass().equals(o2.getClass())) {
+                       return ((Comparable<T>) o1).compareTo(o2);
+               }
+
+               // we catch this case before moving to more expensive tie 
breaks.
+               if (o1.equals(o2)) {
+                       return 0;
+               }
+
+               // if objects are not equal, their serialized form should 
somehow differ as well. this can be costly, and...
+               // TODO we should have an alternative approach in the future, 
e.g. a cache that does not rely on compare to check equality.
+               try {
+                       outStream.reset();
+                       serializer.serialize(o1, outView);
+                       int leftLen = outStream.getPosition();
+                       serializer.serialize(o2, outView);
+                       int rightLen = outStream.getPosition() - leftLen;
+                       return compareBytes(outStream.getBuf(), 0, leftLen, 
leftLen, rightLen);
+               } catch (IOException ex) {
+                       throw new FlinkRuntimeException("Serializer problem in 
comparator.", ex);
+               }
+       }
+
+       @Override
+       public int comparePriority(T left, T right) {
+               return priorityComparator.comparePriority(left, right);
+       }
+
+       public static int compareBytes(byte[] bytes, int offLeft, int leftLen, 
int offRight, int rightLen) {
+               int maxLen = Math.min(leftLen, rightLen);
+               for (int i = 0; i < maxLen; ++i) {
+                       int cmp = bytes[offLeft + i] - bytes[offRight + i];
+                       if (cmp != 0) {
+                               return cmp;
+                       }
+               }
+               return leftLen - rightLen;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 637effd..ad1581b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
 import org.apache.flink.util.TernaryBoolean;
 
 import org.slf4j.LoggerFactory;
@@ -457,6 +458,8 @@ public class FsStateBackend extends 
AbstractFileStateBackend implements Configur
 
                TaskStateManager taskStateManager = env.getTaskStateManager();
                LocalRecoveryConfig localRecoveryConfig = 
taskStateManager.createLocalRecoveryConfig();
+               HeapPriorityQueueSetFactory priorityQueueSetFactory =
+                       new HeapPriorityQueueSetFactory(keyGroupRange, 
numberOfKeyGroups, 128);
 
                return new HeapKeyedStateBackend<>(
                                kvStateRegistry,
@@ -466,7 +469,8 @@ public class FsStateBackend extends 
AbstractFileStateBackend implements Configur
                                keyGroupRange,
                                isUsingAsynchronousSnapshots(),
                                env.getExecutionConfig(),
-                               localRecoveryConfig);
+                               localRecoveryConfig,
+                               priorityQueueSetFactory);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
index 771315d..6dc8cf3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
@@ -27,6 +27,8 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.Collection;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
 
 /**
  * This class is an implementation of a {@link InternalPriorityQueue} with set 
semantics that internally consists of
@@ -76,6 +78,15 @@ public class CachingInternalPriorityQueueSet<E> implements 
InternalPriorityQueue
                return orderedCache.peekFirst();
        }
 
+       @Override
+       public void bulkPoll(@Nonnull Predicate<E> canConsume, @Nonnull 
Consumer<E> consumer) {
+               E element;
+               while ((element = peek()) != null && canConsume.test(element)) {
+                       poll();
+                       consumer.accept(element);
+               }
+       }
+
        @Nullable
        @Override
        public E poll() {
@@ -158,7 +169,11 @@ public class CachingInternalPriorityQueueSet<E> implements 
InternalPriorityQueue
        @Nonnull
        @Override
        public CloseableIterator<E> iterator() {
-               return orderedStore.orderedIterator();
+               if (storeOnlyElements) {
+                       return orderedStore.orderedIterator();
+               } else {
+                       return orderedCache.orderedIterator();
+               }
        }
 
        @Override
@@ -184,7 +199,7 @@ public class CachingInternalPriorityQueueSet<E> implements 
InternalPriorityQueue
                                }
                                storeOnlyElements = iterator.hasNext();
                        } catch (Exception e) {
-                               throw new FlinkRuntimeException("Exception 
while closing RocksDB iterator.", e);
+                               throw new FlinkRuntimeException("Exception 
while refilling store from iterator.", e);
                        }
                }
        }
@@ -249,6 +264,13 @@ public class CachingInternalPriorityQueueSet<E> implements 
InternalPriorityQueue
                 */
                @Nullable
                E peekLast();
+
+               /**
+                * Returns an iterator over the store that returns elements in 
order. The iterator must be closed by the client
+                * after usage.
+                */
+               @Nonnull
+               CloseableIterator<E> orderedIterator();
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 82ce584..b5b2626 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -44,13 +44,17 @@ import 
org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateFunction;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.PriorityComparator;
+import org.apache.flink.runtime.state.PriorityQueueSetFactory;
 import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
 import org.apache.flink.runtime.state.SnapshotResult;
@@ -102,6 +106,21 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        Tuple2.of(FoldingStateDescriptor.class, (StateFactory) 
HeapFoldingState::create)
                ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
 
+       @Nonnull
+       @Override
+       public <T extends HeapPriorityQueueElement> 
KeyGroupedInternalPriorityQueue<T> create(
+               @Nonnull String stateName,
+               @Nonnull TypeSerializer<T> byteOrderedElementSerializer,
+               @Nonnull PriorityComparator<T> elementPriorityComparator,
+               @Nonnull KeyExtractorFunction<T> keyExtractor) {
+
+               return priorityQueueSetFactory.create(
+                       stateName,
+                       byteOrderedElementSerializer,
+                       elementPriorityComparator,
+                       keyExtractor);
+       }
+
        private interface StateFactory {
                <K, N, SV, S extends State, IS extends S> IS createState(
                        StateDescriptor<S, SV> stateDesc,
@@ -137,6 +156,11 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
         */
        private final HeapSnapshotStrategy snapshotStrategy;
 
+       /**
+        * Factory for state that is organized as a priority queue.
+        */
+       private final PriorityQueueSetFactory priorityQueueSetFactory;
+
        public HeapKeyedStateBackend(
                        TaskKvStateRegistry kvStateRegistry,
                        TypeSerializer<K> keySerializer,
@@ -145,7 +169,8 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        KeyGroupRange keyGroupRange,
                        boolean asynchronousSnapshots,
                        ExecutionConfig executionConfig,
-                       LocalRecoveryConfig localRecoveryConfig) {
+                       LocalRecoveryConfig localRecoveryConfig,
+                       PriorityQueueSetFactory priorityQueueSetFactory) {
 
                super(kvStateRegistry, keySerializer, userCodeClassLoader, 
numberOfKeyGroups, keyGroupRange, executionConfig);
                this.localRecoveryConfig = 
Preconditions.checkNotNull(localRecoveryConfig);
@@ -157,6 +182,7 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                this.snapshotStrategy = new 
HeapSnapshotStrategy(synchronicityTrait);
                LOG.info("Initializing heap keyed state backend with stream 
factory.");
                this.restoredKvStateMetaInfos = new HashMap<>();
+               this.priorityQueueSetFactory = priorityQueueSetFactory;
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java
index 7017905..e5f610e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state.heap;
 
 import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.runtime.state.PriorityComparator;
 import org.apache.flink.util.CloseableIterator;
 
 import javax.annotation.Nonnegative;
@@ -27,9 +28,10 @@ import javax.annotation.Nullable;
 
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
 
 import static org.apache.flink.util.CollectionUtil.MAX_ARRAY_SIZE;
 
@@ -56,9 +58,9 @@ public class HeapPriorityQueue<T extends 
HeapPriorityQueueElement> implements In
        private static final int QUEUE_HEAD_INDEX = 1;
 
        /**
-        * Comparator for the contained elements.
+        * Comparator for the priority of contained elements.
         */
-       private final Comparator<T> elementComparator;
+       private final PriorityComparator<T> elementPriorityComparator;
 
        /**
         * The array that represents the heap-organized priority queue.
@@ -73,19 +75,28 @@ public class HeapPriorityQueue<T extends 
HeapPriorityQueueElement> implements In
        /**
         * Creates an empty {@link HeapPriorityQueue} with the requested 
initial capacity.
         *
-        * @param elementComparator comparator for the contained elements.
+        * @param elementPriorityComparator comparator for the priority of 
contained elements.
         * @param minimumCapacity the minimum and initial capacity of this 
priority queue.
         */
        @SuppressWarnings("unchecked")
        public HeapPriorityQueue(
-               @Nonnull Comparator<T> elementComparator,
+               @Nonnull PriorityComparator<T> elementPriorityComparator,
                @Nonnegative int minimumCapacity) {
 
-               this.elementComparator = elementComparator;
+               this.elementPriorityComparator = elementPriorityComparator;
                this.queue = (T[]) new 
HeapPriorityQueueElement[QUEUE_HEAD_INDEX + minimumCapacity];
        }
 
        @Override
+       public void bulkPoll(@Nonnull Predicate<T> canConsume, @Nonnull 
Consumer<T> consumer) {
+               T element;
+               while ((element = peek()) != null && canConsume.test(element)) {
+                       poll();
+                       consumer.accept(element);
+               }
+       }
+
+       @Override
        @Nullable
        public T poll() {
                return size() > 0 ? removeElementAtIndex(QUEUE_HEAD_INDEX) : 
null;
@@ -227,7 +238,7 @@ public class HeapPriorityQueue<T extends 
HeapPriorityQueueElement> implements In
                final T currentElement = heap[idx];
                int parentIdx = idx >>> 1;
 
-               while (parentIdx > 0 && isElementLessThen(currentElement, 
heap[parentIdx])) {
+               while (parentIdx > 0 && 
isElementPriorityLessThen(currentElement, heap[parentIdx])) {
                        moveElementToIdx(heap[parentIdx], idx);
                        idx = parentIdx;
                        parentIdx >>>= 1;
@@ -245,19 +256,19 @@ public class HeapPriorityQueue<T extends 
HeapPriorityQueueElement> implements In
                int secondChildIdx = firstChildIdx + 1;
 
                if (isElementIndexValid(secondChildIdx, heapSize) &&
-                       isElementLessThen(heap[secondChildIdx], 
heap[firstChildIdx])) {
+                       isElementPriorityLessThen(heap[secondChildIdx], 
heap[firstChildIdx])) {
                        firstChildIdx = secondChildIdx;
                }
 
                while (isElementIndexValid(firstChildIdx, heapSize) &&
-                       isElementLessThen(heap[firstChildIdx], currentElement)) 
{
+                       isElementPriorityLessThen(heap[firstChildIdx], 
currentElement)) {
                        moveElementToIdx(heap[firstChildIdx], idx);
                        idx = firstChildIdx;
                        firstChildIdx = idx << 1;
                        secondChildIdx = firstChildIdx + 1;
 
                        if (isElementIndexValid(secondChildIdx, heapSize) &&
-                               isElementLessThen(heap[secondChildIdx], 
heap[firstChildIdx])) {
+                               isElementPriorityLessThen(heap[secondChildIdx], 
heap[firstChildIdx])) {
                                firstChildIdx = secondChildIdx;
                        }
                }
@@ -269,8 +280,8 @@ public class HeapPriorityQueue<T extends 
HeapPriorityQueueElement> implements In
                return elementIndex <= heapSize;
        }
 
-       private boolean isElementLessThen(T a, T b) {
-               return elementComparator.compare(a, b) < 0;
+       private boolean isElementPriorityLessThen(T a, T b) {
+               return elementPriorityComparator.comparePriority(a, b) < 0;
        }
 
        private void moveElementToIdx(T element, int idx) {

http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java
index 61313e9..79f319c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java
@@ -18,20 +18,17 @@
 
 package org.apache.flink.runtime.state.heap;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.state.KeyExtractorFunction;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.PriorityComparator;
 
 import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -49,7 +46,9 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
  *
  * @param <T> type of the contained elements.
  */
-public class HeapPriorityQueueSet<T extends HeapPriorityQueueElement> extends 
HeapPriorityQueue<T> {
+public class HeapPriorityQueueSet<T extends HeapPriorityQueueElement>
+       extends HeapPriorityQueue<T>
+       implements KeyGroupedInternalPriorityQueue<T> {
 
        /**
         * Function to extract the key from contained elements.
@@ -74,7 +73,7 @@ public class HeapPriorityQueueSet<T extends 
HeapPriorityQueueElement> extends He
        /**
         * Creates an empty {@link HeapPriorityQueueSet} with the requested 
initial capacity.
         *
-        * @param elementComparator comparator for the contained elements.
+        * @param elementPriorityComparator comparator for the priority of 
contained elements.
         * @param keyExtractor function to extract a key from the contained 
elements.
         * @param minimumCapacity the minimum and initial capacity of this 
priority queue.
         * @param keyGroupRange the key-group range of the elements in this set.
@@ -82,13 +81,13 @@ public class HeapPriorityQueueSet<T extends 
HeapPriorityQueueElement> extends He
         */
        @SuppressWarnings("unchecked")
        public HeapPriorityQueueSet(
-               @Nonnull Comparator<T> elementComparator,
+               @Nonnull PriorityComparator<T> elementPriorityComparator,
                @Nonnull KeyExtractorFunction<T> keyExtractor,
                @Nonnegative int minimumCapacity,
                @Nonnull KeyGroupRange keyGroupRange,
                @Nonnegative int totalNumberOfKeyGroups) {
 
-               super(elementComparator, minimumCapacity);
+               super(elementPriorityComparator, minimumCapacity);
 
                this.keyExtractor = keyExtractor;
 
@@ -147,28 +146,9 @@ public class HeapPriorityQueueSet<T extends 
HeapPriorityQueueElement> extends He
                }
        }
 
-       /**
-        * Returns an unmodifiable set of all elements in the given key-group.
-        */
-       @Nonnull
-       public Set<T> getElementsForKeyGroup(@Nonnegative int keyGroupIdx) {
-               return 
Collections.unmodifiableSet(getDedupMapForKeyGroup(keyGroupIdx).keySet());
-       }
-
-       @VisibleForTesting
-       @SuppressWarnings("unchecked")
-       @Nonnull
-       public List<Set<T>> getElementsByKeyGroup() {
-               List<Set<T>> result = new 
ArrayList<>(deduplicationMapsByKeyGroup.length);
-               for (int i = 0; i < deduplicationMapsByKeyGroup.length; ++i) {
-                       result.add(i, 
Collections.unmodifiableSet(deduplicationMapsByKeyGroup[i].keySet()));
-               }
-               return result;
-       }
-
        private HashMap<T, T> getDedupMapForKeyGroup(
-               @Nonnegative int keyGroupIdx) {
-               return 
deduplicationMapsByKeyGroup[globalKeyGroupToLocalIndex(keyGroupIdx)];
+               @Nonnegative int keyGroupId) {
+               return 
deduplicationMapsByKeyGroup[globalKeyGroupToLocalIndex(keyGroupId)];
        }
 
        private HashMap<T, T> getDedupMapForElement(T element) {
@@ -182,4 +162,10 @@ public class HeapPriorityQueueSet<T extends 
HeapPriorityQueueElement> extends He
                checkArgument(keyGroupRange.contains(keyGroup));
                return keyGroup - keyGroupRange.getStartKeyGroup();
        }
+
+       @Nonnull
+       @Override
+       public Set<T> getSubsetForKeyGroup(int keyGroupId) {
+               return getDedupMapForKeyGroup(keyGroupId).keySet();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java
new file mode 100644
index 0000000..ee6fda9
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.PriorityComparator;
+import org.apache.flink.runtime.state.PriorityQueueSetFactory;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+/**
+ * Factory that creates {@link HeapPriorityQueueSet} instances for a fixed key-group range.
+ */
+public class HeapPriorityQueueSetFactory implements PriorityQueueSetFactory {
+
+       @Nonnull
+       private final KeyGroupRange keyGroupRange;
+
+       @Nonnegative
+       private final int totalKeyGroups;
+
+       @Nonnegative
+       private final int minimumCapacity;
+
+       public HeapPriorityQueueSetFactory(
+               @Nonnull KeyGroupRange keyGroupRange,
+               @Nonnegative int totalKeyGroups,
+               @Nonnegative int minimumCapacity) {
+
+               this.keyGroupRange = keyGroupRange;
+               this.totalKeyGroups = totalKeyGroups;
+               this.minimumCapacity = minimumCapacity;
+       }
+
+       @Nonnull
+       @Override
+       public <T extends HeapPriorityQueueElement> 
KeyGroupedInternalPriorityQueue<T> create(
+               @Nonnull String stateName,
+               @Nonnull TypeSerializer<T> byteOrderedElementSerializer,
+               @Nonnull PriorityComparator<T> elementPriorityComparator,
+               @Nonnull KeyExtractorFunction<T> keyExtractor) {
+               return new HeapPriorityQueueSet<>(
+                       elementPriorityComparator,
+                       keyExtractor,
+                       minimumCapacity,
+                       keyGroupRange,
+                       totalKeyGroups);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java
index af4d54f..6f4f911 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java
@@ -22,7 +22,10 @@ import org.apache.flink.runtime.state.InternalPriorityQueue;
 import org.apache.flink.runtime.state.KeyExtractorFunction;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.PriorityComparator;
 import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.IOUtils;
 
 import javax.annotation.Nonnegative;
@@ -30,7 +33,10 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.Collection;
-import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
 
 /**
  * This implementation of {@link InternalPriorityQueue} is internally 
partitioned into sub-queues per key-group and
@@ -41,7 +47,7 @@ import java.util.Comparator;
  * @param <PQ> type type of sub-queue used for each key-group partition.
  */
 public class KeyGroupPartitionedPriorityQueue<T, PQ extends 
InternalPriorityQueue<T> & HeapPriorityQueueElement>
-       implements InternalPriorityQueue<T> {
+       implements InternalPriorityQueue<T>, KeyGroupedInternalPriorityQueue<T> 
{
 
        /** A heap of heap sets. Each sub-heap represents the partition for a 
key-group.*/
        @Nonnull
@@ -66,7 +72,7 @@ public class KeyGroupPartitionedPriorityQueue<T, PQ extends 
InternalPriorityQueu
        @SuppressWarnings("unchecked")
        public KeyGroupPartitionedPriorityQueue(
                @Nonnull KeyExtractorFunction<T> keyExtractor,
-               @Nonnull Comparator<T> elementComparator,
+               @Nonnull PriorityComparator<T> elementPriorityComparator,
                @Nonnull PartitionQueueSetFactory<T, PQ> orderedCacheFactory,
                @Nonnull KeyGroupRange keyGroupRange,
                @Nonnegative int totalKeyGroups) {
@@ -76,16 +82,25 @@ public class KeyGroupPartitionedPriorityQueue<T, PQ extends 
InternalPriorityQueu
                this.firstKeyGroup = keyGroupRange.getStartKeyGroup();
                this.keyGroupedHeaps = (PQ[]) new 
InternalPriorityQueue[keyGroupRange.getNumberOfKeyGroups()];
                this.heapOfkeyGroupedHeaps = new HeapPriorityQueue<>(
-                       new 
InternalPriorityQueueComparator<>(elementComparator),
+                       new 
InternalPriorityQueueComparator<>(elementPriorityComparator),
                        keyGroupRange.getNumberOfKeyGroups());
                for (int i = 0; i < keyGroupedHeaps.length; i++) {
                        final PQ keyGroupSubHeap =
-                               orderedCacheFactory.create(firstKeyGroup + i, 
totalKeyGroups, elementComparator);
+                               orderedCacheFactory.create(firstKeyGroup + i, 
totalKeyGroups, elementPriorityComparator);
                        keyGroupedHeaps[i] = keyGroupSubHeap;
                        heapOfkeyGroupedHeaps.add(keyGroupSubHeap);
                }
        }
 
+       @Override
+       public void bulkPoll(@Nonnull Predicate<T> canConsume, @Nonnull 
Consumer<T> consumer) {
+               T element;
+               while ((element = peek()) != null && canConsume.test(element)) {
+                       poll();
+                       consumer.accept(element);
+               }
+       }
+
        @Nullable
        @Override
        public T poll() {
@@ -173,9 +188,28 @@ public class KeyGroupPartitionedPriorityQueue<T, PQ 
extends InternalPriorityQueu
        private int computeKeyGroupIndex(T element) {
                final Object extractKeyFromElement = 
keyExtractor.extractKeyFromElement(element);
                final int keyGroupId = 
KeyGroupRangeAssignment.assignToKeyGroup(extractKeyFromElement, totalKeyGroups);
+               return globalKeyGroupToLocalIndex(keyGroupId);
+       }
+
+       private int globalKeyGroupToLocalIndex(int keyGroupId) {
                return keyGroupId - firstKeyGroup;
        }
 
+       @Nonnull
+       @Override
+       public Set<T> getSubsetForKeyGroup(int keyGroupId) {
+               HashSet<T> result = new HashSet<>();
+               PQ partitionQueue = 
keyGroupedHeaps[globalKeyGroupToLocalIndex(keyGroupId)];
+               try (CloseableIterator<T> iterator = partitionQueue.iterator()) 
{
+                       while (iterator.hasNext()) {
+                               result.add(iterator.next());
+                       }
+               } catch (Exception e) {
+                       throw new FlinkRuntimeException("Exception while 
iterating key group.", e);
+               }
+               return result;
+       }
+
        /**
         * Iterator for {@link KeyGroupPartitionedPriorityQueue}. This iterator 
is not guaranteeing any order of elements.
         * Using code must {@link #close()} after usage.
@@ -236,24 +270,24 @@ public class KeyGroupPartitionedPriorityQueue<T, PQ 
extends InternalPriorityQueu
         * @param <Q> type of queue.
         */
        private static final class InternalPriorityQueueComparator<T, Q extends 
InternalPriorityQueue<T>>
-               implements Comparator<Q> {
+               implements PriorityComparator<Q> {
 
                /** Comparator for the queue elements, so we can compare their 
heads. */
                @Nonnull
-               private final Comparator<T> elementComparator;
+               private final PriorityComparator<T> elementPriorityComparator;
 
-               InternalPriorityQueueComparator(@Nonnull Comparator<T> 
elementComparator) {
-                       this.elementComparator = elementComparator;
+               InternalPriorityQueueComparator(@Nonnull PriorityComparator<T> 
elementPriorityComparator) {
+                       this.elementPriorityComparator = 
elementPriorityComparator;
                }
 
                @Override
-               public int compare(Q o1, Q o2) {
+               public int comparePriority(Q o1, Q o2) {
                        final T left = o1.peek();
                        final T right = o2.peek();
                        if (left == null) {
                                return (right == null ? 0 : 1);
                        } else {
-                               return (right == null ? -1 : 
elementComparator.compare(left, right));
+                               return (right == null ? -1 : 
elementPriorityComparator.comparePriority(left, right));
                        }
                }
        }
@@ -271,10 +305,13 @@ public class KeyGroupPartitionedPriorityQueue<T, PQ 
extends InternalPriorityQueu
                 *
                 * @param keyGroupId the key-group of the elements managed by 
the produced queue.
                 * @param numKeyGroups the total number of key-groups in the 
job.
-                * @param elementComparator the comparator that determines the 
order of the managed elements.
+                * @param elementPriorityComparator the comparator that 
determines the order of managed elements by priority.
                 * @return a new queue for the given key-group.
                 */
                @Nonnull
-               PQS create(@Nonnegative int keyGroupId, @Nonnegative int 
numKeyGroups, @Nonnull Comparator<T> elementComparator);
+               PQS create(
+                       @Nonnegative int keyGroupId,
+                       @Nonnegative int numKeyGroups,
+                       @Nonnull PriorityComparator<T> 
elementPriorityComparator);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCache.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCache.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCache.java
index 0e7d9dd..14c281e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCache.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCache.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state.heap;
 
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.Preconditions;
 
 import it.unimi.dsi.fastutil.objects.ObjectAVLTreeSet;
@@ -125,4 +126,10 @@ public class TreeOrderedSetCache<E> implements 
CachingInternalPriorityQueueSet.O
        public E peekLast() {
                return !avlTree.isEmpty() ? avlTree.last() : null;
        }
+
+       @Nonnull
+       @Override
+       public CloseableIterator<E> orderedIterator() {
+               return CloseableIterator.adapterForIterator(avlTree.iterator());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 3da60e4..d78944c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
 import org.apache.flink.util.TernaryBoolean;
 
 import javax.annotation.Nullable;
@@ -309,7 +310,8 @@ public class MemoryStateBackend extends 
AbstractFileStateBackend implements Conf
                        TaskKvStateRegistry kvStateRegistry) {
 
                TaskStateManager taskStateManager = env.getTaskStateManager();
-
+               HeapPriorityQueueSetFactory priorityQueueSetFactory =
+                       new HeapPriorityQueueSetFactory(keyGroupRange, 
numberOfKeyGroups, 128);
                return new HeapKeyedStateBackend<>(
                                kvStateRegistry,
                                keySerializer,
@@ -318,7 +320,8 @@ public class MemoryStateBackend extends 
AbstractFileStateBackend implements Conf
                                keyGroupRange,
                                isUsingAsynchronousSnapshots(),
                                env.getExecutionConfig(),
-                               taskStateManager.createLocalRecoveryConfig());
+                               taskStateManager.createLocalRecoveryConfig(),
+                               priorityQueueSetFactory);
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
index c0c3ba4..0cd551c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
@@ -51,8 +51,16 @@ public abstract class InternalPriorityQueueTestBase extends 
TestLogger {
 
        protected static final KeyGroupRange KEY_GROUP_RANGE = new 
KeyGroupRange(0, 2);
        protected static final KeyExtractorFunction<TestElement> 
KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
-       protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR =
-               
Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
+       protected static final PriorityComparator<TestElement> 
TEST_ELEMENT_PRIORITY_COMPARATOR =
+               (left, right) -> Long.compare(left.getPriority(), 
right.getPriority());
+       protected static final Comparator<TestElement> TEST_ELEMENT_COMPARATOR 
= (o1, o2) -> {
+               int priorityCmp = 
TEST_ELEMENT_PRIORITY_COMPARATOR.comparePriority(o1, o2);
+               if (priorityCmp != 0) {
+                       return priorityCmp;
+               }
+               // to fully comply with compareTo/equals contract.
+               return Long.compare(o1.getKey(), o2.getKey());
+       };
 
        protected static void insertRandomElements(
                @Nonnull InternalPriorityQueue<TestElement> priorityQueue,

http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
index 3c06b71..dfcdffc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
@@ -53,7 +53,8 @@ public class StateSnapshotCompressionTest extends TestLogger {
                        new KeyGroupRange(0, 15),
                        true,
                        executionConfig,
-                       TestLocalRecoveryConfig.disabled());
+                       TestLocalRecoveryConfig.disabled(),
+                       mock(PriorityQueueSetFactory.class));
 
                try {
                        Assert.assertTrue(
@@ -75,7 +76,8 @@ public class StateSnapshotCompressionTest extends TestLogger {
                        new KeyGroupRange(0, 15),
                        true,
                        executionConfig,
-                       TestLocalRecoveryConfig.disabled());
+                       TestLocalRecoveryConfig.disabled(),
+                       mock(PriorityQueueSetFactory.class));
 
                try {
                        Assert.assertTrue(
@@ -115,7 +117,8 @@ public class StateSnapshotCompressionTest extends 
TestLogger {
                        new KeyGroupRange(0, 15),
                        true,
                        executionConfig,
-                       TestLocalRecoveryConfig.disabled());
+                       TestLocalRecoveryConfig.disabled(),
+                       mock(PriorityQueueSetFactory.class));
 
                try {
 
@@ -156,7 +159,8 @@ public class StateSnapshotCompressionTest extends 
TestLogger {
                        new KeyGroupRange(0, 15),
                        true,
                        executionConfig,
-                       TestLocalRecoveryConfig.disabled());
+                       TestLocalRecoveryConfig.disabled(),
+                       mock(PriorityQueueSetFactory.class));
                try {
 
                        
stateBackend.restore(StateObjectCollection.singleton(stateHandle));

http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetTest.java
index 618da4e..415497d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetTest.java
@@ -25,7 +25,7 @@ public class HeapPriorityQueueSetTest extends 
HeapPriorityQueueTest {
        @Override
        protected HeapPriorityQueueSet<TestElement> newPriorityQueue(int 
initialCapacity) {
                return new HeapPriorityQueueSet<>(
-                       TEST_ELEMENT_COMPARATOR,
+                       TEST_ELEMENT_PRIORITY_COMPARATOR,
                        KEY_EXTRACTOR_FUNCTION,
                        initialCapacity,
                        KEY_GROUP_RANGE,

http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueTest.java
index 8ffb8b8..6ba5a68 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueTest.java
@@ -89,7 +89,7 @@ public class HeapPriorityQueueTest extends 
InternalPriorityQueueTestBase {
 
        @Override
        protected HeapPriorityQueue<TestElement> newPriorityQueue(int 
initialCapacity) {
-               return new HeapPriorityQueue<>(TEST_ELEMENT_COMPARATOR, 
initialCapacity);
+               return new 
HeapPriorityQueue<>(TEST_ELEMENT_PRIORITY_COMPARATOR, initialCapacity);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
index bf428dc..cf6aef4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
@@ -49,14 +49,18 @@ public abstract class HeapStateBackendTestBase {
        }
 
        public <K> HeapKeyedStateBackend<K> 
createKeyedBackend(TypeSerializer<K> keySerializer) throws Exception {
+               final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 15);
+               final int numKeyGroups = keyGroupRange.getNumberOfKeyGroups();
+
                return new HeapKeyedStateBackend<>(
                        mock(TaskKvStateRegistry.class),
                        keySerializer,
                        HeapStateBackendTestBase.class.getClassLoader(),
-                       16,
-                       new KeyGroupRange(0, 15),
+                       numKeyGroups,
+                       keyGroupRange,
                        async,
                        new ExecutionConfig(),
-                       TestLocalRecoveryConfig.disabled());
+                       TestLocalRecoveryConfig.disabled(),
+                       new HeapPriorityQueueSetFactory(keyGroupRange, 
numKeyGroups, 128));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueueTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueueTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueueTest.java
index 277de19..d348e10 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueueTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueueTest.java
@@ -29,7 +29,7 @@ public class KeyGroupPartitionedPriorityQueueTest extends 
InternalPriorityQueueT
        protected InternalPriorityQueue<TestElement> newPriorityQueue(int 
initialCapacity) {
                return new KeyGroupPartitionedPriorityQueue<>(
                        KEY_EXTRACTOR_FUNCTION,
-                       TEST_ELEMENT_COMPARATOR,
+                       TEST_ELEMENT_PRIORITY_COMPARATOR,
                        newFactory(initialCapacity),
                        KEY_GROUP_RANGE, 
KEY_GROUP_RANGE.getNumberOfKeyGroups());
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RockDBBackendOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RockDBBackendOptions.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RockDBBackendOptions.java
new file mode 100644
index 0000000..ede45e3
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RockDBBackendOptions.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * Configuration options for the RocksDB backend.
+ */
+public class RockDBBackendOptions {
+
+       /**
+        * Choice of implementation for priority queue state (e.g. timers).
+        */
+       public static final ConfigOption<String> PRIORITY_QUEUE_STATE_TYPE = 
ConfigOptions
+               .key("backend.rocksdb.priority_queue_state_type")
+               
.defaultValue(RocksDBStateBackend.PriorityQueueStateType.HEAP.name())
+               .withDescription("This determines the implementation for the 
priority queue state (e.g. timers). Options are " +
+                       "either " + 
RocksDBStateBackend.PriorityQueueStateType.HEAP.name() + " (heap-based, 
default) or " +
+                       RocksDBStateBackend.PriorityQueueStateType.ROCKS.name() 
+ " for an implementation based on RocksDB.");
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 21d2a65..f2430ae 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -58,14 +58,18 @@ import org.apache.flink.runtime.state.DirectoryStateHandle;
 import org.apache.flink.runtime.state.DoneFuture;
 import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
 import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
 import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
+import org.apache.flink.runtime.state.PriorityComparator;
+import org.apache.flink.runtime.state.PriorityQueueSetFactory;
 import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
 import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
 import org.apache.flink.runtime.state.SnapshotDirectory;
@@ -76,7 +80,13 @@ import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.TieBreakingPriorityComparator;
 import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
+import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
+import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue;
+import org.apache.flink.runtime.state.heap.TreeOrderedSetCache;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -243,6 +253,9 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        /** The snapshot strategy, e.g., if we use full or incremental 
checkpoints, local state, and so on. */
        private final SnapshotStrategy<SnapshotResult<KeyedStateHandle>> 
snapshotStrategy;
 
+       /** Factory for priority queue state. */
+       private PriorityQueueSetFactory priorityQueueFactory;
+
        public RocksDBKeyedStateBackend(
                String operatorIdentifier,
                ClassLoader userCodeClassLoader,
@@ -255,7 +268,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                KeyGroupRange keyGroupRange,
                ExecutionConfig executionConfig,
                boolean enableIncrementalCheckpointing,
-               LocalRecoveryConfig localRecoveryConfig
+               LocalRecoveryConfig localRecoveryConfig,
+               RocksDBStateBackend.PriorityQueueStateType 
priorityQueueStateType
        ) throws IOException {
 
                super(kvStateRegistry, keySerializer, userCodeClassLoader, 
numberOfKeyGroups, keyGroupRange, executionConfig);
@@ -296,6 +310,17 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                this.writeOptions = new WriteOptions().setDisableWAL(true);
 
+               switch (priorityQueueStateType) {
+                       case HEAP:
+                               this.priorityQueueFactory = new 
HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);
+                               break;
+                       case ROCKS:
+                               this.priorityQueueFactory = new 
RocksDBPriorityQueueSetFactory();
+                               break;
+                       default:
+                               break;
+               }
+
                LOG.debug("Setting initial keyed backend uid for operator {} to 
{}.", this.operatorIdentifier, this.backendUID);
        }
 
@@ -378,6 +403,11 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                IOUtils.closeQuietly(columnMetaData.f0);
                        }
 
+                       // ... then close the priority queue related resources 
...
+                       if (priorityQueueFactory instanceof AutoCloseable) {
+                               IOUtils.closeQuietly((AutoCloseable) 
priorityQueueFactory);
+                       }
+
                        // ... and finally close the DB instance ...
                        IOUtils.closeQuietly(db);
 
@@ -394,6 +424,17 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                }
        }
 
+       @Nonnull
+       @Override
+       public <T extends HeapPriorityQueueElement> 
KeyGroupedInternalPriorityQueue<T> create(
+               @Nonnull String stateName,
+               @Nonnull TypeSerializer<T> byteOrderedElementSerializer,
+               @Nonnull PriorityComparator<T> elementComparator,
+               @Nonnull KeyExtractorFunction<T> keyExtractor) {
+
+               return priorityQueueFactory.create(stateName, 
byteOrderedElementSerializer, elementComparator, keyExtractor);
+       }
+
        private void cleanInstanceBasePath() {
                LOG.info("Deleting existing instance base directory {}.", 
instanceBasePath);
 
@@ -1290,7 +1331,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                namespaceSerializer,
                                stateDesc.getSerializer());
 
-                       ColumnFamilyHandle columnFamily = 
createColumnFamily(stateName);
+                       ColumnFamilyHandle columnFamily = 
createColumnFamily(stateName, db);
 
                        stateInfo = Tuple2.of(columnFamily, newMetaInfo);
                        kvStateInformation.put(stateDesc.getName(), stateInfo);
@@ -1302,7 +1343,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        /**
         * Creates a column family handle for use with a k/v state.
         */
-       private ColumnFamilyHandle createColumnFamily(String stateName) throws 
IOException {
+       private ColumnFamilyHandle createColumnFamily(String stateName, RocksDB 
db) {
                byte[] nameBytes = 
stateName.getBytes(ConfigConstants.DEFAULT_CHARSET);
                
Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, 
nameBytes),
                        "The chosen state name 'default' collides with the name 
of the default column family!");
@@ -1312,7 +1353,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                try {
                        return db.createColumnFamily(columnDescriptor);
                } catch (RocksDBException e) {
-                       throw new IOException("Error creating 
ColumnFamilyHandle.", e);
+                       throw new FlinkRuntimeException("Error creating 
ColumnFamilyHandle.", e);
                }
        }
 
@@ -2579,4 +2620,126 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                ReadOptions readOptions) {
                return new 
RocksIteratorWrapper(db.newIterator(columnFamilyHandle, readOptions));
        }
+
+       /**
+        * Encapsulates the logic and resources in connection with creating 
priority queue state structures.
+        */
+       class RocksDBPriorityQueueSetFactory implements 
PriorityQueueSetFactory, AutoCloseable {
+
+               /** Default cache size per key-group. */
+               private static final int DEFAULT_CACHES_SIZE = 8 * 1024;
+
+               /** A shared buffer to serialize elements for the priority 
queue. */
+               @Nonnull
+               private final ByteArrayOutputStreamWithPos 
elementSerializationOutStream;
+
+               /** A shared adapter wrapper around 
elementSerializationOutStream to become a {@link DataOutputView}. */
+               @Nonnull
+               private final DataOutputViewStreamWrapper 
elementSerializationOutView;
+
+               /** A shared {@link RocksDBWriteBatchWrapper} to batch 
modifications to priority queues. */
+               @Nonnull
+               private final RocksDBWriteBatchWrapper writeBatchWrapper;
+
+               /** Map to track all column families created to back priority 
queues. */
+               @Nonnull
+               private final Map<String, ColumnFamilyHandle> 
priorityQueueColumnFamilies;
+
+               /** The mandatory default column family, so that we can close 
it later. */
+               @Nonnull
+               private final ColumnFamilyHandle defaultColumnFamily;
+
+               /** Path of the RocksDB instance that holds the priority 
queues. */
+               @Nonnull
+               private final File pqInstanceRocksDBPath;
+
+               /** RocksDB instance that holds the priority queues. */
+               @Nonnull
+               private final RocksDB pqDb;
+
+               RocksDBPriorityQueueSetFactory() throws IOException {
+                       this.pqInstanceRocksDBPath = new File(instanceBasePath, 
"pqdb");
+                       if (pqInstanceRocksDBPath.exists()) {
+                               try {
+                                       
FileUtils.deleteDirectory(pqInstanceRocksDBPath);
+                               } catch (IOException ex) {
+                                       LOG.warn("Could not delete instance 
path for PQ RocksDB: " + pqInstanceRocksDBPath, ex);
+                               }
+                       }
+                       List<ColumnFamilyHandle> columnFamilyHandles = new 
ArrayList<>(1);
+                       this.pqDb = 
openDB(pqInstanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), 
columnFamilyHandles);
+                       this.elementSerializationOutStream = new 
ByteArrayOutputStreamWithPos();
+                       this.elementSerializationOutView = new 
DataOutputViewStreamWrapper(elementSerializationOutStream);
+                       this.writeBatchWrapper = new 
RocksDBWriteBatchWrapper(pqDb, writeOptions);
+                       this.defaultColumnFamily = columnFamilyHandles.get(0);
+                       this.priorityQueueColumnFamilies = new HashMap<>();
+               }
+
+               @Nonnull
+               @Override
+               public <T extends HeapPriorityQueueElement> 
KeyGroupedInternalPriorityQueue<T> create(
+                       @Nonnull String stateName,
+                       @Nonnull TypeSerializer<T> byteOrderedElementSerializer,
+                       @Nonnull PriorityComparator<T> 
elementPriorityComparator,
+                       @Nonnull KeyExtractorFunction<T> keyExtractor) {
+
+                       final ColumnFamilyHandle columnFamilyHandle =
+                               priorityQueueColumnFamilies.computeIfAbsent(
+                                       stateName,
+                                       (name) -> 
RocksDBKeyedStateBackend.this.createColumnFamily(name, pqDb));
+
+                       @Nonnull
+                       TieBreakingPriorityComparator<T> tieBreakingComparator =
+                               new TieBreakingPriorityComparator<>(
+                                       elementPriorityComparator,
+                                       byteOrderedElementSerializer,
+                                       elementSerializationOutStream,
+                                       elementSerializationOutView);
+
+                       return new KeyGroupPartitionedPriorityQueue<>(
+                               keyExtractor,
+                               elementPriorityComparator,
+                               new 
KeyGroupPartitionedPriorityQueue.PartitionQueueSetFactory<T, 
CachingInternalPriorityQueueSet<T>>() {
+                                       @Nonnull
+                                       @Override
+                                       public 
CachingInternalPriorityQueueSet<T> create(
+                                               int keyGroupId,
+                                               int numKeyGroups,
+                                               @Nonnull PriorityComparator<T> 
elementPriorityComparator) {
+
+                                               
CachingInternalPriorityQueueSet.OrderedSetCache<T> cache =
+                                                       new 
TreeOrderedSetCache<>(tieBreakingComparator, DEFAULT_CACHES_SIZE);
+                                               
CachingInternalPriorityQueueSet.OrderedSetStore<T> store =
+                                                       new 
RocksDBOrderedSetStore<>(
+                                                               keyGroupId,
+                                                               
keyGroupPrefixBytes,
+                                                               pqDb,
+                                                               
columnFamilyHandle,
+                                                               
byteOrderedElementSerializer,
+                                                               
elementSerializationOutStream,
+                                                               
elementSerializationOutView,
+                                                               
writeBatchWrapper);
+
+                                               return new 
CachingInternalPriorityQueueSet<>(cache, store);
+                                       }
+                               },
+                               keyGroupRange,
+                               numberOfKeyGroups);
+               }
+
+               @Override
+               public void close() {
+                       IOUtils.closeQuietly(writeBatchWrapper);
+                       for (ColumnFamilyHandle columnFamilyHandle : 
priorityQueueColumnFamilies.values()) {
+                               IOUtils.closeQuietly(columnFamilyHandle);
+                       }
+                       IOUtils.closeQuietly(defaultColumnFamily);
+                       IOUtils.closeQuietly(pqDb);
+                       try {
+                               
FileUtils.deleteDirectory(pqInstanceRocksDBPath);
+                       } catch (IOException ex) {
+                               LOG.warn("Could not delete instance path for PQ 
RocksDB: " + pqInstanceRocksDBPath, ex);
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java
index e512933..5284314 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java
@@ -28,7 +28,6 @@ import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 
@@ -61,10 +60,6 @@ public class RocksDBOrderedSetStore<T> implements 
CachingInternalPriorityQueueSe
        @Nonnull
        private final ColumnFamilyHandle columnFamilyHandle;
 
-       /** Read options for RocksDB. */
-       @Nonnull
-       private final ReadOptions readOptions;
-
        /**
         * Serializer for the contained elements. The lexicographical order of 
the bytes of serialized objects must be
         * aligned with their logical order.
@@ -93,14 +88,12 @@ public class RocksDBOrderedSetStore<T> implements 
CachingInternalPriorityQueueSe
                @Nonnegative int keyGroupPrefixBytes,
                @Nonnull RocksDB db,
                @Nonnull ColumnFamilyHandle columnFamilyHandle,
-               @Nonnull ReadOptions readOptions,
                @Nonnull TypeSerializer<T> byteOrderProducingSerializer,
                @Nonnull ByteArrayOutputStreamWithPos outputStream,
                @Nonnull DataOutputViewStreamWrapper outputView,
                @Nonnull RocksDBWriteBatchWrapper batchWrapper) {
                this.db = db;
                this.columnFamilyHandle = columnFamilyHandle;
-               this.readOptions = readOptions;
                this.byteOrderProducingSerializer = 
byteOrderProducingSerializer;
                this.outputStream = outputStream;
                this.outputView = outputView;
@@ -169,7 +162,7 @@ public class RocksDBOrderedSetStore<T> implements 
CachingInternalPriorityQueueSe
 
                return new RocksToJavaIteratorAdapter(
                        new RocksIteratorWrapper(
-                               db.newIterator(columnFamilyHandle, 
readOptions)));
+                               db.newIterator(columnFamilyHandle)));
        }
 
        /**
@@ -232,6 +225,10 @@ public class RocksDBOrderedSetStore<T> implements 
CachingInternalPriorityQueueSe
                private RocksToJavaIteratorAdapter(@Nonnull 
RocksIteratorWrapper iterator) {
                        this.iterator = iterator;
                        try {
+                               // TODO we could check if it is more efficient 
to make the seek more specific, e.g. with a provided hint
+                               // that is lexicographically closer to the first 
expected element in the key-group. I wonder if this could
+                               // help to improve the seek if there are many 
tombstones for elements at the beginning of the key-group
+                               // (like for elements that have been removed in 
previous polling, before they are compacted away).
                                iterator.seek(groupPrefixBytes);
                                deserializeNextElementIfAvailable();
                        } catch (Exception ex) {

http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 81d6265..998521b 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -59,6 +59,7 @@ import java.util.List;
 import java.util.Random;
 import java.util.UUID;
 
+import static 
org.apache.flink.contrib.streaming.state.RockDBBackendOptions.PRIORITY_QUEUE_STATE_TYPE;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -76,6 +77,14 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class RocksDBStateBackend extends AbstractStateBackend implements 
ConfigurableStateBackend {
 
+       /**
+        * The options to choose from for the type of priority queue state.
+        */
+       public enum PriorityQueueStateType {
+               HEAP,
+               ROCKS
+       }
+
        private static final long serialVersionUID = 1L;
 
        private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBStateBackend.class);
@@ -109,6 +118,9 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
        /** This determines if incremental checkpointing is enabled. */
        private final TernaryBoolean enableIncrementalCheckpointing;
 
+       /** This determines the type of priority queue state. */
+       private final PriorityQueueStateType priorityQueueStateType;
+
        // -- runtime values, set on TaskManager when initializing / using the 
backend
 
        /** Base paths for RocksDB directory, as initialized. */
@@ -221,6 +233,8 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
        public RocksDBStateBackend(StateBackend checkpointStreamBackend, 
TernaryBoolean enableIncrementalCheckpointing) {
                this.checkpointStreamBackend = 
checkNotNull(checkpointStreamBackend);
                this.enableIncrementalCheckpointing = 
enableIncrementalCheckpointing;
+               // for now, we still use the heap-based implementation as 
default
+               this.priorityQueueStateType = PriorityQueueStateType.HEAP;
        }
 
        /**
@@ -256,6 +270,11 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                this.enableIncrementalCheckpointing = 
original.enableIncrementalCheckpointing.resolveUndefined(
                        
config.getBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS));
 
+               final String priorityQueueTypeString = 
config.getString(PRIORITY_QUEUE_STATE_TYPE.key(), "");
+
+               this.priorityQueueStateType = priorityQueueTypeString.length() 
> 0 ?
+                       
PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase()) : 
original.priorityQueueStateType;
+
                // configure local directories
                if (original.localRocksDbDirectories != null) {
                        this.localRocksDbDirectories = 
original.localRocksDbDirectories;
@@ -422,7 +441,8 @@ public class RocksDBStateBackend extends 
AbstractStateBackend implements Configu
                                keyGroupRange,
                                env.getExecutionConfig(),
                                isIncrementalCheckpointsEnabled(),
-                               localRecoveryConfig);
+                               localRecoveryConfig,
+                               priorityQueueStateType);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/CachingInternalPriorityQueueSetWithRocksDBStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/CachingInternalPriorityQueueSetWithRocksDBStoreTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/CachingInternalPriorityQueueSetWithRocksDBStoreTest.java
index ae20cf2..5f26835 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/CachingInternalPriorityQueueSetWithRocksDBStoreTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/CachingInternalPriorityQueueSetWithRocksDBStoreTest.java
@@ -57,7 +57,6 @@ public class 
CachingInternalPriorityQueueSetWithRocksDBStoreTest extends Caching
                        prefixBytes,
                        rocksDBResource.getRocksDB(),
                        rocksDBResource.getDefaultColumnFamily(),
-                       rocksDBResource.getReadOptions(),
                        TestElementSerializer.INSTANCE,
                        outputStream,
                        outputView,

http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStoreTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStoreTest.java
index 256a83b..0b1d07b 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStoreTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStoreTest.java
@@ -124,7 +124,6 @@ public class RocksDBOrderedSetStoreTest {
                        keyGroupPrefixBytes,
                        rocksDBResource.getRocksDB(),
                        rocksDBResource.getDefaultColumnFamily(),
-                       rocksDBResource.getReadOptions(),
                        byteOrderSerializer,
                        outputStreamWithPos,
                        outputView,

http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index ad89583..69069d6 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -240,7 +240,8 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
                                new KeyGroupRange(0, 0),
                                new ExecutionConfig(),
                                enableIncrementalCheckpointing,
-                               TestLocalRecoveryConfig.disabled());
+                               TestLocalRecoveryConfig.disabled(),
+                               
RocksDBStateBackend.PriorityQueueStateType.HEAP);
 
                        verify(columnFamilyOptions, Mockito.times(1))
                                
.setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME);

http://git-wip-us.apache.org/repos/asf/flink/blob/79b38f8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 9915dd5..797a26a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -730,9 +730,11 @@ public abstract class AbstractStreamOperator<OUT>
                checkTimerServiceInitialization();
 
                // the following casting is to overcome type restrictions.
-               TypeSerializer<K> keySerializer = (TypeSerializer<K>) 
getKeyedStateBackend().getKeySerializer();
+               KeyedStateBackend<K> keyedStateBackend = getKeyedStateBackend();
+               TypeSerializer<K> keySerializer = 
keyedStateBackend.getKeySerializer();
                InternalTimeServiceManager<K> keyedTimeServiceHandler = 
(InternalTimeServiceManager<K>) timeServiceManager;
-               return keyedTimeServiceHandler.getInternalTimerService(name, 
keySerializer, namespaceSerializer, triggerable);
+               TimerSerializer<K, N> timerSerializer = new 
TimerSerializer<>(keySerializer, namespaceSerializer);
+               return keyedTimeServiceHandler.getInternalTimerService(name, 
timerSerializer, triggerable);
        }
 
        public void processWatermark(Watermark mark) throws Exception {

Reply via email to