[
https://issues.apache.org/jira/browse/FLINK-38566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
xingyuan cheng updated FLINK-38566:
-----------------------------------
Description:
In IoT and crowd selection scenarios, complex CEP expressions are difficult for
users to flexibly configure. Therefore, a simplified set of conditional
expressions is needed to express CEP operators. This feature change aims to
provide a convenient way to implement flexible and configurable expressions by
configuring conditional expressions. I implemented the translation from the DSL
to CEP operators using an ANTLR4 grammar.
*Migration from Pattern API*
*Before (Pattern API)*
{code:java}
// Two-step pattern written with the Pattern API: every step carries its own
// anonymous SimpleCondition subclass.
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
// Step "start" accepts events whose value is greater than 100.
return event.getValue() > 100;
}
})
.next("middle")
.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
// Step "middle" accepts events whose value is less than 50.
return event.getValue() < 50;
}
}); {code}
*After (DSL)*
{code:java}
// Intended DSL counterpart of the Pattern API snippet: the two steps and their
// conditions are given as one expression string and compiled against dataStream.
PatternStream<Event> pattern = DslCompiler.compile(
"start(value > 100) middle(value < 50)",
dataStream
); {code}
The following is a simple demonstration use case:
{code:java}
package org.apache.flink.cep.dsl;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.dsl.api.DslCompiler;
import org.apache.flink.cep.dsl.api.EventAdapter;
import org.apache.flink.cep.dsl.util.MapEventAdapter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Example usage of the CEP DSL.
*
* <p>This class demonstrates various ways to use the DSL with different event
* types and patterns.
* These are examples only and not executable tests.
*/
public class DslExampleUsage {
// Example 1: Simple POJO events with basic pattern
public static void simplePojoExample() throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// Create sample data
DataStream<SensorReading> sensorData =
env.fromElements(
new SensorReading("sensor1", 95.0,
System.currentTimeMillis()),
new SensorReading("sensor1", 105.0,
System.currentTimeMillis()),
new SensorReading("sensor1", 110.0,
System.currentTimeMillis()));
// Define pattern using DSL
PatternStream<SensorReading> pattern =
DslCompiler.compile("HighTemp(temperature > 100)", sensorData);
// Process matches
pattern.select(
match -> {
SensorReading reading =
match.get("HighTemp").get(0);
return String.format(
"High temperature alert: Sensor %s at
%.1f°C",
reading.id, reading.temperature);
})
.print();
env.execute("Simple POJO Example");
}
// Example 2: Event correlation
/**
 * Compiles a two-step DSL expression in which the {@code End} condition refers to attributes
 * of the event bound to {@code Start}, then prints the temperatures of both matched events.
 *
 * @throws Exception propagated from the environment's {@code execute} call
 */
public static void eventCorrelationExample() throws Exception {
    final StreamExecutionEnvironment execEnv =
            StreamExecutionEnvironment.getExecutionEnvironment();

    final DataStream<SensorReading> readings =
            execEnv.fromElements(
                    new SensorReading("sensor1", 95.0, 1000L),
                    new SensorReading("sensor1", 105.0, 2000L),
                    new SensorReading("sensor2", 110.0, 3000L));

    // Pattern with event correlation: "End" is compared against "Start".
    final String expression =
            "Start(id = 'sensor1' and temperature > 90) -> "
                    + "End(id = Start.id and temperature > Start.temperature)";
    final PatternStream<SensorReading> matches = DslCompiler.compile(expression, readings);

    matches.select(
                    found -> {
                        final SensorReading first = found.get("Start").get(0);
                        final SensorReading last = found.get("End").get(0);
                        return String.format(
                                "Temperature rise detected: %.1f -> %.1f",
                                first.getTemperature(), last.getTemperature());
                    })
            .print();

    execEnv.execute("Event Correlation Example");
}
// Example 3: Map-based events
/**
 * Runs the DSL expression {@code Alert(severity > 5)} over events represented as
 * {@code Map<String, Object>}, using a {@code MapEventAdapter}, and prints the
 * {@code "message"} entry of each matched event.
 *
 * @throws Exception propagated from the environment's {@code execute} call
 */
public static void mapEventExample() throws Exception {
    final StreamExecutionEnvironment execEnv =
            StreamExecutionEnvironment.getExecutionEnvironment();

    // First map event
    final Map<String, Object> cpuAlert = new HashMap<>();
    cpuAlert.put("_eventType", "Alert");
    cpuAlert.put("severity", 7);
    cpuAlert.put("message", "High CPU usage");

    // Second map event
    final Map<String, Object> errorAlert = new HashMap<>();
    errorAlert.put("_eventType", "Alert");
    errorAlert.put("severity", 9);
    errorAlert.put("message", "Critical error");

    final DataStream<Map<String, Object>> alertStream =
            execEnv.fromElements(cpuAlert, errorAlert);

    // The adapter is handed to the compiler together with the expression and the stream.
    final MapEventAdapter adapter = new MapEventAdapter();
    final PatternStream<Map<String, Object>> matches =
            DslCompiler.compile("Alert(severity > 5)", alertStream, adapter);

    matches.select(
                    found -> {
                        final Map<String, Object> alert = found.get("Alert").get(0);
                        return alert.get("message");
                    })
            .print();

    execEnv.execute("Map Event Example");
}
// Example 4: Complex pattern with quantifiers and time window
public static void complexPatternExample() throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<UserEvent> userEvents =
env.fromElements(
new UserEvent("user1", "login", 1000L),
new UserEvent("user1", "browse", 2000L),
new UserEvent("user1", "browse", 3000L),
new UserEvent("user1", "purchase", 4000L));
// Complex pattern: login -> multiple browses -> purchase, within 30
seconds
String dsl =
"%SKIP_TO_LAST['Login'] "
+ "Login(action = 'login') -> "
+ "Browse{1,5}(action = 'browse' and userId =
Login.userId) -> "
+ "Purchase(action = 'purchase' and userId =
Login.userId) "
+ "within 30s";
PatternStream<UserEvent> pattern = DslCompiler.compile(dsl, userEvents);
pattern.select(
match -> {
UserEvent login = match.get("Login").get(0);
List<UserEvent> browses = match.get("Browse");
UserEvent purchase = match.get("Purchase").get(0);
return String.format(
"User %s: login -> %d browses -> purchase",
login.userId, browses.size());
})
.print();
env.execute("Complex Pattern Example");
}
// Example 5: Builder API with custom adapter
/**
 * Configures the compiler through {@code DslCompiler.builder()} with strict type matching
 * and a hand-written {@code EventAdapter}, compiles {@code MyEvent(value > 100)}, and prints
 * each matched event.
 *
 * @throws Exception propagated from the environment's {@code execute} call
 */
public static void builderApiExample() throws Exception {
    final StreamExecutionEnvironment execEnv =
            StreamExecutionEnvironment.getExecutionEnvironment();
    final DataStream<CustomEvent> customEvents = execEnv.fromElements(new CustomEvent());

    // Adapter that forwards attribute and type lookups to CustomEvent's own accessors.
    final EventAdapter<CustomEvent> adapter =
            new EventAdapter<CustomEvent>() {
                @Override
                public java.util.Optional<Object> getAttribute(
                        CustomEvent event, String attributeName) {
                    // A null field value becomes an empty Optional.
                    final Object value = event.getField(attributeName);
                    return java.util.Optional.ofNullable(value);
                }

                @Override
                public String getEventType(CustomEvent event) {
                    return event.getTypeName();
                }
            };

    // Builder-style configuration followed by compilation.
    final PatternStream<CustomEvent> matches =
            DslCompiler.<CustomEvent>builder()
                    .withStrictTypeMatching()
                    .withEventAdapter(adapter)
                    .compile("MyEvent(value > 100)", customEvents);

    matches.select(found -> "Matched: " + found.get("MyEvent").get(0)).print();

    execEnv.execute("Builder API Example");
}
// Example event classes
/**
 * Simple sensor reading POJO.
 *
 * <p>Fields are public and a public no-argument constructor is provided so that the class
 * satisfies Flink's POJO requirements.
 */
public static class SensorReading {
    public String id;
    public double temperature;
    public long timestamp;

    /** Creates an empty reading; required for the class to qualify as a Flink POJO. */
    public SensorReading() {}

    /**
     * Creates a fully populated reading.
     *
     * @param id sensor identifier
     * @param temperature measured temperature
     * @param timestamp time of the measurement
     */
    public SensorReading(String id, double temperature, long timestamp) {
        this.id = id;
        this.temperature = temperature;
        this.timestamp = timestamp;
    }

    /** @return the sensor identifier */
    public String getId() {
        return id;
    }

    /** @return the measured temperature */
    public double getTemperature() {
        return temperature;
    }

    /** @return the time of the measurement */
    public long getTimestamp() {
        return timestamp;
    }
}
/**
 * User event POJO.
 *
 * <p>Fields are public and a public no-argument constructor is provided so that the class
 * satisfies Flink's POJO requirements.
 */
public static class UserEvent {
    public String userId;
    public String action;
    public long timestamp;

    /** Creates an empty event; required for the class to qualify as a Flink POJO. */
    public UserEvent() {}

    /**
     * Creates a fully populated event.
     *
     * @param userId identifier of the acting user
     * @param action name of the action, e.g. {@code "login"}
     * @param timestamp time of the action
     */
    public UserEvent(String userId, String action, long timestamp) {
        this.userId = userId;
        this.action = action;
        this.timestamp = timestamp;
    }

    /** @return the identifier of the acting user */
    public String getUserId() {
        return userId;
    }

    /** @return the name of the action */
    public String getAction() {
        return action;
    }

    /** @return the time of the action */
    public long getTimestamp() {
        return timestamp;
    }
}
/**
 * Custom event type used by the builder API example.
 *
 * <p>This is a stub: attribute lookup is not implemented.
 */
public static class CustomEvent {

    /**
     * Looks up a field by name.
     *
     * @param name the requested field name (ignored by this stub)
     * @return always {@code null}
     */
    public Object getField(String name) {
        // Implementation omitted
        return null;
    }

    /** @return the fixed type name {@code "MyEvent"} */
    public String getTypeName() {
        return "MyEvent";
    }
}
}{code}
was:
In IoT and crowd selection scenarios, complex CEP expressions are difficult for
users to flexibly configure. Therefore, a simplified set of conditional
expressions is needed to express CEP operators. This feature change aims to
provide a convenient way to implement flexible and configurable expressions by
configuring conditional expressions. I translated the syntax parsing logic from
DSL to CEP operators using ALT4 syntax.
*Migration from Pattern API*
*Before (Pattern API)*
{code:java}
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getValue() > 100;
}
})
.next("middle")
.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getValue() < 50;
}
}); {code}
*After (DSL)*
{code:java}
PatternStream<Event> pattern = DslCompiler.compile(
"start(value > 100) middle(value < 50)",
dataStream
); {code}
The following is a simple demonstration use case:
{code:java}
package org.apache.flink.cep.dsl;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.dsl.api.DslCompiler;
import org.apache.flink.cep.dsl.api.EventAdapter;
import org.apache.flink.cep.dsl.util.MapEventAdapter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Example usage of the CEP DSL.
*
* <p>This class demonstrates various ways to use the DSL with different event
types and patterns.
* These are examples only and not executable tests.
*/
public class DslExampleUsage {
// Example 1: Simple POJO events with basic pattern
public static void simplePojoExample() throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// Create sample data
DataStream<SensorReading> sensorData =
env.fromElements(
new SensorReading("sensor1", 95.0,
System.currentTimeMillis()),
new SensorReading("sensor1", 105.0,
System.currentTimeMillis()),
new SensorReading("sensor1", 110.0,
System.currentTimeMillis()));
// Define pattern using DSL
PatternStream<SensorReading> pattern =
DslCompiler.compile("HighTemp(temperature > 100)", sensorData);
// Process matches
pattern.select(
match -> {
SensorReading reading =
match.get("HighTemp").get(0);
return String.format(
"High temperature alert: Sensor %s at
%.1f°C",
reading.id, reading.temperature);
})
.print();
env.execute("Simple POJO Example");
}
// Example 2: Event correlation
public static void eventCorrelationExample() throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<SensorReading> sensorData =
env.fromElements(
new SensorReading("sensor1", 95.0, 1000L),
new SensorReading("sensor1", 105.0, 2000L),
new SensorReading("sensor2", 110.0, 3000L));
// Pattern with event correlation
String dsl = "Start(id = 'sensor1' and temperature > 90) -> " +
"End(id = Start.id and temperature > Start.temperature)";
PatternStream<SensorReading> pattern = DslCompiler.compile(dsl,
sensorData);
pattern.select(
match -> {
SensorReading start = match.get("Start").get(0);
SensorReading end = match.get("End").get(0);
return String.format(
"Temperature rise detected: %.1f -> %.1f",
start.temperature, end.temperature);
})
.print();
env.execute("Event Correlation Example");
}
// Example 3: Map-based events
public static void mapEventExample() throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// Create Map events
Map<String, Object> event1 = new HashMap<>();
event1.put("_eventType", "Alert");
event1.put("severity", 7);
event1.put("message", "High CPU usage");
Map<String, Object> event2 = new HashMap<>();
event2.put("_eventType", "Alert");
event2.put("severity", 9);
event2.put("message", "Critical error");
DataStream<Map<String, Object>> alerts = env.fromElements(event1,
event2);
// Use MapEventAdapter
PatternStream<Map<String, Object>> pattern =
DslCompiler.compile(
"Alert(severity > 5)", alerts, new MapEventAdapter());
pattern.select(match ->
match.get("Alert").get(0).get("message")).print();
env.execute("Map Event Example");
}
// Example 4: Complex pattern with quantifiers and time window
public static void complexPatternExample() throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<UserEvent> userEvents =
env.fromElements(
new UserEvent("user1", "login", 1000L),
new UserEvent("user1", "browse", 2000L),
new UserEvent("user1", "browse", 3000L),
new UserEvent("user1", "purchase", 4000L));
// Complex pattern: login -> multiple browses -> purchase, within 30
seconds
String dsl =
"%SKIP_TO_LAST['Login'] "
+ "Login(action = 'login') -> "
+ "Browse{1,5}(action = 'browse' and userId =
Login.userId) -> "
+ "Purchase(action = 'purchase' and userId =
Login.userId) "
+ "within 30s";
PatternStream<UserEvent> pattern = DslCompiler.compile(dsl, userEvents);
pattern.select(
match -> {
UserEvent login = match.get("Login").get(0);
List<UserEvent> browses = match.get("Browse");
UserEvent purchase = match.get("Purchase").get(0);
return String.format(
"User %s: login -> %d browses -> purchase",
login.userId, browses.size());
})
.print();
env.execute("Complex Pattern Example");
}
// Example 5: Builder API with custom adapter
public static void builderApiExample() throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<CustomEvent> events = env.fromElements(new CustomEvent());
// Custom event adapter
EventAdapter<CustomEvent> customAdapter =
new EventAdapter<CustomEvent>() {
@Override
public java.util.Optional<Object> getAttribute(
CustomEvent event, String attributeName) {
return
java.util.Optional.ofNullable(event.getField(attributeName));
}
@Override
public String getEventType(CustomEvent event) {
return event.getTypeName();
}
};
// Use builder API
PatternStream<CustomEvent> pattern =
DslCompiler.<CustomEvent>builder()
.withStrictTypeMatching()
.withEventAdapter(customAdapter)
.compile("MyEvent(value > 100)", events);
pattern.select(match -> "Matched: " +
match.get("MyEvent").get(0)).print();
env.execute("Builder API Example");
}
// Example event classes
/** Simple sensor reading POJO. */
public static class SensorReading {
public String id;
public double temperature;
public long timestamp;
public SensorReading(String id, double temperature, long timestamp) {
this.id = id;
this.temperature = temperature;
this.timestamp = timestamp;
}
public String getId() {
return id;
}
public double getTemperature() {
return temperature;
}
public long getTimestamp() {
return timestamp;
}
}
/** User event POJO. */
public static class UserEvent {
public String userId;
public String action;
public long timestamp;
public UserEvent(String userId, String action, long timestamp) {
this.userId = userId;
this.action = action;
this.timestamp = timestamp;
}
public String getUserId() {
return userId;
}
public String getAction() {
return action;
}
public long getTimestamp() {
return timestamp;
}
}
/** Custom event type. */
public static class CustomEvent {
public Object getField(String name) {
return null; // Implementation omitted
}
public String getTypeName() {
return "MyEvent";
}
}
}{code}
> Support CEP DSL
> ---------------
>
> Key: FLINK-38566
> URL: https://issues.apache.org/jira/browse/FLINK-38566
> Project: Flink
> Issue Type: New Feature
> Components: Library / CEP
> Reporter: xingyuan cheng
> Priority: Major
> Labels: pull-request-available
>
> In IoT and crowd selection scenarios, complex CEP expressions are difficult
> for users to flexibly configure. Therefore, a simplified set of conditional
> expressions is needed to express CEP operators. This feature change aims to
> provide a convenient way to implement flexible and configurable expressions
> by configuring conditional expressions. I implemented the translation from the
> DSL to CEP operators using an ANTLR4 grammar.
>
> *Migration from Pattern API*
> *Before (Pattern API)*
> {code:java}
> Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
> .where(new SimpleCondition<Event>() {
> @Override
> public boolean filter(Event event) {
> return event.getValue() > 100;
> }
> })
> .next("middle")
> .where(new SimpleCondition<Event>() {
> @Override
> public boolean filter(Event event) {
> return event.getValue() < 50;
> }
> }); {code}
> *After (DSL)*
> {code:java}
> PatternStream<Event> pattern = DslCompiler.compile(
> "start(value > 100) middle(value < 50)",
> dataStream
> ); {code}
>
>
> The following is a simple demonstration use case:
>
> {code:java}
> package org.apache.flink.cep.dsl;
> import org.apache.flink.cep.PatternStream;
> import org.apache.flink.cep.dsl.api.DslCompiler;
> import org.apache.flink.cep.dsl.api.EventAdapter;
> import org.apache.flink.cep.dsl.util.MapEventAdapter;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
> /**
> * Example usage of the CEP DSL.
> *
> * <p>This class demonstrates various ways to use the DSL with different
> event types and patterns.
> * These are examples only and not executable tests.
> */
> public class DslExampleUsage {
> // Example 1: Simple POJO events with basic pattern
> public static void simplePojoExample() throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> // Create sample data
> DataStream<SensorReading> sensorData =
> env.fromElements(
> new SensorReading("sensor1", 95.0,
> System.currentTimeMillis()),
> new SensorReading("sensor1", 105.0,
> System.currentTimeMillis()),
> new SensorReading("sensor1", 110.0,
> System.currentTimeMillis()));
> // Define pattern using DSL
> PatternStream<SensorReading> pattern =
> DslCompiler.compile("HighTemp(temperature > 100)",
> sensorData);
> // Process matches
> pattern.select(
> match -> {
> SensorReading reading =
> match.get("HighTemp").get(0);
> return String.format(
> "High temperature alert: Sensor %s at
> %.1f°C",
> reading.id, reading.temperature);
> })
> .print();
> env.execute("Simple POJO Example");
> }
> // Example 2: Event correlation
> public static void eventCorrelationExample() throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream<SensorReading> sensorData =
> env.fromElements(
> new SensorReading("sensor1", 95.0, 1000L),
> new SensorReading("sensor1", 105.0, 2000L),
> new SensorReading("sensor2", 110.0, 3000L));
> // Pattern with event correlation
> String dsl = "Start(id = 'sensor1' and temperature > 90) -> " +
> "End(id = Start.id and temperature > Start.temperature)";
> PatternStream<SensorReading> pattern = DslCompiler.compile(dsl,
> sensorData);
> pattern.select(
> match -> {
> SensorReading start = match.get("Start").get(0);
> SensorReading end = match.get("End").get(0);
> return String.format(
> "Temperature rise detected: %.1f -> %.1f",
> start.temperature, end.temperature);
> })
> .print();
> env.execute("Event Correlation Example");
> }
> // Example 3: Map-based events
> public static void mapEventExample() throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> // Create Map events
> Map<String, Object> event1 = new HashMap<>();
> event1.put("_eventType", "Alert");
> event1.put("severity", 7);
> event1.put("message", "High CPU usage");
> Map<String, Object> event2 = new HashMap<>();
> event2.put("_eventType", "Alert");
> event2.put("severity", 9);
> event2.put("message", "Critical error");
> DataStream<Map<String, Object>> alerts = env.fromElements(event1,
> event2);
> // Use MapEventAdapter
> PatternStream<Map<String, Object>> pattern =
> DslCompiler.compile(
> "Alert(severity > 5)", alerts, new MapEventAdapter());
> pattern.select(match ->
> match.get("Alert").get(0).get("message")).print();
> env.execute("Map Event Example");
> }
> // Example 4: Complex pattern with quantifiers and time window
> public static void complexPatternExample() throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream<UserEvent> userEvents =
> env.fromElements(
> new UserEvent("user1", "login", 1000L),
> new UserEvent("user1", "browse", 2000L),
> new UserEvent("user1", "browse", 3000L),
> new UserEvent("user1", "purchase", 4000L));
> // Complex pattern: login -> multiple browses -> purchase, within 30
> seconds
> String dsl =
> "%SKIP_TO_LAST['Login'] "
> + "Login(action = 'login') -> "
> + "Browse{1,5}(action = 'browse' and userId =
> Login.userId) -> "
> + "Purchase(action = 'purchase' and userId =
> Login.userId) "
> + "within 30s";
> PatternStream<UserEvent> pattern = DslCompiler.compile(dsl,
> userEvents);
> pattern.select(
> match -> {
> UserEvent login = match.get("Login").get(0);
> List<UserEvent> browses = match.get("Browse");
> UserEvent purchase = match.get("Purchase").get(0);
> return String.format(
> "User %s: login -> %d browses ->
> purchase",
> login.userId, browses.size());
> })
> .print();
> env.execute("Complex Pattern Example");
> }
> // Example 5: Builder API with custom adapter
> public static void builderApiExample() throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream<CustomEvent> events = env.fromElements(new CustomEvent());
> // Custom event adapter
> EventAdapter<CustomEvent> customAdapter =
> new EventAdapter<CustomEvent>() {
> @Override
> public java.util.Optional<Object> getAttribute(
> CustomEvent event, String attributeName) {
> return
> java.util.Optional.ofNullable(event.getField(attributeName));
> }
> @Override
> public String getEventType(CustomEvent event) {
> return event.getTypeName();
> }
> };
> // Use builder API
> PatternStream<CustomEvent> pattern =
> DslCompiler.<CustomEvent>builder()
> .withStrictTypeMatching()
> .withEventAdapter(customAdapter)
> .compile("MyEvent(value > 100)", events);
> pattern.select(match -> "Matched: " +
> match.get("MyEvent").get(0)).print();
> env.execute("Builder API Example");
> }
> // Example event classes
> /** Simple sensor reading POJO. */
> public static class SensorReading {
> public String id;
> public double temperature;
> public long timestamp;
> public SensorReading(String id, double temperature, long timestamp) {
> this.id = id;
> this.temperature = temperature;
> this.timestamp = timestamp;
> }
> public String getId() {
> return id;
> }
> public double getTemperature() {
> return temperature;
> }
> public long getTimestamp() {
> return timestamp;
> }
> }
> /** User event POJO. */
> public static class UserEvent {
> public String userId;
> public String action;
> public long timestamp;
> public UserEvent(String userId, String action, long timestamp) {
> this.userId = userId;
> this.action = action;
> this.timestamp = timestamp;
> }
> public String getUserId() {
> return userId;
> }
> public String getAction() {
> return action;
> }
> public long getTimestamp() {
> return timestamp;
> }
> }
> /** Custom event type. */
> public static class CustomEvent {
> public Object getField(String name) {
> return null; // Implementation omitted
> }
> public String getTypeName() {
> return "MyEvent";
> }
> }
> }{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)