Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4454#discussion_r131352677
  
    --- Diff: docs/dev/testing.md ---
    @@ -90,13 +121,69 @@ public class ExampleIntegrationTest extends StreamingMultipleProgramsTestBase {
             public static final List<Long> values = new ArrayList<>();
     
             @Override
    -        public void invoke(Long value) throws Exception {
    +        public synchronized void invoke(Long value) throws Exception {
                 values.add(value);
             }
         }
     }
     ~~~
     
    -Static variable in `CollectSink` is required because Flink serializes all operators before distributing them across a cluster.
    +or in Scala:
    +
    +~~~scala
    +class MultiplyByTwo extends MapFunction[Long, Long] {
    +  override def map(value: Long): Long = value * 2
    +}
    +~~~
    +
    +~~~scala
    +class ExampleIntegrationTest extends FlatSpec with Matchers {
    +        "MultiplyByTwo" should "multiply its input by two" in {
    +        val env: StreamExecutionEnvironment =
    +            StreamExecutionEnvironment.getExecutionEnvironment
    +        env.setParallelism(1)
    +        // values are collected on a static variable
    +        CollectSink.values.clear()
    +        env
    +            .fromElements(1L, 21L, 22L)
    +            .map(new MultiplyByTwo())
    +            .addSink(new CollectSink())
    +        env.execute()
    +        CollectSink.values should be (Lists.newArrayList(2L, 42L, 44L))
    +    }
    +}
    +
    +object CollectSink {
    +    // must be static
    +    val values: java.util.List[Long] = new java.util.ArrayList()
    +}
    +
    +class CollectSink extends SinkFunction[Long] {
    +    override def invoke(value: Long): Unit = {
    +        synchronized {
    +            CollectSink.values.add(value)
    +        }
    +    }
    +}
    +~~~
    +
    +A static variable in `CollectSink` is used here because Flink serializes all operators before distributing them across a cluster.
    +Communicating with operators instantiated by a local Flink mini cluster via static variables is one way around this issue.
     Alternatively, in your test sink you could, for example, write the data to files in a temporary directory.
     Of course, you could also use your own custom sources and sinks, which can emit watermarks.
    +
    +## Testing checkpointing and state handling
    +
    +One way to test state handling is to enable checkpointing in integration tests. You can do that by
    +configuring the `StreamExecutionEnvironment` in the test:
    +~~~java
    +env.enableCheckpointing(500);
    +env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));
    +~~~
    +and for example adding to your Flink application an identity mapper 
operator that will throw and exception
    --- End diff --
    
    Typo: "throw and exception" should be "throw an exception".
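    
    For illustration, such a failure-injecting identity mapper could look like the sketch below. The class name and failure logic are hypothetical (not from this PR); in a real Flink job the class would implement `org.apache.flink.api.common.functions.MapFunction<Long, Long>`, which is omitted here so the sketch stays self-contained.
    
    ```java
    // Sketch only: an identity mapper that fails exactly once, to exercise
    // checkpointing and the restart strategy configured in the test.
    public class FailingIdentityMapper {
        // static so the flag survives operator re-instantiation after a restart
        private static volatile boolean hasFailed = false;
    
        public Long map(Long value) throws Exception {
            if (!hasFailed) {
                hasFailed = true;
                // simulated failure: in a real job this triggers Flink's restart strategy
                throw new RuntimeException("simulated failure");
            }
            return value; // identity: pass the value through unchanged
        }
    }
    ```
    
    On the first record the mapper throws; after the (simulated) restart, the static flag is still set, so the job proceeds and the test can assert that all input values reached the sink despite the failure.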

