[FLINK-7770][QS] Hide the queryable state behind a proxy.

Previously, the QueryableStateClient could connect to the JM
and the TMs directly to fetch the required state. Now, there
is a proxy running on each TM, and the remote client connects
to one of these proxies to get the requested state. The proxy
receives the request from the client, performs all necessary
message exchanges within the Flink cluster, receives the state,
and forwards it back to the client.

This architecture allows more security features to be
integrated in the future. Because the proxy runs inside the
cluster, it exposes less information about the cluster to
the outside world and is more suitable for containerized
environments.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f48f5340
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f48f5340
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f48f5340

Branch: refs/heads/master
Commit: f48f5340a871ac58a649766e434218b67e9322ac
Parents: 29a6e99
Author: kkloudas <[email protected]>
Authored: Thu Oct 5 15:16:23 2017 +0200
Committer: kkloudas <[email protected]>
Committed: Wed Oct 11 15:33:32 2017 +0200

----------------------------------------------------------------------
 .../configuration/QueryableStateOptions.java    |   2 +-
 .../java/org/apache/flink/util/AbstractID.java  |   4 +-
 .../flink/queryablestate/UnknownJobManager.java |  35 -
 .../UnknownJobManagerException.java             |  36 +
 .../queryablestate/UnknownKeyOrNamespace.java   |  31 -
 .../UnknownKeyOrNamespaceException.java         |  39 +
 .../flink/queryablestate/UnknownKvStateID.java  |  35 -
 .../UnknownKvStateIdException.java              |  42 +
 .../UnknownKvStateKeyGroupLocation.java         |  31 -
 ...UnknownKvStateKeyGroupLocationException.java |  41 +
 .../AkkaKvStateLocationLookupService.java       | 325 --------
 .../queryablestate/client/KvStateClient.java    | 583 --------------
 .../client/KvStateClientHandler.java            | 107 ---
 .../client/KvStateClientHandlerCallback.java    |  54 --
 .../client/KvStateLocationLookupService.java    |  51 --
 .../client/QueryableStateClient.java            | 479 +++--------
 .../client/proxy/KvStateClientProxyHandler.java | 225 ++++++
 .../client/proxy/KvStateClientProxyImpl.java    | 127 +++
 .../messages/KvStateInternalRequest.java        |  93 +++
 .../queryablestate/messages/KvStateRequest.java | 142 ++--
 .../messages/KvStateRequestFailure.java         |  68 --
 .../messages/KvStateRequestResult.java          |  74 --
 .../messages/KvStateResponse.java               |  75 ++
 .../network/AbstractServerBase.java             | 241 ++++++
 .../network/AbstractServerHandler.java          | 306 ++++++++
 .../network/BadRequestException.java            |  35 +
 .../queryablestate/network/ChunkedByteBuf.java  | 100 +++
 .../flink/queryablestate/network/Client.java    | 537 +++++++++++++
 .../queryablestate/network/ClientHandler.java   | 122 +++
 .../network/ClientHandlerCallback.java          |  56 ++
 .../network/messages/MessageBody.java           |  38 +
 .../network/messages/MessageDeserializer.java   |  39 +
 .../network/messages/MessageSerializer.java     | 228 +++---
 .../network/messages/MessageType.java           |   7 +-
 .../network/messages/RequestFailure.java        |  71 ++
 .../queryablestate/server/ChunkedByteBuf.java   |  98 ---
 .../server/KvStateServerHandler.java            | 279 +------
 .../server/KvStateServerImpl.java               | 222 ++----
 .../itcases/AbstractQueryableStateITCase.java   | 534 +++++++------
 .../itcases/HAAbstractQueryableStateITCase.java |   8 +-
 .../NonHAAbstractQueryableStateITCase.java      |  10 +-
 .../AkkaKvStateLocationLookupServiceTest.java   | 399 ----------
 .../queryablestate/network/ClientTest.java      | 784 +++++++++++++++++++
 .../network/KvStateClientHandlerTest.java       |  24 +-
 .../network/KvStateClientTest.java              | 752 ------------------
 .../network/KvStateRequestSerializerTest.java   | 214 -----
 .../network/KvStateServerHandlerTest.java       | 286 ++++---
 .../network/KvStateServerTest.java              |  30 +-
 .../network/MessageSerializerTest.java          | 220 ++++++
 .../network/QueryableStateClientTest.java       | 458 -----------
 .../flink/runtime/concurrent/FutureUtils.java   |  16 +
 .../runtime/io/network/NetworkEnvironment.java  |  44 +-
 .../flink/runtime/query/KvStateClientProxy.java |  65 ++
 .../flink/runtime/query/KvStateLocation.java    |   8 +-
 .../flink/runtime/query/KvStateServer.java      |  19 +-
 .../runtime/query/QueryableStateUtils.java      |  54 +-
 .../query/netty/KvStateRequestStats.java        |   6 +-
 .../QueryableStateConfiguration.java            |   1 +
 .../taskexecutor/TaskManagerServices.java       |  16 +-
 .../TaskManagerServicesConfiguration.java       |   4 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  28 +-
 .../io/network/NetworkEnvironmentTest.java      |   1 +
 ...askManagerComponentsStartupShutdownTest.java |   1 +
 63 files changed, 4317 insertions(+), 4743 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
index 8b17cfb..df850e9 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
@@ -40,7 +40,7 @@ public class QueryableStateOptions {
        /** Port to bind KvState server to (0 => pick random available port). */
        public static final ConfigOption<Integer> SERVER_PORT =
                        key("query.server.port")
-                       .defaultValue(0);
+                       .defaultValue(9069);
 
        /** Number of network (event loop) threads for the KvState server (0 => 
#slots). */
        public static final ConfigOption<Integer> SERVER_NETWORK_THREADS =

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-core/src/main/java/org/apache/flink/util/AbstractID.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractID.java 
b/flink-core/src/main/java/org/apache/flink/util/AbstractID.java
index 397bb71..12d634d 100644
--- a/flink-core/src/main/java/org/apache/flink/util/AbstractID.java
+++ b/flink-core/src/main/java/org/apache/flink/util/AbstractID.java
@@ -150,7 +150,7 @@ public class AbstractID implements Comparable<AbstractID>, 
java.io.Serializable
                                ((int)  this.upperPart) ^
                                ((int) (this.upperPart >>> 32));
        }
-       
+
        @Override
        public String toString() {
                if (this.toString == null) {
@@ -163,7 +163,7 @@ public class AbstractID implements Comparable<AbstractID>, 
java.io.Serializable
 
                return this.toString;
        }
-       
+
        @Override
        public int compareTo(AbstractID o) {
                int diff1 = (this.upperPart < o.upperPart) ? -1 : 
((this.upperPart == o.upperPart) ? 0 : 1);

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManager.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManager.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManager.java
deleted file mode 100644
index 93f2ba5..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManager.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate;
-
-import org.apache.flink.queryablestate.client.KvStateLocationLookupService;
-
-/**
- * Exception to fail Future with if no JobManager is currently registered at
- * the {@link KvStateLocationLookupService}.
- */
-public class UnknownJobManager extends Exception {
-
-       private static final long serialVersionUID = 1L;
-
-       public UnknownJobManager() {
-               super("Unknown JobManager. Either the JobManager has not 
registered yet " +
-                               "or has lost leadership.");
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManagerException.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManagerException.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManagerException.java
new file mode 100644
index 0000000..fa2604b
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManagerException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Exception to fail a Future with if the Task Manager on which the
+ * {@link org.apache.flink.runtime.query.KvStateClientProxy}
+ * is running does not know the active Job Manager.
+ */
+@Internal
+public class UnknownJobManagerException extends Exception {
+
+       private static final long serialVersionUID = 9092442511708951209L;
+
+       public UnknownJobManagerException() {
+               super("Unknown JobManager. Either the JobManager has not 
registered yet or has lost leadership.");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespace.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespace.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespace.java
deleted file mode 100644
index e921e40..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespace.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate;
-
-/**
- * Thrown if the KvState does not hold any state for the given key or 
namespace.
- */
-public class UnknownKeyOrNamespace extends IllegalStateException {
-
-       private static final long serialVersionUID = 1L;
-
-       public UnknownKeyOrNamespace() {
-               super("KvState does not hold any state for key/namespace.");
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespaceException.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespaceException.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespaceException.java
new file mode 100644
index 0000000..c497a72
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespaceException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.network.BadRequestException;
+
+/**
+ * Thrown if the KvState does not hold any state for the given key or 
namespace.
+ */
+@Internal
+public class UnknownKeyOrNamespaceException extends BadRequestException {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Creates the exception.
+        * @param serverName the name of the server that threw the exception.
+        */
+       public UnknownKeyOrNamespaceException(String serverName) {
+               super(serverName, "No state for the specified key/namespace.");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateID.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateID.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateID.java
deleted file mode 100644
index d5ff828..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateID.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate;
-
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Thrown if no KvState with the given ID cannot found by the server handler.
- */
-public class UnknownKvStateID extends IllegalStateException {
-
-       private static final long serialVersionUID = 1L;
-
-       public UnknownKvStateID(KvStateID kvStateId) {
-               super("No KvState registered with ID " + 
Preconditions.checkNotNull(kvStateId, "KvStateID") +
-                               " at TaskManager.");
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateIdException.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateIdException.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateIdException.java
new file mode 100644
index 0000000..59ba081
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateIdException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.network.BadRequestException;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Thrown if no KvState with the given ID can be found by the server handler.
+ */
+@Internal
+public class UnknownKvStateIdException extends BadRequestException {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Creates the exception.
+        * @param serverName the name of the server that threw the exception.
+        * @param kvStateId the state id for which no state was found.
+        */
+       public UnknownKvStateIdException(String serverName, KvStateID 
kvStateId) {
+               super(serverName, "No registered state with ID " + 
Preconditions.checkNotNull(kvStateId) + '.');
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocation.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocation.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocation.java
deleted file mode 100644
index fd25fae..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocation.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate;
-
-import org.apache.flink.runtime.query.KvStateLocation;
-
-/**
- * Exception thrown if there is no location information available for the given
- * key group in a {@link KvStateLocation} instance.
- */
-public class UnknownKvStateKeyGroupLocation extends Exception {
-
-       private static final long serialVersionUID = 1L;
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocationException.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocationException.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocationException.java
new file mode 100644
index 0000000..0d6588a
--- /dev/null
+++ 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocationException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.network.BadRequestException;
+import org.apache.flink.runtime.query.KvStateLocation;
+
+/**
+ * Exception thrown if there is no location information available for the given
+ * key group in a {@link KvStateLocation} instance.
+ */
+@Internal
+public class UnknownKvStateKeyGroupLocationException extends 
BadRequestException {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Creates the exception.
+        * @param serverName the name of the server that threw the exception.
+        */
+       public UnknownKvStateKeyGroupLocationException(String serverName) {
+               super(serverName, "Unknown key-group location.");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/AkkaKvStateLocationLookupService.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/AkkaKvStateLocationLookupService.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/AkkaKvStateLocationLookupService.java
deleted file mode 100644
index f42e008..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/AkkaKvStateLocationLookupService.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.client;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.queryablestate.UnknownJobManager;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.query.KvStateLocation;
-import org.apache.flink.runtime.query.KvStateMessage;
-import org.apache.flink.util.Preconditions;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.dispatch.Futures;
-import akka.dispatch.Mapper;
-import akka.dispatch.Recover;
-import akka.pattern.Patterns;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.UUID;
-import java.util.concurrent.Callable;
-
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-import scala.reflect.ClassTag$;
-
-/**
- * Akka-based {@link KvStateLocationLookupService} that retrieves the current
- * JobManager address and uses it for lookups.
- */
-public class AkkaKvStateLocationLookupService implements 
KvStateLocationLookupService, LeaderRetrievalListener {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(KvStateLocationLookupService.class);
-
-       /** Future returned when no JobManager is available. */
-       private static final Future<ActorGateway> UNKNOWN_JOB_MANAGER = 
Futures.failed(new UnknownJobManager());
-
-       /** Leader retrieval service to retrieve the current job manager. */
-       private final LeaderRetrievalService leaderRetrievalService;
-
-       /** The actor system used to resolve the JobManager address. */
-       private final ActorSystem actorSystem;
-
-       /** Timeout for JobManager ask-requests. */
-       private final FiniteDuration askTimeout;
-
-       /** Retry strategy factory on future failures. */
-       private final LookupRetryStrategyFactory retryStrategyFactory;
-
-       /** Current job manager future. */
-       private volatile Future<ActorGateway> jobManagerFuture = 
UNKNOWN_JOB_MANAGER;
-
-       /**
-        * Creates the Akka-based {@link KvStateLocationLookupService}.
-        *
-        * @param leaderRetrievalService Leader retrieval service to use.
-        * @param actorSystem            Actor system to use.
-        * @param askTimeout             Timeout for JobManager ask-requests.
-        * @param retryStrategyFactory   Retry strategy if no JobManager 
available.
-        */
-       public AkkaKvStateLocationLookupService(
-                       LeaderRetrievalService leaderRetrievalService,
-                       ActorSystem actorSystem,
-                       FiniteDuration askTimeout,
-                       LookupRetryStrategyFactory retryStrategyFactory) {
-
-               this.leaderRetrievalService = 
Preconditions.checkNotNull(leaderRetrievalService, "Leader retrieval service");
-               this.actorSystem = Preconditions.checkNotNull(actorSystem, 
"Actor system");
-               this.askTimeout = Preconditions.checkNotNull(askTimeout, "Ask 
Timeout");
-               this.retryStrategyFactory = 
Preconditions.checkNotNull(retryStrategyFactory, "Retry strategy factory");
-       }
-
-       public void start() {
-               try {
-                       leaderRetrievalService.start(this);
-               } catch (Exception e) {
-                       LOG.error("Failed to start leader retrieval service", 
e);
-                       throw new RuntimeException(e);
-               }
-       }
-
-       public void shutDown() {
-               try {
-                       leaderRetrievalService.stop();
-               } catch (Exception e) {
-                       LOG.error("Failed to stop leader retrieval service", e);
-                       throw new RuntimeException(e);
-               }
-       }
-
-       @Override
-       @SuppressWarnings("unchecked")
-       public Future<KvStateLocation> getKvStateLookupInfo(final JobID jobId, 
final String registrationName) {
-               return getKvStateLookupInfo(jobId, registrationName, 
retryStrategyFactory.createRetryStrategy());
-       }
-
-       /**
-        * Returns a future holding the {@link KvStateLocation} for the given 
job
-        * and KvState registration name.
-        *
-        * <p>If there is currently no JobManager registered with the service, 
the
-        * request is retried. The retry behaviour is specified by the
-        * {@link LookupRetryStrategy} of the lookup service.
-        *
-        * @param jobId               JobID the KvState instance belongs to
-        * @param registrationName    Name under which the KvState has been 
registered
-        * @param lookupRetryStrategy Retry strategy to use for retries on 
UnknownJobManager failures.
-        * @return Future holding the {@link KvStateLocation}
-        */
-       @SuppressWarnings("unchecked")
-       private Future<KvStateLocation> getKvStateLookupInfo(
-                       final JobID jobId,
-                       final String registrationName,
-                       final LookupRetryStrategy lookupRetryStrategy) {
-
-               return jobManagerFuture
-                               .flatMap(new Mapper<ActorGateway, 
Future<Object>>() {
-                                       @Override
-                                       public Future<Object> 
apply(ActorGateway jobManager) {
-                                               // Lookup the KvStateLocation
-                                               Object msg = new 
KvStateMessage.LookupKvStateLocation(jobId, registrationName);
-                                               return jobManager.ask(msg, 
askTimeout);
-                                       }
-                               }, actorSystem.dispatcher())
-                               
.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class))
-                               .recoverWith(new 
Recover<Future<KvStateLocation>>() {
-                                       @Override
-                                       public Future<KvStateLocation> 
recover(Throwable failure) throws Throwable {
-                                               // If the Future fails with 
UnknownJobManager, retry
-                                               // the request. Otherwise all 
Futures will be failed
-                                               // during the start up phase, 
when the JobManager did
-                                               // not notify this service yet 
or leadership is lost
-                                               // intermittently.
-                                               if (failure instanceof 
UnknownJobManager && lookupRetryStrategy.tryRetry()) {
-                                                       return Patterns.after(
-                                                                       
lookupRetryStrategy.getRetryDelay(),
-                                                                       
actorSystem.scheduler(),
-                                                                       
actorSystem.dispatcher(),
-                                                                       new 
Callable<Future<KvStateLocation>>() {
-                                                                               
@Override
-                                                                               
public Future<KvStateLocation> call() throws Exception {
-                                                                               
        return getKvStateLookupInfo(
-                                                                               
                        jobId,
-                                                                               
                        registrationName,
-                                                                               
                        lookupRetryStrategy);
-                                                                               
}
-                                                                       });
-                                               } else {
-                                                       return 
Futures.failed(failure);
-                                               }
-                                       }
-                               }, actorSystem.dispatcher());
-       }
-
-       @Override
-       public void notifyLeaderAddress(String leaderAddress, final UUID 
leaderSessionID) {
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("Received leader address notification {}:{}", 
leaderAddress, leaderSessionID);
-               }
-
-               if (leaderAddress == null) {
-                       jobManagerFuture = UNKNOWN_JOB_MANAGER;
-               } else {
-                       jobManagerFuture = 
AkkaUtils.getActorRefFuture(leaderAddress, actorSystem, askTimeout)
-                                       .map(new Mapper<ActorRef, 
ActorGateway>() {
-                                               @Override
-                                               public ActorGateway 
apply(ActorRef actorRef) {
-                                                       return new 
AkkaActorGateway(actorRef, leaderSessionID);
-                                               }
-                                       }, actorSystem.dispatcher());
-               }
-       }
-
-       @Override
-       public void handleError(Exception exception) {
-               jobManagerFuture = Futures.failed(exception);
-       }
-
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Retry strategy for failed lookups.
-        *
-        * <p>Usage:
-        * <pre>
-        * LookupRetryStrategy retryStrategy = 
LookupRetryStrategyFactory.create();
-        *
-        * if (retryStrategy.tryRetry()) {
-        *     // OK to retry
-        *     FiniteDuration retryDelay = retryStrategy.getRetryDelay();
-        * }
-        * </pre>
-        */
-       public interface LookupRetryStrategy {
-
-               /**
-                * Returns the current retry delay.
-                *
-                * @return Current retry delay.
-                */
-               FiniteDuration getRetryDelay();
-
-               /**
-                * Tries another retry and returns whether it is allowed or not.
-                *
-                * @return Whether it is allowed to do another retry or not.
-                */
-               boolean tryRetry();
-
-       }
-
-       /**
-        * Factory for retry strategies.
-        */
-       public interface LookupRetryStrategyFactory {
-
-               /**
-                * Creates a new retry strategy.
-                *
-                * @return The retry strategy.
-                */
-               LookupRetryStrategy createRetryStrategy();
-
-       }
-
-       /**
-        * Factory for disabled retries.
-        */
-       public static class DisabledLookupRetryStrategyFactory implements 
LookupRetryStrategyFactory {
-
-               private static final DisabledLookupRetryStrategy RETRY_STRATEGY 
= new DisabledLookupRetryStrategy();
-
-               @Override
-               public LookupRetryStrategy createRetryStrategy() {
-                       return RETRY_STRATEGY;
-               }
-
-               private static class DisabledLookupRetryStrategy implements 
LookupRetryStrategy {
-
-                       @Override
-                       public FiniteDuration getRetryDelay() {
-                               return FiniteDuration.Zero();
-                       }
-
-                       @Override
-                       public boolean tryRetry() {
-                               return false;
-                       }
-               }
-
-       }
-
-       /**
-        * Factory for fixed delay retries.
-        */
-       public static class FixedDelayLookupRetryStrategyFactory implements 
LookupRetryStrategyFactory {
-
-               private final int maxRetries;
-               private final FiniteDuration retryDelay;
-
-               FixedDelayLookupRetryStrategyFactory(int maxRetries, 
FiniteDuration retryDelay) {
-                       this.maxRetries = maxRetries;
-                       this.retryDelay = retryDelay;
-               }
-
-               @Override
-               public LookupRetryStrategy createRetryStrategy() {
-                       return new FixedDelayLookupRetryStrategy(maxRetries, 
retryDelay);
-               }
-
-               private static class FixedDelayLookupRetryStrategy implements 
LookupRetryStrategy {
-
-                       private final Object retryLock = new Object();
-                       private final int maxRetries;
-                       private final FiniteDuration retryDelay;
-                       private int numRetries;
-
-                       public FixedDelayLookupRetryStrategy(int maxRetries, 
FiniteDuration retryDelay) {
-                               Preconditions.checkArgument(maxRetries >= 0, 
"Negative number maximum retries");
-                               this.maxRetries = maxRetries;
-                               this.retryDelay = 
Preconditions.checkNotNull(retryDelay, "Retry delay");
-                       }
-
-                       @Override
-                       public FiniteDuration getRetryDelay() {
-                               synchronized (retryLock) {
-                                       return retryDelay;
-                               }
-                       }
-
-                       @Override
-                       public boolean tryRetry() {
-                               synchronized (retryLock) {
-                                       if (numRetries < maxRetries) {
-                                               numRetries++;
-                                               return true;
-                                       } else {
-                                               return false;
-                                       }
-                               }
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClient.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClient.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClient.java
deleted file mode 100644
index d456cd7..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClient.java
+++ /dev/null
@@ -1,583 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.client;
-
-import org.apache.flink.queryablestate.UnknownKeyOrNamespace;
-import org.apache.flink.queryablestate.UnknownKvStateID;
-import org.apache.flink.queryablestate.network.messages.MessageSerializer;
-import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.query.KvStateServer;
-import org.apache.flink.runtime.query.KvStateServerAddress;
-import org.apache.flink.runtime.query.netty.KvStateRequestStats;
-import org.apache.flink.util.Preconditions;
-
-import 
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
-import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
-import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
-import 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
-
-import akka.dispatch.Futures;
-
-import java.nio.channels.ClosedChannelException;
-import java.util.ArrayDeque;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-
-/**
- * Netty-based client querying {@link KvStateServer} instances.
- *
- * <p>This client can be used by multiple threads concurrently. Operations are
- * executed asynchronously and return Futures to their result.
- *
- * <p>The incoming pipeline looks as follows:
- * <pre>
- * Socket.read() -> LengthFieldBasedFrameDecoder -> KvStateClientHandler
- * </pre>
- *
- * <p>Received binary messages are expected to contain a frame length field. 
Netty's
- * {@link LengthFieldBasedFrameDecoder} is used to fully receive the frame 
before
- * giving it to our {@link KvStateClientHandler}.
- *
- * <p>Connections are established and closed by the client. The server only
- * closes the connection on a fatal failure that cannot be recovered.
- */
-public class KvStateClient {
-
-       /** Netty's Bootstrap. */
-       private final Bootstrap bootstrap;
-
-       /** Statistics tracker. */
-       private final KvStateRequestStats stats;
-
-       /** Established connections. */
-       private final ConcurrentHashMap<KvStateServerAddress, 
EstablishedConnection> establishedConnections =
-                       new ConcurrentHashMap<>();
-
-       /** Pending connections. */
-       private final ConcurrentHashMap<KvStateServerAddress, 
PendingConnection> pendingConnections =
-                       new ConcurrentHashMap<>();
-
-       /** Atomic shut down flag. */
-       private final AtomicBoolean shutDown = new AtomicBoolean();
-
-       /**
-        * Creates a client with the specified number of event loop threads.
-        *
-        * @param numEventLoopThreads Number of event loop threads (minimum 1).
-        */
-       public KvStateClient(int numEventLoopThreads, KvStateRequestStats 
stats) {
-               Preconditions.checkArgument(numEventLoopThreads >= 1, 
"Non-positive number of event loop threads.");
-               NettyBufferPool bufferPool = new 
NettyBufferPool(numEventLoopThreads);
-
-               ThreadFactory threadFactory = new ThreadFactoryBuilder()
-                               .setDaemon(true)
-                               .setNameFormat("Flink KvStateClient Event Loop 
Thread %d")
-                               .build();
-
-               NioEventLoopGroup nioGroup = new 
NioEventLoopGroup(numEventLoopThreads, threadFactory);
-
-               this.bootstrap = new Bootstrap()
-                               .group(nioGroup)
-                               .channel(NioSocketChannel.class)
-                               .option(ChannelOption.ALLOCATOR, bufferPool)
-                               .handler(new 
ChannelInitializer<SocketChannel>() {
-                                       @Override
-                                       protected void 
initChannel(SocketChannel ch) throws Exception {
-                                               ch.pipeline()
-                                                               .addLast(new 
LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
-                                                               // 
ChunkedWriteHandler respects Channel writability
-                                                               .addLast(new 
ChunkedWriteHandler());
-                                       }
-                               });
-
-               this.stats = Preconditions.checkNotNull(stats, "Statistics 
tracker");
-       }
-
-       /**
-        * Returns a future holding the serialized request result.
-        *
-        * <p>If the server does not serve a KvState instance with the given ID,
-        * the Future will be failed with a {@link UnknownKvStateID}.
-        *
-        * <p>If the KvState instance does not hold any data for the given key
-        * and namespace, the Future will be failed with a {@link 
UnknownKeyOrNamespace}.
-        *
-        * <p>All other failures are forwarded to the Future.
-        *
-        * @param serverAddress Address of the server to query
-        * @param kvStateId ID of the KvState instance to query
-        * @param serializedKeyAndNamespace Serialized key and namespace to 
query KvState instance with
-        * @return Future holding the serialized result
-        */
-       public Future<byte[]> getKvState(
-                       KvStateServerAddress serverAddress,
-                       KvStateID kvStateId,
-                       byte[] serializedKeyAndNamespace) {
-
-               if (shutDown.get()) {
-                       return Futures.failed(new IllegalStateException("Shut 
down"));
-               }
-
-               EstablishedConnection connection = 
establishedConnections.get(serverAddress);
-
-               if (connection != null) {
-                       return connection.getKvState(kvStateId, 
serializedKeyAndNamespace);
-               } else {
-                       PendingConnection pendingConnection = 
pendingConnections.get(serverAddress);
-                       if (pendingConnection != null) {
-                               // There was a race, use the existing pending 
connection.
-                               return pendingConnection.getKvState(kvStateId, 
serializedKeyAndNamespace);
-                       } else {
-                               // We try to connect to the server.
-                               PendingConnection pending = new 
PendingConnection(serverAddress);
-                               PendingConnection previous = 
pendingConnections.putIfAbsent(serverAddress, pending);
-
-                               if (previous == null) {
-                                       // OK, we are responsible to connect.
-                                       
bootstrap.connect(serverAddress.getHost(), serverAddress.getPort())
-                                                       .addListener(pending);
-
-                                       return pending.getKvState(kvStateId, 
serializedKeyAndNamespace);
-                               } else {
-                                       // There was a race, use the existing 
pending connection.
-                                       return previous.getKvState(kvStateId, 
serializedKeyAndNamespace);
-                               }
-                       }
-               }
-       }
-
-       /**
-        * Shuts down the client and closes all connections.
-        *
-        * <p>After a call to this method, all returned futures will be failed.
-        */
-       public void shutDown() {
-               if (shutDown.compareAndSet(false, true)) {
-                       for (Map.Entry<KvStateServerAddress, 
EstablishedConnection> conn : establishedConnections.entrySet()) {
-                               if 
(establishedConnections.remove(conn.getKey(), conn.getValue())) {
-                                       conn.getValue().close();
-                               }
-                       }
-
-                       for (Map.Entry<KvStateServerAddress, PendingConnection> 
conn : pendingConnections.entrySet()) {
-                               if (pendingConnections.remove(conn.getKey()) != 
null) {
-                                       conn.getValue().close();
-                               }
-                       }
-
-                       if (bootstrap != null) {
-                               EventLoopGroup group = bootstrap.group();
-                               if (group != null) {
-                                       group.shutdownGracefully(0, 10, 
TimeUnit.SECONDS);
-                               }
-                       }
-               }
-       }
-
-       /**
-        * Closes the connection to the given server address if it exists.
-        *
-        * <p>If there is a request to the server a new connection will be 
established.
-        *
-        * @param serverAddress Target address of the connection to close
-        */
-       public void closeConnection(KvStateServerAddress serverAddress) {
-               PendingConnection pending = 
pendingConnections.get(serverAddress);
-               if (pending != null) {
-                       pending.close();
-               }
-
-               EstablishedConnection established = 
establishedConnections.remove(serverAddress);
-               if (established != null) {
-                       established.close();
-               }
-       }
-
-       /**
-        * A pending connection that is in the process of connecting.
-        */
-       private class PendingConnection implements ChannelFutureListener {
-
-               /** Lock to guard the connect call, channel hand in, etc. */
-               private final Object connectLock = new Object();
-
-               /** Address of the server we are connecting to. */
-               private final KvStateServerAddress serverAddress;
-
-               /** Queue of requests while connecting. */
-               private final ArrayDeque<PendingRequest> queuedRequests = new 
ArrayDeque<>();
-
-               /** The established connection after the connect succeeds. */
-               private EstablishedConnection established;
-
-               /** Closed flag. */
-               private boolean closed;
-
-               /** Failure cause if something goes wrong. */
-               private Throwable failureCause;
-
-               /**
-                * Creates a pending connection to the given server.
-                *
-                * @param serverAddress Address of the server to connect to.
-                */
-               private PendingConnection(KvStateServerAddress serverAddress) {
-                       this.serverAddress = serverAddress;
-               }
-
-               @Override
-               public void operationComplete(ChannelFuture future) throws 
Exception {
-                       // Callback from the Bootstrap's connect call.
-                       if (future.isSuccess()) {
-                               handInChannel(future.channel());
-                       } else {
-                               close(future.cause());
-                       }
-               }
-
-               /**
-                * Returns a future holding the serialized request result.
-                *
-                * <p>If the channel has been established, forward the call to 
the
-                * established channel, otherwise queue it for when the channel 
is
-                * handed in.
-                *
-                * @param kvStateId                 ID of the KvState instance 
to query
-                * @param serializedKeyAndNamespace Serialized key and 
namespace to query KvState instance
-                *                                  with
-                * @return Future holding the serialized result
-                */
-               public Future<byte[]> getKvState(KvStateID kvStateId, byte[] 
serializedKeyAndNamespace) {
-                       synchronized (connectLock) {
-                               if (failureCause != null) {
-                                       return Futures.failed(failureCause);
-                               } else if (closed) {
-                                       return Futures.failed(new 
ClosedChannelException());
-                               } else {
-                                       if (established != null) {
-                                               return 
established.getKvState(kvStateId, serializedKeyAndNamespace);
-                                       } else {
-                                               // Queue this and handle when 
connected
-                                               PendingRequest pending = new 
PendingRequest(kvStateId, serializedKeyAndNamespace);
-                                               queuedRequests.add(pending);
-                                               return pending.promise.future();
-                                       }
-                               }
-                       }
-               }
-
-               /**
-                * Hands in a channel after a successful connection.
-                *
-                * @param channel Channel to hand in
-                */
-               private void handInChannel(Channel channel) {
-                       synchronized (connectLock) {
-                               if (closed || failureCause != null) {
-                                       // Close the channel and we are done. 
Any queued requests
-                                       // are removed on the close/failure 
call and after that no
-                                       // new ones can be enqueued.
-                                       channel.close();
-                               } else {
-                                       established = new 
EstablishedConnection(serverAddress, channel);
-
-                                       PendingRequest pending;
-                                       while ((pending = 
queuedRequests.poll()) != null) {
-                                               Future<byte[]> resultFuture = 
established.getKvState(
-                                                               
pending.kvStateId,
-                                                               
pending.serializedKeyAndNamespace);
-
-                                               
pending.promise.completeWith(resultFuture);
-                                       }
-
-                                       // Publish the channel for the general 
public
-                                       
establishedConnections.put(serverAddress, established);
-                                       
pendingConnections.remove(serverAddress);
-
-                                       // Check shut down for possible race 
with shut down. We
-                                       // don't want any lingering connections 
after shut down,
-                                       // which can happen if we don't check 
this here.
-                                       if (shutDown.get()) {
-                                               if 
(establishedConnections.remove(serverAddress, established)) {
-                                                       established.close();
-                                               }
-                                       }
-                               }
-                       }
-               }
-
-               /**
-                * Close the connecting channel with a ClosedChannelException.
-                */
-               private void close() {
-                       close(new ClosedChannelException());
-               }
-
-               /**
-                * Close the connecting channel with an Exception (can be
-                * <code>null</code>) or forward to the established channel.
-                */
-               private void close(Throwable cause) {
-                       synchronized (connectLock) {
-                               if (!closed) {
-                                       if (failureCause == null) {
-                                               failureCause = cause;
-                                       }
-
-                                       if (established != null) {
-                                               established.close();
-                                       } else {
-                                               PendingRequest pending;
-                                               while ((pending = 
queuedRequests.poll()) != null) {
-                                                       
pending.promise.tryFailure(cause);
-                                               }
-                                       }
-
-                                       closed = true;
-                               }
-                       }
-               }
-
-               /**
-                * A pending request queued while the channel is connecting.
-                */
-               private final class PendingRequest {
-
-                       private final KvStateID kvStateId;
-                       private final byte[] serializedKeyAndNamespace;
-                       private final Promise<byte[]> promise;
-
-                       private PendingRequest(KvStateID kvStateId, byte[] 
serializedKeyAndNamespace) {
-                               this.kvStateId = kvStateId;
-                               this.serializedKeyAndNamespace = 
serializedKeyAndNamespace;
-                               this.promise = Futures.promise();
-                       }
-               }
-
-               @Override
-               public String toString() {
-                       synchronized (connectLock) {
-                               return "PendingConnection{" +
-                                               "serverAddress=" + 
serverAddress +
-                                               ", queuedRequests=" + 
queuedRequests.size() +
-                                               ", established=" + (established 
!= null) +
-                                               ", closed=" + closed +
-                                               '}';
-                       }
-               }
-       }
-
-       /**
-        * An established connection that wraps the actual channel instance and 
is
-        * registered at the {@link KvStateClientHandler} for callbacks.
-        */
-       private class EstablishedConnection implements 
KvStateClientHandlerCallback {
-
-               /** Address of the server we are connected to. */
-               private final KvStateServerAddress serverAddress;
-
-               /** The actual TCP channel. */
-               private final Channel channel;
-
-               /** Pending requests keyed by request ID. */
-               private final ConcurrentHashMap<Long, PromiseAndTimestamp> 
pendingRequests = new ConcurrentHashMap<>();
-
-               /** Current request number used to assign unique request IDs. */
-               private final AtomicLong requestCount = new AtomicLong();
-
-               /** Reference to a failure that was reported by the channel. */
-               private final AtomicReference<Throwable> failureCause = new 
AtomicReference<>();
-
-               /**
-                * Creates an established connection with the given channel.
-                *
-                * @param serverAddress Address of the server connected to
-                * @param channel The actual TCP channel
-                */
-               EstablishedConnection(KvStateServerAddress serverAddress, 
Channel channel) {
-                       this.serverAddress = 
Preconditions.checkNotNull(serverAddress, "KvStateServerAddress");
-                       this.channel = Preconditions.checkNotNull(channel, 
"Channel");
-
-                       // Add the client handler with the callback
-                       channel.pipeline().addLast("KvStateClientHandler", new 
KvStateClientHandler(this));
-
-                       stats.reportActiveConnection();
-               }
-
-               /**
-                * Close the channel with a ClosedChannelException.
-                */
-               void close() {
-                       close(new ClosedChannelException());
-               }
-
-               /**
-                * Close the channel with a cause.
-                *
-                * @param cause The cause to close the channel with.
-                * @return Whether this call actually closed the channel
-                */
-               private boolean close(Throwable cause) {
-                       if (failureCause.compareAndSet(null, cause)) {
-                               channel.close();
-                               stats.reportInactiveConnection();
-
-                               for (long requestId : pendingRequests.keySet()) 
{
-                                       PromiseAndTimestamp pending = 
pendingRequests.remove(requestId);
-                                       if (pending != null && 
pending.promise.tryFailure(cause)) {
-                                               stats.reportFailedRequest();
-                                       }
-                               }
-
-                               return true;
-                       }
-
-                       return false;
-               }
-
-               /**
-                * Returns a future holding the serialized request result.
-                *
-                * @param kvStateId                 ID of the KvState instance 
to query
-                * @param serializedKeyAndNamespace Serialized key and 
namespace to query KvState instance
-                *                                  with
-                * @return Future holding the serialized result
-                */
-               Future<byte[]> getKvState(KvStateID kvStateId, byte[] 
serializedKeyAndNamespace) {
-                       PromiseAndTimestamp requestPromiseTs = new 
PromiseAndTimestamp(
-                                       Futures.<byte[]>promise(),
-                                       System.nanoTime());
-
-                       try {
-                               final long requestId = 
requestCount.getAndIncrement();
-                               pendingRequests.put(requestId, 
requestPromiseTs);
-
-                               stats.reportRequest();
-
-                               ByteBuf buf = 
MessageSerializer.serializeKvStateRequest(
-                                               channel.alloc(),
-                                               requestId,
-                                               kvStateId,
-                                               serializedKeyAndNamespace);
-
-                               channel.writeAndFlush(buf).addListener(new 
ChannelFutureListener() {
-                                       @Override
-                                       public void 
operationComplete(ChannelFuture future) throws Exception {
-                                               if (!future.isSuccess()) {
-                                                       // Fail promise if not 
failed to write
-                                                       PromiseAndTimestamp 
pending = pendingRequests.remove(requestId);
-                                                       if (pending != null && 
pending.promise.tryFailure(future.cause())) {
-                                                               
stats.reportFailedRequest();
-                                                       }
-                                               }
-                                       }
-                               });
-
-                               // Check failure for possible race. We don't 
want any lingering
-                               // promises after a failure, which can happen 
if we don't check
-                               // this here. Note that close is treated as a 
failure as well.
-                               Throwable failure = failureCause.get();
-                               if (failure != null) {
-                                       // Remove from pending requests to 
guard against concurrent
-                                       // removal and to make sure that we 
only count it once as failed.
-                                       PromiseAndTimestamp p = 
pendingRequests.remove(requestId);
-                                       if (p != null && 
p.promise.tryFailure(failure)) {
-                                               stats.reportFailedRequest();
-                                       }
-                               }
-                       } catch (Throwable t) {
-                               requestPromiseTs.promise.tryFailure(t);
-                       }
-
-                       return requestPromiseTs.promise.future();
-               }
-
-               @Override
-               public void onRequestResult(long requestId, byte[] 
serializedValue) {
-                       PromiseAndTimestamp pending = 
pendingRequests.remove(requestId);
-                       if (pending != null && 
pending.promise.trySuccess(serializedValue)) {
-                               long durationMillis = (System.nanoTime() - 
pending.timestamp) / 1_000_000;
-                               stats.reportSuccessfulRequest(durationMillis);
-                       }
-               }
-
-               @Override
-               public void onRequestFailure(long requestId, Throwable cause) {
-                       PromiseAndTimestamp pending = 
pendingRequests.remove(requestId);
-                       if (pending != null && 
pending.promise.tryFailure(cause)) {
-                               stats.reportFailedRequest();
-                       }
-               }
-
-               @Override
-               public void onFailure(Throwable cause) {
-                       if (close(cause)) {
-                               // Remove from established channels, otherwise 
future
-                               // requests will be handled by this failed 
channel.
-                               establishedConnections.remove(serverAddress, 
this);
-                       }
-               }
-
-               @Override
-               public String toString() {
-                       return "EstablishedConnection{" +
-                                       "serverAddress=" + serverAddress +
-                                       ", channel=" + channel +
-                                       ", pendingRequests=" + 
pendingRequests.size() +
-                                       ", requestCount=" + requestCount +
-                                       ", failureCause=" + failureCause +
-                                       '}';
-               }
-
-               /**
-                * Pair of promise and a timestamp.
-                */
-               private class PromiseAndTimestamp {
-
-                       private final Promise<byte[]> promise;
-                       private final long timestamp;
-
-                       public PromiseAndTimestamp(Promise<byte[]> promise, 
long timestamp) {
-                               this.promise = promise;
-                               this.timestamp = timestamp;
-                       }
-               }
-
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClientHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClientHandler.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClientHandler.java
deleted file mode 100644
index 36a2b31..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClientHandler.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.client;
-
-import org.apache.flink.queryablestate.messages.KvStateRequestFailure;
-import org.apache.flink.queryablestate.messages.KvStateRequestResult;
-import org.apache.flink.queryablestate.network.messages.MessageSerializer;
-import org.apache.flink.queryablestate.network.messages.MessageType;
-import org.apache.flink.queryablestate.server.KvStateServerHandler;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
-import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.channels.ClosedChannelException;
-
-/**
- * This handler expects responses from {@link KvStateServerHandler}.
- *
- * <p>It deserializes the response and calls the registered callback, which is
- * responsible for actually handling the result (see {@link 
KvStateClient.EstablishedConnection}).
- */
-public class KvStateClientHandler extends ChannelInboundHandlerAdapter {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(KvStateClientHandler.class);
-
-       private final KvStateClientHandlerCallback callback;
-
-       /**
-        * Creates a {@link KvStateClientHandler} with the callback.
-        *
-        * @param callback Callback for responses.
-        */
-       public KvStateClientHandler(KvStateClientHandlerCallback callback) {
-               this.callback = callback;
-       }
-
-       @Override
-       public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
-               try {
-                       ByteBuf buf = (ByteBuf) msg;
-                       MessageType msgType = 
MessageSerializer.deserializeHeader(buf);
-
-                       if (msgType == MessageType.REQUEST_RESULT) {
-                               KvStateRequestResult result = 
MessageSerializer.deserializeKvStateRequestResult(buf);
-                               callback.onRequestResult(result.getRequestId(), 
result.getSerializedResult());
-                       } else if (msgType == MessageType.REQUEST_FAILURE) {
-                               KvStateRequestFailure failure = 
MessageSerializer.deserializeKvStateRequestFailure(buf);
-                               
callback.onRequestFailure(failure.getRequestId(), failure.getCause());
-                       } else if (msgType == MessageType.SERVER_FAILURE) {
-                               throw 
MessageSerializer.deserializeServerFailure(buf);
-                       } else {
-                               throw new IllegalStateException("Unexpected 
response type '" + msgType + "'");
-                       }
-               } catch (Throwable t1) {
-                       try {
-                               callback.onFailure(t1);
-                       } catch (Throwable t2) {
-                               LOG.error("Failed to notify callback about 
failure", t2);
-                       }
-               } finally {
-                       ReferenceCountUtil.release(msg);
-               }
-       }
-
-       @Override
-       public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
-               try {
-                       callback.onFailure(cause);
-               } catch (Throwable t) {
-                       LOG.error("Failed to notify callback about failure", t);
-               }
-       }
-
-       @Override
-       public void channelInactive(ChannelHandlerContext ctx) throws Exception 
{
-               // Only the client is expected to close the channel. Otherwise 
it
-               // indicates a failure. Note that this will be invoked in both 
cases
-               // though. If the callback closed the channel, the callback 
must be
-               // ignored.
-               try {
-                       callback.onFailure(new ClosedChannelException());
-               } catch (Throwable t) {
-                       LOG.error("Failed to notify callback about failure", t);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClientHandlerCallback.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClientHandlerCallback.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClientHandlerCallback.java
deleted file mode 100644
index 98718fa..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClientHandlerCallback.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.client;
-
-import org.apache.flink.queryablestate.messages.KvStateRequest;
-
-/**
- * Callback for {@link KvStateClientHandler}.
- */
-public interface KvStateClientHandlerCallback {
-
-       /**
-        * Called on a successful {@link KvStateRequest}.
-        *
-        * @param requestId       ID of the request
-        * @param serializedValue Serialized value for the request
-        */
-       void onRequestResult(long requestId, byte[] serializedValue);
-
-       /**
-        * Called on a failed {@link KvStateRequest}.
-        *
-        * @param requestId ID of the request
-        * @param cause     Cause of the request failure
-        */
-       void onRequestFailure(long requestId, Throwable cause);
-
-       /**
-        * Called on any failure, which is not related to a specific request.
-        *
-        * <p>This can be for example a caught Exception in the channel pipeline
-        * or an unexpected channel close.
-        *
-        * @param cause Cause of the failure
-        */
-       void onFailure(Throwable cause);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateLocationLookupService.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateLocationLookupService.java
 
b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateLocationLookupService.java
deleted file mode 100644
index 635cbae..0000000
--- 
a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateLocationLookupService.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.client;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.query.KvStateLocation;
-
-import scala.concurrent.Future;
-
-/**
- * {@link KvStateLocation} lookup service.
- */
-public interface KvStateLocationLookupService {
-
-       /**
-        * Starts the lookup service.
-        */
-       void start();
-
-       /**
-        * Shuts down the lookup service.
-        */
-       void shutDown();
-
-       /**
-        * Returns a future holding the {@link KvStateLocation} for the given 
job
-        * and KvState registration name.
-        *
-        * @param jobId            JobID the KvState instance belongs to
-        * @param registrationName Name under which the KvState has been 
registered
-        * @return Future holding the {@link KvStateLocation}
-        */
-       Future<KvStateLocation> getKvStateLookupInfo(JobID jobId, String 
registrationName);
-
-}

Reply via email to