[FLINK-6512] [docs] improved code formatting in some examples This closes #3857
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/81b6c821 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/81b6c821 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/81b6c821 Branch: refs/heads/release-1.3 Commit: 81b6c82142ad1c4a8c1288bff754840c65ec4059 Parents: 560db53 Author: David Anderson <[email protected]> Authored: Tue May 9 17:23:46 2017 +0200 Committer: Greg Hogan <[email protected]> Committed: Wed May 10 14:37:39 2017 -0400 ---------------------------------------------------------------------- docs/dev/best_practices.md | 30 ++-- docs/dev/migration.md | 300 +++++++++++++++++++++------------------- 2 files changed, 171 insertions(+), 159 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/81b6c821/docs/dev/best_practices.md ---------------------------------------------------------------------- diff --git a/docs/dev/best_practices.md b/docs/dev/best_practices.md index b2111c4..4dfd7fd 100644 --- a/docs/dev/best_practices.md +++ b/docs/dev/best_practices.md @@ -59,8 +59,8 @@ ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile); This allows getting arguments like `--input hdfs:///mydata --elements 42` from the command line. {% highlight java %} public static void main(String[] args) { - ParameterTool parameter = ParameterTool.fromArgs(args); - // .. regular code .. + ParameterTool parameter = ParameterTool.fromArgs(args); + // .. regular code .. 
{% endhighlight %} @@ -114,17 +114,18 @@ The example below shows how to pass the parameters as a `Configuration` object t {% highlight java %} ParameterTool parameters = ParameterTool.fromArgs(args); -DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).withParameters(parameters.getConfiguration()) +DataSet<Tuple2<String, Integer>> counts = text + .flatMap(new Tokenizer()).withParameters(parameters.getConfiguration()) {% endhighlight %} In the `Tokenizer`, the object is now accessible in the `open(Configuration conf)` method: {% highlight java %} public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> { - @Override - public void open(Configuration parameters) throws Exception { - parameters.getInteger("myInt", -1); - // .. do + @Override + public void open(Configuration parameters) throws Exception { + parameters.getInteger("myInt", -1); + // .. do {% endhighlight %} @@ -147,11 +148,12 @@ Access them in any rich user function: {% highlight java %} public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> { - @Override - public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { - ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); - parameters.getRequired("input"); - // .. do more .. + @Override + public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { + ParameterTool parameters = (ParameterTool) + getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); + parameters.getRequired("input"); + // .. do more .. {% endhighlight %} @@ -198,8 +200,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MyClass implements MapFunction { - private static final Logger LOG = LoggerFactory.getLogger(MyClass.class); - // ... + private static final Logger LOG = LoggerFactory.getLogger(MyClass.class); + // ... 
{% endhighlight %} http://git-wip-us.apache.org/repos/asf/flink/blob/81b6c821/docs/dev/migration.md ---------------------------------------------------------------------- diff --git a/docs/dev/migration.md b/docs/dev/migration.md index a5910a8..11eb42c 100644 --- a/docs/dev/migration.md +++ b/docs/dev/migration.md @@ -51,69 +51,70 @@ As running examples for the remainder of this document we will use the `CountMap functions. The first is an example of a function with **keyed** state, while the second has **non-keyed** state. The code for the aforementioned two functions in Flink 1.1 is presented below: - public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> { +{% highlight java %} +public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> { - private transient ValueState<Integer> counter; + private transient ValueState<Integer> counter; - private final int numberElements; + private final int numberElements; - public CountMapper(int numberElements) { - this.numberElements = numberElements; - } + public CountMapper(int numberElements) { + this.numberElements = numberElements; + } - @Override - public void open(Configuration parameters) throws Exception { - counter = getRuntimeContext().getState( - new ValueStateDescriptor<>("counter", Integer.class, 0)); - } + @Override + public void open(Configuration parameters) throws Exception { + counter = getRuntimeContext().getState( + new ValueStateDescriptor<>("counter", Integer.class, 0)); + } - @Override - public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception { - int count = counter.value() + 1; - counter.update(count); + @Override + public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception { + int count = counter.value() + 1; + counter.update(count); - if (count % numberElements == 0) { - out.collect(Tuple2.of(value.f0, count)); 
- counter.update(0); // reset to 0 - } + if (count % numberElements == 0) { + out.collect(Tuple2.of(value.f0, count)); + counter.update(0); // reset to 0 } } +} +public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, + Checkpointed<ArrayList<Tuple2<String, Integer>>> { - public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, - Checkpointed<ArrayList<Tuple2<String, Integer>>> { - - private final int threshold; + private final int threshold; - private ArrayList<Tuple2<String, Integer>> bufferedElements; + private ArrayList<Tuple2<String, Integer>> bufferedElements; - BufferingSink(int threshold) { - this.threshold = threshold; - this.bufferedElements = new ArrayList<>(); - } + BufferingSink(int threshold) { + this.threshold = threshold; + this.bufferedElements = new ArrayList<>(); + } - @Override - public void invoke(Tuple2<String, Integer> value) throws Exception { - bufferedElements.add(value); - if (bufferedElements.size() == threshold) { - for (Tuple2<String, Integer> element: bufferedElements) { - // send it to the sink - } - bufferedElements.clear(); - } + @Override + public void invoke(Tuple2<String, Integer> value) throws Exception { + bufferedElements.add(value); + if (bufferedElements.size() == threshold) { + for (Tuple2<String, Integer> element: bufferedElements) { + // send it to the sink } + bufferedElements.clear(); + } + } - @Override - public ArrayList<Tuple2<String, Integer>> snapshotState( - long checkpointId, long checkpointTimestamp) throws Exception { - return bufferedElements; - } + @Override + public ArrayList<Tuple2<String, Integer>> snapshotState( + long checkpointId, long checkpointTimestamp) throws Exception { + return bufferedElements; + } - @Override - public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception { - bufferedElements.addAll(state); - } + @Override + public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception { + 
bufferedElements.addAll(state); } +} +{% endhighlight %} The `CountMapper` is a `RichFlatMapFunction` which assumes a grouped-by-key input stream of the form @@ -160,9 +161,11 @@ the [State documentation]({{ site.baseurl }}/dev/stream/state.html). The `ListCheckpointed` interface requires the implementation of two methods: - List<T> snapshotState(long checkpointId, long timestamp) throws Exception; +{% highlight java %} +List<T> snapshotState(long checkpointId, long timestamp) throws Exception; - void restoreState(List<T> state) throws Exception; +void restoreState(List<T> state) throws Exception; +{% endhighlight %} Their semantics are the same as their counterparts in the old `Checkpointed` interface. The only difference is that now `snapshotState()` should return a list of objects to checkpoint, as stated earlier, and @@ -170,53 +173,55 @@ is that now `snapshotState()` should return a list of objects to checkpoint, as return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`. 
The updated code for `BufferingSink` is included below: - public class BufferingSinkListCheckpointed implements - SinkFunction<Tuple2<String, Integer>>, - ListCheckpointed<Tuple2<String, Integer>>, - CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> { +{% highlight java %} +public class BufferingSinkListCheckpointed implements + SinkFunction<Tuple2<String, Integer>>, + ListCheckpointed<Tuple2<String, Integer>>, + CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> { - private final int threshold; + private final int threshold; - private transient ListState<Tuple2<String, Integer>> checkpointedState; + private transient ListState<Tuple2<String, Integer>> checkpointedState; - private List<Tuple2<String, Integer>> bufferedElements; + private List<Tuple2<String, Integer>> bufferedElements; - public BufferingSinkListCheckpointed(int threshold) { - this.threshold = threshold; - this.bufferedElements = new ArrayList<>(); - } + public BufferingSinkListCheckpointed(int threshold) { + this.threshold = threshold; + this.bufferedElements = new ArrayList<>(); + } - @Override - public void invoke(Tuple2<String, Integer> value) throws Exception { - this.bufferedElements.add(value); - if (bufferedElements.size() == threshold) { - for (Tuple2<String, Integer> element: bufferedElements) { - // send it to the sink - } - bufferedElements.clear(); + @Override + public void invoke(Tuple2<String, Integer> value) throws Exception { + this.bufferedElements.add(value); + if (bufferedElements.size() == threshold) { + for (Tuple2<String, Integer> element: bufferedElements) { + // send it to the sink } + bufferedElements.clear(); } + } - @Override - public List<Tuple2<String, Integer>> snapshotState( - long checkpointId, long timestamp) throws Exception { - return this.bufferedElements; - } - - @Override - public void restoreState(List<Tuple2<String, Integer>> state) throws Exception { - if (!state.isEmpty()) { - this.bufferedElements.addAll(state); - } - } + @Override + public 
List<Tuple2<String, Integer>> snapshotState( + long checkpointId, long timestamp) throws Exception { + return this.bufferedElements; + } - @Override - public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception { - // this is from the CheckpointedRestoring interface. + @Override + public void restoreState(List<Tuple2<String, Integer>> state) throws Exception { + if (!state.isEmpty()) { this.bufferedElements.addAll(state); } } + @Override + public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception { + // this is from the CheckpointedRestoring interface. + this.bufferedElements.addAll(state); + } +} +{% endhighlight %} + As shown in the code, the updated function also implements the `CheckpointedRestoring` interface. This is for backwards compatibility reasons and more details will be explained at the end of this section. @@ -224,9 +229,11 @@ compatibility reasons and more details will be explained at the end of this sect The `CheckpointedFunction` interface requires again the implementation of two methods: - void snapshotState(FunctionSnapshotContext context) throws Exception; +{% highlight java %} +void snapshotState(FunctionSnapshotContext context) throws Exception; - void initializeState(FunctionInitializationContext context) throws Exception; +void initializeState(FunctionInitializationContext context) throws Exception; +{% endhighlight %} As in Flink 1.1, `snapshotState()` is called whenever a checkpoint is performed, but now `initializeState()` (which is the counterpart of the `restoreState()`) is called every time the user-defined function is initialized, rather than only @@ -234,57 +241,59 @@ in the case that we are recovering from a failure. Given this, `initializeState( types of state are initialized, but also where state recovery logic is included. An implementation of the `CheckpointedFunction` interface for `BufferingSink` is presented below. 
- public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, - CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> { +{% highlight java %} +public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, + CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> { - private final int threshold; + private final int threshold; - private transient ListState<Tuple2<String, Integer>> checkpointedState; + private transient ListState<Tuple2<String, Integer>> checkpointedState; - private List<Tuple2<String, Integer>> bufferedElements; + private List<Tuple2<String, Integer>> bufferedElements; - public BufferingSink(int threshold) { - this.threshold = threshold; - this.bufferedElements = new ArrayList<>(); - } + public BufferingSink(int threshold) { + this.threshold = threshold; + this.bufferedElements = new ArrayList<>(); + } - @Override - public void invoke(Tuple2<String, Integer> value) throws Exception { - bufferedElements.add(value); - if (bufferedElements.size() == threshold) { - for (Tuple2<String, Integer> element: bufferedElements) { - // send it to the sink - } - bufferedElements.clear(); + @Override + public void invoke(Tuple2<String, Integer> value) throws Exception { + bufferedElements.add(value); + if (bufferedElements.size() == threshold) { + for (Tuple2<String, Integer> element: bufferedElements) { + // send it to the sink } + bufferedElements.clear(); } + } - @Override - public void snapshotState(FunctionSnapshotContext context) throws Exception { - checkpointedState.clear(); - for (Tuple2<String, Integer> element : bufferedElements) { - checkpointedState.add(element); - } + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + checkpointedState.clear(); + for (Tuple2<String, Integer> element : bufferedElements) { + checkpointedState.add(element); } + } - @Override - public void initializeState(FunctionInitializationContext context) throws 
Exception { - checkpointedState = context.getOperatorStateStore(). - getSerializableListState("buffered-elements"); + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + checkpointedState = context.getOperatorStateStore(). + getSerializableListState("buffered-elements"); - if (context.isRestored()) { - for (Tuple2<String, Integer> element : checkpointedState.get()) { - bufferedElements.add(element); - } + if (context.isRestored()) { + for (Tuple2<String, Integer> element : checkpointedState.get()) { + bufferedElements.add(element); } } + } - @Override - public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception { - // this is from the CheckpointedRestoring interface. - this.bufferedElements.addAll(state); - } + @Override + public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception { + // this is from the CheckpointedRestoring interface. + this.bufferedElements.addAll(state); } +} +{% endhighlight %} The `initializeState` takes as argument a `FunctionInitializationContext`. This is used to initialize the non-keyed state "container". This is a container of type `ListState` where the non-keyed state objects @@ -305,40 +314,41 @@ for Flink 1.1. 
If the `CheckpointedFunction` interface was to be used in the `Co the old `open()` method could be removed and the new `snapshotState()` and `initializeState()` methods would look like this: - public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> - implements CheckpointedFunction { +{% highlight java %} +public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> + implements CheckpointedFunction { - private transient ValueState<Integer> counter; + private transient ValueState<Integer> counter; - private final int numberElements; + private final int numberElements; - public CountMapper(int numberElements) { - this.numberElements = numberElements; - } + public CountMapper(int numberElements) { + this.numberElements = numberElements; + } - @Override - public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception { - int count = counter.value() + 1; - counter.update(count); + @Override + public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception { + int count = counter.value() + 1; + counter.update(count); - if (count % numberElements == 0) { - out.collect(Tuple2.of(value.f0, count)); - counter.update(0); // reset to 0 - } - } + if (count % numberElements == 0) { + out.collect(Tuple2.of(value.f0, count)); + counter.update(0); // reset to 0 } + } - @Override - public void snapshotState(FunctionSnapshotContext context) throws Exception { - //all managed, nothing to do. - } + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + // all managed, nothing to do. 
+ } - @Override - public void initializeState(FunctionInitializationContext context) throws Exception { - counter = context.getKeyedStateStore().getState( - new ValueStateDescriptor<>("counter", Integer.class, 0)); - } + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + counter = context.getKeyedStateStore().getState( + new ValueStateDescriptor<>("counter", Integer.class, 0)); } +} +{% endhighlight %} Notice that the `snapshotState()` method is empty as Flink itself takes care of snapshotting managed keyed state upon checkpointing.
