[ https://issues.apache.org/jira/browse/FLINK-7782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16197023#comment-16197023 ]
Ajay commented on FLINK-7782:
-----------------------------

Related to FLINK-7606

> Flink CEP not recognizing pattern
> ---------------------------------
>
>                 Key: FLINK-7782
>                 URL: https://issues.apache.org/jira/browse/FLINK-7782
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Ajay
>
> I am using Flink version 1.3.2. Flink has a Kafka source; I am using KafkaSource9. I am running Flink on a 3-node AWS cluster with 8 GB of RAM, running Ubuntu 16.04. From the Flink dashboard I see that I have 2 TaskManagers and 4 task slots.
> The input to Kafka is a JSON string; when parsed on the Flink side, it looks like this:
> {code:java}
> (101,Sun Sep 24 23:18:53 UTC 2017,complex event,High,37.75142,-122.39458,12.0,20.0)
> {code}
> I use a Tuple8 to hold the parsed data; the first field is home_id. The time characteristic is set to EventTime and I use an AscendingTimestampExtractor on the timestamp field. The parallelism of the execution environment is set to 4. The pattern I am trying to capture is rather simple:
> {code:java}
> DataStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> cepMapByHomeId = cepMap.keyBy(0);
> //cepMapByHomeId.print();
>
> Pattern<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>, ?> cep1 =
>         Pattern.<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>begin("start")
>                 .where(new OverLowThreshold())
>                 .followedBy("end")
>                 .where(new OverHighThreshold());
>
> PatternStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> patternStream =
>         CEP.pattern(cepMapByHomeId, cep1);
>
> DataStream<Tuple7<Integer, Date, Date, String, String, Float, Float>> alerts =
>         patternStream.select(new PackageCapturedEvents());
> {code}
> The pattern checks whether the 7th field of the Tuple8 goes over 12 and then over 16. The output of the pattern looks like this:
> {code:java}
> (201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex event,Non-event,37.75837,-122.41467)
> {code}
> On the Kafka producer side, I am trying to send simulated data for around 100 homes, so home_id ranges from 0 to 100 and the input is keyed by home_id. The Kafka topic has about 10 partitions. The producer simply loops through a CSV file with a delay of about 100 ms between rows. The data is exactly the same in all 100 CSV files except for home_id and the lat/long values, and the timestamp is incremented in steps of 1 second. I start multiple processes to simulate data from different homes.
> THE PROBLEM:
> Flink completely misses events for a large subset of the input data; I barely see events for about 4-5 of the home_id values. If I print the stream before and after applying the pattern, I see all home_ids before and only a tiny subset after. Since the data is exactly the same for every home, I expect every home_id to be captured and written to my sink.
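The issue mentions the EventTime characteristic and an AscendingTimestampExtractor but does not show that code. A minimal sketch of the event-time setup described above, assuming the timestamp is the Date in field f1 of the Tuple8 (the names env and parsedStream are placeholders, not taken from the issue):

{code:java}
// Relevant Flink 1.3.x classes:
//   org.apache.flink.streaming.api.TimeCharacteristic
//   org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor

// Event-time processing with parallelism 4, as described in the issue.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(4);

// "parsedStream" stands for the DataStream produced by parsing the Kafka JSON.
DataStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> cepMap =
        parsedStream.assignTimestampsAndWatermarks(
                new AscendingTimestampExtractor<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>() {
                    @Override
                    public long extractAscendingTimestamp(
                            Tuple8<Integer, Date, String, String, Float, Float, Float, Float> element) {
                        // Assumption: f1 (the Date field) carries the event timestamp.
                        return element.f1.getTime();
                    }
                });
{code}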
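The condition classes OverLowThreshold and OverHighThreshold are also not included in the issue. A plausible sketch, assuming they simply compare the 7th field of the Tuple8 (f6, zero-based) against the 12 and 16 thresholds mentioned in the description, using the SimpleCondition API from Flink 1.3.x:

{code:java}
// Relevant Flink 1.3.x class: org.apache.flink.cep.pattern.conditions.SimpleCondition

public static class OverLowThreshold
        extends SimpleCondition<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> {
    @Override
    public boolean filter(Tuple8<Integer, Date, String, String, Float, Float, Float, Float> event) {
        // "start" condition: the 7th field (f6) goes over 12.
        return event.f6 > 12.0f;
    }
}

public static class OverHighThreshold
        extends SimpleCondition<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> {
    @Override
    public boolean filter(Tuple8<Integer, Date, String, String, Float, Float, Float, Float> event) {
        // "end" condition: the 7th field (f6) goes over 16.
        return event.f6 > 16.0f;
    }
}
{code}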