http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
deleted file mode 100644
index 4ba6929..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
+++ /dev/null
@@ -1,587 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeHint;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.ListSerializer;
-import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.configuration.QueryableStateOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import 
org.apache.flink.runtime.query.AkkaKvStateLocationLookupService.FixedDelayLookupRetryStrategyFactory;
-import 
org.apache.flink.runtime.query.AkkaKvStateLocationLookupService.LookupRetryStrategyFactory;
-import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
-import org.apache.flink.runtime.query.netty.KvStateClient;
-import org.apache.flink.runtime.query.netty.KvStateServer;
-import org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace;
-import org.apache.flink.runtime.query.netty.UnknownKvStateID;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceTypeInfo;
-import org.apache.flink.util.Preconditions;
-
-import akka.actor.ActorSystem;
-import akka.dispatch.Futures;
-import akka.dispatch.Mapper;
-import akka.dispatch.Recover;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.ConnectException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import scala.Option;
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * Client for queryable state.
- *
- * <p>You can mark state as queryable via {@link 
StateDescriptor#setQueryable(String)}.
- * The state instance created from this descriptor will be published for 
queries
- * when it's created on the TaskManagers and the location will be reported to
- * the JobManager.
- *
- * <p>The client resolves the location of the requested KvState via the
- * JobManager. Resolved locations are cached. When the server address of the
- * requested KvState instance is determined, the client sends out a request to
- * the server.
- */
-public class QueryableStateClient {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(QueryableStateClient.class);
-
-       /**
-        * {@link KvStateLocation} lookup to resolve the address of KvState 
instances.
-        */
-       private final KvStateLocationLookupService lookupService;
-
-       /**
-        * Network client for queries against {@link KvStateServer} instances.
-        */
-       private final KvStateClient kvStateClient;
-
-       /**
-        * Execution context.
-        */
-       private final ExecutionContext executionContext;
-
-       /**
-        * Cache for {@link KvStateLocation} instances keyed by job and name.
-        */
-       private final ConcurrentMap<Tuple2<JobID, String>, 
Future<KvStateLocation>> lookupCache =
-                       new ConcurrentHashMap<>();
-
-       /** Non-null only if this client started the actor system itself. */
-       private final ActorSystem actorSystem;
-
-       private ExecutionConfig executionConfig;
-
-       /**
-        * Creates a client from the given configuration.
-        *
-        * <p>This will create multiple Thread pools: one for the started actor
-        * system and another for the network client.
-        *
-        * @param config Configuration to use.
-        * @throws Exception Failures are forwarded
-        */
-       public QueryableStateClient(Configuration config) throws Exception {
-               this(config, 
HighAvailabilityServicesUtils.createHighAvailabilityServices(
-                               config, Executors.directExecutor(), 
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION));
-       }
-
-       /**
-        * Creates a client from the given configuration.
-        *
-        * <p>This will create multiple Thread pools: one for the started actor
-        * system and another for the network client.
-        *
-        * @param config Configuration to use.
-        * @param highAvailabilityServices Service factory for high 
availability services
-        * @throws Exception Failures are forwarded
-        *
-        * @deprecated This constructor is deprecated and stays only for 
backwards compatibility. Use the
-        * {@link #QueryableStateClient(Configuration)} instead.
-        */
-       @Deprecated
-       public QueryableStateClient(
-                       Configuration config,
-                       HighAvailabilityServices highAvailabilityServices) 
throws Exception {
-               Preconditions.checkNotNull(config, "Configuration");
-
-               // Create a leader retrieval service
-               LeaderRetrievalService leaderRetrievalService = 
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
-
-               // Get the ask timeout
-               String askTimeoutString = 
config.getString(AkkaOptions.ASK_TIMEOUT);
-
-               Duration timeout = FiniteDuration.apply(askTimeoutString);
-               if (!timeout.isFinite()) {
-                       throw new 
IllegalConfigurationException(AkkaOptions.ASK_TIMEOUT.key()
-                                       + " is not a finite timeout ('" + 
askTimeoutString + "')");
-               }
-
-               FiniteDuration askTimeout = (FiniteDuration) timeout;
-
-               int lookupRetries = 
config.getInteger(QueryableStateOptions.CLIENT_LOOKUP_RETRIES);
-               int lookupRetryDelayMillis = 
config.getInteger(QueryableStateOptions.CLIENT_LOOKUP_RETRY_DELAY);
-
-               // Retries if no JobManager is around
-               LookupRetryStrategyFactory retryStrategy = new 
FixedDelayLookupRetryStrategyFactory(
-                               lookupRetries,
-                               FiniteDuration.apply(lookupRetryDelayMillis, 
"ms"));
-
-               // Create the actor system
-               @SuppressWarnings("unchecked")
-               Option<Tuple2<String, Object>> remoting = new Some(new 
Tuple2<>("", 0));
-               this.actorSystem = AkkaUtils.createActorSystem(config, 
remoting);
-
-               AkkaKvStateLocationLookupService lookupService = new 
AkkaKvStateLocationLookupService(
-                               leaderRetrievalService,
-                               actorSystem,
-                               askTimeout,
-                               retryStrategy);
-
-               int numEventLoopThreads = 
config.getInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS);
-
-               if (numEventLoopThreads == 0) {
-                       numEventLoopThreads = 
Runtime.getRuntime().availableProcessors();
-               }
-
-               // Create the network client
-               KvStateClient networkClient = new KvStateClient(
-                               numEventLoopThreads,
-                               new DisabledKvStateRequestStats());
-
-               this.lookupService = lookupService;
-               this.kvStateClient = networkClient;
-               this.executionContext = actorSystem.dispatcher();
-               this.executionConfig = new ExecutionConfig();
-
-               this.lookupService.start();
-       }
-
-       /** Gets the {@link ExecutionConfig}. */
-       public ExecutionConfig getExecutionConfig() {
-               return executionConfig;
-       }
-
-       /** Sets the {@link ExecutionConfig}. */
-       public void setExecutionConfig(ExecutionConfig config) {
-               this.executionConfig = config;
-       }
-
-       /**
-        * Creates a client.
-        *
-        * @param lookupService    Location lookup service
-        * @param kvStateClient    Network client for queries
-        * @param executionContext Execution context for futures
-        */
-       public QueryableStateClient(
-                       KvStateLocationLookupService lookupService,
-                       KvStateClient kvStateClient,
-                       ExecutionContext executionContext) {
-
-               this.lookupService = Preconditions.checkNotNull(lookupService, 
"KvStateLocationLookupService");
-               this.kvStateClient = Preconditions.checkNotNull(kvStateClient, 
"KvStateClient");
-               this.executionContext = 
Preconditions.checkNotNull(executionContext, "ExecutionContext");
-               this.actorSystem = null;
-
-               this.lookupService.start();
-       }
-
-       /**
-        * Returns the execution context of this client.
-        *
-        * @return The execution context used by the client.
-        */
-       public ExecutionContext getExecutionContext() {
-               return executionContext;
-       }
-
-       /**
-        * Shuts down the client and all components.
-        */
-       public void shutDown() {
-               try {
-                       lookupService.shutDown();
-               } catch (Throwable t) {
-                       LOG.error("Failed to shut down KvStateLookupService", 
t);
-               }
-
-               try {
-                       kvStateClient.shutDown();
-               } catch (Throwable t) {
-                       LOG.error("Failed to shut down KvStateClient", t);
-               }
-
-               if (actorSystem != null) {
-                       try {
-                               actorSystem.shutdown();
-                       } catch (Throwable t) {
-                               LOG.error("Failed to shut down ActorSystem", t);
-                       }
-               }
-       }
-
-       /**
-        * Returns a future holding the serialized request result.
-        *
-        * <p>If the server does not serve a KvState instance with the given ID,
-        * the Future will be failed with a {@link UnknownKvStateID}.
-        *
-        * <p>If the KvState instance does not hold any data for the given key
-        * and namespace, the Future will be failed with a {@link 
UnknownKeyOrNamespace}.
-        *
-        * <p>All other failures are forwarded to the Future.
-        *
-        * @param jobId                     JobID of the job the queryable state
-        *                                  belongs to
-        * @param queryableStateName        Name under which the state is 
queryable
-        * @param keyHashCode               Integer hash code of the key 
(result of
-        *                                  a call to {@link Object#hashCode()})
-        * @param serializedKeyAndNamespace Serialized key and namespace to 
query
-        *                                  KvState instance with
-        * @return Future holding the serialized result
-        */
-       @SuppressWarnings("unchecked")
-       public Future<byte[]> getKvState(
-                       final JobID jobId,
-                       final String queryableStateName,
-                       final int keyHashCode,
-                       final byte[] serializedKeyAndNamespace) {
-
-               return getKvState(jobId, queryableStateName, keyHashCode, 
serializedKeyAndNamespace, false)
-                               .recoverWith(new Recover<Future<byte[]>>() {
-                                       @Override
-                                       public Future<byte[]> recover(Throwable 
failure) throws Throwable {
-                                               if (failure instanceof 
UnknownKvStateID ||
-                                                               failure 
instanceof UnknownKvStateKeyGroupLocation ||
-                                                               failure 
instanceof UnknownKvStateLocation ||
-                                                               failure 
instanceof ConnectException) {
-                                                       // These failures are 
likely to be caused by out-of-sync
-                                                       // KvStateLocation. 
Therefore we retry this query and
-                                                       // force look up the 
location.
-                                                       return getKvState(
-                                                                       jobId,
-                                                                       
queryableStateName,
-                                                                       
keyHashCode,
-                                                                       
serializedKeyAndNamespace,
-                                                                       true);
-                                               } else {
-                                                       return 
Futures.failed(failure);
-                                               }
-                                       }
-                               }, executionContext);
-       }
-
-       /**
-        * Returns a future holding the request result.
-        *
-        * <p>If the server does not serve a KvState instance with the given ID,
-        * the Future will be failed with a {@link UnknownKvStateID}.
-        *
-        * <p>If the KvState instance does not hold any data for the given key
-        * and namespace, the Future will be failed with a {@link 
UnknownKeyOrNamespace}.
-        *
-        * <p>All other failures are forwarded to the Future.
-        *
-        * @param jobId                     JobID of the job the queryable 
state belongs to.
-        * @param queryableStateName        Name under which the state is 
queryable.
-        * @param key                               The key we are interested 
in.
-        * @param keyTypeHint                           A {@link TypeHint} used 
to extract the type of the key.
-        * @param stateDescriptor                       The {@link 
StateDescriptor} of the state we want to query.
-        * @return Future holding the result.
-        */
-       @PublicEvolving
-       public <K, V> Future<V> getKvState(
-                       final JobID jobId,
-                       final String queryableStateName,
-                       final K key,
-                       final TypeHint<K> keyTypeHint,
-                       final StateDescriptor<?, V> stateDescriptor) {
-
-               Preconditions.checkNotNull(keyTypeHint);
-
-               TypeInformation<K> keyTypeInfo = keyTypeHint.getTypeInfo();
-               return getKvState(jobId, queryableStateName, key, keyTypeInfo, 
stateDescriptor);
-       }
-
-       /**
-        * Returns a future holding the request result.
-        *
-        * <p>If the server does not serve a KvState instance with the given ID,
-        * the Future will be failed with a {@link UnknownKvStateID}.
-        *
-        * <p>If the KvState instance does not hold any data for the given key
-        * and namespace, the Future will be failed with a {@link 
UnknownKeyOrNamespace}.
-        *
-        * <p>All other failures are forwarded to the Future.
-        *
-        * @param jobId                     JobID of the job the queryable 
state belongs to.
-        * @param queryableStateName        Name under which the state is 
queryable.
-        * @param key                               The key we are interested 
in.
-        * @param keyTypeInfo                           The {@link 
TypeInformation} of the key.
-        * @param stateDescriptor                       The {@link 
StateDescriptor} of the state we want to query.
-        * @return Future holding the result.
-        */
-       @PublicEvolving
-       public <K, V> Future<V> getKvState(
-                       final JobID jobId,
-                       final String queryableStateName,
-                       final K key,
-                       final TypeInformation<K> keyTypeInfo,
-                       final StateDescriptor<?, V> stateDescriptor) {
-
-               Preconditions.checkNotNull(keyTypeInfo);
-
-               return getKvState(jobId, queryableStateName, key, 
VoidNamespace.INSTANCE,
-                               keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, 
stateDescriptor);
-       }
-
-       /**
-        * Returns a future holding the request result.
-        *
-        * <p>If the server does not serve a KvState instance with the given ID,
-        * the Future will be failed with a {@link UnknownKvStateID}.
-        *
-        * <p>If the KvState instance does not hold any data for the given key
-        * and namespace, the Future will be failed with a {@link 
UnknownKeyOrNamespace}.
-        *
-        * <p>All other failures are forwarded to the Future.
-        *
-        * @param jobId                     JobID of the job the queryable 
state belongs to.
-        * @param queryableStateName        Name under which the state is 
queryable.
-        * @param key                               The key that the state we 
request is associated with.
-        * @param namespace                                     The namespace 
of the state.
-        * @param keyTypeInfo                           The {@link 
TypeInformation} of the keys.
-        * @param namespaceTypeInfo                     The {@link 
TypeInformation} of the namespace.
-        * @param stateDescriptor                       The {@link 
StateDescriptor} of the state we want to query.
-        * @return Future holding the result.
-        */
-       @PublicEvolving
-       public <K, V, N> Future<V> getKvState(
-                       final JobID jobId,
-                       final String queryableStateName,
-                       final K key,
-                       final N namespace,
-                       final TypeInformation<K> keyTypeInfo,
-                       final TypeInformation<N> namespaceTypeInfo,
-                       final StateDescriptor<?, V> stateDescriptor) {
-
-               Preconditions.checkNotNull(stateDescriptor);
-
-               // initialize the value serializer based on the execution 
config.
-               stateDescriptor.initializeSerializerUnlessSet(executionConfig);
-               TypeSerializer<V> stateSerializer = 
stateDescriptor.getSerializer();
-
-               return getKvState(jobId, queryableStateName, key,
-                               namespace, keyTypeInfo, namespaceTypeInfo, 
stateSerializer);
-       }
-
-       /**
-        * Returns a future holding the request result.
-        *
-        * <p>If the server does not serve a KvState instance with the given ID,
-        * the Future will be failed with a {@link UnknownKvStateID}.
-        *
-        * <p>If the KvState instance does not hold any data for the given key
-        * and namespace, the Future will be failed with a {@link 
UnknownKeyOrNamespace}.
-        *
-        * <p>All other failures are forwarded to the Future.
-        *
-        * @param jobId                     JobID of the job the queryable 
state belongs to.
-        * @param queryableStateName        Name under which the state is 
queryable.
-        * @param key                               The key that the state we 
request is associated with.
-        * @param namespace                                     The namespace 
of the state.
-        * @param keyTypeInfo                           The {@link 
TypeInformation} of the keys.
-        * @param namespaceTypeInfo                     The {@link 
TypeInformation} of the namespace.
-        * @param stateSerializer                       The {@link 
TypeSerializer} of the state we want to query.
-        * @return Future holding the result.
-        */
-       @PublicEvolving
-       public <K, V, N> Future<V> getKvState(
-                       final JobID jobId,
-                       final String queryableStateName,
-                       final K key,
-                       final N namespace,
-                       final TypeInformation<K> keyTypeInfo,
-                       final TypeInformation<N> namespaceTypeInfo,
-                       final TypeSerializer<V> stateSerializer) {
-
-               Preconditions.checkNotNull(queryableStateName);
-
-               Preconditions.checkNotNull(key);
-               Preconditions.checkNotNull(namespace);
-
-               Preconditions.checkNotNull(keyTypeInfo);
-               Preconditions.checkNotNull(namespaceTypeInfo);
-               Preconditions.checkNotNull(stateSerializer);
-
-               if (stateSerializer instanceof ListSerializer) {
-                       throw new IllegalArgumentException("ListState is not 
supported out-of-the-box yet.");
-               }
-
-               TypeSerializer<K> keySerializer = 
keyTypeInfo.createSerializer(executionConfig);
-               TypeSerializer<N> namespaceSerializer = 
namespaceTypeInfo.createSerializer(executionConfig);
-
-               final byte[] serializedKeyAndNamespace;
-               try {
-                       serializedKeyAndNamespace = 
KvStateRequestSerializer.serializeKeyAndNamespace(
-                                       key,
-                                       keySerializer,
-                                       namespace,
-                                       namespaceSerializer);
-               } catch (IOException e) {
-                       return Futures.failed(e);
-               }
-
-               return getKvState(jobId, queryableStateName, key.hashCode(), 
serializedKeyAndNamespace)
-                               .flatMap(new Mapper<byte[], Future<V>>() {
-                                       @Override
-                                       public Future<V> apply(byte[] 
parameter) {
-                                               try {
-                                                       return 
Futures.successful(
-                                                                       
KvStateRequestSerializer.deserializeValue(parameter, stateSerializer));
-                                               } catch (IOException e) {
-                                                       return 
Futures.failed(e);
-                                               }
-                                       }
-                               }, executionContext);
-       }
-
-       /**
-        * Returns a future holding the serialized request result.
-        *
-        * @param jobId                     JobID of the job the queryable state
-        *                                  belongs to
-        * @param queryableStateName        Name under which the state is 
queryable
-        * @param keyHashCode               Integer hash code of the key 
(result of
-        *                                  a call to {@link Object#hashCode()})
-        * @param serializedKeyAndNamespace Serialized key and namespace to 
query
-        *                                  KvState instance with
-        * @param forceLookup               Flag to force lookup of the {@link 
KvStateLocation}
-        * @return Future holding the serialized result
-        */
-       private Future<byte[]> getKvState(
-                       final JobID jobId,
-                       final String queryableStateName,
-                       final int keyHashCode,
-                       final byte[] serializedKeyAndNamespace,
-                       boolean forceLookup) {
-
-               return getKvStateLookupInfo(jobId, queryableStateName, 
forceLookup)
-                               .flatMap(new Mapper<KvStateLocation, 
Future<byte[]>>() {
-                                       @Override
-                                       public Future<byte[]> 
apply(KvStateLocation lookup) {
-                                               int keyGroupIndex = 
KeyGroupRangeAssignment.computeKeyGroupForKeyHash(keyHashCode, 
lookup.getNumKeyGroups());
-
-                                               KvStateServerAddress 
serverAddress = lookup.getKvStateServerAddress(keyGroupIndex);
-                                               if (serverAddress == null) {
-                                                       return 
Futures.failed(new UnknownKvStateKeyGroupLocation());
-                                               } else {
-                                                       // Query server
-                                                       KvStateID kvStateId = 
lookup.getKvStateID(keyGroupIndex);
-                                                       return 
kvStateClient.getKvState(serverAddress, kvStateId, serializedKeyAndNamespace);
-                                               }
-                                       }
-                               }, executionContext);
-       }
-
-       /**
-        * Lookup the {@link KvStateLocation} for the given job and queryable 
state
-        * name.
-        *
-        * <p>The job manager will be queried for the location only if forced 
or no
-        * cached location can be found. There are no guarantees about the
-        * freshness of a cached location; it may be out of sync with the job manager.
-        *
-        * @param jobId              JobID the state instance belongs to.
-        * @param queryableStateName Name under which the state instance has 
been published.
-        * @param forceUpdate        Flag to indicate whether to force an update 
via the lookup service.
-        * @return Future holding the KvStateLocation
-        */
-       private Future<KvStateLocation> getKvStateLookupInfo(
-                       JobID jobId,
-                       final String queryableStateName,
-                       boolean forceUpdate) {
-
-               if (forceUpdate) {
-                       Future<KvStateLocation> lookupFuture = lookupService
-                                       .getKvStateLookupInfo(jobId, 
queryableStateName);
-                       lookupCache.put(new Tuple2<>(jobId, 
queryableStateName), lookupFuture);
-                       return lookupFuture;
-               } else {
-                       Tuple2<JobID, String> cacheKey = new Tuple2<>(jobId, 
queryableStateName);
-                       final Future<KvStateLocation> cachedFuture = 
lookupCache.get(cacheKey);
-
-                       if (cachedFuture == null) {
-                               Future<KvStateLocation> lookupFuture = 
lookupService
-                                               .getKvStateLookupInfo(jobId, 
queryableStateName);
-
-                               Future<KvStateLocation> previous = 
lookupCache.putIfAbsent(cacheKey, lookupFuture);
-                               if (previous == null) {
-                                       return lookupFuture;
-                               } else {
-                                       return previous;
-                               }
-                       } else {
-                               // do not retain futures which failed as they 
will remain in
-                               // the cache even if the error cause is not 
present any more
-                               // and a new lookup may succeed
-                               if (cachedFuture.isCompleted() &&
-                                       cachedFuture.value().get().isFailure()) 
{
-                                       // issue a new lookup
-                                       Future<KvStateLocation> lookupFuture = 
lookupService
-                                               .getKvStateLookupInfo(jobId, 
queryableStateName);
-
-                                       // replace the existing one if it has 
not been replaced yet
-                                       // otherwise return the one in the cache
-                                       if (lookupCache.replace(cacheKey, 
cachedFuture, lookupFuture)) {
-                                               return lookupFuture;
-                                       } else {
-                                               return 
lookupCache.get(cacheKey);
-                                       }
-                               } else {
-                                       return cachedFuture;
-                               }
-                       }
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
new file mode 100644
index 0000000..852d394
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.query;
+
+import org.apache.flink.runtime.query.netty.KvStateRequestStats;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.InetAddress;
+
+/**
+ * Utility class to initialize entities used in queryable state.
+ *
+ * <p>The server implementation is looked up reflectively by class name, so this
+ * class does not reference the implementation type directly.
+ */
+public final class QueryableStateUtils {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(QueryableStateUtils.class);
+
+       /**
+        * Initializes the {@link KvStateServer server} responsible for sending the
+        * requested internal state to the Queryable State Client.
+        *
+        * <p>The implementation class
+        * {@code org.apache.flink.queryablestate.server.KvStateServerImpl} is loaded
+        * by name and instantiated through its public constructor with the parameter
+        * types {@code (InetAddress, Integer, Integer, Integer, KvStateRegistry,
+        * KvStateRequestStats)}. The lookup uses the boxed {@code Integer} type, so a
+        * constructor declaring primitive {@code int} parameters would not be found.
+        *
+        * <p>The arguments are validated before the reflective lookup; these checks
+        * are outside the {@code try} block and are therefore not covered by the
+        * {@code null}-returning failure handling described below. The {@code port}
+        * value itself is not validated here.
+        *
+        * @param address the address to bind to (must not be {@code null}).
+        * @param port the port to listen to.
+        * @param eventLoopThreads the number of threads to be used to process incoming requests (at least 1).
+        * @param queryThreads the number of threads to be used to send the actual state (at least 1).
+        * @param kvStateRegistry the registry with the queryable state (must not be {@code null}).
+        * @param stats statistics to be gathered about the incoming requests (must not be {@code null}).
+        * @return the {@link KvStateServer state server}, or {@code null} if the
+        *         implementation class is not on the classpath (logged at INFO level)
+        *         or could not be looked up or instantiated (logged at ERROR level).
+        */
+       public static KvStateServer createKvStateServer(
+                       final InetAddress address,
+                       final int port,
+                       final int eventLoopThreads,
+                       final int queryThreads,
+                       final KvStateRegistry kvStateRegistry,
+                       final KvStateRequestStats stats) {
+
+               Preconditions.checkNotNull(address, "address");
+               Preconditions.checkNotNull(kvStateRegistry, "registry");
+               Preconditions.checkNotNull(stats, "stats");
+
+               Preconditions.checkArgument(eventLoopThreads >= 1);
+               Preconditions.checkArgument(queryThreads >= 1);
+
+               try {
+                       String classname = 
"org.apache.flink.queryablestate.server.KvStateServerImpl";
+                       Class<? extends KvStateServer> clazz = 
Class.forName(classname).asSubclass(KvStateServer.class);
+                       Constructor<? extends KvStateServer> constructor = 
clazz.getConstructor(
+                                       InetAddress.class,
+                                       Integer.class,
+                                       Integer.class,
+                                       Integer.class,
+                                       KvStateRegistry.class,
+                                       KvStateRequestStats.class);
+                       return constructor.newInstance(address, port, 
eventLoopThreads, queryThreads, kvStateRegistry, stats);
+               } catch (ClassNotFoundException e) {
+                       LOG.info("Could not load Queryable State Server. " +
+                                       "Probable reason: flink-queryable-state 
is not in the classpath");
+                       LOG.debug("Caught exception", e);
+                       return null;
+               } catch (InvocationTargetException e) {
+                       LOG.error("Queryable State Server could not be 
created", e.getTargetException());
+                       return null;
+               } catch (Throwable t) {
+                       LOG.error("Failed to instantiate the Queryable State 
Server.", t);
+                       return null;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/UnknownJobManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/UnknownJobManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/UnknownJobManager.java
deleted file mode 100644
index 3549ed6..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/UnknownJobManager.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query;
-
-/**
- * Exception to fail Future with if no JobManager is currently registered at
- * the {@link KvStateLocationLookupService}.
- *
- * <p>The only constructor takes no arguments and always uses the same fixed
- * message; neither a custom message nor a cause can be supplied.
- */
-class UnknownJobManager extends Exception {
-
-       private static final long serialVersionUID = 1L;
-
-       public UnknownJobManager() {
-               super("Unknown JobManager. Either the JobManager has not 
registered yet " +
-                               "or has lost leadership.");
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/UnknownKvStateKeyGroupLocation.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/UnknownKvStateKeyGroupLocation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/UnknownKvStateKeyGroupLocation.java
deleted file mode 100644
index 8f62be5..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/UnknownKvStateKeyGroupLocation.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query;
-
-/**
- * Exception thrown if there is no location information available for the given
- * key group in a {@link KvStateLocation} instance.
- *
- * <p>The class declares no constructors, so instances are created through the
- * implicit no-argument constructor and carry neither a message nor a cause.
- */
-class UnknownKvStateKeyGroupLocation extends Exception {
-
-       private static final long serialVersionUID = 1L;
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/ChunkedByteBuf.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/ChunkedByteBuf.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/ChunkedByteBuf.java
deleted file mode 100644
index e4fa809..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/ChunkedByteBuf.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query.netty;
-
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedInput;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
-
-/**
- * A {@link ByteBuf} instance to be consumed in chunks by {@link ChunkedWriteHandler},
- * respecting the high and low watermarks.
- *
- * <p>Reference counting, as implemented by {@link #readChunk(ChannelHandlerContext)}
- * and {@link #close()}: every chunk except the last one is handed out as a
- * retained slice of {@code chunkSize} bytes. The last chunk (at most
- * {@code chunkSize} readable bytes remaining) is handed out as a plain,
- * non-retained slice, and from then on the consumer is responsible for
- * releasing the buffer. If the input is closed before the last chunk has been
- * handed out, {@link #close()} releases the buffer itself.
- *
- * <p>After {@link #close()}, {@link #readChunk(ChannelHandlerContext)} returns
- * {@code null} and {@link #isEndOfInput()} returns {@code true}.
- *
- * @see <a href="http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#10.0">Low/High Watermarks</a>
- */
-class ChunkedByteBuf implements ChunkedInput<ByteBuf> {
-
-       /** The buffer to chunk. */
-       private final ByteBuf buf;
-
-       /** Size of chunks in bytes (always positive, checked in the constructor). */
-       private final int chunkSize;
-
-       /** Closed flag, set by {@link #close()}. */
-       private boolean isClosed;
-
-       /** End of input flag, set by {@link #readChunk(ChannelHandlerContext)} when the last chunk is handed out. */
-       private boolean isEndOfInput;
-
-       public ChunkedByteBuf(ByteBuf buf, int chunkSize) {
-               this.buf = Preconditions.checkNotNull(buf, "Buffer");
-               Preconditions.checkArgument(chunkSize > 0, "Non-positive chunk 
size");
-               this.chunkSize = chunkSize;
-       }
-
-       @Override
-       public boolean isEndOfInput() throws Exception {
-               return isClosed || isEndOfInput;
-       }
-
-       @Override
-       public void close() throws Exception {
-               if (!isClosed) {
-                       // If we did not consume the whole buffer yet, we have 
to release
-                       // it here. Otherwise, it's the responsibility of the 
consumer.
-                       if (!isEndOfInput) {
-                               buf.release();
-                       }
-
-                       isClosed = true;
-               }
-       }
-
-       @Override
-       public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
-               if (isClosed) {
-                       return null;
-               } else if (buf.readableBytes() <= chunkSize) {
-                       isEndOfInput = true;
-
-                       // Don't retain as the consumer is responsible to 
release it
-                       return buf.slice();
-               } else {
-                       // Return a chunk sized slice of the buffer. The ref 
count is
-                       // shared with the original buffer. That's why we need 
to retain
-                       // a reference here.
-                       return buf.readSlice(chunkSize).retain();
-               }
-       }
-
-       @Override
-       public String toString() {
-               return "ChunkedByteBuf{" +
-                               "buf=" + buf +
-                               ", chunkSize=" + chunkSize +
-                               ", isClosed=" + isClosed +
-                               ", isEndOfInput=" + isEndOfInput +
-                               '}';
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java
deleted file mode 100644
index 1a84e83..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java
+++ /dev/null
@@ -1,579 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query.netty;
-
-import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.query.KvStateServerAddress;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
-import org.apache.flink.util.Preconditions;
-
-import 
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
-import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
-import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
-import 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
-
-import akka.dispatch.Futures;
-
-import java.nio.channels.ClosedChannelException;
-import java.util.ArrayDeque;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-
-/**
- * Netty-based client querying {@link KvStateServer} instances.
- *
- * <p>This client can be used by multiple threads concurrently. Operations are
- * executed asynchronously and return Futures to their result.
- *
- * <p>The incoming pipeline looks as follows:
- * <pre>
- * Socket.read() -> LengthFieldBasedFrameDecoder -> KvStateClientHandler
- * </pre>
- *
- * <p>Received binary messages are expected to contain a frame length field. 
Netty's
- * {@link LengthFieldBasedFrameDecoder} is used to fully receive the frame 
before
- * giving it to our {@link KvStateClientHandler}.
- *
- * <p>Connections are established and closed by the client. The server only
- * closes the connection on a fatal failure that cannot be recovered.
- */
-public class KvStateClient {
-
-       /** Netty's Bootstrap. */
-       private final Bootstrap bootstrap;
-
-       /** Statistics tracker. */
-       private final KvStateRequestStats stats;
-
-       /** Established connections. */
-       private final ConcurrentHashMap<KvStateServerAddress, 
EstablishedConnection> establishedConnections =
-                       new ConcurrentHashMap<>();
-
-       /** Pending connections. */
-       private final ConcurrentHashMap<KvStateServerAddress, 
PendingConnection> pendingConnections =
-                       new ConcurrentHashMap<>();
-
-       /** Atomic shut down flag. */
-       private final AtomicBoolean shutDown = new AtomicBoolean();
-
-       /**
-        * Creates a client with the specified number of event loop threads.
-        *
-        * <p>The event loop threads are daemon threads. Every channel created by
-        * the bootstrap gets a {@link LengthFieldBasedFrameDecoder} followed by a
-        * {@link ChunkedWriteHandler} in its pipeline.
-        *
-        * @param numEventLoopThreads Number of event loop threads (minimum 1).
-        * @param stats Statistics tracker (must not be <code>null</code>).
-        */
-       public KvStateClient(int numEventLoopThreads, KvStateRequestStats 
stats) {
-               Preconditions.checkArgument(numEventLoopThreads >= 1, 
"Non-positive number of event loop threads.");
-               NettyBufferPool bufferPool = new 
NettyBufferPool(numEventLoopThreads);
-
-               ThreadFactory threadFactory = new ThreadFactoryBuilder()
-                               .setDaemon(true)
-                               .setNameFormat("Flink KvStateClient Event Loop 
Thread %d")
-                               .build();
-
-               NioEventLoopGroup nioGroup = new 
NioEventLoopGroup(numEventLoopThreads, threadFactory);
-
-               this.bootstrap = new Bootstrap()
-                               .group(nioGroup)
-                               .channel(NioSocketChannel.class)
-                               .option(ChannelOption.ALLOCATOR, bufferPool)
-                               .handler(new 
ChannelInitializer<SocketChannel>() {
-                                       @Override
-                                       protected void 
initChannel(SocketChannel ch) throws Exception {
-                                               ch.pipeline()
-                                                               .addLast(new 
LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
-                                                               // 
ChunkedWriteHandler respects Channel writability
-                                                               .addLast(new 
ChunkedWriteHandler());
-                                       }
-                               });
-
-               this.stats = Preconditions.checkNotNull(stats, "Statistics 
tracker");
-       }
-
-       /**
-        * Returns a future holding the serialized request result.
-        *
-        * <p>If the server does not serve a KvState instance with the given ID,
-        * the Future will be failed with a {@link UnknownKvStateID}.
-        *
-        * <p>If the KvState instance does not hold any data for the given key
-        * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}.
-        *
-        * <p>All other failures are forwarded to the Future.
-        *
-        * <p>If the client has already been shut down, an already failed Future
-        * (with an {@link IllegalStateException}) is returned.
-        *
-        * <p>Connection handling: an established connection to the server is used
-        * if there is one. Otherwise the request is handed to the pending
-        * connection for that server. If there is no pending connection either,
-        * one is registered atomically; the caller that wins the registration
-        * triggers the actual connect, and callers that lose the race use the
-        * pending connection that was registered first.
-        *
-        * @param serverAddress Address of the server to query
-        * @param kvStateId ID of the KvState instance to query
-        * @param serializedKeyAndNamespace Serialized key and namespace to query KvState instance with
-        * @return Future holding the serialized result
-        */
-       public Future<byte[]> getKvState(
-                       KvStateServerAddress serverAddress,
-                       KvStateID kvStateId,
-                       byte[] serializedKeyAndNamespace) {
-
-               if (shutDown.get()) {
-                       return Futures.failed(new IllegalStateException("Shut 
down"));
-               }
-
-               EstablishedConnection connection = 
establishedConnections.get(serverAddress);
-
-               if (connection != null) {
-                       return connection.getKvState(kvStateId, 
serializedKeyAndNamespace);
-               } else {
-                       PendingConnection pendingConnection = 
pendingConnections.get(serverAddress);
-                       if (pendingConnection != null) {
-                               // There was a race, use the existing pending 
connection.
-                               return pendingConnection.getKvState(kvStateId, 
serializedKeyAndNamespace);
-                       } else {
-                               // We try to connect to the server.
-                               PendingConnection pending = new 
PendingConnection(serverAddress);
-                               PendingConnection previous = 
pendingConnections.putIfAbsent(serverAddress, pending);
-
-                               if (previous == null) {
-                                       // OK, we are responsible to connect.
-                                       
bootstrap.connect(serverAddress.getHost(), serverAddress.getPort())
-                                                       .addListener(pending);
-
-                                       return pending.getKvState(kvStateId, 
serializedKeyAndNamespace);
-                               } else {
-                                       // There was a race, use the existing 
pending connection.
-                                       return previous.getKvState(kvStateId, 
serializedKeyAndNamespace);
-                               }
-                       }
-               }
-       }
-
-       /**
-        * Shuts down the client and closes all connections.
-        *
-        * <p>After a call to this method, all returned futures will be failed.
-        *
-        * <p>Only the first call has an effect (guarded by an atomic flag). It
-        * removes and closes all established connections, then all pending
-        * connections, and finally calls
-        * <code>shutdownGracefully(0, 10, TimeUnit.SECONDS)</code> on the
-        * bootstrap's event loop group.
-        */
-       public void shutDown() {
-               if (shutDown.compareAndSet(false, true)) {
-                       for (Map.Entry<KvStateServerAddress, 
EstablishedConnection> conn : establishedConnections.entrySet()) {
-                               if 
(establishedConnections.remove(conn.getKey(), conn.getValue())) {
-                                       conn.getValue().close();
-                               }
-                       }
-
-                       for (Map.Entry<KvStateServerAddress, PendingConnection> 
conn : pendingConnections.entrySet()) {
-                               if (pendingConnections.remove(conn.getKey()) != 
null) {
-                                       conn.getValue().close();
-                               }
-                       }
-
-                       if (bootstrap != null) {
-                               EventLoopGroup group = bootstrap.group();
-                               if (group != null) {
-                                       group.shutdownGracefully(0, 10, 
TimeUnit.SECONDS);
-                               }
-                       }
-               }
-       }
-
-       /**
-        * Closes the connection to the given server address if it exists.
-        *
-        * <p>If there is a request to the server a new connection will be established.
-        *
-        * <p>An established connection is removed from the map of established
-        * connections and closed. A pending connection is closed as well, but it
-        * is only looked up, not removed, from the map of pending connections.
-        *
-        * <p>NOTE(review): a closed {@link PendingConnection} fails every later
-        * request with its failure cause, and {@link #getKvState} reuses whatever
-        * is registered in the map of pending connections. Unless the entry is
-        * removed elsewhere, requests to a server whose connection was still
-        * pending when this method was called would keep failing instead of
-        * triggering a new connection -- confirm against the rest of the class.
-        *
-        * @param serverAddress Target address of the connection to close
-        */
-       public void closeConnection(KvStateServerAddress serverAddress) {
-               PendingConnection pending = 
pendingConnections.get(serverAddress);
-               if (pending != null) {
-                       pending.close();
-               }
-
-               EstablishedConnection established = 
establishedConnections.remove(serverAddress);
-               if (established != null) {
-                       established.close();
-               }
-       }
-
-       /**
-        * A pending connection that is in the process of connecting.
-        *
-        * <p>The instance is registered as listener on the connect future. While
-        * the connect is in progress, requests are queued. On success the queued
-        * requests are forwarded to a new {@link EstablishedConnection}, which is
-        * then published in the map of established connections; on failure (or
-        * close) the queued requests are failed.
-        *
-        * <p>All mutable state of this class is read and written while holding
-        * {@link #connectLock}.
-        */
-       private class PendingConnection implements ChannelFutureListener {
-
-               /** Lock to guard the connect call, channel hand in, etc. */
-               private final Object connectLock = new Object();
-
-               /** Address of the server we are connecting to. */
-               private final KvStateServerAddress serverAddress;
-
-               /** Queue of requests while connecting. */
-               private final ArrayDeque<PendingRequest> queuedRequests = new 
ArrayDeque<>();
-
-               /** The established connection after the connect succeeds (<code>null</code> before that). */
-               private EstablishedConnection established;
-
-               /** Closed flag, set by the first call to {@link #close(Throwable)}. */
-               private boolean closed;
-
-               /** Failure cause if something goes wrong; set at most once, by the first close call with a non-null cause. */
-               private Throwable failureCause;
-
-               /**
-                * Creates a pending connection to the given server.
-                *
-                * @param serverAddress Address of the server to connect to.
-                */
-               private PendingConnection(KvStateServerAddress serverAddress) {
-                       this.serverAddress = serverAddress;
-               }
-
-               @Override
-               public void operationComplete(ChannelFuture future) throws 
Exception {
-                       // Callback from the Bootstrap's connect call.
-                       if (future.isSuccess()) {
-                               handInChannel(future.channel());
-                       } else {
-                               close(future.cause());
-                       }
-               }
-
-               /**
-                * Returns a future holding the serialized request result.
-                *
-                * <p>If the channel has been established, forward the call to the
-                * established channel, otherwise queue it for when the channel is
-                * handed in.
-                *
-                * <p>If this pending connection has a failure cause, an already
-                * failed future with that cause is returned. If it was closed
-                * without a cause, the future is failed with a
-                * {@link ClosedChannelException}.
-                *
-                * @param kvStateId                 ID of the KvState instance to query
-                * @param serializedKeyAndNamespace Serialized key and namespace to query KvState instance
-                *                                  with
-                * @return Future holding the serialized result
-                */
-               public Future<byte[]> getKvState(KvStateID kvStateId, byte[] 
serializedKeyAndNamespace) {
-                       synchronized (connectLock) {
-                               if (failureCause != null) {
-                                       return Futures.failed(failureCause);
-                               } else if (closed) {
-                                       return Futures.failed(new 
ClosedChannelException());
-                               } else {
-                                       if (established != null) {
-                                               return 
established.getKvState(kvStateId, serializedKeyAndNamespace);
-                                       } else {
-                                               // Queue this and handle when 
connected
-                                               PendingRequest pending = new 
PendingRequest(kvStateId, serializedKeyAndNamespace);
-                                               queuedRequests.add(pending);
-                                               return pending.promise.future();
-                                       }
-                               }
-                       }
-               }
-
-               /**
-                * Hands in a channel after a successful connection.
-                *
-                * <p>If this pending connection was closed or failed in the
-                * meantime, the channel is closed right away. Otherwise the queued
-                * requests are forwarded to the new established connection in
-                * queue order, the established connection is published, and this
-                * pending connection is removed from the map of pending
-                * connections. A final check of the shut down flag closes the
-                * connection again if the client was shut down concurrently.
-                *
-                * @param channel Channel to hand in
-                */
-               private void handInChannel(Channel channel) {
-                       synchronized (connectLock) {
-                               if (closed || failureCause != null) {
-                                       // Close the channel and we are done. 
Any queued requests
-                                       // are removed on the close/failure 
call and after that no
-                                       // new ones can be enqueued.
-                                       channel.close();
-                               } else {
-                                       established = new 
EstablishedConnection(serverAddress, channel);
-
-                                       PendingRequest pending;
-                                       while ((pending = 
queuedRequests.poll()) != null) {
-                                               Future<byte[]> resultFuture = 
established.getKvState(
-                                                               
pending.kvStateId,
-                                                               
pending.serializedKeyAndNamespace);
-
-                                               
pending.promise.completeWith(resultFuture);
-                                       }
-
-                                       // Publish the channel for the general 
public
-                                       
establishedConnections.put(serverAddress, established);
-                                       
pendingConnections.remove(serverAddress);
-
-                                       // Check shut down for possible race 
with shut down. We
-                                       // don't want any lingering connections 
after shut down,
-                                       // which can happen if we don't check 
this here.
-                                       if (shutDown.get()) {
-                                               if 
(establishedConnections.remove(serverAddress, established)) {
-                                                       established.close();
-                                               }
-                                       }
-                               }
-                       }
-               }
-
-               /**
-                * Close the connecting channel with a ClosedChannelException.
-                */
-               private void close() {
-                       close(new ClosedChannelException());
-               }
-
-               /**
-                * Close the connecting channel with an Exception (can be
-                * <code>null</code>) or forward to the established channel.
-                *
-                * <p>Only the first call has an effect. If the connection has not
-                * been established yet, all queued requests are failed with the
-                * given cause.
-                *
-                * <p>NOTE(review): this method does not remove the instance from
-                * the map of pending connections; confirm that callers take care
-                * of that where a later reconnect is expected.
-                */
-               private void close(Throwable cause) {
-                       synchronized (connectLock) {
-                               if (!closed) {
-                                       if (failureCause == null) {
-                                               failureCause = cause;
-                                       }
-
-                                       if (established != null) {
-                                               established.close();
-                                       } else {
-                                               PendingRequest pending;
-                                               while ((pending = 
queuedRequests.poll()) != null) {
-                                                       
pending.promise.tryFailure(cause);
-                                               }
-                                       }
-
-                                       closed = true;
-                               }
-                       }
-               }
-
-               /**
-                * A pending request queued while the channel is connecting.
-                *
-                * <p>Holds the request arguments together with the promise whose
-                * future was returned to the caller.
-                */
-               private final class PendingRequest {
-
-                       private final KvStateID kvStateId;
-                       private final byte[] serializedKeyAndNamespace;
-                       private final Promise<byte[]> promise;
-
-                       private PendingRequest(KvStateID kvStateId, byte[] 
serializedKeyAndNamespace) {
-                               this.kvStateId = kvStateId;
-                               this.serializedKeyAndNamespace = 
serializedKeyAndNamespace;
-                               this.promise = Futures.promise();
-                       }
-               }
-
-               @Override
-               public String toString() {
-                       synchronized (connectLock) {
-                               return "PendingConnection{" +
-                                               "serverAddress=" + 
serverAddress +
-                                               ", queuedRequests=" + 
queuedRequests.size() +
-                                               ", established=" + (established 
!= null) +
-                                               ", closed=" + closed +
-                                               '}';
-                       }
-               }
-       }
-
-       /**
-        * An established connection that wraps the actual channel instance and 
is
-        * registered at the {@link KvStateClientHandler} for callbacks.
-        */
-       private class EstablishedConnection implements 
KvStateClientHandlerCallback {
-
-               /** Address of the server we are connected to. */
-               private final KvStateServerAddress serverAddress;
-
-               /** The actual TCP channel. */
-               private final Channel channel;
-
-               /** Pending requests keyed by request ID. */
-               private final ConcurrentHashMap<Long, PromiseAndTimestamp> 
pendingRequests = new ConcurrentHashMap<>();
-
-               /** Current request number used to assign unique request IDs. */
-               private final AtomicLong requestCount = new AtomicLong();
-
-               /** Reference to a failure that was reported by the channel. */
-               private final AtomicReference<Throwable> failureCause = new 
AtomicReference<>();
-
-               /**
-                * Creates an established connection with the given channel.
-                *
-                * @param serverAddress Address of the server connected to
-                * @param channel The actual TCP channel
-                */
-               EstablishedConnection(KvStateServerAddress serverAddress, 
Channel channel) {
-                       this.serverAddress = 
Preconditions.checkNotNull(serverAddress, "KvStateServerAddress");
-                       this.channel = Preconditions.checkNotNull(channel, 
"Channel");
-
-                       // Add the client handler with the callback
-                       channel.pipeline().addLast("KvStateClientHandler", new 
KvStateClientHandler(this));
-
-                       stats.reportActiveConnection();
-               }
-
-               /**
-                * Close the channel with a ClosedChannelException.
-                */
-               void close() {
-                       close(new ClosedChannelException());
-               }
-
-               /**
-                * Close the channel with a cause.
-                *
-                * @param cause The cause to close the channel with.
-                * @return True if this call closed the channel
-                */
-               private boolean close(Throwable cause) {
-                       if (failureCause.compareAndSet(null, cause)) {
-                               channel.close();
-                               stats.reportInactiveConnection();
-
-                               for (long requestId : pendingRequests.keySet()) 
{
-                                       PromiseAndTimestamp pending = 
pendingRequests.remove(requestId);
-                                       if (pending != null && 
pending.promise.tryFailure(cause)) {
-                                               stats.reportFailedRequest();
-                                       }
-                               }
-
-                               return true;
-                       }
-
-                       return false;
-               }
-
-               /**
-                * Returns a future holding the serialized request result.
-                *
-                * @param kvStateId                 ID of the KvState instance 
to query
-                * @param serializedKeyAndNamespace Serialized key and 
namespace to query KvState instance
-                *                                  with
-                * @return Future holding the serialized result
-                */
-               Future<byte[]> getKvState(KvStateID kvStateId, byte[] 
serializedKeyAndNamespace) {
-                       PromiseAndTimestamp requestPromiseTs = new 
PromiseAndTimestamp(
-                                       Futures.<byte[]>promise(),
-                                       System.nanoTime());
-
-                       try {
-                               final long requestId = 
requestCount.getAndIncrement();
-                               pendingRequests.put(requestId, 
requestPromiseTs);
-
-                               stats.reportRequest();
-
-                               ByteBuf buf = 
KvStateRequestSerializer.serializeKvStateRequest(
-                                               channel.alloc(),
-                                               requestId,
-                                               kvStateId,
-                                               serializedKeyAndNamespace);
-
-                               channel.writeAndFlush(buf).addListener(new 
ChannelFutureListener() {
-                                       @Override
-                                       public void 
operationComplete(ChannelFuture future) throws Exception {
-                                               if (!future.isSuccess()) {
-                                                       // Fail promise if the 
write failed
-                                                       PromiseAndTimestamp 
pending = pendingRequests.remove(requestId);
-                                                       if (pending != null && 
pending.promise.tryFailure(future.cause())) {
-                                                               
stats.reportFailedRequest();
-                                                       }
-                                               }
-                                       }
-                               });
-
-                               // Check failure for possible race. We don't 
want any lingering
-                               // promises after a failure, which can happen 
if we don't check
-                               // this here. Note that close is treated as a 
failure as well.
-                               Throwable failure = failureCause.get();
-                               if (failure != null) {
-                                       // Remove from pending requests to 
guard against concurrent
-                                       // removal and to make sure that we 
only count it once as failed.
-                                       PromiseAndTimestamp p = 
pendingRequests.remove(requestId);
-                                       if (p != null && 
p.promise.tryFailure(failure)) {
-                                               stats.reportFailedRequest();
-                                       }
-                               }
-                       } catch (Throwable t) {
-                               requestPromiseTs.promise.tryFailure(t);
-                       }
-
-                       return requestPromiseTs.promise.future();
-               }
-
-               @Override
-               public void onRequestResult(long requestId, byte[] 
serializedValue) {
-                       PromiseAndTimestamp pending = 
pendingRequests.remove(requestId);
-                       if (pending != null && 
pending.promise.trySuccess(serializedValue)) {
-                               long durationMillis = (System.nanoTime() - 
pending.timestamp) / 1_000_000;
-                               stats.reportSuccessfulRequest(durationMillis);
-                       }
-               }
-
-               @Override
-               public void onRequestFailure(long requestId, Throwable cause) {
-                       PromiseAndTimestamp pending = 
pendingRequests.remove(requestId);
-                       if (pending != null && 
pending.promise.tryFailure(cause)) {
-                               stats.reportFailedRequest();
-                       }
-               }
-
-               @Override
-               public void onFailure(Throwable cause) {
-                       if (close(cause)) {
-                               // Remove from established channels, otherwise 
future
-                               // requests will be handled by this failed 
channel.
-                               establishedConnections.remove(serverAddress, 
this);
-                       }
-               }
-
-               @Override
-               public String toString() {
-                       return "EstablishedConnection{" +
-                                       "serverAddress=" + serverAddress +
-                                       ", channel=" + channel +
-                                       ", pendingRequests=" + 
pendingRequests.size() +
-                                       ", requestCount=" + requestCount +
-                                       ", failureCause=" + failureCause +
-                                       '}';
-               }
-
-               /**
-                * Pair of promise and a timestamp.
-                */
-               private class PromiseAndTimestamp {
-
-                       private final Promise<byte[]> promise;
-                       private final long timestamp;
-
-                       public PromiseAndTimestamp(Promise<byte[]> promise, 
long timestamp) {
-                               this.promise = promise;
-                               this.timestamp = timestamp;
-                       }
-               }
-
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClientHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClientHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClientHandler.java
deleted file mode 100644
index 3e6470b..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClientHandler.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query.netty;
-
-import org.apache.flink.runtime.query.netty.message.KvStateRequestFailure;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestResult;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
-import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.channels.ClosedChannelException;
-
-/**
- * This handler expects responses from {@link KvStateServerHandler}.
- *
- * <p>It deserializes the response and calls the registered callback, which is
- * responsible for actually handling the result (see {@link 
KvStateClient.EstablishedConnection}).
- */
-class KvStateClientHandler extends ChannelInboundHandlerAdapter {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(KvStateClientHandler.class);
-
-       private final KvStateClientHandlerCallback callback;
-
-       /**
-        * Creates a {@link KvStateClientHandler} with the callback.
-        *
-        * @param callback Callback for responses.
-        */
-       KvStateClientHandler(KvStateClientHandlerCallback callback) {
-               this.callback = callback;
-       }
-
-       @Override
-       public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
-               try {
-                       ByteBuf buf = (ByteBuf) msg;
-                       KvStateRequestType msgType = 
KvStateRequestSerializer.deserializeHeader(buf);
-
-                       if (msgType == KvStateRequestType.REQUEST_RESULT) {
-                               KvStateRequestResult result = 
KvStateRequestSerializer.deserializeKvStateRequestResult(buf);
-                               callback.onRequestResult(result.getRequestId(), 
result.getSerializedResult());
-                       } else if (msgType == 
KvStateRequestType.REQUEST_FAILURE) {
-                               KvStateRequestFailure failure = 
KvStateRequestSerializer.deserializeKvStateRequestFailure(buf);
-                               
callback.onRequestFailure(failure.getRequestId(), failure.getCause());
-                       } else if (msgType == 
KvStateRequestType.SERVER_FAILURE) {
-                               throw 
KvStateRequestSerializer.deserializeServerFailure(buf);
-                       } else {
-                               throw new IllegalStateException("Unexpected 
response type '" + msgType + "'");
-                       }
-               } catch (Throwable t1) {
-                       try {
-                               callback.onFailure(t1);
-                       } catch (Throwable t2) {
-                               LOG.error("Failed to notify callback about 
failure", t2);
-                       }
-               } finally {
-                       ReferenceCountUtil.release(msg);
-               }
-       }
-
-       @Override
-       public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
-               try {
-                       callback.onFailure(cause);
-               } catch (Throwable t) {
-                       LOG.error("Failed to notify callback about failure", t);
-               }
-       }
-
-       @Override
-       public void channelInactive(ChannelHandlerContext ctx) throws Exception 
{
-               // Only the client is expected to close the channel. Otherwise 
it
-               // indicates a failure. Note that this will be invoked in both 
cases
-               // though. If the callback closed the channel, the callback 
must be
-               // ignored.
-               try {
-                       callback.onFailure(new ClosedChannelException());
-               } catch (Throwable t) {
-                       LOG.error("Failed to notify callback about failure", t);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClientHandlerCallback.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClientHandlerCallback.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClientHandlerCallback.java
deleted file mode 100644
index 65ff781..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClientHandlerCallback.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query.netty;
-
-import org.apache.flink.runtime.query.netty.message.KvStateRequest;
-
-/**
- * Callback for {@link KvStateClientHandler}.
- */
-interface KvStateClientHandlerCallback {
-
-       /**
-        * Called on a successful {@link KvStateRequest}.
-        *
-        * @param requestId       ID of the request
-        * @param serializedValue Serialized value for the request
-        */
-       void onRequestResult(long requestId, byte[] serializedValue);
-
-       /**
-        * Called on a failed {@link KvStateRequest}.
-        *
-        * @param requestId ID of the request
-        * @param cause     Cause of the request failure
-        */
-       void onRequestFailure(long requestId, Throwable cause);
-
-       /**
-        * Called on any failure, which is not related to a specific request.
-        *
-        * <p>This can be for example a caught Exception in the channel pipeline
-        * or an unexpected channel close.
-        *
-        * @param cause Cause of the failure
-        */
-       void onFailure(Throwable cause);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java
index 1c0d8d5..9781e23 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.query.netty;
 
+import org.apache.flink.runtime.query.KvStateServer;
+
 /**
  * Simple statistics for {@link KvStateServer} monitoring.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java
deleted file mode 100644
index 7cf2148..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query.netty;
-
-import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
-import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.KvStateServerAddress;
-import org.apache.flink.runtime.query.netty.message.KvStateRequest;
-import org.apache.flink.util.Preconditions;
-
-import 
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
-import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
-import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
-import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
-import 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Netty-based server answering {@link KvStateRequest} messages.
- *
- * <p>Requests are handled by asynchronous query tasks (see {@link 
KvStateServerHandler.AsyncKvStateQueryTask})
- * that are executed by a separate query Thread pool. This pool is shared among
- * all TCP connections.
- *
- * <p>The incoming pipeline looks as follows:
- * <pre>
- * Socket.read() -> LengthFieldBasedFrameDecoder -> KvStateServerHandler
- * </pre>
- *
- * <p>Received binary messages are expected to contain a frame length field. 
Netty's
- * {@link LengthFieldBasedFrameDecoder} is used to fully receive the frame 
before
- * giving it to our {@link KvStateServerHandler}.
- *
- * <p>Connections are established and closed by the client. The server only
- * closes the connection on a fatal failure that cannot be recovered. A
- * server-side connection close is considered a failure by the client.
- */
-public class KvStateServer {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(KvStateServer.class);
-
-       /** Server config: low water mark. */
-       private static final int LOW_WATER_MARK = 8 * 1024;
-
-       /** Server config: high water mark. */
-       private static final int HIGH_WATER_MARK = 32 * 1024;
-
-       /** Netty's ServerBootstrap. */
-       private final ServerBootstrap bootstrap;
-
-       /** Query executor thread pool. */
-       private final ExecutorService queryExecutor;
-
-       /** Address of this server. */
-       private KvStateServerAddress serverAddress;
-
-       /**
-        * Creates the {@link KvStateServer}.
-        *
-        * <p>The server needs to be started via {@link #start()} in order to 
bind
-        * to the configured bind address.
-        *
-        * @param bindAddress         Address to bind to
-        * @param bindPort            Port to bind to. Pick random port if 0.
-        * @param numEventLoopThreads Number of event loop threads
-        * @param numQueryThreads     Number of query threads
-        * @param kvStateRegistry     KvStateRegistry to query for KvState 
instances
-        * @param stats               Statistics tracker
-        */
-       public KvStateServer(
-                       InetAddress bindAddress,
-                       int bindPort,
-                       int numEventLoopThreads,
-                       int numQueryThreads,
-                       KvStateRegistry kvStateRegistry,
-                       KvStateRequestStats stats) {
-
-               Preconditions.checkArgument(bindPort >= 0 && bindPort <= 65536, 
"Port " + bindPort +
-                               " is out of valid port range (0-65536).");
-
-               Preconditions.checkArgument(numEventLoopThreads >= 1, 
"Non-positive number of event loop threads.");
-               Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive 
number of query threads.");
-
-               Preconditions.checkNotNull(kvStateRegistry, "KvStateRegistry");
-               Preconditions.checkNotNull(stats, "KvStateRequestStats");
-
-               NettyBufferPool bufferPool = new 
NettyBufferPool(numEventLoopThreads);
-
-               ThreadFactory threadFactory = new ThreadFactoryBuilder()
-                               .setDaemon(true)
-                               .setNameFormat("Flink KvStateServer EventLoop 
Thread %d")
-                               .build();
-
-               NioEventLoopGroup nioGroup = new 
NioEventLoopGroup(numEventLoopThreads, threadFactory);
-
-               queryExecutor = createQueryExecutor(numQueryThreads);
-
-               // Shared between all channels
-               KvStateServerHandler serverHandler = new KvStateServerHandler(
-                               kvStateRegistry,
-                               queryExecutor,
-                               stats);
-
-               bootstrap = new ServerBootstrap()
-                               // Bind address and port
-                               .localAddress(bindAddress, bindPort)
-                               // NIO server channels
-                               .group(nioGroup)
-                               .channel(NioServerSocketChannel.class)
-                               // Server channel Options
-                               .option(ChannelOption.ALLOCATOR, bufferPool)
-                               // Child channel options
-                               .childOption(ChannelOption.ALLOCATOR, 
bufferPool)
-                               
.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK)
-                               
.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK)
-                               // See initializer for pipeline details
-                               .childHandler(new 
KvStateServerChannelInitializer(serverHandler));
-       }
-
-       /**
-        * Starts the server by binding to the configured bind address 
(blocking).
-        *
-        * @throws InterruptedException If interrupted during the bind operation
-        */
-       public void start() throws InterruptedException {
-               Channel channel = bootstrap.bind().sync().channel();
-
-               InetSocketAddress localAddress = (InetSocketAddress) 
channel.localAddress();
-               serverAddress = new 
KvStateServerAddress(localAddress.getAddress(), localAddress.getPort());
-       }
-
-       /**
-        * Returns the address of this server.
-        *
-        * @return Server address
-        * @throws IllegalStateException If server has not been started yet
-        */
-       public KvStateServerAddress getAddress() {
-               if (serverAddress == null) {
-                       throw new IllegalStateException("KvStateServer not 
started yet.");
-               }
-
-               return serverAddress;
-       }
-
-       /**
-        * Shuts down the server and all related thread pools.
-        */
-       public void shutDown() {
-               if (bootstrap != null) {
-                       EventLoopGroup group = bootstrap.group();
-                       if (group != null) {
-                               group.shutdownGracefully(0, 10, 
TimeUnit.SECONDS);
-                       }
-               }
-
-               if (queryExecutor != null) {
-                       queryExecutor.shutdown();
-               }
-
-               serverAddress = null;
-       }
-
-       /**
-        * Creates a thread pool for the query execution.
-        *
-        * @param numQueryThreads Number of query threads.
-        * @return Thread pool for query execution
-        */
-       private static ExecutorService createQueryExecutor(int numQueryThreads) 
{
-               ThreadFactory threadFactory = new ThreadFactoryBuilder()
-                               .setDaemon(true)
-                               .setNameFormat("Flink KvStateServer Query 
Thread %d")
-                               .build();
-
-               return Executors.newFixedThreadPool(numQueryThreads, 
threadFactory);
-       }
-
-       /**
-        * Channel pipeline initializer.
-        *
-        * <p>The request handler is shared, whereas the other handlers are 
created
-        * per channel.
-        */
-       private static final class KvStateServerChannelInitializer extends 
ChannelInitializer<SocketChannel> {
-
-               /** The shared request handler. */
-               private final KvStateServerHandler sharedRequestHandler;
-
-               /**
-                * Creates the channel pipeline initializer with the shared 
request handler.
-                *
-                * @param sharedRequestHandler Shared request handler.
-                */
-               public KvStateServerChannelInitializer(KvStateServerHandler 
sharedRequestHandler) {
-                       this.sharedRequestHandler = 
Preconditions.checkNotNull(sharedRequestHandler, "Request handler");
-               }
-
-               @Override
-               protected void initChannel(SocketChannel ch) throws Exception {
-                       ch.pipeline()
-                                       .addLast(new ChunkedWriteHandler())
-                                       .addLast(new 
LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
-                                       .addLast(sharedRequestHandler);
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
deleted file mode 100644
index 1af55dc..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query.netty;
-
-import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.netty.message.KvStateRequest;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
-import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
-import org.apache.flink.runtime.state.internal.InternalKvState;
-import org.apache.flink.util.ExceptionUtils;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
-import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Objects;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * This handler dispatches asynchronous tasks, which query {@link 
InternalKvState}
- * instances and write the result to the channel.
- *
- * <p>The network threads receive the message, deserialize it and dispatch the
- * query task. The actual query is handled in a separate thread as it might
- * otherwise block the network threads (file I/O etc.).
- */
-@ChannelHandler.Sharable
-class KvStateServerHandler extends ChannelInboundHandlerAdapter {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(KvStateServerHandler.class);
-
-       /** KvState registry holding references to the KvState instances. */
-       private final KvStateRegistry registry;
-
-       /** Thread pool for query execution. */
-       private final ExecutorService queryExecutor;
-
-       /** Exposed server statistics. */
-       private final KvStateRequestStats stats;
-
-       /**
-        * Create the handler.
-        *
-        * @param kvStateRegistry Registry to query.
-        * @param queryExecutor   Thread pool for query execution.
-        * @param stats           Exposed server statistics.
-        */
-       KvStateServerHandler(
-                       KvStateRegistry kvStateRegistry,
-                       ExecutorService queryExecutor,
-                       KvStateRequestStats stats) {
-
-               this.registry = Objects.requireNonNull(kvStateRegistry, 
"KvStateRegistry");
-               this.queryExecutor = Objects.requireNonNull(queryExecutor, 
"Query thread pool");
-               this.stats = Objects.requireNonNull(stats, 
"KvStateRequestStats");
-       }
-
-       @Override
-       public void channelActive(ChannelHandlerContext ctx) throws Exception {
-               stats.reportActiveConnection();
-       }
-
-       @Override
-       public void channelInactive(ChannelHandlerContext ctx) throws Exception 
{
-               stats.reportInactiveConnection();
-       }
-
-       @Override
-       public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
-               KvStateRequest request = null;
-
-               try {
-                       ByteBuf buf = (ByteBuf) msg;
-                       KvStateRequestType msgType = 
KvStateRequestSerializer.deserializeHeader(buf);
-
-                       if (msgType == KvStateRequestType.REQUEST) {
-                               // 
------------------------------------------------------------
-                               // Request
-                               // 
------------------------------------------------------------
-                               request = 
KvStateRequestSerializer.deserializeKvStateRequest(buf);
-
-                               stats.reportRequest();
-
-                               InternalKvState<?> kvState = 
registry.getKvState(request.getKvStateId());
-
-                               if (kvState != null) {
-                                       // Execute actual query async, because 
it is possibly
-                                       // blocking (e.g. file I/O).
-                                       //
-                                       // A submission failure is not treated 
as fatal.
-                                       queryExecutor.submit(new 
AsyncKvStateQueryTask(ctx, request, kvState, stats));
-                               } else {
-                                       ByteBuf unknown = 
KvStateRequestSerializer.serializeKvStateRequestFailure(
-                                                       ctx.alloc(),
-                                                       request.getRequestId(),
-                                                       new 
UnknownKvStateID(request.getKvStateId()));
-
-                                       ctx.writeAndFlush(unknown);
-
-                                       stats.reportFailedRequest();
-                               }
-                       } else {
-                               // 
------------------------------------------------------------
-                               // Unexpected
-                               // 
------------------------------------------------------------
-                               ByteBuf failure = 
KvStateRequestSerializer.serializeServerFailure(
-                                               ctx.alloc(),
-                                               new 
IllegalArgumentException("Unexpected message type " + msgType
-                                                               + ". 
KvStateServerHandler expects "
-                                                               + 
KvStateRequestType.REQUEST + " messages."));
-
-                               ctx.writeAndFlush(failure);
-                       }
-               } catch (Throwable t) {
-                       String stringifiedCause = 
ExceptionUtils.stringifyException(t);
-
-                       ByteBuf err;
-                       if (request != null) {
-                               String errMsg = "Failed to handle incoming 
request with ID " +
-                                               request.getRequestId() + ". 
Caused by: " + stringifiedCause;
-                               err = 
KvStateRequestSerializer.serializeKvStateRequestFailure(
-                                               ctx.alloc(),
-                                               request.getRequestId(),
-                                               new RuntimeException(errMsg));
-
-                               stats.reportFailedRequest();
-                       } else {
-                               String errMsg = "Failed to handle incoming 
message. Caused by: " + stringifiedCause;
-                               err = 
KvStateRequestSerializer.serializeServerFailure(
-                                               ctx.alloc(),
-                                               new RuntimeException(errMsg));
-                       }
-
-                       ctx.writeAndFlush(err);
-               } finally {
-                       // IMPORTANT: We have to always recycle the incoming 
buffer.
-                       // Otherwise we will leak memory out of Netty's buffer 
pool.
-                       //
-                       // If any operation ever holds on to the buffer, it is 
the
-                       // responsibility of that operation to retain the 
buffer and
-                       // release it later.
-                       ReferenceCountUtil.release(msg);
-               }
-       }
-
-       @Override
-       public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
-               String stringifiedCause = 
ExceptionUtils.stringifyException(cause);
-               String msg = "Exception in server pipeline. Caused by: " + 
stringifiedCause;
-
-               ByteBuf err = KvStateRequestSerializer.serializeServerFailure(
-                               ctx.alloc(),
-                               new RuntimeException(msg));
-
-               ctx.writeAndFlush(err).addListener(ChannelFutureListener.CLOSE);
-       }
-
-       /**
-        * Task to execute the actual query against the {@link InternalKvState} 
instance.
-        */
-       private static class AsyncKvStateQueryTask implements Runnable {
-
-               private final ChannelHandlerContext ctx;
-
-               private final KvStateRequest request;
-
-               private final InternalKvState<?> kvState;
-
-               private final KvStateRequestStats stats;
-
-               private final long creationNanos;
-
-               public AsyncKvStateQueryTask(
-                               ChannelHandlerContext ctx,
-                               KvStateRequest request,
-                               InternalKvState<?> kvState,
-                               KvStateRequestStats stats) {
-
-                       this.ctx = Objects.requireNonNull(ctx, "Channel handler 
context");
-                       this.request = Objects.requireNonNull(request, "State 
query");
-                       this.kvState = Objects.requireNonNull(kvState, 
"KvState");
-                       this.stats = Objects.requireNonNull(stats, "State query 
stats");
-                       this.creationNanos = System.nanoTime();
-               }
-
-               @Override
-               public void run() {
-                       boolean success = false;
-
-                       try {
-                               if (!ctx.channel().isActive()) {
-                                       return;
-                               }
-
-                               // Query the KvState instance
-                               byte[] serializedKeyAndNamespace = 
request.getSerializedKeyAndNamespace();
-                               byte[] serializedResult = 
kvState.getSerializedValue(serializedKeyAndNamespace);
-
-                               if (serializedResult != null) {
-                                       // We found some data, success!
-                                       ByteBuf buf = 
KvStateRequestSerializer.serializeKvStateRequestResult(
-                                                       ctx.alloc(),
-                                                       request.getRequestId(),
-                                                       serializedResult);
-
-                                       int highWatermark = 
ctx.channel().config().getWriteBufferHighWaterMark();
-
-                                       ChannelFuture write;
-                                       if (buf.readableBytes() <= 
highWatermark) {
-                                               write = ctx.writeAndFlush(buf);
-                                       } else {
-                                               write = ctx.writeAndFlush(new 
ChunkedByteBuf(buf, highWatermark));
-                                       }
-
-                                       write.addListener(new 
QueryResultWriteListener());
-
-                                       success = true;
-                               } else {
-                                       // No data for the key/namespace. This 
is considered to be
-                                       // a failure.
-                                       ByteBuf unknownKey = 
KvStateRequestSerializer.serializeKvStateRequestFailure(
-                                                       ctx.alloc(),
-                                                       request.getRequestId(),
-                                                       new 
UnknownKeyOrNamespace());
-
-                                       ctx.writeAndFlush(unknownKey);
-                               }
-                       } catch (Throwable t) {
-                               try {
-                                       String stringifiedCause = 
ExceptionUtils.stringifyException(t);
-                                       String errMsg = "Failed to query state 
backend for query " +
-                                                       request.getRequestId() 
+ ". Caused by: " + stringifiedCause;
-
-                                       ByteBuf err = 
KvStateRequestSerializer.serializeKvStateRequestFailure(
-                                                       ctx.alloc(), 
request.getRequestId(), new RuntimeException(errMsg));
-
-                                       ctx.writeAndFlush(err);
-                               } catch (IOException e) {
-                                       LOG.error("Failed to respond with the 
error after failed to query state backend", e);
-                               }
-                       } finally {
-                               if (!success) {
-                                       stats.reportFailedRequest();
-                               }
-                       }
-               }
-
-               @Override
-               public String toString() {
-                       return "AsyncKvStateQueryTask{" +
-                                       ", request=" + request +
-                                       ", creationNanos=" + creationNanos +
-                                       '}';
-               }
-
-               /**
-                * Callback after query result has been written.
-                *
-                * <p>Gathers stats and logs errors.
-                */
-               private class QueryResultWriteListener implements 
ChannelFutureListener {
-
-                       @Override
-                       public void operationComplete(ChannelFuture future) 
throws Exception {
-                               long durationNanos = System.nanoTime() - 
creationNanos;
-                               long durationMillis = 
TimeUnit.MILLISECONDS.convert(durationNanos, TimeUnit.NANOSECONDS);
-
-                               if (future.isSuccess()) {
-                                       
stats.reportSuccessfulRequest(durationMillis);
-                               } else {
-                                       if (LOG.isDebugEnabled()) {
-                                               LOG.debug("Query " + request + 
" failed after " + durationMillis + " ms", future.cause());
-                                       }
-
-                                       stats.reportFailedRequest();
-                               }
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/UnknownKeyOrNamespace.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/UnknownKeyOrNamespace.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/UnknownKeyOrNamespace.java
deleted file mode 100644
index 4e5a1de..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/UnknownKeyOrNamespace.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query.netty;
-
-/**
- * Thrown if the KvState does not hold any state for the given key or 
namespace.
- */
-public class UnknownKeyOrNamespace extends IllegalStateException {
-
-       private static final long serialVersionUID = 1L;
-
-       UnknownKeyOrNamespace() {
-               super("KvState does not hold any state for key/namespace.");
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/UnknownKvStateID.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/UnknownKvStateID.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/UnknownKvStateID.java
deleted file mode 100644
index cc60035..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/UnknownKvStateID.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.query.netty;
-
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Thrown if no KvState with the given ID can be found by the server handler.
- */
-public class UnknownKvStateID extends IllegalStateException {
-
-       private static final long serialVersionUID = 1L;
-
-       public UnknownKvStateID(KvStateID kvStateId) {
-               super("No KvState registered with ID " + 
Preconditions.checkNotNull(kvStateId, "KvStateID") +
-                               " at TaskManager.");
-       }
-}

Reply via email to