little-tomato created FLINK-20814:
-------------------------------------
Summary: The CEP code is not running properly
Key: FLINK-20814
URL: https://issues.apache.org/jira/browse/FLINK-20814
Project: Flink
Issue Type: Bug
Components: Library / CEP
Affects Versions: 1.12.0
Environment: Flink 1.12.0, JDK 1.8
Reporter: little-tomato
The following CEP code runs correctly on Flink 1.11.2, but it does not work
properly on Flink 1.12.0.
Can somebody help me?
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// DataStream : source
DataStream<TemperatureEvent> input = env.fromElements(
        new TemperatureEvent(1, "Device01", 22.0),
        new TemperatureEvent(1, "Device01", 27.1),
        new TemperatureEvent(2, "Device01", 28.1),
        new TemperatureEvent(1, "Device01", 22.2),
        new TemperatureEvent(3, "Device01", 22.1),
        new TemperatureEvent(1, "Device02", 22.3),
        new TemperatureEvent(4, "Device02", 22.1),
        new TemperatureEvent(1, "Device02", 22.4),
        new TemperatureEvent(5, "Device02", 22.7),
        new TemperatureEvent(1, "Device02", 27.0),
        new TemperatureEvent(6, "Device02", 30.0));
Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent>begin("start")
        .subtype(TemperatureEvent.class)
        // Condition 1: temperature of at least 26.0
        .where(new SimpleCondition<TemperatureEvent>() {
            @Override
            public boolean filter(TemperatureEvent subEvent) {
                return subEvent.getTemperature() >= 26.0;
            }
        })
        // Condition 2 (ANDed with condition 1): event comes from Device02
        .where(new SimpleCondition<TemperatureEvent>() {
            @Override
            public boolean filter(TemperatureEvent subEvent) {
                return subEvent.getMachineName().equals("Device02");
            }
        })
        .within(Time.seconds(10));
DataStream<Alert> patternStream = CEP.pattern(input, warningPattern)
        .select(new RichPatternSelectFunction<TemperatureEvent, Alert>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void open(Configuration parameters) throws Exception {
                System.out.println(getRuntimeContext().getUserCodeClassLoader());
            }

            @Override
            public Alert select(Map<String, List<TemperatureEvent>> event) throws Exception {
                return new Alert("Temperature Rise Detected: " + event.get("start")
                        + " on machine name: " + event.get("start"));
            }
        });

patternStream.print();
env.execute("CEP on Temperature Sensor");
Expected output (as produced on Flink 1.11.2):
Alert [message=Temperature Rise Detected: [TemperatureEvent
[getTemperature()=27.0, getMachineName=Device02]] on machine name:
[TemperatureEvent [getTemperature()=27.0, getMachineName=Device02]]]
Alert [message=Temperature Rise Detected: [TemperatureEvent
[getTemperature()=30.0, getMachineName=Device02]] on machine name:
[TemperatureEvent [getTemperature()=30.0, getMachineName=Device02]]]
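
For reference, the two where() conditions can be checked against the same readings without Flink. The following is only a minimal plain-Java sketch: the Event class is a hypothetical stand-in for TemperatureEvent (it keeps just the machine name and temperature), and the class and method names are made up for illustration. It does not model the CEP runtime or the within(...) window; it only shows which of the input events satisfy both conditions, and these are the two events that appear in the expected output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ConditionCheck {

    /** Hypothetical stand-in for TemperatureEvent: only the fields the conditions read. */
    static final class Event {
        final String machineName;
        final double temperature;

        Event(String machineName, double temperature) {
            this.machineName = machineName;
            this.temperature = temperature;
        }
    }

    /** Same readings as in the fromElements(...) call, minus the first constructor argument. */
    static List<Event> sampleEvents() {
        return Arrays.asList(
                new Event("Device01", 22.0), new Event("Device01", 27.1),
                new Event("Device01", 28.1), new Event("Device01", 22.2),
                new Event("Device01", 22.1), new Event("Device02", 22.3),
                new Event("Device02", 22.1), new Event("Device02", 22.4),
                new Event("Device02", 22.7), new Event("Device02", 27.0),
                new Event("Device02", 30.0));
    }

    /** Applies both conditions (ANDed) and returns "temperature@machine" for each match. */
    static List<String> matching(List<Event> events) {
        List<String> result = new ArrayList<>();
        for (Event e : events) {
            if (e.temperature >= 26.0 && "Device02".equals(e.machineName)) {
                result.add(e.temperature + "@" + e.machineName);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints [27.0@Device02, 30.0@Device02]
        System.out.println(matching(sampleEvents()));
    }
}
```

The Device01 readings of 27.1 and 28.1 pass the temperature check but fail the machine-name check, which is why only the two Device02 readings remain.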