Repository: flink
Updated Branches:
  refs/heads/master e8931bd58 -> a19d8bfa9

[FLINK-7822][QS][doc] Update Queryable State docs.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a19d8bfa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a19d8bfa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a19d8bfa

Branch: refs/heads/master
Commit: a19d8bfa9ab964e49fc29da037f862bafe7a853b
Parents: 84746a8
Author: kkloudas
Authored: Mon Nov 6 17:21:45 2017 +0100
Committer: kkloudas
Committed: Tue Nov 7 14:07:54 2017 +0100

----------------------------------------------------------------------
 docs/dev/stream/state/queryable_state.md | 265 ++++++++++++++------------
 1 file changed, 144 insertions(+), 121 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a19d8bfa/docs/dev/stream/state/queryable_state.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state/queryable_state.md b/docs/dev/stream/state/queryable_state.md
index 8012f67..af646df 100644
--- a/docs/dev/stream/state/queryable_state.md
+++ b/docs/dev/stream/state/queryable_state.md
@@ -32,38 +32,68 @@ under the License.
   likely that there will be breaking API changes on the client side in the upcoming Flink versions.
 </div>
 
-In a nutshell, this feature allows users to query Flink's managed partitioned state
-(see [Working with State]({{ site.baseurl }}/dev/stream/state/state.html)) from outside of
-Flink. For some scenarios, queryable state thus eliminates the need for distributed
-operations/transactions with external systems such as key-value stores which are often the
-bottleneck in practice.
+In a nutshell, this feature exposes Flink's managed keyed (partitioned) state
+(see [Working with State]({{ site.baseurl }}/dev/stream/state/state.html)) to the outside world and
+allows the user to query a job's state from outside Flink. For some scenarios, queryable state
+eliminates the need for distributed operations/transactions with external systems such as key-value
+stores which are often the bottleneck in practice. In addition, this feature may be particularly
+useful for debugging purposes.
 
 <div class="alert alert-warning">
-  <strong>Attention:</strong> Queryable state accesses keyed state from a concurrent thread rather
-  than synchronizing with the operator and potentially blocking its operation. Since any state
-  backend using Java heap space, e.g. MemoryStateBackend or
-  FsStateBackend, does not work with copies when retrieving values but instead directly
-  references the stored values, read-modify-write patterns are unsafe and may cause the
-  queryable state server to fail due to concurrent modifications.
-  The RocksDBStateBackend is safe from these issues.
+  <strong>Attention:</strong> When querying a state object, that object is accessed from a concurrent
+  thread without any synchronization or copying. This is a design choice, as either of these would lead
+  to increased job latency, which we wanted to avoid. Since any state backend using Java heap space,
+  <i>e.g.</i> <code>MemoryStateBackend</code> or <code>FsStateBackend</code>, does not work
+  with copies when retrieving values but instead directly references the stored values, read-modify-write
+  patterns are unsafe and may cause the queryable state server to fail due to concurrent modifications.
+  The <code>RocksDBStateBackend</code> is safe from these issues.
 </div>
 
+## Architecture
+
+Before showing how to use the Queryable State, it is useful to briefly describe the entities that compose it.
+The Queryable State feature consists of three main entities:
+
+ 1. the `QueryableStateClient`, which (potentially) runs outside the Flink cluster and submits the user queries,
+ 2. the `QueryableStateClientProxy`, which runs on each `TaskManager` (*i.e.* inside the Flink cluster) and is responsible
+    for receiving the client's queries, fetching the requested state from the responsible Task Manager on its behalf, and
+    returning it to the client, and
+ 3. the `QueryableStateServer` which runs on each `TaskManager` and is responsible for serving the locally stored state.
+
+In a nutshell, the client will connect to one of the proxies and send a request for the state associated with a specific
+key, `k`. As stated in [Working with State]({{ site.baseurl }}/dev/stream/state/state.html), keyed state is organized in
+*Key Groups*, and each `TaskManager` is assigned a number of these key groups. To discover which `TaskManager` is
+responsible for the key group holding `k`, the proxy will ask the `JobManager`. Based on the answer, the proxy will
+then query the `QueryableStateServer` running on that `TaskManager` for the state associated with `k`, and forward the
+response back to the client.
+
+## Activating Queryable State
+
+To enable queryable state on your Flink cluster, you just have to copy the
+`flink-queryable-state-runtime{{ site.scala_version_suffix }}-{{site.version }}.jar`
+from the `opt/` folder of your [Flink distribution](https://flink.apache.org/downloads.html "Apache Flink: Downloads")
+to the `lib/` folder. Otherwise, the queryable state feature is not enabled.
+
+To verify that your cluster is running with queryable state enabled, check the logs of any
+task manager for the line: `"Started the Queryable State Proxy Server @ ..."`.
+
 ## Making State Queryable
 
-In order to make state queryable, the queryable state server first needs to be enabled globally
-by setting the `query.server.enable` configuration parameter to `true` (current default).
-Then appropriate state needs to be made queryable by using either
+Now that you have activated queryable state on your cluster, it is time to see how to use it. In order for a state to
+be visible to the outside world, it needs to be explicitly made queryable by using:
 
-* a `QueryableStateStream`, a convenience object which behaves like a sink and offers incoming values as
-queryable state, or
-* `StateDescriptor#setQueryable(String queryableStateName)`, which makes the keyed state of an
-operator queryable.
+* either a `QueryableStateStream`, a convenience object which acts as a sink and offers its incoming values as queryable
+state, or
+* the `stateDescriptor.setQueryable(String queryableStateName)` method, which makes the keyed state represented by the
+  state descriptor queryable.
 
 The following sections explain the use of these two approaches.
 
 ### Queryable State Stream
 
-A `KeyedStream` may offer its values as queryable state by using the following methods:
+Calling `.asQueryableState(stateName, stateDescriptor)` on a `KeyedStream` returns a `QueryableStateStream` which offers
+its values as queryable state. Depending on the type of state, there are the following variants of the `asQueryableState()`
+method:
 
 {% highlight java %}
 // ValueState
@@ -91,18 +121,16 @@ QueryableStateStream asQueryableState(
   list which may not be cleaned up and thus will eventually consume too much memory.
 </div>
 
-A call to these methods returns a `QueryableStateStream`, which cannot be further transformed and
-currently only holds the name as well as the value and key serializer for the queryable state
-stream. It is comparable to a sink, and cannot be followed by further transformations.
+The returned `QueryableStateStream` can be seen as a sink and **cannot** be further transformed. Internally, a
+`QueryableStateStream` gets translated to an operator which uses all incoming records to update the queryable state
+instance. The updating logic is implied by the type of the `StateDescriptor` provided in the `asQueryableState` call.
+In a program like the following, all records of the keyed stream will be used to update the state instance via the
+`ValueState.update(value)` method:
 
-Internally a `QueryableStateStream` gets translated to an operator which uses all incoming
-records to update the queryable state instance.
-In a program like the following, all records of the keyed stream will be used to update the state
-instance, either via `ValueState#update(value)` or `AppendingState#add(value)`, depending on
-the chosen state variant:
 {% highlight java %}
 stream.keyBy(0).asQueryableState("query-name")
 {% endhighlight %}
+
 This acts like the Scala API's `flatMapWithState`.
 
 ### Managed Keyed State
@@ -110,7 +138,7 @@ This acts like the Scala API's `flatMapWithState`.
 Managed keyed state of an operator
 (see [Using Managed Keyed State]({{ site.baseurl }}/dev/stream/state/state.html#using-managed-keyed-state))
 can be made queryable by making the appropriate state descriptor queryable via
-`StateDescriptor#setQueryable(String queryableStateName)`, as in the example below:
+`StateDescriptor.setQueryable(String queryableStateName)`, as in the example below:
 {% highlight java %}
 ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
         new ValueStateDescriptor<>(
@@ -119,61 +147,79 @@ ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                 Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
 descriptor.setQueryable("query-name"); // queryable state name
 {% endhighlight %}
+
 <div class="alert alert-info">
-  <strong>Note:</strong> The `queryableStateName` parameter may be chosen arbitrarily and is only
+  <strong>Note:</strong> The <code>queryableStateName</code> parameter may be chosen arbitrarily and is only
   used for queries. It does not have to be identical to the state's own name.
 </div>
 
+This variant has no limitations as to which type of state can be made queryable. This means it can be used for
+any `ValueState`, `ReducingState`, `ListState`, `MapState`, `AggregatingState`, and the currently deprecated `FoldingState`.
 
 ## Querying State
 
-The `QueryableStateClient` helper class may be used for queries against the `KvState` instances that
-serve the state internally. It needs to be set up with a valid `JobManager` address and port and is
-created as follows:
+So far, you have set up your cluster to run with queryable state and you have declared (some of) your state as
+queryable. Now it is time to see how to query this state.
 
-{% highlight java %}
-final Configuration config = new Configuration();
-config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, queryAddress);
-config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, queryPort);
+For this you can use the `QueryableStateClient` helper class. This is available in the `flink-queryable-state-client-java`
+jar which you have to explicitly include as a dependency in the `pom.xml` of your project, as shown below:
 
-final HighAvailabilityServices highAvailabilityServices =
-      HighAvailabilityServicesUtils.createHighAvailabilityServices(
-           config,
-           Executors.newSingleThreadScheduledExecutor(),
-           HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
+<div data-lang="java" markdown="1">
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-queryable-state-client-java_{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+</div>
+
+For more on this, you can check how to [set up a Flink program]({{ site.baseurl }}/dev/linking_with_flink.html).
+
+The `QueryableStateClient` will submit your query to the internal proxy, which will then process your query and return
+the final result. The only requirement to initialize the client is to provide a valid `TaskManager` hostname (remember
+that there is a queryable state proxy running on each task manager) and the port where the proxy listens. More on how
+to configure the proxy and state server port(s) in the [Configuration Section](#configuration).
 
-QueryableStateClient client = new QueryableStateClient(config, highAvailabilityServices);
+{% highlight java %}
+QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort);
 {% endhighlight %}
 
-The query method is this:
+With the client ready, to query a state of type `V`, associated with a key of type `K`, you can use the method:
 
 {% highlight java %}
-Future<byte[]> getKvState(
-    JobID jobID,
-    String queryableStateName,
-    int keyHashCode,
-    byte[] serializedKeyAndNamespace)
+CompletableFuture<S> getKvState(
+    final JobID jobId,
+    final String queryableStateName,
+    final K key,
+    final TypeInformation<K> keyTypeInfo,
+    final StateDescriptor<S, V> stateDescriptor)
 {% endhighlight %}
 
-A call to this method returns a `Future` eventually holding the serialized state value for the
-queryable state instance identified by `queryableStateName` of the job with ID `jobID`. The
-`keyHashCode` is the hash code of the key as returned by `Object.hashCode()` and the
-`serializedKeyAndNamespace` is the serialized key and namespace.
+The above returns a `CompletableFuture` eventually holding the state value for the queryable state instance identified
+by `queryableStateName` of the job with ID `jobId`. The `key` is the key whose state you are interested in and the
+`keyTypeInfo` will tell Flink how to serialize/deserialize it. Finally, the `stateDescriptor` contains the necessary
+information about the requested state, namely its type (`Value`, `Reduce`, etc.) and the necessary information on how
+to serialize/deserialize it.
+
+The careful reader will notice that the returned future contains a value of type `S`, *i.e.* a `State` object containing
+the actual value. This can be any of the state types supported by Flink: `ValueState`, `ReducingState`, `ListState`, `MapState`,
+`AggregatingState`, and the currently deprecated `FoldingState`.
+
+<div class="alert alert-info">
+  <strong>Note:</strong> These state objects do not allow modifications to the contained state. You can use them to get
+  the actual value of the state, <i>e.g.</i> using <code>valueState.value()</code>, or iterate over
+  the contained <code>&lt;K, V&gt;</code> entries, <i>e.g.</i> using <code>mapState.entries()</code>, but you cannot
+  modify them. As an example, calling the <code>add()</code> method on a returned list state will throw an
+  <code>UnsupportedOperationException</code>.
+</div>
+
 <div class="alert alert-info">
   <strong>Note:</strong> The client is asynchronous and can be shared by multiple threads. It needs
-  to be shutdown via <code>QueryableStateClient#shutdown()</code> when unused in order to free
+  to be shut down via <code>QueryableStateClient.shutdown()</code> when unused in order to free
   resources.
 </div>
 
-The current implementation is still pretty low-level in the sense that it only works with
-serialized data both for providing the key/namespace and the returned results. It is the
-responsibility of the user (or some follow-up utilities) to set up the serializers for this. The
-nice thing about this is that the query services don't have to get into the business of worrying
-about any class loading issues etc.
-
-There are some serialization utils for key/namespace and value serialization included in
-`KvStateRequestSerializer`.
-
 ### Example
 
 The following example extends the `CountWindowAverage` example
@@ -183,7 +229,7 @@ by making it queryable and showing how to query this value:
 {% highlight java %}
 public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
-    private transient ValueState<Tuple2<Long /* count */, Long /* sum */>> sum;
+    private transient ValueState<Tuple2<Long, Long>> sum; // a tuple containing the count and the sum
 
     @Override
     public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
@@ -214,74 +260,51 @@ public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>,
 Once used in a job, you can retrieve the job ID and then query any key's current state from this operator:
 
 {% highlight java %}
-final Configuration config = new Configuration();
-config.setString(JobManagerOptions.ADDRESS, queryAddress);
-config.setInteger(JobManagerOptions.PORT, queryPort);
-
-final HighAvailabilityServices highAvailabilityServices =
-      HighAvailabilityServicesUtils.createHighAvailabilityServices(
-           config,
-           Executors.newSingleThreadScheduledExecutor(),
-           HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
-
-QueryableStateClient client = new QueryableStateClient(config, highAvailabilityServices);
-
-final TypeSerializer<Long> keySerializer =
-        TypeInformation.of(new TypeHint<Long>() {}).createSerializer(new ExecutionConfig());
-final TypeSerializer<Tuple2<Long, Long>> valueSerializer =
-        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}).createSerializer(new ExecutionConfig());
-
-final byte[] serializedKey =
-        KvStateRequestSerializer.serializeKeyAndNamespace(
-                key, keySerializer,
-                VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
-
-Future<byte[]> serializedResult =
-        client.getKvState(jobId, "query-name", key.hashCode(), serializedKey);
-
-// now wait for the result and return it
-final FiniteDuration duration = new FiniteDuration(1, TimeUnit.SECONDS);
-byte[] serializedValue = Await.result(serializedResult, duration);
-Tuple2<Long, Long> value =
-        KvStateRequestSerializer.deserializeValue(serializedValue, valueSerializer);
-{% endhighlight %}
-
-### Note for Scala Users
-
-Please use the available Scala extensions when creating the `TypeSerializer` instances. Add the following import:
+QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort);
 
-```scala
-import org.apache.flink.streaming.api.scala._
-```
-
-Now you can create the type serializers as follows:
-
-```scala
-val keySerializer = createTypeInformation[Long]
-  .createSerializer(new ExecutionConfig)
-```
-
-If you don't do this, you can run into mismatches between the serializers used in the Flink job and in your client code, because types like `scala.Long` cannot be captured at runtime.
+// the state descriptor of the state to be fetched.
+ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
+        new ValueStateDescriptor<>(
+          "average",
+          TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
+          Tuple2.of(0L, 0L));
+
+CompletableFuture<ValueState<Tuple2<Long, Long>>> resultFuture =
+        client.getKvState(jobId, "query-name", key, BasicTypeInfo.LONG_TYPE_INFO, descriptor);
+
+// now handle the returned value
+resultFuture.thenAccept(response -> {
+        try {
+            Tuple2<Long, Long> res = response.value();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+});
+{% endhighlight %}
 
 ## Configuration
 
 The following configuration parameters influence the behaviour of the queryable state server and client.
 They are defined in `QueryableStateOptions`.
 
-### Server
-* `query.server.enable`: flag to indicate whether to start the queryable state server
-* `query.server.port`: port to bind to the internal `KvStateServer` (0 => pick random available port)
-* `query.server.network-threads`: number of network (event loop) threads for the `KvStateServer` (0 => #slots)
-* `query.server.query-threads`: number of asynchronous query threads for the `KvStateServerHandler` (0 => #slots).
+### State Server
+* `query.server.ports`: the server port range of the queryable state server. This is useful to avoid port clashes if more
+  than one task manager runs on the same machine. The specified range can be a single port ("9123"), a range of ports
+  ("50100-50200"), or a list of ranges and/or ports ("50100-50200,50300-50400,51234"). The default port is 9067.
+* `query.server.network-threads`: number of network (event loop) threads receiving incoming requests for the state server (0 => #slots)
+* `query.server.query-threads`: number of threads handling/serving incoming requests for the state server (0 => #slots).
+
 
-### Client (`QueryableStateClient`)
-* `query.client.network-threads`: number of network (event loop) threads for the `KvStateClient` (0 => number of available cores)
-* `query.client.lookup.num-retries`: number of retries on location lookup failures
-* `query.client.lookup.retry-delay`: retry delay on location lookup failures (millis)
+### Proxy
+* `query.proxy.ports`: the server port range of the queryable state proxy. This is useful to avoid port clashes if more
+  than one task manager runs on the same machine. The specified range can be a single port ("9123"), a range of ports
+  ("50100-50200"), or a list of ranges and/or ports ("50100-50200,50300-50400,51234"). The default port is 9069.
+* `query.proxy.network-threads`: number of network (event loop) threads receiving incoming requests for the client proxy (0 => #slots)
+* `query.proxy.query-threads`: number of threads handling/serving incoming requests for the client proxy (0 => #slots).
 
 ## Limitations
 
-* The queryable state life-cycle is bound to the life-cycle of the job, e.g. tasks register
+* The queryable state life-cycle is bound to the life-cycle of the job, *e.g.* tasks register
 queryable state on startup and unregister it on disposal. In future versions, it is desirable
 to decouple this in order to allow queries after a task finishes, and to speed up recovery via state
 replication.
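
The Architecture section added by this commit says the proxy asks the `JobManager` which `TaskManager` is responsible for the key group holding `k`. The following is a minimal, self-contained sketch of that key to key group to parallel instance lookup. It is **not** Flink code. The class and method names are hypothetical, and the hash is a plain `Math.floorMod` of `hashCode()`. To our understanding, Flink additionally scrambles the hash code before taking the modulo. The subtask-to-TaskManager list is an assumed placement that stands in for the location information the proxy would get from the `JobManager`.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical sketch (not Flink's implementation) of the lookup described in the
 * Architecture section: key -> key group -> parallel subtask -> TaskManager.
 */
public class KeyGroupRouting {

    /** Assigns a key to one of maxParallelism key groups (simplified hash, see lead-in). */
    static int keyGroupFor(Object key, int maxParallelism) {
        // floorMod keeps the result in [0, maxParallelism) even for negative hash codes.
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Splits the key groups into contiguous ranges of (nearly) equal size, one per subtask. */
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        // Assumed placement of the 4 subtasks; a real proxy learns this from the JobManager.
        List<String> subtaskLocation = Arrays.asList("tm-1", "tm-1", "tm-2", "tm-2");

        Long key = 42L;
        int keyGroup = keyGroupFor(key, maxParallelism);
        int subtask = subtaskFor(keyGroup, maxParallelism, parallelism);
        // Prints: key 42 -> key group 42 -> subtask 1 on tm-1
        System.out.println("key " + key + " -> key group " + keyGroup
                + " -> subtask " + subtask + " on " + subtaskLocation.get(subtask));
    }
}
```

Because each subtask owns a contiguous range of key groups, the proxy only needs the key group of `k` plus the current placement of subtasks to route a query. This is why the lookup goes through the `JobManager` rather than being fixed in the client.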

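The `query.server.ports` and `query.proxy.ports` options in the diff above accept a single port, a range, or a comma-separated list of both. This lets several TaskManagers on the same machine each find a port that is still free. To make that format concrete, here is a hypothetical helper that expands such a specification into an ordered list of candidate ports. `PortRanges` and `expand` are illustrative names and are not part of Flink's API. How Flink itself parses the value and chooses among the candidates is not shown here.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not part of Flink) that expands a port specification such as
 * "9123", "50100-50200" or "50100-50200,50300-50400,51234" into candidate ports, in order.
 */
public class PortRanges {

    static List<Integer> expand(String spec) {
        List<Integer> ports = new ArrayList<>();
        for (String part : spec.split(",")) {
            String trimmed = part.trim();
            int dash = trimmed.indexOf('-');
            if (dash < 0) {
                // A single port, e.g. "9123".
                ports.add(parsePort(trimmed));
            } else {
                // An inclusive range, e.g. "50100-50200".
                int start = parsePort(trimmed.substring(0, dash).trim());
                int end = parsePort(trimmed.substring(dash + 1).trim());
                if (start > end) {
                    throw new IllegalArgumentException("Invalid port range: " + trimmed);
                }
                for (int p = start; p <= end; p++) {
                    ports.add(p);
                }
            }
        }
        return ports;
    }

    private static int parsePort(String s) {
        int port = Integer.parseInt(s);
        if (port < 0 || port > 65535) {
            throw new IllegalArgumentException("Port out of range: " + s);
        }
        return port;
    }

    public static void main(String[] args) {
        // Prints: [50100, 50101, 50102, 51234]
        System.out.println(expand("50100-50102,51234"));
    }
}
```

A server started with such a specification would typically try the candidates in order and bind the first free one. This is why giving each co-located TaskManager a range, rather than a single fixed port, avoids clashes.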