[FLINK-6512] [docs] improved code formatting in some examples

This closes #3857


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/81b6c821
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/81b6c821
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/81b6c821

Branch: refs/heads/release-1.3
Commit: 81b6c82142ad1c4a8c1288bff754840c65ec4059
Parents: 560db53
Author: David Anderson <[email protected]>
Authored: Tue May 9 17:23:46 2017 +0200
Committer: Greg Hogan <[email protected]>
Committed: Wed May 10 14:37:39 2017 -0400

----------------------------------------------------------------------
 docs/dev/best_practices.md |  30 ++--
 docs/dev/migration.md      | 300 +++++++++++++++++++++-------------------
 2 files changed, 171 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/81b6c821/docs/dev/best_practices.md
----------------------------------------------------------------------
diff --git a/docs/dev/best_practices.md b/docs/dev/best_practices.md
index b2111c4..4dfd7fd 100644
--- a/docs/dev/best_practices.md
+++ b/docs/dev/best_practices.md
@@ -59,8 +59,8 @@ ParameterTool parameter = 
ParameterTool.fromPropertiesFile(propertiesFile);
 This allows getting arguments like `--input hdfs:///mydata --elements 42` from 
the command line.
 {% highlight java %}
 public static void main(String[] args) {
-       ParameterTool parameter = ParameterTool.fromArgs(args);
-       // .. regular code ..
+    ParameterTool parameter = ParameterTool.fromArgs(args);
+    // .. regular code ..
 {% endhighlight %}
 
 
@@ -114,17 +114,18 @@ The example below shows how to pass the parameters as a 
`Configuration` object t
 
 {% highlight java %}
 ParameterTool parameters = ParameterTool.fromArgs(args);
-DataSet<Tuple2<String, Integer>> counts = text.flatMap(new 
Tokenizer()).withParameters(parameters.getConfiguration())
+DataSet<Tuple2<String, Integer>> counts = text
+        .flatMap(new Tokenizer()).withParameters(parameters.getConfiguration())
 {% endhighlight %}
 
 In the `Tokenizer`, the object is now accessible in the `open(Configuration 
conf)` method:
 
 {% highlight java %}
 public static final class Tokenizer extends RichFlatMapFunction<String, 
Tuple2<String, Integer>> {
-       @Override
-       public void open(Configuration parameters) throws Exception {
-               parameters.getInteger("myInt", -1);
-               // .. do
+    @Override
+    public void open(Configuration parameters) throws Exception {
+       parameters.getInteger("myInt", -1);
+       // .. do
 {% endhighlight %}
 
 
@@ -147,11 +148,12 @@ Access them in any rich user function:
 {% highlight java %}
 public static final class Tokenizer extends RichFlatMapFunction<String, 
Tuple2<String, Integer>> {
 
-       @Override
-       public void flatMap(String value, Collector<Tuple2<String, Integer>> 
out) {
-               ParameterTool parameters = (ParameterTool) 
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
-               parameters.getRequired("input");
-               // .. do more ..
+    @Override
+    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+       ParameterTool parameters = (ParameterTool)
+           getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+       parameters.getRequired("input");
+       // .. do more ..
 {% endhighlight %}
 
 
@@ -198,8 +200,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class MyClass implements MapFunction {
-       private static final Logger LOG = 
LoggerFactory.getLogger(MyClass.class);
-       // ...
+    private static final Logger LOG = LoggerFactory.getLogger(MyClass.class);
+    // ...
 {% endhighlight %}
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/81b6c821/docs/dev/migration.md
----------------------------------------------------------------------
diff --git a/docs/dev/migration.md b/docs/dev/migration.md
index a5910a8..11eb42c 100644
--- a/docs/dev/migration.md
+++ b/docs/dev/migration.md
@@ -51,69 +51,70 @@ As running examples for the remainder of this document we 
will use the `CountMap
 functions. The first is an example of a function with **keyed** state, while
 the second has **non-keyed** state. The code for the aforementioned two 
functions in Flink 1.1 is presented below:
 
-    public class CountMapper extends RichFlatMapFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>> {
+{% highlight java %}
+public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, 
Tuple2<String, Integer>> {
 
-        private transient ValueState<Integer> counter;
+    private transient ValueState<Integer> counter;
 
-        private final int numberElements;
+    private final int numberElements;
 
-        public CountMapper(int numberElements) {
-            this.numberElements = numberElements;
-        }
+    public CountMapper(int numberElements) {
+        this.numberElements = numberElements;
+    }
 
-        @Override
-        public void open(Configuration parameters) throws Exception {
-            counter = getRuntimeContext().getState(
-               new ValueStateDescriptor<>("counter", Integer.class, 0));
-        }
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        counter = getRuntimeContext().getState(
+            new ValueStateDescriptor<>("counter", Integer.class, 0));
+    }
 
-        @Override
-        public void flatMap(Tuple2<String, Integer> value, 
Collector<Tuple2<String, Integer>> out) throws Exception {
-            int count = counter.value() + 1;
-           counter.update(count);
+    @Override
+    public void flatMap(Tuple2<String, Integer> value, 
Collector<Tuple2<String, Integer>> out) throws Exception {
+        int count = counter.value() + 1;
+        counter.update(count);
 
-           if (count % numberElements == 0) {
-                   out.collect(Tuple2.of(value.f0, count));
-                   counter.update(0); // reset to 0
-           }
+        if (count % numberElements == 0) {
+            out.collect(Tuple2.of(value.f0, count));
+            counter.update(0); // reset to 0
         }
     }
+}
 
+public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
+    Checkpointed<ArrayList<Tuple2<String, Integer>>> {
 
-    public class BufferingSink implements SinkFunction<Tuple2<String, 
Integer>>,
-            Checkpointed<ArrayList<Tuple2<String, Integer>>> {
-
-           private final int threshold;
+    private final int threshold;
 
-           private ArrayList<Tuple2<String, Integer>> bufferedElements;
+    private ArrayList<Tuple2<String, Integer>> bufferedElements;
 
-           BufferingSink(int threshold) {
-                   this.threshold = threshold;
-                   this.bufferedElements = new ArrayList<>();
-           }
+    BufferingSink(int threshold) {
+        this.threshold = threshold;
+        this.bufferedElements = new ArrayList<>();
+    }
 
-       @Override
-           public void invoke(Tuple2<String, Integer> value) throws Exception {
-                   bufferedElements.add(value);
-                   if (bufferedElements.size() == threshold) {
-                           for (Tuple2<String, Integer> element: 
bufferedElements) {
-                                   // send it to the sink
-                           }
-                           bufferedElements.clear();
-                   }
+    @Override
+    public void invoke(Tuple2<String, Integer> value) throws Exception {
+        bufferedElements.add(value);
+        if (bufferedElements.size() == threshold) {
+            for (Tuple2<String, Integer> element: bufferedElements) {
+               // send it to the sink
            }
+           bufferedElements.clear();
+       }
+    }
 
-           @Override
-           public ArrayList<Tuple2<String, Integer>> snapshotState(
-                   long checkpointId, long checkpointTimestamp) throws 
Exception {
-                   return bufferedElements;
-           }
+    @Override
+    public ArrayList<Tuple2<String, Integer>> snapshotState(
+        long checkpointId, long checkpointTimestamp) throws Exception {
+           return bufferedElements;
+    }
 
-           @Override
-           public void restoreState(ArrayList<Tuple2<String, Integer>> state) 
throws Exception {
-               bufferedElements.addAll(state);
-        }
+    @Override
+    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws 
Exception {
+        bufferedElements.addAll(state);
     }
+}
+{% endhighlight %}
 
 
 The `CountMapper` is a `RichFlatMapFuction` which assumes a grouped-by-key 
input stream of the form
@@ -160,9 +161,11 @@ the [State documentation]({{ site.baseurl 
}}/dev/stream/state.html).
 
 The `ListCheckpointed` interface requires the implementation of two methods:
 
-    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
+{% highlight java %}
+List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
 
-    void restoreState(List<T> state) throws Exception;
+void restoreState(List<T> state) throws Exception;
+{% endhighlight %}
 
 Their semantics are the same as their counterparts in the old `Checkpointed` 
interface. The only difference
 is that now `snapshotState()` should return a list of objects to checkpoint, 
as stated earlier, and
@@ -170,53 +173,55 @@ is that now `snapshotState()` should return a list of 
objects to checkpoint, as
 return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`. The 
updated code for `BufferingSink`
 is included below:
 
-    public class BufferingSinkListCheckpointed implements
-            SinkFunction<Tuple2<String, Integer>>,
-            ListCheckpointed<Tuple2<String, Integer>>,
-            CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
+{% highlight java %}
+public class BufferingSinkListCheckpointed implements
+        SinkFunction<Tuple2<String, Integer>>,
+        ListCheckpointed<Tuple2<String, Integer>>,
+        CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
 
-        private final int threshold;
+    private final int threshold;
 
-        private transient ListState<Tuple2<String, Integer>> checkpointedState;
+    private transient ListState<Tuple2<String, Integer>> checkpointedState;
 
-        private List<Tuple2<String, Integer>> bufferedElements;
+    private List<Tuple2<String, Integer>> bufferedElements;
 
-        public BufferingSinkListCheckpointed(int threshold) {
-            this.threshold = threshold;
-            this.bufferedElements = new ArrayList<>();
-        }
+    public BufferingSinkListCheckpointed(int threshold) {
+        this.threshold = threshold;
+        this.bufferedElements = new ArrayList<>();
+    }
 
-        @Override
-        public void invoke(Tuple2<String, Integer> value) throws Exception {
-            this.bufferedElements.add(value);
-            if (bufferedElements.size() == threshold) {
-                for (Tuple2<String, Integer> element: bufferedElements) {
-                    // send it to the sink
-                }
-                bufferedElements.clear();
+    @Override
+    public void invoke(Tuple2<String, Integer> value) throws Exception {
+        this.bufferedElements.add(value);
+        if (bufferedElements.size() == threshold) {
+            for (Tuple2<String, Integer> element: bufferedElements) {
+                // send it to the sink
             }
+            bufferedElements.clear();
         }
+    }
 
-        @Override
-        public List<Tuple2<String, Integer>> snapshotState(
-                long checkpointId, long timestamp) throws Exception {
-            return this.bufferedElements;
-        }
-
-        @Override
-        public void restoreState(List<Tuple2<String, Integer>> state) throws 
Exception {
-            if (!state.isEmpty()) {
-                this.bufferedElements.addAll(state);
-            }
-        }
+    @Override
+    public List<Tuple2<String, Integer>> snapshotState(
+            long checkpointId, long timestamp) throws Exception {
+        return this.bufferedElements;
+    }
 
-        @Override
-        public void restoreState(ArrayList<Tuple2<String, Integer>> state) 
throws Exception {
-            // this is from the CheckpointedRestoring interface.
+    @Override
+    public void restoreState(List<Tuple2<String, Integer>> state) throws 
Exception {
+        if (!state.isEmpty()) {
             this.bufferedElements.addAll(state);
         }
     }
 
+    @Override
+    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws 
Exception {
+        // this is from the CheckpointedRestoring interface.
+        this.bufferedElements.addAll(state);
+    }
+}
+{% endhighlight %}
+
 As shown in the code, the updated function also implements the 
`CheckpointedRestoring` interface. This is for backwards
 compatibility reasons and more details will be explained at the end of this 
section.
 
@@ -224,9 +229,11 @@ compatibility reasons and more details will be explained 
at the end of this sect
 
 The `CheckpointedFunction` interface requires again the implementation of two 
methods:
 
-    void snapshotState(FunctionSnapshotContext context) throws Exception;
+{% highlight java %}
+void snapshotState(FunctionSnapshotContext context) throws Exception;
 
-    void initializeState(FunctionInitializationContext context) throws 
Exception;
+void initializeState(FunctionInitializationContext context) throws Exception;
+{% endhighlight %}
 
 As in Flink 1.1, `snapshotState()` is called whenever a checkpoint is 
performed, but now `initializeState()` (which is
 the counterpart of the `restoreState()`) is called every time the user-defined 
function is initialized, rather than only
@@ -234,57 +241,59 @@ in the case that we are recovering from a failure. Given 
this, `initializeState(
 types of state are initialized, but also where state recovery logic is 
included. An implementation of the
 `CheckpointedFunction` interface for `BufferingSink` is presented below.
 
-    public class BufferingSink implements SinkFunction<Tuple2<String, 
Integer>>,
-            CheckpointedFunction, 
CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
+{% highlight java %}
+public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
+        CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, 
Integer>>> {
 
-        private final int threshold;
+    private final int threshold;
 
-        private transient ListState<Tuple2<String, Integer>> checkpointedState;
+    private transient ListState<Tuple2<String, Integer>> checkpointedState;
 
-        private List<Tuple2<String, Integer>> bufferedElements;
+    private List<Tuple2<String, Integer>> bufferedElements;
 
-        public BufferingSink(int threshold) {
-            this.threshold = threshold;
-            this.bufferedElements = new ArrayList<>();
-        }
+    public BufferingSink(int threshold) {
+        this.threshold = threshold;
+        this.bufferedElements = new ArrayList<>();
+    }
 
-        @Override
-        public void invoke(Tuple2<String, Integer> value) throws Exception {
-            bufferedElements.add(value);
-            if (bufferedElements.size() == threshold) {
-                for (Tuple2<String, Integer> element: bufferedElements) {
-                    // send it to the sink
-                }
-                bufferedElements.clear();
+    @Override
+    public void invoke(Tuple2<String, Integer> value) throws Exception {
+        bufferedElements.add(value);
+        if (bufferedElements.size() == threshold) {
+            for (Tuple2<String, Integer> element: bufferedElements) {
+                // send it to the sink
             }
+            bufferedElements.clear();
         }
+    }
 
-        @Override
-        public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
-            checkpointedState.clear();
-            for (Tuple2<String, Integer> element : bufferedElements) {
-                checkpointedState.add(element);
-            }
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+        checkpointedState.clear();
+        for (Tuple2<String, Integer> element : bufferedElements) {
+            checkpointedState.add(element);
         }
+    }
 
-        @Override
-        public void initializeState(FunctionInitializationContext context) 
throws Exception {
-            checkpointedState = context.getOperatorStateStore().
-                getSerializableListState("buffered-elements");
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws 
Exception {
+        checkpointedState = context.getOperatorStateStore().
+            getSerializableListState("buffered-elements");
 
-            if (context.isRestored()) {
-                for (Tuple2<String, Integer> element : 
checkpointedState.get()) {
-                    bufferedElements.add(element);
-                }
+        if (context.isRestored()) {
+            for (Tuple2<String, Integer> element : checkpointedState.get()) {
+                bufferedElements.add(element);
             }
         }
+    }
 
-        @Override
-        public void restoreState(ArrayList<Tuple2<String, Integer>> state) 
throws Exception {
-            // this is from the CheckpointedRestoring interface.
-            this.bufferedElements.addAll(state);
-        }
+    @Override
+    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws 
Exception {
+        // this is from the CheckpointedRestoring interface.
+        this.bufferedElements.addAll(state);
     }
+}
+{% endhighlight %}
 
 The `initializeState` takes as argument a `FunctionInitializationContext`. 
This is used to initialize
 the non-keyed state "container". This is a container of type `ListState` where 
the non-keyed state objects
@@ -305,40 +314,41 @@ for Flink 1.1. If the `CheckpointedFunction` interface 
was to be used in the `Co
 the old `open()` method could be removed and the new `snapshotState()` and 
`initializeState()` methods
 would look like this:
 
-    public class CountMapper extends RichFlatMapFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>>
-            implements CheckpointedFunction {
+{% highlight java %}
+public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, 
Tuple2<String, Integer>>
+        implements CheckpointedFunction {
 
-        private transient ValueState<Integer> counter;
+    private transient ValueState<Integer> counter;
 
-        private final int numberElements;
+    private final int numberElements;
 
-        public CountMapper(int numberElements) {
-            this.numberElements = numberElements;
-        }
+    public CountMapper(int numberElements) {
+        this.numberElements = numberElements;
+    }
 
-        @Override
-        public void flatMap(Tuple2<String, Integer> value, 
Collector<Tuple2<String, Integer>> out) throws Exception {
-            int count = counter.value() + 1;
-            counter.update(count);
+    @Override
+    public void flatMap(Tuple2<String, Integer> value, 
Collector<Tuple2<String, Integer>> out) throws Exception {
+        int count = counter.value() + 1;
+        counter.update(count);
 
-            if (count % numberElements == 0) {
-                out.collect(Tuple2.of(value.f0, count));
-               counter.update(0); // reset to 0
-               }
-            }
+        if (count % numberElements == 0) {
+            out.collect(Tuple2.of(value.f0, count));
+            counter.update(0); // reset to 0
         }
+    }
 
-        @Override
-        public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
-            //all managed, nothing to do.
-        }
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+        // all managed, nothing to do.
+    }
 
-        @Override
-        public void initializeState(FunctionInitializationContext context) 
throws Exception {
-            counter = context.getKeyedStateStore().getState(
-                new ValueStateDescriptor<>("counter", Integer.class, 0));
-        }
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws 
Exception {
+        counter = context.getKeyedStateStore().getState(
+            new ValueStateDescriptor<>("counter", Integer.class, 0));
     }
+}
+{% endhighlight %}
 
 Notice that the `snapshotState()` method is empty as Flink itself takes care 
of snapshotting managed keyed state
 upon checkpointing.

Reply via email to