This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a5aaa0e  [FLINK-21947][csv] Support TIMESTAMP_LTZ type in CSV format
a5aaa0e is described below

commit a5aaa0e605eb4966b5d73c987ea4d9d0bc852c6d
Author: Leonard Xu <xbjt...@gmail.com>
AuthorDate: Wed Mar 24 17:10:23 2021 +0800

    [FLINK-21947][csv] Support TIMESTAMP_LTZ type in CSV format
    
    This closes #15356
---
 flink-formats/flink-csv/pom.xml                    |  6 ++
 .../formats/csv/CsvRowDeserializationSchema.java   | 11 +++-
 .../flink/formats/csv/CsvRowSchemaConverter.java   |  3 +-
 .../formats/csv/CsvRowSerializationSchema.java     | 23 ++++----
 .../flink/formats/csv/CsvToRowDataConverters.java  | 20 ++++---
 .../flink/formats/csv/RowDataToCsvConverters.java  | 53 +++++++++--------
 .../formats/csv/CsvRowDataSerDeSchemaTest.java     | 66 +++++++++++++++++++---
 .../csv/CsvRowDeSerializationSchemaTest.java       | 49 ++++++++++++++--
 flink-formats/flink-format-common/pom.xml          | 44 +++++++++++++++
 .../apache/flink/formats/common}/TimeFormats.java  | 22 +++++---
 .../flink/formats/common}/TimestampFormat.java     |  2 +-
 flink-formats/flink-json/pom.xml                   |  6 ++
 .../flink/formats/json/JsonFormatFactory.java      |  1 +
 .../org/apache/flink/formats/json/JsonOptions.java |  1 +
 .../json/JsonRowDataDeserializationSchema.java     |  1 +
 .../json/JsonRowDataSerializationSchema.java       |  1 +
 .../formats/json/JsonRowDeserializationSchema.java |  4 +-
 .../formats/json/JsonRowSerializationSchema.java   |  4 +-
 .../formats/json/JsonToRowDataConverters.java      | 11 ++--
 .../formats/json/RowDataToJsonConverters.java      | 11 ++--
 .../json/canal/CanalJsonDecodingFormat.java        |  2 +-
 .../json/canal/CanalJsonDeserializationSchema.java |  2 +-
 .../formats/json/canal/CanalJsonFormatFactory.java |  2 +-
 .../json/canal/CanalJsonSerializationSchema.java   |  2 +-
 .../json/debezium/DebeziumJsonDecodingFormat.java  |  2 +-
 .../DebeziumJsonDeserializationSchema.java         |  2 +-
 .../json/debezium/DebeziumJsonFormatFactory.java   |  2 +-
 .../debezium/DebeziumJsonSerializationSchema.java  |  2 +-
 .../maxwell/MaxwellJsonDeserializationSchema.java  |  2 +-
 .../json/maxwell/MaxwellJsonFormatFactory.java     |  2 +-
 .../maxwell/MaxwellJsonSerializationSchema.java    |  2 +-
 .../flink/formats/json/JsonFormatFactoryTest.java  |  1 +
 .../formats/json/JsonRowDataSerDeSchemaTest.java   |  1 +
 .../json/canal/CanalJsonFormatFactoryTest.java     |  2 +-
 .../json/canal/CanalJsonSerDeSchemaTest.java       |  2 +-
 .../debezium/DebeziumJsonFormatFactoryTest.java    |  2 +-
 .../json/debezium/DebeziumJsonSerDeSchemaTest.java |  2 +-
 .../json/maxwell/MaxwellJsonFormatFactoryTest.java |  2 +-
 .../json/maxwell/MaxwellJsonSerDerTest.java        |  2 +-
 flink-formats/pom.xml                              |  3 +-
 40 files changed, 276 insertions(+), 102 deletions(-)

diff --git a/flink-formats/flink-csv/pom.xml b/flink-formats/flink-csv/pom.xml
index f458f15..3db3076 100644
--- a/flink-formats/flink-csv/pom.xml
+++ b/flink-formats/flink-csv/pom.xml
@@ -36,6 +36,12 @@ under the License.
 
        <dependencies>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-format-common</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
                <!-- core dependencies -->
 
                <dependency>
diff --git 
a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
 
b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
index 3c9d18f..535767a 100644
--- 
a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
+++ 
b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
@@ -42,9 +42,14 @@ import java.math.BigInteger;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
 import java.util.Arrays;
 import java.util.Objects;
 
+import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_FORMAT;
+import static 
org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
+
 /**
  * Deserialization schema from CSV to Flink types.
  *
@@ -316,7 +321,11 @@ public final class CsvRowDeserializationSchema implements 
DeserializationSchema<
         } else if (info.equals(Types.LOCAL_TIME)) {
             return (node) -> Time.valueOf(node.asText()).toLocalTime();
         } else if (info.equals(Types.LOCAL_DATE_TIME)) {
-            return (node) -> 
Timestamp.valueOf(node.asText()).toLocalDateTime();
+            return (node) -> LocalDateTime.parse(node.asText(), 
SQL_TIMESTAMP_FORMAT);
+        } else if (info.equals(Types.INSTANT)) {
+            return (node) ->
+                    LocalDateTime.parse(node.asText(), 
SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT)
+                            .toInstant(ZoneOffset.UTC);
         } else if (info instanceof RowTypeInfo) {
             final RowTypeInfo rowTypeInfo = (RowTypeInfo) info;
             return createRowRuntimeConverter(rowTypeInfo, ignoreParseErrors, 
false);
diff --git 
a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSchemaConverter.java
 
b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSchemaConverter.java
index 9cca85c..352e74a 100644
--- 
a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSchemaConverter.java
+++ 
b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSchemaConverter.java
@@ -105,7 +105,8 @@ public final class CsvRowSchemaConverter {
                             Types.SQL_TIMESTAMP,
                             Types.LOCAL_DATE,
                             Types.LOCAL_TIME,
-                            Types.LOCAL_DATE_TIME));
+                            Types.LOCAL_DATE_TIME,
+                            Types.INSTANT));
 
     /**
      * Types that can be converted to ColumnType.STRING.
diff --git 
a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java
 
b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java
index 3ab7fa5..60dc48f 100644
--- 
a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java
+++ 
b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java
@@ -40,14 +40,14 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.Csv
 import java.io.Serializable;
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.time.Instant;
 import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
-import java.time.format.DateTimeFormatterBuilder;
+import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.Objects;
 
-import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
-import static java.time.format.DateTimeFormatter.ISO_LOCAL_TIME;
+import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_FORMAT;
+import static 
org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
 
 /**
  * Serialization schema that serializes an object of Flink types into a CSV 
bytes.
@@ -209,14 +209,6 @@ public final class CsvRowSerializationSchema implements 
SerializationSchema<Row>
 
     // 
--------------------------------------------------------------------------------------------
 
-    private static final DateTimeFormatter DATE_TIME_FORMATTER =
-            new DateTimeFormatterBuilder()
-                    .parseCaseInsensitive()
-                    .append(ISO_LOCAL_DATE)
-                    .appendLiteral(' ')
-                    .append(ISO_LOCAL_TIME)
-                    .toFormatter();
-
     private interface RuntimeConverter extends Serializable {
         JsonNode convert(CsvMapper csvMapper, ContainerNode<?> container, 
Object obj);
     }
@@ -319,7 +311,12 @@ public final class CsvRowSerializationSchema implements 
SerializationSchema<Row>
             return (csvMapper, container, obj) -> 
container.textNode(obj.toString());
         } else if (info.equals(Types.LOCAL_DATE_TIME)) {
             return (csvMapper, container, obj) ->
-                    
container.textNode(DATE_TIME_FORMATTER.format((LocalDateTime) obj));
+                    
container.textNode(SQL_TIMESTAMP_FORMAT.format((LocalDateTime) obj));
+        } else if (info.equals(Types.INSTANT)) {
+            return (csvMapper, container, obj) ->
+                    container.textNode(
+                            LocalDateTime.ofInstant((Instant) obj, 
ZoneId.of("UTC"))
+                                    
.format(SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT));
         } else if (info instanceof RowTypeInfo) {
             return createRowRuntimeConverter((RowTypeInfo) info, false);
         } else if (info instanceof BasicArrayTypeInfo) {
diff --git 
a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java
 
b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java
index 82ed7a6..ce19755 100644
--- 
a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java
+++ 
b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java
@@ -40,8 +40,12 @@ import java.io.Serializable;
 import java.lang.reflect.Array;
 import java.math.BigDecimal;
 import java.sql.Date;
-import java.sql.Timestamp;
+import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.time.format.DateTimeFormatter;
+
+import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_FORMAT;
+import static 
org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
 
 /** Tool class used to convert from CSV-format {@link JsonNode} to {@link 
RowData}. * */
 @Internal
@@ -141,9 +145,11 @@ public class CsvToRowDataConverters implements 
Serializable {
                 return this::convertToDate;
             case TIME_WITHOUT_TIME_ZONE:
                 return convertToTime((TimeType) type);
-            case TIMESTAMP_WITH_TIME_ZONE:
             case TIMESTAMP_WITHOUT_TIME_ZONE:
-                return this::convertToTimestamp;
+                return jsonNode -> convertToTimestamp(jsonNode, 
SQL_TIMESTAMP_FORMAT);
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return jsonNode ->
+                        convertToTimestamp(jsonNode, 
SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT);
             case FLOAT:
                 return this::convertToFloat;
             case DOUBLE:
@@ -245,10 +251,10 @@ public class CsvToRowDataConverters implements 
Serializable {
         };
     }
 
-    private TimestampData convertToTimestamp(JsonNode jsonNode) {
-        // csv currently is using Timestamp.valueOf() to parse timestamp string
-        Timestamp timestamp = Timestamp.valueOf(jsonNode.asText());
-        return TimestampData.fromTimestamp(timestamp);
+    private TimestampData convertToTimestamp(
+            JsonNode jsonNode, DateTimeFormatter dateTimeFormatter) {
+        return TimestampData.fromLocalDateTime(
+                LocalDateTime.parse(jsonNode.asText(), dateTimeFormatter));
     }
 
     private StringData convertToString(JsonNode jsonNode) {
diff --git 
a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowDataToCsvConverters.java
 
b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowDataToCsvConverters.java
index f8ee46d..69f56d42 100644
--- 
a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowDataToCsvConverters.java
+++ 
b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/RowDataToCsvConverters.java
@@ -40,11 +40,12 @@ import java.io.Serializable;
 import java.time.LocalDate;
 import java.time.LocalTime;
 import java.time.format.DateTimeFormatter;
-import java.time.format.DateTimeFormatterBuilder;
 import java.util.Arrays;
 
 import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
 import static java.time.format.DateTimeFormatter.ISO_LOCAL_TIME;
+import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_FORMAT;
+import static 
org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
 
 /** Tool class used to convert from {@link RowData} to CSV-format {@link 
JsonNode}. * */
 @Internal
@@ -132,15 +133,21 @@ public class RowDataToCsvConverters implements 
Serializable {
                 return (csvMapper, container, row, pos) -> 
convertDate(row.getInt(pos), container);
             case TIME_WITHOUT_TIME_ZONE:
                 return (csvMapper, container, row, pos) -> 
convertTime(row.getInt(pos), container);
-            case TIMESTAMP_WITH_TIME_ZONE:
-                final int zonedTimestampPrecision =
-                        ((LocalZonedTimestampType) fieldType).getPrecision();
-                return (csvMapper, container, row, pos) ->
-                        convertTimestamp(row.getTimestamp(pos, 
zonedTimestampPrecision), container);
             case TIMESTAMP_WITHOUT_TIME_ZONE:
                 final int timestampPrecision = ((TimestampType) 
fieldType).getPrecision();
                 return (csvMapper, container, row, pos) ->
-                        convertTimestamp(row.getTimestamp(pos, 
timestampPrecision), container);
+                        convertTimestamp(
+                                row.getTimestamp(pos, timestampPrecision),
+                                container,
+                                SQL_TIMESTAMP_FORMAT);
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                final int zonedTimestampPrecision =
+                        ((LocalZonedTimestampType) fieldType).getPrecision();
+                return (csvMapper, container, row, pos) ->
+                        convertTimestamp(
+                                row.getTimestamp(pos, zonedTimestampPrecision),
+                                container,
+                                SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT);
             case DECIMAL:
                 return createDecimalRowFieldConverter((DecimalType) fieldType);
             case ARRAY:
@@ -207,16 +214,21 @@ public class RowDataToCsvConverters implements 
Serializable {
             case TIME_WITHOUT_TIME_ZONE:
                 return (csvMapper, container, array, pos) ->
                         convertTime(array.getInt(pos), container);
-            case TIMESTAMP_WITH_TIME_ZONE:
-                final int zonedTimestampPrecision =
-                        ((LocalZonedTimestampType) fieldType).getPrecision();
-                return (csvMapper, container, array, pos) ->
-                        convertTimestamp(
-                                array.getTimestamp(pos, 
zonedTimestampPrecision), container);
             case TIMESTAMP_WITHOUT_TIME_ZONE:
                 final int timestampPrecision = ((TimestampType) 
fieldType).getPrecision();
                 return (csvMapper, container, array, pos) ->
-                        convertTimestamp(array.getTimestamp(pos, 
timestampPrecision), container);
+                        convertTimestamp(
+                                array.getTimestamp(pos, timestampPrecision),
+                                container,
+                                SQL_TIMESTAMP_FORMAT);
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                final int localZonedTimestampPrecision =
+                        ((LocalZonedTimestampType) fieldType).getPrecision();
+                return (csvMapper, container, array, pos) ->
+                        convertTimestamp(
+                                array.getTimestamp(pos, 
localZonedTimestampPrecision),
+                                container,
+                                SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT);
             case DECIMAL:
                 return createDecimalArrayElementConverter((DecimalType) 
fieldType);
                 // we don't support ARRAY and ROW in an ARRAY, see
@@ -268,8 +280,9 @@ public class RowDataToCsvConverters implements Serializable 
{
         return container.textNode(ISO_LOCAL_TIME.format(time));
     }
 
-    private static JsonNode convertTimestamp(TimestampData timestamp, 
ContainerNode<?> container) {
-        return 
container.textNode(DATE_TIME_FORMATTER.format(timestamp.toLocalDateTime()));
+    private static JsonNode convertTimestamp(
+            TimestampData timestamp, ContainerNode<?> container, 
DateTimeFormatter formatter) {
+        return 
container.textNode(formatter.format(timestamp.toLocalDateTime()));
     }
 
     private static RowFieldConverter createArrayRowFieldConverter(ArrayType 
type) {
@@ -307,12 +320,4 @@ public class RowDataToCsvConverters implements 
Serializable {
             return arrayNode;
         };
     }
-
-    private static final DateTimeFormatter DATE_TIME_FORMATTER =
-            new DateTimeFormatterBuilder()
-                    .parseCaseInsensitive()
-                    .append(ISO_LOCAL_DATE)
-                    .appendLiteral(' ')
-                    .append(ISO_LOCAL_TIME)
-                    .toFormatter();
 }
diff --git 
a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java
 
b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java
index 0fc2df2..3909b20 100644
--- 
a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java
+++ 
b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDataSerDeSchemaTest.java
@@ -34,7 +34,8 @@ import java.io.IOException;
 import java.math.BigDecimal;
 import java.sql.Date;
 import java.sql.Time;
-import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.function.Consumer;
 
@@ -54,8 +55,11 @@ import static org.apache.flink.table.api.DataTypes.SMALLINT;
 import static org.apache.flink.table.api.DataTypes.STRING;
 import static org.apache.flink.table.api.DataTypes.TIME;
 import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
+import static org.apache.flink.table.api.DataTypes.TIMESTAMP_LTZ;
 import static org.apache.flink.table.api.DataTypes.TINYINT;
 import static org.apache.flink.table.data.StringData.fromString;
+import static org.apache.flink.table.data.TimestampData.fromInstant;
+import static org.apache.flink.table.data.TimestampData.fromLocalDateTime;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -89,7 +93,16 @@ public class CsvRowDataSerDeSchemaTest {
         testNullableField(DATE(), "2018-10-12", Date.valueOf("2018-10-12"));
         testNullableField(TIME(0), "12:12:12", Time.valueOf("12:12:12"));
         testNullableField(
-                TIMESTAMP(0), "\"2018-10-12 12:12:12\"", 
Timestamp.valueOf("2018-10-12 12:12:12"));
+                TIMESTAMP(0),
+                "\"2018-10-12 12:12:12\"",
+                LocalDateTime.parse("2018-10-12T12:12:12"));
+        testNullableField(
+                TIMESTAMP(0),
+                "\"2018-10-12 12:12:12.123\"",
+                LocalDateTime.parse("2018-10-12T12:12:12.123"));
+        testNullableField(TIMESTAMP_LTZ(0), "\"1970-01-01 00:02:03Z\"", 
Instant.ofEpochSecond(123));
+        testNullableField(
+                TIMESTAMP_LTZ(0), "\"1970-01-01 00:02:03.456Z\"", 
Instant.ofEpochMilli(123456));
         testNullableField(
                 ROW(FIELD("f0", STRING()), FIELD("f1", INT()), FIELD("f2", 
BOOLEAN())),
                 "Hello;42;false",
@@ -180,7 +193,7 @@ public class CsvRowDataSerDeSchemaTest {
         int precision = 5;
         try {
             testFieldDeserialization(
-                    TIME(5), "12:12:12.45", LocalTime.parse("12:12:12"), 
deserConfig, ";");
+                    TIME(precision), "12:12:12.45", 
LocalTime.parse("12:12:12"), deserConfig, ";");
             fail();
         } catch (Exception e) {
             assertEquals(
@@ -275,9 +288,19 @@ public class CsvRowDataSerDeSchemaTest {
     @Test
     public void testSerializeDeserializeNestedTypes() throws Exception {
         DataType subDataType0 =
-                ROW(FIELD("f0c0", STRING()), FIELD("f0c1", INT()), 
FIELD("f0c2", STRING()));
+                ROW(
+                        FIELD("f0c0", STRING()),
+                        FIELD("f0c1", INT()),
+                        FIELD("f0c2", STRING()),
+                        FIELD("f0c3", TIMESTAMP()),
+                        FIELD("f0c4", TIMESTAMP_LTZ()));
         DataType subDataType1 =
-                ROW(FIELD("f1c0", STRING()), FIELD("f1c1", INT()), 
FIELD("f1c2", STRING()));
+                ROW(
+                        FIELD("f1c0", STRING()),
+                        FIELD("f1c1", INT()),
+                        FIELD("f1c2", STRING()),
+                        FIELD("f0c3", TIMESTAMP()),
+                        FIELD("f0c4", TIMESTAMP_LTZ()));
         DataType dataType = ROW(FIELD("f0", subDataType0), FIELD("f1", 
subDataType1));
         RowType rowType = (RowType) dataType.getLogicalType();
 
@@ -290,12 +313,29 @@ public class CsvRowDataSerDeSchemaTest {
 
         RowData normalRow =
                 GenericRowData.of(
-                        rowData("hello", 1, "This is 1st top column"),
-                        rowData("world", 2, "This is 2nd top column"));
+                        rowData(
+                                "hello",
+                                1,
+                                "This is 1st top column",
+                                LocalDateTime.parse("1970-01-01T01:02:03"),
+                                Instant.ofEpochMilli(1000)),
+                        rowData(
+                                "world",
+                                2,
+                                "This is 2nd top column",
+                                LocalDateTime.parse("1970-01-01T01:02:04"),
+                                Instant.ofEpochMilli(2000)));
         testSerDeConsistency(normalRow, serSchemaBuilder, deserSchemaBuilder);
 
         RowData nullRow =
-                GenericRowData.of(null, rowData("world", 2, "This is 2nd top 
column after null"));
+                GenericRowData.of(
+                        null,
+                        rowData(
+                                "world",
+                                2,
+                                "This is 2nd top column after null",
+                                LocalDateTime.parse("1970-01-01T01:02:05"),
+                                Instant.ofEpochMilli(3000)));
         testSerDeConsistency(nullRow, serSchemaBuilder, deserSchemaBuilder);
     }
 
@@ -424,4 +464,14 @@ public class CsvRowDataSerDeSchemaTest {
     private static RowData rowData(String str1, int integer, String str2) {
         return GenericRowData.of(fromString(str1), integer, fromString(str2));
     }
+
+    private static RowData rowData(
+            String str1, int integer, String str2, LocalDateTime 
localDateTime, Instant instant) {
+        return GenericRowData.of(
+                fromString(str1),
+                integer,
+                fromString(str2),
+                fromLocalDateTime(localDateTime),
+                fromInstant(instant));
+    }
 }
diff --git 
a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java
 
b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java
index 4f0ba29..3e064ea 100644
--- 
a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java
+++ 
b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvRowDeSerializationSchemaTest.java
@@ -31,6 +31,8 @@ import java.math.BigInteger;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDateTime;
 import java.util.function.Consumer;
 
 import static org.junit.Assert.assertArrayEquals;
@@ -77,7 +79,12 @@ public class CsvRowDeSerializationSchemaTest {
         testNullableField(
                 Types.LOCAL_DATE_TIME,
                 "\"2018-10-12 12:12:12\"",
-                Timestamp.valueOf("2018-10-12 12:12:12").toLocalDateTime());
+                LocalDateTime.parse("2018-10-12T12:12:12"));
+        testNullableField(
+                Types.INSTANT,
+                "\"1970-01-01 00:00:01.123456789Z\"",
+                Instant.ofEpochMilli(1123).plusNanos(456789));
+        testNullableField(Types.INSTANT, "\"1970-01-01 00:00:12Z\"", 
Instant.ofEpochSecond(12));
         testNullableField(
                 Types.ROW(Types.STRING, Types.INT, Types.BOOLEAN),
                 "Hello;42;false",
@@ -232,8 +239,20 @@ public class CsvRowDeSerializationSchemaTest {
 
     @Test
     public void testSerializeDeserializeNestedTypes() throws Exception {
-        final TypeInformation<Row> subDataType0 = Types.ROW(Types.STRING, 
Types.INT, Types.STRING);
-        final TypeInformation<Row> subDataType1 = Types.ROW(Types.STRING, 
Types.INT, Types.STRING);
+        final TypeInformation<Row> subDataType0 =
+                Types.ROW(
+                        Types.STRING,
+                        Types.INT,
+                        Types.STRING,
+                        Types.LOCAL_DATE_TIME,
+                        Types.INSTANT);
+        final TypeInformation<Row> subDataType1 =
+                Types.ROW(
+                        Types.STRING,
+                        Types.INT,
+                        Types.STRING,
+                        Types.LOCAL_DATE_TIME,
+                        Types.INSTANT);
         final TypeInformation<Row> rowInfo = Types.ROW(subDataType0, 
subDataType1);
 
         // serialization
@@ -245,11 +264,29 @@ public class CsvRowDeSerializationSchemaTest {
 
         Row normalRow =
                 Row.of(
-                        Row.of("hello", 1, "This is 1st top column"),
-                        Row.of("world", 2, "This is 2nd top column"));
+                        Row.of(
+                                "hello",
+                                1,
+                                "This is 1st top column",
+                                LocalDateTime.parse("1970-01-01T01:02:03"),
+                                Instant.ofEpochMilli(1000)),
+                        Row.of(
+                                "world",
+                                2,
+                                "This is 2nd top column",
+                                LocalDateTime.parse("1970-01-01T01:02:04"),
+                                Instant.ofEpochMilli(2000)));
         testSerDeConsistency(normalRow, serSchemaBuilder, deserSchemaBuilder);
 
-        Row nullRow = Row.of(null, Row.of("world", 2, "This is 2nd top column 
after null"));
+        Row nullRow =
+                Row.of(
+                        null,
+                        Row.of(
+                                "world",
+                                2,
+                                "This is 2nd top column after null",
+                                LocalDateTime.parse("1970-01-01T01:02:05"),
+                                Instant.ofEpochMilli(3000)));
         testSerDeConsistency(nullRow, serSchemaBuilder, deserSchemaBuilder);
     }
 
diff --git a/flink-formats/flink-format-common/pom.xml 
b/flink-formats/flink-format-common/pom.xml
new file mode 100644
index 0000000..ef30524
--- /dev/null
+++ b/flink-formats/flink-format-common/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+    <parent>
+        <artifactId>flink-formats</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>1.13-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-format-common</artifactId>
+    <name>Flink : Format : Common</name>
+    <description>
+        This module contains common utils for different formats.
+    </description>
+
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-annotations</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimeFormats.java
 
b/flink-formats/flink-format-common/src/main/java/org/apache/flink/formats/common/TimeFormats.java
similarity index 82%
rename from 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimeFormats.java
rename to 
flink-formats/flink-format-common/src/main/java/org/apache/flink/formats/common/TimeFormats.java
index c81aa20..402b6fe 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimeFormats.java
+++ 
b/flink-formats/flink-format-common/src/main/java/org/apache/flink/formats/common/TimeFormats.java
@@ -16,7 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.formats.json;
+package org.apache.flink.formats.common;
+
+import org.apache.flink.annotation.Internal;
 
 import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeFormatterBuilder;
@@ -26,10 +28,11 @@ import java.time.temporal.ChronoField;
  * Time formats and timestamp formats respecting the RFC3339 specification, 
ISO-8601 specification
  * and SQL specification.
  */
-class TimeFormats {
+@Internal
+public class TimeFormats {
 
     /** Formatter for RFC 3339-compliant string representation of a time 
value. */
-    static final DateTimeFormatter RFC3339_TIME_FORMAT =
+    public static final DateTimeFormatter RFC3339_TIME_FORMAT =
             new DateTimeFormatterBuilder()
                     .appendPattern("HH:mm:ss")
                     .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
@@ -40,7 +43,7 @@ class TimeFormats {
      * Formatter for RFC 3339-compliant string representation of a timestamp 
value (with UTC
      * timezone).
      */
-    static final DateTimeFormatter RFC3339_TIMESTAMP_FORMAT =
+    public static final DateTimeFormatter RFC3339_TIMESTAMP_FORMAT =
             new DateTimeFormatterBuilder()
                     .append(DateTimeFormatter.ISO_LOCAL_DATE)
                     .appendLiteral('T')
@@ -48,10 +51,11 @@ class TimeFormats {
                     .toFormatter();
 
     /** Formatter for ISO8601 string representation of a timestamp value 
(without UTC timezone). */
-    static final DateTimeFormatter ISO8601_TIMESTAMP_FORMAT = 
DateTimeFormatter.ISO_LOCAL_DATE_TIME;
+    public static final DateTimeFormatter ISO8601_TIMESTAMP_FORMAT =
+            DateTimeFormatter.ISO_LOCAL_DATE_TIME;
 
     /** Formatter for ISO8601 string representation of a timestamp value (with 
UTC timezone). */
-    static final DateTimeFormatter 
ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT =
+    public static final DateTimeFormatter 
ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT =
             new DateTimeFormatterBuilder()
                     .append(DateTimeFormatter.ISO_LOCAL_DATE)
                     .appendLiteral('T')
@@ -60,14 +64,14 @@ class TimeFormats {
                     .toFormatter();
 
     /** Formatter for SQL string representation of a time value. */
-    static final DateTimeFormatter SQL_TIME_FORMAT =
+    public static final DateTimeFormatter SQL_TIME_FORMAT =
             new DateTimeFormatterBuilder()
                     .appendPattern("HH:mm:ss")
                     .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
                     .toFormatter();
 
     /** Formatter for SQL string representation of a timestamp value (without 
UTC timezone). */
-    static final DateTimeFormatter SQL_TIMESTAMP_FORMAT =
+    public static final DateTimeFormatter SQL_TIMESTAMP_FORMAT =
             new DateTimeFormatterBuilder()
                     .append(DateTimeFormatter.ISO_LOCAL_DATE)
                     .appendLiteral(' ')
@@ -75,7 +79,7 @@ class TimeFormats {
                     .toFormatter();
 
     /** Formatter for SQL string representation of a timestamp value (with UTC 
timezone). */
-    static final DateTimeFormatter SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT =
+    public static final DateTimeFormatter 
SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT =
             new DateTimeFormatterBuilder()
                     .append(DateTimeFormatter.ISO_LOCAL_DATE)
                     .appendLiteral(' ')
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimestampFormat.java
 
b/flink-formats/flink-format-common/src/main/java/org/apache/flink/formats/common/TimestampFormat.java
similarity index 97%
rename from 
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimestampFormat.java
rename to 
flink-formats/flink-format-common/src/main/java/org/apache/flink/formats/common/TimestampFormat.java
index 8dacfc1..0e3655d 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimestampFormat.java
+++ 
b/flink-formats/flink-format-common/src/main/java/org/apache/flink/formats/common/TimestampFormat.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.formats.json;
+package org.apache.flink.formats.common;
 
 import org.apache.flink.annotation.Internal;
 
diff --git a/flink-formats/flink-json/pom.xml b/flink-formats/flink-json/pom.xml
index f7c653f..a5217b7 100644
--- a/flink-formats/flink-json/pom.xml
+++ b/flink-formats/flink-json/pom.xml
@@ -36,6 +36,12 @@ under the License.
 
        <dependencies>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-format-common</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
                <!-- core dependencies -->
 
                <dependency>
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
index 51338b2..8d69ec2 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptions.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptions.java
index 3ffa18b..ae62430 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptions.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonOptions.java
@@ -21,6 +21,7 @@ package org.apache.flink.formats.json;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
 
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
index c54cefd..83d1b2d 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java
@@ -21,6 +21,7 @@ package org.apache.flink.formats.json;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.RowType;
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
index b2afe62..1b5ee58 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java
@@ -20,6 +20,7 @@ package org.apache.flink.formats.json;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
index f6aa03b..6cfd84c 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java
@@ -64,8 +64,8 @@ import java.util.stream.Collectors;
 
 import static java.lang.String.format;
 import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
-import static 
org.apache.flink.formats.json.TimeFormats.RFC3339_TIMESTAMP_FORMAT;
-import static org.apache.flink.formats.json.TimeFormats.RFC3339_TIME_FORMAT;
+import static 
org.apache.flink.formats.common.TimeFormats.RFC3339_TIMESTAMP_FORMAT;
+import static org.apache.flink.formats.common.TimeFormats.RFC3339_TIME_FORMAT;
 import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL;
 import static 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
 import static org.apache.flink.util.Preconditions.checkArgument;
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
index 9c50fc6..607d4ac 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java
@@ -52,8 +52,8 @@ import java.util.stream.Collectors;
 
 import static java.lang.String.format;
 import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
-import static 
org.apache.flink.formats.json.TimeFormats.RFC3339_TIMESTAMP_FORMAT;
-import static org.apache.flink.formats.json.TimeFormats.RFC3339_TIME_FORMAT;
+import static 
org.apache.flink.formats.common.TimeFormats.RFC3339_TIMESTAMP_FORMAT;
+import static org.apache.flink.formats.common.TimeFormats.RFC3339_TIME_FORMAT;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonToRowDataConverters.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonToRowDataConverters.java
index b5d7ec8..e2c4f57 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonToRowDataConverters.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonToRowDataConverters.java
@@ -19,6 +19,7 @@
 package org.apache.flink.formats.json;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericArrayData;
@@ -58,11 +59,11 @@ import java.util.Iterator;
 import java.util.Map;
 
 import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
-import static 
org.apache.flink.formats.json.TimeFormats.ISO8601_TIMESTAMP_FORMAT;
-import static 
org.apache.flink.formats.json.TimeFormats.ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
-import static org.apache.flink.formats.json.TimeFormats.SQL_TIMESTAMP_FORMAT;
-import static 
org.apache.flink.formats.json.TimeFormats.SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
-import static org.apache.flink.formats.json.TimeFormats.SQL_TIME_FORMAT;
+import static 
org.apache.flink.formats.common.TimeFormats.ISO8601_TIMESTAMP_FORMAT;
+import static 
org.apache.flink.formats.common.TimeFormats.ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
+import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_FORMAT;
+import static 
org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
+import static org.apache.flink.formats.common.TimeFormats.SQL_TIME_FORMAT;
 
 /** Tool class used to convert from {@link JsonNode} to {@link RowData}. * */
 @Internal
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java
index c30690a1c9..36caaf9 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/RowDataToJsonConverters.java
@@ -19,6 +19,7 @@
 package org.apache.flink.formats.json;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.DecimalData;
@@ -47,11 +48,11 @@ import java.time.ZoneOffset;
 import java.util.Arrays;
 
 import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
-import static 
org.apache.flink.formats.json.TimeFormats.ISO8601_TIMESTAMP_FORMAT;
-import static 
org.apache.flink.formats.json.TimeFormats.ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
-import static org.apache.flink.formats.json.TimeFormats.SQL_TIMESTAMP_FORMAT;
-import static 
org.apache.flink.formats.json.TimeFormats.SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
-import static org.apache.flink.formats.json.TimeFormats.SQL_TIME_FORMAT;
+import static 
org.apache.flink.formats.common.TimeFormats.ISO8601_TIMESTAMP_FORMAT;
+import static 
org.apache.flink.formats.common.TimeFormats.ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
+import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_FORMAT;
+import static 
org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
+import static org.apache.flink.formats.common.TimeFormats.SQL_TIME_FORMAT;
 
 /** Tool class used to convert from {@link RowData} to {@link JsonNode}. * */
 @Internal
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDecodingFormat.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDecodingFormat.java
index 791dd59..f958c8d 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDecodingFormat.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDecodingFormat.java
@@ -20,7 +20,7 @@ package org.apache.flink.formats.json.canal;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.json.TimestampFormat;
+import org.apache.flink.formats.common.TimestampFormat;
 import 
org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema.MetadataConverter;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.connector.ChangelogMode;
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java
index acdaecf..d1bd3a3 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java
@@ -21,8 +21,8 @@ package org.apache.flink.formats.json.canal;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
-import org.apache.flink.formats.json.TimestampFormat;
 import 
org.apache.flink.formats.json.canal.CanalJsonDecodingFormat.ReadableMetadata;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.ArrayData;
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java
index e43dd1c..a749808 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java
@@ -22,8 +22,8 @@ import 
org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.formats.json.JsonOptions;
-import org.apache.flink.formats.json.TimestampFormat;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonSerializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonSerializationSchema.java
index e80de2b..b15fb4a 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonSerializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonSerializationSchema.java
@@ -19,9 +19,9 @@
 package org.apache.flink.formats.json.canal;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.formats.json.JsonOptions;
 import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
-import org.apache.flink.formats.json.TimestampFormat;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.GenericArrayData;
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDecodingFormat.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDecodingFormat.java
index 22de3c9..5db2f64 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDecodingFormat.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDecodingFormat.java
@@ -20,7 +20,7 @@ package org.apache.flink.formats.json.debezium;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.json.TimestampFormat;
+import org.apache.flink.formats.common.TimestampFormat;
 import 
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.MetadataConverter;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.connector.ChangelogMode;
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java
index 628e053..d12b7cc 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java
@@ -21,8 +21,8 @@ package org.apache.flink.formats.json.debezium;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
-import org.apache.flink.formats.json.TimestampFormat;
 import 
org.apache.flink.formats.json.debezium.DebeziumJsonDecodingFormat.ReadableMetadata;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericRowData;
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
index bed3dd6..dddb52b 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
@@ -22,8 +22,8 @@ import 
org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.formats.json.JsonOptions;
-import org.apache.flink.formats.json.TimestampFormat;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java
index 9bde53d..64e0105 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java
@@ -19,9 +19,9 @@
 package org.apache.flink.formats.json.debezium;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.formats.json.JsonOptions;
 import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
-import org.apache.flink.formats.json.TimestampFormat;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java
index 07387089..8301210 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonDeserializationSchema.java
@@ -20,8 +20,8 @@ package org.apache.flink.formats.json.maxwell;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
-import org.apache.flink.formats.json.TimestampFormat;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java
index c8d47f0..e03aa73 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java
@@ -23,8 +23,8 @@ import 
org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.formats.json.JsonOptions;
-import org.apache.flink.formats.json.TimestampFormat;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
diff --git 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerializationSchema.java
 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerializationSchema.java
index 66e534b..27b9da4 100644
--- 
a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerializationSchema.java
+++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerializationSchema.java
@@ -19,9 +19,9 @@
 package org.apache.flink.formats.json.maxwell;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.formats.json.JsonOptions;
 import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
-import org.apache.flink.formats.json.TimestampFormat;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
diff --git 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java
 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java
index 036fdbe..aae25c2 100644
--- 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java
+++ 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.formats.json;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
diff --git 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
index 84a3c70..6c748ce 100644
--- 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
+++ 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.formats.json;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.table.data.GenericMapData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
diff --git 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java
 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java
index 8b47736..4968294 100644
--- 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java
+++ 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java
@@ -20,8 +20,8 @@ package org.apache.flink.formats.json.canal;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.formats.json.JsonOptions;
-import org.apache.flink.formats.json.TimestampFormat;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
diff --git 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java
 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java
index 3a19384..73b34d9 100644
--- 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java
+++ 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.formats.json.canal;
 
+import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.formats.json.JsonOptions;
-import org.apache.flink.formats.json.TimestampFormat;
 import 
org.apache.flink.formats.json.canal.CanalJsonDecodingFormat.ReadableMetadata;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.RowData;
diff --git 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java
 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java
index bdd014a..466365f 100644
--- 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java
+++ 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java
@@ -20,8 +20,8 @@ package org.apache.flink.formats.json.debezium;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.formats.json.JsonOptions;
-import org.apache.flink.formats.json.TimestampFormat;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
diff --git 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java
 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java
index db98d89..a628dd6 100644
--- 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java
+++ 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.formats.json.debezium;
 
+import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.formats.json.JsonOptions;
-import org.apache.flink.formats.json.TimestampFormat;
 import 
org.apache.flink.formats.json.debezium.DebeziumJsonDecodingFormat.ReadableMetadata;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.RowData;
diff --git 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactoryTest.java
 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactoryTest.java
index f2fc7d6..7fb1ba9 100644
--- 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactoryTest.java
+++ 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactoryTest.java
@@ -20,8 +20,8 @@ package org.apache.flink.formats.json.maxwell;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.formats.json.JsonOptions;
-import org.apache.flink.formats.json.TimestampFormat;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Column;
diff --git 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java
 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java
index 8ae8143..f9f17eb 100644
--- 
a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java
+++ 
b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.formats.json.maxwell;
 
+import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.formats.json.JsonOptions;
-import org.apache.flink.formats.json.TimestampFormat;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
diff --git a/flink-formats/pom.xml b/flink-formats/pom.xml
index d0d8479..8c02709 100644
--- a/flink-formats/pom.xml
+++ b/flink-formats/pom.xml
@@ -50,7 +50,8 @@ under the License.
                <module>flink-orc-nohive</module>
                <module>flink-hadoop-bulk</module>
                <module>flink-avro-glue-schema-registry</module>
-       </modules>
+    <module>flink-format-common</module>
+  </modules>
 
        <!-- override these root dependencies as 'provided', so they don't end 
up
                in the jars-with-dependencies (uber jars) of formats and

Reply via email to