[
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16163002#comment-16163002
]
Matteo Ferrario commented on FLINK-7606:
----------------------------------------
Hi Kostas,
thanks for your reply.
Here are some details about the job we've implemented.
The job processes a stream of messages coming from RabbitMQ.
Each message is deserialized into a custom 'Payload' POJO using the
'PayloadSchema' class:
{code:java}
import java.io.Serializable;
import java.util.Objects;

public class Payload implements Serializable {
    private static final long serialVersionUID = -7700917163136255068L;
    private String deviceCode;
    private Long stateTimestamp;
    private String phase;
    public Payload() {
    }
    public String getDeviceCode() {
        return deviceCode;
    }
    public void setDeviceCode(String deviceCode) {
        this.deviceCode = deviceCode;
    }
    public Long getStateTimestamp() {
        return stateTimestamp;
    }
    public void setStateTimestamp(Long stateTimestamp) {
        this.stateTimestamp = stateTimestamp;
    }
    public String getPhase() {
        return phase;
    }
    public void setPhase(String phase) {
        this.phase = phase;
    }
    @Override
    public boolean equals(Object o) {
        if (this == o)
            return true;
        if (o == null || getClass() != o.getClass())
            return false;
        Payload payload = (Payload) o;
        return Objects.equals(deviceCode, payload.deviceCode)
                && Objects.equals(stateTimestamp, payload.stateTimestamp)
                && Objects.equals(phase, payload.phase);
    }
    @Override
    public int hashCode() {
        return Objects.hash(deviceCode, stateTimestamp, phase);
    }
}
{code}
{code:java}
public class PayloadSchema implements DeserializationSchema<Payload>,
        SerializationSchema<Payload> {
    private ObjectMapper mapper = new ObjectMapper();
    private static final long serialVersionUID = 1L;
    @Override
    public byte[] serialize(Payload element) {
        // On failure the exception is only printed and an empty array is returned.
        byte[] out = new byte[0];
        try {
            out = mapper.writeValueAsBytes(element);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return out;
    }
    @Override
    public Payload deserialize(byte[] message) {
        // On failure the exception is only printed and null is returned.
        Payload highLevelDeviceStatus = null;
        try {
            highLevelDeviceStatus = mapper.readValue(message, Payload.class);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return highLevelDeviceStatus;
    }
    @Override
    public boolean isEndOfStream(Payload nextElement) {
        return false;
    }
    @Override
    public TypeInformation<Payload> getProducedType() {
        return TypeExtractor.getForClass(Payload.class);
    }
}
{code}
'Payload' elements are ordered by event time, taken from the 'stateTimestamp'
field, with up to 10 seconds of out-of-orderness allowed:
{code:java}
DataStream<Payload> dataStreamSource =
    env.addSource(new RMQSource<>(connectionConfig,
            "input",
            new PayloadSchema()))
        .assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<Payload>(Time.seconds(10)) {
                private static final long serialVersionUID = -1L;
                @Override
                public long extractTimestamp(Payload element) {
                    if (element.getStateTimestamp() == null) {
                        throw new RuntimeException(
                            "HighLevelDeviceStatus Timestamp is null during time ordering for device ["
                                + element.getDeviceCode() + "]");
                    }
                    Date timestamp = getTimeFromJsonTimestamp(element.getStateTimestamp());
                    logger.debug("DeviceCode [" + element.getDeviceCode()
                        + "] Time [" + df.format(timestamp)
                        + "] Watermark [" + timestamp.getTime() + "]");
                    return timestamp.getTime();
                }
            })
        .uid("ciao")
        .name("DEVICE_HL_STATUS");
{code}
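For reference, here is a minimal sketch of the watermark arithmetic implied by a bounded-out-of-orderness extractor with a 10-second bound: the watermark trails the largest timestamp seen so far by the bound. This is plain Java with no Flink dependency. The class and method names (BoundedWatermarkSketch, onElement, currentWatermark) are hypothetical and are not Flink's API.

```java
public class BoundedWatermarkSketch {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestampSeen;

    public BoundedWatermarkSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Offset the initial value so the first watermark is Long.MIN_VALUE
        // and the subtraction below cannot underflow.
        this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    /** Records the event timestamp of an incoming element. */
    public void onElement(long timestampMillis) {
        maxTimestampSeen = Math.max(maxTimestampSeen, timestampMillis);
    }

    /** Watermark = largest timestamp seen so far minus the allowed lateness. */
    public long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrdernessMillis;
    }
}
```

With a bound of 10 seconds, an element stamped 60 000 ms moves the watermark to 50 000 ms, and a later element stamped 55 000 ms does not move it back.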
The job looks for a specific pattern in the stream, generates an event when
the pattern matches, and sends that event to RabbitMQ:
{code:java}
Pattern<Payload, ?> pattern = Pattern
    .<Payload>begin("start")
    .subtype(Payload.class)
    .where(new SimpleCondition<Payload>() {
        @Override
        public boolean filter(Payload value) throws Exception {
            return !value.getPhase().equals("Start");
        }
    })
    .next("end")
    .subtype(Payload.class)
    .where(new SimpleCondition<Payload>() {
        @Override
        public boolean filter(Payload value) throws Exception {
            return value.getPhase().equals("Start");
        }
    })
    .within(Time.minutes(5));
PatternFlatSelectFunction<Payload, Map> patternFlatSelectFunction =
    (statusMap, collector) -> collector.collect(new Synth().synthesize(statusMap));
PatternStream<Payload> patternStreamStartOfCycle = CEP.pattern(
    dataStreamSource.keyBy((KeySelector<Payload, Object>) value -> value.getDeviceCode()),
    pattern);
DataStream<Map> outputStream =
    patternStreamStartOfCycle.flatSelect(patternFlatSelectFunction);
outputStream.addSink(new RMQSink(connectionConfig, "events", new MapSchema()))
    .name("prova");
{code}
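To make the expected behaviour concrete, here is a toy model of the pattern above in plain Java: per device, a non-"Start" event immediately followed by a "Start" event within the window is a match, and pending state older than the window is dropped once the watermark passes it. This is only a sketch of the semantics we expect. It is not how Flink's NFA or NestedMapsStateTable works, and the names (StartOfCycleSketch, onEvent, prune, trackedKeys) are hypothetical. It assumes events arrive in timestamp order per device, treats the window bound as exclusive, and treats a null phase as non-"Start" (the job above would throw instead).

```java
import java.util.HashMap;
import java.util.Map;

public class StartOfCycleSketch {
    private final long windowMillis;
    // deviceCode -> timestamp of the latest non-"Start" event still waiting for its "Start"
    private final Map<String, Long> pendingStart = new HashMap<>();

    public StartOfCycleSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Returns true when this event completes the pattern for its device. */
    public boolean onEvent(String deviceCode, String phase, long timestampMillis) {
        if (!"Start".equals(phase)) {
            // A new candidate for "start" replaces any older one (strict contiguity).
            pendingStart.put(deviceCode, timestampMillis);
            return false;
        }
        // A "Start" event consumes the pending candidate, if there is one.
        Long startTs = pendingStart.remove(deviceCode);
        return startTs != null && timestampMillis - startTs < windowMillis;
    }

    /** Drops candidates that can no longer match because the window has elapsed. */
    public void prune(long watermarkMillis) {
        pendingStart.values().removeIf(ts -> watermarkMillis - ts >= windowMillis);
    }

    /** Number of device keys that currently hold state. */
    public int trackedKeys() {
        return pendingStart.size();
    }
}
```

In this model the number of tracked keys goes back to zero once every pending candidate has either matched or timed out, which is the cleanup we expected to see after the 5-minute window.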
> Memory leak on NestedMapsStateTable
> -----------------------------------
>
> Key: FLINK-7606
> URL: https://issues.apache.org/jira/browse/FLINK-7606
> Project: Flink
> Issue Type: Bug
> Components: CEP
> Affects Versions: 1.3.1
> Reporter: Matteo Ferrario
>
> The NestedMapsStateTable grows continuously without freeing heap memory.
> We created a simple job that processes a stream of messages and uses CEP to
> generate an outcome message when a specific pattern is identified.
> The messages coming from the stream are grouped by a key defined in a
> specific field of the message.
> We've also added the "within" clause (set to 5 minutes), so that two
> incoming messages match the pattern only if they arrive within that time
> window.
> What we've seen is that for every key present in the message, an NFA object
> is instantiated in the NestedMapsStateTable and it is never deallocated.
> The "within" clause didn't help either: when we send messages that don't
> match the pattern, memory keeps growing (I suppose that the state of the
> NFA is updated), and it is not cleaned up even after the 5-minute window
> defined in the "within" clause has passed.
> If needed, I can provide more details about the job we've implemented, as
> well as screenshots of the memory leak.