This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new f767dad20e Flink: Backport add Nanosecond Precision Support for
Flink-Iceberg Integration to Flink 1.20 (#16240)
f767dad20e is described below
commit f767dad20e9a47495ce37f1acf6acdffbf63664d
Author: pvary <[email protected]>
AuthorDate: Thu May 7 16:49:25 2026 +0200
Flink: Backport add Nanosecond Precision Support for Flink-Iceberg
Integration to Flink 1.20 (#16240)
Backports #15475
---
flink/v1.20/build.gradle | 1 +
.../org/apache/iceberg/flink/FlinkTypeToType.java | 6 +
.../org/apache/iceberg/flink/RowDataWrapper.java | 36 +-
.../apache/iceberg/flink/data/FlinkOrcReader.java | 7 +
.../apache/iceberg/flink/data/FlinkOrcWriter.java | 7 +
.../apache/iceberg/flink/data/FlinkOrcWriters.java | 37 ++
.../org/apache/iceberg/flink/data/RowDataUtil.java | 2 +
.../apache/iceberg/flink/data/StructRowData.java | 52 +-
.../formats/avro/AvroToRowDataConverters.java | 303 ++++++++++
.../iceberg/flink/formats/avro/JodaConverter.java | 69 +++
.../formats/avro/RowDataToAvroConverters.java | 394 +++++++++++++
.../avro/typeutils/AvroSchemaConverter.java | 625 +++++++++++++++++++++
.../sink/AvroGenericRecordToRowDataMapper.java | 4 +-
.../RowDataToAvroGenericRecordConverter.java | 4 +-
.../source/reader/AvroGenericRecordConverter.java | 4 +-
.../org/apache/iceberg/flink/DataGenerators.java | 105 +++-
.../apache/iceberg/flink/TestRowDataWrapper.java | 13 -
.../flink/data/TestFlinkOrcReaderWriter.java | 5 +
.../iceberg/flink/data/TestRowDataProjection.java | 21 +-
19 files changed, 1642 insertions(+), 53 deletions(-)
diff --git a/flink/v1.20/build.gradle b/flink/v1.20/build.gradle
index c7ca24817b..467b0fa8c9 100644
--- a/flink/v1.20/build.gradle
+++ b/flink/v1.20/build.gradle
@@ -33,6 +33,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
implementation project(':iceberg-hive-metastore')
compileOnly libs.flink120.avro
+ compileOnly libs.joda.time
// dropwizard histogram metrics (optional in Flink)
compileOnly libs.flink120.metrics.dropwizard
compileOnly libs.flink120.streaming.java
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java
index 408065f060..8f106da8d5 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java
@@ -137,11 +137,17 @@ class FlinkTypeToType extends FlinkTypeVisitor<Type> {
@Override
public Type visit(TimestampType timestampType) {
+ if (timestampType.getPrecision() > 6) {
+ return Types.TimestampNanoType.withoutZone();
+ }
return Types.TimestampType.withoutZone();
}
@Override
public Type visit(LocalZonedTimestampType localZonedTimestampType) {
+ if (localZonedTimestampType.getPrecision() > 6) {
+ return Types.TimestampNanoType.withZone();
+ }
return Types.TimestampType.withZone();
}
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java
index 3ef611f2de..920e44b24b 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java
@@ -114,19 +114,35 @@ public class RowDataWrapper implements StructLike {
case TIMESTAMP_WITHOUT_TIME_ZONE:
TimestampType timestampType = (TimestampType) logicalType;
- return (row, pos) -> {
- LocalDateTime localDateTime =
- row.getTimestamp(pos,
timestampType.getPrecision()).toLocalDateTime();
- return DateTimeUtil.microsFromTimestamp(localDateTime);
- };
+ if (type.typeId() == Type.TypeID.TIMESTAMP_NANO) {
+ return (row, pos) -> {
+ LocalDateTime localDateTime =
+ row.getTimestamp(pos,
timestampType.getPrecision()).toLocalDateTime();
+ return DateTimeUtil.nanosFromTimestamp(localDateTime);
+ };
+ } else {
+ return (row, pos) -> {
+ LocalDateTime localDateTime =
+ row.getTimestamp(pos,
timestampType.getPrecision()).toLocalDateTime();
+ return DateTimeUtil.microsFromTimestamp(localDateTime);
+ };
+ }
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
LocalZonedTimestampType lzTs = (LocalZonedTimestampType) logicalType;
- return (row, pos) -> {
- TimestampData timestampData = row.getTimestamp(pos,
lzTs.getPrecision());
- return timestampData.getMillisecond() * 1000
- + timestampData.getNanoOfMillisecond() / 1000;
- };
+ if (type.typeId() == Type.TypeID.TIMESTAMP_NANO) {
+ return (row, pos) -> {
+ TimestampData timestampData = row.getTimestamp(pos,
lzTs.getPrecision());
+ return timestampData.getMillisecond() * 1_000_000L
+ + timestampData.getNanoOfMillisecond();
+ };
+ } else {
+ return (row, pos) -> {
+ TimestampData timestampData = row.getTimestamp(pos,
lzTs.getPrecision());
+ return timestampData.getMillisecond() * 1000L
+ + timestampData.getNanoOfMillisecond() / 1000;
+ };
+ }
case ROW:
RowType rowType = (RowType) logicalType;
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java
index 65b9d44ad4..3e3a29112c 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java
@@ -112,6 +112,13 @@ public class FlinkOrcReader implements
OrcRowReader<RowData> {
} else {
return FlinkOrcReaders.timestamps();
}
+ case TIMESTAMP_NANO:
+ Types.TimestampNanoType timestampNanoType =
(Types.TimestampNanoType) iPrimitive;
+ if (timestampNanoType.shouldAdjustToUTC()) {
+ return FlinkOrcReaders.timestampTzs();
+ } else {
+ return FlinkOrcReaders.timestamps();
+ }
case STRING:
return FlinkOrcReaders.strings();
case UUID:
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
index a467d84833..c1b46252e1 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java
@@ -145,6 +145,13 @@ public class FlinkOrcWriter implements
OrcRowWriter<RowData> {
} else {
return FlinkOrcWriters.timestamps();
}
+ case TIMESTAMP_NANO:
+ Types.TimestampNanoType timestampNanoType =
(Types.TimestampNanoType) iPrimitive;
+ if (timestampNanoType.shouldAdjustToUTC()) {
+ return FlinkOrcWriters.timestampNanoTzs();
+ } else {
+ return FlinkOrcWriters.timestampNanos();
+ }
case STRING:
return FlinkOrcWriters.strings();
case UUID:
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java
index 684842aa09..bf19a46c05 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java
@@ -70,6 +70,14 @@ class FlinkOrcWriters {
return TimestampTzWriter.INSTANCE;
}
+  /** Writer used for Iceberg {@code timestamp_ns} columns that are not adjusted to UTC. */
+  static OrcValueWriter<TimestampData> timestampNanos() {
+    return TimestampNanoWriter.INSTANCE;
+  }
+
+  /** Writer used for Iceberg {@code timestamp_ns} columns that are adjusted to UTC. */
+  static OrcValueWriter<TimestampData> timestampNanoTzs() {
+    return TimestampNanoTzWriter.INSTANCE;
+  }
+
static OrcValueWriter<DecimalData> decimals(int precision, int scale) {
if (precision <= 18) {
return new Decimal18Writer(precision, scale);
@@ -170,6 +178,35 @@ class FlinkOrcWriters {
}
}
+ private static class TimestampNanoWriter implements
OrcValueWriter<TimestampData> {
+ private static final TimestampNanoWriter INSTANCE = new
TimestampNanoWriter();
+
+ @Override
+ public void nonNullWrite(int rowId, TimestampData data, ColumnVector
output) {
+ TimestampColumnVector cv = (TimestampColumnVector) output;
+ cv.setIsUTC(true);
+ // millis
+ OffsetDateTime offsetDateTime =
data.toInstant().atOffset(ZoneOffset.UTC);
+ cv.time[rowId] =
+ offsetDateTime.toEpochSecond() * 1_000 + offsetDateTime.getNano() /
1_000_000;
+ cv.nanos[rowId] = offsetDateTime.getNano();
+ }
+ }
+
+ private static class TimestampNanoTzWriter implements
OrcValueWriter<TimestampData> {
+ private static final TimestampNanoTzWriter INSTANCE = new
TimestampNanoTzWriter();
+
+ @SuppressWarnings("JavaInstantGetSecondsGetNano")
+ @Override
+ public void nonNullWrite(int rowId, TimestampData data, ColumnVector
output) {
+ TimestampColumnVector cv = (TimestampColumnVector) output;
+ // millis
+ Instant instant = data.toInstant();
+ cv.time[rowId] = instant.toEpochMilli();
+ cv.nanos[rowId] = instant.getNano();
+ }
+ }
+
private static class Decimal18Writer implements OrcValueWriter<DecimalData> {
private final int precision;
private final int scale;
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/RowDataUtil.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/RowDataUtil.java
index f23a7ee3d0..81bb559679 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/RowDataUtil.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/RowDataUtil.java
@@ -69,6 +69,8 @@ public class RowDataUtil {
return (int) ((Long) value / 1000);
case TIMESTAMP: // TimestampData
return
TimestampData.fromLocalDateTime(DateTimeUtil.timestampFromMicros((Long) value));
+ case TIMESTAMP_NANO:
+ return
TimestampData.fromLocalDateTime(DateTimeUtil.timestampFromNanos((Long) value));
case UUID:
return UUIDUtil.convert((UUID) value);
default:
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java
index 34576a1e5c..b469f2310f 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java
@@ -48,6 +48,7 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.util.DateTimeUtil;
@Internal
public class StructRowData implements RowData {
@@ -120,8 +121,8 @@ public class StructRowData implements RowData {
if (integer instanceof Integer) {
return (int) integer;
- } else if (integer instanceof LocalDate) {
- return (int) ((LocalDate) integer).toEpochDay();
+ } else if (integer instanceof LocalDate localDate) {
+ return (int) localDate.toEpochDay();
} else if (integer instanceof LocalTime) {
return (int) (((LocalTime) integer).toNanoOfDay() / 1000_000);
} else {
@@ -185,8 +186,27 @@ public class StructRowData implements RowData {
@Override
public TimestampData getTimestamp(int pos, int precision) {
+ if (precision > 6) {
+ Object timeVal = struct.get(pos, Object.class);
+ if (timeVal instanceof OffsetDateTime) {
+ OffsetDateTime odt = (OffsetDateTime) timeVal;
+ return TimestampData.fromEpochMillis(
+ odt.toInstant().toEpochMilli(), odt.getNano() % 1_000_000);
+ } else if (timeVal instanceof LocalDateTime) {
+ LocalDateTime ldt = (LocalDateTime) timeVal;
+ return TimestampData.fromEpochMillis(
+ ldt.toInstant(ZoneOffset.UTC).toEpochMilli(), ldt.getNano() %
1_000_000);
+ } else if (timeVal instanceof Long) {
+ long timeLong = (Long) timeVal;
+ return TimestampData.fromEpochMillis(
+ Math.floorDiv(timeLong, 1_000_000L), (int) Math.floorMod(timeLong,
1_000_000L));
+ } else {
+ throw new IllegalStateException("Unknown type for timestamp_ns: " +
timeVal.getClass());
+ }
+ }
long timeLong = getLong(pos);
- return TimestampData.fromEpochMillis(timeLong / 1000, (int) (timeLong %
1000) * 1000);
+ return TimestampData.fromEpochMillis(
+ Math.floorDiv(timeLong, 1000L), (int) Math.floorMod(timeLong, 1000L) *
1000);
}
@Override
@@ -257,9 +277,29 @@ public class StructRowData implements RowData {
case DECIMAL:
return value;
case TIMESTAMP:
- long millisecond = (long) value / 1000;
- int nanoOfMillisecond = (int) ((Long) value % 1000) * 1000;
- return TimestampData.fromEpochMillis(millisecond, nanoOfMillisecond);
+ long timeMillis;
+ if (value instanceof LocalDateTime localDateTime) {
+ timeMillis = DateTimeUtil.microsFromTimestamp(localDateTime) / 1000L;
+ } else if (value instanceof OffsetDateTime offsetDateTime) {
+ timeMillis = DateTimeUtil.microsFromTimestamptz(offsetDateTime) /
1000L;
+ } else {
+ timeMillis = Math.floorDiv((Long) value, 1000L);
+ }
+ return TimestampData.fromEpochMillis(
+ timeMillis,
+ (int) Math.floorMod(value instanceof Long ? (Long) value :
timeMillis * 1000L, 1000L)
+ * 1000);
+ case TIMESTAMP_NANO:
+ long nanoLong;
+ if (value instanceof LocalDateTime localDateTime) {
+ nanoLong = DateTimeUtil.nanosFromTimestamp(localDateTime);
+ } else if (value instanceof OffsetDateTime offsetDateTime) {
+ nanoLong = DateTimeUtil.nanosFromTimestamptz(offsetDateTime);
+ } else {
+ nanoLong = (Long) value;
+ }
+ return TimestampData.fromEpochMillis(
+ Math.floorDiv(nanoLong, 1_000_000L), (int) Math.floorMod(nanoLong,
1_000_000L));
case STRING:
return StringData.fromString(value.toString());
case FIXED:
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/AvroToRowDataConverters.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/AvroToRowDataConverters.java
new file mode 100644
index 0000000000..0f70e60a1b
--- /dev/null
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/AvroToRowDataConverters.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.formats.avro;
+
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.temporal.ChronoField;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+import org.apache.iceberg.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+/**
+ * Tool class used to convert from Avro {@link GenericRecord} to {@link
RowData}.
+ *
+ * <p>This class is adapted in Iceberg to add support for nanosecond precision
timestamps
+ * (FLINK-39251). Once that ticket is resolved in Flink, this custom converter
may be removed.
+ */
+@Internal
+public class AvroToRowDataConverters {
+
+ private AvroToRowDataConverters() {}
+
+ /**
+ * Runtime converter that converts Avro data structures into objects of
Flink Table & SQL
+ * internal data structures.
+ */
+ @FunctionalInterface
+ public interface AvroToRowDataConverter extends Serializable {
+ Object convert(Object object);
+ }
+
+ //
-------------------------------------------------------------------------------------
+ // Runtime Converters
+ //
-------------------------------------------------------------------------------------
+
+ public static AvroToRowDataConverter createRowConverter(RowType rowType) {
+ return createRowConverter(rowType, true);
+ }
+
+ public static AvroToRowDataConverter createRowConverter(
+ RowType rowType, boolean legacyTimestampMapping) {
+ final AvroToRowDataConverter[] fieldConverters =
+ rowType.getFields().stream()
+ .map(RowType.RowField::getType)
+ .map(type -> createNullableConverter(type, legacyTimestampMapping))
+ .toArray(AvroToRowDataConverter[]::new);
+ final int arity = rowType.getFieldCount();
+
+ return avroObject -> {
+ IndexedRecord record = (IndexedRecord) avroObject;
+ GenericRowData row = new GenericRowData(arity);
+ for (int i = 0; i < arity; ++i) {
+ // avro always deserialize successfully even though the type isn't
matched
+ // so no need to throw exception about which field can't be
deserialized
+ row.setField(i, fieldConverters[i].convert(record.get(i)));
+ }
+ return row;
+ };
+ }
+
+ /** Creates a runtime converter which is null safe. */
+ private static AvroToRowDataConverter createNullableConverter(
+ LogicalType type, boolean legacyTimestampMapping) {
+ final AvroToRowDataConverter converter = createConverter(type,
legacyTimestampMapping);
+ return avroObject -> {
+ if (avroObject == null) {
+ return null;
+ }
+ return converter.convert(avroObject);
+ };
+ }
+
+ /** Creates a runtime converter which assuming input object is not null. */
+ private static AvroToRowDataConverter createConverter(
+ LogicalType type, boolean legacyTimestampMapping) {
+ switch (type.getTypeRoot()) {
+ case NULL:
+ return avroObject -> null;
+ case TINYINT:
+ return avroObject -> ((Integer) avroObject).byteValue();
+ case SMALLINT:
+ return avroObject -> ((Integer) avroObject).shortValue();
+ case BOOLEAN: // boolean
+ case INTEGER: // int
+ case INTERVAL_YEAR_MONTH: // long
+ case BIGINT: // long
+ case INTERVAL_DAY_TIME: // long
+ case FLOAT: // float
+ case DOUBLE: // double
+ return avroObject -> avroObject;
+ case DATE:
+ return AvroToRowDataConverters::convertToDate;
+ case TIME_WITHOUT_TIME_ZONE:
+ return AvroToRowDataConverters::convertToTime;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ // Iceberg: Added support for nanoseconds precision (FLINK-39251)
+ return avroObject -> convertToTimestamp(avroObject, type);
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ if (legacyTimestampMapping) {
+ throw new UnsupportedOperationException("Unsupported type: " + type);
+ } else {
+ // Iceberg: Added support for nanoseconds precision (FLINK-39251)
+ return avroObject -> convertToTimestamp(avroObject, type);
+ }
+ case CHAR:
+ case VARCHAR:
+ return avroObject -> StringData.fromString(avroObject.toString());
+ case BINARY:
+ case VARBINARY:
+ return AvroToRowDataConverters::convertToBytes;
+ case DECIMAL:
+ return createDecimalConverter((DecimalType) type);
+ case ARRAY:
+ return createArrayConverter((ArrayType) type, legacyTimestampMapping);
+ case ROW:
+ return createRowConverter((RowType) type);
+ case MAP:
+ case MULTISET:
+ return createMapConverter(type, legacyTimestampMapping);
+ case RAW:
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " + type);
+ }
+ }
+
+ private static AvroToRowDataConverter createDecimalConverter(DecimalType
decimalType) {
+ final int precision = decimalType.getPrecision();
+ final int scale = decimalType.getScale();
+ return avroObject -> {
+ final byte[] bytes;
+ if (avroObject instanceof GenericFixed) {
+ bytes = ((GenericFixed) avroObject).bytes();
+ } else if (avroObject instanceof ByteBuffer) {
+ ByteBuffer byteBuffer = (ByteBuffer) avroObject;
+ bytes = new byte[byteBuffer.remaining()];
+ byteBuffer.get(bytes);
+ } else {
+ bytes = (byte[]) avroObject;
+ }
+ return DecimalData.fromUnscaledBytes(bytes, precision, scale);
+ };
+ }
+
+ private static AvroToRowDataConverter createArrayConverter(
+ ArrayType arrayType, boolean legacyTimestampMapping) {
+ final AvroToRowDataConverter elementConverter =
+ createNullableConverter(arrayType.getElementType(),
legacyTimestampMapping);
+ final Class<?> elementClass =
+ LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());
+
+ return avroObject -> {
+ final List<?> list = (List<?>) avroObject;
+ final int length = list.size();
+ final Object[] array = (Object[]) Array.newInstance(elementClass,
length);
+ for (int i = 0; i < length; ++i) {
+ array[i] = elementConverter.convert(list.get(i));
+ }
+ return new GenericArrayData(array);
+ };
+ }
+
+ private static AvroToRowDataConverter createMapConverter(
+ LogicalType type, boolean legacyTimestampMapping) {
+ final AvroToRowDataConverter keyConverter =
+ createConverter(DataTypes.STRING().getLogicalType(),
legacyTimestampMapping);
+ final AvroToRowDataConverter valueConverter =
+ createNullableConverter(
+ AvroSchemaConverter.extractValueTypeToAvroMap(type),
legacyTimestampMapping);
+
+ return avroObject -> {
+ final Map<?, ?> map = (Map<?, ?>) avroObject;
+ Map<Object, Object> result = Maps.newHashMap();
+ for (Map.Entry<?, ?> entry : map.entrySet()) {
+ Object key = keyConverter.convert(entry.getKey());
+ Object value = valueConverter.convert(entry.getValue());
+ result.put(key, value);
+ }
+ return new GenericMapData(result);
+ };
+ }
+
+ private static TimestampData convertToTimestamp(Object object, LogicalType
type) {
+ int precision = 3;
+ if (type instanceof org.apache.flink.table.types.logical.TimestampType) {
+ precision = ((org.apache.flink.table.types.logical.TimestampType)
type).getPrecision();
+ } else if (type instanceof
org.apache.flink.table.types.logical.LocalZonedTimestampType) {
+ precision =
+ ((org.apache.flink.table.types.logical.LocalZonedTimestampType)
type).getPrecision();
+ }
+
+ if (object instanceof Long) {
+ long timeLong = (Long) object;
+ if (precision <= 3) {
+ return TimestampData.fromEpochMillis(timeLong);
+ } else if (precision <= 6) {
+ return TimestampData.fromEpochMillis(
+ Math.floorDiv(timeLong, 1000L), (int) Math.floorMod(timeLong,
1000L) * 1_000_000);
+ } else {
+ // Iceberg: Added support for nanoseconds precision (FLINK-39251)
+ return TimestampData.fromEpochMillis(
+ Math.floorDiv(timeLong, 1_000_000L), (int) Math.floorMod(timeLong,
1_000_000L));
+ }
+ } else if (object instanceof Instant) {
+ return TimestampData.fromInstant((Instant) object);
+ } else if (object instanceof LocalDateTime) {
+ return TimestampData.fromLocalDateTime((LocalDateTime) object);
+ } else {
+ JodaConverter jodaConverter = JodaConverter.getConverter();
+ if (jodaConverter != null) {
+ return
TimestampData.fromEpochMillis(jodaConverter.convertTimestamp(object));
+ } else {
+ throw new IllegalArgumentException(
+ "Unexpected object type for TIMESTAMP logical type. Received: " +
object);
+ }
+ }
+ }
+
+ private static int convertToDate(Object object) {
+ if (object instanceof Integer) {
+ return (Integer) object;
+ } else if (object instanceof LocalDate) {
+ return (int) ((LocalDate) object).toEpochDay();
+ } else {
+ JodaConverter jodaConverter = JodaConverter.getConverter();
+ if (jodaConverter != null) {
+ return (int) jodaConverter.convertDate(object);
+ } else {
+ throw new IllegalArgumentException(
+ "Unexpected object type for DATE logical type. Received: " +
object);
+ }
+ }
+ }
+
+ private static int convertToTime(Object object) {
+ final int millis;
+ if (object instanceof Integer) {
+ millis = (Integer) object;
+ } else if (object instanceof LocalTime) {
+ millis = ((LocalTime) object).get(ChronoField.MILLI_OF_DAY);
+ } else {
+ JodaConverter jodaConverter = JodaConverter.getConverter();
+ if (jodaConverter != null) {
+ millis = jodaConverter.convertTime(object);
+ } else {
+ throw new IllegalArgumentException(
+ "Unexpected object type for TIME logical type. Received: " +
object);
+ }
+ }
+ return millis;
+ }
+
+ private static byte[] convertToBytes(Object object) {
+ if (object instanceof GenericFixed) {
+ return ((GenericFixed) object).bytes();
+ } else if (object instanceof ByteBuffer) {
+ ByteBuffer byteBuffer = (ByteBuffer) object;
+ byte[] bytes = new byte[byteBuffer.remaining()];
+ byteBuffer.get(bytes);
+ return bytes;
+ } else {
+ return (byte[]) object;
+ }
+ }
+}
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/JodaConverter.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/JodaConverter.java
new file mode 100644
index 0000000000..c30b780233
--- /dev/null
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/JodaConverter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.formats.avro;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+
+/**
+ * Encapsulates the optional joda-time dependency. Instantiates this class only if joda-time is
+ * available on the classpath.
+ */
+class JodaConverter {
+
+  // Static state shared by every caller: both fields are volatile so that a thread observing
+  // instantiated == true also observes the instance value written before it.
+  private static volatile JodaConverter instance;
+  private static volatile boolean instantiated = false;
+
+  /**
+   * Returns the shared converter, or {@code null} when {@code org.joda.time.DateTime} cannot be
+   * loaded through the current thread's context class loader. The outcome of the probe is
+   * cached; concurrent first calls may each run the probe.
+   */
+  public static JodaConverter getConverter() {
+    if (instantiated) {
+      return instance;
+    }
+
+    try {
+      Class.forName(
+          "org.joda.time.DateTime", false, Thread.currentThread().getContextClassLoader());
+      instance = new JodaConverter();
+    } catch (ClassNotFoundException e) {
+      instance = null;
+    } finally {
+      instantiated = true;
+    }
+    return instance;
+  }
+
+  /**
+   * Converts a joda {@link LocalDate} into the number of days since 1970-01-01 (the same unit as
+   * {@link java.time.LocalDate#toEpochDay()}). Assumes the value uses joda's default ISO
+   * chronology.
+   */
+  public long convertDate(Object object) {
+    final LocalDate value = (LocalDate) object;
+    // read the calendar fields directly: toDate().getTime() yields epoch milliseconds, not a
+    // day count, which callers expecting epoch days would misinterpret
+    return java.time.LocalDate.of(value.getYear(), value.getMonthOfYear(), value.getDayOfMonth())
+        .toEpochDay();
+  }
+
+  /** Returns the milliseconds of the day of a joda {@link LocalTime}. */
+  public int convertTime(Object object) {
+    final LocalTime value = (LocalTime) object;
+    return value.get(DateTimeFieldType.millisOfDay());
+  }
+
+  /** Returns the epoch milliseconds of a joda {@link DateTime}. */
+  public long convertTimestamp(Object object) {
+    final DateTime value = (DateTime) object;
+    return value.getMillis();
+  }
+
+  private JodaConverter() {}
+}
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/RowDataToAvroConverters.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/RowDataToAvroConverters.java
new file mode 100644
index 0000000000..d4c7e4282d
--- /dev/null
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/RowDataToAvroConverters.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.formats.avro;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.time.ZoneOffset;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.iceberg.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * Tool class used to convert from {@link RowData} to Avro {@link
GenericRecord}.
+ *
+ * <p>This class is adapted in Iceberg to add support for nanosecond precision
timestamps
+ * (FLINK-39251). Once that ticket is resolved in Flink, this custom converter
may be removed.
+ */
+@Internal
+public class RowDataToAvroConverters {
+
+ private RowDataToAvroConverters() {}
+
+ //
--------------------------------------------------------------------------------
+ // Runtime Converters
+ //
--------------------------------------------------------------------------------
+
+ /**
+ * Runtime converter that converts objects of Flink Table & SQL internal
data structures to
+ * corresponding Avro data structures.
+ */
+ @FunctionalInterface
+ public interface RowDataToAvroConverter extends Serializable {
+ Object convert(Schema schema, Object object);
+ }
+
+ //
--------------------------------------------------------------------------------
+ // IMPORTANT! We use anonymous classes instead of lambdas for a reason here.
It is
+ // necessary because the maven shade plugin cannot relocate classes in
+ // SerializedLambdas (MSHADE-260). On the other hand we want to relocate
Avro for
+ // sql-client uber jars.
+ //
--------------------------------------------------------------------------------
+
+ /**
+ * Creates a runtime converter according to the given logical type that
converts objects of Flink
+ * Table & SQL internal data structures to corresponding Avro data
structures.
+ */
+ public static RowDataToAvroConverter createConverter(LogicalType type) {
+ return createConverter(type, true);
+ }
+
+ @SuppressWarnings("checkstyle:MethodLength")
+ public static RowDataToAvroConverter createConverter(
+ LogicalType type, boolean legacyTimestampMapping) {
+ final RowDataToAvroConverter converter;
+ switch (type.getTypeRoot()) {
+ case NULL:
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ return null;
+ }
+ };
+ break;
+ case TINYINT:
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ return ((Byte) object).intValue();
+ }
+ };
+ break;
+ case SMALLINT:
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ return ((Short) object).intValue();
+ }
+ };
+ break;
+ case BOOLEAN: // boolean
+ case INTEGER: // int
+ case INTERVAL_YEAR_MONTH: // long
+ case BIGINT: // long
+ case INTERVAL_DAY_TIME: // long
+ case FLOAT: // float
+ case DOUBLE: // double
+ case TIME_WITHOUT_TIME_ZONE: // int
+ case DATE: // int
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ return object;
+ }
+ };
+ break;
+ case CHAR:
+ case VARCHAR:
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ return new Utf8(object.toString());
+ }
+ };
+ break;
+ case BINARY:
+ case VARBINARY:
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ return ByteBuffer.wrap((byte[]) object);
+ }
+ };
+ break;
+ // Iceberg: Added support for nanoseconds precision (FLINK-39251)
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ final int tzPrecision;
+ if (type instanceof
org.apache.flink.table.types.logical.TimestampType) {
+ tzPrecision = ((org.apache.flink.table.types.logical.TimestampType)
type).getPrecision();
+ } else {
+ tzPrecision = 3;
+ }
+ if (legacyTimestampMapping) {
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ TimestampData timestampData = (TimestampData) object;
+ if (tzPrecision <= 3) {
+ return timestampData.getMillisecond();
+ } else if (tzPrecision <= 6) {
+ return timestampData.getMillisecond() * 1000L
+ + timestampData.getNanoOfMillisecond() / 1000;
+ } else {
+ // Iceberg: Added support for nanoseconds precision
(FLINK-39251)
+ return timestampData.getMillisecond() * 1_000_000L
+ + timestampData.getNanoOfMillisecond();
+ }
+ }
+ };
+ } else {
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ TimestampData timestampData = (TimestampData) object;
+ java.time.Instant instant =
+
timestampData.toLocalDateTime().toInstant(ZoneOffset.UTC);
+ if (tzPrecision <= 3) {
+ return instant.toEpochMilli();
+ } else if (tzPrecision <= 6) {
+ return instant.getEpochSecond() * 1_000_000L +
instant.getNano() / 1000;
+ } else {
+ // Iceberg: Added support for nanoseconds precision
(FLINK-39251)
+ return instant.getEpochSecond() * 1_000_000_000L +
instant.getNano();
+ }
+ }
+ };
+ }
+ break;
+ // Iceberg: Added support for nanoseconds precision (FLINK-39251)
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ final int ltzPrecision;
+ if (type instanceof
org.apache.flink.table.types.logical.LocalZonedTimestampType) {
+ ltzPrecision =
+ ((org.apache.flink.table.types.logical.LocalZonedTimestampType)
type).getPrecision();
+ } else {
+ ltzPrecision = 3;
+ }
+ if (legacyTimestampMapping) {
+ throw new UnsupportedOperationException("Unsupported type: " + type);
+ } else {
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ TimestampData timestampData = (TimestampData) object;
+ if (ltzPrecision <= 3) {
+ return timestampData.getMillisecond();
+ } else if (ltzPrecision <= 6) {
+ return timestampData.getMillisecond() * 1000L
+ + timestampData.getNanoOfMillisecond() / 1000;
+ } else {
+ // Iceberg: Added support for nanoseconds precision
(FLINK-39251)
+ return timestampData.getMillisecond() * 1_000_000L
+ + timestampData.getNanoOfMillisecond();
+ }
+ }
+ };
+ }
+ break;
+ case DECIMAL:
+ converter =
+ new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ return ByteBuffer.wrap(((DecimalData)
object).toUnscaledBytes());
+ }
+ };
+ break;
+ case ARRAY:
+ converter = createArrayConverter((ArrayType) type,
legacyTimestampMapping);
+ break;
+ case ROW:
+ converter = createRowConverter((RowType) type, legacyTimestampMapping);
+ break;
+ case MAP:
+ case MULTISET:
+ converter = createMapConverter(type, legacyTimestampMapping);
+ break;
+ case RAW:
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " + type);
+ }
+
+ // wrap into nullable converter
+ return new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ if (object == null) {
+ return null;
+ }
+
+ // get actual schema if it is a nullable schema
+ Schema actualSchema;
+ if (schema.getType() == Schema.Type.UNION) {
+ List<Schema> types = schema.getTypes();
+ int size = types.size();
+ if (size == 2 && types.get(1).getType() == Schema.Type.NULL) {
+ actualSchema = types.get(0);
+ } else if (size == 2 && types.get(0).getType() == Schema.Type.NULL) {
+ actualSchema = types.get(1);
+ } else {
+ throw new IllegalArgumentException(
+ "The Avro schema is not a nullable type: " +
schema.toString());
+ }
+ } else {
+ actualSchema = schema;
+ }
+ return converter.convert(actualSchema, object);
+ }
+ };
+ }
+
+ private static RowDataToAvroConverter createRowConverter(
+ RowType rowType, boolean legacyTimestampMapping) {
+ final RowDataToAvroConverter[] fieldConverters =
+ rowType.getChildren().stream()
+ .map(legacyType -> createConverter(legacyType,
legacyTimestampMapping))
+ .toArray(RowDataToAvroConverter[]::new);
+ final LogicalType[] fieldTypes =
+
rowType.getFields().stream().map(RowType.RowField::getType).toArray(LogicalType[]::new);
+ final RowData.FieldGetter[] fieldGetters = new
RowData.FieldGetter[fieldTypes.length];
+ for (int i = 0; i < fieldTypes.length; i++) {
+ fieldGetters[i] = RowData.createFieldGetter(fieldTypes[i], i);
+ }
+ final int length = rowType.getFieldCount();
+
+ return new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ final RowData row = (RowData) object;
+ final List<Schema.Field> fields = schema.getFields();
+ final GenericRecord record = new GenericData.Record(schema);
+ for (int i = 0; i < length; ++i) {
+ final Schema.Field schemaField = fields.get(i);
+ try {
+ Object avroObject =
+ fieldConverters[i].convert(
+ schemaField.schema(), fieldGetters[i].getFieldOrNull(row));
+ record.put(i, avroObject);
+ } catch (Throwable t) {
+ throw new RuntimeException(
+ String.format("Fail to serialize at field: %s.",
schemaField.name()), t);
+ }
+ }
+ return record;
+ }
+ };
+ }
+
+ private static RowDataToAvroConverter createArrayConverter(
+ ArrayType arrayType, boolean legacyTimestampMapping) {
+ LogicalType elementType = arrayType.getElementType();
+ final ArrayData.ElementGetter elementGetter =
ArrayData.createElementGetter(elementType);
+ final RowDataToAvroConverter elementConverter =
+ createConverter(arrayType.getElementType(), legacyTimestampMapping);
+
+ return new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ final Schema elementSchema = schema.getElementType();
+ ArrayData arrayData = (ArrayData) object;
+ List<Object> list = Lists.newArrayList();
+ for (int i = 0; i < arrayData.size(); ++i) {
+ list.add(
+ elementConverter.convert(
+ elementSchema, elementGetter.getElementOrNull(arrayData,
i)));
+ }
+ return list;
+ }
+ };
+ }
+
+ private static RowDataToAvroConverter createMapConverter(
+ LogicalType type, boolean legacyTimestampMapping) {
+ LogicalType valueType =
AvroSchemaConverter.extractValueTypeToAvroMap(type);
+ final ArrayData.ElementGetter valueGetter =
ArrayData.createElementGetter(valueType);
+ final RowDataToAvroConverter valueConverter =
+ createConverter(valueType, legacyTimestampMapping);
+
+ return new RowDataToAvroConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(Schema schema, Object object) {
+ final Schema valueSchema = schema.getValueType();
+ final MapData mapData = (MapData) object;
+ final ArrayData keyArray = mapData.keyArray();
+ final ArrayData valueArray = mapData.valueArray();
+ final Map<Object, Object> map =
CollectionUtil.newHashMapWithExpectedSize(mapData.size());
+ for (int i = 0; i < mapData.size(); ++i) {
+ final String key = keyArray.getString(i).toString();
+ final Object value =
+ valueConverter.convert(valueSchema,
valueGetter.getElementOrNull(valueArray, i));
+ map.put(key, value);
+ }
+ return map;
+ }
+ };
+ }
+}
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/typeutils/AvroSchemaConverter.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/typeutils/AvroSchemaConverter.java
new file mode 100644
index 0000000000..347631c7f4
--- /dev/null
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/typeutils/AvroSchemaConverter.java
@@ -0,0 +1,625 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.formats.avro.typeutils;
+
+import java.util.List;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.SchemaParseException;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
+import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TypeInformationRawType;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * Converts an Avro schema into Flink's type information. It uses {@link
RowTypeInfo} for
+ * representing objects and converts Avro types into types that are compatible
with Flink's Table
+ * & SQL API.
+ *
+ * <p>Note: Changes in this class need to be kept in sync with the
corresponding runtime classes
+ * {@link AvroRowDataDeserializationSchema} and {@link
AvroRowDataSerializationSchema}.
+ *
+ * <p>This class is adapted in Iceberg to support custom 'timestamp-nanos' and
+ * 'local-timestamp-nanos' logical types (FLINK-39251). Once that ticket is
resolved in Flink, these
+ * custom types may be removed.
+ */
+public class AvroSchemaConverter {
+
+ private AvroSchemaConverter() {
+ // private
+ }
+
+ /**
+ * Converts an Avro class into a nested row structure with deterministic
field order and data
+ * types that are compatible with Flink's Table & SQL API.
+ *
+ * @param avroClass Avro specific record that contains schema information
+ * @return type information matching the schema
+ */
+ @SuppressWarnings("unchecked")
+ public static <T extends SpecificRecord> TypeInformation<Row>
convertToTypeInfo(
+ Class<T> avroClass) {
+ return convertToTypeInfo(avroClass, true);
+ }
+
+ /**
+ * Converts an Avro class into a nested row structure with deterministic
field order and data
+ * types that are compatible with Flink's Table & SQL API.
+ *
+ * @param avroClass Avro specific record that contains schema information
+ * @param legacyTimestampMapping legacy mapping of timestamp types
+ * @return type information matching the schema
+ */
+ @SuppressWarnings("unchecked")
+ public static <T extends SpecificRecord> TypeInformation<Row>
convertToTypeInfo(
+ Class<T> avroClass, boolean legacyTimestampMapping) {
+ Preconditions.checkNotNull(avroClass, "Avro specific record class must not
be null.");
+ // determine schema to retrieve deterministic field order
+ final Schema schema = SpecificData.get().getSchema(avroClass);
+ return (TypeInformation<Row>) convertToTypeInfo(schema, true);
+ }
+
+ /**
+ * Converts an Avro schema string into a nested row structure with
deterministic field order and
+ * data types that are compatible with Flink's Table & SQL API.
+ *
+ * @param avroSchemaString Avro schema definition string
+ * @return type information matching the schema
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> TypeInformation<T> convertToTypeInfo(String
avroSchemaString) {
+ return convertToTypeInfo(avroSchemaString, true);
+ }
+
+ /**
+ * Converts an Avro schema string into a nested row structure with
deterministic field order and
+ * data types that are compatible with Flink's Table & SQL API.
+ *
+ * @param avroSchemaString Avro schema definition string
+ * @param legacyTimestampMapping legacy mapping of timestamp types
+ * @return type information matching the schema
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> TypeInformation<T> convertToTypeInfo(
+ String avroSchemaString, boolean legacyTimestampMapping) {
+ Preconditions.checkNotNull(avroSchemaString, "Avro schema must not be
null.");
+ final Schema schema;
+ try {
+ schema = new Schema.Parser().parse(avroSchemaString);
+ } catch (SchemaParseException e) {
+ throw new IllegalArgumentException("Could not parse Avro schema
string.", e);
+ }
+ return (TypeInformation<T>) convertToTypeInfo(schema,
legacyTimestampMapping);
+ }
+
+ private static TypeInformation<?> convertToTypeInfo(
+ Schema schema, boolean legacyTimestampMapping) {
+ switch (schema.getType()) {
+ case RECORD:
+ final List<Schema.Field> fields = schema.getFields();
+
+ final TypeInformation<?>[] types = new
TypeInformation<?>[fields.size()];
+ final String[] names = new String[fields.size()];
+ for (int i = 0; i < fields.size(); i++) {
+ final Schema.Field field = fields.get(i);
+ types[i] = convertToTypeInfo(field.schema(), legacyTimestampMapping);
+ names[i] = field.name();
+ }
+ return Types.ROW_NAMED(names, types);
+ case ENUM:
+ return Types.STRING;
+ case ARRAY:
+ // result type might either be ObjectArrayTypeInfo or
BasicArrayTypeInfo for Strings
+ return Types.OBJECT_ARRAY(
+ convertToTypeInfo(schema.getElementType(),
legacyTimestampMapping));
+ case MAP:
+ return Types.MAP(
+ Types.STRING, convertToTypeInfo(schema.getValueType(),
legacyTimestampMapping));
+ case UNION:
+ final Schema actualSchema;
+ if (schema.getTypes().size() == 2
+ && schema.getTypes().get(0).getType() == Schema.Type.NULL) {
+ actualSchema = schema.getTypes().get(1);
+ } else if (schema.getTypes().size() == 2
+ && schema.getTypes().get(1).getType() == Schema.Type.NULL) {
+ actualSchema = schema.getTypes().get(0);
+ } else if (schema.getTypes().size() == 1) {
+ actualSchema = schema.getTypes().get(0);
+ } else {
+ // use Kryo for serialization
+ return Types.GENERIC(Object.class);
+ }
+ return convertToTypeInfo(actualSchema, legacyTimestampMapping);
+ case FIXED:
+ // logical decimal type
+ if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
+ return Types.BIG_DEC;
+ }
+ // convert fixed size binary data to primitive byte arrays
+ return Types.PRIMITIVE_ARRAY(Types.BYTE);
+ case STRING:
+ // convert Avro's Utf8/CharSequence to String
+ return Types.STRING;
+ case BYTES:
+ // logical decimal type
+ if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
+ return Types.BIG_DEC;
+ }
+ return Types.PRIMITIVE_ARRAY(Types.BYTE);
+ case INT:
+ // logical date and time type
+ final org.apache.avro.LogicalType logicalType =
schema.getLogicalType();
+ if (logicalType == LogicalTypes.date()) {
+ return Types.SQL_DATE;
+ } else if (logicalType == LogicalTypes.timeMillis()) {
+ return Types.SQL_TIME;
+ }
+ return Types.INT;
+ case LONG:
+ if (legacyTimestampMapping) {
+ if (schema.getLogicalType() == LogicalTypes.timestampMillis()
+ || schema.getLogicalType() == LogicalTypes.timestampMicros()
+ // Iceberg: Added support for custom nanosecond logical type
(FLINK-39251)
+ || (schema.getLogicalType() != null
+ &&
schema.getLogicalType().getName().equals("timestamp-nanos"))) {
+ return Types.SQL_TIMESTAMP;
+ } else if (schema.getLogicalType() == LogicalTypes.timeMicros()
+ || schema.getLogicalType() == LogicalTypes.timeMillis()) {
+ return Types.SQL_TIME;
+ }
+ } else {
+ // Avro logical timestamp types to Flink DataStream timestamp types
+ if (schema.getLogicalType() == LogicalTypes.timestampMillis()
+ || schema.getLogicalType() == LogicalTypes.timestampMicros()
+ // Iceberg: Added support for custom nanosecond logical type
(FLINK-39251)
+ || (schema.getLogicalType() != null
+ &&
schema.getLogicalType().getName().equals("timestamp-nanos"))) {
+ return Types.INSTANT;
+ } else if (schema.getLogicalType() ==
LogicalTypes.localTimestampMillis()
+ || schema.getLogicalType() == LogicalTypes.localTimestampMicros()
+ // Iceberg: Added support for custom nanosecond logical type
(FLINK-39251)
+ || (schema.getLogicalType() != null
+ &&
schema.getLogicalType().getName().equals("local-timestamp-nanos"))) {
+ return Types.LOCAL_DATE_TIME;
+ } else if (schema.getLogicalType() == LogicalTypes.timeMicros()
+ || schema.getLogicalType() == LogicalTypes.timeMillis()) {
+ return Types.SQL_TIME;
+ }
+ }
+ return Types.LONG;
+ case FLOAT:
+ return Types.FLOAT;
+ case DOUBLE:
+ return Types.DOUBLE;
+ case BOOLEAN:
+ return Types.BOOLEAN;
+ case NULL:
+ return Types.VOID;
+ }
+ throw new IllegalArgumentException("Unsupported Avro type '" +
schema.getType() + "'.");
+ }
+
+ /**
+ * Converts an Avro schema string into a nested row structure with
deterministic field order and
+ * data types that are compatible with Flink's Table & SQL API.
+ *
+ * @param avroSchemaString Avro schema definition string
+ * @return data type matching the schema
+ */
+ public static DataType convertToDataType(String avroSchemaString) {
+ return convertToDataType(avroSchemaString, true);
+ }
+
+ /**
+ * Converts an Avro schema string into a nested row structure with
deterministic field order and
+ * data types that are compatible with Flink's Table & SQL API.
+ *
+ * @param avroSchemaString Avro schema definition string
+ * @param legacyTimestampMapping legacy mapping of local timestamps
+ * @return data type matching the schema
+ */
+ public static DataType convertToDataType(
+ String avroSchemaString, boolean legacyTimestampMapping) {
+ Preconditions.checkNotNull(avroSchemaString, "Avro schema must not be
null.");
+ final Schema schema;
+ try {
+ schema = new Schema.Parser().parse(avroSchemaString);
+ } catch (SchemaParseException e) {
+ throw new IllegalArgumentException("Could not parse Avro schema
string.", e);
+ }
+ return convertToDataType(schema, legacyTimestampMapping);
+ }
+
+ @SuppressWarnings("deprecation")
+ private static DataType convertToDataType(Schema schema, boolean
legacyMapping) {
+ switch (schema.getType()) {
+ case RECORD:
+ final List<Schema.Field> schemaFields = schema.getFields();
+
+ final DataTypes.Field[] fields = new
DataTypes.Field[schemaFields.size()];
+ for (int i = 0; i < schemaFields.size(); i++) {
+ final Schema.Field field = schemaFields.get(i);
+ fields[i] =
+ DataTypes.FIELD(field.name(), convertToDataType(field.schema(),
legacyMapping));
+ }
+ return DataTypes.ROW(fields).notNull();
+ case ENUM:
+ return DataTypes.STRING().notNull();
+ case ARRAY:
+ return DataTypes.ARRAY(convertToDataType(schema.getElementType(),
legacyMapping)).notNull();
+ case MAP:
+ return DataTypes.MAP(
+ DataTypes.STRING().notNull(),
+ convertToDataType(schema.getValueType(), legacyMapping))
+ .notNull();
+ case UNION:
+ final Schema actualSchema;
+ final boolean nullable;
+ if (schema.getTypes().size() == 2
+ && schema.getTypes().get(0).getType() == Schema.Type.NULL) {
+ actualSchema = schema.getTypes().get(1);
+ nullable = true;
+ } else if (schema.getTypes().size() == 2
+ && schema.getTypes().get(1).getType() == Schema.Type.NULL) {
+ actualSchema = schema.getTypes().get(0);
+ nullable = true;
+ } else if (schema.getTypes().size() == 1) {
+ actualSchema = schema.getTypes().get(0);
+ nullable = false;
+ } else {
+ // use Kryo for serialization
+ return new AtomicDataType(
+ new TypeInformationRawType<>(false,
Types.GENERIC(Object.class)));
+ }
+ DataType converted = convertToDataType(actualSchema, legacyMapping);
+ return nullable ? converted.nullable() : converted;
+ case FIXED:
+ // logical decimal type
+ if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
+ final LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal)
schema.getLogicalType();
+ return DataTypes.DECIMAL(decimalType.getPrecision(),
decimalType.getScale()).notNull();
+ }
+ // convert fixed size binary data to primitive byte arrays
+ return DataTypes.VARBINARY(schema.getFixedSize()).notNull();
+ case STRING:
+ // convert Avro's Utf8/CharSequence to String
+ return DataTypes.STRING().notNull();
+ case BYTES:
+ // logical decimal type
+ if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
+ final LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal)
schema.getLogicalType();
+ return DataTypes.DECIMAL(decimalType.getPrecision(),
decimalType.getScale()).notNull();
+ }
+ return DataTypes.BYTES().notNull();
+ case INT:
+ // logical date and time type
+ final org.apache.avro.LogicalType logicalType =
schema.getLogicalType();
+ if (logicalType == LogicalTypes.date()) {
+ return DataTypes.DATE().notNull();
+ } else if (logicalType == LogicalTypes.timeMillis()) {
+ return DataTypes.TIME(3).notNull();
+ }
+ return DataTypes.INT().notNull();
+ case LONG:
+ if (legacyMapping) {
+ // Avro logical timestamp types to Flink SQL timestamp types
+ if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
+ return DataTypes.TIMESTAMP(3).notNull();
+ } else if (schema.getLogicalType() ==
LogicalTypes.timestampMicros()) {
+ return DataTypes.TIMESTAMP(6).notNull();
+ } else if (schema.getLogicalType() != null
+ && schema.getLogicalType().getName().equals("timestamp-nanos")) {
+ // Iceberg: Added support for custom nanosecond logical type
(FLINK-39251)
+ return DataTypes.TIMESTAMP(9).notNull();
+ } else if (schema.getLogicalType() == LogicalTypes.timeMillis()) {
+ return DataTypes.TIME(3).notNull();
+ } else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
+ return DataTypes.TIME(6).notNull();
+ }
+ } else {
+ // Avro logical timestamp types to Flink SQL timestamp types
+ if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
+ return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull();
+ } else if (schema.getLogicalType() ==
LogicalTypes.timestampMicros()) {
+ return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6).notNull();
+ } else if (schema.getLogicalType() != null
+ && schema.getLogicalType().getName().equals("timestamp-nanos")) {
+ // Iceberg: Added support for custom nanosecond logical type
(FLINK-39251)
+ return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(9).notNull();
+ } else if (schema.getLogicalType() == LogicalTypes.timeMillis()) {
+ return DataTypes.TIME(3).notNull();
+ } else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
+ return DataTypes.TIME(6).notNull();
+ } else if (schema.getLogicalType() ==
LogicalTypes.localTimestampMillis()) {
+ return DataTypes.TIMESTAMP(3).notNull();
+ } else if (schema.getLogicalType() ==
LogicalTypes.localTimestampMicros()) {
+ return DataTypes.TIMESTAMP(6).notNull();
+ } else if (schema.getLogicalType() != null
+ &&
schema.getLogicalType().getName().equals("local-timestamp-nanos")) {
+ // Iceberg: Added support for custom nanosecond logical type
(FLINK-39251)
+ return DataTypes.TIMESTAMP(9).notNull();
+ }
+ }
+
+ return DataTypes.BIGINT().notNull();
+ case FLOAT:
+ return DataTypes.FLOAT().notNull();
+ case DOUBLE:
+ return DataTypes.DOUBLE().notNull();
+ case BOOLEAN:
+ return DataTypes.BOOLEAN().notNull();
+ case NULL:
+ return DataTypes.NULL();
+ }
+ throw new IllegalArgumentException("Unsupported Avro type '" +
schema.getType() + "'.");
+ }
+
+ /**
+ * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro
schema.
+ *
+ * <p>Use "org.apache.flink.avro.generated.record" as the type name.
+ *
+ * @param schema the schema type, usually it should be the top level record
type, e.g. not a
+ * nested type
+ * @return Avro's {@link Schema} matching this logical type.
+ */
+ public static Schema convertToSchema(LogicalType schema) {
+ return convertToSchema(schema, true);
+ }
+
+ /**
+ * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro
schema.
+ *
+ * <p>Use "org.apache.flink.avro.generated.record" as the type name.
+ *
+ * @param schema the schema type, usually it should be the top level record
type, e.g. not a
+ * nested type
+ * @param legacyTimestampMapping whether to use the legacy timestamp mapping
+ * @return Avro's {@link Schema} matching this logical type.
+ */
+ public static Schema convertToSchema(LogicalType schema, boolean
legacyTimestampMapping) {
+ return convertToSchema(
+ schema, "org.apache.flink.avro.generated.record",
legacyTimestampMapping);
+ }
+
+ /**
+ * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro
schema.
+ *
+ * <p>The "{rowName}_" is used as the nested row type name prefix in order
to generate the right
+ * schema. Nested record type that only differs with type name is still
compatible.
+ *
+ * @param logicalType logical type
+ * @param rowName the record name
+ * @return Avro's {@link Schema} matching this logical type.
+ */
+ public static Schema convertToSchema(LogicalType logicalType, String
rowName) {
+ return convertToSchema(logicalType, rowName, true);
+ }
+
+ /**
+ * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro
schema.
+ *
+ * <p>The "{rowName}_" is used as the nested row type name prefix in order
to generate the right
+ * schema. Nested record type that only differs with type name is still
compatible.
+ *
+ * @param logicalType logical type
+ * @param rowName the record name
+ * @param legacyTimestampMapping whether to use legal timestamp mapping
+ * @return Avro's {@link Schema} matching this logical type.
+ */
+ public static Schema convertToSchema(
+ LogicalType logicalType, String rowName, boolean legacyTimestampMapping)
{
+ int precision;
+ boolean nullable = logicalType.isNullable();
+ switch (logicalType.getTypeRoot()) {
+ case NULL:
+ return SchemaBuilder.builder().nullType();
+ case BOOLEAN:
+ Schema bool = SchemaBuilder.builder().booleanType();
+ return nullable ? nullableSchema(bool) : bool;
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ Schema integer = SchemaBuilder.builder().intType();
+ return nullable ? nullableSchema(integer) : integer;
+ case BIGINT:
+ Schema bigint = SchemaBuilder.builder().longType();
+ return nullable ? nullableSchema(bigint) : bigint;
+ case FLOAT:
+ Schema floatSchema = SchemaBuilder.builder().floatType();
+ return nullable ? nullableSchema(floatSchema) : floatSchema;
+ case DOUBLE:
+ Schema doubleSchema = SchemaBuilder.builder().doubleType();
+ return nullable ? nullableSchema(doubleSchema) : doubleSchema;
+ case CHAR:
+ case VARCHAR:
+ Schema str = SchemaBuilder.builder().stringType();
+ return nullable ? nullableSchema(str) : str;
+ case BINARY:
+ case VARBINARY:
+ Schema binary = SchemaBuilder.builder().bytesType();
+ return nullable ? nullableSchema(binary) : binary;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ // use long to represents Timestamp
+ final TimestampType timestampType = (TimestampType) logicalType;
+ precision = timestampType.getPrecision();
+ org.apache.avro.LogicalType avroLogicalType;
+ if (legacyTimestampMapping) {
+ if (precision <= 3) {
+ avroLogicalType = LogicalTypes.timestampMillis();
+ } else {
+ throw new IllegalArgumentException(
+ "Avro does not support TIMESTAMP type "
+ + "with precision: "
+ + precision
+ + ", it only supports precision less than 3.");
+ }
+ } else {
+ if (precision <= 3) {
+ avroLogicalType = LogicalTypes.localTimestampMillis();
+ } else if (precision <= 6) {
+ avroLogicalType = LogicalTypes.localTimestampMicros();
+ } else {
+ throw new IllegalArgumentException(
+ "Avro does not support LOCAL TIMESTAMP type "
+ + "with precision: "
+ + precision
+ + ", it only supports precision less than 6.");
+ }
+ }
+ Schema timestamp =
avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
+ return nullable ? nullableSchema(timestamp) : timestamp;
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ if (legacyTimestampMapping) {
+ throw new UnsupportedOperationException(
+ "Unsupported to derive Schema for type: " + logicalType);
+ } else {
+ final LocalZonedTimestampType localZonedTimestampType =
+ (LocalZonedTimestampType) logicalType;
+ precision = localZonedTimestampType.getPrecision();
+ if (precision <= 3) {
+ avroLogicalType = LogicalTypes.timestampMillis();
+ } else if (precision <= 6) {
+ avroLogicalType = LogicalTypes.timestampMicros();
+ } else {
+ throw new IllegalArgumentException(
+ "Avro does not support TIMESTAMP type "
+ + "with precision: "
+ + precision
+ + ", it only supports precision less than 6.");
+ }
+ timestamp =
avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
+ return nullable ? nullableSchema(timestamp) : timestamp;
+ }
+ case DATE:
+ // use int to represents Date
+ Schema date =
LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
+ return nullable ? nullableSchema(date) : date;
+ case TIME_WITHOUT_TIME_ZONE:
+ precision = ((TimeType) logicalType).getPrecision();
+ if (precision > 3) {
+ throw new IllegalArgumentException(
+ "Avro does not support TIME type with precision: "
+ + precision
+ + ", it only supports precision less than 3.");
+ }
+ // use int to represents Time, we only support millisecond when
deserialization
+ Schema time =
LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType());
+ return nullable ? nullableSchema(time) : time;
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) logicalType;
+ // store BigDecimal as byte[]
+ Schema decimal =
+ LogicalTypes.decimal(decimalType.getPrecision(),
decimalType.getScale())
+ .addToSchema(SchemaBuilder.builder().bytesType());
+ return nullable ? nullableSchema(decimal) : decimal;
+ case ROW:
+ RowType rowType = (RowType) logicalType;
+ List<String> fieldNames = rowType.getFieldNames();
+ // we have to make sure the record name is different in a Schema
+ SchemaBuilder.FieldAssembler<Schema> builder =
+ SchemaBuilder.builder().record(rowName).fields();
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ String fieldName = fieldNames.get(i);
+ LogicalType fieldType = rowType.getTypeAt(i);
+ SchemaBuilder.GenericDefault<Schema> fieldBuilder =
+ builder
+ .name(fieldName)
+ .type(
+ convertToSchema(
+ fieldType, rowName + "_" + fieldName,
legacyTimestampMapping));
+
+ if (fieldType.isNullable()) {
+ builder = fieldBuilder.withDefault(null);
+ } else {
+ builder = fieldBuilder.noDefault();
+ }
+ }
+ Schema record = builder.endRecord();
+ return nullable ? nullableSchema(record) : record;
+ case MULTISET:
+ case MAP:
+ Schema map =
+ SchemaBuilder.builder()
+ .map()
+
.values(convertToSchema(extractValueTypeToAvroMap(logicalType), rowName));
+ return nullable ? nullableSchema(map) : map;
+ case ARRAY:
+ ArrayType arrayType = (ArrayType) logicalType;
+ Schema array =
+ SchemaBuilder.builder()
+ .array()
+ .items(convertToSchema(arrayType.getElementType(), rowName));
+ return nullable ? nullableSchema(array) : array;
+ case RAW:
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported to derive Schema for type: " + logicalType);
+ }
+ }
+
+ public static LogicalType extractValueTypeToAvroMap(LogicalType type) {
+ LogicalType keyType;
+ LogicalType valueType;
+ if (type instanceof MapType) {
+ MapType mapType = (MapType) type;
+ keyType = mapType.getKeyType();
+ valueType = mapType.getValueType();
+ } else {
+ MultisetType multisetType = (MultisetType) type;
+ keyType = multisetType.getElementType();
+ valueType = new IntType();
+ }
+ if (!keyType.is(LogicalTypeFamily.CHARACTER_STRING)) {
+ throw new UnsupportedOperationException(
+ "Avro format doesn't support non-string as key type of map. "
+ + "The key type is: "
+ + keyType.asSummaryString());
+ }
+ return valueType;
+ }
+
+ /** Returns schema with nullable true. */
+ private static Schema nullableSchema(Schema schema) {
+ return schema.isNullable()
+ ? schema
+ : Schema.createUnion(SchemaBuilder.builder().nullType(), schema);
+ }
+}
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/AvroGenericRecordToRowDataMapper.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/AvroGenericRecordToRowDataMapper.java
index f7e8e0c884..5f3494330c 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/AvroGenericRecordToRowDataMapper.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/AvroGenericRecordToRowDataMapper.java
@@ -21,14 +21,14 @@ package org.apache.iceberg.flink.sink;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.formats.avro.AvroToRowDataConverters;
-import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.flink.formats.avro.AvroToRowDataConverters;
+import org.apache.iceberg.flink.formats.avro.typeutils.AvroSchemaConverter;
/**
* This util class converts Avro GenericRecord to Flink RowData. <br>
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataToAvroGenericRecordConverter.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataToAvroGenericRecordConverter.java
index 8ef1f1fbb8..d74b8b9d62 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataToAvroGenericRecordConverter.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataToAvroGenericRecordConverter.java
@@ -23,8 +23,6 @@ import java.util.function.Function;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.formats.avro.RowDataToAvroConverters;
-import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
@@ -32,6 +30,8 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.formats.avro.RowDataToAvroConverters;
+import org.apache.iceberg.flink.formats.avro.typeutils.AvroSchemaConverter;
/**
* This is not serializable because Avro {@link Schema} is not actually
serializable, even though it
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordConverter.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordConverter.java
index b158b0871a..cfef780a4d 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordConverter.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordConverter.java
@@ -21,8 +21,6 @@ package org.apache.iceberg.flink.source.reader;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.RowDataToAvroConverters;
-import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
@@ -31,6 +29,8 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.formats.avro.RowDataToAvroConverters;
+import org.apache.iceberg.flink.formats.avro.typeutils.AvroSchemaConverter;
public class AvroGenericRecordConverter implements
RowDataConverter<GenericRecord> {
private final Schema avroSchema;
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/DataGenerators.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/DataGenerators.java
index e2cd411d70..795c4fa5a7 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/DataGenerators.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/DataGenerators.java
@@ -75,6 +75,11 @@ public class DataGenerators {
OffsetDateTime.of(2022, 1, 10, 0, 0, 0, 0, ZoneOffset.UTC);
private static final LocalDateTime JAVA_LOCAL_DATE_TIME_20220110 =
LocalDateTime.of(2022, 1, 10, 0, 0, 0);
+ private static final OffsetDateTime JAVA_OFFSET_DATE_TIME_MAX_NANO =
+ OffsetDateTime.of(2262, 4, 11, 23, 47, 16, 854_775_807,
ZoneOffset.UTC);
+ private static final LocalDateTime JAVA_LOCAL_DATE_TIME_MAX_NANO =
+ LocalDateTime.of(2262, 4, 11, 23, 47, 16, 854_775_807);
+ private static final long ICEBERG_MAX_NANOS_EPOCH = 9223372036854775807L;
private static final BigDecimal BIG_DECIMAL_NEGATIVE = new
BigDecimal("-1.50");
private static final byte[] FIXED_BYTES =
"012345689012345".getBytes(StandardCharsets.UTF_8);
@@ -96,7 +101,11 @@ public class DataGenerators {
Types.NestedField.required(12, "uuid_field", Types.UUIDType.get()),
Types.NestedField.required(13, "binary_field",
Types.BinaryType.get()),
Types.NestedField.required(14, "decimal_field",
Types.DecimalType.of(9, 2)),
- Types.NestedField.required(15, "fixed_field",
Types.FixedType.ofLength(16)));
+ Types.NestedField.required(15, "fixed_field",
Types.FixedType.ofLength(16)),
+ Types.NestedField.required(
+ 16, "ts_ns_with_zone_field",
Types.TimestampNanoType.withZone()),
+ Types.NestedField.required(
+ 17, "ts_ns_without_zone_field",
Types.TimestampNanoType.withoutZone()));
private final RowType flinkRowType =
FlinkSchemaUtil.convert(icebergSchema);
@@ -171,6 +180,8 @@ public class DataGenerators {
genericRecord.setField("time_field", JAVA_LOCAL_TIME_HOUR8);
genericRecord.setField("ts_with_zone_field",
JAVA_OFFSET_DATE_TIME_20220110);
genericRecord.setField("ts_without_zone_field",
JAVA_LOCAL_DATE_TIME_20220110);
+ genericRecord.setField("ts_ns_with_zone_field",
JAVA_OFFSET_DATE_TIME_MAX_NANO);
+ genericRecord.setField("ts_ns_without_zone_field",
JAVA_LOCAL_DATE_TIME_MAX_NANO);
byte[] uuidBytes = new byte[16];
for (int i = 0; i < 16; ++i) {
@@ -220,7 +231,11 @@ public class DataGenerators {
uuidBytes,
binaryBytes,
DecimalData.fromBigDecimal(BIG_DECIMAL_NEGATIVE, 9, 2),
- FIXED_BYTES);
+ FIXED_BYTES,
+ TimestampData.fromEpochMillis(
+ ICEBERG_MAX_NANOS_EPOCH / 1_000_000, (int)
(ICEBERG_MAX_NANOS_EPOCH % 1_000_000)),
+ TimestampData.fromEpochMillis(
+ ICEBERG_MAX_NANOS_EPOCH / 1_000_000, (int)
(ICEBERG_MAX_NANOS_EPOCH % 1_000_000)));
}
@Override
@@ -236,10 +251,12 @@ public class DataGenerators {
genericRecord.put("date_field", DAYS_BTW_EPOC_AND_20220110);
genericRecord.put("time_field", HOUR_8_IN_MILLI);
- // Although Avro logical type for timestamp fields are in micro seconds,
- // AvroToRowDataConverters only looks for long value in milliseconds.
- genericRecord.put("ts_with_zone_field",
JODA_DATETIME_20220110.getMillis());
- genericRecord.put("ts_without_zone_field",
JODA_DATETIME_20220110.getMillis());
+ // Now that AvroToRowDataConverters correctly supports microseconds,
+ // we must inject correct microsecond scale values into the Avro data.
+ genericRecord.put("ts_with_zone_field",
JODA_DATETIME_20220110.getMillis() * 1000L);
+ genericRecord.put("ts_without_zone_field",
JODA_DATETIME_20220110.getMillis() * 1000L);
+ genericRecord.put("ts_ns_with_zone_field", ICEBERG_MAX_NANOS_EPOCH);
+ genericRecord.put("ts_ns_without_zone_field", ICEBERG_MAX_NANOS_EPOCH);
byte[] uuidBytes = new byte[16];
for (int i = 0; i < 16; ++i) {
@@ -554,7 +571,11 @@ public class DataGenerators {
new Schema(
Types.NestedField.required(1, "row_id", Types.StringType.get()),
Types.NestedField.required(
- 2, "array_of_int", Types.ListType.ofOptional(101,
Types.IntegerType.get())));
+ 2, "array_of_int", Types.ListType.ofOptional(101,
Types.IntegerType.get())),
+ Types.NestedField.optional(
+ 3,
+ "array_of_ts_ns",
+ Types.ListType.ofRequired(102,
Types.TimestampNanoType.withoutZone())));
private final RowType flinkRowType =
FlinkSchemaUtil.convert(icebergSchema);
@@ -581,13 +602,33 @@ public class DataGenerators {
GenericRecord genericRecord = GenericRecord.create(icebergSchema);
genericRecord.setField("row_id", "row_id_value");
genericRecord.setField("array_of_int", Arrays.asList(1, 2, 3));
+
+ LocalDateTime posNanos = LocalDateTime.of(2023, 1, 1, 12, 0, 0,
123456789);
+ LocalDateTime negNanos = LocalDateTime.of(1969, 12, 31, 23, 59, 59,
987654321);
+ genericRecord.setField("array_of_ts_ns", Arrays.asList(posNanos,
negNanos));
return genericRecord;
}
@Override
public GenericRowData generateFlinkRowData() {
Integer[] arr = {1, 2, 3};
- return GenericRowData.of(StringData.fromString("row_id_value"), new
GenericArrayData(arr));
+
+ long posNanos =
+ org.apache.iceberg.util.DateTimeUtil.nanosFromTimestamp(
+ LocalDateTime.of(2023, 1, 1, 12, 0, 0, 123456789));
+ long negNanos =
+ org.apache.iceberg.util.DateTimeUtil.nanosFromTimestamp(
+ LocalDateTime.of(1969, 12, 31, 23, 59, 59, 987654321));
+ TimestampData[] tsArr = {
+ TimestampData.fromEpochMillis(
+ Math.floorDiv(posNanos, 1_000_000L), (int) Math.floorMod(posNanos,
1_000_000L)),
+ TimestampData.fromEpochMillis(
+ Math.floorDiv(negNanos, 1_000_000L), (int) Math.floorMod(negNanos,
1_000_000L))
+ };
+ return GenericRowData.of(
+ StringData.fromString("row_id_value"),
+ new GenericArrayData(arr),
+ new GenericArrayData(tsArr));
}
@Override
@@ -595,6 +636,14 @@ public class DataGenerators {
org.apache.avro.generic.GenericRecord genericRecord = new
GenericData.Record(avroSchema);
genericRecord.put("row_id", "row_id_value");
genericRecord.put("array_of_int", Arrays.asList(1, 2, 3));
+
+ long posNanos =
+ org.apache.iceberg.util.DateTimeUtil.nanosFromTimestamp(
+ LocalDateTime.of(2023, 1, 1, 12, 0, 0, 123456789));
+ long negNanos =
+ org.apache.iceberg.util.DateTimeUtil.nanosFromTimestamp(
+ LocalDateTime.of(1969, 12, 31, 23, 59, 59, 987654321));
+ genericRecord.put("array_of_ts_ns", Arrays.asList(posNanos, negNanos));
return genericRecord;
}
}
@@ -808,7 +857,12 @@ public class DataGenerators {
2,
"map_of_primitives",
Types.MapType.ofRequired(
- 101, 102, Types.StringType.get(),
Types.IntegerType.get())));
+ 101, 102, Types.StringType.get(),
Types.IntegerType.get())),
+ Types.NestedField.optional(
+ 3,
+ "map_of_ts_ns",
+ Types.MapType.ofRequired(
+ 103, 104, Types.StringType.get(),
Types.TimestampNanoType.withoutZone())));
private final RowType flinkRowType =
FlinkSchemaUtil.convert(icebergSchema);
@@ -835,15 +889,37 @@ public class DataGenerators {
GenericRecord genericRecord = GenericRecord.create(icebergSchema);
genericRecord.setField("row_id", "row_id_value");
genericRecord.setField("map_of_primitives", ImmutableMap.of("Jane", 1,
"Joe", 2));
+
+ LocalDateTime posNanos = LocalDateTime.of(2023, 1, 1, 12, 0, 0,
123456789);
+ LocalDateTime negNanos = LocalDateTime.of(1969, 12, 31, 23, 59, 59,
987654321);
+ genericRecord.setField(
+ "map_of_ts_ns", ImmutableMap.of("positive", posNanos, "negative",
negNanos));
return genericRecord;
}
@Override
public GenericRowData generateFlinkRowData() {
+ long posNanos =
+ org.apache.iceberg.util.DateTimeUtil.nanosFromTimestamp(
+ LocalDateTime.of(2023, 1, 1, 12, 0, 0, 123456789));
+ long negNanos =
+ org.apache.iceberg.util.DateTimeUtil.nanosFromTimestamp(
+ LocalDateTime.of(1969, 12, 31, 23, 59, 59, 987654321));
+
return GenericRowData.of(
StringData.fromString("row_id_value"),
new GenericMapData(
- ImmutableMap.of(StringData.fromString("Jane"), 1,
StringData.fromString("Joe"), 2)));
+ ImmutableMap.of(StringData.fromString("Jane"), 1,
StringData.fromString("Joe"), 2)),
+ new GenericMapData(
+ ImmutableMap.of(
+ StringData.fromString("positive"),
+ TimestampData.fromEpochMillis(
+ Math.floorDiv(posNanos, 1_000_000L),
+ (int) Math.floorMod(posNanos, 1_000_000L)),
+ StringData.fromString("negative"),
+ TimestampData.fromEpochMillis(
+ Math.floorDiv(negNanos, 1_000_000L),
+ (int) Math.floorMod(negNanos, 1_000_000L)))));
}
@Override
@@ -851,6 +927,15 @@ public class DataGenerators {
org.apache.avro.generic.GenericRecord genericRecord = new
GenericData.Record(avroSchema);
genericRecord.put("row_id", "row_id_value");
genericRecord.put("map_of_primitives", ImmutableMap.of("Jane", 1, "Joe",
2));
+
+ long posNanos =
+ org.apache.iceberg.util.DateTimeUtil.nanosFromTimestamp(
+ LocalDateTime.of(2023, 1, 1, 12, 0, 0, 123456789));
+ long negNanos =
+ org.apache.iceberg.util.DateTimeUtil.nanosFromTimestamp(
+ LocalDateTime.of(1969, 12, 31, 23, 59, 59, 987654321));
+ genericRecord.put(
+ "map_of_ts_ns", ImmutableMap.of("positive", posNanos, "negative",
negNanos));
return genericRecord;
}
}
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestRowDataWrapper.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestRowDataWrapper.java
index cd6964b5ed..0e7635a33e 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestRowDataWrapper.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestRowDataWrapper.java
@@ -30,7 +30,6 @@ import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.data.RandomRowData;
import org.apache.iceberg.util.StructLikeWrapper;
-import org.junit.jupiter.api.Disabled;
public class TestRowDataWrapper extends RecordWrapperTestBase {
@@ -60,18 +59,6 @@ public class TestRowDataWrapper extends
RecordWrapperTestBase {
});
}
- @Disabled
- @Override
- public void testTimestampNanoWithoutZone() {
- // Flink does not support nanosecond timestamp without zone.
- }
-
- @Disabled
- @Override
- public void testTimestampNanoWithZone() {
- // Flink does not support nanosecond timestamp with zone.
- }
-
@Override
protected void generateAndValidate(
Schema schema, RecordWrapperTestBase.AssertMethod assertMethod) {
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java
index 4a70802f2a..b7b0a54156 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java
@@ -49,6 +49,11 @@ public class TestFlinkOrcReaderWriter extends DataTestBase {
return true;
}
+ @Override
+ protected boolean supportsTimestampNanos() {
+ return true;
+ }
+
@Override
protected void writeAndValidate(Schema schema) throws IOException {
List<Record> expectedRecords = RandomGenericData.generate(schema,
NUM_RECORDS, 1990L);
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestRowDataProjection.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestRowDataProjection.java
index 4e5b38ffb0..a2411da1e3 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestRowDataProjection.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestRowDataProjection.java
@@ -271,18 +271,19 @@ public class TestRowDataProjection {
GenericRowData.of(
StringData.fromString("row_id_value"),
new GenericMapData(
- ImmutableMap.of(StringData.fromString("foo"), 1,
StringData.fromString("bar"), 2)));
+ ImmutableMap.of(StringData.fromString("foo"), 1,
StringData.fromString("bar"), 2)),
+ null);
testEqualsAndHashCode(schema, idOnly, rowData, copyRowData, otherRowData,
true);
testEqualsAndHashCode(schema, mapOnly, rowData, copyRowData, otherRowData);
testEqualsAndHashCode(schema, schema, rowData, copyRowData, otherRowData);
GenericRowData rowDataNullOptionalFields =
- GenericRowData.of(StringData.fromString("row_id_value"), null);
+ GenericRowData.of(StringData.fromString("row_id_value"), null, null);
GenericRowData copyRowDataNullOptionalFields =
- GenericRowData.of(StringData.fromString("row_id_value"), null);
+ GenericRowData.of(StringData.fromString("row_id_value"), null, null);
// modify the map field value
GenericRowData otherRowDataNullOptionalFields =
- GenericRowData.of(StringData.fromString("other_row_id_value"), null);
+ GenericRowData.of(StringData.fromString("other_row_id_value"), null,
null);
testEqualsAndHashCode(
schema,
idOnly,
@@ -432,7 +433,8 @@ public class TestRowDataProjection {
GenericRowData otherRowData =
GenericRowData.of(
StringData.fromString("other_row_id_value"),
- new GenericArrayData(new Integer[] {4, 5, 6}));
+ new GenericArrayData(new Integer[] {4, 5, 6}),
+ null);
testEqualsAndHashCode(schema, idOnly, rowData, copyRowData, otherRowData);
testEqualsAndHashCode(schema, arrayOnly, rowData, copyRowData,
otherRowData);
testEqualsAndHashCode(schema, schema, rowData, copyRowData, otherRowData);
@@ -440,16 +442,19 @@ public class TestRowDataProjection {
GenericRowData rowDataNullOptionalFields =
GenericRowData.of(
StringData.fromString("row_id_value"),
- new GenericArrayData(new Integer[] {1, null, 3}));
+ new GenericArrayData(new Integer[] {1, null, 3}),
+ null);
GenericRowData copyRowDataNullOptionalFields =
GenericRowData.of(
StringData.fromString("row_id_value"),
- new GenericArrayData(new Integer[] {1, null, 3}));
+ new GenericArrayData(new Integer[] {1, null, 3}),
+ null);
// modify the map field value
GenericRowData otherRowDataNullOptionalFields =
GenericRowData.of(
StringData.fromString("other_row_id_value"),
- new GenericArrayData(new Integer[] {4, null, 6}));
+ new GenericArrayData(new Integer[] {4, null, 6}),
+ null);
testEqualsAndHashCode(
schema,
idOnly,