sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation
URL: https://github.com/apache/flink/pull/8437#discussion_r283959906
##########
File path: docs/dev/stream/testing.md
##########
@@ -181,85 +430,76 @@ public class ExampleIntegrationTest extends AbstractTestBase {
<div data-lang="scala" markdown="1">
{% highlight scala %}
-class ExampleIntegrationTest extends AbstractTestBase {
+class StreamingJobIntegrationTest extends FlatSpec with Matchers with BeforeAndAfter {
- @Test
- def testMultiply(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
+ .setNumberSlotsPerTaskManager(1)
+ .setNumberTaskManagers(1)
+ .build)
- // configure your test environment
- env.setParallelism(1)
+ before {
+ flinkCluster.before()
+ }
- // values are collected in a static variable
- CollectSink.values.clear()
+ after {
+ flinkCluster.after()
+ }
- // create a stream of custom elements and apply transformations
- env
- .fromElements(1L, 21L, 22L)
- .map(new MultiplyByTwo())
- .addSink(new CollectSink())
- // execute
- env.execute()
+ "IncrementMapFunction pipeline" should "increment values" in {
- // verify your results
- assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values)
- }
-}
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+ // configure your test environment
+ env.setParallelism(2)
+ // values are collected in a static variable
+ CollectSink.values.clear()
+
+ // create a stream of custom elements and apply transformations
+ env.fromElements(1L, 21L, 22L)
+ .map(new IncrementMapFunction())
+ .addSink(new CollectSink())
+
+ // execute
+ env.execute()
+
+ // verify your results
+ CollectSink.values should contain allOf (2L, 22L, 23L)
+ }
+}
// create a testing sink
class CollectSink extends SinkFunction[Long] {
- override def invoke(value: java.lang.Long): Unit = {
- synchronized {
- values.add(value)
- }
+ override def invoke(value: Long): Unit = {
+ synchronized {
+ CollectSink.values.add(value)
}
+ }
}
object CollectSink {
-
// must be static
- val values: List[Long] = new ArrayList()
+ val values: util.List[Long] = new util.ArrayList()
}
{% endhighlight %}
</div>
</div>
-The static variable in `CollectSink` is used here because Flink serializes all operators before distributing them across a cluster.
-Communicating with operators instantiated by a local Flink mini cluster via static variables is one way around this issue.
-Alternatively, you could for example write the data to files in a temporary directory with your test sink.
-You can also implement your own custom sources for emitting watermarks.
-
-## Testing checkpointing and state handling
+A few remarks on integration testing with `MiniClusterWithClientResource`:
-One way to test state handling is to enable checkpointing in integration tests.
+* In order not to copy your whole pipeline code from production to test, make sources and sinks pluggable in your production code and inject special test sources and test sinks in your tests.
-You can do that by configuring your `StreamExecutionEnvironment` in the test:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-env.enableCheckpointing(500);
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-env.enableCheckpointing(500)
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100))
-{% endhighlight %}
-</div>
-</div>
+* The static variable in `CollectSink` is used here because Flink serializes all operators before distributing them across a cluster.
+Communicating with operators instantiated by a local Flink mini cluster via static variables is one way around this issue.
+Alternatively, you could for example write the data to files in a temporary directory with your test sink.
-And for example adding to your Flink application an identity mapper operator that will throw an exception
-once every `1000ms`. However writing such test could be tricky because of time dependencies between the actions.
+* You can also implement your own custom *parallel* source function for emitting watermarks.
-Another approach is to write a unit test using the Flink internal testing utility `AbstractStreamOperatorTestHarness` from the `flink-streaming-java` module.
+* It is recommended to always test your pipelines locally with a parallelism > 1 in order to identify bugs which only surface, when the pipelines is executed in parallel.
Review comment:
```suggestion
* It is recommended to always test your pipelines locally with a parallelism > 1 to identify bugs which only surface for the pipelines executed in parallel.
```
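The "make sources and sinks pluggable" remark in the diff above can be sketched without any Flink dependency. The following is a minimal, hypothetical illustration in plain Java (`PipelineSketch`, `increment`, and `run` are invented names, not Flink API): the transformation logic lives in its own method, and the job wiring takes the source and sink as parameters so that a test can inject test doubles instead of the production connectors.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Hypothetical sketch, not Flink API: keep the pipeline logic separate from
// concrete sources and sinks so tests can swap them out.
public class PipelineSketch {

    // The transformation under test, expressed over plain lists here;
    // in a real job this would be the chain of DataStream operators.
    static List<Long> increment(List<Long> input) {
        return input.stream().map(v -> v + 1).collect(Collectors.toList());
    }

    // Production code passes a real source and sink; a test passes doubles,
    // e.g. a fixed list as the source and an in-memory collector as the sink.
    static void run(Supplier<List<Long>> source, Consumer<List<Long>> sink) {
        sink.accept(increment(source.get()));
    }

    public static void main(String[] args) {
        run(() -> Arrays.asList(1L, 21L, 22L), System.out::println);
    }
}
```

A test would then call `run` with a fixed input list and a collecting sink and assert on the collected values, mirroring what the `CollectSink` in the diff does against the mini cluster.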