wypoon commented on a change in pull request #1508:
URL: https://github.com/apache/iceberg/pull/1508#discussion_r563955001



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -60,10 +61,13 @@ public SparkTable getTable(StructType schema, Transform[] partitioning, Map<Stri
     // Get Iceberg table from options
     Configuration conf = SparkSession.active().sessionState().newHadoopConf();
     Table icebergTable = getTableAndResolveHadoopConfiguration(options, conf);
+    CaseInsensitiveStringMap cIOptions = new CaseInsensitiveStringMap(options);
+    Long snapshotId = Spark3Util.propertyAsLong(cIOptions, "snapshot-id", null);
+    Long asOfTimestamp = Spark3Util.propertyAsLong(cIOptions, "as-of-timestamp", null);
 
     // Build Spark table based on Iceberg table, and return it
     // Eagerly refresh the table before reading to ensure views containing this table show up-to-date data
-    return new SparkTable(icebergTable, schema, true);
+    return new SparkTable(icebergTable, schema, snapshotId, asOfTimestamp, true);

Review comment:
       @waterlx, you are correct that when we invoke `DataFrameReader#load` in Spark 3, `IcebergSource#getTable` is not called; rather, `SparkCatalog#loadTable` is called directly. This is because #1783 changed the spark3 `IcebergSource` to be a `SupportsCatalogOptions`, not just a `TableProvider`. Therefore, in `DataFrameReader#load`, this [logic](https://github.com/apache/spark/blob/2b147c4cd50da32fe2b4167f97c8142102a0510d/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L265-L271) is executed (which calls `SparkCatalog#loadTable`), whereas before, when `IcebergSource` was only a `TableProvider`, this [other logic](https://github.com/apache/spark/blob/2b147c4cd50da32fe2b4167f97c8142102a0510d/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L272-L275) was executed (which calls `IcebergSource#getTable`). (You already know this.)
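   To make the dispatch concrete, here is a rough Java paraphrase of the Scala logic linked above. The class and variable names are illustrative, but `SupportsCatalogOptions#extractIdentifier`, `TableCatalog#loadTable`, and `TableProvider#getTable` are the actual Spark 3 connector APIs involved:
```java
import java.util.Map;

import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.SupportsCatalogOptions;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.catalog.TableProvider;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

class LoadDispatchSketch {
  // Rough paraphrase of the dispatch inside DataFrameReader#load (the real
  // code is Scala); names other than the Spark connector APIs are made up.
  static Table resolveTable(TableProvider provider, TableCatalog catalog,
                            CaseInsensitiveStringMap dsOptions) throws NoSuchTableException {
    if (provider instanceof SupportsCatalogOptions) {
      // Post-#1783 path: IcebergSource implements SupportsCatalogOptions, so
      // Spark extracts an Identifier from the read options and asks the
      // catalog for the table; SparkCatalog#loadTable is called and
      // IcebergSource#getTable is skipped.
      Identifier ident = ((SupportsCatalogOptions) provider).extractIdentifier(dsOptions);
      return catalog.loadTable(ident);
    }
    // Pre-#1783 path: a plain TableProvider builds the table itself, which is
    // where IcebergSource#getTable (the diff above) used to run.
    Map<String, String> properties = dsOptions.asCaseSensitiveMap();
    return provider.getTable(provider.inferSchema(dsOptions),
        provider.inferPartitioning(dsOptions), properties);
  }
}
```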
   Given this, in order for the `SparkTable` returned by `SparkCatalog#loadTable(Identifier)` to be aware of the snapshot, the information about the snapshot (such as that specified by `spark.read.format("iceberg").option("snapshot-id", ...).load(...)`) needs to be present in the `Identifier`, as you concluded. I don't see another way to do it either. I have an implementation along these lines that works. (At the moment, my implementation causes some failures in the spark3-extensions unit tests around relation cache refresh in snapshot-related procedures, so I need to figure that out.)
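   For concreteness, here is a minimal sketch of that idea; `SnapshotAwareIdentifier` is a hypothetical name, and this is not necessarily how my implementation does it:
```java
import org.apache.spark.sql.connector.catalog.Identifier;

// Hypothetical sketch only, not the implementation in this PR: one way to make
// the snapshot selector visible to SparkCatalog#loadTable(Identifier) is to
// have IcebergSource#extractIdentifier return an Identifier subtype carrying
// it, which SparkCatalog could then detect with instanceof.
class SnapshotAwareIdentifier implements Identifier {
  private final Identifier delegate;
  private final Long snapshotId;      // from the "snapshot-id" option, may be null
  private final Long asOfTimestamp;   // from the "as-of-timestamp" option, may be null

  SnapshotAwareIdentifier(Identifier delegate, Long snapshotId, Long asOfTimestamp) {
    this.delegate = delegate;
    this.snapshotId = snapshotId;
    this.asOfTimestamp = asOfTimestamp;
  }

  @Override
  public String[] namespace() {
    return delegate.namespace();
  }

  @Override
  public String name() {
    return delegate.name();
  }

  Long snapshotId() {
    return snapshotId;
  }

  Long asOfTimestamp() {
    return asOfTimestamp;
  }
}
```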
   As for the failure that results in `iceberg does not support user specified schema`, I simply need to stop specifying a schema (`.schema("id INT, data STRING")`) when reading the Iceberg table, as `SupportsCatalogOptions` does not allow it. (This test was valid before #1783 but invalid after.) Related to this, `SparkTable` no longer has any use for a `requestedSchema` field, since we cannot specify a schema, so it should not be in the `SparkTable` constructor either. (I had posed this question above.)
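   In test form, the change amounts to the following (assuming a `SparkSession spark` and a table name `tableName` in scope):
```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Fails after #1783: SupportsCatalogOptions does not allow a user-specified
// schema, so this raises "iceberg does not support user specified schema".
Dataset<Row> broken = spark.read()
    .format("iceberg")
    .schema("id INT, data STRING")  // this call must be removed
    .load(tableName);

// Works after #1783: let Iceberg supply the table schema.
Dataset<Row> fixed = spark.read()
    .format("iceberg")
    .load(tableName);
```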
   In summary, I concur with your investigation and thank you for sharing your findings.



