sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation
URL: https://github.com/apache/flink/pull/8437#discussion_r283958118
##########
File path: docs/dev/stream/testing.md
##########
@@ -23,138 +23,387 @@ specific language governing permissions and limitations
under the License.
-->
-This page briefly discusses how to test a Flink application in your IDE or a local environment.
+Testing is an integral part of every software development process. As such, Apache Flink comes with tooling to test your application code on different levels of the testing pyramid.
* This will be replaced by the TOC
{:toc}
-## Unit testing
+## Testing User-Defined Functions
-Usually, one can assume that Flink produces correct results outside of a user-defined `Function`. Therefore, it is recommended to test `Function` classes that contain the main business logic with unit tests as much as possible.
+Usually, one can assume that Flink produces correct results outside of a user-defined function. Therefore, it is recommended to cover the classes that contain the main business logic with unit tests as much as possible.
-For example if one implements the following `ReduceFunction`:
+### Unit Testing Stateless, Timeless UDFs
+
+For example, let's take the following stateless `MapFunction`.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-public class SumReduce implements ReduceFunction<Long> {
+public class IncrementMapFunction implements MapFunction<Long, Long> {
@Override
- public Long reduce(Long value1, Long value2) throws Exception {
- return value1 + value2;
+ public Long map(Long record) throws Exception {
+ return record + 1;
}
}
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
-class SumReduce extends ReduceFunction[Long] {
+class IncrementMapFunction extends MapFunction[Long, Long] {
- override def reduce(value1: java.lang.Long, value2: java.lang.Long): java.lang.Long = {
- value1 + value2
+ override def map(record: java.lang.Long): java.lang.Long = {
+ record + 1
}
}
{% endhighlight %}
</div>
</div>
-It is very easy to unit test it with your favorite framework by passing suitable arguments and verify the output:
+It is very easy to unit test such a function with your favorite testing framework by passing suitable arguments and verifying the output.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-public class SumReduceTest {
+public class IncrementMapFunctionTest {
@Test
- public void testSum() throws Exception {
+ public void testIncrement() throws Exception {
// instantiate your function
- SumReduce sumReduce = new SumReduce();
+ IncrementMapFunction incrementer = new IncrementMapFunction();
// call the methods that you have implemented
- assertEquals(42L, sumReduce.reduce(40L, 2L));
+ assertEquals(3L, incrementer.map(2L));
}
}
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
-class SumReduceTest extends FlatSpec with Matchers {
+class IncrementMapFunctionTest extends FlatSpec with Matchers {
+
+ "IncrementMapFunction" should "increment values" in {
+ // instantiate your function
+ val incrementer: IncrementMapFunction = new IncrementMapFunction()
+
+ // call the methods that you have implemented
+ incrementer.map(2) should be (3)
+ }
+}
+{% endhighlight %}
+</div>
+</div>
+
+Similarly, a user-defined function which uses an `org.apache.flink.util.Collector` (e.g. a `FlatMapFunction` or `ProcessFunction`) can be easily tested by providing a mock object instead of a real collector. A `FlatMapFunction` with the same functionality as the `IncrementMapFunction` could be unit tested as follows.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class IncrementFlatMapFunctionTest {
- "SumReduce" should "add values" in {
+ @Test
+ public void testIncrement() throws Exception {
// instantiate your function
- val sumReduce: SumReduce = new SumReduce()
+ IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();
+
+ Collector<Long> collector = mock(Collector.class);
// call the methods that you have implemented
- sumReduce.reduce(40L, 2L) should be (42L)
+ incrementer.flatMap(2L, collector);
+
+ //verify collector was called with the right output
+ Mockito.verify(collector, times(1)).collect(3L);
}
}
{% endhighlight %}
</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class IncrementFlatMapFunctionTest extends FlatSpec with MockFactory {
+
+ "IncrementFlatMapFunction" should "increment values" in {
+ // instantiate your function
+ val incrementer: IncrementFlatMapFunction = new IncrementFlatMapFunction()
+
+ val collector = mock[Collector[Integer]]
+
+ //verify collector was called with the right output
+ (collector.collect _).expects(3)
+
+ // call the methods that you have implemented
+ incrementer.flatMap(2, collector)
+ }
+}
+{% endhighlight %}
+</div>
</div>
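+
+A `ProcessFunction` can be unit tested in the same way: besides the `Collector`, its `Context` argument can be replaced by a mock as well. As a point of reference, here is a minimal sketch, assuming Mockito and a hypothetical `IncrementProcessFunction` with the same increment logic (neither is prescribed by this page):
+
+{% highlight java %}
+public class IncrementProcessFunctionTest {
+
+    @Test
+    public void testIncrement() throws Exception {
+        // instantiate the (hypothetical) function under test
+        IncrementProcessFunction incrementer = new IncrementProcessFunction();
+
+        // mock the collector as well as the context
+        Collector<Long> collector = mock(Collector.class);
+        ProcessFunction<Long, Long>.Context context = mock(ProcessFunction.Context.class);
+
+        // call the method that you have implemented
+        incrementer.processElement(2L, context, collector);
+
+        // verify the collector was called with the right output
+        Mockito.verify(collector, times(1)).collect(3L);
+    }
+}
+{% endhighlight %}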
-## Integration testing
+### Unit Testing Stateful or Timely UDFs & Custom Operators
+
+Testing the functionality of a user-defined function that makes use of managed state or timers is more difficult because it involves testing the interaction between the user code and Flink's runtime.
+For this, Flink comes with a collection of so-called test harnesses, which can be used to test such user-defined functions as well as custom operators:
-In order to end-to-end test Flink streaming pipelines, you can also write integration tests that are executed against a local Flink mini cluster.
+* `OneInputStreamOperatorTestHarness` (for operators on `DataStream`s)
+* `KeyedOneInputStreamOperatorTestHarness` (for operators on `KeyedStream`s)
+* `TwoInputStreamOperatorTestHarness` (for operators on `ConnectedStreams` of two `DataStream`s)
+* `KeyedTwoInputStreamOperatorTestHarness` (for operators on `ConnectedStreams` of two `KeyedStream`s)
-In order to do so add the test dependency `flink-test-utils`:
+To use the test harnesses, a set of additional test-scoped dependencies is needed.
{% highlight xml %}
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils{{ site.scala_version_suffix }}</artifactId>
<version>{{site.version }}</version>
+ <scope>test</scope>
+</dependency>
+<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime{{ site.scala_version_suffix }}</artifactId>
+ <version>{{site.version }}</version>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+</dependency>
+<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java{{ site.scala_version_suffix }}</artifactId>
+ <version>{{site.version }}</version>
+ <scope>test</scope>
+ <classifier>tests</classifier>
</dependency>
{% endhighlight %}
-For example, if you want to test the following `MapFunction`:
+Now, the test harnesses can be used to push records and watermarks into your user-defined functions or custom operators, control processing time, and finally assert on the output of the operator (including side outputs).
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-public class MultiplyByTwo implements MapFunction<Long, Long> {
+
+public class StatefulFlatMapTest {
+ private OneInputStreamOperatorTestHarness<Long, Long> testHarness;
+ private StatefulFlatMapFunction statefulFlatMapFunction;
+
+ @Before
+ public void setupTestHarness() throws Exception {
+
+ //instantiate user-defined function
+ statefulFlatMapFunction = new StatefulFlatMapFunction();
+
+ // wrap user defined function into the corresponding operator
+ testHarness = new OneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(statefulFlatMapFunction));
+
+ // optionally configure the execution environment
+ testHarness.getExecutionConfig().setAutoWatermarkInterval(50);
+
+ // open the test harness (will also call open() on RichFunctions)
+ testHarness.open();
+ }
+
+ @Test
+ public void testingStatefulFlatMapFunction() throws Exception {
+
+ //push (timestamped) elements into the operator (and hence the user-defined function)
+ testHarness.processElement(2L, 100L);
+
+ //trigger event time timers by advancing the event time of the operator with a watermark
+ testHarness.processWatermark(100L);
+
+ //trigger processing time timers by advancing the processing time of the operator directly
+ testHarness.setProcessingTime(100L);
+
+ //retrieve list of emitted records for assertions
+ assertThat(testHarness.getOutput(), containsInExactlyThisOrder(3L));
+
+ //retrieve list of records emitted to a specific side output for assertions (ProcessFunction only)
+ //assertThat(testHarness.getSideOutput(new OutputTag<>("invalidRecords")), hasSize(0))
+ }
+}
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class StatefulFlatMapFunctionTest extends FlatSpec with Matchers with BeforeAndAfter {
+
+ private var testHarness: OneInputStreamOperatorTestHarness[Long, Long] = null
+ private var statefulFlatMap: StatefulFlatMapFunction = null
+
+ before {
+ //instantiate user-defined function
+ statefulFlatMap = new StatefulFlatMapFunction
+
+ // wrap user defined function into the corresponding operator
+ testHarness = new OneInputStreamOperatorTestHarness[Long, Long](new StreamFlatMap(statefulFlatMap))
+
+ // optionally configure the execution environment
+ testHarness.getExecutionConfig().setAutoWatermarkInterval(50);
+
+ // open the test harness (will also call open() on RichFunctions)
+ testHarness.open();
+ }
+
+ "StatefulFlatMap" should "do some fancy stuff with timers and state" in {
+
+ //push (timestamped) elements into the operator (and hence the user-defined function)
+ testHarness.processElement(2, 100);
+
+ //trigger event time timers by advancing the event time of the operator with a watermark
+ testHarness.processWatermark(100);
+
+ //trigger processing time timers by advancing the processing time of the operator directly
+ testHarness.setProcessingTime(100);
+
+ //retrieve list of emitted records for assertions
+ testHarness.getOutput should contain (3)
+
+ //retrieve list of records emitted to a specific side output for assertions (ProcessFunction only)
+ //testHarness.getSideOutput(new OutputTag[Int]("invalidRecords")) should have size 0
+ }
+}
+{% endhighlight %}
+</div>
+</div>
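+
+Note that `testHarness.getOutput()` not only holds the emitted records but also the watermarks the operator has forwarded downstream, so watermark forwarding can be asserted as well. A small sketch of such an assertion (using plain Hamcrest and `org.apache.flink.streaming.api.watermark.Watermark`, under the assumption that the wrapped operator forwards watermarks unchanged):
+
+{% highlight java %}
+testHarness.processWatermark(100L);
+
+// the forwarded watermark shows up in the operator's output queue
+assertThat(testHarness.getOutput(), hasItem(new Watermark(100L)));
+{% endhighlight %}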
+
+`KeyedOneInputStreamOperatorTestHarness` and `KeyedTwoInputStreamOperatorTestHarness` are instantiated by additionally providing a `KeySelector` as well as `TypeInformation` for the class of the key.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+public class StatefulFlatMapFunctionTest {
+ private KeyedOneInputStreamOperatorTestHarness<String, Long, Long> testHarness;
+ private StatefulFlatMapFunction statefulFlatMapFunction;
+
+ @Before
+ public void setupTestHarness() throws Exception {
+
+ //instantiate user-defined function
+ statefulFlatMapFunction = new StatefulFlatMapFunction();
+
+ // wrap user defined function into the corresponding operator
+ testHarness = new KeyedOneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(statefulFlatMapFunction), new MyStringKeySelector(), Types.STRING);
+
+ // open the test harness (will also call open() on RichFunctions)
+ testHarness.open();
+ }
+
+ //tests
+
+}
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class StatefulFlatMapTest extends FlatSpec with Matchers with BeforeAndAfter {
+
+ private var testHarness: KeyedOneInputStreamOperatorTestHarness[String, Long, Long] = null
+ private var statefulFlatMapFunction: StatefulFlatMapFunction = null
+
+ before {
+ //instantiate user-defined function
+ statefulFlatMapFunction = new StatefulFlatMapFunction
+
+ // wrap user defined function into the corresponding operator
+ testHarness = new KeyedOneInputStreamOperatorTestHarness(new StreamFlatMap(statefulFlatMapFunction), new MyStringKeySelector(), Types.STRING)
+
+ // open the test harness (will also call open() on RichFunctions)
+ testHarness.open();
+ }
+
+ //tests
+
+}
+{% endhighlight %}
+</div>
+</div>
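+
+Since the actual `StatefulFlatMapFunction` is not shown on this page, here is a minimal sketch of what a test case against this keyed harness could look like. It assumes, purely for illustration, that the function keeps a running sum per key in keyed state and emits the new sum for every record:
+
+{% highlight java %}
+@Test
+public void testRunningSumPerKey() throws Exception {
+    // both records are routed to the same key by the (assumed) MyStringKeySelector
+    testHarness.processElement(2L, 100L);
+    testHarness.processElement(3L, 200L);
+
+    // unwrap the emitted StreamRecords and assert on the plain values
+    List<Long> results = new ArrayList<>();
+    for (Object record : testHarness.getOutput()) {
+        results.add(((StreamRecord<Long>) record).getValue());
+    }
+    assertEquals(Arrays.asList(2L, 5L), results);
+}
+{% endhighlight %}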
+
+Many more examples for the usage of these test harnesses can be found in the Flink code base, e.g.:
+
+* `org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest` is a good example for testing operators and user-defined functions that depend on processing or event time.
+* `org.apache.flink.streaming.api.functions.sink.filesystem.LocalStreamingFileSinkTest` shows how to test a custom sink with the `AbstractStreamOperatorTestHarness`. Specifically, it uses `AbstractStreamOperatorTestHarness.snapshot` and `AbstractStreamOperatorTestHarness.initializeState` to test its interaction with Flink's checkpointing mechanism.
+
+<span class="label label-info">Note</span> Be aware that `AbstractStreamOperatorTestHarness` and its derived classes are currently not part of the public API and can be subject to change.
+
+## Testing Flink Jobs
+
+### JUnit Rule `MiniClusterWithClientResource`
+
+In order to test complete Flink jobs, you can write integration tests that are executed against a local, embedded Flink mini cluster. For this, Flink provides a JUnit rule called `MiniClusterWithClientResource`.
+
+To use `MiniClusterWithClientResource`, one additional test-scoped dependency is needed.
+
+{% highlight xml %}
+<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils{{ site.scala_version_suffix }}</artifactId>
+ <version>{{site.version }}</version>
+ <scope>test</scope>
+</dependency>
+{% endhighlight %}
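+
+With the rule in place, an integration test typically builds a small pipeline around the function under test (the `IncrementMapFunction` shown below) and asserts on the collected results. As a point of reference, a minimal sketch could look like this; `CollectSink` is a hypothetical helper sink that gathers the results in a static list:
+
+{% highlight java %}
+public class IncrementMapFunctionITCase {
+
+    @ClassRule
+    public static MiniClusterWithClientResource flinkCluster =
+        new MiniClusterWithClientResource(
+            new MiniClusterResourceConfiguration.Builder()
+                .setNumberSlotsPerTaskManager(1)
+                .setNumberTaskManagers(1)
+                .build());
+
+    @Test
+    public void testIncrementPipeline() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // clear previously collected values before running the pipeline
+        CollectSink.values.clear();
+
+        env.fromElements(1L, 21L, 22L)
+            .map(new IncrementMapFunction())
+            .addSink(new CollectSink());
+
+        env.execute();
+
+        assertTrue(CollectSink.values.containsAll(Arrays.asList(2L, 22L, 23L)));
+    }
+
+    // a hypothetical testing sink
+    private static class CollectSink implements SinkFunction<Long> {
+        // must be static because all sink instances need to write to the same list
+        public static final List<Long> values = Collections.synchronizedList(new ArrayList<>());
+
+        @Override
+        public void invoke(Long value, Context context) throws Exception {
+            values.add(value);
+        }
+    }
+}
+{% endhighlight %}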
+
+Let us take the same simple `MapFunction` as in the previous sections.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class IncrementMapFunction implements MapFunction<Long, Long> {
@Override
- public Long map(Long value) throws Exception {
- return value * 2;
+ public Long map(Long record) throws Exception {
+ return record + 1;
}
}
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
-class MultiplyByTwo extends MapFunction[Long, Long] {
+class IncrementMapFunction extends MapFunction[Long, Long] {
- override def map(value: Long): Long = {
- value * 2
+ override def map(record: java.lang.Long): java.lang.Long = {
Review comment:
```suggestion
override def map(record: Long): Long = {
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services