http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java deleted file mode 100644 index 4ba6929..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java +++ /dev/null @@ -1,587 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.query; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.state.StateDescriptor; -import org.apache.flink.api.common.typeinfo.TypeHint; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.base.ListSerializer; -import org.apache.flink.configuration.AkkaOptions; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.configuration.QueryableStateOptions; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.concurrent.Executors; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; -import org.apache.flink.runtime.query.AkkaKvStateLocationLookupService.FixedDelayLookupRetryStrategyFactory; -import org.apache.flink.runtime.query.AkkaKvStateLocationLookupService.LookupRetryStrategyFactory; -import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats; -import org.apache.flink.runtime.query.netty.KvStateClient; -import org.apache.flink.runtime.query.netty.KvStateServer; -import org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace; -import org.apache.flink.runtime.query.netty.UnknownKvStateID; -import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; -import org.apache.flink.runtime.state.KeyGroupRangeAssignment; -import org.apache.flink.runtime.state.VoidNamespace; -import org.apache.flink.runtime.state.VoidNamespaceTypeInfo; -import org.apache.flink.util.Preconditions; - -import akka.actor.ActorSystem; -import akka.dispatch.Futures; -import 
akka.dispatch.Mapper; -import akka.dispatch.Recover; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.ConnectException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import scala.Option; -import scala.Some; -import scala.Tuple2; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; -import scala.concurrent.duration.Duration; -import scala.concurrent.duration.FiniteDuration; - -/** - * Client for queryable state. - * - * <p>You can mark state as queryable via {@link StateDescriptor#setQueryable(String)}. - * The state instance created from this descriptor will be published for queries - * when it's created on the TaskManagers and the location will be reported to - * the JobManager. - * - * <p>The client resolves the location of the requested KvState via the - * JobManager. Resolved locations are cached. When the server address of the - * requested KvState instance is determined, the client sends out a request to - * the server. - */ -public class QueryableStateClient { - - private static final Logger LOG = LoggerFactory.getLogger(QueryableStateClient.class); - - /** - * {@link KvStateLocation} lookup to resolve the address of KvState instances. - */ - private final KvStateLocationLookupService lookupService; - - /** - * Network client for queries against {@link KvStateServer} instances. - */ - private final KvStateClient kvStateClient; - - /** - * Execution context. - */ - private final ExecutionContext executionContext; - - /** - * Cache for {@link KvStateLocation} instances keyed by job and name. - */ - private final ConcurrentMap<Tuple2<JobID, String>, Future<KvStateLocation>> lookupCache = - new ConcurrentHashMap<>(); - - /** This is != null, if we started the actor system. */ - private final ActorSystem actorSystem; - - private ExecutionConfig executionConfig; - - /** - * Creates a client from the given configuration. 
- * - * <p>This will create multiple Thread pools: one for the started actor - * system and another for the network client. - * - * @param config Configuration to use. - * @throws Exception Failures are forwarded - */ - public QueryableStateClient(Configuration config) throws Exception { - this(config, HighAvailabilityServicesUtils.createHighAvailabilityServices( - config, Executors.directExecutor(), HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION)); - } - - /** - * Creates a client from the given configuration. - * - * <p>This will create multiple Thread pools: one for the started actor - * system and another for the network client. - * - * @param config Configuration to use. - * @param highAvailabilityServices Service factory for high availability services - * @throws Exception Failures are forwarded - * - * @deprecated This constructor is deprecated and stays only for backwards compatibility. Use the - * {@link #QueryableStateClient(Configuration)} instead. - */ - @Deprecated - public QueryableStateClient( - Configuration config, - HighAvailabilityServices highAvailabilityServices) throws Exception { - Preconditions.checkNotNull(config, "Configuration"); - - // Create a leader retrieval service - LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID); - - // Get the ask timeout - String askTimeoutString = config.getString(AkkaOptions.ASK_TIMEOUT); - - Duration timeout = FiniteDuration.apply(askTimeoutString); - if (!timeout.isFinite()) { - throw new IllegalConfigurationException(AkkaOptions.ASK_TIMEOUT.key() - + " is not a finite timeout ('" + askTimeoutString + "')"); - } - - FiniteDuration askTimeout = (FiniteDuration) timeout; - - int lookupRetries = config.getInteger(QueryableStateOptions.CLIENT_LOOKUP_RETRIES); - int lookupRetryDelayMillis = config.getInteger(QueryableStateOptions.CLIENT_LOOKUP_RETRY_DELAY); - - // Retries if no JobManager is 
around - LookupRetryStrategyFactory retryStrategy = new FixedDelayLookupRetryStrategyFactory( - lookupRetries, - FiniteDuration.apply(lookupRetryDelayMillis, "ms")); - - // Create the actor system - @SuppressWarnings("unchecked") - Option<Tuple2<String, Object>> remoting = new Some(new Tuple2<>("", 0)); - this.actorSystem = AkkaUtils.createActorSystem(config, remoting); - - AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService( - leaderRetrievalService, - actorSystem, - askTimeout, - retryStrategy); - - int numEventLoopThreads = config.getInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS); - - if (numEventLoopThreads == 0) { - numEventLoopThreads = Runtime.getRuntime().availableProcessors(); - } - - // Create the network client - KvStateClient networkClient = new KvStateClient( - numEventLoopThreads, - new DisabledKvStateRequestStats()); - - this.lookupService = lookupService; - this.kvStateClient = networkClient; - this.executionContext = actorSystem.dispatcher(); - this.executionConfig = new ExecutionConfig(); - - this.lookupService.start(); - } - - /** Gets the {@link ExecutionConfig}. */ - public ExecutionConfig getExecutionConfig() { - return executionConfig; - } - - /** Sets the {@link ExecutionConfig}. */ - public void setExecutionConfig(ExecutionConfig config) { - this.executionConfig = config; - } - - /** - * Creates a client. 
- * - * @param lookupService Location lookup service - * @param kvStateClient Network client for queries - * @param executionContext Execution context for futures - */ - public QueryableStateClient( - KvStateLocationLookupService lookupService, - KvStateClient kvStateClient, - ExecutionContext executionContext) { - - this.lookupService = Preconditions.checkNotNull(lookupService, "KvStateLocationLookupService"); - this.kvStateClient = Preconditions.checkNotNull(kvStateClient, "KvStateClient"); - this.executionContext = Preconditions.checkNotNull(executionContext, "ExecutionContext"); - this.actorSystem = null; - - this.lookupService.start(); - } - - /** - * Returns the execution context of this client. - * - * @return The execution context used by the client. - */ - public ExecutionContext getExecutionContext() { - return executionContext; - } - - /** - * Shuts down the client and all components. - */ - public void shutDown() { - try { - lookupService.shutDown(); - } catch (Throwable t) { - LOG.error("Failed to shut down KvStateLookupService", t); - } - - try { - kvStateClient.shutDown(); - } catch (Throwable t) { - LOG.error("Failed to shut down KvStateClient", t); - } - - if (actorSystem != null) { - try { - actorSystem.shutdown(); - } catch (Throwable t) { - LOG.error("Failed to shut down ActorSystem", t); - } - } - } - - /** - * Returns a future holding the serialized request result. - * - * <p>If the server does not serve a KvState instance with the given ID, - * the Future will be failed with a {@link UnknownKvStateID}. - * - * <p>If the KvState instance does not hold any data for the given key - * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}. - * - * <p>All other failures are forwarded to the Future. 
- * - * @param jobId JobID of the job the queryable state - * belongs to - * @param queryableStateName Name under which the state is queryable - * @param keyHashCode Integer hash code of the key (result of - * a call to {@link Object#hashCode()} - * @param serializedKeyAndNamespace Serialized key and namespace to query - * KvState instance with - * @return Future holding the serialized result - */ - @SuppressWarnings("unchecked") - public Future<byte[]> getKvState( - final JobID jobId, - final String queryableStateName, - final int keyHashCode, - final byte[] serializedKeyAndNamespace) { - - return getKvState(jobId, queryableStateName, keyHashCode, serializedKeyAndNamespace, false) - .recoverWith(new Recover<Future<byte[]>>() { - @Override - public Future<byte[]> recover(Throwable failure) throws Throwable { - if (failure instanceof UnknownKvStateID || - failure instanceof UnknownKvStateKeyGroupLocation || - failure instanceof UnknownKvStateLocation || - failure instanceof ConnectException) { - // These failures are likely to be caused by out-of-sync - // KvStateLocation. Therefore we retry this query and - // force look up the location. - return getKvState( - jobId, - queryableStateName, - keyHashCode, - serializedKeyAndNamespace, - true); - } else { - return Futures.failed(failure); - } - } - }, executionContext); - } - - /** - * Returns a future holding the request result. - * - * <p>If the server does not serve a KvState instance with the given ID, - * the Future will be failed with a {@link UnknownKvStateID}. - * - * <p>If the KvState instance does not hold any data for the given key - * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}. - * - * <p>All other failures are forwarded to the Future. - * - * @param jobId JobID of the job the queryable state belongs to. - * @param queryableStateName Name under which the state is queryable. - * @param key The key we are interested in. 
- * @param keyTypeHint A {@link TypeHint} used to extract the type of the key. - * @param stateDescriptor The {@link StateDescriptor} of the state we want to query. - * @return Future holding the result. - */ - @PublicEvolving - public <K, V> Future<V> getKvState( - final JobID jobId, - final String queryableStateName, - final K key, - final TypeHint<K> keyTypeHint, - final StateDescriptor<?, V> stateDescriptor) { - - Preconditions.checkNotNull(keyTypeHint); - - TypeInformation<K> keyTypeInfo = keyTypeHint.getTypeInfo(); - return getKvState(jobId, queryableStateName, key, keyTypeInfo, stateDescriptor); - } - - /** - * Returns a future holding the request result. - * - * <p>If the server does not serve a KvState instance with the given ID, - * the Future will be failed with a {@link UnknownKvStateID}. - * - * <p>If the KvState instance does not hold any data for the given key - * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}. - * - * <p>All other failures are forwarded to the Future. - * - * @param jobId JobID of the job the queryable state belongs to. - * @param queryableStateName Name under which the state is queryable. - * @param key The key we are interested in. - * @param keyTypeInfo The {@link TypeInformation} of the key. - * @param stateDescriptor The {@link StateDescriptor} of the state we want to query. - * @return Future holding the result. - */ - @PublicEvolving - public <K, V> Future<V> getKvState( - final JobID jobId, - final String queryableStateName, - final K key, - final TypeInformation<K> keyTypeInfo, - final StateDescriptor<?, V> stateDescriptor) { - - Preconditions.checkNotNull(keyTypeInfo); - - return getKvState(jobId, queryableStateName, key, VoidNamespace.INSTANCE, - keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor); - } - - /** - * Returns a future holding the request result. 
- * - * <p>If the server does not serve a KvState instance with the given ID, - * the Future will be failed with a {@link UnknownKvStateID}. - * - * <p>If the KvState instance does not hold any data for the given key - * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}. - * - * <p>All other failures are forwarded to the Future. - * - * @param jobId JobID of the job the queryable state belongs to. - * @param queryableStateName Name under which the state is queryable. - * @param key The key that the state we request is associated with. - * @param namespace The namespace of the state. - * @param keyTypeInfo The {@link TypeInformation} of the keys. - * @param namespaceTypeInfo The {@link TypeInformation} of the namespace. - * @param stateDescriptor The {@link StateDescriptor} of the state we want to query. - * @return Future holding the result. - */ - @PublicEvolving - public <K, V, N> Future<V> getKvState( - final JobID jobId, - final String queryableStateName, - final K key, - final N namespace, - final TypeInformation<K> keyTypeInfo, - final TypeInformation<N> namespaceTypeInfo, - final StateDescriptor<?, V> stateDescriptor) { - - Preconditions.checkNotNull(stateDescriptor); - - // initialize the value serializer based on the execution config. - stateDescriptor.initializeSerializerUnlessSet(executionConfig); - TypeSerializer<V> stateSerializer = stateDescriptor.getSerializer(); - - return getKvState(jobId, queryableStateName, key, - namespace, keyTypeInfo, namespaceTypeInfo, stateSerializer); - } - - /** - * Returns a future holding the request result. - * - * <p>If the server does not serve a KvState instance with the given ID, - * the Future will be failed with a {@link UnknownKvStateID}. - * - * <p>If the KvState instance does not hold any data for the given key - * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}. - * - * <p>All other failures are forwarded to the Future. 
- * - * @param jobId JobID of the job the queryable state belongs to. - * @param queryableStateName Name under which the state is queryable. - * @param key The key that the state we request is associated with. - * @param namespace The namespace of the state. - * @param keyTypeInfo The {@link TypeInformation} of the keys. - * @param namespaceTypeInfo The {@link TypeInformation} of the namespace. - * @param stateSerializer The {@link TypeSerializer} of the state we want to query. - * @return Future holding the result. - */ - @PublicEvolving - public <K, V, N> Future<V> getKvState( - final JobID jobId, - final String queryableStateName, - final K key, - final N namespace, - final TypeInformation<K> keyTypeInfo, - final TypeInformation<N> namespaceTypeInfo, - final TypeSerializer<V> stateSerializer) { - - Preconditions.checkNotNull(queryableStateName); - - Preconditions.checkNotNull(key); - Preconditions.checkNotNull(namespace); - - Preconditions.checkNotNull(keyTypeInfo); - Preconditions.checkNotNull(namespaceTypeInfo); - Preconditions.checkNotNull(stateSerializer); - - if (stateSerializer instanceof ListSerializer) { - throw new IllegalArgumentException("ListState is not supported out-of-the-box yet."); - } - - TypeSerializer<K> keySerializer = keyTypeInfo.createSerializer(executionConfig); - TypeSerializer<N> namespaceSerializer = namespaceTypeInfo.createSerializer(executionConfig); - - final byte[] serializedKeyAndNamespace; - try { - serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace( - key, - keySerializer, - namespace, - namespaceSerializer); - } catch (IOException e) { - return Futures.failed(e); - } - - return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace) - .flatMap(new Mapper<byte[], Future<V>>() { - @Override - public Future<V> apply(byte[] parameter) { - try { - return Futures.successful( - KvStateRequestSerializer.deserializeValue(parameter, stateSerializer)); - } catch (IOException e) { - 
return Futures.failed(e); - } - } - }, executionContext); - } - - /** - * Returns a future holding the serialized request result. - * - * @param jobId JobID of the job the queryable state - * belongs to - * @param queryableStateName Name under which the state is queryable - * @param keyHashCode Integer hash code of the key (result of - * a call to {@link Object#hashCode()} - * @param serializedKeyAndNamespace Serialized key and namespace to query - * KvState instance with - * @param forceLookup Flag to force lookup of the {@link KvStateLocation} - * @return Future holding the serialized result - */ - private Future<byte[]> getKvState( - final JobID jobId, - final String queryableStateName, - final int keyHashCode, - final byte[] serializedKeyAndNamespace, - boolean forceLookup) { - - return getKvStateLookupInfo(jobId, queryableStateName, forceLookup) - .flatMap(new Mapper<KvStateLocation, Future<byte[]>>() { - @Override - public Future<byte[]> apply(KvStateLocation lookup) { - int keyGroupIndex = KeyGroupRangeAssignment.computeKeyGroupForKeyHash(keyHashCode, lookup.getNumKeyGroups()); - - KvStateServerAddress serverAddress = lookup.getKvStateServerAddress(keyGroupIndex); - if (serverAddress == null) { - return Futures.failed(new UnknownKvStateKeyGroupLocation()); - } else { - // Query server - KvStateID kvStateId = lookup.getKvStateID(keyGroupIndex); - return kvStateClient.getKvState(serverAddress, kvStateId, serializedKeyAndNamespace); - } - } - }, executionContext); - } - - /** - * Lookup the {@link KvStateLocation} for the given job and queryable state - * name. - * - * <p>The job manager will be queried for the location only if forced or no - * cached location can be found. There are no guarantees about - * - * @param jobId JobID the state instance belongs to. - * @param queryableStateName Name under which the state instance has been published. - * @param forceUpdate Flag to indicate whether to force a update via the lookup service. 
- * @return Future holding the KvStateLocation - */ - private Future<KvStateLocation> getKvStateLookupInfo( - JobID jobId, - final String queryableStateName, - boolean forceUpdate) { - - if (forceUpdate) { - Future<KvStateLocation> lookupFuture = lookupService - .getKvStateLookupInfo(jobId, queryableStateName); - lookupCache.put(new Tuple2<>(jobId, queryableStateName), lookupFuture); - return lookupFuture; - } else { - Tuple2<JobID, String> cacheKey = new Tuple2<>(jobId, queryableStateName); - final Future<KvStateLocation> cachedFuture = lookupCache.get(cacheKey); - - if (cachedFuture == null) { - Future<KvStateLocation> lookupFuture = lookupService - .getKvStateLookupInfo(jobId, queryableStateName); - - Future<KvStateLocation> previous = lookupCache.putIfAbsent(cacheKey, lookupFuture); - if (previous == null) { - return lookupFuture; - } else { - return previous; - } - } else { - // do not retain futures which failed as they will remain in - // the cache even if the error cause is not present any more - // and a new lookup may succeed - if (cachedFuture.isCompleted() && - cachedFuture.value().get().isFailure()) { - // issue a new lookup - Future<KvStateLocation> lookupFuture = lookupService - .getKvStateLookupInfo(jobId, queryableStateName); - - // replace the existing one if it has not been replaced yet - // otherwise return the one in the cache - if (lookupCache.replace(cacheKey, cachedFuture, lookupFuture)) { - return lookupFuture; - } else { - return lookupCache.get(cacheKey); - } - } else { - return cachedFuture; - } - } - } - } - -}
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java new file mode 100644 index 0000000..852d394 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.query; + +import org.apache.flink.runtime.query.netty.KvStateRequestStats; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.net.InetAddress; + +/** + * Utility class to initialize entities used in queryable state. 
+ */ +public final class QueryableStateUtils { + + private static final Logger LOG = LoggerFactory.getLogger(QueryableStateUtils.class); + + /** + * Initializes the {@link KvStateServer server} responsible for sending the + * requested internal state to the Queryable State Client. + * + * @param address the address to bind to. + * @param port the port to listen to. + * @param eventLoopThreads the number of threads to be used to process incoming requests. + * @param queryThreads the number of threads to be used to send the actual state. + * @param kvStateRegistry the registry with the queryable state. + * @param stats statistics to be gathered about the incoming requests. + * @return the {@link KvStateServer state server}. + */ + public static KvStateServer createKvStateServer( + final InetAddress address, + final int port, + final int eventLoopThreads, + final int queryThreads, + final KvStateRegistry kvStateRegistry, + final KvStateRequestStats stats) { + + Preconditions.checkNotNull(address, "address"); + Preconditions.checkNotNull(kvStateRegistry, "registry"); + Preconditions.checkNotNull(stats, "stats"); + + Preconditions.checkArgument(eventLoopThreads >= 1); + Preconditions.checkArgument(queryThreads >= 1); + + try { + String classname = "org.apache.flink.queryablestate.server.KvStateServerImpl"; + Class<? extends KvStateServer> clazz = Class.forName(classname).asSubclass(KvStateServer.class); + Constructor<? extends KvStateServer> constructor = clazz.getConstructor( + InetAddress.class, + Integer.class, + Integer.class, + Integer.class, + KvStateRegistry.class, + KvStateRequestStats.class); + return constructor.newInstance(address, port, eventLoopThreads, queryThreads, kvStateRegistry, stats); + } catch (ClassNotFoundException e) { + LOG.info("Could not load Queryable State Server. 
" + + "Probable reason: flink-queryable-state is not in the classpath"); + LOG.debug("Caught exception", e); + return null; + } catch (InvocationTargetException e) { + LOG.error("Queryable State Server could not be created", e.getTargetException()); + return null; + } catch (Throwable t) { + LOG.error("Failed to instantiate the Queryable State Server.", t); + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/UnknownJobManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/UnknownJobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/UnknownJobManager.java deleted file mode 100644 index 3549ed6..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/UnknownJobManager.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.query; - -/** - * Exception to fail Future with if no JobManager is currently registered at - * the {@link KvStateLocationLookupService}. 
- */ -class UnknownJobManager extends Exception { - - private static final long serialVersionUID = 1L; - - public UnknownJobManager() { - super("Unknown JobManager. Either the JobManager has not registered yet " + - "or has lost leadership."); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/UnknownKvStateKeyGroupLocation.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/UnknownKvStateKeyGroupLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/UnknownKvStateKeyGroupLocation.java deleted file mode 100644 index 8f62be5..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/UnknownKvStateKeyGroupLocation.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.query; - -/** - * Exception thrown if there is no location information available for the given - * key group in a {@link KvStateLocation} instance. 
- */ -class UnknownKvStateKeyGroupLocation extends Exception { - - private static final long serialVersionUID = 1L; - -} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/ChunkedByteBuf.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/ChunkedByteBuf.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/ChunkedByteBuf.java deleted file mode 100644 index e4fa809..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/ChunkedByteBuf.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.query.netty; - -import org.apache.flink.util.Preconditions; - -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; -import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedInput; -import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler; - -/** - * A {@link ByteBuf} instance to be consumed in chunks by {@link ChunkedWriteHandler}, - * respecting the high and low watermarks. - * - * @see <a href="http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#10.0">Low/High Watermarks</a> - */ -class ChunkedByteBuf implements ChunkedInput<ByteBuf> { - - /** The buffer to chunk. */ - private final ByteBuf buf; - - /** Size of chunks. */ - private final int chunkSize; - - /** Closed flag. */ - private boolean isClosed; - - /** End of input flag. */ - private boolean isEndOfInput; - - public ChunkedByteBuf(ByteBuf buf, int chunkSize) { - this.buf = Preconditions.checkNotNull(buf, "Buffer"); - Preconditions.checkArgument(chunkSize > 0, "Non-positive chunk size"); - this.chunkSize = chunkSize; - } - - @Override - public boolean isEndOfInput() throws Exception { - return isClosed || isEndOfInput; - } - - @Override - public void close() throws Exception { - if (!isClosed) { - // If we did not consume the whole buffer yet, we have to release - // it here. Otherwise, it's the responsibility of the consumer. - if (!isEndOfInput) { - buf.release(); - } - - isClosed = true; - } - } - - @Override - public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception { - if (isClosed) { - return null; - } else if (buf.readableBytes() <= chunkSize) { - isEndOfInput = true; - - // Don't retain as the consumer is responsible to release it - return buf.slice(); - } else { - // Return a chunk sized slice of the buffer. The ref count is - // shared with the original buffer. 
That's why we need to retain - // a reference here. - return buf.readSlice(chunkSize).retain(); - } - } - - @Override - public String toString() { - return "ChunkedByteBuf{" + - "buf=" + buf + - ", chunkSize=" + chunkSize + - ", isClosed=" + isClosed + - ", isEndOfInput=" + isEndOfInput + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java deleted file mode 100644 index 1a84e83..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClient.java +++ /dev/null @@ -1,579 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.query.netty; - -import org.apache.flink.runtime.io.network.netty.NettyBufferPool; -import org.apache.flink.runtime.query.KvStateID; -import org.apache.flink.runtime.query.KvStateServerAddress; -import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; -import org.apache.flink.util.Preconditions; - -import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap; -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; -import org.apache.flink.shaded.netty4.io.netty.channel.Channel; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption; -import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup; -import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; -import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; -import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder; -import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler; - -import akka.dispatch.Futures; - -import java.nio.channels.ClosedChannelException; -import java.util.ArrayDeque; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -import scala.concurrent.Future; -import scala.concurrent.Promise; - -/** - * Netty-based client querying {@link KvStateServer} 
instances. - * - * <p>This client can be used by multiple threads concurrently. Operations are - * executed asynchronously and return Futures to their result. - * - * <p>The incoming pipeline looks as follows: - * <pre> - * Socket.read() -> LengthFieldBasedFrameDecoder -> KvStateServerHandler - * </pre> - * - * <p>Received binary messages are expected to contain a frame length field. Netty's - * {@link LengthFieldBasedFrameDecoder} is used to fully receive the frame before - * giving it to our {@link KvStateClientHandler}. - * - * <p>Connections are established and closed by the client. The server only - * closes the connection on a fatal failure that cannot be recovered. - */ -public class KvStateClient { - - /** Netty's Bootstrap. */ - private final Bootstrap bootstrap; - - /** Statistics tracker. */ - private final KvStateRequestStats stats; - - /** Established connections. */ - private final ConcurrentHashMap<KvStateServerAddress, EstablishedConnection> establishedConnections = - new ConcurrentHashMap<>(); - - /** Pending connections. */ - private final ConcurrentHashMap<KvStateServerAddress, PendingConnection> pendingConnections = - new ConcurrentHashMap<>(); - - /** Atomic shut down flag. */ - private final AtomicBoolean shutDown = new AtomicBoolean(); - - /** - * Creates a client with the specified number of event loop threads. - * - * @param numEventLoopThreads Number of event loop threads (minimum 1). 
- */ - public KvStateClient(int numEventLoopThreads, KvStateRequestStats stats) { - Preconditions.checkArgument(numEventLoopThreads >= 1, "Non-positive number of event loop threads."); - NettyBufferPool bufferPool = new NettyBufferPool(numEventLoopThreads); - - ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("Flink KvStateClient Event Loop Thread %d") - .build(); - - NioEventLoopGroup nioGroup = new NioEventLoopGroup(numEventLoopThreads, threadFactory); - - this.bootstrap = new Bootstrap() - .group(nioGroup) - .channel(NioSocketChannel.class) - .option(ChannelOption.ALLOCATOR, bufferPool) - .handler(new ChannelInitializer<SocketChannel>() { - @Override - protected void initChannel(SocketChannel ch) throws Exception { - ch.pipeline() - .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)) - // ChunkedWriteHandler respects Channel writability - .addLast(new ChunkedWriteHandler()); - } - }); - - this.stats = Preconditions.checkNotNull(stats, "Statistics tracker"); - } - - /** - * Returns a future holding the serialized request result. - * - * <p>If the server does not serve a KvState instance with the given ID, - * the Future will be failed with a {@link UnknownKvStateID}. - * - * <p>If the KvState instance does not hold any data for the given key - * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}. - * - * <p>All other failures are forwarded to the Future. 
- * - * @param serverAddress Address of the server to query - * @param kvStateId ID of the KvState instance to query - * @param serializedKeyAndNamespace Serialized key and namespace to query KvState instance with - * @return Future holding the serialized result - */ - public Future<byte[]> getKvState( - KvStateServerAddress serverAddress, - KvStateID kvStateId, - byte[] serializedKeyAndNamespace) { - - if (shutDown.get()) { - return Futures.failed(new IllegalStateException("Shut down")); - } - - EstablishedConnection connection = establishedConnections.get(serverAddress); - - if (connection != null) { - return connection.getKvState(kvStateId, serializedKeyAndNamespace); - } else { - PendingConnection pendingConnection = pendingConnections.get(serverAddress); - if (pendingConnection != null) { - // There was a race, use the existing pending connection. - return pendingConnection.getKvState(kvStateId, serializedKeyAndNamespace); - } else { - // We try to connect to the server. - PendingConnection pending = new PendingConnection(serverAddress); - PendingConnection previous = pendingConnections.putIfAbsent(serverAddress, pending); - - if (previous == null) { - // OK, we are responsible to connect. - bootstrap.connect(serverAddress.getHost(), serverAddress.getPort()) - .addListener(pending); - - return pending.getKvState(kvStateId, serializedKeyAndNamespace); - } else { - // There was a race, use the existing pending connection. - return previous.getKvState(kvStateId, serializedKeyAndNamespace); - } - } - } - } - - /** - * Shuts down the client and closes all connections. - * - * <p>After a call to this method, all returned futures will be failed. 
- */ - public void shutDown() { - if (shutDown.compareAndSet(false, true)) { - for (Map.Entry<KvStateServerAddress, EstablishedConnection> conn : establishedConnections.entrySet()) { - if (establishedConnections.remove(conn.getKey(), conn.getValue())) { - conn.getValue().close(); - } - } - - for (Map.Entry<KvStateServerAddress, PendingConnection> conn : pendingConnections.entrySet()) { - if (pendingConnections.remove(conn.getKey()) != null) { - conn.getValue().close(); - } - } - - if (bootstrap != null) { - EventLoopGroup group = bootstrap.group(); - if (group != null) { - group.shutdownGracefully(0, 10, TimeUnit.SECONDS); - } - } - } - } - - /** - * Closes the connection to the given server address if it exists. - * - * <p>If there is a request to the server a new connection will be established. - * - * @param serverAddress Target address of the connection to close - */ - public void closeConnection(KvStateServerAddress serverAddress) { - PendingConnection pending = pendingConnections.get(serverAddress); - if (pending != null) { - pending.close(); - } - - EstablishedConnection established = establishedConnections.remove(serverAddress); - if (established != null) { - established.close(); - } - } - - /** - * A pending connection that is in the process of connecting. - */ - private class PendingConnection implements ChannelFutureListener { - - /** Lock to guard the connect call, channel hand in, etc. */ - private final Object connectLock = new Object(); - - /** Address of the server we are connecting to. */ - private final KvStateServerAddress serverAddress; - - /** Queue of requests while connecting. */ - private final ArrayDeque<PendingRequest> queuedRequests = new ArrayDeque<>(); - - /** The established connection after the connect succeeds. */ - private EstablishedConnection established; - - /** Closed flag. */ - private boolean closed; - - /** Failure cause if something goes wrong. 
*/ - private Throwable failureCause; - - /** - * Creates a pending connection to the given server. - * - * @param serverAddress Address of the server to connect to. - */ - private PendingConnection(KvStateServerAddress serverAddress) { - this.serverAddress = serverAddress; - } - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - // Callback from the Bootstrap's connect call. - if (future.isSuccess()) { - handInChannel(future.channel()); - } else { - close(future.cause()); - } - } - - /** - * Returns a future holding the serialized request result. - * - * <p>If the channel has been established, forward the call to the - * established channel, otherwise queue it for when the channel is - * handed in. - * - * @param kvStateId ID of the KvState instance to query - * @param serializedKeyAndNamespace Serialized key and namespace to query KvState instance - * with - * @return Future holding the serialized result - */ - public Future<byte[]> getKvState(KvStateID kvStateId, byte[] serializedKeyAndNamespace) { - synchronized (connectLock) { - if (failureCause != null) { - return Futures.failed(failureCause); - } else if (closed) { - return Futures.failed(new ClosedChannelException()); - } else { - if (established != null) { - return established.getKvState(kvStateId, serializedKeyAndNamespace); - } else { - // Queue this and handle when connected - PendingRequest pending = new PendingRequest(kvStateId, serializedKeyAndNamespace); - queuedRequests.add(pending); - return pending.promise.future(); - } - } - } - } - - /** - * Hands in a channel after a successful connection. - * - * @param channel Channel to hand in - */ - private void handInChannel(Channel channel) { - synchronized (connectLock) { - if (closed || failureCause != null) { - // Close the channel and we are done. Any queued requests - // are removed on the close/failure call and after that no - // new ones can be enqueued. 
- channel.close(); - } else { - established = new EstablishedConnection(serverAddress, channel); - - PendingRequest pending; - while ((pending = queuedRequests.poll()) != null) { - Future<byte[]> resultFuture = established.getKvState( - pending.kvStateId, - pending.serializedKeyAndNamespace); - - pending.promise.completeWith(resultFuture); - } - - // Publish the channel for the general public - establishedConnections.put(serverAddress, established); - pendingConnections.remove(serverAddress); - - // Check shut down for possible race with shut down. We - // don't want any lingering connections after shut down, - // which can happen if we don't check this here. - if (shutDown.get()) { - if (establishedConnections.remove(serverAddress, established)) { - established.close(); - } - } - } - } - } - - /** - * Close the connecting channel with a ClosedChannelException. - */ - private void close() { - close(new ClosedChannelException()); - } - - /** - * Close the connecting channel with an Exception (can be - * <code>null</code>) or forward to the established channel. - */ - private void close(Throwable cause) { - synchronized (connectLock) { - if (!closed) { - if (failureCause == null) { - failureCause = cause; - } - - if (established != null) { - established.close(); - } else { - PendingRequest pending; - while ((pending = queuedRequests.poll()) != null) { - pending.promise.tryFailure(cause); - } - } - - closed = true; - } - } - } - - /** - * A pending request queued while the channel is connecting. 
- */ - private final class PendingRequest { - - private final KvStateID kvStateId; - private final byte[] serializedKeyAndNamespace; - private final Promise<byte[]> promise; - - private PendingRequest(KvStateID kvStateId, byte[] serializedKeyAndNamespace) { - this.kvStateId = kvStateId; - this.serializedKeyAndNamespace = serializedKeyAndNamespace; - this.promise = Futures.promise(); - } - } - - @Override - public String toString() { - synchronized (connectLock) { - return "PendingConnection{" + - "serverAddress=" + serverAddress + - ", queuedRequests=" + queuedRequests.size() + - ", established=" + (established != null) + - ", closed=" + closed + - '}'; - } - } - } - - /** - * An established connection that wraps the actual channel instance and is - * registered at the {@link KvStateClientHandler} for callbacks. - */ - private class EstablishedConnection implements KvStateClientHandlerCallback { - - /** Address of the server we are connected to. */ - private final KvStateServerAddress serverAddress; - - /** The actual TCP channel. */ - private final Channel channel; - - /** Pending requests keyed by request ID. */ - private final ConcurrentHashMap<Long, PromiseAndTimestamp> pendingRequests = new ConcurrentHashMap<>(); - - /** Current request number used to assign unique request IDs. */ - private final AtomicLong requestCount = new AtomicLong(); - - /** Reference to a failure that was reported by the channel. */ - private final AtomicReference<Throwable> failureCause = new AtomicReference<>(); - - /** - * Creates an established connection with the given channel. 
- * - * @param serverAddress Address of the server connected to - * @param channel The actual TCP channel - */ - EstablishedConnection(KvStateServerAddress serverAddress, Channel channel) { - this.serverAddress = Preconditions.checkNotNull(serverAddress, "KvStateServerAddress"); - this.channel = Preconditions.checkNotNull(channel, "Channel"); - - // Add the client handler with the callback - channel.pipeline().addLast("KvStateClientHandler", new KvStateClientHandler(this)); - - stats.reportActiveConnection(); - } - - /** - * Close the channel with a ClosedChannelException. - */ - void close() { - close(new ClosedChannelException()); - } - - /** - * Close the channel with a cause. - * - * @param cause The cause to close the channel with. - * @return Channel close future - */ - private boolean close(Throwable cause) { - if (failureCause.compareAndSet(null, cause)) { - channel.close(); - stats.reportInactiveConnection(); - - for (long requestId : pendingRequests.keySet()) { - PromiseAndTimestamp pending = pendingRequests.remove(requestId); - if (pending != null && pending.promise.tryFailure(cause)) { - stats.reportFailedRequest(); - } - } - - return true; - } - - return false; - } - - /** - * Returns a future holding the serialized request result. 
- * - * @param kvStateId ID of the KvState instance to query - * @param serializedKeyAndNamespace Serialized key and namespace to query KvState instance - * with - * @return Future holding the serialized result - */ - Future<byte[]> getKvState(KvStateID kvStateId, byte[] serializedKeyAndNamespace) { - PromiseAndTimestamp requestPromiseTs = new PromiseAndTimestamp( - Futures.<byte[]>promise(), - System.nanoTime()); - - try { - final long requestId = requestCount.getAndIncrement(); - pendingRequests.put(requestId, requestPromiseTs); - - stats.reportRequest(); - - ByteBuf buf = KvStateRequestSerializer.serializeKvStateRequest( - channel.alloc(), - requestId, - kvStateId, - serializedKeyAndNamespace); - - channel.writeAndFlush(buf).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - // Fail promise if not failed to write - PromiseAndTimestamp pending = pendingRequests.remove(requestId); - if (pending != null && pending.promise.tryFailure(future.cause())) { - stats.reportFailedRequest(); - } - } - } - }); - - // Check failure for possible race. We don't want any lingering - // promises after a failure, which can happen if we don't check - // this here. Note that close is treated as a failure as well. - Throwable failure = failureCause.get(); - if (failure != null) { - // Remove from pending requests to guard against concurrent - // removal and to make sure that we only count it once as failed. 
- PromiseAndTimestamp p = pendingRequests.remove(requestId); - if (p != null && p.promise.tryFailure(failure)) { - stats.reportFailedRequest(); - } - } - } catch (Throwable t) { - requestPromiseTs.promise.tryFailure(t); - } - - return requestPromiseTs.promise.future(); - } - - @Override - public void onRequestResult(long requestId, byte[] serializedValue) { - PromiseAndTimestamp pending = pendingRequests.remove(requestId); - if (pending != null && pending.promise.trySuccess(serializedValue)) { - long durationMillis = (System.nanoTime() - pending.timestamp) / 1_000_000; - stats.reportSuccessfulRequest(durationMillis); - } - } - - @Override - public void onRequestFailure(long requestId, Throwable cause) { - PromiseAndTimestamp pending = pendingRequests.remove(requestId); - if (pending != null && pending.promise.tryFailure(cause)) { - stats.reportFailedRequest(); - } - } - - @Override - public void onFailure(Throwable cause) { - if (close(cause)) { - // Remove from established channels, otherwise future - // requests will be handled by this failed channel. - establishedConnections.remove(serverAddress, this); - } - } - - @Override - public String toString() { - return "EstablishedConnection{" + - "serverAddress=" + serverAddress + - ", channel=" + channel + - ", pendingRequests=" + pendingRequests.size() + - ", requestCount=" + requestCount + - ", failureCause=" + failureCause + - '}'; - } - - /** - * Pair of promise and a timestamp. 
- */ - private class PromiseAndTimestamp { - - private final Promise<byte[]> promise; - private final long timestamp; - - public PromiseAndTimestamp(Promise<byte[]> promise, long timestamp) { - this.promise = promise; - this.timestamp = timestamp; - } - } - - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClientHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClientHandler.java deleted file mode 100644 index 3e6470b..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClientHandler.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.query.netty; - -import org.apache.flink.runtime.query.netty.message.KvStateRequestFailure; -import org.apache.flink.runtime.query.netty.message.KvStateRequestResult; -import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; -import org.apache.flink.runtime.query.netty.message.KvStateRequestType; - -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; -import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.nio.channels.ClosedChannelException; - -/** - * This handler expects responses from {@link KvStateServerHandler}. - * - * <p>It deserializes the response and calls the registered callback, which is - * responsible for actually handling the result (see {@link KvStateClient.EstablishedConnection}). - */ -class KvStateClientHandler extends ChannelInboundHandlerAdapter { - - private static final Logger LOG = LoggerFactory.getLogger(KvStateClientHandler.class); - - private final KvStateClientHandlerCallback callback; - - /** - * Creates a {@link KvStateClientHandler} with the callback. - * - * @param callback Callback for responses. 
- */ - KvStateClientHandler(KvStateClientHandlerCallback callback) { - this.callback = callback; - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - try { - ByteBuf buf = (ByteBuf) msg; - KvStateRequestType msgType = KvStateRequestSerializer.deserializeHeader(buf); - - if (msgType == KvStateRequestType.REQUEST_RESULT) { - KvStateRequestResult result = KvStateRequestSerializer.deserializeKvStateRequestResult(buf); - callback.onRequestResult(result.getRequestId(), result.getSerializedResult()); - } else if (msgType == KvStateRequestType.REQUEST_FAILURE) { - KvStateRequestFailure failure = KvStateRequestSerializer.deserializeKvStateRequestFailure(buf); - callback.onRequestFailure(failure.getRequestId(), failure.getCause()); - } else if (msgType == KvStateRequestType.SERVER_FAILURE) { - throw KvStateRequestSerializer.deserializeServerFailure(buf); - } else { - throw new IllegalStateException("Unexpected response type '" + msgType + "'"); - } - } catch (Throwable t1) { - try { - callback.onFailure(t1); - } catch (Throwable t2) { - LOG.error("Failed to notify callback about failure", t2); - } - } finally { - ReferenceCountUtil.release(msg); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - try { - callback.onFailure(cause); - } catch (Throwable t) { - LOG.error("Failed to notify callback about failure", t); - } - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - // Only the client is expected to close the channel. Otherwise it - // indicates a failure. Note that this will be invoked in both cases - // though. If the callback closed the channel, the callback must be - // ignored. 
- try { - callback.onFailure(new ClosedChannelException()); - } catch (Throwable t) { - LOG.error("Failed to notify callback about failure", t); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClientHandlerCallback.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClientHandlerCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClientHandlerCallback.java deleted file mode 100644 index 65ff781..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateClientHandlerCallback.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.query.netty; - -import org.apache.flink.runtime.query.netty.message.KvStateRequest; - -/** - * Callback for {@link KvStateClientHandler}. - */ -interface KvStateClientHandlerCallback { - - /** - * Called on a successful {@link KvStateRequest}. 
- * - * @param requestId ID of the request - * @param serializedValue Serialized value for the request - */ - void onRequestResult(long requestId, byte[] serializedValue); - - /** - * Called on a failed {@link KvStateRequest}. - * - * @param requestId ID of the request - * @param cause Cause of the request failure - */ - void onRequestFailure(long requestId, Throwable cause); - - /** - * Called on any failure, which is not related to a specific request. - * - * <p>This can be for example a caught Exception in the channel pipeline - * or an unexpected channel close. - * - * @param cause Cause of the failure - */ - void onFailure(Throwable cause); - -} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java index 1c0d8d5..9781e23 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateRequestStats.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.query.netty; +import org.apache.flink.runtime.query.KvStateServer; + /** * Simple statistics for {@link KvStateServer} monitoring. 
*/ http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java deleted file mode 100644 index 7cf2148..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServer.java +++ /dev/null @@ -1,239 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.query.netty; - -import org.apache.flink.runtime.io.network.netty.NettyBufferPool; -import org.apache.flink.runtime.query.KvStateRegistry; -import org.apache.flink.runtime.query.KvStateServerAddress; -import org.apache.flink.runtime.query.netty.message.KvStateRequest; -import org.apache.flink.util.Preconditions; - -import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap; -import org.apache.flink.shaded.netty4.io.netty.channel.Channel; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption; -import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup; -import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; -import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; -import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder; -import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; - -/** - * Netty-based server answering {@link KvStateRequest} messages. - * - * <p>Requests are handled by asynchronous query tasks (see {@link KvStateServerHandler.AsyncKvStateQueryTask}) - * that are executed by a separate query Thread pool. This pool is shared among - * all TCP connections. 
- * - * <p>The incoming pipeline looks as follows: - * <pre> - * Socket.read() -> LengthFieldBasedFrameDecoder -> KvStateServerHandler - * </pre> - * - * <p>Received binary messages are expected to contain a frame length field. Netty's - * {@link LengthFieldBasedFrameDecoder} is used to fully receive the frame before - * giving it to our {@link KvStateServerHandler}. - * - * <p>Connections are established and closed by the client. The server only - * closes the connection on a fatal failure that cannot be recovered. A - * server-side connection close is considered a failure by the client. - */ -public class KvStateServer { - - private static final Logger LOG = LoggerFactory.getLogger(KvStateServer.class); - - /** Server config: low water mark. */ - private static final int LOW_WATER_MARK = 8 * 1024; - - /** Server config: high water mark. */ - private static final int HIGH_WATER_MARK = 32 * 1024; - - /** Netty's ServerBootstrap. */ - private final ServerBootstrap bootstrap; - - /** Query executor thread pool. */ - private final ExecutorService queryExecutor; - - /** Address of this server. */ - private KvStateServerAddress serverAddress; - - /** - * Creates the {@link KvStateServer}. - * - * <p>The server needs to be started via {@link #start()} in order to bind - * to the configured bind address. - * - * @param bindAddress Address to bind to - * @param bindPort Port to bind to. Pick random port if 0. 
- * @param numEventLoopThreads Number of event loop threads - * @param numQueryThreads Number of query threads - * @param kvStateRegistry KvStateRegistry to query for KvState instances - * @param stats Statistics tracker - */ - public KvStateServer( - InetAddress bindAddress, - int bindPort, - int numEventLoopThreads, - int numQueryThreads, - KvStateRegistry kvStateRegistry, - KvStateRequestStats stats) { - - Preconditions.checkArgument(bindPort >= 0 && bindPort <= 65536, "Port " + bindPort + - " is out of valid port range (0-65536)."); - - Preconditions.checkArgument(numEventLoopThreads >= 1, "Non-positive number of event loop threads."); - Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive number of query threads."); - - Preconditions.checkNotNull(kvStateRegistry, "KvStateRegistry"); - Preconditions.checkNotNull(stats, "KvStateRequestStats"); - - NettyBufferPool bufferPool = new NettyBufferPool(numEventLoopThreads); - - ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("Flink KvStateServer EventLoop Thread %d") - .build(); - - NioEventLoopGroup nioGroup = new NioEventLoopGroup(numEventLoopThreads, threadFactory); - - queryExecutor = createQueryExecutor(numQueryThreads); - - // Shared between all channels - KvStateServerHandler serverHandler = new KvStateServerHandler( - kvStateRegistry, - queryExecutor, - stats); - - bootstrap = new ServerBootstrap() - // Bind address and port - .localAddress(bindAddress, bindPort) - // NIO server channels - .group(nioGroup) - .channel(NioServerSocketChannel.class) - // Server channel Options - .option(ChannelOption.ALLOCATOR, bufferPool) - // Child channel options - .childOption(ChannelOption.ALLOCATOR, bufferPool) - .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, HIGH_WATER_MARK) - .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, LOW_WATER_MARK) - // See initializer for pipeline details - .childHandler(new KvStateServerChannelInitializer(serverHandler)); - 
} - - /** - * Starts the server by binding to the configured bind address (blocking). - * - * @throws InterruptedException If interrupted during the bind operation - */ - public void start() throws InterruptedException { - Channel channel = bootstrap.bind().sync().channel(); - - InetSocketAddress localAddress = (InetSocketAddress) channel.localAddress(); - serverAddress = new KvStateServerAddress(localAddress.getAddress(), localAddress.getPort()); - } - - /** - * Returns the address of this server. - * - * @return Server address - * @throws IllegalStateException If server has not been started yet - */ - public KvStateServerAddress getAddress() { - if (serverAddress == null) { - throw new IllegalStateException("KvStateServer not started yet."); - } - - return serverAddress; - } - - /** - * Shuts down the server and all related thread pools. - */ - public void shutDown() { - if (bootstrap != null) { - EventLoopGroup group = bootstrap.group(); - if (group != null) { - group.shutdownGracefully(0, 10, TimeUnit.SECONDS); - } - } - - if (queryExecutor != null) { - queryExecutor.shutdown(); - } - - serverAddress = null; - } - - /** - * Creates a thread pool for the query execution. - * - * @param numQueryThreads Number of query threads. - * @return Thread pool for query execution - */ - private static ExecutorService createQueryExecutor(int numQueryThreads) { - ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("Flink KvStateServer Query Thread %d") - .build(); - - return Executors.newFixedThreadPool(numQueryThreads, threadFactory); - } - - /** - * Channel pipeline initializer. - * - * <p>The request handler is shared, whereas the other handlers are created - * per channel. - */ - private static final class KvStateServerChannelInitializer extends ChannelInitializer<SocketChannel> { - - /** The shared request handler. 
*/ - private final KvStateServerHandler sharedRequestHandler; - - /** - * Creates the channel pipeline initializer with the shared request handler. - * - * @param sharedRequestHandler Shared request handler. - */ - public KvStateServerChannelInitializer(KvStateServerHandler sharedRequestHandler) { - this.sharedRequestHandler = Preconditions.checkNotNull(sharedRequestHandler, "Request handler"); - } - - @Override - protected void initChannel(SocketChannel ch) throws Exception { - ch.pipeline() - .addLast(new ChunkedWriteHandler()) - .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)) - .addLast(sharedRequestHandler); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java deleted file mode 100644 index 1af55dc..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java +++ /dev/null @@ -1,305 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.query.netty; - -import org.apache.flink.runtime.query.KvStateRegistry; -import org.apache.flink.runtime.query.netty.message.KvStateRequest; -import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; -import org.apache.flink.runtime.query.netty.message.KvStateRequestType; -import org.apache.flink.runtime.state.internal.InternalKvState; -import org.apache.flink.util.ExceptionUtils; - -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; -import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Objects; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - -/** - * This handler dispatches asynchronous tasks, which query {@link InternalKvState} - * instances and write the result to the channel. - * - * <p>The network threads receive the message, deserialize it and dispatch the - * query task. The actual query is handled in a separate thread as it might - * otherwise block the network threads (file I/O etc.). - */ -@ChannelHandler.Sharable -class KvStateServerHandler extends ChannelInboundHandlerAdapter { - - private static final Logger LOG = LoggerFactory.getLogger(KvStateServerHandler.class); - - /** KvState registry holding references to the KvState instances. */ - private final KvStateRegistry registry; - - /** Thread pool for query execution.
*/ - private final ExecutorService queryExecutor; - - /** Exposed server statistics. */ - private final KvStateRequestStats stats; - - /** - * Create the handler. - * - * @param kvStateRegistry Registry to query. - * @param queryExecutor Thread pool for query execution. - * @param stats Exposed server statistics. - */ - KvStateServerHandler( - KvStateRegistry kvStateRegistry, - ExecutorService queryExecutor, - KvStateRequestStats stats) { - - this.registry = Objects.requireNonNull(kvStateRegistry, "KvStateRegistry"); - this.queryExecutor = Objects.requireNonNull(queryExecutor, "Query thread pool"); - this.stats = Objects.requireNonNull(stats, "KvStateRequestStats"); - } - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - stats.reportActiveConnection(); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - stats.reportInactiveConnection(); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - KvStateRequest request = null; - - try { - ByteBuf buf = (ByteBuf) msg; - KvStateRequestType msgType = KvStateRequestSerializer.deserializeHeader(buf); - - if (msgType == KvStateRequestType.REQUEST) { - // ------------------------------------------------------------ - // Request - // ------------------------------------------------------------ - request = KvStateRequestSerializer.deserializeKvStateRequest(buf); - - stats.reportRequest(); - - InternalKvState<?> kvState = registry.getKvState(request.getKvStateId()); - - if (kvState != null) { - // Execute actual query async, because it is possibly - // blocking (e.g. file I/O). - // - // A submission failure is not treated as fatal. 
- queryExecutor.submit(new AsyncKvStateQueryTask(ctx, request, kvState, stats)); - } else { - ByteBuf unknown = KvStateRequestSerializer.serializeKvStateRequestFailure( - ctx.alloc(), - request.getRequestId(), - new UnknownKvStateID(request.getKvStateId())); - - ctx.writeAndFlush(unknown); - - stats.reportFailedRequest(); - } - } else { - // ------------------------------------------------------------ - // Unexpected - // ------------------------------------------------------------ - ByteBuf failure = KvStateRequestSerializer.serializeServerFailure( - ctx.alloc(), - new IllegalArgumentException("Unexpected message type " + msgType - + ". KvStateServerHandler expects " - + KvStateRequestType.REQUEST + " messages.")); - - ctx.writeAndFlush(failure); - } - } catch (Throwable t) { - String stringifiedCause = ExceptionUtils.stringifyException(t); - - ByteBuf err; - if (request != null) { - String errMsg = "Failed to handle incoming request with ID " + - request.getRequestId() + ". Caused by: " + stringifiedCause; - err = KvStateRequestSerializer.serializeKvStateRequestFailure( - ctx.alloc(), - request.getRequestId(), - new RuntimeException(errMsg)); - - stats.reportFailedRequest(); - } else { - String errMsg = "Failed to handle incoming message. Caused by: " + stringifiedCause; - err = KvStateRequestSerializer.serializeServerFailure( - ctx.alloc(), - new RuntimeException(errMsg)); - } - - ctx.writeAndFlush(err); - } finally { - // IMPORTANT: We have to always recycle the incoming buffer. - // Otherwise we will leak memory out of Netty's buffer pool. - // - // If any operation ever holds on to the buffer, it is the - // responsibility of that operation to retain the buffer and - // release it later. - ReferenceCountUtil.release(msg); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - String stringifiedCause = ExceptionUtils.stringifyException(cause); - String msg = "Exception in server pipeline. 
Caused by: " + stringifiedCause; - - ByteBuf err = KvStateRequestSerializer.serializeServerFailure( - ctx.alloc(), - new RuntimeException(msg)); - - ctx.writeAndFlush(err).addListener(ChannelFutureListener.CLOSE); - } - - /** - * Task to execute the actual query against the {@link InternalKvState} instance. - */ - private static class AsyncKvStateQueryTask implements Runnable { - - private final ChannelHandlerContext ctx; - - private final KvStateRequest request; - - private final InternalKvState<?> kvState; - - private final KvStateRequestStats stats; - - private final long creationNanos; - - public AsyncKvStateQueryTask( - ChannelHandlerContext ctx, - KvStateRequest request, - InternalKvState<?> kvState, - KvStateRequestStats stats) { - - this.ctx = Objects.requireNonNull(ctx, "Channel handler context"); - this.request = Objects.requireNonNull(request, "State query"); - this.kvState = Objects.requireNonNull(kvState, "KvState"); - this.stats = Objects.requireNonNull(stats, "State query stats"); - this.creationNanos = System.nanoTime(); - } - - @Override - public void run() { - boolean success = false; - - try { - if (!ctx.channel().isActive()) { - return; - } - - // Query the KvState instance - byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace(); - byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace); - - if (serializedResult != null) { - // We found some data, success! - ByteBuf buf = KvStateRequestSerializer.serializeKvStateRequestResult( - ctx.alloc(), - request.getRequestId(), - serializedResult); - - int highWatermark = ctx.channel().config().getWriteBufferHighWaterMark(); - - ChannelFuture write; - if (buf.readableBytes() <= highWatermark) { - write = ctx.writeAndFlush(buf); - } else { - write = ctx.writeAndFlush(new ChunkedByteBuf(buf, highWatermark)); - } - - write.addListener(new QueryResultWriteListener()); - - success = true; - } else { - // No data for the key/namespace. 
This is considered to be - // a failure. - ByteBuf unknownKey = KvStateRequestSerializer.serializeKvStateRequestFailure( - ctx.alloc(), - request.getRequestId(), - new UnknownKeyOrNamespace()); - - ctx.writeAndFlush(unknownKey); - } - } catch (Throwable t) { - try { - String stringifiedCause = ExceptionUtils.stringifyException(t); - String errMsg = "Failed to query state backend for query " + - request.getRequestId() + ". Caused by: " + stringifiedCause; - - ByteBuf err = KvStateRequestSerializer.serializeKvStateRequestFailure( - ctx.alloc(), request.getRequestId(), new RuntimeException(errMsg)); - - ctx.writeAndFlush(err); - } catch (IOException e) { - LOG.error("Failed to respond with the error after failed to query state backend", e); - } - } finally { - if (!success) { - stats.reportFailedRequest(); - } - } - } - - @Override - public String toString() { - return "AsyncKvStateQueryTask{" + - ", request=" + request + - ", creationNanos=" + creationNanos + - '}'; - } - - /** - * Callback after query result has been written. - * - * <p>Gathers stats and logs errors. 
- */ - private class QueryResultWriteListener implements ChannelFutureListener { - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - long durationNanos = System.nanoTime() - creationNanos; - long durationMillis = TimeUnit.MILLISECONDS.convert(durationNanos, TimeUnit.NANOSECONDS); - - if (future.isSuccess()) { - stats.reportSuccessfulRequest(durationMillis); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Query " + request + " failed after " + durationMillis + " ms", future.cause()); - } - - stats.reportFailedRequest(); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/UnknownKeyOrNamespace.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/UnknownKeyOrNamespace.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/UnknownKeyOrNamespace.java deleted file mode 100644 index 4e5a1de..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/UnknownKeyOrNamespace.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.query.netty; - -/** - * Thrown if the KvState does not hold any state for the given key or namespace. - */ -public class UnknownKeyOrNamespace extends IllegalStateException { - - private static final long serialVersionUID = 1L; - - UnknownKeyOrNamespace() { - super("KvState does not hold any state for key/namespace."); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/UnknownKvStateID.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/UnknownKvStateID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/UnknownKvStateID.java deleted file mode 100644 index cc60035..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/UnknownKvStateID.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.query.netty; - -import org.apache.flink.runtime.query.KvStateID; -import org.apache.flink.util.Preconditions; - -/** - * Thrown if no KvState with the given ID can be found by the server handler.
- */ -public class UnknownKvStateID extends IllegalStateException { - - private static final long serialVersionUID = 1L; - - public UnknownKvStateID(KvStateID kvStateId) { - super("No KvState registered with ID " + Preconditions.checkNotNull(kvStateId, "KvStateID") + - " at TaskManager."); - } -}
