I am using a Flink 1.3.2 CEP pattern to detect a frequently occurring condition. When scale testing this pattern at 10k events per minute, memory leaks and the job eventually fails with an OOM.
I found a related JIRA, FLINK-7606, which mentions specifying EventTime as the streamTimeCharacteristic. I have configured that, and I am also using RMQSource. I am still facing the memory leak; can you please let me know whether I am missing anything?

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    DataStream<ApCheckInEvent> apCheckInReqStreams = env
            .addSource(RMQSourceHelper.createAPCheckInSource(parameters))
            .assignTimestampsAndWatermarks(new IngestionTimeExtractor<ApCheckInEvent>());

    DataStream<ApConnectEvent> apconnectReqStream = env
            .addSource(RMQSourceHelper.createAPConnectSource(parameters))
            .assignTimestampsAndWatermarks(new IngestionTimeExtractor<ApConnectEvent>());

    DataStream<Event> apcheckInEventStream = apCheckInReqStreams.flatMap(new APCheckInEventMapper());
    DataStream<Event> apconnectEventStream = apconnectReqStream.flatMap(new APConnectCheckInEventMapper());

    DataStream<Event> unifiedDevicecheckInStream = apcheckInEventStream.union(apconnectEventStream);

    DataStream<DeviceCheckInEvent> deviceCheckInStream = unifiedDevicecheckInStream
            .flatMap(new FlatMapFunction<Event, DeviceCheckInEvent>() {
                @Override
                public void flatMap(Event value, Collector<DeviceCheckInEvent> out) throws Exception {
                    DeviceCheckInEvent deviceCheckInEvent = new DeviceCheckInEvent();
                    deviceCheckInEvent.setEntityType(value.getDeviceType());
                    deviceCheckInEvent.setDeviceMacAddress(value.getMac());
                    deviceCheckInEvent.setEventType(value.getType());
                    deviceCheckInEvent.setSerialNum(value.getSource());
                    deviceCheckInEvent.setDeviceModel(value.getModel());
                    deviceCheckInEvent.setStreamType(StreamType.getStreamType(value.getType()));
                    deviceCheckInEvent.setEventTime(value.getTimeStamp());
                    deviceCheckInEvent.setTenantId(value.getTenantId());
                    deviceCheckInEvent.setSiteid(value.getSiteId());
                    out.collect(deviceCheckInEvent);
                }
            });

Below is my pattern detection code:

    Pattern<DeviceCheckInEvent, ?> connectEventPattern1 = Pattern.<DeviceCheckInEvent>begin("first")
            .where(new IterativeCondition<DeviceCheckInEvent>() {
                @Override
                public boolean filter(DeviceCheckInEvent value, Context<DeviceCheckInEvent> ctx) throws Exception {
                    return value.getEventType() == EventType.ApConnect;
                }
            })
            .times(4)
            .within(Time.minutes(6));

    DataStream<Event> frequentEventStream = CEP.pattern(
            inputStream.keyBy(new KeySelector<DeviceCheckInEvent, String>() {
                @Override
                public String getKey(DeviceCheckInEvent arg0) throws Exception {
                    return arg0.getSerialNum();
                }
            }),
            connectEventPattern1)
            .flatSelect(new PatternFlatSelectFunction<DeviceCheckInEvent, Event>() {
                @Override
                public void flatSelect(Map<String, List<DeviceCheckInEvent>> pattern, Collector<Event> out) throws Exception {
                    List<DeviceCheckInEvent> connectOccurrences = pattern.get("first");
                    // Use the last of the matched connect events as the representative event.
                    DeviceCheckInEvent freqConnEvent = connectOccurrences.get(connectOccurrences.size() - 1);
                    Event event = new Event(EventType.FrequentConnects,
                            freqConnEvent.getEntityType(),
                            freqConnEvent.getTenantId(),
                            freqConnEvent.getSerialNum(),
                            freqConnEvent.getSiteid(),
                            freqConnEvent.getMac(),
                            freqConnEvent.getEventTime());
                    event.setEventDescr(EMEventMessage.frequentconnects.getMsgFormat(
                            freqConnEvent.getEntityType(), freqConnEvent.getSerialNum(), count, timeElapsed));
                    logger.warn("Detected Frequent Connects:" + builder.toString());
                    out.collect(event);
                }
            });
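
To make the intent concrete, the condition I am trying to detect (4 connects from the same serial number within 6 minutes) can be sketched in plain Java, outside Flink. This is only an illustration of the expected behaviour and of the state I would expect to be retained per key. FrequentConnectDetector, onConnect and bufferedCount are hypothetical names and not Flink APIs. The sketch assumes timestamps arrive in non-decreasing order per device, and it does not reproduce CEP's matching semantics (for example, it reports once per event rather than once per matching combination).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Flink: keeps the recent connect timestamps
// per device and drops the ones that have fallen out of the time window.
public class FrequentConnectDetector {
    private final int threshold;
    private final long windowMillis;
    private final Map<String, Deque<Long>> recentBySerial = new HashMap<>();

    public FrequentConnectDetector(int threshold, long windowMillis) {
        this.threshold = threshold;
        this.windowMillis = windowMillis;
    }

    // Records one connect event and returns true when at least `threshold`
    // connects for this serial number lie inside the window ending at this event.
    // Assumption: timestamps are non-decreasing per serial number.
    public boolean onConnect(String serialNum, long timestampMillis) {
        Deque<Long> recent = recentBySerial.computeIfAbsent(serialNum, k -> new ArrayDeque<>());
        recent.addLast(timestampMillis);
        // Evict timestamps that are a full window or more older than the current event.
        while (!recent.isEmpty() && timestampMillis - recent.peekFirst() >= windowMillis) {
            recent.removeFirst();
        }
        return recent.size() >= threshold;
    }

    // Number of timestamps currently retained for a serial number.
    public int bufferedCount(String serialNum) {
        Deque<Long> recent = recentBySerial.get(serialNum);
        return recent == null ? 0 : recent.size();
    }
}
```

With this kind of eviction the retained state per device is bounded by the number of connects inside one 6-minute window, which is what I expected from within(Time.minutes(6)) as well.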