rdblue commented on a change in pull request #3722:
URL: https://github.com/apache/iceberg/pull/3722#discussion_r770068550



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -101,24 +105,54 @@ public Table getTable(StructType schema, Transform[] partitioning, Map<String, S
     SparkSession spark = SparkSession.active();
     setupDefaultSparkCatalog(spark);
     String path = options.get("path");
+
+    Long snapshotId = propertyAsLong(options, SparkReadOptions.SNAPSHOT_ID);
+    Long asOfTimestamp = propertyAsLong(options, SparkReadOptions.AS_OF_TIMESTAMP);
+    Preconditions.checkArgument(asOfTimestamp == null || snapshotId == null,
+        "Cannot specify both snapshot-id (%s) and as-of-timestamp (%s)", snapshotId, asOfTimestamp);
+
+    String selector = null;
+
+    if (snapshotId != null) {
+      selector = SNAPSHOT_ID + snapshotId;
+    }
+
+    if (asOfTimestamp != null) {
+      selector = AT_TIMESTAMP + asOfTimestamp;
+    }
+
     CatalogManager catalogManager = spark.sessionState().catalogManager();
 
     if (path.contains("/")) {
       // contains a path. Return iceberg default catalog and a PathIdentifier
+      String newPath = (selector == null) ? path : path + "#" + selector;
       return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME),
-          new PathIdentifier(path));
+          new PathIdentifier(newPath));
     }
 
     final Spark3Util.CatalogAndIdentifier catalogAndIdentifier = Spark3Util.catalogAndIdentifier(
         "path or identifier", spark, path);
 
+    Identifier ident = identifierWithSelector(catalogAndIdentifier.identifier(), selector);
     if (catalogAndIdentifier.catalog().name().equals("spark_catalog") &&
         !(catalogAndIdentifier.catalog() instanceof SparkSessionCatalog)) {
       // catalog is a session catalog but does not support Iceberg. Use Iceberg instead.
       return new Spark3Util.CatalogAndIdentifier(catalogManager.catalog(DEFAULT_CATALOG_NAME),
-          catalogAndIdentifier.identifier());
+          ident);
     } else {
-      return catalogAndIdentifier;
+      return new Spark3Util.CatalogAndIdentifier(catalogAndIdentifier.catalog(),
+          ident);

Review comment:
       Nit: looks like some of these args don't need to be wrapped to the next line.
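
A minimal sketch of what the nit seems to ask for, under stated assumptions: `CatalogAndIdentifier` below is a hypothetical stand-in (not the real `Spark3Util.CatalogAndIdentifier`) so the example compiles on its own, and the joined line is assumed to fit within the project's line-length limit. Both methods build the same object, so the change would be purely cosmetic.

```java
public class UnwrappedReturnSketch {
  // Hypothetical stand-in for Spark3Util.CatalogAndIdentifier; not the real Iceberg class.
  static final class CatalogAndIdentifier {
    private final String catalog;
    private final String identifier;

    CatalogAndIdentifier(String catalog, String identifier) {
      this.catalog = catalog;
      this.identifier = identifier;
    }

    String catalog() {
      return catalog;
    }

    String identifier() {
      return identifier;
    }
  }

  // Wrapped form, as in the hunk: the last argument sits alone on a continuation line.
  static CatalogAndIdentifier wrapped(CatalogAndIdentifier catalogAndIdentifier, String ident) {
    return new CatalogAndIdentifier(catalogAndIdentifier.catalog(),
        ident);
  }

  // Unwrapped form: the same call with both arguments on one line.
  static CatalogAndIdentifier unwrapped(CatalogAndIdentifier catalogAndIdentifier, String ident) {
    return new CatalogAndIdentifier(catalogAndIdentifier.catalog(), ident);
  }

  public static void main(String[] args) {
    CatalogAndIdentifier base = new CatalogAndIdentifier("spark_catalog", "db.table");
    CatalogAndIdentifier a = wrapped(base, "db.table.selector");
    CatalogAndIdentifier b = unwrapped(base, "db.table.selector");
    // Both forms carry the same catalog and identifier.
    System.out.println(a.catalog().equals(b.catalog()) && a.identifier().equals(b.identifier()));
  }
}
```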




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


