Zakelly commented on code in PR #26107:
URL: https://github.com/apache/flink/pull/26107#discussion_r1946224899


##########
docs/content.zh/docs/dev/datastream/fault-tolerance/state_v2.md:
##########
@@ -0,0 +1,601 @@
+---
+title: "Working with State V2"
+weight: 2
+type: docs
+aliases:
+  - /dev/stream/state/state_v2.html
+  - /apis/streaming/state_v2.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Working with State V2 (New APIs)
+
+In this section you will learn about the new APIs that Flink provides for writing
+stateful programs. Please take a look at [Stateful Stream
+Processing]({{< ref "docs/concepts/stateful-stream-processing" >}})
+to learn about the concepts behind stateful stream processing.
+
+The new state API is designed to be more flexible than the previous API. Users can perform
+asynchronous state operations, which makes it more powerful and more efficient.
+Asynchronous state access is essential for the state backend to handle
+large state sizes and to spill to remote file systems when necessary.
+
+## Keyed DataStream
+
+If you want to use keyed state, you first need to specify a key on a
+`DataStream` that should be used to partition the state (and also the records
+in the stream themselves). You can specify a key using `keyBy(KeySelector)`
+in the Java API on a `DataStream`. This will yield a `KeyedStream`, which then allows operations
+that use keyed state. You should call `enableAsyncState()` on the `KeyedStream` to enable
+asynchronous state operations.
+
+A key selector function takes a single record as input and returns the key for
+that record. The key can be of any type and **must** be derived from
+deterministic computations.
+
+The data model of Flink is not based on key-value pairs. Therefore, you do not
+need to physically pack the data set types into keys and values. Keys are
+"virtual": they are defined as functions over the actual data to guide the
+grouping operator.
+
+The following example shows a key selector function that simply returns the
+field of an object:
+
+{{< tabs "54e3bde7-659a-4683-81fa-312a2c81036b" >}}
+{{< tab "Java" >}}
+```java
+// some ordinary POJO
+public class WC {
+  public String word;
+  public int count;
+
+  public String getWord() { return word; }
+}
+DataStream<WC> words = // [...]
+KeyedStream<WC, String> keyed = words
+  .keyBy(WC::getWord).enableAsyncState();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+{{< top >}}
+
+## Using Keyed State V2
+
+Unlike the previous state API, the new state API is designed for asynchronous state access.
+Each type of state offers two versions of the API: synchronous and asynchronous. The synchronous
+API is a blocking API that waits for the state access to complete. The asynchronous API is
+non-blocking and returns a `StateFuture` that will be completed when the state access
+is done. After that, a callback or the following logic (if any) will be invoked.
+The asynchronous API is more efficient and should be used whenever possible.
+Mixing synchronous and asynchronous state access in the same user function is
+strongly discouraged.
+
+The keyed state interfaces provide access to different types of state that are all scoped to
+the key of the current input element. This means that this type of state can only be used
+on a `KeyedStream`, which can be created via `stream.keyBy(…)` in Java. Most importantly,
+the keyed stream must be enabled for asynchronous state access by calling `enableAsyncState()`.
+The new APIs are only available on a `KeyedStream` on which `enableAsyncState()` has been invoked.
+
+Now, we will look at the different types of state available, and then we will see
+how they can be used in a program. Since the synchronous APIs are identical to the original API,
+we focus only on the asynchronous ones here.
+
+### The Return Values
+
+First of all, we should get familiar with the return values of those asynchronous state access methods.
+
+`StateFuture<T>` is a future that will be completed with the result of the state access.
+The result type is `T`. It provides multiple methods to handle the result:
+* `StateFuture<Void> thenAccept(Consumer<T>)`: This method takes a `Consumer` that will be called with the result
+  when the state access is done. It returns a `StateFuture<Void>`, which will be finished when the
+  `Consumer` is done.
+* `StateFuture<R> thenApply(Function<T, R>)`: This method takes a `Function` that will be called with the result
+  when the state access is done. The return value of the function will be the result of the following
+  `StateFuture`, which will be finished when the `Function` is done.
+* `StateFuture<R> thenCompose(Function<T, StateFuture<R>>)`: This method takes a `Function` that will
+  be called with the result when the state access is done. The return value of the function should be
+  a `StateFuture<R>`, which is exposed to the invoker of `thenCompose` as the return value.
+  That `StateFuture<R>` will be finished when the inner `StateFuture<R>` returned by the `Function` is finished.
+* `StateFuture<R> thenCombine(StateFuture<U>, BiFunction<T, U, R>)`: This method takes another `StateFuture` and a
+  `BiFunction` that will be called with the results of both `StateFuture`s when they are done. The return
+  value of the `BiFunction` will be the result of the following `StateFuture`, which will be finished when
+  the `BiFunction` is done.
+
+These methods are similar to the corresponding ones of `CompletableFuture`. Note, however,
+that `StateFuture` does not provide a `get()` method to block the current thread until the
+state access is done, because blocking the current thread may cause recursive blocking.
+`StateFuture<T>` also provides conditional versions of `thenAccept`, `thenApply`, `thenCompose` and `thenCombine`
+for the case where the logic following the state access splits into two branches
+based on the result of the state access. The conditional versions of those methods are `thenConditionallyAccept`,
+`thenConditionallyApply`, `thenConditionallyCompose` and `thenConditionallyCombine`.
+
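The chaining methods listed above can be tried out without a Flink job by using `java.util.concurrent.CompletableFuture`, which the text names as the closest analogue. The following is only a sketch under that assumption: it does not use `StateFuture`, and `readCount()` is a hypothetical stand-in for an asynchronous state read such as `asyncValue()`.

```java
import java.util.concurrent.CompletableFuture;

// Stand-in sketch: uses java.util.concurrent.CompletableFuture, NOT Flink's StateFuture.
class FutureChainingSketch {

    // Hypothetical stand-in for an asynchronous state read such as asyncValue().
    static CompletableFuture<Long> readCount() {
        return CompletableFuture.completedFuture(41L);
    }

    // thenApply: transform the result once the read completes.
    static CompletableFuture<Long> incremented() {
        return readCount().thenApply(count -> count + 1);
    }

    // thenCompose: chain a second asynchronous step that itself returns a future.
    static CompletableFuture<String> describe() {
        return incremented().thenCompose(
                count -> CompletableFuture.completedFuture("count=" + count));
    }

    // thenCombine: merge the results of two independent futures.
    static CompletableFuture<Long> sumOfTwoReads() {
        return readCount().thenCombine(readCount(), Long::sum);
    }

    public static void main(String[] args) {
        // thenAccept: consume the result without producing a new value.
        describe().thenAccept(System.out::println);
        sumOfTwoReads().thenAccept(total -> System.out.println("total=" + total));
    }
}
```

Note that `main` never blocks on a result; every step is expressed as a callback, which is the style `StateFuture` requires because it has no `get()`.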
+`StateIterator<T>` is an iterator that can be used to iterate over the elements of a state. It provides
+the following methods:
+* `boolean isEmpty()`: A synchronous method that returns `true` if the iterator has no elements, and `false` otherwise.
+* `StateFuture<Void> onNext(Consumer<T>)`: This method takes a `Consumer` that will be called with the next element
+  when the state access is done. It returns a `StateFuture<Void>`, which will be finished when the `Consumer` is done.
+  Also, a function version of `onNext` is provided: `StateFuture<Collection<R>> onNext(Function<T, R>)`. This method
+  takes a `Function` that will be called with the next element when the state access is done. The return values of the function
+  will be collected and returned as a collection in the following `StateFuture`, which will be
+  finished when the `Function` is done.
+
+We also provide a `StateFutureUtils` class that contains some utility methods to handle `StateFuture`s.
+These methods are:
+* `StateFuture<T> completedFuture(T)`: This method returns a completed `StateFuture` with the given value. This is
+  useful when you want to return a constant value in a `thenCompose` method for further processing.
+* `StateFuture<Void> completedVoidFuture()`: This method returns a completed `StateFuture` with a `null` value.
+  It is the void version of `completedFuture`.
+* `StateFuture<Collection<T>> combineAll(Collection<StateFuture<T>>)`: This method takes a collection of `StateFuture`s
+  and returns a `StateFuture` that will be completed when all the input `StateFuture`s are completed. The result of the
+  returned `StateFuture` is a collection of the results of the input `StateFuture`s. This is useful when you want to
+  combine the results of multiple `StateFuture`s.
+* `StateFuture<Iterable<T>> toIterable(StateFuture<StateIterator<T>>)`: This method takes a `StateFuture` of `StateIterator`
+  and returns a `StateFuture` of `Iterable`. The result of the returned `StateFuture` is an `Iterable` that contains all
+  the elements of the `StateIterator`. This is useful when you want to convert a `StateIterator` to an `Iterable`.
+  Avoid this in general, since it may disable lazy loading; it is only useful when the further
+  calculation depends on all of the data from the iterator.
+
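The behavior described for `combineAll` (one future that completes once every input has completed, carrying all results) can be approximated in plain Java. This sketch again substitutes `CompletableFuture` for `StateFuture`; the `combineAll` helper below is a hypothetical re-implementation for illustration and is not Flink's `StateFutureUtils`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Stand-in sketch: CompletableFuture plays the role of StateFuture here.
class CombineAllSketch {

    // Hypothetical helper: completes when all inputs are complete and
    // yields their results in input order.
    static <T> CompletableFuture<List<T>> combineAll(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture<?>[0]))
                // Every input is already complete at this point, so join() does not block.
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        // Three already-completed futures stand in for three asynchronous state reads.
        List<CompletableFuture<Integer>> reads = Arrays.asList(
                CompletableFuture.completedFuture(1),
                CompletableFuture.completedFuture(2),
                CompletableFuture.completedFuture(3));

        combineAll(reads).thenAccept(values -> System.out.println(values));
    }
}
```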
+
+### State Primitives
+
+The available state primitives are:
+
+* `ValueState<T>`: This keeps a value that can be updated and
+retrieved (scoped to the key of the input element as mentioned above, so there will possibly be one value
+for each key that the operation sees). The value can be set using `asyncUpdate(T)` and retrieved using
+`StateFuture<T> asyncValue()`.
+
+* `ListState<T>`: This keeps a list of elements. You can append elements and retrieve a `StateIterator`
+over all currently stored elements. Elements are added using `asyncAdd(T)` or `asyncAddAll(List<T>)`;
+the `StateIterator` can be retrieved using `StateFuture<StateIterator<T>> asyncGet()`.
+You can also override the existing list with `asyncUpdate(List<T>)`.
+
+* `ReducingState<T>`: This keeps a single value that represents the aggregation of all values
+added to the state. The interface is similar to `ListState`, but elements added using
+`asyncAdd(T)` are reduced to an aggregate using a specified `ReduceFunction`.
+
+* `AggregatingState<IN, OUT>`: This keeps a single value that represents the aggregation of all values
+added to the state. Contrary to `ReducingState`, the aggregate type may be different from the type
+of elements that are added to the state. The interface is the same as for `ListState`, but elements
+added using `asyncAdd(IN)` are aggregated using a specified `AggregateFunction`.
+
+* `MapState<UK, UV>`: This keeps a collection of mappings. You can put key-value pairs into the state and
+retrieve a `StateIterator` over all currently stored mappings. Mappings are added using `asyncPut(UK, UV)` or
+`asyncPutAll(Map<UK, UV>)`. The value associated with a user key can be retrieved using `asyncGet(UK)`.
+The iterable views for mappings, keys and values can be retrieved using `asyncEntries()`,
+`asyncKeys()` and `asyncValues()` respectively. You can also use `asyncIsEmpty()` to check whether
+this map contains any key-value mappings.
+
+All types of state also have a method `asyncClear()` that clears the state for the currently
+active key, i.e. the key of the input element.
+
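The difference between `ReducingState` and `AggregatingState` in the list above comes down to types: reducing keeps input and result in one type, while aggregating lets the input, the intermediate accumulator, and the output differ. The plain-Java sketch below illustrates only that distinction. It involves no Flink state; the `Aggregator` interface is a hypothetical, simplified shape introduced for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical, simplified aggregator with separate input, accumulator and output types.
interface Aggregator<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
}

class ReduceVsAggregateSketch {

    // Reducing: elements and the running aggregate share one type.
    static Long reduce(List<Long> values, BinaryOperator<Long> reducer) {
        Long current = null;
        for (Long value : values) {
            current = (current == null) ? value : reducer.apply(current, value);
        }
        return current;
    }

    // Aggregating: the accumulator and the output may differ from the element type.
    static <IN, ACC, OUT> OUT aggregate(List<IN> values, Aggregator<IN, ACC, OUT> aggregator) {
        ACC accumulator = aggregator.createAccumulator();
        for (IN value : values) {
            accumulator = aggregator.add(value, accumulator);
        }
        return aggregator.getResult(accumulator);
    }

    // Average: IN = Long, ACC = {sum, count}, OUT = Double.
    static final Aggregator<Long, long[], Double> AVERAGE =
            new Aggregator<Long, long[], Double>() {
                @Override
                public long[] createAccumulator() {
                    return new long[] {0L, 0L};
                }

                @Override
                public long[] add(Long value, long[] acc) {
                    acc[0] += value; // running sum
                    acc[1] += 1;     // element count
                    return acc;
                }

                @Override
                public Double getResult(long[] acc) {
                    return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
                }
            };

    public static void main(String[] args) {
        List<Long> values = Arrays.asList(1L, 2L, 3L, 4L);
        System.out.println(reduce(values, Long::sum)); // prints 10
        System.out.println(aggregate(values, AVERAGE)); // prints 2.5
    }
}
```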
+It is important to keep in mind that these state objects are only used for interfacing
+with state. The state is not necessarily stored inside but might reside on disk or somewhere else.
+The second thing to keep in mind is that the value you get from the state
+depends on the key of the input element. So the value you get in one invocation of your
+user function can differ from the value in another invocation if the keys involved are different.
+
+To get a state handle, you have to create a `StateDescriptor`. This holds the name of the state
+(as we will see later, you can create several states, and they have to have unique names so
+that you can reference them), the type of the values that the state holds, and possibly
+a user-specified function, such as a `ReduceFunction`. Depending on what type of state you
+want to retrieve, you create either a `ValueStateDescriptor`, a `ListStateDescriptor`,
+an `AggregatingStateDescriptor`, a `ReducingStateDescriptor`, or a `MapStateDescriptor`.
+To distinguish them from the previous state APIs, you should use the `StateDescriptor`s under the
+`org.apache.flink.api.common.state.v2` package (note the **v2**).
+
+State is accessed using the `RuntimeContext`, so it is only possible in *rich functions*.
+Please see [here]({{< ref "docs/dev/datastream/user_defined_functions" >}}#rich-functions) for
+information about that, but we will also see an example shortly. The `RuntimeContext` that
+is available in a `RichFunction` has these methods for accessing state:
+
+* `ValueState<T> getState(ValueStateDescriptor<T>)`
+* `ReducingState<T> getReducingState(ReducingStateDescriptor<T>)`
+* `ListState<T> getListState(ListStateDescriptor<T>)`
+* `AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)`
+* `MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)`
+
+This is an example `FlatMapFunction` that shows how all of the parts fit together:
+
+{{< tabs "348fd48c-fa36-4d27-9e53-9bffb74d8a87" >}}
+{{< tab "Java" >}}
+```java
+public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+    /**
+     * The ValueState handle. The first field is the count, the second field a running sum.
+     */
+    private transient ValueState<Tuple2<Long, Long>> sum;
+
+    @Override
+    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
+
+        // access the state value
+        sum.asyncValue().thenApply(currentSum -> {
+            // if it hasn't been used before, it will be null
+            Tuple2<Long, Long> current = currentSum == null ? Tuple2.of(0L, 0L) : currentSum;
+
+            // update the count
+            current.f0 += 1;
+
+            // add the second field of the input value
+            current.f1 += input.f1;
+
+            return current;
+        }).thenAccept(r -> {
+            // if the count reaches 2, emit the average and clear the state
+            if (r.f0 >= 2) {
+                out.collect(Tuple2.of(input.f0, r.f1 / r.f0));
+                sum.asyncClear();
+            } else {
+                sum.asyncUpdate(r);
+            }
+        });
+    }
+
+    @Override
+    public void open(OpenContext ctx) {
+        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
+                new ValueStateDescriptor<>(
+                        "average", // the state name
+                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information
+        sum = getRuntimeContext().getState(descriptor);

Review Comment:
   I thought this is a bug. I opened #26120 to fix this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
