lcspinter commented on code in PR #3222:
URL: https://github.com/apache/hive/pull/3222#discussion_r853859878


##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java:
##########
@@ -207,6 +218,39 @@ public RecordReader<Void, T> createRecordReader(InputSplit 
split, TaskAttemptCon
     return new IcebergRecordReader<>();
   }
 
+  private static TableScan scanWithTimeRange(Table table, Configuration conf, 
TableScan scan, long fromTime) {
+    // let's find the corresponding snapshot ID - if the fromTime is before 
the table creation happened, let's use
+    // the first snapshot of the table
+    long fromSnapshot = IcebergTableUtil.findSnapshotForTimestamp(table, 
fromTime)
+        .orElseGet(() -> table.history().get(0).snapshotId());
+    if (fromSnapshot == table.currentSnapshot().snapshotId()) {
+      throw new IllegalArgumentException(
+          "Provided FROM timestamp must be earlier than the latest snapshot of 
the table.");
+    }
+    long toTime = conf.getLong(InputFormatConfig.TO_TIMESTAMP, -1);
+    if (toTime != -1) {
+      if (fromTime >= toTime) {

Review Comment:
   I think we can move this check to the beginning of the method, to save some 
execution time.



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java:
##########
@@ -163,4 +165,32 @@ public static void updateSpec(Configuration configuration, 
Table table) {
   public static boolean isBucketed(Table table) {
     return table.spec().fields().stream().anyMatch(f -> 
f.transform().toString().startsWith("bucket["));
   }
+
+  /**
+   * Returns the snapshot ID which is immediately before (or exactly at) the 
timestamp provided in millis.
+   * If the timestamp provided is before the first snapshot of the table, we 
return an empty optional.
+   * If the timestamp provided is in the future compared to the latest 
snapshot, we return the latest snapshot ID.
+   *
+   * E.g.: if we have snapshots S1, S2, S3 committed at times T3, T6, T9 
respectively (T0 = start of epoch), then:
+   * - from T0 to T2 -> returns empty
+   * - from T3 to T5 -> returns S1
+   * - from T6 to T8 -> returns S2
+   * - from T9 to T∞ -> returns S3
+   *
+   * @param table the table whose snapshot ID we are trying to find
+   * @param time the timestamp provided in milliseconds
+   * @return the snapshot ID corresponding to the time
+   */
+  public static Optional<Long> findSnapshotForTimestamp(Table table, long 
time) {
+    if (table.history().get(0).timestampMillis() > time) {
+      return Optional.empty();
+    }
+
+    for (Snapshot snapshot : table.snapshots()) {

Review Comment:
   Are we certain that table.snapshots() returns a list sorted by snapshot 
time?



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java:
##########
@@ -207,6 +218,39 @@ public RecordReader<Void, T> createRecordReader(InputSplit 
split, TaskAttemptCon
     return new IcebergRecordReader<>();
   }
 
+  private static TableScan scanWithTimeRange(Table table, Configuration conf, 
TableScan scan, long fromTime) {
+    // let's find the corresponding snapshot ID - if the fromTime is before 
the table creation happened, let's use
+    // the first snapshot of the table
+    long fromSnapshot = IcebergTableUtil.findSnapshotForTimestamp(table, 
fromTime)
+        .orElseGet(() -> table.history().get(0).snapshotId());
+    if (fromSnapshot == table.currentSnapshot().snapshotId()) {
+      throw new IllegalArgumentException(
+          "Provided FROM timestamp must be earlier than the latest snapshot of 
the table.");
+    }
+    long toTime = conf.getLong(InputFormatConfig.TO_TIMESTAMP, -1);

Review Comment:
   nit: Can we move toTime to a method parameter?



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java:
##########
@@ -207,6 +218,39 @@ public RecordReader<Void, T> createRecordReader(InputSplit 
split, TaskAttemptCon
     return new IcebergRecordReader<>();
   }
 
+  private static TableScan scanWithTimeRange(Table table, Configuration conf, 
TableScan scan, long fromTime) {
+    // let's find the corresponding snapshot ID - if the fromTime is before 
the table creation happened, let's use
+    // the first snapshot of the table
+    long fromSnapshot = IcebergTableUtil.findSnapshotForTimestamp(table, 
fromTime)
+        .orElseGet(() -> table.history().get(0).snapshotId());
+    if (fromSnapshot == table.currentSnapshot().snapshotId()) {
+      throw new IllegalArgumentException(
+          "Provided FROM timestamp must be earlier than the latest snapshot of 
the table.");
+    }
+    long toTime = conf.getLong(InputFormatConfig.TO_TIMESTAMP, -1);
+    if (toTime != -1) {
+      if (fromTime >= toTime) {
+        throw new IllegalArgumentException(
+            "Provided FROM timestamp must precede the provided TO timestamp.");
+      }
+      long toSnapshot = IcebergTableUtil.findSnapshotForTimestamp(table, 
toTime)
+          .orElseThrow(() -> new IllegalArgumentException(
+              "Provided TO timestamp must be after the first snapshot of the 
table."));
+      return scan.appendsBetween(fromSnapshot, toSnapshot);
+    } else {
+      return scan.appendsAfter(fromSnapshot);
+    }
+  }
+
+  private static TableScan scanWithVersionRange(Configuration conf, TableScan 
scan, long fromSnapshot) {
+    long toSnapshot = conf.getLong(InputFormatConfig.TO_VERSION, -1);

Review Comment:
   nit: Can we move toSnapshot to a method parameter?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to