[ 
https://issues.apache.org/jira/browse/FLINK-6297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16006566#comment-16006566
 ] 

Kostas Kloudas edited comment on FLINK-6297 at 5/11/17 2:54 PM:
----------------------------------------------------------------

Hi [~vijayakumarpl]. Thanks for reporting this. 

I tried to reproduce the bug on both Flink 1.2 and 1.3 and I cannot make it.

Could you post here the code that generated this bug? This will help me pin 
down the source of the problem (if there is one).
This is the code that I run on 1.2

{code}
public static void main(String[] args) throws Exception {

                List<MyEvent> inputElements = new ArrayList<>();
                inputElements.add(new MyEvent(1, 'a', 1, 1));
                inputElements.add(new MyEvent(1, 'b', 1, 2));
                inputElements.add(new MyEvent(1, 'a', 2, 2));
                inputElements.add(new MyEvent(1, 'a', 3, 5));

                Pattern<MyEvent, ?> pattern = 
Pattern.<MyEvent>begin("a").where(new FilterFunction<MyEvent>() {
                        private static final long serialVersionUID = 
7219646616484327688L;

                        @Override
                        public boolean filter(MyEvent myEvent) throws Exception 
{
                                return myEvent.getPayload() == 'a';
                        }
                }).next("b").where(new FilterFunction<MyEvent>() {
                        private static final long serialVersionUID = 
7219646616484327688L;

                        @Override
                        public boolean filter(MyEvent myEvent) throws Exception 
{
                                return myEvent.getPayload() == 'b';
                        }
                }).within(Time.milliseconds(2L));

                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                env.getConfig().setAutoWatermarkInterval(20000);

                DataStream<MyEvent> input = 
env.fromCollection(inputElements).assignTimestampsAndWatermarks(new 
AscendingTimestampExtractor<MyEvent>() {
                        private static final long serialVersionUID = 
-6619787346214245526L;

                        @Override
                        public long extractAscendingTimestamp(MyEvent myEvent) {
                                return myEvent.getTimestamp();
                        }
                });

                PatternStream<MyEvent> patternStream = 
CEP.pattern(input.keyBy(new KeySelector<MyEvent, Long>() {
                        private static final long serialVersionUID = 
6928745840509494198L;

                        @Override
                        public Long getKey(MyEvent myEvent) throws Exception {
                                return myEvent.getId();
                        }
                }), pattern);


                patternStream.select(new PatternTimeoutFunction<MyEvent, 
String>() {
                        @Override
                        public String timeout(Map<String, MyEvent> map, long l) 
throws Exception {
                                return map.toString() +" @ "+ l;
                        }

                        private static final long serialVersionUID = 
300759199619789416L;


                }, new PatternSelectFunction<MyEvent, String>() {

                        @Override
                        public String select(Map<String, MyEvent> map) throws 
Exception {
                                return map.toString();
                        }

                        private static final long serialVersionUID = 
732172159423132724L;
                }).print();

                env.execute("Bug Reproducing Job");
        }
{code}



was (Author: kkl0u):
Hi [~vijayakumarpl]. Thanks for reporting this. 

I tried to reproduce the bug on both Flink 1.2 and 1.3 and I cannot make it.

Could you post here the code that generated this bug? This will help me pin 
down the source of the problem (if there is one).
This is the code that I run on 1.2

{code
public static void main(String[] args) throws Exception {

                List<MyEvent> inputElements = new ArrayList<>();
                inputElements.add(new MyEvent(1, 'a', 1, 1));
                inputElements.add(new MyEvent(1, 'b', 1, 2));
                inputElements.add(new MyEvent(1, 'a', 2, 2));
                inputElements.add(new MyEvent(1, 'a', 3, 5));

                Pattern<MyEvent, ?> pattern = 
Pattern.<MyEvent>begin("a").where(new FilterFunction<MyEvent>() {
                        private static final long serialVersionUID = 
7219646616484327688L;

                        @Override
                        public boolean filter(MyEvent myEvent) throws Exception 
{
                                return myEvent.getPayload() == 'a';
                        }
                }).next("b").where(new FilterFunction<MyEvent>() {
                        private static final long serialVersionUID = 
7219646616484327688L;

                        @Override
                        public boolean filter(MyEvent myEvent) throws Exception 
{
                                return myEvent.getPayload() == 'b';
                        }
                }).within(Time.milliseconds(2L));

                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                env.getConfig().setAutoWatermarkInterval(20000);

                DataStream<MyEvent> input = 
env.fromCollection(inputElements).assignTimestampsAndWatermarks(new 
AscendingTimestampExtractor<MyEvent>() {
                        private static final long serialVersionUID = 
-6619787346214245526L;

                        @Override
                        public long extractAscendingTimestamp(MyEvent myEvent) {
                                return myEvent.getTimestamp();
                        }
                });

                PatternStream<MyEvent> patternStream = 
CEP.pattern(input.keyBy(new KeySelector<MyEvent, Long>() {
                        private static final long serialVersionUID = 
6928745840509494198L;

                        @Override
                        public Long getKey(MyEvent myEvent) throws Exception {
                                return myEvent.getId();
                        }
                }), pattern);


                patternStream.select(new PatternTimeoutFunction<MyEvent, 
String>() {
                        @Override
                        public String timeout(Map<String, MyEvent> map, long l) 
throws Exception {
                                return map.toString() +" @ "+ l;
                        }

                        private static final long serialVersionUID = 
300759199619789416L;


                }, new PatternSelectFunction<MyEvent, String>() {

                        @Override
                        public String select(Map<String, MyEvent> map) throws 
Exception {
                                return map.toString();
                        }

                        private static final long serialVersionUID = 
732172159423132724L;
                }).print();

                env.execute("Bug Reproducing Job");
        }
/code}


> CEP timeout does not trigger under certain conditions
> -----------------------------------------------------
>
>                 Key: FLINK-6297
>                 URL: https://issues.apache.org/jira/browse/FLINK-6297
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.2.0
>            Reporter: Vijayakumar Palaniappan
>
> -TimeoutPattern does not trigger under certain conditions. Following are the 
> preconditions: 
> -Assume a pattern of Event A followed by Event B within 2 Seconds
> -PeriodicWaterMarks every 1 second
> -Assume following events have arrived. 
> -Event A-1[time: 1 sec]
> -Event B-1[time: 2 sec] 
> -Event A-2[time: 2 sec]
> -Event A-3[time: 5 sec] 
> -WaterMark[time: 5 sec]
> I would assume that after watermark arrival, Event A-1,B-1 detected. A-2 
> timed out. But A-2 timeout does not happen.
> if i use a punctuated watermark and generate watermark for every event, it 
> seems to work as expected.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to