This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 9b8bde411f ORC: Backport add _row_id and _last_updated_sequence_number
reader in Orc to support lineage (#16256)
9b8bde411f is described below
commit 9b8bde411f81af88fb87457ebe3b78c62faee2eb
Author: pvary <[email protected]>
AuthorDate: Fri May 8 20:29:35 2026 +0200
ORC: Backport add _row_id and _last_updated_sequence_number reader in Orc
to support lineage (#16256)
backports #15776
---
.../apache/iceberg/flink/data/FlinkOrcReader.java | 2 +-
.../apache/iceberg/flink/data/FlinkOrcReaders.java | 15 +++++--
.../maintenance/api/TestRewriteDataFiles.java | 51 +++++++++++++---------
.../maintenance/operator/OperatorTestBase.java | 35 ++++++++++++---
.../apache/iceberg/flink/data/FlinkOrcReader.java | 2 +-
.../apache/iceberg/flink/data/FlinkOrcReaders.java | 15 +++++--
.../maintenance/api/TestRewriteDataFiles.java | 51 +++++++++++++---------
.../maintenance/operator/OperatorTestBase.java | 35 ++++++++++++---
8 files changed, 144 insertions(+), 62 deletions(-)
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java
index 3e3a29112c..77f16bfdb2 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java
@@ -70,7 +70,7 @@ public class FlinkOrcReader implements OrcRowReader<RowData> {
TypeDescription record,
List<String> names,
List<OrcValueReader<?>> fields) {
- return FlinkOrcReaders.struct(fields, iStruct, idToConstant);
+ return FlinkOrcReaders.struct(record, fields, iStruct, idToConstant);
}
@Override
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java
index 7a4a15c7e6..c5c958fbdb 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java
@@ -39,6 +39,7 @@ import
org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
+import org.apache.orc.TypeDescription;
import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
import org.apache.orc.storage.ql.exec.vector.ColumnVector;
import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
@@ -91,8 +92,11 @@ class FlinkOrcReaders {
}
public static OrcValueReader<RowData> struct(
- List<OrcValueReader<?>> readers, Types.StructType struct, Map<Integer,
?> idToConstant) {
- return new StructReader(readers, struct, idToConstant);
+ TypeDescription record,
+ List<OrcValueReader<?>> readers,
+ Types.StructType struct,
+ Map<Integer, ?> idToConstant) {
+ return new StructReader(record, readers, struct, idToConstant);
}
private static class StringReader implements OrcValueReader<StringData> {
@@ -265,8 +269,11 @@ class FlinkOrcReaders {
private final int numFields;
StructReader(
- List<OrcValueReader<?>> readers, Types.StructType struct, Map<Integer,
?> idToConstant) {
- super(readers, struct, idToConstant);
+ TypeDescription record,
+ List<OrcValueReader<?>> readers,
+ Types.StructType struct,
+ Map<Integer, ?> idToConstant) {
+ super(record, readers, struct, idToConstant);
this.numFields = struct.fields().size();
}
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
index 97b8b67865..88b949a9a7 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
@@ -33,6 +33,7 @@ import java.time.Instant;
import java.util.List;
import java.util.stream.StreamSupport;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
@@ -44,8 +45,14 @@ import
org.apache.iceberg.flink.maintenance.operator.MetricsReporterFactoryForTe
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.FieldSource;
class TestRewriteDataFiles extends MaintenanceTaskTestBase {
+
+ private static final FileFormat[] FILE_FORMATS =
+ new FileFormat[] {FileFormat.AVRO, FileFormat.PARQUET, FileFormat.ORC};
+
@Test
void testRewriteUnpartitioned() throws Exception {
Table table = createTable();
@@ -83,13 +90,14 @@ class TestRewriteDataFiles extends MaintenanceTaskTestBase {
createRecord(4, "d")));
}
- @Test
- void testRewriteUnpartitionedPreserveLineage() throws Exception {
- Table table = createTable(3);
- insert(table, 1, "a");
- insert(table, 2, "b");
- insert(table, 3, "c");
- insert(table, 4, "d");
+ @ParameterizedTest
+ @FieldSource("FILE_FORMATS")
+ void testRewriteUnpartitionedPreserveLineage(FileFormat fileFormat) throws
Exception {
+ Table table = createTable(3, fileFormat);
+ insert(table, 1, "a", fileFormat);
+ insert(table, 2, "b", fileFormat);
+ insert(table, 3, "c", fileFormat);
+ insert(table, 4, "d", fileFormat);
assertFileNum(table, 4, 0);
@@ -123,15 +131,17 @@ class TestRewriteDataFiles extends
MaintenanceTaskTestBase {
schema);
}
- @Test
- void testRewriteTheSameFilePreserveLineage() throws Exception {
- Table table = createTable(3);
- insert(table, 1, "a");
- insert(table, 2, "b");
+ @ParameterizedTest
+ @FieldSource("FILE_FORMATS")
+ void testRewriteTheSameFilePreserveLineage(FileFormat fileFormat) throws
Exception {
+ Table table = createTable(3, fileFormat);
+ insert(table, 1, "a", fileFormat);
+ insert(table, 2, "b", fileFormat);
// Create a file with two lines of data to verify that the rowid is read
correctly.
insert(
table,
- ImmutableList.of(SimpleDataUtil.createRecord(3, "c"),
SimpleDataUtil.createRecord(4, "d")));
+ ImmutableList.of(SimpleDataUtil.createRecord(3, "c"),
SimpleDataUtil.createRecord(4, "d")),
+ fileFormat);
assertFileNum(table, 3, 0);
@@ -167,13 +177,14 @@ class TestRewriteDataFiles extends
MaintenanceTaskTestBase {
schema);
}
- @Test
- void testRewritePartitionedPreserveLineage() throws Exception {
- Table table = createPartitionedTable(3);
- insertPartitioned(table, 1, "p1");
- insertPartitioned(table, 2, "p1");
- insertPartitioned(table, 3, "p2");
- insertPartitioned(table, 4, "p2");
+ @ParameterizedTest
+ @FieldSource("FILE_FORMATS")
+ void testRewritePartitionedPreserveLineage(FileFormat fileFormat) throws
Exception {
+ Table table = createPartitionedTable(3, fileFormat);
+ insertPartitioned(table, 1, "p1", fileFormat);
+ insertPartitioned(table, 2, "p1", fileFormat);
+ insertPartitioned(table, 3, "p2", fileFormat);
+ insertPartitioned(table, 4, "p2", fileFormat);
assertFileNum(table, 4, 0);
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
index 93291e8cc2..06ab7861c0 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
@@ -133,10 +133,14 @@ public class OperatorTestBase {
}
protected static Table createTable() {
- return createTable(2);
+ return createTable(2, FileFormat.PARQUET);
}
protected static Table createTable(int formatVersion) {
+ return createTable(formatVersion, FileFormat.PARQUET);
+ }
+
+ protected static Table createTable(int formatVersion, FileFormat fileFormat)
{
return CATALOG_EXTENSION
.catalog()
.createTable(
@@ -145,6 +149,8 @@ public class OperatorTestBase {
PartitionSpec.unpartitioned(),
null,
ImmutableMap.of(
+ "write.format.default",
+ fileFormat.name(),
TableProperties.FORMAT_VERSION,
String.valueOf(formatVersion),
"flink.max-continuous-empty-commits",
@@ -182,7 +188,7 @@ public class OperatorTestBase {
"format-version", String.valueOf(formatVersion),
"write.upsert.enabled", "true"));
}
- protected static Table createPartitionedTable(int formatVersion) {
+ protected static Table createPartitionedTable(int formatVersion, FileFormat
fileFormat) {
return CATALOG_EXTENSION
.catalog()
.createTable(
@@ -191,6 +197,8 @@ public class OperatorTestBase {
PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build(),
null,
ImmutableMap.of(
+ "write.format.default",
+ fileFormat.name(),
"format-version",
String.valueOf(formatVersion),
"flink.max-continuous-empty-commits",
@@ -198,17 +206,27 @@ public class OperatorTestBase {
}
protected static Table createPartitionedTable() {
- return createPartitionedTable(2);
+ return createPartitionedTable(2, FileFormat.PARQUET);
}
protected void insert(Table table, Integer id, String data) throws
IOException {
- new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
+ insert(table, id, data, FileFormat.PARQUET);
+ }
+
+ protected void insert(Table table, Integer id, String data, FileFormat
fileFormat)
+ throws IOException {
+ new GenericAppenderHelper(table, fileFormat, warehouseDir)
.appendToTable(Lists.newArrayList(SimpleDataUtil.createRecord(id,
data)));
table.refresh();
}
protected void insert(Table table, List<Record> records) throws IOException {
- new GenericAppenderHelper(table, FileFormat.PARQUET,
warehouseDir).appendToTable(records);
+ insert(table, records, FileFormat.PARQUET);
+ }
+
+ protected void insert(Table table, List<Record> records, FileFormat
fileFormat)
+ throws IOException {
+ new GenericAppenderHelper(table, fileFormat,
warehouseDir).appendToTable(records);
table.refresh();
}
@@ -309,7 +327,12 @@ public class OperatorTestBase {
}
protected void insertPartitioned(Table table, Integer id, String data)
throws IOException {
- new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
+ insertPartitioned(table, id, data, FileFormat.PARQUET);
+ }
+
+ protected void insertPartitioned(Table table, Integer id, String data,
FileFormat fileFormat)
+ throws IOException {
+ new GenericAppenderHelper(table, fileFormat, warehouseDir)
.appendToTable(
TestHelpers.Row.of(data),
Lists.newArrayList(SimpleDataUtil.createRecord(id, data)));
table.refresh();
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java
index 3e3a29112c..77f16bfdb2 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java
@@ -70,7 +70,7 @@ public class FlinkOrcReader implements OrcRowReader<RowData> {
TypeDescription record,
List<String> names,
List<OrcValueReader<?>> fields) {
- return FlinkOrcReaders.struct(fields, iStruct, idToConstant);
+ return FlinkOrcReaders.struct(record, fields, iStruct, idToConstant);
}
@Override
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java
index 7a4a15c7e6..c5c958fbdb 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java
@@ -39,6 +39,7 @@ import
org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
+import org.apache.orc.TypeDescription;
import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
import org.apache.orc.storage.ql.exec.vector.ColumnVector;
import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
@@ -91,8 +92,11 @@ class FlinkOrcReaders {
}
public static OrcValueReader<RowData> struct(
- List<OrcValueReader<?>> readers, Types.StructType struct, Map<Integer,
?> idToConstant) {
- return new StructReader(readers, struct, idToConstant);
+ TypeDescription record,
+ List<OrcValueReader<?>> readers,
+ Types.StructType struct,
+ Map<Integer, ?> idToConstant) {
+ return new StructReader(record, readers, struct, idToConstant);
}
private static class StringReader implements OrcValueReader<StringData> {
@@ -265,8 +269,11 @@ class FlinkOrcReaders {
private final int numFields;
StructReader(
- List<OrcValueReader<?>> readers, Types.StructType struct, Map<Integer,
?> idToConstant) {
- super(readers, struct, idToConstant);
+ TypeDescription record,
+ List<OrcValueReader<?>> readers,
+ Types.StructType struct,
+ Map<Integer, ?> idToConstant) {
+ super(record, readers, struct, idToConstant);
this.numFields = struct.fields().size();
}
diff --git
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
index 97b8b67865..88b949a9a7 100644
---
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
+++
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
@@ -33,6 +33,7 @@ import java.time.Instant;
import java.util.List;
import java.util.stream.StreamSupport;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
@@ -44,8 +45,14 @@ import
org.apache.iceberg.flink.maintenance.operator.MetricsReporterFactoryForTe
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.FieldSource;
class TestRewriteDataFiles extends MaintenanceTaskTestBase {
+
+ private static final FileFormat[] FILE_FORMATS =
+ new FileFormat[] {FileFormat.AVRO, FileFormat.PARQUET, FileFormat.ORC};
+
@Test
void testRewriteUnpartitioned() throws Exception {
Table table = createTable();
@@ -83,13 +90,14 @@ class TestRewriteDataFiles extends MaintenanceTaskTestBase {
createRecord(4, "d")));
}
- @Test
- void testRewriteUnpartitionedPreserveLineage() throws Exception {
- Table table = createTable(3);
- insert(table, 1, "a");
- insert(table, 2, "b");
- insert(table, 3, "c");
- insert(table, 4, "d");
+ @ParameterizedTest
+ @FieldSource("FILE_FORMATS")
+ void testRewriteUnpartitionedPreserveLineage(FileFormat fileFormat) throws
Exception {
+ Table table = createTable(3, fileFormat);
+ insert(table, 1, "a", fileFormat);
+ insert(table, 2, "b", fileFormat);
+ insert(table, 3, "c", fileFormat);
+ insert(table, 4, "d", fileFormat);
assertFileNum(table, 4, 0);
@@ -123,15 +131,17 @@ class TestRewriteDataFiles extends
MaintenanceTaskTestBase {
schema);
}
- @Test
- void testRewriteTheSameFilePreserveLineage() throws Exception {
- Table table = createTable(3);
- insert(table, 1, "a");
- insert(table, 2, "b");
+ @ParameterizedTest
+ @FieldSource("FILE_FORMATS")
+ void testRewriteTheSameFilePreserveLineage(FileFormat fileFormat) throws
Exception {
+ Table table = createTable(3, fileFormat);
+ insert(table, 1, "a", fileFormat);
+ insert(table, 2, "b", fileFormat);
// Create a file with two lines of data to verify that the rowid is read
correctly.
insert(
table,
- ImmutableList.of(SimpleDataUtil.createRecord(3, "c"),
SimpleDataUtil.createRecord(4, "d")));
+ ImmutableList.of(SimpleDataUtil.createRecord(3, "c"),
SimpleDataUtil.createRecord(4, "d")),
+ fileFormat);
assertFileNum(table, 3, 0);
@@ -167,13 +177,14 @@ class TestRewriteDataFiles extends
MaintenanceTaskTestBase {
schema);
}
- @Test
- void testRewritePartitionedPreserveLineage() throws Exception {
- Table table = createPartitionedTable(3);
- insertPartitioned(table, 1, "p1");
- insertPartitioned(table, 2, "p1");
- insertPartitioned(table, 3, "p2");
- insertPartitioned(table, 4, "p2");
+ @ParameterizedTest
+ @FieldSource("FILE_FORMATS")
+ void testRewritePartitionedPreserveLineage(FileFormat fileFormat) throws
Exception {
+ Table table = createPartitionedTable(3, fileFormat);
+ insertPartitioned(table, 1, "p1", fileFormat);
+ insertPartitioned(table, 2, "p1", fileFormat);
+ insertPartitioned(table, 3, "p2", fileFormat);
+ insertPartitioned(table, 4, "p2", fileFormat);
assertFileNum(table, 4, 0);
diff --git
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
index 6dd6cda84f..d6563e782e 100644
---
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
+++
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
@@ -133,10 +133,14 @@ public class OperatorTestBase {
}
protected static Table createTable() {
- return createTable(2);
+ return createTable(2, FileFormat.PARQUET);
}
protected static Table createTable(int formatVersion) {
+ return createTable(formatVersion, FileFormat.PARQUET);
+ }
+
+ protected static Table createTable(int formatVersion, FileFormat fileFormat)
{
return CATALOG_EXTENSION
.catalog()
.createTable(
@@ -145,6 +149,8 @@ public class OperatorTestBase {
PartitionSpec.unpartitioned(),
null,
ImmutableMap.of(
+ "write.format.default",
+ fileFormat.name(),
TableProperties.FORMAT_VERSION,
String.valueOf(formatVersion),
"flink.max-continuous-empty-commits",
@@ -182,7 +188,7 @@ public class OperatorTestBase {
"format-version", String.valueOf(formatVersion),
"write.upsert.enabled", "true"));
}
- protected static Table createPartitionedTable(int formatVersion) {
+ protected static Table createPartitionedTable(int formatVersion, FileFormat
fileFormat) {
return CATALOG_EXTENSION
.catalog()
.createTable(
@@ -191,6 +197,8 @@ public class OperatorTestBase {
PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build(),
null,
ImmutableMap.of(
+ "write.format.default",
+ fileFormat.name(),
"format-version",
String.valueOf(formatVersion),
"flink.max-continuous-empty-commits",
@@ -198,17 +206,27 @@ public class OperatorTestBase {
}
protected static Table createPartitionedTable() {
- return createPartitionedTable(2);
+ return createPartitionedTable(2, FileFormat.PARQUET);
}
protected void insert(Table table, Integer id, String data) throws
IOException {
- new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
+ insert(table, id, data, FileFormat.PARQUET);
+ }
+
+ protected void insert(Table table, Integer id, String data, FileFormat
fileFormat)
+ throws IOException {
+ new GenericAppenderHelper(table, fileFormat, warehouseDir)
.appendToTable(Lists.newArrayList(SimpleDataUtil.createRecord(id,
data)));
table.refresh();
}
protected void insert(Table table, List<Record> records) throws IOException {
- new GenericAppenderHelper(table, FileFormat.PARQUET,
warehouseDir).appendToTable(records);
+ insert(table, records, FileFormat.PARQUET);
+ }
+
+ protected void insert(Table table, List<Record> records, FileFormat
fileFormat)
+ throws IOException {
+ new GenericAppenderHelper(table, fileFormat,
warehouseDir).appendToTable(records);
table.refresh();
}
@@ -309,7 +327,12 @@ public class OperatorTestBase {
}
protected void insertPartitioned(Table table, Integer id, String data)
throws IOException {
- new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
+ insertPartitioned(table, id, data, FileFormat.PARQUET);
+ }
+
+ protected void insertPartitioned(Table table, Integer id, String data,
FileFormat fileFormat)
+ throws IOException {
+ new GenericAppenderHelper(table, fileFormat, warehouseDir)
.appendToTable(
TestHelpers.Row.of(data),
Lists.newArrayList(SimpleDataUtil.createRecord(id, data)));
table.refresh();