sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand
Testing Documentation
URL: https://github.com/apache/flink/pull/8437#discussion_r283954075
##########
File path: docs/dev/stream/testing.md
##########
@@ -23,138 +23,387 @@ specific language governing permissions and limitations
under the License.
-->
-This page briefly discusses how to test a Flink application in your IDE or a
local environment.
+Testing is an integral part of every software development process. As such,
Apache Flink comes with tooling to test your Apache Flink application code on
different levels of the testing pyramid.
* This will be replaced by the TOC
{:toc}
-## Unit testing
+## Testing User-Defined Functions
-Usually, one can assume that Flink produces correct results outside of a
user-defined `Function`. Therefore, it is recommended to test `Function`
classes that contain the main business logic with unit tests as much as
possible.
+Usually, one can assume that Flink produces correct results outside of a
user-defined function. Therefore, it is recommended to test these classes that
contain the main business logic with unit tests as much as possible.
-For example if one implements the following `ReduceFunction`:
+### Unit Testing Stateless, Timeless UDFs
+
+For example, let's take the following stateless `MapFunction`.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-public class SumReduce implements ReduceFunction<Long> {
+public class IncrementMapFunction implements MapFunction<Long, Long> {
@Override
- public Long reduce(Long value1, Long value2) throws Exception {
- return value1 + value2;
+ public Long map(Long record) throws Exception {
+ return record + 1;
}
}
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
-class SumReduce extends ReduceFunction[Long] {
+class IncrementMapFunction extends MapFunction[Long, Long] {
- override def reduce(value1: java.lang.Long, value2: java.lang.Long): java.lang.Long = {
- value1 + value2
+ override def map(record: java.lang.Long): java.lang.Long = {
+ record + 1
}
}
{% endhighlight %}
</div>
</div>
-It is very easy to unit test it with your favorite framework by passing
suitable arguments and verify the output:
+It is very easy to unit test such a function with your favorite testing
framework by passing suitable arguments and verifying the output.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-public class SumReduceTest {
+public class IncrementMapFunctionTest {
@Test
- public void testSum() throws Exception {
+ public void testIncrement() throws Exception {
// instantiate your function
- SumReduce sumReduce = new SumReduce();
+ IncrementMapFunction incrementer = new IncrementMapFunction();
// call the methods that you have implemented
- assertEquals(42L, sumReduce.reduce(40L, 2L));
+ assertEquals(3L, incrementer.map(2L));
}
}
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
-class SumReduceTest extends FlatSpec with Matchers {
+class IncrementMapFunctionTest extends FlatSpec with Matchers {
+
+ "IncrementMapFunction" should "increment values" in {
+ // instantiate your function
+ val incrementer: IncrementMapFunction = new IncrementMapFunction()
+
+ // call the methods that you have implemented
+ incrementer.map(2) should be (3)
+ }
+}
+{% endhighlight %}
+</div>
+</div>
+
+Similarly, a user-defined function which uses an
`org.apache.flink.util.Collector` (e.g. a `FlatMapFunction` or
`ProcessFunction`) can be easily tested by providing a mock object instead of a
real collector. A `FlatMapFunction` with the same functionality as the
`IncrementMapFunction` could be unit tested as follows.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class IncrementFlatMapFunctionTest {
- "SumReduce" should "add values" in {
+ @Test
+ public void testIncrement() throws Exception {
// instantiate your function
- val sumReduce: SumReduce = new SumReduce()
+ IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();
+
+ Collector<Long> collector = mock(Collector.class);
// call the methods that you have implemented
- sumReduce.reduce(40L, 2L) should be (42L)
+ incrementer.flatMap(2L, collector);
+
+ //verify collector was called with the right output
+ Mockito.verify(collector, times(1)).collect(3L);
}
}
{% endhighlight %}
</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class IncrementFlatMapFunctionTest extends FlatSpec with MockFactory {
+
+ "IncrementFlatMapFunction" should "increment values" in {
+ // instantiate your function
+ val incrementer: IncrementFlatMapFunction = new IncrementFlatMapFunction()
+
+ val collector = mock[Collector[Integer]]
+
+ //verify collector was called with the right output
+ (collector.collect _).expects(3)
+
+ // call the methods that you have implemented
+ incrementer.flatMap(2, collector)
+ }
+}
+{% endhighlight %}
+</div>
</div>
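If pulling in a mocking framework is undesirable, the same verification can be done with a hand-written collecting stub that records every emitted element. The sketch below is self-contained: it declares a minimal local stand-in for Flink's `Collector` interface (an assumption made only so the snippet compiles without Flink on the classpath); in a real project you would implement `org.apache.flink.util.Collector` directly.

```java
import java.util.ArrayList;
import java.util.List;

public class IncrementFlatMapFunctionStubTest {

    // Local stand-in for org.apache.flink.util.Collector (assumption:
    // only collect() is exercised here; the real interface also declares close()).
    interface Collector<T> {
        void collect(T record);
    }

    // A FlatMapFunction-style UDF with the same logic as IncrementMapFunction.
    static class IncrementFlatMapFunction {
        public void flatMap(Long record, Collector<Long> out) {
            out.collect(record + 1);
        }
    }

    // A stub collector that simply remembers everything passed to collect().
    static class CollectingCollector<T> implements Collector<T> {
        final List<T> collected = new ArrayList<>();

        @Override
        public void collect(T record) {
            collected.add(record);
        }
    }

    public static void main(String[] args) {
        IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();
        CollectingCollector<Long> collector = new CollectingCollector<>();

        // call the method under test with the stub collector
        incrementer.flatMap(2L, collector);

        // verify the collector received exactly one incremented value
        if (collector.collected.size() != 1 || collector.collected.get(0) != 3L) {
            throw new AssertionError("expected [3], got " + collector.collected);
        }
        System.out.println("ok: " + collector.collected);
    }
}
```

Compared to the Mockito variant above, the stub keeps the full list of emitted records, which also makes it easy to assert on ordering and cardinality for functions that emit more than one element.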
-## Integration testing
+### Unit Testing Stateful or Timely UDFs & Custom Operators
+
+Testing the functionality of a user-defined function, which makes use of
managed state and/or timers is more difficult, because it involves testing the
interaction between the user code and Flink's runtime.
Review comment:
Unnecessary comma.
```suggestion
Testing the functionality of a user-defined function, which makes use of
managed state or timers is more difficult because it involves testing the
interaction between the user code and Flink's runtime.
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services