sjwiesman commented on a change in pull request #8861: 
[FLINK-12963][state-processor-api] Add savepoint writer for bootstrapping new 
savepoints
URL: https://github.com/apache/flink/pull/8861#discussion_r297759834
 
 

 ##########
 File path: docs/dev/libs/state_processor_api.md
 ##########
 @@ -0,0 +1,459 @@
+---
+title: "State Processor API"
+nav-title: State Processor API
+nav-parent_id: libs
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Apache Flink's State Processor API provides powerful functionality for reading, writing, and modifying savepoints and checkpoints using Flink's batch DataSet API.
+This is useful for tasks such as analyzing state for interesting patterns, troubleshooting or auditing jobs by checking for discrepancies, and bootstrapping state for new applications.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Abstraction
+
+To understand how to best interact with savepoints in a batch context, it is important to have a clear mental model of how the data in Flink state relates to a traditional relational database.
+
+A database can be thought of as one or more namespaces, each containing a collection of tables.
+Those tables in turn contain columns whose values have some intrinsic relationship between them, such as being scoped under the same key.
+
+A savepoint represents the state of a Flink job at a particular point in time. A job is made up of many operators, and those operators contain various kinds of state: both partitioned (keyed) state and non-partitioned (operator) state.
+
+<div data-lang="java" markdown="1">
+{% highlight java %}
+MapStateDescriptor<Integer, Double> CURRENCY_RATES = new MapStateDescriptor<>("rates", Types.INT, Types.DOUBLE);
+ 
+class CurrencyConverter extends BroadcastProcessFunction<Transaction, CurrencyRate, Transaction> {
+ 
+  public void processElement(
+        Transaction value,
+        ReadOnlyContext ctx,
+        Collector<Transaction> out) throws Exception {
+ 
+     Double rate = ctx.getBroadcastState(CURRENCY_RATES).get(value.currencyId);
+     if (rate != null) {
+        value.amount *= rate;
+     }
+     out.collect(value);
+  }
+ 
+  public void processBroadcastElement(
+        CurrencyRate value,
+        Context ctx,
+        Collector<Transaction> out) throws Exception {
+     ctx.getBroadcastState(CURRENCY_RATES).put(value.currencyId, value.rate);
+  }
+}
+  
+class Summarize extends RichFlatMapFunction<Transaction, Summary> {
+  transient ValueState<Double> totalState;
+  transient ValueState<Integer> countState;
+ 
+  public void open(Configuration configuration) throws Exception {
+     totalState = getRuntimeContext().getState(new ValueStateDescriptor<>("total", Types.DOUBLE));
+     countState = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Types.INT));
+  }
+ 
+  public void flatMap(Transaction value, Collector<Summary> out) throws Exception {
+     Summary summary = new Summary();
+     summary.total = value.amount;
+     summary.count = 1;
+ 
+     Double currentTotal = totalState.value();
+     if (currentTotal != null) {
+        summary.total += currentTotal;
+     }
+ 
+     Integer currentCount = countState.value();
+     if (currentCount != null) {
+        summary.count += currentCount;
+     }
+ 
+     totalState.update(summary.total);
+     countState.update(summary.count);
+ 
+     out.collect(summary);
+  }
+}
+ 
+DataStream<Transaction> transactions = . . .
+BroadcastStream<CurrencyRate> rates = . . .
+transactions
+  .connect(rates)
+  .process(new CurrencyConverter())
+  .uid("currency_converter")
+  .keyBy(transaction -> transaction.accountId)
+  .flatMap(new Summarize())
+  .uid("summarize");
+{% endhighlight %}
+</div>
+
+This job contains multiple operators along with various kinds of state.
+When analyzing that state, we can first scope data by operator, identified by its uid.
+Within each operator, we can look at the registered states.
+`CurrencyConverter` has a broadcast state, which is a type of non-partitioned operator state.
+In general, there is no relationship between any two elements in an operator state, so we can treat each value as its own row.
+Contrast this with `Summarize`, which contains two keyed states.
+Because both states are scoped under the same key, we can safely assume there exists some relationship between the two values.
+Therefore, keyed state is best understood as a single table per operator containing one _key_ column along with _n_ value columns, one for each registered state.
+All of this means that the state for this job could be described using the following pseudo-SQL commands.
+
+{% highlight sql %}
+CREATE NAMESPACE currency_converter;
+ 
+CREATE TABLE currency_converter.rates (
+   value Tuple2<Integer, Double>
+);
+ 
+CREATE NAMESPACE summarize;
+ 
+CREATE TABLE summarize.keyed_state (
+   key   INTEGER PRIMARY KEY,
+   total DOUBLE,
+   count INTEGER
+);
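+
+To make the table analogy concrete, the keyed state of the `summarize` operator could be read back with the State Processor API roughly as follows. This is an illustrative sketch only: the savepoint path, the `KeyedState` POJO, and the choice of `MemoryStateBackend` are assumptions, not part of any real job.
+
+{% highlight java %}
+ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
+ExistingSavepoint savepoint = Savepoint.load(bEnv, "hdfs://path/to/savepoint", new MemoryStateBackend());
+
+// Read both value states registered under the operator with uid "summarize";
+// each emitted KeyedState instance corresponds to one row of the pseudo-SQL table.
+DataSet<KeyedState> keyedState = savepoint.readKeyedState("summarize", new ReaderFunction());
+
+class KeyedState {
+  Integer key;
+  Double total;
+  Integer count;
+}
+
+class ReaderFunction extends KeyedStateReaderFunction<Integer, KeyedState> {
+  ValueState<Double> totalState;
+  ValueState<Integer> countState;
+
+  @Override
+  public void open(Configuration parameters) {
+     // The descriptors must use the same names and types as the original job.
+     totalState = getRuntimeContext().getState(new ValueStateDescriptor<>("total", Types.DOUBLE));
+     countState = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Types.INT));
+  }
+
+  @Override
+  public void readKey(Integer key, Context ctx, Collector<KeyedState> out) throws Exception {
+     KeyedState data = new KeyedState();
+     data.key = key;
+     data.total = totalState.value();
+     data.count = countState.value();
+     out.collect(data);
+  }
+}
+{% endhighlight %}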
 Review comment:
   I put it and `key` in backticks; it's more about expressing the idea.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
