This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new a5aaa0e [FLINK-21947][csv] Support TIMESTAMP_LTZ type in CSV format a5aaa0e is described below commit a5aaa0e605eb4966b5d73c987ea4d9d0bc852c6d Author: Leonard Xu <xbjt...@gmail.com> AuthorDate: Wed Mar 24 17:10:23 2021 +0800 [FLINK-21947][csv] Support TIMESTAMP_LTZ type in CSV format This closes #15356 --- flink-formats/flink-csv/pom.xml | 6 ++ .../formats/csv/CsvRowDeserializationSchema.java | 11 +++- .../flink/formats/csv/CsvRowSchemaConverter.java | 3 +- .../formats/csv/CsvRowSerializationSchema.java | 23 ++++---- .../flink/formats/csv/CsvToRowDataConverters.java | 20 ++++--- .../flink/formats/csv/RowDataToCsvConverters.java | 53 +++++++++-------- .../formats/csv/CsvRowDataSerDeSchemaTest.java | 66 +++++++++++++++++++--- .../csv/CsvRowDeSerializationSchemaTest.java | 49 ++++++++++++++-- flink-formats/flink-format-common/pom.xml | 44 +++++++++++++++ .../apache/flink/formats/common}/TimeFormats.java | 22 +++++--- .../flink/formats/common}/TimestampFormat.java | 2 +- flink-formats/flink-json/pom.xml | 6 ++ .../flink/formats/json/JsonFormatFactory.java | 1 + .../org/apache/flink/formats/json/JsonOptions.java | 1 + .../json/JsonRowDataDeserializationSchema.java | 1 + .../json/JsonRowDataSerializationSchema.java | 1 + .../formats/json/JsonRowDeserializationSchema.java | 4 +- .../formats/json/JsonRowSerializationSchema.java | 4 +- .../formats/json/JsonToRowDataConverters.java | 11 ++-- .../formats/json/RowDataToJsonConverters.java | 11 ++-- .../json/canal/CanalJsonDecodingFormat.java | 2 +- .../json/canal/CanalJsonDeserializationSchema.java | 2 +- .../formats/json/canal/CanalJsonFormatFactory.java | 2 +- .../json/canal/CanalJsonSerializationSchema.java | 2 +- .../json/debezium/DebeziumJsonDecodingFormat.java | 2 +- .../DebeziumJsonDeserializationSchema.java | 2 +- .../json/debezium/DebeziumJsonFormatFactory.java | 2 +- .../debezium/DebeziumJsonSerializationSchema.java | 2 +- .../maxwell/MaxwellJsonDeserializationSchema.java | 2 +- .../json/maxwell/MaxwellJsonFormatFactory.java | 2 +- .../maxwell/MaxwellJsonSerializationSchema.java | 2 +- .../flink/formats/json/JsonFormatFactoryTest.java | 1 + .../formats/json/JsonRowDataSerDeSchemaTest.java | 1 + .../json/canal/CanalJsonFormatFactoryTest.java | 2 +- .../json/canal/CanalJsonSerDeSchemaTest.java | 2 +- .../debezium/DebeziumJsonFormatFactoryTest.java | 2 +- .../json/debezium/DebeziumJsonSerDeSchemaTest.java | 2 +- .../json/maxwell/MaxwellJsonFormatFactoryTest.java | 2 +- .../json/maxwell/MaxwellJsonSerDerTest.java | 2 +- flink-formats/pom.xml | 3 +- 40 files changed, 276 insertions(+), 102 deletions(-) diff --git a/flink-formats/flink-csv/pom.xml b/flink-formats/flink-csv/pom.xml index f458f15..3db3076 100644 --- a/flink-formats/flink-csv/pom.xml +++ b/flink-formats/flink-csv/pom.xml @@ -36,6 +36,12 @@ under the License. <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-format-common</artifactId> + <version>${project.version}</version> + </dependency> + <!-- core dependencies --> <dependency> diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java index 3c9d18f..535767a 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java @@ -42,9 +42,14 @@ import java.math.BigInteger; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; +import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.util.Arrays; import java.util.Objects; +import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_FORMAT; +import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT; + /** * Deserialization schema from CSV to Flink types. * @@ -316,7 +321,11 @@ public final class CsvRowDeserializationSchema implements DeserializationSchema< } else if (info.equals(Types.LOCAL_TIME)) { return (node) -> Time.valueOf(node.asText()).toLocalTime(); } else if (info.equals(Types.LOCAL_DATE_TIME)) { - return (node) -> Timestamp.valueOf(node.asText()).toLocalDateTime(); + return (node) -> LocalDateTime.parse(node.asText(), SQL_TIMESTAMP_FORMAT); + } else if (info.equals(Types.INSTANT)) { + return (node) -> + LocalDateTime.parse(node.asText(), SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT) + .toInstant(ZoneOffset.UTC); } else if (info instanceof RowTypeInfo) { final RowTypeInfo rowTypeInfo = (RowTypeInfo) info; return createRowRuntimeConverter(rowTypeInfo, ignoreParseErrors, false); diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSchemaConverter.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSchemaConverter.java index 9cca85c..352e74a 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSchemaConverter.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSchemaConverter.java @@ -105,7 +105,8 @@ public final class CsvRowSchemaConverter { Types.SQL_TIMESTAMP, Types.LOCAL_DATE, Types.LOCAL_TIME, - Types.LOCAL_DATE_TIME)); + Types.LOCAL_DATE_TIME, + Types.INSTANT)); /** * Types that can be converted to ColumnType.STRING. diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java index 3ab7fa5..60dc48f 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java @@ -40,14 +40,14 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.Csv import java.io.Serializable; import java.math.BigDecimal; import java.math.BigInteger; +import java.time.Instant; import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.time.format.DateTimeFormatterBuilder; +import java.time.ZoneId; import java.util.Arrays; import java.util.Objects; -import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE; -import static java.time.format.DateTimeFormatter.ISO_LOCAL_TIME; +import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_FORMAT; +import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT; /** * Serialization schema that serializes an object of Flink types into a CSV bytes. @@ -209,14 +209,6 @@ public final class CsvRowSerializationSchema implements SerializationSchema<Row> // -------------------------------------------------------------------------------------------- - private static final DateTimeFormatter DATE_TIME_FORMATTER = - new DateTimeFormatterBuilder() - .parseCaseInsensitive() - .append(ISO_LOCAL_DATE) - .appendLiteral(' ') - .append(ISO_LOCAL_TIME) - .toFormatter(); - private interface RuntimeConverter extends Serializable { JsonNode convert(CsvMapper csvMapper, ContainerNode<?> container, Object obj); } @@ -319,7 +311,12 @@ public final class CsvRowSerializationSchema implements SerializationSchema<Row> return (csvMapper, container, obj) -> container.textNode(obj.toString()); } else if (info.equals(Types.LOCAL_DATE_TIME)) { return (csvMapper, container, obj) -> - container.textNode(DATE_TIME_FORMATTER.format((LocalDateTime) obj)); + container.textNode(SQL_TIMESTAMP_FORMAT.format((LocalDateTime) obj)); + } else if (info.equals(Types.INSTANT)) { + return (csvMapper, container, obj) -> + container.textNode( + LocalDateTime.ofInstant((Instant) obj, ZoneId.of("UTC")) + .format(SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT)); } else if (info instanceof RowTypeInfo) { return createRowRuntimeConverter((RowTypeInfo) info, false); } else if (info instanceof BasicArrayTypeInfo) { diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java index 82ed7a6..ce19755 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java @@ -40,8 +40,12 @@ import java.io.Serializable; import java.lang.reflect.Array; import java.math.BigDecimal; import java.sql.Date; -import java.sql.Timestamp; +import java.time.LocalDateTime; import java.time.LocalTime; +import java.time.format.DateTimeFormatter; + +import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_FORMAT; +import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT; /** Tool class used to convert from CSV-format {@link JsonNode} to {@link RowData}. * */ @Internal @@ -141,9 +145,11 @@ public class CsvToRowDataConverters implements Serializable { return this::convertToDate; case TIME_WITHOUT_TIME_ZONE: return convertToTime((TimeType) type); - case TIMESTAMP_WITH_TIME_ZONE: case TIMESTAMP_WITHOUT_TIME_ZONE: - return this::convertToTimestamp; + return jsonNode -> convertToTimestamp(jsonNode, SQL_TIMESTAMP_FORMAT); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return jsonNode -> + convertToTimestamp(jsonNode, SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT); case FLOAT: return this::convertToFloat; case DOUBLE: @@ -245,10 +251,10 @@ public class CsvToRowDataConverters implements Serializable { }; } - private TimestampData convertToTimestamp(JsonNode jsonNode) { - // csv currently is using Timestamp.valueOf() to parse timestamp string - Timestamp timestamp = Timestamp.valueOf(jsonNode.asText()); - return TimestampData.fromTimestamp(timestamp); + private TimestampData convertToTimestamp( + JsonNode jsonNode, DateTimeFormatter dateTimeFormatter) { + return TimestampData.fromLocalDateTime( + LocalDateTime.parse(jsonNode.asText(), dateTimeFormatter)); } private StringData convertToString(JsonNode jsonNode) { diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowDataToCsvConverters.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowDataToCsvConverters.java index f8ee46d..69f56d42 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowDataToCsvConverters.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowDataToCsvConverters.java @@ -40,11 +40,12 @@ import java.io.Serializable; import java.time.LocalDate; import java.time.LocalTime; import java.time.format.DateTimeFormatter; -import java.time.format.DateTimeFormatterBuilder; import java.util.Arrays; import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE; import static java.time.format.DateTimeFormatter.ISO_LOCAL_TIME; +import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_FORMAT; +import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT; /** Tool class used to convert from {@link RowData} to CSV-format {@link JsonNode}. * */ @Internal @@ -132,15 +133,21 @@ public class RowDataToCsvConverters implements Serializable { return (csvMapper, container, row, pos) -> convertDate(row.getInt(pos), container); case TIME_WITHOUT_TIME_ZONE: return (csvMapper, container, row, pos) -> convertTime(row.getInt(pos), container); - case TIMESTAMP_WITH_TIME_ZONE: - final int zonedTimestampPrecision = - ((LocalZonedTimestampType) fieldType).getPrecision(); - return (csvMapper, container, row, pos) -> - convertTimestamp(row.getTimestamp(pos, zonedTimestampPrecision), container); case TIMESTAMP_WITHOUT_TIME_ZONE: final int timestampPrecision = ((TimestampType) fieldType).getPrecision(); return (csvMapper, container, row, pos) -> - convertTimestamp(row.getTimestamp(pos, timestampPrecision), container); + convertTimestamp( + row.getTimestamp(pos, timestampPrecision), + container, + SQL_TIMESTAMP_FORMAT); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + final int zonedTimestampPrecision = + ((LocalZonedTimestampType) fieldType).getPrecision(); + return (csvMapper, container, row, pos) -> + convertTimestamp( + row.getTimestamp(pos, zonedTimestampPrecision), + container, + SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT); case DECIMAL: return createDecimalRowFieldConverter((DecimalType) fieldType); case ARRAY: @@ -207,16 +214,21 @@ public class RowDataToCsvConverters implements Serializable { case TIME_WITHOUT_TIME_ZONE: return (csvMapper, container, array, pos) -> convertTime(array.getInt(pos), container); - case TIMESTAMP_WITH_TIME_ZONE: - final int zonedTimestampPrecision = - ((LocalZonedTimestampType) fieldType).getPrecision(); - return (csvMapper, container, array, pos) -> - convertTimestamp( - array.getTimestamp(pos, zonedTimestampPrecision), container); case TIMESTAMP_WITHOUT_TIME_ZONE: final int timestampPrecision = ((TimestampType) fieldType).getPrecision(); return (csvMapper, container, array, pos) -> - convertTimestamp(array.getTimestamp(pos, timestampPrecision), container); + convertTimestamp( + array.getTimestamp(pos, timestampPrecision), + container, + SQL_TIMESTAMP_FORMAT); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + final int localZonedTimestampPrecision = + ((LocalZonedTimestampType) fieldType).getPrecision(); + return (csvMapper, container, array, pos) -> + convertTimestamp( + array.getTimestamp(pos, localZonedTimestampPrecision), + container, + SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT); case DECIMAL: return createDecimalArrayElementConverter((DecimalType) fieldType); // we don't support ARRAY and ROW in an ARRAY, see @@ -268,8 +280,9 @@ public class RowDataToCsvConverters implements Serializable { return container.textNode(ISO_LOCAL_TIME.format(time)); } - private static JsonNode convertTimestamp(TimestampData timestamp, ContainerNode<?> container) { - return container.textNode(DATE_TIME_FORMATTER.format(timestamp.toLocalDateTime())); + private static JsonNode convertTimestamp( + TimestampData timestamp, ContainerNode<?> container, DateTimeFormatter formatter) { + return container.textNode(formatter.format(timestamp.toLocalDateTime())); } private static RowFieldConverter createArrayRowFieldConverter(ArrayType type) { @@ -307,12 +320,4 @@ public class RowDataToCsvConverters implements Serializable { return arrayNode; }; } - - private static final DateTimeFormatter DATE_TIME_FORMATTER = - new DateTimeFormatterBuilder() - .parseCaseInsensitive() - .append(ISO_LOCAL_DATE) - .appendLiteral(' ') - .append(ISO_LOCAL_TIME) - .toFormatter(); } diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java index 0fc2df2..3909b20 100644 --- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java +++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java @@ -34,7 +34,8 @@ import java.io.IOException; import java.math.BigDecimal; import java.sql.Date; import java.sql.Time; -import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDateTime; import java.time.LocalTime; import java.util.function.Consumer; @@ -54,8 +55,11 @@ import static org.apache.flink.table.api.DataTypes.SMALLINT; import static org.apache.flink.table.api.DataTypes.STRING; import static org.apache.flink.table.api.DataTypes.TIME; import static org.apache.flink.table.api.DataTypes.TIMESTAMP; +import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ; import static org.apache.flink.table.api.DataTypes.TINYINT; import static org.apache.flink.table.data.StringData.fromString; +import static org.apache.flink.table.data.TimestampData.fromInstant; +import static org.apache.flink.table.data.TimestampData.fromLocalDateTime; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -89,7 +93,16 @@ public class CsvRowDataSerDeSchemaTest { testNullableField(DATE(), "2018-10-12", Date.valueOf("2018-10-12")); testNullableField(TIME(0), "12:12:12", Time.valueOf("12:12:12")); testNullableField( - TIMESTAMP(0), "\"2018-10-12 12:12:12\"", Timestamp.valueOf("2018-10-12 12:12:12")); + TIMESTAMP(0), + "\"2018-10-12 12:12:12\"", + LocalDateTime.parse("2018-10-12T12:12:12")); + testNullableField( + TIMESTAMP(0), + "\"2018-10-12 12:12:12.123\"", + LocalDateTime.parse("2018-10-12T12:12:12.123")); + testNullableField(TIMESTAMP_LTZ(0), "\"1970-01-01 00:02:03Z\"", Instant.ofEpochSecond(123)); + testNullableField( + TIMESTAMP_LTZ(0), "\"1970-01-01 00:02:03.456Z\"", Instant.ofEpochMilli(123456)); testNullableField( ROW(FIELD("f0", STRING()), FIELD("f1", INT()), FIELD("f2", BOOLEAN())), "Hello;42;false", @@ -180,7 +193,7 @@ public class CsvRowDataSerDeSchemaTest { int precision = 5; try { testFieldDeserialization( - TIME(5), "12:12:12.45", LocalTime.parse("12:12:12"), deserConfig, ";"); + TIME(precision), "12:12:12.45", LocalTime.parse("12:12:12"), deserConfig, ";"); fail(); } catch (Exception e) { assertEquals( @@ -275,9 +288,19 @@ public class CsvRowDataSerDeSchemaTest { @Test public void testSerializeDeserializeNestedTypes() throws Exception { DataType subDataType0 = - ROW(FIELD("f0c0", STRING()), FIELD("f0c1", INT()), FIELD("f0c2", STRING())); + ROW( + FIELD("f0c0", STRING()), + FIELD("f0c1", INT()), + FIELD("f0c2", STRING()), + FIELD("f0c3", TIMESTAMP()), + FIELD("f0c4", TIMESTAMP_LTZ())); DataType subDataType1 = - ROW(FIELD("f1c0", STRING()), FIELD("f1c1", INT()), FIELD("f1c2", STRING())); + ROW( + FIELD("f1c0", STRING()), + FIELD("f1c1", INT()), + FIELD("f1c2", STRING()), + FIELD("f0c3", TIMESTAMP()), + FIELD("f0c4", TIMESTAMP_LTZ())); DataType dataType = ROW(FIELD("f0", subDataType0), FIELD("f1", subDataType1)); RowType rowType = (RowType) dataType.getLogicalType(); @@ -290,12 +313,29 @@ public class CsvRowDataSerDeSchemaTest { RowData normalRow = GenericRowData.of( - rowData("hello", 1, "This is 1st top column"), - rowData("world", 2, "This is 2nd top column")); + rowData( + "hello", + 1, + "This is 1st top column", + LocalDateTime.parse("1970-01-01T01:02:03"), + Instant.ofEpochMilli(1000)), + rowData( + "world", + 2, + "This is 2nd top column", + LocalDateTime.parse("1970-01-01T01:02:04"), + Instant.ofEpochMilli(2000))); testSerDeConsistency(normalRow, serSchemaBuilder, deserSchemaBuilder); RowData nullRow = - GenericRowData.of(null, rowData("world", 2, "This is 2nd top column after null")); + GenericRowData.of( + null, + rowData( + "world", + 2, + "This is 2nd top column after null", + LocalDateTime.parse("1970-01-01T01:02:05"), + Instant.ofEpochMilli(3000))); testSerDeConsistency(nullRow, serSchemaBuilder, deserSchemaBuilder); } @@ -424,4 +464,14 @@ public class CsvRowDataSerDeSchemaTest { private static RowData rowData(String str1, int integer, String str2) { return GenericRowData.of(fromString(str1), integer, fromString(str2)); } + + private static RowData rowData( + String str1, int integer, String str2, LocalDateTime localDateTime, Instant instant) { + return GenericRowData.of( + fromString(str1), + integer, + fromString(str2), + fromLocalDateTime(localDateTime), + fromInstant(instant)); + } } diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java index 4f0ba29..3e064ea 100644 --- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java +++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java @@ -31,6 +31,8 @@ import java.math.BigInteger; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; +import java.time.Instant; +import java.time.LocalDateTime; import java.util.function.Consumer; import static org.junit.Assert.assertArrayEquals; @@ -77,7 +79,12 @@ public class CsvRowDeSerializationSchemaTest { testNullableField( Types.LOCAL_DATE_TIME, "\"2018-10-12 12:12:12\"", - Timestamp.valueOf("2018-10-12 12:12:12").toLocalDateTime()); + LocalDateTime.parse("2018-10-12T12:12:12")); + testNullableField( + Types.INSTANT, + "\"1970-01-01 00:00:01.123456789Z\"", + Instant.ofEpochMilli(1123).plusNanos(456789)); + testNullableField(Types.INSTANT, "\"1970-01-01 00:00:12Z\"", Instant.ofEpochSecond(12)); testNullableField( Types.ROW(Types.STRING, Types.INT, Types.BOOLEAN), "Hello;42;false", @@ -232,8 +239,20 @@ public class CsvRowDeSerializationSchemaTest { @Test public void testSerializeDeserializeNestedTypes() throws Exception { - final TypeInformation<Row> subDataType0 = Types.ROW(Types.STRING, Types.INT, Types.STRING); - final TypeInformation<Row> subDataType1 = Types.ROW(Types.STRING, Types.INT, Types.STRING); + final TypeInformation<Row> subDataType0 = + Types.ROW( + Types.STRING, + Types.INT, + Types.STRING, + Types.LOCAL_DATE_TIME, + Types.INSTANT); + final TypeInformation<Row> subDataType1 = + Types.ROW( + Types.STRING, + Types.INT, + Types.STRING, + Types.LOCAL_DATE_TIME, + Types.INSTANT); final TypeInformation<Row> rowInfo = Types.ROW(subDataType0, subDataType1); // serialization @@ -245,11 +264,29 @@ public class CsvRowDeSerializationSchemaTest { Row normalRow = Row.of( - Row.of("hello", 1, "This is 1st top column"), - Row.of("world", 2, "This is 2nd top column")); + Row.of( + "hello", + 1, + "This is 1st top column", + LocalDateTime.parse("1970-01-01T01:02:03"), + Instant.ofEpochMilli(1000)), + Row.of( + "world", + 2, + "This is 2nd top column", + LocalDateTime.parse("1970-01-01T01:02:04"), + Instant.ofEpochMilli(2000))); testSerDeConsistency(normalRow, serSchemaBuilder, deserSchemaBuilder); - Row nullRow = Row.of(null, Row.of("world", 2, "This is 2nd top column after null")); + Row nullRow = + Row.of( + null, + Row.of( + "world", + 2, + "This is 2nd top column after null", + LocalDateTime.parse("1970-01-01T01:02:05"), + Instant.ofEpochMilli(3000))); testSerDeConsistency(nullRow, serSchemaBuilder, deserSchemaBuilder); } diff --git a/flink-formats/flink-format-common/pom.xml b/flink-formats/flink-format-common/pom.xml new file mode 100644 index 0000000..ef30524 --- /dev/null +++ b/flink-formats/flink-format-common/pom.xml @@ -0,0 +1,44 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <parent> + <artifactId>flink-formats</artifactId> + <groupId>org.apache.flink</groupId> + <version>1.13-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>flink-format-common</artifactId> + <name>Flink : Format : Common</name> + <description> + This module contains common utils for different formats. + </description> + + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-annotations</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> +</project> diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimeFormats.java b/flink-formats/flink-format-common/src/main/java/org/apache/flink/formats/common/TimeFormats.java similarity index 82% rename from flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimeFormats.java rename to flink-formats/flink-format-common/src/main/java/org/apache/flink/formats/common/TimeFormats.java index c81aa20..402b6fe 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimeFormats.java +++ b/flink-formats/flink-format-common/src/main/java/org/apache/flink/formats/common/TimeFormats.java @@ -16,7 +16,9 @@ * limitations under the License. */ -package org.apache.flink.formats.json; +package org.apache.flink.formats.common; + +import org.apache.flink.annotation.Internal; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatterBuilder; @@ -26,10 +28,11 @@ import java.time.temporal.ChronoField; * Time formats and timestamp formats respecting the RFC3339 specification, ISO-8601 specification * and SQL specification. */ -class TimeFormats { +@Internal +public class TimeFormats { /** Formatter for RFC 3339-compliant string representation of a time value. */ - static final DateTimeFormatter RFC3339_TIME_FORMAT = + public static final DateTimeFormatter RFC3339_TIME_FORMAT = new DateTimeFormatterBuilder() .appendPattern("HH:mm:ss") .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true) @@ -40,7 +43,7 @@ class TimeFormats { * Formatter for RFC 3339-compliant string representation of a timestamp value (with UTC * timezone). */ - static final DateTimeFormatter RFC3339_TIMESTAMP_FORMAT = + public static final DateTimeFormatter RFC3339_TIMESTAMP_FORMAT = new DateTimeFormatterBuilder() .append(DateTimeFormatter.ISO_LOCAL_DATE) .appendLiteral('T') @@ -48,10 +51,11 @@ class TimeFormats { .toFormatter(); /** Formatter for ISO8601 string representation of a timestamp value (without UTC timezone). */ - static final DateTimeFormatter ISO8601_TIMESTAMP_FORMAT = DateTimeFormatter.ISO_LOCAL_DATE_TIME; + public static final DateTimeFormatter ISO8601_TIMESTAMP_FORMAT = + DateTimeFormatter.ISO_LOCAL_DATE_TIME; /** Formatter for ISO8601 string representation of a timestamp value (with UTC timezone). */ - static final DateTimeFormatter ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT = + public static final DateTimeFormatter ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT = new DateTimeFormatterBuilder() .append(DateTimeFormatter.ISO_LOCAL_DATE) .appendLiteral('T') @@ -60,14 +64,14 @@ class TimeFormats { .toFormatter(); /** Formatter for SQL string representation of a time value. */ - static final DateTimeFormatter SQL_TIME_FORMAT = + public static final DateTimeFormatter SQL_TIME_FORMAT = new DateTimeFormatterBuilder() .appendPattern("HH:mm:ss") .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true) .toFormatter(); /** Formatter for SQL string representation of a timestamp value (without UTC timezone). */ - static final DateTimeFormatter SQL_TIMESTAMP_FORMAT = + public static final DateTimeFormatter SQL_TIMESTAMP_FORMAT = new DateTimeFormatterBuilder() .append(DateTimeFormatter.ISO_LOCAL_DATE) .appendLiteral(' ') @@ -75,7 +79,7 @@ class TimeFormats { .toFormatter(); /** Formatter for SQL string representation of a timestamp value (with UTC timezone). */ - static final DateTimeFormatter SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT = + public static final DateTimeFormatter SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT = new DateTimeFormatterBuilder() .append(DateTimeFormatter.ISO_LOCAL_DATE) .appendLiteral(' ') diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimestampFormat.java b/flink-formats/flink-format-common/src/main/java/org/apache/flink/formats/common/TimestampFormat.java similarity index 97% rename from flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimestampFormat.java rename to flink-formats/flink-format-common/src/main/java/org/apache/flink/formats/common/TimestampFormat.java index 8dacfc1..0e3655d 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimestampFormat.java +++ b/flink-formats/flink-format-common/src/main/java/org/apache/flink/formats/common/TimestampFormat.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.formats.json; +package org.apache.flink.formats.common; import org.apache.flink.annotation.Internal; diff --git a/flink-formats/flink-json/pom.xml b/flink-formats/flink-json/pom.xml index f7c653f..a5217b7 100644 --- a/flink-formats/flink-json/pom.xml +++ b/flink-formats/flink-json/pom.xml @@ -36,6 +36,12 @@ under the License. <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-format-common</artifactId> + <version>${project.version}</version> + </dependency> + <!-- core dependencies --> <dependency> diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java index 51338b2..8d69ec2 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.format.DecodingFormat; import org.apache.flink.table.connector.format.EncodingFormat; diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptions.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptions.java index 3ffa18b..ae62430 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptions.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptions.java @@ -21,6 +21,7 @@ package org.apache.flink.formats.json; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.ValidationException; diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java index c54cefd..83d1b2d 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java @@ -21,6 +21,7 @@ package org.apache.flink.formats.json; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.RowType; diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java index b2afe62..1b5ee58 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java @@ -20,6 +20,7 @@ package org.apache.flink.formats.json; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java index f6aa03b..6cfd84c 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java @@ -64,8 +64,8 @@ import java.util.stream.Collectors; import static java.lang.String.format; import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE; -import static org.apache.flink.formats.json.TimeFormats.RFC3339_TIMESTAMP_FORMAT; -import static org.apache.flink.formats.json.TimeFormats.RFC3339_TIME_FORMAT; +import static org.apache.flink.formats.common.TimeFormats.RFC3339_TIMESTAMP_FORMAT; +import static org.apache.flink.formats.common.TimeFormats.RFC3339_TIME_FORMAT; import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL; import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType; import static org.apache.flink.util.Preconditions.checkArgument; diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java index 9c50fc6..607d4ac 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java @@ -52,8 +52,8 @@ import java.util.stream.Collectors; import static java.lang.String.format; import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE; -import static org.apache.flink.formats.json.TimeFormats.RFC3339_TIMESTAMP_FORMAT; -import static org.apache.flink.formats.json.TimeFormats.RFC3339_TIME_FORMAT; +import static org.apache.flink.formats.common.TimeFormats.RFC3339_TIMESTAMP_FORMAT; +import static org.apache.flink.formats.common.TimeFormats.RFC3339_TIME_FORMAT; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonToRowDataConverters.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonToRowDataConverters.java index b5d7ec8..e2c4f57 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonToRowDataConverters.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonToRowDataConverters.java @@ -19,6 +19,7 @@ package org.apache.flink.formats.json; import org.apache.flink.annotation.Internal; +import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.table.api.TableException; import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.GenericArrayData; @@ -58,11 +59,11 @@ import java.util.Iterator; import java.util.Map; import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE; -import static org.apache.flink.formats.json.TimeFormats.ISO8601_TIMESTAMP_FORMAT; -import static org.apache.flink.formats.json.TimeFormats.ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT; -import static org.apache.flink.formats.json.TimeFormats.SQL_TIMESTAMP_FORMAT; -import static org.apache.flink.formats.json.TimeFormats.SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT; -import static org.apache.flink.formats.json.TimeFormats.SQL_TIME_FORMAT; +import static org.apache.flink.formats.common.TimeFormats.ISO8601_TIMESTAMP_FORMAT; +import static org.apache.flink.formats.common.TimeFormats.ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT; +import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_FORMAT; +import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT; +import static org.apache.flink.formats.common.TimeFormats.SQL_TIME_FORMAT; /** Tool class used to convert from {@link JsonNode} to {@link RowData}. * */ @Internal diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java index c30690a1c9..36caaf9 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java @@ -19,6 +19,7 @@ package org.apache.flink.formats.json; import org.apache.flink.annotation.Internal; +import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.table.api.TableException; import org.apache.flink.table.data.ArrayData; import org.apache.flink.table.data.DecimalData; @@ -47,11 +48,11 @@ import java.time.ZoneOffset; import java.util.Arrays; import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE; -import static org.apache.flink.formats.json.TimeFormats.ISO8601_TIMESTAMP_FORMAT; -import static org.apache.flink.formats.json.TimeFormats.ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT; -import static org.apache.flink.formats.json.TimeFormats.SQL_TIMESTAMP_FORMAT; -import static org.apache.flink.formats.json.TimeFormats.SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT; -import static org.apache.flink.formats.json.TimeFormats.SQL_TIME_FORMAT; +import static org.apache.flink.formats.common.TimeFormats.ISO8601_TIMESTAMP_FORMAT; +import static org.apache.flink.formats.common.TimeFormats.ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT; +import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_FORMAT; +import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT; +import static org.apache.flink.formats.common.TimeFormats.SQL_TIME_FORMAT; /** Tool class used to convert from {@link RowData} to {@link JsonNode}. * */ @Internal diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDecodingFormat.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDecodingFormat.java index 791dd59..f958c8d 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDecodingFormat.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDecodingFormat.java @@ -20,7 +20,7 @@ package org.apache.flink.formats.json.canal; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.formats.json.TimestampFormat; +import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema.MetadataConverter; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.connector.ChangelogMode; diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java index acdaecf..d1bd3a3 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java @@ -21,8 +21,8 @@ package org.apache.flink.formats.json.canal; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.formats.json.JsonRowDataDeserializationSchema; -import org.apache.flink.formats.json.TimestampFormat; import org.apache.flink.formats.json.canal.CanalJsonDecodingFormat.ReadableMetadata; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.data.ArrayData; diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java index e43dd1c..a749808 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java @@ -22,8 +22,8 @@ import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.formats.json.JsonOptions; -import org.apache.flink.formats.json.TimestampFormat; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.format.DecodingFormat; import org.apache.flink.table.connector.format.EncodingFormat; diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonSerializationSchema.java index e80de2b..b15fb4a 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonSerializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonSerializationSchema.java @@ -19,9 +19,9 @@ package org.apache.flink.formats.json.canal; import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.formats.json.JsonOptions; import org.apache.flink.formats.json.JsonRowDataSerializationSchema; -import org.apache.flink.formats.json.TimestampFormat; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.data.ArrayData; import org.apache.flink.table.data.GenericArrayData; diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDecodingFormat.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDecodingFormat.java index 22de3c9..5db2f64 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDecodingFormat.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDecodingFormat.java @@ -20,7 +20,7 @@ package org.apache.flink.formats.json.debezium; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.formats.json.TimestampFormat; +import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.MetadataConverter; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.connector.ChangelogMode; diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java index 628e053..d12b7cc 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java @@ -21,8 +21,8 @@ package org.apache.flink.formats.json.debezium; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.formats.json.JsonRowDataDeserializationSchema; -import org.apache.flink.formats.json.TimestampFormat; import org.apache.flink.formats.json.debezium.DebeziumJsonDecodingFormat.ReadableMetadata; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.data.GenericRowData; diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java index bed3dd6..dddb52b 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java @@ -22,8 +22,8 @@ import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.formats.json.JsonOptions; -import org.apache.flink.formats.json.TimestampFormat; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.format.DecodingFormat; import org.apache.flink.table.connector.format.EncodingFormat; diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java index 9bde53d..64e0105 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java @@ -19,9 +19,9 @@ package org.apache.flink.formats.json.debezium; import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.formats.json.JsonOptions; import org.apache.flink.formats.json.JsonRowDataSerializationSchema; -import org.apache.flink.formats.json.TimestampFormat; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java index 07387089..8301210 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java @@ -20,8 +20,8 @@ package org.apache.flink.formats.json.maxwell; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.formats.json.JsonRowDataDeserializationSchema; -import org.apache.flink.formats.json.TimestampFormat; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java index c8d47f0..e03aa73 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java @@ -23,8 +23,8 @@ import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.formats.json.JsonOptions; -import org.apache.flink.formats.json.TimestampFormat; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.format.DecodingFormat; import org.apache.flink.table.connector.format.EncodingFormat; diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerializationSchema.java index 66e534b..27b9da4 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerializationSchema.java @@ -19,9 +19,9 @@ package org.apache.flink.formats.json.maxwell; import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.formats.json.JsonOptions; import org.apache.flink.formats.json.JsonRowDataSerializationSchema; -import org.apache.flink.formats.json.TimestampFormat; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java index 036fdbe..aae25c2 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java @@ -20,6 +20,7 @@ package org.apache.flink.formats.json; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource; diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java index 84a3c70..6c748ce 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java @@ -19,6 +19,7 @@ package org.apache.flink.formats.json; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.table.data.GenericMapData; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java index 8b47736..4968294 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java @@ -20,8 +20,8 @@ package org.apache.flink.formats.json.canal; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.formats.json.JsonOptions; -import org.apache.flink.formats.json.TimestampFormat; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource; diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java index 3a19384..73b34d9 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java @@ -18,8 +18,8 @@ package org.apache.flink.formats.json.canal; +import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.formats.json.JsonOptions; -import org.apache.flink.formats.json.TimestampFormat; import org.apache.flink.formats.json.canal.CanalJsonDecodingFormat.ReadableMetadata; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.data.RowData; diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java index bdd014a..466365f 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java @@ -20,8 +20,8 @@ package org.apache.flink.formats.json.debezium; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.formats.json.JsonOptions; -import org.apache.flink.formats.json.TimestampFormat; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.source.DynamicTableSource; diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java index db98d89..a628dd6 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java @@ -18,8 +18,8 @@ package org.apache.flink.formats.json.debezium; +import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.formats.json.JsonOptions; -import org.apache.flink.formats.json.TimestampFormat; import org.apache.flink.formats.json.debezium.DebeziumJsonDecodingFormat.ReadableMetadata; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.data.RowData; diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactoryTest.java index f2fc7d6..7fb1ba9 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactoryTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactoryTest.java @@ -20,8 +20,8 @@ package org.apache.flink.formats.json.maxwell; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.formats.json.JsonOptions; -import org.apache.flink.formats.json.TimestampFormat; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.Column; diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java index 8ae8143..f9f17eb 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java @@ -18,8 +18,8 @@ package org.apache.flink.formats.json.maxwell; +import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.formats.json.JsonOptions; -import org.apache.flink.formats.json.TimestampFormat; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.types.logical.RowType; diff --git a/flink-formats/pom.xml b/flink-formats/pom.xml index d0d8479..8c02709 100644 --- a/flink-formats/pom.xml +++ b/flink-formats/pom.xml @@ -50,7 +50,8 @@ under the License. <module>flink-orc-nohive</module> <module>flink-hadoop-bulk</module> <module>flink-avro-glue-schema-registry</module> - </modules> + <module>flink-format-common</module> + </modules> <!-- override these root dependencies as 'provided', so they don't end up in the jars-with-dependencies (uber jars) of formats and