wypoon commented on a change in pull request #1508:
URL: https://github.com/apache/iceberg/pull/1508#discussion_r716967094



##########
File path: core/src/main/java/org/apache/iceberg/BaseTableScan.java
##########
@@ -123,8 +124,14 @@ public TableScan useSnapshot(long scanSnapshotId) {
         "Cannot override snapshot, already set to id=%s", context.snapshotId());
     Preconditions.checkArgument(ops.current().snapshot(scanSnapshotId) != null,
         "Cannot find snapshot with ID %s", scanSnapshotId);
-    return newRefinedScan(
-        ops, table, schema, context.useSnapshotId(scanSnapshotId));
+    if (this instanceof DataTableScan) {

Review comment:
       Ok, I'll override the method in `DataTableScan`.

##########
File path: core/src/main/java/org/apache/iceberg/BaseTableScan.java
##########
@@ -123,8 +124,14 @@ public TableScan useSnapshot(long scanSnapshotId) {
         "Cannot override snapshot, already set to id=%s", context.snapshotId());
     Preconditions.checkArgument(ops.current().snapshot(scanSnapshotId) != null,
         "Cannot find snapshot with ID %s", scanSnapshotId);
-    return newRefinedScan(
-        ops, table, schema, context.useSnapshotId(scanSnapshotId));
+    if (this instanceof DataTableScan) {
+      Schema snapshotSchema = SnapshotUtil.schemaFor(table, scanSnapshotId);
+      return newRefinedScan(
+          ops, table, snapshotSchema, context.useSnapshotId(scanSnapshotId));
+    } else {
+      return newRefinedScan(
+          ops, table, schema, context.useSnapshotId(scanSnapshotId));

Review comment:
       You're right. I'll have `BaseTableScan#asOfTime` call 
`SnapshotUtil.snapshotIdAsOfTime` to avoid duplicating the logic.
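
   For illustration only, here is a minimal, self-contained sketch of that delegation. The `Entry` and `Scan` classes are hypothetical stand-ins, not the Iceberg types, and routing `asOfTime` through `useSnapshot` is an assumption; only the lookup loop mirrors the `snapshotIdAsOfTime` hunk quoted in this thread.

```java
import java.util.Arrays;
import java.util.List;

public class AsOfTimeSketch {
  /** Hypothetical stand-in for a history entry (timestamp plus snapshot ID). */
  static final class Entry {
    final long timestampMillis;
    final long snapshotId;

    Entry(long timestampMillis, long snapshotId) {
      this.timestampMillis = timestampMillis;
      this.snapshotId = snapshotId;
    }
  }

  /** Mirrors the quoted loop: the last entry at or before the timestamp wins. */
  static long snapshotIdAsOfTime(List<Entry> history, long timestampMillis) {
    Long snapshotId = null;
    for (Entry entry : history) {
      if (entry.timestampMillis <= timestampMillis) {
        snapshotId = entry.snapshotId;
      }
    }
    if (snapshotId == null) {
      throw new IllegalArgumentException(
          String.format("Cannot find a snapshot older than %s", timestampMillis));
    }
    return snapshotId;
  }

  /** Hypothetical scan: asOfTime only resolves the ID, then reuses useSnapshot. */
  static final class Scan {
    final List<Entry> history;
    final Long snapshotId;

    Scan(List<Entry> history, Long snapshotId) {
      this.history = history;
      this.snapshotId = snapshotId;
    }

    Scan useSnapshot(long id) {
      return new Scan(history, id);
    }

    Scan asOfTime(long timestampMillis) {
      // Single source of truth for the lookup; no duplicated loop here.
      return useSnapshot(snapshotIdAsOfTime(history, timestampMillis));
    }
  }

  public static void main(String[] args) {
    List<Entry> history =
        Arrays.asList(new Entry(100L, 1L), new Entry(200L, 2L), new Entry(300L, 3L));
    Scan scan = new Scan(history, null).asOfTime(250L);
    System.out.println(scan.snapshotId);
  }
}
```

   With the lookup in one place, any later change to how the snapshot is resolved applies to both callers.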

##########
File path: core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
##########
@@ -144,4 +146,76 @@ public static Snapshot snapshotAfter(Table table, long snapshotId) {
     throw new IllegalStateException(
         String.format("Cannot find snapshot after %s: not an ancestor of table's current snapshot", snapshotId));
   }
+
+  /**
+   * Returns the ID of the most recent snapshot for the table as of the timestamp.
+   *
+   * @param table a {@link Table}
+   * @param timestampMillis the timestamp in millis since the Unix epoch
+   * @return the snapshot ID
+   * @throws IllegalArgumentException when no snapshot is found in the table
+   * older than the timestamp
+   */
+  public static long snapshotIdAsOfTime(Table table, long timestampMillis) {
+    Long snapshotId = null;
+    for (HistoryEntry logEntry : table.history()) {
+      if (logEntry.timestampMillis() <= timestampMillis) {
+        snapshotId = logEntry.snapshotId();
+      }
+    }
+
+    Preconditions.checkArgument(snapshotId != null,
+        "Cannot find a snapshot older than %s", timestampMillis);
+    return snapshotId;

Review comment:
       Ok, will do.

##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -157,22 +128,88 @@
       this.localityPreferred = false;
     }
 
-    this.schema = table.schema();
-    this.caseSensitive = caseSensitive;
     this.batchSize = options.get(SparkReadOptions.VECTORIZATION_BATCH_SIZE).map(Integer::parseInt).orElseGet(() ->
         PropertyUtil.propertyAsInt(table.properties(),
           TableProperties.PARQUET_BATCH_SIZE, TableProperties.PARQUET_BATCH_SIZE_DEFAULT));
     RuntimeConfig sessionConf = SparkSession.active().conf();
     this.readTimestampWithoutZone = SparkUtil.canHandleTimestampWithoutZone(options.asMap(), sessionConf);
   }
 
+  private void validateOptions(

Review comment:
       There are two checks here. The first does not appear to be covered by any 
existing unit test; the second is. Both checks also appear in the spark3 
`SparkBatchQueryScan`, so I am preserving them here. As I commented in 
https://github.com/wypoon/iceberg/pull/1, I'd like to defer the cleanup of 
duplicate checks to a separate PR, because it should be done for both spark2 and 
spark3 (and here we're only refactoring the spark2 `Reader`). Can that cleanup 
be done after this is merged, since, as you remarked before, this change is 
already quite large?

##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -101,24 +103,35 @@ public Table getTable(StructType schema, Transform[] partitioning, Map<String, S
     SparkSession spark = SparkSession.active();
     setupDefaultSparkCatalog(spark);
     String path = options.get("path");
+    Long snapshotId = Spark3Util.propertyAsLong(options, SparkReadOptions.SNAPSHOT_ID, null);
+    Long asOfTimestamp = Spark3Util.propertyAsLong(options, SparkReadOptions.AS_OF_TIMESTAMP, null);

Review comment:
       Let me look into this.

##########
File path: core/src/main/java/org/apache/iceberg/BaseTableScan.java
##########
@@ -123,8 +124,14 @@ public TableScan useSnapshot(long scanSnapshotId) {
         "Cannot override snapshot, already set to id=%s", context.snapshotId());
     Preconditions.checkArgument(ops.current().snapshot(scanSnapshotId) != null,
         "Cannot find snapshot with ID %s", scanSnapshotId);
-    return newRefinedScan(
-        ops, table, schema, context.useSnapshotId(scanSnapshotId));
+    if (this instanceof DataTableScan) {

Review comment:
       Q. In your example, why check whether `newScan.snapshot().schemaId() != 
null`? We can simply call `SnapshotUtil.schemaFor(table(), scanSnapshotId)` 
without the check, since the util method currently falls back to the table 
schema, pending a TODO to recover the snapshot's schema by reading old 
metadata. That way, once the TODO is implemented, we don't need to revisit 
this code.
   We also don't need `newScan` at all; the only reason for it would be to call 
`super.useSnapshot` for its side effects (argument validation), which I prefer 
to avoid.
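
   To make the shape of the override concrete, here is a rough, self-contained sketch. All types are hypothetical stand-ins (schemas are plain strings, `Table` is a toy class), and `schemaFor` only imitates the fallback behaviour described above; it is not the real `SnapshotUtil` code. Argument validation is left out for brevity.

```java
import java.util.HashMap;
import java.util.Map;

public class UseSnapshotSketch {
  /** Hypothetical table: a current schema plus per-snapshot schemas where known. */
  static final class Table {
    final String currentSchema;
    final Map<Long, String> schemaBySnapshot = new HashMap<>();

    Table(String currentSchema) {
      this.currentSchema = currentSchema;
    }
  }

  /** Stand-in for the util method: falls back to the table schema when none is recorded. */
  static String schemaFor(Table table, long snapshotId) {
    String schema = table.schemaBySnapshot.get(snapshotId);
    return schema != null ? schema : table.currentSchema;
  }

  /** Stand-in base scan: keeps whatever schema it already has. */
  static class BaseScan {
    final Table table;
    final String schema;
    final Long snapshotId;

    BaseScan(Table table, String schema, Long snapshotId) {
      this.table = table;
      this.schema = schema;
      this.snapshotId = snapshotId;
    }

    BaseScan useSnapshot(long id) {
      return new BaseScan(table, schema, id);
    }
  }

  /** Stand-in data scan: the override swaps in the snapshot's schema. */
  static final class DataScan extends BaseScan {
    DataScan(Table table, String schema, Long snapshotId) {
      super(table, schema, snapshotId);
    }

    @Override
    BaseScan useSnapshot(long id) {
      // No null check at the call site and no call to super.useSnapshot:
      // the fallback lives entirely inside schemaFor.
      return new DataScan(table, schemaFor(table, id), id);
    }
  }

  public static void main(String[] args) {
    Table table = new Table("id,data,category");
    table.schemaBySnapshot.put(1L, "id,data");
    BaseScan scan = new DataScan(table, table.currentSchema, null);
    System.out.println(scan.useSnapshot(1L).schema); // schema recorded for snapshot 1
    System.out.println(scan.useSnapshot(2L).schema); // fallback to the table schema
  }
}
```

   Because the caller never inspects whether a snapshot schema exists, improving `schemaFor` later requires no change to the override.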

##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -101,24 +103,35 @@ public Table getTable(StructType schema, Transform[] partitioning, Map<String, S
     SparkSession spark = SparkSession.active();
     setupDefaultSparkCatalog(spark);
     String path = options.get("path");
+    Long snapshotId = Spark3Util.propertyAsLong(options, SparkReadOptions.SNAPSHOT_ID, null);
+    Long asOfTimestamp = Spark3Util.propertyAsLong(options, SparkReadOptions.AS_OF_TIMESTAMP, null);

Review comment:
       It turns out that you are mistaken.
   The 
[`DataSourceV2Relation`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala#L131)
 is 
[created](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala#L175-L176)
 with the output attributes from the schema given by `SparkTable#schema()`.
   The `SparkTable` is loaded using the catalog and identifier, which is why 
I need the `SnapshotAwareIdentifier` when loading it: it lets me return the 
snapshot schema from `SparkTable#schema()`.
   Without that, the `SparkScanBuilder` still has the `snapshot-id` or 
`as-of-timestamp` option, but Spark calls its `pruneColumns()` with a 
`requestedSchema` that is a subset of the table schema, so its `build()` 
returns a `SparkBatchQueryScan` with an incorrect schema.
   When I remove the modifications to `IcebergSource`, `SparkCatalog`, and 
`SparkTable`, the unit tests I added all fail, as I suspected.
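
   As a toy illustration of the problem (not Spark or Iceberg code), the sketch below models column pruning as "the requested schema is always drawn from the relation's output". The column names, the `requestedSchema` helper, and the assumption that a `category` column was added after the snapshot was written are all made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PruneColumnsSketch {
  /**
   * Toy model of pruning: only columns that the relation exposes can ever be
   * requested, so the result is a subset of relationOutput.
   */
  static List<String> requestedSchema(List<String> relationOutput, List<String> selected) {
    List<String> requested = new ArrayList<>();
    for (String column : relationOutput) {
      if (selected.contains(column)) {
        requested.add(column);
      }
    }
    return requested;
  }

  public static void main(String[] args) {
    // Assumed history: "category" was added after the snapshot was written.
    List<String> snapshotSchema = Arrays.asList("id", "data");
    List<String> tableSchema = Arrays.asList("id", "data", "category");

    // Relation built from the current table schema: selecting every column
    // requests "category", which the snapshot never had.
    System.out.println(requestedSchema(tableSchema, tableSchema));

    // Relation built from the snapshot schema: the scan sees the right columns.
    System.out.println(requestedSchema(snapshotSchema, snapshotSchema));
  }
}
```

   In this model the scan builder cannot repair the schema afterwards, because it only ever sees columns that the relation already exposed.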



