anuragmantri commented on code in PR #16220:
URL: https://github.com/apache/iceberg/pull/16220#discussion_r3210600924


##########
spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java:
##########
@@ -316,6 +316,104 @@ public void testIncrementalRewrite() throws Exception {
     assertEquals("Rows should match after copy", expected, actual);
   }
 
+  @TestTemplate
+  public void testIncrementalRewriteWithRewriteManifestsAndExpire() throws 
Exception {
+    String location = newTableLocation();
+    Table sourceTable =
+        TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), 
Maps.newHashMap(), location);
+
+    // Write 2 initial snapshots
+    List<ThreeColumnRecord> recordsA =
+        Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+    Dataset<Row> dfA = spark.createDataFrame(recordsA, 
ThreeColumnRecord.class).coalesce(1);
+    dfA.select("c1", "c2", 
"c3").write().format("iceberg").mode("append").save(location);
+
+    List<ThreeColumnRecord> recordsB =
+        Lists.newArrayList(new ThreeColumnRecord(2, "BBBBBBBBBB", "BBBB"));
+    Dataset<Row> dfB = spark.createDataFrame(recordsB, 
ThreeColumnRecord.class).coalesce(1);
+    dfB.select("c1", "c2", 
"c3").write().format("iceberg").mode("append").save(location);
+    sourceTable.refresh();
+
+    // Full replication
+    RewriteTablePath.Result initialResult =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(sourceTable.location(), 
targetTableLocation())
+            .stagingLocation(stagingLocation())
+            .execute();
+    copyTableFiles(initialResult);
+    
assertThat(spark.read().format("iceberg").load(targetTableLocation()).collectAsList())
+        .hasSize(2);
+
+    // Append new data
+    List<ThreeColumnRecord> recordsC =
+        Lists.newArrayList(new ThreeColumnRecord(3, "CCCCCCCCCC", "CCCC"));
+    Dataset<Row> dfC = spark.createDataFrame(recordsC, 
ThreeColumnRecord.class).coalesce(1);
+    dfC.select("c1", "c2", 
"c3").write().format("iceberg").mode("append").save(location);
+    sourceTable.refresh();
+
+    // RewriteManifests merges all manifests
+    actions().rewriteManifests(sourceTable).execute();
+    sourceTable.refresh();
+
+    // Expire old snapshots including the one that added file C
+    actions()
+        .expireSnapshots(sourceTable)
+        .retainLast(1)
+        .expireOlderThan(sourceTable.currentSnapshot().timestampMillis() + 1)
+        .execute();
+    sourceTable.refresh();
+
+    // Incremental replication should still copy file C
+    Table targetTable = TABLES.load(targetTableLocation());
+    String startVersion = 
fileName(currentMetadata(targetTable).metadataFileLocation());
+    RewriteTablePath.Result incrementalResult =
+        actions()
+            .rewriteTablePath(sourceTable)
+            .rewriteLocationPrefix(sourceTable.location(), 
targetTableLocation())
+            .stagingLocation(stagingLocation())
+            .startVersion(startVersion)
+            .execute();
+    copyTableFiles(incrementalResult);
+
+    List<Object[]> actual = rowsSorted(targetTableLocation(), "c1");
+    List<Object[]> expected = rowsSorted(location, "c1");
+    assertEquals(

Review Comment:
   It would be worth asserting that the incremental result contains only one new
   file (file C), to validate that the dedup is actually filtering out the files
   that were already copied.



##########
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java:
##########
@@ -310,8 +310,22 @@ private Result rebuildMetadata() {
 
     // rebuild manifest files
     Set<ManifestFile> metaFiles = rewriteManifestListResult.toRewrite();
-    RewriteContentFileResult rewriteManifestResult =
-        rewriteManifests(deltaSnapshots, endMetadata, metaFiles);
+    RewriteContentFileResult rewriteManifestResult = 
rewriteManifests(endMetadata, metaFiles);
+
+    // For incremental copies, filter out files that were already copied in 
previous replication
+    if (startMetadata != null && !rewriteManifestResult.copyPlan().isEmpty()) {
+      try {
+        Table startTable = newStaticTable(startVersionName, table.io());
+        Set<String> previouslyCopiedPaths =
+            Sets.newHashSet(
+                
contentFileDS(startTable).select("path").as(Encoders.STRING()).collectAsList());
+        rewriteManifestResult
+            .copyPlan()
+            .removeIf(pair -> previouslyCopiedPaths.contains(pair.first()));
+      } catch (Exception e) {
+        LOG.warn("Unable to read content files from start version for 
incremental filtering", e);

Review Comment:
   Can you also state the consequence of this in the warning message, e.g.
   `falling back to full copy plan`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to