http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
new file mode 100644
index 0000000..c37c822
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -0,0 +1,728 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.queryablestate.UnknownKeyOrNamespace;
+import org.apache.flink.queryablestate.UnknownKvStateID;
+import org.apache.flink.queryablestate.messages.KvStateRequestFailure;
+import org.apache.flink.queryablestate.messages.KvStateRequestResult;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.messages.MessageType;
+import org.apache.flink.queryablestate.server.ChunkedByteBuf;
+import org.apache.flink.queryablestate.server.KvStateServerHandler;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.KvStateRegistryListener;
+import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
+import org.apache.flink.runtime.query.netty.KvStateRequestStats;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link KvStateServerHandler}.
+ */
+public class KvStateServerHandlerTest extends TestLogger {
+
+       /** Shared Thread pool for query execution. */
+       private static final ExecutorService TEST_THREAD_POOL = 
Executors.newSingleThreadExecutor();
+
+       private static final int READ_TIMEOUT_MILLIS = 10000;
+
+       @AfterClass
+       public static void tearDown() throws Exception {
+               if (TEST_THREAD_POOL != null) {
+                       TEST_THREAD_POOL.shutdown();
+               }
+       }
+
+       /**
+        * Tests a simple successful query via an EmbeddedChannel.
+        */
+       @Test
+       public void testSimpleQuery() throws Exception {
+               KvStateRegistry registry = new KvStateRegistry();
+               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
+
+               KvStateServerHandler handler = new 
KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+               EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
+
+               // Register state
+               ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
+               desc.setQueryable("vanilla");
+
+               int numKeyGroups = 1;
+               AbstractStateBackend abstractBackend = new MemoryStateBackend();
+               DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+               dummyEnv.setKvStateRegistry(registry);
+               AbstractKeyedStateBackend<Integer> backend = 
abstractBackend.createKeyedStateBackend(
+                               dummyEnv,
+                               new JobID(),
+                               "test_op",
+                               IntSerializer.INSTANCE,
+                               numKeyGroups,
+                               new KeyGroupRange(0, 0),
+                               
registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
+
+               final TestRegistryListener registryListener = new 
TestRegistryListener();
+               registry.registerListener(registryListener);
+
+               // Update the KvState and request it
+               int expectedValue = 712828289;
+
+               int key = 99812822;
+               backend.setCurrentKey(key);
+               ValueState<Integer> state = backend.getPartitionedState(
+                               VoidNamespace.INSTANCE,
+                               VoidNamespaceSerializer.INSTANCE,
+                               desc);
+
+               state.update(expectedValue);
+
+               byte[] serializedKeyAndNamespace = 
KvStateSerializer.serializeKeyAndNamespace(
+                               key,
+                               IntSerializer.INSTANCE,
+                               VoidNamespace.INSTANCE,
+                               VoidNamespaceSerializer.INSTANCE);
+
+               long requestId = Integer.MAX_VALUE + 182828L;
+
+               assertTrue(registryListener.registrationName.equals("vanilla"));
+
+               ByteBuf request = MessageSerializer.serializeKvStateRequest(
+                               channel.alloc(),
+                               requestId,
+                               registryListener.kvStateId,
+                               serializedKeyAndNamespace);
+
+               // Write the request and wait for the response
+               channel.writeInbound(request);
+
+               ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
+               buf.skipBytes(4); // skip frame length
+
+               // Verify the response
+               assertEquals(MessageType.REQUEST_RESULT, 
MessageSerializer.deserializeHeader(buf));
+               KvStateRequestResult response = 
MessageSerializer.deserializeKvStateRequestResult(buf);
+
+               assertEquals(requestId, response.getRequestId());
+
+               int actualValue = 
KvStateSerializer.deserializeValue(response.getSerializedResult(), 
IntSerializer.INSTANCE);
+               assertEquals(expectedValue, actualValue);
+
+               assertEquals(stats.toString(), 1, stats.getNumRequests());
+
+               // Wait for async successful request report
+               long deadline = System.nanoTime() + 
TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
+               while (stats.getNumSuccessful() != 1 && System.nanoTime() <= 
deadline) {
+                       Thread.sleep(10);
+               }
+
+               assertEquals(stats.toString(), 1, stats.getNumSuccessful());
+       }
+
+       /**
+        * Tests the failure response with {@link UnknownKvStateID} as cause on
+        * queries for unregistered KvStateIDs.
+        */
+       @Test
+       public void testQueryUnknownKvStateID() throws Exception {
+               KvStateRegistry registry = new KvStateRegistry();
+               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
+
+               KvStateServerHandler handler = new 
KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+               EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
+
+               long requestId = Integer.MAX_VALUE + 182828L;
+               ByteBuf request = MessageSerializer.serializeKvStateRequest(
+                               channel.alloc(),
+                               requestId,
+                               new KvStateID(),
+                               new byte[0]);
+
+               // Write the request and wait for the response
+               channel.writeInbound(request);
+
+               ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
+               buf.skipBytes(4); // skip frame length
+
+               // Verify the response
+               assertEquals(MessageType.REQUEST_FAILURE, 
MessageSerializer.deserializeHeader(buf));
+               KvStateRequestFailure response = 
MessageSerializer.deserializeKvStateRequestFailure(buf);
+
+               assertEquals(requestId, response.getRequestId());
+
+               assertTrue("Did not respond with expected failure cause", 
response.getCause() instanceof UnknownKvStateID);
+
+               assertEquals(1, stats.getNumRequests());
+               assertEquals(1, stats.getNumFailed());
+       }
+
+       /**
+        * Tests the failure response with {@link UnknownKeyOrNamespace} as 
cause
+        * on queries for non-existing keys.
+        */
+       @Test
+       public void testQueryUnknownKey() throws Exception {
+               KvStateRegistry registry = new KvStateRegistry();
+               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
+
+               KvStateServerHandler handler = new 
KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+               EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
+
+               int numKeyGroups = 1;
+               AbstractStateBackend abstractBackend = new MemoryStateBackend();
+               DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+               dummyEnv.setKvStateRegistry(registry);
+               KeyedStateBackend<Integer> backend = 
abstractBackend.createKeyedStateBackend(
+                               dummyEnv,
+                               new JobID(),
+                               "test_op",
+                               IntSerializer.INSTANCE,
+                               numKeyGroups,
+                               new KeyGroupRange(0, 0),
+                               
registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
+
+               final TestRegistryListener registryListener = new 
TestRegistryListener();
+               registry.registerListener(registryListener);
+
+               // Register state
+               ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
+               desc.setQueryable("vanilla");
+
+               backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, desc);
+
+               byte[] serializedKeyAndNamespace = 
KvStateSerializer.serializeKeyAndNamespace(
+                               1238283,
+                               IntSerializer.INSTANCE,
+                               VoidNamespace.INSTANCE,
+                               VoidNamespaceSerializer.INSTANCE);
+
+               long requestId = Integer.MAX_VALUE + 22982L;
+
+               assertTrue(registryListener.registrationName.equals("vanilla"));
+
+               ByteBuf request = MessageSerializer.serializeKvStateRequest(
+                               channel.alloc(),
+                               requestId,
+                               registryListener.kvStateId,
+                               serializedKeyAndNamespace);
+
+               // Write the request and wait for the response
+               channel.writeInbound(request);
+
+               ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
+               buf.skipBytes(4); // skip frame length
+
+               // Verify the response
+               assertEquals(MessageType.REQUEST_FAILURE, 
MessageSerializer.deserializeHeader(buf));
+               KvStateRequestFailure response = 
MessageSerializer.deserializeKvStateRequestFailure(buf);
+
+               assertEquals(requestId, response.getRequestId());
+
+               assertTrue("Did not respond with expected failure cause", 
response.getCause() instanceof UnknownKeyOrNamespace);
+
+               assertEquals(1, stats.getNumRequests());
+               assertEquals(1, stats.getNumFailed());
+       }
+
+       /**
+        * Tests the failure response on a failure on the {@link 
InternalKvState#getSerializedValue(byte[])}
+        * call.
+        */
+       @Test
+       public void testFailureOnGetSerializedValue() throws Exception {
+               KvStateRegistry registry = new KvStateRegistry();
+               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
+
+               KvStateServerHandler handler = new 
KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+               EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
+
+               // Failing KvState
+               InternalKvState<?> kvState = mock(InternalKvState.class);
+               when(kvState.getSerializedValue(any(byte[].class)))
+                               .thenThrow(new RuntimeException("Expected test 
Exception"));
+
+               KvStateID kvStateId = registry.registerKvState(
+                               new JobID(),
+                               new JobVertexID(),
+                               new KeyGroupRange(0, 0),
+                               "vanilla",
+                               kvState);
+
+               ByteBuf request = MessageSerializer.serializeKvStateRequest(
+                               channel.alloc(),
+                               282872,
+                               kvStateId,
+                               new byte[0]);
+
+               // Write the request and wait for the response
+               channel.writeInbound(request);
+
+               ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
+               buf.skipBytes(4); // skip frame length
+
+               // Verify the response
+               assertEquals(MessageType.REQUEST_FAILURE, 
MessageSerializer.deserializeHeader(buf));
+               KvStateRequestFailure response = 
MessageSerializer.deserializeKvStateRequestFailure(buf);
+
+               assertTrue(response.getCause().getMessage().contains("Expected 
test Exception"));
+
+               assertEquals(1, stats.getNumRequests());
+               assertEquals(1, stats.getNumFailed());
+       }
+
+       /**
+        * Tests that the channel is closed if an Exception reaches the channel
+        * handler.
+        */
+       @Test
+       public void testCloseChannelOnExceptionCaught() throws Exception {
+               KvStateRegistry registry = new KvStateRegistry();
+               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
+
+               KvStateServerHandler handler = new 
KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+               EmbeddedChannel channel = new EmbeddedChannel(handler);
+
+               channel.pipeline().fireExceptionCaught(new 
RuntimeException("Expected test Exception"));
+
+               ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
+               buf.skipBytes(4); // skip frame length
+
+               // Verify the response
+               assertEquals(MessageType.SERVER_FAILURE, 
MessageSerializer.deserializeHeader(buf));
+               Throwable response = 
MessageSerializer.deserializeServerFailure(buf);
+
+               assertTrue(response.getMessage().contains("Expected test 
Exception"));
+
+               channel.closeFuture().await(READ_TIMEOUT_MILLIS);
+               assertFalse(channel.isActive());
+       }
+
+       /**
+        * Tests the failure response on a rejected execution, because the query
+        * executor has been closed.
+        */
+       @Test
+       public void testQueryExecutorShutDown() throws Exception {
+               KvStateRegistry registry = new KvStateRegistry();
+               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
+
+               ExecutorService closedExecutor = 
Executors.newSingleThreadExecutor();
+               closedExecutor.shutdown();
+               assertTrue(closedExecutor.isShutdown());
+
+               KvStateServerHandler handler = new 
KvStateServerHandler(registry, closedExecutor, stats);
+               EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
+
+               int numKeyGroups = 1;
+               AbstractStateBackend abstractBackend = new MemoryStateBackend();
+               DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+               dummyEnv.setKvStateRegistry(registry);
+               KeyedStateBackend<Integer> backend = 
abstractBackend.createKeyedStateBackend(
+                               dummyEnv,
+                               new JobID(),
+                               "test_op",
+                               IntSerializer.INSTANCE,
+                               numKeyGroups,
+                               new KeyGroupRange(0, 0),
+                               
registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
+
+               final TestRegistryListener registryListener = new 
TestRegistryListener();
+               registry.registerListener(registryListener);
+
+               // Register state
+               ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
+               desc.setQueryable("vanilla");
+
+               backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, desc);
+
+               assertTrue(registryListener.registrationName.equals("vanilla"));
+
+               ByteBuf request = MessageSerializer.serializeKvStateRequest(
+                               channel.alloc(),
+                               282872,
+                               registryListener.kvStateId,
+                               new byte[0]);
+
+               // Write the request and wait for the response
+               channel.writeInbound(request);
+
+               ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
+               buf.skipBytes(4); // skip frame length
+
+               // Verify the response
+               assertEquals(MessageType.REQUEST_FAILURE, 
MessageSerializer.deserializeHeader(buf));
+               KvStateRequestFailure response = 
MessageSerializer.deserializeKvStateRequestFailure(buf);
+
+               
assertTrue(response.getCause().getMessage().contains("RejectedExecutionException"));
+
+               assertEquals(1, stats.getNumRequests());
+               assertEquals(1, stats.getNumFailed());
+       }
+
+       /**
+        * Tests response on unexpected messages.
+        */
+       @Test
+       public void testUnexpectedMessage() throws Exception {
+               KvStateRegistry registry = new KvStateRegistry();
+               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
+
+               KvStateServerHandler handler = new 
KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+               EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
+
+               // Write the request and wait for the response
+               ByteBuf unexpectedMessage = Unpooled.buffer(8);
+               unexpectedMessage.writeInt(4);
+               unexpectedMessage.writeInt(123238213);
+
+               channel.writeInbound(unexpectedMessage);
+
+               ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
+               buf.skipBytes(4); // skip frame length
+
+               // Verify the response
+               assertEquals(MessageType.SERVER_FAILURE, 
MessageSerializer.deserializeHeader(buf));
+               Throwable response = 
MessageSerializer.deserializeServerFailure(buf);
+
+               assertEquals(0, stats.getNumRequests());
+               assertEquals(0, stats.getNumFailed());
+
+               unexpectedMessage = 
MessageSerializer.serializeKvStateRequestResult(
+                               channel.alloc(),
+                               192,
+                               new byte[0]);
+
+               channel.writeInbound(unexpectedMessage);
+
+               buf = (ByteBuf) readInboundBlocking(channel);
+               buf.skipBytes(4); // skip frame length
+
+               // Verify the response
+               assertEquals(MessageType.SERVER_FAILURE, 
MessageSerializer.deserializeHeader(buf));
+               response = MessageSerializer.deserializeServerFailure(buf);
+
+               assertTrue("Unexpected failure cause " + 
response.getClass().getName(), response instanceof IllegalArgumentException);
+
+               assertEquals(0, stats.getNumRequests());
+               assertEquals(0, stats.getNumFailed());
+       }
+
+       /**
+        * Tests that incoming buffer instances are recycled.
+        */
+       @Test
+       public void testIncomingBufferIsRecycled() throws Exception {
+               KvStateRegistry registry = new KvStateRegistry();
+               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
+
+               KvStateServerHandler handler = new 
KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+               EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
+
+               ByteBuf request = MessageSerializer.serializeKvStateRequest(
+                               channel.alloc(),
+                               282872,
+                               new KvStateID(),
+                               new byte[0]);
+
+               assertEquals(1, request.refCnt());
+
+               // Write regular request
+               channel.writeInbound(request);
+               assertEquals("Buffer not recycled", 0, request.refCnt());
+
+               // Write unexpected msg
+               ByteBuf unexpected = channel.alloc().buffer(8);
+               unexpected.writeInt(4);
+               unexpected.writeInt(4);
+
+               assertEquals(1, unexpected.refCnt());
+
+               channel.writeInbound(unexpected);
+               assertEquals("Buffer not recycled", 0, unexpected.refCnt());
+       }
+
+       /**
+        * Tests the failure response if the serializers don't match.
+        */
+       @Test
+       public void testSerializerMismatch() throws Exception {
+               KvStateRegistry registry = new KvStateRegistry();
+               AtomicKvStateRequestStats stats = new 
AtomicKvStateRequestStats();
+
+               KvStateServerHandler handler = new 
KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+               EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
+
+               int numKeyGroups = 1;
+               AbstractStateBackend abstractBackend = new MemoryStateBackend();
+               DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+               dummyEnv.setKvStateRegistry(registry);
+               AbstractKeyedStateBackend<Integer> backend = 
abstractBackend.createKeyedStateBackend(
+                               dummyEnv,
+                               new JobID(),
+                               "test_op",
+                               IntSerializer.INSTANCE,
+                               numKeyGroups,
+                               new KeyGroupRange(0, 0),
+                               
registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
+
+               final TestRegistryListener registryListener = new 
TestRegistryListener();
+               registry.registerListener(registryListener);
+
+               // Register state
+               ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
+               desc.setQueryable("vanilla");
+
+               ValueState<Integer> state = backend.getPartitionedState(
+                               VoidNamespace.INSTANCE,
+                               VoidNamespaceSerializer.INSTANCE,
+                               desc);
+
+               int key = 99812822;
+
+               // Update the KvState
+               backend.setCurrentKey(key);
+               state.update(712828289);
+
+               byte[] wrongKeyAndNamespace = 
KvStateSerializer.serializeKeyAndNamespace(
+                               "wrong-key-type",
+                               StringSerializer.INSTANCE,
+                               "wrong-namespace-type",
+                               StringSerializer.INSTANCE);
+
+               byte[] wrongNamespace = 
KvStateSerializer.serializeKeyAndNamespace(
+                               key,
+                               IntSerializer.INSTANCE,
+                               "wrong-namespace-type",
+                               StringSerializer.INSTANCE);
+
+               assertTrue(registryListener.registrationName.equals("vanilla"));
+               ByteBuf request = MessageSerializer.serializeKvStateRequest(
+                               channel.alloc(),
+                               182828,
+                               registryListener.kvStateId,
+                               wrongKeyAndNamespace);
+
+               // Write the request and wait for the response
+               channel.writeInbound(request);
+
+               ByteBuf buf = (ByteBuf) readInboundBlocking(channel);
+               buf.skipBytes(4); // skip frame length
+
+               // Verify the response
+               assertEquals(MessageType.REQUEST_FAILURE, 
MessageSerializer.deserializeHeader(buf));
+               KvStateRequestFailure response = 
MessageSerializer.deserializeKvStateRequestFailure(buf);
+               assertEquals(182828, response.getRequestId());
+               
assertTrue(response.getCause().getMessage().contains("IOException"));
+
+               // Repeat with wrong namespace only
+               request = MessageSerializer.serializeKvStateRequest(
+                               channel.alloc(),
+                               182829,
+                               registryListener.kvStateId,
+                               wrongNamespace);
+
+               // Write the request and wait for the response
+               channel.writeInbound(request);
+
+               buf = (ByteBuf) readInboundBlocking(channel);
+               buf.skipBytes(4); // skip frame length
+
+               // Verify the response
+               assertEquals(MessageType.REQUEST_FAILURE, 
MessageSerializer.deserializeHeader(buf));
+               response = 
MessageSerializer.deserializeKvStateRequestFailure(buf);
+               assertEquals(182829, response.getRequestId());
+               
assertTrue(response.getCause().getMessage().contains("IOException"));
+
+               assertEquals(2, stats.getNumRequests());
+               assertEquals(2, stats.getNumFailed());
+       }
+
+       /**
+        * Tests that large responses are chunked.
+        */
+       @Test
+       public void testChunkedResponse() throws Exception {
+               KvStateRegistry registry = new KvStateRegistry();
+               KvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+               KvStateServerHandler handler = new 
KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
+               EmbeddedChannel channel = new 
EmbeddedChannel(getFrameDecoder(), handler);
+
+               int numKeyGroups = 1;
+               AbstractStateBackend abstractBackend = new MemoryStateBackend();
+               DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+               dummyEnv.setKvStateRegistry(registry);
+               AbstractKeyedStateBackend<Integer> backend = 
abstractBackend.createKeyedStateBackend(
+                               dummyEnv,
+                               new JobID(),
+                               "test_op",
+                               IntSerializer.INSTANCE,
+                               numKeyGroups,
+                               new KeyGroupRange(0, 0),
+                               
registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
+
+               final TestRegistryListener registryListener = new 
TestRegistryListener();
+               registry.registerListener(registryListener);
+
+               // Register state
+               ValueStateDescriptor<byte[]> desc = new 
ValueStateDescriptor<>("any", BytePrimitiveArraySerializer.INSTANCE);
+               desc.setQueryable("vanilla");
+
+               ValueState<byte[]> state = backend.getPartitionedState(
+                               VoidNamespace.INSTANCE,
+                               VoidNamespaceSerializer.INSTANCE,
+                               desc);
+
+               // Update KvState
+               byte[] bytes = new byte[2 * 
channel.config().getWriteBufferHighWaterMark()];
+
+               byte current = 0;
+               for (int i = 0; i < bytes.length; i++) {
+                       bytes[i] = current++;
+               }
+
+               int key = 99812822;
+               backend.setCurrentKey(key);
+               state.update(bytes);
+
+               // Request
+               byte[] serializedKeyAndNamespace = 
KvStateSerializer.serializeKeyAndNamespace(
+                               key,
+                               IntSerializer.INSTANCE,
+                               VoidNamespace.INSTANCE,
+                               VoidNamespaceSerializer.INSTANCE);
+
+               long requestId = Integer.MAX_VALUE + 182828L;
+
+               assertTrue(registryListener.registrationName.equals("vanilla"));
+
+               ByteBuf request = MessageSerializer.serializeKvStateRequest(
+                               channel.alloc(),
+                               requestId,
+                               registryListener.kvStateId,
+                               serializedKeyAndNamespace);
+
+               // Write the request and wait for the response
+               channel.writeInbound(request);
+
+               Object msg = readInboundBlocking(channel);
+               assertTrue("Not ChunkedByteBuf", msg instanceof ChunkedByteBuf);
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Queries the embedded channel for data.
+        */
+       private Object readInboundBlocking(EmbeddedChannel channel) throws 
InterruptedException, TimeoutException {
+               final int sleepMillis = 50;
+
+               int sleptMillis = 0;
+
+               Object msg = null;
+               while (sleptMillis < READ_TIMEOUT_MILLIS &&
+                               (msg = channel.readOutbound()) == null) {
+
+                       Thread.sleep(sleepMillis);
+                       sleptMillis += sleepMillis;
+               }
+
+               if (msg == null) {
+                       throw new TimeoutException();
+               } else {
+                       return msg;
+               }
+       }
+
+       /**
+        * Frame length decoder (expected by the serialized messages).
+        */
+       private ChannelHandler getFrameDecoder() {
+               return new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 
4, 0, 4);
+       }
+
+       /**
+        * A listener that keeps the last updated KvState information so that a 
test
+        * can retrieve it.
+        */
+       static class TestRegistryListener implements KvStateRegistryListener {
+               volatile JobVertexID jobVertexID;
+               volatile KeyGroupRange keyGroupIndex;
+               volatile String registrationName;
+               volatile KvStateID kvStateId;
+
+               @Override
+               public void notifyKvStateRegistered(JobID jobId,
+                               JobVertexID jobVertexId,
+                               KeyGroupRange keyGroupRange,
+                               String registrationName,
+                               KvStateID kvStateId) {
+                       this.jobVertexID = jobVertexId;
+                       this.keyGroupIndex = keyGroupRange;
+                       this.registrationName = registrationName;
+                       this.kvStateId = kvStateId;
+               }
+
+               @Override
+               public void notifyKvStateUnregistered(JobID jobId,
+                               JobVertexID jobVertexId,
+                               KeyGroupRange keyGroupRange,
+                               String registrationName) {
+
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
new file mode 100644
index 0000000..9332e68
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateServerTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.queryablestate.messages.KvStateRequestResult;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.messages.MessageType;
+import org.apache.flink.queryablestate.server.KvStateServerImpl;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.KvStateServer;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
+import org.apache.flink.runtime.query.netty.KvStateRequestStats;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
+import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link KvStateServer}.
+ */
+public class KvStateServerTest {
+
+       // Thread pool for client bootstrap (shared between tests)
+       private static final NioEventLoopGroup NIO_GROUP = new 
NioEventLoopGroup();
+
+       private static final int TIMEOUT_MILLIS = 10000;
+
+       @AfterClass
+       public static void tearDown() throws Exception {
+               if (NIO_GROUP != null) {
+                       NIO_GROUP.shutdownGracefully();
+               }
+       }
+
+       /**
+        * Tests a simple successful query via a SocketChannel.
+        */
+       @Test
+       public void testSimpleRequest() throws Exception {
+               KvStateServer server = null;
+               Bootstrap bootstrap = null;
+               try {
+                       KvStateRegistry registry = new KvStateRegistry();
+                       KvStateRequestStats stats = new 
AtomicKvStateRequestStats();
+
+                       server = new 
KvStateServerImpl(InetAddress.getLocalHost(), 0, 1, 1, registry, stats);
+                       server.start();
+
+                       KvStateServerAddress serverAddress = 
server.getAddress();
+                       int numKeyGroups = 1;
+                       AbstractStateBackend abstractBackend = new 
MemoryStateBackend();
+                       DummyEnvironment dummyEnv = new 
DummyEnvironment("test", 1, 0);
+                       dummyEnv.setKvStateRegistry(registry);
+                       AbstractKeyedStateBackend<Integer> backend = 
abstractBackend.createKeyedStateBackend(
+                                       dummyEnv,
+                                       new JobID(),
+                                       "test_op",
+                                       IntSerializer.INSTANCE,
+                                       numKeyGroups,
+                                       new KeyGroupRange(0, 0),
+                                       registry.createTaskRegistry(new 
JobID(), new JobVertexID()));
+
+                       final KvStateServerHandlerTest.TestRegistryListener 
registryListener =
+                                       new 
KvStateServerHandlerTest.TestRegistryListener();
+
+                       registry.registerListener(registryListener);
+
+                       ValueStateDescriptor<Integer> desc = new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
+                       desc.setQueryable("vanilla");
+
+                       ValueState<Integer> state = backend.getPartitionedState(
+                                       VoidNamespace.INSTANCE,
+                                       VoidNamespaceSerializer.INSTANCE,
+                                       desc);
+
+                       // Update KvState
+                       int expectedValue = 712828289;
+
+                       int key = 99812822;
+                       backend.setCurrentKey(key);
+                       state.update(expectedValue);
+
+                       // Request
+                       byte[] serializedKeyAndNamespace = 
KvStateSerializer.serializeKeyAndNamespace(
+                                       key,
+                                       IntSerializer.INSTANCE,
+                                       VoidNamespace.INSTANCE,
+                                       VoidNamespaceSerializer.INSTANCE);
+
+                       // Connect to the server
+                       final BlockingQueue<ByteBuf> responses = new 
LinkedBlockingQueue<>();
+                       bootstrap = createBootstrap(
+                                       new 
LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
+                                       new ChannelInboundHandlerAdapter() {
+                                               @Override
+                                               public void 
channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+                                                       responses.add((ByteBuf) 
msg);
+                                               }
+                                       });
+
+                       Channel channel = bootstrap
+                                       .connect(serverAddress.getHost(), 
serverAddress.getPort())
+                                       .sync().channel();
+
+                       long requestId = Integer.MAX_VALUE + 182828L;
+
+                       
assertTrue(registryListener.registrationName.equals("vanilla"));
+                       ByteBuf request = 
MessageSerializer.serializeKvStateRequest(
+                                       channel.alloc(),
+                                       requestId,
+                                       registryListener.kvStateId,
+                                       serializedKeyAndNamespace);
+
+                       channel.writeAndFlush(request);
+
+                       ByteBuf buf = responses.poll(TIMEOUT_MILLIS, 
TimeUnit.MILLISECONDS);
+
+                       assertEquals(MessageType.REQUEST_RESULT, 
MessageSerializer.deserializeHeader(buf));
+                       KvStateRequestResult response = 
MessageSerializer.deserializeKvStateRequestResult(buf);
+
+                       assertEquals(requestId, response.getRequestId());
+                       int actualValue = 
KvStateSerializer.deserializeValue(response.getSerializedResult(), 
IntSerializer.INSTANCE);
+                       assertEquals(expectedValue, actualValue);
+               } finally {
+                       if (server != null) {
+                               server.shutDown();
+                       }
+
+                       if (bootstrap != null) {
+                               EventLoopGroup group = bootstrap.group();
+                               if (group != null) {
+                                       group.shutdownGracefully();
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Creates a client bootstrap.
+        */
+       private Bootstrap createBootstrap(final ChannelHandler... handlers) {
+               return new 
Bootstrap().group(NIO_GROUP).channel(NioSocketChannel.class)
+                               .handler(new 
ChannelInitializer<SocketChannel>() {
+                                       @Override
+                                       protected void 
initChannel(SocketChannel ch) throws Exception {
+                                               ch.pipeline().addLast(handlers);
+                                       }
+                               });
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/QueryableStateClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/QueryableStateClientTest.java
 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/QueryableStateClientTest.java
new file mode 100644
index 0000000..5d4a861
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/QueryableStateClientTest.java
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.queryablestate.UnknownKvStateID;
+import org.apache.flink.queryablestate.UnknownKvStateKeyGroupLocation;
+import org.apache.flink.queryablestate.client.KvStateClient;
+import org.apache.flink.queryablestate.client.KvStateLocationLookupService;
+import org.apache.flink.queryablestate.client.QueryableStateClient;
+import org.apache.flink.queryablestate.server.KvStateServerImpl;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.KvStateServer;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.heap.HeapValueState;
+import org.apache.flink.runtime.state.heap.NestedMapsStateTable;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.MathUtils;
+
+import akka.actor.ActorSystem;
+import akka.dispatch.Futures;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import java.net.ConnectException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link QueryableStateClient}.
+ */
+public class QueryableStateClientTest {
+
+       private static final ActorSystem testActorSystem = 
AkkaUtils.createLocalActorSystem(new Configuration());
+
+       private static final FiniteDuration timeout = new FiniteDuration(100, 
TimeUnit.SECONDS);
+
+       @AfterClass
+       public static void tearDown() throws Exception {
+               if (testActorSystem != null) {
+                       testActorSystem.shutdown();
+               }
+       }
+
+       /**
+        * All failures should lead to a retry with a forced location lookup.
+        *
+        * <p>UnknownKvStateID, UnknownKvStateKeyGroupLocation, 
UnknownKvStateLocation,
+        * ConnectException are checked explicitly as these indicate out-of-sync
+        * KvStateLocation.
+        */
+       @Test
+       public void testForceLookupOnOutdatedLocation() throws Exception {
+               KvStateLocationLookupService lookupService = 
mock(KvStateLocationLookupService.class);
+               KvStateClient networkClient = mock(KvStateClient.class);
+
+               QueryableStateClient client = new QueryableStateClient(
+                               lookupService,
+                               networkClient,
+                               testActorSystem.dispatcher());
+
+               try {
+                       JobID jobId = new JobID();
+                       int numKeyGroups = 4;
+
+                       //
+                       // UnknownKvStateLocation
+                       //
+                       String query1 = "lucky";
+
+                       Future<KvStateLocation> unknownKvStateLocation = 
Futures.failed(
+                                       new UnknownKvStateLocation(query1));
+
+                       when(lookupService.getKvStateLookupInfo(eq(jobId), 
eq(query1)))
+                                       .thenReturn(unknownKvStateLocation);
+
+                       Future<Integer> result = client.getKvState(
+                                       jobId,
+                                       query1,
+                                       0,
+                                       BasicTypeInfo.INT_TYPE_INFO,
+                                       new ValueStateDescriptor<>("test", 
IntSerializer.INSTANCE));
+
+                       try {
+                               Await.result(result, timeout);
+                               fail("Did not throw expected 
UnknownKvStateLocation exception");
+                       } catch (UnknownKvStateLocation ignored) {
+                               // Expected
+                       }
+
+                       verify(lookupService, 
times(2)).getKvStateLookupInfo(eq(jobId), eq(query1));
+
+                       //
+                       // UnknownKvStateKeyGroupLocation
+                       //
+                       String query2 = "unlucky";
+
+                       Future<KvStateLocation> unknownKeyGroupLocation = 
Futures.successful(
+                                       new KvStateLocation(jobId, new 
JobVertexID(), numKeyGroups, query2));
+
+                       when(lookupService.getKvStateLookupInfo(eq(jobId), 
eq(query2)))
+                                       .thenReturn(unknownKeyGroupLocation);
+
+                       result = client.getKvState(
+                                       jobId,
+                                       query2,
+                                       0,
+                                       BasicTypeInfo.INT_TYPE_INFO,
+                                       new ValueStateDescriptor<>("test", 
IntSerializer.INSTANCE));
+
+                       try {
+                               Await.result(result, timeout);
+                               fail("Did not throw expected 
UnknownKvStateKeyGroupLocation exception");
+                       } catch (UnknownKvStateKeyGroupLocation ignored) {
+                               // Expected
+                       }
+
+                       verify(lookupService, 
times(2)).getKvStateLookupInfo(eq(jobId), eq(query2));
+
+                       //
+                       // UnknownKvStateID
+                       //
+                       String query3 = "water";
+                       KvStateID kvStateId = new KvStateID();
+                       Future<byte[]> unknownKvStateId = Futures.failed(new 
UnknownKvStateID(kvStateId));
+
+                       KvStateServerAddress serverAddress = new 
KvStateServerAddress(InetAddress.getLocalHost(), 12323);
+                       KvStateLocation location = new KvStateLocation(jobId, 
new JobVertexID(), numKeyGroups, query3);
+                       for (int i = 0; i < numKeyGroups; i++) {
+                               location.registerKvState(new KeyGroupRange(i, 
i), kvStateId, serverAddress);
+                       }
+
+                       when(lookupService.getKvStateLookupInfo(eq(jobId), 
eq(query3)))
+                                       
.thenReturn(Futures.successful(location));
+
+                       when(networkClient.getKvState(eq(serverAddress), 
eq(kvStateId), any(byte[].class)))
+                                       .thenReturn(unknownKvStateId);
+
+                       result = client.getKvState(
+                                       jobId,
+                                       query3,
+                                       0,
+                                       BasicTypeInfo.INT_TYPE_INFO,
+                                       new ValueStateDescriptor<>("test", 
IntSerializer.INSTANCE));
+
+                       try {
+                               Await.result(result, timeout);
+                               fail("Did not throw expected UnknownKvStateID 
exception");
+                       } catch (UnknownKvStateID ignored) {
+                               // Expected
+                       }
+
+                       verify(lookupService, 
times(2)).getKvStateLookupInfo(eq(jobId), eq(query3));
+
+                       //
+                       // ConnectException
+                       //
+                       String query4 = "space";
+                       Future<byte[]> connectException = Futures.failed(new 
ConnectException());
+                       kvStateId = new KvStateID();
+
+                       serverAddress = new 
KvStateServerAddress(InetAddress.getLocalHost(), 11123);
+                       location = new KvStateLocation(jobId, new 
JobVertexID(), numKeyGroups, query4);
+                       for (int i = 0; i < numKeyGroups; i++) {
+                               location.registerKvState(new KeyGroupRange(i, 
i), kvStateId, serverAddress);
+                       }
+
+                       when(lookupService.getKvStateLookupInfo(eq(jobId), 
eq(query4)))
+                                       
.thenReturn(Futures.successful(location));
+
+                       when(networkClient.getKvState(eq(serverAddress), 
eq(kvStateId), any(byte[].class)))
+                                       .thenReturn(connectException);
+
+                       result = client.getKvState(
+                                       jobId,
+                                       query4,
+                                       0,
+                                       BasicTypeInfo.INT_TYPE_INFO,
+                                       new ValueStateDescriptor<>("test", 
IntSerializer.INSTANCE));
+
+                       try {
+                               Await.result(result, timeout);
+                               fail("Did not throw expected ConnectException 
exception");
+                       } catch (ConnectException ignored) {
+                               // Expected
+                       }
+
+                       verify(lookupService, 
times(2)).getKvStateLookupInfo(eq(jobId), eq(query4));
+
+                       //
+                       // Other Exceptions don't lead to a retry no retry
+                       //
+                       String query5 = "universe";
+                       Future<KvStateLocation> exception = Futures.failed(new 
RuntimeException("Test exception"));
+                       when(lookupService.getKvStateLookupInfo(eq(jobId), 
eq(query5)))
+                                       .thenReturn(exception);
+
+                       client.getKvState(
+                                       jobId,
+                                       query5,
+                                       0,
+                                       BasicTypeInfo.INT_TYPE_INFO,
+                                       new ValueStateDescriptor<>("test", 
IntSerializer.INSTANCE));
+
+                       verify(lookupService, 
times(1)).getKvStateLookupInfo(eq(jobId), eq(query5));
+               } finally {
+                       client.shutDown();
+               }
+       }
+
+       /**
+        * Tests queries against multiple servers.
+        *
+        * <p>The servers are populated with different keys and the client 
queries
+        * all available keys from all servers.
+        */
+       @Test
+       public void testIntegrationWithKvStateServer() throws Exception {
+               // Config
+               int numServers = 2;
+               int numKeys = 1024;
+               int numKeyGroups = 1;
+
+               JobID jobId = new JobID();
+               JobVertexID jobVertexId = new JobVertexID();
+
+               KvStateServer[] servers = new KvStateServer[numServers];
+               AtomicKvStateRequestStats[] serverStats = new 
AtomicKvStateRequestStats[numServers];
+
+               QueryableStateClient client = null;
+               KvStateClient networkClient = null;
+               AtomicKvStateRequestStats networkClientStats = new 
AtomicKvStateRequestStats();
+
+               MemoryStateBackend backend = new MemoryStateBackend();
+               DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+
+               AbstractKeyedStateBackend<Integer> keyedStateBackend = 
backend.createKeyedStateBackend(dummyEnv,
+                               new JobID(),
+                               "test_op",
+                               IntSerializer.INSTANCE,
+                               numKeyGroups,
+                               new KeyGroupRange(0, 0),
+                               new KvStateRegistry().createTaskRegistry(new 
JobID(), new JobVertexID()));
+
+               try {
+                       KvStateRegistry[] registries = new 
KvStateRegistry[numServers];
+                       KvStateID[] kvStateIds = new KvStateID[numServers];
+                       List<HeapValueState<Integer, VoidNamespace, Integer>> 
kvStates = new ArrayList<>();
+
+                       // Start the servers
+                       for (int i = 0; i < numServers; i++) {
+                               registries[i] = new KvStateRegistry();
+                               serverStats[i] = new 
AtomicKvStateRequestStats();
+                               servers[i] = new 
KvStateServerImpl(InetAddress.getLocalHost(), 0, 1, 1, registries[i], 
serverStats[i]);
+                               servers[i].start();
+                               ValueStateDescriptor<Integer> descriptor =
+                                               new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
+
+                               
RegisteredKeyedBackendStateMetaInfo<VoidNamespace, Integer> 
registeredKeyedBackendStateMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
+                                               descriptor.getType(),
+                                               descriptor.getName(),
+                                               
VoidNamespaceSerializer.INSTANCE,
+                                               IntSerializer.INSTANCE);
+
+                               // Register state
+                               HeapValueState<Integer, VoidNamespace, Integer> 
kvState = new HeapValueState<>(
+                                               descriptor,
+                                               new 
NestedMapsStateTable<>(keyedStateBackend, registeredKeyedBackendStateMetaInfo),
+                                               IntSerializer.INSTANCE,
+                                               
VoidNamespaceSerializer.INSTANCE);
+
+                               kvStates.add(kvState);
+
+                               kvStateIds[i] = registries[i].registerKvState(
+                                               jobId,
+                                               new JobVertexID(),
+                                               new KeyGroupRange(i, i),
+                                               "choco",
+                                               kvState);
+                       }
+
+                       int[] expectedRequests = new int[numServers];
+
+                       for (int key = 0; key < numKeys; key++) {
+                               int targetKeyGroupIndex = 
MathUtils.murmurHash(key) % numServers;
+                               expectedRequests[targetKeyGroupIndex]++;
+
+                               HeapValueState<Integer, VoidNamespace, Integer> 
kvState = kvStates.get(targetKeyGroupIndex);
+
+                               keyedStateBackend.setCurrentKey(key);
+                               
kvState.setCurrentNamespace(VoidNamespace.INSTANCE);
+                               kvState.update(1337 + key);
+                       }
+
+                       // Location lookup service
+                       KvStateLocation location = new KvStateLocation(jobId, 
jobVertexId, numServers, "choco");
+                       for (int keyGroupIndex = 0; keyGroupIndex < numServers; 
keyGroupIndex++) {
+                               location.registerKvState(new 
KeyGroupRange(keyGroupIndex, keyGroupIndex), kvStateIds[keyGroupIndex], 
servers[keyGroupIndex].getAddress());
+                       }
+
+                       KvStateLocationLookupService lookupService = 
mock(KvStateLocationLookupService.class);
+                       when(lookupService.getKvStateLookupInfo(eq(jobId), 
eq("choco")))
+                                       
.thenReturn(Futures.successful(location));
+
+                       // The client
+                       networkClient = new KvStateClient(1, 
networkClientStats);
+
+                       client = new QueryableStateClient(lookupService, 
networkClient, testActorSystem.dispatcher());
+
+                       // Send all queries
+                       List<Future<Integer>> futures = new 
ArrayList<>(numKeys);
+                       for (int key = 0; key < numKeys; key++) {
+                               ValueStateDescriptor<Integer> descriptor =
+                                               new 
ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
+                               futures.add(client.getKvState(
+                                               jobId,
+                                               "choco",
+                                               key,
+                                               BasicTypeInfo.INT_TYPE_INFO,
+                                               descriptor));
+                       }
+
+                       // Verify results
+                       Future<Iterable<Integer>> future = 
Futures.sequence(futures, testActorSystem.dispatcher());
+                       Iterable<Integer> results = Await.result(future, 
timeout);
+
+                       int index = 0;
+                       for (int buffer : results) {
+                               assertEquals(1337 + index, buffer);
+                               index++;
+                       }
+
+                       // Verify requests
+                       for (int i = 0; i < numServers; i++) {
+                               int numRetries = 10;
+                               for (int retry = 0; retry < numRetries; 
retry++) {
+                                       try {
+                                               assertEquals("Unexpected number 
of requests", expectedRequests[i], serverStats[i].getNumRequests());
+                                               assertEquals("Unexpected 
success requests", expectedRequests[i], serverStats[i].getNumSuccessful());
+                                               assertEquals("Unexpected failed 
requests", 0, serverStats[i].getNumFailed());
+                                               break;
+                                       } catch (Throwable t) {
+                                               // Retry
+                                               if (retry == numRetries - 1) {
+                                                       throw t;
+                                               } else {
+                                                       Thread.sleep(100);
+                                               }
+                                       }
+                               }
+                       }
+               } finally {
+                       if (client != null) {
+                               client.shutDown();
+                       }
+
+                       if (networkClient != null) {
+                               networkClient.shutDown();
+                       }
+
+                       for (KvStateServer server : servers) {
+                               if (server != null) {
+                                       server.shutDown();
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Tests that the QueryableState client correctly caches location 
lookups
+        * keyed by both job and name. This test is mainly due to a previous 
bug due
+        * to which cache entries were by name only. This is a problem, because 
the
+        * same client can be used to query multiple jobs.
+        */
+       @Test
+       public void testLookupMultipleJobIds() throws Exception {
+               String name = "unique-per-job";
+
+               // Exact contents don't matter here
+               KvStateLocation location = new KvStateLocation(new JobID(), new 
JobVertexID(), 1, name);
+               location.registerKvState(new KeyGroupRange(0, 0), new 
KvStateID(), new KvStateServerAddress(InetAddress.getLocalHost(), 892));
+
+               JobID jobId1 = new JobID();
+               JobID jobId2 = new JobID();
+
+               KvStateLocationLookupService lookupService = 
mock(KvStateLocationLookupService.class);
+
+               when(lookupService.getKvStateLookupInfo(any(JobID.class), 
anyString()))
+                               .thenReturn(Futures.successful(location));
+
+               KvStateClient networkClient = mock(KvStateClient.class);
+               when(networkClient.getKvState(any(KvStateServerAddress.class), 
any(KvStateID.class), any(byte[].class)))
+                               .thenReturn(Futures.successful(new byte[0]));
+
+               QueryableStateClient client = new QueryableStateClient(
+                               lookupService,
+                               networkClient,
+                               testActorSystem.dispatcher());
+
+               ValueStateDescriptor<Integer> stateDesc = new 
ValueStateDescriptor<>("test", IntSerializer.INSTANCE);
+
+               // Query ies with same name, but different job IDs should lead 
to a
+               // single lookup per query and job ID.
+               client.getKvState(jobId1, name, 0, BasicTypeInfo.INT_TYPE_INFO, 
stateDesc);
+               client.getKvState(jobId2, name, 0, BasicTypeInfo.INT_TYPE_INFO, 
stateDesc);
+
+               verify(lookupService, 
times(1)).getKvStateLookupInfo(eq(jobId1), eq(name));
+               verify(lookupService, 
times(1)).getKvStateLookupInfo(eq(jobId2), eq(name));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/test/resources/log4j-test.properties
 
b/flink-queryable-state/flink-queryable-state-java/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..10792cd
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/test/resources/log4j-test.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR
+log4j.logger.org.apache.zookeeper=OFF

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/pom.xml
----------------------------------------------------------------------
diff --git a/flink-queryable-state/pom.xml b/flink-queryable-state/pom.xml
new file mode 100644
index 0000000..e9e7496
--- /dev/null
+++ b/flink-queryable-state/pom.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-parent</artifactId>
+        <version>1.4-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-queryable-state</artifactId>
+    <name>flink-queryable-state</name>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>flink-queryable-state-java</module>
+       <!-- <module>flink-state-client-scala</module>-->
+    </modules>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.findbugs</groupId>
+            <artifactId>jsr305</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 9193859..53503ce 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -31,7 +31,7 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.query.netty.KvStateServer;
+import org.apache.flink.runtime.query.KvStateServer;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManager;

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupService.java
deleted file mode 100644
index a37a3ac..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupService.java
+++ /dev/null
@@ -1,322 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.util.Preconditions;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.dispatch.Futures;
-import akka.dispatch.Mapper;
-import akka.dispatch.Recover;
-import akka.pattern.Patterns;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.UUID;
-import java.util.concurrent.Callable;
-
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-import scala.reflect.ClassTag$;
-
-/**
- * Akka-based {@link KvStateLocationLookupService} that retrieves the current
- * JobManager address and uses it for lookups.
- */
-class AkkaKvStateLocationLookupService implements 
KvStateLocationLookupService, LeaderRetrievalListener {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(KvStateLocationLookupService.class);
-
-       /** Future returned when no JobManager is available. */
-       private static final Future<ActorGateway> UNKNOWN_JOB_MANAGER = 
Futures.failed(new UnknownJobManager());
-
-       /** Leader retrieval service to retrieve the current job manager. */
-       private final LeaderRetrievalService leaderRetrievalService;
-
-       /** The actor system used to resolve the JobManager address. */
-       private final ActorSystem actorSystem;
-
-       /** Timeout for JobManager ask-requests. */
-       private final FiniteDuration askTimeout;
-
-       /** Retry strategy factory on future failures. */
-       private final LookupRetryStrategyFactory retryStrategyFactory;
-
-       /** Current job manager future. */
-       private volatile Future<ActorGateway> jobManagerFuture = 
UNKNOWN_JOB_MANAGER;
-
-       /**
-        * Creates the Akka-based {@link KvStateLocationLookupService}.
-        *
-        * @param leaderRetrievalService Leader retrieval service to use.
-        * @param actorSystem            Actor system to use.
-        * @param askTimeout             Timeout for JobManager ask-requests.
-        * @param retryStrategyFactory   Retry strategy if no JobManager 
available.
-        */
-       AkkaKvStateLocationLookupService(
-                       LeaderRetrievalService leaderRetrievalService,
-                       ActorSystem actorSystem,
-                       FiniteDuration askTimeout,
-                       LookupRetryStrategyFactory retryStrategyFactory) {
-
-               this.leaderRetrievalService = 
Preconditions.checkNotNull(leaderRetrievalService, "Leader retrieval service");
-               this.actorSystem = Preconditions.checkNotNull(actorSystem, 
"Actor system");
-               this.askTimeout = Preconditions.checkNotNull(askTimeout, "Ask 
Timeout");
-               this.retryStrategyFactory = 
Preconditions.checkNotNull(retryStrategyFactory, "Retry strategy factory");
-       }
-
-       public void start() {
-               try {
-                       leaderRetrievalService.start(this);
-               } catch (Exception e) {
-                       LOG.error("Failed to start leader retrieval service", 
e);
-                       throw new RuntimeException(e);
-               }
-       }
-
-       public void shutDown() {
-               try {
-                       leaderRetrievalService.stop();
-               } catch (Exception e) {
-                       LOG.error("Failed to stop leader retrieval service", e);
-                       throw new RuntimeException(e);
-               }
-       }
-
-       @Override
-       @SuppressWarnings("unchecked")
-       public Future<KvStateLocation> getKvStateLookupInfo(final JobID jobId, 
final String registrationName) {
-               return getKvStateLookupInfo(jobId, registrationName, 
retryStrategyFactory.createRetryStrategy());
-       }
-
-       /**
-        * Returns a future holding the {@link KvStateLocation} for the given 
job
-        * and KvState registration name.
-        *
-        * <p>If there is currently no JobManager registered with the service, 
the
-        * request is retried. The retry behaviour is specified by the
-        * {@link LookupRetryStrategy} of the lookup service.
-        *
-        * @param jobId               JobID the KvState instance belongs to
-        * @param registrationName    Name under which the KvState has been 
registered
-        * @param lookupRetryStrategy Retry strategy to use for retries on 
UnknownJobManager failures.
-        * @return Future holding the {@link KvStateLocation}
-        */
-       @SuppressWarnings("unchecked")
-       private Future<KvStateLocation> getKvStateLookupInfo(
-                       final JobID jobId,
-                       final String registrationName,
-                       final LookupRetryStrategy lookupRetryStrategy) {
-
-               return jobManagerFuture
-                               .flatMap(new Mapper<ActorGateway, 
Future<Object>>() {
-                                       @Override
-                                       public Future<Object> 
apply(ActorGateway jobManager) {
-                                               // Lookup the KvStateLocation
-                                               Object msg = new 
KvStateMessage.LookupKvStateLocation(jobId, registrationName);
-                                               return jobManager.ask(msg, 
askTimeout);
-                                       }
-                               }, actorSystem.dispatcher())
-                               
.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class))
-                               .recoverWith(new 
Recover<Future<KvStateLocation>>() {
-                                       @Override
-                                       public Future<KvStateLocation> 
recover(Throwable failure) throws Throwable {
-                                               // If the Future fails with 
UnknownJobManager, retry
-                                               // the request. Otherwise all 
Futures will be failed
-                                               // during the start up phase, 
when the JobManager did
-                                               // not notify this service yet 
or leadership is lost
-                                               // intermittently.
-                                               if (failure instanceof 
UnknownJobManager && lookupRetryStrategy.tryRetry()) {
-                                                       return Patterns.after(
-                                                                       
lookupRetryStrategy.getRetryDelay(),
-                                                                       
actorSystem.scheduler(),
-                                                                       
actorSystem.dispatcher(),
-                                                                       new 
Callable<Future<KvStateLocation>>() {
-                                                                               
@Override
-                                                                               
public Future<KvStateLocation> call() throws Exception {
-                                                                               
        return getKvStateLookupInfo(
-                                                                               
                        jobId,
-                                                                               
                        registrationName,
-                                                                               
                        lookupRetryStrategy);
-                                                                               
}
-                                                                       });
-                                               } else {
-                                                       return 
Futures.failed(failure);
-                                               }
-                                       }
-                               }, actorSystem.dispatcher());
-       }
-
-       @Override
-       public void notifyLeaderAddress(String leaderAddress, final UUID 
leaderSessionID) {
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("Received leader address notification {}:{}", 
leaderAddress, leaderSessionID);
-               }
-
-               if (leaderAddress == null) {
-                       jobManagerFuture = UNKNOWN_JOB_MANAGER;
-               } else {
-                       jobManagerFuture = 
AkkaUtils.getActorRefFuture(leaderAddress, actorSystem, askTimeout)
-                                       .map(new Mapper<ActorRef, 
ActorGateway>() {
-                                               @Override
-                                               public ActorGateway 
apply(ActorRef actorRef) {
-                                                       return new 
AkkaActorGateway(actorRef, leaderSessionID);
-                                               }
-                                       }, actorSystem.dispatcher());
-               }
-       }
-
-       @Override
-       public void handleError(Exception exception) {
-               jobManagerFuture = Futures.failed(exception);
-       }
-
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Retry strategy for failed lookups.
-        *
-        * <p>Usage:
-        * <pre>
-        * LookupRetryStrategy retryStrategy = 
LookupRetryStrategyFactory.create();
-        *
-        * if (retryStrategy.tryRetry()) {
-        *     // OK to retry
-        *     FiniteDuration retryDelay = retryStrategy.getRetryDelay();
-        * }
-        * </pre>
-        */
-       interface LookupRetryStrategy {
-
-               /**
-                * Returns the current retry.
-                *
-                * @return Current retry delay.
-                */
-               FiniteDuration getRetryDelay();
-
-               /**
-                * Tries another retry and returns whether it is allowed or not.
-                *
-                * @return Whether it is allowed to do another restart or not.
-                */
-               boolean tryRetry();
-
-       }
-
-       /**
-        * Factory for retry strategies.
-        */
-       interface LookupRetryStrategyFactory {
-
-               /**
-                * Creates a new retry strategy.
-                *
-                * @return The retry strategy.
-                */
-               LookupRetryStrategy createRetryStrategy();
-
-       }
-
-       /**
-        * Factory for disabled retries.
-        */
-       static class DisabledLookupRetryStrategyFactory implements 
LookupRetryStrategyFactory {
-
-               private static final DisabledLookupRetryStrategy RETRY_STRATEGY 
= new DisabledLookupRetryStrategy();
-
-               @Override
-               public LookupRetryStrategy createRetryStrategy() {
-                       return RETRY_STRATEGY;
-               }
-
-               private static class DisabledLookupRetryStrategy implements 
LookupRetryStrategy {
-
-                       @Override
-                       public FiniteDuration getRetryDelay() {
-                               return FiniteDuration.Zero();
-                       }
-
-                       @Override
-                       public boolean tryRetry() {
-                               return false;
-                       }
-               }
-
-       }
-
-       /**
-        * Factory for fixed delay retries.
-        */
-       static class FixedDelayLookupRetryStrategyFactory implements 
LookupRetryStrategyFactory {
-
-               private final int maxRetries;
-               private final FiniteDuration retryDelay;
-
-               FixedDelayLookupRetryStrategyFactory(int maxRetries, 
FiniteDuration retryDelay) {
-                       this.maxRetries = maxRetries;
-                       this.retryDelay = retryDelay;
-               }
-
-               @Override
-               public LookupRetryStrategy createRetryStrategy() {
-                       return new FixedDelayLookupRetryStrategy(maxRetries, 
retryDelay);
-               }
-
-               private static class FixedDelayLookupRetryStrategy implements 
LookupRetryStrategy {
-
-                       private final Object retryLock = new Object();
-                       private final int maxRetries;
-                       private final FiniteDuration retryDelay;
-                       private int numRetries;
-
-                       public FixedDelayLookupRetryStrategy(int maxRetries, 
FiniteDuration retryDelay) {
-                               Preconditions.checkArgument(maxRetries >= 0, 
"Negative number maximum retries");
-                               this.maxRetries = maxRetries;
-                               this.retryDelay = 
Preconditions.checkNotNull(retryDelay, "Retry delay");
-                       }
-
-                       @Override
-                       public FiniteDuration getRetryDelay() {
-                               synchronized (retryLock) {
-                                       return retryDelay;
-                               }
-                       }
-
-                       @Override
-                       public boolean tryRetry() {
-                               synchronized (retryLock) {
-                                       if (numRetries < maxRetries) {
-                                               numRetries++;
-                                               return true;
-                                       } else {
-                                               return false;
-                                       }
-                               }
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
index 86d1838..8a213bb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
@@ -31,7 +31,7 @@ import java.util.Arrays;
  * Location information for all key groups of a {@link InternalKvState} 
instance.
  *
  * <p>This is populated by the {@link KvStateLocationRegistry} and used by the
- * {@link QueryableStateClient} to target queries.
+ * Queryable State Client to target queries.
  */
 public class KvStateLocation implements Serializable {
 
@@ -166,7 +166,7 @@ public class KvStateLocation implements Serializable {
         * @param kvStateAddress Server address of the KvState instance at the 
key group index.
         * @throws IndexOutOfBoundsException If key group range start < 0 or 
key group range end >= Number of key groups
         */
-       void registerKvState(KeyGroupRange keyGroupRange, KvStateID kvStateId, 
KvStateServerAddress kvStateAddress) {
+       public void registerKvState(KeyGroupRange keyGroupRange, KvStateID 
kvStateId, KvStateServerAddress kvStateAddress) {
 
                if (keyGroupRange.getStartKeyGroup() < 0 || 
keyGroupRange.getEndKeyGroup() >= numKeyGroups) {
                        throw new IndexOutOfBoundsException("Key group index");
@@ -183,6 +183,10 @@ public class KvStateLocation implements Serializable {
                }
        }
 
+       public static long getSerialVersionUID() {
+               return serialVersionUID;
+       }
+
        /**
         * Registers a KvState instance for the given key group index.
         *
@@ -190,7 +194,7 @@ public class KvStateLocation implements Serializable {
         * @throws IndexOutOfBoundsException If key group range start < 0 or 
key group range end >= Number of key groups
         * @throws IllegalArgumentException  If no location information 
registered for a key group index in the range.
         */
-       void unregisterKvState(KeyGroupRange keyGroupRange) {
+       public void unregisterKvState(KeyGroupRange keyGroupRange) {
                if (keyGroupRange.getStartKeyGroup() < 0 || 
keyGroupRange.getEndKeyGroup() >= numKeyGroups) {
                        throw new IndexOutOfBoundsException("Key group index");
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationLookupService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationLookupService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationLookupService.java
deleted file mode 100644
index dfd9c14..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationLookupService.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query;
-
-import org.apache.flink.api.common.JobID;
-
-import scala.concurrent.Future;
-
-/**
- * {@link KvStateLocation} lookup service.
- */
-public interface KvStateLocationLookupService {
-
-       /**
-        * Starts the lookup service.
-        */
-       void start();
-
-       /**
-        * Shuts down the lookup service.
-        */
-       void shutDown();
-
-       /**
-        * Returns a future holding the {@link KvStateLocation} for the given 
job
-        * and KvState registration name.
-        *
-        * @param jobId            JobID the KvState instance belongs to
-        * @param registrationName Name under which the KvState has been 
registered
-        * @return Future holding the {@link KvStateLocation}
-        */
-       Future<KvStateLocation> getKvStateLookupInfo(JobID jobId, String 
registrationName);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
index 26b700c..90fa5cc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.query.netty.KvStateServer;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.taskmanager.Task;

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java
new file mode 100644
index 0000000..9b14c49
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+/**
+ * An interface for the Queryable State Server running on each Task Manager in 
the cluster.
+ * This server is responsible for serving requests coming from the Queryable 
State Client and
+ * requesting <b>locally</b> stored state.
+ */
+public interface KvStateServer {
+
+       /**
+        * Returns the address of this server.
+        *
+        * @return Server address
+        */
+       KvStateServerAddress getAddress();
+
+
+       /** Starts the proxy. */
+       void start() throws InterruptedException;
+
+       /**
+        * Shuts down the server and all related thread pools.
+        */
+       void shutDown();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java
index 9ec25bc..2599855 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.query;
 
-import org.apache.flink.runtime.query.netty.KvStateServer;
 import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
@@ -88,4 +87,9 @@ public class KvStateServerAddress implements Serializable {
                result = 31 * result + port;
                return result;
        }
+
+       @Override
+       public String toString() {
+               return hostAddress.getHostName() + ':' + port;
+       }
 }

Reply via email to