Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6170#discussion_r195826270
  
    --- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---
    @@ -708,10 +724,32 @@ public boolean filter(Tuple2<Integer, String> rec) 
throws Exception {
                        }
                });
     
    -           result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +           CollectSink.VALUES.clear();
     
    -           expected = "(1,a)\n(3,a)";
    +           result.map(new MapFunction<Tuple2<Integer, String>, String>() {
    +                   @Override
    +                   public String map(Tuple2<Integer, String> value) throws 
Exception {
    +                           return value.toString();
    +                   }
    +           }).addSink(new CollectSink());
     
                env.execute();
    +
    +           CollectSink.VALUES.sort(String::compareTo);
    +
    +           List<String> expected = 
Arrays.asList("(1,a)\n(3,a)".split("\n"));
    +
    +           assertEquals(expected, CollectSink.VALUES);
        }
    +
    +   private static class CollectSink implements SinkFunction<String> {
    +
    +           public static final List<String> VALUES = new ArrayList<>();
    +
    +           @Override
    +           public synchronized void invoke(String value) throws Exception {
    --- End diff --
    
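    This synchronization may not work correctly if multiple instances of this 
function exist: a `synchronized` instance method locks on `this`, so separate 
instances do not exclude each other when writing to the shared static list. 
Synchronize on `VALUES` instead.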


---

Reply via email to