http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
new file mode 100644
index 0000000..acaa067
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/MessageSerializerTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.messages.MessageType;
+import org.apache.flink.queryablestate.network.messages.RequestFailure;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link MessageSerializer}: round-trips each message kind and checks content and frame length.
+ */
+@RunWith(Parameterized.class)
+public class MessageSerializerTest {
+
+       /** Size in bytes of the frame length prefix the serialization tests read first via {@link ByteBuf#readInt()}. */
+       private static final int FRAME_LENGTH_BYTES = 4;
+
+       private final ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
+
+       @Parameterized.Parameters
+       public static Collection<Boolean> parameters() {
+               return Arrays.asList(false, true);
+       }
+
+       /** NOTE(review): no test in this class reads this flag, so both parameterized runs do the same work. */
+       @Parameterized.Parameter
+       public boolean async;
+
+       /**
+        * Tests that a serialized request is read back with the same request id, KvStateID and key/namespace bytes.
+        */
+       @Test
+       public void testRequestSerialization() throws Exception {
+               long requestId = Integer.MAX_VALUE + 1337L;
+               KvStateID kvStateId = new KvStateID();
+               byte[] serializedKeyAndNamespace = randomByteArray(1024);
+
+               final KvStateInternalRequest request = new KvStateInternalRequest(kvStateId, serializedKeyAndNamespace);
+
+               ByteBuf buf = MessageSerializer.serializeRequest(alloc, requestId, request);
+
+               int frameLength = buf.readInt();
+               assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf));
+               assertEquals(requestId, MessageSerializer.getRequestId(buf));
+               KvStateInternalRequest requestDeser = newSerializer().deserializeRequest(buf);
+
+               assertEquals(frameLength + FRAME_LENGTH_BYTES, buf.readerIndex());
+
+               assertEquals(kvStateId, requestDeser.getKvStateId());
+               assertArrayEquals(serializedKeyAndNamespace, requestDeser.getSerializedKeyAndNamespace());
+       }
+
+       /**
+        * Tests request serialization with zero-length serialized key and namespace.
+        */
+       @Test
+       public void testRequestSerializationWithZeroLengthKeyAndNamespace() throws Exception {
+               long requestId = Integer.MAX_VALUE + 1337L;
+               KvStateID kvStateId = new KvStateID();
+               byte[] serializedKeyAndNamespace = new byte[0];
+
+               final KvStateInternalRequest request = new KvStateInternalRequest(kvStateId, serializedKeyAndNamespace);
+
+               ByteBuf buf = MessageSerializer.serializeRequest(alloc, requestId, request);
+
+               int frameLength = buf.readInt();
+               assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf));
+               assertEquals(requestId, MessageSerializer.getRequestId(buf));
+               KvStateInternalRequest requestDeser = newSerializer().deserializeRequest(buf);
+
+               assertEquals(frameLength + FRAME_LENGTH_BYTES, buf.readerIndex());
+
+               assertEquals(kvStateId, requestDeser.getKvStateId());
+               assertArrayEquals(serializedKeyAndNamespace, requestDeser.getSerializedKeyAndNamespace());
+       }
+
+       /**
+        * Tests that a <code>null</code> key and namespace is not handled implicitly but fails with an NPE.
+        */
+       @Test(expected = NullPointerException.class)
+       public void testNullPointerExceptionOnNullSerializedKeyAndNamepsace() throws Exception {
+               new KvStateInternalRequest(new KvStateID(), null);
+       }
+
+       /**
+        * Tests that a serialized response is read back with the same request id and result bytes.
+        */
+       @Test
+       public void testResponseSerialization() throws Exception {
+               long requestId = Integer.MAX_VALUE + 72727278L;
+               byte[] serializedResult = randomByteArray(1024);
+
+               final KvStateResponse response = new KvStateResponse(serializedResult);
+
+               ByteBuf buf = MessageSerializer.serializeResponse(alloc, requestId, response);
+
+               int frameLength = buf.readInt();
+               assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf));
+               assertEquals(requestId, MessageSerializer.getRequestId(buf));
+               KvStateResponse responseDeser = newSerializer().deserializeResponse(buf);
+
+               assertEquals(frameLength + FRAME_LENGTH_BYTES, buf.readerIndex());
+
+               assertArrayEquals(serializedResult, responseDeser.getContent());
+       }
+
+       /**
+        * Tests response serialization with zero-length serialized result.
+        */
+       @Test
+       public void testResponseSerializationWithZeroLengthSerializedResult() throws Exception {
+               long requestId = 72727278L;
+               byte[] serializedResult = new byte[0];
+
+               final KvStateResponse response = new KvStateResponse(serializedResult);
+
+               ByteBuf buf = MessageSerializer.serializeResponse(alloc, requestId, response);
+
+               int frameLength = buf.readInt();
+               assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf));
+               assertEquals(requestId, MessageSerializer.getRequestId(buf));
+               KvStateResponse responseDeser = newSerializer().deserializeResponse(buf);
+               assertEquals(frameLength + FRAME_LENGTH_BYTES, buf.readerIndex());
+
+               assertArrayEquals(serializedResult, responseDeser.getContent());
+       }
+
+       /**
+        * Tests that a <code>null</code> result is not handled implicitly but fails with an NPE.
+        */
+       @Test(expected = NullPointerException.class)
+       public void testNullPointerExceptionOnNullSerializedResult() throws Exception {
+               new KvStateResponse((byte[]) null);
+       }
+
+       /**
+        * Tests that a serialized request failure is read back with the same request id, cause type and message.
+        */
+       @Test
+       public void testKvStateRequestFailureSerialization() throws Exception {
+               long requestId = Integer.MAX_VALUE + 1111222L;
+               IllegalStateException cause = new IllegalStateException("Expected test");
+
+               ByteBuf buf = MessageSerializer.serializeRequestFailure(alloc, requestId, cause);
+
+               int frameLength = buf.readInt();
+               assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
+               RequestFailure requestFailure = MessageSerializer.deserializeRequestFailure(buf);
+               assertEquals(frameLength + FRAME_LENGTH_BYTES, buf.readerIndex());
+
+               assertEquals(requestId, requestFailure.getRequestId());
+               assertEquals(cause.getClass(), requestFailure.getCause().getClass());
+               assertEquals(cause.getMessage(), requestFailure.getCause().getMessage());
+       }
+
+       /**
+        * Tests that a serialized server failure is read back with the same cause type and message.
+        */
+       @Test
+       public void testServerFailureSerialization() throws Exception {
+               IllegalStateException cause = new IllegalStateException("Expected test");
+
+               ByteBuf buf = MessageSerializer.serializeServerFailure(alloc, cause);
+
+               int frameLength = buf.readInt();
+               assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf));
+               Throwable serverFailure = MessageSerializer.deserializeServerFailure(buf);
+               assertEquals(frameLength + FRAME_LENGTH_BYTES, buf.readerIndex());
+
+               assertEquals(cause.getClass(), serverFailure.getClass());
+               assertEquals(cause.getMessage(), serverFailure.getMessage());
+       }
+
+       /** Creates the serializer under test with the KvState request and response deserializers. */
+       private static MessageSerializer<KvStateInternalRequest, KvStateResponse> newSerializer() {
+               return new MessageSerializer<>(
+                               new KvStateInternalRequest.KvStateInternalRequestDeserializer(),
+                               new KvStateResponse.KvStateResponseDeserializer());
+       }
+
+       private byte[] randomByteArray(int capacity) {
+               byte[] bytes = new byte[capacity];
+               ThreadLocalRandom.current().nextBytes(bytes);
+               return bytes;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-runtime/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/resources/log4j-test.properties
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..10792cd
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/resources/log4j-test.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# testlogger is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR
+log4j.logger.org.apache.zookeeper=OFF

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/pom.xml
----------------------------------------------------------------------
diff --git a/flink-queryable-state/pom.xml b/flink-queryable-state/pom.xml
index e2579f6..9300fb3 100644
--- a/flink-queryable-state/pom.xml
+++ b/flink-queryable-state/pom.xml
@@ -35,8 +35,9 @@ under the License.
        <packaging>pom</packaging>
 
        <modules>
-               <module>flink-queryable-state-java</module>
-          <!-- <module>flink-state-client-scala</module>-->
+               <module>flink-queryable-state-runtime</module>
+               <module>flink-queryable-state-client-java</module>
+               <!-- <module>flink-state-client-scala</module>-->
        </modules>
 
        <dependencies>

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 83ac781..134c414 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -50,6 +50,12 @@ under the License.
                        <version>${project.version}</version>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-queryable-state-client-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
                <!-- The Hadoop FS support has only an optional dependency on 
Hadoop and
                        gracefully handles absence of Hadoop classes -->
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index f60f561..4535290 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.StoppingException;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
@@ -74,10 +75,8 @@ import 
org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
-import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
-import org.apache.flink.runtime.query.KvStateServerAddress;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
 import org.apache.flink.runtime.registration.RegisteredRpcConnection;
 import org.apache.flink.runtime.registration.RegistrationResponse;
@@ -104,6 +103,7 @@ import org.slf4j.Logger;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -586,7 +586,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                        final KeyGroupRange keyGroupRange,
                        final String registrationName,
                        final KvStateID kvStateId,
-                       final KvStateServerAddress kvStateServerAddress)
+                       final InetSocketAddress kvStateServerAddress)
        {
                if (log.isDebugEnabled()) {
                        log.debug("Key value state registered for job {} under 
name {}.",

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index d59feed..2c7e438 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -36,9 +37,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.KvStateLocation;
-import org.apache.flink.runtime.query.KvStateServerAddress;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rpc.FencedRpcGateway;
@@ -49,6 +48,7 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
+import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
@@ -165,7 +165,7 @@ public interface JobMasterGateway extends 
CheckpointCoordinatorGateway, FencedRp
                        final KeyGroupRange keyGroupRange,
                        final String registrationName,
                        final KvStateID kvStateId,
-                       final KvStateServerAddress kvStateServerAddress);
+                       final InetSocketAddress kvStateServerAddress);
 
        /**
         * Notifies that queryable state has been unregistered.

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateID.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateID.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateID.java
deleted file mode 100644
index c122508..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateID.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query;
-
-import org.apache.flink.runtime.state.internal.InternalKvState;
-import org.apache.flink.util.AbstractID;
-
-/**
- * Identifier for {@link InternalKvState} instances.
- *
- * <p>Assigned when registering state at the {@link KvStateRegistry}.
- */
-public class KvStateID extends AbstractID {
-
-       private static final long serialVersionUID = 1L;
-
-       public KvStateID() {
-               super();
-       }
-
-       public KvStateID(long lowerPart, long upperPart) {
-               super(lowerPart, upperPart);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
index 03e8238..e4fdda5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java
@@ -19,12 +19,14 @@
 package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
+import java.net.InetSocketAddress;
 import java.util.Arrays;
 
 /**
@@ -56,7 +58,7 @@ public class KvStateLocation implements Serializable {
         * Server address for each KvState instance where array index 
corresponds to
         * key group index.
         */
-       private final KvStateServerAddress[] kvStateAddresses;
+       private final InetSocketAddress[] kvStateAddresses;
 
        /** Current number of registered key groups. */
        private int numRegisteredKeyGroups;
@@ -76,7 +78,7 @@ public class KvStateLocation implements Serializable {
                this.numKeyGroups = numKeyGroups;
                this.registrationName = 
Preconditions.checkNotNull(registrationName, "Registration name");
                this.kvStateIds = new KvStateID[numKeyGroups];
-               this.kvStateAddresses = new KvStateServerAddress[numKeyGroups];
+               this.kvStateAddresses = new InetSocketAddress[numKeyGroups];
        }
 
        /**
@@ -142,15 +144,15 @@ public class KvStateLocation implements Serializable {
        }
 
        /**
-        * Returns the registered KvStateServerAddress for the key group index 
or
+        * Returns the registered server address for the key group index or
         * <code>null</code> if none is registered yet.
         *
         * @param keyGroupIndex Key group index to get server address for.
-        * @return KvStateServerAddress for the key group index or 
<code>null</code>
+        * @return the server address for the key group index or 
<code>null</code>
         * if none is registered yet
         * @throws IndexOutOfBoundsException If key group index < 0 or >= 
Number of key groups
         */
-       public KvStateServerAddress getKvStateServerAddress(int keyGroupIndex) {
+       public InetSocketAddress getKvStateServerAddress(int keyGroupIndex) {
                if (keyGroupIndex < 0 || keyGroupIndex >= numKeyGroups) {
                        throw new IndexOutOfBoundsException("Key group index");
                }
@@ -166,7 +168,7 @@ public class KvStateLocation implements Serializable {
         * @param kvStateAddress Server address of the KvState instance at the 
key group index.
         * @throws IndexOutOfBoundsException If key group range start < 0 or 
key group range end >= Number of key groups
         */
-       public void registerKvState(KeyGroupRange keyGroupRange, KvStateID 
kvStateId, KvStateServerAddress kvStateAddress) {
+       public void registerKvState(KeyGroupRange keyGroupRange, KvStateID 
kvStateId, InetSocketAddress kvStateAddress) {
 
                if (keyGroupRange.getStartKeyGroup() < 0 || 
keyGroupRange.getEndKeyGroup() >= numKeyGroups) {
                        throw new IndexOutOfBoundsException("Key group index");

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java
index cb61905..05ee017 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -26,6 +27,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.util.Preconditions;
 
+import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -89,7 +91,7 @@ public class KvStateLocationRegistry {
                        KeyGroupRange keyGroupRange,
                        String registrationName,
                        KvStateID kvStateId,
-                       KvStateServerAddress kvStateServerAddress) {
+                       InetSocketAddress kvStateServerAddress) {
 
                KvStateLocation location = lookupTable.get(registrationName);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java
index 04684ee..e94d2f8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java
@@ -19,12 +19,14 @@
 package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
+import java.net.InetSocketAddress;
 
 /**
  * Actor messages for {@link InternalKvState} lookup and registration.
@@ -114,7 +116,7 @@ public interface KvStateMessage extends Serializable {
                private final KvStateID kvStateId;
 
                /** Server address where to find the KvState instance. */
-               private final KvStateServerAddress kvStateServerAddress;
+               private final InetSocketAddress kvStateServerAddress;
 
                /**
                 * Notifies the JobManager about a registered {@link 
InternalKvState} instance.
@@ -132,7 +134,7 @@ public interface KvStateMessage extends Serializable {
                                KeyGroupRange keyGroupRange,
                                String registrationName,
                                KvStateID kvStateId,
-                               KvStateServerAddress kvStateServerAddress) {
+                               InetSocketAddress kvStateServerAddress) {
 
                        this.jobId = Preconditions.checkNotNull(jobId, "JobID");
                        this.jobVertexId = 
Preconditions.checkNotNull(jobVertexId, "JobVertexID");
@@ -140,7 +142,7 @@ public interface KvStateMessage extends Serializable {
                        this.keyGroupRange = 
Preconditions.checkNotNull(keyGroupRange);
                        this.registrationName = 
Preconditions.checkNotNull(registrationName, "Registration name");
                        this.kvStateId = Preconditions.checkNotNull(kvStateId, 
"KvStateID");
-                       this.kvStateServerAddress = 
Preconditions.checkNotNull(kvStateServerAddress, "KvStateServerAddress");
+                       this.kvStateServerAddress = 
Preconditions.checkNotNull(kvStateServerAddress, "ServerAddress");
                }
 
                /**
@@ -193,7 +195,7 @@ public interface KvStateMessage extends Serializable {
                 *
                 * @return Server address where to find the KvState instance
                 */
-               public KvStateServerAddress getKvStateServerAddress() {
+               public InetSocketAddress getKvStateServerAddress() {
                        return kvStateServerAddress;
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
index 90fa5cc..af19d81 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.internal.InternalKvState;

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryGateway.java
index 13862c9..4b9834a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryGateway.java
@@ -19,10 +19,13 @@
 package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.state.KeyGroupRange;
 
+import java.net.InetSocketAddress;
+
 /**
  * A gateway to listen for {@code KvState} registrations.
  */
@@ -42,7 +45,7 @@ public interface KvStateRegistryGateway extends RpcGateway {
                KeyGroupRange keyGroupRange,
                String registrationName,
                KvStateID kvStateId,
-               KvStateServerAddress kvStateServerAddress);
+               InetSocketAddress kvStateServerAddress);
 
        /**
         * Notifies the listener about an unregistered KvState instance.

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java
index 29bee9a..dc90c96 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.KeyGroupRange;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java
index 17ffe0d..ae58714 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServer.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.query;
 
+import java.net.InetSocketAddress;
+
 /**
  * An interface for the Queryable State Server running on each Task Manager in 
the cluster.
  * This server is responsible for serving requests coming from the {@link 
KvStateClientProxy
@@ -26,10 +28,10 @@ package org.apache.flink.runtime.query;
 public interface KvStateServer {
 
        /**
-        * Returns the {@link KvStateServerAddress address} the server is 
listening to.
+        * Returns the {@link InetSocketAddress address} the server is 
listening to.
         * @return Server address.
         */
-       KvStateServerAddress getServerAddress();
+       InetSocketAddress getServerAddress();
 
 
        /** Starts the server. */

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java
deleted file mode 100644
index 2599855..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateServerAddress.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query;
-
-import org.apache.flink.util.Preconditions;
-
-import java.io.Serializable;
-import java.net.InetAddress;
-
-/**
- * The (host, port)-address of a {@link KvStateServer}.
- */
-public class KvStateServerAddress implements Serializable {
-
-       private static final long serialVersionUID = 1L;
-
-       /** KvStateServer host address. */
-       private final InetAddress hostAddress;
-
-       /** KvStateServer port. */
-       private final int port;
-
-       /**
-        * Creates a KvStateServerAddress for the given KvStateServer host 
address
-        * and port.
-        *
-        * @param hostAddress KvStateServer host address
-        * @param port        KvStateServer port
-        */
-       public KvStateServerAddress(InetAddress hostAddress, int port) {
-               this.hostAddress = Preconditions.checkNotNull(hostAddress, 
"Host address");
-               Preconditions.checkArgument(port > 0 && port <= 65535, "Port " 
+ port + " is out of range 1-65535");
-               this.port = port;
-       }
-
-       /**
-        * Returns the host address of the KvStateServer.
-        *
-        * @return KvStateServer host address
-        */
-       public InetAddress getHost() {
-               return hostAddress;
-       }
-
-       /**
-        * Returns the port of the KvStateServer.
-        *
-        * @return KvStateServer port
-        */
-       public int getPort() {
-               return port;
-       }
-
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               if (o == null || getClass() != o.getClass()) {
-                       return false;
-               }
-
-               KvStateServerAddress that = (KvStateServerAddress) o;
-
-               return port == that.port && 
hostAddress.equals(that.hostAddress);
-       }
-
-       @Override
-       public int hashCode() {
-               int result = hostAddress.hashCode();
-               result = 31 * result + port;
-               return result;
-       }
-
-       @Override
-       public String toString() {
-               return hostAddress.getHostName() + ':' + port;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
index fa021df..adbe15d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.query;
 
-import org.apache.flink.runtime.query.netty.KvStateRequestStats;
+import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
@@ -74,7 +74,8 @@ public final class QueryableStateUtils {
                        return constructor.newInstance(address, ports, 
eventLoopThreads, queryThreads, stats);
                } catch (ClassNotFoundException e) {
                        LOG.warn("Could not load Queryable State Client Proxy. 
" +
-                                       "Probable reason: flink-queryable-state 
is not in the classpath");
+                                       "Probable reason: 
flink-queryable-state-runtime is not in the classpath. " +
+                                       "Please put the corresponding jar from 
the opt to the lib folder.");
                        LOG.debug("Caught exception", e);
                        return null;
                } catch (InvocationTargetException e) {
@@ -128,7 +129,8 @@ public final class QueryableStateUtils {
                        return constructor.newInstance(address, ports, 
eventLoopThreads, queryThreads, kvStateRegistry, stats);
                } catch (ClassNotFoundException e) {
                        LOG.warn("Could not load Queryable State Server. " +
-                                       "Probable reason: flink-queryable-state 
is not in the classpath");
+                                       "Probable reason: 
flink-queryable-state-runtime is not in the classpath. " +
+                                       "Please put the corresponding jar from 
the opt to the lib folder.");
                        LOG.debug("Caught exception", e);
                        return null;
                } catch (InvocationTargetException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
index 8d0eede..f799b5a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.internal.InternalKvState;

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/AtomicKvStateRequestStats.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/AtomicKvStateRequestStats.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/AtomicKvStateRequestStats.java
deleted file mode 100644
index 1d80bab..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/AtomicKvStateRequestStats.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query.netty;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Atomic {@link KvStateRequestStats} implementation.
- */
-public class AtomicKvStateRequestStats implements KvStateRequestStats {
-
-       /**
-        * Number of active connections.
-        */
-       private final AtomicLong numConnections = new AtomicLong();
-
-       /**
-        * Total number of reported requests.
-        */
-       private final AtomicLong numRequests = new AtomicLong();
-
-       /**
-        * Total number of successful requests (<= reported requests).
-        */
-       private final AtomicLong numSuccessful = new AtomicLong();
-
-       /**
-        * Total duration of all successful requests.
-        */
-       private final AtomicLong successfulDuration = new AtomicLong();
-
-       /**
-        * Total number of failed requests (<= reported requests).
-        */
-       private final AtomicLong numFailed = new AtomicLong();
-
-       @Override
-       public void reportActiveConnection() {
-               numConnections.incrementAndGet();
-       }
-
-       @Override
-       public void reportInactiveConnection() {
-               numConnections.decrementAndGet();
-       }
-
-       @Override
-       public void reportRequest() {
-               numRequests.incrementAndGet();
-       }
-
-       @Override
-       public void reportSuccessfulRequest(long durationTotalMillis) {
-               numSuccessful.incrementAndGet();
-               successfulDuration.addAndGet(durationTotalMillis);
-       }
-
-       @Override
-       public void reportFailedRequest() {
-               numFailed.incrementAndGet();
-       }
-
-       public long getNumConnections() {
-               return numConnections.get();
-       }
-
-       public long getNumRequests() {
-               return numRequests.get();
-       }
-
-       public long getNumSuccessful() {
-               return numSuccessful.get();
-       }
-
-       public long getNumFailed() {
-               return numFailed.get();
-       }
-
-       @Override
-       public String toString() {
-               return "AtomicKvStateRequestStats{" +
-                               "numConnections=" + numConnections +
-                               ", numRequests=" + numRequests +
-                               ", numSuccessful=" + numSuccessful +
-                               ", numFailed=" + numFailed +
-                               '}';
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/DisabledKvStateRequestStats.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/DisabledKvStateRequestStats.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/DisabledKvStateRequestStats.java
deleted file mode 100644
index de8824d..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/DisabledKvStateRequestStats.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query.netty;
-
-/**
- * Disabled {@link KvStateRequestStats} implementation.
- */
-public class DisabledKvStateRequestStats implements KvStateRequestStats {
-
-       @Override
-       public void reportActiveConnection() {
-       }
-
-       @Override
-       public void reportInactiveConnection() {
-       }
-
-       @Override
-       public void reportRequest() {
-       }
-
-       @Override
-       public void reportSuccessfulRequest(long durationTotalMillis) {
-       }
-
-       @Override
-       public void reportFailedRequest() {
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java
deleted file mode 100644
index 19caf92..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query.netty;
-
-/**
- * Simple statistics for
- * {@link org.apache.flink.runtime.query.KvStateServer} and
- * {@link org.apache.flink.runtime.query.KvStateClientProxy} monitoring.
- */
-public interface KvStateRequestStats {
-
-       /**
-        * Reports an active connection.
-        */
-       void reportActiveConnection();
-
-       /**
-        * Reports an inactive connection.
-        */
-       void reportInactiveConnection();
-
-       /**
-        * Reports an incoming request.
-        */
-       void reportRequest();
-
-       /**
-        * Reports a successfully handled request.
-        *
-        * @param durationTotalMillis Duration of the request (in milliseconds).
-        */
-       void reportSuccessfulRequest(long durationTotalMillis);
-
-       /**
-        * Reports a failure during a request.
-        */
-       void reportFailedRequest();
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateSerializer.java
deleted file mode 100644
index 44ee571..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateSerializer.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query.netty.message;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.util.DataInputDeserializer;
-import org.apache.flink.runtime.util.DataOutputSerializer;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Serialization and deserialization the different state types and namespaces.
- */
-public final class KvStateSerializer {
-
-       // 
------------------------------------------------------------------------
-       // Generic serialization utils
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Serializes the key and namespace into a {@link ByteBuffer}.
-        *
-        * <p>The serialized format matches the RocksDB state backend key 
format, i.e.
-        * the key and namespace don't have to be deserialized for RocksDB 
lookups.
-        *
-        * @param key                 Key to serialize
-        * @param keySerializer       Serializer for the key
-        * @param namespace           Namespace to serialize
-        * @param namespaceSerializer Serializer for the namespace
-        * @param <K>                 Key type
-        * @param <N>                 Namespace type
-        * @return Buffer holding the serialized key and namespace
-        * @throws IOException Serialization errors are forwarded
-        */
-       public static <K, N> byte[] serializeKeyAndNamespace(
-                       K key,
-                       TypeSerializer<K> keySerializer,
-                       N namespace,
-                       TypeSerializer<N> namespaceSerializer) throws 
IOException {
-
-               DataOutputSerializer dos = new DataOutputSerializer(32);
-
-               keySerializer.serialize(key, dos);
-               dos.writeByte(42);
-               namespaceSerializer.serialize(namespace, dos);
-
-               return dos.getCopyOfBuffer();
-       }
-
-       /**
-        * Deserializes the key and namespace into a {@link Tuple2}.
-        *
-        * @param serializedKeyAndNamespace Serialized key and namespace
-        * @param keySerializer             Serializer for the key
-        * @param namespaceSerializer       Serializer for the namespace
-        * @param <K>                       Key type
-        * @param <N>                       Namespace
-        * @return Tuple2 holding deserialized key and namespace
-        * @throws IOException              if the deserialization fails for 
any reason
-        */
-       public static <K, N> Tuple2<K, N> deserializeKeyAndNamespace(
-                       byte[] serializedKeyAndNamespace,
-                       TypeSerializer<K> keySerializer,
-                       TypeSerializer<N> namespaceSerializer) throws 
IOException {
-
-               DataInputDeserializer dis = new DataInputDeserializer(
-                               serializedKeyAndNamespace,
-                               0,
-                               serializedKeyAndNamespace.length);
-
-               try {
-                       K key = keySerializer.deserialize(dis);
-                       byte magicNumber = dis.readByte();
-                       if (magicNumber != 42) {
-                               throw new IOException("Unexpected magic number 
" + magicNumber + ".");
-                       }
-                       N namespace = namespaceSerializer.deserialize(dis);
-
-                       if (dis.available() > 0) {
-                               throw new IOException("Unconsumed bytes in the 
serialized key and namespace.");
-                       }
-
-                       return new Tuple2<>(key, namespace);
-               } catch (IOException e) {
-                       throw new IOException("Unable to deserialize key " +
-                               "and namespace. This indicates a mismatch in 
the key/namespace " +
-                               "serializers used by the KvState instance and 
this access.", e);
-               }
-       }
-
-       /**
-        * Serializes the value with the given serializer.
-        *
-        * @param value      Value of type T to serialize
-        * @param serializer Serializer for T
-        * @param <T>        Type of the value
-        * @return Serialized value or <code>null</code> if value 
<code>null</code>
-        * @throws IOException On failure during serialization
-        */
-       public static <T> byte[] serializeValue(T value, TypeSerializer<T> 
serializer) throws IOException {
-               if (value != null) {
-                       // Serialize
-                       DataOutputSerializer dos = new DataOutputSerializer(32);
-                       serializer.serialize(value, dos);
-                       return dos.getCopyOfBuffer();
-               } else {
-                       return null;
-               }
-       }
-
-       /**
-        * Deserializes the value with the given serializer.
-        *
-        * @param serializedValue Serialized value of type T
-        * @param serializer      Serializer for T
-        * @param <T>             Type of the value
-        * @return Deserialized value or <code>null</code> if the serialized 
value
-        * is <code>null</code>
-        * @throws IOException On failure during deserialization
-        */
-       public static <T> T deserializeValue(byte[] serializedValue, 
TypeSerializer<T> serializer) throws IOException {
-               if (serializedValue == null) {
-                       return null;
-               } else {
-                       final DataInputDeserializer deser = new 
DataInputDeserializer(
-                               serializedValue, 0, serializedValue.length);
-                       final T value = serializer.deserialize(deser);
-                       if (deser.available() > 0) {
-                               throw new IOException(
-                                       "Unconsumed bytes in the deserialized 
value. " +
-                                               "This indicates a mismatch in 
the value serializers " +
-                                               "used by the KvState instance 
and this access.");
-                       }
-                       return value;
-               }
-       }
-
-       /**
-        * Deserializes all values with the given serializer.
-        *
-        * @param serializedValue Serialized value of type List&lt;T&gt;
-        * @param serializer      Serializer for T
-        * @param <T>             Type of the value
-        * @return Deserialized list or <code>null</code> if the serialized 
value
-        * is <code>null</code>
-        * @throws IOException On failure during deserialization
-        */
-       public static <T> List<T> deserializeList(byte[] serializedValue, 
TypeSerializer<T> serializer) throws IOException {
-               if (serializedValue != null) {
-                       final DataInputDeserializer in = new 
DataInputDeserializer(
-                               serializedValue, 0, serializedValue.length);
-
-                       try {
-                               final List<T> result = new ArrayList<>();
-                               while (in.available() > 0) {
-                                       result.add(serializer.deserialize(in));
-
-                                       // The expected binary format has a 
single byte separator. We
-                                       // want a consistent binary format in 
order to not need any
-                                       // special casing during 
deserialization. A "cleaner" format
-                                       // would skip this extra byte, but 
would require a memory copy
-                                       // for RocksDB, which stores the data 
serialized in this way
-                                       // for lists.
-                                       if (in.available() > 0) {
-                                               in.readByte();
-                                       }
-                               }
-
-                               return result;
-                       } catch (IOException e) {
-                               throw new IOException(
-                                               "Unable to deserialize value. " 
+
-                                                       "This indicates a 
mismatch in the value serializers " +
-                                                       "used by the KvState 
instance and this access.", e);
-                       }
-               } else {
-                       return null;
-               }
-       }
-
-       /**
-        * Serializes all values of the Iterable with the given serializer.
-        *
-        * @param entries         Key-value pairs to serialize
-        * @param keySerializer   Serializer for UK
-        * @param valueSerializer Serializer for UV
-        * @param <UK>            Type of the keys
-        * @param <UV>            Type of the values
-        * @return Serialized values or <code>null</code> if values 
<code>null</code> or empty
-        * @throws IOException On failure during serialization
-        */
-       public static <UK, UV> byte[] serializeMap(Iterable<Map.Entry<UK, UV>> 
entries, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) 
throws IOException {
-               if (entries != null) {
-                       // Serialize
-                       DataOutputSerializer dos = new DataOutputSerializer(32);
-
-                       for (Map.Entry<UK, UV> entry : entries) {
-                               keySerializer.serialize(entry.getKey(), dos);
-
-                               if (entry.getValue() == null) {
-                                       dos.writeBoolean(true);
-                               } else {
-                                       dos.writeBoolean(false);
-                                       
valueSerializer.serialize(entry.getValue(), dos);
-                               }
-                       }
-
-                       return dos.getCopyOfBuffer();
-               } else {
-                       return null;
-               }
-       }
-
-       /**
-        * Deserializes all kv pairs with the given serializer.
-        *
-        * @param serializedValue Serialized value of type Map&lt;UK, UV&gt;
-        * @param keySerializer   Serializer for UK
-        * @param valueSerializer Serializer for UV
-        * @param <UK>            Type of the key
-        * @param <UV>            Type of the value.
-        * @return Deserialized map or <code>null</code> if the serialized value
-        * is <code>null</code>
-        * @throws IOException On failure during deserialization
-        */
-       public static <UK, UV> Map<UK, UV> deserializeMap(byte[] 
serializedValue, TypeSerializer<UK> keySerializer, TypeSerializer<UV> 
valueSerializer) throws IOException {
-               if (serializedValue != null) {
-                       DataInputDeserializer in = new 
DataInputDeserializer(serializedValue, 0, serializedValue.length);
-
-                       Map<UK, UV> result = new HashMap<>();
-                       while (in.available() > 0) {
-                               UK key = keySerializer.deserialize(in);
-
-                               boolean isNull = in.readBoolean();
-                               UV value = isNull ? null : 
valueSerializer.deserialize(in);
-
-                               result.put(key, value);
-                       }
-
-                       return result;
-               } else {
-                       return null;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
index 97b6bcd..66360e4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.util.Preconditions;
 
@@ -41,7 +41,7 @@ import org.apache.flink.util.Preconditions;
 public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
                implements InternalKvState<N> {
 
-       /** Map containing the actual key/value pairs */
+       /** Map containing the actual key/value pairs. */
        protected final StateTable<K, N, SV> stateTable;
 
        /** This holds the name of the state and can create an initial default value for the state. */
@@ -118,4 +118,4 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St
        public StateTable<K, N, SV> getStateTable() {
                return stateTable;
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
index f981b9f..206f10a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.state.heap;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.util.Preconditions;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 1cc94d2a..2baf644 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.taskexecutor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -42,7 +43,6 @@ import org.apache.flink.runtime.query.KvStateClientProxy;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.KvStateServer;
 import org.apache.flink.runtime.query.QueryableStateUtils;
-import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
 import org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer;

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcKvStateRegistryListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcKvStateRegistryListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcKvStateRegistryListener.java
index 3692a71..6312d08 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcKvStateRegistryListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcKvStateRegistryListener.java
@@ -19,22 +19,23 @@
 package org.apache.flink.runtime.taskexecutor.rpc;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.KvStateRegistryGateway;
 import org.apache.flink.runtime.query.KvStateRegistryListener;
-import org.apache.flink.runtime.query.KvStateServerAddress;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.util.Preconditions;
 
+import java.net.InetSocketAddress;
+
 public class RpcKvStateRegistryListener implements KvStateRegistryListener {
 
        private final KvStateRegistryGateway kvStateRegistryGateway;
-       private final KvStateServerAddress kvStateServerAddress;
+       private final InetSocketAddress kvStateServerAddress;
 
        public RpcKvStateRegistryListener(
                        KvStateRegistryGateway kvStateRegistryGateway,
-                       KvStateServerAddress kvStateServerAddress) {
+                       InetSocketAddress kvStateServerAddress) {
                this.kvStateRegistryGateway = Preconditions.checkNotNull(kvStateRegistryGateway);
                this.kvStateServerAddress = Preconditions.checkNotNull(kvStateServerAddress);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java
index 4404867..63bda99 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java
@@ -19,15 +19,16 @@
 package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.KvStateMessage;
 import org.apache.flink.runtime.query.KvStateRegistryListener;
-import org.apache.flink.runtime.query.KvStateServerAddress;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.util.Preconditions;
 
+import java.net.InetSocketAddress;
+
 /**
  * This implementation uses {@link ActorGateway} to forward key-value state notifications to the job
  * manager. The notifications are wrapped in an actor message and send to the given actor gateway.
@@ -36,14 +37,14 @@ public class ActorGatewayKvStateRegistryListener implements KvStateRegistryListe
 
        private ActorGateway jobManager;
 
-       private KvStateServerAddress kvStateServerAddress;
+       private InetSocketAddress kvStateServerAddress;
 
        public ActorGatewayKvStateRegistryListener(
                ActorGateway jobManager,
-               KvStateServerAddress kvStateServerAddress) {
+               InetSocketAddress kvStateServerAddress) {
 
                this.jobManager = Preconditions.checkNotNull(jobManager, "JobManager");
-               this.kvStateServerAddress = Preconditions.checkNotNull(kvStateServerAddress, "KvStateServerAddress");
+               this.kvStateServerAddress = Preconditions.checkNotNull(kvStateServerAddress, "ServerAddress");
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 68da362..889191f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason;
@@ -74,12 +75,10 @@ import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
 import org.apache.flink.runtime.messages.RegistrationMessages;
-import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation;
 import org.apache.flink.runtime.query.KvStateMessage.NotifyKvStateRegistered;
 import org.apache.flink.runtime.query.KvStateMessage.NotifyKvStateUnregistered;
-import org.apache.flink.runtime.query.KvStateServerAddress;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskmanager.TaskManager;
@@ -119,6 +118,7 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
@@ -717,7 +717,7 @@ public class JobManagerTest extends TestLogger {
                                new KeyGroupRange(0, 0),
                                "any-name",
                                new KvStateID(),
-                               new KvStateServerAddress(InetAddress.getLocalHost(), 1233));
+                               new InetSocketAddress(InetAddress.getLocalHost(), 1233));
 
                jobManager.tell(registerNonExistingJob);
 
@@ -742,7 +742,7 @@ public class JobManagerTest extends TestLogger {
                                new KeyGroupRange(0, 0),
                                "register-me",
                                new KvStateID(),
-                               new 
KvStateServerAddress(InetAddress.getLocalHost(), 1293));
+                               new 
InetSocketAddress(InetAddress.getLocalHost(), 1293));
 
                jobManager.tell(registerForExistingJob);
 
@@ -797,7 +797,7 @@ public class JobManagerTest extends TestLogger {
                                new KeyGroupRange(0, 0),
                                "duplicate-me",
                                new KvStateID(),
-                               new 
KvStateServerAddress(InetAddress.getLocalHost(), 1293));
+                               new 
InetSocketAddress(InetAddress.getLocalHost(), 1293));
 
                NotifyKvStateRegistered duplicate = new NotifyKvStateRegistered(
                                jobGraph.getJobID(),
@@ -805,7 +805,7 @@ public class JobManagerTest extends TestLogger {
                                new KeyGroupRange(0, 0),
                                "duplicate-me", // ...same name
                                new KvStateID(),
-                               new 
KvStateServerAddress(InetAddress.getLocalHost(), 1293));
+                               new 
InetSocketAddress(InetAddress.getLocalHost(), 1293));
 
                Future<TestingJobManagerMessages.JobStatusIs> failedFuture = jobManager
                                .ask(new NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.FAILED), deadline.timeLeft())

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java
index 7bf9ee7..74e16a0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -26,6 +27,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
 import org.junit.Test;
 
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -63,7 +65,7 @@ public class KvStateLocationRegistryTest {
                        }
                }
 
-               KvStateServerAddress server = new KvStateServerAddress(InetAddress.getLocalHost(), 12032);
+               InetSocketAddress server = new InetSocketAddress(InetAddress.getLocalHost(), 12032);
 
                // Create registry
                Map<JobVertexID, ExecutionJobVertex> vertexMap = 
createVertexMap(vertices);
@@ -129,7 +131,7 @@ public class KvStateLocationRegistryTest {
                                new KeyGroupRange(0, 0),
                                registrationName,
                                new KvStateID(),
-                               new 
KvStateServerAddress(InetAddress.getLocalHost(), 12328));
+                               new 
InetSocketAddress(InetAddress.getLocalHost(), 12328));
 
                try {
                        // Second operator registers same name
@@ -138,7 +140,7 @@ public class KvStateLocationRegistryTest {
                                        new KeyGroupRange(0, 0),
                                        registrationName,
                                        new KvStateID(),
-                                       new 
KvStateServerAddress(InetAddress.getLocalHost(), 12032));
+                                       new 
InetSocketAddress(InetAddress.getLocalHost(), 12032));
 
                        fail("Did not throw expected Exception after duplicated 
name");
                } catch (IllegalStateException ignored) {
@@ -187,7 +189,7 @@ public class KvStateLocationRegistryTest {
                                new KeyGroupRange(0, 0),
                                name,
                                new KvStateID(),
-                               mock(KvStateServerAddress.class));
+                               mock(InetSocketAddress.class));
 
                try {
                        // Unregister not registered keyGroupIndex

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java
index 116deea..3c79948 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java
@@ -19,12 +19,14 @@
 package org.apache.flink.runtime.query;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.KeyGroupRange;
 
 import org.junit.Test;
 
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -65,7 +67,7 @@ public class KvStateLocationTest {
                KvStateLocation location = new KvStateLocation(jobId, 
jobVertexId, numKeyGroups, registrationName);
 
                KvStateID[] kvStateIds = new KvStateID[numRanges];
-               KvStateServerAddress[] serverAddresses = new 
KvStateServerAddress[numRanges];
+               InetSocketAddress[] serverAddresses = new 
InetSocketAddress[numRanges];
 
                InetAddress host = InetAddress.getLocalHost();
 
@@ -73,7 +75,7 @@ public class KvStateLocationTest {
                int registeredCount = 0;
                for (int rangeIdx = 0; rangeIdx < numRanges; rangeIdx++) {
                        kvStateIds[rangeIdx] = new KvStateID();
-                       serverAddresses[rangeIdx] = new 
KvStateServerAddress(host, 1024 + rangeIdx);
+                       serverAddresses[rangeIdx] = new InetSocketAddress(host, 
1024 + rangeIdx);
                        KeyGroupRange keyGroupRange = 
keyGroupRanges.get(rangeIdx);
                        location.registerKvState(keyGroupRange, 
kvStateIds[rangeIdx], serverAddresses[rangeIdx]);
                        registeredCount += keyGroupRange.getNumberOfKeyGroups();
@@ -92,7 +94,7 @@ public class KvStateLocationTest {
                // Overwrite
                for (int rangeIdx = 0; rangeIdx < numRanges; rangeIdx++) {
                        kvStateIds[rangeIdx] = new KvStateID();
-                       serverAddresses[rangeIdx] = new 
KvStateServerAddress(host, 1024 + rangeIdx);
+                       serverAddresses[rangeIdx] = new InetSocketAddress(host, 
1024 + rangeIdx);
 
                        location.registerKvState(keyGroupRanges.get(rangeIdx), 
kvStateIds[rangeIdx], serverAddresses[rangeIdx]);
                        assertEquals(registeredCount, 
location.getNumRegisteredKeyGroups());

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
deleted file mode 100644
index aa4e6d8..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
+++ /dev/null
@@ -1,415 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query.netty.message;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
-import org.apache.flink.runtime.state.internal.InternalKvState;
-import org.apache.flink.runtime.state.internal.InternalListState;
-import org.apache.flink.runtime.state.internal.InternalMapState;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for {@link KvStateSerializer}.
- */
-@RunWith(Parameterized.class)
-public class KvStateRequestSerializerTest {
-
-       @Parameterized.Parameters
-       public static Collection<Boolean> parameters() {
-               return Arrays.asList(false, true);
-       }
-
-       @Parameterized.Parameter
-       public boolean async;
-
-       /**
-        * Tests key and namespace serialization utils.
-        */
-       @Test
-       public void testKeyAndNamespaceSerialization() throws Exception {
-               TypeSerializer<Long> keySerializer = LongSerializer.INSTANCE;
-               TypeSerializer<String> namespaceSerializer = 
StringSerializer.INSTANCE;
-
-               long expectedKey = Integer.MAX_VALUE + 12323L;
-               String expectedNamespace = "knilf";
-
-               byte[] serializedKeyAndNamespace = 
KvStateSerializer.serializeKeyAndNamespace(
-                               expectedKey, keySerializer, expectedNamespace, 
namespaceSerializer);
-
-               Tuple2<Long, String> actual = 
KvStateSerializer.deserializeKeyAndNamespace(
-                               serializedKeyAndNamespace, keySerializer, 
namespaceSerializer);
-
-               assertEquals(expectedKey, actual.f0.longValue());
-               assertEquals(expectedNamespace, actual.f1);
-       }
-
-       /**
-        * Tests key and namespace deserialization utils with too few bytes.
-        */
-       @Test(expected = IOException.class)
-       public void testKeyAndNamespaceDeserializationEmpty() throws Exception {
-               KvStateSerializer.deserializeKeyAndNamespace(
-                       new byte[] {}, LongSerializer.INSTANCE, 
StringSerializer.INSTANCE);
-       }
-
-       /**
-        * Tests key and namespace deserialization utils with too few bytes.
-        */
-       @Test(expected = IOException.class)
-       public void testKeyAndNamespaceDeserializationTooShort() throws 
Exception {
-               KvStateSerializer.deserializeKeyAndNamespace(
-                       new byte[] {1}, LongSerializer.INSTANCE, 
StringSerializer.INSTANCE);
-       }
-
-       /**
-        * Tests key and namespace deserialization utils with too many bytes.
-        */
-       @Test(expected = IOException.class)
-       public void testKeyAndNamespaceDeserializationTooMany1() throws 
Exception {
-               // Long + null String + 1 byte
-               KvStateSerializer.deserializeKeyAndNamespace(
-                       new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 42, 0, 2}, 
LongSerializer.INSTANCE,
-                       StringSerializer.INSTANCE);
-       }
-
-       /**
-        * Tests key and namespace deserialization utils with too many bytes.
-        */
-       @Test(expected = IOException.class)
-       public void testKeyAndNamespaceDeserializationTooMany2() throws 
Exception {
-               // Long + null String + 2 bytes
-               KvStateSerializer.deserializeKeyAndNamespace(
-                       new byte[] {1, 1, 1, 1, 1, 1, 1, 1, 42, 0, 2, 2}, 
LongSerializer.INSTANCE,
-                       StringSerializer.INSTANCE);
-       }
-
-       /**
-        * Tests value serialization utils.
-        */
-       @Test
-       public void testValueSerialization() throws Exception {
-               TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE;
-               long expectedValue = Long.MAX_VALUE - 1292929292L;
-
-               byte[] serializedValue = 
KvStateSerializer.serializeValue(expectedValue, valueSerializer);
-               long actualValue = 
KvStateSerializer.deserializeValue(serializedValue, valueSerializer);
-
-               assertEquals(expectedValue, actualValue);
-       }
-
-       /**
-        * Tests value deserialization with too few bytes.
-        */
-       @Test(expected = IOException.class)
-       public void testDeserializeValueEmpty() throws Exception {
-               KvStateSerializer.deserializeValue(new byte[] {}, 
LongSerializer.INSTANCE);
-       }
-
-       /**
-        * Tests value deserialization with too few bytes.
-        */
-       @Test(expected = IOException.class)
-       public void testDeserializeValueTooShort() throws Exception {
-               // 1 byte (incomplete Long)
-               KvStateSerializer.deserializeValue(new byte[] {1}, 
LongSerializer.INSTANCE);
-       }
-
-       /**
-        * Tests value deserialization with too many bytes.
-        */
-       @Test(expected = IOException.class)
-       public void testDeserializeValueTooMany1() throws Exception {
-               // Long + 1 byte
-               KvStateSerializer.deserializeValue(new byte[] {1, 1, 1, 1, 1, 
1, 1, 1, 2},
-                       LongSerializer.INSTANCE);
-       }
-
-       /**
-        * Tests value deserialization with too many bytes.
-        */
-       @Test(expected = IOException.class)
-       public void testDeserializeValueTooMany2() throws Exception {
-               // Long + 2 bytes
-               KvStateSerializer.deserializeValue(new byte[] {1, 1, 1, 1, 1, 
1, 1, 1, 2, 2},
-                       LongSerializer.INSTANCE);
-       }
-
-       /**
-        * Tests list serialization utils.
-        */
-       @Test
-       public void testListSerialization() throws Exception {
-               final long key = 0L;
-
-               // objects for heap state list serialisation
-               final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend =
-                       new HeapKeyedStateBackend<>(
-                               mock(TaskKvStateRegistry.class),
-                               LongSerializer.INSTANCE,
-                               ClassLoader.getSystemClassLoader(),
-                               1,
-                               new KeyGroupRange(0, 0),
-                               async,
-                               new ExecutionConfig()
-                       );
-               longHeapKeyedStateBackend.setCurrentKey(key);
-
-               final InternalListState<VoidNamespace, Long> listState = 
longHeapKeyedStateBackend.createListState(
-                               VoidNamespaceSerializer.INSTANCE,
-                               new ListStateDescriptor<>("test", 
LongSerializer.INSTANCE));
-
-               testListSerialization(key, listState);
-       }
-
-       /**
-        * Verifies that the serialization of a list using the given list state
-        * matches the deserialization with {@link 
KvStateSerializer#deserializeList}.
-        *
-        * @param key
-        *              key of the list state
-        * @param listState
-        *              list state using the {@link VoidNamespace}, must also 
be a {@link InternalKvState} instance
-        *
-        * @throws Exception
-        */
-       public static void testListSerialization(
-                       final long key,
-                       final InternalListState<VoidNamespace, Long> listState) 
throws Exception {
-
-               TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE;
-               listState.setCurrentNamespace(VoidNamespace.INSTANCE);
-
-               // List
-               final int numElements = 10;
-
-               final List<Long> expectedValues = new ArrayList<>();
-               for (int i = 0; i < numElements; i++) {
-                       final long value = 
ThreadLocalRandom.current().nextLong();
-                       expectedValues.add(value);
-                       listState.add(value);
-               }
-
-               final byte[] serializedKey =
-                       KvStateSerializer.serializeKeyAndNamespace(
-                               key, LongSerializer.INSTANCE,
-                               VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE);
-
-               final byte[] serializedValues = 
listState.getSerializedValue(serializedKey);
-
-               List<Long> actualValues = 
KvStateSerializer.deserializeList(serializedValues, valueSerializer);
-               assertEquals(expectedValues, actualValues);
-
-               // Single value
-               long expectedValue = ThreadLocalRandom.current().nextLong();
-               byte[] serializedValue = 
KvStateSerializer.serializeValue(expectedValue, valueSerializer);
-               List<Long> actualValue = 
KvStateSerializer.deserializeList(serializedValue, valueSerializer);
-               assertEquals(1, actualValue.size());
-               assertEquals(expectedValue, actualValue.get(0).longValue());
-       }
-
-       /**
-        * Tests list deserialization with too few bytes.
-        */
-       @Test
-       public void testDeserializeListEmpty() throws Exception {
-               List<Long> actualValue = KvStateSerializer
-                       .deserializeList(new byte[] {}, 
LongSerializer.INSTANCE);
-               assertEquals(0, actualValue.size());
-       }
-
-       /**
-        * Tests list deserialization with too few bytes.
-        */
-       @Test(expected = IOException.class)
-       public void testDeserializeListTooShort1() throws Exception {
-               // 1 byte (incomplete Long)
-               KvStateSerializer.deserializeList(new byte[] {1}, 
LongSerializer.INSTANCE);
-       }
-
-       /**
-        * Tests list deserialization with too few bytes.
-        */
-       @Test(expected = IOException.class)
-       public void testDeserializeListTooShort2() throws Exception {
-               // Long + 1 byte (separator) + 1 byte (incomplete Long)
-               KvStateSerializer.deserializeList(new byte[] {1, 1, 1, 1, 1, 1, 
1, 1, 2, 3},
-                       LongSerializer.INSTANCE);
-       }
-
-       /**
-        * Tests map serialization utils with a map state obtained from a
-        * {@link HeapKeyedStateBackend}.
-        */
-       @Test
-       public void testMapSerialization() throws Exception {
-               final long key = 0L;
-
-               // objects for heap state map serialisation
-               final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend =
-                       new HeapKeyedStateBackend<>(
-                                       mock(TaskKvStateRegistry.class),
-                                       LongSerializer.INSTANCE,
-                                       ClassLoader.getSystemClassLoader(),
-                                       1,
-                                       new KeyGroupRange(0, 0),
-                                       async,
-                                       new ExecutionConfig()
-                       );
-               longHeapKeyedStateBackend.setCurrentKey(key);
-
-               final InternalMapState<VoidNamespace, Long, String> mapState = 
(InternalMapState<VoidNamespace, Long, String>) 
longHeapKeyedStateBackend.getPartitionedState(
-                               VoidNamespace.INSTANCE,
-                               VoidNamespaceSerializer.INSTANCE,
-                               new MapStateDescriptor<>("test", 
LongSerializer.INSTANCE, StringSerializer.INSTANCE));
-
-               // the static helper below performs the actual round-trip checks
-               testMapSerialization(key, mapState);
-       }
-
-       /**
-        * Verifies that the serialization of a map using the given map state
-        * matches the deserialization with {@link 
KvStateSerializer#deserializeMap}.
-        *
-        * @param key
-        *              key of the map state
-        * @param mapState
-        *              map state using the {@link VoidNamespace}, must also be 
a {@link InternalKvState} instance
-        *
-        * @throws Exception if populating the state or (de)serializing it fails
-        */
-       public static void testMapSerialization(
-                       final long key,
-                       final InternalMapState<VoidNamespace, Long, String> 
mapState) throws Exception {
-
-               TypeSerializer<Long> userKeySerializer = 
LongSerializer.INSTANCE;
-               TypeSerializer<String> userValueSerializer = 
StringSerializer.INSTANCE;
-               mapState.setCurrentNamespace(VoidNamespace.INSTANCE);
-
-               // Map: fill the state and a reference map with the same entries
-               final int numElements = 10;
-
-               final Map<Long, String> expectedValues = new HashMap<>();
-               for (int i = 1; i <= numElements; i++) {
-                       final long value = 
ThreadLocalRandom.current().nextLong();
-                       expectedValues.put(value, Long.toString(value));
-                       mapState.put(value, Long.toString(value));
-               }
-
-               // additionally cover an entry whose value is null
-               expectedValues.put(0L, null);
-               mapState.put(0L, null);
-
-               final byte[] serializedKey =
-                       KvStateSerializer.serializeKeyAndNamespace(
-                               key, LongSerializer.INSTANCE,
-                               VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE);
-
-               final byte[] serializedValues = 
mapState.getSerializedValue(serializedKey);
-
-               // the deserialized map must match the reference map entry by entry
-               Map<Long, String> actualValues = 
KvStateSerializer.deserializeMap(serializedValues, userKeySerializer, 
userValueSerializer);
-               assertEquals(expectedValues.size(), actualValues.size());
-               for (Map.Entry<Long, String> actualEntry : 
actualValues.entrySet()) {
-                       assertEquals(expectedValues.get(actualEntry.getKey()), 
actualEntry.getValue());
-               }
-
-               // Single value: hand-assemble one entry as key + marker byte + value
-               ByteArrayOutputStream baos = new ByteArrayOutputStream();
-               long expectedKey = ThreadLocalRandom.current().nextLong();
-               String expectedValue = Long.toString(expectedKey);
-               // marker byte written between key and value; 0 presumably flags a
-               // non-null value (a value follows it here)
-               byte[] isNull = {0};
-
-               baos.write(KvStateSerializer.serializeValue(expectedKey, 
userKeySerializer));
-               baos.write(isNull);
-               baos.write(KvStateSerializer.serializeValue(expectedValue, 
userValueSerializer));
-               byte[] serializedValue = baos.toByteArray();
-
-               Map<Long, String> actualValue = 
KvStateSerializer.deserializeMap(serializedValue, userKeySerializer, 
userValueSerializer);
-               assertEquals(1, actualValue.size());
-               assertEquals(expectedValue, actualValue.get(expectedKey));
-       }
-
-       /**
-        * Tests map deserialization of an empty byte array, which is expected
-        * to yield an empty map rather than an error.
-        */
-       @Test
-       public void testDeserializeMapEmpty() throws Exception {
-               Map<Long, String> actualValue = KvStateSerializer
-                       .deserializeMap(new byte[] {}, LongSerializer.INSTANCE, 
StringSerializer.INSTANCE);
-               assertEquals(0, actualValue.size());
-       }
-
-       /**
-        * Tests that map deserialization fails with an {@link IOException}
-        * when the input is shorter than a single serialized key.
-        */
-       @Test(expected = IOException.class)
-       public void testDeserializeMapTooShort1() throws Exception {
-               // 1 byte (incomplete Key)
-               KvStateSerializer.deserializeMap(new byte[] {1}, 
LongSerializer.INSTANCE, StringSerializer.INSTANCE);
-       }
-
-       /**
-        * Tests that map deserialization fails with an {@link IOException}
-        * when the input ends before the value of the first entry is complete.
-        */
-       @Test(expected = IOException.class)
-       public void testDeserializeMapTooShort2() throws Exception {
-               // Long (Key) + 1 byte, then no value bytes; the extra byte is
-               // presumably the isNull flag (false), cf. the other map tests
-               KvStateSerializer.deserializeMap(new byte[]{1, 1, 1, 1, 1, 1, 
1, 1, 0},
-                               LongSerializer.INSTANCE, 
LongSerializer.INSTANCE);
-       }
-
-       /**
-        * Tests that map deserialization fails with an {@link IOException}
-        * when the input ends in the middle of the second entry's key.
-        */
-       @Test(expected = IOException.class)
-       public void testDeserializeMapTooShort3() throws Exception {
-               // Long (Key1) + Boolean (false) + Long (Value1) + 1 byte 
(incomplete Key2)
-               KvStateSerializer.deserializeMap(new byte[] {1, 1, 1, 1, 1, 1, 
1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 3},
-                       LongSerializer.INSTANCE, LongSerializer.INSTANCE);
-       }
-
-       /**
-        * Creates a byte array of the given length filled with random bytes.
-        *
-        * @param capacity
-        *              length of the array to create
-        * @return the newly allocated array with random content
-        */
-       private byte[] randomByteArray(int capacity) {
-               byte[] bytes = new byte[capacity];
-               ThreadLocalRandom.current().nextBytes(bytes);
-               return bytes;
-       }
-}

Reply via email to