[ 
https://issues.apache.org/jira/browse/FLINK-38566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18032988#comment-18032988
 ] 

xingyuan cheng commented on FLINK-38566:
----------------------------------------

Hi [~trohrmann], do you have time to look at this feature?

> Support CEP DSL
> ---------------
>
>                 Key: FLINK-38566
>                 URL: https://issues.apache.org/jira/browse/FLINK-38566
>             Project: Flink
>          Issue Type: New Feature
>          Components: Library / CEP
>            Reporter: xingyuan cheng
>            Priority: Major
>              Labels: pull-request-available
>
> In IoT and crowd selection scenarios, complex CEP expressions are difficult 
> for users to flexibly configure. Therefore, a simplified set of conditional 
> expressions is needed to express CEP operators. This feature change aims to 
> provide a convenient way to implement flexible and configurable expressions 
> by configuring conditional expressions. I translated the syntax parsing logic 
> from DSL to CEP operators using Anltr4 syntax.
>  
> *Migration from Pattern API*
> *Before (Pattern API)*
> {code:java}
> Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
>     .where(new SimpleCondition<Event>() {
>         @Override
>         public boolean filter(Event event) {
>             return event.getValue() > 100;
>         }
>     })
>     .next("middle")
>     .where(new SimpleCondition<Event>() {
>         @Override
>         public boolean filter(Event event) {
>             return event.getValue() < 50;
>         }
>     }); {code}
> *After (DSL)*
> {code:java}
> PatternStream<Event> pattern = DslCompiler.compile(
>     "start(value > 100) middle(value < 50)",
>     dataStream
> ); {code}
>  
>  
> The following is a simple demonstration use case:
>  
> {code:java}
> package org.apache.flink.cep.dsl;
> import org.apache.flink.cep.PatternStream;
> import org.apache.flink.cep.dsl.api.DslCompiler;
> import org.apache.flink.cep.dsl.api.EventAdapter;
> import org.apache.flink.cep.dsl.util.MapEventAdapter;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> /**
>  * Example usage of the CEP DSL.
>  *
>  * <p>This class demonstrates various ways to use the DSL with different 
> event types and patterns.
>  * These are examples only and not executable tests.
>  */
> public class DslExampleUsage {
>     // Example 1: Simple POJO events with basic pattern
>     public static void simplePojoExample() throws Exception {
>         StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>         // Create sample data
>         DataStream<SensorReading> sensorData =
>                 env.fromElements(
>                         new SensorReading("sensor1", 95.0, 
> System.currentTimeMillis()),
>                         new SensorReading("sensor1", 105.0, 
> System.currentTimeMillis()),
>                         new SensorReading("sensor1", 110.0, 
> System.currentTimeMillis()));
>         // Define pattern using DSL
>         PatternStream<SensorReading> pattern =
>                 DslCompiler.compile("HighTemp(temperature > 100)", 
> sensorData);
>         // Process matches
>         pattern.select(
>                         match -> {
>                             SensorReading reading = 
> match.get("HighTemp").get(0);
>                             return String.format(
>                                     "High temperature alert: Sensor %s at 
> %.1f°C",
>                                     reading.id, reading.temperature);
>                         })
>                 .print();
>         env.execute("Simple POJO Example");
>     }
>     // Example 2: Event correlation
>     public static void eventCorrelationExample() throws Exception {
>         StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>         DataStream<SensorReading> sensorData =
>                 env.fromElements(
>                         new SensorReading("sensor1", 95.0, 1000L),
>                         new SensorReading("sensor1", 105.0, 2000L),
>                         new SensorReading("sensor2", 110.0, 3000L));
>         // Pattern with event correlation
>         String dsl = "Start(id = 'sensor1' and temperature > 90) -> " +
>                     "End(id = Start.id and temperature > Start.temperature)";
>         PatternStream<SensorReading> pattern = DslCompiler.compile(dsl, 
> sensorData);
>         pattern.select(
>                         match -> {
>                             SensorReading start = match.get("Start").get(0);
>                             SensorReading end = match.get("End").get(0);
>                             return String.format(
>                                     "Temperature rise detected: %.1f -> %.1f",
>                                     start.temperature, end.temperature);
>                         })
>                 .print();
>         env.execute("Event Correlation Example");
>     }
>     // Example 3: Map-based events
>     public static void mapEventExample() throws Exception {
>         StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>         // Create Map events
>         Map<String, Object> event1 = new HashMap<>();
>         event1.put("_eventType", "Alert");
>         event1.put("severity", 7);
>         event1.put("message", "High CPU usage");
>         Map<String, Object> event2 = new HashMap<>();
>         event2.put("_eventType", "Alert");
>         event2.put("severity", 9);
>         event2.put("message", "Critical error");
>         DataStream<Map<String, Object>> alerts = env.fromElements(event1, 
> event2);
>         // Use MapEventAdapter
>         PatternStream<Map<String, Object>> pattern =
>                 DslCompiler.compile(
>                         "Alert(severity > 5)", alerts, new MapEventAdapter());
>         pattern.select(match -> 
> match.get("Alert").get(0).get("message")).print();
>         env.execute("Map Event Example");
>     }
>     // Example 4: Complex pattern with quantifiers and time window
>     public static void complexPatternExample() throws Exception {
>         StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>         DataStream<UserEvent> userEvents =
>                 env.fromElements(
>                         new UserEvent("user1", "login", 1000L),
>                         new UserEvent("user1", "browse", 2000L),
>                         new UserEvent("user1", "browse", 3000L),
>                         new UserEvent("user1", "purchase", 4000L));
>         // Complex pattern: login -> multiple browses -> purchase, within 30 
> seconds
>         String dsl =
>                 "%SKIP_TO_LAST['Login'] "
>                         + "Login(action = 'login') -> "
>                         + "Browse{1,5}(action = 'browse' and userId = 
> Login.userId) -> "
>                         + "Purchase(action = 'purchase' and userId = 
> Login.userId) "
>                         + "within 30s";
>         PatternStream<UserEvent> pattern = DslCompiler.compile(dsl, 
> userEvents);
>         pattern.select(
>                         match -> {
>                             UserEvent login = match.get("Login").get(0);
>                             List<UserEvent> browses = match.get("Browse");
>                             UserEvent purchase = match.get("Purchase").get(0);
>                             return String.format(
>                                     "User %s: login -> %d browses -> 
> purchase",
>                                     login.userId, browses.size());
>                         })
>                 .print();
>         env.execute("Complex Pattern Example");
>     }
>     // Example 5: Builder API with custom adapter
>     public static void builderApiExample() throws Exception {
>         StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>         DataStream<CustomEvent> events = env.fromElements(new CustomEvent());
>         // Custom event adapter
>         EventAdapter<CustomEvent> customAdapter =
>                 new EventAdapter<CustomEvent>() {
>                     @Override
>                     public java.util.Optional<Object> getAttribute(
>                             CustomEvent event, String attributeName) {
>                         return 
> java.util.Optional.ofNullable(event.getField(attributeName));
>                     }
>                     @Override
>                     public String getEventType(CustomEvent event) {
>                         return event.getTypeName();
>                     }
>                 };
>         // Use builder API
>         PatternStream<CustomEvent> pattern =
>                 DslCompiler.<CustomEvent>builder()
>                         .withStrictTypeMatching()
>                         .withEventAdapter(customAdapter)
>                         .compile("MyEvent(value > 100)", events);
>         pattern.select(match -> "Matched: " + 
> match.get("MyEvent").get(0)).print();
>         env.execute("Builder API Example");
>     }
>     // Example event classes
>     /** Simple sensor reading POJO. */
>     public static class SensorReading {
>         public String id;
>         public double temperature;
>         public long timestamp;
>         public SensorReading(String id, double temperature, long timestamp) {
>             this.id = id;
>             this.temperature = temperature;
>             this.timestamp = timestamp;
>         }
>         public String getId() {
>             return id;
>         }
>         public double getTemperature() {
>             return temperature;
>         }
>         public long getTimestamp() {
>             return timestamp;
>         }
>     }
>     /** User event POJO. */
>     public static class UserEvent {
>         public String userId;
>         public String action;
>         public long timestamp;
>         public UserEvent(String userId, String action, long timestamp) {
>             this.userId = userId;
>             this.action = action;
>             this.timestamp = timestamp;
>         }
>         public String getUserId() {
>             return userId;
>         }
>         public String getAction() {
>             return action;
>         }
>         public long getTimestamp() {
>             return timestamp;
>         }
>     }
>     /** Custom event type. */
>     public static class CustomEvent {
>         public Object getField(String name) {
>             return null; // Implementation omitted
>         }
>         public String getTypeName() {
>             return "MyEvent";
>         }
>     }
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to