szehon-ho commented on code in PR #16523:
URL: https://github.com/apache/iceberg/pull/16523#discussion_r3359484857


##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java:
##########
@@ -159,7 +159,7 @@ private DeleteFileIndex doPlanDeletesRemotely(List<ManifestFile> deleteManifests
     List<DeleteFile> deleteFiles =
         sparkContext
             .parallelize(toBeans(deleteManifests), deleteManifests.size())
-            .flatMap(new ReadDeleteManifest(tableBroadcast(), context()))
+            .flatMap(new ReadDeleteManifest(tableBroadcast(), specs(), context()))

Review Comment:
   Good — this fixes the executor-side read for time travel. But a few lines 
below in this same method, the `DeleteFileIndex` is still built with 
`.specsById(table().specs())`:
   
   ```java
   return DeleteFileIndex.builderFor(deleteFiles)
       .specsById(table().specs())   // <- should be specs()
       .caseSensitive(isCaseSensitive())
       ...
   ```
   
   `table().specs()` returns the current-schema specs, which is inconsistent
   with the executor-side read you just fixed here and with
   `BaseDistributedDataScan#planDeletesLocally` (which uses
   `.specsById(specs())`). Please switch it to `specs()` so that time travel
   uses the same specs for the manifest read and for the index.
   The same change is needed in the v4.0 and v4.1 copies.
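   
   For illustration only, here is a toy Java model of why the choice of schema
   matters after a rename. It uses no Iceberg classes: `RenamedColumnSketch`,
   `fieldIdFor`, the map-based schemas, and the field IDs 1 and 2 are all
   hypothetical, and it does not claim to show how `specs()` is implemented.
   It assumes only the `col` to `value` rename exercised by the test in this PR.
   
   ```java
   import java.util.Map;
   import java.util.Optional;

   /** Toy model only: a schema is a map of field ID to column name. */
   public class RenamedColumnSketch {

     /** Schema as of the time-travel snapshot (before the rename). */
     static Map<Integer, String> snapshotSchema() {
       return Map.of(1, "id", 2, "col");
     }

     /** Current table schema (after RENAME COLUMN col TO value). */
     static Map<Integer, String> currentSchema() {
       return Map.of(1, "id", 2, "value");
     }

     /** Resolves a column name to its field ID, if the schema has that name. */
     static Optional<Integer> fieldIdFor(Map<Integer, String> schema, String columnName) {
       return schema.entrySet().stream()
           .filter(entry -> entry.getValue().equals(columnName))
           .map(Map.Entry::getKey)
           .findFirst();
     }

     public static void main(String[] args) {
       // The old name resolves against the snapshot schema ...
       System.out.println(fieldIdFor(snapshotSchema(), "col")); // Optional[2]
       // ... but not against the current schema, where field 2 is named "value".
       System.out.println(fieldIdFor(currentSchema(), "col")); // Optional.empty
     }
   }
   ```
   
   In this toy model the same filter column resolves under one schema and is
   missing under the other, which is why every step of planning should be given
   the same set of specs.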



##########
spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java:
##########
@@ -740,6 +740,49 @@ public void simpleTypesInFilter() {
     sql("DROP TABLE IF EXISTS %s", tableName);
   }
 
+  @TestTemplate
+  public void testTimeTravelFilterOnRenamedColumn() {
+    String ttTableName = tableName("tt_rename_table");
+    sql("DROP TABLE IF EXISTS %s", ttTableName);
+    sql(
+        "CREATE TABLE %s (id BIGINT, col DOUBLE) USING iceberg TBLPROPERTIES ("
+            + "'read.data-planning-mode'='distributed',"
+            + "'read.delete-planning-mode'='distributed')",
+        ttTableName);
+    sql("INSERT INTO %s VALUES (1, 100.0), (2, 200.0), (3, 0.0)", ttTableName);
+
+    TableIdentifier ttTableIdent = TableIdentifier.of(tableIdent.namespace(), "tt_rename_table");
+    long snapshotId = validationCatalog.loadTable(ttTableIdent).currentSnapshot().snapshotId();
+
+    sql("ALTER TABLE %s RENAME COLUMN col TO value", ttTableName);
+    sql("INSERT INTO %s VALUES (4, 400.0)", ttTableName);
+
+    Dataset<Row> df =
+        spark
+            .read()
+            .format("iceberg")
+            .option(SparkReadOptions.VERSION_AS_OF, snapshotId)

Review Comment:
   nit: this copy uses `SparkReadOptions.VERSION_AS_OF` whereas the v3.5 and 
v4.0 copies use `SparkReadOptions.SNAPSHOT_ID`. Please keep the three module 
copies identical (prefer `SNAPSHOT_ID` to match the others, since the intent 
here is snapshot-id time travel).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

