[ 
https://issues.apache.org/jira/browse/FLINK-6772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16029303#comment-16029303
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-6772:
--------------------------------------------

So, for example,

{code}
public class FlinkCEPTest {

        public static void main(String[] args) throws Exception {

                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(2);

                DataStream<String> input = env.fromElements(
                                "a-1", "a-2", "a-3", "a-4", "b-1", "b-2", "b-3"
                );

                Pattern<String, ?> pattern = Pattern
                                .<String>begin("start")
                                .where(new SimpleCondition<String>() {
                                        public boolean filter(String s) throws 
Exception {
                                                return s.startsWith("a-");
                                        }
                                }).times(4).allowCombinations()
                                .followedByAny("randomStateName")
                                .where(new SimpleCondition<String>() {
                                        public boolean filter(String s) throws 
Exception {
                                                return s.startsWith("b-");
                                        }
                                }).times(3).consecutive();

                CEP.pattern(input, pattern).select(new 
PatternSelectFunction<String, String>() {
                        public String select(Map<String, List<String>> pattern) 
throws Exception {
                                return pattern.toString();
                        }
                }).print();

                env.execute();
        }
}
{code}

The result is:
{code}
1> {randomStateName=[b-3, b-1, b-2], start=[a-1, a-2, a-3, a-4]}
{code}

> Incorrect ordering of matched state events in Flink CEP
> -------------------------------------------------------
>
>                 Key: FLINK-6772
>                 URL: https://issues.apache.org/jira/browse/FLINK-6772
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>            Reporter: Tzu-Li (Gordon) Tai
>
> I've stumbled across an unexepected ordering of the matched state events. 
> Pattern:
> {code}
> Pattern<String, ?> pattern = Pattern
>     .<String>begin("start")
>         .where(new IterativeCondition<String>() {
>             @Override
>             public boolean filter(String s, Context<String> context) throws 
> Exception {
>                 return s.startsWith("a-");
>             }
>         }).times(4).allowCombinations()
>     .followedByAny("end")
>         .where(new IterativeCondition<String>() {
>             public boolean filter(String s, Context<String> context) throws 
> Exception {
>                 return s.startsWith("b-");
>             }
>     }).times(3).consecutive();
> {code}
> Input event sequence:
> a-1, a-2, a-3, a-4, b-1, b-2, b-3
> On b-3 a matched pattern would be triggered.
> Now, in the {{Map<String, List<IN>>}} map passed via {{select}} in 
> {{PatternSelectFunction}}, the list for the "end" state is:
> b-3, b-1, b-2.
> Based on the timestamp of the events (simply using processing time), the 
> correct order should be b-1, b-2, b-3.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to