This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 4b7286cf0f Flink: Backport Preserve row lineage in RewriteDataFiles to Flink 2.1 and 1.20 (#14520)
4b7286cf0f is described below

commit 4b7286cf0f0e8ce48104835ba747941883ac1570
Author: GuoYu <[email protected]>
AuthorDate: Thu Nov 6 21:27:51 2025 +0800

    Flink: Backport Preserve row lineage in RewriteDataFiles to Flink 2.1 and 1.20 (#14520)
---
 .../operator/DataFileRewritePlanner.java           |   6 --
 .../operator/DataFileRewriteRunner.java            |  34 ++++--
 .../flink/source/RowDataFileScanTaskReader.java    |   9 +-
 .../org/apache/iceberg/flink/SimpleDataUtil.java   |  30 +++++-
 .../maintenance/api/TestRewriteDataFiles.java      | 115 +++++++++++++++++++++
 .../maintenance/operator/OperatorTestBase.java     |  76 ++++++++++++--
 .../operator/TestDataFileRewritePlanner.java       |  13 ---
 .../operator/TestDataFileRewriteRunner.java        |  20 ++++
 .../operator/DataFileRewritePlanner.java           |   6 --
 .../operator/DataFileRewriteRunner.java            |  34 ++++--
 .../flink/source/RowDataFileScanTaskReader.java    |   9 +-
 .../org/apache/iceberg/flink/SimpleDataUtil.java   |  30 +++++-
 .../maintenance/api/TestRewriteDataFiles.java      | 115 +++++++++++++++++++++
 .../maintenance/operator/OperatorTestBase.java     |  76 ++++++++++++--
 .../operator/TestDataFileRewritePlanner.java       |  13 ---
 .../operator/TestDataFileRewriteRunner.java        |  20 ++++
 16 files changed, 512 insertions(+), 94 deletions(-)

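In short, this backport removes the V3 rejection and instead preserves row lineage through compaction: whenever the table supports row lineage, the rewrite reads and writes with a schema widened by the lineage metadata columns. A minimal sketch of that decision, using only APIs that appear in the diff below (TableUtil.supportsRowLineage, MetadataColumns.schemaWithRowLineage):

    import org.apache.iceberg.MetadataColumns;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableUtil;

    // Sketch of the schema selection DataFileRewriteRunner performs below: for
    // row-lineage (V3+) tables, read and write with a schema that also carries
    // _row_id and _last_updated_sequence_number so rewritten files keep the
    // original lineage values.
    class LineageSchemaSketch {
      static Schema rewriteSchema(Table table) {
        return TableUtil.supportsRowLineage(table)
            ? MetadataColumns.schemaWithRowLineage(table.schema())
            : table.schema();
      }
    }
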
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
index 5403dfe19a..6751caeb28 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
@@ -29,8 +29,6 @@ import org.apache.flink.util.Collector;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.SerializableTable;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.actions.BinPackRewriteFilePlanner;
 import org.apache.iceberg.actions.FileRewritePlan;
 import org.apache.iceberg.actions.RewriteDataFiles;
@@ -93,10 +91,6 @@ public class DataFileRewritePlanner
   @Override
   public void open(Configuration parameters) throws Exception {
     tableLoader.open();
-    Table table = tableLoader.loadTable();
-    Preconditions.checkArgument(
-        !TableUtil.supportsRowLineage(table),
-        "Flink does not support compaction on row lineage enabled tables 
(V3+)");
     this.errorCounter =
         TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, taskName, taskIndex)
             .counter(TableMaintenanceMetrics.ERROR_COUNTER);
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java
index c03b5cc1c8..1e8db128e9 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java
@@ -28,11 +28,15 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Collector;
 import org.apache.iceberg.BaseCombinedScanTask;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.actions.RewriteFileGroup;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.maintenance.operator.DataFileRewritePlanner.PlannedGroup;
@@ -108,8 +112,10 @@ public class DataFileRewriteRunner
           value.group().rewrittenFiles().size());
     }
 
-    try (TaskWriter<RowData> writer = writerFor(value)) {
-      try (DataIterator<RowData> iterator = readerFor(value)) {
+    boolean preserveRowId = TableUtil.supportsRowLineage(value.table());
+
+    try (TaskWriter<RowData> writer = writerFor(value, preserveRowId)) {
+      try (DataIterator<RowData> iterator = readerFor(value, preserveRowId)) {
         while (iterator.hasNext()) {
           writer.write(iterator.next());
         }
@@ -169,30 +175,42 @@ public class DataFileRewriteRunner
     }
   }
 
-  private TaskWriter<RowData> writerFor(PlannedGroup value) {
+  private TaskWriter<RowData> writerFor(PlannedGroup value, boolean preserveRowId) {
     String formatString =
         PropertyUtil.propertyAsString(
             value.table().properties(),
             TableProperties.DEFAULT_FILE_FORMAT,
             TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+    Schema writeSchema =
+        preserveRowId
+            ? MetadataColumns.schemaWithRowLineage(value.table().schema())
+            : value.table().schema();
+    RowType flinkWriteType = FlinkSchemaUtil.convert(writeSchema);
     RowDataTaskWriterFactory factory =
         new RowDataTaskWriterFactory(
-            value.table(),
-            FlinkSchemaUtil.convert(value.table().schema()),
+            value::table,
+            flinkWriteType,
             value.group().inputSplitSize(),
             FileFormat.fromString(formatString),
             value.table().properties(),
             null,
-            false);
+            false,
+            writeSchema,
+            value.table().spec());
     factory.initialize(subTaskId, attemptId);
     return factory.create();
   }
 
-  private DataIterator<RowData> readerFor(PlannedGroup value) {
+  private DataIterator<RowData> readerFor(PlannedGroup value, boolean preserveRowId) {
+    Schema projectedSchema =
+        preserveRowId
+            ? MetadataColumns.schemaWithRowLineage(value.table().schema())
+            : value.table().schema();
+
     RowDataFileScanTaskReader reader =
         new RowDataFileScanTaskReader(
             value.table().schema(),
-            value.table().schema(),
+            projectedSchema,
             PropertyUtil.propertyAsString(value.table().properties(), DEFAULT_NAME_MAPPING, null),
             false,
             Collections.emptyList());
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
index bf6f72cc28..b8fb1ba32e 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
@@ -46,7 +46,6 @@ import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.PartitionUtil;
@@ -84,13 +83,7 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
   @Override
   public CloseableIterator<RowData> open(
       FileScanTask task, InputFilesDecryptor inputFilesDecryptor) {
-    Schema partitionSchema = TypeUtil.select(projectedSchema, task.spec().identitySourceIds());
-
-    Map<Integer, ?> idToConstant =
-        partitionSchema.columns().isEmpty()
-            ? ImmutableMap.of()
-            : PartitionUtil.constantsMap(task, RowDataUtil::convertConstant);
-
+    Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, RowDataUtil::convertConstant);
     FlinkDeleteFilter deletes =
         new FlinkDeleteFilter(task, tableSchema, projectedSchema, inputFilesDecryptor);
     CloseableIterable<RowData> iterable =
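
The reader change above drops the identity-partition short-circuit: the constants map is now built for every task, not just for tasks with identity partition columns, since it is the channel through which per-file constant values reach the reader. A hedged illustration of why that matters for lineage; the ROW_ID and LAST_UPDATED_SEQUENCE_NUMBER constants used here are an assumption inferred from schemaWithRowLineage elsewhere in this diff, and the values mirror the tests further down:

    import java.util.Map;
    import org.apache.iceberg.MetadataColumns;

    // Even for an unpartitioned table, a data file whose first row id is 2 and
    // whose data sequence number is 3 can supply lineage values as per-file
    // constants, so the map must always be built.
    class LineageConstantsSketch {
      static Map<Integer, Long> lineageConstants(long firstRowId, long dataSequenceNumber) {
        return Map.of(
            MetadataColumns.ROW_ID.fieldId(), firstRowId,
            MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(), dataSequenceNumber);
      }
    }
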
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index d3146d3f42..fc5bea17b4 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -89,6 +89,13 @@ public class SimpleDataUtil {
           Types.NestedField.optional(2, "data", Types.StringType.get()),
           Types.NestedField.optional(3, "extra", Types.StringType.get()));
 
+  public static final Schema SCHEMA3 =
+      new Schema(
+          Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+          Types.NestedField.optional(2, "data", Types.StringType.get()),
+          Types.NestedField.optional(3, "_row_id", Types.LongType.get()),
+          Types.NestedField.optional(4, "_last_updated_sequence_number", Types.LongType.get()));
+
   public static final ResolvedSchema FLINK_SCHEMA =
       ResolvedSchema.of(
           Column.physical("id", DataTypes.INT()), Column.physical("data", 
DataTypes.STRING()));
@@ -100,6 +107,7 @@ public class SimpleDataUtil {
 
   public static final Record RECORD = GenericRecord.create(SCHEMA);
   public static final Record RECORD2 = GenericRecord.create(SCHEMA2);
+  public static final Record RECORD3 = GenericRecord.create(SCHEMA3);
 
   public static Table createTable(
       String path, Map<String, String> properties, boolean partitioned) {
@@ -127,6 +135,16 @@ public class SimpleDataUtil {
     return record;
   }
 
+  public static Record createRecordWithRowId(
+      Integer id, String data, Long rowId, Long lastUpdatedSequenceNumber) {
+    Record record = RECORD3.copy();
+    record.setField("id", id);
+    record.setField("data", data);
+    record.setField("_row_id", rowId);
+    record.setField("_last_updated_sequence_number", 
lastUpdatedSequenceNumber);
+    return record;
+  }
+
   public static RowData createRowData(Integer id, String data) {
     return GenericRowData.of(id, StringData.fromString(data));
   }
@@ -348,6 +366,11 @@ public class SimpleDataUtil {
 
   public static void assertTableRecords(Table table, List<Record> expected, String branch)
       throws IOException {
+    assertTableRecords(table, expected, branch, table.schema());
+  }
+
+  public static void assertTableRecords(
+      Table table, List<Record> expected, String branch, Schema projectSchema) throws IOException {
     table.refresh();
     Snapshot snapshot = latestSnapshot(table, branch);
 
@@ -360,12 +383,15 @@ public class SimpleDataUtil {
       return;
     }
 
-    Types.StructType type = table.schema().asStruct();
+    Types.StructType type = projectSchema.asStruct();
     StructLikeSet expectedSet = StructLikeSet.create(type);
     expectedSet.addAll(expected);
 
     try (CloseableIterable<Record> iterable =
-        IcebergGenerics.read(table).useSnapshot(snapshot.snapshotId()).build()) {
+        IcebergGenerics.read(table)
+            .useSnapshot(snapshot.snapshotId())
+            .project(projectSchema)
+            .build()) {
       StructLikeSet actualSet = StructLikeSet.create(type);
 
       for (Record record : iterable) {
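
The new SimpleDataUtil helpers above are exercised by the tests that follow; a condensed usage sketch (every name here comes from this diff):

    import java.io.IOException;
    import org.apache.iceberg.MetadataColumns;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.SnapshotRef;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.flink.SimpleDataUtil;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;

    // Build an expected record that carries lineage values, then assert against
    // a projection that includes the lineage columns.
    class AssertWithLineageSketch {
      static void assertFirstRow(Table table) throws IOException {
        Schema projected = MetadataColumns.schemaWithRowLineage(table.schema());
        SimpleDataUtil.assertTableRecords(
            table,
            ImmutableList.of(SimpleDataUtil.createRecordWithRowId(1, "a", 0L, 1L)),
            SnapshotRef.MAIN_BRANCH,
            projected);
      }
    }
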
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
index 795057e235..707038c925 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
@@ -33,6 +33,9 @@ import java.util.List;
 import java.util.stream.StreamSupport;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.flink.SimpleDataUtil;
@@ -79,6 +82,118 @@ class TestRewriteDataFiles extends MaintenanceTaskTestBase {
             createRecord(4, "d")));
   }
 
+  @Test
+  void testRewriteUnpartitionedPreserveLineage() throws Exception {
+    Table table = createTable(3);
+    insert(table, 1, "a");
+    insert(table, 2, "b");
+    insert(table, 3, "c");
+    insert(table, 4, "d");
+
+    assertFileNum(table, 4, 0);
+
+    appendRewriteDataFiles(
+        RewriteDataFiles.builder()
+            .parallelism(2)
+            .deleteFileThreshold(10)
+            .targetFileSizeBytes(1_000_000L)
+            .maxFileGroupSizeBytes(10_000_000L)
+            .maxFileSizeBytes(2_000_000L)
+            .minFileSizeBytes(500_000L)
+            .minInputFiles(2)
+            .partialProgressEnabled(true)
+            .partialProgressMaxCommits(1)
+            .maxRewriteBytes(100_000L)
+            .rewriteAll(false));
+
+    runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());
+
+    assertFileNum(table, 1, 0);
+
+    Schema schema = MetadataColumns.schemaWithRowLineage(table.schema());
+    SimpleDataUtil.assertTableRecords(
+        table,
+        ImmutableList.of(
+            SimpleDataUtil.createRecordWithRowId(1, "a", 0L, 1L),
+            SimpleDataUtil.createRecordWithRowId(2, "b", 1L, 2L),
+            SimpleDataUtil.createRecordWithRowId(3, "c", 2L, 3L),
+            SimpleDataUtil.createRecordWithRowId(4, "d", 3L, 4L)),
+        SnapshotRef.MAIN_BRANCH,
+        schema);
+  }
+
+  @Test
+  void testRewriteTheSameFilePreserveLineage() throws Exception {
+    Table table = createTable(3);
+    insert(table, 1, "a");
+    insert(table, 2, "b");
+    // Create a file with two rows of data to verify that the row id is read correctly.
+    insert(
+        table,
+        ImmutableList.of(SimpleDataUtil.createRecord(3, "c"), SimpleDataUtil.createRecord(4, "d")));
+
+    assertFileNum(table, 3, 0);
+
+    appendRewriteDataFiles(
+        RewriteDataFiles.builder()
+            .parallelism(2)
+            .deleteFileThreshold(10)
+            .targetFileSizeBytes(1_000_000L)
+            .maxFileGroupSizeBytes(10_000_000L)
+            .maxFileSizeBytes(2_000_000L)
+            .minFileSizeBytes(500_000L)
+            .minInputFiles(2)
+            .partialProgressEnabled(true)
+            .partialProgressMaxCommits(1)
+            .maxRewriteBytes(100_000L)
+            .rewriteAll(false));
+
+    runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());
+
+    assertFileNum(table, 1, 0);
+
+    Schema schema = MetadataColumns.schemaWithRowLineage(table.schema());
+    SimpleDataUtil.assertTableRecords(
+        table,
+        ImmutableList.of(
+            SimpleDataUtil.createRecordWithRowId(1, "a", 0L, 1L),
+            SimpleDataUtil.createRecordWithRowId(2, "b", 1L, 2L),
+            // The ids 3 and 4 come from the same file, so the last updated sequence number should
+            // be the same.
+            SimpleDataUtil.createRecordWithRowId(3, "c", 2L, 3L),
+            SimpleDataUtil.createRecordWithRowId(4, "d", 3L, 3L)),
+        SnapshotRef.MAIN_BRANCH,
+        schema);
+  }
+
+  @Test
+  void testRewritePartitionedPreserveLineage() throws Exception {
+    Table table = createPartitionedTable(3);
+    insertPartitioned(table, 1, "p1");
+    insertPartitioned(table, 2, "p1");
+    insertPartitioned(table, 3, "p2");
+    insertPartitioned(table, 4, "p2");
+
+    assertFileNum(table, 4, 0);
+
+    appendRewriteDataFiles();
+
+    runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());
+
+    assertFileNum(table, 2, 0);
+
+    Schema schema = MetadataColumns.schemaWithRowLineage(table.schema());
+    SimpleDataUtil.assertTableRecords(
+        table,
+        ImmutableList.of(
+            SimpleDataUtil.createRecordWithRowId(1, "p1", 0L, 1L),
+            SimpleDataUtil.createRecordWithRowId(2, "p1", 1L, 2L),
+            SimpleDataUtil.createRecordWithRowId(3, "p2", 2L, 3L),
+            SimpleDataUtil.createRecordWithRowId(4, "p2", 3L, 4L)),
+        SnapshotRef.MAIN_BRANCH,
+        schema);
+  }
+
   @Test
   void testRewritePartitioned() throws Exception {
     Table table = createPartitionedTable();
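
The expected lineage values asserted above follow from the v3 assignment rule (summarized here as background, not stated in this diff): each data file is assigned a first row id by the commit that adds it, a row's _row_id is that base plus the row's position in the file, and _last_updated_sequence_number is the data sequence number of the commit that last wrote the row. A worked check of the testRewriteTheSameFilePreserveLineage values under that assumption:

    // Rows (3, "c") and (4, "d") were written together in the third commit, in
    // one file whose first row id is 2 and whose data sequence number is 3.
    class LineageArithmetic {
      static long rowId(long firstRowIdOfFile, long positionInFile) {
        return firstRowIdOfFile + positionInFile;
      }

      public static void main(String[] args) {
        System.out.println(rowId(2, 0)); // 2 -> expected _row_id for (3, "c")
        System.out.println(rowId(2, 1)); // 3 -> expected _row_id for (4, "d")
        // Both rows share _last_updated_sequence_number 3, the file's sequence number.
      }
    }
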
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
index 8460b392e2..5eecc5a803 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
@@ -24,6 +24,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
+import java.util.List;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.core.execution.JobClient;
@@ -49,6 +50,7 @@ import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.data.FileHelpers;
 import org.apache.iceberg.data.GenericAppenderHelper;
 import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
 import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.flink.HadoopCatalogExtension;
 import org.apache.iceberg.flink.SimpleDataUtil;
@@ -122,11 +124,10 @@ public class OperatorTestBase {
   }
 
   protected static Table createTable() {
-    // only test V2 tables as compaction doesn't support V3 with row lineage
-    return createTable("2");
+    return createTable(2);
   }
 
-  protected static Table createTable(String formatVersion) {
+  protected static Table createTable(int formatVersion) {
     return CATALOG_EXTENSION
         .catalog()
         .createTable(
@@ -136,12 +137,16 @@ public class OperatorTestBase {
             null,
             ImmutableMap.of(
                 TableProperties.FORMAT_VERSION,
-                formatVersion,
+                String.valueOf(formatVersion),
                 "flink.max-continuous-empty-commits",
                 "100000"));
   }
 
   protected static Table createTableWithDelete() {
+    return createTableWithDelete(2);
+  }
+
+  protected static Table createTableWithDelete(int formatVersion) {
     return CATALOG_EXTENSION
         .catalog()
         .createTable(
@@ -149,10 +154,11 @@ public class OperatorTestBase {
             SCHEMA_WITH_PRIMARY_KEY,
             PartitionSpec.unpartitioned(),
             null,
-            ImmutableMap.of("format-version", "2", "write.upsert.enabled", 
"true"));
+            ImmutableMap.of(
+                "format-version", String.valueOf(formatVersion), 
"write.upsert.enabled", "true"));
   }
 
-  protected static Table createPartitionedTable() {
+  protected static Table createPartitionedTable(int formatVersion) {
     return CATALOG_EXTENSION
         .catalog()
         .createTable(
@@ -160,7 +166,15 @@ public class OperatorTestBase {
             SimpleDataUtil.SCHEMA,
             PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build(),
             null,
-            ImmutableMap.of("format-version", "2", 
"flink.max-continuous-empty-commits", "100000"));
+            ImmutableMap.of(
+                "format-version",
+                String.valueOf(formatVersion),
+                "flink.max-continuous-empty-commits",
+                "100000"));
+  }
+
+  protected static Table createPartitionedTable() {
+    return createPartitionedTable(2);
   }
 
   protected void insert(Table table, Integer id, String data) throws IOException {
@@ -169,6 +183,11 @@ public class OperatorTestBase {
     table.refresh();
   }
 
+  protected void insert(Table table, List<Record> records) throws IOException {
+    new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir).appendToTable(records);
+    table.refresh();
+  }
+
   protected void insert(Table table, Integer id, String data, String extra) throws IOException {
     new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
         .appendToTable(Lists.newArrayList(SimpleDataUtil.createRecord(id, data, extra)));
@@ -195,6 +214,35 @@ public class OperatorTestBase {
     table.newRowDelta().addRows(dataFile).addDeletes(eqDelete).commit();
   }
 
+  /**
+   * For the same identifier column id this method simulates the following row operations: <tr>
+   * <li>add an equality delete on oldData
+   * <li>insert tempData
+   * <li>add a position delete on tempData
+   * <li>insert newData </tr>
+   *
+   * @param table to modify
+   * @param id the identifier column id
+   * @param oldData the old data to be deleted
+   * @param tempData the temp data to be inserted and deleted with a position delete
+   * @param newData the new data to be inserted
+   * @param formatVersion the format version to use
+   */
+  protected void update(
+      Table table, Integer id, String oldData, String tempData, String newData, int formatVersion)
+      throws IOException {
+    DataFile dataFile =
+        new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
+            .writeFile(
+                Lists.newArrayList(
+                    SimpleDataUtil.createRecord(id, tempData),
+                    SimpleDataUtil.createRecord(id, newData)));
+    DeleteFile eqDelete = writeEqualityDelete(table, id, oldData);
+    DeleteFile posDelete = writePosDelete(table, dataFile.path(), 0, id, tempData, formatVersion);
+
+    table.newRowDelta().addRows(dataFile).addDeletes(eqDelete).addDeletes(posDelete).commit();
+  }
+
   /**
    * For the same identifier column id this method simulates the following row operations: <tr>
    * <li>add an equality delete on oldData
@@ -217,7 +265,7 @@ public class OperatorTestBase {
                     SimpleDataUtil.createRecord(id, tempData),
                     SimpleDataUtil.createRecord(id, newData)));
     DeleteFile eqDelete = writeEqualityDelete(table, id, oldData);
-    DeleteFile posDelete = writePosDelete(table, dataFile.path(), 0, id, tempData);
+    DeleteFile posDelete = writePosDelete(table, dataFile.path(), 0, id, tempData, 2);
 
     table.newRowDelta().addRows(dataFile).addDeletes(eqDelete).addDeletes(posDelete).commit();
   }
@@ -237,6 +285,13 @@ public class OperatorTestBase {
     table.refresh();
   }
 
+  protected void insertPartitioned(Table table, List<Record> records, String partition)
+      throws IOException {
+    new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
+        .appendToTable(TestHelpers.Row.of(partition), records);
+    table.refresh();
+  }
+
   protected void dropTable() {
     CATALOG_EXTENSION.catalogLoader().loadCatalog().dropTable(TestFixtures.TABLE_IDENTIFIER);
   }
@@ -332,7 +387,8 @@ public class OperatorTestBase {
   }
 
   private DeleteFile writePosDelete(
-      Table table, CharSequence path, Integer pos, Integer id, String oldData) throws IOException {
+      Table table, CharSequence path, Integer pos, Integer id, String oldData, int formatVersion)
+      throws IOException {
     File file = File.createTempFile("junit", null, warehouseDir.toFile());
     assertThat(file.delete()).isTrue();
     PositionDelete<GenericRecord> posDelete = PositionDelete.create();
@@ -341,7 +397,7 @@ public class OperatorTestBase {
     nested.set(1, oldData);
     posDelete.set(path, pos, nested);
     return FileHelpers.writePosDeleteFile(
-        table, Files.localOutput(file), null, Lists.newArrayList(posDelete));
+        table, Files.localOutput(file), null, Lists.newArrayList(posDelete), formatVersion);
   }
 
   static void trigger(OneInputStreamOperatorTestHarness<Trigger, ?> harness) throws Exception {
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
index 2d83f553e5..9f4f96e106 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
@@ -22,7 +22,6 @@ import static org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MIN_INPUT_F
 import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.newDataFiles;
 import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.planDataFileRewrite;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.util.List;
 import java.util.Set;
@@ -41,18 +40,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.junit.jupiter.api.Test;
 
 class TestDataFileRewritePlanner extends OperatorTestBase {
-  @Test
-  void testFailsOnV3Table() throws Exception {
-    Table table = createTable("3");
-    Set<DataFile> expected = Sets.newHashSetWithExpectedSize(3);
-    insert(table, 1, "a");
-    expected.addAll(newDataFiles(table));
-
-    assertThatThrownBy(() -> planDataFileRewrite(tableLoader()))
-        .hasMessageContaining(
-            "Flink does not support compaction on row lineage enabled tables 
(V3+)")
-        .isInstanceOf(IllegalArgumentException.class);
-  }
 
   @Test
   void testUnpartitioned() throws Exception {
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java
index 3c5a103287..4e21c7a956 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java
@@ -248,6 +248,26 @@ class TestDataFileRewriteRunner extends OperatorTestBase {
         ImmutableSet.of(new PartitionData(PartitionSpec.unpartitioned().partitionType())));
   }
 
+  @Test
+  void testV3Table() throws Exception {
+    Table table = createTableWithDelete(3);
+    update(table, 1, null, "a", "b", 3);
+    update(table, 1, "b", "c");
+
+    List<DataFileRewritePlanner.PlannedGroup> planned = planDataFileRewrite(tableLoader());
+    assertThat(planned).hasSize(1);
+
+    List<DataFileRewriteRunner.ExecutedGroup> actual = executeRewrite(planned);
+    assertThat(actual).hasSize(1);
+
+    assertRewriteFileGroup(
+        actual.get(0),
+        table,
+        records(table.schema(), ImmutableSet.of(ImmutableList.of(1, "c"))),
+        1,
+        ImmutableSet.of(new PartitionData(PartitionSpec.unpartitioned().partitionType())));
+  }
+
   @Test
   void testSplitSize() throws Exception {
     Table table = createTable();
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
index 81db62e8bf..c50060e16a 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewritePlanner.java
@@ -29,8 +29,6 @@ import org.apache.flink.util.Collector;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.SerializableTable;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.actions.BinPackRewriteFilePlanner;
 import org.apache.iceberg.actions.FileRewritePlan;
 import org.apache.iceberg.actions.RewriteDataFiles;
@@ -93,10 +91,6 @@ public class DataFileRewritePlanner
   @Override
   public void open(OpenContext context) throws Exception {
     tableLoader.open();
-    Table table = tableLoader.loadTable();
-    Preconditions.checkArgument(
-        !TableUtil.supportsRowLineage(table),
-        "Flink does not support compaction on row lineage enabled tables 
(V3+)");
     this.errorCounter =
         TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, taskName, taskIndex)
             .counter(TableMaintenanceMetrics.ERROR_COUNTER);
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java
index ad3b045400..57b0e53d86 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DataFileRewriteRunner.java
@@ -28,11 +28,15 @@ import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Collector;
 import org.apache.iceberg.BaseCombinedScanTask;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.actions.RewriteFileGroup;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.maintenance.operator.DataFileRewritePlanner.PlannedGroup;
@@ -108,8 +112,10 @@ public class DataFileRewriteRunner
           value.group().rewrittenFiles().size());
     }
 
-    try (TaskWriter<RowData> writer = writerFor(value)) {
-      try (DataIterator<RowData> iterator = readerFor(value)) {
+    boolean preserveRowId = TableUtil.supportsRowLineage(value.table());
+
+    try (TaskWriter<RowData> writer = writerFor(value, preserveRowId)) {
+      try (DataIterator<RowData> iterator = readerFor(value, preserveRowId)) {
         while (iterator.hasNext()) {
           writer.write(iterator.next());
         }
@@ -169,30 +175,42 @@ public class DataFileRewriteRunner
     }
   }
 
-  private TaskWriter<RowData> writerFor(PlannedGroup value) {
+  private TaskWriter<RowData> writerFor(PlannedGroup value, boolean preserveRowId) {
     String formatString =
         PropertyUtil.propertyAsString(
             value.table().properties(),
             TableProperties.DEFAULT_FILE_FORMAT,
             TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+    Schema writeSchema =
+        preserveRowId
+            ? MetadataColumns.schemaWithRowLineage(value.table().schema())
+            : value.table().schema();
+    RowType flinkWriteType = FlinkSchemaUtil.convert(writeSchema);
     RowDataTaskWriterFactory factory =
         new RowDataTaskWriterFactory(
-            value.table(),
-            FlinkSchemaUtil.convert(value.table().schema()),
+            value::table,
+            flinkWriteType,
             value.group().inputSplitSize(),
             FileFormat.fromString(formatString),
             value.table().properties(),
             null,
-            false);
+            false,
+            writeSchema,
+            value.table().spec());
     factory.initialize(subTaskId, attemptId);
     return factory.create();
   }
 
-  private DataIterator<RowData> readerFor(PlannedGroup value) {
+  private DataIterator<RowData> readerFor(PlannedGroup value, boolean preserveRowId) {
+    Schema projectedSchema =
+        preserveRowId
+            ? MetadataColumns.schemaWithRowLineage(value.table().schema())
+            : value.table().schema();
+
     RowDataFileScanTaskReader reader =
         new RowDataFileScanTaskReader(
             value.table().schema(),
-            value.table().schema(),
+            projectedSchema,
             PropertyUtil.propertyAsString(value.table().properties(), DEFAULT_NAME_MAPPING, null),
             false,
             Collections.emptyList());
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
index bf6f72cc28..b8fb1ba32e 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
@@ -46,7 +46,6 @@ import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.PartitionUtil;
@@ -84,13 +83,7 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
   @Override
   public CloseableIterator<RowData> open(
       FileScanTask task, InputFilesDecryptor inputFilesDecryptor) {
-    Schema partitionSchema = TypeUtil.select(projectedSchema, task.spec().identitySourceIds());
-
-    Map<Integer, ?> idToConstant =
-        partitionSchema.columns().isEmpty()
-            ? ImmutableMap.of()
-            : PartitionUtil.constantsMap(task, RowDataUtil::convertConstant);
-
+    Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, RowDataUtil::convertConstant);
     FlinkDeleteFilter deletes =
         new FlinkDeleteFilter(task, tableSchema, projectedSchema, inputFilesDecryptor);
     CloseableIterable<RowData> iterable =
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index 6db2b79f77..542376b06c 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -89,6 +89,13 @@ public class SimpleDataUtil {
           Types.NestedField.optional(2, "data", Types.StringType.get()),
           Types.NestedField.optional(3, "extra", Types.StringType.get()));
 
+  public static final Schema SCHEMA3 =
+      new Schema(
+          Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+          Types.NestedField.optional(2, "data", Types.StringType.get()),
+          Types.NestedField.optional(3, "_row_id", Types.LongType.get()),
+          Types.NestedField.optional(4, "_last_updated_sequence_number", Types.LongType.get()));
+
   public static final ResolvedSchema FLINK_SCHEMA =
       ResolvedSchema.of(
           Column.physical("id", DataTypes.INT()), Column.physical("data", 
DataTypes.STRING()));
@@ -100,6 +107,7 @@ public class SimpleDataUtil {
 
   public static final Record RECORD = GenericRecord.create(SCHEMA);
   public static final Record RECORD2 = GenericRecord.create(SCHEMA2);
+  public static final Record RECORD3 = GenericRecord.create(SCHEMA3);
 
   public static Table createTable(
       String path, Map<String, String> properties, boolean partitioned) {
@@ -127,6 +135,16 @@ public class SimpleDataUtil {
     return record;
   }
 
+  public static Record createRecordWithRowId(
+      Integer id, String data, Long rowId, Long lastUpdatedSequenceNumber) {
+    Record record = RECORD3.copy();
+    record.setField("id", id);
+    record.setField("data", data);
+    record.setField("_row_id", rowId);
+    record.setField("_last_updated_sequence_number", 
lastUpdatedSequenceNumber);
+    return record;
+  }
+
   public static RowData createRowData(Integer id, String data) {
     return GenericRowData.of(id, StringData.fromString(data));
   }
@@ -348,6 +366,11 @@ public class SimpleDataUtil {
 
   public static void assertTableRecords(Table table, List<Record> expected, String branch)
       throws IOException {
+    assertTableRecords(table, expected, branch, table.schema());
+  }
+
+  public static void assertTableRecords(
+      Table table, List<Record> expected, String branch, Schema projectSchema) throws IOException {
     table.refresh();
     Snapshot snapshot = latestSnapshot(table, branch);
 
@@ -360,12 +383,15 @@ public class SimpleDataUtil {
       return;
     }
 
-    Types.StructType type = table.schema().asStruct();
+    Types.StructType type = projectSchema.asStruct();
     StructLikeSet expectedSet = StructLikeSet.create(type);
     expectedSet.addAll(expected);
 
     try (CloseableIterable<Record> iterable =
-        IcebergGenerics.read(table).useSnapshot(snapshot.snapshotId()).build()) {
+        IcebergGenerics.read(table)
+            .useSnapshot(snapshot.snapshotId())
+            .project(projectSchema)
+            .build()) {
       StructLikeSet actualSet = StructLikeSet.create(type);
 
       for (Record record : iterable) {
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
index 795057e235..707038c925 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
@@ -33,6 +33,9 @@ import java.util.List;
 import java.util.stream.StreamSupport;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.flink.SimpleDataUtil;
@@ -79,6 +82,118 @@ class TestRewriteDataFiles extends MaintenanceTaskTestBase {
             createRecord(4, "d")));
   }
 
+  @Test
+  void testRewriteUnpartitionedPreserveLineage() throws Exception {
+    Table table = createTable(3);
+    insert(table, 1, "a");
+    insert(table, 2, "b");
+    insert(table, 3, "c");
+    insert(table, 4, "d");
+
+    assertFileNum(table, 4, 0);
+
+    appendRewriteDataFiles(
+        RewriteDataFiles.builder()
+            .parallelism(2)
+            .deleteFileThreshold(10)
+            .targetFileSizeBytes(1_000_000L)
+            .maxFileGroupSizeBytes(10_000_000L)
+            .maxFileSizeBytes(2_000_000L)
+            .minFileSizeBytes(500_000L)
+            .minInputFiles(2)
+            .partialProgressEnabled(true)
+            .partialProgressMaxCommits(1)
+            .maxRewriteBytes(100_000L)
+            .rewriteAll(false));
+
+    runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());
+
+    assertFileNum(table, 1, 0);
+
+    Schema schema = MetadataColumns.schemaWithRowLineage(table.schema());
+    SimpleDataUtil.assertTableRecords(
+        table,
+        ImmutableList.of(
+            SimpleDataUtil.createRecordWithRowId(1, "a", 0L, 1L),
+            SimpleDataUtil.createRecordWithRowId(2, "b", 1L, 2L),
+            SimpleDataUtil.createRecordWithRowId(3, "c", 2L, 3L),
+            SimpleDataUtil.createRecordWithRowId(4, "d", 3L, 4L)),
+        SnapshotRef.MAIN_BRANCH,
+        schema);
+  }
+
+  @Test
+  void testRewriteTheSameFilePreserveLineage() throws Exception {
+    Table table = createTable(3);
+    insert(table, 1, "a");
+    insert(table, 2, "b");
+    // Create a file with two rows of data to verify that the row id is read correctly.
+    insert(
+        table,
+        ImmutableList.of(SimpleDataUtil.createRecord(3, "c"), SimpleDataUtil.createRecord(4, "d")));
+
+    assertFileNum(table, 3, 0);
+
+    appendRewriteDataFiles(
+        RewriteDataFiles.builder()
+            .parallelism(2)
+            .deleteFileThreshold(10)
+            .targetFileSizeBytes(1_000_000L)
+            .maxFileGroupSizeBytes(10_000_000L)
+            .maxFileSizeBytes(2_000_000L)
+            .minFileSizeBytes(500_000L)
+            .minInputFiles(2)
+            .partialProgressEnabled(true)
+            .partialProgressMaxCommits(1)
+            .maxRewriteBytes(100_000L)
+            .rewriteAll(false));
+
+    runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());
+
+    assertFileNum(table, 1, 0);
+
+    Schema schema = MetadataColumns.schemaWithRowLineage(table.schema());
+    SimpleDataUtil.assertTableRecords(
+        table,
+        ImmutableList.of(
+            SimpleDataUtil.createRecordWithRowId(1, "a", 0L, 1L),
+            SimpleDataUtil.createRecordWithRowId(2, "b", 1L, 2L),
+            // The ids 3 and 4 come from the same file, so the last updated sequence number should
+            // be the same.
+            SimpleDataUtil.createRecordWithRowId(3, "c", 2L, 3L),
+            SimpleDataUtil.createRecordWithRowId(4, "d", 3L, 3L)),
+        SnapshotRef.MAIN_BRANCH,
+        schema);
+  }
+
+  @Test
+  void testRewritePartitionedPreserveLineage() throws Exception {
+    Table table = createPartitionedTable(3);
+    insertPartitioned(table, 1, "p1");
+    insertPartitioned(table, 2, "p1");
+    insertPartitioned(table, 3, "p2");
+    insertPartitioned(table, 4, "p2");
+
+    assertFileNum(table, 4, 0);
+
+    appendRewriteDataFiles();
+
+    runAndWaitForSuccess(infra.env(), infra.source(), infra.sink());
+
+    assertFileNum(table, 2, 0);
+
+    Schema schema = MetadataColumns.schemaWithRowLineage(table.schema());
+    SimpleDataUtil.assertTableRecords(
+        table,
+        ImmutableList.of(
+            SimpleDataUtil.createRecordWithRowId(1, "p1", 0L, 1L),
+            SimpleDataUtil.createRecordWithRowId(2, "p1", 1L, 2L),
+            SimpleDataUtil.createRecordWithRowId(3, "p2", 2L, 3L),
+            SimpleDataUtil.createRecordWithRowId(4, "p2", 3L, 4L)),
+        SnapshotRef.MAIN_BRANCH,
+        schema);
+  }
+
   @Test
   void testRewritePartitioned() throws Exception {
     Table table = createPartitionedTable();
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
index f9cbc9715c..b9422a63d6 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
@@ -24,6 +24,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
+import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
@@ -49,6 +50,7 @@ import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.data.FileHelpers;
 import org.apache.iceberg.data.GenericAppenderHelper;
 import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
 import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.flink.HadoopCatalogExtension;
 import org.apache.iceberg.flink.SimpleDataUtil;
@@ -122,11 +124,10 @@ public class OperatorTestBase {
   }
 
   protected static Table createTable() {
-    // only test V2 tables as compaction doesn't support V3 with row lineage
-    return createTable("2");
+    return createTable(2);
   }
 
-  protected static Table createTable(String formatVersion) {
+  protected static Table createTable(int formatVersion) {
     return CATALOG_EXTENSION
         .catalog()
         .createTable(
@@ -136,12 +137,16 @@ public class OperatorTestBase {
             null,
             ImmutableMap.of(
                 TableProperties.FORMAT_VERSION,
-                formatVersion,
+                String.valueOf(formatVersion),
                 "flink.max-continuous-empty-commits",
                 "100000"));
   }
 
   protected static Table createTableWithDelete() {
+    return createTableWithDelete(2);
+  }
+
+  protected static Table createTableWithDelete(int formatVersion) {
     return CATALOG_EXTENSION
         .catalog()
         .createTable(
@@ -149,10 +154,11 @@ public class OperatorTestBase {
             SCHEMA_WITH_PRIMARY_KEY,
             PartitionSpec.unpartitioned(),
             null,
-            ImmutableMap.of("format-version", "2", "write.upsert.enabled", 
"true"));
+            ImmutableMap.of(
+                "format-version", String.valueOf(formatVersion), 
"write.upsert.enabled", "true"));
   }
 
-  protected static Table createPartitionedTable() {
+  protected static Table createPartitionedTable(int formatVersion) {
     return CATALOG_EXTENSION
         .catalog()
         .createTable(
@@ -160,7 +166,15 @@ public class OperatorTestBase {
             SimpleDataUtil.SCHEMA,
             PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build(),
             null,
-            ImmutableMap.of("format-version", "2", 
"flink.max-continuous-empty-commits", "100000"));
+            ImmutableMap.of(
+                "format-version",
+                String.valueOf(formatVersion),
+                "flink.max-continuous-empty-commits",
+                "100000"));
+  }
+
+  protected static Table createPartitionedTable() {
+    return createPartitionedTable(2);
   }
 
   protected void insert(Table table, Integer id, String data) throws IOException {
@@ -169,6 +183,11 @@ public class OperatorTestBase {
     table.refresh();
   }
 
+  protected void insert(Table table, List<Record> records) throws IOException {
+    new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir).appendToTable(records);
+    table.refresh();
+  }
+
   protected void insert(Table table, Integer id, String data, String extra) throws IOException {
     new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
         .appendToTable(Lists.newArrayList(SimpleDataUtil.createRecord(id, data, extra)));
@@ -195,6 +214,35 @@ public class OperatorTestBase {
     table.newRowDelta().addRows(dataFile).addDeletes(eqDelete).commit();
   }
 
+  /**
+   * For the same identifier column id this method simulates the following row operations: <tr>
+   * <li>add an equality delete on oldData
+   * <li>insert tempData
+   * <li>add a position delete on tempData
+   * <li>insert newData </tr>
+   *
+   * @param table to modify
+   * @param id the identifier column id
+   * @param oldData the old data to be deleted
+   * @param tempData the temp data to be inserted and deleted with a position delete
+   * @param newData the new data to be inserted
+   * @param formatVersion the format version to use
+   */
+  protected void update(
+      Table table, Integer id, String oldData, String tempData, String newData, int formatVersion)
+      throws IOException {
+    DataFile dataFile =
+        new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
+            .writeFile(
+                Lists.newArrayList(
+                    SimpleDataUtil.createRecord(id, tempData),
+                    SimpleDataUtil.createRecord(id, newData)));
+    DeleteFile eqDelete = writeEqualityDelete(table, id, oldData);
+    DeleteFile posDelete = writePosDelete(table, dataFile.path(), 0, id, tempData, formatVersion);
+
+    table.newRowDelta().addRows(dataFile).addDeletes(eqDelete).addDeletes(posDelete).commit();
+  }
+
   /**
    * For the same identifier column id this method simulates the following row operations: <tr>
    * <li>add an equality delete on oldData
@@ -217,7 +265,7 @@ public class OperatorTestBase {
                     SimpleDataUtil.createRecord(id, tempData),
                     SimpleDataUtil.createRecord(id, newData)));
     DeleteFile eqDelete = writeEqualityDelete(table, id, oldData);
-    DeleteFile posDelete = writePosDelete(table, dataFile.path(), 0, id, tempData);
+    DeleteFile posDelete = writePosDelete(table, dataFile.path(), 0, id, tempData, 2);
 
     table.newRowDelta().addRows(dataFile).addDeletes(eqDelete).addDeletes(posDelete).commit();
   }
@@ -237,6 +285,13 @@ public class OperatorTestBase {
     table.refresh();
   }
 
+  protected void insertPartitioned(Table table, List<Record> records, String partition)
+      throws IOException {
+    new GenericAppenderHelper(table, FileFormat.PARQUET, warehouseDir)
+        .appendToTable(TestHelpers.Row.of(partition), records);
+    table.refresh();
+  }
+
   protected void dropTable() {
     CATALOG_EXTENSION.catalogLoader().loadCatalog().dropTable(TestFixtures.TABLE_IDENTIFIER);
   }
@@ -329,7 +384,8 @@ public class OperatorTestBase {
   }
 
   private DeleteFile writePosDelete(
-      Table table, CharSequence path, Integer pos, Integer id, String oldData) throws IOException {
+      Table table, CharSequence path, Integer pos, Integer id, String oldData, int formatVersion)
+      throws IOException {
     File file = File.createTempFile("junit", null, warehouseDir.toFile());
     assertThat(file.delete()).isTrue();
     PositionDelete<GenericRecord> posDelete = PositionDelete.create();
@@ -338,7 +394,7 @@ public class OperatorTestBase {
     nested.set(1, oldData);
     posDelete.set(path, pos, nested);
     return FileHelpers.writePosDeleteFile(
-        table, Files.localOutput(file), null, Lists.newArrayList(posDelete));
+        table, Files.localOutput(file), null, Lists.newArrayList(posDelete), formatVersion);
   }
 
   static void trigger(OneInputStreamOperatorTestHarness<Trigger, ?> harness) throws Exception {
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
index 2d83f553e5..9f4f96e106 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewritePlanner.java
@@ -22,7 +22,6 @@ import static org.apache.iceberg.actions.SizeBasedFileRewritePlanner.MIN_INPUT_F
 import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.newDataFiles;
 import static org.apache.iceberg.flink.maintenance.operator.RewriteUtil.planDataFileRewrite;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.util.List;
 import java.util.Set;
@@ -41,18 +40,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.junit.jupiter.api.Test;
 
 class TestDataFileRewritePlanner extends OperatorTestBase {
-  @Test
-  void testFailsOnV3Table() throws Exception {
-    Table table = createTable("3");
-    Set<DataFile> expected = Sets.newHashSetWithExpectedSize(3);
-    insert(table, 1, "a");
-    expected.addAll(newDataFiles(table));
-
-    assertThatThrownBy(() -> planDataFileRewrite(tableLoader()))
-        .hasMessageContaining(
-            "Flink does not support compaction on row lineage enabled tables 
(V3+)")
-        .isInstanceOf(IllegalArgumentException.class);
-  }
 
   @Test
   void testUnpartitioned() throws Exception {
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java
index 3c5a103287..4e21c7a956 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDataFileRewriteRunner.java
@@ -248,6 +248,26 @@ class TestDataFileRewriteRunner extends OperatorTestBase {
         ImmutableSet.of(new PartitionData(PartitionSpec.unpartitioned().partitionType())));
   }
 
+  @Test
+  void testV3Table() throws Exception {
+    Table table = createTableWithDelete(3);
+    update(table, 1, null, "a", "b", 3);
+    update(table, 1, "b", "c");
+
+    List<DataFileRewritePlanner.PlannedGroup> planned = planDataFileRewrite(tableLoader());
+    assertThat(planned).hasSize(1);
+
+    List<DataFileRewriteRunner.ExecutedGroup> actual = executeRewrite(planned);
+    assertThat(actual).hasSize(1);
+
+    assertRewriteFileGroup(
+        actual.get(0),
+        table,
+        records(table.schema(), ImmutableSet.of(ImmutableList.of(1, "c"))),
+        1,
+        ImmutableSet.of(new PartitionData(PartitionSpec.unpartitioned().partitionType())));
+  }
+
   @Test
   void testSplitSize() throws Exception {
     Table table = createTable();
