This is an automated email from the ASF dual-hosted git repository.
sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c0f72aa [hotfix] add scala note to spa docs
c0f72aa is described below
commit c0f72aa4a4ebade8f47e8de16ee0a2a6d942a797
Author: Seth Wiesman <[email protected]>
AuthorDate: Fri Jan 8 10:02:08 2021 -0600
[hotfix] add scala note to spa docs
---
docs/dev/libs/state_processor_api.md | 334 +-------------------------------
docs/dev/libs/state_processor_api.zh.md | 332 +------------------------------
2 files changed, 9 insertions(+), 657 deletions(-)
diff --git a/docs/dev/libs/state_processor_api.md b/docs/dev/libs/state_processor_api.md
index fa5ef3a..7ed30ec 100644
--- a/docs/dev/libs/state_processor_api.md
+++ b/docs/dev/libs/state_processor_api.md
@@ -87,20 +87,11 @@ Since the operator “Snk” does not have any state, its namespace is empty.
Reading state begins by specifying the path to a valid savepoint or checkpoint along with the `StateBackend` that should be used to restore the data.
The compatibility guarantees for restoring state are identical to those when restoring a `DataStream` application.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
{% highlight java %}
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
ExistingSavepoint savepoint = Savepoint.load(bEnv, "hdfs://path/", new MemoryStateBackend());
{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val bEnv = ExecutionEnvironment.getExecutionEnvironment
-val savepoint = Savepoint.load(bEnv, "hdfs://path/", new MemoryStateBackend)
-{% endhighlight %}
-</div>
-</div>
+
### Operator State
@@ -113,24 +104,12 @@ When reading operator state, users specify the operator uid, the state name, and
Operator state stored in a `CheckpointedFunction` using `getListState` can be read using `ExistingSavepoint#readListState`.
The state name and type information should match those used to define the `ListStateDescriptor` that declared this state in the DataStream application.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
{% highlight java %}
DataSet<Integer> listState = savepoint.readListState<>(
"my-uid",
"list-state",
Types.INT);
{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val listState = savepoint.readListState(
- "my-uid",
- "list-state",
- Types.INT)
-{% endhighlight %}
-</div>
-</div>
#### Operator Union List State
@@ -138,25 +117,12 @@ Operator state stored in a `CheckpointedFunction` using `getUnionListState` can
The state name and type information should match those used to define the `ListStateDescriptor` that declared this state in the DataStream application.
The framework will return a _single_ copy of the state, equivalent to restoring a DataStream with parallelism 1.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
{% highlight java %}
DataSet<Integer> listState = savepoint.readUnionState<>(
"my-uid",
"union-state",
Types.INT);
{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val listState = savepoint.readUnionState(
- "my-uid",
- "union-state",
- Types.INT)
-{% endhighlight %}
-</div>
-</div>
-
#### Broadcast State
@@ -164,8 +130,6 @@ val listState = savepoint.readUnionState(
The state name and type information should match those used to define the `MapStateDescriptor` that declared this state in the DataStream application.
The framework will return a _single_ copy of the state, equivalent to restoring a DataStream with parallelism 1.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
{% highlight java %}
DataSet<Tuple2<Integer, Integer>> broadcastState = savepoint.readBroadcastState<>(
"my-uid",
@@ -173,24 +137,11 @@ DataSet<Tuple2<Integer, Integer>> broadcastState = savepoint.readBroadcastState<
Types.INT,
Types.INT);
{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val broadcastState = savepoint.readBroadcastState(
- "my-uid",
- "broadcast-state",
- Types.INT,
- Types.INT)
-{% endhighlight %}
-</div>
-</div>
#### Using Custom Serializers
Each of the operator state readers support using custom `TypeSerializers` if one was used to define the `StateDescriptor` that wrote out the state.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
{% highlight java %}
DataSet<Integer> listState = savepoint.readListState<>(
"uid",
@@ -198,17 +149,6 @@ DataSet<Integer> listState = savepoint.readListState<>(
Types.INT,
new MyCustomIntSerializer());
{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val listState = savepoint.readListState(
- "uid",
- "list-state",
- Types.INT,
- new MyCustomIntSerializer)
-{% endhighlight %}
-</div>
-</div>
### Keyed State
@@ -218,8 +158,6 @@ When reading a keyed state, users specify the operator id and a `KeyedStateReade
The `KeyedStateReaderFunction` allows users to read arbitrary columns and complex state types such as ListState, MapState, and AggregatingState.
This means if an operator contains a stateful process function such as:
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
{% highlight java %}
public class StatefulFunctionWithTime extends KeyedProcessFunction<Integer, Integer, Void> {
@@ -243,36 +181,9 @@ public class StatefulFunctionWithTime extends KeyedProcessFunction<Integer, Inte
}
}
{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-class StatefulFunctionWithTime extends KeyedProcessFunction[Int, Int, Void] {
- var state: ValueState[Int] = _
- var updateTimes: ListState[Long] = _
-
- @throws[Exception]
- override def open(parameters: Configuration): Unit = {
- val stateDescriptor = new ValueStateDescriptor("state", createTypeInformation[Int])
- state = getRuntimeContext().getState(stateDescriptor)
-
- val updateDescriptor = new ListStateDescriptor("times", createTypeInformation[Long])
- updateTimes = getRuntimeContext().getListState(updateDescriptor)
- }
-
- @throws[Exception]
- override def processElement(value: Int, ctx: KeyedProcessFunction[Int, Int, Void]#Context, out: Collector[Void]): Unit = {
- state.update(value + 1)
- updateTimes.add(System.currentTimeMillis)
- }
-}
-{% endhighlight %}
-</div>
-</div>
Then it can read by defining an output type and corresponding `KeyedStateReaderFunction`.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
{% highlight java %}
DataSet<KeyedState> keyedState = savepoint.readKeyedState("my-uid", new ReaderFunction());
@@ -316,30 +227,6 @@ public class ReaderFunction extends KeyedStateReaderFunction<Integer, KeyedState
}
}
{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-class ReaderFunction extends KeyedStateReaderFunction[Int, KeyedState] {
- var state: ValueState[Int] = _
- var updateTimes: ListState[Long] = _
-
- @throws[Exception]
- override def open(parameters: Configuration): Unit = {
- val stateDescriptor = new ValueStateDescriptor("state", createTypeInformation[Int])
- state = getRuntimeContext().getState(stateDescriptor)
-
- val updateDescriptor = new ListStateDescriptor("times", createTypeInformation[Long])
- updateTimes = getRuntimeContext().getListState(updateDescriptor)
- }
-
- override def readKey(key: Int, ctx: KeyedStateReaderFunction.Context, out: Collector[KeyedState]): Unit = {
- val data = KeyedState(key, state.value, updateTimes.get.asScala.toList)
- out.collect(data)
- }
-}
-{% endhighlight %}
-</div>
-</div>
Along with reading registered state values, each key has access to a `Context` with metadata such as registered event time and processing time timers.
@@ -355,10 +242,7 @@ to a `WindowFunction` or `ProcessWindowFunction`.
Suppose a DataStream application that counts the number of clicks per user per minute.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
{% highlight java %}
-
class Click {
public String userId;
@@ -398,42 +282,9 @@ clicks
.addSink(new Sink());
{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-
-import java.lang.{Integer => JInteger}
-
-case class Click(userId: String, time: LocalDateTime)
-
-class ClickCounter extends AggregateFunction[Click, JInteger, JInteger] {
-
- override def createAccumulator(): JInteger = 0
-
- override def add(value: Click, accumulator: JInteger): JInteger = 1 + accumulator
-
- override def getResult(accumulator: JInteger): JInteger = accumulator
-
- override def merge(a: JInteger, b: JInteger): JInteger = a + b
-}
-
-DataStream[Click] clicks = . . .
-
-clicks
- .keyBy(click => click.userId)
- .window(TumblingEventTimeWindows.of(Time.minutes(1)))
- .aggregate(new ClickCounter())
- .uid("click-window")
- .addSink(new Sink())
-
-{% endhighlight %}
-</div>
-</div>
This state can be read using the code below.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
{% highlight java %}
class ClickState {
@@ -470,44 +321,6 @@ savepoint
.print();
{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-
-import java.lang.{Integer => JInteger, Long => JLong}
-import java.util.{Set => JSet}
-
-case class ClickState(userId: String, count: JInteger, window: TimeWindow, triggerTimers: JSet[JLong])
-
-class ClickReader extends WindowReaderFunction[JInteger, ClickState, String, TimeWindow] {
-
- override def readWindow(
- key: String,
- context: Context[TimeWindow],
- elements: Iterable[JInteger],
- out: Collector[ClickState]): Unit = {
-
- state = ClickState(
- userId = key,
- count = elements.iterator().next(),
- window = context.window()k
- triggerTimers = context.registeredEventTimeTimers())
-
- out.collect(state)
- }
-}
-
-val batchEnv = ExecutionEnvironment.getExecutionEnvironment()
-val savepoint = Savepoint.load(batchEnv, "hdfs://checkpoint-dir", new MemoryStateBackend())
-
-savepoint
- .window(TumblingEventTimeWindows.of(Time.minutes(1)))
- .aggregate("click-window", new ClickCounter(), new ClickReader(), Types.String, Types.INT, Types.INT)
- .print()
-
-{% endhighlight %}
-</div>
-</div>
Additionally, trigger state - from `CountTrigger`s or custom triggers - can be read using the method
`Context#triggerState` inside the `WindowReaderFunction`.
@@ -517,8 +330,10 @@ Additionally, trigger state - from `CountTrigger`s or custom triggers - can be r
`Savepoint`'s may also be written, which allows such use cases as bootstrapping state based on historical data.
Each savepoint is made up of one or more `BootstrapTransformation`'s (explained below), each of which defines the state for an individual operator.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
+**Note** The state processor api does not currently provide a Scala API. As a result
+it will always auto-derive serializers using the Java type stack. To bootstrap
+a savepoint for the Scala DataStream API please manually pass in all type information.
+
{% highlight java %}
int maxParallelism = 128;
@@ -528,19 +343,6 @@ Savepoint
.withOperator("uid2", transformation2)
.write(savepointPath);
{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val maxParallelism = 128
-
-Savepoint
- .create(new MemoryStateBackend(), maxParallelism)
- .withOperator("uid1", transformation1)
- .withOperator("uid2", transformation2)
- .write(savepointPath)
-{% endhighlight %}
-</div>
-</div>
The [UIDs]({% link ops/state/savepoints.md %}#assigning-operator-ids) associated with each operator must match one to one with the UIDs assigned to the operators in your `DataStream` application; these are how Flink knows what state maps to which operator.
@@ -548,8 +350,6 @@ The [UIDs]({% link ops/state/savepoints.md %}#assigning-operator-ids) associated
Simple operator state, using `CheckpointedFunction`, can be created using the `StateBootstrapFunction`.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
{% highlight java %}
public class SimpleBootstrapFunction extends StateBootstrapFunction<Integer> {
@@ -577,44 +377,11 @@ BootstrapTransformation transformation = OperatorTransformation
.bootstrapWith(data)
.transform(new SimpleBootstrapFunction());
{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-class SimpleBootstrapFunction extends StateBootstrapFunction[Integer] {
-
- var ListState[Integer] state = _
-
- @throws[Exception]
- override def processElement(value: Integer, ctx: Context): Unit = {
- state.add(value)
- }
-
- @throws[Exception]
- override def snapshotState(context: FunctionSnapshotContext): Unit = {
- }
-
- @throws[Exception]
- override def initializeState(context: FunctionInitializationContext): Unit = {
- state = context.getOperatorState().getListState(new ListStateDescriptor("state", Types.INT))
- }
-}
-
-val env = ExecutionEnvironment.getExecutionEnvironment
-val data = env.fromElements(1, 2, 3)
-
-BootstrapTransformation transformation = OperatorTransformation
- .bootstrapWith(data)
- .transform(new SimpleBootstrapFunction)
-{% endhighlight %}
-</div>
-</div>
### Broadcast State
[BroadcastState]({% link dev/stream/state/broadcast_state.md %}) can be written using a `BroadcastStateBootstrapFunction`. Similar to broadcast state in the `DataStream` API, the full state must fit in memory.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
{% highlight java %}
public class CurrencyRate {
public String currency;
@@ -640,38 +407,11 @@ BootstrapTransformation<CurrencyRate> broadcastTransformation = OperatorTransfor
.bootstrapWith(currencyDataSet)
.transform(new CurrencyBootstrapFunction());
{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-case class CurrencyRate(currency: String, rate: Double)
-
-object CurrencyBootstrapFunction {
- val descriptor = new MapStateDescriptor("currency-rates", Types.STRING, Types.DOUBLE)
-}
-
-class CurrencyBootstrapFunction extends BroadcastStateBootstrapFunction[CurrencyRate] {
-
- @throws[Exception]
- override processElement(value: CurrencyRate, ctx: Context): Unit = {
- ctx.getBroadcastState(descriptor).put(value.currency, value.rate)
- }
-}
-
-val currencyDataSet = bEnv.fromCollection(CurrencyRate("USD", 1.0), CurrencyRate("EUR", 1.3))
-
-val broadcastTransformation = OperatorTransformation
- .bootstrapWith(currencyDataSet)
- .transform(new CurrencyBootstrapFunction)
-{% endhighlight %}
-</div>
-</div>
### Keyed State
Keyed state for `ProcessFunction`'s and other `RichFunction` types can be written using a `KeyedStateBootstrapFunction`.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
{% highlight java %}
public class Account {
public int id;
@@ -705,37 +445,6 @@ BootstrapTransformation<Account> transformation = OperatorTransformation
.keyBy(acc -> acc.id)
.transform(new AccountBootstrapper());
{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-case class Account(id: Int, amount: Double, timestamp: Long)
-
-class AccountBootstrapper extends KeyedStateBootstrapFunction[Integer, Account] {
- var state: ValueState[Double]
-
- @throws[Exception]
- override def open(parameters: Configuration): Unit = {
- val descriptor = new ValueStateDescriptor("total",Types.DOUBLE)
- state = getRuntimeContext().getState(descriptor)
- }
-
- @throws[Exception]
- override def processElement(value: Account, ctx: Context): Unit = {
- state.update(value.amount)
- }
-}
-
-val bEnv = ExecutionEnvironment.getExecutionEnvironment()
-
-val accountDataSet = bEnv.fromCollection(accounts)
-
-val transformation = OperatorTransformation
- .bootstrapWith(accountDataSet)
- .keyBy(acc => acc.id)
- .transform(new AccountBootstrapper)
-{% endhighlight %}
-</div>
-</div>
The `KeyedStateBootstrapFunction` supports setting event time and processing time timers.
The timers will not fire inside the bootstrap function and only become active once restored within a `DataStream` application.
@@ -749,8 +458,6 @@ The state processor api supports writing state for the [window operator]({% link
When writing window state, users specify the operator id, window assigner, evictor, optional trigger, and aggregation type.
It is important the configurations on the bootstrap transformation match the configurations on the DataStream window.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
{% highlight java %}
public class Account {
public int id;
@@ -773,45 +480,14 @@ BootstrapTransformation<Account> transformation = OperatorTransformation
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.reduce((left, right) -> left + right);
{% endhighlight %}
-</div>
-<div data-lang="java" markdown="1">
-{% highlight scala %}
-case class Account(id: Int, amount: Double, timestamp: Long)
-
-val bEnv = ExecutionEnvironment.getExecutionEnvironment();
-val accountDataSet = bEnv.fromCollection(accounts);
-
-val transformation = OperatorTransformation
- .bootstrapWith(accountDataSet)
- // When using event time windows, its important
- // to assign timestamps to each record.
- .assignTimestamps(account => account.timestamp)
- .keyBy(acc => acc.id)
- .window(TumblingEventTimeWindows.of(Time.minutes(5)))
- .reduce((left, right) => left + right)
-{% endhighlight %}
-</div>
-</div>
## Modifying Savepoints
Besides creating a savepoint from scratch, you can base one off an existing savepoint such as when bootstrapping a single new operator for an existing job.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
{% highlight java %}
Savepoint
.load(bEnv, new MemoryStateBackend(), oldPath)
.withOperator("uid", transformation)
.write(newPath);
{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-Savepoint
- .load(bEnv, new MemoryStateBackend, oldPath)
- .withOperator("uid", transformation)
- .write(newPath)
-{% endhighlight %}
-</div>
-</div>
diff --git a/docs/dev/libs/state_processor_api.zh.md b/docs/dev/libs/state_processor_api.zh.md
index 581d3eb..18d505a 100644
--- a/docs/dev/libs/state_processor_api.zh.md
+++ b/docs/dev/libs/state_processor_api.zh.md
@@ -87,20 +87,10 @@ Since the operator “Snk” does not have any state, its namespace is empty.
Reading state begins by specifying the path to a valid savepoint or checkpoint along with the `StateBackend` that should be used to restore the data.
The compatibility guarantees for restoring state are identical to those when restoring a `DataStream` application.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
{% highlight java %}
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
ExistingSavepoint savepoint = Savepoint.load(bEnv, "hdfs://path/", new MemoryStateBackend());
{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val bEnv = ExecutionEnvironment.getExecutionEnvironment
-val savepoint = Savepoint.load(bEnv, "hdfs://path/", new MemoryStateBackend)
-{% endhighlight %}
-</div>
-</div>
### Operator State
@@ -113,24 +103,12 @@ When reading operator state, users specify the operator uid, the state name, and
Operator state stored in a `CheckpointedFunction` using `getListState` can be read using `ExistingSavepoint#readListState`.
The state name and type information should match those used to define the `ListStateDescriptor` that declared this state in the DataStream application.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
{% highlight java %}
DataSet<Integer> listState = savepoint.readListState<>(
"my-uid",
"list-state",
Types.INT);
{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val listState = savepoint.readListState(
- "my-uid",
- "list-state",
- Types.INT)
-{% endhighlight %}
-</div>
-</div>
#### Operator Union List State
@@ -138,25 +116,12 @@ Operator state stored in a `CheckpointedFunction` using `getUnionListState` can
The state name and type information should match those used to define the `ListStateDescriptor` that declared this state in the DataStream application.
The framework will return a _single_ copy of the state, equivalent to restoring a DataStream with parallelism 1.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
{% highlight java %}
DataSet<Integer> listState = savepoint.readUnionState<>(
"my-uid",
"union-state",
Types.INT);
{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val listState = savepoint.readUnionState(
- "my-uid",
- "union-state",
- Types.INT)
-{% endhighlight %}
-</div>
-</div>
-
#### Broadcast State
@@ -164,8 +129,6 @@ val listState = savepoint.readUnionState(
The state name and type information should match those used to define the `MapStateDescriptor` that declared this state in the DataStream application.
The framework will return a _single_ copy of the state, equivalent to restoring a DataStream with parallelism 1.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
{% highlight java %}
DataSet<Tuple2<Integer, Integer>> broadcastState = savepoint.readBroadcastState<>(
"my-uid",
@@ -173,24 +136,11 @@ DataSet<Tuple2<Integer, Integer>> broadcastState = savepoint.readBroadcastState<
Types.INT,
Types.INT);
{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val broadcastState = savepoint.readBroadcastState(
- "my-uid",
- "broadcast-state",
- Types.INT,
- Types.INT)
-{% endhighlight %}
-</div>
-</div>
#### Using Custom Serializers
Each of the operator state readers support using custom `TypeSerializers` if one was used to define the `StateDescriptor` that wrote out the state.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
{% highlight java %}
DataSet<Integer> listState = savepoint.readListState<>(
"uid",
@@ -198,17 +148,6 @@ DataSet<Integer> listState = savepoint.readListState<>(
Types.INT,
new MyCustomIntSerializer());
{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val listState = savepoint.readListState(
- "uid",
- "list-state",
- Types.INT,
- new MyCustomIntSerializer)
-{% endhighlight %}
-</div>
-</div>
### Keyed State
@@ -218,8 +157,6 @@ When reading a keyed state, users specify the operator id and a `KeyedStateReade
The `KeyedStateReaderFunction` allows users to read arbitrary columns and complex state types such as ListState, MapState, and AggregatingState.
This means if an operator contains a stateful process function such as:
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
{% highlight java %}
public class StatefulFunctionWithTime extends KeyedProcessFunction<Integer, Integer, Void> {
@@ -243,36 +180,9 @@ public class StatefulFunctionWithTime extends KeyedProcessFunction<Integer, Inte
}
}
{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-class StatefulFunctionWithTime extends KeyedProcessFunction[Int, Int, Void] {
- var state: ValueState[Int] = _
- var updateTimes: ListState[Long] = _
-
- @throws[Exception]
- override def open(parameters: Configuration): Unit = {
- val stateDescriptor = new ValueStateDescriptor("state", createTypeInformation[Int])
- state = getRuntimeContext().getState(stateDescriptor)
-
- val updateDescriptor = new ListStateDescriptor("times", createTypeInformation[Long])
- updateTimes = getRuntimeContext().getListState(updateDescriptor)
- }
-
- @throws[Exception]
- override def processElement(value: Int, ctx: KeyedProcessFunction[Int, Int, Void]#Context, out: Collector[Void]): Unit = {
- state.update(value + 1)
- updateTimes.add(System.currentTimeMillis)
- }
-}
-{% endhighlight %}
-</div>
-</div>
Then it can read by defining an output type and corresponding `KeyedStateReaderFunction`.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
{% highlight java %}
DataSet<KeyedState> keyedState = savepoint.readKeyedState("my-uid", new ReaderFunction());
@@ -316,30 +226,6 @@ public class ReaderFunction extends KeyedStateReaderFunction<Integer, KeyedState
}
}
{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-class ReaderFunction extends KeyedStateReaderFunction[Int, KeyedState] {
- var state: ValueState[Int] = _
- var updateTimes: ListState[Long] = _
-
- @throws[Exception]
- override def open(parameters: Configuration): Unit = {
- val stateDescriptor = new ValueStateDescriptor("state", createTypeInformation[Int])
- state = getRuntimeContext().getState(stateDescriptor)
-
- val updateDescriptor = new ListStateDescriptor("times", createTypeInformation[Long])
- updateTimes = getRuntimeContext().getListState(updateDescriptor)
- }
-
- override def readKey(key: Int, ctx: KeyedStateReaderFunction.Context, out: Collector[KeyedState]): Unit = {
- val data = KeyedState(key, state.value, updateTimes.get.asScala.toList)
- out.collect(data)
- }
-}
-{% endhighlight %}
-</div>
-</div>
Along with reading registered state values, each key has access to a `Context` with metadata such as registered event time and processing time timers.
@@ -355,10 +241,7 @@ to a `WindowFunction` or `ProcessWindowFunction`.
Suppose a DataStream application that counts the number of clicks per user per minute.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
{% highlight java %}
-
class Click {
public String userId;
@@ -398,42 +281,9 @@ clicks
.addSink(new Sink());
{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-
-import java.lang.{Integer => JInteger}
-
-case class Click(userId: String, time: LocalDateTime)
-
-class ClickCounter extends AggregateFunction[Click, JInteger, JInteger] {
-
- override def createAccumulator(): JInteger = 0
-
- override def add(value: Click, accumulator: JInteger): JInteger = 1 + accumulator
-
- override def getResult(accumulator: JInteger): JInteger = accumulator
-
- override def merge(a: JInteger, b: JInteger): JInteger = a + b
-}
-
-DataStream[Click] clicks = . . .
-
-clicks
- .keyBy(click => click.userId)
- .window(TumblingEventTimeWindows.of(Time.minutes(1)))
- .aggregate(new ClickCounter())
- .uid("click-window")
- .addSink(new Sink())
-
-{% endhighlight %}
-</div>
-</div>
This state can be read using the code below.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
{% highlight java %}
class ClickState {
@@ -470,44 +320,6 @@ savepoint
.print();
{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-
-import java.lang.{Integer => JInteger, Long => JLong}
-import java.util.{Set => JSet}
-
-case class ClickState(userId: String, count: JInteger, window: TimeWindow, triggerTimers: JSet[JLong])
-
-class ClickReader extends WindowReaderFunction[JInteger, ClickState, String, TimeWindow] {
-
- override def readWindow(
- key: String,
- context: Context[TimeWindow],
- elements: Iterable[JInteger],
- out: Collector[ClickState]): Unit = {
-
- state = ClickState(
- userId = key,
- count = elements.iterator().next(),
- window = context.window()k
- triggerTimers = context.registeredEventTimeTimers())
-
- out.collect(state)
- }
-}
-
-val batchEnv = ExecutionEnvironment.getExecutionEnvironment()
-val savepoint = Savepoint.load(batchEnv, "hdfs://checkpoint-dir", new MemoryStateBackend())
-
-savepoint
- .window(TumblingEventTimeWindows.of(Time.minutes(1)))
- .aggregate("click-window", new ClickCounter(), new ClickReader(), Types.String, Types.INT, Types.INT)
- .print()
-
-{% endhighlight %}
-</div>
-</div>
Additionally, trigger state - from `CountTrigger`s or custom triggers - can be read using the method
`Context#triggerState` inside the `WindowReaderFunction`.
@@ -517,8 +329,10 @@ Additionally, trigger state - from `CountTrigger`s or custom triggers - can be r
`Savepoint`'s may also be written, which allows such use cases as bootstrapping state based on historical data.
Each savepoint is made up of one or more `BootstrapTransformation`'s (explained below), each of which defines the state for an individual operator.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
+**Note** The state processor api does not currently provide a Scala API. As a result
+it will always auto-derive serializers using the Java type stack. To bootstrap
+a savepoint for the Scala DataStream API please manually pass in all type information.
+
{% highlight java %}
int maxParallelism = 128;
@@ -528,19 +342,7 @@ Savepoint
.withOperator("uid2", transformation2)
.write(savepointPath);
{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val maxParallelism = 128
-Savepoint
- .create(new MemoryStateBackend(), maxParallelism)
- .withOperator("uid1", transformation1)
- .withOperator("uid2", transformation2)
- .write(savepointPath)
-{% endhighlight %}
-</div>
-</div>
The [UIDs]({% link ops/state/savepoints.zh.md %}#assigning-operator-ids) associated with each operator must match one to one with the UIDs assigned to the operators in your `DataStream` application; these are how Flink knows what state maps to which operator.
@@ -548,8 +350,6 @@ The [UIDs]({% link ops/state/savepoints.zh.md %}#assigning-operator-ids) associa
Simple operator state, using `CheckpointedFunction`, can be created using the `StateBootstrapFunction`.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
{% highlight java %}
public class SimpleBootstrapFunction extends StateBootstrapFunction<Integer> {
@@ -577,44 +377,11 @@ BootstrapTransformation transformation = OperatorTransformation
.bootstrapWith(data)
.transform(new SimpleBootstrapFunction());
{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-class SimpleBootstrapFunction extends StateBootstrapFunction[Integer] {
-
- var ListState[Integer] state = _
-
- @throws[Exception]
- override def processElement(value: Integer, ctx: Context): Unit = {
- state.add(value)
- }
-
- @throws[Exception]
- override def snapshotState(context: FunctionSnapshotContext): Unit = {
- }
-
- @throws[Exception]
- override def initializeState(context: FunctionInitializationContext): Unit = {
- state = context.getOperatorState().getListState(new ListStateDescriptor("state", Types.INT))
- }
-}
-
-val env = ExecutionEnvironment.getExecutionEnvironment
-val data = env.fromElements(1, 2, 3)
-
-BootstrapTransformation transformation = OperatorTransformation
- .bootstrapWith(data)
- .transform(new SimpleBootstrapFunction)
-{% endhighlight %}
-</div>
-</div>
### Broadcast State
[BroadcastState]({% link dev/stream/state/broadcast_state.zh.md %}) can be written using a `BroadcastStateBootstrapFunction`. Similar to broadcast state in the `DataStream` API, the full state must fit in memory.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
{% highlight java %}
public class CurrencyRate {
public String currency;
@@ -640,38 +407,11 @@ BootstrapTransformation<CurrencyRate> broadcastTransformation = OperatorTransfor
.bootstrapWith(currencyDataSet)
.transform(new CurrencyBootstrapFunction());
{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-case class CurrencyRate(currency: String, rate: Double)
-
-object CurrencyBootstrapFunction {
- val descriptor = new MapStateDescriptor("currency-rates", Types.STRING, Types.DOUBLE)
-}
-
-class CurrencyBootstrapFunction extends BroadcastStateBootstrapFunction[CurrencyRate] {
-
- @throws[Exception]
- override processElement(value: CurrencyRate, ctx: Context): Unit = {
- ctx.getBroadcastState(descriptor).put(value.currency, value.rate)
- }
-}
-
-val currencyDataSet = bEnv.fromCollection(CurrencyRate("USD", 1.0), CurrencyRate("EUR", 1.3))
-
-val broadcastTransformation = OperatorTransformation
- .bootstrapWith(currencyDataSet)
- .transform(new CurrencyBootstrapFunction)
-{% endhighlight %}
-</div>
-</div>
### Keyed State
Keyed state for `ProcessFunction`'s and other `RichFunction` types can be written using a `KeyedStateBootstrapFunction`.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
{% highlight java %}
public class Account {
public int id;
@@ -705,37 +445,6 @@ BootstrapTransformation<Account> transformation = OperatorTransformation
.keyBy(acc -> acc.id)
.transform(new AccountBootstrapper());
{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-case class Account(id: Int, amount: Double, timestamp: Long)
-
-class AccountBootstrapper extends KeyedStateBootstrapFunction[Integer, Account] {
- var state: ValueState[Double]
-
- @throws[Exception]
- override def open(parameters: Configuration): Unit = {
- val descriptor = new ValueStateDescriptor("total",Types.DOUBLE)
- state = getRuntimeContext().getState(descriptor)
- }
-
- @throws[Exception]
- override def processElement(value: Account, ctx: Context): Unit = {
- state.update(value.amount)
- }
-}
-
-val bEnv = ExecutionEnvironment.getExecutionEnvironment()
-
-val accountDataSet = bEnv.fromCollection(accounts)
-
-val transformation = OperatorTransformation
- .bootstrapWith(accountDataSet)
- .keyBy(acc => acc.id)
- .transform(new AccountBootstrapper)
-{% endhighlight %}
-</div>
-</div>
The `KeyedStateBootstrapFunction` supports setting event time and processing time timers.
The timers will not fire inside the bootstrap function and only become active once restored within a `DataStream` application.
@@ -749,8 +458,6 @@ The state processor api supports writing state for the [window operator]({% link
When writing window state, users specify the operator id, window assigner, evictor, optional trigger, and aggregation type.
It is important the configurations on the bootstrap transformation match the configurations on the DataStream window.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
{% highlight java %}
public class Account {
public int id;
@@ -773,45 +480,14 @@ BootstrapTransformation<Account> transformation = OperatorTransformation
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.reduce((left, right) -> left + right);
{% endhighlight %}
-</div>
-<div data-lang="java" markdown="1">
-{% highlight scala %}
-case class Account(id: Int, amount: Double, timestamp: Long)
-
-val bEnv = ExecutionEnvironment.getExecutionEnvironment();
-val accountDataSet = bEnv.fromCollection(accounts);
-
-val transformation = OperatorTransformation
- .bootstrapWith(accountDataSet)
- // When using event time windows, its important
- // to assign timestamps to each record.
- .assignTimestamps(account => account.timestamp)
- .keyBy(acc => acc.id)
- .window(TumblingEventTimeWindows.of(Time.minutes(5)))
- .reduce((left, right) => left + right)
-{% endhighlight %}
-</div>
-</div>
## Modifying Savepoints
Besides creating a savepoint from scratch, you can base one off an existing savepoint such as when bootstrapping a single new operator for an existing job.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
{% highlight java %}
Savepoint
.load(bEnv, new MemoryStateBackend(), oldPath)
.withOperator("uid", transformation)
.write(newPath);
{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-Savepoint
- .load(bEnv, new MemoryStateBackend, oldPath)
- .withOperator("uid", transformation)
- .write(newPath)
-{% endhighlight %}
-</div>
-</div>