This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c0f72aa  [hotfix] add scala note to spa docs
c0f72aa is described below

commit c0f72aa4a4ebade8f47e8de16ee0a2a6d942a797
Author: Seth Wiesman <[email protected]>
AuthorDate: Fri Jan 8 10:02:08 2021 -0600

    [hotfix] add scala note to spa docs
---
 docs/dev/libs/state_processor_api.md    | 334 +-------------------------------
 docs/dev/libs/state_processor_api.zh.md | 332 +------------------------------
 2 files changed, 9 insertions(+), 657 deletions(-)

diff --git a/docs/dev/libs/state_processor_api.md 
b/docs/dev/libs/state_processor_api.md
index fa5ef3a..7ed30ec 100644
--- a/docs/dev/libs/state_processor_api.md
+++ b/docs/dev/libs/state_processor_api.md
@@ -87,20 +87,11 @@ Since the operator “Snk” does not have any state, its 
namespace is empty.
 Reading state begins by specifying the path to a valid savepoint or checkpoint 
along with the `StateBackend` that should be used to restore the data.
 The compatibility guarantees for restoring state are identical to those when 
restoring a `DataStream` application.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
 ExecutionEnvironment bEnv   = ExecutionEnvironment.getExecutionEnvironment();
 ExistingSavepoint savepoint = Savepoint.load(bEnv, "hdfs://path/", new 
MemoryStateBackend());
 {% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val bEnv      = ExecutionEnvironment.getExecutionEnvironment
-val savepoint = Savepoint.load(bEnv, "hdfs://path/", new MemoryStateBackend)
-{% endhighlight %}
-</div>
-</div>
+
 
 ### Operator State
 
@@ -113,24 +104,12 @@ When reading operator state, users specify the operator 
uid, the state name, and
 Operator state stored in a `CheckpointedFunction` using `getListState` can be 
read using `ExistingSavepoint#readListState`.
 The state name and type information should match those used to define the 
`ListStateDescriptor` that declared this state in the DataStream application.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
 DataSet<Integer> listState  = savepoint.readListState<>(
     "my-uid",
     "list-state",
     Types.INT);
 {% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val listState  = savepoint.readListState(
-    "my-uid",
-    "list-state",
-    Types.INT)
-{% endhighlight %}
-</div>
-</div>
 
 #### Operator Union List State
 
@@ -138,25 +117,12 @@ Operator state stored in a `CheckpointedFunction` using 
`getUnionListState` can
 The state name and type information should match those used to define the 
`ListStateDescriptor` that declared this state in the DataStream application.
 The framework will return a _single_ copy of the state, equivalent to 
restoring a DataStream with parallelism 1.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
 DataSet<Integer> listState  = savepoint.readUnionState<>(
     "my-uid",
     "union-state",
     Types.INT);
 {% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val listState  = savepoint.readUnionState(
-    "my-uid",
-    "union-state",
-    Types.INT)
-{% endhighlight %}
-</div>
-</div>
-
 
 #### Broadcast State
 
@@ -164,8 +130,6 @@ val listState  = savepoint.readUnionState(
 The state name and type information should match those used to define the 
`MapStateDescriptor` that declared this state in the DataStream application.
 The framework will return a _single_ copy of the state, equivalent to 
restoring a DataStream with parallelism 1.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
 DataSet<Tuple2<Integer, Integer>> broadcastState = 
savepoint.readBroadcastState<>(
     "my-uid",
@@ -173,24 +137,11 @@ DataSet<Tuple2<Integer, Integer>> broadcastState = 
savepoint.readBroadcastState<
     Types.INT,
     Types.INT);
 {% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val broadcastState = savepoint.readBroadcastState(
-    "my-uid",
-    "broadcast-state",
-    Types.INT,
-    Types.INT)
-{% endhighlight %}
-</div>
-</div>
 
 #### Using Custom Serializers
 
 Each of the operator state readers support using custom `TypeSerializers` if 
one was used to define the `StateDescriptor` that wrote out the state. 
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
 DataSet<Integer> listState = savepoint.readListState<>(
     "uid",
@@ -198,17 +149,6 @@ DataSet<Integer> listState = savepoint.readListState<>(
     Types.INT,
     new MyCustomIntSerializer());
 {% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val listState = savepoint.readListState(
-    "uid",
-    "list-state", 
-    Types.INT,
-    new MyCustomIntSerializer)
-{% endhighlight %}
-</div>
-</div>
 
 ### Keyed State
 
@@ -218,8 +158,6 @@ When reading a keyed state, users specify the operator id 
and a `KeyedStateReade
 The `KeyedStateReaderFunction` allows users to read arbitrary columns and 
complex state types such as ListState, MapState, and AggregatingState.
 This means if an operator contains a stateful process function such as:
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
 public class StatefulFunctionWithTime extends KeyedProcessFunction<Integer, 
Integer, Void> {
  
@@ -243,36 +181,9 @@ public class StatefulFunctionWithTime extends 
KeyedProcessFunction<Integer, Inte
    }
 }
 {% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-class StatefulFunctionWithTime extends KeyedProcessFunction[Int, Int, Void] {
-  var state: ValueState[Int] = _
-  var updateTimes: ListState[Long] = _
-
-  @throws[Exception]
-  override def open(parameters: Configuration): Unit = {
-    val stateDescriptor = new ValueStateDescriptor("state", 
createTypeInformation[Int])
-    state = getRuntimeContext().getState(stateDescriptor)
-
-    val updateDescriptor = new ListStateDescriptor("times", 
createTypeInformation[Long])
-    updateTimes = getRuntimeContext().getListState(updateDescriptor)
-  }
-
-  @throws[Exception]
-  override def processElement(value: Int, ctx: KeyedProcessFunction[Int, Int, 
Void]#Context, out: Collector[Void]): Unit = {
-    state.update(value + 1)
-    updateTimes.add(System.currentTimeMillis)
-  }
-}
-{% endhighlight %}
-</div>
-</div>
 
 Then it can be read by defining an output type and corresponding 
`KeyedStateReaderFunction`. 
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
 DataSet<KeyedState> keyedState = savepoint.readKeyedState("my-uid", new 
ReaderFunction());
 
@@ -316,30 +227,6 @@ public class ReaderFunction extends 
KeyedStateReaderFunction<Integer, KeyedState
   }
 }
 {% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-class ReaderFunction extends KeyedStateReaderFunction[Int, KeyedState] {
-  var state: ValueState[Int] = _
-  var updateTimes: ListState[Long] = _
-
-  @throws[Exception]
-  override def open(parameters: Configuration): Unit = {
-    val stateDescriptor = new ValueStateDescriptor("state", 
createTypeInformation[Int])
-    state = getRuntimeContext().getState(stateDescriptor)
-
-    val updateDescriptor = new ListStateDescriptor("times", 
createTypeInformation[Long])
-    updateTimes = getRuntimeContext().getListState(updateDescriptor)
-  }
-
-  override def readKey(key: Int, ctx: KeyedStateReaderFunction.Context, out: 
Collector[KeyedState]): Unit = {
-    val data = KeyedState(key, state.value, updateTimes.get.asScala.toList)
-    out.collect(data)
-  }
-}
-{% endhighlight %}
-</div>
-</div>
 
 Along with reading registered state values, each key has access to a `Context` 
with metadata such as registered event time and processing time timers.
 
@@ -355,10 +242,7 @@ to a `WindowFunction` or `ProcessWindowFunction`.
 
 Suppose a DataStream application that counts the number of clicks per user per 
minute.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-
 class Click {
     public String userId;
 
@@ -398,42 +282,9 @@ clicks
     .addSink(new Sink());
 
 {% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-
-import java.lang.{Integer => JInteger}
-
-case class Click(userId: String, time: LocalDateTime)
-
-class ClickCounter extends AggregateFunction[Click, JInteger, JInteger] {
-
-       override def createAccumulator(): JInteger = 0
-
-    override def add(value: Click, accumulator: JInteger): JInteger = 1 + 
accumulator
-    
-    override def getResult(accumulator: JInteger): JInteger = accumulator
-
-    override def merge(a: JInteger, b: JInteger): JInteger = a + b
-}
-
-DataStream[Click] clicks = . . . 
-
-clicks
-    .keyBy(click => click.userId)
-    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
-    .aggregate(new ClickCounter())
-    .uid("click-window")
-    .addSink(new Sink())
-
-{% endhighlight %}
-</div>
-</div>
 
 This state can be read using the code below.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
 
 class ClickState {
@@ -470,44 +321,6 @@ savepoint
     .print();
 
 {% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-
-import java.lang.{Integer => JInteger, Long => JLong}
-import java.util.{Set => JSet}
-
-case class ClickState(userId: String, count: JInteger, window: TimeWindow, 
triggerTimers: JSet[JLong])
-
-class ClickReader extends WindowReaderFunction[JInteger, ClickState, String, 
TimeWindow] { 
-
-       override def readWindow(
-           key: String,
-           context: Context[TimeWindow],
-           elements: Iterable[JInteger],
-           out: Collector[ClickState]): Unit = {
-                
-               state = ClickState(
-                   userId = key,
-                   count = elements.iterator().next(),
-                   window = context.window()k
-                   triggerTimers = context.registeredEventTimeTimers())
-                   
-               out.collect(state)
-       }
-}
-
-val batchEnv = ExecutionEnvironment.getExecutionEnvironment()
-val savepoint = Savepoint.load(batchEnv, "hdfs://checkpoint-dir", new 
MemoryStateBackend())
-
-savepoint
-    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
-    .aggregate("click-window", new ClickCounter(), new ClickReader(), 
Types.String, Types.INT, Types.INT)
-    .print()
-
-{% endhighlight %}
-</div>
-</div>
 
 Additionally, trigger state - from `CountTrigger`s or custom triggers - can be 
read using the method
 `Context#triggerState` inside the `WindowReaderFunction`.
@@ -517,8 +330,10 @@ Additionally, trigger state - from `CountTrigger`s or 
custom triggers - can be r
 `Savepoint`'s may also be written, which allows such use cases as 
bootstrapping state based on historical data.
 Each savepoint is made up of one or more `BootstrapTransformation`'s 
(explained below), each of which defines the state for an individual operator.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
+**Note** The State Processor API does not currently provide a Scala API. As a 
result
+it will always auto-derive serializers using the Java type stack. To bootstrap 
+a savepoint for the Scala DataStream API, please manually pass in all type 
information.
+
 {% highlight java %}
 int maxParallelism = 128;
 
@@ -528,19 +343,6 @@ Savepoint
     .withOperator("uid2", transformation2)
     .write(savepointPath);
 {% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val maxParallelism = 128
-
-Savepoint
-    .create(new MemoryStateBackend(), maxParallelism)
-    .withOperator("uid1", transformation1)
-    .withOperator("uid2", transformation2)
-    .write(savepointPath)
-{% endhighlight %}
-</div>
-</div>
 
 The [UIDs]({% link ops/state/savepoints.md %}#assigning-operator-ids) 
associated with each operator must match one to one with the UIDs assigned to 
the operators in your `DataStream` application; these are how Flink knows what 
state maps to which operator.
 
@@ -548,8 +350,6 @@ The [UIDs]({% link ops/state/savepoints.md 
%}#assigning-operator-ids) associated
 
 Simple operator state, using `CheckpointedFunction`, can be created using the 
`StateBootstrapFunction`. 
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
 public class SimpleBootstrapFunction extends StateBootstrapFunction<Integer> {
 
@@ -577,44 +377,11 @@ BootstrapTransformation transformation = 
OperatorTransformation
     .bootstrapWith(data)
     .transform(new SimpleBootstrapFunction());
 {% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-class SimpleBootstrapFunction extends StateBootstrapFunction[Integer] {
-
-    var ListState[Integer] state = _
-
-    @throws[Exception]
-    override def processElement(value: Integer, ctx: Context): Unit = {
-        state.add(value)
-    }
-
-    @throws[Exception]
-    override def snapshotState(context: FunctionSnapshotContext): Unit = {
-    }
-       
-    @throws[Exception]
-    override def initializeState(context: FunctionInitializationContext): Unit 
= {
-        state = context.getOperatorState().getListState(new 
ListStateDescriptor("state", Types.INT))
-    }
-}
-
-val env = ExecutionEnvironment.getExecutionEnvironment
-val data = env.fromElements(1, 2, 3)
-
-BootstrapTransformation transformation = OperatorTransformation
-    .bootstrapWith(data)
-    .transform(new SimpleBootstrapFunction)
-{% endhighlight %}
-</div>
-</div>
 
 ### Broadcast State
 
 [BroadcastState]({% link dev/stream/state/broadcast_state.md %}) can be 
written using a `BroadcastStateBootstrapFunction`. Similar to broadcast state 
in the `DataStream` API, the full state must fit in memory. 
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
 public class CurrencyRate {
     public String currency;
@@ -640,38 +407,11 @@ BootstrapTransformation<CurrencyRate> 
broadcastTransformation = OperatorTransfor
     .bootstrapWith(currencyDataSet)
     .transform(new CurrencyBootstrapFunction());
 {% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-case class CurrencyRate(currency: String, rate: Double)
-
-object CurrencyBootstrapFunction {
-    val descriptor = new MapStateDescriptor("currency-rates", Types.STRING, 
Types.DOUBLE)
-}
-
-class CurrencyBootstrapFunction extends 
BroadcastStateBootstrapFunction[CurrencyRate] {
-
-    @throws[Exception]
-    override processElement(value: CurrencyRate, ctx: Context): Unit = {
-        ctx.getBroadcastState(descriptor).put(value.currency, value.rate)
-    }
-}
-
-val currencyDataSet = bEnv.fromCollection(CurrencyRate("USD", 1.0), 
CurrencyRate("EUR", 1.3))
-
-val broadcastTransformation = OperatorTransformation
-    .bootstrapWith(currencyDataSet)
-    .transform(new CurrencyBootstrapFunction)
-{% endhighlight %}
-</div>
-</div>
 
 ### Keyed State
 
 Keyed state for `ProcessFunction`'s and other `RichFunction` types can be 
written using a `KeyedStateBootstrapFunction`.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
 public class Account {
     public int id;
@@ -705,37 +445,6 @@ BootstrapTransformation<Account> transformation = 
OperatorTransformation
     .keyBy(acc -> acc.id)
     .transform(new AccountBootstrapper());
 {% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-case class Account(id: Int, amount: Double, timestamp: Long)
-
-class AccountBootstrapper extends KeyedStateBootstrapFunction[Integer, 
Account] {
-    var state: ValueState[Double]
-
-    @throws[Exception]
-    override def open(parameters: Configuration): Unit = {
-        val descriptor = new ValueStateDescriptor("total",Types.DOUBLE)
-        state = getRuntimeContext().getState(descriptor)
-    }
-
-    @throws[Exception]
-    override def processElement(value: Account, ctx: Context): Unit = {
-        state.update(value.amount)
-    }
-}
- 
-val bEnv = ExecutionEnvironment.getExecutionEnvironment()
-
-val accountDataSet = bEnv.fromCollection(accounts)
-
-val transformation = OperatorTransformation
-    .bootstrapWith(accountDataSet)
-    .keyBy(acc => acc.id)
-    .transform(new AccountBootstrapper)
-{% endhighlight %}
-</div>
-</div>
 
 The `KeyedStateBootstrapFunction` supports setting event time and processing 
time timers.
 The timers will not fire inside the bootstrap function and only become active 
once restored within a `DataStream` application.
@@ -749,8 +458,6 @@ The state processor api supports writing state for the 
[window operator]({% link
 When writing window state, users specify the operator id, window assigner, 
evictor, optional trigger, and aggregation type.
 It is important the configurations on the bootstrap transformation match the 
configurations on the DataStream window.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
 public class Account {
     public int id;
@@ -773,45 +480,14 @@ BootstrapTransformation<Account> transformation = 
OperatorTransformation
     .window(TumblingEventTimeWindows.of(Time.minutes(5)))
     .reduce((left, right) -> left + right);
 {% endhighlight %}
-</div>
-<div data-lang="java" markdown="1">
-{% highlight scala %}
-case class Account(id: Int, amount: Double, timestamp: Long)
- 
-val bEnv = ExecutionEnvironment.getExecutionEnvironment();
-val accountDataSet = bEnv.fromCollection(accounts);
-
-val transformation = OperatorTransformation
-    .bootstrapWith(accountDataSet)
-    // When using event time windows, its important
-    // to assign timestamps to each record.
-    .assignTimestamps(account => account.timestamp)
-    .keyBy(acc => acc.id)
-    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
-    .reduce((left, right) => left + right)
-{% endhighlight %}
-</div>
-</div>
 
 ## Modifying Savepoints
 
 Besides creating a savepoint from scratch, you can base one off an existing 
savepoint such as when bootstrapping a single new operator for an existing job.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
 Savepoint
     .load(bEnv, new MemoryStateBackend(), oldPath)
     .withOperator("uid", transformation)
     .write(newPath);
 {% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-Savepoint
-    .load(bEnv, new MemoryStateBackend, oldPath)
-    .withOperator("uid", transformation)
-    .write(newPath)
-{% endhighlight %}
-</div>
-</div>
diff --git a/docs/dev/libs/state_processor_api.zh.md 
b/docs/dev/libs/state_processor_api.zh.md
index 581d3eb..18d505a 100644
--- a/docs/dev/libs/state_processor_api.zh.md
+++ b/docs/dev/libs/state_processor_api.zh.md
@@ -87,20 +87,10 @@ Since the operator “Snk” does not have any state, its 
namespace is empty.
 Reading state begins by specifying the path to a valid savepoint or checkpoint 
along with the `StateBackend` that should be used to restore the data.
 The compatibility guarantees for restoring state are identical to those when 
restoring a `DataStream` application.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
 ExecutionEnvironment bEnv   = ExecutionEnvironment.getExecutionEnvironment();
 ExistingSavepoint savepoint = Savepoint.load(bEnv, "hdfs://path/", new 
MemoryStateBackend());
 {% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val bEnv      = ExecutionEnvironment.getExecutionEnvironment
-val savepoint = Savepoint.load(bEnv, "hdfs://path/", new MemoryStateBackend)
-{% endhighlight %}
-</div>
-</div>
 
 ### Operator State
 
@@ -113,24 +103,12 @@ When reading operator state, users specify the operator 
uid, the state name, and
 Operator state stored in a `CheckpointedFunction` using `getListState` can be 
read using `ExistingSavepoint#readListState`.
 The state name and type information should match those used to define the 
`ListStateDescriptor` that declared this state in the DataStream application.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
 DataSet<Integer> listState  = savepoint.readListState<>(
     "my-uid",
     "list-state",
     Types.INT);
 {% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val listState  = savepoint.readListState(
-    "my-uid",
-    "list-state",
-    Types.INT)
-{% endhighlight %}
-</div>
-</div>
 
 #### Operator Union List State
 
@@ -138,25 +116,12 @@ Operator state stored in a `CheckpointedFunction` using 
`getUnionListState` can
 The state name and type information should match those used to define the 
`ListStateDescriptor` that declared this state in the DataStream application.
 The framework will return a _single_ copy of the state, equivalent to 
restoring a DataStream with parallelism 1.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
 DataSet<Integer> listState  = savepoint.readUnionState<>(
     "my-uid",
     "union-state",
     Types.INT);
 {% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val listState  = savepoint.readUnionState(
-    "my-uid",
-    "union-state",
-    Types.INT)
-{% endhighlight %}
-</div>
-</div>
-
 
 #### Broadcast State
 
@@ -164,8 +129,6 @@ val listState  = savepoint.readUnionState(
 The state name and type information should match those used to define the 
`MapStateDescriptor` that declared this state in the DataStream application.
 The framework will return a _single_ copy of the state, equivalent to 
restoring a DataStream with parallelism 1.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
 DataSet<Tuple2<Integer, Integer>> broadcastState = 
savepoint.readBroadcastState<>(
     "my-uid",
@@ -173,24 +136,11 @@ DataSet<Tuple2<Integer, Integer>> broadcastState = 
savepoint.readBroadcastState<
     Types.INT,
     Types.INT);
 {% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val broadcastState = savepoint.readBroadcastState(
-    "my-uid",
-    "broadcast-state",
-    Types.INT,
-    Types.INT)
-{% endhighlight %}
-</div>
-</div>
 
 #### Using Custom Serializers
 
 Each of the operator state readers support using custom `TypeSerializers` if 
one was used to define the `StateDescriptor` that wrote out the state. 
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
 DataSet<Integer> listState = savepoint.readListState<>(
     "uid",
@@ -198,17 +148,6 @@ DataSet<Integer> listState = savepoint.readListState<>(
     Types.INT,
     new MyCustomIntSerializer());
 {% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val listState = savepoint.readListState(
-    "uid",
-    "list-state", 
-    Types.INT,
-    new MyCustomIntSerializer)
-{% endhighlight %}
-</div>
-</div>
 
 ### Keyed State
 
@@ -218,8 +157,6 @@ When reading a keyed state, users specify the operator id 
and a `KeyedStateReade
 The `KeyedStateReaderFunction` allows users to read arbitrary columns and 
complex state types such as ListState, MapState, and AggregatingState.
 This means if an operator contains a stateful process function such as:
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
 public class StatefulFunctionWithTime extends KeyedProcessFunction<Integer, 
Integer, Void> {
  
@@ -243,36 +180,9 @@ public class StatefulFunctionWithTime extends 
KeyedProcessFunction<Integer, Inte
    }
 }
 {% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-class StatefulFunctionWithTime extends KeyedProcessFunction[Int, Int, Void] {
-  var state: ValueState[Int] = _
-  var updateTimes: ListState[Long] = _
-
-  @throws[Exception]
-  override def open(parameters: Configuration): Unit = {
-    val stateDescriptor = new ValueStateDescriptor("state", 
createTypeInformation[Int])
-    state = getRuntimeContext().getState(stateDescriptor)
-
-    val updateDescriptor = new ListStateDescriptor("times", 
createTypeInformation[Long])
-    updateTimes = getRuntimeContext().getListState(updateDescriptor)
-  }
-
-  @throws[Exception]
-  override def processElement(value: Int, ctx: KeyedProcessFunction[Int, Int, 
Void]#Context, out: Collector[Void]): Unit = {
-    state.update(value + 1)
-    updateTimes.add(System.currentTimeMillis)
-  }
-}
-{% endhighlight %}
-</div>
-</div>
 
 Then it can be read by defining an output type and corresponding 
`KeyedStateReaderFunction`. 
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
 DataSet<KeyedState> keyedState = savepoint.readKeyedState("my-uid", new 
ReaderFunction());
 
@@ -316,30 +226,6 @@ public class ReaderFunction extends 
KeyedStateReaderFunction<Integer, KeyedState
   }
 }
 {% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-class ReaderFunction extends KeyedStateReaderFunction[Int, KeyedState] {
-  var state: ValueState[Int] = _
-  var updateTimes: ListState[Long] = _
-
-  @throws[Exception]
-  override def open(parameters: Configuration): Unit = {
-    val stateDescriptor = new ValueStateDescriptor("state", 
createTypeInformation[Int])
-    state = getRuntimeContext().getState(stateDescriptor)
-
-    val updateDescriptor = new ListStateDescriptor("times", 
createTypeInformation[Long])
-    updateTimes = getRuntimeContext().getListState(updateDescriptor)
-  }
-
-  override def readKey(key: Int, ctx: KeyedStateReaderFunction.Context, out: 
Collector[KeyedState]): Unit = {
-    val data = KeyedState(key, state.value, updateTimes.get.asScala.toList)
-    out.collect(data)
-  }
-}
-{% endhighlight %}
-</div>
-</div>
 
 Along with reading registered state values, each key has access to a `Context` 
with metadata such as registered event time and processing time timers.
 
@@ -355,10 +241,7 @@ to a `WindowFunction` or `ProcessWindowFunction`.
 
 Suppose a DataStream application that counts the number of clicks per user per 
minute.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
-
 class Click {
     public String userId;
 
@@ -398,42 +281,9 @@ clicks
     .addSink(new Sink());
 
 {% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-
-import java.lang.{Integer => JInteger}
-
-case class Click(userId: String, time: LocalDateTime)
-
-class ClickCounter extends AggregateFunction[Click, JInteger, JInteger] {
-
-       override def createAccumulator(): JInteger = 0
-
-    override def add(value: Click, accumulator: JInteger): JInteger = 1 + 
accumulator
-    
-    override def getResult(accumulator: JInteger): JInteger = accumulator
-
-    override def merge(a: JInteger, b: JInteger): JInteger = a + b
-}
-
-DataStream[Click] clicks = . . . 
-
-clicks
-    .keyBy(click => click.userId)
-    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
-    .aggregate(new ClickCounter())
-    .uid("click-window")
-    .addSink(new Sink())
-
-{% endhighlight %}
-</div>
-</div>
 
 This state can be read using the code below.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
 
 class ClickState {
@@ -470,44 +320,6 @@ savepoint
     .print();
 
 {% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-
-import java.lang.{Integer => JInteger, Long => JLong}
-import java.util.{Set => JSet}
-
-case class ClickState(userId: String, count: JInteger, window: TimeWindow, 
triggerTimers: JSet[JLong])
-
-class ClickReader extends WindowReaderFunction[JInteger, ClickState, String, 
TimeWindow] { 
-
-       override def readWindow(
-           key: String,
-           context: Context[TimeWindow],
-           elements: Iterable[JInteger],
-           out: Collector[ClickState]): Unit = {
-                
-               state = ClickState(
-                   userId = key,
-                   count = elements.iterator().next(),
-                   window = context.window()k
-                   triggerTimers = context.registeredEventTimeTimers())
-                   
-               out.collect(state)
-       }
-}
-
-val batchEnv = ExecutionEnvironment.getExecutionEnvironment()
-val savepoint = Savepoint.load(batchEnv, "hdfs://checkpoint-dir", new 
MemoryStateBackend())
-
-savepoint
-    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
-    .aggregate("click-window", new ClickCounter(), new ClickReader(), 
Types.String, Types.INT, Types.INT)
-    .print()
-
-{% endhighlight %}
-</div>
-</div>
 
 Additionally, trigger state - from `CountTrigger`s or custom triggers - can be 
read using the method
 `Context#triggerState` inside the `WindowReaderFunction`.
@@ -517,8 +329,10 @@ Additionally, trigger state - from `CountTrigger`s or 
custom triggers - can be r
 `Savepoint`'s may also be written, which allows such use cases as 
bootstrapping state based on historical data.
 Each savepoint is made up of one or more `BootstrapTransformation`'s 
(explained below), each of which defines the state for an individual operator.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
+**Note** The State Processor API does not currently provide a Scala API. As a 
result
+it will always auto-derive serializers using the Java type stack. To bootstrap 
+a savepoint for the Scala DataStream API, please manually pass in all type 
information.
+
 {% highlight java %}
 int maxParallelism = 128;
 
@@ -528,19 +342,7 @@ Savepoint
     .withOperator("uid2", transformation2)
     .write(savepointPath);
 {% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val maxParallelism = 128
 
-Savepoint
-    .create(new MemoryStateBackend(), maxParallelism)
-    .withOperator("uid1", transformation1)
-    .withOperator("uid2", transformation2)
-    .write(savepointPath)
-{% endhighlight %}
-</div>
-</div>
 
 The [UIDs]({% link ops/state/savepoints.zh.md %}#assigning-operator-ids) 
associated with each operator must match one to one with the UIDs assigned to 
the operators in your `DataStream` application; these are how Flink knows what 
state maps to which operator.
 
@@ -548,8 +350,6 @@ The [UIDs]({% link ops/state/savepoints.zh.md 
%}#assigning-operator-ids) associa
 
 Simple operator state, using `CheckpointedFunction`, can be created using the 
`StateBootstrapFunction`. 
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
 public class SimpleBootstrapFunction extends StateBootstrapFunction<Integer> {
 
@@ -577,44 +377,11 @@ BootstrapTransformation transformation = 
OperatorTransformation
     .bootstrapWith(data)
     .transform(new SimpleBootstrapFunction());
 {% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-class SimpleBootstrapFunction extends StateBootstrapFunction[Integer] {
-
-    var ListState[Integer] state = _
-
-    @throws[Exception]
-    override def processElement(value: Integer, ctx: Context): Unit = {
-        state.add(value)
-    }
-
-    @throws[Exception]
-    override def snapshotState(context: FunctionSnapshotContext): Unit = {
-    }
-       
-    @throws[Exception]
-    override def initializeState(context: FunctionInitializationContext): Unit 
= {
-        state = context.getOperatorState().getListState(new 
ListStateDescriptor("state", Types.INT))
-    }
-}
-
-val env = ExecutionEnvironment.getExecutionEnvironment
-val data = env.fromElements(1, 2, 3)
-
-BootstrapTransformation transformation = OperatorTransformation
-    .bootstrapWith(data)
-    .transform(new SimpleBootstrapFunction)
-{% endhighlight %}
-</div>
-</div>
 
 ### Broadcast State
 
 [BroadcastState]({% link dev/stream/state/broadcast_state.zh.md %}) can be 
written using a `BroadcastStateBootstrapFunction`. Similar to broadcast state 
in the `DataStream` API, the full state must fit in memory. 
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
 public class CurrencyRate {
     public String currency;
@@ -640,38 +407,11 @@ BootstrapTransformation<CurrencyRate> 
broadcastTransformation = OperatorTransfor
     .bootstrapWith(currencyDataSet)
     .transform(new CurrencyBootstrapFunction());
 {% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-case class CurrencyRate(currency: String, rate: Double)
-
-object CurrencyBootstrapFunction {
-    val descriptor = new MapStateDescriptor("currency-rates", Types.STRING, 
Types.DOUBLE)
-}
-
-class CurrencyBootstrapFunction extends 
BroadcastStateBootstrapFunction[CurrencyRate] {
-
-    @throws[Exception]
-    override processElement(value: CurrencyRate, ctx: Context): Unit = {
-        ctx.getBroadcastState(descriptor).put(value.currency, value.rate)
-    }
-}
-
-val currencyDataSet = bEnv.fromCollection(CurrencyRate("USD", 1.0), 
CurrencyRate("EUR", 1.3))
-
-val broadcastTransformation = OperatorTransformation
-    .bootstrapWith(currencyDataSet)
-    .transform(new CurrencyBootstrapFunction)
-{% endhighlight %}
-</div>
-</div>
 
 ### Keyed State
 
 Keyed state for `ProcessFunction`'s and other `RichFunction` types can be 
written using a `KeyedStateBootstrapFunction`.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
 public class Account {
     public int id;
@@ -705,37 +445,6 @@ BootstrapTransformation<Account> transformation = 
OperatorTransformation
     .keyBy(acc -> acc.id)
     .transform(new AccountBootstrapper());
 {% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-case class Account(id: Int, amount: Double, timestamp: Long)
-
-class AccountBootstrapper extends KeyedStateBootstrapFunction[Integer, 
Account] {
-    var state: ValueState[Double]
-
-    @throws[Exception]
-    override def open(parameters: Configuration): Unit = {
-        val descriptor = new ValueStateDescriptor("total",Types.DOUBLE)
-        state = getRuntimeContext().getState(descriptor)
-    }
-
-    @throws[Exception]
-    override def processElement(value: Account, ctx: Context): Unit = {
-        state.update(value.amount)
-    }
-}
- 
-val bEnv = ExecutionEnvironment.getExecutionEnvironment()
-
-val accountDataSet = bEnv.fromCollection(accounts)
-
-val transformation = OperatorTransformation
-    .bootstrapWith(accountDataSet)
-    .keyBy(acc => acc.id)
-    .transform(new AccountBootstrapper)
-{% endhighlight %}
-</div>
-</div>
 
 The `KeyedStateBootstrapFunction` supports setting event time and processing 
time timers.
 The timers will not fire inside the bootstrap function and only become active 
once restored within a `DataStream` application.
@@ -749,8 +458,6 @@ The state processor api supports writing state for the 
[window operator]({% link
 When writing window state, users specify the operator id, window assigner, 
evictor, optional trigger, and aggregation type.
 It is important the configurations on the bootstrap transformation match the 
configurations on the DataStream window.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
 public class Account {
     public int id;
@@ -773,45 +480,14 @@ BootstrapTransformation<Account> transformation = 
OperatorTransformation
     .window(TumblingEventTimeWindows.of(Time.minutes(5)))
     .reduce((left, right) -> left + right);
 {% endhighlight %}
-</div>
-<div data-lang="java" markdown="1">
-{% highlight scala %}
-case class Account(id: Int, amount: Double, timestamp: Long)
- 
-val bEnv = ExecutionEnvironment.getExecutionEnvironment();
-val accountDataSet = bEnv.fromCollection(accounts);
-
-val transformation = OperatorTransformation
-    .bootstrapWith(accountDataSet)
-    // When using event time windows, its important
-    // to assign timestamps to each record.
-    .assignTimestamps(account => account.timestamp)
-    .keyBy(acc => acc.id)
-    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
-    .reduce((left, right) => left + right)
-{% endhighlight %}
-</div>
-</div>
 
 ## Modifying Savepoints
 
 Besides creating a savepoint from scratch, you can base one off an existing 
savepoint such as when bootstrapping a single new operator for an existing job.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
 {% highlight java %}
 Savepoint
     .load(bEnv, new MemoryStateBackend(), oldPath)
     .withOperator("uid", transformation)
     .write(newPath);
 {% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-Savepoint
-    .load(bEnv, new MemoryStateBackend, oldPath)
-    .withOperator("uid", transformation)
-    .write(newPath)
-{% endhighlight %}
-</div>
-</div>

Reply via email to