This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2f1a6b0aa3eeae973e8a6124bef888f223bf43ae Author: PengFei Li <[email protected]> AuthorDate: Thu May 14 10:40:33 2020 +0800 [FLINK-16076][docs-zh] Translate "Queryable State" page into Chinese This closes #12139. --- docs/dev/stream/state/queryable_state.zh.md | 197 +++++++++++----------------- 1 file changed, 75 insertions(+), 122 deletions(-) diff --git a/docs/dev/stream/state/queryable_state.zh.md b/docs/dev/stream/state/queryable_state.zh.md index 3c14c7e..1d62efd 100644 --- a/docs/dev/stream/state/queryable_state.zh.md +++ b/docs/dev/stream/state/queryable_state.zh.md @@ -1,5 +1,5 @@ --- -title: "可查询状态" +title: "Queryable State" nav-parent_id: streaming_state nav-pos: 4 is_beta: true @@ -27,75 +27,52 @@ under the License. {:toc} <div class="alert alert-warning"> - <strong>Note:</strong> The client APIs for queryable state are currently in an evolving state and - there are <strong>no guarantees</strong> made about stability of the provided interfaces. It is - likely that there will be breaking API changes on the client side in the upcoming Flink versions. + <strong>注意:</strong> 目前 querable state 的客户端 API 还在不断演进,<strong>不保证</strong>现有接口的稳定性。在后续的 Flink 版本中有可能发生 API 变化。 </div> -In a nutshell, this feature exposes Flink's managed keyed (partitioned) state -(see [Working with State]({{ site.baseurl }}/dev/stream/state/state.html)) to the outside world and -allows the user to query a job's state from outside Flink. For some scenarios, queryable state -eliminates the need for distributed operations/transactions with external systems such as key-value -stores which are often the bottleneck in practice. In addition, this feature may be particularly -useful for debugging purposes. 
+简而言之, 这个特性将 Flink 的 managed keyed (partitioned) state
+(参考 [Working with State]({{ site.baseurl }}/zh/dev/stream/state/state.html)) 暴露给外部,从而用户可以在 Flink 外部查询作业 state。
+在某些场景中,Queryable State 消除了对外部系统的分布式操作以及事务的需求,比如 KV 存储系统,而这些外部系统往往会成为瓶颈。除此之外,这个特性对于调试作业非常有用。

 <div class="alert alert-warning">
-  <strong>Attention:</strong> When querying a state object, that object is accessed from a concurrent
-  thread without any synchronization or copying. This is a design choice, as any of the above would lead
-  to increased job latency, which we wanted to avoid. Since any state backend using Java heap space,
-  <i>e.g.</i> <code>MemoryStateBackend</code> or <code>FsStateBackend</code>, does not work
-  with copies when retrieving values but instead directly references the stored values, read-modify-write
-  patterns are unsafe and may cause the queryable state server to fail due to concurrent modifications.
-  The <code>RocksDBStateBackend</code> is safe from these issues.
+  <strong>注意:</strong> 进行查询时,state 会在并发线程中被访问,但 state 不会进行同步和拷贝。这种设计是为了避免同步和拷贝带来的作业延时。对于使用 Java 堆内存的 state backend,
+  <i>比如</i> <code>MemoryStateBackend</code> 或者 <code>FsStateBackend</code>,它们获取状态时不会进行拷贝,而是直接引用状态对象,所以对状态的 read-modify-write 是不安全的,并且可能会因为并发修改导致查询失败。但 <code>RocksDBStateBackend</code> 是安全的,不会遇到上述问题。
 </div>

-## Architecture
+## 架构

-Before showing how to use the Queryable State, it is useful to briefly describe the entities that compose it.
-The Queryable State feature consists of three main entities:
+在展示如何使用 Queryable State 之前,先简单描述一下该特性的组成部分,主要包括以下三部分:

- 1. the `QueryableStateClient`, which (potentially) runs outside the Flink cluster and submits the user queries,
- 2. the `QueryableStateClientProxy`, which runs on each `TaskManager` (*i.e.* inside the Flink cluster) and is responsible
- for receiving the client's queries, fetching the requested state from the responsible Task Manager on his behalf, and
- returning it to the client, and
- 3. the `QueryableStateServer` which runs on each `TaskManager` and is responsible for serving the locally stored state.
+ 1. `QueryableStateClient`,默认运行在 Flink 集群外部,负责提交用户的查询请求;
+ 2. `QueryableStateClientProxy`,运行在每个 `TaskManager` 上(*即* Flink 集群内部),负责接收客户端的查询请求,从所负责的 Task Manager 获取请求的 state,并返回给客户端;
+ 3. `QueryableStateServer`, 运行在 `TaskManager` 上,负责服务本地存储的 state。

-The client connects to one of the proxies and sends a request for the state associated with a specific
-key, `k`. As stated in [Working with State]({{ site.baseurl }}/dev/stream/state/state.html), keyed state is organized in
-*Key Groups*, and each `TaskManager` is assigned a number of these key groups. To discover which `TaskManager` is
-responsible for the key group holding `k`, the proxy will ask the `JobManager`. Based on the answer, the proxy will
-then query the `QueryableStateServer` running on that `TaskManager` for the state associated with `k`, and forward the
-response back to the client.
+客户端连接到一个代理,并发送请求获取特定 `k` 对应的 state。 如 [Working with State]({{ site.baseurl }}/zh/dev/stream/state/state.html) 所述,keyed state 按照
+*Key Groups* 进行划分,每个 `TaskManager` 会分配其中的一些 key groups。代理会询问 `JobManager` 以找到 `k` 所属 key group 的 TaskManager。根据返回的结果, 代理将会向运行在 `TaskManager` 上的 `QueryableStateServer` 查询 `k` 对应的 state, 并将结果返回给客户端。

-## Activating Queryable State
+## 激活 Queryable State

-To enable queryable state on your Flink cluster, you need to do the following:
+为了在 Flink 集群上使用 queryable state,需要进行以下操作:

- 1. copy the `flink-queryable-state-runtime{{ site.scala_version_suffix }}-{{site.version }}.jar`
-from the `opt/` folder of your [Flink distribution](https://flink.apache.org/downloads.html "Apache Flink: Downloads"),
-to the `lib/` folder.
- 2. set the property `queryable-state.enable` to `true`. See the [Configuration]({{ site.baseurl }}/ops/config.html#queryable-state) documentation for details and additional parameters.
+ 1. 将 `flink-queryable-state-runtime{{ site.scala_version_suffix }}-{{site.version }}.jar`
+从 [Flink distribution](https://flink.apache.org/downloads.html "Apache Flink: Downloads") 的 `opt/` 目录拷贝到 `lib/` 目录;
+ 2. 将参数 `queryable-state.enable` 设置为 `true`。详细信息以及其它配置可参考文档 [Configuration]({{ site.baseurl }}/zh/ops/config.html#queryable-state)。

-To verify that your cluster is running with queryable state enabled, check the logs of any
-task manager for the line: `"Started the Queryable State Proxy Server @ ..."`.
+为了验证集群的 queryable state 已经被激活,可以检查任意 task manager 的日志中是否包含 "Started the Queryable State Proxy Server @ ..."。

-## Making State Queryable
+## 将 state 设置为可查询的

-Now that you have activated queryable state on your cluster, it is time to see how to use it. In order for a state to
-be visible to the outside world, it needs to be explicitly made queryable by using:
+激活集群的 queryable state 功能后,还要将 state 设置为可查询的才能对外可见,可以通过以下两种方式进行设置:

-* either a `QueryableStateStream`, a convenience object which acts as a sink and offers its incoming values as queryable
-state, or
-* the `stateDescriptor.setQueryable(String queryableStateName)` method, which makes the keyed state represented by the
- state descriptor, queryable.
+* 创建 `QueryableStateStream`,它会作为一个 sink,并将输入数据转化为 queryable state;
+* 通过 `stateDescriptor.setQueryable(String queryableStateName)` 将 state 描述符所表示的 keyed state 设置成可查询的。

-The following sections explain the use of these two approaches.
+接下来的部分将详细解释这两种方式。

 ### Queryable State Stream

-Calling `.asQueryableState(stateName, stateDescriptor)` on a `KeyedStream` returns a `QueryableStateStream` which offers
-its values as queryable state. Depending on the type of state, there are the following variants of the `asQueryableState()`
-method:
+在 `KeyedStream` 上调用 `.asQueryableState(stateName, stateDescriptor)` 将会返回一个 `QueryableStateStream`, 它会将流数据转化为 queryable state。
+对应不同的 state 类型,`asQueryableState()` 有以下一些方法变体:

 {% highlight java %}
 // ValueState
@@ -119,28 +96,23 @@ QueryableStateStream asQueryableState(


 <div class="alert alert-info">
-  <strong>Note:</strong> There is no queryable <code>ListState</code> sink as it would result in an ever-growing
-  list which may not be cleaned up and thus will eventually consume too much memory.
+  <strong>注意:</strong> 没有可查询的 <code>ListState</code> sink,因为这种情况下 list 会不断增长,并且可能不会被清理,最终会消耗大量的内存。
 </div>

-The returned `QueryableStateStream` can be seen as a sink and **cannot** be further transformed. Internally, a
-`QueryableStateStream` gets translated to an operator which uses all incoming records to update the queryable state
-instance. The updating logic is implied by the type of the `StateDescriptor` provided in the `asQueryableState` call.
-In a program like the following, all records of the keyed stream will be used to update the state instance via the
-`ValueState.update(value)`:
+返回的 `QueryableStateStream` 可以被视作一个sink,而且**不能再**被进一步转换。在内部实现上,一个 `QueryableStateStream` 被转换成一个 operator,使用输入的数据来更新 queryable state。state 如何更新是由 `asQueryableState` 提供的 `StateDescriptor` 来决定的。在下面的代码中, keyed stream 的所有数据将会通过 `ValueState.update(value)` 来更新状态:

 {% highlight java %}
 stream.keyBy(0).asQueryableState("query-name")
 {% endhighlight %}

-This acts like the Scala API's `flatMapWithState`.
+这个行为类似于 Scala API 中的 `flatMapWithState`。

 ### Managed Keyed State

-Managed keyed state of an operator
-(see [Using Managed Keyed State]({{ site.baseurl }}/dev/stream/state/state.html#using-managed-keyed-state))
-can be made queryable by making the appropriate state descriptor queryable via
-`StateDescriptor.setQueryable(String queryableStateName)`, as in the example below:
+operator 中的 Managed keyed state
+(参考 [Using Managed Keyed State]({{ site.baseurl }}/zh/dev/stream/state/state.html#using-managed-keyed-state))
+可以通过 `StateDescriptor.setQueryable(String queryableStateName)` 将 state descriptor 设置成可查询的,从而使 state 可查询,如下所示:
+
 {% highlight java %}
 ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
         new ValueStateDescriptor<>(
@@ -150,20 +122,17 @@ descriptor.setQueryable("query-name"); // queryable state name
 {% endhighlight %}

 <div class="alert alert-info">
-  <strong>Note:</strong> The <code>queryableStateName</code> parameter may be chosen arbitrarily and is only
-  used for queries. It does not have to be identical to the state's own name.
+  <strong>注意:</strong> 参数 <code>queryableStateName</code> 可以任意选取,并且只被用来进行查询,它可以和 state 的名称不同。
 </div>

-This variant has no limitations as to which type of state can be made queryable. This means that this can be used for
-any `ValueState`, `ReduceState`, `ListState`, `MapState`, `AggregatingState`, and the currently deprecated `FoldingState`.
+这种方式不会限制 state 类型,即任意的 `ValueState`、`ReduceState`、`ListState`、`MapState`、`AggregatingState` 以及已弃用的 `FoldingState`
+均可作为 queryable state。

-## Querying State
+## 查询 state

-So far, you have set up your cluster to run with queryable state and you have declared (some of) your state as
-queryable. Now it is time to see how to query this state.
+目前为止,你已经激活了集群的 queryable state 功能,并且将一些 state 设置成了可查询的,接下来将会展示如何进行查询。

-For this you can use the `QueryableStateClient` helper class. This is available in the `flink-queryable-state-client`
-jar which must be explicitly included as a dependency in the `pom.xml` of your project along with `flink-core`, as shown below:
+为了进行查询,可以使用辅助类 `QueryableStateClient`,这个类位于 `flink-queryable-state-client` 的 jar 中,在项目的 `pom.xml` 需要显示添加对 `flink-queryable-state-client` 和 `flink-core` 的依赖, 如下所示:

 <div data-lang="java" markdown="1">
 {% highlight xml %}
@@ -180,18 +149,16 @@ jar which must be explicitly included as a dependency in the `pom.xml` of your p
 {% endhighlight %}
 </div>

-For more on this, you can check how to [set up a Flink program]({{ site.baseurl }}/dev/projectsetup/dependencies.html).
+关于依赖的更多信息, 可以参考如何 [配置 Flink 项目]({{ site.baseurl }}/zh/dev/projectsetup/dependencies.html).

-The `QueryableStateClient` will submit your query to the internal proxy, which will then process your query and return
-the final result. The only requirement to initialize the client is to provide a valid `TaskManager` hostname (remember
-that there is a queryable state proxy running on each task manager) and the port where the proxy listens. More on how
-to configure the proxy and state server port(s) in the [Configuration Section](#configuration).
+`QueryableStateClient` 将提交你的请求到内部代理,代理会处理请求并返回结果。客户端的初始化只需要提供一个有效的 `TaskManager` 主机名
+(每个 task manager 上都运行着一个 queryable state 代理),以及代理监听的端口号。关于如何配置代理以及端口号可以参考 [Configuration Section](#configuration).

 {% highlight java %}
 QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort);
 {% endhighlight %}

-With the client ready, to query a state of type `V`, associated with a key of type `K`, you can use the method:
+客户端就绪后,为了查询类型为 `K` 的 key,以及类型为 `V` 的state,可以使用如下方法:

 {% highlight java %}
 CompletableFuture<S> getKvState(
@@ -202,35 +169,29 @@ CompletableFuture<S> getKvState(
     StateDescriptor<S, V> stateDescriptor)
 {% endhighlight %}

-The above returns a `CompletableFuture` eventually holding the state value for the queryable state instance identified
-by `queryableStateName` of the job with ID `jobID`. The `key` is the key whose state you are interested in and the
-`keyTypeInfo` will tell Flink how to serialize/deserialize it. Finally, the `stateDescriptor` contains the necessary
-information about the requested state, namely its type (`Value`, `Reduce`, etc) and the necessary information on how
-to serialize/deserialize it.
+该方法会返回一个最终将包含 state 的 queryable state 实例,该实例可通过 JobID 和 queryableStateName 识别。在方法参数中,`key` 用来指定所要查询的状态所属的 key。
+`keyTypeInfo` 告诉 Flink 如何对 key 进行序列化和反序列化。`stateDescriptor` 包含了所请求 state 的必要信息,即 state 类型(`Value`,`Reduce` 等等),
+以及如何对其进行序列化和反序列。

-The careful reader will notice that the returned future contains a value of type `S`, *i.e.* a `State` object containing
-the actual value. This can be any of the state types supported by Flink: `ValueState`, `ReduceState`, `ListState`, `MapState`,
-`AggregatingState`, and the currently deprecated `FoldingState`.
+细心的读者会注意到返回的 future 包含类型为 `S` 的值,*即*一个存储实际值的 `State` 对象。它可以是Flink支持的任何类型的 state:`ValueState`、`ReduceState`、
+`ListState`、`MapState`、`AggregatingState` 以及弃用的 `FoldingState`。

 <div class="alert alert-info">
-  <strong>Note:</strong> These state objects do not allow modifications to the contained state. You can use them to get
-  the actual value of the state, <i>e.g.</i> using <code>valueState.get()</code>, or iterate over
-  the contained <code><K, V></code> entries, <i>e.g.</i> using the <code>mapState.entries()</code>, but you cannot
-  modify them. As an example, calling the <code>add()</code> method on a returned list state will throw an
-  <code>UnsupportedOperationException</code>.
+  <strong>注意:</strong> 这些 state 对象不允许对其中的 state 进行修改。你可以通过 <code>valueState.get()</code> 获取实际的 state,
+  或者通过 <code>mapState.entries()</code> 遍历所有 <code><K, V></code>,但是不能修改它们。举例来说,对返回的 list state 调用 <code>add()</code>
+  方法将会导致 <code>UnsupportedOperationException</code>。
 </div>

 <div class="alert alert-info">
-  <strong>Note:</strong> The client is asynchronous and can be shared by multiple threads. It needs
-  to be shutdown via <code>QueryableStateClient.shutdown()</code> when unused in order to free
-  resources.
+  <strong>注意:</strong> 客户端是异步的,并且可能被多个线程共享。客户端不再使用后需要通过 <code>QueryableStateClient.shutdown()</code>
+  来终止,从而释放资源。
 </div>

-### Example
+### 示例

-The following example extends the `CountWindowAverage` example
-(see [Using Managed Keyed State]({{ site.baseurl }}/dev/stream/state/state.html#using-managed-keyed-state))
-by making it queryable and shows how to query this value:
+下面的例子扩展自 `CountWindowAverage`
+(参考 [Using Managed Keyed State]({{ site.baseurl }}/zh/dev/stream/state/state.html#using-managed-keyed-state)),
+将其中的 state 设置成可查询的,并展示了如何进行查询:

 {% highlight java %}
 public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
@@ -262,7 +223,7 @@ public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>,
 }
 {% endhighlight %}

-Once used in a job, you can retrieve the job ID and then query any key's current state from this operator:
+上面的代码作为作业运行后,可以获取作业的 ID,然后可以通过下面的方式查询任何 key 下的 state。

 {% highlight java %}
 QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort);
@@ -288,34 +249,26 @@ resultFuture.thenAccept(response -> {

 ## Configuration

-The following configuration parameters influence the behaviour of the queryable state server and client.
-They are defined in `QueryableStateOptions`.
+下面的配置会影响 queryable state 服务器端和客户端的行为,它们定义在 `QueryableStateOptions`。

 ### State Server

-* `queryable-state.server.ports`: the server port range of the queryable state server. This is useful to avoid port clashes if more
-  than 1 task managers run on the same machine. The specified range can be: a port: "9123", a range of ports: "50100-50200",
-  or a list of ranges and or points: "50100-50200,50300-50400,51234". The default port is 9067.
-* `queryable-state.server.network-threads`: number of network (event loop) threads receiving incoming requests for the state server (0 => #slots)
-* `queryable-state.server.query-threads`: number of threads handling/serving incoming requests for the state server (0 => #slots).
+* `queryable-state.server.ports`: 服务器端口范围,如果同一台机器上运行了多个 task manager,可以避免端口冲突。指定的可以是一个具体的端口号,如 "9123",
+  可以是一个端口范围,如 "50100-50200",或者可以是端口范围以及端口号的组合,如 "50100-50200,50300-50400,51234"。默认端口号是 9067。
+* `queryable-state.server.network-threads`: 服务器端 network (event loop) thread 的数量,用来接收查询请求 (如果设置为0,则线程数为 slot 数)。
+* `queryable-state.server.query-threads`: 服务器端处理查询请求的线程数 (如果设置为0,则线程数为 slot 数)。

 ### Proxy

-* `queryable-state.proxy.ports`: the server port range of the queryable state proxy. This is useful to avoid port clashes if more
-  than 1 task managers run on the same machine. The specified range can be: a port: "9123", a range of ports: "50100-50200",
-  or a list of ranges and or points: "50100-50200,50300-50400,51234". The default port is 9069.
-* `queryable-state.proxy.network-threads`: number of network (event loop) threads receiving incoming requests for the client proxy (0 => #slots)
-* `queryable-state.proxy.query-threads`: number of threads handling/serving incoming requests for the client proxy (0 => #slots).
-
-## Limitations
-
-* The queryable state life-cycle is bound to the life-cycle of the job, *e.g.* tasks register
-queryable state on startup and unregister it on disposal. In future versions, it is desirable to
-decouple this in order to allow queries after a task finishes, and to speed up recovery via state
-replication.
-* Notifications about available KvState happen via a simple tell. In the future this should be improved to be
-more robust with asks and acknowledgements.
-* The server and client keep track of statistics for queries. These are currently disabled by
-default as they would not be exposed anywhere. As soon as there is better support to publish these
-numbers via the Metrics system, we should enable the stats.
+* `queryable-state.proxy.ports`: 代理的服务端口范围。如果同一台机器上运行了多个 task manager,可以避免端口冲突。指定的可以是一个具体的端口号,如 "9123",
+  可以是一个端口范围,如"50100-50200",或者可以是端口范围以及端口号的组合,如 "50100-50200,50300-50400,51234"。默认端口号是 9069。
+* `queryable-state.proxy.network-threads`: 代理上 network (event loop) thread 的数量,用来接收查询请求 (如果设置为0,则线程数为 slot 数)。
+* `queryable-state.proxy.query-threads`: 代理上处理查询请求的线程数 (如果设置为0,则线程数为 slot 数)。
+
+## 限制
+
+* queryable state 的生命周期受限于作业的生命周期,*比如* tasks 在启动时注册可查询状态,并在退出时注销。在后续版本中,希望能够将其解耦
+从而允许 task 结束后依然能够查询 state,并且通过 state 备份来加速恢复。
+* 目前是通过 tell 来通知可用的 KvState。将来会使用 asks 和 acknowledgements 来提升稳定性。
+* 服务器端和客户端会记录请求的统计信息。因为统计信息目前不会暴露给外部,所以这个功能默认没有开启。如果将来支持通过 Metrics 系统发布这些数据,将开启统计功能。

 {% top %}
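The Configuration section of the page accepts three forms for `queryable-state.server.ports` and `queryable-state.proxy.ports`: a single port ("9123"), a range ("50100-50200"), or a comma-separated mix of ranges and ports ("50100-50200,50300-50400,51234"). The sketch below is illustrative only. It shows how such a spec could be expanded into a list of candidate ports. The class `PortRangeSketch` and its `parse` method are hypothetical names introduced here and are not Flink's own parser. Rejecting reversed ranges and ports outside 0-65535 is an assumed validation rule.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper, not Flink's implementation: expands a port spec such as
 * "9123", "50100-50200" or "50100-50200,50300-50400,51234" into the list of
 * candidate ports, in the order they appear in the spec.
 */
public class PortRangeSketch {

    public static List<Integer> parse(String spec) {
        List<Integer> ports = new ArrayList<>();
        for (String part : spec.split(",")) {
            String token = part.trim();
            if (token.isEmpty()) {
                continue; // skip empty entries, e.g. from a trailing comma
            }
            int dash = token.indexOf('-');
            if (dash < 0) {
                // A single port such as "9123".
                ports.add(checkPort(Integer.parseInt(token)));
            } else {
                // An inclusive range such as "50100-50200".
                int start = checkPort(Integer.parseInt(token.substring(0, dash).trim()));
                int end = checkPort(Integer.parseInt(token.substring(dash + 1).trim()));
                if (end < start) {
                    throw new IllegalArgumentException("Invalid port range: " + token);
                }
                for (int port = start; port <= end; port++) {
                    ports.add(port);
                }
            }
        }
        return ports;
    }

    // Assumed validation: reject anything outside the TCP port space.
    private static int checkPort(int port) {
        if (port < 0 || port > 65535) {
            throw new IllegalArgumentException("Port out of range: " + port);
        }
        return port;
    }

    public static void main(String[] args) {
        List<Integer> ports = parse("50100-50200,50300-50400,51234");
        System.out.println(ports.size()); // prints 203
        System.out.println(ports.get(0) + " ... " + ports.get(ports.size() - 1)); // prints 50100 ... 51234
    }
}
```

The page notes that a range helps avoid port clashes when several TaskManagers run on the same machine. Giving each process a list of candidates, rather than one fixed port, leaves room for every TaskManager to end up on a different free port.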
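The Architecture section describes the lookup path as follows. The client contacts a proxy. The proxy asks the `JobManager` which `TaskManager` owns the key group containing `k`. It then queries that TaskManager's `QueryableStateServer` and forwards the answer to the client. The plain-Java toy model below traces that data flow. Every name in it is a hypothetical stand-in rather than a Flink class, and several simplifications are assumptions:

* there are 128 key groups and four "TaskManagers", each running a single parallel instance;
* a key maps to its key group through a plain `floorMod` of `hashCode()`, whereas Flink's real assignment additionally hashes the key's hash code;
* key groups are assigned to TaskManagers as contiguous ranges.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Toy model of client -> proxy -> owner lookup -> state server. Not Flink code. */
public class QueryRoutingSketch {

    // Assumed number of key groups (the job's max parallelism).
    static final int MAX_PARALLELISM = 128;
    // Assumed number of TaskManagers, one parallel instance each.
    static final int NUM_TASK_MANAGERS = 4;

    // Simplified key -> key group mapping (Flink also scrambles hashCode()).
    static int keyGroupFor(Object key) {
        return Math.floorMod(key.hashCode(), MAX_PARALLELISM);
    }

    // Stand-in for asking the JobManager: contiguous key-group ranges per TaskManager.
    static int taskManagerFor(int keyGroup) {
        return keyGroup * NUM_TASK_MANAGERS / MAX_PARALLELISM;
    }

    // State held locally by each TaskManager's "QueryableStateServer".
    static final Map<Integer, Map<String, Long>> LOCAL_STATE = new ConcurrentHashMap<>();

    // Write path, used only to populate the toy state.
    static void put(String key, long value) {
        int tm = taskManagerFor(keyGroupFor(key));
        LOCAL_STATE.computeIfAbsent(tm, ignored -> new ConcurrentHashMap<>()).put(key, value);
    }

    // "Proxy": resolve the owning TaskManager, ask its server, forward the answer asynchronously.
    static CompletableFuture<Long> query(String key) {
        return CompletableFuture.supplyAsync(() -> {
            int tm = taskManagerFor(keyGroupFor(key));
            Long value = LOCAL_STATE.getOrDefault(tm, Map.of()).get(key);
            if (value == null) {
                throw new IllegalStateException("No state for key " + key);
            }
            return value;
        });
    }

    public static void main(String[] args) {
        put("user-1", 42L);
        put("user-2", 7L);
        System.out.println(query("user-1").join()); // prints 42
    }
}
```

Returning a `CompletableFuture` loosely mirrors the asynchronous `getKvState` signature shown on the page. In this toy model, a missing key surfaces as an exceptionally completed future rather than as a `null` value.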
