http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/OrderedSetCacheTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/OrderedSetCacheTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/OrderedSetCacheTestBase.java
new file mode 100644
index 0000000..1d627ed
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/OrderedSetCacheTestBase.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state.heap;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Random;
+import java.util.TreeSet;
+
+/**
+ * Test base for instances of
+ * {@link 
org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetCache}.
+ */
+public abstract class OrderedSetCacheTestBase {
+
+       @Test
+       public void testOrderedSetCacheContract() {
+               final Random random = new Random(0x42);
+               final int capacity = 5000;
+               final int keySpaceUpperBound = 100 * capacity;
+               final TreeSet<Integer> checkSet = new 
TreeSet<>(Integer::compareTo);
+               final CachingInternalPriorityQueueSet.OrderedSetCache<Integer> 
testInstance = createInstance(capacity);
+
+               Assert.assertTrue(testInstance.isEmpty());
+
+               while (checkSet.size() < capacity) {
+                       Assert.assertEquals(checkSet.size() >= capacity, 
testInstance.isFull());
+                       if (!checkSet.isEmpty() && random.nextInt(10) == 0) {
+                               final int toDelete = 
pickContainedRandomElement(checkSet, random);
+                               Assert.assertTrue(checkSet.remove(toDelete));
+                               testInstance.remove(toDelete);
+                       } else {
+                               final int randomValue = 
random.nextInt(keySpaceUpperBound);
+                               checkSet.add(randomValue);
+                               testInstance.add(randomValue);
+                       }
+                       Assert.assertEquals(checkSet.isEmpty(), 
testInstance.isEmpty());
+
+                       Assert.assertEquals(checkSet.first(), 
testInstance.peekFirst());
+                       Assert.assertEquals(checkSet.last(), 
testInstance.peekLast());
+
+                       
Assert.assertFalse(testInstance.isInLowerBound(checkSet.last()));
+                       
Assert.assertTrue(testInstance.isInLowerBound(checkSet.last() - 1));
+               }
+
+               Assert.assertTrue(testInstance.isFull());
+               
Assert.assertFalse(testInstance.isInLowerBound(checkSet.last()));
+               Assert.assertTrue(testInstance.isInLowerBound(checkSet.last() - 
1));
+
+               testInstance.remove(pickNotContainedRandomElement(checkSet, 
random, keySpaceUpperBound));
+               Assert.assertTrue(testInstance.isFull());
+
+               int containedKey = pickContainedRandomElement(checkSet, random);
+
+               Assert.assertTrue(checkSet.remove(containedKey));
+               testInstance.remove(containedKey);
+
+               Assert.assertFalse(testInstance.isFull());
+
+               for (int i = 0; i < capacity; ++i) {
+                       if (random.nextInt(1) == 0) {
+                               Assert.assertEquals(checkSet.pollFirst(), 
testInstance.removeFirst());
+                       } else {
+                               Assert.assertEquals(checkSet.pollLast(), 
testInstance.removeLast());
+                       }
+               }
+
+               Assert.assertFalse(testInstance.isFull());
+               Assert.assertTrue(testInstance.isEmpty());
+       }
+
+       private int pickNotContainedRandomElement(TreeSet<Integer> checkSet, 
Random random, int upperBound) {
+               int notContainedKey;
+               do {
+                       notContainedKey = random.nextInt(upperBound);
+               } while (checkSet.contains(notContainedKey));
+               return notContainedKey;
+       }
+
+       private int pickContainedRandomElement(TreeSet<Integer> checkSet, 
Random random) {
+               assert !checkSet.isEmpty();
+               return checkSet.ceiling(1 + random.nextInt(checkSet.last()));
+       }
+
+       protected abstract 
CachingInternalPriorityQueueSet.OrderedSetCache<Integer> createInstance(int 
capacity);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/SimpleCachingInternalPriorityQueueSetTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/SimpleCachingInternalPriorityQueueSetTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/SimpleCachingInternalPriorityQueueSetTest.java
new file mode 100644
index 0000000..4c4eb46
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/SimpleCachingInternalPriorityQueueSetTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+/**
+ * Test for {@link CachingInternalPriorityQueueSet}.
+ */
+public class SimpleCachingInternalPriorityQueueSetTest extends 
CachingInternalPriorityQueueSetTestBase {
+
+       @Override
+       protected CachingInternalPriorityQueueSet.OrderedSetStore<TestElement> 
createOrderedSetStore() {
+               return new TestOrderedStore<>(TEST_ELEMENT_COMPARATOR);
+       }
+
+       @Override
+       protected CachingInternalPriorityQueueSet.OrderedSetCache<TestElement> 
createOrderedSetCache() {
+               return new TreeOrderedSetCache<>(TEST_ELEMENT_COMPARATOR, 3);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/TestOrderedStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/TestOrderedStore.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/TestOrderedStore.java
new file mode 100644
index 0000000..36a3341
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/TestOrderedStore.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.util.CloseableIterator;
+
+import javax.annotation.Nonnull;
+
+import java.util.Comparator;
+import java.util.TreeSet;
+
+/**
+ * Simple implementation of {@link 
org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore}
+ * for tests.
+ */
+public class TestOrderedStore<T> implements 
CachingInternalPriorityQueueSet.OrderedSetStore<T> {
+
+       private final TreeSet<T> treeSet;
+
+       public TestOrderedStore(Comparator<T> comparator) {
+               this.treeSet = new TreeSet<>(comparator);
+       }
+
+       @Override
+       public void add(@Nonnull T element) {
+               treeSet.add(element);
+       }
+
+       @Override
+       public void remove(@Nonnull T element) {
+               treeSet.remove(element);
+       }
+
+       @Override
+       public int size() {
+               return treeSet.size();
+       }
+
+       @Nonnull
+       @Override
+       public CloseableIterator<T> orderedIterator() {
+               return CloseableIterator.adapterForIterator(treeSet.iterator());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCacheTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCacheTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCacheTest.java
new file mode 100644
index 0000000..cfe823e
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/TreeOrderedSetCacheTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+/**
+ * Test for {@link TreeOrderedSetCache}.
+ */
+public class TreeOrderedSetCacheTest extends OrderedSetCacheTestBase {
+
+       @Override
+       protected CachingInternalPriorityQueueSet.OrderedSetCache<Integer> 
createInstance(int capacity) {
+               return new TreeOrderedSetCache<>(Integer::compareTo, capacity);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
index fb7db34..5f1c650 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
@@ -149,4 +149,8 @@ class RocksDBKeySerializationUtils {
        private static byte extractByteAtPosition(int value, int byteIdx) {
                return (byte) ((value >>> (byteIdx << 3)));
        }
+
+       public static int computeRequiredBytesInKeyGroupPrefix(int 
totalKeyGroupsInJob) {
+               return totalKeyGroupsInJob > (Byte.MAX_VALUE + 1) ? 2 : 1;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index e5f443a..21d2a65 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -283,7 +283,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                }
 
                this.localRecoveryConfig = 
Preconditions.checkNotNull(localRecoveryConfig);
-               this.keyGroupPrefixBytes = getNumberOfKeyGroups() > 
(Byte.MAX_VALUE + 1) ? 2 : 1;
+               this.keyGroupPrefixBytes =
+                       
RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(getNumberOfKeyGroups());
                this.kvStateInformation = new LinkedHashMap<>();
                this.restoredKvStateMetaInfos = new HashMap<>();
                this.materializedSstFiles = new TreeMap<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java
new file mode 100644
index 0000000..e512933
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+/**
+ * Implementation of {@link 
org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore}
+ * based on RocksDB.
+ *
+ * <p>IMPORTANT: The store is ordered and the order is determined by the 
lexicographic order of the byte sequences
+ * produced by the provided serializer for the elements!
+ *
+ * @param <T> the type of stored elements.
+ */
+public class RocksDBOrderedSetStore<T> implements 
CachingInternalPriorityQueueSet.OrderedSetStore<T> {
+
+       /** Serialized empty value to insert into RocksDB. */
+       private static final byte[] DUMMY_BYTES = new byte[] {0};
+
+       /** The RocksDB instance that serves as store. */
+       @Nonnull
+       private final RocksDB db;
+
+       /** Handle to the column family of the RocksDB instance in which the 
elements are stored. */
+       @Nonnull
+       private final ColumnFamilyHandle columnFamilyHandle;
+
+       /** Read options for RocksDB. */
+       @Nonnull
+       private final ReadOptions readOptions;
+
+       /**
+        * Serializer for the contained elements. The lexicographical order of 
the bytes of serialized objects must be
+        * aligned with their logical order.
+        */
+       @Nonnull
+       private final TypeSerializer<T> byteOrderProducingSerializer;
+
+       /** Wrapper to batch all writes to RocksDB. */
+       @Nonnull
+       private final RocksDBWriteBatchWrapper batchWrapper;
+
+       /** The key-group id in serialized form. */
+       @Nonnull
+       private final byte[] groupPrefixBytes;
+
+       /** Output stream that helps to serialize elements. */
+       @Nonnull
+       private final ByteArrayOutputStreamWithPos outputStream;
+
+       /** Output view that helps to serialize elements, must wrap the output 
stream. */
+       @Nonnull
+       private final DataOutputViewStreamWrapper outputView;
+
+       public RocksDBOrderedSetStore(
+               @Nonnegative int keyGroupId,
+               @Nonnegative int keyGroupPrefixBytes,
+               @Nonnull RocksDB db,
+               @Nonnull ColumnFamilyHandle columnFamilyHandle,
+               @Nonnull ReadOptions readOptions,
+               @Nonnull TypeSerializer<T> byteOrderProducingSerializer,
+               @Nonnull ByteArrayOutputStreamWithPos outputStream,
+               @Nonnull DataOutputViewStreamWrapper outputView,
+               @Nonnull RocksDBWriteBatchWrapper batchWrapper) {
+               this.db = db;
+               this.columnFamilyHandle = columnFamilyHandle;
+               this.readOptions = readOptions;
+               this.byteOrderProducingSerializer = 
byteOrderProducingSerializer;
+               this.outputStream = outputStream;
+               this.outputView = outputView;
+               this.batchWrapper = batchWrapper;
+               this.groupPrefixBytes = createKeyGroupBytes(keyGroupId, 
keyGroupPrefixBytes);
+       }
+
+       private byte[] createKeyGroupBytes(int keyGroupId, int numPrefixBytes) {
+
+               outputStream.reset();
+
+               try {
+                       RocksDBKeySerializationUtils.writeKeyGroup(keyGroupId, 
numPrefixBytes, outputView);
+               } catch (IOException e) {
+                       throw new FlinkRuntimeException("Could not write 
key-group bytes.", e);
+               }
+
+               return outputStream.toByteArray();
+       }
+
+       @Override
+       public void add(@Nonnull T element) {
+               byte[] elementBytes = serializeElement(element);
+               try {
+                       batchWrapper.put(columnFamilyHandle, elementBytes, 
DUMMY_BYTES);
+               } catch (RocksDBException e) {
+                       throw new FlinkRuntimeException("Error while getting 
element from RocksDB.", e);
+               }
+       }
+
+       @Override
+       public void remove(@Nonnull T element) {
+               byte[] elementBytes = serializeElement(element);
+               try {
+                       batchWrapper.remove(columnFamilyHandle, elementBytes);
+               } catch (RocksDBException e) {
+                       throw new FlinkRuntimeException("Error while removing 
element from RocksDB.", e);
+               }
+       }
+
+       /**
+        * This implementation comes at a relatively high cost per invocation. 
It should not be called repeatedly when it is
+        * clear that the value did not change. Currently this is only truly 
used to realize certain higher-level tests.
+        *
+        * @see 
org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore
+        */
+       @Override
+       public int size() {
+
+               int count = 0;
+               try (final RocksToJavaIteratorAdapter iterator = 
orderedIterator()) {
+                       while (iterator.hasNext()) {
+                               iterator.next();
+                               ++count;
+                       }
+               }
+
+               return count;
+       }
+
+       @Nonnull
+       @Override
+       public RocksToJavaIteratorAdapter orderedIterator() {
+
+               flushWriteBatch();
+
+               return new RocksToJavaIteratorAdapter(
+                       new RocksIteratorWrapper(
+                               db.newIterator(columnFamilyHandle, 
readOptions)));
+       }
+
+       /**
+        * Ensures that recent writes are flushed and reflect in the RocksDB 
instance.
+        */
+       private void flushWriteBatch() {
+               try {
+                       batchWrapper.flush();
+               } catch (RocksDBException e) {
+                       throw new FlinkRuntimeException(e);
+               }
+       }
+
+       private static boolean isPrefixWith(byte[] bytes, byte[] prefixBytes) {
+               for (int i = 0; i < prefixBytes.length; ++i) {
+                       if (bytes[i] != prefixBytes[i]) {
+                               return false;
+                       }
+               }
+               return true;
+       }
+
+       private byte[] serializeElement(T element) {
+               try {
+                       outputStream.reset();
+                       outputView.write(groupPrefixBytes);
+                       byteOrderProducingSerializer.serialize(element, 
outputView);
+                       return outputStream.toByteArray();
+               } catch (IOException e) {
+                       throw new FlinkRuntimeException("Error while 
serializing the element.", e);
+               }
+       }
+
+       private T deserializeElement(byte[] bytes) {
+               try {
+                       // TODO introduce a stream in which we can change the 
internal byte[] to avoid creating instances per call
+                       ByteArrayInputStreamWithPos inputStream = new 
ByteArrayInputStreamWithPos(bytes);
+                       DataInputViewStreamWrapper inputView = new 
DataInputViewStreamWrapper(inputStream);
+                       inputView.skipBytes(groupPrefixBytes.length);
+                       return 
byteOrderProducingSerializer.deserialize(inputView);
+               } catch (IOException e) {
+                       throw new FlinkRuntimeException("Error while 
deserializing the element.", e);
+               }
+       }
+
+       /**
+        * Adapter between RocksDB iterator and Java iterator. This is also 
closeable to release the native resources after
+        * use.
+        */
+       private class RocksToJavaIteratorAdapter implements 
CloseableIterator<T> {
+
+               /** The RocksDb iterator to which we forward ops. */
+               @Nonnull
+               private final RocksIteratorWrapper iterator;
+
+               /** Cache for the current element of the iteration. */
+               @Nullable
+               private T currentElement;
+
+               private RocksToJavaIteratorAdapter(@Nonnull 
RocksIteratorWrapper iterator) {
+                       this.iterator = iterator;
+                       try {
+                               iterator.seek(groupPrefixBytes);
+                               deserializeNextElementIfAvailable();
+                       } catch (Exception ex) {
+                               // ensure resource cleanup also in the face of 
(runtime) exceptions in the constructor.
+                               iterator.close();
+                               throw new FlinkRuntimeException("Could not 
initialize ordered iterator.", ex);
+                       }
+               }
+
+               @Override
+               public void close() {
+                       iterator.close();
+               }
+
+               @Override
+               public boolean hasNext() {
+                       return currentElement != null;
+               }
+
+               @Override
+               public T next() {
+                       final T returnElement = this.currentElement;
+                       if (returnElement == null) {
+                               throw new NoSuchElementException("Iterator has 
no more elements!");
+                       }
+                       iterator.next();
+                       deserializeNextElementIfAvailable();
+                       return returnElement;
+               }
+
+               private void deserializeNextElementIfAvailable() {
+                       if (iterator.isValid()) {
+                               final byte[] elementBytes = iterator.key();
+                               if (isPrefixWith(elementBytes, 
groupPrefixBytes)) {
+                                       this.currentElement = 
deserializeElement(elementBytes);
+                               } else {
+                                       this.currentElement = null;
+                               }
+                       } else {
+                               this.currentElement = null;
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/CachingInternalPriorityQueueSetWithRocksDBStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/CachingInternalPriorityQueueSetWithRocksDBStoreTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/CachingInternalPriorityQueueSetWithRocksDBStoreTest.java
new file mode 100644
index 0000000..ae20cf2
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/CachingInternalPriorityQueueSetWithRocksDBStoreTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
+import 
org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSetTestBase;
+import org.apache.flink.runtime.state.heap.TreeOrderedSetCache;
+
+import org.junit.Rule;
+
+/**
+ * Test for {@link CachingInternalPriorityQueueSet} with a
+ * {@link 
org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore}
 based on RocksDB.
+ */
+public class CachingInternalPriorityQueueSetWithRocksDBStoreTest extends 
CachingInternalPriorityQueueSetTestBase {
+
+       @Rule
+       public final RocksDBResource rocksDBResource = new RocksDBResource();
+
+       @Override
+       protected CachingInternalPriorityQueueSet.OrderedSetStore<TestElement> 
createOrderedSetStore() {
+               return createRocksDBStore(0, 1, rocksDBResource);
+       }
+
+       @Override
+       protected CachingInternalPriorityQueueSet.OrderedSetCache<TestElement> 
createOrderedSetCache() {
+               return new TreeOrderedSetCache<>(TEST_ELEMENT_COMPARATOR, 32);
+       }
+
+       public static 
CachingInternalPriorityQueueSet.OrderedSetStore<TestElement> createRocksDBStore(
+               int keyGroupId,
+               int totalKeyGroups,
+               RocksDBResource rocksDBResource) {
+               ByteArrayOutputStreamWithPos outputStream = new 
ByteArrayOutputStreamWithPos(16);
+               DataOutputViewStreamWrapper outputView = new 
DataOutputViewStreamWrapper(outputStream);
+               int prefixBytes = 
RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(totalKeyGroups);
+               return new RocksDBOrderedSetStore<>(
+                       keyGroupId,
+                       prefixBytes,
+                       rocksDBResource.getRocksDB(),
+                       rocksDBResource.getDefaultColumnFamily(),
+                       rocksDBResource.getReadOptions(),
+                       TestElementSerializer.INSTANCE,
+                       outputStream,
+                       outputView,
+                       rocksDBResource.getBatchWrapper());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
new file mode 100644
index 0000000..42782a4
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
+import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue;
+import 
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueueTest;
+import org.apache.flink.runtime.state.heap.TreeOrderedSetCache;
+
+import org.junit.Rule;
+
+/**
+ * Test of {@link KeyGroupPartitionedPriorityQueue} powered by a {@link 
RocksDBOrderedSetStore}.
+ */
+public class KeyGroupPartitionedPriorityQueueWithRocksDBStoreTest extends 
KeyGroupPartitionedPriorityQueueTest {
+
+       @Rule
+       public final RocksDBResource rocksDBResource = new RocksDBResource();
+
+       @Override
+       protected KeyGroupPartitionedPriorityQueue.PartitionQueueSetFactory<
+                       TestElement, 
CachingInternalPriorityQueueSet<TestElement>> newFactory(
+               int initialCapacity) {
+
+               return (keyGroupId, numKeyGroups, elementComparator) -> {
+                       
CachingInternalPriorityQueueSet.OrderedSetCache<TestElement> cache =
+                               new 
TreeOrderedSetCache<>(TEST_ELEMENT_COMPARATOR, 32);
+                       
CachingInternalPriorityQueueSet.OrderedSetStore<TestElement> store =
+                               
RocksDBOrderedSetStoreTest.createRocksDBOrderedStore(
+                                       rocksDBResource,
+                                       TestElementSerializer.INSTANCE,
+                                       keyGroupId,
+                                       numKeyGroups);
+                       return new CachingInternalPriorityQueueSet<>(cache, 
store);
+               };
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStoreTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStoreTest.java
new file mode 100644
index 0000000..256a83b
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStoreTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.util.NoSuchElementException;
+
+/**
+ * Test for RocksDBOrderedStore.
+ */
+public class RocksDBOrderedSetStoreTest {
+
+       @Rule
+       public final RocksDBResource rocksDBResource = new RocksDBResource();
+
+       @Test
+       public void testOrderedIterator() throws Exception {
+               CachingInternalPriorityQueueSet.OrderedSetStore<Integer> store 
= createRocksDBOrderedStore();
+
+               //test empty iterator
+               try (final CloseableIterator<Integer> emptyIterator = 
store.orderedIterator()) {
+                       Assert.assertFalse(emptyIterator.hasNext());
+                       try {
+                               emptyIterator.next();
+                               Assert.fail();
+                       } catch (NoSuchElementException expected) {
+                       }
+               }
+
+               store.add(43);
+               store.add(42);
+               store.add(41);
+               store.add(41);
+               store.remove(42);
+
+               // test in-order iteration
+               try (final CloseableIterator<Integer> iterator = 
store.orderedIterator()) {
+                       Assert.assertTrue(iterator.hasNext());
+                       Assert.assertEquals(Integer.valueOf(41), 
iterator.next());
+                       Assert.assertTrue(iterator.hasNext());
+                       Assert.assertEquals(Integer.valueOf(43), 
iterator.next());
+                       Assert.assertFalse(iterator.hasNext());
+                       try {
+                               iterator.next();
+                               Assert.fail();
+                       } catch (NoSuchElementException expected) {
+                       }
+               }
+       }
+
+       @Test
+       public void testAddRemoveSize() {
+
+               CachingInternalPriorityQueueSet.OrderedSetStore<Integer> store 
= createRocksDBOrderedStore();
+
+               // test empty size
+               Assert.assertEquals(0, store.size());
+
+               // test add uniques
+               store.remove(41);
+               Assert.assertEquals(0, store.size());
+               store.add(41);
+               Assert.assertEquals(1, store.size());
+               store.add(42);
+               Assert.assertEquals(2, store.size());
+               store.add(43);
+               Assert.assertEquals(3, store.size());
+               store.add(44);
+               Assert.assertEquals(4, store.size());
+               store.add(45);
+               Assert.assertEquals(5, store.size());
+
+               // test remove
+               store.remove(41);
+               Assert.assertEquals(4, store.size());
+               store.remove(41);
+               Assert.assertEquals(4, store.size());
+
+               // test set semantics by attempt to insert duplicate
+               store.add(42);
+               Assert.assertEquals(4, store.size());
+       }
+
+       public static <E> RocksDBOrderedSetStore<E> createRocksDBOrderedStore(
+               @Nonnull RocksDBResource rocksDBResource,
+               @Nonnull TypeSerializer<E> byteOrderSerializer,
+               @Nonnegative int keyGroupId,
+               @Nonnegative int totalKeyGroups) {
+
+               ByteArrayOutputStreamWithPos outputStreamWithPos = new 
ByteArrayOutputStreamWithPos(32);
+               DataOutputViewStreamWrapper outputView = new 
DataOutputViewStreamWrapper(outputStreamWithPos);
+               int keyGroupPrefixBytes = 
RocksDBKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(totalKeyGroups);
+               return new RocksDBOrderedSetStore<>(
+                       keyGroupId,
+                       keyGroupPrefixBytes,
+                       rocksDBResource.getRocksDB(),
+                       rocksDBResource.getDefaultColumnFamily(),
+                       rocksDBResource.getReadOptions(),
+                       byteOrderSerializer,
+                       outputStreamWithPos,
+                       outputView,
+                       rocksDBResource.getBatchWrapper());
+       }
+
+       protected RocksDBOrderedSetStore<Integer> createRocksDBOrderedStore() {
+               return createRocksDBOrderedStore(rocksDBResource, 
IntSerializer.INSTANCE, 0, 1);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
new file mode 100644
index 0000000..0407cc7
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+
+import org.junit.rules.ExternalResource;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * External resource for tests that require an instance of RocksDB.
+ */
+public class RocksDBResource extends ExternalResource {
+
+       /** Factory for {@link DBOptions} and {@link ColumnFamilyOptions}. */
+       private final OptionsFactory optionsFactory;
+
+       /** Temporary folder that provides the working directory for the 
RocksDB instance. */
+       private TemporaryFolder temporaryFolder;
+
+       /** The options for the RocksDB instance. */
+       private DBOptions dbOptions;
+
+       /** The options for column families created with the RocksDB instance. 
*/
+       private ColumnFamilyOptions columnFamilyOptions;
+
+       /** The options for writes. */
+       private WriteOptions writeOptions;
+
+       /** The options for reads. */
+       private ReadOptions readOptions;
+
+       /** The RocksDB instance object. */
+       private RocksDB rocksDB;
+
+       /** List of all column families that have been created with the RocksDB 
instance. */
+       private List<ColumnFamilyHandle> columnFamilyHandles;
+
+       /** Wrapper for batched writes to the RocksDB instance. */
+       private RocksDBWriteBatchWrapper batchWrapper;
+
+       public RocksDBResource() {
+               this(new OptionsFactory() {
+                       @Override
+                       public DBOptions createDBOptions(DBOptions 
currentOptions) {
+                               return 
PredefinedOptions.FLASH_SSD_OPTIMIZED.createDBOptions();
+                       }
+
+                       @Override
+                       public ColumnFamilyOptions 
createColumnOptions(ColumnFamilyOptions currentOptions) {
+                               return 
PredefinedOptions.FLASH_SSD_OPTIMIZED.createColumnOptions();
+                       }
+               });
+       }
+
+       public RocksDBResource(@Nonnull OptionsFactory optionsFactory) {
+               this.optionsFactory = optionsFactory;
+       }
+
+       public ColumnFamilyHandle getDefaultColumnFamily() {
+               return columnFamilyHandles.get(0);
+       }
+
+       public WriteOptions getWriteOptions() {
+               return writeOptions;
+       }
+
+       public RocksDB getRocksDB() {
+               return rocksDB;
+       }
+
+       public ReadOptions getReadOptions() {
+               return readOptions;
+       }
+
+       public RocksDBWriteBatchWrapper getBatchWrapper() {
+               return batchWrapper;
+       }
+
+       /**
+        * Creates and returns a new column family with the given name.
+        */
+       public ColumnFamilyHandle createNewColumnFamily(String name) {
+               try {
+                       final ColumnFamilyHandle columnFamily = 
rocksDB.createColumnFamily(
+                               new ColumnFamilyDescriptor(name.getBytes(), 
columnFamilyOptions));
+                       columnFamilyHandles.add(columnFamily);
+                       return columnFamily;
+               } catch (Exception ex) {
+                       throw new FlinkRuntimeException("Could not create 
column family.", ex);
+               }
+       }
+
+       @Override
+       protected void before() throws Throwable {
+               this.temporaryFolder = new TemporaryFolder();
+               this.temporaryFolder.create();
+               final File rocksFolder = temporaryFolder.newFolder();
+               this.dbOptions = 
optionsFactory.createDBOptions(PredefinedOptions.DEFAULT.createDBOptions()).
+                       setCreateIfMissing(true);
+               this.columnFamilyOptions = 
optionsFactory.createColumnOptions(PredefinedOptions.DEFAULT.createColumnOptions());
+               this.writeOptions = new WriteOptions();
+               this.writeOptions.disableWAL();
+               this.readOptions = new ReadOptions();
+               this.columnFamilyHandles = new ArrayList<>(1);
+               this.rocksDB = RocksDB.open(
+                       dbOptions,
+                       rocksFolder.getAbsolutePath(),
+                       Collections.singletonList(new 
ColumnFamilyDescriptor("default".getBytes(), columnFamilyOptions)),
+                       columnFamilyHandles);
+               this.batchWrapper = new RocksDBWriteBatchWrapper(rocksDB, 
writeOptions);
+       }
+
+       @Override
+       protected void after() {
+               // destruct in reversed order of creation.
+               IOUtils.closeQuietly(this.batchWrapper);
+               for (ColumnFamilyHandle columnFamilyHandle : 
columnFamilyHandles) {
+                       IOUtils.closeQuietly(columnFamilyHandle);
+               }
+               IOUtils.closeQuietly(this.rocksDB);
+               IOUtils.closeQuietly(this.readOptions);
+               IOUtils.closeQuietly(this.writeOptions);
+               IOUtils.closeQuietly(this.columnFamilyOptions);
+               IOUtils.closeQuietly(this.dbOptions);
+               temporaryFolder.delete();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
index c5a68fb..7bf652f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
@@ -22,9 +22,13 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.InternalPriorityQueue;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueSet;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 
 import java.util.List;
@@ -46,12 +50,12 @@ public class HeapInternalTimerService<K, N> implements 
InternalTimerService<N>,
        /**
         * Processing time timers that are currently in-flight.
         */
-       private final InternalTimerHeap<K, N> processingTimeTimersQueue;
+       private final HeapPriorityQueueSet<TimerHeapInternalTimer<K, N>> 
processingTimeTimersQueue;
 
        /**
         * Event time timers that are currently in-flight.
         */
-       private final InternalTimerHeap<K, N> eventTimeTimersQueue;
+       private final HeapPriorityQueueSet<TimerHeapInternalTimer<K, N>> 
eventTimeTimersQueue;
 
        /**
         * Information concerning the local key-group range.
@@ -106,8 +110,8 @@ public class HeapInternalTimerService<K, N> implements 
InternalTimerService<N>,
                }
                this.localKeyGroupRangeStartIdx = startIdx;
 
-               this.eventTimeTimersQueue = new InternalTimerHeap<>(128, 
localKeyGroupRange, totalKeyGroups);
-               this.processingTimeTimersQueue = new InternalTimerHeap<>(128, 
localKeyGroupRange, totalKeyGroups);
+               this.eventTimeTimersQueue = 
createPriorityQueue(localKeyGroupRange, totalKeyGroups);
+               this.processingTimeTimersQueue = 
createPriorityQueue(localKeyGroupRange, totalKeyGroups);
        }
 
        /**
@@ -188,7 +192,7 @@ public class HeapInternalTimerService<K, N> implements 
InternalTimerService<N>,
        @Override
        public void registerProcessingTimeTimer(N namespace, long time) {
                InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
-               if (processingTimeTimersQueue.scheduleTimer(time, (K) 
keyContext.getCurrentKey(), namespace)) {
+               if (processingTimeTimersQueue.add(new 
TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
                        long nextTriggerTime = oldHead != null ? 
oldHead.getTimestamp() : Long.MAX_VALUE;
                        // check if we need to re-schedule our timer to earlier
                        if (time < nextTriggerTime) {
@@ -202,17 +206,17 @@ public class HeapInternalTimerService<K, N> implements 
InternalTimerService<N>,
 
        @Override
        public void registerEventTimeTimer(N namespace, long time) {
-               eventTimeTimersQueue.scheduleTimer(time, (K) 
keyContext.getCurrentKey(), namespace);
+               eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) 
keyContext.getCurrentKey(), namespace));
        }
 
        @Override
        public void deleteProcessingTimeTimer(N namespace, long time) {
-               processingTimeTimersQueue.stopTimer(time, (K) 
keyContext.getCurrentKey(), namespace);
+               processingTimeTimersQueue.remove(new 
TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
        }
 
        @Override
        public void deleteEventTimeTimer(N namespace, long time) {
-               eventTimeTimersQueue.stopTimer(time, (K) 
keyContext.getCurrentKey(), namespace);
+               eventTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, 
(K) keyContext.getCurrentKey(), namespace));
        }
 
        @Override
@@ -256,12 +260,12 @@ public class HeapInternalTimerService<K, N> implements 
InternalTimerService<N>,
         */
        public InternalTimersSnapshot<K, N> snapshotTimersForKeyGroup(int 
keyGroupIdx) {
                return new InternalTimersSnapshot<>(
-                               keySerializer,
-                               keySerializer.snapshotConfiguration(),
-                               namespaceSerializer,
-                               namespaceSerializer.snapshotConfiguration(),
-                               
eventTimeTimersQueue.getTimersForKeyGroup(keyGroupIdx),
-                               
processingTimeTimersQueue.getTimersForKeyGroup(keyGroupIdx));
+                       keySerializer,
+                       keySerializer.snapshotConfiguration(),
+                       namespaceSerializer,
+                       namespaceSerializer.snapshotConfiguration(),
+                       
eventTimeTimersQueue.getElementsForKeyGroup(keyGroupIdx),
+                       
processingTimeTimersQueue.getElementsForKeyGroup(keyGroupIdx));
        }
 
        /**
@@ -287,36 +291,43 @@ public class HeapInternalTimerService<K, N> implements 
InternalTimerService<N>,
                        "Key Group " + keyGroupIdx + " does not belong to the 
local range.");
 
                // restore the event time timers
-               
eventTimeTimersQueue.bulkAddRestoredTimers(restoredTimersSnapshot.getEventTimeTimers());
+               
eventTimeTimersQueue.addAll(restoredTimersSnapshot.getEventTimeTimers());
 
                // restore the processing time timers
-               
processingTimeTimersQueue.bulkAddRestoredTimers(restoredTimersSnapshot.getProcessingTimeTimers());
+               
processingTimeTimersQueue.addAll(restoredTimersSnapshot.getProcessingTimeTimers());
        }
 
+       @VisibleForTesting
        public int numProcessingTimeTimers() {
                return this.processingTimeTimersQueue.size();
        }
 
+       @VisibleForTesting
        public int numEventTimeTimers() {
                return this.eventTimeTimersQueue.size();
        }
 
+       @VisibleForTesting
        public int numProcessingTimeTimers(N namespace) {
-               int count = 0;
-               for (InternalTimer<K, N> timer : processingTimeTimersQueue) {
-                       if (timer.getNamespace().equals(namespace)) {
-                               count++;
-                       }
-               }
-               return count;
+               return countTimersInNamespaceInternal(namespace, 
processingTimeTimersQueue);
        }
 
+       @VisibleForTesting
        public int numEventTimeTimers(N namespace) {
+               return countTimersInNamespaceInternal(namespace, 
eventTimeTimersQueue);
+       }
+
+       private int countTimersInNamespaceInternal(N namespace, 
InternalPriorityQueue<TimerHeapInternalTimer<K, N>> queue) {
                int count = 0;
-               for (InternalTimer<K, N> timer : eventTimeTimersQueue) {
-                       if (timer.getNamespace().equals(namespace)) {
-                               count++;
+               try (final CloseableIterator<TimerHeapInternalTimer<K, N>> 
iterator = queue.iterator()) {
+                       while (iterator.hasNext()) {
+                               final TimerHeapInternalTimer<K, N> timer = 
iterator.next();
+                               if (timer.getNamespace().equals(namespace)) {
+                                       count++;
+                               }
                        }
+               } catch (Exception e) {
+                       throw new FlinkRuntimeException("Exception when closing 
iterator.", e);
                }
                return count;
        }
@@ -327,17 +338,28 @@ public class HeapInternalTimerService<K, N> implements 
InternalTimerService<N>,
        }
 
        @VisibleForTesting
-       List<Set<InternalTimer<K, N>>> getEventTimeTimersPerKeyGroup() {
-               return eventTimeTimersQueue.getTimersByKeyGroup();
+       List<Set<TimerHeapInternalTimer<K, N>>> getEventTimeTimersPerKeyGroup() 
{
+               return eventTimeTimersQueue.getElementsByKeyGroup();
        }
 
        @VisibleForTesting
-       List<Set<InternalTimer<K, N>>> getProcessingTimeTimersPerKeyGroup() {
-               return processingTimeTimersQueue.getTimersByKeyGroup();
+       List<Set<TimerHeapInternalTimer<K, N>>> 
getProcessingTimeTimersPerKeyGroup() {
+               return processingTimeTimersQueue.getElementsByKeyGroup();
        }
 
        private boolean 
areSnapshotSerializersIncompatible(InternalTimersSnapshot<?, ?> 
restoredSnapshot) {
                return (this.keyDeserializer != null && 
!this.keyDeserializer.equals(restoredSnapshot.getKeySerializer())) ||
                        (this.namespaceDeserializer != null && 
!this.namespaceDeserializer.equals(restoredSnapshot.getNamespaceSerializer()));
        }
+
+       private static <K, N> HeapPriorityQueueSet<TimerHeapInternalTimer<K, 
N>> createPriorityQueue(
+               KeyGroupRange localKeyGroupRange,
+               int totalKeyGroups) {
+               return new HeapPriorityQueueSet<>(
+                       TimerHeapInternalTimer.getTimerComparator(),
+                       TimerHeapInternalTimer.getKeyExtractorFunction(),
+                       128,
+                       localKeyGroupRange,
+                       totalKeyGroups);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapPriorityQueueStateSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapPriorityQueueStateSnapshot.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapPriorityQueueStateSnapshot.java
new file mode 100644
index 0000000..cc7fbc4
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapPriorityQueueStateSnapshot.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.KeyGroupPartitioner;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.StateSnapshot;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueSet;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Array;
+
+/**
+ * This class represents the snapshot of an {@link HeapPriorityQueueSet}.
+ *
+ * @param <T> type of the state elements.
+ */
+public class HeapPriorityQueueStateSnapshot<T> implements StateSnapshot {
+
+       /** Function that extracts keys from elements. */
+       @Nonnull
+       private final KeyExtractorFunction<T> keyExtractor;
+
+       /** Copy of the heap array containing all the (immutable or deeply 
copied) elements. */
+       @Nonnull
+       private final T[] heapArrayCopy;
+
+       /** The element serializer. */
+       @Nonnull
+       private final TypeSerializer<T> elementSerializer;
+
+       /** The key-group range covered by this snapshot. */
+       @Nonnull
+       private final KeyGroupRange keyGroupRange;
+
+       /** The total number of key-groups in the job. */
+       @Nonnegative
+       private final int totalKeyGroups;
+
+       /** Result of partitioning the snapshot by key-group. */
+       @Nullable
+       private KeyGroupPartitionedSnapshot partitionedSnapshot;
+
+       HeapPriorityQueueStateSnapshot(
+               @Nonnull T[] heapArrayCopy,
+               @Nonnull KeyExtractorFunction<T> keyExtractor,
+               @Nonnull TypeSerializer<T> elementSerializer,
+               @Nonnull KeyGroupRange keyGroupRange,
+               @Nonnegative int totalKeyGroups) {
+
+               // TODO ensure that the array contains a deep copy of elements 
if we are *not* dealing with immutable types.
+               assert elementSerializer.isImmutableType();
+
+               this.keyExtractor = keyExtractor;
+               this.heapArrayCopy = heapArrayCopy;
+               this.elementSerializer = elementSerializer;
+               this.keyGroupRange = keyGroupRange;
+               this.totalKeyGroups = totalKeyGroups;
+       }
+
+       @SuppressWarnings("unchecked")
+       @Nonnull
+       @Override
+       public KeyGroupPartitionedSnapshot partitionByKeyGroup() {
+
+               if (partitionedSnapshot == null) {
+
+                       T[] partitioningOutput = (T[]) Array.newInstance(
+                               heapArrayCopy.getClass().getComponentType(),
+                               heapArrayCopy.length);
+
+                       KeyGroupPartitioner<T> keyGroupPartitioner =
+                               new KeyGroupPartitioner<>(
+                                       heapArrayCopy,
+                                       heapArrayCopy.length,
+                                       partitioningOutput,
+                                       keyGroupRange,
+                                       totalKeyGroups,
+                                       keyExtractor,
+                                       elementSerializer::serialize);
+
+                       partitionedSnapshot = 
keyGroupPartitioner.partitionByKeyGroup();
+               }
+
+               return partitionedSnapshot;
+       }
+
+       @Override
+       public void release() {
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java
deleted file mode 100644
index 09f5a14..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java
+++ /dev/null
@@ -1,413 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.StateSnapshot;
-
-import javax.annotation.Nonnegative;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.Set;
-
-import static org.apache.flink.util.CollectionUtil.MAX_ARRAY_SIZE;
-import static org.apache.flink.util.Preconditions.checkArgument;
-
-/**
- * A heap-based priority queue for internal timers. This heap is supported by 
hash sets for fast contains
- * (de-duplication) and deletes. The heap implementation is a simple binary 
tree stored inside an array. Element indexes
- * in the heap array start at 1 instead of 0 to make array index computations 
a bit simpler in the hot methods.
- *
- * <p>Possible future improvements:
- * <ul>
- *  <li>We could also implement shrinking for the heap and the deduplication 
maps.</li>
- *  <li>We could replace the deduplication maps with more efficient custom 
implementations. In particular, a hash set
- * would be enough if it could return existing elements on unsuccessful 
adding, etc..</li>
- * </ul>
- *
- * @param <K> type of the key of the internal timers managed by this priority 
queue.
- * @param <N> type of the namespace of the internal timers managed by this 
priority queue.
- */
-public class InternalTimerHeap<K, N> implements Iterable<InternalTimer<K, N>> {
-
-       /**
-        * Comparator for {@link TimerHeapInternalTimer}, based on the 
timestamp in ascending order.
-        */
-       private static final Comparator<TimerHeapInternalTimer<?, ?>> 
COMPARATOR =
-               (o1, o2) -> Long.compare(o1.getTimestamp(), o2.getTimestamp());
-
-       /**
-        * This array contains one hash set per key-group. The sets are used 
for fast de-duplication and deletes of timers.
-        */
-       private final HashMap<TimerHeapInternalTimer<K, N>, 
TimerHeapInternalTimer<K, N>>[] deduplicationMapsByKeyGroup;
-
-       /**
-        * The array that represents the heap-organized priority queue.
-        */
-       private TimerHeapInternalTimer<K, N>[] queue;
-
-       /**
-        * The current size of the priority queue.
-        */
-       private int size;
-
-       /**
-        * The key-group range of timers that are managed by this queue.
-        */
-       private final KeyGroupRange keyGroupRange;
-
-       /**
-        * The total number of key-groups of the job.
-        */
-       private final int totalNumberOfKeyGroups;
-
-
-       /**
-        * Creates an empty {@link InternalTimerHeap} with the requested 
initial capacity.
-        *
-        * @param minimumCapacity the minimum and initial capacity of this 
priority queue.
-        */
-       @SuppressWarnings("unchecked")
-       InternalTimerHeap(
-               @Nonnegative int minimumCapacity,
-               @Nonnull KeyGroupRange keyGroupRange,
-               @Nonnegative int totalNumberOfKeyGroups) {
-
-               this.totalNumberOfKeyGroups = totalNumberOfKeyGroups;
-               this.keyGroupRange = keyGroupRange;
-
-               final int keyGroupsInLocalRange = 
keyGroupRange.getNumberOfKeyGroups();
-               final int deduplicationSetSize = 1 + minimumCapacity / 
keyGroupsInLocalRange;
-               this.deduplicationMapsByKeyGroup = new 
HashMap[keyGroupsInLocalRange];
-               for (int i = 0; i < keyGroupsInLocalRange; ++i) {
-                       deduplicationMapsByKeyGroup[i] = new 
HashMap<>(deduplicationSetSize);
-               }
-
-               this.queue = new TimerHeapInternalTimer[1 + minimumCapacity];
-       }
-
-       @Nullable
-       public InternalTimer<K, N> poll() {
-               return size() > 0 ? removeElementAtIndex(1) : null;
-       }
-
-       @Nullable
-       public InternalTimer<K, N> peek() {
-               return size() > 0 ? queue[1] : null;
-       }
-
-       /**
-        * Adds a new timer with the given timestamp, key, and namespace to the 
heap, if an identical timer was not yet
-        * registered.
-        *
-        * @param timestamp the timer timestamp.
-        * @param key the timer key.
-        * @param namespace the timer namespace.
-        * @return true iff a new timer with given timestamp, key, and 
namespace was added to the heap.
-        */
-       public boolean scheduleTimer(long timestamp, @Nonnull K key, @Nonnull N 
namespace) {
-               return addInternal(new TimerHeapInternalTimer<>(timestamp, key, 
namespace));
-       }
-
-       /**
-        * Stops timer with the given timestamp, key, and namespace by removing 
it from the heap, if it exists on the heap.
-        *
-        * @param timestamp the timer timestamp.
-        * @param key the timer key.
-        * @param namespace the timer namespace.
-        * @return true iff a timer with given timestamp, key, and namespace 
was found and removed from the heap.
-        */
-       public boolean stopTimer(long timestamp, @Nonnull K key, @Nonnull N 
namespace) {
-               return removeInternal(new TimerHeapInternalTimer<>(timestamp, 
key, namespace));
-       }
-
-       public boolean isEmpty() {
-               return size() == 0;
-       }
-
-       @Nonnegative
-       public int size() {
-               return size;
-       }
-
-       public void clear() {
-               Arrays.fill(queue, null);
-               for (HashMap<TimerHeapInternalTimer<K, N>, 
TimerHeapInternalTimer<K, N>> timerHashMap :
-                       deduplicationMapsByKeyGroup) {
-                       timerHashMap.clear();
-               }
-               size = 0;
-       }
-
-       @SuppressWarnings({"unchecked"})
-       @Nonnull
-       public InternalTimer<K, N>[] toArray() {
-               return (InternalTimer<K, N>[]) Arrays.copyOfRange(queue, 1, 
size + 1, queue.getClass());
-       }
-
-       /**
-        * Returns an iterator over the elements in this queue. The iterator
-        * does not return the elements in any particular order.
-        *
-        * @return an iterator over the elements in this queue.
-        */
-       @Nonnull
-       public Iterator<InternalTimer<K, N>> iterator() {
-               return new InternalTimerPriorityQueueIterator();
-       }
-
-       /**
-        * This method adds all the given timers to the heap.
-        */
-       void bulkAddRestoredTimers(Collection<? extends InternalTimer<K, N>> 
restoredTimers) {
-
-               if (restoredTimers == null) {
-                       return;
-               }
-
-               resizeForBulkLoad(restoredTimers.size());
-
-               for (InternalTimer<K, N> timer : restoredTimers) {
-                       if (timer instanceof TimerHeapInternalTimer) {
-                               addInternal((TimerHeapInternalTimer<K, N>) 
timer);
-                       } else {
-                               scheduleTimer(timer.getTimestamp(), 
timer.getKey(), timer.getNamespace());
-                       }
-               }
-       }
-
-       /**
-        * Returns an unmodifiable set of all timers in the given key-group.
-        */
-       @Nonnull
-       Set<InternalTimer<K, N>> getTimersForKeyGroup(@Nonnegative int 
keyGroupIdx) {
-               return 
Collections.unmodifiableSet(getDedupMapForKeyGroup(keyGroupIdx).keySet());
-       }
-
-       @VisibleForTesting
-       @SuppressWarnings("unchecked")
-       @Nonnull
-       List<Set<InternalTimer<K, N>>> getTimersByKeyGroup() {
-               List<Set<InternalTimer<K, N>>> result = new 
ArrayList<>(deduplicationMapsByKeyGroup.length);
-               for (int i = 0; i < deduplicationMapsByKeyGroup.length; ++i) {
-                       result.add(i, 
Collections.unmodifiableSet(deduplicationMapsByKeyGroup[i].keySet()));
-               }
-               return result;
-       }
-
-       @Nonnull
-       StateSnapshot snapshot(TimerHeapInternalTimer.TimerSerializer<K, N> 
serializer) {
-               return new InternalTimerHeapSnapshot<>(
-                       Arrays.copyOfRange(queue, 1, size + 1),
-                       serializer,
-                       keyGroupRange,
-                       totalNumberOfKeyGroups);
-       }
-
-       private boolean addInternal(TimerHeapInternalTimer<K, N> timer) {
-
-               if (getDedupMapForTimer(timer).putIfAbsent(timer, timer) == 
null) {
-                       final int newSize = increaseSizeByOne();
-                       moveElementToIdx(timer, newSize);
-                       siftUp(newSize);
-                       return true;
-               } else {
-                       return false;
-               }
-       }
-
-       private boolean removeInternal(TimerHeapInternalTimer<K, N> 
timerToRemove) {
-
-               TimerHeapInternalTimer<K, N> storedTimer = 
getDedupMapForTimer(timerToRemove).remove(timerToRemove);
-
-               if (storedTimer != null) {
-                       removeElementAtIndex(storedTimer.getTimerHeapIndex());
-                       return true;
-               }
-
-               return false;
-       }
-
-       private TimerHeapInternalTimer<K, N> removeElementAtIndex(int 
removeIdx) {
-               TimerHeapInternalTimer<K, N>[] heap = this.queue;
-               TimerHeapInternalTimer<K, N> removedValue = heap[removeIdx];
-
-               assert removedValue.getTimerHeapIndex() == removeIdx;
-
-               final int oldSize = size;
-
-               if (removeIdx != oldSize) {
-                       TimerHeapInternalTimer<K, N> timer = heap[oldSize];
-                       moveElementToIdx(timer, removeIdx);
-                       siftDown(removeIdx);
-                       if (heap[removeIdx] == timer) {
-                               siftUp(removeIdx);
-                       }
-               }
-
-               heap[oldSize] = null;
-               getDedupMapForTimer(removedValue).remove(removedValue);
-
-               --size;
-               return removedValue;
-       }
-
-       private void siftUp(int idx) {
-               final TimerHeapInternalTimer<K, N>[] heap = this.queue;
-               final TimerHeapInternalTimer<K, N> currentTimer = heap[idx];
-               int parentIdx = idx >>> 1;
-
-               while (parentIdx > 0 && isTimerLessThen(currentTimer, 
heap[parentIdx])) {
-                       moveElementToIdx(heap[parentIdx], idx);
-                       idx = parentIdx;
-                       parentIdx >>>= 1;
-               }
-
-               moveElementToIdx(currentTimer, idx);
-       }
-
-       private void siftDown(int idx) {
-               final TimerHeapInternalTimer<K, N>[] heap = this.queue;
-               final int heapSize = this.size;
-
-               final TimerHeapInternalTimer<K, N> currentTimer = heap[idx];
-               int firstChildIdx = idx << 1;
-               int secondChildIdx = firstChildIdx + 1;
-
-               if (isTimerIndexValid(secondChildIdx, heapSize) &&
-                       isTimerLessThen(heap[secondChildIdx], 
heap[firstChildIdx])) {
-                       firstChildIdx = secondChildIdx;
-               }
-
-               while (isTimerIndexValid(firstChildIdx, heapSize) &&
-                       isTimerLessThen(heap[firstChildIdx], currentTimer)) {
-                       moveElementToIdx(heap[firstChildIdx], idx);
-                       idx = firstChildIdx;
-                       firstChildIdx = idx << 1;
-                       secondChildIdx = firstChildIdx + 1;
-
-                       if (isTimerIndexValid(secondChildIdx, heapSize) &&
-                               isTimerLessThen(heap[secondChildIdx], 
heap[firstChildIdx])) {
-                               firstChildIdx = secondChildIdx;
-                       }
-               }
-
-               moveElementToIdx(currentTimer, idx);
-       }
-
-       private HashMap<TimerHeapInternalTimer<K, N>, TimerHeapInternalTimer<K, 
N>> getDedupMapForKeyGroup(
-               @Nonnegative int keyGroupIdx) {
-               return 
deduplicationMapsByKeyGroup[globalKeyGroupToLocalIndex(keyGroupIdx)];
-       }
-
-       private boolean isTimerIndexValid(int timerIndex, int heapSize) {
-               return timerIndex <= heapSize;
-       }
-
-       private boolean isTimerLessThen(TimerHeapInternalTimer<K, N> a, 
TimerHeapInternalTimer<K, N> b) {
-               return COMPARATOR.compare(a, b) < 0;
-       }
-
-       private void moveElementToIdx(TimerHeapInternalTimer<K, N> element, int 
idx) {
-               queue[idx] = element;
-               element.setTimerHeapIndex(idx);
-       }
-
-       private HashMap<TimerHeapInternalTimer<K, N>, TimerHeapInternalTimer<K, 
N>> getDedupMapForTimer(
-               InternalTimer<K, N> timer) {
-               int keyGroup = 
KeyGroupRangeAssignment.assignToKeyGroup(timer.getKey(), 
totalNumberOfKeyGroups);
-               return getDedupMapForKeyGroup(keyGroup);
-       }
-
-       private int globalKeyGroupToLocalIndex(int keyGroup) {
-               checkArgument(keyGroupRange.contains(keyGroup));
-               return keyGroup - keyGroupRange.getStartKeyGroup();
-       }
-
-       private int increaseSizeByOne() {
-               final int oldArraySize = queue.length;
-               final int minRequiredNewSize = ++size;
-               if (minRequiredNewSize >= oldArraySize) {
-                       final int grow = (oldArraySize < 64) ? oldArraySize + 2 
: oldArraySize >> 1;
-                       resizeQueueArray(oldArraySize + grow, 
minRequiredNewSize);
-               }
-               // TODO implement shrinking as well?
-               return minRequiredNewSize;
-       }
-
-       private void resizeForBulkLoad(int totalSize) {
-               if (totalSize > queue.length) {
-                       int desiredSize = totalSize + (totalSize >>> 3);
-                       resizeQueueArray(desiredSize, totalSize);
-               }
-       }
-
-       private void resizeQueueArray(int desiredSize, int minRequiredSize) {
-               if (isValidArraySize(desiredSize)) {
-                       queue = Arrays.copyOf(queue, desiredSize);
-               } else if (isValidArraySize(minRequiredSize)) {
-                       queue = Arrays.copyOf(queue, MAX_ARRAY_SIZE);
-               } else {
-                       throw new OutOfMemoryError("Required minimum timer heap 
size " + minRequiredSize +
-                               " exceeds maximum size of " + MAX_ARRAY_SIZE + 
".");
-               }
-       }
-
-       private static boolean isValidArraySize(int size) {
-               return size >= 0 && size <= MAX_ARRAY_SIZE;
-       }
-
-       /**
-        * {@link Iterator} implementation for {@link 
InternalTimerPriorityQueueIterator}.
-        * {@link Iterator#remove()} is not supported.
-        */
-       private class InternalTimerPriorityQueueIterator implements 
Iterator<InternalTimer<K, N>> {
-
-               private int iterationIdx;
-
-               InternalTimerPriorityQueueIterator() {
-                       this.iterationIdx = 0;
-               }
-
-               @Override
-               public boolean hasNext() {
-                       return iterationIdx < size;
-               }
-
-               @Override
-               public InternalTimer<K, N> next() {
-                       if (iterationIdx >= size) {
-                               throw new NoSuchElementException("Iterator has 
no next element.");
-                       }
-                       return queue[++iterationIdx];
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeapSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeapSnapshot.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeapSnapshot.java
deleted file mode 100644
index 6eb4057..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeapSnapshot.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.runtime.state.KeyGroupPartitioner;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.StateSnapshot;
-
-import javax.annotation.Nonnegative;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-/**
- * This class represents the snapshot of an {@link InternalTimerHeap}.
- *
- * @param <K> type of key.
- * @param <N> type of namespace.
- */
-public class InternalTimerHeapSnapshot<K, N> implements StateSnapshot {
-
-       /** Copy of the heap array containing all the (immutable timers). */
-       @Nonnull
-       private final TimerHeapInternalTimer<K, N>[] timerHeapArrayCopy;
-
-       /** The timer serializer. */
-       @Nonnull
-       private final TimerHeapInternalTimer.TimerSerializer<K, N> 
timerSerializer;
-
-       /** The key-group range covered by this snapshot. */
-       @Nonnull
-       private final KeyGroupRange keyGroupRange;
-
-       /** The total number of key-groups in the job. */
-       @Nonnegative
-       private final int totalKeyGroups;
-
-       /** Result of partitioning the snapshot by key-group. */
-       @Nullable
-       private KeyGroupPartitionedSnapshot partitionedSnapshot;
-
-       InternalTimerHeapSnapshot(
-               @Nonnull TimerHeapInternalTimer<K, N>[] timerHeapArrayCopy,
-               @Nonnull TimerHeapInternalTimer.TimerSerializer<K, N> 
timerSerializer,
-               @Nonnull KeyGroupRange keyGroupRange,
-               @Nonnegative int totalKeyGroups) {
-
-               this.timerHeapArrayCopy = timerHeapArrayCopy;
-               this.timerSerializer = timerSerializer;
-               this.keyGroupRange = keyGroupRange;
-               this.totalKeyGroups = totalKeyGroups;
-       }
-
-       @SuppressWarnings("unchecked")
-       @Nonnull
-       @Override
-       public KeyGroupPartitionedSnapshot partitionByKeyGroup() {
-
-               if (partitionedSnapshot == null) {
-
-                       TimerHeapInternalTimer<K, N>[] partitioningOutput = new 
TimerHeapInternalTimer[timerHeapArrayCopy.length];
-
-                       KeyGroupPartitioner<TimerHeapInternalTimer<K, N>> 
timerPartitioner =
-                               new KeyGroupPartitioner<>(
-                                       timerHeapArrayCopy,
-                                       timerHeapArrayCopy.length,
-                                       partitioningOutput,
-                                       keyGroupRange,
-                                       totalKeyGroups,
-                                       TimerHeapInternalTimer::getKey,
-                                       timerSerializer::serialize);
-
-                       partitionedSnapshot = 
timerPartitioner.partitionByKeyGroup();
-               }
-
-               return partitionedSnapshot;
-       }
-
-       @Override
-       public void release() {
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshot.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshot.java
index ddaee6b..bedd4e0 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshot.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshot.java
@@ -37,8 +37,8 @@ public class InternalTimersSnapshot<K, N> {
        private TypeSerializer<N> namespaceSerializer;
        private TypeSerializerConfigSnapshot namespaceSerializerConfigSnapshot;
 
-       private Set<InternalTimer<K, N>> eventTimeTimers;
-       private Set<InternalTimer<K, N>> processingTimeTimers;
+       private Set<TimerHeapInternalTimer<K, N>> eventTimeTimers;
+       private Set<TimerHeapInternalTimer<K, N>> processingTimeTimers;
 
        /** Empty constructor used when restoring the timers. */
        public InternalTimersSnapshot() {}
@@ -49,8 +49,8 @@ public class InternalTimersSnapshot<K, N> {
                        TypeSerializerConfigSnapshot 
keySerializerConfigSnapshot,
                        TypeSerializer<N> namespaceSerializer,
                        TypeSerializerConfigSnapshot 
namespaceSerializerConfigSnapshot,
-                       @Nullable Set<InternalTimer<K, N>> eventTimeTimers,
-                       @Nullable Set<InternalTimer<K, N>> 
processingTimeTimers) {
+                       @Nullable Set<TimerHeapInternalTimer<K, N>> 
eventTimeTimers,
+                       @Nullable Set<TimerHeapInternalTimer<K, N>> 
processingTimeTimers) {
 
                this.keySerializer = Preconditions.checkNotNull(keySerializer);
                this.keySerializerConfigSnapshot = 
Preconditions.checkNotNull(keySerializerConfigSnapshot);
@@ -92,19 +92,19 @@ public class InternalTimersSnapshot<K, N> {
                this.namespaceSerializerConfigSnapshot = 
namespaceSerializerConfigSnapshot;
        }
 
-       public Set<InternalTimer<K, N>> getEventTimeTimers() {
+       public Set<TimerHeapInternalTimer<K, N>> getEventTimeTimers() {
                return eventTimeTimers;
        }
 
-       public void setEventTimeTimers(Set<InternalTimer<K, N>> 
eventTimeTimers) {
+       public void setEventTimeTimers(Set<TimerHeapInternalTimer<K, N>> 
eventTimeTimers) {
                this.eventTimeTimers = eventTimeTimers;
        }
 
-       public Set<InternalTimer<K, N>> getProcessingTimeTimers() {
+       public Set<TimerHeapInternalTimer<K, N>> getProcessingTimeTimers() {
                return processingTimeTimers;
        }
 
-       public void setProcessingTimeTimers(Set<InternalTimer<K, N>> 
processingTimeTimers) {
+       public void setProcessingTimeTimers(Set<TimerHeapInternalTimer<K, N>> 
processingTimeTimers) {
                this.processingTimeTimers = processingTimeTimers;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
index 05b77a7..dbb74e5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java
@@ -101,10 +101,10 @@ public class InternalTimersSnapshotReaderWriters {
                                timersSnapshot.getNamespaceSerializer());
 
                        // write the event time timers
-                       Set<InternalTimer<K, N>> eventTimers = 
timersSnapshot.getEventTimeTimers();
+                       Set<TimerHeapInternalTimer<K, N>> eventTimers = 
timersSnapshot.getEventTimeTimers();
                        if (eventTimers != null) {
                                out.writeInt(eventTimers.size());
-                               for (InternalTimer<K, N> eventTimer : 
eventTimers) {
+                               for (TimerHeapInternalTimer<K, N> eventTimer : 
eventTimers) {
                                        timerSerializer.serialize(eventTimer, 
out);
                                }
                        } else {
@@ -112,10 +112,10 @@ public class InternalTimersSnapshotReaderWriters {
                        }
 
                        // write the processing time timers
-                       Set<InternalTimer<K, N>> processingTimers = 
timersSnapshot.getProcessingTimeTimers();
+                       Set<TimerHeapInternalTimer<K, N>> processingTimers = 
timersSnapshot.getProcessingTimeTimers();
                        if (processingTimers != null) {
                                out.writeInt(processingTimers.size());
-                               for (InternalTimer<K, N> processingTimer : 
processingTimers) {
+                               for (TimerHeapInternalTimer<K, N> 
processingTimer : processingTimers) {
                                        
timerSerializer.serialize(processingTimer, out);
                                }
                        } else {
@@ -222,10 +222,10 @@ public class InternalTimersSnapshotReaderWriters {
 
                        // read the event time timers
                        int sizeOfEventTimeTimers = in.readInt();
-                       Set<InternalTimer<K, N>> restoredEventTimers = new 
HashSet<>(sizeOfEventTimeTimers);
+                       Set<TimerHeapInternalTimer<K, N>> restoredEventTimers = 
new HashSet<>(sizeOfEventTimeTimers);
                        if (sizeOfEventTimeTimers > 0) {
                                for (int i = 0; i < sizeOfEventTimeTimers; i++) 
{
-                                       InternalTimer<K, N> timer = 
timerSerializer.deserialize(in);
+                                       TimerHeapInternalTimer<K, N> timer = 
timerSerializer.deserialize(in);
                                        restoredEventTimers.add(timer);
                                }
                        }
@@ -233,10 +233,10 @@ public class InternalTimersSnapshotReaderWriters {
 
                        // read the processing time timers
                        int sizeOfProcessingTimeTimers = in.readInt();
-                       Set<InternalTimer<K, N>> restoredProcessingTimers = new 
HashSet<>(sizeOfProcessingTimeTimers);
+                       Set<TimerHeapInternalTimer<K, N>> 
restoredProcessingTimers = new HashSet<>(sizeOfProcessingTimeTimers);
                        if (sizeOfProcessingTimeTimers > 0) {
                                for (int i = 0; i < sizeOfProcessingTimeTimers; 
i++) {
-                                       InternalTimer<K, N> timer = 
timerSerializer.deserialize(in);
+                                       TimerHeapInternalTimer<K, N> timer = 
timerSerializer.deserialize(in);
                                        restoredProcessingTimers.add(timer);
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
index 906b090..bd821c4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
@@ -19,35 +19,48 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueSet;
 
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
+import java.util.Comparator;
 
 /**
- * Implementation of {@link InternalTimer} for the {@link InternalTimerHeap}.
+ * Implementation of {@link InternalTimer} to use with a {@link 
HeapPriorityQueueSet}.
  *
  * @param <K> Type of the keys to which timers are scoped.
  * @param <N> Type of the namespace to which timers are scoped.
  */
 @Internal
-public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N> 
{
+public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, 
N>, HeapPriorityQueueElement {
 
-       /** The index that indicates that a tracked internal timer is not 
tracked. */
-       private static final int NOT_MANAGED_BY_TIMER_QUEUE_INDEX = 
Integer.MIN_VALUE;
+       /** Function to extract the key from a {@link TimerHeapInternalTimer}. 
*/
+       private static final KeyExtractorFunction<TimerHeapInternalTimer<?, ?>> 
KEY_EXTRACTOR_FUNCTION =
+               TimerHeapInternalTimer::getKey;
 
+       /** Function to compare instances of {@link TimerHeapInternalTimer}. */
+       private static final Comparator<TimerHeapInternalTimer<?, ?>> 
TIMER_COMPARATOR =
+               (o1, o2) -> Long.compare(o1.getTimestamp(), o2.getTimestamp());
+
+       /** The key for which the timer is scoped. */
        @Nonnull
        private final K key;
 
+       /** The namespace for which the timer is scoped. */
        @Nonnull
        private final N namespace;
 
+       /** The expiration timestamp. */
        private final long timestamp;
 
        /**
@@ -60,7 +73,7 @@ public final class TimerHeapInternalTimer<K, N> implements 
InternalTimer<K, N> {
                this.timestamp = timestamp;
                this.key = key;
                this.namespace = namespace;
-               this.timerHeapIndex = NOT_MANAGED_BY_TIMER_QUEUE_INDEX;
+               this.timerHeapIndex = NOT_CONTAINED;
        }
 
        @Override
@@ -96,19 +109,14 @@ public final class TimerHeapInternalTimer<K, N> implements 
InternalTimer<K, N> {
                return false;
        }
 
-       /**
-        * Returns the current index of this timer in the owning timer heap.
-        */
-       int getTimerHeapIndex() {
+       @Override
+       public int getInternalIndex() {
                return timerHeapIndex;
        }
 
-       /**
-        * Sets the current index of this timer in the owning timer heap and 
should only be called by the managing heap.
-        * @param timerHeapIndex the new index in the timer heap.
-        */
-       void setTimerHeapIndex(int timerHeapIndex) {
-               this.timerHeapIndex = timerHeapIndex;
+       @Override
+       public void setInternalIndex(int newIndex) {
+               this.timerHeapIndex = newIndex;
        }
 
        /**
@@ -116,7 +124,7 @@ public final class TimerHeapInternalTimer<K, N> implements 
InternalTimer<K, N> {
         * removed.
         */
        void removedFromTimerQueue() {
-               setTimerHeapIndex(NOT_MANAGED_BY_TIMER_QUEUE_INDEX);
+               setInternalIndex(NOT_CONTAINED);
        }
 
        @Override
@@ -136,10 +144,22 @@ public final class TimerHeapInternalTimer<K, N> 
implements InternalTimer<K, N> {
                                '}';
        }
 
+       @VisibleForTesting
+       @SuppressWarnings("unchecked")
+       static <T extends TimerHeapInternalTimer> Comparator<T> 
getTimerComparator() {
+               return (Comparator<T>) TIMER_COMPARATOR;
+       }
+
+       @SuppressWarnings("unchecked")
+       @VisibleForTesting
+       static <T extends TimerHeapInternalTimer> KeyExtractorFunction<T> 
getKeyExtractorFunction() {
+               return (KeyExtractorFunction<T>) KEY_EXTRACTOR_FUNCTION;
+       }
+
        /**
         * A {@link TypeSerializer} used to serialize/deserialize a {@link 
TimerHeapInternalTimer}.
         */
-       public static class TimerSerializer<K, N> extends 
TypeSerializer<InternalTimer<K, N>> {
+       public static class TimerSerializer<K, N> extends 
TypeSerializer<TimerHeapInternalTimer<K, N>> {
 
                private static final long serialVersionUID = 
1119562170939152304L;
 
@@ -160,7 +180,7 @@ public final class TimerHeapInternalTimer<K, N> implements 
InternalTimer<K, N> {
                }
 
                @Override
-               public TypeSerializer<InternalTimer<K, N>> duplicate() {
+               public TypeSerializer<TimerHeapInternalTimer<K, N>> duplicate() 
{
 
                        final TypeSerializer<K> keySerializerDuplicate = 
keySerializer.duplicate();
                        final TypeSerializer<N> namespaceSerializerDuplicate = 
namespaceSerializer.duplicate();
@@ -176,17 +196,17 @@ public final class TimerHeapInternalTimer<K, N> 
implements InternalTimer<K, N> {
                }
 
                @Override
-               public InternalTimer<K, N> createInstance() {
+               public TimerHeapInternalTimer<K, N> createInstance() {
                        throw new UnsupportedOperationException();
                }
 
                @Override
-               public InternalTimer<K, N> copy(InternalTimer<K, N> from) {
+               public TimerHeapInternalTimer<K, N> 
copy(TimerHeapInternalTimer<K, N> from) {
                        return new 
TimerHeapInternalTimer<>(from.getTimestamp(), from.getKey(), 
from.getNamespace());
                }
 
                @Override
-               public InternalTimer<K, N> copy(InternalTimer<K, N> from, 
InternalTimer<K, N> reuse) {
+               public TimerHeapInternalTimer<K, N> 
copy(TimerHeapInternalTimer<K, N> from, TimerHeapInternalTimer<K, N> reuse) {
                        return copy(from);
                }
 
@@ -197,14 +217,14 @@ public final class TimerHeapInternalTimer<K, N> 
implements InternalTimer<K, N> {
                }
 
                @Override
-               public void serialize(InternalTimer<K, N> record, 
DataOutputView target) throws IOException {
+               public void serialize(TimerHeapInternalTimer<K, N> record, 
DataOutputView target) throws IOException {
                        keySerializer.serialize(record.getKey(), target);
                        namespaceSerializer.serialize(record.getNamespace(), 
target);
                        
LongSerializer.INSTANCE.serialize(record.getTimestamp(), target);
                }
 
                @Override
-               public InternalTimer<K, N> deserialize(DataInputView source) 
throws IOException {
+               public TimerHeapInternalTimer<K, N> deserialize(DataInputView 
source) throws IOException {
                        K key = keySerializer.deserialize(source);
                        N namespace = namespaceSerializer.deserialize(source);
                        Long timestamp = 
LongSerializer.INSTANCE.deserialize(source);
@@ -212,7 +232,7 @@ public final class TimerHeapInternalTimer<K, N> implements 
InternalTimer<K, N> {
                }
 
                @Override
-               public InternalTimer<K, N> deserialize(InternalTimer<K, N> 
reuse, DataInputView source) throws IOException {
+               public TimerHeapInternalTimer<K, N> 
deserialize(TimerHeapInternalTimer<K, N> reuse, DataInputView source) throws 
IOException {
                        return deserialize(source);
                }
 
@@ -247,7 +267,7 @@ public final class TimerHeapInternalTimer<K, N> implements 
InternalTimer<K, N> {
                }
 
                @Override
-               public CompatibilityResult<InternalTimer<K, N>> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               public CompatibilityResult<TimerHeapInternalTimer<K, N>> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
                        throw new UnsupportedOperationException("This 
serializer is not registered for managed state.");
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/c6ad421e/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
index 1ce400d..b008fa2 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
@@ -104,7 +104,7 @@ public class HeapInternalTimerServiceTest {
                int endKeyGroupIdx = totalNoOfKeyGroups - 1; // we have 0 to 99
 
                @SuppressWarnings("unchecked")
-               Set<InternalTimer<Integer, String>>[] expectedNonEmptyTimerSets 
= new HashSet[totalNoOfKeyGroups];
+               Set<TimerHeapInternalTimer<Integer, String>>[] 
expectedNonEmptyTimerSets = new HashSet[totalNoOfKeyGroups];
 
                TestKeyContext keyContext = new TestKeyContext();
                HeapInternalTimerService<Integer, String> timerService =
@@ -123,7 +123,7 @@ public class HeapInternalTimerServiceTest {
                        int keyGroupIdx =  
KeyGroupRangeAssignment.assignToKeyGroup(timer.getKey(), totalNoOfKeyGroups);
 
                        // add it in the adequate expected set of timers per 
keygroup
-                       Set<InternalTimer<Integer, String>> timerSet = 
expectedNonEmptyTimerSets[keyGroupIdx];
+                       Set<TimerHeapInternalTimer<Integer, String>> timerSet = 
expectedNonEmptyTimerSets[keyGroupIdx];
                        if (timerSet == null) {
                                timerSet = new HashSet<>();
                                expectedNonEmptyTimerSets[keyGroupIdx] = 
timerSet;
@@ -136,16 +136,16 @@ public class HeapInternalTimerServiceTest {
                        
timerService.registerProcessingTimeTimer(timer.getNamespace(), 
timer.getTimestamp());
                }
 
-               List<Set<InternalTimer<Integer, String>>> eventTimeTimers =
+               List<Set<TimerHeapInternalTimer<Integer, String>>> 
eventTimeTimers =
                        timerService.getEventTimeTimersPerKeyGroup();
-               List<Set<InternalTimer<Integer, String>>> processingTimeTimers =
+               List<Set<TimerHeapInternalTimer<Integer, String>>> 
processingTimeTimers =
                        timerService.getProcessingTimeTimersPerKeyGroup();
 
                // finally verify that the actual timers per key group sets are 
the expected ones.
                for (int i = 0; i < expectedNonEmptyTimerSets.length; i++) {
-                       Set<InternalTimer<Integer, String>> expected = 
expectedNonEmptyTimerSets[i];
-                       Set<InternalTimer<Integer, String>> actualEvent = 
eventTimeTimers.get(i);
-                       Set<InternalTimer<Integer, String>> actualProcessing = 
processingTimeTimers.get(i);
+                       Set<TimerHeapInternalTimer<Integer, String>> expected = 
expectedNonEmptyTimerSets[i];
+                       Set<TimerHeapInternalTimer<Integer, String>> 
actualEvent = eventTimeTimers.get(i);
+                       Set<TimerHeapInternalTimer<Integer, String>> 
actualProcessing = processingTimeTimers.get(i);
 
                        if (expected == null) {
                                Assert.assertTrue(actualEvent.isEmpty());

Reply via email to