Repository: flink
Updated Branches:
  refs/heads/master 19040a632 -> c34c7e412


http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
index 88c70c6..e0a66d0 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
@@ -18,14 +18,15 @@
 
 package org.apache.flink.table.runtime.batch;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.formats.avro.generated.Address;
 import org.apache.flink.formats.avro.generated.Colors;
 import org.apache.flink.formats.avro.generated.Fixed16;
+import org.apache.flink.formats.avro.generated.Fixed2;
 import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
@@ -34,15 +35,22 @@ import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
 
 import org.apache.avro.util.Utf8;
+import org.joda.time.DateTime;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
 
 /**
  * Tests for interoperability with Avro types.
@@ -72,6 +80,14 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
                                                        .setCity("Berlin")
                                                        .setState("Berlin")
                                                        .setZip("12049").build())
+                       .setTypeBytes(ByteBuffer.allocate(10))
+                       .setTypeDate(LocalDate.parse("2014-03-01"))
+                       .setTypeTimeMillis(LocalTime.parse("12:12:12"))
+                       .setTypeTimeMicros(123456)
+                       .setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
+                       .setTypeTimestampMicros(123456L)
+                       .setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
+                       .setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
                        .build();
 
        private static final User USER_2 = User.newBuilder()
@@ -88,7 +104,14 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
                        .setTypeMap(new HashMap<>())
                        .setTypeFixed(new Fixed16())
                        .setTypeUnion(null)
-                       .setTypeNested(null)
+                       .setTypeNested(null).setTypeDate(LocalDate.parse("2014-03-01"))
+                       .setTypeBytes(ByteBuffer.allocate(10))
+                       .setTypeTimeMillis(LocalTime.parse("12:12:12"))
+                       .setTypeTimeMicros(123456)
+                       .setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
+                       .setTypeTimestampMicros(123456L)
+                       .setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
+                       .setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
                        .build();
 
        private static final User USER_3 = User.newBuilder()
@@ -106,26 +129,16 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
                        .setTypeFixed(new Fixed16())
                        .setTypeUnion(null)
                        .setTypeNested(null)
+                       .setTypeBytes(ByteBuffer.allocate(10))
+                       .setTypeDate(LocalDate.parse("2014-03-01"))
+                       .setTypeTimeMillis(LocalTime.parse("12:12:12"))
+                       .setTypeTimeMicros(123456)
+                       .setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z"))
+                       .setTypeTimestampMicros(123456L)
+                       .setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
+                       .setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
                        .build();
 
-       private static TypeInformation<Row> rowType = Types.ROW(
-                       Types.GENERIC(Utf8.class),
-                       Types.INT,
-                       Types.GENERIC(Utf8.class),
-                       Types.GENERIC(List.class),
-                       Types.GENERIC(List.class),
-                       Types.GENERIC(Object.class),
-                       Types.DOUBLE,
-                       Types.ENUM(Colors.class),
-                       Types.GENERIC(Fixed16.class),
-                       Types.LONG,
-                       Types.GENERIC(Map.class),
-                       Types.POJO(Address.class),
-                       Types.GENERIC(Object.class),
-                       Types.GENERIC(List.class),
-                       Types.GENERIC(Object.class)
-       );
-
        public AvroTypesITCase(
                        TestExecutionMode executionMode,
                        TableConfigMode tableConfigMode) {
@@ -135,19 +148,29 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
        @Test
        public void testAvroToRow() throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               env.getConfig().registerTypeWithKryoSerializer(LocalDate.class, AvroKryoSerializerUtils.JodaLocalDateSerializer.class);
+               env.getConfig().registerTypeWithKryoSerializer(LocalTime.class, AvroKryoSerializerUtils.JodaLocalTimeSerializer.class);
                BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env, config());
 
                Table t = tEnv.fromDataSet(testData(env));
                Table result = t.select("*");
 
-               List<Row> results = tEnv.toDataSet(result, rowType).collect();
-               String expected = "black,null,Whatever,[true],[hello],true,0.0,GREEN," +
-                               "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],42,{},null,null,null,null\n" +
-                               "blue,null,Charlie,[],[],false,1.337,RED," +
-                               "null,1337,{},{\"num\": 42, \"street\": \"Bakerstreet\", \"city\": \"Berlin\", " +
-                               "\"state\": \"Berlin\", \"zip\": \"12049\"},null,null,null\n" +
-                               "yellow,null,Terminator,[false],[world],false,0.0,GREEN," +
-                               "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],1,{},null,null,null,null";
+               List<Row> results = tEnv.toDataSet(result, Row.class).collect();
+               String expected =
+                       "black,null,Whatever,[true],[hello],true,java.nio.HeapByteBuffer[pos=0 lim=10 cap=10]," +
+                       "2014-03-01,java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],0.0,GREEN," +
+                       "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],42,{},null,null,null,123456," +
+                       "12:12:12.000,123456,2014-03-01T12:12:12.321Z,null\n" +
+                       "blue,null,Charlie,[],[],false,java.nio.HeapByteBuffer[pos=0 lim=10 cap=10],2014-03-01," +
+                       "java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],1.337,RED,null,1337,{}," +
+                       "{\"num\": 42, \"street\": \"Bakerstreet\", \"city\": \"Berlin\", \"state\": " +
+                       "\"Berlin\", \"zip\": \"12049\"},null,null,123456,12:12:12.000,123456," +
+                       "2014-03-01T12:12:12.321Z,null\n" +
+                       "yellow,null,Terminator,[false],[world],false," +
+                       "java.nio.HeapByteBuffer[pos=0 lim=10 cap=10],2014-03-01," +
+                       "java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],0.0,GREEN," +
+                       "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],1,{},null,null,null,123456," +
+                       "12:12:12.000,123456,2014-03-01T12:12:12.321Z,null";
                TestBaseUtils.compareResultAsText(results, expected);
        }
 
@@ -189,8 +212,8 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase {
                Table result = t.select("*");
 
                List<User> results = tEnv.toDataSet(result, Types.POJO(User.class)).collect();
-               String expected = USER_1 + "\n" + USER_2 + "\n" + USER_3;
-               TestBaseUtils.compareResultAsText(results, expected);
+               List<User> expected = Arrays.asList(USER_1, USER_2, USER_3);
+               assertEquals(expected, results);
        }
 
        private DataSet<User> testData(ExecutionEnvironment env) {

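A note on the Kryo registrations added in testAvroToRow() above: with Avro 1.8, the
date/time logical types are generated as Joda-Time values (LocalDate, LocalTime,
DateTime), which Kryo cannot serialize out of the box. A minimal sketch of the same
setup in user code, using only the flink-avro classes referenced in this diff (the
wrapping class is illustrative, not part of the change):

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils;
    import org.joda.time.LocalDate;
    import org.joda.time.LocalTime;

    public class JodaKryoRegistration {
        public static void main(String[] args) {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // Joda-Time values produced for Avro 1.8 logical types have no
            // built-in Kryo support; register the serializers that ship with
            // flink-avro before working with such records.
            env.getConfig().registerTypeWithKryoSerializer(
                LocalDate.class, AvroKryoSerializerUtils.JodaLocalDateSerializer.class);
            env.getConfig().registerTypeWithKryoSerializer(
                LocalTime.class, AvroKryoSerializerUtils.JodaLocalTimeSerializer.class);
        }
    }
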
http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/resources/avro/user.avsc
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/resources/avro/user.avsc b/flink-formats/flink-avro/src/test/resources/avro/user.avsc
index f493d1f..70f8e95 100644
--- a/flink-formats/flink-avro/src/test/resources/avro/user.avsc
+++ b/flink-formats/flink-avro/src/test/resources/avro/user.avsc
@@ -28,9 +28,17 @@
      {"name": "type_map", "type": {"type": "map", "values": "long"}},
      {"name": "type_fixed",
                  "size": 16,
-                 "type": ["null", {"name": "Fixed16", "size": 16, "type": "fixed"}] },
+                 "type": ["null", {"name": "Fixed16", "size": 16, "type": "fixed"}]},
      {"name": "type_union", "type": ["null", "boolean", "long", "double"]},
-     {"name": "type_nested", "type": ["null", "Address"]}
+     {"name": "type_nested", "type": ["null", "Address"]},
+     {"name": "type_bytes", "type": "bytes"},
+     {"name": "type_date", "type": {"type": "int", "logicalType": "date"}},
+     {"name": "type_time_millis", "type": {"type": "int", "logicalType": "time-millis"}},
+     {"name": "type_time_micros", "type": {"type": "int", "logicalType": "time-micros"}},
+     {"name": "type_timestamp_millis", "type": {"type": "long", "logicalType": "timestamp-millis"}},
+     {"name": "type_timestamp_micros", "type": {"type": "long", "logicalType": "timestamp-micros"}},
+     {"name": "type_decimal_bytes", "type": {"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 2}},
+     {"name": "type_decimal_fixed", "type": {"name": "Fixed2", "size": 2, "type": "fixed", "logicalType": "decimal", "precision": 4, "scale": 2}}
  ]
 },
  {"namespace": "org.apache.flink.formats.avro.generated",
@@ -40,4 +48,55 @@
       {"name": "name", "type": "string"},
       {"name": "optionalField",  "type": ["null", "int"], "default": null}
   ]
+},
+/**
+ * The BackwardsCompatibleAvroSerializer does not support custom Kryo
+ * registrations (which logical types require for Avro 1.8 because Kryo does not support Joda-Time).
+ * We introduce a simpler user record for pre-Avro 1.8 test cases. This record can be dropped when
+ * we drop support for 1.3 savepoints.
+ */
+{"namespace": "org.apache.flink.formats.avro.generated",
+ "type": "record",
+ "name": "SimpleUser",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_number",  "type": ["int", "null"]},
+     {"name": "favorite_color", "type": ["string", "null"]},
+     {"name": "type_long_test", "type": ["long", "null"]},
+     {"name": "type_double_test", "type": "double"},
+     {"name": "type_null_test", "type": ["null"]},
+     {"name": "type_bool_test", "type": ["boolean"]},
+     {"name": "type_array_string", "type" : {"type" : "array", "items" : "string"}},
+     {"name": "type_array_boolean", "type" : {"type" : "array", "items" : "boolean"}},
+     {"name": "type_nullable_array", "type": ["null", {"type":"array", "items":"string"}], "default":null},
+     {"name": "type_enum", "type": "Colors"},
+     {"name": "type_map", "type": {"type": "map", "values": "long"}},
+     {"name": "type_fixed", "type": ["null", "Fixed16"]},
+     {"name": "type_union", "type": ["null", "boolean", "long", "double"]},
+     {"name": "type_nested", "type": ["null", "Address"]},
+     {"name": "type_bytes", "type": "bytes"}
+ ]
+},
+ {"namespace": "org.apache.flink.formats.avro.generated",
+  "type": "record",
+  "name": "SchemaRecord",
+  "fields": [
+      {"name": "field1", "type": ["null", "long"], "default": null},
+      {"name": "field2", "type": ["null", "string"], "default": null},
+      {"name": "time1", "type": "long"},
+      {"name": "time2", "type": "long"},
+      {"name": "field3", "type": ["null", "double"], "default": null}
+  ]
+},
+ {"namespace": "org.apache.flink.formats.avro.generated",
+  "type": "record",
+  "name": "DifferentSchemaRecord",
+  "fields": [
+      {"name": "otherField1", "type": ["null", "long"], "default": null},
+      {"name": "otherField2", "type": ["null", "string"], "default": null},
+      {"name": "otherTime1", "type": "long"},
+      {"name": "otherField3", "type": ["null", "double"], "default": null},
+      {"name": "otherField4", "type": ["null", "float"], "default": null},
+      {"name": "otherField5", "type": ["null", "int"], "default": null}
+  ]
 }]
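
For reference, the decimal logical type declared above stores the two's-complement
big-endian unscaled value, which is why the expected rows in AvroTypesITCase contain
"[7, -48]": BigDecimal.valueOf(2000, 2) is 20.00, and its unscaled value 2000 is
0x07D0, i.e. the bytes 7 and -48. A standalone sketch (the class name is hypothetical):

    import java.math.BigDecimal;
    import java.util.Arrays;

    public class DecimalFixedEncoding {
        public static void main(String[] args) {
            // decimal(precision = 4, scale = 2) on a 2-byte fixed: the payload
            // is the two's-complement big-endian unscaled value.
            BigDecimal value = BigDecimal.valueOf(2000, 2);     // 20.00
            byte[] bytes = value.unscaledValue().toByteArray(); // 2000 = 0x07D0
            System.out.println(Arrays.toString(bytes));         // prints [7, -48]
        }
    }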

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data b/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data
deleted file mode 100644
index 42eaf5d..0000000
Binary files a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot b/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot
deleted file mode 100644
index 0599305..0000000
Binary files a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serialized-data
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serialized-data b/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serialized-data
new file mode 100644
index 0000000..23853cf
Binary files /dev/null and b/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serialized-data differ

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serializer-snapshot
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serializer-snapshot b/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serializer-snapshot
new file mode 100644
index 0000000..1474300
Binary files /dev/null and b/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serializer-snapshot differ

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
index edc4b01..df52851 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
@@ -87,7 +87,6 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row>
                this(JsonSchemaConverter.convert(jsonSchema));
        }
 
-       @SuppressWarnings("unchecked")
        @Override
        public Row deserialize(byte[] message) throws IOException {
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
index 253b491..afc6506 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala
@@ -88,11 +88,12 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers {
     (TIMESTAMP | SQL_TIMESTAMP) ^^ { e => Types.SQL_TIMESTAMP } |
     (TIME | SQL_TIME) ^^ { e => Types.SQL_TIME }
 
-  lazy val escapedFieldName: PackratParser[String] = "\"" ~> stringLiteral <~ "\"" ^^ { s =>
-    StringEscapeUtils.unescapeJava(s)
+  lazy val escapedFieldName: PackratParser[String] = stringLiteral ^^ { s =>
+    val unquoted = s.substring(1, s.length - 1)
+    StringEscapeUtils.unescapeJava(unquoted)
   }
 
-  lazy val fieldName: PackratParser[String] = escapedFieldName | stringLiteral | ident
+  lazy val fieldName: PackratParser[String] = escapedFieldName | ident
 
   lazy val field: PackratParser[(String, TypeInformation[_])] =
     fieldName ~ typeInfo ^^ {

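Background on the parser fix above: JavaTokenParsers.stringLiteral already matches
the surrounding double quotes, so the old escapedFieldName rule demanded a second,
redundant pair of quotes, and the old stringLiteral fallback in fieldName returned
names with the quotes still attached. The new rule unquotes the literal itself
before unescaping. A small Java sketch of that unquote-and-unescape step, assuming
the Commons Lang StringEscapeUtils used by TypeStringUtils (the class name is
hypothetical):

    import org.apache.commons.lang3.StringEscapeUtils;

    public class FieldNameUnescapeDemo {
        public static void main(String[] args) {
            // What stringLiteral hands back: the quotes are part of the match.
            String s = "\"he \\nllo\"";
            // Strip the surrounding quotes, as escapedFieldName now does.
            String unquoted = s.substring(1, s.length() - 1);
            // Resolve Java escapes: the characters \ and n become a newline.
            System.out.println(StringEscapeUtils.unescapeJava(unquoted));
        }
    }
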
http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala
index 29d647c..9ea8be0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala
@@ -18,12 +18,14 @@
 
 package org.apache.flink.table.typeutils
 
+import java.util
+
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.java.typeutils.{RowTypeInfo, TypeExtractor}
 import org.apache.flink.table.api.Types
 import org.apache.flink.table.runtime.utils.CommonTestData.{NonPojo, Person}
-import org.junit.Assert.assertEquals
-import org.junit.Test
+import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.{Assert, Test}
 
 /**
   * Tests for string-based representation of [[TypeInformation]].
@@ -74,24 +76,18 @@ class TypeStringUtilsTest {
         Array[TypeInformation[_]](Types.DECIMAL, Types.BYTE)))
 
     testReadAndWrite(
-      "ROW(\"he llo\" DECIMAL, world TINYINT)",
-      Types.ROW(
-        Array[String]("he llo", "world"),
-        Array[TypeInformation[_]](Types.DECIMAL, Types.BYTE)))
-
-    testReadAndWrite(
-      "ROW(\"he         \\nllo\" DECIMAL, world TINYINT)",
-      Types.ROW(
-        Array[String]("he         \nllo", "world"),
-        Array[TypeInformation[_]](Types.DECIMAL, Types.BYTE)))
-
-    testReadAndWrite(
       "POJO(org.apache.flink.table.runtime.utils.CommonTestData$Person)",
       TypeExtractor.createTypeInfo(classOf[Person]))
 
     testReadAndWrite(
       "ANY(org.apache.flink.table.runtime.utils.CommonTestData$NonPojo)",
       TypeExtractor.createTypeInfo(classOf[NonPojo]))
+
+    // test escaping
+    assertTrue(
+      TypeStringUtils.readTypeInfo("ROW(\"he         \\nllo\" DECIMAL, world TINYINT)")
+        .asInstanceOf[RowTypeInfo].getFieldNames
+        .sameElements(Array[String]("he         \nllo", "world")))
   }
 
  private def testReadAndWrite(expected: String, tpe: TypeInformation[_]): Unit = {
