[
https://issues.apache.org/jira/browse/HIVE-26151?focusedWorklogId=759056&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-759056
]
ASF GitHub Bot logged work on HIVE-26151:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 20/Apr/22 09:00
Start Date: 20/Apr/22 09:00
Worklog Time Spent: 10m
Work Description: lcspinter commented on code in PR #3222:
URL: https://github.com/apache/hive/pull/3222#discussion_r853859878
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java:
##########
@@ -207,6 +218,39 @@ public RecordReader<Void, T> createRecordReader(InputSplit split, TaskAttemptCon
     return new IcebergRecordReader<>();
   }
+  private static TableScan scanWithTimeRange(Table table, Configuration conf, TableScan scan, long fromTime) {
+    // let's find the corresponding snapshot ID - if the fromTime is before the table creation happened, let's use
+    // the first snapshot of the table
+    long fromSnapshot = IcebergTableUtil.findSnapshotForTimestamp(table, fromTime)
+        .orElseGet(() -> table.history().get(0).snapshotId());
+    if (fromSnapshot == table.currentSnapshot().snapshotId()) {
+      throw new IllegalArgumentException(
+          "Provided FROM timestamp must be earlier than the latest snapshot of the table.");
+    }
+    long toTime = conf.getLong(InputFormatConfig.TO_TIMESTAMP, -1);
+    if (toTime != -1) {
+      if (fromTime >= toTime) {
Review Comment:
I think we can move this check to the beginning of the method (together with
reading toTime), so that invalid ranges fail before the snapshot lookups run.
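
A minimal sketch of the reordering suggested here, assuming the -1 sentinel from the diff still means "no TO timestamp was provided". The class and the helper name validateTimeRange are hypothetical and not part of the PR; the point is only that the range check needs nothing but the two timestamps, so it can run before any snapshot lookup:

```java
final class TimeRangeCheck {

  // Sentinel mirroring conf.getLong(InputFormatConfig.TO_TIMESTAMP, -1) in the diff.
  static final long NOT_SET = -1L;

  // Hypothetical helper: rejects an invalid FROM/TO pair using only the timestamps,
  // so a caller can invoke it before touching table metadata.
  static void validateTimeRange(long fromTime, long toTime) {
    if (toTime != NOT_SET && fromTime >= toTime) {
      throw new IllegalArgumentException(
          "Provided FROM timestamp must precede the provided TO timestamp.");
    }
  }
}
```

Calling such a helper as the first statement of scanWithTimeRange would leave the rest of the method unchanged.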
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java:
##########
@@ -163,4 +165,32 @@ public static void updateSpec(Configuration configuration, Table table) {
   public static boolean isBucketed(Table table) {
     return table.spec().fields().stream().anyMatch(f -> f.transform().toString().startsWith("bucket["));
   }
+
+  /**
+   * Returns the snapshot ID which is immediately before (or exactly at) the timestamp provided in millis.
+   * If the timestamp provided is before the first snapshot of the table, we return an empty optional.
+   * If the timestamp provided is in the future compared to the latest snapshot, we return the latest snapshot ID.
+   *
+   * E.g.: if we have snapshots S1, S2, S3 committed at times T3, T6, T9 respectively (T0 = start of epoch), then:
+   * - from T0 to T2 -> returns empty
+   * - from T3 to T5 -> returns S1
+   * - from T6 to T8 -> returns S2
+   * - from T9 to T∞ -> returns S3
+   *
+   * @param table the table whose snapshot ID we are trying to find
+   * @param time the timestamp provided in milliseconds
+   * @return the snapshot ID corresponding to the time
+   */
+  public static Optional<Long> findSnapshotForTimestamp(Table table, long time) {
+    if (table.history().get(0).timestampMillis() > time) {
+      return Optional.empty();
+    }
+
+    for (Snapshot snapshot : table.snapshots()) {
Review Comment:
Are we certain that table.snapshots() returns the snapshots sorted by
commit time?
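
If the ordering cannot be relied on, the lookup could be written so that iteration order does not matter. The following is only a sketch under that assumption: Snap is a hypothetical stand-in for the two Snapshot accessors used in the diff (snapshotId() and timestampMillis()), and the method takes a plain list rather than a Table so that it is self-contained:

```java
import java.util.List;
import java.util.Optional;

final class SnapshotLookup {

  // Hypothetical stand-in exposing only the snapshot ID and commit timestamp.
  static final class Snap {
    final long snapshotId;
    final long timestampMillis;

    Snap(long snapshotId, long timestampMillis) {
      this.snapshotId = snapshotId;
      this.timestampMillis = timestampMillis;
    }
  }

  // Returns the ID of the snapshot with the greatest timestamp that is <= time,
  // or empty if every snapshot is newer than time. Works for any iteration order.
  static Optional<Long> findSnapshotForTimestamp(List<Snap> snapshots, long time) {
    Snap best = null;
    for (Snap candidate : snapshots) {
      boolean notAfter = candidate.timestampMillis <= time;
      if (notAfter && (best == null || candidate.timestampMillis > best.timestampMillis)) {
        best = candidate;
      }
    }
    return best == null ? Optional.empty() : Optional.of(best.snapshotId);
  }
}
```

This keeps the behaviour described in the Javadoc example (S1, S2, S3 at T3, T6, T9) at the cost of always scanning every snapshot.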
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java:
##########
@@ -207,6 +218,39 @@ public RecordReader<Void, T> createRecordReader(InputSplit split, TaskAttemptCon
     return new IcebergRecordReader<>();
   }
+  private static TableScan scanWithTimeRange(Table table, Configuration conf, TableScan scan, long fromTime) {
+    // let's find the corresponding snapshot ID - if the fromTime is before the table creation happened, let's use
+    // the first snapshot of the table
+    long fromSnapshot = IcebergTableUtil.findSnapshotForTimestamp(table, fromTime)
+        .orElseGet(() -> table.history().get(0).snapshotId());
+    if (fromSnapshot == table.currentSnapshot().snapshotId()) {
+      throw new IllegalArgumentException(
+          "Provided FROM timestamp must be earlier than the latest snapshot of the table.");
+    }
+    long toTime = conf.getLong(InputFormatConfig.TO_TIMESTAMP, -1);
Review Comment:
nit: Can we pass toTime in as a method parameter instead of reading it from conf here?
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java:
##########
@@ -207,6 +218,39 @@ public RecordReader<Void, T> createRecordReader(InputSplit split, TaskAttemptCon
     return new IcebergRecordReader<>();
   }
+  private static TableScan scanWithTimeRange(Table table, Configuration conf, TableScan scan, long fromTime) {
+    // let's find the corresponding snapshot ID - if the fromTime is before the table creation happened, let's use
+    // the first snapshot of the table
+    long fromSnapshot = IcebergTableUtil.findSnapshotForTimestamp(table, fromTime)
+        .orElseGet(() -> table.history().get(0).snapshotId());
+    if (fromSnapshot == table.currentSnapshot().snapshotId()) {
+      throw new IllegalArgumentException(
+          "Provided FROM timestamp must be earlier than the latest snapshot of the table.");
+    }
+    long toTime = conf.getLong(InputFormatConfig.TO_TIMESTAMP, -1);
+    if (toTime != -1) {
+      if (fromTime >= toTime) {
+        throw new IllegalArgumentException(
+            "Provided FROM timestamp must precede the provided TO timestamp.");
+      }
+      long toSnapshot = IcebergTableUtil.findSnapshotForTimestamp(table, toTime)
+          .orElseThrow(() -> new IllegalArgumentException(
+              "Provided TO timestamp must be after the first snapshot of the table."));
+      return scan.appendsBetween(fromSnapshot, toSnapshot);
+    } else {
+      return scan.appendsAfter(fromSnapshot);
+    }
+  }
+
+  private static TableScan scanWithVersionRange(Configuration conf, TableScan scan, long fromSnapshot) {
+    long toSnapshot = conf.getLong(InputFormatConfig.TO_VERSION, -1);
Review Comment:
nit: Can we pass toSnapshot in as a method parameter instead of reading it from conf here?
Issue Time Tracking
-------------------
Worklog Id: (was: 759056)
Time Spent: 20m (was: 10m)
> Support range-based time travel queries for Iceberg
> ---------------------------------------------------
>
> Key: HIVE-26151
> URL: https://issues.apache.org/jira/browse/HIVE-26151
> Project: Hive
> Issue Type: New Feature
> Reporter: Marton Bod
> Assignee: Marton Bod
> Priority: Major
> Labels: pull-request-available
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Allow querying which records have been inserted during a certain time window
> for Iceberg tables. The Iceberg TableScan API provides an implementation for
> that, so most of the work would go into adding syntax support and
> transporting the startTime and endTime parameters to the Iceberg input format.
> Proposed new syntax:
> SELECT * FROM table FOR SYSTEM_TIME FROM '<startTime>' TO '<endTime>'
> SELECT * FROM table FOR SYSTEM_VERSION FROM <startVersion> TO <endVersion>
> (the TO clause is optional in both cases)