Repository: flink Updated Branches: refs/heads/master 19040a632 -> c34c7e412
http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java index 88c70c6..e0a66d0 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java @@ -18,14 +18,15 @@ package org.apache.flink.table.runtime.batch; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.formats.avro.generated.Address; import org.apache.flink.formats.avro.generated.Colors; import org.apache.flink.formats.avro.generated.Fixed16; +import org.apache.flink.formats.avro.generated.Fixed2; import org.apache.flink.formats.avro.generated.User; +import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.java.BatchTableEnvironment; @@ -34,15 +35,22 @@ import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.types.Row; import org.apache.avro.util.Utf8; +import org.joda.time.DateTime; +import org.joda.time.LocalDate; +import org.joda.time.LocalTime; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; -import java.util.Map; + +import static org.junit.Assert.assertEquals; /** * Tests for interoperability with Avro types. @@ -72,6 +80,14 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase { .setCity("Berlin") .setState("Berlin") .setZip("12049").build()) + .setTypeBytes(ByteBuffer.allocate(10)) + .setTypeDate(LocalDate.parse("2014-03-01")) + .setTypeTimeMillis(LocalTime.parse("12:12:12")) + .setTypeTimeMicros(123456) + .setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z")) + .setTypeTimestampMicros(123456L) + .setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())) + .setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())) .build(); private static final User USER_2 = User.newBuilder() @@ -88,7 +104,14 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase { .setTypeMap(new HashMap<>()) .setTypeFixed(new Fixed16()) .setTypeUnion(null) - .setTypeNested(null) + .setTypeNested(null).setTypeDate(LocalDate.parse("2014-03-01")) + .setTypeBytes(ByteBuffer.allocate(10)) + .setTypeTimeMillis(LocalTime.parse("12:12:12")) + .setTypeTimeMicros(123456) + .setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z")) + .setTypeTimestampMicros(123456L) + .setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())) + .setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())) .build(); private static final User USER_3 = User.newBuilder() @@ -106,26 +129,16 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase { .setTypeFixed(new Fixed16()) .setTypeUnion(null) .setTypeNested(null) + .setTypeBytes(ByteBuffer.allocate(10)) + .setTypeDate(LocalDate.parse("2014-03-01")) + .setTypeTimeMillis(LocalTime.parse("12:12:12")) + .setTypeTimeMicros(123456) + .setTypeTimestampMillis(DateTime.parse("2014-03-01T12:12:12.321Z")) + .setTypeTimestampMicros(123456L) + .setTypeDecimalBytes(ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())) + .setTypeDecimalFixed(new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray())) .build(); - private static TypeInformation<Row> rowType = Types.ROW( - Types.GENERIC(Utf8.class), - Types.INT, - Types.GENERIC(Utf8.class), - Types.GENERIC(List.class), - Types.GENERIC(List.class), - Types.GENERIC(Object.class), - Types.DOUBLE, - Types.ENUM(Colors.class), - Types.GENERIC(Fixed16.class), - Types.LONG, - Types.GENERIC(Map.class), - Types.POJO(Address.class), - Types.GENERIC(Object.class), - Types.GENERIC(List.class), - Types.GENERIC(Object.class) - ); - public AvroTypesITCase( TestExecutionMode executionMode, TableConfigMode tableConfigMode) { @@ -135,19 +148,29 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase { @Test public void testAvroToRow() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().registerTypeWithKryoSerializer(LocalDate.class, AvroKryoSerializerUtils.JodaLocalDateSerializer.class); + env.getConfig().registerTypeWithKryoSerializer(LocalTime.class, AvroKryoSerializerUtils.JodaLocalTimeSerializer.class); BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env, config()); Table t = tEnv.fromDataSet(testData(env)); Table result = t.select("*"); - List<Row> results = tEnv.toDataSet(result, rowType).collect(); - String expected = "black,null,Whatever,[true],[hello],true,0.0,GREEN," + - "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],42,{},null,null,null,null\n" + - "blue,null,Charlie,[],[],false,1.337,RED," + - "null,1337,{},{\"num\": 42, \"street\": \"Bakerstreet\", \"city\": \"Berlin\", " + - "\"state\": \"Berlin\", \"zip\": \"12049\"},null,null,null\n" + - "yellow,null,Terminator,[false],[world],false,0.0,GREEN," + - "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],1,{},null,null,null,null"; + List<Row> results = tEnv.toDataSet(result, Row.class).collect(); + String expected = + "black,null,Whatever,[true],[hello],true,java.nio.HeapByteBuffer[pos=0 lim=10 cap=10]," + + "2014-03-01,java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],0.0,GREEN," + + "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],42,{},null,null,null,123456," + + "12:12:12.000,123456,2014-03-01T12:12:12.321Z,null\n" + + "blue,null,Charlie,[],[],false,java.nio.HeapByteBuffer[pos=0 lim=10 cap=10],2014-03-01," + + "java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],1.337,RED,null,1337,{}," + + "{\"num\": 42, \"street\": \"Bakerstreet\", \"city\": \"Berlin\", \"state\": " + + "\"Berlin\", \"zip\": \"12049\"},null,null,123456,12:12:12.000,123456," + + "2014-03-01T12:12:12.321Z,null\n" + + "yellow,null,Terminator,[false],[world],false," + + "java.nio.HeapByteBuffer[pos=0 lim=10 cap=10],2014-03-01," + + "java.nio.HeapByteBuffer[pos=0 lim=2 cap=2],[7, -48],0.0,GREEN," + + "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],1,{},null,null,null,123456," + + "12:12:12.000,123456,2014-03-01T12:12:12.321Z,null"; TestBaseUtils.compareResultAsText(results, expected); } @@ -189,8 +212,8 @@ public class AvroTypesITCase extends TableProgramsClusterTestBase { Table result = t.select("*"); List<User> results = tEnv.toDataSet(result, Types.POJO(User.class)).collect(); - String expected = USER_1 + "\n" + USER_2 + "\n" + USER_3; - TestBaseUtils.compareResultAsText(results, expected); + List<User> expected = Arrays.asList(USER_1, USER_2, USER_3); + assertEquals(expected, results); } private DataSet<User> testData(ExecutionEnvironment env) { http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/resources/avro/user.avsc ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/resources/avro/user.avsc b/flink-formats/flink-avro/src/test/resources/avro/user.avsc index f493d1f..70f8e95 100644 --- a/flink-formats/flink-avro/src/test/resources/avro/user.avsc +++ b/flink-formats/flink-avro/src/test/resources/avro/user.avsc @@ -28,9 +28,17 @@ {"name": "type_map", "type": {"type": "map", "values": "long"}}, {"name": "type_fixed", "size": 16, - "type": ["null", {"name": "Fixed16", "size": 16, "type": "fixed"}] }, + "type": ["null", {"name": "Fixed16", "size": 16, "type": "fixed"}]}, {"name": "type_union", "type": ["null", "boolean", "long", "double"]}, - {"name": "type_nested", "type": ["null", "Address"]} + {"name": "type_nested", "type": ["null", "Address"]}, + {"name": "type_bytes", "type": "bytes"}, + {"name": "type_date", "type": {"type": "int", "logicalType": "date"}}, + {"name": "type_time_millis", "type": {"type": "int", "logicalType": "time-millis"}}, + {"name": "type_time_micros", "type": {"type": "int", "logicalType": "time-micros"}}, + {"name": "type_timestamp_millis", "type": {"type": "long", "logicalType": "timestamp-millis"}}, + {"name": "type_timestamp_micros", "type": {"type": "long", "logicalType": "timestamp-micros"}}, + {"name": "type_decimal_bytes", "type": {"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 2}}, + {"name": "type_decimal_fixed", "type": {"name": "Fixed2", "size": 2, "type": "fixed", "logicalType": "decimal", "precision": 4, "scale": 2}} ] }, {"namespace": "org.apache.flink.formats.avro.generated", @@ -40,4 +48,55 @@ {"name": "name", "type": "string"}, {"name": "optionalField", "type": ["null", "int"], "default": null} ] +}, +/** + * The BackwardsCompatibleAvroSerializer does not support custom Kryo + * registrations (which logical types require for Avro 1.8 because Kryo does not support Joda-Time). + * We introduce a simpler user record for pre-Avro 1.8 test cases. This record can be dropped when + * we drop support for 1.3 savepoints. + */ +{"namespace": "org.apache.flink.formats.avro.generated", + "type": "record", + "name": "SimpleUser", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "favorite_number", "type": ["int", "null"]}, + {"name": "favorite_color", "type": ["string", "null"]}, + {"name": "type_long_test", "type": ["long", "null"]}, + {"name": "type_double_test", "type": "double"}, + {"name": "type_null_test", "type": ["null"]}, + {"name": "type_bool_test", "type": ["boolean"]}, + {"name": "type_array_string", "type" : {"type" : "array", "items" : "string"}}, + {"name": "type_array_boolean", "type" : {"type" : "array", "items" : "boolean"}}, + {"name": "type_nullable_array", "type": ["null", {"type":"array", "items":"string"}], "default":null}, + {"name": "type_enum", "type": "Colors"}, + {"name": "type_map", "type": {"type": "map", "values": "long"}}, + {"name": "type_fixed", "type": ["null", "Fixed16"]}, + {"name": "type_union", "type": ["null", "boolean", "long", "double"]}, + {"name": "type_nested", "type": ["null", "Address"]}, + {"name": "type_bytes", "type": "bytes"} + ] +}, + {"namespace": "org.apache.flink.formats.avro.generated", + "type": "record", + "name": "SchemaRecord", + "fields": [ + {"name": "field1", "type": ["null", "long"], "default": null}, + {"name": "field2", "type": ["null", "string"], "default": null}, + {"name": "time1", "type": "long"}, + {"name": "time2", "type": "long"}, + {"name": "field3", "type": ["null", "double"], "default": null} + ] +}, + {"namespace": "org.apache.flink.formats.avro.generated", + "type": "record", + "name": "DifferentSchemaRecord", + "fields": [ + {"name": "otherField1", "type": ["null", "long"], "default": null}, + {"name": "otherField2", "type": ["null", "string"], "default": null}, + {"name": "otherTime1", "type": "long"}, + {"name": "otherField3", "type": ["null", "double"], "default": null}, + {"name": "otherField4", "type": ["null", "float"], "default": null}, + {"name": "otherField5", "type": ["null", "int"], "default": null} + ] }] http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data b/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data deleted file mode 100644 index 42eaf5d..0000000 Binary files a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data and /dev/null differ http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot b/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot deleted file mode 100644 index 0599305..0000000 Binary files a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot and /dev/null differ http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serialized-data ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serialized-data b/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serialized-data new file mode 100644 index 0000000..23853cf Binary files /dev/null and b/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serialized-data differ http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serializer-snapshot ---------------------------------------------------------------------- diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serializer-snapshot b/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serializer-snapshot new file mode 100644 index 0000000..1474300 Binary files /dev/null and b/flink-formats/flink-avro/src/test/resources/flink-1.6-avro-type-serializer-snapshot differ http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java index edc4b01..df52851 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java @@ -87,7 +87,6 @@ public class JsonRowDeserializationSchema implements DeserializationSchema<Row> this(JsonSchemaConverter.convert(jsonSchema)); } - @SuppressWarnings("unchecked") @Override public Row deserialize(byte[] message) throws IOException { try { http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala index 253b491..afc6506 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeStringUtils.scala @@ -88,11 +88,12 @@ object TypeStringUtils extends JavaTokenParsers with PackratParsers { (TIMESTAMP | SQL_TIMESTAMP) ^^ { e => Types.SQL_TIMESTAMP } | (TIME | SQL_TIME) ^^ { e => Types.SQL_TIME } - lazy val escapedFieldName: PackratParser[String] = "\"" ~> stringLiteral <~ "\"" ^^ { s => - StringEscapeUtils.unescapeJava(s) + lazy val escapedFieldName: PackratParser[String] = stringLiteral ^^ { s => + val unquoted = s.substring(1, s.length - 1) + StringEscapeUtils.unescapeJava(unquoted) } - lazy val fieldName: PackratParser[String] = escapedFieldName | stringLiteral | ident + lazy val fieldName: PackratParser[String] = escapedFieldName | ident lazy val field: PackratParser[(String, TypeInformation[_])] = fieldName ~ typeInfo ^^ { http://git-wip-us.apache.org/repos/asf/flink/blob/c34c7e41/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala index 29d647c..9ea8be0 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeStringUtilsTest.scala @@ -18,12 +18,14 @@ package org.apache.flink.table.typeutils +import java.util + import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} -import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.api.java.typeutils.{RowTypeInfo, TypeExtractor} import org.apache.flink.table.api.Types import org.apache.flink.table.runtime.utils.CommonTestData.{NonPojo, Person} -import org.junit.Assert.assertEquals -import org.junit.Test +import org.junit.Assert.{assertEquals, assertTrue} +import org.junit.{Assert, Test} /** * Tests for string-based representation of [[TypeInformation]]. @@ -74,24 +76,18 @@ class TypeStringUtilsTest { Array[TypeInformation[_]](Types.DECIMAL, Types.BYTE))) testReadAndWrite( - "ROW(\"he llo\" DECIMAL, world TINYINT)", - Types.ROW( - Array[String]("he llo", "world"), - Array[TypeInformation[_]](Types.DECIMAL, Types.BYTE))) - - testReadAndWrite( - "ROW(\"he \\nllo\" DECIMAL, world TINYINT)", - Types.ROW( - Array[String]("he \nllo", "world"), - Array[TypeInformation[_]](Types.DECIMAL, Types.BYTE))) - - testReadAndWrite( "POJO(org.apache.flink.table.runtime.utils.CommonTestData$Person)", TypeExtractor.createTypeInfo(classOf[Person])) testReadAndWrite( "ANY(org.apache.flink.table.runtime.utils.CommonTestData$NonPojo)", TypeExtractor.createTypeInfo(classOf[NonPojo])) + + // test escaping + assertTrue( + TypeStringUtils.readTypeInfo("ROW(\"he \\nllo\" DECIMAL, world TINYINT)") + .asInstanceOf[RowTypeInfo].getFieldNames + .sameElements(Array[String]("he \nllo", "world"))) } private def testReadAndWrite(expected: String, tpe: TypeInformation[_]): Unit = {