This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 27d74616441 [Opt](Iceberg) Simplify the code of getting time travel
snapshotId (#34299) (#38101)
27d74616441 is described below
commit 27d746164418df92fee1f0a0dc8c7fcb456963fc
Author: Mingyu Chen <[email protected]>
AuthorDate: Fri Jul 19 09:45:56 2024 +0800
[Opt](Iceberg) Simplify the code of getting time travel snapshotId (#34299)
(#38101)
bp #34299
Co-authored-by: Butao Zhang <[email protected]>
---
.../datasource/iceberg/source/IcebergScanNode.java | 29 +++-------------------
1 file changed, 3 insertions(+), 26 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index bfb2a5aeb34..6ea58014003 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -58,25 +58,23 @@ import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
-import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.TableScanUtil;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.time.Instant;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -287,8 +285,8 @@ public class IcebergScanNode extends FileQueryScanNode {
if (type == TableSnapshot.VersionType.VERSION) {
return tableSnapshot.getVersion();
} else {
- long snapshotId =
TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone());
- return getSnapshotIdAsOfTime(icebergTable.history(),
snapshotId);
+ long timestamp =
TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone());
+ return SnapshotUtil.snapshotIdAsOfTime(icebergTable,
timestamp);
}
} catch (IllegalArgumentException e) {
throw new UserException(e);
@@ -297,27 +295,6 @@ public class IcebergScanNode extends FileQueryScanNode {
return null;
}
- private long getSnapshotIdAsOfTime(List<HistoryEntry> historyEntries, long
asOfTimestamp) {
- // find history at or before asOfTimestamp
- HistoryEntry latestHistory = null;
- for (HistoryEntry entry : historyEntries) {
- if (entry.timestampMillis() <= asOfTimestamp) {
- if (latestHistory == null) {
- latestHistory = entry;
- continue;
- }
- if (entry.timestampMillis() > latestHistory.timestampMillis())
{
- latestHistory = entry;
- }
- }
- }
- if (latestHistory == null) {
- throw new NotFoundException("No version history at or before "
- + Instant.ofEpochMilli(asOfTimestamp));
- }
- return latestHistory.snapshotId();
- }
-
private List<IcebergDeleteFileFilter> getDeleteFileFilters(FileScanTask
spitTask) {
List<IcebergDeleteFileFilter> filters = new ArrayList<>();
for (DeleteFile delete : spitTask.deletes()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]