[
https://issues.apache.org/jira/browse/FLINK-20814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17256346#comment-17256346
]
huqingwen commented on FLINK-20814:
-----------------------------------
I have the same problem。
> The CEP code is not running properly
> ------------------------------------
>
> Key: FLINK-20814
> URL: https://issues.apache.org/jira/browse/FLINK-20814
> Project: Flink
> Issue Type: Bug
> Components: Library / CEP
> Affects Versions: 1.12.0
> Environment: flink1.12.0
> jdk1.8
> Reporter: little-tomato
> Priority: Blocker
>
> The cep code is running properly on flink1.11.2,but it is not working
> properly on flink1.12.0.
> Can somebody help me?
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> // DataStream : source
> DataStream<TemperatureEvent> input = env.fromElements(new
> TemperatureEvent(1,"Device01", 22.0),
> new TemperatureEvent(1,"Device01", 27.1), new
> TemperatureEvent(2,"Device01", 28.1),
> new TemperatureEvent(1,"Device01", 22.2), new
> TemperatureEvent(3,"Device01", 22.1),
> new TemperatureEvent(1,"Device02", 22.3), new
> TemperatureEvent(4,"Device02", 22.1),
> new TemperatureEvent(1,"Device02", 22.4), new
> TemperatureEvent(5,"Device02", 22.7),
> new TemperatureEvent(1,"Device02", 27.0), new
> TemperatureEvent(6,"Device02", 30.0));
>
> Pattern<TemperatureEvent, ?> warningPattern =
> Pattern.<TemperatureEvent>begin("start")
> .subtype(TemperatureEvent.class)
> .where(new SimpleCondition<TemperatureEvent>() {
> @Override
> public boolean filter(TemperatureEvent subEvent) {
> if (subEvent.getTemperature() >= 26.0) {
> return true;
> }
> return false;
> }
> }).where(new SimpleCondition<TemperatureEvent>() {
> @Override
> public boolean filter(TemperatureEvent subEvent) {
> if (subEvent.getMachineName().equals("Device02")) {
> return true;
> }
> return false;
> }
> }).within(Time.seconds(10));
> DataStream<Alert> patternStream = CEP.pattern(input, warningPattern)
> .select(
> new RichPatternSelectFunction<TemperatureEvent,
> Alert>() {
> /**
> *
> */
> private static final
> long serialVersionUID = 1L;
> @Override
> public void
> open(Configuration parameters) throws Exception {
>
> System.out.println(getRuntimeContext().getUserCodeClassLoader());
> }
> @Override
> public Alert select(Map<String,
> List<TemperatureEvent>> event) throws Exception {
>
> return new Alert("Temperature Rise Detected:
> " + event.get("start") + " on machine name: " + event.get("start"));
> }
> });
> patternStream.print();
> env.execute("CEP on Temperature Sensor");
> it should be output(on flink1.11.2):
> Alert [message=Temperature Rise Detected: [TemperatureEvent
> [getTemperature()=27.0, getMachineName=Device02]] on machine name:
> [TemperatureEvent [getTemperature()=27.0, getMachineName=Device02]]]
> Alert [message=Temperature Rise Detected: [TemperatureEvent
> [getTemperature()=30.0, getMachineName=Device02]] on machine name:
> [TemperatureEvent [getTemperature()=30.0, getMachineName=Device02]]]
--
This message was sent by Atlassian Jira
(v8.3.4#803005)