Repository: flink
Updated Branches:
  refs/heads/master bc4638a3c -> f48f5340a


http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
index 2567004..aa4e6d8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.VoidNamespace;
@@ -35,10 +34,6 @@ import 
org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
-import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator;
-
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -53,18 +48,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 
 /**
- * Tests for {@link KvStateRequestSerializer}.
+ * Tests for {@link KvStateSerializer}.
  */
 @RunWith(Parameterized.class)
 public class KvStateRequestSerializerTest {
 
-       private final ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
-
        @Parameterized.Parameters
        public static Collection<Boolean> parameters() {
                return Arrays.asList(false, true);
@@ -74,155 +66,6 @@ public class KvStateRequestSerializerTest {
        public boolean async;
 
        /**
-        * Tests KvState request serialization.
-        */
-       @Test
-       public void testKvStateRequestSerialization() throws Exception {
-               long requestId = Integer.MAX_VALUE + 1337L;
-               KvStateID kvStateId = new KvStateID();
-               byte[] serializedKeyAndNamespace = randomByteArray(1024);
-
-               ByteBuf buf = KvStateRequestSerializer.serializeKvStateRequest(
-                               alloc,
-                               requestId,
-                               kvStateId,
-                               serializedKeyAndNamespace);
-
-               int frameLength = buf.readInt();
-               assertEquals(KvStateRequestType.REQUEST, 
KvStateRequestSerializer.deserializeHeader(buf));
-               KvStateRequest request = 
KvStateRequestSerializer.deserializeKvStateRequest(buf);
-               assertEquals(buf.readerIndex(), frameLength + 4);
-
-               assertEquals(requestId, request.getRequestId());
-               assertEquals(kvStateId, request.getKvStateId());
-               assertArrayEquals(serializedKeyAndNamespace, 
request.getSerializedKeyAndNamespace());
-       }
-
-       /**
-        * Tests KvState request serialization with zero-length serialized key 
and namespace.
-        */
-       @Test
-       public void 
testKvStateRequestSerializationWithZeroLengthKeyAndNamespace() throws Exception 
{
-               byte[] serializedKeyAndNamespace = new byte[0];
-
-               ByteBuf buf = KvStateRequestSerializer.serializeKvStateRequest(
-                               alloc,
-                               1823,
-                               new KvStateID(),
-                               serializedKeyAndNamespace);
-
-               int frameLength = buf.readInt();
-               assertEquals(KvStateRequestType.REQUEST, 
KvStateRequestSerializer.deserializeHeader(buf));
-               KvStateRequest request = 
KvStateRequestSerializer.deserializeKvStateRequest(buf);
-               assertEquals(buf.readerIndex(), frameLength + 4);
-
-               assertArrayEquals(serializedKeyAndNamespace, 
request.getSerializedKeyAndNamespace());
-       }
-
-       /**
-        * Tests that we don't try to be smart about <code>null</code> key and 
namespace.
-        * They should be treated explicitly.
-        */
-       @Test(expected = NullPointerException.class)
-       public void testNullPointerExceptionOnNullSerializedKeyAndNamepsace() 
throws Exception {
-               new KvStateRequest(0, new KvStateID(), null);
-       }
-
-       /**
-        * Tests KvState request result serialization.
-        */
-       @Test
-       public void testKvStateRequestResultSerialization() throws Exception {
-               long requestId = Integer.MAX_VALUE + 72727278L;
-               byte[] serializedResult = randomByteArray(1024);
-
-               ByteBuf buf = 
KvStateRequestSerializer.serializeKvStateRequestResult(
-                               alloc,
-                               requestId,
-                               serializedResult);
-
-               int frameLength = buf.readInt();
-               assertEquals(KvStateRequestType.REQUEST_RESULT, 
KvStateRequestSerializer.deserializeHeader(buf));
-               KvStateRequestResult request = 
KvStateRequestSerializer.deserializeKvStateRequestResult(buf);
-               assertEquals(buf.readerIndex(), frameLength + 4);
-
-               assertEquals(requestId, request.getRequestId());
-
-               assertArrayEquals(serializedResult, 
request.getSerializedResult());
-       }
-
-       /**
-        * Tests KvState request result serialization with zero-length 
serialized result.
-        */
-       @Test
-       public void 
testKvStateRequestResultSerializationWithZeroLengthSerializedResult() throws 
Exception {
-               byte[] serializedResult = new byte[0];
-
-               ByteBuf buf = 
KvStateRequestSerializer.serializeKvStateRequestResult(
-                               alloc,
-                               72727278,
-                               serializedResult);
-
-               int frameLength = buf.readInt();
-
-               assertEquals(KvStateRequestType.REQUEST_RESULT, 
KvStateRequestSerializer.deserializeHeader(buf));
-               KvStateRequestResult request = 
KvStateRequestSerializer.deserializeKvStateRequestResult(buf);
-               assertEquals(buf.readerIndex(), frameLength + 4);
-
-               assertArrayEquals(serializedResult, 
request.getSerializedResult());
-       }
-
-       /**
-        * Tests that we don't try to be smart about <code>null</code> results.
-        * They should be treated explicitly.
-        */
-       @Test(expected = NullPointerException.class)
-       public void testNullPointerExceptionOnNullSerializedResult() throws 
Exception {
-               new KvStateRequestResult(0, null);
-       }
-
-       /**
-        * Tests KvState request failure serialization.
-        */
-       @Test
-       public void testKvStateRequestFailureSerialization() throws Exception {
-               long requestId = Integer.MAX_VALUE + 1111222L;
-               IllegalStateException cause = new 
IllegalStateException("Expected test");
-
-               ByteBuf buf = 
KvStateRequestSerializer.serializeKvStateRequestFailure(
-                               alloc,
-                               requestId,
-                               cause);
-
-               int frameLength = buf.readInt();
-               assertEquals(KvStateRequestType.REQUEST_FAILURE, 
KvStateRequestSerializer.deserializeHeader(buf));
-               KvStateRequestFailure request = 
KvStateRequestSerializer.deserializeKvStateRequestFailure(buf);
-               assertEquals(buf.readerIndex(), frameLength + 4);
-
-               assertEquals(requestId, request.getRequestId());
-               assertEquals(cause.getClass(), request.getCause().getClass());
-               assertEquals(cause.getMessage(), 
request.getCause().getMessage());
-       }
-
-       /**
-        * Tests KvState server failure serialization.
-        */
-       @Test
-       public void testServerFailureSerialization() throws Exception {
-               IllegalStateException cause = new 
IllegalStateException("Expected test");
-
-               ByteBuf buf = 
KvStateRequestSerializer.serializeServerFailure(alloc, cause);
-
-               int frameLength = buf.readInt();
-               assertEquals(KvStateRequestType.SERVER_FAILURE, 
KvStateRequestSerializer.deserializeHeader(buf));
-               Throwable request = 
KvStateRequestSerializer.deserializeServerFailure(buf);
-               assertEquals(buf.readerIndex(), frameLength + 4);
-
-               assertEquals(cause.getClass(), request.getClass());
-               assertEquals(cause.getMessage(), request.getMessage());
-       }
-
-       /**
         * Tests key and namespace serialization utils.
         */
        @Test
@@ -233,10 +76,10 @@ public class KvStateRequestSerializerTest {
                long expectedKey = Integer.MAX_VALUE + 12323L;
                String expectedNamespace = "knilf";
 
-               byte[] serializedKeyAndNamespace = 
KvStateRequestSerializer.serializeKeyAndNamespace(
+               byte[] serializedKeyAndNamespace = 
KvStateSerializer.serializeKeyAndNamespace(
                                expectedKey, keySerializer, expectedNamespace, 
namespaceSerializer);
 
-               Tuple2<Long, String> actual = 
KvStateRequestSerializer.deserializeKeyAndNamespace(
+               Tuple2<Long, String> actual = 
KvStateSerializer.deserializeKeyAndNamespace(
                                serializedKeyAndNamespace, keySerializer, 
namespaceSerializer);
 
                assertEquals(expectedKey, actual.f0.longValue());
@@ -248,7 +91,7 @@ public class KvStateRequestSerializerTest {
         */
        @Test(expected = IOException.class)
        public void testKeyAndNamespaceDeserializationEmpty() throws Exception {
-               KvStateRequestSerializer.deserializeKeyAndNamespace(
+               KvStateSerializer.deserializeKeyAndNamespace(
                        new byte[] {}, LongSerializer.INSTANCE, 
StringSerializer.INSTANCE);
        }
 
@@ -257,7 +100,7 @@ public class KvStateRequestSerializerTest {
         */
        @Test(expected = IOException.class)
        public void testKeyAndNamespaceDeserializationTooShort() throws 
Exception {
-               KvStateRequestSerializer.deserializeKeyAndNamespace(
+               KvStateSerializer.deserializeKeyAndNamespace(
                        new byte[] {1}, LongSerializer.INSTANCE, 
StringSerializer.INSTANCE);
        }
 
@@ -267,7 +110,7 @@ public class KvStateRequestSerializerTest {
        @Test(expected = IOException.class)
        public void testKeyAndNamespaceDeserializationTooMany1() throws 
Exception {
                // Long + null String + 1 byte
-               KvStateRequestSerializer.deserializeKeyAndNamespace(
+               KvStateSerializer.deserializeKeyAndNamespace(
                        new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 42, 0, 2}, 
LongSerializer.INSTANCE,
                        StringSerializer.INSTANCE);
        }
@@ -278,7 +121,7 @@ public class KvStateRequestSerializerTest {
        @Test(expected = IOException.class)
        public void testKeyAndNamespaceDeserializationTooMany2() throws 
Exception {
                // Long + null String + 2 bytes
-               KvStateRequestSerializer.deserializeKeyAndNamespace(
+               KvStateSerializer.deserializeKeyAndNamespace(
                        new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 42, 0, 2, 2}, 
LongSerializer.INSTANCE,
                        StringSerializer.INSTANCE);
        }
@@ -291,8 +134,8 @@ public class KvStateRequestSerializerTest {
                TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE;
                long expectedValue = Long.MAX_VALUE - 1292929292L;
 
-               byte[] serializedValue = 
KvStateRequestSerializer.serializeValue(expectedValue, valueSerializer);
-               long actualValue = 
KvStateRequestSerializer.deserializeValue(serializedValue, valueSerializer);
+               byte[] serializedValue = 
KvStateSerializer.serializeValue(expectedValue, valueSerializer);
+               long actualValue = 
KvStateSerializer.deserializeValue(serializedValue, valueSerializer);
 
                assertEquals(expectedValue, actualValue);
        }
@@ -302,7 +145,7 @@ public class KvStateRequestSerializerTest {
         */
        @Test(expected = IOException.class)
        public void testDeserializeValueEmpty() throws Exception {
-               KvStateRequestSerializer.deserializeValue(new byte[] {}, 
LongSerializer.INSTANCE);
+               KvStateSerializer.deserializeValue(new byte[] {}, 
LongSerializer.INSTANCE);
        }
 
        /**
@@ -311,7 +154,7 @@ public class KvStateRequestSerializerTest {
        @Test(expected = IOException.class)
        public void testDeserializeValueTooShort() throws Exception {
                // 1 byte (incomplete Long)
-               KvStateRequestSerializer.deserializeValue(new byte[] {1}, 
LongSerializer.INSTANCE);
+               KvStateSerializer.deserializeValue(new byte[] {1}, 
LongSerializer.INSTANCE);
        }
 
        /**
@@ -320,7 +163,7 @@ public class KvStateRequestSerializerTest {
        @Test(expected = IOException.class)
        public void testDeserializeValueTooMany1() throws Exception {
                // Long + 1 byte
-               KvStateRequestSerializer.deserializeValue(new byte[] {1, 1, 1, 
1, 1, 1, 1, 1, 2},
+               KvStateSerializer.deserializeValue(new byte[] {1, 1, 1, 1, 1, 
1, 1, 1, 2},
                        LongSerializer.INSTANCE);
        }
 
@@ -330,7 +173,7 @@ public class KvStateRequestSerializerTest {
        @Test(expected = IOException.class)
        public void testDeserializeValueTooMany2() throws Exception {
                // Long + 2 bytes
-               KvStateRequestSerializer.deserializeValue(new byte[] {1, 1, 1, 
1, 1, 1, 1, 1, 2, 2},
+               KvStateSerializer.deserializeValue(new byte[] {1, 1, 1, 1, 1, 
1, 1, 1, 2, 2},
                        LongSerializer.INSTANCE);
        }
 
@@ -363,7 +206,7 @@ public class KvStateRequestSerializerTest {
 
        /**
         * Verifies that the serialization of a list using the given list state
-        * matches the deserialization with {@link 
KvStateRequestSerializer#deserializeList}.
+        * matches the deserialization with {@link 
KvStateSerializer#deserializeList}.
         *
         * @param key
         *              key of the list state
@@ -390,19 +233,19 @@ public class KvStateRequestSerializerTest {
                }
 
                final byte[] serializedKey =
-                       KvStateRequestSerializer.serializeKeyAndNamespace(
+                       KvStateSerializer.serializeKeyAndNamespace(
                                key, LongSerializer.INSTANCE,
                                VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE);
 
                final byte[] serializedValues = 
listState.getSerializedValue(serializedKey);
 
-               List<Long> actualValues = 
KvStateRequestSerializer.deserializeList(serializedValues, valueSerializer);
+               List<Long> actualValues = 
KvStateSerializer.deserializeList(serializedValues, valueSerializer);
                assertEquals(expectedValues, actualValues);
 
                // Single value
                long expectedValue = ThreadLocalRandom.current().nextLong();
-               byte[] serializedValue = 
KvStateRequestSerializer.serializeValue(expectedValue, valueSerializer);
-               List<Long> actualValue = 
KvStateRequestSerializer.deserializeList(serializedValue, valueSerializer);
+               byte[] serializedValue = 
KvStateSerializer.serializeValue(expectedValue, valueSerializer);
+               List<Long> actualValue = 
KvStateSerializer.deserializeList(serializedValue, valueSerializer);
                assertEquals(1, actualValue.size());
                assertEquals(expectedValue, actualValue.get(0).longValue());
        }
@@ -412,7 +255,7 @@ public class KvStateRequestSerializerTest {
         */
        @Test
        public void testDeserializeListEmpty() throws Exception {
-               List<Long> actualValue = KvStateRequestSerializer
+               List<Long> actualValue = KvStateSerializer
                        .deserializeList(new byte[] {}, 
LongSerializer.INSTANCE);
                assertEquals(0, actualValue.size());
        }
@@ -423,7 +266,7 @@ public class KvStateRequestSerializerTest {
        @Test(expected = IOException.class)
        public void testDeserializeListTooShort1() throws Exception {
                // 1 byte (incomplete Long)
-               KvStateRequestSerializer.deserializeList(new byte[] {1}, 
LongSerializer.INSTANCE);
+               KvStateSerializer.deserializeList(new byte[] {1}, 
LongSerializer.INSTANCE);
        }
 
        /**
@@ -432,7 +275,7 @@ public class KvStateRequestSerializerTest {
        @Test(expected = IOException.class)
        public void testDeserializeListTooShort2() throws Exception {
                // Long + 1 byte (separator) + 1 byte (incomplete Long)
-               KvStateRequestSerializer.deserializeList(new byte[] {1, 1, 1, 
1, 1, 1, 1, 1, 2, 3},
+               KvStateSerializer.deserializeList(new byte[] {1, 1, 1, 1, 1, 1, 
1, 1, 2, 3},
                        LongSerializer.INSTANCE);
        }
 
@@ -466,7 +309,7 @@ public class KvStateRequestSerializerTest {
 
        /**
         * Verifies that the serialization of a map using the given map state
-        * matches the deserialization with {@link 
KvStateRequestSerializer#deserializeList}.
+        * matches the deserialization with {@link 
KvStateSerializer#deserializeMap}.
         *
         * @param key
         *              key of the map state
@@ -497,13 +340,13 @@ public class KvStateRequestSerializerTest {
                mapState.put(0L, null);
 
                final byte[] serializedKey =
-                       KvStateRequestSerializer.serializeKeyAndNamespace(
+                       KvStateSerializer.serializeKeyAndNamespace(
                                key, LongSerializer.INSTANCE,
                                VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE);
 
                final byte[] serializedValues = 
mapState.getSerializedValue(serializedKey);
 
-               Map<Long, String> actualValues = 
KvStateRequestSerializer.deserializeMap(serializedValues, userKeySerializer, 
userValueSerializer);
+               Map<Long, String> actualValues = 
KvStateSerializer.deserializeMap(serializedValues, userKeySerializer, 
userValueSerializer);
                assertEquals(expectedValues.size(), actualValues.size());
                for (Map.Entry<Long, String> actualEntry : 
actualValues.entrySet()) {
                        assertEquals(expectedValues.get(actualEntry.getKey()), 
actualEntry.getValue());
@@ -515,12 +358,12 @@ public class KvStateRequestSerializerTest {
                String expectedValue = Long.toString(expectedKey);
                byte[] isNull = {0};
 
-               baos.write(KvStateRequestSerializer.serializeValue(expectedKey, 
userKeySerializer));
+               baos.write(KvStateSerializer.serializeValue(expectedKey, 
userKeySerializer));
                baos.write(isNull);
-               
baos.write(KvStateRequestSerializer.serializeValue(expectedValue, 
userValueSerializer));
+               baos.write(KvStateSerializer.serializeValue(expectedValue, 
userValueSerializer));
                byte[] serializedValue = baos.toByteArray();
 
-               Map<Long, String> actualValue = 
KvStateRequestSerializer.deserializeMap(serializedValue, userKeySerializer, 
userValueSerializer);
+               Map<Long, String> actualValue = 
KvStateSerializer.deserializeMap(serializedValue, userKeySerializer, 
userValueSerializer);
                assertEquals(1, actualValue.size());
                assertEquals(expectedValue, actualValue.get(expectedKey));
        }
@@ -530,7 +373,7 @@ public class KvStateRequestSerializerTest {
         */
        @Test
        public void testDeserializeMapEmpty() throws Exception {
-               Map<Long, String> actualValue = KvStateRequestSerializer
+               Map<Long, String> actualValue = KvStateSerializer
                        .deserializeMap(new byte[] {}, LongSerializer.INSTANCE, 
StringSerializer.INSTANCE);
                assertEquals(0, actualValue.size());
        }
@@ -541,7 +384,7 @@ public class KvStateRequestSerializerTest {
        @Test(expected = IOException.class)
        public void testDeserializeMapTooShort1() throws Exception {
                // 1 byte (incomplete Key)
-               KvStateRequestSerializer.deserializeMap(new byte[] {1}, 
LongSerializer.INSTANCE, StringSerializer.INSTANCE);
+               KvStateSerializer.deserializeMap(new byte[] {1}, 
LongSerializer.INSTANCE, StringSerializer.INSTANCE);
        }
 
        /**
@@ -550,7 +393,7 @@ public class KvStateRequestSerializerTest {
        @Test(expected = IOException.class)
        public void testDeserializeMapTooShort2() throws Exception {
                // Long (Key) + 1 byte (incomplete Value)
-               KvStateRequestSerializer.deserializeMap(new byte[]{1, 1, 1, 1, 
1, 1, 1, 1, 0},
+               KvStateSerializer.deserializeMap(new byte[]{1, 1, 1, 1, 1, 1, 
1, 1, 0},
                                LongSerializer.INSTANCE, 
LongSerializer.INSTANCE);
        }
 
@@ -560,7 +403,7 @@ public class KvStateRequestSerializerTest {
        @Test(expected = IOException.class)
        public void testDeserializeMapTooShort3() throws Exception {
                // Long (Key1) + Boolean (false) + Long (Value1) + 1 byte 
(incomplete Key2)
-               KvStateRequestSerializer.deserializeMap(new byte[] {1, 1, 1, 1, 
1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 3},
+               KvStateSerializer.deserializeMap(new byte[] {1, 1, 1, 1, 1, 1, 
1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 3},
                        LongSerializer.INSTANCE, LongSerializer.INSTANCE);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index ed280a7..dbf131f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -56,7 +56,7 @@ import 
org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.KvStateRegistryListener;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
 import org.apache.flink.runtime.state.heap.AbstractHeapState;
 import org.apache.flink.runtime.state.heap.NestedMapsStateTable;
 import org.apache.flink.runtime.state.heap.StateTable;
@@ -3070,7 +3070,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        TypeSerializer<N> namespaceSerializer,
                        TypeSerializer<V> valueSerializer) throws Exception {
 
-               byte[] serializedKeyAndNamespace = 
KvStateRequestSerializer.serializeKeyAndNamespace(
+               byte[] serializedKeyAndNamespace = 
KvStateSerializer.serializeKeyAndNamespace(
                                key, keySerializer, namespace, 
namespaceSerializer);
 
                byte[] serializedValue = 
kvState.getSerializedValue(serializedKeyAndNamespace);
@@ -3078,7 +3078,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                if (serializedValue == null) {
                        return null;
                } else {
-                       return 
KvStateRequestSerializer.deserializeValue(serializedValue, valueSerializer);
+                       return 
KvStateSerializer.deserializeValue(serializedValue, valueSerializer);
                }
        }
 
@@ -3094,7 +3094,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        TypeSerializer<N> namespaceSerializer,
                        TypeSerializer<V> valueSerializer) throws Exception {
 
-               byte[] serializedKeyAndNamespace = 
KvStateRequestSerializer.serializeKeyAndNamespace(
+               byte[] serializedKeyAndNamespace = 
KvStateSerializer.serializeKeyAndNamespace(
                                key, keySerializer, namespace, 
namespaceSerializer);
 
                byte[] serializedValue = 
kvState.getSerializedValue(serializedKeyAndNamespace);
@@ -3102,7 +3102,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                if (serializedValue == null) {
                        return null;
                } else {
-                       return 
KvStateRequestSerializer.deserializeList(serializedValue, valueSerializer);
+                       return 
KvStateSerializer.deserializeList(serializedValue, valueSerializer);
                }
        }
 
@@ -3120,7 +3120,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        TypeSerializer<UV> userValueSerializer
        ) throws Exception {
 
-               byte[] serializedKeyAndNamespace = 
KvStateRequestSerializer.serializeKeyAndNamespace(
+               byte[] serializedKeyAndNamespace = 
KvStateSerializer.serializeKeyAndNamespace(
                                key, keySerializer, namespace, 
namespaceSerializer);
 
                byte[] serializedValue = 
kvState.getSerializedValue(serializedKeyAndNamespace);
@@ -3128,7 +3128,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                if (serializedValue == null) {
                        return null;
                } else {
-                       return 
KvStateRequestSerializer.deserializeMap(serializedValue, userKeySerializer, 
userValueSerializer);
+                       return 
KvStateSerializer.deserializeMap(serializedValue, userKeySerializer, 
userValueSerializer);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
deleted file mode 100644
index 8ac3d2f..0000000
--- 
a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java
+++ /dev/null
@@ -1,1128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.query;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import 
org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
-import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
-import org.apache.flink.runtime.query.QueryableStateClient;
-import org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceTypeInfo;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.QueryableStateStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.TestLogger;
-
-import akka.actor.ActorSystem;
-import akka.dispatch.Futures;
-import akka.dispatch.OnSuccess;
-import akka.dispatch.Recover;
-import akka.pattern.Patterns;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicLongArray;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-import scala.reflect.ClassTag$;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Base class for queryable state integration tests with a configurable state 
backend.
- */
-public abstract class AbstractQueryableStateITCase extends TestLogger {
-
-       protected static final FiniteDuration TEST_TIMEOUT = new 
FiniteDuration(10000, TimeUnit.SECONDS);
-       private static final FiniteDuration QUERY_RETRY_DELAY = new 
FiniteDuration(100, TimeUnit.MILLISECONDS);
-
-       protected static ActorSystem testActorSystem;
-
-       /**
-        * State backend to use.
-        */
-       protected AbstractStateBackend stateBackend;
-
-       /**
-        * Shared between all the test. Make sure to have at least NUM_SLOTS
-        * available after your test finishes, e.g. cancel the job you 
submitted.
-        */
-       protected static FlinkMiniCluster cluster;
-
-       protected static int maxParallelism;
-
-       @Before
-       public void setUp() throws Exception {
-               // NOTE: do not use a shared instance for all tests as the 
tests may brake
-               this.stateBackend = createStateBackend();
-
-               Assert.assertNotNull(cluster);
-
-               maxParallelism = 
cluster.configuration().getInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
1) *
-                               
cluster.configuration().getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
1);
-       }
-
-       /**
-        * Creates a state backend instance which is used in the {@link 
#setUp()} method before each
-        * test case.
-        *
-        * @return a state backend instance for each unit test
-        */
-       protected abstract AbstractStateBackend createStateBackend() throws 
Exception;
-
-       /**
-        * Runs a simple topology producing random (key, 1) pairs at the 
sources (where
-        * the number of keys is fixed in the range 0...numKeys). The records are 
keyed and
-        * a reducing queryable state instance is created, which sums up the 
records.
-        *
-        * <p>After submitting the job in detached mode, the 
QueryableStateClient is used
-        * to query the counts of each key in rounds until all keys have 
non-zero counts.
-        */
-       @Test
-       @SuppressWarnings("unchecked")
-       public void testQueryableState() throws Exception {
-               // Config
-               final Deadline deadline = TEST_TIMEOUT.fromNow();
-               final int numKeys = 256;
-
-               final QueryableStateClient client = new 
QueryableStateClient(cluster.configuration());
-
-               JobID jobId = null;
-
-               try {
-                       //
-                       // Test program
-                       //
-                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-                       env.setStateBackend(stateBackend);
-                       env.setParallelism(maxParallelism);
-                       // Very important, because cluster is shared between 
tests and we
-                       // don't explicitly check that all slots are available 
before
-                       // submitting.
-                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000));
-
-                       DataStream<Tuple2<Integer, Long>> source = env
-                                       .addSource(new 
TestKeyRangeSource(numKeys));
-
-                       // Reducing state
-                       ReducingStateDescriptor<Tuple2<Integer, Long>> 
reducingState = new ReducingStateDescriptor<>(
-                                       "any-name",
-                                       new SumReduce(),
-                                       source.getType());
-
-                       final String queryName = "hakuna-matata";
-
-                       final QueryableStateStream<Integer, Tuple2<Integer, 
Long>> queryableState =
-                                       source.keyBy(new 
KeySelector<Tuple2<Integer, Long>, Integer>() {
-                                               private static final long 
serialVersionUID = 7143749578983540352L;
-
-                                               @Override
-                                               public Integer 
getKey(Tuple2<Integer, Long> value) throws Exception {
-                                                       return value.f0;
-                                               }
-                                       }).asQueryableState(queryName, 
reducingState);
-
-                       // Submit the job graph
-                       JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-                       cluster.submitJobDetached(jobGraph);
-
-                       //
-                       // Start querying
-                       //
-                       jobId = jobGraph.getJobID();
-
-                       final AtomicLongArray counts = new 
AtomicLongArray(numKeys);
-
-                       boolean allNonZero = false;
-                       while (!allNonZero && deadline.hasTimeLeft()) {
-                               allNonZero = true;
-
-                               final List<Future<Tuple2<Integer, Long>>> 
futures = new ArrayList<>(numKeys);
-
-                               for (int i = 0; i < numKeys; i++) {
-                                       final int key = i;
-
-                                       if (counts.get(key) > 0) {
-                                               // Skip this one
-                                               continue;
-                                       } else {
-                                               allNonZero = false;
-                                       }
-
-                                       Future<Tuple2<Integer, Long>> result = 
getKvStateWithRetries(
-                                                       client,
-                                                       jobId,
-                                                       queryName,
-                                                       key,
-                                                       
BasicTypeInfo.INT_TYPE_INFO,
-                                                       reducingState,
-                                                       QUERY_RETRY_DELAY,
-                                                       false);
-
-                                       result.onSuccess(new 
OnSuccess<Tuple2<Integer, Long>>() {
-                                               @Override
-                                               public void 
onSuccess(Tuple2<Integer, Long> result) throws Throwable {
-                                                       counts.set(key, 
result.f1);
-                                                       assertEquals("Key 
mismatch", key, result.f0.intValue());
-                                               }
-                                       }, testActorSystem.dispatcher());
-
-                                       futures.add(result);
-                               }
-
-                               Future<Iterable<Tuple2<Integer, Long>>> 
futureSequence = Futures.sequence(
-                                               futures,
-                                               testActorSystem.dispatcher());
-
-                               Await.ready(futureSequence, 
deadline.timeLeft());
-                       }
-
-                       assertTrue("Not all keys are non-zero", allNonZero);
-
-                       // All should be non-zero
-                       for (int i = 0; i < numKeys; i++) {
-                               long count = counts.get(i);
-                               assertTrue("Count at position " + i + " is " + 
count, count > 0);
-                       }
-               } finally {
-                       // Free cluster resources
-                       if (jobId != null) {
-                               Future<CancellationSuccess> cancellation = 
cluster
-                                               
.getLeaderGateway(deadline.timeLeft())
-                                               .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
-
-                               Await.ready(cancellation, deadline.timeLeft());
-                       }
-
-                       client.shutDown();
-               }
-       }
-
-       /**
-        * Tests that duplicate query registrations fail the job at the 
JobManager.
-        *
-        * <b>NOTE: </b> This test is only in the non-HA variant of the tests 
because
-        * in the HA mode we use the actual JM code which does not recognize the
-        * {@code NotifyWhenJobStatus} message.
-        */
-       @Test
-       public void testDuplicateRegistrationFailsJob() throws Exception {
-               final Deadline deadline = TEST_TIMEOUT.fromNow();
-               final int numKeys = 256;
-
-               JobID jobId = null;
-
-               try {
-                       //
-                       // Test program
-                       //
-                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-                       env.setStateBackend(stateBackend);
-                       env.setParallelism(maxParallelism);
-                       // Very important, because cluster is shared between 
tests and we
-                       // don't explicitly check that all slots are available 
before
-                       // submitting.
-                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000));
-
-                       DataStream<Tuple2<Integer, Long>> source = env
-                                       .addSource(new 
TestKeyRangeSource(numKeys));
-
-                       // Reducing state
-                       ReducingStateDescriptor<Tuple2<Integer, Long>> 
reducingState = new ReducingStateDescriptor<>(
-                                       "any-name",
-                                       new SumReduce(),
-                                       source.getType());
-
-                       final String queryName = "duplicate-me";
-
-                       final QueryableStateStream<Integer, Tuple2<Integer, 
Long>> queryableState =
-                                       source.keyBy(new 
KeySelector<Tuple2<Integer, Long>, Integer>() {
-                                               private static final long 
serialVersionUID = -4126824763829132959L;
-
-                                               @Override
-                                               public Integer 
getKey(Tuple2<Integer, Long> value) throws Exception {
-                                                       return value.f0;
-                                               }
-                                       }).asQueryableState(queryName, 
reducingState);
-
-                       final QueryableStateStream<Integer, Tuple2<Integer, 
Long>> duplicate =
-                                       source.keyBy(new 
KeySelector<Tuple2<Integer, Long>, Integer>() {
-                                               private static final long 
serialVersionUID = -6265024000462809436L;
-
-                                               @Override
-                                               public Integer 
getKey(Tuple2<Integer, Long> value) throws Exception {
-                                                       return value.f0;
-                                               }
-                                       }).asQueryableState(queryName);
-
-                       // Submit the job graph
-                       JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-                       jobId = jobGraph.getJobID();
-
-                       Future<TestingJobManagerMessages.JobStatusIs> 
failedFuture = cluster
-                                       .getLeaderGateway(deadline.timeLeft())
-                                       .ask(new 
TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.FAILED), 
deadline.timeLeft())
-                                       
.mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class));
-
-                       cluster.submitJobDetached(jobGraph);
-
-                       TestingJobManagerMessages.JobStatusIs jobStatus = 
Await.result(failedFuture, deadline.timeLeft());
-                       assertEquals(JobStatus.FAILED, jobStatus.state());
-
-                       // Get the job and check the cause
-                       JobManagerMessages.JobFound jobFound = Await.result(
-                                       
cluster.getLeaderGateway(deadline.timeLeft())
-                                                       .ask(new 
JobManagerMessages.RequestJob(jobId), deadline.timeLeft())
-                                                       
.mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)),
-                                       deadline.timeLeft());
-
-                       String failureCause = 
jobFound.executionGraph().getFailureCause().getExceptionAsString();
-
-                       assertTrue("Not instance of SuppressRestartsException", 
failureCause.startsWith("org.apache.flink.runtime.execution.SuppressRestartsException"));
-                       int causedByIndex = failureCause.indexOf("Caused by: ");
-                       String subFailureCause = 
failureCause.substring(causedByIndex + "Caused by: ".length());
-                       assertTrue("Not caused by IllegalStateException", 
subFailureCause.startsWith("java.lang.IllegalStateException"));
-                       assertTrue("Exception does not contain registration 
name", subFailureCause.contains(queryName));
-               } finally {
-                       // Free cluster resources
-                       if (jobId != null) {
-                               Future<JobManagerMessages.CancellationSuccess> 
cancellation = cluster
-                                               
.getLeaderGateway(deadline.timeLeft())
-                                               .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-                                               
.mapTo(ClassTag$.MODULE$.<JobManagerMessages.CancellationSuccess>apply(JobManagerMessages.CancellationSuccess.class));
-
-                               Await.ready(cancellation, deadline.timeLeft());
-                       }
-               }
-       }
-
-       /**
-        * Tests simple value state queryable state instance. Each source emits
-        * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
-        * queried. The test succeeds after each subtask index is queried with
-        * value numElements (the latest element updated the state).
-        */
-       @Test
-       public void testValueState() throws Exception {
-               // Config
-               final Deadline deadline = TEST_TIMEOUT.fromNow();
-
-               final int numElements = 1024;
-
-               final QueryableStateClient client = new 
QueryableStateClient(cluster.configuration());
-
-               JobID jobId = null;
-               try {
-                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-                       env.setStateBackend(stateBackend);
-                       env.setParallelism(maxParallelism);
-                       // Very important, because cluster is shared between 
tests and we
-                       // don't explicitly check that all slots are available 
before
-                       // submitting.
-                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000));
-
-                       DataStream<Tuple2<Integer, Long>> source = env
-                                       .addSource(new 
TestAscendingValueSource(numElements));
-
-                       // Value state
-                       ValueStateDescriptor<Tuple2<Integer, Long>> valueState 
= new ValueStateDescriptor<>(
-                                       "any",
-                                       source.getType());
-
-                       QueryableStateStream<Integer, Tuple2<Integer, Long>> 
queryableState =
-                                       source.keyBy(new 
KeySelector<Tuple2<Integer, Long>, Integer>() {
-                                               private static final long 
serialVersionUID = 7662520075515707428L;
-
-                                               @Override
-                                               public Integer 
getKey(Tuple2<Integer, Long> value) throws Exception {
-                                                       return value.f0;
-                                               }
-                                       }).asQueryableState("hakuna", 
valueState);
-
-                       // Submit the job graph
-                       JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-                       jobId = jobGraph.getJobID();
-
-                       cluster.submitJobDetached(jobGraph);
-
-                       // Now query
-                       long expected = numElements;
-
-                       executeQuery(deadline, client, jobId, "hakuna", 
valueState, expected);
-               } finally {
-                       // Free cluster resources
-                       if (jobId != null) {
-                               Future<CancellationSuccess> cancellation = 
cluster
-                                               
.getLeaderGateway(deadline.timeLeft())
-                                               .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
-
-                               Await.ready(cancellation, deadline.timeLeft());
-                       }
-
-                       client.shutDown();
-               }
-       }
-
-       /**
-        * Similar test to {@link #testValueState()} but before submitting the
-        * job, we already issue one request which fails.
-        */
-       @Test
-       public void testQueryNonStartedJobState() throws Exception {
-               // Config
-               final Deadline deadline = TEST_TIMEOUT.fromNow();
-
-               final int numElements = 1024;
-
-               final QueryableStateClient client = new 
QueryableStateClient(cluster.configuration());
-
-               JobID jobId = null;
-               try {
-                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-                       env.setStateBackend(stateBackend);
-                       env.setParallelism(maxParallelism);
-                       // Very important, because cluster is shared between 
tests and we
-                       // don't explicitly check that all slots are available 
before
-                       // submitting.
-                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000));
-
-                       DataStream<Tuple2<Integer, Long>> source = env
-                               .addSource(new 
TestAscendingValueSource(numElements));
-
-                       // Value state
-                       ValueStateDescriptor<Tuple2<Integer, Long>> valueState 
= new ValueStateDescriptor<>(
-                               "any",
-                               source.getType(),
-                               null);
-
-                       QueryableStateStream<Integer, Tuple2<Integer, Long>> 
queryableState =
-                               source.keyBy(new KeySelector<Tuple2<Integer, 
Long>, Integer>() {
-                                       private static final long 
serialVersionUID = 7480503339992214681L;
-
-                                       @Override
-                                       public Integer getKey(Tuple2<Integer, 
Long> value) throws Exception {
-                                               return value.f0;
-                                       }
-                               }).asQueryableState("hakuna", valueState);
-
-                       // Submit the job graph
-                       JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-                       jobId = jobGraph.getJobID();
-
-                       // Now query
-                       long expected = numElements;
-
-                       // query once
-                       client.getKvState(
-                                       jobId,
-                                       queryableState.getQueryableStateName(),
-                                       0,
-                                       VoidNamespace.INSTANCE,
-                                       BasicTypeInfo.INT_TYPE_INFO,
-                                       VoidNamespaceTypeInfo.INSTANCE,
-                                       valueState);
-
-                       cluster.submitJobDetached(jobGraph);
-
-                       executeQuery(deadline, client, jobId, "hakuna", 
valueState, expected);
-               } finally {
-                       // Free cluster resources
-                       if (jobId != null) {
-                               Future<CancellationSuccess> cancellation = 
cluster
-                                       .getLeaderGateway(deadline.timeLeft())
-                                       .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-                                       
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
-
-                               Await.ready(cancellation, deadline.timeLeft());
-                       }
-
-                       client.shutDown();
-               }
-       }
-
-       /**
-        * Retry a query for state for keys between 0 and {@link 
#maxParallelism} until
-        * <tt>expected</tt> equals the value of the result tuple's second 
field.
-        */
-       private void executeQuery(
-                       final Deadline deadline,
-                       final QueryableStateClient client,
-                       final JobID jobId,
-                       final String queryableStateName,
-                       final StateDescriptor<?, Tuple2<Integer, Long>> 
stateDescriptor,
-                       final long expected) throws Exception {
-
-               for (int key = 0; key < maxParallelism; key++) {
-                       boolean success = false;
-                       while (deadline.hasTimeLeft() && !success) {
-                               Future<Tuple2<Integer, Long>> future = 
getKvStateWithRetries(client,
-                                       jobId,
-                                       queryableStateName,
-                                       key,
-                                       BasicTypeInfo.INT_TYPE_INFO,
-                                       stateDescriptor,
-                                       QUERY_RETRY_DELAY,
-                                       false);
-
-                               Tuple2<Integer, Long> value = 
Await.result(future, deadline.timeLeft());
-
-                               assertEquals("Key mismatch", key, 
value.f0.intValue());
-                               if (expected == value.f1) {
-                                       success = true;
-                               } else {
-                                       // Retry
-                                       Thread.sleep(50);
-                               }
-                       }
-
-                       assertTrue("Did not succeed query", success);
-               }
-       }
-
-       /**
-        * Retry a query for state for keys between 0 and {@link 
#maxParallelism} until
-        * <tt>expected</tt> equals the value of the result tuple's second 
field.
-        */
-       private void executeQuery(
-                       final Deadline deadline,
-                       final QueryableStateClient client,
-                       final JobID jobId,
-                       final String queryableStateName,
-                       final TypeSerializer<Tuple2<Integer, Long>> 
valueSerializer,
-                       final long expected) throws Exception {
-
-               for (int key = 0; key < maxParallelism; key++) {
-                       boolean success = false;
-                       while (deadline.hasTimeLeft() && !success) {
-                               Future<Tuple2<Integer, Long>> future = 
getKvStateWithRetries(client,
-                                               jobId,
-                                               queryableStateName,
-                                               key,
-                                               BasicTypeInfo.INT_TYPE_INFO,
-                                               valueSerializer,
-                                               QUERY_RETRY_DELAY,
-                                               false);
-
-                               Tuple2<Integer, Long> value = 
Await.result(future, deadline.timeLeft());
-
-                               assertEquals("Key mismatch", key, 
value.f0.intValue());
-                               if (expected == value.f1) {
-                                       success = true;
-                               } else {
-                                       // Retry
-                                       Thread.sleep(50);
-                               }
-                       }
-
-                       assertTrue("Did not succeed query", success);
-               }
-       }
-
-       /**
-        * Tests simple value state queryable state instance with a default 
value
-        * set. Each source emits (subtaskIndex, 0)..(subtaskIndex, numElements)
-        * tuples, the key is mapped to 1 but key 0 is queried which should 
throw
-        * a {@link UnknownKeyOrNamespace} exception.
-        *
-        * @throws UnknownKeyOrNamespace thrown due to querying a non-existent key
-        */
-       @Test(expected = UnknownKeyOrNamespace.class)
-       public void testValueStateDefault() throws
-               Exception, UnknownKeyOrNamespace {
-
-               // Config
-               final Deadline deadline = TEST_TIMEOUT.fromNow();
-
-               final int numElements = 1024;
-
-               final QueryableStateClient client = new 
QueryableStateClient(cluster.configuration());
-
-               JobID jobId = null;
-               try {
-                       StreamExecutionEnvironment env =
-                               
StreamExecutionEnvironment.getExecutionEnvironment();
-                       env.setStateBackend(stateBackend);
-                       env.setParallelism(maxParallelism);
-                       // Very important, because cluster is shared between 
tests and we
-                       // don't explicitly check that all slots are available 
before
-                       // submitting.
-                       env.setRestartStrategy(RestartStrategies
-                               .fixedDelayRestart(Integer.MAX_VALUE, 1000));
-
-                       DataStream<Tuple2<Integer, Long>> source = env
-                               .addSource(new 
TestAscendingValueSource(numElements));
-
-                       // Value state
-                       ValueStateDescriptor<Tuple2<Integer, Long>> valueState =
-                               new ValueStateDescriptor<>(
-                                       "any",
-                                       source.getType(),
-                                       Tuple2.of(0, 1337L));
-
-                       // only expose key "1"
-                       QueryableStateStream<Integer, Tuple2<Integer, Long>>
-                               queryableState =
-                               source.keyBy(
-                                       new KeySelector<Tuple2<Integer, Long>, 
Integer>() {
-                                               private static final long 
serialVersionUID = 4509274556892655887L;
-
-                                               @Override
-                                               public Integer getKey(
-                                                       Tuple2<Integer, Long> 
value) throws
-                                                       Exception {
-                                                       return 1;
-                                               }
-                                       }).asQueryableState("hakuna", 
valueState);
-
-                       // Submit the job graph
-                       JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-                       jobId = jobGraph.getJobID();
-
-                       cluster.submitJobDetached(jobGraph);
-
-                       // Now query
-                       int key = 0;
-                       Future<Tuple2<Integer, Long>> future = 
getKvStateWithRetries(client,
-                               jobId,
-                               queryableState.getQueryableStateName(),
-                               key,
-                               BasicTypeInfo.INT_TYPE_INFO,
-                               valueState,
-                               QUERY_RETRY_DELAY,
-                               true);
-
-                       Await.result(future, deadline.timeLeft());
-               } finally {
-                       // Free cluster resources
-                       if (jobId != null) {
-                               Future<CancellationSuccess> cancellation = 
cluster
-                                       .getLeaderGateway(deadline.timeLeft())
-                                       .ask(new 
JobManagerMessages.CancelJob(jobId),
-                                               deadline.timeLeft())
-                                       
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(
-                                               CancellationSuccess.class));
-
-                               Await.ready(cancellation, deadline.timeLeft());
-                       }
-
-                       client.shutDown();
-               }
-       }
-
-       /**
-        * Tests simple value state queryable state instance. Each source emits
-        * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
-        * queried. The test succeeds after each subtask index is queried with
-        * value numElements (the latest element updated the state).
-        *
-        * <p>This is the same as the simple value state test, but uses the API 
shortcut.
-        */
-       @Test
-       public void testValueStateShortcut() throws Exception {
-               // Config
-               final Deadline deadline = TEST_TIMEOUT.fromNow();
-
-               final int numElements = 1024;
-
-               final QueryableStateClient client = new 
QueryableStateClient(cluster.configuration());
-
-               JobID jobId = null;
-               try {
-                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-                       env.setStateBackend(stateBackend);
-                       env.setParallelism(maxParallelism);
-                       // Very important, because cluster is shared between 
tests and we
-                       // don't explicitly check that all slots are available 
before
-                       // submitting.
-                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000));
-
-                       DataStream<Tuple2<Integer, Long>> source = env
-                                       .addSource(new 
TestAscendingValueSource(numElements));
-
-                       // Value state shortcut
-                       QueryableStateStream<Integer, Tuple2<Integer, Long>> 
queryableState =
-                                       source.keyBy(new 
KeySelector<Tuple2<Integer, Long>, Integer>() {
-                                               private static final long 
serialVersionUID = 9168901838808830068L;
-
-                                               @Override
-                                               public Integer 
getKey(Tuple2<Integer, Long> value) throws Exception {
-                                                       return value.f0;
-                                               }
-                                       }).asQueryableState("matata");
-
-                       // Submit the job graph
-                       JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-                       jobId = jobGraph.getJobID();
-
-                       cluster.submitJobDetached(jobGraph);
-
-                       // Now query
-                       long expected = numElements;
-
-                       executeQuery(deadline, client, jobId, "matata",
-                                       queryableState.getValueSerializer(), 
expected);
-               } finally {
-                       // Free cluster resources
-                       if (jobId != null) {
-                               Future<CancellationSuccess> cancellation = 
cluster
-                                               
.getLeaderGateway(deadline.timeLeft())
-                                               .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
-
-                               Await.ready(cancellation, deadline.timeLeft());
-                       }
-
-                       client.shutDown();
-               }
-       }
-
-       /**
-        * Tests simple folding state queryable state instance. Each source 
emits
-        * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
-        * queried. The folding state sums these up and maps them to Strings. 
The
-        * test succeeds after each subtask index is queried with result 
n*(n+1)/2
-        * (as a String).
-        */
-       @Test
-       public void testFoldingState() throws Exception {
-               // Config
-               final Deadline deadline = TEST_TIMEOUT.fromNow();
-
-               final int numElements = 1024;
-
-               final QueryableStateClient client = new 
QueryableStateClient(cluster.configuration());
-
-               JobID jobId = null;
-               try {
-                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-                       env.setStateBackend(stateBackend);
-                       env.setParallelism(maxParallelism);
-                       // Very important, because cluster is shared between 
tests and we
-                       // don't explicitly check that all slots are available 
before
-                       // submitting.
-                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000));
-
-                       DataStream<Tuple2<Integer, Long>> source = env
-                                       .addSource(new 
TestAscendingValueSource(numElements));
-
-                       // Folding state
-                       FoldingStateDescriptor<Tuple2<Integer, Long>, String> 
foldingState =
-                                       new FoldingStateDescriptor<>(
-                                                       "any",
-                                                       "0",
-                                                       new SumFold(),
-                                                       
StringSerializer.INSTANCE);
-
-                       QueryableStateStream<Integer, String> queryableState =
-                                       source.keyBy(new 
KeySelector<Tuple2<Integer, Long>, Integer>() {
-                                               private static final long 
serialVersionUID = -842809958106747539L;
-
-                                               @Override
-                                               public Integer 
getKey(Tuple2<Integer, Long> value) throws Exception {
-                                                       return value.f0;
-                                               }
-                                       }).asQueryableState("pumba", 
foldingState);
-
-                       // Submit the job graph
-                       JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-                       jobId = jobGraph.getJobID();
-
-                       cluster.submitJobDetached(jobGraph);
-
-                       // Now query
-                       String expected = Integer.toString(numElements * 
(numElements + 1) / 2);
-
-                       for (int key = 0; key < maxParallelism; key++) {
-                               boolean success = false;
-                               while (deadline.hasTimeLeft() && !success) {
-                                       Future<String> future = 
getKvStateWithRetries(client,
-                                                       jobId,
-                                                       
queryableState.getQueryableStateName(),
-                                                       key,
-                                                       
BasicTypeInfo.INT_TYPE_INFO,
-                                                       foldingState,
-                                                       QUERY_RETRY_DELAY,
-                                                       false);
-
-                                       String value = Await.result(future, 
deadline.timeLeft());
-                                       if (expected.equals(value)) {
-                                               success = true;
-                                       } else {
-                                               // Retry
-                                               Thread.sleep(50);
-                                       }
-                               }
-
-                               assertTrue("Did not succeed query", success);
-                       }
-               } finally {
-                       // Free cluster resources
-                       if (jobId != null) {
-                               Future<CancellationSuccess> cancellation = 
cluster
-                                               
.getLeaderGateway(deadline.timeLeft())
-                                               .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
-
-                               Await.ready(cancellation, deadline.timeLeft());
-                       }
-
-                       client.shutDown();
-               }
-       }
-
-       /**
-        * Tests simple reducing state queryable state instance. Each source 
emits
-        * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
-        * queried. The reducing state instance sums these up. The test succeeds
-        * after each subtask index is queried with result n*(n+1)/2.
-        */
-       @Test
-       public void testReducingState() throws Exception {
-               // Config
-               final Deadline deadline = TEST_TIMEOUT.fromNow();
-
-               final int numElements = 1024;
-
-               final QueryableStateClient client = new 
QueryableStateClient(cluster.configuration());
-
-               JobID jobId = null;
-               try {
-                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-                       env.setStateBackend(stateBackend);
-                       env.setParallelism(maxParallelism);
-                       // Very important, because cluster is shared between 
tests and we
-                       // don't explicitly check that all slots are available 
before
-                       // submitting.
-                       
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
1000));
-
-                       DataStream<Tuple2<Integer, Long>> source = env
-                                       .addSource(new 
TestAscendingValueSource(numElements));
-
-                       // Reducing state
-                       ReducingStateDescriptor<Tuple2<Integer, Long>> 
reducingState =
-                                       new ReducingStateDescriptor<>(
-                                                       "any",
-                                                       new SumReduce(),
-                                                       source.getType());
-
-                       QueryableStateStream<Integer, Tuple2<Integer, Long>> 
queryableState =
-                                       source.keyBy(new 
KeySelector<Tuple2<Integer, Long>, Integer>() {
-                                               private static final long 
serialVersionUID = 8470749712274833552L;
-
-                                               @Override
-                                               public Integer 
getKey(Tuple2<Integer, Long> value) throws Exception {
-                                                       return value.f0;
-                                               }
-                                       }).asQueryableState("jungle", 
reducingState);
-
-                       // Submit the job graph
-                       JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-                       jobId = jobGraph.getJobID();
-
-                       cluster.submitJobDetached(jobGraph);
-
-                       // Wait until job is running
-
-                       // Now query
-                       long expected = numElements * (numElements + 1) / 2;
-
-                       executeQuery(deadline, client, jobId, "jungle", 
reducingState, expected);
-               } finally {
-                       // Free cluster resources
-                       if (jobId != null) {
-                               Future<CancellationSuccess> cancellation = 
cluster
-                                               
.getLeaderGateway(deadline.timeLeft())
-                                               .ask(new 
JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-                                               
.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
-
-                               Await.ready(cancellation, deadline.timeLeft());
-                       }
-
-                       client.shutDown();
-               }
-       }
-
-       private static <K, V> Future<V> getKvStateWithRetries(
-                       final QueryableStateClient client,
-                       final JobID jobId,
-                       final String queryName,
-                       final K key,
-                       final TypeInformation<K> keyTypeInfo,
-                       final TypeSerializer<V> valueTypeSerializer,
-                       final FiniteDuration retryDelay,
-                       final boolean failForUnknownKeyOrNamespace) {
-
-               return client.getKvState(jobId, queryName, key, 
VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, 
valueTypeSerializer)
-                               .recoverWith(new Recover<Future<V>>() {
-                                       @Override
-                                       public Future<V> recover(Throwable 
failure) throws Throwable {
-                                               if (failure instanceof 
AssertionError) {
-                                                       return 
Futures.failed(failure);
-                                               } else if 
(failForUnknownKeyOrNamespace &&
-                                                               (failure 
instanceof UnknownKeyOrNamespace)) {
-                                                       return 
Futures.failed(failure);
-                                               } else {
-                                                       // At startup some 
failures are expected
-                                                       // due to races. Make 
sure that they don't
-                                                       // fail this test.
-                                                       return Patterns.after(
-                                                                       
retryDelay,
-                                                                       
testActorSystem.scheduler(),
-                                                                       
testActorSystem.dispatcher(),
-                                                                       new 
Callable<Future<V>>() {
-                                                                               
@Override
-                                                                               
public Future<V> call() throws Exception {
-                                                                               
        return getKvStateWithRetries(
-                                                                               
                        client,
-                                                                               
                        jobId,
-                                                                               
                        queryName,
-                                                                               
                        key,
-                                                                               
                        keyTypeInfo,
-                                                                               
                        valueTypeSerializer,
-                                                                               
                        retryDelay,
-                                                                               
                        failForUnknownKeyOrNamespace);
-                                                                               
}
-                                                                       });
-                                               }
-                                       }
-                               }, testActorSystem.dispatcher());
-
-       }
-
-       private static <K, V> Future<V> getKvStateWithRetries(
-                       final QueryableStateClient client,
-                       final JobID jobId,
-                       final String queryName,
-                       final K key,
-                       final TypeInformation<K> keyTypeInfo,
-                       final StateDescriptor<?, V> stateDescriptor,
-                       final FiniteDuration retryDelay,
-                       final boolean failForUnknownKeyOrNamespace) {
-
-               return client.getKvState(jobId, queryName, key, 
VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, 
stateDescriptor)
-                               .recoverWith(new Recover<Future<V>>() {
-                                       @Override
-                                       public Future<V> recover(Throwable 
failure) throws Throwable {
-                                               if (failure instanceof 
AssertionError) {
-                                                       return 
Futures.failed(failure);
-                                               } else if 
(failForUnknownKeyOrNamespace &&
-                                                               (failure 
instanceof UnknownKeyOrNamespace)) {
-                                                       return 
Futures.failed(failure);
-                                               } else {
-                                                       // At startup some 
failures are expected
-                                                       // due to races. Make 
sure that they don't
-                                                       // fail this test.
-                                                       return Patterns.after(
-                                                                       
retryDelay,
-                                                                       
testActorSystem.scheduler(),
-                                                                       
testActorSystem.dispatcher(),
-                                                                       new 
Callable<Future<V>>() {
-                                                                               
@Override
-                                                                               
public Future<V> call() throws Exception {
-                                                                               
        return getKvStateWithRetries(
-                                                                               
                        client,
-                                                                               
                        jobId,
-                                                                               
                        queryName,
-                                                                               
                        key,
-                                                                               
                        keyTypeInfo,
-                                                                               
                        stateDescriptor,
-                                                                               
                        retryDelay,
-                                                                               
                        failForUnknownKeyOrNamespace);
-                                                                               
}
-                                                                       });
-                                               }
-                                       }
-                               }, testActorSystem.dispatcher());
-       }
-
-       /**
-        * Test source producing (key, 0)..(key, maxValue) with key being the 
sub
-        * task index.
-        *
-        * <p>After all tuples have been emitted, the source waits to be 
cancelled
-        * and does not immediately finish.
-        */
-       private static class TestAscendingValueSource extends 
RichParallelSourceFunction<Tuple2<Integer, Long>> {
-
-               private static final long serialVersionUID = 
1459935229498173245L;
-
-               private final long maxValue;
-               private volatile boolean isRunning = true;
-
-               TestAscendingValueSource(long maxValue) {
-                       Preconditions.checkArgument(maxValue >= 0);
-                       this.maxValue = maxValue;
-               }
-
-               @Override
-               public void open(Configuration parameters) throws Exception {
-                       super.open(parameters);
-               }
-
-               @Override
-               public void run(SourceContext<Tuple2<Integer, Long>> ctx) 
throws Exception {
-                       // f0 => key
-                       int key = getRuntimeContext().getIndexOfThisSubtask();
-                       Tuple2<Integer, Long> record = new Tuple2<>(key, 0L);
-
-                       long currentValue = 0;
-                       while (isRunning && currentValue <= maxValue) {
-                               synchronized (ctx.getCheckpointLock()) {
-                                       record.f1 = currentValue;
-                                       ctx.collect(record);
-                               }
-
-                               currentValue++;
-                       }
-
-                       while (isRunning) {
-                               synchronized (this) {
-                                       this.wait();
-                               }
-                       }
-               }
-
-               @Override
-               public void cancel() {
-                       isRunning = false;
-
-                       synchronized (this) {
-                               this.notifyAll();
-                       }
-               }
-
-       }
-
-       /**
-        * Test source producing (key, 1) tuples with random key in key range 
(numKeys).
-        */
-       protected static class TestKeyRangeSource extends 
RichParallelSourceFunction<Tuple2<Integer, Long>>
-                       implements CheckpointListener {
-               private static final long serialVersionUID = 
-5744725196953582710L;
-
-               private static final AtomicLong LATEST_CHECKPOINT_ID = new 
AtomicLong();
-               private final int numKeys;
-               private final ThreadLocalRandom random = 
ThreadLocalRandom.current();
-               private volatile boolean isRunning = true;
-
-               TestKeyRangeSource(int numKeys) {
-                       this.numKeys = numKeys;
-               }
-
-               @Override
-               public void open(Configuration parameters) throws Exception {
-                       super.open(parameters);
-                       if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
-                               LATEST_CHECKPOINT_ID.set(0);
-                       }
-               }
-
-               @Override
-               public void run(SourceContext<Tuple2<Integer, Long>> ctx) 
throws Exception {
-                       // f0 => key
-                       Tuple2<Integer, Long> record = new Tuple2<>(0, 1L);
-
-                       while (isRunning) {
-                               synchronized (ctx.getCheckpointLock()) {
-                                       record.f0 = random.nextInt(numKeys);
-                                       ctx.collect(record);
-                               }
-                               // mild slow down
-                               Thread.sleep(1);
-                       }
-               }
-
-               @Override
-               public void cancel() {
-                       isRunning = false;
-               }
-
-               @Override
-               public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
-                       if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
-                               LATEST_CHECKPOINT_ID.set(checkpointId);
-                       }
-               }
-       }
-
-       /**
-        * Test {@link FoldFunction} concatenating the already stored string 
with the long passed as argument.
-        */
-       private static class SumFold implements FoldFunction<Tuple2<Integer, 
Long>, String> {
-               private static final long serialVersionUID = 
-6249227626701264599L;
-
-               @Override
-               public String fold(String accumulator, Tuple2<Integer, Long> 
value) throws Exception {
-                       long acc = Long.valueOf(accumulator);
-                       acc += value.f1;
-                       return Long.toString(acc);
-               }
-       }
-
-       /**
-        * Test {@link ReduceFunction} summing up its two arguments.
-        */
-       protected static class SumReduce implements 
ReduceFunction<Tuple2<Integer, Long>> {
-               private static final long serialVersionUID = 
-8651235077342052336L;
-
-               @Override
-               public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> 
value1, Tuple2<Integer, Long> value2) throws Exception {
-                       value1.f1 += value2.f1;
-                       return value1;
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-tests/src/test/java/org/apache/flink/test/query/HAAbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/query/HAAbstractQueryableStateITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/query/HAAbstractQueryableStateITCase.java
deleted file mode 100644
index cd89e00..0000000
--- 
a/flink-tests/src/test/java/org/apache/flink/test/query/HAAbstractQueryableStateITCase.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.query;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.configuration.QueryableStateOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-
-import org.apache.curator.test.TestingServer;
-
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.rules.TemporaryFolder;
-
-import static org.junit.Assert.fail;
-
-/**
- * Base class with the cluster configuration for the tests on the NON-HA mode.
- */
-public abstract class HAAbstractQueryableStateITCase extends 
AbstractQueryableStateITCase {
-
-       private static final int NUM_JMS = 2;
-       private static final int NUM_TMS = 4;
-       private static final int NUM_SLOTS_PER_TM = 4;
-
-       private static TestingServer zkServer;
-       private static TemporaryFolder temporaryFolder;
-
-       @BeforeClass
-       public static void setup() {
-               try {
-                       zkServer = new TestingServer();
-                       temporaryFolder = new TemporaryFolder();
-                       temporaryFolder.create();
-
-                       Configuration config = new Configuration();
-                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
-                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
-                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
NUM_SLOTS_PER_TM);
-                       config.setBoolean(QueryableStateOptions.SERVER_ENABLE, 
true);
-                       
config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2);
-                       
config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
-                       
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
temporaryFolder.newFolder().toString());
-                       
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zkServer.getConnectString());
-                       config.setString(HighAvailabilityOptions.HA_MODE, 
"zookeeper");
-
-                       cluster = new TestingCluster(config, false);
-                       cluster.start();
-
-                       testActorSystem = AkkaUtils.createDefaultActorSystem();
-
-                       // verify that we are in HA mode
-                       Assert.assertTrue(cluster.haMode() == 
HighAvailabilityMode.ZOOKEEPER);
-
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @AfterClass
-       public static void tearDown() {
-               if (cluster != null) {
-                       cluster.stop();
-                       cluster.awaitTermination();
-               }
-
-               testActorSystem.shutdown();
-               testActorSystem.awaitTermination();
-
-               try {
-                       zkServer.stop();
-                       zkServer.close();
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-
-               temporaryFolder.delete();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseFsBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseFsBackend.java
 
b/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseFsBackend.java
deleted file mode 100644
index 5d5b671..0000000
--- 
a/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseFsBackend.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.query;
-
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Several integration tests for queryable state using the {@link 
FsStateBackend}.
- */
-public class HAQueryableStateITCaseFsBackend extends 
HAAbstractQueryableStateITCase {
-
-       @Rule
-       public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-       @Override
-       protected AbstractStateBackend createStateBackend() throws Exception {
-               return new 
FsStateBackend(temporaryFolder.newFolder().toURI().toString());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseRocksDBBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseRocksDBBackend.java
 
b/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseRocksDBBackend.java
deleted file mode 100644
index 22570b5..0000000
--- 
a/flink-tests/src/test/java/org/apache/flink/test/query/HAQueryableStateITCaseRocksDBBackend.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.query;
-
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Several integration tests for queryable state using the {@link 
RocksDBStateBackend}.
- */
-public class HAQueryableStateITCaseRocksDBBackend extends 
HAAbstractQueryableStateITCase {
-
-       @Rule
-       public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-       @Override
-       protected AbstractStateBackend createStateBackend() throws Exception {
-               return new 
RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
deleted file mode 100644
index 0c628e4..0000000
--- 
a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.query;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.contrib.streaming.state.PredefinedOptions;
-import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import 
org.apache.flink.runtime.query.netty.message.KvStateRequestSerializerTest;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.internal.InternalListState;
-import org.apache.flink.runtime.state.internal.InternalMapState;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.DBOptions;
-
-import java.io.File;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Additional tests for the serialization and deserialization of {@link
- * org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer}
- * with a RocksDB state back-end.
- */
-public final class KVStateRequestSerializerRocksDBTest {
-
-       @Rule
-       public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-       /**
-        * Extension of {@link RocksDBKeyedStateBackend} to make {@link
-        * #createListState(TypeSerializer, ListStateDescriptor)} public for use in
-        * the tests.
-        *
-        * @param <K> key type
-        */
-       static final class RocksDBKeyedStateBackend2<K> extends 
RocksDBKeyedStateBackend<K> {
-
-               RocksDBKeyedStateBackend2(
-                               final String operatorIdentifier,
-                               final ClassLoader userCodeClassLoader,
-                               final File instanceBasePath,
-                               final DBOptions dbOptions,
-                               final ColumnFamilyOptions columnFamilyOptions,
-                               final TaskKvStateRegistry kvStateRegistry,
-                               final TypeSerializer<K> keySerializer,
-                               final int numberOfKeyGroups,
-                               final KeyGroupRange keyGroupRange,
-                               final ExecutionConfig executionConfig) throws 
Exception {
-
-                       super(operatorIdentifier, userCodeClassLoader,
-                               instanceBasePath,
-                               dbOptions, columnFamilyOptions, 
kvStateRegistry, keySerializer,
-                               numberOfKeyGroups, keyGroupRange, 
executionConfig, false);
-               }
-
-               @Override
-               public <N, T> InternalListState<N, T> createListState(
-                       final TypeSerializer<N> namespaceSerializer,
-                       final ListStateDescriptor<T> stateDesc) throws 
Exception {
-
-                       return super.createListState(namespaceSerializer, 
stateDesc);
-               }
-       }
-
-       /**
-        * Tests list serialization and deserialization match.
-        *
-        * @see KvStateRequestSerializerTest#testListSerialization()
-        * KvStateRequestSerializerTest#testListSerialization() using the heap state back-end
-        * test
-        */
-       @Test
-       public void testListSerialization() throws Exception {
-               final long key = 0L;
-
-               // objects for RocksDB state list serialisation
-               DBOptions dbOptions = 
PredefinedOptions.DEFAULT.createDBOptions();
-               dbOptions.setCreateIfMissing(true);
-               ColumnFamilyOptions columnFamilyOptions = 
PredefinedOptions.DEFAULT.createColumnOptions();
-               final RocksDBKeyedStateBackend2<Long> longHeapKeyedStateBackend 
=
-                       new RocksDBKeyedStateBackend2<>(
-                               "no-op",
-                               ClassLoader.getSystemClassLoader(),
-                               temporaryFolder.getRoot(),
-                               dbOptions,
-                               columnFamilyOptions,
-                               mock(TaskKvStateRegistry.class),
-                               LongSerializer.INSTANCE,
-                               1, new KeyGroupRange(0, 0),
-                               new ExecutionConfig()
-                       );
-               longHeapKeyedStateBackend.restore(null);
-               longHeapKeyedStateBackend.setCurrentKey(key);
-
-               final InternalListState<VoidNamespace, Long> listState = 
longHeapKeyedStateBackend
-                       .createListState(VoidNamespaceSerializer.INSTANCE,
-                               new ListStateDescriptor<>("test", 
LongSerializer.INSTANCE));
-
-               KvStateRequestSerializerTest.testListSerialization(key, 
listState);
-       }
-
-       /**
-        * Tests map serialization and deserialization match.
-        *
-        * @see KvStateRequestSerializerTest#testMapSerialization()
-        * KvStateRequestSerializerTest#testMapSerialization() using the heap state back-end
-        * test
-        */
-       @Test
-       public void testMapSerialization() throws Exception {
-               final long key = 0L;
-
-               // objects for RocksDB state map serialisation
-               DBOptions dbOptions = 
PredefinedOptions.DEFAULT.createDBOptions();
-               dbOptions.setCreateIfMissing(true);
-               ColumnFamilyOptions columnFamilyOptions = 
PredefinedOptions.DEFAULT.createColumnOptions();
-               final RocksDBKeyedStateBackend<Long> longHeapKeyedStateBackend =
-                       new RocksDBKeyedStateBackend<>(
-                               "no-op",
-                               ClassLoader.getSystemClassLoader(),
-                               temporaryFolder.getRoot(),
-                               dbOptions,
-                               columnFamilyOptions,
-                               mock(TaskKvStateRegistry.class),
-                               LongSerializer.INSTANCE,
-                               1, new KeyGroupRange(0, 0),
-                               new ExecutionConfig(),
-                               false);
-               longHeapKeyedStateBackend.restore(null);
-               longHeapKeyedStateBackend.setCurrentKey(key);
-
-               final InternalMapState<VoidNamespace, Long, String> mapState = 
(InternalMapState<VoidNamespace, Long, String>)
-                               longHeapKeyedStateBackend.getPartitionedState(
-                                               VoidNamespace.INSTANCE,
-                                               
VoidNamespaceSerializer.INSTANCE,
-                                               new 
MapStateDescriptor<>("test", LongSerializer.INSTANCE, 
StringSerializer.INSTANCE));
-
-               KvStateRequestSerializerTest.testMapSerialization(key, 
mapState);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-tests/src/test/java/org/apache/flink/test/query/NonHAAbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/query/NonHAAbstractQueryableStateITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/query/NonHAAbstractQueryableStateITCase.java
deleted file mode 100644
index 83f86e4..0000000
--- 
a/flink-tests/src/test/java/org/apache/flink/test/query/NonHAAbstractQueryableStateITCase.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.query;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.QueryableStateOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-
-import static org.junit.Assert.fail;
-
-/**
- * Base class with the cluster configuration for the tests in non-HA mode.
- */
-public abstract class NonHAAbstractQueryableStateITCase extends 
AbstractQueryableStateITCase {
-
-       private static final int NUM_TMS = 2;
-       private static final int NUM_SLOTS_PER_TM = 4;
-
-       @BeforeClass
-       public static void setup() {
-               try {
-                       Configuration config = new Configuration();
-                       config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
4L);
-                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
-                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
NUM_SLOTS_PER_TM);
-                       
config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
-                       config.setBoolean(QueryableStateOptions.SERVER_ENABLE, 
true);
-                       
config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);
-
-                       cluster = new TestingCluster(config, false);
-                       cluster.start(true);
-
-                       testActorSystem = AkkaUtils.createDefaultActorSystem();
-
-                       // verify that we are not in HA mode
-                       Assert.assertTrue(cluster.haMode() == 
HighAvailabilityMode.NONE);
-
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @AfterClass
-       public static void tearDown() {
-               try {
-                       cluster.shutdown();
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-
-               if (testActorSystem != null) {
-                       testActorSystem.shutdown();
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseFsBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseFsBackend.java
 
b/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseFsBackend.java
deleted file mode 100644
index d4dbe83..0000000
--- 
a/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseFsBackend.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.query;
-
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Several integration tests for queryable state using the {@link FsStateBackend}.
- */
-public class NonHAQueryableStateITCaseFsBackend extends 
NonHAAbstractQueryableStateITCase {
-
-       @Rule
-       public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-       @Override
-       protected AbstractStateBackend createStateBackend() throws Exception {
-               return new 
FsStateBackend(temporaryFolder.newFolder().toURI().toString());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseRocksDBBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseRocksDBBackend.java
 
b/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseRocksDBBackend.java
deleted file mode 100644
index a15e6a4..0000000
--- 
a/flink-tests/src/test/java/org/apache/flink/test/query/NonHAQueryableStateITCaseRocksDBBackend.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.query;
-
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
-
-/**
- * Several integration tests for queryable state using the {@link RocksDBStateBackend}.
- */
-public class NonHAQueryableStateITCaseRocksDBBackend extends 
NonHAAbstractQueryableStateITCase {
-
-       @Rule
-       public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-       @Override
-       protected AbstractStateBackend createStateBackend() throws Exception {
-               return new 
RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 524e718..1bb3732 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,6 +81,7 @@ under the License.
                <module>flink-yarn</module>
                <module>flink-yarn-tests</module>
                <module>flink-fs-tests</module>
+               <module>flink-queryable-state</module>
        </modules>
 
        <properties>

Reply via email to