http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/50f3f125/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/StreamSchema.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/StreamSchema.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/StreamSchema.java new file mode 100644 index 0000000..c851631 --- /dev/null +++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/StreamSchema.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.siddhi.schema; + +import org.apache.flink.api.common.typeinfo.AtomicType; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; + +/** + * Generic Field-based Stream Schema + * + * @param <T> Stream element type + */ +public class StreamSchema<T> implements Serializable { + private static final Logger LOGGER = LoggerFactory.getLogger(StreamSchema.class); + private final TypeInformation<T> typeInfo; + private final int[] fieldIndexes; + private final String[] fieldNames; + private TypeInformation[] fieldTypes; + private final StreamSerializer<T> streamSerializer; + private TypeSerializer<T> typeSerializer; + + public StreamSchema(TypeInformation<T> typeInfo, String... fieldNames) { + Preconditions.checkNotNull(fieldNames, "Field name is required"); + this.typeInfo = typeInfo; + this.fieldNames = fieldNames; + this.fieldIndexes = getFieldIndexes(typeInfo, fieldNames); + this.fieldTypes = getFieldTypes(typeInfo, fieldIndexes, fieldNames); + this.streamSerializer = new StreamSerializer<>(this); + } + + public StreamSchema(TypeInformation<T> typeInfo, int[] fieldIndexes, String[] fieldNames) { + this.typeInfo = typeInfo; + this.fieldIndexes = fieldIndexes; + this.fieldNames = fieldNames; + this.fieldTypes = getFieldTypes(typeInfo, fieldIndexes, fieldNames); + this.streamSerializer = new StreamSerializer<>(this); + } + + public boolean isAtomicType() { + return typeInfo instanceof AtomicType; + } + + public boolean isTupleType() { + return typeInfo instanceof TupleTypeInfo; + } + + public boolean isPojoType() { + return typeInfo instanceof PojoTypeInfo; + } + + public boolean isCaseClassType() { + return typeInfo instanceof CaseClassTypeInfo; + } + + public boolean isCompositeType() { + return typeInfo instanceof CompositeType; + } + + private <E> int[] getFieldIndexes(TypeInformation<E> typeInfo, String... fieldNames) { + int[] result; + if (isAtomicType()) { + result = new int[]{0}; + } else if (isTupleType()) { + result = new int[fieldNames.length]; + for (int i = 0; i < fieldNames.length; i++) { + result[i] = i; + } + } else if (isPojoType()) { + result = new int[fieldNames.length]; + for (int i = 0; i < fieldNames.length; i++) { + int index = ((PojoTypeInfo) typeInfo).getFieldIndex(fieldNames[i]); + if (index < 0) { + throw new IllegalArgumentException(fieldNames[i] + " is not a field of type " + typeInfo); + } + result[i] = index; + } + } else if (isCaseClassType()) { + result = new int[fieldNames.length]; + for (int i = 0; i < fieldNames.length; i++) { + int index = ((CaseClassTypeInfo) typeInfo).getFieldIndex(fieldNames[i]); + if (index < 0) { + throw new IllegalArgumentException(fieldNames[i] + " is not a field of type " + typeInfo); + } + result[i] = index; + } + } else { + throw new IllegalArgumentException("Failed to get field index from " + typeInfo); + } + return result; + } + + + private <E> TypeInformation[] getFieldTypes(TypeInformation<E> typeInfo, int[] fieldIndexes, String[] fieldNames) { + TypeInformation[] fieldTypes; + if (isCompositeType()) { + CompositeType cType = (CompositeType) typeInfo; + if (fieldNames.length != cType.getArity()) { + // throw new IllegalArgumentException("Arity of type (" + cType.getFieldNames().length+ ") " + + // "not equal to number of field names " + fieldNames.length + "."); + LOGGER.warn("Arity of type (" + cType.getFieldNames().length + ") " + + "not equal to number of field names " + fieldNames.length + "."); + } + fieldTypes = new TypeInformation[fieldIndexes.length]; + for (int i = 0; i < fieldIndexes.length; i++) { + fieldTypes[i] = cType.getTypeAt(fieldIndexes[i]); + } + } else if (isAtomicType()) { + if (fieldIndexes.length != 1 || fieldIndexes[0] != 0) { + throw new IllegalArgumentException( + "Non-composite input type may have only a single field and its index must be 0."); + } + fieldTypes = new TypeInformation[]{typeInfo}; + } else { + throw new IllegalArgumentException( + "Illegal input type info" + ); + } + return fieldTypes; + } + + public TypeInformation<T> getTypeInfo() { + return typeInfo; + } + + public int[] getFieldIndexes() { + return fieldIndexes; + } + + public String[] getFieldNames() { + return fieldNames; + } + + public TypeInformation[] getFieldTypes() { + return fieldTypes; + } + + public StreamSerializer<T> getStreamSerializer() { + return streamSerializer; + } + + public TypeSerializer<T> getTypeSerializer() { + return typeSerializer; + } + + public void setTypeSerializer(TypeSerializer<T> typeSerializer) { + this.typeSerializer = typeSerializer; + } +}
http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/50f3f125/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/StreamSerializer.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/StreamSerializer.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/StreamSerializer.java new file mode 100644 index 0000000..760afbe --- /dev/null +++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/schema/StreamSerializer.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.siddhi.schema; + +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.lang.reflect.Field; + +/** + * Stream Serialization and Field Extraction Methods. + */ +public class StreamSerializer<T> implements Serializable { + private final StreamSchema<T> schema; + + public StreamSerializer(StreamSchema<T> schema) { + this.schema = schema; + } + + public Object[] getRow(T input) { + Preconditions.checkArgument(input.getClass() == schema.getTypeInfo().getTypeClass(), + "Invalid input type: " + input + ", expected: " + schema.getTypeInfo()); + + Object[] data; + if (schema.isAtomicType()) { + data = new Object[]{input}; + } else if (schema.isTupleType()) { + Tuple tuple = (Tuple) input; + data = new Object[schema.getFieldIndexes().length]; + for (int i = 0; i < schema.getFieldIndexes().length; i++) { + data[i] = tuple.getField(schema.getFieldIndexes()[i]); + } + } else if (schema.isPojoType() || schema.isCaseClassType()) { + data = new Object[schema.getFieldIndexes().length]; + for (int i = 0; i < schema.getFieldNames().length; i++) { + data[i] = getFieldValue(schema.getFieldNames()[i], input); + } + } else { + throw new IllegalArgumentException("Failed to get field values from " + schema.getTypeInfo()); + } + return data; + } + + private Object getFieldValue(String fieldName, T input) { + // TODO: Cache Field Accessor + Field field = TypeExtractor.getDeclaredField(schema.getTypeInfo().getTypeClass(), fieldName); + if (field == null) { + throw new IllegalArgumentException(fieldName + " is not found in " + schema.getTypeInfo()); + } + if (!field.isAccessible()) { + field.setAccessible(true); + } + try { + return field.get(input); + } catch (IllegalAccessException e) { + throw new IllegalStateException(e.getMessage(), e); + } + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/50f3f125/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiStreamFactory.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiStreamFactory.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiStreamFactory.java new file mode 100644 index 0000000..20ca535 --- /dev/null +++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiStreamFactory.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.siddhi.utils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.siddhi.operator.SiddhiOperatorContext; +import org.apache.flink.streaming.siddhi.operator.SiddhiStreamOperator; +import org.apache.flink.streaming.api.datastream.DataStream; + +/** + * Convert SiddhiCEPExecutionPlan to SiddhiCEP Operator and build output DataStream + */ +public class SiddhiStreamFactory { + @SuppressWarnings("unchecked") + public static <OUT> DataStream<OUT> createDataStream(SiddhiOperatorContext context, DataStream<Tuple2<String, Object>> namedStream) { + return namedStream.transform(context.getName(), context.getOutputStreamType(), new SiddhiStreamOperator(context)); + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/50f3f125/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactory.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactory.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactory.java new file mode 100644 index 0000000..88c15eb --- /dev/null +++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactory.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.siddhi.utils; + +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple0; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.java.tuple.Tuple6; +import org.apache.flink.api.java.tuple.Tuple7; +import org.apache.flink.api.java.tuple.Tuple8; +import org.apache.flink.api.java.tuple.Tuple9; +import org.apache.flink.api.java.tuple.Tuple10; +import org.apache.flink.api.java.tuple.Tuple11; +import org.apache.flink.api.java.tuple.Tuple12; +import org.apache.flink.api.java.tuple.Tuple13; +import org.apache.flink.api.java.tuple.Tuple14; +import org.apache.flink.api.java.tuple.Tuple15; +import org.apache.flink.api.java.tuple.Tuple16; +import org.apache.flink.api.java.tuple.Tuple17; +import org.apache.flink.api.java.tuple.Tuple18; +import org.apache.flink.api.java.tuple.Tuple19; +import org.apache.flink.api.java.tuple.Tuple20; +import org.apache.flink.api.java.tuple.Tuple21; +import org.apache.flink.api.java.tuple.Tuple22; +import org.apache.flink.api.java.tuple.Tuple23; +import org.apache.flink.api.java.tuple.Tuple24; +import org.apache.flink.api.java.tuple.Tuple25; +import org.apache.flink.util.Preconditions; + +/** + * Siddhi Tuple Utility methods + */ +public class SiddhiTupleFactory { + /** + * Convert object array to type of Tuple{N} where N is between 0 to 25. + * + * @throws IllegalArgumentException if rows's length > 25 + */ + public static <T extends Tuple> T newTuple(Object[] row) { + Preconditions.checkNotNull(row, "Tuple row is null"); + switch (row.length) { + case 0: + return setTupleValue(new Tuple0(), row); + case 1: + return setTupleValue(new Tuple1(), row); + case 2: + return setTupleValue(new Tuple2(), row); + case 3: + return setTupleValue(new Tuple3(), row); + case 4: + return setTupleValue(new Tuple4(), row); + case 5: + return setTupleValue(new Tuple5(), row); + case 6: + return setTupleValue(new Tuple6(), row); + case 7: + return setTupleValue(new Tuple7(), row); + case 8: + return setTupleValue(new Tuple8(), row); + case 9: + return setTupleValue(new Tuple9(), row); + case 10: + return setTupleValue(new Tuple10(), row); + case 11: + return setTupleValue(new Tuple11(), row); + case 12: + return setTupleValue(new Tuple12(), row); + case 13: + return setTupleValue(new Tuple13(), row); + case 14: + return setTupleValue(new Tuple14(), row); + case 15: + return setTupleValue(new Tuple15(), row); + case 16: + return setTupleValue(new Tuple16(), row); + case 17: + return setTupleValue(new Tuple17(), row); + case 18: + return setTupleValue(new Tuple18(), row); + case 19: + return setTupleValue(new Tuple19(), row); + case 20: + return setTupleValue(new Tuple20(), row); + case 21: + return setTupleValue(new Tuple21(), row); + case 22: + return setTupleValue(new Tuple22(), row); + case 23: + return setTupleValue(new Tuple23(), row); + case 24: + return setTupleValue(new Tuple24(), row); + case 25: + return setTupleValue(new Tuple25(), row); + default: + throw new IllegalArgumentException("Too long row: " + row.length + ", unable to convert to Tuple"); + } + } + + @SuppressWarnings("unchecked") + public static <T extends Tuple> T setTupleValue(Tuple tuple, Object[] row) { + if (row.length != tuple.getArity()) { + throw new IllegalArgumentException("Row length" + row.length + " is not equal with tuple's arity: " + tuple.getArity()); + } + for (int i = 0; i < row.length; i++) { + tuple.setField(row[i], i); + } + return (T) tuple; + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/50f3f125/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java new file mode 100644 index 0000000..22405c9 --- /dev/null +++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.siddhi.utils; + +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.TypeInfoParser; +import org.wso2.siddhi.core.SiddhiAppRuntime; +import org.wso2.siddhi.core.SiddhiManager; +import org.wso2.siddhi.query.api.definition.AbstractDefinition; +import org.wso2.siddhi.query.api.definition.Attribute; +import org.wso2.siddhi.query.api.definition.StreamDefinition; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Siddhi Type Utils for conversion between Java Type, Siddhi Field Type, Stream Definition, and Flink Type Information. + */ +public class SiddhiTypeFactory { + private static final Map<Class<?>, Attribute.Type> JAVA_TO_SIDDHI_TYPE = new HashMap<>(); + private static final Map<Attribute.Type, Class<?>> SIDDHI_TO_JAVA_TYPE = new HashMap<>(); + + static { + registerType(String.class, Attribute.Type.STRING); + registerType(Integer.class, Attribute.Type.INT); + registerType(int.class, Attribute.Type.INT); + registerType(Long.class, Attribute.Type.LONG); + registerType(long.class, Attribute.Type.LONG); + registerType(Float.class, Attribute.Type.FLOAT); + registerType(float.class, Attribute.Type.FLOAT); + registerType(Double.class, Attribute.Type.DOUBLE); + registerType(double.class, Attribute.Type.DOUBLE); + registerType(Boolean.class, Attribute.Type.BOOL); + registerType(boolean.class, Attribute.Type.BOOL); + } + + public static void registerType(Class<?> javaType, Attribute.Type siddhiType) { + if (JAVA_TO_SIDDHI_TYPE.containsKey(javaType)) { + throw new IllegalArgumentException("Java type: " + javaType + " or siddhi type: " + siddhiType + " were already registered"); + } + JAVA_TO_SIDDHI_TYPE.put(javaType, siddhiType); + SIDDHI_TO_JAVA_TYPE.put(siddhiType, javaType); + } + + public static AbstractDefinition getStreamDefinition(String executionPlan, String streamId) { + SiddhiManager siddhiManager = null; + SiddhiAppRuntime runtime = null; + try { + siddhiManager = new SiddhiManager(); + runtime = siddhiManager.createSiddhiAppRuntime(executionPlan); + Map<String, StreamDefinition> definitionMap = runtime.getStreamDefinitionMap(); + if (definitionMap.containsKey(streamId)) { + return definitionMap.get(streamId); + } else { + throw new IllegalArgumentException("Unknown stream id" + streamId); + } + } finally { + if (runtime != null) { + runtime.shutdown(); + } + if (siddhiManager != null) { + siddhiManager.shutdown(); + } + } + } + + public static <T extends Tuple> TypeInformation<T> getTupleTypeInformation(AbstractDefinition definition) { + int tupleSize = definition.getAttributeList().size(); + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append("Tuple").append(tupleSize); + stringBuilder.append("<"); + List<String> attributeTypes = new ArrayList<>(); + for (Attribute attribute : definition.getAttributeList()) { + attributeTypes.add(getJavaType(attribute.getType()).getName()); + } + stringBuilder.append(StringUtils.join(attributeTypes, ",")); + stringBuilder.append(">"); + try { + return TypeInfoParser.parse(stringBuilder.toString()); + } catch (IllegalArgumentException ex) { + throw new IllegalArgumentException("Unable to parse " + stringBuilder.toString(), ex); + } + } + + public static <T extends Tuple> TypeInformation<T> getTupleTypeInformation(String executionPlan, String streamId) { + return getTupleTypeInformation(getStreamDefinition(executionPlan, streamId)); + } + + @SuppressWarnings("unchecked") + private static final TypeInformation<?> MAP_TYPE_INFORMATION = TypeExtractor.createTypeInfo(new HashMap<String, Object>().getClass()); + + public static TypeInformation<Map<String, Object>> getMapTypeInformation() { + return (TypeInformation<Map<String, Object>>) MAP_TYPE_INFORMATION; + } + + public static <F> Attribute.Type getAttributeType(TypeInformation<F> fieldType) { + if (JAVA_TO_SIDDHI_TYPE.containsKey(fieldType.getTypeClass())) { + return JAVA_TO_SIDDHI_TYPE.get(fieldType.getTypeClass()); + } else { + return Attribute.Type.OBJECT + ; + } + } + + public static Class<?> getJavaType(Attribute.Type attributeType) { + if (!SIDDHI_TO_JAVA_TYPE.containsKey(attributeType)) { + throw new IllegalArgumentException("Unable to get java type for siddhi attribute type: " + attributeType); + } + return SIDDHI_TO_JAVA_TYPE.get(attributeType); + } + + public static <T> TypeInformation<Tuple2<String, T>> getStreamTupleTypeInformation(TypeInformation<T> typeInformation) { + return TypeInfoParser.parse("Tuple2<String," + typeInformation.getTypeClass().getName() + ">"); + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/50f3f125/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java new file mode 100755 index 0000000..5c16c71 --- /dev/null +++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/SiddhiCEPITCase.java @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.siddhi; + +import java.io.IOException; +import java.io.Serializable; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.streaming.siddhi.exception.UndefinedStreamException; +import org.apache.flink.streaming.siddhi.extension.CustomPlusFunctionExtension; +import org.apache.flink.streaming.siddhi.source.Event; +import org.apache.flink.streaming.siddhi.source.RandomEventSource; +import org.apache.flink.streaming.siddhi.source.RandomTupleSource; +import org.apache.flink.streaming.siddhi.source.RandomWordSource; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor; +import org.apache.flink.streaming.api.operators.StreamMap; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.junit.Assert.assertEquals; + +/** + * Flink-siddhi library integration test cases + */ +public class SiddhiCEPITCase extends StreamingMultipleProgramsTestBase implements Serializable { + + @Rule + public transient TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testSimpleWriteAndRead() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<Event> input = env.fromElements( + Event.of(1, "start", 1.0), + Event.of(2, "middle", 2.0), + Event.of(3, "end", 3.0), + Event.of(4, "start", 4.0), + Event.of(5, "middle", 5.0), + Event.of(6, "end", 6.0) + ); + + String path = tempFolder.newFile().toURI().toString(); + input.transform("transformer", TypeInformation.of(Event.class), new StreamMap<>(new MapFunction<Event, Event>() { + @Override + public Event map(Event event) throws Exception { + return event; + } + })).writeAsText(path); + env.execute(); + Assert.assertEquals(6, getLineCount(path)); + } + + @Test + public void testSimplePojoStreamAndReturnPojo() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<Event> input = env.fromElements( + Event.of(1, "start", 1.0), + Event.of(2, "middle", 2.0), + Event.of(3, "end", 3.0), + Event.of(4, "start", 4.0), + Event.of(5, "middle", 5.0), + Event.of(6, "end", 6.0) + ); + + DataStream<Event> output = SiddhiCEP + .define("inputStream", input, "id", "name", "price") + .cql("from inputStream insert into outputStream") + .returns("outputStream", Event.class); + String path = tempFolder.newFile().toURI().toString(); + output.print(); + env.execute(); + } + + @Test + public void testUnboundedPojoSourceAndReturnTuple() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<Event> input = env.addSource(new RandomEventSource(5)); + + DataStream<Tuple4<Long, Integer, String, Double>> output = SiddhiCEP + .define("inputStream", input, "id", "name", "price", "timestamp") + .cql("from inputStream select timestamp, id, name, price insert into outputStream") + .returns("outputStream"); + + DataStream<Integer> following = output.map(new MapFunction<Tuple4<Long, Integer, String, Double>, Integer>() { + @Override + public Integer map(Tuple4<Long, Integer, String, Double> value) throws Exception { + return value.f1; + } + }); + String resultPath = tempFolder.newFile().toURI().toString(); + following.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + env.execute(); + assertEquals(5, getLineCount(resultPath)); + } + + @Test + public void testUnboundedTupleSourceAndReturnTuple() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<Tuple4<Integer, String, Double, Long>> input = env + .addSource(new RandomTupleSource(5).closeDelay(1500)).keyBy(1); + + DataStream<Tuple4<Long, Integer, String, Double>> output = SiddhiCEP + .define("inputStream", input, "id", "name", "price", "timestamp") + .cql("from inputStream select timestamp, id, name, price insert into outputStream") + .returns("outputStream"); + + String resultPath = tempFolder.newFile().toURI().toString(); + output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + env.execute(); + assertEquals(5, getLineCount(resultPath)); + } + + @Test + public void testUnboundedPrimitiveTypeSourceAndReturnTuple() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<String> input = env.addSource(new RandomWordSource(5).closeDelay(1500)); + + DataStream<Tuple1<String>> output = SiddhiCEP + .define("wordStream", input, "words") + .cql("from wordStream select words insert into outputStream") + .returns("outputStream"); + + String resultPath = tempFolder.newFile().toURI().toString(); + output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + env.execute(); + assertEquals(5, getLineCount(resultPath)); + } + + @Test(expected = InvalidTypesException.class) + public void testUnboundedPojoSourceButReturnInvalidTupleType() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<Event> input = env.addSource(new RandomEventSource(5).closeDelay(1500)); + + DataStream<Tuple5<Long, Integer, String, Double, Long>> output = SiddhiCEP + .define("inputStream", input, "id", "name", "price", "timestamp") + .cql("from inputStream select timestamp, id, name, price insert into outputStream") + .returns("outputStream"); + + DataStream<Long> following = output.map(new MapFunction<Tuple5<Long, Integer, String, Double, Long>, Long>() { + @Override + public Long map(Tuple5<Long, Integer, String, Double, Long> value) throws Exception { + return value.f0; + } + }); + + String resultPath = tempFolder.newFile().toURI().toString(); + following.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + env.execute(); + assertEquals(5, getLineCount(resultPath)); + env.execute(); + } + + @Test + public void testUnboundedPojoStreamAndReturnMap() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); + DataStream<Event> input = env.addSource(new RandomEventSource(5)); + + DataStream<Map<String, Object>> output = SiddhiCEP + .define("inputStream", input, "id", "name", "price", "timestamp") + .cql("from inputStream select timestamp, id, name, price insert into outputStream") + .returnAsMap("outputStream"); + + String resultPath = tempFolder.newFile().toURI().toString(); + output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + env.execute(); + assertEquals(5, getLineCount(resultPath)); + } + + @Test + public void testUnboundedPojoStreamAndReturnPojo() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<Event> input = env.addSource(new RandomEventSource(5)); + input.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Event>() { + @Override + public long extractAscendingTimestamp(Event element) { + return element.getTimestamp(); + } + }); + + DataStream<Event> output = SiddhiCEP + .define("inputStream", input, "id", "name", "price", "timestamp") + .cql("from inputStream select timestamp, id, name, price insert into outputStream") + .returns("outputStream", Event.class); + + String resultPath = tempFolder.newFile().toURI().toString(); + output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + env.execute(); + assertEquals(5, getLineCount(resultPath)); + } + + + @Test + public void testMultipleUnboundedPojoStreamSimpleUnion() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<Event> input1 = env.addSource(new RandomEventSource(2), "input1"); + DataStream<Event> input2 = env.addSource(new RandomEventSource(2), "input2"); + DataStream<Event> input3 = env.addSource(new RandomEventSource(2), "input2"); + DataStream<Event> output = SiddhiCEP + .define("inputStream1", input1, "id", "name", "price", "timestamp") + .union("inputStream2", input2, "id", "name", "price", "timestamp") + .union("inputStream3", input3, "id", "name", "price", "timestamp") + .cql( + "from inputStream1 select timestamp, id, name, price insert into outputStream;" + + "from inputStream2 select timestamp, id, name, price insert into outputStream;" + + "from inputStream3 select timestamp, id, name, price insert into outputStream;" + ) + .returns("outputStream", Event.class); + + String resultPath = tempFolder.newFile().toURI().toString(); + output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + env.execute(); + assertEquals(6, getLineCount(resultPath)); + } + + /** + * @see <a href="https://docs.wso2.com/display/CEP300/Joins">https://docs.wso2.com/display/CEP300/Joins</a> + */ + @Test + public void testMultipleUnboundedPojoStreamUnionAndJoinWithWindow() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<Event> input1 = env.addSource(new RandomEventSource(5), "input1"); + DataStream<Event> input2 = env.addSource(new RandomEventSource(5), "input2"); + + DataStream<? extends Map> output = SiddhiCEP + .define("inputStream1", input1.keyBy("id"), "id", "name", "price", "timestamp") + .union("inputStream2", input2.keyBy("id"), "id", "name", "price", "timestamp") + .cql( + "from inputStream1#window.length(5) as s1 " + + "join inputStream2#window.time(500) as s2 " + + "on s1.id == s2.id " + + "select s1.timestamp as t, s1.name as n, s1.price as p1, s2.price as p2 " + + "insert into JoinStream;" + ) + .returnAsMap("JoinStream"); + + String resultPath = tempFolder.newFile().toURI().toString(); + output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + env.execute(); + assertEquals(5, getLineCount(resultPath)); + } + + /** + * @see <a href="https://docs.wso2.com/display/CEP300/Joins">https://docs.wso2.com/display/CEP300/Patterns</a> + */ + @Test + public void testUnboundedPojoStreamSimplePatternMatch() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<Event> input1 = env.addSource(new RandomEventSource(5).closeDelay(1500), "input1"); + DataStream<Event> input2 = env.addSource(new RandomEventSource(5).closeDelay(1500), "input2"); + + DataStream<Map<String, Object>> output = SiddhiCEP + .define("inputStream1", input1.keyBy("name"), "id", "name", "price", "timestamp") + .union("inputStream2", input2.keyBy("name"), "id", "name", "price", "timestamp") + .cql( + "from every s1 = inputStream1[id == 2] " + + " -> s2 = inputStream2[id == 3] " + + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2 " + + "insert into outputStream" + ) + .returnAsMap("outputStream"); + + String resultPath = tempFolder.newFile().toURI().toString(); + output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + env.execute(); + assertEquals(1, getLineCount(resultPath)); + compareResultsByLinesInMemory("{id_1=2, name_1=test_event, id_2=3, name_2=test_event}", resultPath); + } + + /** + * @see <a href="https://docs.wso2.com/display/CEP300/Joins">https://docs.wso2.com/display/CEP300/Sequences</a> + */ + @Test + public void testUnboundedPojoStreamSimpleSequences() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<Event> input1 = env.addSource(new RandomEventSource(5).closeDelay(1500), "input1"); + DataStream<Map<String, Object>> output = SiddhiCEP + .define("inputStream1", input1.keyBy("name"), "id", "name", "price", "timestamp") + .union("inputStream2", input1.keyBy("name"), "id", "name", "price", "timestamp") + .cql( + "from every s1 = inputStream1[id == 2]+ , " + + "s2 = inputStream2[id == 3]? " + + "within 1000 second " + + "select s1[0].name as n1, s2.name as n2 " + + "insert into outputStream" + ) + .returnAsMap("outputStream"); + + String resultPath = tempFolder.newFile().toURI().toString(); + output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + env.execute(); + assertEquals(1, getLineCount(resultPath)); + } + + private static int getLineCount(String resPath) throws IOException { + List<String> result = new LinkedList<>(); + readAllResultLines(result, resPath); + return result.size(); + } + + @Test + public void testCustomizeSiddhiFunctionExtension() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<Event> input = env.addSource(new RandomEventSource(5)); + + SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env); + cep.registerExtension("custom:plus", CustomPlusFunctionExtension.class); + + DataStream<Map<String, Object>> output = cep + .from("inputStream", input, "id", "name", "price", "timestamp") + .cql("from inputStream select timestamp, id, name, custom:plus(price,price) as doubled_price insert into outputStream") + .returnAsMap("outputStream"); + + String resultPath = tempFolder.newFile().toURI().toString(); + output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + env.execute(); + assertEquals(5, getLineCount(resultPath)); + } + + @Test + public void testRegisterStreamAndExtensionWithSiddhiCEPEnvironment() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<Event> input1 = env.addSource(new RandomEventSource(5), "input1"); + DataStream<Event> input2 = env.addSource(new RandomEventSource(5), "input2"); + + SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env); + cep.registerExtension("custom:plus", CustomPlusFunctionExtension.class); + + cep.registerStream("inputStream1", input1.keyBy("id"), "id", "name", "price", "timestamp"); + cep.registerStream("inputStream2", input2.keyBy("id"), "id", "name", "price", "timestamp"); + + DataStream<Tuple4<Long, String, Double, Double>> output = cep + .from("inputStream1").union("inputStream2") + .cql( + "from inputStream1#window.length(5) as s1 " + + "join inputStream2#window.time(500) as s2 " + + "on s1.id == s2.id " + + "select s1.timestamp as t, s1.name as n, s1.price as p1, s2.price as p2 " + + "insert into JoinStream;" + ) + .returns("JoinStream"); + + String resultPath = tempFolder.newFile().toURI().toString(); + output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + env.execute(); + assertEquals(5, getLineCount(resultPath)); + } + + @Test(expected = UndefinedStreamException.class) + public void testTriggerUndefinedStreamException() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<Event> input1 = env.addSource(new RandomEventSource(5), "input1"); + + SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env); + cep.registerStream("inputStream1", input1.keyBy("id"), "id", "name", "price", "timestamp"); + + DataStream<Map<String, Object>> output = cep + .from("inputStream1").union("inputStream2") + .cql( + "from inputStream1#window.length(5) as s1 " + + "join inputStream2#window.time(500) as s2 " + + "on s1.id == s2.id " + + "select s1.timestamp as t, s1.name as n, s1.price as p1, s2.price as p2 " + + "insert into JoinStream;" + ) + .returnAsMap("JoinStream"); + + String resultPath = tempFolder.newFile().toURI().toString(); + output.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + env.execute(); + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/50f3f125/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/extension/CustomPlusFunctionExtension.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/extension/CustomPlusFunctionExtension.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/extension/CustomPlusFunctionExtension.java new file mode 100644 index 0000000..582f1cd --- /dev/null +++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/extension/CustomPlusFunctionExtension.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.siddhi.extension; + +import java.util.HashMap; +import java.util.Map; + +import org.wso2.siddhi.core.config.SiddhiAppContext; +import org.wso2.siddhi.core.exception.SiddhiAppCreationException; +import org.wso2.siddhi.core.executor.ExpressionExecutor; +import org.wso2.siddhi.core.executor.function.FunctionExecutor; +import org.wso2.siddhi.core.util.config.ConfigReader; +import org.wso2.siddhi.query.api.definition.Attribute; + +public class CustomPlusFunctionExtension extends FunctionExecutor { + private Attribute.Type returnType; + + /** + * The initialization method for FunctionExecutor, this method will be called before the other methods + */ + @Override + protected void init(ExpressionExecutor[] expressionExecutors, ConfigReader configReader, SiddhiAppContext siddhiAppContext) { + for (ExpressionExecutor expressionExecutor : attributeExpressionExecutors) { + Attribute.Type attributeType = expressionExecutor.getReturnType(); + if (attributeType == Attribute.Type.DOUBLE) { + returnType = attributeType; + + } else if ((attributeType == Attribute.Type.STRING) || (attributeType == Attribute.Type.BOOL)) { + throw new SiddhiAppCreationException("Plus cannot have parameters with types String or Bool"); + } else { + returnType = Attribute.Type.LONG; + } + } + } + + /** + * The main execution method which will be called upon event arrival + * when there are more then one function parameter + * + * @param data the runtime values of function parameters + * @return the function result + */ + @Override + protected Object execute(Object[] data) { + if (returnType == Attribute.Type.DOUBLE) { + double total = 0; + for (Object aObj : data) { + total += Double.parseDouble(String.valueOf(aObj)); + } + + return total; + } else { + long total = 0; + for (Object aObj : data) { + total += Long.parseLong(String.valueOf(aObj)); + } + return total; + } + } + + /** + * The main execution method which will be called upon event arrival + * when there are zero or one function parameter + * + * @param data null if the function parameter count is zero or + * runtime data value of the function parameter + * @return the function result + */ + @Override + protected Object execute(Object data) { + if (returnType == Attribute.Type.DOUBLE) { + return Double.parseDouble(String.valueOf(data)); + } else { + return Long.parseLong(String.valueOf(data)); + } + } + + @Override + public Attribute.Type getReturnType() { + return returnType; + } + + @Override + public Map<String, Object> currentState() { + return new HashMap<>(); + } + + @Override + public void restoreState(Map<String, Object> map) { + + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/50f3f125/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/operator/SiddhiSyntaxTest.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/operator/SiddhiSyntaxTest.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/operator/SiddhiSyntaxTest.java new file mode 100644 index 0000000..d271c89 --- /dev/null +++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/operator/SiddhiSyntaxTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.siddhi.operator; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.wso2.siddhi.core.SiddhiAppRuntime; +import org.wso2.siddhi.core.SiddhiManager; +import org.wso2.siddhi.core.event.Event; +import org.wso2.siddhi.core.stream.input.InputHandler; +import org.wso2.siddhi.core.stream.output.StreamCallback; + +import java.util.ArrayList; +import java.util.List; + +public class SiddhiSyntaxTest { + + private SiddhiManager siddhiManager; + + @Before + public void setUp() { + siddhiManager = new SiddhiManager(); + } + + @After + public void after() { + siddhiManager = new SiddhiManager(); + } + + @Test + public void testSimplePlan() throws InterruptedException { + SiddhiAppRuntime runtime = siddhiManager.createSiddhiAppRuntime( + "define stream inStream (name string, value double);" + + "from inStream insert into outStream"); + runtime.start(); + + final List<Object[]> received = new ArrayList<>(3); + InputHandler inputHandler = runtime.getInputHandler("inStream"); + Assert.assertNotNull(inputHandler); + + try { + runtime.getInputHandler("unknownStream"); + Assert.fail("Should throw exception for getting input handler for unknown streamId."); + } catch (Exception ex) { + // Expected exception for getting input handler for illegal streamId. + } + + runtime.addCallback("outStream", new StreamCallback() { + @Override + public void receive(Event[] events) { + for (Event event : events) { + received.add(event.getData()); + } + } + }); + + inputHandler.send(new Object[]{"a", 1.1}); + inputHandler.send(new Object[]{"b", 1.2}); + inputHandler.send(new Object[]{"c", 1.3}); + Thread.sleep(100); + Assert.assertEquals(3, received.size()); + Assert.assertArrayEquals(received.get(0), new Object[]{"a", 1.1}); + Assert.assertArrayEquals(received.get(1), new Object[]{"b", 1.2}); + Assert.assertArrayEquals(received.get(2), new Object[]{"c", 1.3}); + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/50f3f125/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/SiddhiExecutionPlanSchemaTest.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/SiddhiExecutionPlanSchemaTest.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/SiddhiExecutionPlanSchemaTest.java new file mode 100644 index 0000000..db05e9d --- /dev/null +++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/SiddhiExecutionPlanSchemaTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.siddhi.schema; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.siddhi.source.Event; +import org.junit.Test; +import org.wso2.siddhi.query.api.definition.Attribute; +import org.wso2.siddhi.query.api.definition.StreamDefinition; + +import static org.junit.Assert.*; + +public class SiddhiExecutionPlanSchemaTest { + @Test + public void testStreamSchemaWithPojo() { + TypeInformation<Event> typeInfo = TypeExtractor.createTypeInfo(Event.class); + assertTrue("Type information should be PojoTypeInfo", typeInfo instanceof PojoTypeInfo); + + SiddhiStreamSchema<Event> schema = new SiddhiStreamSchema<>(typeInfo, "id", "timestamp", "name", "price"); + assertEquals(4, schema.getFieldIndexes().length); + + StreamDefinition streamDefinition = schema.getStreamDefinition("test_stream"); + assertArrayEquals(new String[]{"id", "timestamp", "name", "price"}, streamDefinition.getAttributeNameArray()); + + assertEquals(Attribute.Type.INT, streamDefinition.getAttributeType("id")); + assertEquals(Attribute.Type.LONG, streamDefinition.getAttributeType("timestamp")); + assertEquals(Attribute.Type.STRING, streamDefinition.getAttributeType("name")); + assertEquals(Attribute.Type.DOUBLE, streamDefinition.getAttributeType("price")); + + assertEquals("define stream test_stream (id int,timestamp long,name string,price double);", schema.getStreamDefinitionExpression("test_stream")); + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/50f3f125/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSchemaTest.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSchemaTest.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSchemaTest.java new file mode 100644 index 0000000..f876b2b --- /dev/null +++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSchemaTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.siddhi.schema; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.TypeInfoParser; +import org.apache.flink.streaming.siddhi.source.Event; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class StreamSchemaTest { + @Test + public void testStreamSchemaWithPojo() { + TypeInformation<Event> typeInfo = TypeExtractor.createTypeInfo(Event.class); + assertTrue("Type information should be PojoTypeInfo", typeInfo instanceof PojoTypeInfo); + StreamSchema<Event> schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price"); + assertEquals(4, schema.getFieldIndexes().length); + assertEquals(Event.class, schema.getTypeInfo().getTypeClass()); + } + + @Test + public void testStreamSchemaWithTuple() { + TypeInformation<Tuple4> typeInfo = TypeInfoParser.parse("Tuple4<Integer,Long,String,Double>"); + StreamSchema<Tuple4> schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price"); + assertEquals(Tuple4.class, schema.getTypeInfo().getTypeClass()); + assertEquals(4, schema.getFieldIndexes().length); + assertEquals(Tuple4.class, schema.getTypeInfo().getTypeClass()); + } + + @Test + public void testStreamSchemaWithPrimitive() { + TypeInformation<String> typeInfo = TypeInfoParser.parse("String"); + StreamSchema<String> schema = new StreamSchema<>(typeInfo, "words"); + assertEquals(String.class, schema.getTypeInfo().getTypeClass()); + assertEquals(1, schema.getFieldIndexes().length); + assertEquals(String.class, schema.getTypeInfo().getTypeClass()); + } + + @Test(expected = IllegalArgumentException.class) + public void testStreamSchemaWithPojoAndUnknownField() { + TypeInformation<Event> typeInfo = TypeExtractor.createTypeInfo(Event.class); + new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price", "unknown"); + } + + @Test + public void testStreamTupleSerializerWithPojo() { + TypeInformation<Event> typeInfo = TypeExtractor.createTypeInfo(Event.class); + assertTrue("Type information should be PojoTypeInfo", typeInfo instanceof PojoTypeInfo); + StreamSchema<Event> schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price"); + assertEquals(Event.class, schema.getTypeInfo().getTypeClass()); + + TypeInformation<Tuple2<String, Event>> tuple2TypeInformation = TypeInfoParser.parse("Tuple2<String," + schema.getTypeInfo().getTypeClass().getName() + ">"); + assertEquals("Java Tuple2<String, GenericType<" + Event.class.getName() + ">>", tuple2TypeInformation.toString()); + } + + @Test + public void testStreamTupleSerializerWithTuple() { + TypeInformation<Tuple4> typeInfo = TypeInfoParser.parse("Tuple4<Integer,Long,String,Double>"); + StreamSchema<Tuple4> schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price"); + assertEquals(Tuple4.class, schema.getTypeInfo().getTypeClass()); + TypeInformation<Tuple2<String, Tuple4>> tuple2TypeInformation = TypeInfoParser.parse("Tuple2<String," + schema.getTypeInfo().getTypeClass().getName() + ">"); + assertEquals("Java Tuple2<String, GenericType<" + Tuple4.class.getName() + ">>", tuple2TypeInformation.toString()); + } + + @Test + public void testStreamTupleSerializerWithPrimitive() { + TypeInformation<String> typeInfo = TypeInfoParser.parse("String"); + StreamSchema<String> schema = new StreamSchema<>(typeInfo, "words"); + assertEquals(String.class, schema.getTypeInfo().getTypeClass()); + TypeInformation<Tuple2<String, String>> tuple2TypeInformation = TypeInfoParser.parse("Tuple2<String," + schema.getTypeInfo().getTypeClass().getName() + ">"); + assertEquals("Java Tuple2<String, String>", tuple2TypeInformation.toString()); + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/50f3f125/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSerializerTest.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSerializerTest.java new file mode 100644 index 0000000..190208c --- /dev/null +++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSerializerTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.siddhi.schema; + +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.siddhi.source.Event; +import org.junit.Assert; +import org.junit.Test; + +public class StreamSerializerTest { + private static final long CURRENT = System.currentTimeMillis(); + + @Test + public void testSimplePojoRead() { + Event event = new Event(); + event.setId(1); + event.setName("test"); + event.setPrice(56.7); + event.setTimestamp(CURRENT); + + StreamSchema<Event> schema = new StreamSchema<>(TypeExtractor.createTypeInfo(Event.class), "id", "name", "price", "timestamp"); + StreamSerializer<Event> reader = new StreamSerializer<>(schema); + Assert.assertArrayEquals(new Object[]{1, "test", 56.7, CURRENT}, reader.getRow(event)); + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/50f3f125/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/Event.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/Event.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/Event.java new file mode 100644 index 0000000..357e1d2 --- /dev/null +++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/Event.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.siddhi.source; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; + +import java.util.Objects; + +public class Event { + private long timestamp; + private String name; + private double price; + private int id; + + public double getPrice() { + return price; + } + + public int getId() { + return id; + } + + public String getName() { + return name; + } + + @Override + public String toString() { + return "Event(" + id + ", " + name + ", " + price + ", " + timestamp + ")"; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof Event) { + Event other = (Event) obj; + + return name.equals(other.name) && price == other.price && id == other.id && timestamp == other.timestamp; + } else { + return false; + } + } + + public static Event of(int id, String name, double price) { + Event event = new Event(); + event.setId(id); + event.setName(name); + event.setPrice(price); + event.setTimestamp(System.currentTimeMillis()); + return event; + } + + public static Event of(int id, String name, double price, long timestamp) { + Event event = new Event(); + event.setId(id); + event.setName(name); + event.setPrice(price); + event.setTimestamp(timestamp); + return event; + } + + @Override + public int hashCode() { + return Objects.hash(name, price, id); + } + + public static TypeSerializer<Event> createTypeSerializer() { + TypeInformation<Event> typeInformation = (TypeInformation<Event>) TypeExtractor.createTypeInfo(Event.class); + + return typeInformation.createSerializer(new ExecutionConfig()); + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + public void setPrice(double price) { + this.price = price; + } + + public void setId(int id) { + this.id = id; + } + + public void setName(String name) { + this.name = name; + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/50f3f125/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomEventSource.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomEventSource.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomEventSource.java new file mode 100644 index 0000000..bb95fdd --- /dev/null +++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomEventSource.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.siddhi.source; + +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import java.util.Random; + +public class RandomEventSource implements SourceFunction<Event> { + private final int count; + private final Random random; + private final long initialTimestamp; + + private volatile boolean isRunning = true; + private volatile int number = 0; + private volatile long closeDelayTimestamp = 1000; + + public RandomEventSource(int count, long initialTimestamp) { + this.count = count; + this.random = new Random(); + this.initialTimestamp = initialTimestamp; + } + + public RandomEventSource() { + this(Integer.MAX_VALUE, System.currentTimeMillis()); + } + + public RandomEventSource(int count) { + this(count, System.currentTimeMillis()); + } + + public RandomEventSource closeDelay(long delayTimestamp) { + this.closeDelayTimestamp = delayTimestamp; + return this; + } + + @Override + public void run(SourceContext<Event> ctx) throws Exception { + while (isRunning) { + ctx.collect(Event.of(number, "test_event", random.nextDouble(), initialTimestamp + 1000 * number)); + number++; + if (number >= this.count) { + cancel(); + } + } + } + + @Override + public void cancel() { + this.isRunning = false; + try { + Thread.sleep(closeDelayTimestamp); + } catch (InterruptedException e) { + // ignored + } + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/50f3f125/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomTupleSource.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomTupleSource.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomTupleSource.java new file mode 100644 index 0000000..35121f7 --- /dev/null +++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomTupleSource.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.siddhi.source; + +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import java.util.Random; + +public class RandomTupleSource implements SourceFunction<Tuple4<Integer, String, Double, Long>> { + private final int count; + private final Random random; + private final long initialTimestamp; + + private volatile boolean isRunning = true; + private volatile int number = 0; + private long closeDelayTimestamp; + + public RandomTupleSource(int count, long initialTimestamp) { + this.count = count; + this.random = new Random(); + this.initialTimestamp = initialTimestamp; + } + + public RandomTupleSource() { + this(Integer.MAX_VALUE, System.currentTimeMillis()); + } + + public RandomTupleSource(int count) { + this(count, System.currentTimeMillis()); + } + + + public RandomTupleSource closeDelay(long delayTimestamp) { + this.closeDelayTimestamp = delayTimestamp; + return this; + } + + @Override + public void run(SourceContext<Tuple4<Integer, String, Double, Long>> ctx) throws Exception { + while (isRunning) { + ctx.collect(Tuple4.of(number, "test_tuple", random.nextDouble(), initialTimestamp + 1000 * number)); + number++; + if (number >= this.count) { + cancel(); + } + } + } + + @Override + public void cancel() { + this.isRunning = false; + try { + Thread.sleep(this.closeDelayTimestamp); + } catch (InterruptedException e) { + // ignored + } + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/50f3f125/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomWordSource.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomWordSource.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomWordSource.java new file mode 100644 index 0000000..19d904f --- /dev/null +++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/source/RandomWordSource.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.siddhi.source; + +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import java.util.Random; + +public class RandomWordSource implements SourceFunction<String> { + private static final String[] WORDS = new String[] { + "To be, or not to be,--that is the question:--", + "Whether 'tis nobler in the mind to suffer", + "The slings and arrows of outrageous fortune", + "Or to take arms against a sea of troubles,", + "And by opposing end them?--To die,--to sleep,--", + "No more; and by a sleep to say we end", + "The heartache, and the thousand natural shocks", + "That flesh is heir to,--'tis a consummation", + "Devoutly to be wish'd. To die,--to sleep;--", + "To sleep! perchance to dream:--ay, there's the rub;", + "For in that sleep of death what dreams may come,", + "When we have shuffled off this mortal coil,", + "Must give us pause: there's the respect", + "That makes calamity of so long life;", + "For who would bear the whips and scorns of time,", + "The oppressor's wrong, the proud man's contumely,", + "The pangs of despis'd love, the law's delay,", + "The insolence of office, and the spurns", + "That patient merit of the unworthy takes,", + "When he himself might his quietus make", + "With a bare bodkin? who would these fardels bear,", + "To grunt and sweat under a weary life,", + "But that the dread of something after death,--", + "The undiscover'd country, from whose bourn", + "No traveller returns,--puzzles the will,", + "And makes us rather bear those ills we have", + "Than fly to others that we know not of?", + "Thus conscience does make cowards of us all;", + "And thus the native hue of resolution", + "Is sicklied o'er with the pale cast of thought;", + "And enterprises of great pith and moment,", + "With this regard, their currents turn awry,", + "And lose the name of action.--Soft you now!", + "The fair Ophelia!--Nymph, in thy orisons", + "Be all my sins remember'd." + }; + + private final int count; + private final Random random; + private final long initialTimestamp; + + private volatile boolean isRunning = true; + private volatile int number = 0; + private long closeDelayTimestamp; + + public RandomWordSource(int count, long initialTimestamp) { + this.count = count; + this.random = new Random(); + this.initialTimestamp = initialTimestamp; + } + + public RandomWordSource() { + this(Integer.MAX_VALUE, System.currentTimeMillis()); + } + + public RandomWordSource(int count) { + this(count, System.currentTimeMillis()); + } + + + public RandomWordSource closeDelay(long delayTimestamp) { + this.closeDelayTimestamp = delayTimestamp; + return this; + } + + @Override + public void run(SourceContext<String> ctx) throws Exception { + while (isRunning) { + ctx.collectWithTimestamp(WORDS[random.nextInt(WORDS.length)], initialTimestamp + 1000 * number); + number++; + if (number >= this.count) { + cancel(); + } + } + } + + @Override + public void cancel() { + this.isRunning = false; + try { + Thread.sleep(this.closeDelayTimestamp); + } catch (InterruptedException e) { + // ignored + } + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/50f3f125/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactoryTest.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactoryTest.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactoryTest.java new file mode 100644 index 0000000..4753a3f --- /dev/null +++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTupleFactoryTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.siddhi.utils; + +import org.apache.flink.api.java.tuple.Tuple5; +import org.junit.Test; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +public class SiddhiTupleFactoryTest { + @Test + public void testConvertObjectArrayToTuple() { + Object[] row = new Object[]{1, "message", 1234567L, true, new Object()}; + Tuple5 tuple5 = SiddhiTupleFactory.newTuple(row); + assertEquals(5, tuple5.getArity()); + assertArrayEquals(row, new Object[]{ + tuple5.f0, + tuple5.f1, + tuple5.f2, + tuple5.f3, + tuple5.f4 + }); + } + + @Test(expected = IllegalArgumentException.class) + public void testConvertTooLongObjectArrayToTuple() { + Object[] row = new Object[26]; + SiddhiTupleFactory.newTuple(row); + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/50f3f125/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactoryTest.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactoryTest.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactoryTest.java new file mode 100644 index 0000000..c4a1e8c --- /dev/null +++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactoryTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.siddhi.utils; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.api.java.typeutils.TypeInfoParser; +import org.junit.Assert; +import org.junit.Test; + +public class SiddhiTypeFactoryTest { + @Test + public void testTypeInfoParser() { + TypeInformation<Tuple3<String, Long, Object>> type1 = TypeInfoParser.parse("Tuple3<String,Long,java.lang.Object>"); + Assert.assertNotNull(type1); + TypeInformation<Tuple4<String, Long, Object, InnerPojo>> type2 = TypeInfoParser.parse("Tuple4<" + String.class.getName() + ", " + Long.class.getName() + ", " + java.lang.Object.class.getName() + "," + InnerPojo.class.getName() + ">"); + Assert.assertNotNull(type2); + } + + public static class InnerPojo { + } + + @Test + public void testBuildTypeInformationForSiddhiStream() { + String query = "define stream inputStream (timestamp long, name string, value double);" + + "from inputStream select name, value insert into outputStream;"; + TypeInformation<Tuple3<Long, String, Double>> inputStreamType = SiddhiTypeFactory.getTupleTypeInformation(query, "inputStream"); + TypeInformation<Tuple2<String, Double>> outputStreamType = SiddhiTypeFactory.getTupleTypeInformation(query, "outputStream"); + + Assert.assertNotNull(inputStreamType); + Assert.assertNotNull(outputStreamType); + } +} http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/50f3f125/flink-library-siddhi/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/test/resources/log4j-test.properties b/flink-library-siddhi/src/test/resources/log4j-test.properties new file mode 100755 index 0000000..5b1e4ed --- /dev/null +++ b/flink-library-siddhi/src/test/resources/log4j-test.properties @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +log4j.rootLogger=INFO, testlogger + +# A1 is set to be a ConsoleAppender. +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/50f3f125/flink-library-siddhi/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/test/resources/logback-test.xml b/flink-library-siddhi/src/test/resources/logback-test.xml new file mode 100644 index 0000000..b7a5793 --- /dev/null +++ b/flink-library-siddhi/src/test/resources/logback-test.xml @@ -0,0 +1,34 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <root level="WARN"> + <appender-ref ref="STDOUT"/> + </root> + + <logger name="org.apache.flink.api.common.io.DelimitedInputFormat" level="OFF"/> + <logger name="org.apache.flink.api.common.io.FileInputFormat" level="OFF"/> + <logger name="org.apache.flink.configuration.GlobalConfiguration" level="OFF"/> + <logger name="org.apache.flink.configuration.Configuration" level="OFF"/> +</configuration>