[ https://issues.apache.org/jira/browse/FLINK-9563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16514212#comment-16514212 ]
ASF GitHub Bot commented on FLINK-9563:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6170#discussion_r195826270
--- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---
@@ -708,10 +724,32 @@ public boolean filter(Tuple2<Integer, String> rec) throws Exception {
}
});
- result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+ CollectSink.VALUES.clear();
- expected = "(1,a)\n(3,a)";
+ result.map(new MapFunction<Tuple2<Integer, String>, String>() {
+ @Override
+ public String map(Tuple2<Integer, String> value) throws Exception {
+ return value.toString();
+ }
+ }).addSink(new CollectSink());
env.execute();
+
+ CollectSink.VALUES.sort(String::compareTo);
+
+ List<String> expected = Arrays.asList("(1,a)\n(3,a)".split("\n"));
+
+ assertEquals(expected, CollectSink.VALUES);
}
+
+ private static class CollectSink implements SinkFunction<String> {
+
+ public static final List<String> VALUES = new ArrayList<>();
+
+ @Override
+ public synchronized void invoke(String value) throws Exception {
--- End diff --
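This synchronization may not work correctly if multiple instances of this
function exist: a `synchronized` instance method locks on `this`, so separate
instances do not exclude each other while writing to the shared static list.
Synchronize on `VALUES` instead.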
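
A minimal sketch of what locking on `VALUES` could look like. It is not the
code from the pull request: the `Sink` interface is a hypothetical stand-in
for Flink's `SinkFunction` so that the example compiles without Flink on the
classpath, and `CollectSinkSketch` and `collectConcurrently` are names
invented for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

class CollectSinkSketch {

    // Hypothetical stand-in for Flink's SinkFunction (assumption, not the real interface).
    interface Sink<T> {
        void invoke(T value) throws Exception;
    }

    static class CollectSink implements Sink<String> {

        // Shared by every instance of the sink.
        static final List<String> VALUES = new ArrayList<>();

        @Override
        public void invoke(String value) {
            // Lock on the shared list rather than on `this`, so all
            // instances contend for the same monitor.
            synchronized (VALUES) {
                VALUES.add(value);
            }
        }
    }

    // Drives several sink instances from separate threads and returns
    // how many values ended up in the shared list.
    static int collectConcurrently(int instances, int perInstance) {
        CollectSink.VALUES.clear();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            final CollectSink sink = new CollectSink();
            final int id = i;
            Thread t = new Thread(() -> {
                for (int j = 0; j < perInstance; j++) {
                    sink.invoke("(" + id + "," + j + ")");
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for sinks", e);
            }
        }
        return CollectSink.VALUES.size();
    }

    public static void main(String[] args) {
        System.out.println(collectConcurrently(4, 1000));
    }
}
```

With the lock taken on `VALUES`, four instances writing 1000 values each
should always leave 4000 entries in the list. With `synchronized` on the
method, each instance would hold only its own monitor, and concurrent
`ArrayList.add` calls could lose elements or throw.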
> Migrate integration tests for CEP
> ---------------------------------
>
> Key: FLINK-9563
> URL: https://issues.apache.org/jira/browse/FLINK-9563
> Project: Flink
> Issue Type: Sub-task
> Reporter: Deepak Sharma
> Assignee: Deepak Sharma
> Priority: Minor
>
> Covers all integration tests under
> apache-flink/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)