This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3ff01ca799 [feature-wip](multi-catalog) support Iceberg time travel in
external table (#15418)
3ff01ca799 is described below
commit 3ff01ca799de96f459f79520d92518c3604140b4
Author: slothever <[email protected]>
AuthorDate: Fri Dec 30 00:25:21 2022 +0800
[feature-wip](multi-catalog) support Iceberg time travel in external table
(#15418)
For example:
SELECT * FROM tbl FOR VERSION AS OF 10963874102873;
SELECT * FROM tbl FOR TIME AS OF '1986-10-26 01:21:00';
---
fe/fe-core/src/main/cup/sql_parser.cup | 33 +++++++++-
.../org/apache/doris/analysis/BaseTableRef.java | 1 +
.../java/org/apache/doris/analysis/TableRef.java | 37 ++++++++++++
.../org/apache/doris/analysis/TableSnapshot.java | 70 ++++++++++++++++++++++
.../java/org/apache/doris/common/ErrorCode.java | 5 +-
.../org/apache/doris/common/util/TimeUtils.java | 2 +-
.../planner/external/IcebergScanProvider.java | 40 +++++++++++++
fe/fe-core/src/main/jflex/sql_scanner.flex | 2 +
.../iceberg/test_external_catalog_icebergv2.out | 22 ++++++-
.../iceberg/test_external_catalog_icebergv2.groovy | 8 +++
10 files changed, 215 insertions(+), 5 deletions(-)
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup
b/fe/fe-core/src/main/cup/sql_parser.cup
index b6bba6a5d7..8f6aaa63b8 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -457,6 +457,7 @@ terminal String
KW_NULL,
KW_NULLS,
KW_OBSERVER,
+ KW_OF,
KW_OFFSET,
KW_ON,
KW_ONLY,
@@ -593,6 +594,7 @@ terminal String
KW_VARCHAR,
KW_VARIABLES,
KW_VERBOSE,
+ KW_VERSION,
KW_VIEW,
KW_WARNINGS,
KW_WEEK,
@@ -685,6 +687,7 @@ nonterminal ArrayList<String> ident_list;
nonterminal PartitionNames opt_partition_names, partition_names;
nonterminal ArrayList<Long> opt_tablet_list, tablet_list;
nonterminal TableSample opt_table_sample, table_sample;
+nonterminal TableSnapshot opt_table_snapshot, table_snapshot;
nonterminal ClusterName cluster_name;
nonterminal ClusterName des_cluster_name;
nonterminal TableName table_name, opt_table_name;
@@ -5100,9 +5103,31 @@ base_table_ref_list ::=
;
base_table_ref ::=
- table_name:name opt_partition_names:partitionNames
opt_tablet_list:tabletIds opt_table_alias:alias opt_table_sample:tableSample
opt_common_hints:commonHints
+ table_name:name opt_table_snapshot:tableSnapshot
opt_partition_names:partitionNames opt_tablet_list:tabletIds
opt_table_alias:alias opt_table_sample:tableSample opt_common_hints:commonHints
{:
- RESULT = new TableRef(name, alias, partitionNames, tabletIds,
tableSample, commonHints);
+ RESULT = new TableRef(name, alias, partitionNames, tabletIds,
tableSample, commonHints, tableSnapshot);
+ :}
+ ;
+
+opt_table_snapshot ::=
+ /* empty */
+ {:
+ RESULT = null;
+ :}
+ | table_snapshot:tableSnapshot
+ {:
+ RESULT = tableSnapshot;
+ :}
+ ;
+
+table_snapshot ::=
+ KW_FOR KW_VERSION KW_AS KW_OF INTEGER_LITERAL:version
+ {:
+ RESULT = new TableSnapshot(version);
+ :}
+ | KW_FOR KW_TIME KW_AS KW_OF STRING_LITERAL:time
+ {:
+ RESULT = new TableSnapshot(time);
:}
;
@@ -6693,6 +6718,8 @@ keyword ::=
{: RESULT = id; :}
| KW_NULLS:id
{: RESULT = id; :}
+ | KW_OF:id
+ {: RESULT = id; :}
| KW_OFFSET:id
{: RESULT = id; :}
| KW_ONLY:id
@@ -6817,6 +6844,8 @@ keyword ::=
{: RESULT = id; :}
| KW_VERBOSE:id
{: RESULT = id; :}
+ | KW_VERSION:id
+ {: RESULT = id; :}
| KW_VIEW:id
{: RESULT = id; :}
| KW_WARNINGS:id
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/BaseTableRef.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/BaseTableRef.java
index ccfc508cd1..05c7b9c588 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/BaseTableRef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/BaseTableRef.java
@@ -70,6 +70,7 @@ public class BaseTableRef extends TableRef {
name.analyze(analyzer);
desc = analyzer.registerTableRef(this);
isAnalyzed = true; // true that we have assigned desc
+ analyzeTableSnapshot(analyzer);
analyzeLateralViewRef(analyzer);
analyzeJoin(analyzer);
analyzeSortHints();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
index 3b5577fd6b..bf43f552ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
@@ -22,6 +22,7 @@ package org.apache.doris.analysis;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
@@ -29,6 +30,7 @@ import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.rewrite.ExprRewriter.ClauseType;
@@ -48,6 +50,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.StringJoiner;
+import java.util.regex.Matcher;
/**
* Superclass of all table references, including references to views, base
tables
@@ -129,6 +132,8 @@ public class TableRef implements ParseNode, Writable {
private boolean isPartitionJoin;
private String sortColumn = null;
+ private TableSnapshot tableSnapshot;
+
// END: Members that need to be reset()
// ///////////////////////////////////////
@@ -153,6 +158,11 @@ public class TableRef implements ParseNode, Writable {
*/
public TableRef(TableName name, String alias, PartitionNames
partitionNames, ArrayList<Long> sampleTabletIds,
TableSample tableSample, ArrayList<String> commonHints) {
+ this(name, alias, partitionNames, sampleTabletIds, tableSample,
commonHints, null);
+ }
+
+ public TableRef(TableName name, String alias, PartitionNames
partitionNames, ArrayList<Long> sampleTabletIds,
+ TableSample tableSample, ArrayList<String> commonHints,
TableSnapshot tableSnapshot) {
this.name = name;
if (alias != null) {
if (Env.isStoredTableNamesLowerCase()) {
@@ -167,6 +177,7 @@ public class TableRef implements ParseNode, Writable {
this.sampleTabletIds = sampleTabletIds;
this.tableSample = tableSample;
this.commonHints = commonHints;
+ this.tableSnapshot = tableSnapshot;
isAnalyzed = false;
}
@@ -186,6 +197,7 @@ public class TableRef implements ParseNode, Writable {
(other.sortHints != null) ?
Lists.newArrayList(other.sortHints) : null;
onClause = (other.onClause != null) ? other.onClause.clone().reset() :
null;
partitionNames = (other.partitionNames != null) ? new
PartitionNames(other.partitionNames) : null;
+ tableSnapshot = (other.tableSnapshot != null) ? new
TableSnapshot(other.tableSnapshot) : null;
tableSample = (other.tableSample != null) ? new
TableSample(other.tableSample) : null;
commonHints = other.commonHints;
@@ -302,6 +314,10 @@ public class TableRef implements ParseNode, Writable {
return tableSample;
}
+ public TableSnapshot getTableSnapshot() {
+ return tableSnapshot;
+ }
+
/**
* This method should only be called after the TableRef has been analyzed.
*/
@@ -499,6 +515,27 @@ public class TableRef implements ParseNode, Writable {
}
}
+ protected void analyzeTableSnapshot(Analyzer analyzer) throws
AnalysisException { // validates the optional snapshot attached to this table ref; 'analyzer' is not used in this body
+ if (tableSnapshot == null) { // no snapshot was supplied, so there is nothing to check
+ return;
+ }
+ TableIf.TableType tableType = this.getTable().getType();
+ if (tableType != TableIf.TableType.HMS_EXTERNAL_TABLE) { // any other table type is reported as unsupported
+
ErrorReport.reportAnalysisException(ErrorCode.ERR_NONSUPPORT_TIME_TRAVEL_TABLE);
+ }
+ HMSExternalTable extTable = (HMSExternalTable) this.getTable(); // NOTE(review): this cast presumably relies on the report above throwing - confirm
+ if (extTable.getDlaType() != HMSExternalTable.DLAType.ICEBERG) { // among HMS external tables only the ICEBERG DLA type is accepted
+
ErrorReport.reportAnalysisException(ErrorCode.ERR_NONSUPPORT_TIME_TRAVEL_TABLE);
+ }
+ if (tableSnapshot.getType() == TableSnapshot.VersionType.TIME) { // a VERSION snapshot gets no further validation here
+ String asOfTime = tableSnapshot.getTime();
+ Matcher matcher = TimeUtils.DATETIME_FORMAT_REG.matcher(asOfTime);
+ if (!matcher.matches()) { // the entire string must match the datetime pattern
+ throw new AnalysisException("Invalid datetime string: " +
asOfTime);
+ }
+ }
+ }
+
/**
* Analyze the join clause.
* The join clause can only be analyzed after the left table has been
analyzed
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableSnapshot.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableSnapshot.java
new file mode 100644
index 0000000000..e5d43b6d0b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableSnapshot.java
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+/**
+ * Snapshot selector for a time-travel read: holds either a snapshot version or an as-of time string.
+ * As of 2022-12-28, only external Iceberg tables support it.
+ */
+public class TableSnapshot {
+
+ public enum VersionType { // which kind of snapshot reference this object carries
+ TIME, VERSION
+ }
+
+ private final VersionType type; // fixed by whichever constructor was used
+ private String time; // as-of time string; stays null when built from a version
+ private long version; // snapshot version; stays 0 when built from a time string
+
+ public TableSnapshot(long version) { // builds a VERSION-typed snapshot
+ this.version = version;
+ this.type = VersionType.VERSION;
+ }
+
+ public TableSnapshot(String time) { // builds a TIME-typed snapshot; the string is stored as given
+ this.time = time;
+ this.type = VersionType.TIME;
+ }
+
+ public TableSnapshot(TableSnapshot other) { // copy constructor: copies type, time and version
+ this.type = other.type;
+ this.time = other.time;
+ this.version = other.version;
+ }
+
+ public VersionType getType() {
+ return type;
+ }
+
+ public String getTime() {
+ return time;
+ }
+
+ public long getVersion() {
+ return version;
+ }
+
+ @Override
+ public String toString() { // renders the SQL clause text, including its leading space
+ if (this.type == VersionType.VERSION) {
+ return " FOR VERSION AS OF " + version;
+ } else {
+ return " FOR TIME AS OF '" + time + "'";
+ }
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
b/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
index a6a78d054b..20948c039f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ErrorCode.java
@@ -1699,7 +1699,10 @@ public enum ErrorCode {
ERR_NONSUPPORT_HMS_TABLE(5088, new byte[]{'4', '2', '0', '0', '0'},
"Nonsupport hive metastore table named '%s' in database '%s' with
catalog '%s'."),
ERR_TABLE_NAME_LENGTH_LIMIT(5089, new byte[]{'4', '2', '0', '0', '0'},
"Table name length exceeds limit, "
- + "the length of table name '%s' is %d which is greater than the
configuration 'table_name_length_limit' (%d).");
+ + "the length of table name '%s' is %d which is greater than the
configuration 'table_name_length_limit' (%d)."),
+
+ ERR_NONSUPPORT_TIME_TRAVEL_TABLE(5090, new byte[]{'4', '2', '0', '0',
'0'}, "Only iceberg external"
+ + " table supports time travel in current version");
// This is error code
private final int code;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java
index b5dd620f25..9d73d0b368 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java
@@ -68,7 +68,7 @@ public class TimeUtils {
private static final SimpleDateFormat DATETIME_FORMAT;
private static final SimpleDateFormat TIME_FORMAT;
- private static final Pattern DATETIME_FORMAT_REG =
+ public static final Pattern DATETIME_FORMAT_REG =
Pattern.compile("^((\\d{2}(([02468][048])|([13579][26]))[\\-\\/\\s]?((((0?[13578])|(1[02]))[\\-\\/\\s]?"
+
"((0?[1-9])|([1-2][0-9])|(3[01])))|(((0?[469])|(11))[\\-\\/\\s]?"
+
"((0?[1-9])|([1-2][0-9])|(30)))|(0?2[\\-\\/\\s]?((0?[1-9])|([1-2][0-9])))))|("
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
index e51b39144f..850cc14d42 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
@@ -25,6 +25,7 @@ import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TableRef;
+import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.HMSResource;
@@ -35,6 +36,7 @@ import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.external.iceberg.util.IcebergUtils;
import org.apache.doris.planner.ColumnRange;
import org.apache.doris.thrift.TFileFormatType;
@@ -54,14 +56,17 @@ import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.types.Conversions;
import java.nio.ByteBuffer;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -168,6 +173,20 @@ public class IcebergScanProvider extends HiveScanProvider {
org.apache.iceberg.Table table = getIcebergTable();
TableScan scan = table.newScan();
+ TableSnapshot tableSnapshot = desc.getRef().getTableSnapshot();
+ if (tableSnapshot != null) {
+ TableSnapshot.VersionType type = tableSnapshot.getType();
+ try {
+ if (type == TableSnapshot.VersionType.VERSION) {
+ scan = scan.useSnapshot(tableSnapshot.getVersion());
+ } else {
+ long snapshotId =
TimeUtils.timeStringToLong(tableSnapshot.getTime(), TimeUtils.getTimeZone());
+ scan =
scan.useSnapshot(getSnapshotIdAsOfTime(table.history(), snapshotId));
+ }
+ } catch (IllegalArgumentException e) {
+ throw new UserException(e);
+ }
+ }
for (Expression predicate : expressions) {
scan = scan.filter(predicate);
}
@@ -199,6 +218,27 @@ public class IcebergScanProvider extends HiveScanProvider {
return splits;
}
+ public static long getSnapshotIdAsOfTime(List<HistoryEntry>
historyEntries, long asOfTimestamp) {
+ // Returns the snapshot id of the entry with the greatest timestampMillis() that is <= asOfTimestamp (epoch millis).
+ HistoryEntry latestHistory = null;
+ for (HistoryEntry entry : historyEntries) {
+ if (entry.timestampMillis() <= asOfTimestamp) { // entries later than the requested time are ignored
+ if (latestHistory == null) { // first qualifying entry becomes the initial candidate
+ latestHistory = entry;
+ continue;
+ }
+ if (entry.timestampMillis() > latestHistory.timestampMillis())
{ // strict '>' keeps the earlier-seen entry when two timestamps are equal
+ latestHistory = entry;
+ }
+ }
+ }
+ if (latestHistory == null) { // the list is empty or every entry is later than asOfTimestamp
+ throw new NotFoundException("No version history at or before " // NOTE(review): the call site added in this commit catches only IllegalArgumentException - confirm this exception is handled
+ + Instant.ofEpochMilli(asOfTimestamp));
+ }
+ return latestHistory.snapshotId();
+ }
+
private List<IcebergDeleteFileFilter> getDeleteFileFilters(FileScanTask
spitTask) {
List<IcebergDeleteFileFilter> filters = new ArrayList<>();
for (DeleteFile delete : spitTask.deletes()) {
diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex
b/fe/fe-core/src/main/jflex/sql_scanner.flex
index 730e6affb3..e481039c35 100644
--- a/fe/fe-core/src/main/jflex/sql_scanner.flex
+++ b/fe/fe-core/src/main/jflex/sql_scanner.flex
@@ -317,6 +317,7 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("null", new Integer(SqlParserSymbols.KW_NULL));
keywordMap.put("nulls", new Integer(SqlParserSymbols.KW_NULLS));
keywordMap.put("observer", new Integer(SqlParserSymbols.KW_OBSERVER));
+ keywordMap.put("of", new Integer(SqlParserSymbols.KW_OF));
keywordMap.put("offset", new Integer(SqlParserSymbols.KW_OFFSET));
keywordMap.put("on", new Integer(SqlParserSymbols.KW_ON));
keywordMap.put("only", new Integer(SqlParserSymbols.KW_ONLY));
@@ -455,6 +456,7 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("varchar", new Integer(SqlParserSymbols.KW_VARCHAR));
keywordMap.put("variables", new
Integer(SqlParserSymbols.KW_VARIABLES));
keywordMap.put("verbose", new Integer(SqlParserSymbols.KW_VERBOSE));
+ keywordMap.put("version", new Integer(SqlParserSymbols.KW_VERSION));
keywordMap.put("view", new Integer(SqlParserSymbols.KW_VIEW));
keywordMap.put("warnings", new Integer(SqlParserSymbols.KW_WARNINGS));
keywordMap.put("week", new Integer(SqlParserSymbols.KW_WEEK));
diff --git
a/regression-test/data/external_table_emr_p2/iceberg/test_external_catalog_icebergv2.out
b/regression-test/data/external_table_emr_p2/iceberg/test_external_catalog_icebergv2.out
index 4d3a4b9176..7956396740 100644
---
a/regression-test/data/external_table_emr_p2/iceberg/test_external_catalog_icebergv2.out
+++
b/regression-test/data/external_table_emr_p2/iceberg/test_external_catalog_icebergv2.out
@@ -30,4 +30,24 @@
2736865
-- !q08 --
-1499999990
\ No newline at end of file
+1499999990
+
+-- !q09 --
+1
+2
+3
+
+-- !q10 --
+2
+3
+4
+
+-- !q11 --
+1
+2
+3
+
+-- !q12 --
+2
+3
+4
diff --git
a/regression-test/suites/external_table_emr_p2/iceberg/test_external_catalog_icebergv2.groovy
b/regression-test/suites/external_table_emr_p2/iceberg/test_external_catalog_icebergv2.groovy
index 816e75b85e..a3a0511613 100644
---
a/regression-test/suites/external_table_emr_p2/iceberg/test_external_catalog_icebergv2.groovy
+++
b/regression-test/suites/external_table_emr_p2/iceberg/test_external_catalog_icebergv2.groovy
@@ -42,7 +42,15 @@ suite("test_external_catalog_icebergv2", "p2") {
qt_q07 """ select o_orderkey from orders where o_custkey < 3357
limit 3"""
qt_q08 """ select count(1) as c from customer;"""
}
+ // test time travel stmt
+ def q02 = {
+ qt_q09 """ select c_custkey from customer for time as of
'2022-12-27 10:21:36' limit 3 """
+ qt_q10 """ select c_custkey from customer for time as of
'2022-12-28 10:21:36' limit 3 """
+ qt_q11 """ select c_custkey from customer for version as of
906874575350293177 limit 3 """
+ qt_q12 """ select c_custkey from customer for version as of
6352416983354893547 limit 3 """
+ }
sql """ use `tpch_1000_icebergv2`; """
q01()
+ q02()
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]