This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 533ead6ae94 [FLINK-33198][format/avro] Properly map Avro timestamp types in Flink Avro format
533ead6ae94 is described below

commit 533ead6ae946cbc77525d276b6dea965d390181a
Author: Peter Huang <huangzhenqiu0...@gmail.com>
AuthorDate: Tue Jan 23 18:43:17 2024 -0800

    [FLINK-33198][format/avro] Properly map Avro timestamp types in Flink Avro format
    
    This closes #23511.
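    
    A minimal sketch of the corrected mapping, using the new
    AvroSchemaConverter.convertToSchema(LogicalType, boolean) overload added
    below (the driver class and field names are illustrative, not part of
    the patch):
    
        import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
        import org.apache.flink.table.types.logical.LocalZonedTimestampType;
        import org.apache.flink.table.types.logical.LogicalType;
        import org.apache.flink.table.types.logical.RowType;
        import org.apache.flink.table.types.logical.TimestampType;
    
        public class TimestampMappingSketch {
            public static void main(String[] args) {
                RowType rowType =
                        RowType.of(
                                new LogicalType[] {
                                    new TimestampType(3), new LocalZonedTimestampType(3)
                                },
                                new String[] {"ts", "ts_ltz"});
                // legacyTimestampMapping = false: ts (TIMESTAMP) becomes Avro
                // local-timestamp-millis and ts_ltz (TIMESTAMP_LTZ) becomes Avro
                // timestamp-millis, matching the corrected mapping.
                System.out.println(AvroSchemaConverter.convertToSchema(rowType, false));
            }
        }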
---
 .../flink/formats/avro/AvroFileFormatFactory.java  |  12 +-
 .../flink/formats/avro/AvroFormatFactory.java      |  10 +-
 .../flink/formats/avro/AvroFormatOptions.java      |  14 +-
 .../avro/AvroRowDataDeserializationSchema.java     |  28 +-
 .../avro/AvroRowDataSerializationSchema.java       |  17 ++
 .../formats/avro/AvroRowDeserializationSchema.java |  86 ++++--
 .../formats/avro/AvroRowSerializationSchema.java   |  53 +++-
 .../formats/avro/AvroToRowDataConverters.java      |  40 ++-
 .../formats/avro/RowDataToAvroConverters.java      |  75 +++--
 .../avro/typeutils/AvroSchemaConverter.java        | 227 ++++++++++++---
 .../flink/formats/avro/AvroFormatFactoryTest.java  |  94 +++++-
 .../avro/AvroRowDataDeSerializationSchemaTest.java |  95 +++++-
 .../avro/AvroRowDeSerializationSchemaTest.java     | 317 +++++++++++++++------
 .../avro/typeutils/AvroSchemaConverterTest.java    | 115 ++++++++
 .../avro/typeutils/AvroTypeExtractionTest.java     |  15 +-
 .../flink/formats/avro/utils/AvroTestUtils.java    |  50 ++++
 .../flink-avro/src/test/resources/avro/user.avsc   |  13 +-
 17 files changed, 1042 insertions(+), 219 deletions(-)

diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileFormatFactory.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileFormatFactory.java
index 5e128ec8ea3..2bc0fd1c036 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileFormatFactory.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileFormatFactory.java
@@ -57,6 +57,7 @@ import java.util.Set;
 import java.util.function.Function;
 
 import static org.apache.flink.formats.avro.AvroFormatOptions.AVRO_OUTPUT_CODEC;
+import static org.apache.flink.formats.avro.AvroFormatOptions.AVRO_TIMESTAMP_LEGACY_MAPPING;
 
 /** Avro format factory for file system. */
 @Internal
@@ -85,7 +86,8 @@ public class AvroFileFormatFactory implements BulkReaderFormatFactory, BulkWrite
                     DynamicTableSink.Context context, DataType consumedDataType) {
                 return new RowDataAvroWriterFactory(
                         (RowType) consumedDataType.getLogicalType(),
-                        formatOptions.get(AVRO_OUTPUT_CODEC));
+                        formatOptions.get(AVRO_OUTPUT_CODEC),
+                        formatOptions.get(AVRO_TIMESTAMP_LEGACY_MAPPING));
             }
         };
     }
@@ -104,6 +106,7 @@ public class AvroFileFormatFactory implements BulkReaderFormatFactory, BulkWrite
     public Set<ConfigOption<?>> optionalOptions() {
         Set<ConfigOption<?>> options = new HashSet<>();
         options.add(AVRO_OUTPUT_CODEC);
+        options.add(AVRO_TIMESTAMP_LEGACY_MAPPING);
         return options;
     }
 
@@ -182,7 +185,8 @@ public class AvroFileFormatFactory implements BulkReaderFormatFactory, BulkWrite
         private final AvroWriterFactory<GenericRecord> factory;
         private final RowType rowType;
 
-        private RowDataAvroWriterFactory(RowType rowType, String codec) {
+        private RowDataAvroWriterFactory(
+                RowType rowType, String codec, boolean legacyTimestampMapping) {
             this.rowType = rowType;
             this.factory =
                     new AvroWriterFactory<>(
@@ -190,7 +194,9 @@ public class AvroFileFormatFactory implements BulkReaderFormatFactory, BulkWrite
                                 @Override
                                 public DataFileWriter<GenericRecord> createWriter(OutputStream out)
                                         throws IOException {
-                                    Schema schema = AvroSchemaConverter.convertToSchema(rowType);
+                                    Schema schema =
+                                            AvroSchemaConverter.convertToSchema(
+                                                    rowType, legacyTimestampMapping);
                                     DatumWriter<GenericRecord> datumWriter =
                                             new GenericDatumWriter<>(schema);
                                     DataFileWriter<GenericRecord> dataFileWriter =
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatFactory.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatFactory.java
index 65b3c91badc..eda1cc1a898 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatFactory.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatFactory.java
@@ -45,6 +45,7 @@ import java.util.HashSet;
 import java.util.Set;
 
 import static org.apache.flink.formats.avro.AvroFormatOptions.AVRO_ENCODING;
+import static org.apache.flink.formats.avro.AvroFormatOptions.AVRO_TIMESTAMP_LEGACY_MAPPING;
 
 /**
  * Table format factory for providing configured instances of Avro to RowData {@link
@@ -61,6 +62,7 @@ public class AvroFormatFactory implements DeserializationFormatFactory, Serializ
         FactoryUtil.validateFactoryOptions(this, formatOptions);
 
         AvroEncoding encoding = formatOptions.get(AVRO_ENCODING);
+        boolean legacyTimestampMapping = formatOptions.get(AVRO_TIMESTAMP_LEGACY_MAPPING);
 
         return new ProjectableDecodingFormat<DeserializationSchema<RowData>>() {
             @Override
@@ -73,7 +75,8 @@ public class AvroFormatFactory implements DeserializationFormatFactory, Serializ
                 final RowType rowType = (RowType) producedDataType.getLogicalType();
                 final TypeInformation<RowData> rowDataTypeInfo =
                         context.createTypeInformation(producedDataType);
-                return new AvroRowDataDeserializationSchema(rowType, rowDataTypeInfo, encoding);
+                return new AvroRowDataDeserializationSchema(
+                        rowType, rowDataTypeInfo, encoding, legacyTimestampMapping);
             }
 
             @Override
@@ -89,13 +92,15 @@ public class AvroFormatFactory implements DeserializationFormatFactory, Serializ
         FactoryUtil.validateFactoryOptions(this, formatOptions);
 
         AvroEncoding encoding = formatOptions.get(AVRO_ENCODING);
+        boolean legacyTimestampMapping = formatOptions.get(AVRO_TIMESTAMP_LEGACY_MAPPING);
 
         return new EncodingFormat<SerializationSchema<RowData>>() {
             @Override
             public SerializationSchema<RowData> createRuntimeEncoder(
                     DynamicTableSink.Context context, DataType consumedDataType) {
                 final RowType rowType = (RowType) consumedDataType.getLogicalType();
-                return new AvroRowDataSerializationSchema(rowType, encoding);
+                return new AvroRowDataSerializationSchema(
+                        rowType, encoding, legacyTimestampMapping);
             }
 
             @Override
@@ -119,6 +124,7 @@ public class AvroFormatFactory implements DeserializationFormatFactory, Serializ
     public Set<ConfigOption<?>> optionalOptions() {
         Set<ConfigOption<?>> options = new HashSet<>();
         options.add(AVRO_ENCODING);
+        options.add(AVRO_TIMESTAMP_LEGACY_MAPPING);
         return options;
     }
 }
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatOptions.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatOptions.java
index a9aa8c3a7a7..6a9d81ea195 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatOptions.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatOptions.java
@@ -36,7 +36,6 @@ public class AvroFormatOptions {
                     .stringType()
                     .defaultValue(SNAPPY_CODEC)
                     .withDescription("The compression codec for avro");
-
     public static final ConfigOption<AvroEncoding> AVRO_ENCODING =
             ConfigOptions.key("encoding")
                     .enumType(AvroEncoding.class)
@@ -71,5 +70,18 @@ public class AvroFormatOptions {
         }
     }
 
+    public static final ConfigOption<Boolean> AVRO_TIMESTAMP_LEGACY_MAPPING =
+            ConfigOptions.key("timestamp_mapping.legacy")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Use the legacy mapping of timestamps in Avro. "
+                                    + "Before 1.19, Flink wrongly mapped both the SQL TIMESTAMP "
+                                    + "and TIMESTAMP_LTZ types to Avro TIMESTAMP. The correct "
+                                    + "behavior is to map Flink SQL TIMESTAMP to Avro LOCAL "
+                                    + "TIMESTAMP and Flink SQL TIMESTAMP_LTZ to Avro TIMESTAMP. "
+                                    + "Disable this legacy mapping to obtain the correct mapping. "
+                                    + "The legacy behavior is the default for compatibility reasons.");
+
     private AvroFormatOptions() {}
 }
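
A sketch of how the new option is exercised from SQL, assuming Flink's usual
prefixing of format options with the format identifier (the connector, path,
and table name below are illustrative):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class LegacyMappingOptionSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // With 'format' = 'avro', 'timestamp_mapping.legacy' surfaces as
            // 'avro.timestamp_mapping.legacy' in the WITH clause.
            tEnv.executeSql(
                    "CREATE TABLE events ("
                            + "  ts TIMESTAMP(3),"
                            + "  ts_ltz TIMESTAMP_LTZ(3)"
                            + ") WITH ("
                            + "  'connector' = 'filesystem',"
                            + "  'path' = '/tmp/events',"
                            + "  'format' = 'avro',"
                            + "  'avro.timestamp_mapping.legacy' = 'false'"
                            + ")");
        }
    }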
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java
index b692a2f1fb3..799eb90b98f 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java
@@ -59,18 +59,18 @@ public class AvroRowDataDeserializationSchema implements DeserializationSchema<R
     private final AvroToRowDataConverters.AvroToRowDataConverter runtimeConverter;
 
     /**
-     * Creates a Avro deserialization schema for the given logical type.
+     * Creates an Avro deserialization schema for the given logical type.
      *
      * @param rowType The logical type used to deserialize the data.
      * @param typeInfo The TypeInformation to be used by {@link
      *     AvroRowDataDeserializationSchema#getProducedType()}.
      */
     public AvroRowDataDeserializationSchema(RowType rowType, TypeInformation<RowData> typeInfo) {
-        this(rowType, typeInfo, AvroEncoding.BINARY);
+        this(rowType, typeInfo, AvroEncoding.BINARY, true);
     }
 
     /**
-     * Creates a Avro deserialization schema for the given logical type.
+     * Creates an Avro deserialization schema for the given logical type.
      *
      * @param rowType The logical type used to deserialize the data.
      * @param typeInfo The TypeInformation to be used by {@link
@@ -86,6 +86,28 @@ public class AvroRowDataDeserializationSchema implements DeserializationSchema<R
                 typeInfo);
     }
 
+    /**
+     * Creates an Avro deserialization schema for the given logical type.
+     *
+     * @param rowType The logical type used to deserialize the data.
+     * @param typeInfo The TypeInformation to be used by {@link
+     *     AvroRowDataDeserializationSchema#getProducedType()}.
+     * @param encoding The serialization approach used to deserialize the data.
+     * @param legacyTimestampMapping Whether to use legacy timestamp mapping.
+     */
+    public AvroRowDataDeserializationSchema(
+            RowType rowType,
+            TypeInformation<RowData> typeInfo,
+            AvroEncoding encoding,
+            boolean legacyTimestampMapping) {
+        this(
+                AvroDeserializationSchema.forGeneric(
+                        AvroSchemaConverter.convertToSchema(rowType, legacyTimestampMapping),
+                        encoding),
+                AvroToRowDataConverters.createRowConverter(rowType, legacyTimestampMapping),
+                typeInfo);
+    }
+
     /**
      * Creates a Avro deserialization schema for the given logical type.
      *
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java
index 76c9389998e..b88979cfc88 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataSerializationSchema.java
@@ -72,6 +72,23 @@ public class AvroRowDataSerializationSchema implements SerializationSchema<RowDa
                 RowDataToAvroConverters.createConverter(rowType));
     }
 
+    /**
+     * Creates an Avro serialization schema with the given record row type and legacy timestamp
+     * mapping flag.
+     *
+     * @param encoding The serialization approach used to serialize the data.
+     * @param legacyTimestampMapping Use the legacy timestamp mapping.
+     */
+    public AvroRowDataSerializationSchema(
+            RowType rowType, AvroEncoding encoding, boolean legacyTimestampMapping) {
+        this(
+                rowType,
+                AvroSerializationSchema.forGeneric(
+                        AvroSchemaConverter.convertToSchema(rowType, legacyTimestampMapping),
+                        encoding),
+                RowDataToAvroConverters.createConverter(rowType, legacyTimestampMapping));
+    }
+
     /**
     * Creates an Avro serialization schema with the given record row type, nested schema and
      * runtime converters.
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
index 8f72d02499d..74429b102f2 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
@@ -18,6 +18,7 @@
 package org.apache.flink.formats.avro;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
@@ -58,7 +59,9 @@ import java.sql.Time;
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.time.ZoneOffset;
 import java.time.temporal.ChronoField;
 import java.util.HashMap;
 import java.util.List;
@@ -159,12 +162,17 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 
     @Override
     public Row deserialize(byte[] message) throws IOException {
+        return deserialize(message, true);
+    }
+
+    @VisibleForTesting
+    Row deserialize(byte[] message, boolean legacyTimestampMapping) throws IOException {
         try {
             inputStream.setBuffer(message);
             record = datumReader.read(record, decoder);
-            return convertAvroRecordToRow(schema, typeInfo, record);
+            return convertAvroRecordToRow(schema, typeInfo, record, legacyTimestampMapping);
         } catch (Exception e) {
-            throw new IOException("Failed to deserialize Avro record.", e);
+            throw new RuntimeException("Failed to deserialize Avro record.", e);
         }
     }
 
@@ -193,19 +201,27 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 
     // --------------------------------------------------------------------------------------------
 
-    private Row convertAvroRecordToRow(Schema schema, RowTypeInfo typeInfo, IndexedRecord record) {
+    private Row convertAvroRecordToRow(
+            Schema schema,
+            RowTypeInfo typeInfo,
+            IndexedRecord record,
+            boolean legacyTimestampMapping) {
         final List<Schema.Field> fields = schema.getFields();
         final TypeInformation<?>[] fieldInfo = typeInfo.getFieldTypes();
         final int length = fields.size();
         final Row row = new Row(length);
         for (int i = 0; i < length; i++) {
             final Schema.Field field = fields.get(i);
-            row.setField(i, convertAvroType(field.schema(), fieldInfo[i], record.get(i)));
+            row.setField(
+                    i,
+                    convertAvroType(
+                            field.schema(), fieldInfo[i], record.get(i), legacyTimestampMapping));
         }
         return row;
     }
 
-    private Object convertAvroType(Schema schema, TypeInformation<?> info, Object object) {
+    private Object convertAvroType(
+            Schema schema, TypeInformation<?> info, Object object, boolean legacyTimestampMapping) {
         // we perform the conversion based on schema information but enriched with pre-computed
         // type information where useful (i.e., for arrays)
 
@@ -216,7 +232,10 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
             case RECORD:
                 if (object instanceof IndexedRecord) {
                     return convertAvroRecordToRow(
-                            schema, (RowTypeInfo) info, (IndexedRecord) object);
+                            schema,
+                            (RowTypeInfo) info,
+                            (IndexedRecord) object,
+                            legacyTimestampMapping);
                 }
                 throw new IllegalStateException(
                         "IndexedRecord expected but was: " + 
object.getClass());
@@ -227,11 +246,13 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
                 if (info instanceof BasicArrayTypeInfo) {
                     final TypeInformation<?> elementInfo =
                             ((BasicArrayTypeInfo<?, ?>) info).getComponentInfo();
-                    return convertToObjectArray(schema.getElementType(), elementInfo, object);
+                    return convertToObjectArray(
+                            schema.getElementType(), elementInfo, object, legacyTimestampMapping);
                 } else {
                     final TypeInformation<?> elementInfo =
                             ((ObjectArrayTypeInfo<?, ?>) info).getComponentInfo();
-                    return convertToObjectArray(schema.getElementType(), elementInfo, object);
+                    return convertToObjectArray(
+                            schema.getElementType(), elementInfo, object, legacyTimestampMapping);
                 }
             case MAP:
                 final MapTypeInfo<?, ?> mapTypeInfo = (MapTypeInfo<?, ?>) info;
@@ -243,7 +264,8 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
                             convertAvroType(
                                     schema.getValueType(),
                                     mapTypeInfo.getValueTypeInfo(),
-                                    entry.getValue()));
+                                    entry.getValue(),
+                                    legacyTimestampMapping));
                 }
                 return convertedMap;
             case UNION:
@@ -260,7 +282,7 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
                     // generic type
                     return object;
                 }
-                return convertAvroType(actualSchema, info, object);
+                return convertAvroType(actualSchema, info, object, legacyTimestampMapping);
             case FIXED:
                 final byte[] fixedBytes = ((GenericFixed) object).bytes();
                 if (info == Types.BIG_DEC) {
@@ -285,7 +307,11 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
             case LONG:
                 if (info == Types.SQL_TIMESTAMP) {
                     return convertToTimestamp(
-                            object, schema.getLogicalType() == LogicalTypes.timestampMicros());
+                            object,
+                            schema.getLogicalType() == LogicalTypes.timestampMicros()
+                                    || schema.getLogicalType()
+                                            == LogicalTypes.localTimestampMicros(),
+                            legacyTimestampMapping);
                 } else if (info == Types.SQL_TIME) {
                     return convertToTime(object);
                 }
@@ -339,7 +365,8 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
         return new Time(millis - LOCAL_TZ.getOffset(millis));
     }
 
-    private Timestamp convertToTimestamp(Object object, boolean isMicros) {
+    private Timestamp convertToTimestamp(
+            Object object, boolean isMicros, boolean legacyTimestampMapping) {
         final long millis;
         if (object instanceof Long) {
             if (isMicros) {
@@ -362,13 +389,15 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
             }
         } else if (object instanceof Instant) {
             Instant instant = (Instant) object;
-            int offsetMillis = LOCAL_TZ.getOffset(instant.toEpochMilli());
-
-            long seconds = instant.getEpochSecond() - offsetMillis / 1000;
-            int nanos = instant.getNano() - offsetMillis % 1000 * 1000;
-            Timestamp timestamp = new Timestamp(seconds * 1000L);
-            timestamp.setNanos(nanos);
-            return timestamp;
+            return convertToTimestamp(instant);
+        } else if (object instanceof LocalDateTime) {
+            if (legacyTimestampMapping) {
+                throw new IllegalArgumentException(
+                        "Unexpected object type for DATE logical type. 
Received: " + object);
+            } else {
+                Instant instant = ((LocalDateTime) 
object).toInstant(ZoneOffset.UTC);
+                return convertToTimestamp(instant);
+            }
         } else if (jodaConverter != null) {
             millis = jodaConverter.convertTimestamp(object);
         } else {
@@ -378,13 +407,28 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
         return new Timestamp(millis - LOCAL_TZ.getOffset(millis));
     }
 
+    private Timestamp convertToTimestamp(Instant instant) {
+        int offsetMillis = LOCAL_TZ.getOffset(instant.toEpochMilli());
+
+        long seconds = instant.getEpochSecond() - offsetMillis / 1000;
+        int nanos = instant.getNano() - offsetMillis % 1000 * 1000;
+        Timestamp timestamp = new Timestamp(seconds * 1000L);
+        timestamp.setNanos(nanos);
+        return timestamp;
+    }
+
     private Object[] convertToObjectArray(
-            Schema elementSchema, TypeInformation<?> elementInfo, Object object) {
+            Schema elementSchema,
+            TypeInformation<?> elementInfo,
+            Object object,
+            boolean legacyTimestampMapping) {
         final List<?> list = (List<?>) object;
         final Object[] convertedArray =
                 (Object[]) Array.newInstance(elementInfo.getTypeClass(), list.size());
         for (int i = 0; i < list.size(); i++) {
-            convertedArray[i] = convertAvroType(elementSchema, elementInfo, list.get(i));
+            convertedArray[i] =
+                    convertAvroType(
+                            elementSchema, elementInfo, list.get(i), legacyTimestampMapping);
         }
         return convertedArray;
     }
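
The convertToTimestamp(Instant) helper factored out above shifts an epoch
instant by the session time zone offset before building a java.sql.Timestamp.
A standalone sketch of the same arithmetic (LOCAL_TZ mirrors the schema's
TimeZone.getDefault() field; the example instant is arbitrary):

    import java.sql.Timestamp;
    import java.time.Instant;
    import java.util.TimeZone;

    public class TimestampShiftSketch {
        private static final TimeZone LOCAL_TZ = TimeZone.getDefault();

        // Subtracting the zone offset makes the Timestamp's local rendering
        // show the instant's UTC wall-clock time, as in the patch.
        static Timestamp toShiftedTimestamp(Instant instant) {
            int offsetMillis = LOCAL_TZ.getOffset(instant.toEpochMilli());
            long seconds = instant.getEpochSecond() - offsetMillis / 1000;
            int nanos = instant.getNano() - offsetMillis % 1000 * 1000;
            Timestamp timestamp = new Timestamp(seconds * 1000L);
            timestamp.setNanos(nanos);
            return timestamp;
        }

        public static void main(String[] args) {
            System.out.println(toShiftedTimestamp(Instant.parse("2024-01-23T18:43:17Z")));
        }
    }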
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
index 46f2f950958..1e80306538c 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.formats.avro;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.types.Row;
@@ -60,7 +61,7 @@ import java.util.TimeZone;
 /**
  * Serialization schema that serializes {@link Row} into Avro bytes.
  *
- * <p>Serializes objects that are represented in (nested) Flink rows. It support types that are
+ * <p>Serializes objects that are represented in (nested) Flink rows. It supports types that are
  * compatible with Flink's Table & SQL API.
  *
  * <p>Note: Changes in this class need to be kept in sync with the corresponding runtime class
@@ -130,9 +131,15 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> {
 
     @Override
     public byte[] serialize(Row row) {
+        return serialize(row, true);
+    }
+
+    @VisibleForTesting
+    byte[] serialize(Row row, boolean legacyTimestampMapping) {
         try {
             // convert to record
-            final GenericRecord record = convertRowToAvroRecord(schema, row);
+            final GenericRecord record =
+                    convertRowToAvroRecord(schema, row, legacyTimestampMapping);
             arrayOutputStream.reset();
             datumWriter.write(record, encoder);
             encoder.flush();
@@ -162,25 +169,27 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> {
 
     // --------------------------------------------------------------------------------------------
 
-    private GenericRecord convertRowToAvroRecord(Schema schema, Row row) {
+    private GenericRecord convertRowToAvroRecord(
+            Schema schema, Row row, boolean legacyTimestampMapping) {
         final List<Schema.Field> fields = schema.getFields();
         final int length = fields.size();
         final GenericRecord record = new GenericData.Record(schema);
         for (int i = 0; i < length; i++) {
             final Schema.Field field = fields.get(i);
-            record.put(i, convertFlinkType(field.schema(), row.getField(i)));
+            record.put(
+                    i, convertFlinkType(field.schema(), row.getField(i), legacyTimestampMapping));
         }
         return record;
     }
 
-    private Object convertFlinkType(Schema schema, Object object) {
+    private Object convertFlinkType(Schema schema, Object object, boolean legacyTimestampMapping) {
         if (object == null) {
             return null;
         }
         switch (schema.getType()) {
             case RECORD:
                 if (object instanceof Row) {
-                    return convertRowToAvroRecord(schema, (Row) object);
+                    return convertRowToAvroRecord(schema, (Row) object, legacyTimestampMapping);
                 }
                 throw new IllegalStateException("Row expected but was: " + 
object.getClass());
             case ENUM:
@@ -191,7 +200,8 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> {
                 final GenericData.Array<Object> convertedArray =
                         new GenericData.Array<>(array.length, schema);
                 for (Object element : array) {
-                    convertedArray.add(convertFlinkType(elementSchema, element));
+                    convertedArray.add(
+                            convertFlinkType(elementSchema, element, legacyTimestampMapping));
                 }
                 return convertedArray;
             case MAP:
@@ -200,7 +210,10 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> {
                 for (Map.Entry<?, ?> entry : map.entrySet()) {
                     convertedMap.put(
                             new Utf8(entry.getKey().toString()),
-                            convertFlinkType(schema.getValueType(), entry.getValue()));
+                            convertFlinkType(
+                                    schema.getValueType(),
+                                    entry.getValue(),
+                                    legacyTimestampMapping));
                 }
                 return convertedMap;
             case UNION:
@@ -217,7 +230,7 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> {
                     // generic type
                     return object;
                 }
-                return convertFlinkType(actualSchema, object);
+                return convertFlinkType(actualSchema, object, legacyTimestampMapping);
             case FIXED:
                 // check for logical type
                 if (object instanceof BigDecimal) {
@@ -248,9 +261,12 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> {
             case LONG:
                 // check for logical type
                 if (object instanceof Timestamp) {
-                    return convertFromTimestamp(schema, (Timestamp) object);
+                    return convertFromTimestamp(schema, (Timestamp) object, legacyTimestampMapping);
                 } else if (object instanceof LocalDateTime) {
-                    return convertFromTimestamp(schema, Timestamp.valueOf((LocalDateTime) object));
+                    return convertFromTimestamp(
+                            schema,
+                            Timestamp.valueOf((LocalDateTime) object),
+                            legacyTimestampMapping);
                 } else if (object instanceof Time) {
                     return convertFromTimeMicros(schema, (Time) object);
                 }
@@ -311,13 +327,22 @@ public class AvroRowSerializationSchema implements SerializationSchema<Row> {
         }
     }
 
-    private long convertFromTimestamp(Schema schema, Timestamp date) {
+    private long convertFromTimestamp(
+            Schema schema, Timestamp date, boolean legacyTimestampMapping) {
         final LogicalType logicalType = schema.getLogicalType();
-        if (logicalType == LogicalTypes.timestampMillis()) {
+        if (legacyTimestampMapping
+                && (logicalType == LogicalTypes.localTimestampMillis()
+                        || logicalType == LogicalTypes.localTimestampMicros())) {
+            throw new RuntimeException("Unsupported local timestamp type.");
+        }
+
+        if (logicalType == LogicalTypes.timestampMillis()
+                || logicalType == LogicalTypes.localTimestampMillis()) {
             // adopted from Apache Calcite
             final long time = date.getTime();
             return time + (long) LOCAL_TZ.getOffset(time);
-        } else if (logicalType == LogicalTypes.timestampMicros()) {
+        } else if (logicalType == LogicalTypes.timestampMicros()
+                || logicalType == LogicalTypes.localTimestampMicros()) {
             long millis = date.getTime();
             long micros = millis * 1000 + (date.getNanos() % 1_000_000 / 1000);
             long offset = LOCAL_TZ.getOffset(millis) * 1000L;
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java
index 7f2ca46f1e5..9c63c56c4dc 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java
@@ -42,6 +42,7 @@ import java.lang.reflect.Array;
 import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.temporal.ChronoField;
 import java.util.HashMap;
@@ -68,10 +69,15 @@ public class AvroToRowDataConverters {
     // -------------------------------------------------------------------------------------
 
     public static AvroToRowDataConverter createRowConverter(RowType rowType) {
+        return createRowConverter(rowType, true);
+    }
+
+    public static AvroToRowDataConverter createRowConverter(
+            RowType rowType, boolean legacyTimestampMapping) {
         final AvroToRowDataConverter[] fieldConverters =
                 rowType.getFields().stream()
                         .map(RowType.RowField::getType)
-                        .map(AvroToRowDataConverters::createNullableConverter)
+                        .map(type -> createNullableConverter(type, legacyTimestampMapping))
                         .toArray(AvroToRowDataConverter[]::new);
         final int arity = rowType.getFieldCount();
 
@@ -88,8 +94,9 @@ public class AvroToRowDataConverters {
     }
 
     /** Creates a runtime converter which is null safe. */
-    private static AvroToRowDataConverter createNullableConverter(LogicalType type) {
-        final AvroToRowDataConverter converter = createConverter(type);
+    private static AvroToRowDataConverter createNullableConverter(
+            LogicalType type, boolean legacyTimestampMapping) {
+        final AvroToRowDataConverter converter = createConverter(type, legacyTimestampMapping);
         return avroObject -> {
             if (avroObject == null) {
                 return null;
@@ -99,7 +106,8 @@ public class AvroToRowDataConverters {
     }
 
     /** Creates a runtime converter which assuming input object is not null. */
-    private static AvroToRowDataConverter createConverter(LogicalType type) {
+    private static AvroToRowDataConverter createConverter(
+            LogicalType type, boolean legacyTimestampMapping) {
         switch (type.getTypeRoot()) {
             case NULL:
                 return avroObject -> null;
@@ -121,6 +129,12 @@ public class AvroToRowDataConverters {
                 return AvroToRowDataConverters::convertToTime;
             case TIMESTAMP_WITHOUT_TIME_ZONE:
                 return AvroToRowDataConverters::convertToTimestamp;
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                if (legacyTimestampMapping) {
+                    throw new UnsupportedOperationException("Unsupported type: 
" + type);
+                } else {
+                    return AvroToRowDataConverters::convertToTimestamp;
+                }
             case CHAR:
             case VARCHAR:
                 return avroObject -> StringData.fromString(avroObject.toString());
@@ -130,12 +144,12 @@ public class AvroToRowDataConverters {
             case DECIMAL:
                 return createDecimalConverter((DecimalType) type);
             case ARRAY:
-                return createArrayConverter((ArrayType) type);
+                return createArrayConverter((ArrayType) type, legacyTimestampMapping);
             case ROW:
                 return createRowConverter((RowType) type);
             case MAP:
             case MULTISET:
-                return createMapConverter(type);
+                return createMapConverter(type, legacyTimestampMapping);
             case RAW:
             default:
                 throw new UnsupportedOperationException("Unsupported type: " + 
type);
@@ -160,9 +174,10 @@ public class AvroToRowDataConverters {
         };
     }
 
-    private static AvroToRowDataConverter createArrayConverter(ArrayType arrayType) {
+    private static AvroToRowDataConverter createArrayConverter(
+            ArrayType arrayType, boolean legacyTimestampMapping) {
         final AvroToRowDataConverter elementConverter =
-                createNullableConverter(arrayType.getElementType());
+                createNullableConverter(arrayType.getElementType(), 
legacyTimestampMapping);
         final Class<?> elementClass =
                 LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());
 
@@ -177,11 +192,12 @@ public class AvroToRowDataConverters {
         };
     }
 
-    private static AvroToRowDataConverter createMapConverter(LogicalType type) {
+    private static AvroToRowDataConverter createMapConverter(
+            LogicalType type, boolean legacyTimestampMapping) {
         final AvroToRowDataConverter keyConverter =
-                createConverter(DataTypes.STRING().getLogicalType());
+                createConverter(DataTypes.STRING().getLogicalType(), legacyTimestampMapping);
         final AvroToRowDataConverter valueConverter =
-                createNullableConverter(extractValueTypeToAvroMap(type));
+                createNullableConverter(extractValueTypeToAvroMap(type), legacyTimestampMapping);
 
         return avroObject -> {
             final Map<?, ?> map = (Map<?, ?>) avroObject;
@@ -201,6 +217,8 @@ public class AvroToRowDataConverters {
             millis = (Long) object;
         } else if (object instanceof Instant) {
             millis = ((Instant) object).toEpochMilli();
+        } else if (object instanceof LocalDateTime) {
+            return TimestampData.fromLocalDateTime((LocalDateTime) object);
         } else {
             JodaConverter jodaConverter = JodaConverter.getConverter();
             if (jodaConverter != null) {
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java
index 9d8d9425aff..af7a936b270 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java
@@ -36,6 +36,7 @@ import org.apache.avro.util.Utf8;
 
 import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -71,6 +72,11 @@ public class RowDataToAvroConverters {
      * Flink Table & SQL internal data structures to corresponding Avro data structures.
      */
     public static RowDataToAvroConverter createConverter(LogicalType type) {
+        return createConverter(type, true);
+    }
+
+    public static RowDataToAvroConverter createConverter(
+            LogicalType type, boolean legacyTimestampMapping) {
         final RowDataToAvroConverter converter;
         switch (type.getTypeRoot()) {
             case NULL:
@@ -150,15 +156,45 @@ public class RowDataToAvroConverters {
                         };
                 break;
             case TIMESTAMP_WITHOUT_TIME_ZONE:
-                converter =
-                        new RowDataToAvroConverter() {
-                            private static final long serialVersionUID = 1L;
+                if (legacyTimestampMapping) {
+                    converter =
+                            new RowDataToAvroConverter() {
+                                private static final long serialVersionUID = 1L;
 
-                            @Override
-                            public Object convert(Schema schema, Object object) {
-                                return ((TimestampData) object).toInstant().toEpochMilli();
-                            }
-                        };
+                                @Override
+                                public Object convert(Schema schema, Object object) {
+                                    return ((TimestampData) object).toInstant().toEpochMilli();
+                                }
+                            };
+                } else {
+                    converter =
+                            new RowDataToAvroConverter() {
+                                private static final long serialVersionUID = 1L;
+
+                                @Override
+                                public Object convert(Schema schema, Object object) {
+                                    return ((TimestampData) object)
+                                            .toLocalDateTime()
+                                            .toInstant(ZoneOffset.UTC)
+                                            .toEpochMilli();
+                                }
+                            };
+                }
+                break;
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                if (legacyTimestampMapping) {
+                    throw new UnsupportedOperationException("Unsupported type: 
" + type);
+                } else {
+                    converter =
+                            new RowDataToAvroConverter() {
+                                private static final long serialVersionUID = 1L;
+
+                                @Override
+                                public Object convert(Schema schema, Object object) {
+                                    return ((TimestampData) object).toInstant().toEpochMilli();
+                                }
+                            };
+                }
                 break;
             case DECIMAL:
                 converter =
@@ -172,14 +208,14 @@ public class RowDataToAvroConverters {
                         };
                 break;
             case ARRAY:
-                converter = createArrayConverter((ArrayType) type);
+                converter = createArrayConverter((ArrayType) type, legacyTimestampMapping);
                 break;
             case ROW:
-                converter = createRowConverter((RowType) type);
+                converter = createRowConverter((RowType) type, legacyTimestampMapping);
                 break;
             case MAP:
             case MULTISET:
-                converter = createMapConverter(type);
+                converter = createMapConverter(type, legacyTimestampMapping);
                 break;
             case RAW:
             default:
@@ -217,10 +253,11 @@ public class RowDataToAvroConverters {
         };
     }
 
-    private static RowDataToAvroConverter createRowConverter(RowType rowType) {
+    private static RowDataToAvroConverter createRowConverter(
+            RowType rowType, boolean legacyTimestampMapping) {
         final RowDataToAvroConverter[] fieldConverters =
                 rowType.getChildren().stream()
-                        .map(RowDataToAvroConverters::createConverter)
+                        .map(legacyType -> createConverter(legacyType, legacyTimestampMapping))
                         .toArray(RowDataToAvroConverter[]::new);
         final LogicalType[] fieldTypes =
                 rowType.getFields().stream()
@@ -259,10 +296,12 @@ public class RowDataToAvroConverters {
         };
     }
 
-    private static RowDataToAvroConverter createArrayConverter(ArrayType arrayType) {
+    private static RowDataToAvroConverter createArrayConverter(
+            ArrayType arrayType, boolean legacyTimestampMapping) {
         LogicalType elementType = arrayType.getElementType();
         final ArrayData.ElementGetter elementGetter = ArrayData.createElementGetter(elementType);
-        final RowDataToAvroConverter elementConverter = createConverter(arrayType.getElementType());
+        final RowDataToAvroConverter elementConverter =
+                createConverter(arrayType.getElementType(), legacyTimestampMapping);
 
         return new RowDataToAvroConverter() {
             private static final long serialVersionUID = 1L;
@@ -282,10 +321,12 @@ public class RowDataToAvroConverters {
         };
     }
 
-    private static RowDataToAvroConverter createMapConverter(LogicalType type) {
+    private static RowDataToAvroConverter createMapConverter(
+            LogicalType type, boolean legacyTimestampMapping) {
         LogicalType valueType = extractValueTypeToAvroMap(type);
         final ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(valueType);
-        final RowDataToAvroConverter valueConverter = createConverter(valueType);
+        final RowDataToAvroConverter valueConverter =
+                createConverter(valueType, legacyTimestampMapping);
 
         return new RowDataToAvroConverter() {
             private static final long serialVersionUID = 1L;
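
The two TIMESTAMP_WITHOUT_TIME_ZONE branches above differ only in how a
TimestampData becomes epoch milliseconds. A standalone sketch of the contrast
(TimestampData is Flink's internal class; the value is illustrative):

    import java.time.LocalDateTime;
    import java.time.ZoneOffset;

    import org.apache.flink.table.data.TimestampData;

    public class TimestampBranchSketch {
        public static void main(String[] args) {
            TimestampData ts =
                    TimestampData.fromLocalDateTime(LocalDateTime.of(2024, 1, 23, 18, 43, 17));
            // Legacy branch: reinterpret the internal millis as an Instant.
            long legacyMillis = ts.toInstant().toEpochMilli();
            // Corrected branch: treat TIMESTAMP as a wall-clock value and pin
            // it to UTC before taking epoch millis.
            long correctedMillis =
                    ts.toLocalDateTime().toInstant(ZoneOffset.UTC).toEpochMilli();
            // For TimestampData the two appear to agree numerically; the split
            // keeps the intended semantics explicit now that the schema side
            // can emit local-timestamp-millis for TIMESTAMP.
            System.out.println(legacyMillis + " vs " + correctedMillis);
        }
    }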
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
index 042a343af12..178056e145a 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.MapType;
@@ -73,10 +74,24 @@ public class AvroSchemaConverter {
     @SuppressWarnings("unchecked")
     public static <T extends SpecificRecord> TypeInformation<Row> convertToTypeInfo(
             Class<T> avroClass) {
+        return convertToTypeInfo(avroClass, true);
+    }
+
+    /**
+     * Converts an Avro class into a nested row structure with deterministic field order and data
+     * types that are compatible with Flink's Table & SQL API.
+     *
+     * @param avroClass Avro specific record that contains schema information
+     * @param legacyTimestampMapping legacy mapping of timestamp types
+     * @return type information matching the schema
+     */
+    @SuppressWarnings("unchecked")
+    public static <T extends SpecificRecord> TypeInformation<Row> convertToTypeInfo(
+            Class<T> avroClass, boolean legacyTimestampMapping) {
         Preconditions.checkNotNull(avroClass, "Avro specific record class must not be null.");
         // determine schema to retrieve deterministic field order
         final Schema schema = SpecificData.get().getSchema(avroClass);
-        return (TypeInformation<Row>) convertToTypeInfo(schema);
+        return (TypeInformation<Row>) convertToTypeInfo(schema, legacyTimestampMapping);
     }
 
     /**
@@ -88,6 +103,20 @@ public class AvroSchemaConverter {
      */
     @SuppressWarnings("unchecked")
     public static <T> TypeInformation<T> convertToTypeInfo(String avroSchemaString) {
+        return convertToTypeInfo(avroSchemaString, true);
+    }
+
+    /**
+     * Converts an Avro schema string into a nested row structure with deterministic field order and
+     * data types that are compatible with Flink's Table & SQL API.
+     *
+     * @param avroSchemaString Avro schema definition string
+     * @param legacyTimestampMapping legacy mapping of timestamp types
+     * @return type information matching the schema
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> TypeInformation<T> convertToTypeInfo(
+            String avroSchemaString, boolean legacyTimestampMapping) {
         Preconditions.checkNotNull(avroSchemaString, "Avro schema must not be null.");
         final Schema schema;
         try {
@@ -95,10 +124,11 @@ public class AvroSchemaConverter {
         } catch (SchemaParseException e) {
             throw new IllegalArgumentException("Could not parse Avro schema 
string.", e);
         }
-        return (TypeInformation<T>) convertToTypeInfo(schema);
+        return (TypeInformation<T>) convertToTypeInfo(schema, legacyTimestampMapping);
     }
 
-    private static TypeInformation<?> convertToTypeInfo(Schema schema) {
+    private static TypeInformation<?> convertToTypeInfo(
+            Schema schema, boolean legacyTimestampMapping) {
         switch (schema.getType()) {
             case RECORD:
                 final List<Schema.Field> fields = schema.getFields();
@@ -107,7 +137,7 @@ public class AvroSchemaConverter {
                 final String[] names = new String[fields.size()];
                 for (int i = 0; i < fields.size(); i++) {
                     final Schema.Field field = fields.get(i);
-                    types[i] = convertToTypeInfo(field.schema());
+                    types[i] = convertToTypeInfo(field.schema(), legacyTimestampMapping);
                     names[i] = field.name();
                 }
                 return Types.ROW_NAMED(names, types);
@@ -115,9 +145,12 @@ public class AvroSchemaConverter {
                 return Types.STRING;
             case ARRAY:
                 // result type might either be ObjectArrayTypeInfo or BasicArrayTypeInfo for Strings
-                return Types.OBJECT_ARRAY(convertToTypeInfo(schema.getElementType()));
+                return Types.OBJECT_ARRAY(
+                        convertToTypeInfo(schema.getElementType(), legacyTimestampMapping));
             case MAP:
-                return Types.MAP(Types.STRING, convertToTypeInfo(schema.getValueType()));
+                return Types.MAP(
+                        Types.STRING,
+                        convertToTypeInfo(schema.getValueType(), legacyTimestampMapping));
             case UNION:
                 final Schema actualSchema;
                 if (schema.getTypes().size() == 2
@@ -132,7 +165,7 @@ public class AvroSchemaConverter {
                     // use Kryo for serialization
                     return Types.GENERIC(Object.class);
                 }
-                return convertToTypeInfo(actualSchema);
+                return convertToTypeInfo(actualSchema, legacyTimestampMapping);
             case FIXED:
                 // logical decimal type
                 if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
@@ -159,12 +192,26 @@ public class AvroSchemaConverter {
                 }
                 return Types.INT;
             case LONG:
-                // logical timestamp type
-                if (schema.getLogicalType() == LogicalTypes.timestampMillis()
-                        || schema.getLogicalType() == LogicalTypes.timestampMicros()) {
-                    return Types.SQL_TIMESTAMP;
-                } else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
-                    return Types.SQL_TIME;
+                if (legacyTimestampMapping) {
+                    if (schema.getLogicalType() == LogicalTypes.timestampMillis()
+                            || schema.getLogicalType() == LogicalTypes.timestampMicros()) {
+                        return Types.SQL_TIMESTAMP;
+                    } else if (schema.getLogicalType() == LogicalTypes.timeMicros()
+                            || schema.getLogicalType() == LogicalTypes.timeMillis()) {
+                        return Types.SQL_TIME;
+                    }
+                } else {
+                    // Avro logical timestamp types to Flink DataStream timestamp types
+                    if (schema.getLogicalType() == LogicalTypes.timestampMillis()
+                            || schema.getLogicalType() == LogicalTypes.timestampMicros()) {
+                        return Types.INSTANT;
+                    } else if (schema.getLogicalType() == LogicalTypes.localTimestampMillis()
+                            || schema.getLogicalType() == LogicalTypes.localTimestampMicros()) {
+                        return Types.LOCAL_DATE_TIME;
+                    } else if (schema.getLogicalType() == LogicalTypes.timeMicros()
+                            || schema.getLogicalType() == LogicalTypes.timeMillis()) {
+                        return Types.SQL_TIME;
+                    }
                 }
                 return Types.LONG;
             case FLOAT:
@@ -187,6 +234,19 @@ public class AvroSchemaConverter {
      * @return data type matching the schema
      */
     public static DataType convertToDataType(String avroSchemaString) {
+        return convertToDataType(avroSchemaString, true);
+    }
+
+    /**
+     * Converts an Avro schema string into a nested row structure with deterministic field order and
+     * data types that are compatible with Flink's Table & SQL API.
+     *
+     * @param avroSchemaString Avro schema definition string
+     * @param legacyTimestampMapping legacy mapping of local timestamps
+     * @return data type matching the schema
+     */
+    public static DataType convertToDataType(
+            String avroSchemaString, boolean legacyTimestampMapping) {
         Preconditions.checkNotNull(avroSchemaString, "Avro schema must not be null.");
         final Schema schema;
         try {
@@ -194,10 +254,10 @@ public class AvroSchemaConverter {
         } catch (SchemaParseException e) {
             throw new IllegalArgumentException("Could not parse Avro schema 
string.", e);
         }
-        return convertToDataType(schema);
+        return convertToDataType(schema, legacyTimestampMapping);
     }
 
-    private static DataType convertToDataType(Schema schema) {
+    private static DataType convertToDataType(Schema schema, boolean legacyMapping) {
         switch (schema.getType()) {
             case RECORD:
                 final List<Schema.Field> schemaFields = schema.getFields();
@@ -205,17 +265,20 @@ public class AvroSchemaConverter {
                 final DataTypes.Field[] fields = new DataTypes.Field[schemaFields.size()];
                 for (int i = 0; i < schemaFields.size(); i++) {
                     final Schema.Field field = schemaFields.get(i);
-                    fields[i] = DataTypes.FIELD(field.name(), convertToDataType(field.schema()));
+                    fields[i] =
+                            DataTypes.FIELD(
+                                    field.name(), convertToDataType(field.schema(), legacyMapping));
                 }
                 return DataTypes.ROW(fields).notNull();
             case ENUM:
                 return DataTypes.STRING().notNull();
             case ARRAY:
-                return DataTypes.ARRAY(convertToDataType(schema.getElementType())).notNull();
+                return DataTypes.ARRAY(convertToDataType(schema.getElementType(), legacyMapping))
+                        .notNull();
             case MAP:
                 return DataTypes.MAP(
                                 DataTypes.STRING().notNull(),
-                                convertToDataType(schema.getValueType()))
+                                convertToDataType(schema.getValueType(), legacyMapping))
                         .notNull();
             case UNION:
                 final Schema actualSchema;
@@ -236,7 +299,7 @@ public class AvroSchemaConverter {
                     return new AtomicDataType(
                             new TypeInformationRawType<>(false, Types.GENERIC(Object.class)));
                 }
-                DataType converted = convertToDataType(actualSchema);
+                DataType converted = convertToDataType(actualSchema, legacyMapping);
                 return nullable ? converted.nullable() : converted;
             case FIXED:
                 // logical decimal type
@@ -270,16 +333,34 @@ public class AvroSchemaConverter {
                 }
                 return DataTypes.INT().notNull();
             case LONG:
-                // logical timestamp type
-                if (schema.getLogicalType() == LogicalTypes.timestampMillis()) 
{
-                    return DataTypes.TIMESTAMP(3).notNull();
-                } else if (schema.getLogicalType() == 
LogicalTypes.timestampMicros()) {
-                    return DataTypes.TIMESTAMP(6).notNull();
-                } else if (schema.getLogicalType() == 
LogicalTypes.timeMillis()) {
-                    return DataTypes.TIME(3).notNull();
-                } else if (schema.getLogicalType() == 
LogicalTypes.timeMicros()) {
-                    return DataTypes.TIME(6).notNull();
+                if (legacyMapping) {
+                    // Avro logical timestamp types to Flink SQL timestamp 
types
+                    if (schema.getLogicalType() == 
LogicalTypes.timestampMillis()) {
+                        return DataTypes.TIMESTAMP(3).notNull();
+                    } else if (schema.getLogicalType() == 
LogicalTypes.timestampMicros()) {
+                        return DataTypes.TIMESTAMP(6).notNull();
+                    } else if (schema.getLogicalType() == 
LogicalTypes.timeMillis()) {
+                        return DataTypes.TIME(3).notNull();
+                    } else if (schema.getLogicalType() == 
LogicalTypes.timeMicros()) {
+                        return DataTypes.TIME(6).notNull();
+                    }
+                } else {
+                    // New mapping: Avro timestamp-millis/micros map to Flink
+                    // TIMESTAMP_LTZ(3)/(6); local-timestamp-millis/micros map to TIMESTAMP(3)/(6)
+                    if (schema.getLogicalType() == 
LogicalTypes.timestampMillis()) {
+                        return 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull();
+                    } else if (schema.getLogicalType() == 
LogicalTypes.timestampMicros()) {
+                        return 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6).notNull();
+                    } else if (schema.getLogicalType() == 
LogicalTypes.timeMillis()) {
+                        return DataTypes.TIME(3).notNull();
+                    } else if (schema.getLogicalType() == 
LogicalTypes.timeMicros()) {
+                        return DataTypes.TIME(6).notNull();
+                    } else if (schema.getLogicalType() == 
LogicalTypes.localTimestampMillis()) {
+                        return DataTypes.TIMESTAMP(3).notNull();
+                    } else if (schema.getLogicalType() == 
LogicalTypes.localTimestampMicros()) {
+                        return DataTypes.TIMESTAMP(6).notNull();
+                    }
                 }
+
                 return DataTypes.BIGINT().notNull();
             case FLOAT:
                 return DataTypes.FLOAT().notNull();
@@ -303,7 +384,22 @@ public class AvroSchemaConverter {
      * @return Avro's {@link Schema} matching this logical type.
      */
     public static Schema convertToSchema(LogicalType schema) {
-        return convertToSchema(schema, 
"org.apache.flink.avro.generated.record");
+        return convertToSchema(schema, true);
+    }
+
+    /**
+     * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro 
schema.
+     *
+     * <p>Uses "org.apache.flink.avro.generated.record" as the record type name.
+     *
+     * @param schema the logical type to convert, usually the top-level record type, i.e. not a
+     *     nested type
+     * @param legacyTimestampMapping whether to use the legacy timestamp 
mapping
+     * @return Avro's {@link Schema} matching this logical type.
+     */
+    public static Schema convertToSchema(LogicalType schema, boolean 
legacyTimestampMapping) {
+        return convertToSchema(
+                schema, "org.apache.flink.avro.generated.record", 
legacyTimestampMapping);
     }
 
     /**
@@ -317,6 +413,22 @@ public class AvroSchemaConverter {
      * @return Avro's {@link Schema} matching this logical type.
      */
     public static Schema convertToSchema(LogicalType logicalType, String 
rowName) {
+        return convertToSchema(logicalType, rowName, true);
+    }
+
+    /**
+     * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro 
schema.
+     *
+     * <p>"{rowName}_" is used as the nested row type name prefix in order to generate a valid
+     * schema. Nested record types that differ only in type name remain compatible.
+     *
+     * @param logicalType logical type
+     * @param rowName the record name
+     * @param legacyTimestampMapping whether to use the legacy timestamp mapping
+     * @return Avro's {@link Schema} matching this logical type.
+     */
+    public static Schema convertToSchema(
+            LogicalType logicalType, String rowName, boolean 
legacyTimestampMapping) {
         int precision;
         boolean nullable = logicalType.isNullable();
         switch (logicalType.getTypeRoot()) {
@@ -352,17 +464,53 @@ public class AvroSchemaConverter {
                 final TimestampType timestampType = (TimestampType) 
logicalType;
                 precision = timestampType.getPrecision();
                 org.apache.avro.LogicalType avroLogicalType;
-                if (precision <= 3) {
-                    avroLogicalType = LogicalTypes.timestampMillis();
+                if (legacyTimestampMapping) {
+                    if (precision <= 3) {
+                        avroLogicalType = LogicalTypes.timestampMillis();
+                    } else {
+                        throw new IllegalArgumentException(
+                                "Avro does not support TIMESTAMP type "
+                                        + "with precision: "
+                                        + precision
+                                        + ", it only supports precision less than or equal to 3.");
+                    }
                 } else {
-                    throw new IllegalArgumentException(
-                            "Avro does not support TIMESTAMP type "
-                                    + "with precision: "
-                                    + precision
-                                    + ", it only supports precision less than 
3.");
+                    if (precision <= 3) {
+                        avroLogicalType = LogicalTypes.localTimestampMillis();
+                    } else if (precision <= 6) {
+                        avroLogicalType = LogicalTypes.localTimestampMicros();
+                    } else {
+                        throw new IllegalArgumentException(
+                                "Avro does not support LOCAL TIMESTAMP type "
+                                        + "with precision: "
+                                        + precision
+                                        + ", it only supports precision less than or equal to 6.");
+                    }
                 }
                 Schema timestamp = 
avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
                 return nullable ? nullableSchema(timestamp) : timestamp;
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                if (legacyTimestampMapping) {
+                    throw new UnsupportedOperationException(
+                            "Unsupported to derive Schema for type: " + 
logicalType);
+                } else {
+                    final LocalZonedTimestampType localZonedTimestampType =
+                            (LocalZonedTimestampType) logicalType;
+                    precision = localZonedTimestampType.getPrecision();
+                    if (precision <= 3) {
+                        avroLogicalType = LogicalTypes.timestampMillis();
+                    } else if (precision <= 6) {
+                        avroLogicalType = LogicalTypes.timestampMicros();
+                    } else {
+                        throw new IllegalArgumentException(
+                                "Avro does not support TIMESTAMP WITH LOCAL TIME ZONE type "
+                                        + "with precision: "
+                                        + precision
+                                        + ", it only supports precision less than or equal to 6.");
+                    }
+                    timestamp = 
avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
+                    return nullable ? nullableSchema(timestamp) : timestamp;
+                }
             case DATE:
                 // use int to represent Date
                 Schema date = 
LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
@@ -397,7 +545,11 @@ public class AvroSchemaConverter {
                     LogicalType fieldType = rowType.getTypeAt(i);
                     SchemaBuilder.GenericDefault<Schema> fieldBuilder =
                             builder.name(fieldName)
-                                    .type(convertToSchema(fieldType, rowName + 
"_" + fieldName));
+                                    .type(
+                                            convertToSchema(
+                                                    fieldType,
+                                                    rowName + "_" + fieldName,
+                                                    legacyTimestampMapping));
 
                     if (fieldType.isNullable()) {
                         builder = fieldBuilder.withDefault(null);
@@ -424,7 +576,6 @@
-                                .items(convertToSchema(arrayType.getElementType(), rowName));
+                                .items(
+                                        convertToSchema(
+                                                arrayType.getElementType(), rowName, legacyTimestampMapping));
                 return nullable ? nullableSchema(array) : array;
             case RAW:
-            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
             default:
                 throw new UnsupportedOperationException(
                         "Unsupported to derive Schema for type: " + 
logicalType);
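
To make the converter change above concrete: under the legacy mapping an Avro
timestamp-millis field maps to TIMESTAMP(3), while with legacyTimestampMapping =
false it maps to TIMESTAMP_LTZ(3), and Avro local-timestamp-* maps to TIMESTAMP.
A minimal sketch using only the AvroSchemaConverter methods shown in this patch
(the inline schema string is illustrative, not taken from the patch):

    import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
    import org.apache.flink.table.types.DataType;

    public class TimestampMappingSketch {
        public static void main(String[] args) {
            // Hypothetical record with a single Avro timestamp-millis field.
            String schemaString =
                    "{\"type\":\"record\",\"name\":\"T\",\"fields\":["
                            + "{\"name\":\"ts\",\"type\":{\"type\":\"long\","
                            + "\"logicalType\":\"timestamp-millis\"}}]}";

            // Single-argument overload keeps the legacy mapping:
            // timestamp-millis -> TIMESTAMP(3) NOT NULL.
            DataType legacy = AvroSchemaConverter.convertToDataType(schemaString);

            // New mapping: timestamp-millis -> TIMESTAMP_LTZ(3) NOT NULL;
            // local-timestamp-millis/micros would become TIMESTAMP(3)/(6).
            DataType updated = AvroSchemaConverter.convertToDataType(schemaString, false);

            System.out.println(legacy);
            System.out.println(updated);
        }
    }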
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFormatFactoryTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFormatFactoryTest.java
index fef28062e63..0b9d572a13b 100644
--- 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFormatFactoryTest.java
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFormatFactoryTest.java
@@ -33,13 +33,16 @@ import 
org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContex
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.HashMap;
 import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for the {@link AvroFormatFactory}. */
 class AvroFormatFactoryTest {
@@ -48,11 +51,25 @@ class AvroFormatFactoryTest {
             ResolvedSchema.of(
                     Column.physical("a", DataTypes.STRING()),
                     Column.physical("b", DataTypes.INT()),
-                    Column.physical("c", DataTypes.BOOLEAN()));
+                    Column.physical("c", DataTypes.BOOLEAN()),
+                    Column.physical("d", DataTypes.TIMESTAMP(3)));
+
+    private static final ResolvedSchema NEW_SCHEMA =
+            ResolvedSchema.of(
+                    Column.physical("a", DataTypes.STRING()),
+                    Column.physical("b", DataTypes.INT()),
+                    Column.physical("c", DataTypes.BOOLEAN()),
+                    Column.physical("d", DataTypes.TIMESTAMP(3)),
+                    Column.physical("e", DataTypes.TIMESTAMP(6)),
+                    Column.physical("f", 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
+                    Column.physical("g", 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)));
 
     private static final RowType ROW_TYPE =
             (RowType) SCHEMA.toPhysicalRowDataType().getLogicalType();
 
+    private static final RowType NEW_ROW_TYPE =
+            (RowType) NEW_SCHEMA.toPhysicalRowDataType().getLogicalType();
+
     @ParameterizedTest
     @EnumSource(AvroEncoding.class)
     void testSeDeSchema(AvroEncoding encoding) {
@@ -60,7 +77,7 @@ class AvroFormatFactoryTest {
                 new AvroRowDataDeserializationSchema(
                         ROW_TYPE, InternalTypeInfo.of(ROW_TYPE), encoding);
 
-        final Map<String, String> options = getAllOptions();
+        final Map<String, String> options = getAllOptions(true);
 
         final DynamicTableSource actualSource = 
FactoryMocks.createTableSource(SCHEMA, options);
         
assertThat(actualSource).isInstanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class);
@@ -87,16 +104,87 @@ class AvroFormatFactoryTest {
         assertThat(actualSer).isEqualTo(expectedSer);
     }
 
+    @Test
+    void testOldSeDeNewSchema() {
+        assertThatThrownBy(
+                        () -> {
+                            new AvroRowDataDeserializationSchema(
+                                    NEW_ROW_TYPE, 
InternalTypeInfo.of(NEW_ROW_TYPE));
+                        })
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "Avro does not support TIMESTAMP type with precision: 6, it only supports precision less than or equal to 3.");
+
+        assertThatThrownBy(
+                        () -> {
+                            new AvroRowDataSerializationSchema(NEW_ROW_TYPE);
+                        })
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "Avro does not support TIMESTAMP type with precision: 6, it only supports precision less than or equal to 3.");
+    }
+
+    @Test
+    void testNewSeDeNewSchema() {
+        testSeDeSchema(NEW_ROW_TYPE, NEW_SCHEMA, false);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testSeDeSchema(boolean legacyTimestampMapping) {
+        testSeDeSchema(ROW_TYPE, SCHEMA, legacyTimestampMapping);
+    }
+
+    void testSeDeSchema(RowType rowType, ResolvedSchema schema, boolean 
legacyTimestampMapping) {
+        final AvroRowDataDeserializationSchema expectedDeser =
+                new AvroRowDataDeserializationSchema(
+                        rowType,
+                        InternalTypeInfo.of(rowType),
+                        AvroEncoding.BINARY,
+                        legacyTimestampMapping);
+
+        final Map<String, String> options = 
getAllOptions(legacyTimestampMapping);
+
+        final DynamicTableSource actualSource = 
FactoryMocks.createTableSource(schema, options);
+        
assertThat(actualSource).isInstanceOf(TestDynamicTableFactory.DynamicTableSourceMock.class);
+        TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
+                (TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
+
+        DeserializationSchema<RowData> actualDeser =
+                scanSourceMock.valueFormat.createRuntimeDecoder(
+                        ScanRuntimeProviderContext.INSTANCE, 
schema.toPhysicalRowDataType());
+
+        assertThat(actualDeser).isEqualTo(expectedDeser);
+
+        final AvroRowDataSerializationSchema expectedSer =
+                new AvroRowDataSerializationSchema(
+                        rowType, AvroEncoding.BINARY, legacyTimestampMapping);
+
+        final DynamicTableSink actualSink = 
FactoryMocks.createTableSink(schema, options);
+        
assertThat(actualSink).isInstanceOf(TestDynamicTableFactory.DynamicTableSinkMock.class);
+        TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
+                (TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
+
+        SerializationSchema<RowData> actualSer =
+                sinkMock.valueFormat.createRuntimeEncoder(null, 
schema.toPhysicalRowDataType());
+
+        assertThat(actualSer).isEqualTo(expectedSer);
+    }
+
     // ------------------------------------------------------------------------
     //  Utilities
     // ------------------------------------------------------------------------
 
-    private Map<String, String> getAllOptions() {
+    private Map<String, String> getAllOptions(boolean legacyTimestampMapping) {
         final Map<String, String> options = new HashMap<>();
         options.put("connector", TestDynamicTableFactory.IDENTIFIER);
         options.put("target", "MyTarget");
         options.put("buffer-size", "1000");
 
+        if (!legacyTimestampMapping) {
+            options.put("avro.timestamp_mapping.legacy", "false");
+        }
+
         options.put("format", AvroFormatFactory.IDENTIFIER);
         return options;
     }
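
The option key exercised in getAllOptions above is how SQL users opt out of the
legacy behavior. A minimal sketch of assembling such table options in Java (the
connector and its options are placeholders, not part of this patch):

    import java.util.HashMap;
    import java.util.Map;

    public class AvroFormatOptionsSketch {
        public static void main(String[] args) {
            Map<String, String> options = new HashMap<>();
            options.put("connector", "kafka"); // placeholder connector
            options.put("format", "avro");
            // Disable the legacy mapping so Avro timestamp-millis/micros are
            // read as TIMESTAMP_LTZ and TIMESTAMP is written as local-timestamp.
            options.put("avro.timestamp_mapping.legacy", "false");
            options.forEach((key, value) -> System.out.println(key + "=" + value));
        }
    }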
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
index 561526112a2..84f9994f666 100644
--- 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
@@ -19,16 +19,21 @@
 package org.apache.flink.formats.avro;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.formats.avro.AvroFormatOptions.AvroEncoding;
 import org.apache.flink.formats.avro.generated.LogicalTimeRecord;
+import org.apache.flink.formats.avro.generated.Timestamps;
 import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.formats.avro.utils.AvroTestUtils;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.util.DataFormatConverters;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.AtomicDataType;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.apache.avro.Schema;
@@ -39,7 +44,10 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 
@@ -84,7 +92,7 @@ class AvroRowDataDeSerializationSchemaTest {
     void testDeserializeNullRow(AvroEncoding encoding) throws Exception {
         final DataType dataType = ROW(FIELD("bool", BOOLEAN())).nullable();
         AvroRowDataDeserializationSchema deserializationSchema =
-                createDeserializationSchema(dataType, encoding);
+                createDeserializationSchema(dataType, encoding, true);
 
         assertThat(deserializationSchema.deserialize(null)).isNull();
     }
@@ -166,9 +174,9 @@ class AvroRowDataDeSerializationSchemaTest {
         record.put(18, map2);
 
         AvroRowDataSerializationSchema serializationSchema =
-                createSerializationSchema(dataType, encoding);
+                createSerializationSchema(dataType, encoding, true);
         AvroRowDataDeserializationSchema deserializationSchema =
-                createDeserializationSchema(dataType, encoding);
+                createDeserializationSchema(dataType, encoding, true);
 
         ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
         GenericDatumWriter<IndexedRecord> datumWriter = new 
GenericDatumWriter<>(schema);
@@ -292,9 +300,9 @@ class AvroRowDataDeSerializationSchemaTest {
                                 FIELD("type_time_millis", TIME(3).notNull()))
                         .notNull();
         AvroRowDataSerializationSchema serializationSchema =
-                createSerializationSchema(dataType, encoding);
+                createSerializationSchema(dataType, encoding, true);
         AvroRowDataDeserializationSchema deserializationSchema =
-                createDeserializationSchema(dataType, encoding);
+                createDeserializationSchema(dataType, encoding, true);
 
         RowData rowData = deserializationSchema.deserialize(input);
         byte[] output = serializationSchema.serialize(rowData);
@@ -319,7 +327,7 @@ class AvroRowDataDeSerializationSchemaTest {
     void testSerializationWithTypesMismatch(AvroEncoding encoding) throws 
Exception {
         AvroRowDataSerializationSchema serializationSchema =
                 createSerializationSchema(
-                        ROW(FIELD("f0", INT()), FIELD("f1", 
STRING())).notNull(), encoding);
+                        ROW(FIELD("f0", INT()), FIELD("f1", 
STRING())).notNull(), encoding, true);
         GenericRowData rowData = new GenericRowData(2);
         rowData.setField(0, 1);
         rowData.setField(1, 2); // This should be a STRING
@@ -330,23 +338,90 @@ class AvroRowDataDeSerializationSchemaTest {
                 .hasStackTraceContaining("Fail to serialize at field: f1");
     }
 
+    @Test
+    void testTimestampTypeLegacyMapping() throws Exception {
+        final Tuple4<Class<? extends SpecificRecord>, SpecificRecord, 
GenericRecord, Row> testData =
+                AvroTestUtils.getTimestampTestData();
+
+        SpecificDatumWriter<Timestamps> datumWriter = new 
SpecificDatumWriter<>(Timestamps.class);
+        ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
+        Encoder encoder = 
EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
+        datumWriter.write((Timestamps) testData.f1, encoder);
+        encoder.flush();
+
+        DataType dataType =
+                AvroSchemaConverter.convertToDataType(
+                        
SpecificData.get().getSchema(Timestamps.class).toString());
+
+        // Under the legacy mapping, Avro local-timestamp-millis/micros fall back to BigIntType
+        assertThat(dataType.getChildren().get(2))
+                .isEqualTo(new AtomicDataType(new BigIntType(false)));
+        assertThat(dataType.getChildren().get(3))
+                .isEqualTo(new AtomicDataType(new BigIntType(false)));
+
+        assertThatThrownBy(() -> createSerializationSchema(dataType, 
AvroEncoding.BINARY, true))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "Avro does not support TIMESTAMP type with precision: 6, it only supports precision less than or equal to 3.");
+
+        assertThatThrownBy(() -> createDeserializationSchema(dataType, 
AvroEncoding.BINARY, true))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "Avro does not support TIMESTAMP type with precision: 6, it only supports precision less than or equal to 3.");
+    }
+
+    @Test
+    void testTimestampTypeNewMapping() throws Exception {
+        final Tuple4<Class<? extends SpecificRecord>, SpecificRecord, 
GenericRecord, Row> testData =
+                AvroTestUtils.getTimestampTestData();
+
+        SpecificDatumWriter<Timestamps> datumWriter = new 
SpecificDatumWriter<>(Timestamps.class);
+        ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
+        Encoder encoder = 
EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
+        datumWriter.write((Timestamps) testData.f1, encoder);
+        encoder.flush();
+        byte[] input = byteArrayOutputStream.toByteArray();
+
+        DataType dataType =
+                AvroSchemaConverter.convertToDataType(
+                        
SpecificData.get().getSchema(Timestamps.class).toString(), false);
+
+        AvroRowDataSerializationSchema serializationSchema =
+                createSerializationSchema(dataType, AvroEncoding.BINARY, 
false);
+        AvroRowDataDeserializationSchema deserializationSchema =
+                createDeserializationSchema(dataType, AvroEncoding.BINARY, 
false);
+
+        RowData rowData = deserializationSchema.deserialize(input);
+        byte[] output = serializationSchema.serialize(rowData);
+        RowData rowData2 = deserializationSchema.deserialize(output);
+        assertThat(rowData2).isEqualTo(rowData);
+
+        assertThat(rowData.getTimestamp(2, 3).toLocalDateTime().toString())
+                .isEqualTo("2014-03-01T12:12:12.321");
+        assertThat(rowData.getTimestamp(3, 6).toLocalDateTime().toString())
+                .isEqualTo("1970-01-01T00:02:03.456");
+    }
+
     private AvroRowDataSerializationSchema createSerializationSchema(
-            DataType dataType, AvroEncoding encoding) throws Exception {
+            DataType dataType, AvroEncoding encoding, boolean 
legacyTimestampMapping)
+            throws Exception {
         final RowType rowType = (RowType) dataType.getLogicalType();
 
         AvroRowDataSerializationSchema serializationSchema =
-                new AvroRowDataSerializationSchema(rowType, encoding);
+                new AvroRowDataSerializationSchema(rowType, encoding, 
legacyTimestampMapping);
         serializationSchema.open(null);
         return serializationSchema;
     }
 
     private AvroRowDataDeserializationSchema createDeserializationSchema(
-            DataType dataType, AvroEncoding encoding) throws Exception {
+            DataType dataType, AvroEncoding encoding, boolean 
legacyTimestampMapping)
+            throws Exception {
         final RowType rowType = (RowType) dataType.getLogicalType();
         final TypeInformation<RowData> typeInfo = InternalTypeInfo.of(rowType);
 
         AvroRowDataDeserializationSchema deserializationSchema =
-                new AvroRowDataDeserializationSchema(rowType, typeInfo, 
encoding);
+                new AvroRowDataDeserializationSchema(
+                        rowType, typeInfo, encoding, legacyTimestampMapping);
         deserializationSchema.open(null);
         return deserializationSchema;
     }
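
The helpers above show the constructor shape this patch adds to the RowData
schemas. A minimal round-trip sketch under the new mapping (the row type here is
illustrative; open(null) mirrors how the tests initialize the schemas):

    import org.apache.flink.formats.avro.AvroFormatOptions.AvroEncoding;
    import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
    import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.logical.RowType;

    public class RowDataRoundTripSketch {
        static RowData roundTrip(RowData row) throws Exception {
            DataType dataType =
                    DataTypes.ROW(
                                    DataTypes.FIELD(
                                            "ts", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)))
                            .notNull();
            RowType rowType = (RowType) dataType.getLogicalType();

            // Both schemas take the legacyTimestampMapping flag added in this patch.
            AvroRowDataSerializationSchema ser =
                    new AvroRowDataSerializationSchema(rowType, AvroEncoding.BINARY, false);
            AvroRowDataDeserializationSchema deser =
                    new AvroRowDataDeserializationSchema(
                            rowType, InternalTypeInfo.of(rowType), AvroEncoding.BINARY, false);
            ser.open(null);
            deser.open(null);

            byte[] bytes = ser.serialize(row);
            return deser.deserialize(bytes); // expected to equal the input row
        }
    }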
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
index c7bb6c52b40..1349ca2e03e 100644
--- 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.formats.avro;
 
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.formats.avro.utils.AvroTestUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.InstantiationUtil;
@@ -26,33 +27,22 @@ import org.apache.flink.util.InstantiationUtil;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.specific.SpecificRecord;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for the Avro serialization and deserialization schema. */
 class AvroRowDeSerializationSchemaTest {
 
-    @Test
-    void testSpecificSerializeDeserializeFromClass() throws IOException {
-        final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> 
testData =
-                AvroTestUtils.getSpecificTestData();
-
-        final AvroRowSerializationSchema serializationSchema =
-                new AvroRowSerializationSchema(testData.f0);
-        final AvroRowDeserializationSchema deserializationSchema =
-                new AvroRowDeserializationSchema(testData.f0);
-
-        final byte[] bytes = serializationSchema.serialize(testData.f2);
-        final Row actual = deserializationSchema.deserialize(bytes);
-
-        assertThat(actual).isEqualTo(testData.f2);
-    }
-
-    @Test
-    void testSpecificSerializeDeserializeFromSchema() throws IOException {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testSpecificSerializeDeserializeFromSchema(boolean 
legacyTimestampMapping)
+            throws IOException {
         final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> 
testData =
                 AvroTestUtils.getSpecificTestData();
         final String schemaString = testData.f1.getSchema().toString();
@@ -62,14 +52,20 @@ class AvroRowDeSerializationSchemaTest {
         final AvroRowDeserializationSchema deserializationSchema =
                 new AvroRowDeserializationSchema(schemaString);
 
-        final byte[] bytes = serializationSchema.serialize(testData.f2);
-        final Row actual = deserializationSchema.deserialize(bytes);
-
-        assertThat(actual).isEqualTo(testData.f2);
+        if (legacyTimestampMapping) {
+            final byte[] bytes = serializationSchema.serialize(testData.f2);
+            final Row actual = deserializationSchema.deserialize(bytes);
+            assertThat(actual).isEqualTo(testData.f2);
+        } else {
+            final byte[] bytes = serializationSchema.serialize(testData.f2, 
false);
+            final Row actual = deserializationSchema.deserialize(bytes, false);
+            assertThat(actual).isEqualTo(testData.f2);
+        }
     }
 
-    @Test
-    void testGenericSerializeDeserialize() throws IOException {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testGenericSerializeDeserialize(boolean legacyTimestampMapping) 
throws IOException {
         final Tuple3<GenericRecord, Row, Schema> testData = 
AvroTestUtils.getGenericTestData();
 
         final AvroRowSerializationSchema serializationSchema =
@@ -77,14 +73,21 @@ class AvroRowDeSerializationSchemaTest {
         final AvroRowDeserializationSchema deserializationSchema =
                 new AvroRowDeserializationSchema(testData.f2.toString());
 
-        final byte[] bytes = serializationSchema.serialize(testData.f1);
-        final Row actual = deserializationSchema.deserialize(bytes);
-
-        assertThat(actual).isEqualTo(testData.f1);
+        if (legacyTimestampMapping) {
+            final byte[] bytes = serializationSchema.serialize(testData.f1);
+            final Row actual = deserializationSchema.deserialize(bytes);
+            assertThat(actual).isEqualTo(testData.f1);
+        } else {
+            final byte[] bytes = serializationSchema.serialize(testData.f1, 
false);
+            final Row actual = deserializationSchema.deserialize(bytes, false);
+            assertThat(actual).isEqualTo(testData.f1);
+        }
     }
 
-    @Test
-    void testSpecificSerializeFromClassSeveralTimes() throws IOException {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testSpecificSerializeFromClassSeveralTimes(boolean 
legacyTimestampMapping)
+            throws IOException {
         final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> 
testData =
                 AvroTestUtils.getSpecificTestData();
 
@@ -93,16 +96,25 @@ class AvroRowDeSerializationSchemaTest {
         final AvroRowDeserializationSchema deserializationSchema =
                 new AvroRowDeserializationSchema(testData.f0);
 
-        serializationSchema.serialize(testData.f2);
-        serializationSchema.serialize(testData.f2);
-        final byte[] bytes = serializationSchema.serialize(testData.f2);
-        final Row actual = deserializationSchema.deserialize(bytes);
-
-        assertThat(actual).isEqualTo(testData.f2);
+        if (legacyTimestampMapping) {
+            serializationSchema.serialize(testData.f2);
+            serializationSchema.serialize(testData.f2);
+            final byte[] bytes = serializationSchema.serialize(testData.f2);
+            final Row actual = deserializationSchema.deserialize(bytes);
+            assertThat(actual).isEqualTo(testData.f2);
+        } else {
+            serializationSchema.serialize(testData.f2, false);
+            serializationSchema.serialize(testData.f2, false);
+            final byte[] bytes = serializationSchema.serialize(testData.f2, 
false);
+            final Row actual = deserializationSchema.deserialize(bytes, false);
+            assertThat(actual).isEqualTo(testData.f2);
+        }
     }
 
-    @Test
-    void testSpecificSerializeFromSchemaSeveralTimes() throws IOException {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testSpecificSerializeFromSchemaSeveralTimes(boolean 
legacyTimestampMapping)
+            throws IOException {
         final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> 
testData =
                 AvroTestUtils.getSpecificTestData();
         final String schemaString = testData.f1.getSchema().toString();
@@ -112,16 +124,24 @@ class AvroRowDeSerializationSchemaTest {
         final AvroRowDeserializationSchema deserializationSchema =
                 new AvroRowDeserializationSchema(schemaString);
 
-        serializationSchema.serialize(testData.f2);
-        serializationSchema.serialize(testData.f2);
-        final byte[] bytes = serializationSchema.serialize(testData.f2);
-        final Row actual = deserializationSchema.deserialize(bytes);
-
-        assertThat(actual).isEqualTo(testData.f2);
+        if (legacyTimestampMapping) {
+            serializationSchema.serialize(testData.f2);
+            serializationSchema.serialize(testData.f2);
+            final byte[] bytes = serializationSchema.serialize(testData.f2);
+            final Row actual = deserializationSchema.deserialize(bytes);
+            assertThat(actual).isEqualTo(testData.f2);
+        } else {
+            serializationSchema.serialize(testData.f2, false);
+            serializationSchema.serialize(testData.f2, false);
+            final byte[] bytes = serializationSchema.serialize(testData.f2, 
false);
+            final Row actual = deserializationSchema.deserialize(bytes, false);
+            assertThat(actual).isEqualTo(testData.f2);
+        }
     }
 
-    @Test
-    void testGenericSerializeSeveralTimes() throws IOException {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testGenericSerializeSeveralTimes(boolean legacyTimestampMapping) 
throws IOException {
         final Tuple3<GenericRecord, Row, Schema> testData = 
AvroTestUtils.getGenericTestData();
 
         final AvroRowSerializationSchema serializationSchema =
@@ -129,16 +149,25 @@ class AvroRowDeSerializationSchemaTest {
         final AvroRowDeserializationSchema deserializationSchema =
                 new AvroRowDeserializationSchema(testData.f2.toString());
 
-        serializationSchema.serialize(testData.f1);
-        serializationSchema.serialize(testData.f1);
-        final byte[] bytes = serializationSchema.serialize(testData.f1);
-        final Row actual = deserializationSchema.deserialize(bytes);
-
-        assertThat(actual).isEqualTo(testData.f1);
+        if (legacyTimestampMapping) {
+            serializationSchema.serialize(testData.f1);
+            serializationSchema.serialize(testData.f1);
+            final byte[] bytes = serializationSchema.serialize(testData.f1);
+            final Row actual = deserializationSchema.deserialize(bytes);
+            assertThat(actual).isEqualTo(testData.f1);
+        } else {
+            serializationSchema.serialize(testData.f1, false);
+            serializationSchema.serialize(testData.f1, false);
+            final byte[] bytes = serializationSchema.serialize(testData.f1, 
false);
+            final Row actual = deserializationSchema.deserialize(bytes, false);
+            assertThat(actual).isEqualTo(testData.f1);
+        }
     }
 
-    @Test
-    void testSpecificDeserializeFromClassSeveralTimes() throws IOException {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testSpecificDeserializeFromClassSeveralTimes(boolean 
legacyTimestampMapping)
+            throws IOException {
         final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> 
testData =
                 AvroTestUtils.getSpecificTestData();
 
@@ -147,16 +176,25 @@ class AvroRowDeSerializationSchemaTest {
         final AvroRowDeserializationSchema deserializationSchema =
                 new AvroRowDeserializationSchema(testData.f0);
 
-        final byte[] bytes = serializationSchema.serialize(testData.f2);
-        deserializationSchema.deserialize(bytes);
-        deserializationSchema.deserialize(bytes);
-        final Row actual = deserializationSchema.deserialize(bytes);
-
-        assertThat(actual).isEqualTo(testData.f2);
+        if (legacyTimestampMapping) {
+            final byte[] bytes = serializationSchema.serialize(testData.f2);
+            deserializationSchema.deserialize(bytes);
+            deserializationSchema.deserialize(bytes);
+            final Row actual = deserializationSchema.deserialize(bytes);
+            assertThat(actual).isEqualTo(testData.f2);
+        } else {
+            final byte[] bytes = serializationSchema.serialize(testData.f2, 
false);
+            deserializationSchema.deserialize(bytes, false);
+            deserializationSchema.deserialize(bytes, false);
+            final Row actual = deserializationSchema.deserialize(bytes, false);
+            assertThat(actual).isEqualTo(testData.f2);
+        }
     }
 
-    @Test
-    void testSpecificDeserializeFromSchemaSeveralTimes() throws IOException {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testSpecificDeserializeFromSchemaSeveralTimes(boolean 
legacyTimestampMapping)
+            throws IOException {
         final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> 
testData =
                 AvroTestUtils.getSpecificTestData();
         final String schemaString = testData.f1.getSchema().toString();
@@ -166,16 +204,24 @@ class AvroRowDeSerializationSchemaTest {
         final AvroRowDeserializationSchema deserializationSchema =
                 new AvroRowDeserializationSchema(schemaString);
 
-        final byte[] bytes = serializationSchema.serialize(testData.f2);
-        deserializationSchema.deserialize(bytes);
-        deserializationSchema.deserialize(bytes);
-        final Row actual = deserializationSchema.deserialize(bytes);
-
-        assertThat(actual).isEqualTo(testData.f2);
+        if (legacyTimestampMapping) {
+            final byte[] bytes = serializationSchema.serialize(testData.f2);
+            deserializationSchema.deserialize(bytes);
+            deserializationSchema.deserialize(bytes);
+            final Row actual = deserializationSchema.deserialize(bytes);
+            assertThat(actual).isEqualTo(testData.f2);
+        } else {
+            final byte[] bytes = serializationSchema.serialize(testData.f2, 
false);
+            deserializationSchema.deserialize(bytes, false);
+            deserializationSchema.deserialize(bytes, false);
+            final Row actual = deserializationSchema.deserialize(bytes, false);
+            assertThat(actual).isEqualTo(testData.f2);
+        }
     }
 
-    @Test
-    void testGenericDeserializeSeveralTimes() throws IOException {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testGenericDeserializeSeveralTimes(boolean legacyTimestampMapping) 
throws IOException {
         final Tuple3<GenericRecord, Row, Schema> testData = 
AvroTestUtils.getGenericTestData();
 
         final AvroRowSerializationSchema serializationSchema =
@@ -183,16 +229,24 @@ class AvroRowDeSerializationSchemaTest {
         final AvroRowDeserializationSchema deserializationSchema =
                 new AvroRowDeserializationSchema(testData.f2.toString());
 
-        final byte[] bytes = serializationSchema.serialize(testData.f1);
-        deserializationSchema.deserialize(bytes);
-        deserializationSchema.deserialize(bytes);
-        final Row actual = deserializationSchema.deserialize(bytes);
-
-        assertThat(actual).isEqualTo(testData.f1);
+        if (legacyTimestampMapping) {
+            final byte[] bytes = serializationSchema.serialize(testData.f1);
+            deserializationSchema.deserialize(bytes);
+            deserializationSchema.deserialize(bytes);
+            final Row actual = deserializationSchema.deserialize(bytes);
+            assertThat(actual).isEqualTo(testData.f1);
+        } else {
+            final byte[] bytes = serializationSchema.serialize(testData.f1, 
false);
+            deserializationSchema.deserialize(bytes, false);
+            deserializationSchema.deserialize(bytes, false);
+            final Row actual = deserializationSchema.deserialize(bytes, false);
+            assertThat(actual).isEqualTo(testData.f1);
+        }
     }
 
-    @Test
-    void testSerializability() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testSerializability(boolean legacyTimestampMapping) throws Exception {
         final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> 
testData =
                 AvroTestUtils.getSpecificTestData();
         final String schemaString = testData.f1.getSchema().toString();
@@ -201,17 +255,97 @@ class AvroRowDeSerializationSchemaTest {
         final AvroRowSerializationSchema classSer = new 
AvroRowSerializationSchema(testData.f0);
         final AvroRowDeserializationSchema classDeser =
                 new AvroRowDeserializationSchema(testData.f0);
-        testSerializability(classSer, classDeser, testData.f2);
+        testSerializability(classSer, classDeser, testData.f2, 
legacyTimestampMapping);
 
         // from schema string
         final AvroRowSerializationSchema schemaSer = new 
AvroRowSerializationSchema(schemaString);
         final AvroRowDeserializationSchema schemaDeser =
                 new AvroRowDeserializationSchema(schemaString);
-        testSerializability(schemaSer, schemaDeser, testData.f2);
+        testSerializability(schemaSer, schemaDeser, testData.f2, 
legacyTimestampMapping);
+    }
+
+    @Test
+    void testTimestampSerializeDeserializeLegacyMapping() throws Exception {
+        final Tuple4<Class<? extends SpecificRecord>, SpecificRecord, 
GenericRecord, Row> testData =
+                AvroTestUtils.getTimestampTestData();
+
+        final String schemaString = testData.f1.getSchema().toString();
+
+        final AvroRowSerializationSchema serializationSchema =
+                new AvroRowSerializationSchema(schemaString);
+        final AvroRowDeserializationSchema deserializationSchema =
+                new AvroRowDeserializationSchema(schemaString);
+
+        assertThatThrownBy(
+                        () -> {
+                            serializationSchema.serialize(testData.f3);
+                        })
+                .isInstanceOf(RuntimeException.class)
+                .hasMessage("Unsupported local timestamp type.");
+
+        final byte[] bytes = serializationSchema.serialize(testData.f3, false);
+
+        assertThatThrownBy(
+                        () -> {
+                            deserializationSchema.deserialize(bytes);
+                        })
+                .isInstanceOf(RuntimeException.class)
+                .hasMessage("Unsupported local timestamp type.");
+    }
+
+    @Test
+    void testTimestampSpecificSerializeDeserializeNewMapping() throws 
Exception {
+        final Tuple4<Class<? extends SpecificRecord>, SpecificRecord, 
GenericRecord, Row> testData =
+                AvroTestUtils.getTimestampTestData();
+
+        final String schemaString = testData.f1.getSchema().toString();
+
+        final AvroRowSerializationSchema serializationSchema =
+                new AvroRowSerializationSchema(schemaString);
+        final AvroRowDeserializationSchema deserializationSchema =
+                new AvroRowDeserializationSchema(schemaString);
+
+        final byte[] bytes = serializationSchema.serialize(testData.f3, false);
+        final Row actual = deserializationSchema.deserialize(bytes, false);
+        assertThat(actual).isEqualTo(testData.f3);
+    }
+
+    @Test
+    void testTimestampGenericSerializeDeserializeNewMapping() throws Exception {
+        final Tuple4<Class<? extends SpecificRecord>, SpecificRecord, 
GenericRecord, Row> testData =
+                AvroTestUtils.getTimestampTestData();
+
+        final String schemaString = testData.f2.getSchema().toString();
+
+        final AvroRowSerializationSchema serializationSchema =
+                new AvroRowSerializationSchema(schemaString);
+        final AvroRowDeserializationSchema deserializationSchema =
+                new AvroRowDeserializationSchema(schemaString);
+
+        final byte[] bytes = serializationSchema.serialize(testData.f3);
+        final Row actual = deserializationSchema.deserialize(bytes);
+        assertThat(actual).isEqualTo(testData.f3);
+    }
+
+    @Test
+    void testTimestampClassSerializeDeserializeNewMapping() throws Exception {
+        final Tuple4<Class<? extends SpecificRecord>, SpecificRecord, 
GenericRecord, Row> testData =
+                AvroTestUtils.getTimestampTestData();
+
+        final AvroRowSerializationSchema serializationSchema =
+                new AvroRowSerializationSchema(testData.f0);
+        final AvroRowDeserializationSchema deserializationSchema =
+                new AvroRowDeserializationSchema(testData.f0);
+
+        final byte[] bytes = serializationSchema.serialize(testData.f3);
+        final Row actual = deserializationSchema.deserialize(bytes);
+        assertThat(actual).isEqualTo(testData.f3);
     }
 
     private void testSerializability(
-            AvroRowSerializationSchema ser, AvroRowDeserializationSchema 
deser, Row data)
+            AvroRowSerializationSchema ser,
+            AvroRowDeserializationSchema deser,
+            Row data,
+            boolean legacyTimestampMapping)
             throws Exception {
         final byte[] serBytes = InstantiationUtil.serializeObject(ser);
         final byte[] deserBytes = InstantiationUtil.serializeObject(deser);
@@ -223,11 +357,18 @@ class AvroRowDeSerializationSchemaTest {
                 InstantiationUtil.deserializeObject(
                         deserBytes, 
Thread.currentThread().getContextClassLoader());
 
-        final byte[] bytes = serCopy.serialize(data);
-        deserCopy.deserialize(bytes);
-        deserCopy.deserialize(bytes);
-        final Row actual = deserCopy.deserialize(bytes);
-
-        assertThat(actual).isEqualTo(data);
+        if (legacyTimestampMapping) {
+            final byte[] bytes = serCopy.serialize(data);
+            deserCopy.deserialize(bytes);
+            deserCopy.deserialize(bytes);
+            final Row actual = deserCopy.deserialize(bytes);
+            assertThat(actual).isEqualTo(data);
+        } else {
+            final byte[] bytes = serCopy.serialize(data, false);
+            deserCopy.deserialize(bytes, false);
+            deserCopy.deserialize(bytes, false);
+            final Row actual = deserCopy.deserialize(bytes, false);
+            assertThat(actual).isEqualTo(data);
+        }
     }
 }
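
For the Row-based schemas changed above, the flag travels with each
serialize/deserialize call instead of the constructor. A minimal sketch (the
schema string parameter stands in for a real Avro schema containing timestamp
fields):

    import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
    import org.apache.flink.formats.avro.AvroRowSerializationSchema;
    import org.apache.flink.types.Row;

    public class RowRoundTripSketch {
        static Row roundTrip(String avroSchemaString, Row row) throws Exception {
            AvroRowSerializationSchema ser = new AvroRowSerializationSchema(avroSchemaString);
            AvroRowDeserializationSchema deser =
                    new AvroRowDeserializationSchema(avroSchemaString);

            // Passing false selects the new timestamp mapping; the one-argument
            // serialize(Row) / deserialize(byte[]) overloads keep the legacy behavior.
            byte[] bytes = ser.serialize(row, false);
            return deser.deserialize(bytes, false);
        }
    }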
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
index 259f05c2ce9..f13a8200b15 100644
--- 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.formats.avro.typeutils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.common.typeutils.base.VoidSerializer;
+import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.formats.avro.AvroFormatOptions.AvroEncoding;
 import org.apache.flink.formats.avro.generated.User;
@@ -41,6 +42,7 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificRecord;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
@@ -533,6 +535,47 @@ class AvroSchemaConverterTest {
         assertThat(schema).isEqualTo(new Schema.Parser().parse(schemaStr));
     }
 
+    @Test
+    void testTimestampsSchemaToDataTypeToSchemaLegacyTimestampMapping() {
+        final Tuple4<Class<? extends SpecificRecord>, SpecificRecord, 
GenericRecord, Row> testData =
+                AvroTestUtils.getTimestampTestData();
+        String schemaStr = testData.f1.getSchema().toString();
+        DataType dataType = AvroSchemaConverter.convertToDataType(schemaStr);
+        assertThatThrownBy(() -> 
AvroSchemaConverter.convertToSchema(dataType.getLogicalType()))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "Avro does not support TIMESTAMP type with precision: 6, it only supports precision less than or equal to 3.");
+    }
+
+    @Test
+    void testTimestampsSchemaToTypeInfoLegacyTimestampMapping() {
+        final Tuple4<Class<? extends SpecificRecord>, SpecificRecord, 
GenericRecord, Row> testData =
+                AvroTestUtils.getTimestampTestData();
+        String schemaStr = testData.f1.getSchema().toString();
+        TypeInformation<Row> typeInfo = 
AvroSchemaConverter.convertToTypeInfo(schemaStr);
+        validateLegacyTimestampsSchema(typeInfo);
+    }
+
+    @Test
+    void testTimestampsSchemaToDataTypeToSchemaNewMapping() {
+        final Tuple4<Class<? extends SpecificRecord>, SpecificRecord, 
GenericRecord, Row> testData =
+                AvroTestUtils.getTimestampTestData();
+        String schemaStr = testData.f1.getSchema().toString();
+        DataType dataType = AvroSchemaConverter.convertToDataType(schemaStr, 
false);
+        Schema schema = 
AvroSchemaConverter.convertToSchema(dataType.getLogicalType(), false);
+        DataType dataType2 = 
AvroSchemaConverter.convertToDataType(schema.toString(), false);
+        validateTimestampsSchema(dataType2);
+    }
+
+    @Test
+    void testTimestampsSchemaToTypeInfoNewMapping() {
+        final Tuple4<Class<? extends SpecificRecord>, SpecificRecord, 
GenericRecord, Row> testData =
+                AvroTestUtils.getTimestampTestData();
+        String schemaStr = testData.f1.getSchema().toString();
+        TypeInformation<Row> typeInfo = 
AvroSchemaConverter.convertToTypeInfo(schemaStr, false);
+        validateTimestampsSchema(typeInfo);
+    }
+
     private void validateUserSchema(TypeInformation<?> actual) {
         final TypeInformation<Row> address =
                 Types.ROW_NAMED(
@@ -600,6 +643,78 @@ class AvroSchemaConverterTest {
         assertThat(userRowInfo.schemaEquals(actual)).isTrue();
     }
 
+    private void validateTimestampsSchema(TypeInformation<?> actual) {
+        final TypeInformation<Row> timestamps =
+                Types.ROW_NAMED(
+                        new String[] {
+                            "type_timestamp_millis",
+                            "type_timestamp_micros",
+                            "type_local_timestamp_millis",
+                            "type_local_timestamp_micros"
+                        },
+                        Types.INSTANT,
+                        Types.INSTANT,
+                        Types.LOCAL_DATE_TIME,
+                        Types.LOCAL_DATE_TIME);
+        final RowTypeInfo timestampsRowTypeInfo = (RowTypeInfo) timestamps;
+        assertThat(timestampsRowTypeInfo.schemaEquals(actual)).isTrue();
+    }
+
+    private void validateLegacyTimestampsSchema(TypeInformation<?> actual) {
+        final TypeInformation<Row> timestamps =
+                Types.ROW_NAMED(
+                        new String[] {
+                            "type_timestamp_millis",
+                            "type_timestamp_micros",
+                            "type_local_timestamp_millis",
+                            "type_local_timestamp_micros"
+                        },
+                        Types.SQL_TIMESTAMP,
+                        Types.SQL_TIMESTAMP,
+                        Types.LONG,
+                        Types.LONG);
+        final RowTypeInfo timestampsRowTypeInfo = (RowTypeInfo) timestamps;
+        assertThat(timestampsRowTypeInfo.schemaEquals(actual)).isTrue();
+    }
+
+    private void validateLegacyTimestampsSchema(DataType actual) {
+        final DataType timestamps =
+                DataTypes.ROW(
+                                DataTypes.FIELD(
+                                        "type_timestamp_millis", 
DataTypes.TIMESTAMP(3).notNull()),
+                                DataTypes.FIELD(
+                                        "type_timestamp_micros", 
DataTypes.TIMESTAMP(6).notNull()),
+                                DataTypes.FIELD(
+                                        "type_local_timestamp_millis",
+                                        DataTypes.BIGINT().notNull()),
+                                DataTypes.FIELD(
+                                        "type_local_timestamp_micros",
+                                        DataTypes.BIGINT().notNull()))
+                        .notNull();
+
+        assertThat(actual).isEqualTo(timestamps);
+    }
+
+    private void validateTimestampsSchema(DataType actual) {
+        final DataType timestamps =
+                DataTypes.ROW(
+                                DataTypes.FIELD(
+                                        "type_timestamp_millis",
+                                        
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull()),
+                                DataTypes.FIELD(
+                                        "type_timestamp_micros",
+                                        
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6).notNull()),
+                                DataTypes.FIELD(
+                                        "type_local_timestamp_millis",
+                                        DataTypes.TIMESTAMP(3).notNull()),
+                                DataTypes.FIELD(
+                                        "type_local_timestamp_micros",
+                                        DataTypes.TIMESTAMP(6).notNull()))
+                        .notNull();
+
+        assertThat(actual).isEqualTo(timestamps);
+    }
+
     private void validateUserSchema(DataType actual) {
         final DataType address =
                 DataTypes.ROW(
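
As the timestamp validators above spell out, the flag also changes the
TypeInformation derived from the same Avro schema. A minimal sketch (the schema
string is assumed to describe a record with the four timestamp fields used in
these tests):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
    import org.apache.flink.types.Row;

    public class TimestampTypeInfoSketch {
        static void show(String schemaString) {
            // Legacy: timestamp-* -> Types.SQL_TIMESTAMP, local-timestamp-* -> Types.LONG.
            TypeInformation<Row> legacy = AvroSchemaConverter.convertToTypeInfo(schemaString);

            // New mapping: timestamp-* -> Types.INSTANT,
            // local-timestamp-* -> Types.LOCAL_DATE_TIME.
            TypeInformation<Row> updated =
                    AvroSchemaConverter.convertToTypeInfo(schemaString, false);

            System.out.println(legacy);
            System.out.println(updated);
        }
    }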
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
index 289d9ca0ce3..7827569df9a 100644
--- 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
@@ -111,8 +111,8 @@ class AvroTypeExtractionTest {
                         + "\"type_bytes\": 
\"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\", "
                         + "\"type_date\": \"2014-03-01\", 
\"type_time_millis\": \"12:12:12\", \"type_time_micros\": \"00:00:00.123456\", "
                         + "\"type_timestamp_millis\": 
\"2014-03-01T12:12:12.321Z\", "
-                        + "\"type_timestamp_micros\": 
\"1970-01-01T00:00:00.123456Z\", \"type_decimal_bytes\": \"\\u0007Ð\", "
-                        + "\"type_decimal_fixed\": [7, -48]}\n"
+                        + "\"type_timestamp_micros\": 
\"1970-01-01T00:00:00.123456Z\", "
+                        + "\"type_decimal_bytes\": \"\\u0007Ð\", 
\"type_decimal_fixed\": [7, -48]}\n"
                         + "{\"name\": \"Charlie\", \"favorite_number\": null, "
                         + "\"favorite_color\": \"blue\", \"type_long_test\": 
1337, \"type_double_test\": 1.337, "
                         + "\"type_null_test\": null, \"type_bool_test\": 
false, \"type_array_string\": [], "
@@ -123,7 +123,8 @@ class AvroTypeExtractionTest {
                         + "\"type_bytes\": 
\"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\", "
                         + "\"type_date\": \"2014-03-01\", 
\"type_time_millis\": \"12:12:12\", \"type_time_micros\": \"00:00:00.123456\", "
                         + "\"type_timestamp_millis\": 
\"2014-03-01T12:12:12.321Z\", "
-                        + "\"type_timestamp_micros\": 
\"1970-01-01T00:00:00.123456Z\", \"type_decimal_bytes\": \"\\u0007Ð\", "
+                        + "\"type_timestamp_micros\": 
\"1970-01-01T00:00:00.123456Z\", "
+                        + "\"type_decimal_bytes\": \"\\u0007Ð\", "
                         + "\"type_decimal_fixed\": [7, -48]}\n";
     }
 
@@ -162,8 +163,8 @@ class AvroTypeExtractionTest {
                         + " \"type_bytes\": 
\"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\", "
                         + "\"type_date\": \"2014-03-01\", 
\"type_time_millis\": \"12:12:12\", \"type_time_micros\": \"00:00:00.123456\", "
                         + "\"type_timestamp_millis\": 
\"2014-03-01T12:12:12.321Z\", "
-                        + "\"type_timestamp_micros\": 
\"1970-01-01T00:00:00.123456Z\", \"type_decimal_bytes\": \"\\u0007Ð\", "
-                        + "\"type_decimal_fixed\": [7, -48]}\n"
+                        + "\"type_timestamp_micros\": 
\"1970-01-01T00:00:00.123456Z\", "
+                        + "\"type_decimal_bytes\": \"\\u0007Ð\", 
\"type_decimal_fixed\": [7, -48]}\n"
                         + "{\"name\": \"Charlie\", \"favorite_number\": null, "
                         + "\"favorite_color\": \"blue\", \"type_long_test\": 
1337, \"type_double_test\": 1.337, "
                         + "\"type_null_test\": null, \"type_bool_test\": 
false, \"type_array_string\": [], "
@@ -174,8 +175,8 @@ class AvroTypeExtractionTest {
                         + "\"type_bytes\": 
\"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\", "
                         + "\"type_date\": \"2014-03-01\", 
\"type_time_millis\": \"12:12:12\", \"type_time_micros\": \"00:00:00.123456\", "
                         + "\"type_timestamp_millis\": 
\"2014-03-01T12:12:12.321Z\", "
-                        + "\"type_timestamp_micros\": 
\"1970-01-01T00:00:00.123456Z\", \"type_decimal_bytes\": \"\\u0007Ð\", "
-                        + "\"type_decimal_fixed\": [7, -48]}\n";
+                        + "\"type_timestamp_micros\": 
\"1970-01-01T00:00:00.123456Z\", "
+                        + "\"type_decimal_bytes\": \"\\u0007Ð\", 
\"type_decimal_fixed\": [7, -48]}\n";
     }
 
     @ParameterizedTest
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
index 47fc67e710f..b4e12f8ca10 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
@@ -19,11 +19,13 @@
 package org.apache.flink.formats.avro.utils;
 
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.formats.avro.AvroFormatOptions.AvroEncoding;
 import org.apache.flink.formats.avro.generated.Address;
 import org.apache.flink.formats.avro.generated.Colors;
 import org.apache.flink.formats.avro.generated.Fixed16;
 import org.apache.flink.formats.avro.generated.Fixed2;
+import org.apache.flink.formats.avro.generated.Timestamps;
 import org.apache.flink.formats.avro.generated.User;
 import org.apache.flink.formats.avro.typeutils.AvroSerializerLargeGenericRecordTest;
 import org.apache.flink.types.Row;
@@ -48,6 +50,7 @@ import java.sql.Time;
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.temporal.ChronoUnit;
 import java.util.Arrays;
@@ -263,6 +266,53 @@ public final class AvroTestUtils {
         return t;
     }
 
+    public static Tuple4<Class<? extends SpecificRecord>, SpecificRecord, GenericRecord, Row>
+            getTimestampTestData() {
+
+        final String schemaString =
+                "{\"type\":\"record\",\"name\":\"GenericTimestamps\",\"namespace\":\"org.apache.flink.formats.avro.generated\","
+                        + "\"fields\": [{\"name\":\"type_timestamp_millis\",\"type\":{\"type\":\"long\","
+                        + "\"logicalType\":\"timestamp-millis\"}},{\"name\":\"type_timestamp_micros\",\"type\":{\"type\":\"long\","
+                        + "\"logicalType\":\"timestamp-micros\"}},{\"name\": \"type_local_timestamp_millis\", \"type\": {\"type\": \"long\", \"logicalType\": \"local-timestamp-millis\"}},"
+                        + "{\"name\": \"type_local_timestamp_micros\", \"type\": {\"type\": \"long\", \"logicalType\": \"local-timestamp-micros\"}}]}";
+        final Schema schema = new Schema.Parser().parse(schemaString);
+        final GenericRecord timestampRecord = new GenericData.Record(schema);
+        timestampRecord.put("type_timestamp_millis", Instant.parse("2014-03-01T12:12:12.321Z"));
+        timestampRecord.put(
+                "type_timestamp_micros", Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
+        timestampRecord.put(
+                "type_local_timestamp_millis", LocalDateTime.parse("2014-03-01T12:12:12.321"));
+        timestampRecord.put(
+                "type_local_timestamp_micros", LocalDateTime.parse("1970-01-01T00:00:00.123456"));
+
+        final Timestamps timestamps =
+                Timestamps.newBuilder()
+                        .setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
+                        .setTypeTimestampMicros(
+                                Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
+                        .setTypeLocalTimestampMillis(LocalDateTime.parse("2014-03-01T12:12:12.321"))
+                        .setTypeLocalTimestampMicros(
+                                LocalDateTime.parse("1970-01-01T00:00:00.123456"))
+                        .build();
+
+        final Row timestampRow = new Row(4);
+        timestampRow.setField(0, Timestamp.valueOf("2014-03-01 12:12:12.321"));
+        timestampRow.setField(
+                1, Timestamp.from(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS)));
+        timestampRow.setField(2, Timestamp.valueOf(LocalDateTime.parse("2014-03-01T12:12:12.321")));
+        timestampRow.setField(
+                3, Timestamp.valueOf(LocalDateTime.parse("1970-01-01T00:00:00.123456")));
+
+        final Tuple4<Class<? extends SpecificRecord>, SpecificRecord, GenericRecord, Row> t =
+                new Tuple4<>();
+        t.f0 = Timestamps.class;
+        t.f1 = timestamps;
+        t.f2 = timestampRecord;
+        t.f3 = timestampRow;
+
+        return t;
+    }
+
     /**
      * Craft a large Avro Schema which contains more than 0xFFFF characters.
      *
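
A note on the java.time choices in getTimestampTestData() above: timestamp-micros
carries microsecond precision, so the test builds the Instant with
ChronoUnit.MICROS (millisecond-based construction would drop the sub-millisecond
part), and the zone-less local-timestamp-* types map naturally onto
LocalDateTime rather than Instant. A small standalone sketch of those two points
(plain JDK, no Flink or Avro dependencies):

    import java.time.Instant;
    import java.time.LocalDateTime;
    import java.time.temporal.ChronoUnit;

    public class MicrosSketch {
        public static void main(String[] args) {
            // Microsecond-precision epoch offset, as used for timestamp-micros.
            Instant micros = Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS);
            System.out.println(micros); // 1970-01-01T00:00:00.123456Z

            // local-timestamp-* has no zone, hence LocalDateTime here.
            LocalDateTime local = LocalDateTime.parse("2014-03-01T12:12:12.321");
            System.out.println(local); // 2014-03-01T12:12:12.321
        }
    }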
diff --git a/flink-formats/flink-avro/src/test/resources/avro/user.avsc b/flink-formats/flink-avro/src/test/resources/avro/user.avsc
index 858cf6face6..0462ec12539 100644
--- a/flink-formats/flink-avro/src/test/resources/avro/user.avsc
+++ b/flink-formats/flink-avro/src/test/resources/avro/user.avsc
@@ -115,4 +115,15 @@
        {"name": "type_date", "type": {"type": "int", "logicalType": "date"}},
        {"name": "type_time_millis", "type": {"type": "int", "logicalType": 
"time-millis"}}
    ]
- }]
+},
+{"namespace": "org.apache.flink.formats.avro.generated",
+ "type": "record",
+ "name": "Timestamps",
+ "fields": [
+     {"name": "type_timestamp_millis", "type": {"type": "long", "logicalType": 
"timestamp-millis"}},
+     {"name": "type_timestamp_micros", "type": {"type": "long", "logicalType": 
"timestamp-micros"}},
+     {"name": "type_local_timestamp_millis", "type": {"type": "long", 
"logicalType": "local-timestamp-millis"}},
+     {"name": "type_local_timestamp_micros", "type": {"type": "long", 
"logicalType": "local-timestamp-micros"}}
+ ]
+}
+]
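
The logicalType annotations added to user.avsc are resolved by the Avro Java
library when the schema is parsed; the underlying Avro type stays a plain long.
A minimal sketch of that resolution using only stock Avro APIs (local-timestamp-*
assumes Avro 1.10 or newer, which Flink bundles):

    import org.apache.avro.LogicalType;
    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;

    public class LogicalTypeSketch {
        public static void main(String[] args) {
            final Schema schema =
                    new Schema.Parser()
                            .parse("{\"type\": \"long\", \"logicalType\": \"local-timestamp-micros\"}");

            // fromSchema turns the string annotation into a typed LogicalType instance.
            final LogicalType logical = LogicalTypes.fromSchema(schema);
            System.out.println(logical.getName()); // local-timestamp-micros
            System.out.println(schema.getType());  // LONG
        }
    }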
