Repository: bahir-flink Updated Branches: refs/heads/master 567792b26 -> 58c55f306
[BAHIR-174] Removed using TypeInfoParser from flink-library-siddhi Closes #30 Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/58c55f30 Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/58c55f30 Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/58c55f30 Branch: refs/heads/master Commit: 58c55f3064d37dfbffeedd62014e87fba5e6c010 Parents: 567792b Author: Dominik.Wosinski <b...@wp.pl> Authored: Mon Jul 30 10:04:46 2018 +0200 Committer: Luciano Resende <lrese...@apache.org> Committed: Mon Aug 6 12:24:11 2018 -0700 ---------------------------------------------------------------------- .../siddhi/utils/SiddhiTypeFactory.java | 25 ++++++++------------ .../siddhi/schema/StreamSchemaTest.java | 18 +++++++------- .../siddhi/utils/SiddhiTypeFactoryTest.java | 8 ++++--- 3 files changed, 24 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/58c55f30/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java index 22405c9..84ff453 100644 --- a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java +++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java @@ -17,19 +17,17 @@ package org.apache.flink.streaming.siddhi.utils; -import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.wso2.siddhi.core.SiddhiAppRuntime; import org.wso2.siddhi.core.SiddhiManager; import org.wso2.siddhi.query.api.definition.AbstractDefinition; import org.wso2.siddhi.query.api.definition.Attribute; import org.wso2.siddhi.query.api.definition.StreamDefinition; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -87,19 +85,16 @@ public class SiddhiTypeFactory { public static <T extends Tuple> TypeInformation<T> getTupleTypeInformation(AbstractDefinition definition) { int tupleSize = definition.getAttributeList().size(); - StringBuilder stringBuilder = new StringBuilder(); - stringBuilder.append("Tuple").append(tupleSize); - stringBuilder.append("<"); - List<String> attributeTypes = new ArrayList<>(); - for (Attribute attribute : definition.getAttributeList()) { - attributeTypes.add(getJavaType(attribute.getType()).getName()); - } - stringBuilder.append(StringUtils.join(attributeTypes, ",")); - stringBuilder.append(">"); + TypeInformation[] typeInformations = new TypeInformation[tupleSize]; + List<Attribute> attributes = definition.getAttributeList(); try { - return TypeInfoParser.parse(stringBuilder.toString()); + for (int i = 0; i < attributes.size() ; i++) { + Class<?> clazz = getJavaType(attributes.get(i).getType()); + typeInformations[i] = TypeInformation.of(clazz); + } + return Types.TUPLE(typeInformations); } catch (IllegalArgumentException ex) { - throw new IllegalArgumentException("Unable to parse " + stringBuilder.toString(), ex); + throw new IllegalArgumentException("Failed to get Type Information.", ex); } } @@ -131,6 +126,6 @@ public class SiddhiTypeFactory { } public static <T> TypeInformation<Tuple2<String, T>> getStreamTupleTypeInformation(TypeInformation<T> typeInformation) { - return TypeInfoParser.parse("Tuple2<String," + typeInformation.getTypeClass().getName() + ">"); + return Types.TUPLE(Types.STRING, typeInformation); } } http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/58c55f30/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSchemaTest.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSchemaTest.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSchemaTest.java index f876b2b..b9dcac7 100644 --- a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSchemaTest.java +++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSchemaTest.java @@ -18,11 +18,11 @@ package org.apache.flink.streaming.siddhi.schema; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.streaming.siddhi.source.Event; import org.junit.Test; @@ -41,7 +41,7 @@ public class StreamSchemaTest { @Test public void testStreamSchemaWithTuple() { - TypeInformation<Tuple4> typeInfo = TypeInfoParser.parse("Tuple4<Integer,Long,String,Double>"); + TypeInformation<Tuple4> typeInfo = Types.TUPLE(Types.INT, Types.LONG, Types.STRING, Types.DOUBLE); StreamSchema<Tuple4> schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price"); assertEquals(Tuple4.class, schema.getTypeInfo().getTypeClass()); assertEquals(4, schema.getFieldIndexes().length); @@ -50,7 +50,7 @@ public class StreamSchemaTest { @Test public void testStreamSchemaWithPrimitive() { - TypeInformation<String> typeInfo = TypeInfoParser.parse("String"); + TypeInformation<String> typeInfo = Types.STRING; StreamSchema<String> schema = new StreamSchema<>(typeInfo, "words"); assertEquals(String.class, schema.getTypeInfo().getTypeClass()); assertEquals(1, schema.getFieldIndexes().length); @@ -65,30 +65,30 @@ public class StreamSchemaTest { @Test public void testStreamTupleSerializerWithPojo() { - TypeInformation<Event> typeInfo = TypeExtractor.createTypeInfo(Event.class); + TypeInformation<Event> typeInfo = TypeInformation.of(Event.class); assertTrue("Type information should be PojoTypeInfo", typeInfo instanceof PojoTypeInfo); StreamSchema<Event> schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price"); assertEquals(Event.class, schema.getTypeInfo().getTypeClass()); - TypeInformation<Tuple2<String, Event>> tuple2TypeInformation = TypeInfoParser.parse("Tuple2<String," + schema.getTypeInfo().getTypeClass().getName() + ">"); + TypeInformation<Tuple2<String, Event>> tuple2TypeInformation = Types.TUPLE(Types.STRING, Types.GENERIC(schema.getTypeInfo().getTypeClass())); assertEquals("Java Tuple2<String, GenericType<" + Event.class.getName() + ">>", tuple2TypeInformation.toString()); } @Test public void testStreamTupleSerializerWithTuple() { - TypeInformation<Tuple4> typeInfo = TypeInfoParser.parse("Tuple4<Integer,Long,String,Double>"); + TypeInformation<Tuple4> typeInfo = Types.GENERIC(Tuple4.class); StreamSchema<Tuple4> schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price"); assertEquals(Tuple4.class, schema.getTypeInfo().getTypeClass()); - TypeInformation<Tuple2<String, Tuple4>> tuple2TypeInformation = TypeInfoParser.parse("Tuple2<String," + schema.getTypeInfo().getTypeClass().getName() + ">"); + TypeInformation<Tuple2<String, Tuple4>> tuple2TypeInformation = Types.TUPLE(Types.STRING, schema.getTypeInfo()); assertEquals("Java Tuple2<String, GenericType<" + Tuple4.class.getName() + ">>", tuple2TypeInformation.toString()); } @Test public void testStreamTupleSerializerWithPrimitive() { - TypeInformation<String> typeInfo = TypeInfoParser.parse("String"); + TypeInformation<String> typeInfo = Types.STRING; StreamSchema<String> schema = new StreamSchema<>(typeInfo, "words"); assertEquals(String.class, schema.getTypeInfo().getTypeClass()); - TypeInformation<Tuple2<String, String>> tuple2TypeInformation = TypeInfoParser.parse("Tuple2<String," + schema.getTypeInfo().getTypeClass().getName() + ">"); + TypeInformation<Tuple2<String, String>> tuple2TypeInformation = Types.TUPLE(Types.STRING, schema.getTypeInfo()); assertEquals("Java Tuple2<String, String>", tuple2TypeInformation.toString()); } } http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/58c55f30/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactoryTest.java ---------------------------------------------------------------------- diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactoryTest.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactoryTest.java index c4a1e8c..6891b90 100644 --- a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactoryTest.java +++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactoryTest.java @@ -18,19 +18,21 @@ package org.apache.flink.streaming.siddhi.utils; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple4; -import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.junit.Assert; import org.junit.Test; public class SiddhiTypeFactoryTest { @Test public void testTypeInfoParser() { - TypeInformation<Tuple3<String, Long, Object>> type1 = TypeInfoParser.parse("Tuple3<String,Long,java.lang.Object>"); + TypeInformation<Tuple3<String, Long, Object>> type1 = + Types.TUPLE(Types.STRING, Types.LONG, Types.GENERIC(Object.class)); Assert.assertNotNull(type1); - TypeInformation<Tuple4<String, Long, Object, InnerPojo>> type2 = TypeInfoParser.parse("Tuple4<" + String.class.getName() + ", " + Long.class.getName() + ", " + java.lang.Object.class.getName() + "," + InnerPojo.class.getName() + ">"); + TypeInformation<Tuple4<String, Long, Object, InnerPojo>> type2 = + Types.TUPLE(Types.STRING, Types.LONG, Types.GENERIC(Object.class), Types.GENERIC(InnerPojo.class)); Assert.assertNotNull(type2); }