This is an automated email from the ASF dual-hosted git repository.

eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f299361  [BAHIR-280] Update siddhi to 5.1.19
f299361 is described below

commit f2993613ad62e816bdc1dcc013dbb57904f6b64e
Author: Joao Boto <[email protected]>
AuthorDate: Tue Sep 14 16:43:17 2021 +0200

    [BAHIR-280] Update siddhi to 5.1.19
---
 flink-library-siddhi/pom.xml                       |  6 ++--
 .../apache/flink/streaming/siddhi/SiddhiCEP.java   |  6 ++--
 .../flink/streaming/siddhi/SiddhiStream.java       |  4 +--
 .../siddhi/operator/AbstractSiddhiOperator.java    | 39 ++++++++++------------
 .../siddhi/operator/SiddhiOperatorContext.java     |  4 +--
 .../siddhi/operator/SiddhiStreamOperator.java      | 10 +++---
 .../siddhi/operator/StreamInMemOutputHandler.java  | 18 +++++-----
 .../siddhi/operator/StreamOutputHandler.java       | 10 +++---
 .../siddhi/schema/SiddhiStreamSchema.java          |  4 +--
 .../siddhi/utils/SiddhiStreamFactory.java          |  2 +-
 .../streaming/siddhi/utils/SiddhiTupleFactory.java | 28 +---------------
 .../streaming/siddhi/utils/SiddhiTypeFactory.java  | 10 +++---
 .../flink/streaming/siddhi/SiddhiCEPITCase.java    | 24 ++++++-------
 .../extension/CustomPlusFunctionExtension.java     | 33 +++++++++---------
 .../siddhi/operator/SiddhiSyntaxTest.java          | 10 +++---
 .../schema/SiddhiExecutionPlanSchemaTest.java      |  4 +--
 16 files changed, 91 insertions(+), 121 deletions(-)

diff --git a/flink-library-siddhi/pom.xml b/flink-library-siddhi/pom.xml
index 32b541d..44e1ec2 100644
--- a/flink-library-siddhi/pom.xml
+++ b/flink-library-siddhi/pom.xml
@@ -33,13 +33,13 @@
     <packaging>jar</packaging>
 
     <properties>
-        <siddhi.version>4.3.14</siddhi.version>
+        <siddhi.version>5.1.19</siddhi.version>
     </properties>
 
     <dependencies>
         <!-- core dependencies -->
         <dependency>
-            <groupId>org.wso2.siddhi</groupId>
+            <groupId>io.siddhi</groupId>
             <artifactId>siddhi-core</artifactId>
             <version>${siddhi.version}</version>
             <exclusions>
@@ -50,7 +50,7 @@
             </exclusions>
         </dependency>
         <dependency>
-            <groupId>org.wso2.siddhi</groupId>
+            <groupId>io.siddhi</groupId>
             <artifactId>siddhi-query-api</artifactId>
             <version>${siddhi.version}</version>
             <exclusions>
diff --git 
a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiCEP.java
 
b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiCEP.java
index a63dbf6..0e67fd8 100644
--- 
a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiCEP.java
+++ 
b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiCEP.java
@@ -18,12 +18,12 @@
 package org.apache.flink.streaming.siddhi;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.streaming.siddhi.exception.DuplicatedStreamException;
-import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
-import org.apache.flink.streaming.siddhi.schema.SiddhiStreamSchema;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.siddhi.exception.DuplicatedStreamException;
+import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
 import org.apache.flink.streaming.siddhi.operator.SiddhiStreamOperator;
+import org.apache.flink.streaming.siddhi.schema.SiddhiStreamSchema;
 import org.apache.flink.streaming.siddhi.schema.StreamSchema;
 import org.apache.flink.util.Preconditions;
 
diff --git 
a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java
 
b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java
index ca61a0a..147c548 100644
--- 
a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java
+++ 
b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java
@@ -24,11 +24,11 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.siddhi.operator.SiddhiOperatorContext;
 import org.apache.flink.streaming.siddhi.utils.SiddhiStreamFactory;
 import org.apache.flink.streaming.siddhi.utils.SiddhiTypeFactory;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Arrays;
diff --git 
a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java
 
b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java
index 79df6ac..b796a88 100755
--- 
a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java
+++ 
b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java
@@ -17,22 +17,18 @@
 
 package org.apache.flink.streaming.siddhi.operator;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-
+import io.siddhi.core.SiddhiAppRuntime;
+import io.siddhi.core.SiddhiManager;
+import io.siddhi.core.stream.input.InputHandler;
+import io.siddhi.query.api.SiddhiApp;
+import io.siddhi.query.api.annotation.Annotation;
+import io.siddhi.query.api.annotation.Element;
+import io.siddhi.query.api.definition.AbstractDefinition;
+import io.siddhi.query.compiler.SiddhiCompiler;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
-import org.apache.flink.streaming.siddhi.schema.StreamSchema;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
@@ -48,16 +44,15 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.streaming.siddhi.schema.StreamSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.SiddhiAppRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.query.api.SiddhiApp;
-import org.wso2.siddhi.query.api.annotation.Annotation;
-import org.wso2.siddhi.query.api.annotation.Element;
-import org.wso2.siddhi.query.api.definition.AbstractDefinition;
-import org.wso2.siddhi.query.compiler.SiddhiCompiler;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.*;
 
 /**
  * <h1>Siddhi Runtime Operator</h1>
@@ -66,13 +61,13 @@ import org.wso2.siddhi.query.compiler.SiddhiCompiler;
  *
  * <ul>
  * <li>
- * Create Siddhi {@link org.wso2.siddhi.core.SiddhiAppRuntime} according 
predefined execution plan and integrate with Flink Stream Operator lifecycle.
+ * Create Siddhi {@link io.siddhi.core.SiddhiAppRuntime} according predefined 
execution plan and integrate with Flink Stream Operator lifecycle.
  * </li>
  * <li>
  * Connect Flink DataStreams with predefined Siddhi Stream according to unique 
streamId
  * </li>
  * <li>
- * Convert native {@link StreamRecord} to Siddhi {@link 
org.wso2.siddhi.core.event.Event} according to {@link StreamSchema}, and send 
to Siddhi Runtime.
+ * Convert native {@link StreamRecord} to Siddhi {@link 
io.siddhi.core.event.Event} according to {@link StreamSchema}, and send to 
Siddhi Runtime.
  * </li>
  * <li>
  * Listen output callback event and convert as expected output type according 
to output {@link org.apache.flink.api.common.typeinfo.TypeInformation}, then 
output as typed DataStream.
diff --git 
a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiOperatorContext.java
 
b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiOperatorContext.java
index f760938..0e90781 100644
--- 
a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiOperatorContext.java
+++ 
b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiOperatorContext.java
@@ -17,14 +17,14 @@
 
 package org.apache.flink.streaming.siddhi.operator;
 
+import io.siddhi.core.SiddhiManager;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
 import org.apache.flink.streaming.siddhi.schema.SiddhiStreamSchema;
 import org.apache.flink.streaming.siddhi.schema.StreamSchema;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.util.Preconditions;
-import org.wso2.siddhi.core.SiddhiManager;
 
 import java.io.Serializable;
 import java.util.ArrayList;
diff --git 
a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java
 
b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java
index 0ce719c..45e4b32 100755
--- 
a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java
+++ 
b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java
@@ -17,19 +17,19 @@
 
 package org.apache.flink.streaming.siddhi.operator;
 
-import java.io.IOException;
-import java.util.PriorityQueue;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.siddhi.schema.StreamSchema;
-import org.apache.flink.streaming.siddhi.utils.SiddhiTypeFactory;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.siddhi.schema.StreamSchema;
+import org.apache.flink.streaming.siddhi.utils.SiddhiTypeFactory;
+
+import java.io.IOException;
+import java.util.PriorityQueue;
 
 /**
  * Wrap input event in generic type of <code>IN</code> as Tuple2<String,IN>
diff --git 
a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java
 
b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java
index 49c27c2..5962d98 100755
--- 
a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java
+++ 
b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java
@@ -17,22 +17,22 @@
 
 package org.apache.flink.streaming.siddhi.operator;
 
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.Map;
-
+import io.siddhi.core.event.Event;
+import io.siddhi.core.stream.output.StreamCallback;
+import io.siddhi.query.api.definition.AbstractDefinition;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.streaming.siddhi.utils.SiddhiTupleFactory;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.siddhi.utils.SiddhiTupleFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
-import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
 
 /**
  * Siddhi Stream output callback handler and conver siddhi {@link Event} to 
required output type,
@@ -86,7 +86,7 @@ public class StreamInMemOutputHandler<R> extends 
StreamCallback {
         this.collectedRecords.clear();
     }
 
-    private Map<String, Object> toMap(Event event) {
+    public Map<String, Object> toMap(Event event) {
         Map<String, Object> map = new LinkedHashMap<>();
         for (int i = 0; i < definition.getAttributeNameArray().length; i++) {
             map.put(definition.getAttributeNameArray()[i], event.getData(i));
diff --git 
a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java
 
b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java
index 1c1096c..18bcca8 100755
--- 
a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java
+++ 
b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java
@@ -17,19 +17,19 @@
 
 package org.apache.flink.streaming.siddhi.operator;
 
+import io.siddhi.core.event.Event;
+import io.siddhi.core.stream.output.StreamCallback;
+import io.siddhi.query.api.definition.AbstractDefinition;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.streaming.siddhi.utils.SiddhiTupleFactory;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.siddhi.utils.SiddhiTupleFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
-import org.wso2.siddhi.query.api.definition.AbstractDefinition;
 
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -87,7 +87,7 @@ public class StreamOutputHandler<R> extends StreamCallback {
         super.stopProcessing();
     }
 
-    private Map<String, Object> toMap(Event event) {
+    public Map<String, Object> toMap(Event event) {
         Map<String, Object> map = new LinkedHashMap<>();
         for (int i = 0; i < definition.getAttributeNameArray().length; i++) {
             map.put(definition.getAttributeNameArray()[i], event.getData(i));
diff --git 
a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/SiddhiStreamSchema.java
 
b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/SiddhiStreamSchema.java
index 2a3a04c..29a9fb4 100644
--- 
a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/SiddhiStreamSchema.java
+++ 
b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/SiddhiStreamSchema.java
@@ -17,12 +17,12 @@
 
 package org.apache.flink.streaming.siddhi.schema;
 
+import io.siddhi.query.api.definition.Attribute;
+import io.siddhi.query.api.definition.StreamDefinition;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.siddhi.utils.SiddhiTypeFactory;
 import org.apache.flink.util.Preconditions;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.definition.StreamDefinition;
 
 import java.util.ArrayList;
 import java.util.List;
diff --git 
a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiStreamFactory.java
 
b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiStreamFactory.java
index d11f029..cbbbc4d 100644
--- 
a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiStreamFactory.java
+++ 
b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiStreamFactory.java
@@ -18,9 +18,9 @@
 package org.apache.flink.streaming.siddhi.utils;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.siddhi.operator.SiddhiOperatorContext;
 import org.apache.flink.streaming.siddhi.operator.SiddhiStreamOperator;
-import org.apache.flink.streaming.api.datastream.DataStream;
 
 /**
  * Convert SiddhiCEPExecutionPlan to SiddhiCEP Operator and build output 
DataStream
diff --git 
a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactory.java
 
b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactory.java
index 6e96345..b2df547 100644
--- 
a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactory.java
+++ 
b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactory.java
@@ -17,33 +17,7 @@
 
 package org.apache.flink.streaming.siddhi.utils;
 
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple0;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple6;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.api.java.tuple.Tuple8;
-import org.apache.flink.api.java.tuple.Tuple9;
-import org.apache.flink.api.java.tuple.Tuple10;
-import org.apache.flink.api.java.tuple.Tuple11;
-import org.apache.flink.api.java.tuple.Tuple12;
-import org.apache.flink.api.java.tuple.Tuple13;
-import org.apache.flink.api.java.tuple.Tuple14;
-import org.apache.flink.api.java.tuple.Tuple15;
-import org.apache.flink.api.java.tuple.Tuple16;
-import org.apache.flink.api.java.tuple.Tuple17;
-import org.apache.flink.api.java.tuple.Tuple18;
-import org.apache.flink.api.java.tuple.Tuple19;
-import org.apache.flink.api.java.tuple.Tuple20;
-import org.apache.flink.api.java.tuple.Tuple21;
-import org.apache.flink.api.java.tuple.Tuple22;
-import org.apache.flink.api.java.tuple.Tuple23;
-import org.apache.flink.api.java.tuple.Tuple24;
-import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.api.java.tuple.*;
 import org.apache.flink.util.Preconditions;
 
 /**
diff --git 
a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java
 
b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java
index 84ff453..a01c5bb 100644
--- 
a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java
+++ 
b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java
@@ -17,16 +17,16 @@
 
 package org.apache.flink.streaming.siddhi.utils;
 
+import io.siddhi.core.SiddhiAppRuntime;
+import io.siddhi.core.SiddhiManager;
+import io.siddhi.query.api.definition.AbstractDefinition;
+import io.siddhi.query.api.definition.Attribute;
+import io.siddhi.query.api.definition.StreamDefinition;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.wso2.siddhi.core.SiddhiAppRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.query.api.definition.AbstractDefinition;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.definition.StreamDefinition;
 
 import java.util.HashMap;
 import java.util.List;
diff --git 
a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
 
b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
index 9b38539..a0d9935 100755
--- 
a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
+++ 
b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
@@ -17,36 +17,36 @@
 
 package org.apache.flink.streaming.siddhi;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
-import org.apache.flink.streaming.siddhi.extension.CustomPlusFunctionExtension;
-import org.apache.flink.streaming.siddhi.source.Event;
-import org.apache.flink.streaming.siddhi.source.RandomEventSource;
-import org.apache.flink.streaming.siddhi.source.RandomTupleSource;
-import org.apache.flink.streaming.siddhi.source.RandomWordSource;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
 import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.streaming.siddhi.extension.CustomPlusFunctionExtension;
+import org.apache.flink.streaming.siddhi.source.Event;
+import org.apache.flink.streaming.siddhi.source.RandomEventSource;
+import org.apache.flink.streaming.siddhi.source.RandomTupleSource;
+import org.apache.flink.streaming.siddhi.source.RandomWordSource;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
 import static org.junit.Assert.assertEquals;
 
 /**
diff --git 
a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/extension/CustomPlusFunctionExtension.java
 
b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/extension/CustomPlusFunctionExtension.java
index 582f1cd..c0ffcfc 100644
--- 
a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/extension/CustomPlusFunctionExtension.java
+++ 
b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/extension/CustomPlusFunctionExtension.java
@@ -17,15 +17,14 @@
 
 package org.apache.flink.streaming.siddhi.extension;
 
-import java.util.HashMap;
-import java.util.Map;
-
-import org.wso2.siddhi.core.config.SiddhiAppContext;
-import org.wso2.siddhi.core.exception.SiddhiAppCreationException;
-import org.wso2.siddhi.core.executor.ExpressionExecutor;
-import org.wso2.siddhi.core.executor.function.FunctionExecutor;
-import org.wso2.siddhi.core.util.config.ConfigReader;
-import org.wso2.siddhi.query.api.definition.Attribute;
+import io.siddhi.core.config.SiddhiQueryContext;
+import io.siddhi.core.exception.SiddhiAppCreationException;
+import io.siddhi.core.executor.ExpressionExecutor;
+import io.siddhi.core.executor.function.FunctionExecutor;
+import io.siddhi.core.util.config.ConfigReader;
+import io.siddhi.core.util.snapshot.state.State;
+import io.siddhi.core.util.snapshot.state.StateFactory;
+import io.siddhi.query.api.definition.Attribute;
 
 public class CustomPlusFunctionExtension extends FunctionExecutor {
     private Attribute.Type returnType;
@@ -34,7 +33,7 @@ public class CustomPlusFunctionExtension extends 
FunctionExecutor {
      * The initialization method for FunctionExecutor, this method will be 
called before the other methods
      */
     @Override
-    protected void init(ExpressionExecutor[] expressionExecutors, ConfigReader 
configReader, SiddhiAppContext siddhiAppContext) {
+    protected StateFactory init(ExpressionExecutor[] expressionExecutors, 
ConfigReader configReader, SiddhiQueryContext siddhiQueryContext) {
         for (ExpressionExecutor expressionExecutor : 
attributeExpressionExecutors) {
             Attribute.Type attributeType = expressionExecutor.getReturnType();
             if (attributeType == Attribute.Type.DOUBLE) {
@@ -46,6 +45,7 @@ public class CustomPlusFunctionExtension extends 
FunctionExecutor {
                 returnType = Attribute.Type.LONG;
             }
         }
+        return null;
     }
 
     /**
@@ -91,17 +91,18 @@ public class CustomPlusFunctionExtension extends 
FunctionExecutor {
     }
 
     @Override
-    public Attribute.Type getReturnType() {
-        return returnType;
+    protected Object execute(Object[] objects, State state) {
+        return null;
     }
 
     @Override
-    public Map<String, Object> currentState() {
-        return new HashMap<>();
+    protected Object execute(Object o, State state) {
+        return null;
     }
 
     @Override
-    public void restoreState(Map<String, Object> map) {
-
+    public Attribute.Type getReturnType() {
+        return returnType;
     }
+
 }
diff --git 
a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/operator/SiddhiSyntaxTest.java
 
b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/operator/SiddhiSyntaxTest.java
index d271c89..05b5aa7 100644
--- 
a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/operator/SiddhiSyntaxTest.java
+++ 
b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/operator/SiddhiSyntaxTest.java
@@ -17,15 +17,15 @@
 
 package org.apache.flink.streaming.siddhi.operator;
 
+import io.siddhi.core.SiddhiAppRuntime;
+import io.siddhi.core.SiddhiManager;
+import io.siddhi.core.event.Event;
+import io.siddhi.core.stream.input.InputHandler;
+import io.siddhi.core.stream.output.StreamCallback;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.wso2.siddhi.core.SiddhiAppRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
 
 import java.util.ArrayList;
 import java.util.List;
diff --git 
a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/SiddhiExecutionPlanSchemaTest.java
 
b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/SiddhiExecutionPlanSchemaTest.java
index db05e9d..74b6766 100644
--- 
a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/SiddhiExecutionPlanSchemaTest.java
+++ 
b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/SiddhiExecutionPlanSchemaTest.java
@@ -17,13 +17,13 @@
 
 package org.apache.flink.streaming.siddhi.schema;
 
+import io.siddhi.query.api.definition.Attribute;
+import io.siddhi.query.api.definition.StreamDefinition;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.siddhi.source.Event;
 import org.junit.Test;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.definition.StreamDefinition;
 
 import static org.junit.Assert.*;
 

Reply via email to