This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
commit 55cc547cd40f18528d94e933dd7a53229ef00747 Author: slothever <[email protected]> AuthorDate: Fri Dec 30 00:25:21 2022 +0800 [feature-wip](multi-catalog) support Iceberg time travel in external table (#15418) For example SELECT* FROM tbl FOR VERSION AS OF 10963874102873; SELECT* FROM tbl FOR TIME AS OF '1986-10-26 01:21:00'; --- fe/fe-core/src/main/cup/sql_parser.cup | 33 +++++++++- .../org/apache/doris/analysis/BaseTableRef.java | 1 + .../java/org/apache/doris/analysis/TableRef.java | 39 +++++++++++- .../org/apache/doris/analysis/TableSnapshot.java | 70 ++++++++++++++++++++++ .../java/org/apache/doris/common/ErrorCode.java | 5 +- .../org/apache/doris/common/util/TimeUtils.java | 2 +- .../planner/external/IcebergScanProvider.java | 40 +++++++++++++ fe/fe-core/src/main/jflex/sql_scanner.flex | 2 + .../iceberg/test_external_catalog_icebergv2.out | 16 +++++ .../iceberg/test_external_catalog_icebergv2.groovy | 11 ++++ 10 files changed, 214 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 12b4643552..7891b01cfd 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -457,6 +457,7 @@ terminal String KW_NULL, KW_NULLS, KW_OBSERVER, + KW_OF, KW_OFFSET, KW_ON, KW_ONLY, @@ -593,6 +594,7 @@ terminal String KW_VARCHAR, KW_VARIABLES, KW_VERBOSE, + KW_VERSION, KW_VIEW, KW_WARNINGS, KW_WEEK, @@ -685,6 +687,7 @@ nonterminal ArrayList<String> ident_list; nonterminal PartitionNames opt_partition_names, partition_names; nonterminal ArrayList<Long> opt_tablet_list, tablet_list; nonterminal TableSample opt_table_sample, table_sample; +nonterminal TableSnapshot opt_table_snapshot, table_snapshot; nonterminal ClusterName cluster_name; nonterminal ClusterName des_cluster_name; nonterminal TableName table_name, opt_table_name; @@ -5099,9 +5102,31 @@ base_table_ref_list ::= ; base_table_ref ::= - table_name:name opt_partition_names:partitionNames opt_tablet_list:tabletIds opt_table_alias:alias opt_table_sample:tableSample opt_common_hints:commonHints + table_name:name opt_table_snapshot:tableSnapshot opt_partition_names:partitionNames opt_tablet_list:tabletIds opt_table_alias:alias opt_table_sample:tableSample opt_common_hints:commonHints {: - RESULT = new TableRef(name, alias, partitionNames, tabletIds, tableSample, commonHints); + RESULT = new TableRef(name, alias, partitionNames, tabletIds, tableSample, commonHints, tableSnapshot); + :} + ; + +opt_table_snapshot ::= + /* empty */ + {: + RESULT = null; + :} + | table_snapshot:tableSnapshot + {: + RESULT = tableSnapshot; + :} + ; + +table_snapshot ::= + KW_FOR KW_VERSION KW_AS KW_OF INTEGER_LITERAL:version + {: + RESULT = new TableSnapshot(version); + :} + | KW_FOR KW_TIME KW_AS KW_OF STRING_LITERAL:time + {: + RESULT = new TableSnapshot(time); :} ; @@ -6676,6 +6701,8 @@ keyword ::= {: RESULT = id; :} | KW_NULLS:id {: RESULT = id; :} + | KW_OF:id + {: RESULT = id; :} | KW_OFFSET:id {: RESULT = id; :} | KW_ONLY:id @@ -6802,6 +6829,8 @@ keyword ::= {: RESULT = id; :} | KW_VERBOSE:id {: RESULT = id; :} + | KW_VERSION:id + {: RESULT = id; :} | KW_VIEW:id {: RESULT = id; :} | KW_WARNINGS:id diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/BaseTableRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/BaseTableRef.java index ccfc508cd1..05c7b9c588 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/BaseTableRef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/BaseTableRef.java @@ -70,6 +70,7 @@ public class BaseTableRef extends TableRef { name.analyze(analyzer); desc = analyzer.registerTableRef(this); isAnalyzed = true; // true that we have assigned desc + analyzeTableSnapshot(analyzer); analyzeLateralViewRef(analyzer); analyzeJoin(analyzer); analyzeSortHints(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java index d960587b13..f2b138b636 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java @@ -22,6 +22,7 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; @@ -29,6 +30,7 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.common.util.TimeUtils; import org.apache.doris.rewrite.ExprRewriter; import org.apache.doris.rewrite.ExprRewriter.ClauseType; @@ -48,6 +50,7 @@ import java.util.Collections; import java.util.List; import java.util.Set; import java.util.StringJoiner; +import java.util.regex.Matcher; /** * Superclass of all table references, including references to views, base tables @@ -129,6 +132,8 @@ public class TableRef implements ParseNode, Writable { private boolean isPartitionJoin; private String sortColumn = null; + private TableSnapshot tableSnapshot; + // END: Members that need to be reset() // /////////////////////////////////////// @@ -153,12 +158,17 @@ public class TableRef implements ParseNode, Writable { */ public TableRef(TableName name, String alias, PartitionNames partitionNames, ArrayList<Long> sampleTabletIds, TableSample tableSample, ArrayList<String> commonHints) { + this(name, alias, partitionNames, sampleTabletIds, tableSample, commonHints, null); + } + + public TableRef(TableName name, String alias, PartitionNames partitionNames, ArrayList<Long> sampleTabletIds, + TableSample tableSample, ArrayList<String> commonHints, TableSnapshot tableSnapshot) { this.name = name; if (alias != null) { if (Env.isStoredTableNamesLowerCase()) { alias = alias.toLowerCase(); } - aliases = new String[]{alias}; + aliases = new String[] {alias}; hasExplicitAlias = true; } else { hasExplicitAlias = false; @@ -167,6 +177,7 @@ public class TableRef implements ParseNode, Writable { this.sampleTabletIds = sampleTabletIds; this.tableSample = tableSample; this.commonHints = commonHints; + this.tableSnapshot = tableSnapshot; isAnalyzed = false; } @@ -186,6 +197,7 @@ public class TableRef implements ParseNode, Writable { (other.sortHints != null) ? Lists.newArrayList(other.sortHints) : null; onClause = (other.onClause != null) ? other.onClause.clone().reset() : null; partitionNames = (other.partitionNames != null) ? new PartitionNames(other.partitionNames) : null; + tableSnapshot = (other.tableSnapshot != null) ? new TableSnapshot(other.tableSnapshot) : null; tableSample = (other.tableSample != null) ? new TableSample(other.tableSample) : null; commonHints = other.commonHints; @@ -302,6 +314,10 @@ public class TableRef implements ParseNode, Writable { return tableSample; } + public TableSnapshot getTableSnapshot() { + return tableSnapshot; + } + /** * This method should only be called after the TableRef has been analyzed. */ @@ -499,6 +515,27 @@ public class TableRef implements ParseNode, Writable { } } + protected void analyzeTableSnapshot(Analyzer analyzer) throws AnalysisException { + if (tableSnapshot == null) { + return; + } + TableIf.TableType tableType = this.getTable().getType(); + if (tableType != TableIf.TableType.HMS_EXTERNAL_TABLE) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_NONSUPPORT_TIME_TRAVEL_TABLE); + } + HMSExternalTable extTable = (HMSExternalTable) this.getTable(); + if (extTable.getDlaType() != HMSExternalTable.DLAType.ICEBERG) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_NONSUPPORT_TIME_TRAVEL_TABLE); + } + if (tableSnapshot.getType() == TableSnapshot.VersionType.TIME) { + String asOfTime = tableSnapshot.getTime(); + Matcher matcher = TimeUtils.DATETIME_FORMAT_REG.matcher(asOfTime); + if (!matcher.matches()) { + throw new AnalysisException("Invalid datetime string: " + asOfTime); + } + } + } + /** * Analyze the join clause. * The join clause can only be analyzed after the left table has been analyzed diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableSnapshot.java new file mode 100644 index 0000000000..e5d43b6d0b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableSnapshot.java @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +/** + * Snapshot read for time travel + * the version in 2022.12.28 just supports external iceberg table + */ +public class TableSnapshot { + + public enum VersionType { + TIME, VERSION + } + + private final VersionType type; + private String time; + private long version; + + public TableSnapshot(long version) { + this.version = version; + this.type = VersionType.VERSION; + } + + public TableSnapshot(String time) { + this.time = time; + this.type = VersionType.TIME; + } + + public TableSnapshot(TableSnapshot other) { + this.type = other.type; + this.time = other.time; + this.version = other.version; + } + + public VersionType getType() { + return type; + } + + public String getTime() { + return time; + } + + public long getVersion() { + return version; + } + + @Override + public String toString() { + if (this.type == VersionType.VERSION) { + return " FOR VERSION AS OF " + version; + } else { + return " FOR TIME AS OF '" + time + "'"; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java b/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java index a6a78d054b..20948c039f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java @@ -1699,7 +1699,10 @@ public enum ErrorCode { ERR_NONSUPPORT_HMS_TABLE(5088, new byte[]{'4', '2', '0', '0', '0'}, "Nonsupport hive metastore table named '%s' in database '%s' with catalog '%s'."), ERR_TABLE_NAME_LENGTH_LIMIT(5089, new byte[]{'4', '2', '0', '0', '0'}, "Table name length exceeds limit, " - + "the length of table name '%s' is %d which is greater than the configuration 'table_name_length_limit' (%d)."); + + "the length of table name '%s' is %d which is greater than the configuration 'table_name_length_limit' (%d)."), + + ERR_NONSUPPORT_TIME_TRAVEL_TABLE(5090, new byte[]{'4', '2', '0', '0', '0'}, "Only iceberg external" + + " table supports time travel in current version"); // This is error code private final int code; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java index b5dd620f25..9d73d0b368 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java @@ -68,7 +68,7 @@ public class TimeUtils { private static final SimpleDateFormat DATETIME_FORMAT; private static final SimpleDateFormat TIME_FORMAT; - private static final Pattern DATETIME_FORMAT_REG = + public static final Pattern DATETIME_FORMAT_REG = Pattern.compile("^((\\d{2}(([02468][048])|([13579][26]))[\\-\\/\\s]?((((0?[13578])|(1[02]))[\\-\\/\\s]?" + "((0?[1-9])|([1-2][0-9])|(3[01])))|(((0?[469])|(11))[\\-\\/\\s]?" + "((0?[1-9])|([1-2][0-9])|(30)))|(0?2[\\-\\/\\s]?((0?[1-9])|([1-2][0-9])))))|(" diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java index 5c211c34f5..d71e377de9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java @@ -19,12 +19,14 @@ package org.apache.doris.planner.external; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.HiveMetaStoreClientHelper; import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.TimeUtils; import org.apache.doris.external.iceberg.util.IcebergUtils; import org.apache.doris.planner.ColumnRange; import org.apache.doris.thrift.TFileFormatType; @@ -39,13 +41,16 @@ import org.apache.iceberg.BaseTable; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileContent; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.HistoryEntry; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.TableProperties; import org.apache.iceberg.TableScan; +import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.types.Conversions; import java.nio.ByteBuffer; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -133,6 +138,20 @@ public class IcebergScanProvider extends HiveScanProvider { } TableScan scan = table.newScan(); + TableSnapshot tableSnapshot = desc.getRef().getTableSnapshot(); + if (tableSnapshot != null) { + TableSnapshot.VersionType type = tableSnapshot.getType(); + try { + if (type == TableSnapshot.VersionType.VERSION) { + scan = scan.useSnapshot(tableSnapshot.getVersion()); + } else { + long snapshotId = TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone()); + scan = scan.useSnapshot(getSnapshotIdAsOfTime(table.history(), snapshotId)); + } + } catch (IllegalArgumentException e) { + throw new UserException(e); + } + } for (Expression predicate : expressions) { scan = scan.filter(predicate); } @@ -155,6 +174,27 @@ public class IcebergScanProvider extends HiveScanProvider { return splits; } + public static long getSnapshotIdAsOfTime(List<HistoryEntry> historyEntries, long asOfTimestamp) { + // find history at or before asOfTimestamp + HistoryEntry latestHistory = null; + for (HistoryEntry entry : historyEntries) { + if (entry.timestampMillis() <= asOfTimestamp) { + if (latestHistory == null) { + latestHistory = entry; + continue; + } + if (entry.timestampMillis() > latestHistory.timestampMillis()) { + latestHistory = entry; + } + } + } + if (latestHistory == null) { + throw new NotFoundException("No version history at or before " + + Instant.ofEpochMilli(asOfTimestamp)); + } + return latestHistory.snapshotId(); + } + private List<IcebergDeleteFileFilter> getDeleteFileFilters(FileScanTask spitTask) { List<IcebergDeleteFileFilter> filters = new ArrayList<>(); for (DeleteFile delete : spitTask.deletes()) { diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex b/fe/fe-core/src/main/jflex/sql_scanner.flex index 7cbc2c896b..25a6a60a6b 100644 --- a/fe/fe-core/src/main/jflex/sql_scanner.flex +++ b/fe/fe-core/src/main/jflex/sql_scanner.flex @@ -315,6 +315,7 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("null", new Integer(SqlParserSymbols.KW_NULL)); keywordMap.put("nulls", new Integer(SqlParserSymbols.KW_NULLS)); keywordMap.put("observer", new Integer(SqlParserSymbols.KW_OBSERVER)); + keywordMap.put("of", new Integer(SqlParserSymbols.KW_OF)); keywordMap.put("offset", new Integer(SqlParserSymbols.KW_OFFSET)); keywordMap.put("on", new Integer(SqlParserSymbols.KW_ON)); keywordMap.put("only", new Integer(SqlParserSymbols.KW_ONLY)); @@ -453,6 +454,7 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("varchar", new Integer(SqlParserSymbols.KW_VARCHAR)); keywordMap.put("variables", new Integer(SqlParserSymbols.KW_VARIABLES)); keywordMap.put("verbose", new Integer(SqlParserSymbols.KW_VERBOSE)); + keywordMap.put("version", new Integer(SqlParserSymbols.KW_VERSION)); keywordMap.put("view", new Integer(SqlParserSymbols.KW_VIEW)); keywordMap.put("warnings", new Integer(SqlParserSymbols.KW_WARNINGS)); keywordMap.put("week", new Integer(SqlParserSymbols.KW_WEEK)); diff --git a/regression-test/data/external_table_emr_p2/iceberg/test_external_catalog_icebergv2.out b/regression-test/data/external_table_emr_p2/iceberg/test_external_catalog_icebergv2.out index e7158ffd36..efb71b3862 100644 --- a/regression-test/data/external_table_emr_p2/iceberg/test_external_catalog_icebergv2.out +++ b/regression-test/data/external_table_emr_p2/iceberg/test_external_catalog_icebergv2.out @@ -30,7 +30,11 @@ 5908.20 -- !q08 -- +<<<<<<< HEAD 120001848 +======= +1499999990 +>>>>>>> 3ff01ca799 ([feature-wip](multi-catalog) support Iceberg time travel in external table (#15418)) -- !q09 -- 1 @@ -38,9 +42,15 @@ 3 -- !q10 -- +<<<<<<< HEAD 150000000 149999999 149999996 +======= +2 +3 +4 +>>>>>>> 3ff01ca799 ([feature-wip](multi-catalog) support Iceberg time travel in external table (#15418)) -- !q11 -- 1 @@ -48,7 +58,13 @@ 3 -- !q12 -- +<<<<<<< HEAD 150000000 149999999 149999996 +======= +2 +3 +4 +>>>>>>> 3ff01ca799 ([feature-wip](multi-catalog) support Iceberg time travel in external table (#15418)) diff --git a/regression-test/suites/external_table_emr_p2/iceberg/test_external_catalog_icebergv2.groovy b/regression-test/suites/external_table_emr_p2/iceberg/test_external_catalog_icebergv2.groovy index 4d41443aec..4599b1584a 100644 --- a/regression-test/suites/external_table_emr_p2/iceberg/test_external_catalog_icebergv2.groovy +++ b/regression-test/suites/external_table_emr_p2/iceberg/test_external_catalog_icebergv2.groovy @@ -49,8 +49,19 @@ suite("test_external_catalog_icebergv2", "p2") { qt_q11 """ select c_custkey from customer for version as of 906874575350293177 order by c_custkey limit 3 """ qt_q12 """ select c_custkey from customer for version as of 6352416983354893547 order by c_custkey desc limit 3 """ } + // test time travel stmt + def q02 = { + qt_q09 """ select c_custkey from customer for time as of '2022-12-27 10:21:36' limit 3 """ + qt_q10 """ select c_custkey from customer for time as of '2022-12-28 10:21:36' limit 3 """ + qt_q11 """ select c_custkey from customer for version as of 906874575350293177 limit 3 """ + qt_q12 """ select c_custkey from customer for version as of 6352416983354893547 limit 3 """ + } sql """ use `tpch_1000_icebergv2`; """ q01() +<<<<<<< HEAD // q02() +======= + q02() +>>>>>>> 3ff01ca799 ([feature-wip](multi-catalog) support Iceberg time travel in external table (#15418)) } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
