I am using a Flink 1.3.2 CEP pattern to detect a frequently occurring
condition. When scale testing this pattern with 10k events per minute, a
memory leak occurs and the job eventually fails with an OOM error.

I found a related JIRA, FLINK-7606, which mentions specifying EventTime
as the stream time characteristic.

I have configured this as well, and I am using RMQSource. I am still
facing the memory leak. Can you please let me know whether I am missing
anything?

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<ApCheckInEvent> apCheckInReqStreams =
    env.addSource(RMQSourceHelper.createAPCheckInSource(parameters))
        .assignTimestampsAndWatermarks(new IngestionTimeExtractor<ApCheckInEvent>());

DataStream<ApConnectEvent> apconnectReqStream =
    env.addSource(RMQSourceHelper.createAPConnectSource(parameters))
        .assignTimestampsAndWatermarks(new IngestionTimeExtractor<ApConnectEvent>());

DataStream<Event> apcheckInEventStream =
    apCheckInReqStreams.flatMap(new APCheckInEventMapper());

DataStream<Event> apconnectEventStream =
    apconnectReqStream.flatMap(new APConnectCheckInEventMapper());

DataStream<Event> unifiedDevicecheckInStream =
    apcheckInEventStream.union(apconnectEventStream);

DataStream<DeviceCheckInEvent> deviceCheckInStream =
    unifiedDevicecheckInStream.flatMap(new FlatMapFunction<Event, DeviceCheckInEvent>() {

        @Override
        public void flatMap(Event value, Collector<DeviceCheckInEvent> out)
                throws Exception {
            DeviceCheckInEvent deviceCheckInEvent = new DeviceCheckInEvent();
            deviceCheckInEvent.setEntityType(value.getDeviceType());
            deviceCheckInEvent.setDeviceMacAddress(value.getMac());
            deviceCheckInEvent.setEventType(value.getType());
            deviceCheckInEvent.setSerialNum(value.getSource());
            deviceCheckInEvent.setDeviceModel(value.getModel());
            deviceCheckInEvent.setStreamType(StreamType.getStreamType(value.getType()));
            deviceCheckInEvent.setEventTime(value.getTimeStamp());
            deviceCheckInEvent.setTenantId(value.getTenantId());
            deviceCheckInEvent.setSiteid(value.getSiteId());
            out.collect(deviceCheckInEvent);
        }
    });


Below is my pattern detection code:

Pattern<DeviceCheckInEvent, ?> connectEventPattern1 =
    Pattern.<DeviceCheckInEvent>begin("first")
        .where(new IterativeCondition<DeviceCheckInEvent>() {

            @Override
            public boolean filter(DeviceCheckInEvent value,
                    Context<DeviceCheckInEvent> ctx) throws Exception {
                return value.getEventType() == EventType.ApConnect;
            }
        })
        .times(4)
        .within(Time.minutes(6));

DataStream<Event> frequentEventStream =
    CEP.pattern(
            inputStream.keyBy(new KeySelector<DeviceCheckInEvent, String>() {

                @Override
                public String getKey(DeviceCheckInEvent arg0) throws Exception {
                    return arg0.getSerialNum();
                }
            }),
            connectEventPattern1)
        .flatSelect(new PatternFlatSelectFunction<DeviceCheckInEvent, Event>() {

            @Override
            public void flatSelect(Map<String, List<DeviceCheckInEvent>> pattern,
                    Collector<Event> out) throws Exception {
                List<DeviceCheckInEvent> connectOccurrences = pattern.get("first");

                DeviceCheckInEvent freqConnEvent =
                    connectOccurrences.get(connectOccurrences.size() - 1);
                Event event = new Event(EventType.FrequentConnects,
                    freqConnEvent.getEntityType(), freqConnEvent.getTenantId(),
                    freqConnEvent.getSerialNum(), freqConnEvent.getSiteid(),
                    freqConnEvent.getMac(), freqConnEvent.getEventTime());
                event.setEventDescr(EMEventMessage.frequentconnects.getMsgFormat(
                    freqConnEvent.getEntityType(), freqConnEvent.getSerialNum(),
                    count, timeElapsed));

                logger.warn("Detected Frequent Connects:" + builder.toString());
                out.collect(event);
            }
        });
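
To make the intent of the pattern concrete, here is a rough plain-Java
sketch (no Flink involved) of what I expect it to detect: four ApConnect
events for the same serial number within six minutes. The class
FrequentConnectSketch and its method onConnect are hypothetical names made
up for this illustration. It is a simplification that assumes timestamps
arrive in order per key, and it is not how Flink CEP evaluates the pattern
internally.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Flink: counts events per key in a sliding time window.
final class FrequentConnectSketch {
    private final int times;          // required number of events (4 in my pattern)
    private final long windowMillis;  // window length (6 minutes in my pattern)
    private final Map<String, Deque<Long>> recentByKey = new HashMap<>();

    FrequentConnectSketch(int times, long windowMillis) {
        this.times = times;
        this.windowMillis = windowMillis;
    }

    // Returns true when the key has at least `times` events inside the window,
    // counting this one. Assumes non-decreasing timestamps per key.
    boolean onConnect(String serialNum, long timestampMillis) {
        Deque<Long> recent = recentByKey.computeIfAbsent(serialNum, k -> new ArrayDeque<>());
        // Drop timestamps that fell out of the window so old entries do not accumulate.
        while (!recent.isEmpty() && timestampMillis - recent.peekFirst() >= windowMillis) {
            recent.pollFirst();
        }
        recent.addLast(timestampMillis);
        return recent.size() >= times;
    }

    public static void main(String[] args) {
        FrequentConnectSketch sketch = new FrequentConnectSketch(4, 6 * 60_000L);
        long minute = 60_000L;
        // Four connects one minute apart for the same (made-up) serial number.
        for (long t = 0; t < 4; t++) {
            boolean frequent = sketch.onConnect("serial-1", t * minute);
            System.out.println("t=" + t + "min frequent=" + frequent);
        }
    }
}
```

The point of the sketch is that timestamps older than the window are
discarded per key, which is the behaviour I was expecting from
within(Time.minutes(6)) in the job above.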



