Repository: flink
Updated Branches:
  refs/heads/master 1e2a63874 -> 5ce0ded1b


[hotfix] [docs] Fix typos and improve testing docs


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5ce0ded1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5ce0ded1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5ce0ded1

Branch: refs/heads/master
Commit: 5ce0ded1b71faf7e5d90865ba9481e254c3fb5be
Parents: 00fc641
Author: twalthr <twal...@apache.org>
Authored: Tue Aug 29 17:19:50 2017 +0200
Committer: twalthr <twal...@apache.org>
Committed: Tue Aug 29 17:21:31 2017 +0200

----------------------------------------------------------------------
 docs/dev/stream/testing.md | 263 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 263 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5ce0ded1/docs/dev/stream/testing.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/testing.md b/docs/dev/stream/testing.md
new file mode 100644
index 0000000..44f5cfd
--- /dev/null
+++ b/docs/dev/stream/testing.md
@@ -0,0 +1,263 @@
+---
+title: "Testing"
+nav-parent_id: streaming
+nav-id: testing
+nav-pos: 99
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+This page briefly discusses how to test a Flink application in your IDE or a local environment.
+
+* This will be replaced by the TOC
+{:toc}
+
+## Unit testing
+
+Usually, one can assume that Flink produces correct results outside of a user-defined `Function`. Therefore, it is recommended to test `Function` classes that contain the main business logic with unit tests as much as possible.
+
+For example, if one implements the following `ReduceFunction`:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class SumReduce implements ReduceFunction<Long> {
+
+    @Override
+    public Long reduce(Long value1, Long value2) throws Exception {
+        return value1 + value2;
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class SumReduce extends ReduceFunction[Long] {
+
+    override def reduce(value1: java.lang.Long, value2: java.lang.Long): java.lang.Long = {
+        value1 + value2
+    }
+}
+{% endhighlight %}
+</div>
+</div>
+
+It is very easy to unit test it with your favorite framework by passing suitable arguments and verifying the output:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class SumReduceTest {
+
+    @Test
+    public void testSum() throws Exception {
+        // instantiate your function
+        SumReduce sumReduce = new SumReduce();
+
+        // call the methods that you have implemented
+        assertEquals(42L, sumReduce.reduce(40L, 2L));
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class SumReduceTest extends FlatSpec with Matchers {
+
+    "SumReduce" should "add values" in {
+        // instantiate your function
+        val sumReduce: SumReduce = new SumReduce()
+
+        // call the methods that you have implemented
+        sumReduce.reduce(40L, 2L) should be (42L)
+    }
+}
+{% endhighlight %}
+</div>
+</div>
+
+## Integration testing
+
+To test complete Flink streaming pipelines end-to-end, you can also write integration tests that are executed against a local Flink mini cluster.
+
+To do so, add the test dependency `flink-test-utils`:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-test-utils{{ site.scala_version_suffix }}</artifactId>
+  <version>{{ site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+For example, if you want to test the following `MapFunction`:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class MultiplyByTwo implements MapFunction<Long, Long> {
+
+    @Override
+    public Long map(Long value) throws Exception {
+        return value * 2;
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class MultiplyByTwo extends MapFunction[Long, Long] {
+
+    override def map(value: Long): Long = {
+        value * 2
+    }
+}
+{% endhighlight %}
+</div>
+</div>
+
+You could write the following integration test:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class ExampleIntegrationTest extends StreamingMultipleProgramsTestBase {
+
+    @Test
+    public void testMultiply() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // configure your test environment
+        env.setParallelism(1);
+
+        // values are collected in a static variable
+        CollectSink.values.clear();
+
+        // create a stream of custom elements and apply transformations
+        env.fromElements(1L, 21L, 22L)
+                .map(new MultiplyByTwo())
+                .addSink(new CollectSink());
+
+        // execute
+        env.execute();
+
+        // verify your results
+        assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values);
+    }
+
+    // create a testing sink
+    private static class CollectSink implements SinkFunction<Long> {
+
+        // must be static
+        public static final List<Long> values = new ArrayList<>();
+
+        @Override
+        public synchronized void invoke(Long value) throws Exception {
+            values.add(value);
+        }
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class ExampleIntegrationTest extends StreamingMultipleProgramsTestBase {
+
+    @Test
+    def testMultiply(): Unit = {
+        val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+        // configure your test environment
+        env.setParallelism(1)
+
+        // values are collected in a static variable
+        CollectSink.values.clear()
+
+        // create a stream of custom elements and apply transformations
+        env
+            .fromElements(1L, 21L, 22L)
+            .map(new MultiplyByTwo())
+            .addSink(new CollectSink())
+
+        // execute
+        env.execute()
+
+        // verify your results
+        assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values)
+    }
+}
+
+// create a testing sink
+class CollectSink extends SinkFunction[Long] {
+
+    override def invoke(value: java.lang.Long): Unit = {
+        synchronized {
+            CollectSink.values.add(value)
+        }
+    }
+}
+
+object CollectSink {
+
+    // must be static
+    val values: java.util.List[java.lang.Long] = new java.util.ArrayList()
+}
+{% endhighlight %}
+</div>
+</div>
+
+The static variable in `CollectSink` is used here because Flink serializes all operators before distributing them across a cluster.
+Communicating with operators instantiated by a local Flink mini cluster via static variables is one way around this issue.
+Alternatively, your test sink could, for example, write the data to files in a temporary directory.
+You can also implement your own custom sources for emitting watermarks.
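+
+As a starting point for such a source, here is a minimal sketch that emits a few records together with watermarks, using `SourceFunction` and `Watermark` from `flink-streaming-java` (the class name and the emitted values are illustrative only):
+
+{% highlight java %}
+// a test source that assigns event-time timestamps and
+// emits a watermark after each record
+public class RecordsWithWatermarksSource implements SourceFunction<Long> {
+
+    private volatile boolean running = true;
+
+    @Override
+    public void run(SourceContext<Long> ctx) throws Exception {
+        long value = 0;
+        while (running && value < 3) {
+            // emit a record with an event-time timestamp ...
+            ctx.collectWithTimestamp(value, value * 1000L);
+            // ... and advance event time with a watermark
+            ctx.emitWatermark(new Watermark(value * 1000L));
+            value++;
+        }
+    }
+
+    @Override
+    public void cancel() {
+        running = false;
+    }
+}
+{% endhighlight %}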
+
+## Testing checkpointing and state handling
+
+One way to test state handling is to enable checkpointing in integration tests.
+
+You can do that by configuring your `StreamExecutionEnvironment` in the test:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+env.enableCheckpointing(500);
+env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+env.enableCheckpointing(500)
+env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100))
+{% endhighlight %}
+</div>
+</div>
+
+You could, for example, add to your Flink application an identity mapper operator that throws an exception once every `1000ms`.
+However, writing such a test can be tricky because of timing dependencies between the actions.
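+
+A minimal sketch of such a failing mapper could look as follows (the class name and the failure logic are illustrative only and not part of Flink):
+
+{% highlight java %}
+// an identity mapper that throws an exception roughly once per interval,
+// to exercise checkpointing and restarts
+public class FailingIdentityMapper implements MapFunction<Long, Long> {
+
+    private final long failureIntervalMs;
+
+    // static so that the timestamp survives restarts of the operator
+    // within the same JVM (as with the CollectSink above)
+    private static volatile long lastFailure = System.currentTimeMillis();
+
+    public FailingIdentityMapper(long failureIntervalMs) {
+        this.failureIntervalMs = failureIntervalMs;
+    }
+
+    @Override
+    public Long map(Long value) throws Exception {
+        if (System.currentTimeMillis() - lastFailure >= failureIntervalMs) {
+            lastFailure = System.currentTimeMillis();
+            throw new RuntimeException("artificial failure for testing recovery");
+        }
+        return value;
+    }
+}
+{% endhighlight %}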
+
+Another approach is to write a unit test using the Flink internal testing utility `AbstractStreamOperatorTestHarness` from the `flink-streaming-java` module.
+
+For an example of how to do that, please have a look at `org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest`, also in the `flink-streaming-java` module.
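+
+As a rough sketch, a harness-based test of the `MultiplyByTwo` function from above could look like the following, using the related `OneInputStreamOperatorTestHarness` (this assumes that the test classes of `flink-streaming-java` are on the test classpath; the exact harness API may differ between Flink versions):
+
+{% highlight java %}
+// inside a test method that declares "throws Exception"
+OneInputStreamOperatorTestHarness<Long, Long> testHarness =
+    new OneInputStreamOperatorTestHarness<>(new StreamMap<>(new MultiplyByTwo()));
+
+// open the operator before pushing elements through it
+testHarness.open();
+
+// process a record with timestamp 0
+testHarness.processElement(new StreamRecord<>(21L, 0L));
+
+// the emitted records can be inspected on the output queue
+StreamRecord<Long> result = (StreamRecord<Long>) testHarness.getOutput().poll();
+assertEquals(Long.valueOf(42L), result.getValue());
+{% endhighlight %}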
+
+Be aware that `AbstractStreamOperatorTestHarness` is currently not part of the public API and may be subject to change.
