fredia commented on code in PR #26107:
URL: https://github.com/apache/flink/pull/26107#discussion_r1945875237


##########
docs/content.zh/docs/dev/datastream/fault-tolerance/state_v2.md:
##########
@@ -0,0 +1,601 @@
+---
+title: "Working with State V2"
+weight: 2
+type: docs
+aliases:
+  - /dev/stream/state/state_v2.html
+  - /apis/streaming/state_v2.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Working with State V2 (New APIs)
+
+In this section you will learn about the new APIs that Flink provides for 
writing
+stateful programs. Please take a look at [Stateful Stream
+Processing]({{< ref "docs/concepts/stateful-stream-processing" >}})
+to learn about the concepts behind stateful stream processing.
+
+The new state API is designed to be more flexible than the previous API. Users can perform
+asynchronous state operations, which makes it more powerful and more efficient.
+Asynchronous state access is essential for the state backend to handle
+large state sizes and to spill to remote file systems when necessary.
+
+## Keyed DataStream
+
+If you want to use keyed state, you first need to specify a key on a
+`DataStream` that should be used to partition the state (and also the records
+in the stream themselves). You can specify a key using `keyBy(KeySelector)`
+in the Java API on a `DataStream`. This will yield a `KeyedStream`, which then allows operations
+that use keyed state. Call `enableAsyncState()` on the `KeyedStream` to enable
+asynchronous state operations.
+
+A key selector function takes a single record as input and returns the key for
+that record. The key can be of any type and **must** be derived from
+deterministic computations.
+
+The data model of Flink is not based on key-value pairs. Therefore, you do not
+need to physically pack the data set types into keys and values. Keys are
+"virtual": they are defined as functions over the actual data to guide the
+grouping operator.
+
+The following example shows a key selector function that simply returns the
+field of an object:
+
+{{< tabs "54e3bde7-659a-4683-81fa-312a2c81036b" >}}
+{{< tab "Java" >}}
+```java
+// some ordinary POJO
+public class WC {

Review Comment:
   How about a more descriptive name, e.g. `public class WordWithCount`?
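
To illustrate the suggested rename, here is a minimal sketch of the POJO with a descriptive name and a deterministic key selector. It is a stand-in under stated assumptions: `java.util.function.Function` plays the role of Flink's `KeySelector`, and `sumByKey` is a hypothetical helper, so the snippet runs on the JDK alone without Flink on the classpath.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

public class WordWithCountExample {

    // Ordinary POJO, named descriptively as suggested in the review.
    public static class WordWithCount {
        public String word;
        public int count;

        public WordWithCount(String word, int count) {
            this.word = word;
            this.count = count;
        }

        public String getWord() { return word; }
    }

    // Hypothetical helper (not a Flink API): partitions records by the key
    // returned from the selector and sums the counts within each partition.
    public static Map<String, Integer> sumByKey(
            List<WordWithCount> records, Function<WordWithCount, String> keySelector) {
        return records.stream().collect(
                Collectors.groupingBy(keySelector, TreeMap::new,
                        Collectors.summingInt(r -> r.count)));
    }

    public static void main(String[] args) {
        List<WordWithCount> words = Arrays.asList(
                new WordWithCount("flink", 1),
                new WordWithCount("state", 2),
                new WordWithCount("flink", 3));
        // The key is "virtual": it is computed from each record by the selector.
        System.out.println(sumByKey(words, WordWithCount::getWord)); // prints {flink=4, state=2}
    }
}
```

The method reference `WordWithCount::getWord` reads the same way it would in `keyBy(WordWithCount::getWord)`, which is the main readability gain over `WC::getWord`.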



##########
docs/content.zh/docs/dev/datastream/fault-tolerance/state_v2.md:
##########
@@ -0,0 +1,601 @@
+{{< tabs "54e3bde7-659a-4683-81fa-312a2c81036b" >}}
+{{< tab "Java" >}}
+```java
+// some ordinary POJO
+public class WC {
+  public String word;
+  public int count;
+
+  public String getWord() { return word; }
+}
+DataStream<WC> words = // [...]
+KeyedStream<WC, String> keyed = words
+  .keyBy(WC::getWord).enableAsyncState();
+```
+{{< /tabs >}}
+
+{{< top >}}
+
+## Using Keyed State V2
+
+Unlike the previous state API, the new state API is designed for asynchronous 
state access.
+Each type of state gives two versions of the API: synchronous and 
asynchronous. The synchronous
+API is a blocking API that waits for the state access to complete. The 
asynchronous API is
+non-blocking and returns a `StateFuture` that will be completed when the state 
access
+is done. After that a callback or following logic will be invoked (if any).
+The asynchronous API is more efficient and should be used whenever possible.
+It is highly not recommended to mixed synchronous and asynchronous state 
access in the same
+user function.
+
+The keyed state interfaces provides access to different types of state that 
are all scoped to
+the key of the current input element. This means that this type of state can 
only be used
+on a `KeyedStream`, which can be created via `stream.keyBy(…)` in Java. Most importantly,
+the keyed stream needs to be enabled for asynchronous state access by calling `enableAsyncState()`.
+The new APIs are only available on a `KeyedStream` with `enableAsyncState()` invoked.
+
+Now, we will look at the different types of state available, and then we will 
see
+how they can be used in a program. Since the synchronous APIs are identical 
with the original API,

Review Comment:
   How about using `API` and `APIs` consistently here?



##########
docs/content.zh/docs/dev/datastream/fault-tolerance/state_v2.md:
##########
@@ -0,0 +1,601 @@
+The keyed state interfaces provides access to different types of state that 
are all scoped to

Review Comment:
   ```suggestion
   The keyed state interfaces provide access to different types of state that 
are all scoped to
   ```



##########
docs/content.zh/docs/dev/datastream/fault-tolerance/state_v2.md:
##########
@@ -0,0 +1,601 @@
+Now, we will look at the different types of state available, and then we will 
see
+how they can be used in a program. Since the synchronous APIs are identical 
with the original API,
+we only focus on the asynchronous ones here.
+
+### The Return Values
+
+First of all, we should get familiar with the return value of those 
asynchronous state access methods.
+
+`StateFuture<T>` is a future that will be completed with the result of the state access.
+The return type is `T`. It provides multiple methods to handle the result:
+* `StateFuture<Void> thenAccept(Consumer<T>)`: This method takes a `Consumer` 
that will be called with the result
+  when the state access is done. It returns a `StateFuture<Void>`, which will 
be finished when the
+  `Consumer` is done.
+* `StateFuture<R> thenApply(Function<T, R>)`: This method takes a `Function` 
that will be called with the result
+  when the state access is done. The return value of the function will be the 
result of the following
+  `StateFuture`, which will be finished when the `Function` is done.
+* `StateFuture<R> thenCompose(Function<T, StateFuture<R>>)`: This method takes 
a `Function` that will
+  be called with the result when the state access is done. The return value of 
the function should be
+  a `StateFuture<R>`, which is exposed to the invoker of `thenCompose` as a 
return value.
+  The `StateFuture<R>` will be finished when the inner `StateFuture<R>` of 
`Function` is finished.
+* `StateFuture<R> thenCombine(StateFuture<U>, BiFunction<T, U, R>)`: This 
method takes another `StateFuture` and a
+  `BiFunction` that will be called with the results of both `StateFuture`s 
when they are done. The return
+  value of the `BiFunction` will be the result of the following `StateFuture`, 
which will be finished when
+  the `BiFunction` is done.
+
+These methods are similar to the corresponding ones of `CompletableFuture`. Keep in mind, however,
+that `StateFuture` does not provide a `get()` method to block the current thread until the
+state access is done, because blocking the current thread may cause recursive blocking.
+`StateFuture<T>` also provides conditional versions of `thenAccept`, `thenApply`, `thenCompose` and `thenCombine`
+for cases where the logic following the state access splits into two branches
+based on its result. These are `thenConditionallyAccept`,
+`thenConditionallyApply`, `thenConditionallyCompose` and `thenConditionallyCombine`.
+
+`StateIterator<T>` is an iterator that can be used to iterate over the 
elements of a state. It provides
+the following methods:
+* `boolean isEmpty()`: A synchronous method that returns `true` if the iterator has no elements, and `false` otherwise.
+* `StateFuture<Void> onNext(Consumer<T>)` : This method takes a `Consumer` 
that will be called with the next element
+  when the state access is done. It returns a `StateFuture<Void>`, which will 
be finished when the `Consumer` is done.
+  Also, a function version of `onNext` is provided, which is 
`StateFuture<Collection<R>> onNext(Function<T, R>)`. This method
+  takes a `Function` that will be called with the next element when the state 
access is done. The return value of the function 
+  will be collected and returned as a collection of the following 
`StateFuture`, which will be
+  finished when the `Function` is done.
+
+We also provide a `StateFutureUtils` class that contains some utility methods 
to handle `StateFuture`s.
+These methods are:
+* `StateFuture<T> completedFuture(T)`: This method returns a completed 
`StateFuture` with the given value. This is
+useful when you want to return a constant value in a `thenCompose` method for 
further processing.
+* `StateFuture<Void> completedVoidFuture()`: This method returns a completed 
`StateFuture` with `null` value. 
+A void value version of `completedFuture`.
+* `StateFuture<Collection<T>> combineAll(Collection<StateFuture<T>>)` : This 
method takes a collection of `StateFuture`s
+and returns a `StateFuture` that will be completed when all the input 
`StateFuture`s are completed. The result of the
+returned `StateFuture` is a collection of the results of the input 
`StateFuture`s. This is useful when you want to
+combine the results of multiple `StateFuture`s.
+* `StateFuture<Iterable<T>> toIterable(StateFuture<StateIterator<T>>)` : This 
method takes a `StateFuture` of `StateIterator`
+and returns a `StateFuture` of `Iterable`. The result of the returned 
`StateFuture` is an `Iterable` that contains all
+the elements of the `StateIterator`. This converts a `StateIterator` to an `Iterable`, which may disable
+lazy loading, so use it only when further computation depends on all the data from the iterator.
+
+
+### State Primitives
+
+The available state primitives are:
+
+* `ValueState<T>`: This keeps a value that can be updated and
+retrieved (scoped to key of the input element as mentioned above, so there 
will possibly be one value
+for each key that the operation sees). The value can be set using 
`asyncUpdate(T)` and retrieved using
+`StateFuture<T> asyncValue()`.
+
+* `ListState<T>`: This keeps a list of elements. You can append elements and retrieve a `StateIterator`
+over all currently stored elements. Elements are added using `asyncAdd(T)` or `asyncAddAll(List<T>)`,
+and the iterator can be retrieved using `StateFuture<StateIterator<T>> asyncGet()`.
+You can also override the existing list with `asyncUpdate(List<T>)`.
+
+* `ReducingState<T>`: This keeps a single value that represents the 
aggregation of all values
+added to the state. The interface is similar to `ListState` but elements added 
using
+`asyncAdd(T)` are reduced to an aggregate using a specified `ReduceFunction`.
+
+* `AggregatingState<IN, OUT>`: This keeps a single value that represents the 
aggregation of all values
+added to the state. Contrary to `ReducingState`, the aggregate type may be 
different from the type
+of elements that are added to the state. The interface is the same as for 
`ListState` but elements
+added using `asyncAdd(IN)` are aggregated using a specified 
`AggregateFunction`.
+
+* `MapState<UK, UV>`: This keeps a list of mappings. You can put key-value 
pairs into the state and
+retrieve a `StateIterator` over all currently stored mappings. Mappings are added using `asyncPut(UK, UV)` or
+`asyncPutAll(Map<UK, UV>)`. The value associated with a user key can be 
retrieved using `asyncGet(UK)`.
+The iterable views for mappings, keys and values can be retrieved using 
`asyncEntries()`,
+`asyncKeys()` and `asyncValues()` respectively. You can also use 
`asyncIsEmpty()` to check whether
+this map contains any key-value mappings.
+
+All types of state also have a method `asyncClear()` that clears the state for 
the currently
+active key, i.e. the key of the input element.
+
+It is important to keep in mind that these state objects are only used for 
interfacing
+with state. The state is not necessarily stored inside but might reside on 
disk or somewhere else.
+The second thing to keep in mind is that the value you get from the state
+depends on the key of the input element. So the value you get in one 
invocation of your
+user function can differ from the value in another invocation if the keys 
involved are different.
+
+To get a state handle, you have to create a `StateDescriptor`. This holds the 
name of the state
+(as we will see later, you can create several states, and they have to have 
unique names so
+that you can reference them), the type of the values that the state holds, and 
possibly
+a user-specified function, such as a `ReduceFunction`. Depending on what type 
of state you
+want to retrieve, you create either a `ValueStateDescriptor`, a 
`ListStateDescriptor`, 
+an `AggregatingStateDescriptor`, a `ReducingStateDescriptor`, or a 
`MapStateDescriptor`.
+To distinguish them from the previous state APIs, use the `StateDescriptor`s under the
+`org.apache.flink.api.common.state.v2` package (note the **v2**).
+
+State is accessed using the `RuntimeContext`, so it is only possible in *rich functions*.
+Please see [here]({{< ref "docs/dev/datastream/user_defined_functions" >}}#rich-functions) for
+information about that, but we will also see an example shortly. The `RuntimeContext` that
+is available in a `RichFunction` has these methods for accessing state:
+
+* `ValueState<T> getState(ValueStateDescriptor<T>)`
+* `ReducingState<T> getReducingState(ReducingStateDescriptor<T>)`
+* `ListState<T> getListState(ListStateDescriptor<T>)`
+* `AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)`
+* `MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)`
+
+This is an example `FlatMapFunction` that shows how all of the parts fit 
together:
+
+{{< tabs "348fd48c-fa36-4d27-9e53-9bffb74d8a87" >}}
+{{< tab "Java" >}}
+```java
+public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+    /**
+     * The ValueState handle. The first field is the count, the second field a running sum.
+     */
+    private transient ValueState<Tuple2<Long, Long>> sum;
+
+    @Override
+    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
+
+        // access the state value
+        sum.asyncValue().thenApply(currentSum -> {
+            // if it hasn't been used before, it will be null
+            Tuple2<Long, Long> current = currentSum == null ? Tuple2.of(0L, 0L) : currentSum;
+
+            // update the count
+            current.f0 += 1;
+
+            // add the second field of the input value
+            current.f1 += input.f1;
+
+            return current;
+        }).thenAccept(r -> {
+            // if the count reaches 2, emit the average and clear the state
+            if (r.f0 >= 2) {
+                out.collect(Tuple2.of(input.f0, r.f1 / r.f0));
+                sum.asyncClear();
+            } else {
+                sum.asyncUpdate(r);
+            }
+        });
+    }
+
+    @Override
+    public void open(OpenContext ctx) {
+        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
+                new ValueStateDescriptor<>(
+                        "average", // the state name
+                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information
+        sum = getRuntimeContext().getState(descriptor);

Review Comment:
   Should we cast the result of `getRuntimeContext()` to `StreamingRuntimeContext` here?
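
For trying out the callback chain of the quoted `CountWindowAverage` example without a Flink runtime, the sketch below mirrors its `thenApply(...).thenAccept(...)` structure using the JDK's `CompletableFuture`, which the quoted documentation describes as similar to `StateFuture`. This is an analogy, not the Flink API: `FakeValueState` is a hypothetical in-memory stand-in for `ValueState`, a `long[] {count, sum}` replaces `Tuple2<Long, Long>`, and its `asyncValue`/`asyncUpdate`/`asyncClear` methods only borrow their names from the quoted text. It says nothing about which context type `open()` should use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CountWindowAverageSketch {

    // Hypothetical in-memory stand-in for a ValueState holding {count, sum}.
    static class FakeValueState {
        private long[] value; // null until the first update

        CompletableFuture<long[]> asyncValue() {
            return CompletableFuture.completedFuture(value);
        }

        CompletableFuture<Void> asyncUpdate(long[] newValue) {
            value = newValue;
            return CompletableFuture.completedFuture(null);
        }

        CompletableFuture<Void> asyncClear() {
            value = null;
            return CompletableFuture.completedFuture(null);
        }
    }

    private final FakeValueState sum = new FakeValueState();

    // Mirrors flatMap: read the state, fold in the input, then emit or store.
    public void flatMap(long input, List<Long> out) {
        sum.asyncValue().thenApply(currentSum -> {
            // if the state has not been used before, it is null
            long[] current = currentSum == null ? new long[] {0L, 0L} : currentSum;
            current[0] += 1;     // update the count
            current[1] += input; // add the input to the running sum
            return current;
        }).thenAccept(r -> {
            // once the count reaches 2, emit the average and clear the state
            if (r[0] >= 2) {
                out.add(r[1] / r[0]); // integer division, as in the quoted example
                sum.asyncClear();
            } else {
                sum.asyncUpdate(r);
            }
        });
    }

    // Feeds all inputs through one function instance (a single key).
    public static List<Long> run(long... inputs) {
        CountWindowAverageSketch fn = new CountWindowAverageSketch();
        List<Long> out = new ArrayList<>();
        for (long in : inputs) {
            fn.flatMap(in, out);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(3L, 5L, 7L, 4L, 2L)); // prints [4, 5]
    }
}
```

Because the stand-in futures are already completed, the callbacks run inline on the calling thread; a real asynchronous backend would invoke them later.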



##########
docs/content.zh/docs/dev/datastream/fault-tolerance/state_v2.md:
##########
@@ -0,0 +1,601 @@
+## Using Keyed State V2
+
+Unlike the previous state API, the new state API is designed for asynchronous 
state access.
+Each type of state gives two versions of the API: synchronous and 
asynchronous. The synchronous
+API is a blocking API that waits for the state access to complete. The 
asynchronous API is
+non-blocking and returns a `StateFuture` that will be completed when the state 
access
+is done. After that a callback or following logic will be invoked (if any).
+The asynchronous API is more efficient and should be used whenever possible.
+It is highly not recommended to mixed synchronous and asynchronous state 
access in the same

Review Comment:
   nit: emphasize "not"? `It is highly not recommended` -> `It is highly **not** recommended`
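
The distinction the quoted paragraph draws between blocking access and callback-based access can be sketched with the JDK's `CompletableFuture`. This is only an analogy under stated assumptions: the class and method names below are hypothetical, `join()` stands in for a synchronous state call, and the sketch does not model Flink's execution or the reasons mixing the two styles is discouraged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class SyncVsAsyncSketch {

    // Callback style: register the follow-up logic and return immediately.
    public static List<String> asyncStyle() {
        List<String> events = new ArrayList<>();
        CompletableFuture<Integer> pending = new CompletableFuture<>(); // result not available yet

        pending.thenAccept(v -> events.add("callback:" + v));
        events.add("caller continues"); // reached before any value exists

        pending.complete(42); // the "state access" finishes; the callback fires now
        return events;
    }

    // Blocking style: the caller waits for the value before moving on.
    public static int syncStyle() {
        CompletableFuture<Integer> done = CompletableFuture.completedFuture(42);
        return done.join();
    }

    public static void main(String[] args) {
        System.out.println(asyncStyle()); // prints [caller continues, callback:42]
        System.out.println(syncStyle());  // prints 42
    }
}
```

In the callback style the caller's next statement runs before the value arrives, which is the non-blocking behavior the paragraph attributes to the asynchronous API.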



##########
docs/content.zh/docs/dev/datastream/fault-tolerance/state_v2.md:
##########
@@ -0,0 +1,601 @@
+---
+title: "Working with State V2"
+weight: 2
+type: docs
+aliases:
+  - /dev/stream/state/state_v2.html
+  - /apis/streaming/state_v2.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Working with State V2 (New APIs)
+
+In this section you will learn about the new APIs that Flink provides for 
writing
+stateful programs. Please take a look at [Stateful Stream
+Processing]({{< ref "docs/concepts/stateful-stream-processing" >}})
+to learn about the concepts behind stateful stream processing.
+
+The new state API is designed to be more flexible than the previous API. User 
can perform
+asynchronous state operations, thus making it more powerful and more efficient.
+The asynchronous state access is essential for the state backend to be able to 
handle
+large state sizes and to be able to spill to remote file systems when 
necessary. 
+
+## Keyed DataStream
+
+If you want to use keyed state, you first need to specify a key on a
+`DataStream` that should be used to partition the state (and also the records
+in the stream themselves). You can specify a key using `keyBy(KeySelector)`
+in the Java API on a `DataStream`. This yields a `KeyedStream`, which then allows operations
+that use keyed state. Call `enableAsyncState()` on the `KeyedStream` to enable
+asynchronous state operations.
+
+A key selector function takes a single record as input and returns the key for
+that record. The key can be of any type and **must** be derived from
+deterministic computations.
+
+The data model of Flink is not based on key-value pairs. Therefore, you do not
+need to physically pack the data set types into keys and values. Keys are
+"virtual": they are defined as functions over the actual data to guide the
+grouping operator.
+
+The following example shows a key selector function that simply returns the
+field of an object:
+
+{{< tabs "54e3bde7-659a-4683-81fa-312a2c81036b" >}}
+{{< tab "Java" >}}
+```java
+// some ordinary POJO
+public class WC {
+  public String word;
+  public int count;
+
+  public String getWord() { return word; }
+}
+DataStream<WC> words = // [...]
+KeyedStream<WC, String> keyed = words
+  .keyBy(WC::getWord).enableAsyncState();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+{{< top >}}
+
+## Using Keyed State V2
+
+Unlike the previous state API, the new state API is designed for asynchronous state access.
+Each type of state offers two versions of the API: synchronous and asynchronous. The synchronous
+API is a blocking API that waits for the state access to complete. The asynchronous API is
+non-blocking and returns a `StateFuture` that will be completed when the state access
+is done. After that, a callback or the following logic (if any) will be invoked.
+The asynchronous API is more efficient and should be used whenever possible.
+It is strongly recommended **not** to mix synchronous and asynchronous state access in the same
+user function.
+
+The keyed state interfaces provide access to different types of state that are all scoped to
+the key of the current input element. This means that this type of state can only be used
+on a `KeyedStream`, which can be created via `stream.keyBy(…)` in Java. Most importantly,
+the keyed stream needs to be enabled for asynchronous state access by calling `enableAsyncState()`.
+The new APIs are only available on a `KeyedStream` with `enableAsyncState()` invoked.
+
+Now, we will look at the different types of state available, and then we will see
+how they can be used in a program. Since the synchronous APIs are identical to the original API,
+we only focus on the asynchronous ones here.
+
+### The Return Values
+
+First of all, we should get familiar with the return values of the asynchronous state access methods.
+
+`StateFuture<T>` is a future that will be completed with the result of the state access.
+The result type is `T`. It provides multiple methods to handle the result:
+* `StateFuture<Void> thenAccept(Consumer<T>)`: This method takes a `Consumer` 
that will be called with the result
+  when the state access is done. It returns a `StateFuture<Void>`, which will 
be finished when the
+  `Consumer` is done.
+* `StateFuture<R> thenApply(Function<T, R>)`: This method takes a `Function` 
that will be called with the result
+  when the state access is done. The return value of the function will be the 
result of the following
+  `StateFuture`, which will be finished when the `Function` is done.
+* `StateFuture<R> thenCompose(Function<T, StateFuture<R>>)`: This method takes 
a `Function` that will
+  be called with the result when the state access is done. The return value of 
the function should be
+  a `StateFuture<R>`, which is exposed to the invoker of `thenCompose` as a 
return value.
+  The `StateFuture<R>` will be finished when the inner `StateFuture<R>` of 
`Function` is finished.
+* `StateFuture<R> thenCombine(StateFuture<U>, BiFunction<T, U, R>)`: This 
method takes another `StateFuture` and a
+  `BiFunction` that will be called with the results of both `StateFuture`s 
when they are done. The return
+  value of the `BiFunction` will be the result of the following `StateFuture`, 
which will be finished when
+  the `BiFunction` is done.
+
+These methods are similar to the corresponding ones of `CompletableFuture`. Keep in mind, however,
+that `StateFuture` does not provide a `get()` method to block the current thread until the
+state access is done, because blocking the current thread may cause recursive blocking.
+`StateFuture<T>` also provides conditional versions of `thenAccept`, `thenApply`, `thenCompose` and `thenCombine`
+for the case where the logic following the state access splits into two branches
+based on the result of the state access. These conditional versions are `thenConditionallyAccept`,
+`thenConditionallyApply`, `thenConditionallyCompose` and `thenConditionallyCombine`.
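
Since the text compares these methods to `CompletableFuture`, the chaining style can be tried out with the JDK alone. The following is only a sketch: `java.util.concurrent.CompletableFuture` stands in for `StateFuture`, no Flink API is involved, and the class `ChainingSketch` and its helper methods `readCount` and `readName` are hypothetical names made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

// Stand-in sketch: CompletableFuture plays the role of StateFuture here.
class ChainingSketch {

    // Hypothetical asynchronous "state read" that yields a count.
    static CompletableFuture<Integer> readCount() {
        return CompletableFuture.completedFuture(41);
    }

    // Hypothetical second asynchronous "state read" that yields a name.
    static CompletableFuture<String> readName() {
        return CompletableFuture.completedFuture("word");
    }

    static String describe() {
        StringBuilder log = new StringBuilder();
        readCount()
                // thenApply: transform the result with a plain function
                .thenApply(count -> count + 1)
                // thenCompose: continue with another asynchronous access
                .thenCompose(count -> readName().thenApply(name -> name + "=" + count))
                // thenCombine: merge with the result of an independent future
                .thenCombine(readCount(), (text, original) -> text + " (was " + original + ")")
                // thenAccept: consume the final result
                .thenAccept(log::append);
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

All futures in this sketch are already completed, so each callback runs immediately on the calling thread. With real asynchronous state access the callbacks run later, which is why the result is passed along the chain instead of being read back with a blocking `get()`.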
+
+`StateIterator<T>` is an iterator that can be used to iterate over the elements of a state. It provides
+the following methods:
+* `boolean isEmpty()`: A synchronous method that returns `true` if the iterator has no elements, and `false` otherwise.
+* `StateFuture<Void> onNext(Consumer<T>)`: This method takes a `Consumer` that will be called with the next element
+  when the state access is done. It returns a `StateFuture<Void>`, which will be finished when the `Consumer` is done.
+  A function version of `onNext` is also provided: `StateFuture<Collection<R>> onNext(Function<T, R>)`. This method
+  takes a `Function` that will be called with the next element when the state access is done. The return values of the function
+  are collected and returned as a collection in the resulting `StateFuture`, which will be
+  finished when the `Function` is done.
+
+We also provide a `StateFutureUtils` class that contains some utility methods to handle `StateFuture`s.
+These methods are:
+* `StateFuture<T> completedFuture(T)`: This method returns a completed `StateFuture` with the given value. This is
+useful when you want to return a constant value in a `thenCompose` method for further processing.
+* `StateFuture<Void> completedVoidFuture()`: This method returns a completed `StateFuture` with a `null` value.
+It is the void version of `completedFuture`.
+* `StateFuture<Collection<T>> combineAll(Collection<StateFuture<T>>)`: This method takes a collection of `StateFuture`s
+and returns a `StateFuture` that will be completed when all the input `StateFuture`s are completed. The result of the
+returned `StateFuture` is a collection of the results of the input `StateFuture`s. This is useful when you want to
+combine the results of multiple `StateFuture`s.
+* `StateFuture<Iterable<T>> toIterable(StateFuture<StateIterator<T>>)`: This method takes a `StateFuture` of `StateIterator`
+and returns a `StateFuture` of `Iterable`. The result of the returned `StateFuture` is an `Iterable` that contains all
+the elements of the `StateIterator`. This is useful when you want to convert a `StateIterator` to an `Iterable`.
+Avoid doing so unless necessary, since it may disable lazy loading. It is only useful when the further
+calculation depends on all the data from the iterator.
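
The idea behind `combineAll` can be illustrated with the JDK's `CompletableFuture`. This is a sketch of the concept only, not Flink's implementation: `CombineAllSketch` and its `combineAll` helper are hypothetical, and `CompletableFuture` stands in for `StateFuture`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Stand-in sketch: CompletableFuture plays the role of StateFuture here.
class CombineAllSketch {

    // Returns one future that completes with all results, in input order,
    // once every input future has completed.
    static <T> CompletableFuture<List<T>> combineAll(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    List<T> results = new ArrayList<>();
                    for (CompletableFuture<T> future : futures) {
                        // every future is complete at this point, so join() does not block
                        results.add(future.join());
                    }
                    return results;
                });
    }

    public static void main(String[] args) {
        List<CompletableFuture<Integer>> reads = new ArrayList<>();
        reads.add(CompletableFuture.completedFuture(1));
        reads.add(CompletableFuture.completedFuture(2));
        reads.add(CompletableFuture.completedFuture(3));
        System.out.println(combineAll(reads).join());
    }
}
```

The call to `join()` in `main` is there only to print the result of the sketch. `StateFuture` has no blocking accessor, so in a user function the combined result would be handled in a `thenAccept` or `thenApply` callback instead.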
+
+
+### State Primitives
+
+The available state primitives are:
+
+* `ValueState<T>`: This keeps a value that can be updated and
+retrieved (scoped to key of the input element as mentioned above, so there 
will possibly be one value
+for each key that the operation sees). The value can be set using 
`asyncUpdate(T)` and retrieved using
+`StateFuture<T> asyncValue()`.
+
+* `ListState<T>`: This keeps a list of elements. You can append elements and retrieve a `StateIterator`
+over all currently stored elements. Elements are added using `asyncAdd(T)` or `asyncAddAll(List<T>)`;
+the iterator can be retrieved using `StateFuture<StateIterator<T>> asyncGet()`.
+You can also override the existing list with `asyncUpdate(List<T>)`.
+
+* `ReducingState<T>`: This keeps a single value that represents the 
aggregation of all values
+added to the state. The interface is similar to `ListState` but elements added 
using
+`asyncAdd(T)` are reduced to an aggregate using a specified `ReduceFunction`.
+
+* `AggregatingState<IN, OUT>`: This keeps a single value that represents the 
aggregation of all values
+added to the state. Contrary to `ReducingState`, the aggregate type may be 
different from the type
+of elements that are added to the state. The interface is the same as for 
`ListState` but elements
+added using `asyncAdd(IN)` are aggregated using a specified 
`AggregateFunction`.
+
+* `MapState<UK, UV>`: This keeps a list of mappings. You can put key-value pairs into the state and
+retrieve a `StateIterator` over all currently stored mappings. Mappings are added using `asyncPut(UK, UV)` or
+`asyncPutAll(Map<UK, UV>)`. The value associated with a user key can be retrieved using `asyncGet(UK)`.
+Iterators over the mappings, keys and values can be retrieved using `asyncEntries()`,
+`asyncKeys()` and `asyncValues()` respectively. You can also use `asyncIsEmpty()` to check whether
+this map contains any key-value mappings.
+
+All types of state also have a method `asyncClear()` that clears the state for 
the currently
+active key, i.e. the key of the input element.
+
+It is important to keep in mind that these state objects are only used for 
interfacing
+with state. The state is not necessarily stored inside but might reside on 
disk or somewhere else.
+The second thing to keep in mind is that the value you get from the state
+depends on the key of the input element. So the value you get in one 
invocation of your
+user function can differ from the value in another invocation if the keys 
involved are different.
+
+To get a state handle, you have to create a `StateDescriptor`. This holds the 
name of the state
+(as we will see later, you can create several states, and they have to have 
unique names so
+that you can reference them), the type of the values that the state holds, and 
possibly
+a user-specified function, such as a `ReduceFunction`. Depending on what type 
of state you
+want to retrieve, you create either a `ValueStateDescriptor`, a 
`ListStateDescriptor`, 
+an `AggregatingStateDescriptor`, a `ReducingStateDescriptor`, or a 
`MapStateDescriptor`.
+To distinguish them from the previous state APIs, you should use the `StateDescriptor`s under the
+`org.apache.flink.api.common.state.v2` package (note the **v2**).
+
+State is accessed using the `RuntimeContext`, so it is only possible in *rich 
functions*.
+Please see [here]({{< ref "docs/dev/datastream/user_defined_functions" 
>}}#rich-functions) for
+information about that, but we will also see an example shortly. The 
`RuntimeContext` that
+is available in a `RichFunction` has these methods for accessing state:
+
+* `ValueState<T> getState(ValueStateDescriptor<T>)`
+* `ReducingState<T> getReducingState(ReducingStateDescriptor<T>)`
+* `ListState<T> getListState(ListStateDescriptor<T>)`
+* `AggregatingState<IN, OUT> 
getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)`
+* `MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)`
+
+This is an example `FlatMapFunction` that shows how all of the parts fit 
together:
+
+{{< tabs "348fd48c-fa36-4d27-9e53-9bffb74d8a87" >}}
+{{< tab "Java" >}}
+```java
+public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, 
Long>, Tuple2<Long, Long>> {
+
+    /**
+     * The ValueState handle. The first field is the count, the second field a 
running sum.
+     */
+    private transient ValueState<Tuple2<Long, Long>> sum;
+
+    @Override
+    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, 
Long>> out) throws Exception {
+
+        // access the state value
+        sum.asyncValue().thenApply(currentSum -> {
+            // if it hasn't been used before, it will be null
+            Tuple2<Long, Long> current = currentSum == null ? Tuple2.of(0L, 
0L) : currentSum;
+
+            // update the count
+            current.f0 += 1;
+
+            // add the second field of the input value
+            current.f1 += input.f1;
+
+            return current;
+        }).thenAccept(r -> {
+            // if the count reaches 2, emit the average and clear the state
+            if (r.f0 >= 2) {
+                out.collect(Tuple2.of(input.f0, r.f1 / r.f0));
+                sum.asyncClear();
+            } else {
+                sum.asyncUpdate(r);
+            }
+        });
+    }
+
+    @Override
+    public void open(OpenContext ctx) {
+        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
+                new ValueStateDescriptor<>(
+                        "average", // the state name
+                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() 
{})); // type information
+        sum = getRuntimeContext().getState(descriptor);
+    }
+}
+
+// this can be used in a streaming program like this (assuming we have a 
StreamExecutionEnvironment env)
+env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), 
Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
+        .keyBy(value -> value.f0)
+        .enableAsyncState()
+        .flatMap(new CountWindowAverage())
+        .print();
+
+// the printed output will be (1,4) and (1,5)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+This example implements a poor man's counting window. We key the tuples by the first field
+(in the example all have the same key `1`). The function stores the count and a running sum in
+a `ValueState`. Once the count reaches 2 it will emit the average and clear the state so that
+we start over from `0`. Note that this would keep a different state value for each different input
+key if we had tuples with different values in the first field.
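
The printed output can be checked by hand. The following plain-Java sketch involves no Flink at all; the class `CountWindowAverageCheck` and its `replay` method are hypothetical. It replays the same count and sum arithmetic over the second fields of the five input tuples, which all share key `1`.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java replay of the arithmetic in the example above.
class CountWindowAverageCheck {

    static List<String> replay(long key, long[] values) {
        List<String> emitted = new ArrayList<>();
        long count = 0; // plays the role of f0 in the stored tuple
        long sum = 0;   // plays the role of f1 in the stored tuple
        for (long value : values) {
            count += 1;
            sum += value;
            if (count >= 2) {
                // long division, as in the example: 11 / 2 == 5
                emitted.add("(" + key + "," + (sum / count) + ")");
                // corresponds to clearing the state for this key
                count = 0;
                sum = 0;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(replay(1L, new long[] {3L, 5L, 7L, 4L, 2L}));
    }
}
```

The values 3 and 5 give 8 / 2 = 4, and 7 and 4 give 11 / 2 = 5 with integer division. The last value, 2, stays in the state because the count has only reached 1.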
+
+### Execution Order
+
+The state access methods are executed asynchronously, which means that they
+do not block the current thread. With the synchronous APIs, the state access methods are executed in
+the order they are called. With the asynchronous APIs, however, the state access methods may be executed
+out of order, especially across invocations for different incoming elements. For the above example,
+if the `flatMap` function is invoked for two different incoming elements A and B, the state access
+methods for A and B will both be executed. First, `asyncValue` is executed for A, then `asyncValue` is
+allowed to execute for B. The order in which the two `asyncValue` calls finish is not guaranteed, so the order
+in which the two `StateFuture`s continue is not guaranteed either. As a result, the order of the `asyncClear` or
+`asyncUpdate` invocations for A and B is not determined.
+
+Although the state access methods are executed out of order, this does not mean that all the user code
+runs in parallel. The user code in `processElement`, `flatMap`, or the `thenXXxx` methods
+following the state access methods is executed in a single thread (the task thread), so there
+is no concurrency issue for the user code.
+
+Typically, you don't need to worry about the execution order of the state access methods, but there
+are still some rules that Flink ensures:
+* The user code entry `flatMap` is invoked for same-key elements strictly in
+order of element arrival.
+* The consumers or functions passed to the `thenXXxx` methods are executed in the order they are
+chained. If they are not chained, or there are multiple chains, the order is not guaranteed.
+
+### Best practice of asynchronous APIs
+
+The asynchronous APIs are designed to be more efficient and more powerful than the synchronous APIs.
+There are some best practices that you should follow when using the asynchronous APIs:
+* **Avoid mixing synchronous and asynchronous state access.**
+* Chain `thenXXxx` methods to handle the result of a state access and then issue another
+    state access or produce a result. Divide the logic into multiple steps separated by `thenXXxx` methods.
+* Avoid accessing mutable members of the user function (e.g. a `RichFlatMapFunction`). Since the state access
+    methods are executed out of order, the mutable members may be accessed in an unpredictable order.
+    Instead, use the result of the state access to pass data between different steps. The
+    `StateFutureUtils.completedFuture` or `thenApply` method can be used to pass the data. Alternatively, use
+    a captured container (such as an `AtomicReference`) that is initialized for each invocation of `flatMap`
+    to share data between lambdas.
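
The risk of mutable members can be reproduced with the JDK alone. In this sketch `CompletableFuture` again stands in for `StateFuture`; the class `MutableMemberSketch` and its fields are hypothetical. The "state reads" for two elements complete only after both elements have been handed to the function, which mimics a delayed asynchronous access.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Stand-in sketch: CompletableFuture plays the role of StateFuture here.
class MutableMemberSketch {

    // Mutable member shared by all invocations: the pattern to avoid.
    private String lastInput;

    final List<String> viaMember = new ArrayList<>();
    final List<String> viaCapture = new ArrayList<>();

    // Simulates one invocation of the user function for a single element.
    void process(String input, CompletableFuture<Integer> stateRead) {
        lastInput = input;
        // Reads the member when the callback runs, which may be after
        // a later element has already overwritten it.
        stateRead.thenAccept(v -> viaMember.add(lastInput + ":" + v));
        // Captures the per-invocation value instead.
        stateRead.thenAccept(v -> viaCapture.add(input + ":" + v));
    }

    static MutableMemberSketch run() {
        MutableMemberSketch fn = new MutableMemberSketch();
        CompletableFuture<Integer> readA = new CompletableFuture<>();
        CompletableFuture<Integer> readB = new CompletableFuture<>();
        fn.process("A", readA);
        fn.process("B", readB);
        // Complete the reads only after both elements have been processed.
        readA.complete(1);
        readB.complete(2);
        return fn;
    }

    public static void main(String[] args) {
        MutableMemberSketch fn = run();
        System.out.println(fn.viaMember + " vs " + fn.viaCapture);
    }
}
```

The callback that reads the member attributes the result of A to element B, because `lastInput` was overwritten before the read for A completed. The callback that captures the per-invocation `input` stays correct.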
+
+
+### State Time-To-Live (TTL)
+
+A *time-to-live* (TTL) can be assigned to the keyed state of any type. If a 
TTL is configured and a
+state value has expired, the stored value will be cleaned up on a best effort 
basis which is
+discussed in more detail below.
+
+All state collection types support per-entry TTLs. This means that list 
elements and map entries
+expire independently.
+
+In order to use state TTL one must first build a `StateTtlConfig` 
configuration object. The TTL 
+functionality can then be enabled in any state descriptor by passing the 
configuration:
+
+{{< tabs "43221f0b-b56c-42e5-833e-ed416e5f9b4e" >}}
+{{< tab "Java" >}}
+```java
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import java.time.Duration;
+
+StateTtlConfig ttlConfig = StateTtlConfig
+    .newBuilder(Duration.ofSeconds(1))
+    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
+    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
+    .build();
+    
+ValueStateDescriptor<String> stateDescriptor = new 
ValueStateDescriptor<>("text state", String.class);
+stateDescriptor.enableTimeToLive(ttlConfig);
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+The configuration has several options to consider:
+
+The first parameter of the `newBuilder` method is mandatory; it is the time-to-live value.
+
+The update type configures when the state TTL is refreshed (by default 
`OnCreateAndWrite`):
+
+ - `StateTtlConfig.UpdateType.OnCreateAndWrite` - only on creation and write 
access
+ - `StateTtlConfig.UpdateType.OnReadAndWrite` - also on read access
+
+    (**Notes:** If you set the state visibility to 
`StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp`
+    at the same time, the state read cache will be disabled, which will cause 
some performance loss in PyFlink)
+ 
+The state visibility configures whether the expired value is returned on read 
access 
+if it is not cleaned up yet (by default `NeverReturnExpired`):
+
+ - `StateTtlConfig.StateVisibility.NeverReturnExpired` - expired value is 
never returned 
+
+    (**Notes:** The state read/write cache will be disabled, which will cause 
some performance loss in PyFlink)
+
+ - `StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp` - returned if 
still available
+ 
+In case of `NeverReturnExpired`, the expired state behaves as if it does not 
exist anymore, 
+even if it still has to be removed. The option can be useful for use cases 
+where data has to become unavailable for read access strictly after TTL, 
+e.g. application working with privacy sensitive data.
+ 
+Another option, `ReturnExpiredIfNotCleanedUp`, allows the expired state to be returned before it is cleaned up.
+
+**Notes:** 
+
+- The state backends store the timestamp of the last modification along with 
the user value, 
+which means that enabling this feature increases consumption of state storage. 
+Heap state backend stores an additional Java object with a reference to the 
user state object 
+and a primitive long value in memory. The RocksDB/ForSt state backend adds 8 
bytes per stored value,
+list entry or map entry.
+
+- Only TTLs in reference to *processing time* are currently supported.
+
+- Trying to restore state that was previously configured without TTL using a TTL-enabled descriptor, or vice versa,
+will lead to a compatibility failure and a `StateMigrationException`.
+
+- The TTL configuration is not part of check- or savepoints but rather a way 
of how Flink treats it in the currently running job.

Review Comment:
   `check-`  -> `checkpoints`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
