Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6170#discussion_r195826270 --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java --- @@ -708,10 +724,32 @@ public boolean filter(Tuple2<Integer, String> rec) throws Exception { } }); - result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + CollectSink.VALUES.clear(); - expected = "(1,a)\n(3,a)"; + result.map(new MapFunction<Tuple2<Integer, String>, String>() { + @Override + public String map(Tuple2<Integer, String> value) throws Exception { + return value.toString(); + } + }).addSink(new CollectSink()); env.execute(); + + CollectSink.VALUES.sort(String::compareTo); + + List<String> expected = Arrays.asList("(1,a)\n(3,a)".split("\n")); + + assertEquals(expected, CollectSink.VALUES); } + + private static class CollectSink implements SinkFunction<String> { + + public static final List<String> VALUES = new ArrayList<>(); + + @Override + public synchronized void invoke(String value) throws Exception { --- End diff -- This synchronization may not work correctly if multiple instances of this function exist: a `synchronized` instance method locks only on its own instance, so separate instances can still modify the shared static `VALUES` list concurrently. Synchronize on `VALUES` instead.
---