I am using a Flink 1.3.2 CEP pattern to detect a frequently occurring
condition. When scale testing this pattern at 10k events per minute, a memory
leak occurs and the job eventually fails with an OOM error.
I found a related JIRA, FLINK-7606, which mentions specifying
EventTime as the streamTimeCharacteristic.
I have configured this as well, and I am also using RMQSource. I am still
facing the memory leak. Can you please let me know whether I am missing
anything?
// Run the job with the EventTime stream time characteristic.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// AP check-in requests: source created by RMQSourceHelper, with timestamps and
// watermarks assigned through IngestionTimeExtractor.
DataStream<ApCheckInEvent> apCheckInReqStreams = env
    .addSource(RMQSourceHelper.createAPCheckInSource(parameters))
    .assignTimestampsAndWatermarks(new IngestionTimeExtractor<ApCheckInEvent>());

// AP connect requests: same setup as above, for the connect source.
DataStream<ApConnectEvent> apconnectReqStream = env
    .addSource(RMQSourceHelper.createAPConnectSource(parameters))
    .assignTimestampsAndWatermarks(new IngestionTimeExtractor<ApConnectEvent>());

// Convert both request streams to the common Event type.
DataStream<Event> apcheckInEventStream =
    apCheckInReqStreams.flatMap(new APCheckInEventMapper());
DataStream<Event> apconnectEventStream =
    apconnectReqStream.flatMap(new APConnectCheckInEventMapper());

// Merge the two Event streams into one.
DataStream<Event> unifiedDevicecheckInStream =
    apcheckInEventStream.union(apconnectEventStream);

// Copies the fields of an Event into a new DeviceCheckInEvent and emits exactly
// one output element per input element.
FlatMapFunction<Event, DeviceCheckInEvent> toDeviceCheckIn =
    new FlatMapFunction<Event, DeviceCheckInEvent>() {
        @Override
        public void flatMap(Event event, Collector<DeviceCheckInEvent> out) throws Exception {
            DeviceCheckInEvent checkIn = new DeviceCheckInEvent();
            checkIn.setEntityType(event.getDeviceType());
            checkIn.setDeviceMacAddress(event.getMac());
            checkIn.setEventType(event.getType());
            // The event's source is stored as the serial number.
            checkIn.setSerialNum(event.getSource());
            checkIn.setDeviceModel(event.getModel());
            checkIn.setStreamType(StreamType.getStreamType(event.getType()));
            checkIn.setEventTime(event.getTimeStamp());
            checkIn.setTenantId(event.getTenantId());
            checkIn.setSiteid(event.getSiteId());
            out.collect(checkIn);
        }
    };

DataStream<DeviceCheckInEvent> deviceCheckInStream =
    unifiedDevicecheckInStream.flatMap(toDeviceCheckIn);
Below is my pattern detection code:
// CEP pattern with a single step named "first": accepts ApConnect events,
// quantified with times(4) and constrained by within(Time.minutes(6)).
Pattern<DeviceCheckInEvent, ?> connectEventPattern1 = Pattern
    .<DeviceCheckInEvent>begin("first")
    .where(new IterativeCondition<DeviceCheckInEvent>() {
        @Override
        public boolean filter(DeviceCheckInEvent candidate, Context<DeviceCheckInEvent> ctx)
                throws Exception {
            // Only events whose type is ApConnect take part in the pattern.
            return EventType.ApConnect == candidate.getEventType();
        }
    })
    .times(4)
    .within(Time.minutes(6));
// Partitions the input by device serial number so the pattern is evaluated per key.
KeySelector<DeviceCheckInEvent, String> bySerialNum =
    new KeySelector<DeviceCheckInEvent, String>() {
        @Override
        public String getKey(DeviceCheckInEvent checkIn) throws Exception {
            return checkIn.getSerialNum();
        }
    };

// Builds one FrequentConnects Event per match, taking its fields from the last
// event collected under the "first" pattern step.
PatternFlatSelectFunction<DeviceCheckInEvent, Event> toFrequentConnects =
    new PatternFlatSelectFunction<DeviceCheckInEvent, Event>() {
        @Override
        public void flatSelect(Map<String, List<DeviceCheckInEvent>> pattern,
                               Collector<Event> out) throws Exception {
            List<DeviceCheckInEvent> matches = pattern.get("first");
            DeviceCheckInEvent latest = matches.get(matches.size() - 1);

            Event frequentConnects = new Event(
                EventType.FrequentConnects,
                latest.getEntityType(),
                latest.getTenantId(),
                latest.getSerialNum(),
                latest.getSiteid(),
                latest.getMac(),
                latest.getEventTime());

            // NOTE(review): count, timeElapsed, builder and logger are not defined in
            // the visible snippet -- presumably declared elsewhere; confirm.
            frequentConnects.setEventDescr(
                EMEventMessage.frequentconnects.getMsgFormat(
                    latest.getEntityType(), latest.getSerialNum(), count, timeElapsed));
            logger.warn("Detected Frequent Connects:" + builder.toString());

            out.collect(frequentConnects);
        }
    };

// Apply the pattern to the keyed stream and emit the derived events.
DataStream<Event> frequentEventStream = CEP
    .pattern(inputStream.keyBy(bySerialNum), connectEventPattern1)
    .flatSelect(toFrequentConnects);
--
Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/