qinjunjerry commented on a change in pull request #10502: 
[FLINK-14825][state-processor-api][docs] Rework state processor api 
documentation
URL: https://github.com/apache/flink/pull/10502#discussion_r357173100
 
 

 ##########
 File path: docs/dev/libs/state_processor_api.md
 ##########
 @@ -23,166 +23,99 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Apache Flink's State Processor API provides powerful functionality to reading, 
writing, and modifing savepoints and checkpoints using Flink’s batch DataSet 
api.
-This is useful for tasks such as analyzing state for interesting patterns, 
troubleshooting or auditing jobs by checking for discrepancies, and 
bootstrapping state for new applications.
+Apache Flink's State Processor API provides powerful functionality to reading, 
writing, and modifing savepoints and checkpoints using Flink’s batch DataSet 
API.
+Due to the [interoperability of DataSet and Table 
API](https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#integration-with-datastream-and-dataset-api),
 you can even use relational Table API or SQL queries to analyze and process 
state data.
+
+For example, you can take a savepoint of a running stream processing 
application and analyze it with a DataSet batch program to verify that the 
application behaves correctly.
+Or you can read a batch of data from any store, preprocess it, and write the 
result to a savepoint that you use to bootstrap the state of a streaming 
application.
+It's also possible to fix inconsistent state entries.
+Finally, the State Processor API opens up many ways to evolve a stateful 
application that were previously blocked by parameter and design choices that 
could not be changed without losing all the state of the application after it 
was started.
+For example, you can now arbitrarily modify the data types of states, adjust 
the maximum parallelism of operators, split or merge operator state, re-assign 
operator UIDs, and so on.
+
+To get started with the state processor api, include the following library in 
your application.
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-state-processor-api{{ site.scala_version_suffix 
}}</artifactId>
+  <version>{{site.version}}</version>
+  <scope>provided</scope>
+</dependency>
+{% endhighlight %}
 
 * This will be replaced by the TOC
 {:toc}
 
-## Abstraction
-
-To understand how to best interact with savepoints in a batch context it is 
important to have a clear mental model of how the data in Flink state relates 
to a traditional relational database.
+## Mapping Application State to DataSets 
 
-A database can be thought of as one or more namespaces, each containing a 
collection of tables.
-Those tables in turn contain columns whose values have some intrinsic 
relationship between them, such as being scoped under the same key.
+The State Processor API maps the state of a streaming application to one or 
more data sets that can be separately processed.
+In order to be able to use the API, you need to understand how this mapping 
works.
 
-A savepoint represents the state of a Flink job at a particular point in time 
which is made up of many operators.
-Those operators contain various kinds of state, both partitioned or keyed 
state, and non-partitioned or operator state. 
+But let's first have a look at what a stateful Flink job looks like.
+A Flink job is composed of operators, typically one or more source operators, 
a few operators for the actual processing, and one or more sink operators.
+Each operator runs in parallel in one or more tasks and can work with 
different types of state.
+An operator can have zero, one, or more *“operator states”* which are 
organized as lists that are scoped to the operator's tasks.
+If the operator is applied on a keyed stream, it can also have zero, one, or 
more *“keyed states”* which are scoped to a key that is extracted from each 
processed record.
+You can think of keyed state as a distributed key-value map. 
 
-<div data-lang="java" markdown="1">
-{% highlight java %}
-MapStateDescriptor<Integer, Double> CURRENCY_RATES = new 
MapStateDescriptor<>("rates", Types.INT, Types.DOUBLE);
- 
-class CurrencyConverter extends BroadcastProcessFunction<Transaction, 
CurrencyRate, Transaction> {
- 
-  public void processElement(
-        Transaction value,
-        ReadOnlyContext ctx,
-        Collector<Transaction> out) throws Exception {
- 
-     Double rate = ctx.getBroadcastState(CURRENCY_RATES).get(value.currencyId);
-     if (rate != null) {
-        value.amount *= rate;
-     }
-     out.collect(value);
-  }
- 
-  public void processBroadcastElement(
-        CurrencyRate value,
-        Context ctx,
-        Collector<Transaction> out) throws Exception {
-        ctx.getBroadcastState(CURRENCY_RATES).put(value.currencyId, 
value.rate);
-  }
-}
-  
-class Summarize extends RichFlatMapFunction<Transaction, Summary> {
-  transient ValueState<Double> totalState;
-  transient ValueState<Integer> countState;
- 
-  public void open(Configuration configuration) throws Exception {
-     totalState = getRuntimeContext().getState(new 
ValueStateDescriptor<>("total", Types.DOUBLE));
-     countState = getRuntimeContext().getState(new 
ValueStateDescriptor<>("count", Types.INT));
-  }
- 
-  public void flatMap(Transaction value, Collector<Summary> out) throws 
Exception {
-     Summary summary = new Summary();
-     summary.total = value.amount;
-     summary.count = 1;
- 
-     Double currentTotal = totalState.value();
-     if (currentTotal != null) {
-        summary.total += currentTotal;
-     }
- 
-     Integer currentCount = countState.value();
-     if (currentCount != null) {
-        summary.count += currentCount;
-     }
-     countState.update(summary.count);
- 
-     out.collect(summary);
-  }
-}
- 
-DataStream<Transaction> transactions = . . .
-BroadcastStream<CurrencyRate> rates = . . .
-transactions
-  .connect(rates)
-  .process(new CurrencyConverter())
-  .uid("currency_converter")
-  .keyBy(transaction -> transaction.accountId)
-  .flatMap(new Summarize())
-  .uid("summarize")
-{% endhighlight %}
-</div>
+The following figure shows the application “MyApp” which consists of three 
operators called “Src”, “Proc”, and “Snk”.
+Src has one operator state (os1), Proc has one operator state (os2) and two 
keyed states (ks1, ks2) and Snk is stateless.
 
-This job contains multiple operators along with various kinds of state.
-When analyzing that state we can first scope data by its operator, named by 
setting its uid.
-Within each operator we can look at the registered states.
-`CurrencyConverter` has a broadcast state, which is a type of non-partitioned 
operator state.
-In general, there is no relationship between any two elements in an operator 
state and so we can look at each value as being its own row.
-Contrast this with Summarize, which contains two keyed states.
-Because both states are scoped under the same key we can safely assume there 
exists some relationship between the two values.
-Therefore, keyed state is best understood as a single table per operator 
containing one _key_ column along with _n_ value columns, one for each 
registered state.
-All of this means that the state for this job could be described using the 
following pseudo-sql commands. 
+<p style="display: block; text-align: center; margin-top: 20px; margin-bottom: 
20px">
+       <img src="{{ site.baseurl 
}}/fig/application-my-app-state-processor-api.png" width="600px" 
alt="Application: My App"/>
+</p>
 
-{% highlight sql %}
-CREATE NAMESPACE currency_converter;
- 
-CREATE TABLE currency_converter.rates (
-   value Tuple2<Integer, Double>
-);
- 
-CREATE NAMESPACE summarize;
- 
-CREATE TABLE summarize.keyed_state (
-   key   INTEGER PRIMARY KEY,
-   total DOUBLE,
-   count INTEGER
-);
-{% endhighlight %}
+A savepoint or checkpoint of MyApp consists of the data of all states, 
organized in a way that the states of each task can be restored.
+When processing the data of a savepoint (or checkpoint) with a batch job, we 
need a mental model that maps the data of the individual tasks' states into 
data sets or tables.
+In fact, we can think of a savepoint as a database. Every operator (identified 
by its UID) represents a namespace.
+Each operator state of an operator is mapped to a dedicated table in the 
namespace with a single column that holds the state's data of all tasks.
+All keyed states of an operator are mapped to a single table consisting of a 
column for the key, and one column for each keyed state.
+The following figure shows how a savepoint of MyApp is mapped to a database.
 
-In general, the savepoint ↔ database relationship can be summarized as:
+<p style="display: block; text-align: center; margin-top: 20px; margin-bottom: 
20px">
+       <img src="{{ site.baseurl 
}}/fig/database-my-app-state-processor-api.png" width="600px" alt="Database: My 
App"/>
+</p>
 
-    * A savepoint is a database
-    * An operator is a namespace named by its uid
-    * Each operator state represents a single table
-        * Each element in an operator state represents a single row in that 
table
-    * Each operator containing keyed state has a single “keyed_state” table
-        * Each keyed_state table has one key column mapping the key value of 
the operator
-        * Each registered state represents a single column in the table
-        * Each row in the table maps to a single key
+The figure shows how the values of Src's operator state are mapped to a table 
with one column and five rows, one row for all list entries across all parallel 
tasks of Src.
+Operator state os2 of the operator “Proc” is similarly mapped to an individual 
table.
+The keyed states ks1 and ks2 are combined to a single table with three 
columns, one for the key, one for ks1 and one for ks2.
+The keyed table holds one row for each distinct key of both keyed states.
+Since the operator “Snk” does not have any state, its namespace is empty.
 
 ## Reading State
 
-Reading state begins by specifiying the path to a valid savepoint or 
checkpoint along with the `StateBackend` that should be used to restore the 
data.
-The compatability guarantees for restoring state are identical to those when 
restoring a `DataStream` application.
+Reading state begins by specifying the path to a valid savepoint or checkpoint 
along with the `StateBackend` that should be used to restore the data.
+The compatibility guarantees for restoring state are identical to those when 
restoring a `DataStream` application.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 ExecutionEnvironment bEnv   = ExecutionEnvironment.getExecutionEnvironment();
-ExistingSavepoint savepoint = Savepoint.load(bEnv, "hdfs://path/", new 
RocksDBStateBackend());
+ExistingSavepoint savepoint = Savepoint.load(bEnv, "hdfs://path/", new 
MemoryStateBackend());
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 val bEnv      = ExecutionEnvironment.getExecutionEnvironment()
-val savepoint = Savepoint.load(bEnv, "hdfs://path/", new RocksDBStateBackend())
+val savepoint = Savepoint.load(bEnv, "hdfs://path/", new MemoryStateBackend())
 {% endhighlight %}
 </div>
 </div>
 
-When reading operator state, simply specify the operator uid, state name, and 
type information.
+### Operator State
 
 Review comment:
   I think it would help to add the following general sentence here, before 
expanding into the different kinds of operator state, especially for 
understanding the parameters used in the examples that follow:
   
   >When reading operator state, users specify the operator uid, the state 
name, and the type information.
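
   To show how those three parameters fit the savepoint-as-database model from the diff above, here is a minimal plain-Java sketch. It is a toy model only and does not use the Flink State Processor API: `ToySavepoint`, `addOperatorState`, and the uid `"src"` are hypothetical names chosen for illustration, loosely following the "Src"/"os1" example in the proposed text.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model only: NOT the Flink State Processor API. All names are hypothetical. */
public class ToySavepoint {

    /** One operator state: a typed list of entries, i.e. a single-column table. */
    private static final class StateTable {
        final Class<?> elementType;
        final List<Object> rows;

        StateTable(Class<?> elementType, List<?> rows) {
            this.elementType = elementType;
            this.rows = new ArrayList<>(rows);
        }
    }

    // operator uid (namespace) -> state name (table) -> entries (rows)
    private final Map<String, Map<String, StateTable>> namespaces = new HashMap<>();

    /** Registers the entries of one operator state under the given operator uid. */
    public <T> void addOperatorState(String uid, String stateName, Class<T> type, List<T> entries) {
        namespaces.computeIfAbsent(uid, k -> new HashMap<>())
                  .put(stateName, new StateTable(type, entries));
    }

    /** Looks up a state by operator uid and state name, then checks the requested type. */
    public <T> List<T> readListState(String uid, String stateName, Class<T> type) {
        Map<String, StateTable> tables = namespaces.get(uid);
        if (tables == null) {
            throw new IllegalArgumentException("Unknown operator uid: " + uid);
        }
        StateTable table = tables.get(stateName);
        if (table == null) {
            throw new IllegalArgumentException("Unknown state name: " + stateName);
        }
        if (!table.elementType.equals(type)) {
            throw new IllegalArgumentException("Type mismatch for state: " + stateName);
        }
        List<T> result = new ArrayList<>();
        for (Object row : table.rows) {
            result.add(type.cast(row));
        }
        return result;
    }

    public static void main(String[] args) {
        ToySavepoint savepoint = new ToySavepoint();
        savepoint.addOperatorState("src", "os1", Integer.class, Arrays.asList(1, 2, 3));
        // All three parameters are needed to locate and decode the state.
        System.out.println(savepoint.readListState("src", "os1", Integer.class)); // prints [1, 2, 3]
    }
}
```

   The uid selects the namespace, the state name selects the table, and the type tells the reader how to interpret each row, which is why a sentence naming all three up front would make the later examples easier to follow.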
