[ 
https://issues.apache.org/jira/browse/FLINK-20814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz closed FLINK-20814.
------------------------------------
    Resolution: Not A Problem

> The CEP code is not running properly
> ------------------------------------
>
>                 Key: FLINK-20814
>                 URL: https://issues.apache.org/jira/browse/FLINK-20814
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / CEP
>    Affects Versions: 1.12.0
>         Environment: flink1.12.0
> jdk1.8
>            Reporter: little-tomato
>            Priority: Blocker
>
> The cep code is running properly on flink1.11.2,but it is not working 
> properly on flink1.12.0.
> Can somebody help me?
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>         // DataStream : source
>               DataStream<TemperatureEvent> input = env.fromElements(new 
> TemperatureEvent(1,"Device01", 22.0),
>                 new TemperatureEvent(1,"Device01", 27.1), new 
> TemperatureEvent(2,"Device01", 28.1),
>                 new TemperatureEvent(1,"Device01", 22.2), new 
> TemperatureEvent(3,"Device01", 22.1),
>                 new TemperatureEvent(1,"Device02", 22.3), new 
> TemperatureEvent(4,"Device02", 22.1),
>                 new TemperatureEvent(1,"Device02", 22.4), new 
> TemperatureEvent(5,"Device02", 22.7),
>                 new TemperatureEvent(1,"Device02", 27.0), new 
> TemperatureEvent(6,"Device02", 30.0));
>         
>         Pattern<TemperatureEvent, ?> warningPattern = 
> Pattern.<TemperatureEvent>begin("start")
>                 .subtype(TemperatureEvent.class)
>                 .where(new SimpleCondition<TemperatureEvent>() {
>                       @Override
>                     public boolean filter(TemperatureEvent subEvent) {
>                         if (subEvent.getTemperature() >= 26.0) {
>                             return true;
>                         }
>                         return false;
>                     }
>                 }).where(new SimpleCondition<TemperatureEvent>() {
>                       @Override
>                       public boolean filter(TemperatureEvent subEvent) {
>                         if (subEvent.getMachineName().equals("Device02")) {
>                             return true;
>                         }
>                         return false;
>                     }
>                 }).within(Time.seconds(10));
>         DataStream<Alert> patternStream = CEP.pattern(input, warningPattern)
>                 .select(
>                         new RichPatternSelectFunction<TemperatureEvent, 
> Alert>() {
>                             /**
>                                                        * 
>                                                        */
>                                                       private static final 
> long serialVersionUID = 1L;
>                                                       @Override
>                                                       public void 
> open(Configuration parameters) throws Exception {
>                                                               
> System.out.println(getRuntimeContext().getUserCodeClassLoader());
>                                                       }
>                                                       @Override
>                             public Alert select(Map<String, 
> List<TemperatureEvent>> event) throws Exception {
>                               
>                                 return new Alert("Temperature Rise Detected: 
> " + event.get("start") + " on machine name: " + event.get("start"));
>                             }
>                         });
>         patternStream.print();
>         env.execute("CEP on Temperature Sensor");
> it should be output(on flink1.11.2):
> Alert [message=Temperature Rise Detected: [TemperatureEvent 
> [getTemperature()=27.0, getMachineName=Device02]] on machine name: 
> [TemperatureEvent [getTemperature()=27.0, getMachineName=Device02]]]
> Alert [message=Temperature Rise Detected: [TemperatureEvent 
> [getTemperature()=30.0, getMachineName=Device02]] on machine name: 
> [TemperatureEvent [getTemperature()=30.0, getMachineName=Device02]]]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to