Repository: bahir-flink
Updated Branches:
  refs/heads/master 567792b26 -> 58c55f306


[BAHIR-174] Removed using TypeInfoParser from flink-library-siddhi

Closes #30


Project: http://git-wip-us.apache.org/repos/asf/bahir-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir-flink/commit/58c55f30
Tree: http://git-wip-us.apache.org/repos/asf/bahir-flink/tree/58c55f30
Diff: http://git-wip-us.apache.org/repos/asf/bahir-flink/diff/58c55f30

Branch: refs/heads/master
Commit: 58c55f3064d37dfbffeedd62014e87fba5e6c010
Parents: 567792b
Author: Dominik.Wosinski <b...@wp.pl>
Authored: Mon Jul 30 10:04:46 2018 +0200
Committer: Luciano Resende <lrese...@apache.org>
Committed: Mon Aug 6 12:24:11 2018 -0700

----------------------------------------------------------------------
 .../siddhi/utils/SiddhiTypeFactory.java         | 25 ++++++++------------
 .../siddhi/schema/StreamSchemaTest.java         | 18 +++++++-------
 .../siddhi/utils/SiddhiTypeFactoryTest.java     |  8 ++++---
 3 files changed, 24 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/58c55f30/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java
index 22405c9..84ff453 100644
--- a/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java
+++ b/flink-library-siddhi/src/main/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactory.java
@@ -17,19 +17,17 @@
 
 package org.apache.flink.streaming.siddhi.utils;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.wso2.siddhi.core.SiddhiAppRuntime;
 import org.wso2.siddhi.core.SiddhiManager;
 import org.wso2.siddhi.query.api.definition.AbstractDefinition;
 import org.wso2.siddhi.query.api.definition.Attribute;
 import org.wso2.siddhi.query.api.definition.StreamDefinition;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -87,19 +85,16 @@ public class SiddhiTypeFactory {
 
     public static <T extends Tuple> TypeInformation<T> getTupleTypeInformation(AbstractDefinition definition) {
         int tupleSize = definition.getAttributeList().size();
-        StringBuilder stringBuilder = new StringBuilder();
-        stringBuilder.append("Tuple").append(tupleSize);
-        stringBuilder.append("<");
-        List<String> attributeTypes = new ArrayList<>();
-        for (Attribute attribute : definition.getAttributeList()) {
-            attributeTypes.add(getJavaType(attribute.getType()).getName());
-        }
-        stringBuilder.append(StringUtils.join(attributeTypes, ","));
-        stringBuilder.append(">");
+        TypeInformation[] typeInformations = new TypeInformation[tupleSize];
+        List<Attribute> attributes = definition.getAttributeList();
         try {
-            return TypeInfoParser.parse(stringBuilder.toString());
+            for (int i = 0; i < attributes.size() ; i++) {
+                Class<?> clazz = getJavaType(attributes.get(i).getType());
+                typeInformations[i] = TypeInformation.of(clazz);
+            }
+            return Types.TUPLE(typeInformations);
         } catch (IllegalArgumentException ex) {
-            throw new IllegalArgumentException("Unable to parse " + stringBuilder.toString(), ex);
+            throw new IllegalArgumentException("Failed to get Type Information.", ex);
         }
     }
 
@@ -131,6 +126,6 @@ public class SiddhiTypeFactory {
     }
 
     public static <T> TypeInformation<Tuple2<String, T>> getStreamTupleTypeInformation(TypeInformation<T> typeInformation) {
-        return TypeInfoParser.parse("Tuple2<String," + typeInformation.getTypeClass().getName() + ">");
+        return Types.TUPLE(Types.STRING, typeInformation);
     }
 }

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/58c55f30/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSchemaTest.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSchemaTest.java
index f876b2b..b9dcac7 100644
--- a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSchemaTest.java
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/schema/StreamSchemaTest.java
@@ -18,11 +18,11 @@
 package org.apache.flink.streaming.siddhi.schema;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.streaming.siddhi.source.Event;
 import org.junit.Test;
 
@@ -41,7 +41,7 @@ public class StreamSchemaTest {
 
     @Test
     public void testStreamSchemaWithTuple() {
-        TypeInformation<Tuple4> typeInfo = TypeInfoParser.parse("Tuple4<Integer,Long,String,Double>");
+        TypeInformation<Tuple4> typeInfo = Types.TUPLE(Types.INT, Types.LONG, Types.STRING, Types.DOUBLE);
         StreamSchema<Tuple4> schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price");
         assertEquals(Tuple4.class, schema.getTypeInfo().getTypeClass());
         assertEquals(4, schema.getFieldIndexes().length);
@@ -50,7 +50,7 @@ public class StreamSchemaTest {
 
     @Test
     public void testStreamSchemaWithPrimitive() {
-        TypeInformation<String> typeInfo = TypeInfoParser.parse("String");
+        TypeInformation<String> typeInfo = Types.STRING;
         StreamSchema<String> schema = new StreamSchema<>(typeInfo, "words");
         assertEquals(String.class, schema.getTypeInfo().getTypeClass());
         assertEquals(1, schema.getFieldIndexes().length);
@@ -65,30 +65,30 @@ public class StreamSchemaTest {
 
     @Test
     public void testStreamTupleSerializerWithPojo() {
-        TypeInformation<Event> typeInfo = TypeExtractor.createTypeInfo(Event.class);
+        TypeInformation<Event> typeInfo = TypeInformation.of(Event.class);
         assertTrue("Type information should be PojoTypeInfo", typeInfo instanceof PojoTypeInfo);
         StreamSchema<Event> schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price");
         assertEquals(Event.class, schema.getTypeInfo().getTypeClass());
 
-        TypeInformation<Tuple2<String, Event>> tuple2TypeInformation = TypeInfoParser.parse("Tuple2<String," + schema.getTypeInfo().getTypeClass().getName() + ">");
+        TypeInformation<Tuple2<String, Event>> tuple2TypeInformation = Types.TUPLE(Types.STRING, Types.GENERIC(schema.getTypeInfo().getTypeClass()));
         assertEquals("Java Tuple2<String, GenericType<" + Event.class.getName() + ">>", tuple2TypeInformation.toString());
     }
 
     @Test
     public void testStreamTupleSerializerWithTuple() {
-        TypeInformation<Tuple4> typeInfo = TypeInfoParser.parse("Tuple4<Integer,Long,String,Double>");
+        TypeInformation<Tuple4> typeInfo = Types.GENERIC(Tuple4.class);
         StreamSchema<Tuple4> schema = new StreamSchema<>(typeInfo, "id", "timestamp", "name", "price");
         assertEquals(Tuple4.class, schema.getTypeInfo().getTypeClass());
-        TypeInformation<Tuple2<String, Tuple4>> tuple2TypeInformation = TypeInfoParser.parse("Tuple2<String," + schema.getTypeInfo().getTypeClass().getName() + ">");
+        TypeInformation<Tuple2<String, Tuple4>> tuple2TypeInformation = Types.TUPLE(Types.STRING, schema.getTypeInfo());
         assertEquals("Java Tuple2<String, GenericType<" + Tuple4.class.getName() + ">>", tuple2TypeInformation.toString());
     }
 
     @Test
     public void testStreamTupleSerializerWithPrimitive() {
-        TypeInformation<String> typeInfo = TypeInfoParser.parse("String");
+        TypeInformation<String> typeInfo = Types.STRING;
         StreamSchema<String> schema = new StreamSchema<>(typeInfo, "words");
         assertEquals(String.class, schema.getTypeInfo().getTypeClass());
-        TypeInformation<Tuple2<String, String>> tuple2TypeInformation = TypeInfoParser.parse("Tuple2<String," + schema.getTypeInfo().getTypeClass().getName() + ">");
+        TypeInformation<Tuple2<String, String>> tuple2TypeInformation = Types.TUPLE(Types.STRING, schema.getTypeInfo());
         assertEquals("Java Tuple2<String, String>", tuple2TypeInformation.toString());
     }
 }

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/58c55f30/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactoryTest.java b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactoryTest.java
index c4a1e8c..6891b90 100644
--- a/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactoryTest.java
+++ b/flink-library-siddhi/src/test/java/org/apache/flink/streaming/siddhi/utils/SiddhiTypeFactoryTest.java
@@ -18,19 +18,21 @@
 package org.apache.flink.streaming.siddhi.utils;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.junit.Assert;
 import org.junit.Test;
 
 public class SiddhiTypeFactoryTest {
     @Test
     public void testTypeInfoParser() {
-        TypeInformation<Tuple3<String, Long, Object>> type1 = TypeInfoParser.parse("Tuple3<String,Long,java.lang.Object>");
+        TypeInformation<Tuple3<String, Long, Object>> type1 =
+                Types.TUPLE(Types.STRING, Types.LONG, Types.GENERIC(Object.class));
         Assert.assertNotNull(type1);
-        TypeInformation<Tuple4<String, Long, Object, InnerPojo>> type2 = TypeInfoParser.parse("Tuple4<" + String.class.getName() + ", " + Long.class.getName() + ", " + java.lang.Object.class.getName() + "," + InnerPojo.class.getName() + ">");
+        TypeInformation<Tuple4<String, Long, Object, InnerPojo>> type2 =
+                Types.TUPLE(Types.STRING, Types.LONG, Types.GENERIC(Object.class), Types.GENERIC(InnerPojo.class));
         Assert.assertNotNull(type2);
     }
 

Reply via email to