This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new adfd4a4c80 Flink: Backport support UUID type in Avro and Parquet
readers and writers (#16333)
adfd4a4c80 is described below
commit adfd4a4c80dbe8a8e4ad874514c927ba32576716
Author: Joy Haldar <[email protected]>
AuthorDate: Thu May 14 19:27:48 2026 +0530
Flink: Backport support UUID type in Avro and Parquet readers and writers
(#16333)
Backport of #16097.
---
.../apache/iceberg/flink/data/FlinkAvroWriter.java | 2 +-
.../iceberg/flink/data/FlinkParquetReaders.java | 6 +
.../iceberg/flink/data/FlinkParquetWriters.java | 6 +
.../iceberg/flink/data/FlinkValueWriters.java | 13 ++
.../org/apache/iceberg/flink/sink/FlinkSink.java | 10 +-
.../apache/iceberg/flink/TestFlinkUuidType.java | 192 +++++++++++++++++++++
.../apache/iceberg/flink/data/FlinkAvroWriter.java | 2 +-
.../iceberg/flink/data/FlinkParquetReaders.java | 6 +
.../iceberg/flink/data/FlinkParquetWriters.java | 6 +
.../iceberg/flink/data/FlinkValueWriters.java | 13 ++
.../org/apache/iceberg/flink/sink/FlinkSink.java | 10 +-
.../apache/iceberg/flink/TestFlinkUuidType.java | 192 +++++++++++++++++++++
12 files changed, 444 insertions(+), 14 deletions(-)
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
index 8741958392..b0f8a42171 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
@@ -132,7 +132,7 @@ public class FlinkAvroWriter implements MetricsAwareDatumWriter<RowData> {
return FlinkValueWriters.decimal(decimal.getPrecision(),
decimal.getScale());
case "uuid":
- return ValueWriters.uuids();
+ return FlinkValueWriters.uuids();
default:
throw new IllegalArgumentException("Unsupported logical type: " +
logicalType);
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index 4e650e9574..3de64aa998 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -294,6 +294,12 @@ public class FlinkParquetReaders {
LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) {
return Optional.of(new ParquetValueReaders.ByteArrayReader(desc));
}
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) {
+ return Optional.of(new ParquetValueReaders.ByteArrayReader(desc));
+ }
}
@Override
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
index 8de42411cd..2957a48378 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
@@ -61,6 +61,7 @@ import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor;
import
org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation;
import
org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation;
import
org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
+import
org.apache.parquet.schema.LogicalTypeAnnotation.UUIDLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
@@ -287,6 +288,11 @@ public class FlinkParquetWriters {
public Optional<ParquetValueWriter<?>> visit(BsonLogicalTypeAnnotation
ignored) {
return Optional.of(byteArrays(desc));
}
+
+ @Override
+ public Optional<ParquetValueWriter<?>> visit(UUIDLogicalTypeAnnotation
uuid) {
+ return Optional.of(byteArrays(desc));
+ }
}
private static ParquetValueWriter<?> ints(LogicalType type, ColumnDescriptor
desc) {
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueWriters.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueWriters.java
index f87e637049..66e944eb52 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueWriters.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueWriters.java
@@ -55,6 +55,10 @@ public class FlinkValueWriters {
return TimestampNanosWriter.INSTANCE;
}
+ static ValueWriter<byte[]> uuids() {
+ return UUIDWriter.INSTANCE;
+ }
+
static ValueWriter<DecimalData> decimal(int precision, int scale) {
return new DecimalWriter(precision, scale);
}
@@ -145,6 +149,15 @@ public class FlinkValueWriters {
}
}
+ private static class UUIDWriter implements ValueWriter<byte[]> {
+ private static final UUIDWriter INSTANCE = new UUIDWriter();
+
+ @Override
+ public void write(byte[] bytes, Encoder encoder) throws IOException {
+ encoder.writeFixed(bytes);
+ }
+ }
+
private static class ArrayWriter<T> implements ValueWriter<ArrayData> {
private final ValueWriter<T> elementWriter;
private final ArrayData.ElementGetter elementGetter;
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index 40099c28c0..90fbdc5c23 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -730,9 +730,8 @@ public class FlinkSink {
@Deprecated
static RowType toFlinkRowType(Schema schema, TableSchema requestedSchema) {
if (requestedSchema != null) {
- // Convert the flink schema to iceberg schema firstly, then reassign ids
to match the existing
- // iceberg schema.
- Schema writeSchema =
TypeUtil.reassignIds(FlinkSchemaUtil.convert(requestedSchema), schema);
+ // Convert the flink schema to iceberg schema using the table schema as
the reference.
+ Schema writeSchema = FlinkSchemaUtil.convert(schema, requestedSchema);
TypeUtil.validateWriteSchema(schema, writeSchema, true, true);
// We use this flink schema to read values from RowData. The flink's
TINYINT and SMALLINT will
@@ -747,9 +746,8 @@ public class FlinkSink {
static RowType toFlinkRowType(Schema schema, ResolvedSchema requestedSchema)
{
if (requestedSchema != null) {
- // Convert the flink schema to iceberg schema firstly, then reassign ids
to match the existing
- // iceberg schema.
- Schema writeSchema =
TypeUtil.reassignIds(FlinkSchemaUtil.convert(requestedSchema), schema);
+ // Convert the flink schema to iceberg schema using the table schema as
the reference.
+ Schema writeSchema = FlinkSchemaUtil.convert(schema, requestedSchema);
TypeUtil.validateWriteSchema(schema, writeSchema, true, true);
// We use this flink schema to read values from RowData. The flink's
TINYINT and SMALLINT will
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUuidType.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUuidType.java
new file mode 100644
index 0000000000..cac258e7ad
--- /dev/null
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUuidType.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.reader.ReaderUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.UUIDUtil;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.io.TempDir;
+
+class TestFlinkUuidType extends CatalogTestBase {
+ private static final String TABLE_NAME = "test_table";
+ private static final long TARGET_FILE_SIZE = 128 * 1024 * 1024;
+ private static final UUID EXPECTED_UUID =
UUID.fromString("0f8fad5b-d9cb-469f-a165-70867728950e");
+ private static final Schema SCHEMA =
+ new Schema(
+ Types.NestedField.required(1, "id", Types.IntegerType.get()),
+ Types.NestedField.required(2, "uuid", Types.UUIDType.get()));
+
+ private Table icebergTable;
+ @TempDir private Path warehouseDir;
+
+ @Parameter(index = 2)
+ private FileFormat fileFormat;
+
+ @Parameters(name = "catalogName={0}, baseNamespace={1}, fileFormat={2}")
+ protected static List<Object[]> parameters() {
+ return Arrays.asList(
+ new Object[] {"testhadoop", Namespace.empty(), FileFormat.PARQUET},
+ new Object[] {"testhadoop", Namespace.empty(), FileFormat.AVRO},
+ new Object[] {"testhadoop", Namespace.empty(), FileFormat.ORC});
+ }
+
+ @Override
+ @BeforeEach
+ public void before() {
+ super.before();
+ sql("CREATE DATABASE %s", flinkDatabase);
+ sql("USE CATALOG %s", catalogName);
+ sql("USE %s", DATABASE);
+ }
+
+ /** Writes UUID via Generic writer, reads via Flink. */
+ @TestTemplate
+ void uuidWrittenByGenericWriter() throws Exception {
+ icebergTable =
+ validationCatalog.createTable(
+ TableIdentifier.of(icebergNamespace, TABLE_NAME),
+ SCHEMA,
+ PartitionSpec.unpartitioned(),
+ ImmutableMap.of(DEFAULT_FILE_FORMAT, fileFormat.name()));
+
+ Record record =
+ GenericRecord.create(icebergTable.schema()).copy("id", 1, "uuid",
EXPECTED_UUID);
+ new GenericAppenderHelper(icebergTable, fileFormat, warehouseDir)
+ .appendToTable(ImmutableList.of(record));
+ icebergTable.refresh();
+
+ List<GenericRowData> genericRowData = Lists.newArrayList();
+ try (CloseableIterable<CombinedScanTask> combinedScanTasks =
+ icebergTable.newScan().planTasks()) {
+ for (CombinedScanTask combinedScanTask : combinedScanTasks) {
+ try (DataIterator<RowData> dataIterator =
+ ReaderUtil.createDataIterator(
+ combinedScanTask, icebergTable.schema(),
icebergTable.schema())) {
+ while (dataIterator.hasNext()) {
+ GenericRowData rowData = (GenericRowData) dataIterator.next();
+ genericRowData.add(rowData);
+ }
+ }
+ }
+ }
+
+ assertThat(genericRowData).hasSize(1);
+ assertThat(genericRowData.get(0).getField(1)).isInstanceOf(byte[].class);
+ byte[] bytes = (byte[]) genericRowData.get(0).getField(1);
+ assertThat(bytes).hasSize(16);
+
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ UUID actualUuid = new UUID(byteBuffer.getLong(), byteBuffer.getLong());
+ assertThat(actualUuid).isEqualTo(EXPECTED_UUID);
+ }
+
+ /** Writes UUID via Flink TaskWriter, reads via Generic reader. */
+ @TestTemplate
+ void writeUuidViaFlinkWriter() throws Exception {
+ icebergTable =
+ validationCatalog.createTable(
+ TableIdentifier.of(icebergNamespace, TABLE_NAME),
+ SCHEMA,
+ PartitionSpec.unpartitioned(),
+ ImmutableMap.of(DEFAULT_FILE_FORMAT, fileFormat.name()));
+
+ RowType rowType = FlinkSchemaUtil.convert(SCHEMA);
+ RowDataTaskWriterFactory rowDataTaskWriterFactory =
+ new RowDataTaskWriterFactory(
+ icebergTable,
+ rowType,
+ TARGET_FILE_SIZE,
+ fileFormat,
+ icebergTable.properties(),
+ null,
+ false);
+ rowDataTaskWriterFactory.initialize(1, 1);
+
+ byte[] uuidBytes = UUIDUtil.convert(EXPECTED_UUID);
+ GenericRowData genericRowData = GenericRowData.of(1, uuidBytes);
+
+ try (TaskWriter<RowData> writer = rowDataTaskWriterFactory.create()) {
+ writer.write(genericRowData);
+ writer.close();
+
+ AppendFiles append = icebergTable.newAppend();
+ for (DataFile dataFile : writer.dataFiles()) {
+ append.appendFile(dataFile);
+ }
+
+ append.commit();
+ }
+
+ List<Record> records = SimpleDataUtil.tableRecords(icebergTable);
+ assertThat(records).hasSize(1);
+ assertThat(records.get(0).getField("uuid")).isEqualTo(EXPECTED_UUID);
+ }
+
+ /** Writes UUID via SQL INSERT, reads via Generic reader. */
+ @TestTemplate
+ void sqlInsertUuid() throws Exception {
+ icebergTable =
+ validationCatalog.createTable(
+ TableIdentifier.of(icebergNamespace, TABLE_NAME),
+ SCHEMA,
+ PartitionSpec.unpartitioned(),
+ ImmutableMap.of(DEFAULT_FILE_FORMAT, fileFormat.name()));
+
+ String uuidHex = EXPECTED_UUID.toString().replace("-", "");
+ sql("INSERT INTO %s VALUES (1, CAST(X'%s' AS BINARY(16)))", TABLE_NAME,
uuidHex);
+
+ List<Record> records = SimpleDataUtil.tableRecords(icebergTable);
+ assertThat(records).hasSize(1);
+ assertThat(records.get(0).getField("uuid")).isEqualTo(EXPECTED_UUID);
+ }
+}
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
index 8741958392..b0f8a42171 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java
@@ -132,7 +132,7 @@ public class FlinkAvroWriter implements MetricsAwareDatumWriter<RowData> {
return FlinkValueWriters.decimal(decimal.getPrecision(),
decimal.getScale());
case "uuid":
- return ValueWriters.uuids();
+ return FlinkValueWriters.uuids();
default:
throw new IllegalArgumentException("Unsupported logical type: " +
logicalType);
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
index 4e650e9574..3de64aa998 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java
@@ -294,6 +294,12 @@ public class FlinkParquetReaders {
LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) {
return Optional.of(new ParquetValueReaders.ByteArrayReader(desc));
}
+
+ @Override
+ public Optional<ParquetValueReader<?>> visit(
+ LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) {
+ return Optional.of(new ParquetValueReaders.ByteArrayReader(desc));
+ }
}
@Override
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
index 8de42411cd..2957a48378 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
@@ -61,6 +61,7 @@ import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor;
import
org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation;
import
org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation;
import
org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
+import
org.apache.parquet.schema.LogicalTypeAnnotation.UUIDLogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
@@ -287,6 +288,11 @@ public class FlinkParquetWriters {
public Optional<ParquetValueWriter<?>> visit(BsonLogicalTypeAnnotation
ignored) {
return Optional.of(byteArrays(desc));
}
+
+ @Override
+ public Optional<ParquetValueWriter<?>> visit(UUIDLogicalTypeAnnotation
uuid) {
+ return Optional.of(byteArrays(desc));
+ }
}
private static ParquetValueWriter<?> ints(LogicalType type, ColumnDescriptor
desc) {
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueWriters.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueWriters.java
index f87e637049..66e944eb52 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueWriters.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueWriters.java
@@ -55,6 +55,10 @@ public class FlinkValueWriters {
return TimestampNanosWriter.INSTANCE;
}
+ static ValueWriter<byte[]> uuids() {
+ return UUIDWriter.INSTANCE;
+ }
+
static ValueWriter<DecimalData> decimal(int precision, int scale) {
return new DecimalWriter(precision, scale);
}
@@ -145,6 +149,15 @@ public class FlinkValueWriters {
}
}
+ private static class UUIDWriter implements ValueWriter<byte[]> {
+ private static final UUIDWriter INSTANCE = new UUIDWriter();
+
+ @Override
+ public void write(byte[] bytes, Encoder encoder) throws IOException {
+ encoder.writeFixed(bytes);
+ }
+ }
+
private static class ArrayWriter<T> implements ValueWriter<ArrayData> {
private final ValueWriter<T> elementWriter;
private final ArrayData.ElementGetter elementGetter;
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index c7a60d99b3..86194b4071 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -732,9 +732,8 @@ public class FlinkSink {
@Deprecated
static RowType toFlinkRowType(Schema schema, TableSchema requestedSchema) {
if (requestedSchema != null) {
- // Convert the flink schema to iceberg schema firstly, then reassign ids
to match the existing
- // iceberg schema.
- Schema writeSchema =
TypeUtil.reassignIds(FlinkSchemaUtil.convert(requestedSchema), schema);
+ // Convert the flink schema to iceberg schema using the table schema as
the reference.
+ Schema writeSchema = FlinkSchemaUtil.convert(schema, requestedSchema);
TypeUtil.validateWriteSchema(schema, writeSchema, true, true);
// We use this flink schema to read values from RowData. The flink's
TINYINT and SMALLINT will
@@ -749,9 +748,8 @@ public class FlinkSink {
static RowType toFlinkRowType(Schema schema, ResolvedSchema requestedSchema)
{
if (requestedSchema != null) {
- // Convert the flink schema to iceberg schema firstly, then reassign ids
to match the existing
- // iceberg schema.
- Schema writeSchema =
TypeUtil.reassignIds(FlinkSchemaUtil.convert(requestedSchema), schema);
+ // Convert the flink schema to iceberg schema using the table schema as
the reference.
+ Schema writeSchema = FlinkSchemaUtil.convert(schema, requestedSchema);
TypeUtil.validateWriteSchema(schema, writeSchema, true, true);
// We use this flink schema to read values from RowData. The flink's
TINYINT and SMALLINT will
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUuidType.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUuidType.java
new file mode 100644
index 0000000000..cac258e7ad
--- /dev/null
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUuidType.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.reader.ReaderUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.UUIDUtil;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.io.TempDir;
+
+class TestFlinkUuidType extends CatalogTestBase {
+ private static final String TABLE_NAME = "test_table";
+ private static final long TARGET_FILE_SIZE = 128 * 1024 * 1024;
+ private static final UUID EXPECTED_UUID =
UUID.fromString("0f8fad5b-d9cb-469f-a165-70867728950e");
+ private static final Schema SCHEMA =
+ new Schema(
+ Types.NestedField.required(1, "id", Types.IntegerType.get()),
+ Types.NestedField.required(2, "uuid", Types.UUIDType.get()));
+
+ private Table icebergTable;
+ @TempDir private Path warehouseDir;
+
+ @Parameter(index = 2)
+ private FileFormat fileFormat;
+
+ @Parameters(name = "catalogName={0}, baseNamespace={1}, fileFormat={2}")
+ protected static List<Object[]> parameters() {
+ return Arrays.asList(
+ new Object[] {"testhadoop", Namespace.empty(), FileFormat.PARQUET},
+ new Object[] {"testhadoop", Namespace.empty(), FileFormat.AVRO},
+ new Object[] {"testhadoop", Namespace.empty(), FileFormat.ORC});
+ }
+
+ @Override
+ @BeforeEach
+ public void before() {
+ super.before();
+ sql("CREATE DATABASE %s", flinkDatabase);
+ sql("USE CATALOG %s", catalogName);
+ sql("USE %s", DATABASE);
+ }
+
+ /** Writes UUID via Generic writer, reads via Flink. */
+ @TestTemplate
+ void uuidWrittenByGenericWriter() throws Exception {
+ icebergTable =
+ validationCatalog.createTable(
+ TableIdentifier.of(icebergNamespace, TABLE_NAME),
+ SCHEMA,
+ PartitionSpec.unpartitioned(),
+ ImmutableMap.of(DEFAULT_FILE_FORMAT, fileFormat.name()));
+
+ Record record =
+ GenericRecord.create(icebergTable.schema()).copy("id", 1, "uuid",
EXPECTED_UUID);
+ new GenericAppenderHelper(icebergTable, fileFormat, warehouseDir)
+ .appendToTable(ImmutableList.of(record));
+ icebergTable.refresh();
+
+ List<GenericRowData> genericRowData = Lists.newArrayList();
+ try (CloseableIterable<CombinedScanTask> combinedScanTasks =
+ icebergTable.newScan().planTasks()) {
+ for (CombinedScanTask combinedScanTask : combinedScanTasks) {
+ try (DataIterator<RowData> dataIterator =
+ ReaderUtil.createDataIterator(
+ combinedScanTask, icebergTable.schema(),
icebergTable.schema())) {
+ while (dataIterator.hasNext()) {
+ GenericRowData rowData = (GenericRowData) dataIterator.next();
+ genericRowData.add(rowData);
+ }
+ }
+ }
+ }
+
+ assertThat(genericRowData).hasSize(1);
+ assertThat(genericRowData.get(0).getField(1)).isInstanceOf(byte[].class);
+ byte[] bytes = (byte[]) genericRowData.get(0).getField(1);
+ assertThat(bytes).hasSize(16);
+
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ UUID actualUuid = new UUID(byteBuffer.getLong(), byteBuffer.getLong());
+ assertThat(actualUuid).isEqualTo(EXPECTED_UUID);
+ }
+
+ /** Writes UUID via Flink TaskWriter, reads via Generic reader. */
+ @TestTemplate
+ void writeUuidViaFlinkWriter() throws Exception {
+ icebergTable =
+ validationCatalog.createTable(
+ TableIdentifier.of(icebergNamespace, TABLE_NAME),
+ SCHEMA,
+ PartitionSpec.unpartitioned(),
+ ImmutableMap.of(DEFAULT_FILE_FORMAT, fileFormat.name()));
+
+ RowType rowType = FlinkSchemaUtil.convert(SCHEMA);
+ RowDataTaskWriterFactory rowDataTaskWriterFactory =
+ new RowDataTaskWriterFactory(
+ icebergTable,
+ rowType,
+ TARGET_FILE_SIZE,
+ fileFormat,
+ icebergTable.properties(),
+ null,
+ false);
+ rowDataTaskWriterFactory.initialize(1, 1);
+
+ byte[] uuidBytes = UUIDUtil.convert(EXPECTED_UUID);
+ GenericRowData genericRowData = GenericRowData.of(1, uuidBytes);
+
+ try (TaskWriter<RowData> writer = rowDataTaskWriterFactory.create()) {
+ writer.write(genericRowData);
+ writer.close();
+
+ AppendFiles append = icebergTable.newAppend();
+ for (DataFile dataFile : writer.dataFiles()) {
+ append.appendFile(dataFile);
+ }
+
+ append.commit();
+ }
+
+ List<Record> records = SimpleDataUtil.tableRecords(icebergTable);
+ assertThat(records).hasSize(1);
+ assertThat(records.get(0).getField("uuid")).isEqualTo(EXPECTED_UUID);
+ }
+
+ /** Writes UUID via SQL INSERT, reads via Generic reader. */
+ @TestTemplate
+ void sqlInsertUuid() throws Exception {
+ icebergTable =
+ validationCatalog.createTable(
+ TableIdentifier.of(icebergNamespace, TABLE_NAME),
+ SCHEMA,
+ PartitionSpec.unpartitioned(),
+ ImmutableMap.of(DEFAULT_FILE_FORMAT, fileFormat.name()));
+
+ String uuidHex = EXPECTED_UUID.toString().replace("-", "");
+ sql("INSERT INTO %s VALUES (1, CAST(X'%s' AS BINARY(16)))", TABLE_NAME,
uuidHex);
+
+ List<Record> records = SimpleDataUtil.tableRecords(icebergTable);
+ assertThat(records).hasSize(1);
+ assertThat(records.get(0).getField("uuid")).isEqualTo(EXPECTED_UUID);
+ }
+}