sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation
URL: https://github.com/apache/flink/pull/8437#discussion_r283961380
##########
File path: docs/dev/stream/testing.md
##########
@@ -181,85 +430,76 @@ public class ExampleIntegrationTest extends AbstractTestBase {
<div data-lang="scala" markdown="1">
{% highlight scala %}
-class ExampleIntegrationTest extends AbstractTestBase {
+class StreamingJobIntegrationTest extends FlatSpec with Matchers with BeforeAndAfter {
- @Test
- def testMultiply(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
+ .setNumberSlotsPerTaskManager(2) // two slots, so that env.setParallelism(2) below can be scheduled
+ .setNumberTaskManagers(1)
+ .build)
- // configure your test environment
- env.setParallelism(1)
+ before {
+ flinkCluster.before()
+ }
- // values are collected in a static variable
- CollectSink.values.clear()
+ after {
+ flinkCluster.after()
+ }
- // create a stream of custom elements and apply transformations
- env
- .fromElements(1L, 21L, 22L)
- .map(new MultiplyByTwo())
- .addSink(new CollectSink())
- // execute
- env.execute()
+ "IncrementMapFunction pipeline" should "increment values" in {
- // verify your results
- assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values)
- }
-}
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+ // configure your test environment
+ env.setParallelism(2)
+ // values are collected in a static variable
+ CollectSink.values.clear()
+
+ // create a stream of custom elements and apply transformations
+ env.fromElements(1L, 21L, 22L)
+ .map(new IncrementMapFunction())
+ .addSink(new CollectSink())
+
+ // execute
+ env.execute()
+
+ // verify your results
+ CollectSink.values should contain allOf (2L, 22L, 23L)
+ }
+}
// create a testing sink
class CollectSink extends SinkFunction[Long] {
- override def invoke(value: java.lang.Long): Unit = {
- synchronized {
- values.add(value)
- }
+ override def invoke(value: Long): Unit = {
+ CollectSink.values.synchronized { // lock the shared list; parallel sink instances are separate objects
+ CollectSink.values.add(value)
}
+ }
}
object CollectSink {
-
// must be static
- val values: List[Long] = new ArrayList()
+ val values: java.util.List[Long] = new java.util.ArrayList()
}
{% endhighlight %}
</div>
</div>
-The static variable in `CollectSink` is used here because Flink serializes all operators before distributing them across a cluster.
-Communicating with operators instantiated by a local Flink mini cluster via static variables is one way around this issue.
-Alternatively, you could for example write the data to files in a temporary directory with your test sink.
-You can also implement your own custom sources for emitting watermarks.
-
-## Testing checkpointing and state handling
+A few remarks on integration testing with `MiniClusterWithClientResource`:
-One way to test state handling is to enable checkpointing in integration tests.
+* To avoid copying your whole pipeline code from production to test, make sources and sinks pluggable in your production code and inject special test sources and test sinks in your tests.
-You can do that by configuring your `StreamExecutionEnvironment` in the test:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-env.enableCheckpointing(500);
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-env.enableCheckpointing(500)
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100))
-{% endhighlight %}
-</div>
-</div>
+* The static variable in `CollectSink` is used here because Flink serializes all operators before distributing them across a cluster.
+Communicating with operators instantiated by a local Flink mini cluster via static variables is one way around this issue.
+Alternatively, your test sink could write the data to files in a temporary directory.
-And for example adding to your Flink application an identity mapper operator that will throw an exception
-once every `1000ms`. However writing such test could be tricky because of time dependencies between the actions.
+* You can also implement your own custom *parallel* source function for emitting watermarks.
-Another approach is to write a unit test using the Flink internal testing utility `AbstractStreamOperatorTestHarness` from the `flink-streaming-java` module.
+* It is recommended to always test your pipelines locally with a parallelism > 1 to identify bugs that only surface when the pipelines are executed in parallel.
-For an example of how to do that please have a look at the `org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest` also in the `flink-streaming-java` module.
+* If you use `@Rule` instead of `@ClassRule`, a new local Flink cluster will be brought up for each individual test method, increasing the overall execution time of your tests.
Review comment:
```suggestion
* Prefer `@ClassRule` over `@Rule` so that multiple tests can share the same Flink cluster. Doing so saves a significant amount of time, since the startup and shutdown of Flink clusters usually dominate the execution time of the actual tests.
```
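The remark in the diff that `CollectSink` keeps its results in a static variable because Flink serializes operators can be checked without starting a cluster. The following Java sketch is only an approximation: `StaticSinkDemo`, its `CollectSink`, and `roundTrip` are hypothetical names, and plain `java.io` serialization stands in for the way Flink ships operators to tasks. It shows that the deserialized copy, which is the one that actually receives records, writes into the static list, while the object the test still holds never sees any values. The static list is wrapped with `Collections.synchronizedList` because, with a parallelism > 1, several sink instances may add to it concurrently.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class StaticSinkDemo {

    // Hypothetical stand-in for a test sink; it does NOT implement Flink's SinkFunction.
    static class CollectSink implements Serializable {
        // Static fields are not serialized, so every copy of the sink in this JVM shares this list.
        // Synchronized because parallel sink instances may add to it concurrently.
        static final List<Long> STATIC_VALUES = Collections.synchronizedList(new ArrayList<>());

        // Instance fields are serialized, so each deserialized copy gets its own list.
        final List<Long> instanceValues = new ArrayList<>();

        void invoke(Long value) {
            STATIC_VALUES.add(value);
            instanceValues.add(value);
        }
    }

    // Serializes and deserializes an object: a simplified stand-in (an assumption,
    // not Flink code) for how an operator is copied before it runs in a task.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        CollectSink sinkInTest = new CollectSink();      // the object the test code holds
        CollectSink sinkInTask = roundTrip(sinkInTest);  // the copy that actually receives records

        for (long v : new long[] {2L, 22L, 23L}) {
            sinkInTask.invoke(v);
        }

        System.out.println(sinkInTest.instanceValues);   // prints [] : the test's object saw nothing
        System.out.println(CollectSink.STATIC_VALUES);   // prints [2, 22, 23]
    }
}
```

Reading results through the static list is what lets the test assert on them after `env.execute()` returns; writing to files in a temporary directory, as the diff also mentions, avoids shared static state at the cost of extra I/O.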