This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 1919ae65fb Flink: Support UUID type in Avro and Parquet readers and writers (#16097)
1919ae65fb is described below

commit 1919ae65fb6e836df3530562af693d1fb8f14b5b
Author: Joy Haldar <[email protected]>
AuthorDate: Thu May 14 17:41:01 2026 +0530

    Flink: Support UUID type in Avro and Parquet readers and writers (#16097)
---
 .../apache/iceberg/flink/data/FlinkAvroWriter.java |   2 +-
 .../iceberg/flink/data/FlinkParquetReaders.java    |   6 +
 .../iceberg/flink/data/FlinkParquetWriters.java    |   6 +
 .../iceberg/flink/data/FlinkValueWriters.java      |  13 ++
 .../org/apache/iceberg/flink/sink/FlinkSink.java   |  10 +-
 .../apache/iceberg/flink/TestFlinkUuidType.java    | 192 +++++++++++++++++++++
 6 files changed, 222 insertions(+), 7 deletions(-)

diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
index 8741958392..b0f8a42171 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
@@ -132,7 +132,7 @@ public class FlinkAvroWriter implements MetricsAwareDatumWriter<RowData> {
             return FlinkValueWriters.decimal(decimal.getPrecision(), decimal.getScale());
 
           case "uuid":
-            return ValueWriters.uuids();
+            return FlinkValueWriters.uuids();
 
           default:
             throw new IllegalArgumentException("Unsupported logical type: " + logicalType);
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index 36b7f8805f..0e6856daa6 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -311,6 +311,12 @@ public class FlinkParquetReaders {
           LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) {
         return Optional.of(new ParquetValueReaders.ByteArrayReader(desc));
       }
+
+      @Override
+      public Optional<ParquetValueReader<?>> visit(
+          LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) {
+        return Optional.of(new ParquetValueReaders.ByteArrayReader(desc));
+      }
     }
 
     @Override
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
index df51a25d21..d9a7e0dab9 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
@@ -75,6 +75,7 @@ import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisi
 import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation;
 import org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation;
 import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.UUIDLogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
@@ -309,6 +310,11 @@ public class FlinkParquetWriters {
     public Optional<ParquetValueWriter<?>> visit(BsonLogicalTypeAnnotation ignored) {
       return Optional.of(byteArrays(desc));
     }
+
+    @Override
+    public Optional<ParquetValueWriter<?>> visit(UUIDLogicalTypeAnnotation uuid) {
+      return Optional.of(byteArrays(desc));
+    }
   }
 
   private static ParquetValueWriter<?> ints(LogicalType type, ColumnDescriptor desc) {
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueWriters.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueWriters.java
index f87e637049..66e944eb52 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueWriters.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueWriters.java
@@ -55,6 +55,10 @@ public class FlinkValueWriters {
     return TimestampNanosWriter.INSTANCE;
   }
 
+  static ValueWriter<byte[]> uuids() {
+    return UUIDWriter.INSTANCE;
+  }
+
   static ValueWriter<DecimalData> decimal(int precision, int scale) {
     return new DecimalWriter(precision, scale);
   }
@@ -145,6 +149,15 @@ public class FlinkValueWriters {
     }
   }
 
+  private static class UUIDWriter implements ValueWriter<byte[]> {
+    private static final UUIDWriter INSTANCE = new UUIDWriter();
+
+    @Override
+    public void write(byte[] bytes, Encoder encoder) throws IOException {
+      encoder.writeFixed(bytes);
+    }
+  }
+
   private static class ArrayWriter<T> implements ValueWriter<ArrayData> {
     private final ValueWriter<T> elementWriter;
     private final ArrayData.ElementGetter elementGetter;
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index c7a60d99b3..86194b4071 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -732,9 +732,8 @@ public class FlinkSink {
   @Deprecated
   static RowType toFlinkRowType(Schema schema, TableSchema requestedSchema) {
     if (requestedSchema != null) {
-      // Convert the flink schema to iceberg schema firstly, then reassign ids to match the existing
-      // iceberg schema.
-      Schema writeSchema = TypeUtil.reassignIds(FlinkSchemaUtil.convert(requestedSchema), schema);
+      // Convert the flink schema to iceberg schema using the table schema as the reference.
+      Schema writeSchema = FlinkSchemaUtil.convert(schema, requestedSchema);
       TypeUtil.validateWriteSchema(schema, writeSchema, true, true);
 
       // We use this flink schema to read values from RowData. The flink's TINYINT and SMALLINT will
@@ -749,9 +748,8 @@ public class FlinkSink {
 
   static RowType toFlinkRowType(Schema schema, ResolvedSchema requestedSchema) {
     if (requestedSchema != null) {
-      // Convert the flink schema to iceberg schema firstly, then reassign ids to match the existing
-      // iceberg schema.
-      Schema writeSchema = TypeUtil.reassignIds(FlinkSchemaUtil.convert(requestedSchema), schema);
+      // Convert the flink schema to iceberg schema using the table schema as the reference.
+      Schema writeSchema = FlinkSchemaUtil.convert(schema, requestedSchema);
       TypeUtil.validateWriteSchema(schema, writeSchema, true, true);
 
       // We use this flink schema to read values from RowData. The flink's TINYINT and SMALLINT will
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUuidType.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUuidType.java
new file mode 100644
index 0000000000..cac258e7ad
--- /dev/null
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUuidType.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.reader.ReaderUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.UUIDUtil;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.io.TempDir;
+
+class TestFlinkUuidType extends CatalogTestBase {
+  private static final String TABLE_NAME = "test_table";
+  private static final long TARGET_FILE_SIZE = 128 * 1024 * 1024;
+  private static final UUID EXPECTED_UUID = UUID.fromString("0f8fad5b-d9cb-469f-a165-70867728950e");
+  private static final Schema SCHEMA =
+      new Schema(
+          Types.NestedField.required(1, "id", Types.IntegerType.get()),
+          Types.NestedField.required(2, "uuid", Types.UUIDType.get()));
+
+  private Table icebergTable;
+  @TempDir private Path warehouseDir;
+
+  @Parameter(index = 2)
+  private FileFormat fileFormat;
+
+  @Parameters(name = "catalogName={0}, baseNamespace={1}, fileFormat={2}")
+  protected static List<Object[]> parameters() {
+    return Arrays.asList(
+        new Object[] {"testhadoop", Namespace.empty(), FileFormat.PARQUET},
+        new Object[] {"testhadoop", Namespace.empty(), FileFormat.AVRO},
+        new Object[] {"testhadoop", Namespace.empty(), FileFormat.ORC});
+  }
+
+  @Override
+  @BeforeEach
+  public void before() {
+    super.before();
+    sql("CREATE DATABASE %s", flinkDatabase);
+    sql("USE CATALOG %s", catalogName);
+    sql("USE %s", DATABASE);
+  }
+
+  /** Writes UUID via Generic writer, reads via Flink. */
+  @TestTemplate
+  void uuidWrittenByGenericWriter() throws Exception {
+    icebergTable =
+        validationCatalog.createTable(
+            TableIdentifier.of(icebergNamespace, TABLE_NAME),
+            SCHEMA,
+            PartitionSpec.unpartitioned(),
+            ImmutableMap.of(DEFAULT_FILE_FORMAT, fileFormat.name()));
+
+    Record record =
+        GenericRecord.create(icebergTable.schema()).copy("id", 1, "uuid", EXPECTED_UUID);
+    new GenericAppenderHelper(icebergTable, fileFormat, warehouseDir)
+        .appendToTable(ImmutableList.of(record));
+    icebergTable.refresh();
+
+    List<GenericRowData> genericRowData = Lists.newArrayList();
+    try (CloseableIterable<CombinedScanTask> combinedScanTasks =
+        icebergTable.newScan().planTasks()) {
+      for (CombinedScanTask combinedScanTask : combinedScanTasks) {
+        try (DataIterator<RowData> dataIterator =
+            ReaderUtil.createDataIterator(
+                combinedScanTask, icebergTable.schema(), icebergTable.schema())) {
+          while (dataIterator.hasNext()) {
+            GenericRowData rowData = (GenericRowData) dataIterator.next();
+            genericRowData.add(rowData);
+          }
+        }
+      }
+    }
+
+    assertThat(genericRowData).hasSize(1);
+    assertThat(genericRowData.get(0).getField(1)).isInstanceOf(byte[].class);
+    byte[] bytes = (byte[]) genericRowData.get(0).getField(1);
+    assertThat(bytes).hasSize(16);
+
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    UUID actualUuid = new UUID(byteBuffer.getLong(), byteBuffer.getLong());
+    assertThat(actualUuid).isEqualTo(EXPECTED_UUID);
+  }
+
+  /** Writes UUID via Flink TaskWriter, reads via Generic reader. */
+  @TestTemplate
+  void writeUuidViaFlinkWriter() throws Exception {
+    icebergTable =
+        validationCatalog.createTable(
+            TableIdentifier.of(icebergNamespace, TABLE_NAME),
+            SCHEMA,
+            PartitionSpec.unpartitioned(),
+            ImmutableMap.of(DEFAULT_FILE_FORMAT, fileFormat.name()));
+
+    RowType rowType = FlinkSchemaUtil.convert(SCHEMA);
+    RowDataTaskWriterFactory rowDataTaskWriterFactory =
+        new RowDataTaskWriterFactory(
+            icebergTable,
+            rowType,
+            TARGET_FILE_SIZE,
+            fileFormat,
+            icebergTable.properties(),
+            null,
+            false);
+    rowDataTaskWriterFactory.initialize(1, 1);
+
+    byte[] uuidBytes = UUIDUtil.convert(EXPECTED_UUID);
+    GenericRowData genericRowData = GenericRowData.of(1, uuidBytes);
+
+    try (TaskWriter<RowData> writer = rowDataTaskWriterFactory.create()) {
+      writer.write(genericRowData);
+      writer.close();
+
+      AppendFiles append = icebergTable.newAppend();
+      for (DataFile dataFile : writer.dataFiles()) {
+        append.appendFile(dataFile);
+      }
+
+      append.commit();
+    }
+
+    List<Record> records = SimpleDataUtil.tableRecords(icebergTable);
+    assertThat(records).hasSize(1);
+    assertThat(records.get(0).getField("uuid")).isEqualTo(EXPECTED_UUID);
+  }
+
+  /** Writes UUID via SQL INSERT, reads via Generic reader. */
+  @TestTemplate
+  void sqlInsertUuid() throws Exception {
+    icebergTable =
+        validationCatalog.createTable(
+            TableIdentifier.of(icebergNamespace, TABLE_NAME),
+            SCHEMA,
+            PartitionSpec.unpartitioned(),
+            ImmutableMap.of(DEFAULT_FILE_FORMAT, fileFormat.name()));
+
+    String uuidHex = EXPECTED_UUID.toString().replace("-", "");
+    sql("INSERT INTO %s VALUES (1, CAST(X'%s' AS BINARY(16)))", TABLE_NAME, uuidHex);
+
+    List<Record> records = SimpleDataUtil.tableRecords(icebergTable);
+    assertThat(records).hasSize(1);
+    assertThat(records.get(0).getField("uuid")).isEqualTo(EXPECTED_UUID);
+  }
+}

Reply via email to