This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new f767dad20e Flink: Backport add Nanosecond Precision Support for 
Flink-Iceberg Integration to Flink 1.20 (#16240)
f767dad20e is described below

commit f767dad20e9a47495ce37f1acf6acdffbf63664d
Author: pvary <[email protected]>
AuthorDate: Thu May 7 16:49:25 2026 +0200

    Flink: Backport add Nanosecond Precision Support for Flink-Iceberg 
Integration to Flink 1.20 (#16240)
    
    Backports #15475
---
 flink/v1.20/build.gradle                           |   1 +
 .../org/apache/iceberg/flink/FlinkTypeToType.java  |   6 +
 .../org/apache/iceberg/flink/RowDataWrapper.java   |  36 +-
 .../apache/iceberg/flink/data/FlinkOrcReader.java  |   7 +
 .../apache/iceberg/flink/data/FlinkOrcWriter.java  |   7 +
 .../apache/iceberg/flink/data/FlinkOrcWriters.java |  37 ++
 .../org/apache/iceberg/flink/data/RowDataUtil.java |   2 +
 .../apache/iceberg/flink/data/StructRowData.java   |  52 +-
 .../formats/avro/AvroToRowDataConverters.java      | 303 ++++++++++
 .../iceberg/flink/formats/avro/JodaConverter.java  |  69 +++
 .../formats/avro/RowDataToAvroConverters.java      | 394 +++++++++++++
 .../avro/typeutils/AvroSchemaConverter.java        | 625 +++++++++++++++++++++
 .../sink/AvroGenericRecordToRowDataMapper.java     |   4 +-
 .../RowDataToAvroGenericRecordConverter.java       |   4 +-
 .../source/reader/AvroGenericRecordConverter.java  |   4 +-
 .../org/apache/iceberg/flink/DataGenerators.java   | 105 +++-
 .../apache/iceberg/flink/TestRowDataWrapper.java   |  13 -
 .../flink/data/TestFlinkOrcReaderWriter.java       |   5 +
 .../iceberg/flink/data/TestRowDataProjection.java  |  21 +-
 19 files changed, 1642 insertions(+), 53 deletions(-)

diff --git a/flink/v1.20/build.gradle b/flink/v1.20/build.gradle
index c7ca24817b..467b0fa8c9 100644
--- a/flink/v1.20/build.gradle
+++ b/flink/v1.20/build.gradle
@@ -33,6 +33,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
     implementation project(':iceberg-hive-metastore')
 
     compileOnly libs.flink120.avro
+    compileOnly libs.joda.time
     // dropwizard histogram metrics (optional in Flink)
     compileOnly libs.flink120.metrics.dropwizard
     compileOnly libs.flink120.streaming.java
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java
index 408065f060..8f106da8d5 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java
@@ -137,11 +137,17 @@ class FlinkTypeToType extends FlinkTypeVisitor<Type> {
 
   @Override
   public Type visit(TimestampType timestampType) {
+    if (timestampType.getPrecision() > 6) {
+      return Types.TimestampNanoType.withoutZone();
+    }
     return Types.TimestampType.withoutZone();
   }
 
   @Override
   public Type visit(LocalZonedTimestampType localZonedTimestampType) {
+    if (localZonedTimestampType.getPrecision() > 6) {
+      return Types.TimestampNanoType.withZone();
+    }
     return Types.TimestampType.withZone();
   }
 
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java
index 3ef611f2de..920e44b24b 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java
@@ -114,19 +114,35 @@ public class RowDataWrapper implements StructLike {
 
       case TIMESTAMP_WITHOUT_TIME_ZONE:
         TimestampType timestampType = (TimestampType) logicalType;
-        return (row, pos) -> {
-          LocalDateTime localDateTime =
-              row.getTimestamp(pos, 
timestampType.getPrecision()).toLocalDateTime();
-          return DateTimeUtil.microsFromTimestamp(localDateTime);
-        };
+        if (type.typeId() == Type.TypeID.TIMESTAMP_NANO) {
+          return (row, pos) -> {
+            LocalDateTime localDateTime =
+                row.getTimestamp(pos, 
timestampType.getPrecision()).toLocalDateTime();
+            return DateTimeUtil.nanosFromTimestamp(localDateTime);
+          };
+        } else {
+          return (row, pos) -> {
+            LocalDateTime localDateTime =
+                row.getTimestamp(pos, 
timestampType.getPrecision()).toLocalDateTime();
+            return DateTimeUtil.microsFromTimestamp(localDateTime);
+          };
+        }
 
       case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
         LocalZonedTimestampType lzTs = (LocalZonedTimestampType) logicalType;
-        return (row, pos) -> {
-          TimestampData timestampData = row.getTimestamp(pos, 
lzTs.getPrecision());
-          return timestampData.getMillisecond() * 1000
-              + timestampData.getNanoOfMillisecond() / 1000;
-        };
+        if (type.typeId() == Type.TypeID.TIMESTAMP_NANO) {
+          return (row, pos) -> {
+            TimestampData timestampData = row.getTimestamp(pos, 
lzTs.getPrecision());
+            return timestampData.getMillisecond() * 1_000_000L
+                + timestampData.getNanoOfMillisecond();
+          };
+        } else {
+          return (row, pos) -> {
+            TimestampData timestampData = row.getTimestamp(pos, 
lzTs.getPrecision());
+            return timestampData.getMillisecond() * 1000L
+                + timestampData.getNanoOfMillisecond() / 1000;
+          };
+        }
 
       case ROW:
         RowType rowType = (RowType) logicalType;
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java
index 65b9d44ad4..3e3a29112c 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java
@@ -112,6 +112,13 @@ public class FlinkOrcReader implements 
OrcRowReader<RowData> {
           } else {
             return FlinkOrcReaders.timestamps();
           }
+        case TIMESTAMP_NANO:
+          Types.TimestampNanoType timestampNanoType = 
(Types.TimestampNanoType) iPrimitive;
+          if (timestampNanoType.shouldAdjustToUTC()) {
+            return FlinkOrcReaders.timestampTzs();
+          } else {
+            return FlinkOrcReaders.timestamps();
+          }
         case STRING:
           return FlinkOrcReaders.strings();
         case UUID:
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
index a467d84833..c1b46252e1 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
@@ -145,6 +145,13 @@ public class FlinkOrcWriter implements 
OrcRowWriter<RowData> {
           } else {
             return FlinkOrcWriters.timestamps();
           }
+        case TIMESTAMP_NANO:
+          Types.TimestampNanoType timestampNanoType = 
(Types.TimestampNanoType) iPrimitive;
+          if (timestampNanoType.shouldAdjustToUTC()) {
+            return FlinkOrcWriters.timestampNanoTzs();
+          } else {
+            return FlinkOrcWriters.timestampNanos();
+          }
         case STRING:
           return FlinkOrcWriters.strings();
         case UUID:
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java
index 684842aa09..bf19a46c05 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java
@@ -70,6 +70,14 @@ class FlinkOrcWriters {
     return TimestampTzWriter.INSTANCE;
   }
 
+  static OrcValueWriter<TimestampData> timestampNanos() {
+    return TimestampNanoWriter.INSTANCE;
+  }
+
+  static OrcValueWriter<TimestampData> timestampNanoTzs() {
+    return TimestampNanoTzWriter.INSTANCE;
+  }
+
   static OrcValueWriter<DecimalData> decimals(int precision, int scale) {
     if (precision <= 18) {
       return new Decimal18Writer(precision, scale);
@@ -170,6 +178,35 @@ class FlinkOrcWriters {
     }
   }
 
+  private static class TimestampNanoWriter implements 
OrcValueWriter<TimestampData> {
+    private static final TimestampNanoWriter INSTANCE = new 
TimestampNanoWriter();
+
+    @Override
+    public void nonNullWrite(int rowId, TimestampData data, ColumnVector 
output) {
+      TimestampColumnVector cv = (TimestampColumnVector) output;
+      cv.setIsUTC(true);
+      // millis
+      OffsetDateTime offsetDateTime = 
data.toInstant().atOffset(ZoneOffset.UTC);
+      cv.time[rowId] =
+          offsetDateTime.toEpochSecond() * 1_000 + offsetDateTime.getNano() / 
1_000_000;
+      cv.nanos[rowId] = offsetDateTime.getNano();
+    }
+  }
+
+  private static class TimestampNanoTzWriter implements 
OrcValueWriter<TimestampData> {
+    private static final TimestampNanoTzWriter INSTANCE = new 
TimestampNanoTzWriter();
+
+    @SuppressWarnings("JavaInstantGetSecondsGetNano")
+    @Override
+    public void nonNullWrite(int rowId, TimestampData data, ColumnVector 
output) {
+      TimestampColumnVector cv = (TimestampColumnVector) output;
+      // millis
+      Instant instant = data.toInstant();
+      cv.time[rowId] = instant.toEpochMilli();
+      cv.nanos[rowId] = instant.getNano();
+    }
+  }
+
   private static class Decimal18Writer implements OrcValueWriter<DecimalData> {
     private final int precision;
     private final int scale;
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/RowDataUtil.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/RowDataUtil.java
index f23a7ee3d0..81bb559679 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/RowDataUtil.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/RowDataUtil.java
@@ -69,6 +69,8 @@ public class RowDataUtil {
         return (int) ((Long) value / 1000);
       case TIMESTAMP: // TimestampData
         return 
TimestampData.fromLocalDateTime(DateTimeUtil.timestampFromMicros((Long) value));
+      case TIMESTAMP_NANO:
+        return 
TimestampData.fromLocalDateTime(DateTimeUtil.timestampFromNanos((Long) value));
       case UUID:
         return UUIDUtil.convert((UUID) value);
       default:
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java
index 34576a1e5c..b469f2310f 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java
@@ -48,6 +48,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.util.DateTimeUtil;
 
 @Internal
 public class StructRowData implements RowData {
@@ -120,8 +121,8 @@ public class StructRowData implements RowData {
 
     if (integer instanceof Integer) {
       return (int) integer;
-    } else if (integer instanceof LocalDate) {
-      return (int) ((LocalDate) integer).toEpochDay();
+    } else if (integer instanceof LocalDate localDate) {
+      return (int) localDate.toEpochDay();
     } else if (integer instanceof LocalTime) {
       return (int) (((LocalTime) integer).toNanoOfDay() / 1000_000);
     } else {
@@ -185,8 +186,27 @@ public class StructRowData implements RowData {
 
   @Override
   public TimestampData getTimestamp(int pos, int precision) {
+    if (precision > 6) {
+      Object timeVal = struct.get(pos, Object.class);
+      if (timeVal instanceof OffsetDateTime) {
+        OffsetDateTime odt = (OffsetDateTime) timeVal;
+        return TimestampData.fromEpochMillis(
+            odt.toInstant().toEpochMilli(), odt.getNano() % 1_000_000);
+      } else if (timeVal instanceof LocalDateTime) {
+        LocalDateTime ldt = (LocalDateTime) timeVal;
+        return TimestampData.fromEpochMillis(
+            ldt.toInstant(ZoneOffset.UTC).toEpochMilli(), ldt.getNano() % 
1_000_000);
+      } else if (timeVal instanceof Long) {
+        long timeLong = (Long) timeVal;
+        return TimestampData.fromEpochMillis(
+            Math.floorDiv(timeLong, 1_000_000L), (int) Math.floorMod(timeLong, 
1_000_000L));
+      } else {
+        throw new IllegalStateException("Unknown type for timestamp_ns: " + 
timeVal.getClass());
+      }
+    }
     long timeLong = getLong(pos);
-    return TimestampData.fromEpochMillis(timeLong / 1000, (int) (timeLong % 
1000) * 1000);
+    return TimestampData.fromEpochMillis(
+        Math.floorDiv(timeLong, 1000L), (int) Math.floorMod(timeLong, 1000L) * 
1000);
   }
 
   @Override
@@ -257,9 +277,29 @@ public class StructRowData implements RowData {
       case DECIMAL:
         return value;
       case TIMESTAMP:
-        long millisecond = (long) value / 1000;
-        int nanoOfMillisecond = (int) ((Long) value % 1000) * 1000;
-        return TimestampData.fromEpochMillis(millisecond, nanoOfMillisecond);
+        long timeMillis;
+        if (value instanceof LocalDateTime localDateTime) {
+          timeMillis = DateTimeUtil.microsFromTimestamp(localDateTime) / 1000L;
+        } else if (value instanceof OffsetDateTime offsetDateTime) {
+          timeMillis = DateTimeUtil.microsFromTimestamptz(offsetDateTime) / 
1000L;
+        } else {
+          timeMillis = Math.floorDiv((Long) value, 1000L);
+        }
+        return TimestampData.fromEpochMillis(
+            timeMillis,
+            (int) Math.floorMod(value instanceof Long ? (Long) value : 
timeMillis * 1000L, 1000L)
+                * 1000);
+      case TIMESTAMP_NANO:
+        long nanoLong;
+        if (value instanceof LocalDateTime localDateTime) {
+          nanoLong = DateTimeUtil.nanosFromTimestamp(localDateTime);
+        } else if (value instanceof OffsetDateTime offsetDateTime) {
+          nanoLong = DateTimeUtil.nanosFromTimestamptz(offsetDateTime);
+        } else {
+          nanoLong = (Long) value;
+        }
+        return TimestampData.fromEpochMillis(
+            Math.floorDiv(nanoLong, 1_000_000L), (int) Math.floorMod(nanoLong, 
1_000_000L));
       case STRING:
         return StringData.fromString(value.toString());
       case FIXED:
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/AvroToRowDataConverters.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/AvroToRowDataConverters.java
new file mode 100644
index 0000000000..0f70e60a1b
--- /dev/null
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/AvroToRowDataConverters.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.formats.avro;
+
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.temporal.ChronoField;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+import org.apache.iceberg.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+/**
+ * Tool class used to convert from Avro {@link GenericRecord} to {@link 
RowData}.
+ *
+ * <p>This class is adapted in Iceberg to add support for nanosecond precision 
timestamps
+ * (FLINK-39251). Once that ticket is resolved in Flink, this custom converter 
may be removed.
+ */
+@Internal
+public class AvroToRowDataConverters {
+
+  private AvroToRowDataConverters() {}
+
+  /**
+   * Runtime converter that converts Avro data structures into objects of 
Flink Table &amp; SQL
+   * internal data structures.
+   */
+  @FunctionalInterface
+  public interface AvroToRowDataConverter extends Serializable {
+    Object convert(Object object);
+  }
+
+  // 
-------------------------------------------------------------------------------------
+  // Runtime Converters
+  // 
-------------------------------------------------------------------------------------
+
+  public static AvroToRowDataConverter createRowConverter(RowType rowType) {
+    return createRowConverter(rowType, true);
+  }
+
+  public static AvroToRowDataConverter createRowConverter(
+      RowType rowType, boolean legacyTimestampMapping) {
+    final AvroToRowDataConverter[] fieldConverters =
+        rowType.getFields().stream()
+            .map(RowType.RowField::getType)
+            .map(type -> createNullableConverter(type, legacyTimestampMapping))
+            .toArray(AvroToRowDataConverter[]::new);
+    final int arity = rowType.getFieldCount();
+
+    return avroObject -> {
+      IndexedRecord record = (IndexedRecord) avroObject;
+      GenericRowData row = new GenericRowData(arity);
+      for (int i = 0; i < arity; ++i) {
+        // avro always deserialize successfully even though the type isn't 
matched
+        // so no need to throw exception about which field can't be 
deserialized
+        row.setField(i, fieldConverters[i].convert(record.get(i)));
+      }
+      return row;
+    };
+  }
+
+  /** Creates a runtime converter which is null safe. */
+  private static AvroToRowDataConverter createNullableConverter(
+      LogicalType type, boolean legacyTimestampMapping) {
+    final AvroToRowDataConverter converter = createConverter(type, 
legacyTimestampMapping);
+    return avroObject -> {
+      if (avroObject == null) {
+        return null;
+      }
+      return converter.convert(avroObject);
+    };
+  }
+
+  /** Creates a converter for {@code type}; the returned converter assumes non-null input. */
+  private static AvroToRowDataConverter createConverter(
+      LogicalType type, boolean legacyTimestampMapping) {
+    switch (type.getTypeRoot()) {
+      case NULL:
+        return avroObject -> null;
+      case TINYINT:
+        return avroObject -> ((Integer) avroObject).byteValue();
+      case SMALLINT:
+        return avroObject -> ((Integer) avroObject).shortValue();
+      case BOOLEAN: // boolean
+      case INTEGER: // int
+      case INTERVAL_YEAR_MONTH: // long
+      case BIGINT: // long
+      case INTERVAL_DAY_TIME: // long
+      case FLOAT: // float
+      case DOUBLE: // double
+        return avroObject -> avroObject;
+      case DATE:
+        return AvroToRowDataConverters::convertToDate;
+      case TIME_WITHOUT_TIME_ZONE:
+        return AvroToRowDataConverters::convertToTime;
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+        // Iceberg: Added support for nanoseconds precision (FLINK-39251)
+        return avroObject -> convertToTimestamp(avroObject, type);
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+        if (legacyTimestampMapping) {
+          throw new UnsupportedOperationException("Unsupported type: " + type);
+        } else {
+          // Iceberg: Added support for nanoseconds precision (FLINK-39251)
+          return avroObject -> convertToTimestamp(avroObject, type);
+        }
+      case CHAR:
+      case VARCHAR:
+        return avroObject -> StringData.fromString(avroObject.toString());
+      case BINARY:
+      case VARBINARY:
+        return AvroToRowDataConverters::convertToBytes;
+      case DECIMAL:
+        return createDecimalConverter((DecimalType) type);
+      case ARRAY:
+        return createArrayConverter((ArrayType) type, legacyTimestampMapping);
+      case ROW: // nested rows must inherit legacyTimestampMapping, like arrays and maps
+        return createRowConverter((RowType) type, legacyTimestampMapping);
+      case MAP:
+      case MULTISET:
+        return createMapConverter(type, legacyTimestampMapping);
+      case RAW:
+      default:
+        throw new UnsupportedOperationException("Unsupported type: " + type);
+    }
+  }
+
+  private static AvroToRowDataConverter createDecimalConverter(DecimalType 
decimalType) {
+    final int precision = decimalType.getPrecision();
+    final int scale = decimalType.getScale();
+    return avroObject -> {
+      final byte[] bytes;
+      if (avroObject instanceof GenericFixed) {
+        bytes = ((GenericFixed) avroObject).bytes();
+      } else if (avroObject instanceof ByteBuffer) {
+        ByteBuffer byteBuffer = (ByteBuffer) avroObject;
+        bytes = new byte[byteBuffer.remaining()];
+        byteBuffer.get(bytes);
+      } else {
+        bytes = (byte[]) avroObject;
+      }
+      return DecimalData.fromUnscaledBytes(bytes, precision, scale);
+    };
+  }
+
+  private static AvroToRowDataConverter createArrayConverter(
+      ArrayType arrayType, boolean legacyTimestampMapping) {
+    final AvroToRowDataConverter elementConverter =
+        createNullableConverter(arrayType.getElementType(), 
legacyTimestampMapping);
+    final Class<?> elementClass =
+        LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());
+
+    return avroObject -> {
+      final List<?> list = (List<?>) avroObject;
+      final int length = list.size();
+      final Object[] array = (Object[]) Array.newInstance(elementClass, 
length);
+      for (int i = 0; i < length; ++i) {
+        array[i] = elementConverter.convert(list.get(i));
+      }
+      return new GenericArrayData(array);
+    };
+  }
+
+  private static AvroToRowDataConverter createMapConverter(
+      LogicalType type, boolean legacyTimestampMapping) {
+    final AvroToRowDataConverter keyConverter =
+        createConverter(DataTypes.STRING().getLogicalType(), 
legacyTimestampMapping);
+    final AvroToRowDataConverter valueConverter =
+        createNullableConverter(
+            AvroSchemaConverter.extractValueTypeToAvroMap(type), 
legacyTimestampMapping);
+
+    return avroObject -> {
+      final Map<?, ?> map = (Map<?, ?>) avroObject;
+      Map<Object, Object> result = Maps.newHashMap();
+      for (Map.Entry<?, ?> entry : map.entrySet()) {
+        Object key = keyConverter.convert(entry.getKey());
+        Object value = valueConverter.convert(entry.getValue());
+        result.put(key, value);
+      }
+      return new GenericMapData(result);
+    };
+  }
+
+  private static TimestampData convertToTimestamp(Object object, LogicalType 
type) {
+    int precision = 3; // stays at millis precision if type is neither timestamp type
+    if (type instanceof org.apache.flink.table.types.logical.TimestampType) {
+      precision = ((org.apache.flink.table.types.logical.TimestampType) 
type).getPrecision();
+    } else if (type instanceof 
org.apache.flink.table.types.logical.LocalZonedTimestampType) {
+      precision =
+          ((org.apache.flink.table.types.logical.LocalZonedTimestampType) 
type).getPrecision();
+    }
+    // A Long is an epoch offset: millis for precision <= 3, micros for <= 6, else nanos.
+    if (object instanceof Long) {
+      long timeLong = (Long) object;
+      if (precision <= 3) {
+        return TimestampData.fromEpochMillis(timeLong);
+      } else if (precision <= 6) { // micros-of-milli * 1000 = nanos-of-milli
+        return TimestampData.fromEpochMillis(
+            Math.floorDiv(timeLong, 1000L), (int) Math.floorMod(timeLong, 
1000L) * 1000);
+      } else {
+        // Iceberg: Added support for nanoseconds precision (FLINK-39251)
+        return TimestampData.fromEpochMillis(
+            Math.floorDiv(timeLong, 1_000_000L), (int) Math.floorMod(timeLong, 
1_000_000L));
+      }
+    } else if (object instanceof Instant) {
+      return TimestampData.fromInstant((Instant) object);
+    } else if (object instanceof LocalDateTime) {
+      return TimestampData.fromLocalDateTime((LocalDateTime) object);
+    } else { // otherwise expect a Joda DateTime, if Joda is on the classpath
+      JodaConverter jodaConverter = JodaConverter.getConverter();
+      if (jodaConverter != null) {
+        return 
TimestampData.fromEpochMillis(jodaConverter.convertTimestamp(object));
+      } else {
+        throw new IllegalArgumentException(
+            "Unexpected object type for TIMESTAMP logical type. Received: " + 
object);
+      }
+    }
+  }
+
+  private static int convertToDate(Object object) {
+    if (object instanceof Integer) {
+      return (Integer) object;
+    } else if (object instanceof LocalDate) { // java.time date -> days since epoch
+      return (int) ((LocalDate) object).toEpochDay();
+    } else { // any other value is handed to the optional Joda converter
+      JodaConverter jodaConverter = JodaConverter.getConverter();
+      if (jodaConverter != null) {
+        return (int) jodaConverter.convertDate(object); // NOTE(review): millis, not epoch days?
+      } else {
+        throw new IllegalArgumentException(
+            "Unexpected object type for DATE logical type. Received: " + 
object);
+      }
+    }
+  }
+
+  private static int convertToTime(Object object) {
+    final int millis;
+    if (object instanceof Integer) {
+      millis = (Integer) object;
+    } else if (object instanceof LocalTime) {
+      millis = ((LocalTime) object).get(ChronoField.MILLI_OF_DAY);
+    } else {
+      JodaConverter jodaConverter = JodaConverter.getConverter();
+      if (jodaConverter != null) {
+        millis = jodaConverter.convertTime(object);
+      } else {
+        throw new IllegalArgumentException(
+            "Unexpected object type for TIME logical type. Received: " + 
object);
+      }
+    }
+    return millis;
+  }
+
+  private static byte[] convertToBytes(Object object) {
+    if (object instanceof GenericFixed) {
+      return ((GenericFixed) object).bytes();
+    } else if (object instanceof ByteBuffer) {
+      ByteBuffer byteBuffer = (ByteBuffer) object;
+      byte[] bytes = new byte[byteBuffer.remaining()];
+      byteBuffer.get(bytes);
+      return bytes;
+    } else {
+      return (byte[]) object;
+    }
+  }
+}
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/JodaConverter.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/JodaConverter.java
new file mode 100644
index 0000000000..c30b780233
--- /dev/null
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/JodaConverter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.formats.avro;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+
+/**
+ * Encapsulates joda optional dependency. Instantiates this class only if joda 
is available on the
+ * classpath.
+ */
+@SuppressWarnings("JavaUtilDate")
+class JodaConverter {
+
+  private static JodaConverter instance;
+  private static boolean instantiated = false;
+
+  public static JodaConverter getConverter() {
+    if (instantiated) {
+      return instance;
+    }
+
+    try {
+      Class.forName(
+          "org.joda.time.DateTime", false, 
Thread.currentThread().getContextClassLoader());
+      instance = new JodaConverter();
+    } catch (ClassNotFoundException e) {
+      instance = null;
+    } finally {
+      instantiated = true;
+    }
+    return instance;
+  }
+
+  public long convertDate(Object object) {
+    final LocalDate value = (LocalDate) object;
+    return value.toDate().getTime();
+  }
+
+  public int convertTime(Object object) {
+    final LocalTime value = (LocalTime) object;
+    return value.get(DateTimeFieldType.millisOfDay());
+  }
+
+  public long convertTimestamp(Object object) {
+    final DateTime value = (DateTime) object;
+    return value.toDate().getTime();
+  }
+
+  private JodaConverter() {}
+}
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/RowDataToAvroConverters.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/RowDataToAvroConverters.java
new file mode 100644
index 0000000000..d4c7e4282d
--- /dev/null
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/RowDataToAvroConverters.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.formats.avro;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.time.ZoneOffset;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.iceberg.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * Tool class used to convert from {@link RowData} to Avro {@link GenericRecord}.
+ *
+ * <p>This class is adapted in Iceberg to add support for nanosecond precision timestamps
+ * (FLINK-39251). Once that ticket is resolved in Flink, this custom converter may be removed.
+ */
+@Internal
+public class RowDataToAvroConverters {
+
+  // Holder of static factory methods; not meant to be instantiated.
+  private RowDataToAvroConverters() {}
+
+  // --------------------------------------------------------------------------------
+  // Runtime Converters
+  // --------------------------------------------------------------------------------
+
+  /**
+   * Runtime converter that converts objects of Flink Table &amp; SQL internal data structures to
+   * corresponding Avro data structures.
+   *
+   * <p>The interface extends {@link Serializable}, so implementations can be serialized.
+   */
+  @FunctionalInterface
+  public interface RowDataToAvroConverter extends Serializable {
+    /**
+     * Converts a single value.
+     *
+     * @param schema Avro schema describing the target value
+     * @param object value to convert
+     * @return the converted value
+     */
+    Object convert(Schema schema, Object object);
+  }
+
+  // --------------------------------------------------------------------------------
+  // IMPORTANT! We use anonymous classes instead of lambdas for a reason here. It is
+  // necessary because the maven shade plugin cannot relocate classes in
+  // SerializedLambdas (MSHADE-260). On the other hand we want to relocate Avro for
+  // sql-client uber jars.
+  // --------------------------------------------------------------------------------
+
+  /**
+   * Builds a runtime converter for the given logical type with the legacy timestamp mapping
+   * enabled.
+   *
+   * @param type Flink logical type of the values to convert
+   * @return converter from Flink Table &amp; SQL internal data structures to Avro data structures
+   */
+  public static RowDataToAvroConverter createConverter(LogicalType type) {
+    final boolean legacyTimestampMapping = true;
+    return createConverter(type, legacyTimestampMapping);
+  }
+
+  /**
+   * Creates a runtime converter according to the given logical type that converts objects of Flink
+   * Table &amp; SQL internal data structures to corresponding Avro data structures.
+   *
+   * <p>The returned converter is null-safe: a {@code null} value converts to {@code null}, and a
+   * two-branch union schema containing {@code null} is unwrapped to its other branch before the
+   * type-specific converter runs.
+   *
+   * @param type Flink logical type of the values to convert
+   * @param legacyTimestampMapping whether to use the legacy timestamp mapping; in that mode {@code
+   *     TIMESTAMP_WITH_LOCAL_TIME_ZONE} is rejected
+   * @return converter for values of the given type
+   * @throws UnsupportedOperationException if the type is not supported
+   */
+  @SuppressWarnings("checkstyle:MethodLength")
+  public static RowDataToAvroConverter createConverter(
+      LogicalType type, boolean legacyTimestampMapping) {
+    final RowDataToAvroConverter converter;
+    switch (type.getTypeRoot()) {
+      case NULL:
+        converter =
+            new RowDataToAvroConverter() {
+              private static final long serialVersionUID = 1L;
+
+              @Override
+              public Object convert(Schema schema, Object object) {
+                return null;
+              }
+            };
+        break;
+      case TINYINT:
+        converter =
+            new RowDataToAvroConverter() {
+              private static final long serialVersionUID = 1L;
+
+              @Override
+              public Object convert(Schema schema, Object object) {
+                return ((Byte) object).intValue();
+              }
+            };
+        break;
+      case SMALLINT:
+        converter =
+            new RowDataToAvroConverter() {
+              private static final long serialVersionUID = 1L;
+
+              @Override
+              public Object convert(Schema schema, Object object) {
+                return ((Short) object).intValue();
+              }
+            };
+        break;
+      case BOOLEAN: // boolean
+      case INTEGER: // int
+      case INTERVAL_YEAR_MONTH: // long
+      case BIGINT: // long
+      case INTERVAL_DAY_TIME: // long
+      case FLOAT: // float
+      case DOUBLE: // double
+      case TIME_WITHOUT_TIME_ZONE: // int
+      case DATE: // int
+        // These values are handed to Avro unchanged.
+        converter =
+            new RowDataToAvroConverter() {
+              private static final long serialVersionUID = 1L;
+
+              @Override
+              public Object convert(Schema schema, Object object) {
+                return object;
+              }
+            };
+        break;
+      case CHAR:
+      case VARCHAR:
+        converter =
+            new RowDataToAvroConverter() {
+              private static final long serialVersionUID = 1L;
+
+              @Override
+              public Object convert(Schema schema, Object object) {
+                return new Utf8(object.toString());
+              }
+            };
+        break;
+      case BINARY:
+      case VARBINARY:
+        converter =
+            new RowDataToAvroConverter() {
+              private static final long serialVersionUID = 1L;
+
+              @Override
+              public Object convert(Schema schema, Object object) {
+                return ByteBuffer.wrap((byte[]) object);
+              }
+            };
+        break;
+        // Iceberg: Added support for nanoseconds precision (FLINK-39251)
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+        final int tzPrecision;
+        if (type instanceof org.apache.flink.table.types.logical.TimestampType) {
+          tzPrecision = ((org.apache.flink.table.types.logical.TimestampType) type).getPrecision();
+        } else {
+          tzPrecision = 3;
+        }
+        if (legacyTimestampMapping) {
+          converter = createEpochScalingConverter(tzPrecision);
+        } else {
+          converter = createUtcInstantConverter(tzPrecision);
+        }
+        break;
+        // Iceberg: Added support for nanoseconds precision (FLINK-39251)
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+        if (legacyTimestampMapping) {
+          throw new UnsupportedOperationException("Unsupported type: " + type);
+        }
+        final int ltzPrecision;
+        if (type instanceof org.apache.flink.table.types.logical.LocalZonedTimestampType) {
+          ltzPrecision =
+              ((org.apache.flink.table.types.logical.LocalZonedTimestampType) type).getPrecision();
+        } else {
+          ltzPrecision = 3;
+        }
+        converter = createEpochScalingConverter(ltzPrecision);
+        break;
+      case DECIMAL:
+        converter =
+            new RowDataToAvroConverter() {
+              private static final long serialVersionUID = 1L;
+
+              @Override
+              public Object convert(Schema schema, Object object) {
+                return ByteBuffer.wrap(((DecimalData) object).toUnscaledBytes());
+              }
+            };
+        break;
+      case ARRAY:
+        converter = createArrayConverter((ArrayType) type, legacyTimestampMapping);
+        break;
+      case ROW:
+        converter = createRowConverter((RowType) type, legacyTimestampMapping);
+        break;
+      case MAP:
+      case MULTISET:
+        converter = createMapConverter(type, legacyTimestampMapping);
+        break;
+      case RAW:
+      default:
+        throw new UnsupportedOperationException("Unsupported type: " + type);
+    }
+
+    // wrap into nullable converter
+    return new RowDataToAvroConverter() {
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Object convert(Schema schema, Object object) {
+        if (object == null) {
+          return null;
+        }
+
+        // get actual schema if it is a nullable schema
+        Schema actualSchema;
+        if (schema.getType() == Schema.Type.UNION) {
+          List<Schema> types = schema.getTypes();
+          int size = types.size();
+          if (size == 2 && types.get(1).getType() == Schema.Type.NULL) {
+            actualSchema = types.get(0);
+          } else if (size == 2 && types.get(0).getType() == Schema.Type.NULL) {
+            actualSchema = types.get(1);
+          } else {
+            throw new IllegalArgumentException(
+                "The Avro schema is not a nullable type: " + schema.toString());
+          }
+        } else {
+          actualSchema = schema;
+        }
+        return converter.convert(actualSchema, object);
+      }
+    };
+  }
+
+  /**
+   * Builds a converter that scales the millisecond and nano-of-millisecond parts of a {@link
+   * TimestampData} into one long: milliseconds for precision up to 3, microseconds up to 6,
+   * nanoseconds above that. Shared by the legacy {@code TIMESTAMP} mapping and by {@code
+   * TIMESTAMP_WITH_LOCAL_TIME_ZONE} so both stay in step.
+   */
+  private static RowDataToAvroConverter createEpochScalingConverter(final int precision) {
+    return new RowDataToAvroConverter() {
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Object convert(Schema schema, Object object) {
+        TimestampData timestampData = (TimestampData) object;
+        if (precision <= 3) {
+          return timestampData.getMillisecond();
+        } else if (precision <= 6) {
+          return timestampData.getMillisecond() * 1000L
+              + timestampData.getNanoOfMillisecond() / 1000;
+        } else {
+          // Iceberg: Added support for nanoseconds precision (FLINK-39251)
+          return timestampData.getMillisecond() * 1_000_000L
+              + timestampData.getNanoOfMillisecond();
+        }
+      }
+    };
+  }
+
+  /**
+   * Builds the non-legacy {@code TIMESTAMP} converter: the local date-time is read as UTC and
+   * emitted as epoch milliseconds, microseconds or nanoseconds depending on the precision.
+   */
+  private static RowDataToAvroConverter createUtcInstantConverter(final int precision) {
+    return new RowDataToAvroConverter() {
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Object convert(Schema schema, Object object) {
+        TimestampData timestampData = (TimestampData) object;
+        java.time.Instant instant = timestampData.toLocalDateTime().toInstant(ZoneOffset.UTC);
+        if (precision <= 3) {
+          return instant.toEpochMilli();
+        } else if (precision <= 6) {
+          return instant.getEpochSecond() * 1_000_000L + instant.getNano() / 1000;
+        } else {
+          // Iceberg: Added support for nanoseconds precision (FLINK-39251)
+          return instant.getEpochSecond() * 1_000_000_000L + instant.getNano();
+        }
+      }
+    };
+  }
+
+  /**
+   * Builds a converter that turns a {@link RowData} into an Avro {@link GenericRecord}, converting
+   * the fields position by position.
+   */
+  private static RowDataToAvroConverter createRowConverter(
+      RowType rowType, boolean legacyTimestampMapping) {
+    // One converter per child type, created up front.
+    final List<LogicalType> childTypes = rowType.getChildren();
+    final RowDataToAvroConverter[] fieldConverters = new RowDataToAvroConverter[childTypes.size()];
+    for (int pos = 0; pos < fieldConverters.length; pos++) {
+      fieldConverters[pos] = createConverter(childTypes.get(pos), legacyTimestampMapping);
+    }
+
+    // One getter per declared field, created after all converters exist.
+    final List<RowType.RowField> rowFields = rowType.getFields();
+    final RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[rowFields.size()];
+    for (int pos = 0; pos < fieldGetters.length; pos++) {
+      fieldGetters[pos] = RowData.createFieldGetter(rowFields.get(pos).getType(), pos);
+    }
+
+    final int length = rowType.getFieldCount();
+
+    return new RowDataToAvroConverter() {
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Object convert(Schema schema, Object object) {
+        final RowData row = (RowData) object;
+        final List<Schema.Field> avroFields = schema.getFields();
+        final GenericRecord result = new GenericData.Record(schema);
+        for (int pos = 0; pos < length; pos++) {
+          final Schema.Field avroField = avroFields.get(pos);
+          try {
+            final Object fieldValue = fieldGetters[pos].getFieldOrNull(row);
+            result.put(pos, fieldConverters[pos].convert(avroField.schema(), fieldValue));
+          } catch (Throwable t) {
+            // Any failure is rethrown with the name of the offending field.
+            throw new RuntimeException(
+                String.format("Fail to serialize at field: %s.", avroField.name()), t);
+          }
+        }
+        return result;
+      }
+    };
+  }
+
+  /**
+   * Builds a converter that turns an {@link ArrayData} into a list, converting every element with
+   * the converter for the array's element type.
+   */
+  private static RowDataToAvroConverter createArrayConverter(
+      ArrayType arrayType, boolean legacyTimestampMapping) {
+    final LogicalType elementType = arrayType.getElementType();
+    final ArrayData.ElementGetter elementGetter = ArrayData.createElementGetter(elementType);
+    final RowDataToAvroConverter elementConverter =
+        createConverter(elementType, legacyTimestampMapping);
+
+    return new RowDataToAvroConverter() {
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Object convert(Schema schema, Object object) {
+        final Schema elementSchema = schema.getElementType();
+        final ArrayData elements = (ArrayData) object;
+        final List<Object> converted = Lists.newArrayList();
+        final int count = elements.size();
+        for (int pos = 0; pos < count; pos++) {
+          final Object element = elementGetter.getElementOrNull(elements, pos);
+          converted.add(elementConverter.convert(elementSchema, element));
+        }
+        return converted;
+      }
+    };
+  }
+
+  /**
+   * Builds a converter that turns a {@link MapData} into a {@link Map} with string keys, converting
+   * every value with the converter for the value type.
+   */
+  private static RowDataToAvroConverter createMapConverter(
+      LogicalType type, boolean legacyTimestampMapping) {
+    final LogicalType valueType = AvroSchemaConverter.extractValueTypeToAvroMap(type);
+    final ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(valueType);
+    final RowDataToAvroConverter valueConverter =
+        createConverter(valueType, legacyTimestampMapping);
+
+    return new RowDataToAvroConverter() {
+      private static final long serialVersionUID = 1L;
+
+      @Override
+      public Object convert(Schema schema, Object object) {
+        final Schema valueSchema = schema.getValueType();
+        final MapData entries = (MapData) object;
+        final ArrayData keys = entries.keyArray();
+        final ArrayData values = entries.valueArray();
+        final int entryCount = entries.size();
+        final Map<Object, Object> converted =
+            CollectionUtil.newHashMapWithExpectedSize(entryCount);
+        for (int pos = 0; pos < entryCount; pos++) {
+          // Keys are read as strings; values go through the value converter.
+          final String key = keys.getString(pos).toString();
+          final Object rawValue = valueGetter.getElementOrNull(values, pos);
+          converted.put(key, valueConverter.convert(valueSchema, rawValue));
+        }
+        return converted;
+      }
+    };
+  }
+}
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/typeutils/AvroSchemaConverter.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/typeutils/AvroSchemaConverter.java
new file mode 100644
index 0000000000..347631c7f4
--- /dev/null
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/typeutils/AvroSchemaConverter.java
@@ -0,0 +1,625 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.formats.avro.typeutils;
+
+import java.util.List;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.SchemaParseException;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
+import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TypeInformationRawType;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * Converts an Avro schema into Flink's type information. It uses {@link RowTypeInfo} for
+ * representing objects and converts Avro types into types that are compatible with Flink's Table
+ * &amp; SQL API.
+ *
+ * <p>Note: Changes in this class need to be kept in sync with the corresponding runtime classes
+ * {@link AvroRowDataDeserializationSchema} and {@link AvroRowDataSerializationSchema}.
+ *
+ * <p>This class is adapted in Iceberg to support custom 'timestamp-nanos' and
+ * 'local-timestamp-nanos' logical types (FLINK-39251). Once that ticket is resolved in Flink, these
+ * custom types may be removed.
+ */
+public class AvroSchemaConverter {
+
+  private AvroSchemaConverter() {
+    // private: this class is not meant to be instantiated
+  }
+
+  /**
+   * Derives Flink type information from an Avro specific record class, using the legacy timestamp
+   * mapping. The result is a nested row structure with deterministic field order.
+   *
+   * @param avroClass Avro specific record that contains schema information
+   * @return type information matching the schema
+   */
+  @SuppressWarnings("unchecked")
+  public static <T extends SpecificRecord> TypeInformation<Row> convertToTypeInfo(
+      Class<T> avroClass) {
+    final boolean legacyTimestampMapping = true;
+    return convertToTypeInfo(avroClass, legacyTimestampMapping);
+  }
+
+  /**
+   * Converts an Avro class into a nested row structure with deterministic field order and data
+   * types that are compatible with Flink's Table &amp; SQL API.
+   *
+   * @param avroClass Avro specific record that contains schema information
+   * @param legacyTimestampMapping legacy mapping of timestamp types
+   * @return type information matching the schema
+   * @throws NullPointerException if {@code avroClass} is null
+   */
+  @SuppressWarnings("unchecked")
+  public static <T extends SpecificRecord> TypeInformation<Row> convertToTypeInfo(
+      Class<T> avroClass, boolean legacyTimestampMapping) {
+    Preconditions.checkNotNull(avroClass, "Avro specific record class must not be null.");
+    // determine schema to retrieve deterministic field order
+    final Schema schema = SpecificData.get().getSchema(avroClass);
+    // Forward the caller's flag; it was previously replaced by a hard-coded true, which silently
+    // forced the legacy timestamp mapping.
+    return (TypeInformation<Row>) convertToTypeInfo(schema, legacyTimestampMapping);
+  }
+
+  /**
+   * Derives Flink type information from an Avro schema string, using the legacy timestamp mapping.
+   * The result is a nested row structure with deterministic field order.
+   *
+   * @param avroSchemaString Avro schema definition string
+   * @return type information matching the schema
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> TypeInformation<T> convertToTypeInfo(String avroSchemaString) {
+    final boolean legacyTimestampMapping = true;
+    return convertToTypeInfo(avroSchemaString, legacyTimestampMapping);
+  }
+
+  /**
+   * Parses an Avro schema string and derives the matching Flink type information, a nested row
+   * structure with deterministic field order.
+   *
+   * @param avroSchemaString Avro schema definition string
+   * @param legacyTimestampMapping legacy mapping of timestamp types
+   * @return type information matching the schema
+   * @throws IllegalArgumentException if the string cannot be parsed as an Avro schema
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> TypeInformation<T> convertToTypeInfo(
+      String avroSchemaString, boolean legacyTimestampMapping) {
+    Preconditions.checkNotNull(avroSchemaString, "Avro schema must not be null.");
+    final Schema.Parser parser = new Schema.Parser();
+    final Schema parsed;
+    try {
+      parsed = parser.parse(avroSchemaString);
+    } catch (SchemaParseException parseFailure) {
+      throw new IllegalArgumentException("Could not parse Avro schema string.", parseFailure);
+    }
+    final TypeInformation<?> typeInfo = convertToTypeInfo(parsed, legacyTimestampMapping);
+    return (TypeInformation<T>) typeInfo;
+  }
+
+  /**
+   * Maps one Avro schema node to Flink type information, recursing into records, arrays, maps and
+   * nullable unions.
+   *
+   * @param schema Avro schema node to convert
+   * @param legacyTimestampMapping selects how logical timestamp types on {@code long} are mapped
+   * @return type information for the node
+   */
+  private static TypeInformation<?> convertToTypeInfo(
+      Schema schema, boolean legacyTimestampMapping) {
+    final org.apache.avro.LogicalType logicalType = schema.getLogicalType();
+    switch (schema.getType()) {
+      case RECORD:
+        final List<Schema.Field> recordFields = schema.getFields();
+        final int fieldCount = recordFields.size();
+        final String[] fieldNames = new String[fieldCount];
+        final TypeInformation<?>[] fieldTypes = new TypeInformation<?>[fieldCount];
+        int pos = 0;
+        for (Schema.Field recordField : recordFields) {
+          fieldTypes[pos] = convertToTypeInfo(recordField.schema(), legacyTimestampMapping);
+          fieldNames[pos] = recordField.name();
+          pos++;
+        }
+        return Types.ROW_NAMED(fieldNames, fieldTypes);
+      case ENUM:
+      case STRING:
+        // enum symbols and Avro's Utf8/CharSequence both surface as String
+        return Types.STRING;
+      case ARRAY:
+        // result type might either be ObjectArrayTypeInfo or BasicArrayTypeInfo for Strings
+        return Types.OBJECT_ARRAY(
+            convertToTypeInfo(schema.getElementType(), legacyTimestampMapping));
+      case MAP:
+        return Types.MAP(
+            Types.STRING, convertToTypeInfo(schema.getValueType(), legacyTimestampMapping));
+      case UNION:
+        final List<Schema> members = schema.getTypes();
+        final Schema actualSchema;
+        if (members.size() == 1) {
+          actualSchema = members.get(0);
+        } else if (members.size() == 2 && members.get(0).getType() == Schema.Type.NULL) {
+          actualSchema = members.get(1);
+        } else if (members.size() == 2 && members.get(1).getType() == Schema.Type.NULL) {
+          actualSchema = members.get(0);
+        } else {
+          // use Kryo for serialization
+          return Types.GENERIC(Object.class);
+        }
+        return convertToTypeInfo(actualSchema, legacyTimestampMapping);
+      case FIXED:
+      case BYTES:
+        // logical decimal type
+        if (logicalType instanceof LogicalTypes.Decimal) {
+          return Types.BIG_DEC;
+        }
+        // plain binary data becomes a primitive byte array
+        return Types.PRIMITIVE_ARRAY(Types.BYTE);
+      case INT:
+        // logical date and time type
+        if (logicalType == LogicalTypes.date()) {
+          return Types.SQL_DATE;
+        }
+        if (logicalType == LogicalTypes.timeMillis()) {
+          return Types.SQL_TIME;
+        }
+        return Types.INT;
+      case LONG:
+        // Iceberg: Added support for custom nanosecond logical type (FLINK-39251)
+        final boolean timestampNanos =
+            logicalType != null && logicalType.getName().equals("timestamp-nanos");
+        final boolean utcTimestamp =
+            logicalType == LogicalTypes.timestampMillis()
+                || logicalType == LogicalTypes.timestampMicros()
+                || timestampNanos;
+        final boolean timeOfDay =
+            logicalType == LogicalTypes.timeMicros() || logicalType == LogicalTypes.timeMillis();
+        if (legacyTimestampMapping) {
+          if (utcTimestamp) {
+            return Types.SQL_TIMESTAMP;
+          }
+          if (timeOfDay) {
+            return Types.SQL_TIME;
+          }
+          return Types.LONG;
+        }
+        // Avro logical timestamp types to Flink DataStream timestamp types
+        if (utcTimestamp) {
+          return Types.INSTANT;
+        }
+        // Iceberg: Added support for custom nanosecond logical type (FLINK-39251)
+        final boolean localTimestamp =
+            logicalType == LogicalTypes.localTimestampMillis()
+                || logicalType == LogicalTypes.localTimestampMicros()
+                || (logicalType != null
+                    && logicalType.getName().equals("local-timestamp-nanos"));
+        if (localTimestamp) {
+          return Types.LOCAL_DATE_TIME;
+        }
+        if (timeOfDay) {
+          return Types.SQL_TIME;
+        }
+        return Types.LONG;
+      case FLOAT:
+        return Types.FLOAT;
+      case DOUBLE:
+        return Types.DOUBLE;
+      case BOOLEAN:
+        return Types.BOOLEAN;
+      case NULL:
+        return Types.VOID;
+    }
+    throw new IllegalArgumentException("Unsupported Avro type '" + schema.getType() + "'.");
+  }
+
+  /**
+   * Derives a Flink data type from an Avro schema string, using the legacy timestamp mapping. The
+   * result is a nested row structure with deterministic field order.
+   *
+   * @param avroSchemaString Avro schema definition string
+   * @return data type matching the schema
+   */
+  public static DataType convertToDataType(String avroSchemaString) {
+    final boolean legacyTimestampMapping = true;
+    return convertToDataType(avroSchemaString, legacyTimestampMapping);
+  }
+
+  /**
+   * Parses an Avro schema string and derives the matching Flink data type, a nested row structure
+   * with deterministic field order.
+   *
+   * @param avroSchemaString Avro schema definition string
+   * @param legacyTimestampMapping legacy mapping of local timestamps
+   * @return data type matching the schema
+   * @throws IllegalArgumentException if the string cannot be parsed as an Avro schema
+   */
+  public static DataType convertToDataType(
+      String avroSchemaString, boolean legacyTimestampMapping) {
+    Preconditions.checkNotNull(avroSchemaString, "Avro schema must not be null.");
+    final Schema.Parser parser = new Schema.Parser();
+    final Schema parsed;
+    try {
+      parsed = parser.parse(avroSchemaString);
+    } catch (SchemaParseException parseFailure) {
+      throw new IllegalArgumentException("Could not parse Avro schema string.", parseFailure);
+    }
+    return convertToDataType(parsed, legacyTimestampMapping);
+  }
+
+  @SuppressWarnings("deprecation")
+  private static DataType convertToDataType(Schema schema, boolean 
legacyMapping) {
+    switch (schema.getType()) {
+      case RECORD:
+        final List<Schema.Field> schemaFields = schema.getFields();
+
+        final DataTypes.Field[] fields = new 
DataTypes.Field[schemaFields.size()];
+        for (int i = 0; i < schemaFields.size(); i++) {
+          final Schema.Field field = schemaFields.get(i);
+          fields[i] =
+              DataTypes.FIELD(field.name(), convertToDataType(field.schema(), 
legacyMapping));
+        }
+        return DataTypes.ROW(fields).notNull();
+      case ENUM:
+        return DataTypes.STRING().notNull();
+      case ARRAY:
+        return DataTypes.ARRAY(convertToDataType(schema.getElementType(), 
legacyMapping)).notNull();
+      case MAP:
+        return DataTypes.MAP(
+                DataTypes.STRING().notNull(),
+                convertToDataType(schema.getValueType(), legacyMapping))
+            .notNull();
+      case UNION:
+        final Schema actualSchema;
+        final boolean nullable;
+        if (schema.getTypes().size() == 2
+            && schema.getTypes().get(0).getType() == Schema.Type.NULL) {
+          actualSchema = schema.getTypes().get(1);
+          nullable = true;
+        } else if (schema.getTypes().size() == 2
+            && schema.getTypes().get(1).getType() == Schema.Type.NULL) {
+          actualSchema = schema.getTypes().get(0);
+          nullable = true;
+        } else if (schema.getTypes().size() == 1) {
+          actualSchema = schema.getTypes().get(0);
+          nullable = false;
+        } else {
+          // use Kryo for serialization
+          return new AtomicDataType(
+              new TypeInformationRawType<>(false, 
Types.GENERIC(Object.class)));
+        }
+        DataType converted = convertToDataType(actualSchema, legacyMapping);
+        return nullable ? converted.nullable() : converted;
+      case FIXED:
+        // logical decimal type
+        if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
+          final LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) 
schema.getLogicalType();
+          return DataTypes.DECIMAL(decimalType.getPrecision(), 
decimalType.getScale()).notNull();
+        }
+        // convert fixed size binary data to primitive byte arrays
+        return DataTypes.VARBINARY(schema.getFixedSize()).notNull();
+      case STRING:
+        // convert Avro's Utf8/CharSequence to String
+        return DataTypes.STRING().notNull();
+      case BYTES:
+        // logical decimal type
+        if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
+          final LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) 
schema.getLogicalType();
+          return DataTypes.DECIMAL(decimalType.getPrecision(), 
decimalType.getScale()).notNull();
+        }
+        return DataTypes.BYTES().notNull();
+      case INT:
+        // logical date and time type
+        final org.apache.avro.LogicalType logicalType = 
schema.getLogicalType();
+        if (logicalType == LogicalTypes.date()) {
+          return DataTypes.DATE().notNull();
+        } else if (logicalType == LogicalTypes.timeMillis()) {
+          return DataTypes.TIME(3).notNull();
+        }
+        return DataTypes.INT().notNull();
+      case LONG:
+        if (legacyMapping) {
+          // Avro logical timestamp types to Flink SQL timestamp types
+          if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
+            return DataTypes.TIMESTAMP(3).notNull();
+          } else if (schema.getLogicalType() == 
LogicalTypes.timestampMicros()) {
+            return DataTypes.TIMESTAMP(6).notNull();
+          } else if (schema.getLogicalType() != null
+              && schema.getLogicalType().getName().equals("timestamp-nanos")) {
+            // Iceberg: Added support for custom nanosecond logical type 
(FLINK-39251)
+            return DataTypes.TIMESTAMP(9).notNull();
+          } else if (schema.getLogicalType() == LogicalTypes.timeMillis()) {
+            return DataTypes.TIME(3).notNull();
+          } else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
+            return DataTypes.TIME(6).notNull();
+          }
+        } else {
+          // Avro logical timestamp types to Flink SQL timestamp types
+          if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
+            return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull();
+          } else if (schema.getLogicalType() == 
LogicalTypes.timestampMicros()) {
+            return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6).notNull();
+          } else if (schema.getLogicalType() != null
+              && schema.getLogicalType().getName().equals("timestamp-nanos")) {
+            // Iceberg: Added support for custom nanosecond logical type 
(FLINK-39251)
+            return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(9).notNull();
+          } else if (schema.getLogicalType() == LogicalTypes.timeMillis()) {
+            return DataTypes.TIME(3).notNull();
+          } else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
+            return DataTypes.TIME(6).notNull();
+          } else if (schema.getLogicalType() == 
LogicalTypes.localTimestampMillis()) {
+            return DataTypes.TIMESTAMP(3).notNull();
+          } else if (schema.getLogicalType() == 
LogicalTypes.localTimestampMicros()) {
+            return DataTypes.TIMESTAMP(6).notNull();
+          } else if (schema.getLogicalType() != null
+              && 
schema.getLogicalType().getName().equals("local-timestamp-nanos")) {
+            // Iceberg: Added support for custom nanosecond logical type 
(FLINK-39251)
+            return DataTypes.TIMESTAMP(9).notNull();
+          }
+        }
+
+        return DataTypes.BIGINT().notNull();
+      case FLOAT:
+        return DataTypes.FLOAT().notNull();
+      case DOUBLE:
+        return DataTypes.DOUBLE().notNull();
+      case BOOLEAN:
+        return DataTypes.BOOLEAN().notNull();
+      case NULL:
+        return DataTypes.NULL();
+    }
+    throw new IllegalArgumentException("Unsupported Avro type '" + 
schema.getType() + "'.");
+  }
+
+  /**
+   * Derives an Avro schema from a (possibly nested) Flink SQL {@link LogicalType}.
+   *
+   * <p>Delegates to {@link #convertToSchema(LogicalType, boolean)} with the legacy timestamp
+   * mapping switched on; that overload names the generated record
+   * "org.apache.flink.avro.generated.record".
+   *
+   * @param schema the schema type, usually it should be the top level record type, e.g. not a
+   *     nested type
+   * @return Avro's {@link Schema} matching this logical type.
+   */
+  public static Schema convertToSchema(LogicalType schema) {
+    // Callers of this overload always get the legacy timestamp mapping.
+    final boolean useLegacyTimestampMapping = true;
+    return convertToSchema(schema, useLegacyTimestampMapping);
+  }
+
+  /**
+   * Derives an Avro schema from a (possibly nested) Flink SQL {@link LogicalType}, letting the
+   * caller pick the timestamp mapping.
+   *
+   * <p>The generated record is named "org.apache.flink.avro.generated.record".
+   *
+   * @param schema the schema type, usually it should be the top level record type, e.g. not a
+   *     nested type
+   * @param legacyTimestampMapping whether to use the legacy timestamp mapping
+   * @return Avro's {@link Schema} matching this logical type.
+   */
+  public static Schema convertToSchema(LogicalType schema, boolean legacyTimestampMapping) {
+    // Fixed record name used whenever the caller does not supply one.
+    final String defaultRecordName = "org.apache.flink.avro.generated.record";
+    return convertToSchema(schema, defaultRecordName, legacyTimestampMapping);
+  }
+
+  /**
+   * Derives an Avro schema from a (possibly nested) Flink SQL {@link LogicalType} using the given
+   * record name and the legacy timestamp mapping.
+   *
+   * <p>The "{rowName}_" is used as the nested row type name prefix in order to generate the right
+   * schema. Nested record type that only differs with type name is still compatible.
+   *
+   * @param logicalType logical type
+   * @param rowName the record name
+   * @return Avro's {@link Schema} matching this logical type.
+   */
+  public static Schema convertToSchema(LogicalType logicalType, String rowName) {
+    // Callers of this overload always get the legacy timestamp mapping.
+    final boolean useLegacyTimestampMapping = true;
+    return convertToSchema(logicalType, rowName, useLegacyTimestampMapping);
+  }
+
+  /**
+   * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema.
+   *
+   * <p>The "{rowName}_" is used as the nested row type name prefix in order to generate the right
+   * schema. Nested record type that only differs with type name is still compatible.
+   *
+   * <p>The {@code legacyTimestampMapping} flag is forwarded to every nested type (row fields,
+   * map/multiset values and array elements), so a nested timestamp is mapped the same way as a
+   * top-level one.
+   *
+   * @param logicalType logical type
+   * @param rowName the record name
+   * @param legacyTimestampMapping whether to use the legacy timestamp mapping
+   * @return Avro's {@link Schema} matching this logical type.
+   * @throws IllegalArgumentException if a TIMESTAMP or TIME precision is higher than the selected
+   *     mapping supports
+   * @throws UnsupportedOperationException if no Avro schema can be derived for the type (for
+   *     example RAW, TIMESTAMP_WITH_LOCAL_TIME_ZONE under the legacy mapping, or a map whose key
+   *     is not a character string)
+   */
+  public static Schema convertToSchema(
+      LogicalType logicalType, String rowName, boolean legacyTimestampMapping) {
+    int precision;
+    boolean nullable = logicalType.isNullable();
+    switch (logicalType.getTypeRoot()) {
+      case NULL:
+        return SchemaBuilder.builder().nullType();
+      case BOOLEAN:
+        Schema bool = SchemaBuilder.builder().booleanType();
+        return nullable ? nullableSchema(bool) : bool;
+      case TINYINT:
+      case SMALLINT:
+      case INTEGER:
+        Schema integer = SchemaBuilder.builder().intType();
+        return nullable ? nullableSchema(integer) : integer;
+      case BIGINT:
+        Schema bigint = SchemaBuilder.builder().longType();
+        return nullable ? nullableSchema(bigint) : bigint;
+      case FLOAT:
+        Schema floatSchema = SchemaBuilder.builder().floatType();
+        return nullable ? nullableSchema(floatSchema) : floatSchema;
+      case DOUBLE:
+        Schema doubleSchema = SchemaBuilder.builder().doubleType();
+        return nullable ? nullableSchema(doubleSchema) : doubleSchema;
+      case CHAR:
+      case VARCHAR:
+        Schema str = SchemaBuilder.builder().stringType();
+        return nullable ? nullableSchema(str) : str;
+      case BINARY:
+      case VARBINARY:
+        Schema binary = SchemaBuilder.builder().bytesType();
+        return nullable ? nullableSchema(binary) : binary;
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+        // use long to represent Timestamp
+        final TimestampType timestampType = (TimestampType) logicalType;
+        precision = timestampType.getPrecision();
+        org.apache.avro.LogicalType avroLogicalType;
+        if (legacyTimestampMapping) {
+          // legacy mapping: only millisecond precision, written as timestamp-millis
+          if (precision <= 3) {
+            avroLogicalType = LogicalTypes.timestampMillis();
+          } else {
+            throw new IllegalArgumentException(
+                "Avro does not support TIMESTAMP type "
+                    + "with precision: "
+                    + precision
+                    + ", it only supports precision less than 3.");
+          }
+        } else {
+          // non-legacy mapping: written as local-timestamp-millis / local-timestamp-micros
+          if (precision <= 3) {
+            avroLogicalType = LogicalTypes.localTimestampMillis();
+          } else if (precision <= 6) {
+            avroLogicalType = LogicalTypes.localTimestampMicros();
+          } else {
+            throw new IllegalArgumentException(
+                "Avro does not support LOCAL TIMESTAMP type "
+                    + "with precision: "
+                    + precision
+                    + ", it only supports precision less than 6.");
+          }
+        }
+        Schema timestamp = avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
+        return nullable ? nullableSchema(timestamp) : timestamp;
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+        if (legacyTimestampMapping) {
+          // the legacy mapping has no Avro representation for this type
+          throw new UnsupportedOperationException(
+              "Unsupported to derive Schema for type: " + logicalType);
+        } else {
+          final LocalZonedTimestampType localZonedTimestampType =
+              (LocalZonedTimestampType) logicalType;
+          precision = localZonedTimestampType.getPrecision();
+          if (precision <= 3) {
+            avroLogicalType = LogicalTypes.timestampMillis();
+          } else if (precision <= 6) {
+            avroLogicalType = LogicalTypes.timestampMicros();
+          } else {
+            throw new IllegalArgumentException(
+                "Avro does not support TIMESTAMP type "
+                    + "with precision: "
+                    + precision
+                    + ", it only supports precision less than 6.");
+          }
+          timestamp = avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
+          return nullable ? nullableSchema(timestamp) : timestamp;
+        }
+      case DATE:
+        // use int to represent Date
+        Schema date = LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
+        return nullable ? nullableSchema(date) : date;
+      case TIME_WITHOUT_TIME_ZONE:
+        precision = ((TimeType) logicalType).getPrecision();
+        if (precision > 3) {
+          throw new IllegalArgumentException(
+              "Avro does not support TIME type with precision: "
+                  + precision
+                  + ", it only supports precision less than 3.");
+        }
+        // use int to represent Time, we only support millisecond when deserialization
+        Schema time = LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType());
+        return nullable ? nullableSchema(time) : time;
+      case DECIMAL:
+        DecimalType decimalType = (DecimalType) logicalType;
+        // store BigDecimal as byte[]
+        Schema decimal =
+            LogicalTypes.decimal(decimalType.getPrecision(), decimalType.getScale())
+                .addToSchema(SchemaBuilder.builder().bytesType());
+        return nullable ? nullableSchema(decimal) : decimal;
+      case ROW:
+        RowType rowType = (RowType) logicalType;
+        List<String> fieldNames = rowType.getFieldNames();
+        // we have to make sure the record name is different in a Schema
+        SchemaBuilder.FieldAssembler<Schema> builder =
+            SchemaBuilder.builder().record(rowName).fields();
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+          String fieldName = fieldNames.get(i);
+          LogicalType fieldType = rowType.getTypeAt(i);
+          SchemaBuilder.GenericDefault<Schema> fieldBuilder =
+              builder
+                  .name(fieldName)
+                  .type(
+                      convertToSchema(
+                          fieldType, rowName + "_" + fieldName, legacyTimestampMapping));
+
+          if (fieldType.isNullable()) {
+            builder = fieldBuilder.withDefault(null);
+          } else {
+            builder = fieldBuilder.noDefault();
+          }
+        }
+        Schema record = builder.endRecord();
+        return nullable ? nullableSchema(record) : record;
+      case MULTISET:
+      case MAP:
+        // Forward legacyTimestampMapping: the two-argument overload would force the legacy
+        // mapping onto nested values even when the caller asked for the non-legacy one.
+        Schema map =
+            SchemaBuilder.builder()
+                .map()
+                .values(
+                    convertToSchema(
+                        extractValueTypeToAvroMap(logicalType), rowName, legacyTimestampMapping));
+        return nullable ? nullableSchema(map) : map;
+      case ARRAY:
+        ArrayType arrayType = (ArrayType) logicalType;
+        // Forward legacyTimestampMapping so array elements follow the caller's mapping too.
+        Schema array =
+            SchemaBuilder.builder()
+                .array()
+                .items(
+                    convertToSchema(
+                        arrayType.getElementType(), rowName, legacyTimestampMapping));
+        return nullable ? nullableSchema(array) : array;
+      case RAW:
+      default:
+        throw new UnsupportedOperationException(
+            "Unsupported to derive Schema for type: " + logicalType);
+    }
+  }
+
+  /**
+   * Returns the value type to use for the Avro map derived from a Flink map or multiset type.
+   *
+   * <p>For a {@link MapType} this is the map's value type. Any other input is cast to {@link
+   * MultisetType}, and a new {@link IntType} is returned as the value type. In both cases the key
+   * type (the multiset's element type for a multiset) must belong to the character string family.
+   *
+   * @param type the map or multiset logical type
+   * @return the logical type of the Avro map values
+   * @throws UnsupportedOperationException if the key type is not a character string type
+   */
+  public static LogicalType extractValueTypeToAvroMap(LogicalType type) {
+    final boolean isMap = type instanceof MapType;
+    final LogicalType mapKey =
+        isMap ? ((MapType) type).getKeyType() : ((MultisetType) type).getElementType();
+
+    if (mapKey.is(LogicalTypeFamily.CHARACTER_STRING)) {
+      return isMap ? ((MapType) type).getValueType() : new IntType();
+    }
+
+    throw new UnsupportedOperationException(
+        "Avro format doesn't support non-string as key type of map. "
+            + "The key type is: "
+            + mapKey.asSummaryString());
+  }
+
+  /**
+   * Returns a nullable variant of the given schema: the schema itself when it already reports
+   * being nullable, otherwise a union of the null type and the schema.
+   */
+  private static Schema nullableSchema(Schema schema) {
+    if (schema.isNullable()) {
+      return schema;
+    }
+    Schema nullType = SchemaBuilder.builder().nullType();
+    return Schema.createUnion(nullType, schema);
+  }
+}
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/AvroGenericRecordToRowDataMapper.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/AvroGenericRecordToRowDataMapper.java
index f7e8e0c884..5f3494330c 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/AvroGenericRecordToRowDataMapper.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/AvroGenericRecordToRowDataMapper.java
@@ -21,14 +21,14 @@ package org.apache.iceberg.flink.sink;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.formats.avro.AvroToRowDataConverters;
-import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.flink.formats.avro.AvroToRowDataConverters;
+import org.apache.iceberg.flink.formats.avro.typeutils.AvroSchemaConverter;
 
 /**
  * This util class converts Avro GenericRecord to Flink RowData. <br>
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataToAvroGenericRecordConverter.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataToAvroGenericRecordConverter.java
index 8ef1f1fbb8..d74b8b9d62 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataToAvroGenericRecordConverter.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataToAvroGenericRecordConverter.java
@@ -23,8 +23,6 @@ import java.util.function.Function;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.formats.avro.RowDataToAvroConverters;
-import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -32,6 +30,8 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.formats.avro.RowDataToAvroConverters;
+import org.apache.iceberg.flink.formats.avro.typeutils.AvroSchemaConverter;
 
 /**
  * This is not serializable because Avro {@link Schema} is not actually 
serializable, even though it
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordConverter.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordConverter.java
index b158b0871a..cfef780a4d 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordConverter.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordConverter.java
@@ -21,8 +21,6 @@ package org.apache.iceberg.flink.source.reader;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.RowDataToAvroConverters;
-import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
@@ -31,6 +29,8 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.formats.avro.RowDataToAvroConverters;
+import org.apache.iceberg.flink.formats.avro.typeutils.AvroSchemaConverter;
 
 public class AvroGenericRecordConverter implements 
RowDataConverter<GenericRecord> {
   private final Schema avroSchema;
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/DataGenerators.java 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/DataGenerators.java
index e2cd411d70..795c4fa5a7 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/DataGenerators.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/DataGenerators.java
@@ -75,6 +75,11 @@ public class DataGenerators {
         OffsetDateTime.of(2022, 1, 10, 0, 0, 0, 0, ZoneOffset.UTC);
     private static final LocalDateTime JAVA_LOCAL_DATE_TIME_20220110 =
         LocalDateTime.of(2022, 1, 10, 0, 0, 0);
+    private static final OffsetDateTime JAVA_OFFSET_DATE_TIME_MAX_NANO =
+        OffsetDateTime.of(2262, 4, 11, 23, 47, 16, 854_775_807, 
ZoneOffset.UTC);
+    private static final LocalDateTime JAVA_LOCAL_DATE_TIME_MAX_NANO =
+        LocalDateTime.of(2262, 4, 11, 23, 47, 16, 854_775_807);
+    private static final long ICEBERG_MAX_NANOS_EPOCH = 9223372036854775807L;
     private static final BigDecimal BIG_DECIMAL_NEGATIVE = new 
BigDecimal("-1.50");
     private static final byte[] FIXED_BYTES = 
"012345689012345".getBytes(StandardCharsets.UTF_8);
 
@@ -96,7 +101,11 @@ public class DataGenerators {
             Types.NestedField.required(12, "uuid_field", Types.UUIDType.get()),
             Types.NestedField.required(13, "binary_field", 
Types.BinaryType.get()),
             Types.NestedField.required(14, "decimal_field", 
Types.DecimalType.of(9, 2)),
-            Types.NestedField.required(15, "fixed_field", 
Types.FixedType.ofLength(16)));
+            Types.NestedField.required(15, "fixed_field", 
Types.FixedType.ofLength(16)),
+            Types.NestedField.required(
+                16, "ts_ns_with_zone_field", 
Types.TimestampNanoType.withZone()),
+            Types.NestedField.required(
+                17, "ts_ns_without_zone_field", 
Types.TimestampNanoType.withoutZone()));
 
     private final RowType flinkRowType = 
FlinkSchemaUtil.convert(icebergSchema);
 
@@ -171,6 +180,8 @@ public class DataGenerators {
       genericRecord.setField("time_field", JAVA_LOCAL_TIME_HOUR8);
       genericRecord.setField("ts_with_zone_field", 
JAVA_OFFSET_DATE_TIME_20220110);
       genericRecord.setField("ts_without_zone_field", 
JAVA_LOCAL_DATE_TIME_20220110);
+      genericRecord.setField("ts_ns_with_zone_field", 
JAVA_OFFSET_DATE_TIME_MAX_NANO);
+      genericRecord.setField("ts_ns_without_zone_field", 
JAVA_LOCAL_DATE_TIME_MAX_NANO);
 
       byte[] uuidBytes = new byte[16];
       for (int i = 0; i < 16; ++i) {
@@ -220,7 +231,11 @@ public class DataGenerators {
           uuidBytes,
           binaryBytes,
           DecimalData.fromBigDecimal(BIG_DECIMAL_NEGATIVE, 9, 2),
-          FIXED_BYTES);
+          FIXED_BYTES,
+          TimestampData.fromEpochMillis(
+              ICEBERG_MAX_NANOS_EPOCH / 1_000_000, (int) 
(ICEBERG_MAX_NANOS_EPOCH % 1_000_000)),
+          TimestampData.fromEpochMillis(
+              ICEBERG_MAX_NANOS_EPOCH / 1_000_000, (int) 
(ICEBERG_MAX_NANOS_EPOCH % 1_000_000)));
     }
 
     @Override
@@ -236,10 +251,12 @@ public class DataGenerators {
 
       genericRecord.put("date_field", DAYS_BTW_EPOC_AND_20220110);
       genericRecord.put("time_field", HOUR_8_IN_MILLI);
-      // Although Avro logical type for timestamp fields are in micro seconds,
-      // AvroToRowDataConverters only looks for long value in milliseconds.
-      genericRecord.put("ts_with_zone_field", 
JODA_DATETIME_20220110.getMillis());
-      genericRecord.put("ts_without_zone_field", 
JODA_DATETIME_20220110.getMillis());
+      // Now that AvroToRowDataConverters correctly supports microseconds,
+      // we must inject correct microsecond scale values into the Avro data.
+      genericRecord.put("ts_with_zone_field", 
JODA_DATETIME_20220110.getMillis() * 1000L);
+      genericRecord.put("ts_without_zone_field", 
JODA_DATETIME_20220110.getMillis() * 1000L);
+      genericRecord.put("ts_ns_with_zone_field", ICEBERG_MAX_NANOS_EPOCH);
+      genericRecord.put("ts_ns_without_zone_field", ICEBERG_MAX_NANOS_EPOCH);
 
       byte[] uuidBytes = new byte[16];
       for (int i = 0; i < 16; ++i) {
@@ -554,7 +571,11 @@ public class DataGenerators {
         new Schema(
             Types.NestedField.required(1, "row_id", Types.StringType.get()),
             Types.NestedField.required(
-                2, "array_of_int", Types.ListType.ofOptional(101, 
Types.IntegerType.get())));
+                2, "array_of_int", Types.ListType.ofOptional(101, 
Types.IntegerType.get())),
+            Types.NestedField.optional(
+                3,
+                "array_of_ts_ns",
+                Types.ListType.ofRequired(102, 
Types.TimestampNanoType.withoutZone())));
 
     private final RowType flinkRowType = 
FlinkSchemaUtil.convert(icebergSchema);
 
@@ -581,13 +602,33 @@ public class DataGenerators {
       GenericRecord genericRecord = GenericRecord.create(icebergSchema);
       genericRecord.setField("row_id", "row_id_value");
       genericRecord.setField("array_of_int", Arrays.asList(1, 2, 3));
+
+      LocalDateTime posNanos = LocalDateTime.of(2023, 1, 1, 12, 0, 0, 
123456789);
+      LocalDateTime negNanos = LocalDateTime.of(1969, 12, 31, 23, 59, 59, 
987654321);
+      genericRecord.setField("array_of_ts_ns", Arrays.asList(posNanos, 
negNanos));
       return genericRecord;
     }
 
     @Override
     public GenericRowData generateFlinkRowData() {
       Integer[] arr = {1, 2, 3};
-      return GenericRowData.of(StringData.fromString("row_id_value"), new 
GenericArrayData(arr));
+
+      long posNanos =
+          org.apache.iceberg.util.DateTimeUtil.nanosFromTimestamp(
+              LocalDateTime.of(2023, 1, 1, 12, 0, 0, 123456789));
+      long negNanos =
+          org.apache.iceberg.util.DateTimeUtil.nanosFromTimestamp(
+              LocalDateTime.of(1969, 12, 31, 23, 59, 59, 987654321));
+      TimestampData[] tsArr = {
+        TimestampData.fromEpochMillis(
+            Math.floorDiv(posNanos, 1_000_000L), (int) Math.floorMod(posNanos, 
1_000_000L)),
+        TimestampData.fromEpochMillis(
+            Math.floorDiv(negNanos, 1_000_000L), (int) Math.floorMod(negNanos, 
1_000_000L))
+      };
+      return GenericRowData.of(
+          StringData.fromString("row_id_value"),
+          new GenericArrayData(arr),
+          new GenericArrayData(tsArr));
     }
 
     @Override
@@ -595,6 +636,14 @@ public class DataGenerators {
       org.apache.avro.generic.GenericRecord genericRecord = new 
GenericData.Record(avroSchema);
       genericRecord.put("row_id", "row_id_value");
       genericRecord.put("array_of_int", Arrays.asList(1, 2, 3));
+
+      long posNanos =
+          org.apache.iceberg.util.DateTimeUtil.nanosFromTimestamp(
+              LocalDateTime.of(2023, 1, 1, 12, 0, 0, 123456789));
+      long negNanos =
+          org.apache.iceberg.util.DateTimeUtil.nanosFromTimestamp(
+              LocalDateTime.of(1969, 12, 31, 23, 59, 59, 987654321));
+      genericRecord.put("array_of_ts_ns", Arrays.asList(posNanos, negNanos));
       return genericRecord;
     }
   }
@@ -808,7 +857,12 @@ public class DataGenerators {
                 2,
                 "map_of_primitives",
                 Types.MapType.ofRequired(
-                    101, 102, Types.StringType.get(), 
Types.IntegerType.get())));
+                    101, 102, Types.StringType.get(), 
Types.IntegerType.get())),
+            Types.NestedField.optional(
+                3,
+                "map_of_ts_ns",
+                Types.MapType.ofRequired(
+                    103, 104, Types.StringType.get(), 
Types.TimestampNanoType.withoutZone())));
 
     private final RowType flinkRowType = 
FlinkSchemaUtil.convert(icebergSchema);
 
@@ -835,15 +889,37 @@ public class DataGenerators {
       GenericRecord genericRecord = GenericRecord.create(icebergSchema);
       genericRecord.setField("row_id", "row_id_value");
       genericRecord.setField("map_of_primitives", ImmutableMap.of("Jane", 1, 
"Joe", 2));
+
+      LocalDateTime posNanos = LocalDateTime.of(2023, 1, 1, 12, 0, 0, 
123456789);
+      LocalDateTime negNanos = LocalDateTime.of(1969, 12, 31, 23, 59, 59, 
987654321);
+      genericRecord.setField(
+          "map_of_ts_ns", ImmutableMap.of("positive", posNanos, "negative", 
negNanos));
       return genericRecord;
     }
 
     @Override
     public GenericRowData generateFlinkRowData() {
+      long posNanos =
+          org.apache.iceberg.util.DateTimeUtil.nanosFromTimestamp(
+              LocalDateTime.of(2023, 1, 1, 12, 0, 0, 123456789));
+      long negNanos =
+          org.apache.iceberg.util.DateTimeUtil.nanosFromTimestamp(
+              LocalDateTime.of(1969, 12, 31, 23, 59, 59, 987654321));
+
       return GenericRowData.of(
           StringData.fromString("row_id_value"),
           new GenericMapData(
-              ImmutableMap.of(StringData.fromString("Jane"), 1, 
StringData.fromString("Joe"), 2)));
+              ImmutableMap.of(StringData.fromString("Jane"), 1, 
StringData.fromString("Joe"), 2)),
+          new GenericMapData(
+              ImmutableMap.of(
+                  StringData.fromString("positive"),
+                  TimestampData.fromEpochMillis(
+                      Math.floorDiv(posNanos, 1_000_000L),
+                      (int) Math.floorMod(posNanos, 1_000_000L)),
+                  StringData.fromString("negative"),
+                  TimestampData.fromEpochMillis(
+                      Math.floorDiv(negNanos, 1_000_000L),
+                      (int) Math.floorMod(negNanos, 1_000_000L)))));
     }
 
     @Override
@@ -851,6 +927,15 @@ public class DataGenerators {
       org.apache.avro.generic.GenericRecord genericRecord = new 
GenericData.Record(avroSchema);
       genericRecord.put("row_id", "row_id_value");
       genericRecord.put("map_of_primitives", ImmutableMap.of("Jane", 1, "Joe", 
2));
+
+      long posNanos =
+          org.apache.iceberg.util.DateTimeUtil.nanosFromTimestamp(
+              LocalDateTime.of(2023, 1, 1, 12, 0, 0, 123456789));
+      long negNanos =
+          org.apache.iceberg.util.DateTimeUtil.nanosFromTimestamp(
+              LocalDateTime.of(1969, 12, 31, 23, 59, 59, 987654321));
+      genericRecord.put(
+          "map_of_ts_ns", ImmutableMap.of("positive", posNanos, "negative", 
negNanos));
       return genericRecord;
     }
   }
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestRowDataWrapper.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestRowDataWrapper.java
index cd6964b5ed..0e7635a33e 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestRowDataWrapper.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestRowDataWrapper.java
@@ -30,7 +30,6 @@ import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.data.RandomRowData;
 import org.apache.iceberg.util.StructLikeWrapper;
-import org.junit.jupiter.api.Disabled;
 
 public class TestRowDataWrapper extends RecordWrapperTestBase {
 
@@ -60,18 +59,6 @@ public class TestRowDataWrapper extends 
RecordWrapperTestBase {
         });
   }
 
-  @Disabled
-  @Override
-  public void testTimestampNanoWithoutZone() {
-    // Flink does not support nanosecond timestamp without zone.
-  }
-
-  @Disabled
-  @Override
-  public void testTimestampNanoWithZone() {
-    // Flink does not support nanosecond timestamp with zone.
-  }
-
   @Override
   protected void generateAndValidate(
       Schema schema, RecordWrapperTestBase.AssertMethod assertMethod) {
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java
index 4a70802f2a..b7b0a54156 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java
@@ -49,6 +49,11 @@ public class TestFlinkOrcReaderWriter extends DataTestBase {
     return true;
   }
 
+  @Override
+  protected boolean supportsTimestampNanos() {
+    return true;
+  }
+
   @Override
   protected void writeAndValidate(Schema schema) throws IOException {
     List<Record> expectedRecords = RandomGenericData.generate(schema, 
NUM_RECORDS, 1990L);
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestRowDataProjection.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestRowDataProjection.java
index 4e5b38ffb0..a2411da1e3 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestRowDataProjection.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestRowDataProjection.java
@@ -271,18 +271,19 @@ public class TestRowDataProjection {
         GenericRowData.of(
             StringData.fromString("row_id_value"),
             new GenericMapData(
-                ImmutableMap.of(StringData.fromString("foo"), 1, 
StringData.fromString("bar"), 2)));
+                ImmutableMap.of(StringData.fromString("foo"), 1, 
StringData.fromString("bar"), 2)),
+            null);
     testEqualsAndHashCode(schema, idOnly, rowData, copyRowData, otherRowData, 
true);
     testEqualsAndHashCode(schema, mapOnly, rowData, copyRowData, otherRowData);
     testEqualsAndHashCode(schema, schema, rowData, copyRowData, otherRowData);
 
     GenericRowData rowDataNullOptionalFields =
-        GenericRowData.of(StringData.fromString("row_id_value"), null);
+        GenericRowData.of(StringData.fromString("row_id_value"), null, null);
     GenericRowData copyRowDataNullOptionalFields =
-        GenericRowData.of(StringData.fromString("row_id_value"), null);
+        GenericRowData.of(StringData.fromString("row_id_value"), null, null);
     // modify the map field value
     GenericRowData otherRowDataNullOptionalFields =
-        GenericRowData.of(StringData.fromString("other_row_id_value"), null);
+        GenericRowData.of(StringData.fromString("other_row_id_value"), null, 
null);
     testEqualsAndHashCode(
         schema,
         idOnly,
@@ -432,7 +433,8 @@ public class TestRowDataProjection {
     GenericRowData otherRowData =
         GenericRowData.of(
             StringData.fromString("other_row_id_value"),
-            new GenericArrayData(new Integer[] {4, 5, 6}));
+            new GenericArrayData(new Integer[] {4, 5, 6}),
+            null);
     testEqualsAndHashCode(schema, idOnly, rowData, copyRowData, otherRowData);
     testEqualsAndHashCode(schema, arrayOnly, rowData, copyRowData, 
otherRowData);
     testEqualsAndHashCode(schema, schema, rowData, copyRowData, otherRowData);
@@ -440,16 +442,19 @@ public class TestRowDataProjection {
     GenericRowData rowDataNullOptionalFields =
         GenericRowData.of(
             StringData.fromString("row_id_value"),
-            new GenericArrayData(new Integer[] {1, null, 3}));
+            new GenericArrayData(new Integer[] {1, null, 3}),
+            null);
     GenericRowData copyRowDataNullOptionalFields =
         GenericRowData.of(
             StringData.fromString("row_id_value"),
-            new GenericArrayData(new Integer[] {1, null, 3}));
+            new GenericArrayData(new Integer[] {1, null, 3}),
+            null);
     // modify the map field value
     GenericRowData otherRowDataNullOptionalFields =
         GenericRowData.of(
             StringData.fromString("other_row_id_value"),
-            new GenericArrayData(new Integer[] {4, null, 6}));
+            new GenericArrayData(new Integer[] {4, null, 6}),
+            null);
     testEqualsAndHashCode(
         schema,
         idOnly,


Reply via email to