[ https://issues.apache.org/jira/browse/BAHIR-144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16266043#comment-16266043 ]
ASF GitHub Bot commented on BAHIR-144:
--------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/bahir-flink/pull/22

> Add Siddhi CEP integration with Flink streaming
> -----------------------------------------------
>
>                 Key: BAHIR-144
>                 URL: https://issues.apache.org/jira/browse/BAHIR-144
>             Project: Bahir
>          Issue Type: New Feature
>          Components: Flink Streaming Connectors
>            Reporter: Hao Chen
>
> Moved from:
> * https://issues.apache.org/jira/browse/FLINK-4520
> * https://github.com/apache/flink/pull/2487
>
> Siddhi CEP is a lightweight and easy-to-use open source Complex Event
> Processing (CEP) engine released as a Java library under the `Apache Software
> License v2.0`. Siddhi CEP processes events generated by various event
> sources, analyzes them, and emits the appropriate complex events according
> to user-specified queries.
>
> **It would be very helpful for Flink users (especially streaming application
> developers) to have a library for running Siddhi CEP queries directly in a
> Flink streaming application.**
>
> # Features
> - Integrate Siddhi CEP as a stream operator (i.e.
>   `TupleStreamSiddhiOperator`), supporting rich CEP features like
>   - Filter
>   - Join
>   - Aggregation
>   - Group by
>   - Having
>   - Window
>   - Conditions and Expressions
>   - Pattern processing
>   - Sequence processing
>   - Event Tables
>   ...
> - Provide an easy-to-use Siddhi CEP API that integrates with the Flink
>   DataStream API (See `SiddhiCEP` and `SiddhiStream`)
>   - Register a Flink DataStream, associating its native type information
>     with a Siddhi Stream Schema; POJO, Tuple, primitive types, etc. are
>     supported.
>   - Connect single or multiple Flink DataStreams with a Siddhi CEP
>     Execution Plan
>   - Return the output stream as a DataStream whose type is inferred from
>     the Siddhi Stream Schema
> - Integrate Siddhi runtime state management with Flink state (See
>   `AbstractSiddhiOperator`)
> - Support Siddhi plugin management to extend CEP functions. (See
>   `SiddhiCEP#registerExtension`)
>
> # Test Cases
> - [`org.apache.flink.contrib.siddhi.SiddhiCEPITCase`](https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java)
>
> # Example
> ```
>  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>  SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
>  cep.registerExtension("custom:plus", CustomPlusFunctionExtension.class);
>  cep.registerStream("inputStream1", input1, "id", "name", "price", "timestamp");
>  cep.registerStream("inputStream2", input2, "id", "name", "price", "timestamp");
>  DataStream<Tuple5<Integer,String,Integer,String,Double>> output = cep
>      .from("inputStream1").union("inputStream2")
>      .sql(
>          "from every s1 = inputStream1[id == 2] "
>          + " -> s2 = inputStream2[id == 3] "
>          + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2, "
>          + "custom:plus(s1.price,s2.price) as price "
>          + "insert into outputStream"
>      )
>      .returns("outputStream");
>  env.execute();
> ```

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
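
The pattern used in the example query (`every s1 = inputStream1[id == 2] -> s2 = inputStream2[id == 3]`) may be easier to follow with a small plain-Java illustration. This is only a sketch of how the "followed by" pattern is commonly understood to behave, not Siddhi or Flink code. The `FollowedBySketch` and `Event` names are hypothetical, the `timestamp` field is omitted, events are assumed to arrive in a single merged, ordered list, and `custom:plus` is assumed to simply add the two prices.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of the example pattern; hypothetical, not Siddhi code. */
final class FollowedBySketch {

    /** Hypothetical event type mirroring the registered fields (timestamp omitted). */
    static final class Event {
        final String stream;
        final int id;
        final String name;
        final double price;

        Event(String stream, int id, String name, double price) {
            this.stream = stream;
            this.id = id;
            this.name = name;
            this.price = price;
        }
    }

    /** Returns one row per match, formatted as "id_1,name_1,id_2,name_2,price". */
    static List<String> match(List<Event> events) {
        List<Event> pending = new ArrayList<>(); // s1 events still waiting for an s2
        List<String> out = new ArrayList<>();
        for (Event e : events) {
            if (e.stream.equals("inputStream1") && e.id == 2) {
                // "every": each qualifying s1 opens a new partial match
                pending.add(e);
            } else if (e.stream.equals("inputStream2") && e.id == 3) {
                // an s2 completes all partial matches opened so far
                for (Event s1 : pending) {
                    // assumed behaviour of custom:plus: add the two prices
                    double price = s1.price + e.price;
                    out.add(s1.id + "," + s1.name + "," + e.id + "," + e.name + "," + price);
                }
                pending.clear();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event("inputStream1", 2, "a", 1.5));
        events.add(new Event("inputStream1", 1, "b", 9.0)); // filtered out: id != 2
        events.add(new Event("inputStream2", 3, "c", 2.0));
        for (String row : match(events)) {
            System.out.println(row); // prints "2,a,3,c,3.5"
        }
    }
}
```

In the proposed integration this matching would be done by the Siddhi runtime inside the Flink operator; the sketch only shows what kind of rows the query is expected to produce on `outputStream`.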