wypoon commented on a change in pull request #1508:
URL: https://github.com/apache/iceberg/pull/1508#discussion_r563955001
##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -60,10 +61,13 @@ public SparkTable getTable(StructType schema, Transform[] partitioning, Map<Stri
     // Get Iceberg table from options
     Configuration conf = SparkSession.active().sessionState().newHadoopConf();
     Table icebergTable = getTableAndResolveHadoopConfiguration(options, conf);
+    CaseInsensitiveStringMap cIOptions = new CaseInsensitiveStringMap(options);
+    Long snapshotId = Spark3Util.propertyAsLong(cIOptions, "snapshot-id", null);
+    Long asOfTimestamp = Spark3Util.propertyAsLong(cIOptions, "as-of-timestamp", null);
     // Build Spark table based on Iceberg table, and return it
     // Eagerly refresh the table before reading to ensure views containing this table show up-to-date data
-    return new SparkTable(icebergTable, schema, true);
+    return new SparkTable(icebergTable, schema, snapshotId, asOfTimestamp, true);
Review comment:
@waterlx, you are correct that when we invoke `DataFrameReader#load` in
Spark 3, `IcebergSource#getTable` is not called; rather,
`SparkCatalog#loadTable` is called directly. This is because #1783 changed the
spark3 `IcebergSource` to be a `SupportsCatalogOptions`, not just a
`TableProvider`. Therefore in `DataFrameReader#load`, this
[logic](https://github.com/apache/spark/blob/2b147c4cd50da32fe2b4167f97c8142102a0510d/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L265-L271)
is executed (which calls `SparkCatalog#loadTable`), whereas before, when
`IcebergSource` was only a `TableProvider`, this [other
logic](https://github.com/apache/spark/blob/2b147c4cd50da32fe2b4167f97c8142102a0510d/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L272-L275)
was executed (which calls `IcebergSource#getTable`). (You already know this.)
Given this, in order for the `SparkTable` returned by
`SparkCatalog#loadTable(Identifier)` to be aware of the snapshot, the
information about the snapshot (such as that specified by `df.read.format("iceberg").option("snapshot-id", ...).load(...)`) needs to be present in the `Identifier`, as you concluded. I don't see another way to do it either. I have an implementation along these lines that works. (At the moment, my implementation causes some failures in the spark3-extensions unit tests around relation cache refresh in snapshot-related procedures, so I need to figure that out.)
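To make the idea concrete, here is a minimal sketch of encoding the snapshot selection into the `Identifier` via `SupportsCatalogOptions#extractIdentifier`. The `#snapshot-id=`/`#as-of-timestamp=` suffix scheme, the bare `path` handling, and the empty namespace are illustrative assumptions, not the actual code in my implementation; `SparkCatalog#loadTable` would have to parse the suffix back out:

```java
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

// inside IcebergSource, which implements SupportsCatalogOptions
@Override
public Identifier extractIdentifier(CaseInsensitiveStringMap options) {
  String table = options.get("path");
  Long snapshotId = Spark3Util.propertyAsLong(options, "snapshot-id", null);
  Long asOfTimestamp = Spark3Util.propertyAsLong(options, "as-of-timestamp", null);
  if (snapshotId != null) {
    // carry the snapshot selection in the identifier name so that
    // SparkCatalog#loadTable can recover it and load the table at that snapshot
    table = table + "#snapshot-id=" + snapshotId;
  } else if (asOfTimestamp != null) {
    table = table + "#as-of-timestamp=" + asOfTimestamp;
  }
  return Identifier.of(new String[0], table);
}
```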
As for the failure that results in `iceberg does not support user specified schema`, I simply need to stop specifying a schema (`.schema("id INT, data STRING")`) when reading the Iceberg table, as `SupportsCatalogOptions` does not allow it. (This test was valid before #1783 but is invalid after it.) Relatedly, there is no longer any use for the `requestedSchema` field in `SparkTable`, since we cannot specify a schema, so it should not be in the `SparkTable` constructor either. (I had raised this question above.)
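Concretely, the corrected read in the test would look something like this (a sketch; `spark`, `snapshotId`, and `tableLocation` are assumed to be in scope in the test):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// read at a snapshot without a user-specified schema, since
// SupportsCatalogOptions rejects one
Dataset<Row> result = spark.read()
    .format("iceberg")
    .option("snapshot-id", String.valueOf(snapshotId))
    // .schema("id INT, data STRING")  // no longer allowed after #1783
    .load(tableLocation);
```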
In summary, I concur with your investigation and thank you for sharing your
findings.