This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 9b8bde411f ORC: Backport add _row_id and _last_updated_sequence_number 
reader in Orc to support lineage (#16256)
9b8bde411f is described below

commit 9b8bde411f81af88fb87457ebe3b78c62faee2eb
Author: pvary <[email protected]>
AuthorDate: Fri May 8 20:29:35 2026 +0200

    ORC: Backport add _row_id and _last_updated_sequence_number reader in Orc 
to support lineage (#16256)
    
    backports #15776
---
 .../apache/iceberg/flink/data/FlinkOrcReader.java  |  2 +-
 .../apache/iceberg/flink/data/FlinkOrcReaders.java | 15 +++++--
 .../maintenance/api/TestRewriteDataFiles.java      | 51 +++++++++++++---------
 .../maintenance/operator/OperatorTestBase.java     | 35 ++++++++++++---
 .../apache/iceberg/flink/data/FlinkOrcReader.java  |  2 +-
 .../apache/iceberg/flink/data/FlinkOrcReaders.java | 15 +++++--
 .../maintenance/api/TestRewriteDataFiles.java      | 51 +++++++++++++---------
 .../maintenance/operator/OperatorTestBase.java     | 35 ++++++++++++---
 8 files changed, 144 insertions(+), 62 deletions(-)

diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java
index 3e3a29112c..77f16bfdb2 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java
@@ -70,7 +70,7 @@ public class FlinkOrcReader implements OrcRowReader<RowData> {
         TypeDescription record,
         List<String> names,
         List<OrcValueReader<?>> fields) {
-      return FlinkOrcReaders.struct(fields, iStruct, idToConstant);
+      return FlinkOrcReaders.struct(record, fields, iStruct, idToConstant);
     }
 
     @Override
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java
index 7a4a15c7e6..c5c958fbdb 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java
@@ -39,6 +39,7 @@ import 
org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
+import org.apache.orc.TypeDescription;
 import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
 import org.apache.orc.storage.ql.exec.vector.ColumnVector;
 import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
@@ -91,8 +92,11 @@ class FlinkOrcReaders {
   }
 
   public static OrcValueReader<RowData> struct(
-      List<OrcValueReader<?>> readers, Types.StructType struct, Map<Integer, 
?> idToConstant) {
-    return new StructReader(readers, struct, idToConstant);
+      TypeDescription record,
+      List<OrcValueReader<?>> readers,
+      Types.StructType struct,
+      Map<Integer, ?> idToConstant) {
+    return new StructReader(record, readers, struct, idToConstant);
   }
 
   private static class StringReader implements OrcValueReader<StringData> {
@@ -265,8 +269,11 @@ class FlinkOrcReaders {
     private final int numFields;
 
     StructReader(
-        List<OrcValueReader<?>> readers, Types.StructType struct, Map<Integer, 
?> idToConstant) {
-      super(readers, struct, idToConstant);
+        TypeDescription record,
+        List<OrcValueReader<?>> readers,
+        Types.StructType struct,
+        Map<Integer, ?> idToConstant) {
+      super(record, readers, struct, idToConstant);
       this.numFields = struct.fields().size();
     }
 
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
index 97b8b67865..88b949a9a7 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
@@ -33,6 +33,7 @@ import java.time.Instant;
 import java.util.List;
 import java.util.stream.StreamSupport;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
@@ -44,8 +45,14 @@ import 
org.apache.iceberg.flink.maintenance.operator.MetricsReporterFactoryForTe
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.FieldSource;
 
 class TestRewriteDataFiles extends MaintenanceTaskTestBase {
+
+  private static final FileFormat[] FILE_FORMATS =
+      new FileFormat[] {FileFormat.AVRO, FileFormat.PARQUET, FileFormat.ORC};
+
   @Test
   void testRewriteUnpartitioned() throws Exception {
     Table table = createTable();
@@ -83,13 +90,14 @@ class TestRewriteDataFiles extends MaintenanceTaskTestBase {
             createRecord(4, "d")));
   }
 
-  @Test
-  void testRewriteUnpartitionedPreserveLineage() throws Exception {
-    Table table = createTable(3);
-    insert(table, 1, "a");
-    insert(table, 2, "b");
-    insert(table, 3, "c");
-    insert(table, 4, "d");
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  void testRewriteUnpartitionedPreserveLineage(FileFormat fileFormat) throws 
Exception {
+    Table table = createTable(3, fileFormat);
+    insert(table, 1, "a", fileFormat);
+    insert(table, 2, "b", fileFormat);
+    insert(table, 3, "c", fileFormat);
+    insert(table, 4, "d", fileFormat);
 
     assertFileNum(table, 4, 0);
 
@@ -123,15 +131,17 @@ class TestRewriteDataFiles extends 
MaintenanceTaskTestBase {
         schema);
   }
 
-  @Test
-  void testRewriteTheSameFilePreserveLineage() throws Exception {
-    Table table = createTable(3);
-    insert(table, 1, "a");
-    insert(table, 2, "b");
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  void testRewriteTheSameFilePreserveLineage(FileFormat fileFormat) throws 
Exception {
+    Table table = createTable(3, fileFormat);
+    insert(table, 1, "a", fileFormat);
+    insert(table, 2, "b", fileFormat);
     // Create a file with two lines of data to verify that the rowid is read 
correctly.
     insert(
         table,
-        ImmutableList.of(SimpleDataUtil.createRecord(3, "c"), 
SimpleDataUtil.createRecord(4, "d")));
+        ImmutableList.of(SimpleDataUtil.createRecord(3, "c"), 
SimpleDataUtil.createRecord(4, "d")),
+        fileFormat);
 
     assertFileNum(table, 3, 0);
 
@@ -167,13 +177,14 @@ class TestRewriteDataFiles extends 
MaintenanceTaskTestBase {
         schema);
   }
 
-  @Test
-  void testRewritePartitionedPreserveLineage() throws Exception {
-    Table table = createPartitionedTable(3);
-    insertPartitioned(table, 1, "p1");
-    insertPartitioned(table, 2, "p1");
-    insertPartitioned(table, 3, "p2");
-    insertPartitioned(table, 4, "p2");
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  void testRewritePartitionedPreserveLineage(FileFormat fileFormat) throws 
Exception {
+    Table table = createPartitionedTable(3, fileFormat);
+    insertPartitioned(table, 1, "p1", fileFormat);
+    insertPartitioned(table, 2, "p1", fileFormat);
+    insertPartitioned(table, 3, "p2", fileFormat);
+    insertPartitioned(table, 4, "p2", fileFormat);
 
     assertFileNum(table, 4, 0);
 
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
index 93291e8cc2..06ab7861c0 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
@@ -133,10 +133,14 @@ public class OperatorTestBase {
   }
 
   protected static Table createTable() {
-    return createTable(2);
+    return createTable(2, FileFormat.PARQUET);
   }
 
   protected static Table createTable(int formatVersion) {
+    return createTable(formatVersion, FileFormat.PARQUET);
+  }
+
+  protected static Table createTable(int formatVersion, FileFormat fileFormat) 
{
     return CATALOG_EXTENSION
         .catalog()
         .createTable(
@@ -145,6 +149,8 @@ public class OperatorTestBase {
             PartitionSpec.unpartitioned(),
             null,
             ImmutableMap.of(
+                "write.format.default",
+                fileFormat.name(),
                 TableProperties.FORMAT_VERSION,
                 String.valueOf(formatVersion),
                 "flink.max-continuous-empty-commits",
@@ -182,7 +188,7 @@ public class OperatorTestBase {
                 "format-version", String.valueOf(formatVersion), 
"write.upsert.enabled", "true"));
   }
 
-  protected static Table createPartitionedTable(int formatVersion) {
+  protected static Table createPartitionedTable(int formatVersion, FileFormat 
fileFormat) {
     return CATALOG_EXTENSION
         .catalog()
         .createTable(
@@ -191,6 +197,8 @@ public class OperatorTestBase {
             
PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build(),
             null,
             ImmutableMap.of(
+                "write.format.default",
+                fileFormat.name(),
                 "format-version",
                 String.valueOf(formatVersion),
                 "flink.max-continuous-empty-commits",
@@ -198,17 +206,27 @@ public class OperatorTestBase {
   }
 
   protected static Table createPartitionedTable() {
-    return createPartitionedTable(2);
+    return createPartitionedTable(2, FileFormat.PARQUET);
   }
 
   protected void insert(Table table, Integer id, String data) throws 
IOException {
-    new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
+    insert(table, id, data, FileFormat.PARQUET);
+  }
+
+  protected void insert(Table table, Integer id, String data, FileFormat 
fileFormat)
+      throws IOException {
+    new GenericAppenderHelper(table, fileFormat, warehouseDir)
         .appendToTable(Lists.newArrayList(SimpleDataUtil.createRecord(id, 
data)));
     table.refresh();
   }
 
   protected void insert(Table table, List<Record> records) throws IOException {
-    new GenericAppenderHelper(table, FileFormat.PARQUET, 
warehouseDir).appendToTable(records);
+    insert(table, records, FileFormat.PARQUET);
+  }
+
+  protected void insert(Table table, List<Record> records, FileFormat 
fileFormat)
+      throws IOException {
+    new GenericAppenderHelper(table, fileFormat, 
warehouseDir).appendToTable(records);
     table.refresh();
   }
 
@@ -309,7 +327,12 @@ public class OperatorTestBase {
   }
 
   protected void insertPartitioned(Table table, Integer id, String data) 
throws IOException {
-    new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
+    insertPartitioned(table, id, data, FileFormat.PARQUET);
+  }
+
+  protected void insertPartitioned(Table table, Integer id, String data, 
FileFormat fileFormat)
+      throws IOException {
+    new GenericAppenderHelper(table, fileFormat, warehouseDir)
         .appendToTable(
             TestHelpers.Row.of(data), 
Lists.newArrayList(SimpleDataUtil.createRecord(id, data)));
     table.refresh();
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java
index 3e3a29112c..77f16bfdb2 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java
@@ -70,7 +70,7 @@ public class FlinkOrcReader implements OrcRowReader<RowData> {
         TypeDescription record,
         List<String> names,
         List<OrcValueReader<?>> fields) {
-      return FlinkOrcReaders.struct(fields, iStruct, idToConstant);
+      return FlinkOrcReaders.struct(record, fields, iStruct, idToConstant);
     }
 
     @Override
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java
index 7a4a15c7e6..c5c958fbdb 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java
@@ -39,6 +39,7 @@ import 
org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
+import org.apache.orc.TypeDescription;
 import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
 import org.apache.orc.storage.ql.exec.vector.ColumnVector;
 import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
@@ -91,8 +92,11 @@ class FlinkOrcReaders {
   }
 
   public static OrcValueReader<RowData> struct(
-      List<OrcValueReader<?>> readers, Types.StructType struct, Map<Integer, 
?> idToConstant) {
-    return new StructReader(readers, struct, idToConstant);
+      TypeDescription record,
+      List<OrcValueReader<?>> readers,
+      Types.StructType struct,
+      Map<Integer, ?> idToConstant) {
+    return new StructReader(record, readers, struct, idToConstant);
   }
 
   private static class StringReader implements OrcValueReader<StringData> {
@@ -265,8 +269,11 @@ class FlinkOrcReaders {
     private final int numFields;
 
     StructReader(
-        List<OrcValueReader<?>> readers, Types.StructType struct, Map<Integer, 
?> idToConstant) {
-      super(readers, struct, idToConstant);
+        TypeDescription record,
+        List<OrcValueReader<?>> readers,
+        Types.StructType struct,
+        Map<Integer, ?> idToConstant) {
+      super(record, readers, struct, idToConstant);
       this.numFields = struct.fields().size();
     }
 
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
index 97b8b67865..88b949a9a7 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
@@ -33,6 +33,7 @@ import java.time.Instant;
 import java.util.List;
 import java.util.stream.StreamSupport;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
@@ -44,8 +45,14 @@ import 
org.apache.iceberg.flink.maintenance.operator.MetricsReporterFactoryForTe
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.FieldSource;
 
 class TestRewriteDataFiles extends MaintenanceTaskTestBase {
+
+  private static final FileFormat[] FILE_FORMATS =
+      new FileFormat[] {FileFormat.AVRO, FileFormat.PARQUET, FileFormat.ORC};
+
   @Test
   void testRewriteUnpartitioned() throws Exception {
     Table table = createTable();
@@ -83,13 +90,14 @@ class TestRewriteDataFiles extends MaintenanceTaskTestBase {
             createRecord(4, "d")));
   }
 
-  @Test
-  void testRewriteUnpartitionedPreserveLineage() throws Exception {
-    Table table = createTable(3);
-    insert(table, 1, "a");
-    insert(table, 2, "b");
-    insert(table, 3, "c");
-    insert(table, 4, "d");
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  void testRewriteUnpartitionedPreserveLineage(FileFormat fileFormat) throws 
Exception {
+    Table table = createTable(3, fileFormat);
+    insert(table, 1, "a", fileFormat);
+    insert(table, 2, "b", fileFormat);
+    insert(table, 3, "c", fileFormat);
+    insert(table, 4, "d", fileFormat);
 
     assertFileNum(table, 4, 0);
 
@@ -123,15 +131,17 @@ class TestRewriteDataFiles extends 
MaintenanceTaskTestBase {
         schema);
   }
 
-  @Test
-  void testRewriteTheSameFilePreserveLineage() throws Exception {
-    Table table = createTable(3);
-    insert(table, 1, "a");
-    insert(table, 2, "b");
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  void testRewriteTheSameFilePreserveLineage(FileFormat fileFormat) throws 
Exception {
+    Table table = createTable(3, fileFormat);
+    insert(table, 1, "a", fileFormat);
+    insert(table, 2, "b", fileFormat);
     // Create a file with two lines of data to verify that the rowid is read 
correctly.
     insert(
         table,
-        ImmutableList.of(SimpleDataUtil.createRecord(3, "c"), 
SimpleDataUtil.createRecord(4, "d")));
+        ImmutableList.of(SimpleDataUtil.createRecord(3, "c"), 
SimpleDataUtil.createRecord(4, "d")),
+        fileFormat);
 
     assertFileNum(table, 3, 0);
 
@@ -167,13 +177,14 @@ class TestRewriteDataFiles extends 
MaintenanceTaskTestBase {
         schema);
   }
 
-  @Test
-  void testRewritePartitionedPreserveLineage() throws Exception {
-    Table table = createPartitionedTable(3);
-    insertPartitioned(table, 1, "p1");
-    insertPartitioned(table, 2, "p1");
-    insertPartitioned(table, 3, "p2");
-    insertPartitioned(table, 4, "p2");
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  void testRewritePartitionedPreserveLineage(FileFormat fileFormat) throws 
Exception {
+    Table table = createPartitionedTable(3, fileFormat);
+    insertPartitioned(table, 1, "p1", fileFormat);
+    insertPartitioned(table, 2, "p1", fileFormat);
+    insertPartitioned(table, 3, "p2", fileFormat);
+    insertPartitioned(table, 4, "p2", fileFormat);
 
     assertFileNum(table, 4, 0);
 
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
index 6dd6cda84f..d6563e782e 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
@@ -133,10 +133,14 @@ public class OperatorTestBase {
   }
 
   protected static Table createTable() {
-    return createTable(2);
+    return createTable(2, FileFormat.PARQUET);
   }
 
   protected static Table createTable(int formatVersion) {
+    return createTable(formatVersion, FileFormat.PARQUET);
+  }
+
+  protected static Table createTable(int formatVersion, FileFormat fileFormat) 
{
     return CATALOG_EXTENSION
         .catalog()
         .createTable(
@@ -145,6 +149,8 @@ public class OperatorTestBase {
             PartitionSpec.unpartitioned(),
             null,
             ImmutableMap.of(
+                "write.format.default",
+                fileFormat.name(),
                 TableProperties.FORMAT_VERSION,
                 String.valueOf(formatVersion),
                 "flink.max-continuous-empty-commits",
@@ -182,7 +188,7 @@ public class OperatorTestBase {
                 "format-version", String.valueOf(formatVersion), 
"write.upsert.enabled", "true"));
   }
 
-  protected static Table createPartitionedTable(int formatVersion) {
+  protected static Table createPartitionedTable(int formatVersion, FileFormat 
fileFormat) {
     return CATALOG_EXTENSION
         .catalog()
         .createTable(
@@ -191,6 +197,8 @@ public class OperatorTestBase {
             
PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build(),
             null,
             ImmutableMap.of(
+                "write.format.default",
+                fileFormat.name(),
                 "format-version",
                 String.valueOf(formatVersion),
                 "flink.max-continuous-empty-commits",
@@ -198,17 +206,27 @@ public class OperatorTestBase {
   }
 
   protected static Table createPartitionedTable() {
-    return createPartitionedTable(2);
+    return createPartitionedTable(2, FileFormat.PARQUET);
   }
 
   protected void insert(Table table, Integer id, String data) throws 
IOException {
-    new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
+    insert(table, id, data, FileFormat.PARQUET);
+  }
+
+  protected void insert(Table table, Integer id, String data, FileFormat 
fileFormat)
+      throws IOException {
+    new GenericAppenderHelper(table, fileFormat, warehouseDir)
         .appendToTable(Lists.newArrayList(SimpleDataUtil.createRecord(id, 
data)));
     table.refresh();
   }
 
   protected void insert(Table table, List<Record> records) throws IOException {
-    new GenericAppenderHelper(table, FileFormat.PARQUET, 
warehouseDir).appendToTable(records);
+    insert(table, records, FileFormat.PARQUET);
+  }
+
+  protected void insert(Table table, List<Record> records, FileFormat 
fileFormat)
+      throws IOException {
+    new GenericAppenderHelper(table, fileFormat, 
warehouseDir).appendToTable(records);
     table.refresh();
   }
 
@@ -309,7 +327,12 @@ public class OperatorTestBase {
   }
 
   protected void insertPartitioned(Table table, Integer id, String data) 
throws IOException {
-    new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
+    insertPartitioned(table, id, data, FileFormat.PARQUET);
+  }
+
+  protected void insertPartitioned(Table table, Integer id, String data, 
FileFormat fileFormat)
+      throws IOException {
+    new GenericAppenderHelper(table, fileFormat, warehouseDir)
         .appendToTable(
             TestHelpers.Row.of(data), 
Lists.newArrayList(SimpleDataUtil.createRecord(id, data)));
     table.refresh();

Reply via email to