This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 0547af078c Flink: Fix write unknown type to ORC exception and add ut for unknown type (#14761)
0547af078c is described below
commit 0547af078c9d21b8642a4efcb3cbc85756547f22
Author: GuoYu <[email protected]>
AuthorDate: Tue Dec 9 22:56:28 2025 +0800
Flink: Fix write unknown type to ORC exception and add ut for unknown type (#14761)
---
.../apache/iceberg/flink/data/FlinkOrcWriters.java | 19 ++-
.../iceberg/flink/data/FlinkSchemaVisitor.java | 4 +-
.../apache/iceberg/flink/TestFlinkUnknownType.java | 179 +++++++++++++++++++++
.../iceberg/flink/source/reader/ReaderUtil.java | 10 +-
4 files changed, 206 insertions(+), 6 deletions(-)
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java
index afce2cda1d..684842aa09 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.data.orc.GenericOrcWriters;
import org.apache.iceberg.flink.FlinkRowData;
@@ -93,7 +94,19 @@ class FlinkOrcWriters {
}
static OrcValueWriter<RowData> struct(List<OrcValueWriter<?>> writers, List<LogicalType> types) {
- return new RowDataWriter(writers, types);
+ int[] fieldIndexes = new int[writers.size()];
+ int fieldIndex = 0;
+ List<LogicalType> logicalTypes = Lists.newArrayList();
+ for (int i = 0; i < types.size(); i += 1) {
+ LogicalType logicalType = types.get(i);
+ if (!logicalType.is(LogicalTypeRoot.NULL)) {
+ fieldIndexes[fieldIndex] = i;
+ fieldIndex += 1;
+ logicalTypes.add(logicalType);
+ }
+ }
+
+ return new RowDataWriter(fieldIndexes, writers, logicalTypes);
}
private static class StringWriter implements OrcValueWriter<StringData> {
@@ -294,12 +307,12 @@ class FlinkOrcWriters {
static class RowDataWriter extends GenericOrcWriters.StructWriter<RowData> {
private final List<RowData.FieldGetter> fieldGetters;
- RowDataWriter(List<OrcValueWriter<?>> writers, List<LogicalType> types) {
+ RowDataWriter(int[] fieldIndexes, List<OrcValueWriter<?>> writers, List<LogicalType> types) {
super(writers);
this.fieldGetters = Lists.newArrayListWithExpectedSize(types.size());
for (int i = 0; i < types.size(); i++) {
- fieldGetters.add(FlinkRowData.createFieldGetter(types.get(i), i));
+ fieldGetters.add(FlinkRowData.createFieldGetter(types.get(i), fieldIndexes[i]));
}
}
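For context, the core of the fix is an index remapping: ORC has no value
writer for NULL-typed (unknown) columns, so the struct writer keeps writers
only for the surviving fields but must still read each value from its
original position in the RowData. A minimal standalone sketch of that idea,
with hypothetical class and method names that are not part of this commit:

    import java.util.ArrayList;
    import java.util.List;

    class FieldIndexSketch {
      // Returns the original row positions of fields whose type is not NULL.
      static int[] nonNullFieldIndexes(List<Boolean> isNullType) {
        List<Integer> kept = new ArrayList<>();
        for (int i = 0; i < isNullType.size(); i++) {
          if (!isNullType.get(i)) {
            kept.add(i); // remember where this field lives in the row
          }
        }
        int[] indexes = new int[kept.size()];
        for (int i = 0; i < indexes.length; i++) {
          indexes[i] = kept.get(i);
        }
        return indexes;
      }
    }

For a row (id, unknown_col, data), the surviving writers cover id and data,
and their getters read positions 0 and 2 rather than 0 and 1.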
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkSchemaVisitor.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkSchemaVisitor.java
index ba4e1a7a7a..1440fde324 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkSchemaVisitor.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkSchemaVisitor.java
@@ -106,7 +106,9 @@ abstract class FlinkSchemaVisitor<T> {
visitor.beforeField(iField);
try {
- results.add(visit(fieldFlinkType, iField.type(), visitor));
+ if (iField.type() != Types.UnknownType.get()) {
+ results.add(visit(fieldFlinkType, iField.type(), visitor));
+ }
} finally {
visitor.afterField(iField);
}
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUnknownType.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUnknownType.java
new file mode 100644
index 0000000000..aaf0ebda0e
--- /dev/null
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUnknownType.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import static org.apache.iceberg.flink.TestFixtures.DATABASE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.file.Path;
+import java.util.List;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.reader.ReaderUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+@ExtendWith(ParameterizedTestExtension.class)
+class TestFlinkUnknownType {
+ private static final long TARGET_FILE_SIZE = 128 * 1024 * 1024;
+ private static final Schema SCHEMA_WITH_UNKNOWN_COL =
+ new Schema(
+ Lists.newArrayList(
+ Types.NestedField.required(1, "id", Types.IntegerType.get()),
+ Types.NestedField.optional(2, "data", Types.StringType.get()),
+ Types.NestedField.optional(3, "unknown_col", Types.UnknownType.get()),
+ Types.NestedField.optional(4, "data1", Types.StringType.get())));
+ private static final List<GenericRowData> EXPECTED_ROW_DATA =
+ Lists.newArrayList(
+ GenericRowData.of(1, StringData.fromString("data"), null, StringData.fromString("data1")),
+ GenericRowData.of(2, StringData.fromString("data"), null, StringData.fromString("data1")));
+ private static final List<Record> EXPECTED_RECORDS = expectedRecords();
+
+ @RegisterExtension
+ private static final HadoopCatalogExtension CATALOG_EXTENSION =
+ new HadoopCatalogExtension(DATABASE, TestFixtures.TABLE);
+
+ @TempDir private Path warehouseDir;
+
+ @Parameter private FileFormat fileFormat;
+
+ private Table table;
+
+ @Parameters(name = "fileFormat={0}")
+ public static Iterable<Object[]> parameters() {
+ return ImmutableList.of(
+ new Object[] {FileFormat.PARQUET},
+ new Object[] {FileFormat.AVRO},
+ new Object[] {FileFormat.ORC});
+ }
+
+ @BeforeEach
+ public void before() {
+ table =
+ CATALOG_EXTENSION
+ .catalog()
+ .createTable(
+ TestFixtures.TABLE_IDENTIFIER,
+ SCHEMA_WITH_UNKNOWN_COL,
+ PartitionSpec.unpartitioned(),
+ null,
+ ImmutableMap.of("format-version", "3", "write.format.default", fileFormat.name()));
+ }
+
+ @TestTemplate
+ void testV3TableUnknownTypeRead() throws Exception {
+ new GenericAppenderHelper(table, fileFormat, warehouseDir).appendToTable(EXPECTED_RECORDS);
+ table.refresh();
+
+ List<GenericRowData> genericRowData = Lists.newArrayList();
+ CloseableIterable<CombinedScanTask> combinedScanTasks = table.newScan().planTasks();
+ for (CombinedScanTask combinedScanTask : combinedScanTasks) {
+ DataIterator<RowData> dataIterator =
+ ReaderUtil.createDataIterator(combinedScanTask, table.schema(), table.schema());
+ while (dataIterator.hasNext()) {
+ GenericRowData rowData = (GenericRowData) dataIterator.next();
+ genericRowData.add(
+ GenericRowData.of(
+ rowData.getInt(0),
+ rowData.getString(1),
+ rowData.getField(2),
+ rowData.getString(3)));
+ }
+ }
+
+ assertThat(genericRowData).containsExactlyInAnyOrderElementsOf(EXPECTED_ROW_DATA);
+ }
+
+ @TestTemplate
+ void testV3TableUnknownTypeWrite() throws Exception {
+ try (TaskWriter<RowData> taskWriter = createTaskWriter()) {
+ for (GenericRowData rowData : EXPECTED_ROW_DATA) {
+ taskWriter.write(rowData);
+ }
+
+ taskWriter.close();
+ AppendFiles appendFiles = table.newAppend();
+ for (DataFile dataFile : taskWriter.dataFiles()) {
+ appendFiles.appendFile(dataFile);
+ }
+
+ appendFiles.commit();
+ List<Record> records = SimpleDataUtil.tableRecords(table);
+ assertThat(records).containsExactlyInAnyOrderElementsOf(EXPECTED_RECORDS);
+ }
+ }
+
+ private TaskWriter<RowData> createTaskWriter() {
+ RowType flinkWriteType = FlinkSchemaUtil.convert(table.schema());
+ TaskWriterFactory<RowData> taskWriterFactory =
+ new RowDataTaskWriterFactory(
+ table, flinkWriteType, TARGET_FILE_SIZE, fileFormat, table.properties(), null, false);
+ taskWriterFactory.initialize(1, 1);
+ return taskWriterFactory.create();
+ }
+
+ private static List<Record> expectedRecords() {
+ GenericRecord record = GenericRecord.create(SCHEMA_WITH_UNKNOWN_COL);
+ ImmutableList.Builder<Record> builder = ImmutableList.builder();
+ EXPECTED_ROW_DATA.forEach(
+ recordData -> {
+ GenericRecord copy = record.copy();
+ for (int i = 0; i < recordData.getArity(); i++) {
+ Object field = recordData.getField(i);
+ if (field instanceof StringData) {
+ copy.set(i, field.toString());
+ } else {
+ copy.set(i, field);
+ }
+ }
+
+ builder.add(copy);
+ });
+
+ return builder.build();
+ }
+}
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
index 3322447329..3b094ba022 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java
@@ -26,6 +26,7 @@ import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import org.apache.flink.table.data.RowData;
+import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseCombinedScanTask;
import org.apache.iceberg.BaseFileScanTask;
import org.apache.iceberg.CombinedScanTask;
@@ -78,11 +79,16 @@ public class ReaderUtil {
}
public static DataIterator<RowData> createDataIterator(CombinedScanTask combinedTask) {
+ return createDataIterator(combinedTask, TestFixtures.SCHEMA, TestFixtures.SCHEMA);
+ }
+
+ public static DataIterator<RowData> createDataIterator(
+ CombinedScanTask combinedTask, Schema tableSchema, Schema projectSchema) {
return new DataIterator<>(
new RowDataFileScanTaskReader(
- TestFixtures.SCHEMA, TestFixtures.SCHEMA, null, true, Collections.emptyList()),
+ tableSchema, projectSchema, null, true, Collections.emptyList()),
combinedTask,
- new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
+ new HadoopFileIO(new Configuration()),
PlaintextEncryptionManager.instance());
}