sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation
URL: https://github.com/apache/flink/pull/8437#discussion_r283958118
 
 

 ##########
 File path: docs/dev/stream/testing.md
 ##########
 @@ -23,138 +23,387 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-This page briefly discusses how to test a Flink application in your IDE or a 
local environment.
+Testing is an integral part of every software development process. As such, Apache Flink comes with tooling to test your application code on different levels of the testing pyramid.
 
 * This will be replaced by the TOC
 {:toc}
 
-## Unit testing
+## Testing User-Defined Functions
 
-Usually, one can assume that Flink produces correct results outside of a 
user-defined `Function`. Therefore, it is recommended to test `Function` 
classes that contain the main business logic with unit tests as much as 
possible.
+Usually, one can assume that Flink produces correct results outside of a user-defined function. Therefore, it is recommended to test those classes that contain the main business logic with unit tests as much as possible.
 
-For example if one implements the following `ReduceFunction`:
+### Unit Testing Stateless, Timeless UDFs
+
+For example, let's take the following stateless `MapFunction`.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-public class SumReduce implements ReduceFunction<Long> {
+public class IncrementMapFunction implements MapFunction<Long, Long> {
 
     @Override
-    public Long reduce(Long value1, Long value2) throws Exception {
-        return value1 + value2;
+    public Long map(Long record) throws Exception {
+        return record + 1;
     }
 }
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-class SumReduce extends ReduceFunction[Long] {
+class IncrementMapFunction extends MapFunction[Long, Long] {
 
-    override def reduce(value1: java.lang.Long, value2: java.lang.Long): 
java.lang.Long = {
-        value1 + value2
+    override def map(record: Long): Long = {
+        record + 1
     }
 }
 {% endhighlight %}
 </div>
 </div>
 
-It is very easy to unit test it with your favorite framework by passing 
suitable arguments and verify the output:
+It is very easy to unit test such a function with your favorite testing framework by passing suitable arguments and verifying the output.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-public class SumReduceTest {
+public class IncrementMapFunctionTest {
 
     @Test
-    public void testSum() throws Exception {
+    public void testIncrement() throws Exception {
         // instantiate your function
-        SumReduce sumReduce = new SumReduce();
+        IncrementMapFunction incrementer = new IncrementMapFunction();
 
         // call the methods that you have implemented
-        assertEquals(42L, sumReduce.reduce(40L, 2L));
+        assertEquals(3L, incrementer.map(2L));
     }
 }
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-class SumReduceTest extends FlatSpec with Matchers {
+class IncrementMapFunctionTest extends FlatSpec with Matchers {
+
+    "IncrementMapFunction" should "increment values" in {
+        // instantiate your function
+        val incrementer: IncrementMapFunction = new IncrementMapFunction()
+
+        // call the methods that you have implemented
+        incrementer.map(2) should be (3)
+    }
+}
+{% endhighlight %}
+</div>
+</div>
+
+Similarly, a user-defined function that uses an `org.apache.flink.util.Collector` (e.g. a `FlatMapFunction` or `ProcessFunction`) can easily be tested by providing a mock object instead of a real collector. A `FlatMapFunction` with the same functionality as the `IncrementMapFunction` could be unit tested as follows.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class IncrementFlatMapFunctionTest {
 
-    "SumReduce" should "add values" in {
+    @Test
+    public void testIncrement() throws Exception {
         // instantiate your function
-        val sumReduce: SumReduce = new SumReduce()
+        IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();
+
+        Collector<Long> collector = mock(Collector.class);
 
         // call the methods that you have implemented
-        sumReduce.reduce(40L, 2L) should be (42L)
+        incrementer.flatMap(2L, collector);
+
+        //verify collector was called with the right output
+        Mockito.verify(collector, times(1)).collect(3L);
     }
 }
 {% endhighlight %}
 </div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class IncrementFlatMapFunctionTest extends FlatSpec with MockFactory {
+
+    "IncrementFlatMapFunction" should "increment values" in {
+        // instantiate your function
+        val incrementer: IncrementFlatMapFunction = new IncrementFlatMapFunction()
+
+        val collector = mock[Collector[java.lang.Long]]
+
+        //verify collector was called with the right output
+        (collector.collect _).expects(3L)
+
+        // call the methods that you have implemented
+        incrementer.flatMap(2L, collector)
+    }
+}
+{% endhighlight %}
+</div>
 </div>
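+
+If you prefer not to pull in a mocking framework, a plain list-backed collector works just as well. The following is a minimal sketch of this alternative; it assumes the same `IncrementFlatMapFunction` as above and uses `org.apache.flink.api.common.functions.util.ListCollector` from flink-core.
+
+{% highlight java %}
+public class IncrementFlatMapFunctionListTest {
+
+    @Test
+    public void testIncrement() throws Exception {
+        IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();
+
+        // collect emitted records into a plain list instead of a mock
+        List<Long> output = new ArrayList<>();
+        incrementer.flatMap(2L, new ListCollector<>(output));
+
+        // verify the collected output
+        assertEquals(Collections.singletonList(3L), output);
+    }
+}
+{% endhighlight %}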
 
-## Integration testing
+### Unit Testing Stateful or Timely UDFs & Custom Operators
+
+Testing the functionality of a user-defined function that makes use of managed state or timers is more difficult, because it involves testing the interaction between the user code and Flink's runtime.
+For this, Flink comes with a collection of so-called test harnesses that can be used to test such user-defined functions as well as custom operators:
 
-In order to end-to-end test Flink streaming pipelines, you can also write 
integration tests that are executed against a local Flink mini cluster.
+* `OneInputStreamOperatorTestHarness` (for operators on `DataStream`s)
+* `KeyedOneInputStreamOperatorTestHarness` (for operators on `KeyedStream`s)
+* `TwoInputStreamOperatorTestHarness` (for operators on `ConnectedStreams` of two `DataStream`s)
+* `KeyedTwoInputStreamOperatorTestHarness` (for operators on `ConnectedStreams` of two `KeyedStream`s)
 
-In order to do so add the test dependency `flink-test-utils`:
+To use the test harnesses, a set of additional test-scoped dependencies is needed.
 
 {% highlight xml %}
 <dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-test-utils{{ site.scala_version_suffix }}</artifactId>
   <version>{{site.version }}</version>
+  <scope>test</scope>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-runtime{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+  <scope>test</scope>
+  <classifier>tests</classifier>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-java{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+  <scope>test</scope>
+  <classifier>tests</classifier>
 </dependency>
 {% endhighlight %}
 
-For example, if you want to test the following `MapFunction`:
+Now, the test harnesses can be used to push records and watermarks into your user-defined functions or custom operators, control processing time, and finally assert on the output of the operator (including side outputs).
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-public class MultiplyByTwo implements MapFunction<Long, Long> {
+
+public class StatefulFlatMapTest {
+    private OneInputStreamOperatorTestHarness<Long, Long> testHarness;
+    private StatefulFlatMapFunction statefulFlatMapFunction;
+
+    @Before
+    public void setupTestHarness() throws Exception {
+
+        //instantiate user-defined function
+        statefulFlatMapFunction = new StatefulFlatMapFunction();
+
+        // wrap user-defined function into the corresponding operator
+        testHarness = new OneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(statefulFlatMapFunction));
+
+        // optionally configure the execution environment
+        testHarness.getExecutionConfig().setAutoWatermarkInterval(50);
+
+        // open the test harness (will also call open() on RichFunctions)
+        testHarness.open();
+    }
+
+    @Test
+    public void testingStatefulFlatMapFunction() throws Exception {
+
+        //push (timestamped) elements into the operator (and hence the user-defined function)
+        testHarness.processElement(2L, 100L);
+
+        //trigger event time timers by advancing the event time of the operator with a watermark
+        testHarness.processWatermark(100L);
+
+        //trigger processing time timers by advancing the processing time of the operator directly
+        testHarness.setProcessingTime(100L);
+
+        //retrieve list of emitted records for assertions
+        assertThat(testHarness.getOutput(), containsInExactlyThisOrder(3L));
+
+        //retrieve list of records emitted to a specific side output for assertions (ProcessFunction only)
+        //assertThat(testHarness.getSideOutput(new OutputTag<>("invalidRecords")), hasSize(0));
+    }
+}
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class StatefulFlatMapFunctionTest extends FlatSpec with Matchers with BeforeAndAfter {
+
+  private var testHarness: OneInputStreamOperatorTestHarness[Long, Long] = null
+  private var statefulFlatMap: StatefulFlatMapFunction = null
+
+  before {
+    //instantiate user-defined function
+    statefulFlatMap = new StatefulFlatMapFunction
+
+    // wrap user-defined function into the corresponding operator
+    testHarness = new OneInputStreamOperatorTestHarness[Long, Long](new StreamFlatMap(statefulFlatMap))
+
+    // optionally configure the execution environment
+    testHarness.getExecutionConfig().setAutoWatermarkInterval(50)
+
+    // open the test harness (will also call open() on RichFunctions)
+    testHarness.open()
+  }
+
+  "StatefulFlatMapFunction" should "do some fancy stuff with timers and state" in {
+
+    //push (timestamped) elements into the operator (and hence the user-defined function)
+    testHarness.processElement(2L, 100L)
+
+    //trigger event time timers by advancing the event time of the operator with a watermark
+    testHarness.processWatermark(100L)
+
+    //trigger processing time timers by advancing the processing time of the operator directly
+    testHarness.setProcessingTime(100L)
+
+    //retrieve list of emitted records for assertions
+    testHarness.getOutput should contain (3)
+
+    //retrieve list of records emitted to a specific side output for assertions (ProcessFunction only)
+    //testHarness.getSideOutput(new OutputTag[Int]("invalidRecords")) should have size 0
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
+`KeyedOneInputStreamOperatorTestHarness` and `KeyedTwoInputStreamOperatorTestHarness` are instantiated by additionally providing a `KeySelector` as well as the `TypeInformation` of the key class.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+public class StatefulFlatMapFunctionTest {
+    private KeyedOneInputStreamOperatorTestHarness<String, Long, Long> testHarness;
+    private StatefulFlatMapFunction statefulFlatMapFunction;
+
+    @Before
+    public void setupTestHarness() throws Exception {
+
+        //instantiate user-defined function
+        statefulFlatMapFunction = new StatefulFlatMapFunction();
+
+        // wrap user-defined function into the corresponding operator
+        testHarness = new KeyedOneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(statefulFlatMapFunction), new MyStringKeySelector(), Types.STRING);
+
+        // open the test harness (will also call open() on RichFunctions)
+        testHarness.open();
+    }
+
+    //tests
+
+}
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class StatefulFlatMapTest extends FlatSpec with Matchers with BeforeAndAfter {
+
+  private var testHarness: KeyedOneInputStreamOperatorTestHarness[String, Long, Long] = null
+  private var statefulFlatMapFunction: StatefulFlatMapFunction = null
+
+  before {
+    //instantiate user-defined function
+    statefulFlatMapFunction = new StatefulFlatMapFunction
+
+    // wrap user-defined function into the corresponding operator
+    testHarness = new KeyedOneInputStreamOperatorTestHarness(new StreamFlatMap(statefulFlatMapFunction), new MyStringKeySelector(), Types.STRING)
+
+    // open the test harness (will also call open() on RichFunctions)
+    testHarness.open()
+  }
+
+  //tests
+
+}
+{% endhighlight %}
+</div>
+</div>
+
+Many more examples for the usage of these test harnesses can be found in the Flink code base, e.g.:
+
+* `org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest` is a good example for testing operators and user-defined functions that depend on processing or event time.
+* `org.apache.flink.streaming.api.functions.sink.filesystem.LocalStreamingFileSinkTest` shows how to test a custom sink with the `AbstractStreamOperatorTestHarness`. Specifically, it uses `AbstractStreamOperatorTestHarness.snapshot` and `AbstractStreamOperatorTestHarness.initializeState` to test its interaction with Flink's checkpointing mechanism (see the sketch after the note below).
+
+<span class="label label-info">Note</span> Be aware that `AbstractStreamOperatorTestHarness` and its derived classes are currently not part of the public API and may be subject to change.
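+
+As an illustration of that checkpointing interaction, here is a minimal sketch of a snapshot-and-restore cycle with the test harness. It assumes the hypothetical `StatefulFlatMapFunction` used in the examples above; the checkpoint id and timestamp passed to `snapshot` are arbitrary.
+
+{% highlight java %}
+// take a snapshot of the operator's state (checkpoint id 1L and timestamp 100L are arbitrary here)
+OperatorSubtaskState snapshot = testHarness.snapshot(1L, 100L);
+
+// wrap a fresh function instance in a new harness and restore the snapshot into it
+testHarness = new OneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(new StatefulFlatMapFunction()));
+testHarness.initializeState(snapshot);
+testHarness.open();
+{% endhighlight %}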
+
+## Testing Flink Jobs
+
+### JUnit Rule `MiniClusterWithClientResource`
+
+In order to test complete Flink jobs, you can write integration tests that are executed against a local, embedded Flink mini cluster. For this, Flink provides a JUnit rule called `MiniClusterWithClientResource`.
+
+To use `MiniClusterWithClientResource`, one additional test-scoped dependency is needed.
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-test-utils{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+  <scope>test</scope>
+</dependency>
+{% endhighlight %}
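+
+As a minimal sketch, the rule could be declared in a test class as follows; the class name and the configuration values here are illustrative, not prescriptive.
+
+{% highlight java %}
+public class ExampleIntegrationTest {
+
+    @ClassRule
+    public static MiniClusterWithClientResource flinkCluster =
+        new MiniClusterWithClientResource(
+            new MiniClusterResourceConfiguration.Builder()
+                .setNumberSlotsPerTaskManager(2)
+                .setNumberTaskManagers(1)
+                .build());
+
+    // job-level tests against the embedded mini cluster go here
+}
+{% endhighlight %}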
+
+Let us take the same simple `MapFunction` as in the previous sections.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class IncrementMapFunction implements MapFunction<Long, Long> {
 
     @Override
-    public Long map(Long value) throws Exception {
-        return value * 2;
+    public Long map(Long record) throws Exception {
+        return record + 1;
     }
 }
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-class MultiplyByTwo extends MapFunction[Long, Long] {
+class IncrementMapFunction extends MapFunction[Long, Long] {
 
-    override def map(value: Long): Long = {
-        value * 2
+    override def map(record: java.lang.Long): java.lang.Long = {
 
 Review comment:
   ```suggestion
       override def map(record: Long): Long = {
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
