[
https://issues.apache.org/jira/browse/FLINK-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Paolo Rendano resolved FLINK-7549.
----------------------------------
Resolution: Fixed
> CEP - Pattern not discovered if source streaming is very fast
> -------------------------------------------------------------
>
> Key: FLINK-7549
> URL: https://issues.apache.org/jira/browse/FLINK-7549
> Project: Flink
> Issue Type: Bug
> Components: CEP
> Affects Versions: 1.3.1, 1.3.2
> Reporter: Paolo Rendano
>
> Hi all,
> I'm running some stress tests on my pattern, using JMeter to populate source
> data on a RabbitMQ queue. This queue contains statuses generated by different
> devices. My test case loops for 1000 cycles, each one sending the first and
> then the second status that together generate the event through Flink CEP
> (statuses are keyed by device). I expect to get an output of 1000 events.
> In my early tests I noticed that I get only partial results in output
> (70-80% of the expected ones). Introducing a delay in the JMeter plan between
> the sending of the two statuses solved the problem. The minimum delay that
> makes things work is 20-25 ms (this is on my local machine; it may vary on
> other machines).
> My code is structured this way (the following is a simplification):
> {code:java}
> final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().setAutoWatermarkInterval(100L);
>
> // source definition
> DataStream<MyMessageWrapper> dataStreamSource = env
>         .addSource(new MYRMQAutoboundQueueSource<>(
>                 connectionConfig,
>                 conf.getSourceExchange(),
>                 conf.getSourceRoutingKey(),
>                 conf.getSourceQueueName(),
>                 true,
>                 new MyMessageWrapperSchema()))
>         .assignTimestampsAndWatermarks(
>                 new BoundedOutOfOrdernessTimestampExtractor<MyMessageWrapper>(Time.minutes(1)) {
>                     private static final long serialVersionUID = -1L;
>
>                     @Override
>                     public long extractTimestamp(MyMessageWrapper element) {
>                         if (element.getData().get("stateTimestamp") == null) {
>                             throw new RuntimeException(
>                                     "Status Timestamp is null during time ordering for device ["
>                                             + element.getData().get("deviceCode") + "]");
>                         }
>                         return FlinkUtils.getTimeFromJsonTimestamp(
>                                 element.getData().get("stateTimestamp")).getTime();
>                     }
>                 })
>         .name("MyIncomingStatus");
>
> // PATTERN DEFINITION
> Pattern<MyMessageWrapper, ?> myPattern = Pattern
>         .<MyMessageWrapper>begin("start")
>         .subtype(MyMessageWrapper.class)
>         .where(whereEquals("st", "none"))
>         .next("end")
>         .subtype(MyMessageWrapper.class)
>         .where(whereEquals("st", "started"))
>         .within(Time.minutes(3));
>
> // CEP DEFINITION
> PatternStream<MyMessageWrapper> myPatternStream =
>         CEP.pattern(dataStreamSource.keyBy(keySelector), myPattern);
> DataStream<Either<TimeoutEvent, MyMessageWrapper>> outputStream =
>         myPatternStream.flatSelect(patternFlatTimeoutFunction, patternFlatSelectFunction);
>
> // SINK DEFINITION
> outputStream
>         .addSink(new MYRMQExchangeSink<>(
>                 connectionConfig, outputExchange, new MyMessageWrapperSchema()))
>         .name("MyGeneratedEvent");
> {code}
> Digging in and logging the messages received by Flink in "extractTimestamp",
> I see that at such a high message rate the source may receive messages with
> the same timestamp but a different deviceCode.
> Any ideas?
> Thanks, regards
> Paolo
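
For context on the quoted code: a bounded-out-of-orderness extractor emits watermarks that trail the highest timestamp seen so far by the configured bound (one minute above). The plain-Java sketch below only illustrates that arithmetic. The class and method names are hypothetical, it does not use Flink, and the idea that an event-time operator handles an event once the watermark reaches its timestamp is stated here as a simplifying assumption, not as a description of the CEP operator's internals or of the cause of this issue.

```java
/** Hypothetical stand-in for a bounded-out-of-orderness watermark generator; not Flink code. */
class BoundedWatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampMs;

    BoundedWatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Chosen so that the watermark before any event is Long.MIN_VALUE without overflow.
        this.maxTimestampMs = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    /** Records the event-time timestamp of an incoming element. */
    void observe(long timestampMs) {
        maxTimestampMs = Math.max(maxTimestampMs, timestampMs);
    }

    /** The watermark trails the highest timestamp seen by the configured bound. */
    long currentWatermark() {
        return maxTimestampMs - maxOutOfOrdernessMs;
    }

    /** Assumption: an event becomes eligible once the watermark reaches its timestamp. */
    boolean isReleased(long eventTimestampMs) {
        return eventTimestampMs <= currentWatermark();
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch wm = new BoundedWatermarkSketch(60_000L); // one minute

        long noneTs = 1_000_000L;    // first status of a device
        long startedTs = 1_000_005L; // second status, 5 ms later
        wm.observe(noneTs);
        wm.observe(startedTs);
        System.out.println("watermark = " + wm.currentWatermark());
        System.out.println("released  = " + wm.isReleased(startedTs));

        // Only an event at least one minute newer moves the watermark past both statuses.
        wm.observe(startedTs + 60_000L);
        System.out.println("released  = " + wm.isReleased(startedTs));
    }
}
```

Under these assumptions, the two statuses stay buffered until some later event carries a timestamp at least one minute newer, so the last matches of a finite test run may only show up once more data arrives.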
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)