Thanks for your reply, Dawid. I understand that the approach I've tried out is not generic enough, and would need a lot more thought put into parallelism considerations, out-of-order events, effects on downstream operators, etc. The intention was to do a quick implementation to check the feasibility of the approach.
>> It will also not sort the events etc.

In the application code used to test this approach, I had used a global window to sort events based on their timestamp (similar to how out-of-order events are dropped based on a time bound, I'm dropping them based on a count bound):

    allEvents = allEvents
        .keyBy(event -> event.getAttributeValue(EVENT_SOURCE_ID))
        .window(GlobalWindows.create())
        .trigger(new GlobalWindowCountTrigger(propLoader.getSortWindowSize()))
        .process(new SortWindowProcessFunction())
        .keyBy(event -> event.getAttributeValue(EVENT_SOURCE_ID))
        .assignTimestampsAndWatermarks(new TimestampsExtractor())
        .uid(Constants.TS_EX_UID);

    PatternLoader
        .applyPatterns(allEvents, propLoader.getPatternClassNames())
        .addSink(createKafkaSink(kafkaProps))
        .uid(Constants.KAFKA_SINK_COMPLEX_EVENTS_UID);

>> If in the getCurrentWatermark method of your AssignerWithPeriodicWatermarks
>> you will just return new Watermark(System.currentTimeMillis()), you will
>> get the same behaviour as with that change, am I right?

If watermarks are generated based on the machine time, the major issue I see is that we will not be able to leverage event-time functionality, specifically for patterns which look for the absence of an event for a fixed period of time. We have many such patterns, for example:

    Pattern<Event, Event> pattern = Pattern
        .<Event>begin(UNDER_CHILLED_ALERT, AfterMatchSkipStrategy.skipPastLastEvent())
        .where(Conditions.getUnderchilledCondition())
        .notFollowedBy(COMPRESSOR_ON)
        .where(Conditions.getCompressorOnCondition())
        .within(Time.minutes(30))
        .followedBy(HIGH_TEMP)
        .where(Conditions.getHighTemperatureCondition());

Now, when there are network issues (which are very frequent for us), queued events are delivered together, and such patterns will not be matched correctly, because pruning of events from the NFA's buffer will be done based on the watermark received by the operator, not on the timestamp within each event. Is my understanding here correct?
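GlobalWindowCountTrigger and SortWindowProcessFunction are custom classes that are not included in this thread, so the following is only a minimal, Flink-free sketch of the count-based buffering and sorting logic they presumably implement (all names here are illustrative, not the actual classes):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of the count-based sorting step described above:
// buffer events per key until sortWindowSize is reached, then emit the
// batch ordered by the timestamp carried inside each event.
class CountBasedSortBuffer {
    static final class Event {
        final long timestamp;
        final String payload;
        Event(long timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    private final int sortWindowSize;
    private final List<Event> buffer = new ArrayList<>();

    CountBasedSortBuffer(int sortWindowSize) {
        this.sortWindowSize = sortWindowSize;
    }

    /**
     * Add one event; returns the sorted batch once the count bound is
     * reached (the trigger "fires"), or null while still buffering.
     */
    List<Event> add(Event e) {
        buffer.add(e);
        if (buffer.size() < sortWindowSize) {
            return null;
        }
        List<Event> batch = new ArrayList<>(buffer);
        batch.sort(Comparator.comparingLong(ev -> ev.timestamp));
        buffer.clear(); // the real trigger would FIRE_AND_PURGE the window
        return batch;
    }
}
```

An event that is more than sortWindowSize positions out of order would still come out of this buffer late and be dropped downstream, which is the count-based analogue of dropping events that fall outside a time bound.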
Thanks,
Shailesh

On Mon, Mar 19, 2018 at 3:17 PM, Dawid Wysakowicz <wysakowicz.da...@gmail.com> wrote:

> Hi Shailesh,
>
> Thanks for your interest in the CEP library and sorry for the late
> response. I must say I am not a fan of this approach.
> After this change, the processing time is no longer a processing time,
> plus it will work differently in any other place of Flink. It will also
> not sort the events etc.
> Moreover, I think you could achieve a pretty similar solution if you
> generate your watermark based on the machine time. If in the
> getCurrentWatermark method of your AssignerWithPeriodicWatermarks you
> just return new Watermark(System.currentTimeMillis()), you will get the
> same behaviour as with that change, am I right?
>
> Best,
> Dawid
>
>> On 18 Mar 2018, at 09:00, Shailesh Jain <shailesh.j...@stellapps.com> wrote:
>>
>> Thanks Aljoscha.
>>
>> Bump.
>>
>> I understand everyone would be busy with 1.5.0, but would really
>> appreciate slight help in unblocking us here.
>>
>> Thanks,
>> Shailesh
>>
>> On Thu, Mar 15, 2018 at 1:47 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> I think this should have been sent to the dev mailing list because in
>>> the user mailing list it might disappear among a lot of other mail.
>>>
>>> Forwarding...
>>>
>>> Best,
>>> Aljoscha
>>>
>>>> On 14.
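The concern above about machine-time watermarks can be made concrete with a small, Flink-free sketch (all names hypothetical). For an absence pattern like the notFollowedBy(COMPRESSOR_ON).within(Time.minutes(30)) example, what matters is which clock decides that the 30-minute window has expired when a device delivers its queued events in one burst after an outage:

```java
// Hypothetical illustration of the two pruning behaviours for an absence
// window. After a network outage, the alert's timestamp is already far in
// the past by the time it arrives at the operator.
class AbsenceWindowDemo {
    static final long WITHIN_MS = 30 * 60 * 1000L; // .within(Time.minutes(30))

    /**
     * Watermark derived from event timestamps: the window expires only
     * once event time has genuinely advanced 30 minutes past the alert.
     */
    static boolean expiredUnderEventTime(long alertTs, long maxEventTsSeen) {
        return maxEventTsSeen >= alertTs + WITHIN_MS;
    }

    /**
     * Watermark = new Watermark(System.currentTimeMillis()): the window
     * expires as soon as the wall clock is 30 minutes past the alert's
     * timestamp, regardless of what is still queued for processing.
     */
    static boolean expiredUnderMachineTime(long alertTs, long wallClockNow) {
        return wallClockNow >= alertTs + WITHIN_MS;
    }
}
```

With a burst delivered 40 minutes after the alert, the machine-time variant declares the window expired the moment the alert arrives, before a queued COMPRESSOR_ON event (say, timestamped 10 minutes after the alert) is even processed, yielding a false absence match; the event-time variant waits until event time itself passes the 30-minute mark.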
Mar 2018, at 06:20, Shailesh Jain <shailesh.j...@stellapps.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> We've been facing issues* w.r.t. watermarks not being supported per
>>>> key, which led us to:
>>>>
>>>> Either (a) run the job in processing time for a KeyedStream ->
>>>> compromising on use cases which revolve around catching time-based
>>>> patterns, or (b) run the job in event time for multiple data streams
>>>> (one data stream per key) -> this is not scalable, as the number of
>>>> operators grows linearly with the number of keys.
>>>>
>>>> To address this, we've done a quick (poc) change in the
>>>> AbstractKeyedCEPPatternOperator to allow the NFAs to progress based
>>>> on timestamps extracted from the events arriving into the operator
>>>> (and not from the watermarks). We've tested it against our use case
>>>> and are seeing a significant improvement in memory usage without
>>>> compromising on the watermark functionality.
>>>>
>>>> It'll be really helpful if someone from the CEP dev group can take a
>>>> look at this branch -
>>>> https://github.com/jainshailesh/flink/commits/cep_changes
>>>> - and provide comments on the approach taken, and maybe guide us on
>>>> the next steps for taking it forward.
>>>>
>>>> Thanks,
>>>> Shailesh
>>>>
>>>> * Links to previous email threads related to the same issue:
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Question-on-event-time-functionality-using-Flink-in-a-IoT-usecase-td18653.html
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Generate-watermarks-per-key-in-a-KeyedStream-td16629.html
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Correlation-between-number-of-operators-and-Job-manager-memory-requirements-td18384.html
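For context on the poc branch referenced in the quoted email: the core idea, as described there, is to advance each key's NFA from the timestamps of arriving events rather than from the operator-wide watermark. A rough, Flink-free sketch of that bookkeeping follows (illustrative names only; this is not the actual AbstractKeyedCEPPatternOperator change):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: keep a per-key event-time clock driven by the
// timestamps of the events themselves, instead of advancing every key's
// NFA from a single operator-wide watermark.
class PerKeyEventTimeClock {
    private final Map<String, Long> clockPerKey = new HashMap<>();

    /** Advance the key's clock monotonically from an arriving event. */
    long onEvent(String key, long eventTimestamp) {
        return clockPerKey.merge(key, eventTimestamp, Math::max);
    }

    /** The time that would be used to prune this key's NFA buffer. */
    long currentTime(String key) {
        return clockPerKey.getOrDefault(key, Long.MIN_VALUE);
    }
}
```

Under this scheme, a key whose device was offline simply has a clock that lags behind the other keys, instead of having its partial matches pruned by a watermark driven by faster keys or by the wall clock.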