This is an automated email from the ASF dual-hosted git repository.
eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git
The following commit(s) were added to refs/heads/master by this push:
new f299361 [BAHIR-280] Update siddhi to 5.1.19
f299361 is described below
commit f2993613ad62e816bdc1dcc013dbb57904f6b64e
Author: Joao Boto <[email protected]>
AuthorDate: Tue Sep 14 16:43:17 2021 +0200
[BAHIR-280] Update siddhi to 5.1.19
---
flink-library-siddhi/pom.xml | 6 ++--
.../apache/flink/streaming/siddhi/SiddhiCEP.java | 6 ++--
.../flink/streaming/siddhi/SiddhiStream.java | 4 +--
.../siddhi/operator/AbstractSiddhiOperator.java | 39 ++++++++++------------
.../siddhi/operator/SiddhiOperatorContext.java | 4 +--
.../siddhi/operator/SiddhiStreamOperator.java | 10 +++---
.../siddhi/operator/StreamInMemOutputHandler.java | 18 +++++-----
.../siddhi/operator/StreamOutputHandler.java | 10 +++---
.../siddhi/schema/SiddhiStreamSchema.java | 4 +--
.../siddhi/utils/SiddhiStreamFactory.java | 2 +-
.../streaming/siddhi/utils/SiddhiTupleFactory.java | 28 +---------------
.../streaming/siddhi/utils/SiddhiTypeFactory.java | 10 +++---
.../flink/streaming/siddhi/SiddhiCEPITCase.java | 24 ++++++-------
.../extension/CustomPlusFunctionExtension.java | 33 +++++++++---------
.../siddhi/operator/SiddhiSyntaxTest.java | 10 +++---
.../schema/SiddhiExecutionPlanSchemaTest.java | 4 +--
16 files changed, 91 insertions(+), 121 deletions(-)
diff --git a/flink-library-siddhi/pom.xml b/flink-library-siddhi/pom.xml
index 32b541d..44e1ec2 100644
--- a/flink-library-siddhi/pom.xml
+++ b/flink-library-siddhi/pom.xml
@@ -33,13 +33,13 @@
<packaging>jar</packaging>
<properties>
- <siddhi.version>4.3.14</siddhi.version>
+ <siddhi.version>5.1.19</siddhi.version>
</properties>
<dependencies>
<!-- core dependencies -->
<dependency>
- <groupId>org.wso2.siddhi</groupId>
+ <groupId>io.siddhi</groupId>
<artifactId>siddhi-core</artifactId>
<version>${siddhi.version}</version>
<exclusions>
@@ -50,7 +50,7 @@
</exclusions>
</dependency>
<dependency>
- <groupId>org.wso2.siddhi</groupId>
+ <groupId>io.siddhi</groupId>
<artifactId>siddhi-query-api</artifactId>
<version>${siddhi.version}</version>
<exclusions>
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiCEP.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiCEP.java
index a63dbf6..0e67fd8 100644
--- a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiCEP.java
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiCEP.java
@@ -18,12 +18,12 @@
package org.apache.flink.streaming.siddhi;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.streaming.siddhi.exception.DuplicatedStreamException;
-import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
-import org.apache.flink.streaming.siddhi.schema.SiddhiStreamSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.siddhi.exception.DuplicatedStreamException;
+import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
import org.apache.flink.streaming.siddhi.operator.SiddhiStreamOperator;
+import org.apache.flink.streaming.siddhi.schema.SiddhiStreamSchema;
import org.apache.flink.streaming.siddhi.schema.StreamSchema;
import org.apache.flink.util.Preconditions;
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java
index ca61a0a..147c548 100644
--- a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/SiddhiStream.java
@@ -24,11 +24,11 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.siddhi.operator.SiddhiOperatorContext;
import org.apache.flink.streaming.siddhi.utils.SiddhiStreamFactory;
import org.apache.flink.streaming.siddhi.utils.SiddhiTypeFactory;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.util.Preconditions;
import java.util.Arrays;
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java
index 79df6ac..b796a88 100755
--- a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/AbstractSiddhiOperator.java
@@ -17,22 +17,18 @@
package org.apache.flink.streaming.siddhi.operator;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-
+import io.siddhi.core.SiddhiAppRuntime;
+import io.siddhi.core.SiddhiManager;
+import io.siddhi.core.stream.input.InputHandler;
+import io.siddhi.query.api.SiddhiApp;
+import io.siddhi.query.api.annotation.Annotation;
+import io.siddhi.query.api.annotation.Element;
+import io.siddhi.query.api.definition.AbstractDefinition;
+import io.siddhi.query.compiler.SiddhiCompiler;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
-import org.apache.flink.streaming.siddhi.schema.StreamSchema;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
@@ -48,16 +44,15 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.streaming.siddhi.schema.StreamSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.SiddhiAppRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.query.api.SiddhiApp;
-import org.wso2.siddhi.query.api.annotation.Annotation;
-import org.wso2.siddhi.query.api.annotation.Element;
-import org.wso2.siddhi.query.api.definition.AbstractDefinition;
-import org.wso2.siddhi.query.compiler.SiddhiCompiler;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.*;
/**
* <h1>Siddhi Runtime Operator</h1>
@@ -66,13 +61,13 @@ import org.wso2.siddhi.query.compiler.SiddhiCompiler;
*
* <ul>
* <li>
- * Create Siddhi {@link org.wso2.siddhi.core.SiddhiAppRuntime} according predefined execution plan and integrate with Flink Stream Operator lifecycle.
+ * Create Siddhi {@link io.siddhi.core.SiddhiAppRuntime} according predefined execution plan and integrate with Flink Stream Operator lifecycle.
* </li>
* <li>
* Connect Flink DataStreams with predefined Siddhi Stream according to unique streamId
* </li>
* <li>
- * Convert native {@link StreamRecord} to Siddhi {@link org.wso2.siddhi.core.event.Event} according to {@link StreamSchema}, and send to Siddhi Runtime.
+ * Convert native {@link StreamRecord} to Siddhi {@link io.siddhi.core.event.Event} according to {@link StreamSchema}, and send to Siddhi Runtime.
* </li>
* <li>
* Listen output callback event and convert as expected output type according to output {@link org.apache.flink.api.common.typeinfo.TypeInformation}, then output as typed DataStream.
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiOperatorContext.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiOperatorContext.java
index f760938..0e90781 100644
--- a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiOperatorContext.java
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiOperatorContext.java
@@ -17,14 +17,14 @@
package org.apache.flink.streaming.siddhi.operator;
+import io.siddhi.core.SiddhiManager;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
import org.apache.flink.streaming.siddhi.schema.SiddhiStreamSchema;
import org.apache.flink.streaming.siddhi.schema.StreamSchema;
-import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.util.Preconditions;
-import org.wso2.siddhi.core.SiddhiManager;
import java.io.Serializable;
import java.util.ArrayList;
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java
index 0ce719c..45e4b32 100755
--- a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/SiddhiStreamOperator.java
@@ -17,19 +17,19 @@
package org.apache.flink.streaming.siddhi.operator;
-import java.io.IOException;
-import java.util.PriorityQueue;
-
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.siddhi.schema.StreamSchema;
-import org.apache.flink.streaming.siddhi.utils.SiddhiTypeFactory;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.siddhi.schema.StreamSchema;
+import org.apache.flink.streaming.siddhi.utils.SiddhiTypeFactory;
+
+import java.io.IOException;
+import java.util.PriorityQueue;
/**
* Wrap input event in generic type of <code>IN</code> as Tuple2<String,IN>
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java
index 49c27c2..5962d98 100755
--- a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamInMemOutputHandler.java
@@ -17,22 +17,22 @@
package org.apache.flink.streaming.siddhi.operator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.Map;
-
+import io.siddhi.core.event.Event;
+import io.siddhi.core.stream.output.StreamCallback;
+import io.siddhi.query.api.definition.AbstractDefinition;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.streaming.siddhi.utils.SiddhiTupleFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.siddhi.utils.SiddhiTupleFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
-import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
/**
* Siddhi Stream output callback handler and conver siddhi {@link Event} to required output type,
@@ -86,7 +86,7 @@ public class StreamInMemOutputHandler<R> extends StreamCallback {
this.collectedRecords.clear();
}
- private Map<String, Object> toMap(Event event) {
+ public Map<String, Object> toMap(Event event) {
Map<String, Object> map = new LinkedHashMap<>();
for (int i = 0; i < definition.getAttributeNameArray().length; i++) {
map.put(definition.getAttributeNameArray()[i], event.getData(i));
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java
index 1c1096c..18bcca8 100755
--- a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/operator/StreamOutputHandler.java
@@ -17,19 +17,19 @@
package org.apache.flink.streaming.siddhi.operator;
+import io.siddhi.core.event.Event;
+import io.siddhi.core.stream.output.StreamCallback;
+import io.siddhi.query.api.definition.AbstractDefinition;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.streaming.siddhi.utils.SiddhiTupleFactory;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.siddhi.utils.SiddhiTupleFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
-import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -87,7 +87,7 @@ public class StreamOutputHandler<R> extends StreamCallback {
super.stopProcessing();
}
- private Map<String, Object> toMap(Event event) {
+ public Map<String, Object> toMap(Event event) {
Map<String, Object> map = new LinkedHashMap<>();
for (int i = 0; i < definition.getAttributeNameArray().length; i++) {
map.put(definition.getAttributeNameArray()[i], event.getData(i));
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/SiddhiStreamSchema.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/SiddhiStreamSchema.java
index 2a3a04c..29a9fb4 100644
--- a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/SiddhiStreamSchema.java
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/SiddhiStreamSchema.java
@@ -17,12 +17,12 @@
package org.apache.flink.streaming.siddhi.schema;
+import io.siddhi.query.api.definition.Attribute;
+import io.siddhi.query.api.definition.StreamDefinition;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.siddhi.utils.SiddhiTypeFactory;
import org.apache.flink.util.Preconditions;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.definition.StreamDefinition;
import java.util.ArrayList;
import java.util.List;
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiStreamFactory.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiStreamFactory.java
index d11f029..cbbbc4d 100644
--- a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiStreamFactory.java
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiStreamFactory.java
@@ -18,9 +18,9 @@
package org.apache.flink.streaming.siddhi.utils;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.siddhi.operator.SiddhiOperatorContext;
import org.apache.flink.streaming.siddhi.operator.SiddhiStreamOperator;
-import org.apache.flink.streaming.api.datastream.DataStream;
/**
* Convert SiddhiCEPExecutionPlan to SiddhiCEP Operator and build output DataStream
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactory.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactory.java
index 6e96345..b2df547 100644
--- a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactory.java
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactory.java
@@ -17,33 +17,7 @@
package org.apache.flink.streaming.siddhi.utils;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple0;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple6;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.api.java.tuple.Tuple8;
-import org.apache.flink.api.java.tuple.Tuple9;
-import org.apache.flink.api.java.tuple.Tuple10;
-import org.apache.flink.api.java.tuple.Tuple11;
-import org.apache.flink.api.java.tuple.Tuple12;
-import org.apache.flink.api.java.tuple.Tuple13;
-import org.apache.flink.api.java.tuple.Tuple14;
-import org.apache.flink.api.java.tuple.Tuple15;
-import org.apache.flink.api.java.tuple.Tuple16;
-import org.apache.flink.api.java.tuple.Tuple17;
-import org.apache.flink.api.java.tuple.Tuple18;
-import org.apache.flink.api.java.tuple.Tuple19;
-import org.apache.flink.api.java.tuple.Tuple20;
-import org.apache.flink.api.java.tuple.Tuple21;
-import org.apache.flink.api.java.tuple.Tuple22;
-import org.apache.flink.api.java.tuple.Tuple23;
-import org.apache.flink.api.java.tuple.Tuple24;
-import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.api.java.tuple.*;
import org.apache.flink.util.Preconditions;
/**
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java
index 84ff453..a01c5bb 100644
--- a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java
@@ -17,16 +17,16 @@
package org.apache.flink.streaming.siddhi.utils;
+import io.siddhi.core.SiddhiAppRuntime;
+import io.siddhi.core.SiddhiManager;
+import io.siddhi.query.api.definition.AbstractDefinition;
+import io.siddhi.query.api.definition.Attribute;
+import io.siddhi.query.api.definition.StreamDefinition;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.wso2.siddhi.core.SiddhiAppRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.query.api.definition.AbstractDefinition;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.definition.StreamDefinition;
import java.util.HashMap;
import java.util.List;
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
index 9b38539..a0d9935 100755
--- a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java
@@ -17,36 +17,36 @@
package org.apache.flink.streaming.siddhi;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
-import org.apache.flink.streaming.siddhi.extension.CustomPlusFunctionExtension;
-import org.apache.flink.streaming.siddhi.source.Event;
-import org.apache.flink.streaming.siddhi.source.RandomEventSource;
-import org.apache.flink.streaming.siddhi.source.RandomTupleSource;
-import org.apache.flink.streaming.siddhi.source.RandomWordSource;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException;
+import org.apache.flink.streaming.siddhi.extension.CustomPlusFunctionExtension;
+import org.apache.flink.streaming.siddhi.source.Event;
+import org.apache.flink.streaming.siddhi.source.RandomEventSource;
+import org.apache.flink.streaming.siddhi.source.RandomTupleSource;
+import org.apache.flink.streaming.siddhi.source.RandomWordSource;
import org.apache.flink.test.util.AbstractTestBase;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
import static org.junit.Assert.assertEquals;
/**
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/extension/CustomPlusFunctionExtension.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/extension/CustomPlusFunctionExtension.java
index 582f1cd..c0ffcfc 100644
--- a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/extension/CustomPlusFunctionExtension.java
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/extension/CustomPlusFunctionExtension.java
@@ -17,15 +17,14 @@
package org.apache.flink.streaming.siddhi.extension;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.wso2.siddhi.core.config.SiddhiAppContext;
-import org.wso2.siddhi.core.exception.SiddhiAppCreationException;
-import org.wso2.siddhi.core.executor.ExpressionExecutor;
-import org.wso2.siddhi.core.executor.function.FunctionExecutor;
-import org.wso2.siddhi.core.util.config.ConfigReader;
-import org.wso2.siddhi.query.api.definition.Attribute;
+import io.siddhi.core.config.SiddhiQueryContext;
+import io.siddhi.core.exception.SiddhiAppCreationException;
+import io.siddhi.core.executor.ExpressionExecutor;
+import io.siddhi.core.executor.function.FunctionExecutor;
+import io.siddhi.core.util.config.ConfigReader;
+import io.siddhi.core.util.snapshot.state.State;
+import io.siddhi.core.util.snapshot.state.StateFactory;
+import io.siddhi.query.api.definition.Attribute;
public class CustomPlusFunctionExtension extends FunctionExecutor {
private Attribute.Type returnType;
@@ -34,7 +33,7 @@ public class CustomPlusFunctionExtension extends FunctionExecutor {
* The initialization method for FunctionExecutor, this method will be called before the other methods
*/
@Override
- protected void init(ExpressionExecutor[] expressionExecutors, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
+ protected StateFactory init(ExpressionExecutor[] expressionExecutors, ConfigReader configReader, SiddhiQueryContext siddhiQueryContext) {
for (ExpressionExecutor expressionExecutor : attributeExpressionExecutors) {
Attribute.Type attributeType = expressionExecutor.getReturnType();
if (attributeType == Attribute.Type.DOUBLE) {
@@ -46,6 +45,7 @@ public class CustomPlusFunctionExtension extends FunctionExecutor {
returnType = Attribute.Type.LONG;
}
}
+ return null;
}
/**
@@ -91,17 +91,18 @@ public class CustomPlusFunctionExtension extends FunctionExecutor {
}
@Override
- public Attribute.Type getReturnType() {
- return returnType;
+ protected Object execute(Object[] objects, State state) {
+ return null;
}
@Override
- public Map<String, Object> currentState() {
- return new HashMap<>();
+ protected Object execute(Object o, State state) {
+ return null;
}
@Override
- public void restoreState(Map<String, Object> map) {
-
+ public Attribute.Type getReturnType() {
+ return returnType;
}
+
}
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/operator/SiddhiSyntaxTest.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/operator/SiddhiSyntaxTest.java
index d271c89..05b5aa7 100644
--- a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/operator/SiddhiSyntaxTest.java
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/operator/SiddhiSyntaxTest.java
@@ -17,15 +17,15 @@
package org.apache.flink.streaming.siddhi.operator;
+import io.siddhi.core.SiddhiAppRuntime;
+import io.siddhi.core.SiddhiManager;
+import io.siddhi.core.event.Event;
+import io.siddhi.core.stream.input.InputHandler;
+import io.siddhi.core.stream.output.StreamCallback;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.wso2.siddhi.core.SiddhiAppRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
import java.util.ArrayList;
import java.util.List;
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/SiddhiExecutionPlanSchemaTest.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/SiddhiExecutionPlanSchemaTest.java
index db05e9d..74b6766 100644
--- a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/SiddhiExecutionPlanSchemaTest.java
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/SiddhiExecutionPlanSchemaTest.java
@@ -17,13 +17,13 @@
package org.apache.flink.streaming.siddhi.schema;
+import io.siddhi.query.api.definition.Attribute;
+import io.siddhi.query.api.definition.StreamDefinition;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.siddhi.source.Event;
import org.junit.Test;
-import org.wso2.siddhi.query.api.definition.Attribute;
-import org.wso2.siddhi.query.api.definition.StreamDefinition;
import static org.junit.Assert.*;