sjwiesman commented on a change in pull request #8861: [FLINK-12963][state-processor-api] Add savepoint writer for bootstrapping new savepoints URL: https://github.com/apache/flink/pull/8861#discussion_r297759834
##########
File path: docs/dev/libs/state_processor_api.md
##########
@@ -0,0 +1,459 @@
+---
+title: "State Processor API"
+nav-title: State Processor API
+nav-parent_id: libs
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Apache Flink's State Processor API provides powerful functionality for reading, writing, and modifying savepoints and checkpoints using Flink's batch DataSet API.
+This is useful for tasks such as analyzing state for interesting patterns, troubleshooting or auditing jobs by checking for discrepancies, and bootstrapping state for new applications.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Abstraction
+
+To understand how best to interact with savepoints in a batch context, it is important to have a clear mental model of how the data in Flink state relates to a traditional relational database.
+
+A database can be thought of as one or more namespaces, each containing a collection of tables.
+Those tables in turn contain columns whose values have some intrinsic relationship between them, such as being scoped under the same key.
+
+A savepoint represents the state of a Flink job, which is made up of many operators, at a particular point in time.
+Those operators contain various kinds of state: partitioned (keyed) state and non-partitioned (operator) state.
+
+<div data-lang="java" markdown="1">
+{% highlight java %}
+MapStateDescriptor<Integer, Double> CURRENCY_RATES = new MapStateDescriptor<>("rates", Types.INT, Types.DOUBLE);
+
+class CurrencyConverter extends BroadcastProcessFunction<Transaction, CurrencyRate, Transaction> {
+
+    public void processElement(
+            Transaction value,
+            ReadOnlyContext ctx,
+            Collector<Transaction> out) throws Exception {
+
+        Double rate = ctx.getBroadcastState(CURRENCY_RATES).get(value.currencyId);
+        if (rate != null) {
+            value.amount *= rate;
+        }
+        out.collect(value);
+    }
+
+    public void processBroadcastElement(
+            CurrencyRate value,
+            Context ctx,
+            Collector<Transaction> out) throws Exception {
+        ctx.getBroadcastState(CURRENCY_RATES).put(value.currencyId, value.rate);
+    }
+}
+
+class Summarize extends RichFlatMapFunction<Transaction, Summary> {
+    transient ValueState<Double> totalState;
+    transient ValueState<Integer> countState;
+
+    public void open(Configuration configuration) throws Exception {
+        totalState = getRuntimeContext().getState(new ValueStateDescriptor<>("total", Types.DOUBLE));
+        countState = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Types.INT));
+    }
+
+    public void flatMap(Transaction value, Collector<Summary> out) throws Exception {
+        Summary summary = new Summary();
+        summary.total = value.amount;
+        summary.count = 1;
+
+        Double currentTotal = totalState.value();
+        if (currentTotal != null) {
+            summary.total += currentTotal;
+        }
+        // Persist the running total; without this the total state is never written.
+        totalState.update(summary.total);
+
+        Integer currentCount = countState.value();
+        if (currentCount != null) {
+            summary.count += currentCount;
+        }
+        countState.update(summary.count);
+
+        out.collect(summary);
+    }
+}
+
+DataStream<Transaction> transactions = . . .
+BroadcastStream<CurrencyRate> rates = . . .
+transactions
+    .connect(rates)
+    .process(new CurrencyConverter())
+    .uid("currency_converter")
+    .keyBy(transaction -> transaction.accountId)
+    .flatMap(new Summarize())
+    .uid("summarize");
+{% endhighlight %}
+</div>
+
+This job contains multiple operators along with various kinds of state.
+When analyzing that state, we can first scope data by its operator, which is named by setting its uid.
+Within each operator we can look at the registered states.
+`CurrencyConverter` has a broadcast state, which is a type of non-partitioned operator state.
+In general, there is no relationship between any two elements in an operator state, so we can treat each value as its own row.
+Contrast this with `Summarize`, which contains two keyed states.
+Because both states are scoped under the same key, we can safely assume there exists some relationship between the two values.
+Therefore, keyed state is best understood as a single table per operator containing one _key_ column along with _n_ value columns, one for each registered state.
+All of this means that the state for this job could be described using the following pseudo-SQL commands.
+
+{% highlight sql %}
+CREATE NAMESPACE currency_converter;
+
+CREATE TABLE currency_converter.rates (
+    value Tuple2<Integer, Double>
+);
+
+CREATE NAMESPACE summarize;
+
+CREATE TABLE summarize.keyed_state (
+    key INTEGER PRIMARY KEY,
+    total DOUBLE,
+    count INTEGER

Review comment:
I put it and key in backticks; it's more about expressing the idea.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

With regards,
Apache Git Services
