szehon-ho commented on code in PR #16523:
URL: https://github.com/apache/iceberg/pull/16523#discussion_r3359484857
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/SparkDistributedDataScan.java:
##########
@@ -159,7 +159,7 @@ private DeleteFileIndex doPlanDeletesRemotely(List<ManifestFile> deleteManifests
List<DeleteFile> deleteFiles =
sparkContext
.parallelize(toBeans(deleteManifests), deleteManifests.size())
- .flatMap(new ReadDeleteManifest(tableBroadcast(), context()))
+ .flatMap(new ReadDeleteManifest(tableBroadcast(), specs(), context()))
Review Comment:
Good — this fixes the executor-side read for time travel. But a few lines
below in this same method, the `DeleteFileIndex` is still built with
`.specsById(table().specs())`:
```java
return DeleteFileIndex.builderFor(deleteFiles)
.specsById(table().specs()) // <- should be specs()
.caseSensitive(isCaseSensitive())
...
```
That passes the specs tied to the current schema, which is inconsistent with
the read you just fixed here and with
`BaseDistributedDataScan#planDeletesLocally` (which uses
`.specsById(specs())`). Please switch it to `specs()` so the time-travel case
is handled consistently. The same change is needed in the v4.0 and v4.1 copies.
##########
spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java:
##########
@@ -740,6 +740,49 @@ public void simpleTypesInFilter() {
sql("DROP TABLE IF EXISTS %s", tableName);
}
+ @TestTemplate
+ public void testTimeTravelFilterOnRenamedColumn() {
+ String ttTableName = tableName("tt_rename_table");
+ sql("DROP TABLE IF EXISTS %s", ttTableName);
+ sql(
+ "CREATE TABLE %s (id BIGINT, col DOUBLE) USING iceberg TBLPROPERTIES ("
+ + "'read.data-planning-mode'='distributed',"
+ + "'read.delete-planning-mode'='distributed')",
+ ttTableName);
+ sql("INSERT INTO %s VALUES (1, 100.0), (2, 200.0), (3, 0.0)", ttTableName);
+
+ TableIdentifier ttTableIdent = TableIdentifier.of(tableIdent.namespace(), "tt_rename_table");
+ long snapshotId = validationCatalog.loadTable(ttTableIdent).currentSnapshot().snapshotId();
+
+ sql("ALTER TABLE %s RENAME COLUMN col TO value", ttTableName);
+ sql("INSERT INTO %s VALUES (4, 400.0)", ttTableName);
+
+ Dataset<Row> df =
+ spark
+ .read()
+ .format("iceberg")
+ .option(SparkReadOptions.VERSION_AS_OF, snapshotId)
Review Comment:
nit: this copy uses `SparkReadOptions.VERSION_AS_OF` whereas the v3.5 and
v4.0 copies use `SparkReadOptions.SNAPSHOT_ID`. Please keep the three module
copies identical (prefer `SNAPSHOT_ID` to match the others, since the intent
here is snapshot-id time travel).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]