Repository: flink
Updated Branches:
  refs/heads/master e8931bd58 -> a19d8bfa9


[FLINK-7822][QS][doc] Update Queryable State docs.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a19d8bfa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a19d8bfa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a19d8bfa

Branch: refs/heads/master
Commit: a19d8bfa9ab964e49fc29da037f862bafe7a853b
Parents: 84746a8
Author: kkloudas
Authored: Mon Nov 6 17:21:45 2017 +0100
Committer: kkloudas
Committed: Tue Nov 7 14:07:54 2017 +0100

----------------------------------------------------------------------
 docs/dev/stream/state/queryable_state.md | 265 ++++++++++++++------------
 1 file changed, 144 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a19d8bfa/docs/dev/stream/state/queryable_state.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state/queryable_state.md b/docs/dev/stream/state/queryable_state.md
index 8012f67..af646df 100644
--- a/docs/dev/stream/state/queryable_state.md
+++ b/docs/dev/stream/state/queryable_state.md
@@ -32,38 +32,68 @@ under the License.
   likely that there will be breaking API changes on the client side in the upcoming Flink versions.
 </div>
 
-In a nutshell, this feature allows users to query Flink's managed partitioned state
-(see [Working with State]({{ site.baseurl }}/dev/stream/state/state.html)) from outside of
-Flink. For some scenarios, queryable state thus eliminates the need for distributed
-operations/transactions with external systems such as key-value stores which are often the
-bottleneck in practice.
+In a nutshell, this feature exposes Flink's managed keyed (partitioned) state
+(see [Working with State]({{ site.baseurl }}/dev/stream/state/state.html)) to the outside world and
+allows the user to query a job's state from outside Flink. For some scenarios, queryable state
+eliminates the need for distributed operations/transactions with external systems such as key-value
+stores, which are often the bottleneck in practice. In addition, this feature may be particularly
+useful for debugging purposes.
 
 <div class="alert alert-warning">
-  <strong>Attention:</strong> Queryable state accesses keyed state from a concurrent thread rather
-  than synchronizing with the operator and potentially blocking its operation. Since any state
-  backend using Java heap space, e.g. MemoryStateBackend or
-  FsStateBackend, does not work with copies when retrieving values but instead directly
-  references the stored values, read-modify-write patterns are unsafe and may cause the
-  queryable state server to fail due to concurrent modifications.
-  The RocksDBStateBackend is safe from these issues.
+  <strong>Attention:</strong> When querying a state object, that object is accessed from a concurrent
+  thread without any synchronization or copying. This is a design choice, as either of these would lead
+  to increased job latency, which we wanted to avoid. Since any state backend using Java heap space,
+  <i>e.g.</i> <code>MemoryStateBackend</code> or <code>FsStateBackend</code>, does not work
+  with copies when retrieving values but instead directly references the stored values, read-modify-write
+  patterns are unsafe and may cause the queryable state server to fail due to concurrent modifications.
+  The <code>RocksDBStateBackend</code> is safe from these issues.
 </div>
 
+## Architecture
+
+Before showing how to use Queryable State, it is useful to briefly describe the entities that compose it.
+The Queryable State feature consists of three main entities:
+
+ 1. the `QueryableStateClient`, which (potentially) runs outside the Flink cluster and submits the user queries,
+ 2. the `QueryableStateClientProxy`, which runs on each `TaskManager` (*i.e.* inside the Flink cluster) and is responsible
+ for receiving the client's queries, fetching the requested state from the responsible `TaskManager` on its behalf, and
+ returning it to the client, and
+ 3. the `QueryableStateServer`, which runs on each `TaskManager` and is responsible for serving the locally stored state.
+ 
+In a nutshell, the client will connect to one of the proxies and send a request for the state associated with a specific
+key, `k`. As stated in [Working with State]({{ site.baseurl }}/dev/stream/state/state.html), keyed state is organized in
+*Key Groups*, and each `TaskManager` is assigned a number of these key groups. To discover which `TaskManager` is
+responsible for the key group holding `k`, the proxy will ask the `JobManager`. Based on the answer, the proxy will
+then query the `QueryableStateServer` running on that `TaskManager` for the state associated with `k`, and forward the
+response back to the client.
+
+## Activating Queryable State
+
+To enable queryable state on your Flink cluster, you just have to copy the
+`flink-queryable-state-runtime{{ site.scala_version_suffix }}-{{site.version }}.jar`
+from the `opt/` folder of your [Flink distribution](https://flink.apache.org/downloads.html "Apache Flink: Downloads")
+to the `lib/` folder. Without this jar, the queryable state feature is not enabled.
+
+To verify that your cluster is running with queryable state enabled, check the logs of any
+`TaskManager` for the line: `"Started the Queryable State Proxy Server @ ..."`.
+
 ## Making State Queryable
 
-In order to make state queryable, the queryable state server first needs to be enabled globally
-by setting the `query.server.enable` configuration parameter to `true` (current default).
-Then appropriate state needs to be made queryable by using either
+Now that you have activated queryable state on your cluster, it is time to see how to use it. In order for a state to
+be visible to the outside world, it needs to be explicitly made queryable by using:
 
-* a `QueryableStateStream`, a convenience object which behaves like a sink and offers incoming values as
-queryable state, or
-* `StateDescriptor#setQueryable(String queryableStateName)`, which makes the keyed state of an
-operator queryable.
+* either a `QueryableStateStream`, a convenience object which acts as a sink and offers its incoming values as queryable
+state, or
+* the `stateDescriptor.setQueryable(String queryableStateName)` method, which makes the keyed state represented by the
+ state descriptor queryable.
 
 The following sections explain the use of these two approaches.
 
 ### Queryable State Stream
 
-A `KeyedStream` may offer its values as queryable state by using the following methods:
+Calling `.asQueryableState(stateName, stateDescriptor)` on a `KeyedStream` returns a `QueryableStateStream` which offers
+its values as queryable state. Depending on the type of state, there are the following variants of the `asQueryableState()`
+method:
 
 {% highlight java %}
 // ValueState
@@ -91,18 +121,16 @@ QueryableStateStream asQueryableState(
   list which may not be cleaned up and thus will eventually consume too much memory.
 </div>
 
-A call to these methods returns a `QueryableStateStream`, which cannot be further transformed and
-currently only holds the name as well as the value and key serializer for the queryable state
-stream. It is comparable to a sink, and cannot be followed by further transformations.
+The returned `QueryableStateStream` can be seen as a sink and **cannot** be further transformed. Internally, a
+`QueryableStateStream` gets translated to an operator which uses all incoming records to update the queryable state
+instance. The updating logic is implied by the type of the `StateDescriptor` provided in the `asQueryableState` call.
+In a program like the following, all records of the keyed stream will be used to update the state instance via
+`ValueState.update(value)`:
 
-Internally a `QueryableStateStream` gets translated to an operator which uses all incoming
-records to update the queryable state instance.
-In a program like the following, all records of the keyed stream will be used to update the state
-instance, either via `ValueState#update(value)` or `AppendingState#add(value)`, depending on
-the chosen state variant:
 {% highlight java %}
 stream.keyBy(0).asQueryableState("query-name")
 {% endhighlight %}
+
 This acts like the Scala API's `flatMapWithState`.
 
 ### Managed Keyed State
@@ -110,7 +138,7 @@ This acts like the Scala API's `flatMapWithState`.
 Managed keyed state of an operator
 (see [Using Managed Keyed State]({{ site.baseurl }}/dev/stream/state/state.html#using-managed-keyed-state))
 can be made queryable by making the appropriate state descriptor queryable via
-`StateDescriptor#setQueryable(String queryableStateName)`, as in the example below:
+`StateDescriptor.setQueryable(String queryableStateName)`, as in the example below:
 {% highlight java %}
 ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
         new ValueStateDescriptor<>(
@@ -119,61 +147,79 @@ ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                 Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
 descriptor.setQueryable("query-name"); // queryable state name
 {% endhighlight %}
+
 <div class="alert alert-info">
-  <strong>Note:</strong> The `queryableStateName` parameter may be chosen arbitrarily and is only
+  <strong>Note:</strong> The <code>queryableStateName</code> parameter may be chosen arbitrarily and is only
   used for queries. It does not have to be identical to the state's own name.
 </div>
 
+This variant has no limitations as to which type of state can be made queryable. This means that it can be used for
+any `ValueState`, `ReducingState`, `ListState`, `MapState`, `AggregatingState`, and the currently deprecated `FoldingState`.
 
 ## Querying State
 
-The `QueryableStateClient` helper class may be used for queries against the `KvState` instances that
-serve the state internally. It needs to be set up with a valid `JobManager` address and port and is
-created as follows:
+So far, you have set up your cluster to run with queryable state and you have declared (some of) your state as
+queryable. Now it is time to see how to query this state.
 
-{% highlight java %}
-final Configuration config = new Configuration();
-config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, queryAddress);
-config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, queryPort);
+For this you can use the `QueryableStateClient` helper class. It is available in the `flink-queryable-state-client-java`
+jar, which you have to explicitly include as a dependency in the `pom.xml` of your project, as shown below:
 
-final HighAvailabilityServices highAvailabilityServices =
-      HighAvailabilityServicesUtils.createHighAvailabilityServices(
-           config,
-           Executors.newSingleThreadScheduledExecutor(),
-           HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
+<div data-lang="java" markdown="1">
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-queryable-state-client-java_{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+</div>
+
+For more on this, you can check how to [set up a Flink program]({{ site.baseurl }}/dev/linking_with_flink.html).
+
+The `QueryableStateClient` will submit your query to the internal proxy, which will then process your query and return
+the final result. The only requirement to initialize the client is to provide a valid `TaskManager` hostname (remember
+that there is a queryable state proxy running on each `TaskManager`) and the port where the proxy listens. More on how
+to configure the proxy and state server port(s) can be found in the [Configuration Section](#configuration).
 
-QueryableStateClient client = new QueryableStateClient(config, highAvailabilityServices);
+{% highlight java %}
+QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort);
 {% endhighlight %}
 
-The query method is this:
+With the client ready, to query a state of type `V`, associated with a key of type `K`, you can use the method:
 
 {% highlight java %}
-Future<byte[]> getKvState(
-    JobID jobID,
-    String queryableStateName,
-    int keyHashCode,
-    byte[] serializedKeyAndNamespace)
+CompletableFuture<S> getKvState(
+    final JobID jobId,
+    final String queryableStateName,
+    final K key,
+    final TypeInformation<K> keyTypeInfo,
+    final StateDescriptor<S, V> stateDescriptor)
 {% endhighlight %}
 
-A call to this method returns a `Future` eventually holding the serialized state value for the
-queryable state instance identified by `queryableStateName` of the job with ID `jobID`. The
-`keyHashCode` is the hash code of the key as returned by `Object.hashCode()` and the
-`serializedKeyAndNamespace` is the serialized key and namespace.
+The above returns a `CompletableFuture` eventually holding the state value for the queryable state instance identified
+by `queryableStateName` of the job with ID `jobId`. The `key` is the key whose state you are interested in and the
+`keyTypeInfo` will tell Flink how to serialize/deserialize it. Finally, the `stateDescriptor` contains the necessary
+information about the requested state, namely its type (`Value`, `Reduce`, etc.) and the necessary information on how
+to serialize/deserialize it.
+
+The careful reader will notice that the returned future contains a value of type `S`, *i.e.* a `State` object containing
+the actual value. This can be any of the state types supported by Flink: `ValueState`, `ReducingState`, `ListState`, `MapState`,
+`AggregatingState`, and the currently deprecated `FoldingState`.
+
+<div class="alert alert-info">
+  <strong>Note:</strong> These state objects do not allow modifications to the contained state. You can use them to get
+  the actual value of the state, <i>e.g.</i> using <code>valueState.value()</code>, or iterate over
+  the contained <code>&lt;K, V&gt;</code> entries, <i>e.g.</i> using <code>mapState.entries()</code>, but you cannot
+  modify them. As an example, calling the <code>add()</code> method on a returned list state will throw an
+  <code>UnsupportedOperationException</code>.
+</div>
+
 <div class="alert alert-info">
   <strong>Note:</strong> The client is asynchronous and can be shared by multiple threads. It needs
-  to be shutdown via <code>QueryableStateClient#shutdown()</code> when unused in order to free
+  to be shut down via <code>QueryableStateClient.shutdown()</code> when unused in order to free
   resources.
 </div>
 
-The current implementation is still pretty low-level in the sense that it only works with
-serialized data both for providing the key/namespace and the returned results. It is the
-responsibility of the user (or some follow-up utilities) to set up the serializers for this. The
-nice thing about this is that the query services don't have to get into the business of worrying
-about any class loading issues etc.
-
-There are some serialization utils for key/namespace and value serialization included in
-`KvStateRequestSerializer`.
-
 ### Example
 
 The following example extends the `CountWindowAverage` example
@@ -183,7 +229,7 @@ by making it queryable and showing how to query this value:
 {% highlight java %}
 public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
-    private transient ValueState<Tuple2<Long /* count */, Long /* sum */>> sum;
+    private transient ValueState<Tuple2<Long, Long>> sum; // a tuple containing the count and the sum
 
     @Override
     public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
@@ -214,74 +260,51 @@ public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>,
 Once used in a job, you can retrieve the job ID and then query any key's current state from this operator:
 
 {% highlight java %}
-final Configuration config = new Configuration();
-config.setString(JobManagerOptions.ADDRESS, queryAddress);
-config.setInteger(JobManagerOptions.PORT, queryPort);
-
-final HighAvailabilityServices highAvailabilityServices =
-      HighAvailabilityServicesUtils.createHighAvailabilityServices(
-           config,
-           Executors.newSingleThreadScheduledExecutor(),
-           HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
-
-QueryableStateClient client = new QueryableStateClient(config, highAvailabilityServices);
-
-final TypeSerializer<Long> keySerializer =
-        TypeInformation.of(new TypeHint<Long>() {}).createSerializer(new ExecutionConfig());
-final TypeSerializer<Tuple2<Long, Long>> valueSerializer =
-        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}).createSerializer(new ExecutionConfig());
-
-final byte[] serializedKey =
-        KvStateRequestSerializer.serializeKeyAndNamespace(
-                key, keySerializer,
-                VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
-
-Future<byte[]> serializedResult =
-        client.getKvState(jobId, "query-name", key.hashCode(), serializedKey);
-
-// now wait for the result and return it
-final FiniteDuration duration = new FiniteDuration(1, TimeUnit.SECONDS);
-byte[] serializedValue = Await.result(serializedResult, duration);
-Tuple2<Long, Long> value =
-        KvStateRequestSerializer.deserializeValue(serializedValue, valueSerializer);
-{% endhighlight %}
-
-### Note for Scala Users
-
-Please use the available Scala extensions when creating the `TypeSerializer` instances. Add the following import:
+QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort);
 
-```scala
-import org.apache.flink.streaming.api.scala._
-```
-
-Now you can create the type serializers as follows:
-
-```scala
-val keySerializer = createTypeInformation[Long]
-  .createSerializer(new ExecutionConfig)
-```
-
-If you don't do this, you can run into mismatches between the serializers used in the Flink job and in your client code, because types like `scala.Long` cannot be captured at runtime.
+// the state descriptor of the state to be fetched.
+ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
+        new ValueStateDescriptor<>(
+          "average",
+          TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
+          Tuple2.of(0L, 0L));
+
+CompletableFuture<ValueState<Tuple2<Long, Long>>> resultFuture =
+        client.getKvState(jobId, "query-name", key, BasicTypeInfo.LONG_TYPE_INFO, descriptor);
+
+// now handle the returned value
+resultFuture.thenAccept(response -> {
+        try {
+            Tuple2<Long, Long> res = response.value(); // the (count, sum) tuple stored for the queried key
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+});
+{% endhighlight %}
 
 ## Configuration
 
 The following configuration parameters influence the behaviour of the queryable state server and client.
 They are defined in `QueryableStateOptions`.
 
-### Server
-* `query.server.enable`: flag to indicate whether to start the queryable state server
-* `query.server.port`: port to bind to the internal `KvStateServer` (0 => pick random available port)
-* `query.server.network-threads`: number of network (event loop) threads for the `KvStateServer` (0 => #slots)
-* `query.server.query-threads`: number of asynchronous query threads for the `KvStateServerHandler` (0 => #slots).
+### State Server
+* `query.server.ports`: the server port range of the queryable state server. This is useful to avoid port clashes if more
+   than one `TaskManager` runs on the same machine. The specified range can be: a single port: "9123", a range of ports: "50100-50200",
+   or a list of ranges and/or single ports: "50100-50200,50300-50400,51234". The default port is 9067.
+* `query.server.network-threads`: number of network (event loop) threads receiving incoming requests for the state server (0 => #slots)
+* `query.server.query-threads`: number of threads handling/serving incoming requests for the state server (0 => #slots).
+
 
-### Client (`QueryableStateClient`)
-* `query.client.network-threads`: number of network (event loop) threads for the `KvStateClient` (0 => number of available cores)
-* `query.client.lookup.num-retries`: number of retries on location lookup failures
-* `query.client.lookup.retry-delay`: retry delay on location lookup failures (millis)
+### Proxy
+* `query.proxy.ports`: the server port range of the queryable state proxy. This is useful to avoid port clashes if more
+  than one `TaskManager` runs on the same machine. The specified range can be: a single port: "9123", a range of ports: "50100-50200",
+  or a list of ranges and/or single ports: "50100-50200,50300-50400,51234". The default port is 9069.
+* `query.proxy.network-threads`: number of network (event loop) threads receiving incoming requests for the client proxy (0 => #slots)
+* `query.proxy.query-threads`: number of threads handling/serving incoming requests for the client proxy (0 => #slots).
 
 ## Limitations
 
-* The queryable state life-cycle is bound to the life-cycle of the job, e.g. tasks register
+* The queryable state life-cycle is bound to the life-cycle of the job, *e.g.* tasks register
 queryable state on startup and unregister it on disposal. In future versions, it is desirable to
 decouple this in order to allow queries after a task finishes, and to speed up recovery via state
 replication.
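
The routing step described in the new Architecture section can be illustrated with a small Java sketch. The proxy has to find the `TaskManager` responsible for a key, which means going from the key to its key group and then to the parallel subtask that owns that key group. This is a simplified, hypothetical illustration, not Flink's code. The class and method names are made up. `mix()` (MurmurHash3's 32-bit finalizer) is only a stand-in for the murmur hash Flink applies to `key.hashCode()`, so concrete key-group numbers will not match a real job. The contiguous-range assignment of key groups to subtasks is assumed to follow a `keyGroup * parallelism / maxParallelism` rule.

```java
/**
 * Hypothetical sketch: key -> key group -> owning parallel subtask.
 * Not Flink's implementation; mix() is a stand-in hash, so the
 * concrete key-group numbers differ from those of a real Flink job.
 */
public final class KeyGroupRouting {

    private KeyGroupRouting() {}

    /** MurmurHash3's 32-bit finalizer (fmix32), used only to spread hashCode() bits. */
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    /** Key group of a key, always in [0, maxParallelism). */
    public static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(mix(key.hashCode()), maxParallelism);
    }

    /**
     * Index of the subtask owning a key group. Key groups are split into
     * contiguous ranges, one range per parallel subtask.
     */
    public static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // total number of key groups (assumed value)
        int parallelism = 4;      // number of parallel subtasks (assumed value)
        for (long key = 1L; key <= 5L; key++) {
            int keyGroup = keyGroupFor(key, maxParallelism);
            int subtask = operatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup);
            System.out.println("key " + key + " -> key group " + keyGroup + " -> subtask " + subtask);
        }
    }
}
```

Running `main` prints one line per key. The remaining step, mapping a subtask index to the `TaskManager` that currently runs it, is the part the proxy delegates to the `JobManager`, which tracks where each subtask is deployed.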
