[ 
https://issues.apache.org/jira/browse/FLINK-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16192886#comment-16192886
 ] 

Paolo Rendano commented on FLINK-7549:
--------------------------------------

Hi [~kkl0u] yes, I solved setting:

{code:java}
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
{code}

Thanks

> CEP - Pattern not discovered if source streaming is very fast
> -------------------------------------------------------------
>
>                 Key: FLINK-7549
>                 URL: https://issues.apache.org/jira/browse/FLINK-7549
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.3.1, 1.3.2
>            Reporter: Paolo Rendano
>
> Hi all,
> I'm doing some stress test on my pattern using JMeter to populate source data 
> on a rabbitmq queue. This queue contains status generated by different 
> devices . In my test case I set to loop on a base of 1000 cycles, each one 
> sending respectively the first and the second status that generate the event 
> using flink CEP (status keyed by device). I expect to get an output of 1000 
> events.
> In my early tests I launched that but I noticed that I get only partial 
> results in output (70/80% of the expected ones). Introducing a delay in 
> jmeter plan between the sending of the two status solved the problem. The 
> minimum delay (of course this is on my local machine, on other machines may 
> vary) that make things work is 20/25 ms.
> My code is structured this way (the following is a semplification):
> {code:java}
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().setAutoWatermarkInterval(100L);
> // source definition
> DataStream<MyMessageWrapper> dataStreamSource =
>                     env.addSource(new 
> MYRMQAutoboundQueueSource<>(connectionConfig,
>                             conf.getSourceExchange(),
>                             conf.getSourceRoutingKey(),
>                             conf.getSourceQueueName(),
>                             true,
>                             new MyMessageWrapperSchema()))
>                             .assignTimestampsAndWatermarks(new 
> BoundedOutOfOrdernessTimestampExtractor<MyMessageWrapper>(Time.minutes(1)) {
>                                 private static final long serialVersionUID = 
> -1L;
>                                 @Override
>                                 public long extractTimestamp(MyMessageWrapper 
> element) {
>                                     if 
> (element.getData().get("stateTimestamp")==null) {
>                                         throw new RuntimeException("Status 
> Timestamp is null during time ordering for device [" +  
> element.getData().get("deviceCode") + "]");
>                                     }
>                                     return 
> FlinkUtils.getTimeFromJsonTimestamp(element.getData().get("stateTimestamp")).getTime();
>                                 }
>                             })
>                             .name("MyIncomingStatus");
> // PATTERN  DEFINITION
> Pattern<MyMessageWrapper, ?> myPattern = Pattern
>                 .<MyMessageWrapper>begin("start")
>                       .subtype(MyMessageWrapper.class)
>                       .where(whereEquals("st", "none"))
>                 .next("end")
>                       .subtype(MyMessageWrapper.class)
>                       .where(whereEquals("st","started"))
>                 .within(Time.minutes(3));
> // CEP DEFINITION
> PatternStream<MyMessageWrapper> myPatternStream = 
> CEP.pattern(dataStreamSource.keyBy(keySelector), myPattern);
> DataStream<Either<TimeoutEvent, MyMessageWrapper >> outputStream = 
> myPatternStream.flatSelect(patternFlatTimeoutFunction, 
> patternFlatSelectFunction);
> // SINK DEFINITION
> outputStream.addSink(new MYRMQExchangeSink<>(connectionConfig, 
> outputExchange, new MyMessageWrapperSchema())).name("MyGeneratedEvent");
> {code}
> digging and logging messages received by flink in "extractTimestamp", what 
> happens is that with that so high rate of messages, source may receive 
> messages with the same timestamp but with different deviceCode. 
> Any idea?
> Thanks, regards
> Paolo



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to