[ 
https://issues.apache.org/jira/browse/FLINK-9563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16514212#comment-16514212
 ] 

ASF GitHub Bot commented on FLINK-9563:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6170#discussion_r195826270
  
    --- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---
    @@ -708,10 +724,32 @@ public boolean filter(Tuple2<Integer, String> rec) 
throws Exception {
                        }
                });
     
    -           result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +           CollectSink.VALUES.clear();
     
    -           expected = "(1,a)\n(3,a)";
    +           result.map(new MapFunction<Tuple2<Integer, String>, String>() {
    +                   @Override
    +                   public String map(Tuple2<Integer, String> value) throws 
Exception {
    +                           return value.toString();
    +                   }
    +           }).addSink(new CollectSink());
     
                env.execute();
    +
    +           CollectSink.VALUES.sort(String::compareTo);
    +
    +           List<String> expected = 
Arrays.asList("(1,a)\n(3,a)".split("\n"));
    +
    +           assertEquals(expected, CollectSink.VALUES);
        }
    +
    +   private static class CollectSink implements SinkFunction<String> {
    +
    +           public static final List<String> VALUES = new ArrayList<>();
    +
    +           @Override
    +           public synchronized void invoke(String value) throws Exception {
    --- End diff --
    
    this synchronization may not work correctly if multiple instances of this 
function exists. Synchronize on `VALUES` instead.


> Migrate integration tests for CEP
> ---------------------------------
>
>                 Key: FLINK-9563
>                 URL: https://issues.apache.org/jira/browse/FLINK-9563
>             Project: Flink
>          Issue Type: Sub-task
>            Reporter: Deepak Sharma
>            Assignee: Deepak Sharma
>            Priority: Minor
>
> Covers all integration tests under
> apache-flink/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to