This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit ec43f65235a7b247302f3a2ce959b68a48b1ec7f Author: Ashin Gau <[email protected]> AuthorDate: Thu Mar 14 18:30:18 2024 +0800 [feature](hudi) support hudi incremental read (#32052) * [feature](hudi) support incremental read for hudi table * fix jdk17 java options --- conf/be.conf | 2 +- docs/en/docs/lakehouse/multi-catalog/hudi.md | 33 ++- docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md | 37 ++- .../java/org/apache/doris/hudi/HudiJniScanner.java | 12 +- .../org/apache/doris/hudi/BaseSplitReader.scala | 8 +- .../apache/doris/hudi/HoodieRecordIterator.scala | 67 ++++-- .../doris/hudi/MORIncrementalSplitReader.scala | 86 +++++++ .../apache/doris/hudi/MORSnapshotSplitReader.scala | 2 +- .../antlr4/org/apache/doris/nereids/DorisParser.g4 | 6 +- fe/fe-core/src/main/cup/sql_parser.cup | 16 +- .../java/org/apache/doris/analysis/TableRef.java | 14 ++ .../org/apache/doris/analysis/TableScanParams.java | 46 ++++ .../doris/datasource/hive/HMSExternalTable.java | 100 +++++++- .../hudi/source/COWIncrementalRelation.java | 254 +++++++++++++++++++++ .../doris/datasource/hudi/source/HudiScanNode.java | 175 +++++++++----- .../hudi/source/IncrementalRelation.java | 44 ++++ .../hudi/source/MORIncrementalRelation.java | 217 ++++++++++++++++++ .../doris/nereids/analyzer/UnboundRelation.java | 34 ++- .../doris/nereids/parser/LogicalPlanBuilder.java | 9 +- .../doris/nereids/rules/analysis/BindRelation.java | 8 +- .../doris/nereids/rules/analysis/CheckPolicy.java | 25 +- .../data/external_table_p2/hive/test_hive_hudi.out | 12 + .../external_table_p2/hive/test_hive_hudi.groovy | 6 + 23 files changed, 1095 insertions(+), 118 deletions(-) diff --git a/conf/be.conf b/conf/be.conf index 6ceec00a421..4d8677aece7 100644 --- a/conf/be.conf +++ b/conf/be.conf @@ -95,4 +95,4 @@ ssl_private_key_path = "$DORIS_HOME/conf/key.pem" # Default to turn off aws sdk log, because aws sdk errors that need to be cared will be output through Doris logs aws_log_level=0 ## If you are not running in aws cloud, you can disable EC2 metadata -AWS_EC2_METADATA_DISABLED=true \ No newline at end of file +AWS_EC2_METADATA_DISABLED=true diff --git a/docs/en/docs/lakehouse/multi-catalog/hudi.md b/docs/en/docs/lakehouse/multi-catalog/hudi.md index a52c2370ced..3a5420319b0 100644 --- a/docs/en/docs/lakehouse/multi-catalog/hudi.md +++ b/docs/en/docs/lakehouse/multi-catalog/hudi.md @@ -33,8 +33,8 @@ under the License. | Table Type | Supported Query types | | ---- | ---- | -| Copy On Write | Snapshot Query + Time Travel | -| Merge On Read | Snapshot Queries + Read Optimized Queries + Time Travel | +| Copy On Write | Snapshot Query, Time Travel, Incremental Read | +| Merge On Read | Snapshot Queries, Read Optimized Queries, Time Travel, Incremental Read | 2. Doris supports Hive Metastore(Including catalogs compatible with Hive MetaStore, like [AWS Glue](./hive.md)/[Alibaba DLF](./dlf.md)) Catalogs. @@ -94,16 +94,29 @@ Users can view the performance of Java SDK through [profile](../../admin-manual/ht ## Time Travel -Supports reading snapshots specified in Hudi table. - -Every write operation to the Hudi table will generate a new snapshot. - -By default, query requests will only read the latest version of the snapshot. +Every write operation to the Hudi table will generate a new snapshot. [Time Travel](https://hudi.apache.org/docs/0.14.0/quick-start-guide/#timetravel) supports reading snapshots specified in Hudi table. By default, query requests will only read the latest version of the snapshot.
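As a minimal illustration (reusing the example table name `hudi_tbl` from the queries below), a query without any snapshot clause simply reads the latest snapshot:
```
SELECT * FROM hudi_tbl;
```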
You can use the `FOR TIME AS OF` statement to read historical versions of the data based on the time of the snapshot. Examples are as follows: +``` +SELECT * FROM hudi_tbl FOR TIME AS OF "2022-10-07 17:20:37"; +SELECT * FROM hudi_tbl FOR TIME AS OF "20221007172037"; +SELECT * FROM hudi_tbl FOR TIME AS OF "2022-10-07"; +``` +Hudi table does not support the `FOR VERSION AS OF` statement. Using this syntax to query the Hudi table will throw an error. -`SELECT * FROM hudi_tbl FOR TIME AS OF "2022-10-07 17:20:37";` +## Incremental Read +Incremental Read obtains a set of records that changed between a start and end commit time, providing you with the "latest state" for each such record as of the end commit time. -`SELECT * FROM hudi_tbl FOR TIME AS OF "20221007172037";` +Doris uses the `@incr` syntax to support Incremental Read: +``` +SELECT * from hudi_table@incr('beginTime'='xxx', ['endTime'='xxx'], ['hoodie.read.timeline.holes.resolution.policy'='FAIL'], ...); +``` +`beginTime` is required; its time format is consistent with [hudi_table_changes](https://hudi.apache.org/docs/0.14.0/quick-start-guide/#incremental-query), and "earliest" is also supported. `endTime` is optional and defaults to the latest commit time. The remaining optional parameters are compatible with [Spark Read Options](https://hudi.apache.org/docs/0.14.0/configurations#Read-Options). -Hudi table does not support the `FOR VERSION AS OF` statement. Using this syntax to query the Hudi table will throw an error. +Incremental Read requires the Nereids planner, which is enabled by default. Doris translates `@incr` into predicates and pushes them down to `VHUDI_SCAN_NODE`, as shown in the explain plan: +``` +| 0:VHUDI_SCAN_NODE(113)   | +| table: lineitem_mor   | +| predicates: (_hoodie_commit_time[#0] >= '20240311151019723'), (_hoodie_commit_time[#0] <= '20240311151606605')   | +| inputSplitNum=1, totalFileSize=13099711, scanRanges=1   | +``` diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md b/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md index 38bb26d3bc7..b7f2776e38d 100644 --- a/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md +++ b/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md @@ -29,12 +29,12 @@ under the License. ## 使用限制 -1. Hudi 表支持的查询类型如下,后续将支持 Incremental Query。 +1. Hudi 表支持的查询类型如下,后续将支持 CDC。 | 表类型 | 支持的查询类型 | | ---- | ---- | -| Copy On Write | Snapshot Query + Time Travel | -| Merge On Read | Snapshot Queries + Read Optimized Queries + Time Travel | +| Copy On Write | Snapshot Query, Time Travel, Incremental Read | +| Merge On Read | Snapshot Queries, Read Optimized Queries, Time Travel, Incremental Read | 2.
目前支持 Hive Metastore 和兼容 Hive Metastore 类型(例如[AWS Glue](./hive.md)/[Alibaba DLF](./dlf.md))的 Catalog。 @@ -96,16 +96,29 @@ Doris 使用 parquet native reader 读取 COW 表的数据文件,使用 Java S ## Time Travel -支持读取 Hudi 表指定的 Snapshot。 +每一次对 Hudi 表的写操作都会产生一个新的快照,Time Travel 支持读取 Hudi 表指定的 Snapshot。默认情况下,查询请求只会读取最新版本的快照。 -每一次对 Hudi 表的写操作都会产生一个新的快照。 - -默认情况下,查询请求只会读取最新版本的快照。 - -可以使用 `FOR TIME AS OF` 语句,根据快照的时间([时间格式](https://hudi.apache.org/docs/quick-start-guide#time-travel-query)和Hudi官网保持一致)读取历史版本的数据。示例如下: +可以使用 `FOR TIME AS OF` 语句,根据快照的时间([时间格式](https://hudi.apache.org/docs/0.14.0/quick-start-guide/#timetravel)和Hudi官网保持一致)读取历史版本的数据。示例如下: +``` +SELECT * FROM hudi_tbl FOR TIME AS OF "2022-10-07 17:20:37"; +SELECT * FROM hudi_tbl FOR TIME AS OF "20221007172037"; +SELECT * FROM hudi_tbl FOR TIME AS OF "2022-10-07"; +``` +Hudi 表不支持 `FOR VERSION AS OF` 语句,使用该语法查询 Hudi 表将抛错。 -`SELECT * FROM hudi_tbl FOR TIME AS OF "2022-10-07 17:20:37";` +## Incremental Read +Incremental Read 可以查询在 startTime 和 endTime 之间变化的数据,返回的结果集是数据在 endTime 的最终状态。 -`SELECT * FROM hudi_tbl FOR TIME AS OF "20221007172037";` +Doris 提供了 `@incr` 语法支持 Incremental Read: +``` +SELECT * from hudi_table@incr('beginTime'='xxx', ['endTime'='xxx'], ['hoodie.read.timeline.holes.resolution.policy'='FAIL'], ...); +``` +`beginTime` 是必须的,时间格式和 hudi 官网 [hudi_table_changes](https://hudi.apache.org/docs/0.14.0/quick-start-guide/#incremental-query) 保持一致,支持 "earliest"。`endTime` 选填,默认最新commitTime。兼容 [Spark Read Options](https://hudi.apache.org/docs/0.14.0/configurations#Read-Options)。 -Hudi 表不支持 `FOR VERSION AS OF` 语句,使用该语法查询 Hudi 表将抛错。 +支持 Incremental Read 需要开启[新优化器](../../query-acceleration/nereids.md),新优化器默认打开。通过 `desc` 查看执行计划,可以发现 Doris 将 `@incr` 转化为 `predicates` 下推给 `VHUDI_SCAN_NODE`: +``` +| 0:VHUDI_SCAN_NODE(113) | +| table: lineitem_mor | +| predicates: (_hoodie_commit_time[#0] >= '20240311151019723'), (_hoodie_commit_time[#0] <= '20240311151606605') | +| inputSplitNum=1, totalFileSize=13099711, scanRanges=1 | +``` diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java index 0a1b69fcfb0..a284c7adcdd 100644 --- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java +++ b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java @@ -160,9 +160,15 @@ public class HudiJniScanner extends JniScanner { cleanResolverLock.readLock().lock(); try { lastUpdateTime.set(System.currentTimeMillis()); - recordIterator = HadoopUGI.ugiDoAs( - AuthenticationConfig.getKerberosConfig(split.hadoopConf()), () -> new MORSnapshotSplitReader( - split).buildScanIterator(new Filter[0])); + if (split.incrementalRead()) { + recordIterator = HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig( + split.hadoopConf()), + () -> new MORIncrementalSplitReader(split).buildScanIterator(new Filter[0])); + } else { + recordIterator = HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig( + split.hadoopConf()), + () -> new MORSnapshotSplitReader(split).buildScanIterator(new Filter[0])); + } if (AVRO_RESOLVER_CACHE != null && AVRO_RESOLVER_CACHE.get() != null) { cachedResolvers.computeIfAbsent(Thread.currentThread().getId(), threadId -> AVRO_RESOLVER_CACHE.get()); diff --git a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala index 
a730f2cd1b2..8229064163d 100644 --- a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala +++ b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala @@ -125,6 +125,10 @@ class HoodieSplit(private val params: jutil.Map[String, String]) { conf } + def incrementalRead: Boolean = { + "true".equalsIgnoreCase(optParams.getOrElse("hoodie.datasource.read.incr.operation", "false")) + } + // NOTE: In cases when Hive Metastore is used as catalog and the table is partitioned, schema in the HMS might contain // Hive-specific partitioning columns created specifically for HMS to handle partitioning appropriately. In that // case we opt in to not be providing catalog's schema, and instead force Hudi relations to fetch the schema @@ -169,6 +173,8 @@ abstract class BaseSplitReader(val split: HoodieSplit) { protected val tableInformation: HoodieTableInformation = cache.get(split) + protected val timeline: HoodieTimeline = tableInformation.timeline + protected val sparkSession: SparkSession = tableInformation.sparkSession protected val sqlContext: SQLContext = sparkSession.sqlContext imbueConfigs(sqlContext) @@ -578,8 +584,6 @@ abstract class BaseSplitReader(val split: HoodieSplit) { ) } - protected val timeline: HoodieTimeline = tableInformation.timeline - protected def embedInternalSchema(conf: Configuration, internalSchemaOpt: Option[InternalSchema]): Configuration = { val internalSchema = internalSchemaOpt.getOrElse(InternalSchema.getEmptyInternalSchema) val querySchemaString = SerDeHelper.toJson(internalSchema) diff --git a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/HoodieRecordIterator.scala b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/HoodieRecordIterator.scala index 6e2b7b31e54..f393e9e1246 100644 --- a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/HoodieRecordIterator.scala +++ b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/HoodieRecordIterator.scala @@ -20,11 +20,14 @@ package org.apache.doris.hudi import org.apache.hadoop.conf.Configuration import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, projectReader} import org.apache.hudi.MergeOnReadSnapshotRelation.isProjectionCompatible +import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.common.util.StringUtils import org.apache.hudi.{DataSourceReadOptions, HoodieMergeOnReadFileSplit, HoodieTableSchema, HoodieTableState, LogFileIterator, RecordMergingFileIterator} import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection import org.apache.spark.sql.catalyst.InternalRow import java.io.Closeable +import java.util.function.Predicate /** * Class holding base-file readers for 3 different use-cases: @@ -84,29 +87,61 @@ class HoodieMORRecordIterator(config: Configuration, requiredSchema: HoodieTableSchema, tableState: HoodieTableState, mergeType: String, - fileSplit: HoodieMergeOnReadFileSplit) extends Iterator[InternalRow] with Closeable { + fileSplit: HoodieMergeOnReadFileSplit, + includeStartTime: Boolean = false, + startTimestamp: String = null, + endTimestamp: String = null) extends Iterator[InternalRow] with Closeable { protected val maxCompactionMemoryInBytes: Long = config.getLongBytes( "hoodie.compaction.memory", 512 * 1024 * 1024) - protected val recordIterator: Iterator[InternalRow] = fileSplit match { - case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty => - val projectedReader = 
projectReader(fileReaders.requiredSchemaReaderSkipMerging, requiredSchema.structTypeSchema) - projectedReader(dataFileOnlySplit.dataFile.get) + protected val recordIterator: Iterator[InternalRow] = { + val iter = fileSplit match { + case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty => + val projectedReader = projectReader(fileReaders.requiredSchemaReaderSkipMerging, requiredSchema.structTypeSchema) + projectedReader(dataFileOnlySplit.dataFile.get) - case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty => - new LogFileIterator(logFileOnlySplit, tableSchema, requiredSchema, tableState, config) + case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty => + new LogFileIterator(logFileOnlySplit, tableSchema, requiredSchema, tableState, config) - case split => mergeType match { - case DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL => - // val reader = fileReaders.requiredSchemaReaderSkipMerging - // new SkipMergeIterator(split, reader, tableSchema, requiredSchema, tableState, config) - throw new UnsupportedOperationException("Skip merge is optimized by native read") + case split => mergeType match { + case DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL => + val reader = fileReaders.requiredSchemaReaderSkipMerging + new SkipMergeIterator(split, reader, tableSchema, requiredSchema, tableState, config) - case DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL => - val reader = pickBaseFileReader() - new RecordMergingFileIterator(split, reader, tableSchema, requiredSchema, tableState, config) + case DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL => + val reader = pickBaseFileReader() + new RecordMergingFileIterator(split, reader, tableSchema, requiredSchema, tableState, config) - case _ => throw new UnsupportedOperationException(s"Not supported merge type ($mergeType)") + case _ => throw new UnsupportedOperationException(s"Not supported merge type ($mergeType)") + } + } + + val commitTimeMetadataFieldIdx = requiredSchema.structTypeSchema.fieldNames.indexOf(HoodieRecord.COMMIT_TIME_METADATA_FIELD) + val needsFiltering = commitTimeMetadataFieldIdx >= 0 && !StringUtils.isNullOrEmpty(startTimestamp) && !StringUtils.isNullOrEmpty(endTimestamp) + if (needsFiltering) { + val filterT: Predicate[InternalRow] = getCommitTimeFilter(includeStartTime, commitTimeMetadataFieldIdx) + iter.filter(filterT.test) + } + else { + iter + } + } + + private def getCommitTimeFilter(includeStartTime: Boolean, commitTimeMetadataFieldIdx: Int): Predicate[InternalRow] = { + if (includeStartTime) { + new Predicate[InternalRow] { + override def test(row: InternalRow): Boolean = { + val commitTime = row.getString(commitTimeMetadataFieldIdx) + commitTime >= startTimestamp && commitTime <= endTimestamp + } + } + } else { + new Predicate[InternalRow] { + override def test(row: InternalRow): Boolean = { + val commitTime = row.getString(commitTimeMetadataFieldIdx) + commitTime > startTimestamp && commitTime <= endTimestamp + } + } } } diff --git a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORIncrementalSplitReader.scala b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORIncrementalSplitReader.scala new file mode 100644 index 00000000000..73c87e29034 --- /dev/null +++ b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORIncrementalSplitReader.scala @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.hudi + +import org.apache.hudi.HoodieTableSchema +import org.apache.hudi.common.model.HoodieRecord +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.sources._ + +/** + * Reference to Apache Hudi + * see <a href="https://github.com/apache/hudi/blob/release-0.14.1/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala">MergeOnReadIncrementalRelation</a> + */ +class MORIncrementalSplitReader(override val split: HoodieSplit) extends MORSnapshotSplitReader(split) with IncrementalSplitReaderTrait { + + override protected def composeIterator(tableSchema: HoodieTableSchema, + requiredSchema: HoodieTableSchema, + requestedColumns: Array[String], + filters: Array[Filter]): Iterator[InternalRow] = { + // The only required filters are ones that make sure we're only fetching records that + // fall into incremental span of the timeline being queried + val requiredFilters = incrementalSpanRecordFilters + val optionalFilters = filters + val readers = createBaseFileReaders(tableSchema, requiredSchema, requestedColumns, requiredFilters, optionalFilters) + + new HoodieMORRecordIterator(split.hadoopConf, + readers, + tableSchema, + requiredSchema, + tableState, + mergeType, + getFileSplit(), + includeStartTime = includeStartTime, + startTimestamp = startTs, + endTimestamp = endTs) + } + +} + +/** + * Reference to Apache Hudi + * see <a href="https://github.com/apache/hudi/blob/release-0.14.1/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala">HoodieIncrementalRelationTrait</a> + */ +trait IncrementalSplitReaderTrait extends BaseSplitReader { + protected val includeStartTime: Boolean = "true".equalsIgnoreCase(optParams("hoodie.datasource.read.incr.includeStartTime")) + protected val startTs: String = optParams("hoodie.datasource.read.begin.instanttime") + protected val endTs: String = optParams("hoodie.datasource.read.end.instanttime") + + // Record filters making sure that only records w/in the requested bounds are being fetched as part of the + // scan collected by this relation + protected lazy val incrementalSpanRecordFilters: Seq[Filter] = { + val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD) + + val largerThanFilter = if (includeStartTime) { + GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, startTs) + } else { + GreaterThan(HoodieRecord.COMMIT_TIME_METADATA_FIELD, startTs) + } + + val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, endTs) + + Seq(isNotNullFilter, largerThanFilter, lessThanFilter) + } + + override lazy val mandatoryFields: Seq[String] = { + // NOTE: This columns are required for Incremental flow to be able to handle the rows properly, even in + // cases when no columns are 
requested to be fetched (for ex, when using {@code count()} API) + Seq(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++ + preCombineFieldOpt.map(Seq(_)).getOrElse(Seq()) + } +} diff --git a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORSnapshotSplitReader.scala b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORSnapshotSplitReader.scala index e9958b231e7..07e236082ce 100644 --- a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORSnapshotSplitReader.scala +++ b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORSnapshotSplitReader.scala @@ -74,7 +74,7 @@ class MORSnapshotSplitReader(override val split: HoodieSplit) extends BaseSplitR getFileSplit()) } - private def getFileSplit(): HoodieMergeOnReadFileSplit = { + protected def getFileSplit(): HoodieMergeOnReadFileSplit = { val logFiles = split.deltaFilePaths.map(new HoodieLogFile(_)) .sorted(Ordering.comparatorToOrdering(HoodieLogFile.getLogFileComparator)).toList val partitionedBaseFile = if (split.dataFilePath.isEmpty) { diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index 089e5569e8d..acc6eecb51a 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -477,8 +477,12 @@ identifierSeq : ident+=errorCapturingIdentifier (COMMA ident+=errorCapturingIdentifier)* ; +optScanParams + : ATSIGN funcName=identifier LEFT_PAREN (properties=propertyItemList)? RIGHT_PAREN + ; + relationPrimary - : multipartIdentifier materializedViewName? specifiedPartition? + : multipartIdentifier optScanParams? materializedViewName? specifiedPartition? tabletList? tableAlias sample? relationHint? 
lateralView* #tableName | LEFT_PAREN query RIGHT_PAREN tableAlias lateralView* #aliasedQuery | tvfName=identifier LEFT_PAREN diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index bec106761e3..f907b762407 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -839,6 +839,7 @@ nonterminal String opt_job_starts; nonterminal String opt_job_ends; nonterminal String job_at_time; nonterminal ColocateGroupName colocate_group_name; +nonterminal TableScanParams opt_scan_params_ref; nonterminal LoadTask.MergeType opt_merge_type, opt_with_merge_type; @@ -5762,6 +5763,17 @@ colocate_group_name ::= :} ; +opt_scan_params_ref ::= + /* empty */ + {: + RESULT = null; + :} + | AT ident:func_name LPAREN opt_key_value_map_in_paren:properties RPAREN + {: + RESULT = new TableScanParams(func_name, properties); + :} + ; + encryptkey_name ::= ident:name {: @@ -5882,9 +5894,9 @@ base_table_ref_list ::= ; base_table_ref ::= - table_name:name opt_table_snapshot:tableSnapshot opt_partition_names:partitionNames opt_tablet_list:tabletIds opt_table_alias:alias opt_table_sample:tableSample opt_common_hints:commonHints + table_name:name opt_scan_params_ref:scanParams opt_table_snapshot:tableSnapshot opt_partition_names:partitionNames opt_tablet_list:tabletIds opt_table_alias:alias opt_table_sample:tableSample opt_common_hints:commonHints {: - RESULT = new TableRef(name, alias, partitionNames, tabletIds, tableSample, commonHints, tableSnapshot); + RESULT = new TableRef(name, alias, partitionNames, tabletIds, tableSample, commonHints, tableSnapshot, scanParams); :} ; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java index 0ade6730c0d..13821a510c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java @@ -140,6 +140,8 @@ public class TableRef implements ParseNode, Writable { private TableSnapshot tableSnapshot; + private TableScanParams scanParams; + // END: Members that need to be reset() // /////////////////////////////////////// @@ -169,6 +171,12 @@ public class TableRef implements ParseNode, Writable { public TableRef(TableName name, String alias, PartitionNames partitionNames, ArrayList<Long> sampleTabletIds, TableSample tableSample, ArrayList<String> commonHints, TableSnapshot tableSnapshot) { + this(name, alias, partitionNames, sampleTabletIds, tableSample, commonHints, tableSnapshot, null); + } + + public TableRef(TableName name, String alias, PartitionNames partitionNames, + ArrayList<Long> sampleTabletIds, TableSample tableSample, ArrayList<String> commonHints, + TableSnapshot tableSnapshot, TableScanParams scanParams) { this.name = name; if (alias != null) { if (Env.isStoredTableNamesLowerCase()) { @@ -184,6 +192,7 @@ public class TableRef implements ParseNode, Writable { this.tableSample = tableSample; this.commonHints = commonHints; this.tableSnapshot = tableSnapshot; + this.scanParams = scanParams; isAnalyzed = false; } @@ -204,6 +213,7 @@ public class TableRef implements ParseNode, Writable { onClause = (other.onClause != null) ? other.onClause.clone().reset() : null; partitionNames = (other.partitionNames != null) ? new PartitionNames(other.partitionNames) : null; tableSnapshot = (other.tableSnapshot != null) ? new TableSnapshot(other.tableSnapshot) : null; + scanParams = other.scanParams; tableSample = (other.tableSample != null) ? 
new TableSample(other.tableSample) : null; commonHints = other.commonHints; @@ -333,6 +343,10 @@ public class TableRef implements ParseNode, Writable { return desc != null; } + public TableScanParams getScanParams() { + return scanParams; + } + /** * This method should only be called after the TableRef has been analyzed. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableScanParams.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableScanParams.java new file mode 100644 index 00000000000..ab1491ccc3a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableScanParams.java @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import com.google.common.collect.ImmutableMap; + +import java.util.Map; + +public class TableScanParams { + public static String INCREMENTAL_READ = "incr"; + + private final String paramType; + private final Map<String, String> params; + + public TableScanParams(String paramType, Map<String, String> params) { + this.paramType = paramType; + this.params = params == null ? 
ImmutableMap.of() : ImmutableMap.copyOf(params); + } + + public String getParamType() { + return paramType; + } + + public Map<String, String> getParams() { + return params; + } + + public boolean incrementalRead() { + return INCREMENTAL_READ.equals(paramType); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index 50eac67deff..a788d9e57bc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -17,6 +17,7 @@ package org.apache.doris.datasource.hive; +import org.apache.doris.analysis.TableScanParams; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.ListPartitionItem; @@ -28,12 +29,23 @@ import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.hudi.HudiUtils; +import org.apache.doris.datasource.hudi.source.COWIncrementalRelation; +import org.apache.doris.datasource.hudi.source.IncrementalRelation; +import org.apache.doris.datasource.hudi.source.MORIncrementalRelation; import org.apache.doris.datasource.iceberg.IcebergUtils; import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot; import org.apache.doris.mtmv.MTMVRelatedTableIf; import org.apache.doris.mtmv.MTMVSnapshotIf; import org.apache.doris.mtmv.MTMVTimestampSnapshot; import org.apache.doris.nereids.exceptions.NotSupportedException; +import org.apache.doris.nereids.trees.expressions.ComparisonPredicate; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.GreaterThan; +import org.apache.doris.nereids.trees.expressions.GreaterThanEqual; +import org.apache.doris.nereids.trees.expressions.LessThanEqual; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.expressions.literal.StringLiteral; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.statistics.ColumnStatistic; @@ -45,6 +57,7 @@ import org.apache.doris.thrift.THiveTable; import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -61,6 +74,7 @@ import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -68,6 +82,7 @@ import java.io.IOException; import java.math.BigDecimal; import java.math.BigInteger; import java.time.LocalDate; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -147,6 +162,10 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI // record the event update time when enable hms event listener protected volatile long eventUpdateTime; + // for hudi incremental read + private TableScanParams scanParams = 
null; + private IncrementalRelation incrementalRelation = null; + public enum DLAType { UNKNOWN, HIVE, HUDI, ICEBERG } @@ -224,7 +243,10 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI return false; } String inputFormatName = remoteTable.getSd().getInputFormat(); - return "org.apache.hudi.hadoop.HoodieParquetInputFormat".equals(inputFormatName); + Map<String, String> params = remoteTable.getParameters(); + return "org.apache.hudi.hadoop.HoodieParquetInputFormat".equals(inputFormatName) + || "skip_merge".equals(getCatalogProperties().get("hoodie.datasource.merge.type")) + || (params != null && "COPY_ON_WRITE".equalsIgnoreCase(params.get("flink.table.type"))); } /** @@ -281,6 +303,82 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI return partitionColumns; } + public TableScanParams getScanParams() { + return scanParams; + } + + public void setScanParams(TableScanParams scanParams) { + if (scanParams != null && scanParams.incrementalRead()) { + Map<String, String> optParams = getHadoopProperties(); + if (scanParams.getParams().containsKey("beginTime")) { + optParams.put("hoodie.datasource.read.begin.instanttime", scanParams.getParams().get("beginTime")); + } + if (scanParams.getParams().containsKey("endTime")) { + optParams.put("hoodie.datasource.read.end.instanttime", scanParams.getParams().get("endTime")); + } + scanParams.getParams().forEach((k, v) -> { + if (k.startsWith("hoodie.")) { + optParams.put(k, v); + } + }); + HoodieTableMetaClient hudiClient = HiveMetaStoreClientHelper.getHudiClient(this); + try { + boolean isCowOrRoTable = isHoodieCowTable(); + if (isCowOrRoTable) { + Map<String, String> serd = remoteTable.getSd().getSerdeInfo().getParameters(); + if ("true".equals(serd.get("hoodie.query.as.ro.table")) + && remoteTable.getTableName().endsWith("_ro")) { + // Incremental read RO table as RT table, I don't know why? + isCowOrRoTable = false; + LOG.warn("Execute incremental read on RO table"); + } + } + if (isCowOrRoTable) { + incrementalRelation = new COWIncrementalRelation( + optParams, HiveMetaStoreClientHelper.getConfiguration(this), hudiClient); + } else { + incrementalRelation = new MORIncrementalRelation( + optParams, HiveMetaStoreClientHelper.getConfiguration(this), hudiClient); + } + } catch (Exception e) { + LOG.warn("Failed to create incremental relation", e); + } + } + this.scanParams = scanParams; + } + + public IncrementalRelation getIncrementalRelation() { + return incrementalRelation; + } + + /** + * replace incremental params as AND expression + * incr('beginTime'='20240308110257169', 'endTime'='20240308110677278') => + * _hoodie_commit_time >= 20240308110257169 and _hoodie_commit_time <= '20240308110677278' + */ + public Set<Expression> generateIncrementalExpression(List<Slot> slots) { + if (incrementalRelation == null) { + return Collections.emptySet(); + } + SlotReference timeField = null; + for (Slot slot : slots) { + if ("_hoodie_commit_time".equals(slot.getName())) { + timeField = (SlotReference) slot; + break; + } + } + if (timeField == null) { + return Collections.emptySet(); + } + StringLiteral upperValue = new StringLiteral(incrementalRelation.getEndTs()); + StringLiteral lowerValue = new StringLiteral(incrementalRelation.getStartTs()); + ComparisonPredicate less = new LessThanEqual(timeField, upperValue); + ComparisonPredicate great = incrementalRelation.isIncludeStartTime() + ? 
new GreaterThanEqual(timeField, lowerValue) + : new GreaterThan(timeField, lowerValue); + return ImmutableSet.of(great, less); + } + public boolean isHiveTransactionalTable() { return dlaType == DLAType.HIVE && AcidUtils.isTransactionalTable(remoteTable) && isSupportedTransactionalFileFormat(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java new file mode 100644 index 00000000000..fa24dc53e56 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java @@ -0,0 +1,254 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.hudi.source; + +import org.apache.doris.datasource.FileSplit; +import org.apache.doris.spi.Split; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.GlobPattern; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineUtils; +import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class COWIncrementalRelation implements IncrementalRelation { + private final Map<String, String> optParams; + private final HoodieTableMetaClient metaClient; + private final HollowCommitHandling hollowCommitHandling; + private final boolean startInstantArchived; + private final boolean endInstantArchived; + private final boolean fullTableScan; + private final FileSystem fs; + private final Map<String, HoodieWriteStat> fileToWriteStat; + private final Collection<String> filteredRegularFullPaths; + private final Collection<String> filteredMetaBootstrapFullPaths; + + private final boolean includeStartTime; + private final String startTs; + private final String endTs; + + public COWIncrementalRelation(Map<String, String> optParams, Configuration configuration, + HoodieTableMetaClient 
metaClient) + throws HoodieException, IOException { + this.optParams = optParams; + this.metaClient = metaClient; + hollowCommitHandling = HollowCommitHandling.valueOf( + optParams.getOrDefault("hoodie.read.timeline.holes.resolution.policy", "FAIL")); + HoodieTimeline commitTimeline = TimelineUtils.handleHollowCommitIfNeeded( + metaClient.getCommitTimeline().filterCompletedInstants(), metaClient, hollowCommitHandling); + if (commitTimeline.empty()) { + throw new HoodieException("No instants to incrementally pull"); + } + if (!metaClient.getTableConfig().populateMetaFields()) { + throw new HoodieException("Incremental queries are not supported when meta fields are disabled"); + } + HoodieInstant lastInstant = commitTimeline.lastInstant().get(); + String startInstantTime = optParams.get("hoodie.datasource.read.begin.instanttime"); + if (startInstantTime == null) { + throw new HoodieException("Specify the begin instant time to pull from using " + + "option hoodie.datasource.read.begin.instanttime"); + } + if (EARLIEST_TIME.equals(startInstantTime)) { + startInstantTime = "000"; + } + String endInstantTime = optParams.getOrDefault("hoodie.datasource.read.end.instanttime", + lastInstant.getTimestamp()); + startInstantArchived = commitTimeline.isBeforeTimelineStarts(startInstantTime); + endInstantArchived = commitTimeline.isBeforeTimelineStarts(endInstantTime); + + HoodieTimeline commitsTimelineToReturn; + if (hollowCommitHandling == HollowCommitHandling.USE_TRANSITION_TIME) { + commitsTimelineToReturn = commitTimeline.findInstantsInRangeByStateTransitionTime(startInstantTime, + lastInstant.getStateTransitionTime()); + } else { + commitsTimelineToReturn = commitTimeline.findInstantsInRange(startInstantTime, lastInstant.getTimestamp()); + } + List<HoodieInstant> commitsToReturn = commitsTimelineToReturn.getInstants(); + + // todo: support configuration hoodie.datasource.read.incr.filters + Path basePath = metaClient.getBasePathV2(); + Map<String, String> regularFileIdToFullPath = new HashMap<>(); + Map<String, String> metaBootstrapFileIdToFullPath = new HashMap<>(); + HoodieTimeline replacedTimeline = commitsTimelineToReturn.getCompletedReplaceTimeline(); + Map<String, String> replacedFile = new HashMap<>(); + for (HoodieInstant instant : replacedTimeline.getInstants()) { + HoodieReplaceCommitMetadata.fromBytes(metaClient.getActiveTimeline().getInstantDetails(instant).get(), + HoodieReplaceCommitMetadata.class).getPartitionToReplaceFileIds().forEach( + (key, value) -> value.forEach( + e -> replacedFile.put(e, FSUtils.getPartitionPath(basePath, key).toString()))); + } + + fileToWriteStat = new HashMap<>(); + for (HoodieInstant commit : commitsToReturn) { + HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes( + commitTimeline.getInstantDetails(commit).get(), HoodieCommitMetadata.class); + metadata.getPartitionToWriteStats().forEach((partition, stats) -> { + for (HoodieWriteStat stat : stats) { + fileToWriteStat.put(FSUtils.getPartitionPath(basePath, stat.getPath()).toString(), stat); + } + }); + if (HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS.equals(commit.getTimestamp())) { + metadata.getFileIdAndFullPaths(basePath).forEach((k, v) -> { + if (!(replacedFile.containsKey(k) && v.startsWith(replacedFile.get(k)))) { + metaBootstrapFileIdToFullPath.put(k, v); + } + }); + } else { + metadata.getFileIdAndFullPaths(basePath).forEach((k, v) -> { + if (!(replacedFile.containsKey(k) && v.startsWith(replacedFile.get(k)))) { + regularFileIdToFullPath.put(k, v); + } + }); + } + } + + if 
(!metaBootstrapFileIdToFullPath.isEmpty()) { + // filer out meta bootstrap files that have had more commits since metadata bootstrap + metaBootstrapFileIdToFullPath.entrySet().removeIf(e -> regularFileIdToFullPath.containsKey(e.getKey())); + } + String pathGlobPattern = optParams.getOrDefault("hoodie.datasource.read.incr.path.glob", ""); + if ("".equals(pathGlobPattern)) { + filteredRegularFullPaths = regularFileIdToFullPath.values(); + filteredMetaBootstrapFullPaths = metaBootstrapFileIdToFullPath.values(); + } else { + GlobPattern globMatcher = new GlobPattern("*" + pathGlobPattern); + filteredRegularFullPaths = regularFileIdToFullPath.values().stream().filter(globMatcher::matches) + .collect(Collectors.toList()); + filteredMetaBootstrapFullPaths = metaBootstrapFileIdToFullPath.values().stream() + .filter(globMatcher::matches).collect(Collectors.toList()); + + } + + fs = basePath.getFileSystem(configuration); + fullTableScan = shouldFullTableScan(); + includeStartTime = !fullTableScan; + if (fullTableScan || commitsToReturn.isEmpty()) { + startTs = startInstantTime; + endTs = endInstantTime; + } else { + startTs = commitsToReturn.get(0).getTimestamp(); + endTs = commitsToReturn.get(commitsToReturn.size() - 1).getTimestamp(); + } + } + + private boolean shouldFullTableScan() throws HoodieException, IOException { + boolean fallbackToFullTableScan = Boolean.parseBoolean( + optParams.getOrDefault("hoodie.datasource.read.incr.fallback.fulltablescan.enable", "false")); + if (fallbackToFullTableScan && (startInstantArchived || endInstantArchived)) { + if (hollowCommitHandling == HollowCommitHandling.USE_TRANSITION_TIME) { + throw new HoodieException("Cannot use stateTransitionTime while enables full table scan"); + } + return true; + } + if (fallbackToFullTableScan) { + for (String path : filteredMetaBootstrapFullPaths) { + if (!fs.exists(new Path(path))) { + return true; + } + } + for (String path : filteredRegularFullPaths) { + if (!fs.exists(new Path(path))) { + return true; + } + } + } + return false; + } + + @Override + public List<FileSlice> collectFileSlices() throws HoodieException { + throw new UnsupportedOperationException(); + } + + @Override + public List<Split> collectSplits() throws HoodieException { + if (fullTableScan) { + throw new HoodieException("Fallback to full table scan"); + } + if (filteredRegularFullPaths.isEmpty() && filteredMetaBootstrapFullPaths.isEmpty()) { + return Collections.emptyList(); + } + List<Split> splits = new ArrayList<>(); + Option<String[]> partitionColumns = metaClient.getTableConfig().getPartitionFields(); + List<String> partitionNames = partitionColumns.isPresent() ? 
Arrays.asList(partitionColumns.get()) + : Collections.emptyList(); + for (String baseFile : filteredMetaBootstrapFullPaths) { + HoodieWriteStat stat = fileToWriteStat.get(baseFile); + splits.add(new FileSplit(new Path(baseFile), 0, stat.getFileSizeInBytes(), stat.getFileSizeInBytes(), + new String[0], + HudiPartitionProcessor.parsePartitionValues(partitionNames, stat.getPartitionPath()))); + } + for (String baseFile : filteredRegularFullPaths) { + HoodieWriteStat stat = fileToWriteStat.get(baseFile); + splits.add(new FileSplit(new Path(baseFile), 0, stat.getFileSizeInBytes(), stat.getFileSizeInBytes(), + new String[0], + HudiPartitionProcessor.parsePartitionValues(partitionNames, stat.getPartitionPath()))); + } + return splits; + } + + @Override + public Map<String, String> getHoodieParams() { + optParams.put("hoodie.datasource.read.incr.operation", "true"); + optParams.put("hoodie.datasource.read.begin.instanttime", startTs); + optParams.put("hoodie.datasource.read.end.instanttime", endTs); + optParams.put("hoodie.datasource.read.incr.includeStartTime", includeStartTime ? "true" : "false"); + return optParams; + } + + @Override + public boolean fallbackFullTableScan() { + return fullTableScan; + } + + @Override + public boolean isIncludeStartTime() { + return includeStartTime; + } + + @Override + public String getStartTs() { + return startTs; + } + + @Override + public String getEndTs() { + return endTs; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java index dfbb12e8584..58068c575d7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java @@ -17,6 +17,7 @@ package org.apache.doris.datasource.hudi.source; +import org.apache.doris.analysis.TableScanParams; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.PartitionItem; @@ -50,6 +51,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.BaseFile; +import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -63,6 +65,7 @@ import org.apache.logging.log4j.Logger; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -78,12 +81,22 @@ public class HudiScanNode extends HiveScanNode { private static final Logger LOG = LogManager.getLogger(HudiScanNode.class); - private final boolean isCowOrRoTable; + private boolean isCowOrRoTable; private final AtomicLong noLogsSplitNum = new AtomicLong(0); private final boolean useHiveSyncPartition; + private HoodieTableMetaClient hudiClient; + private String basePath; + private String inputFormat; + private String serdeLib; + private List<String> columnNames; + private List<String> columnTypes; + + private boolean incrementalRead = false; + private IncrementalRelation incrementalRelation; + /** * External file scan node for Query Hudi table * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv @@ -92,10 +105,7 @@ public class HudiScanNode extends HiveScanNode { */ public 
HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, needCheckColumnPriv); - Map<String, String> paras = hmsTable.getRemoteTable().getParameters(); - isCowOrRoTable = hmsTable.isHoodieCowTable() - || "skip_merge".equals(hmsTable.getCatalogProperties().get("hoodie.datasource.merge.type")) - || (paras != null && "COPY_ON_WRITE".equalsIgnoreCase(paras.get("flink.table.type"))); + isCowOrRoTable = hmsTable.isHoodieCowTable(); if (isCowOrRoTable) { if (LOG.isDebugEnabled()) { LOG.debug("Hudi table {} can read as cow/read optimize table", hmsTable.getName()); @@ -129,12 +139,66 @@ public class HudiScanNode extends HiveScanNode { computeColumnsFilter(); initBackendPolicy(); initSchemaParams(); + + hudiClient = HiveMetaStoreClientHelper.getHudiClient(hmsTable); + hudiClient.reloadActiveTimeline(); + basePath = hmsTable.getRemoteTable().getSd().getLocation(); + inputFormat = hmsTable.getRemoteTable().getSd().getInputFormat(); + serdeLib = hmsTable.getRemoteTable().getSd().getSerdeInfo().getSerializationLib(); + columnNames = new ArrayList<>(); + columnTypes = new ArrayList<>(); + TableSchemaResolver schemaUtil = new TableSchemaResolver(hudiClient); + Schema hudiSchema; + try { + hudiSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchema()); + } catch (Exception e) { + throw new UserException("Cannot get hudi table schema."); + } + for (Schema.Field hudiField : hudiSchema.getFields()) { + columnNames.add(hudiField.name().toLowerCase(Locale.ROOT)); + String columnType = HudiUtils.fromAvroHudiTypeToHiveTypeString(hudiField.schema()); + columnTypes.add(columnType); + } + + TableScanParams scanParams = desc.getRef().getScanParams(); + if (scanParams != null) { + throw new UserException("Incremental read should turn on nereids planner"); + } + scanParams = hmsTable.getScanParams(); + if (scanParams != null) { + if (scanParams.incrementalRead()) { + incrementalRead = true; + } else { + throw new UserException("Not support function '" + scanParams.getParamType() + "' in hudi table"); + } + } + if (incrementalRead) { + if (isCowOrRoTable) { + try { + Map<String, String> serd = hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters(); + if ("true".equals(serd.get("hoodie.query.as.ro.table")) + && hmsTable.getRemoteTable().getTableName().endsWith("_ro")) { + // Incremental read RO table as RT table, I don't know why? + isCowOrRoTable = false; + LOG.warn("Execute incremental read on RO table"); + } + } catch (Exception e) { + // ignore + } + } + incrementalRelation = hmsTable.getIncrementalRelation(); + if (incrementalRelation == null) { + throw new UserException("Failed to create incremental relation"); + } + } else { + incrementalRelation = null; + } } @Override protected Map<String, String> getLocationProperties() throws UserException { - if (isCowOrRoTable) { - return super.getLocationProperties(); + if (incrementalRead) { + return incrementalRelation.getHoodieParams(); } else { // HudiJniScanner uses hadoop client to read data. 
return hmsTable.getHadoopProperties(); @@ -176,7 +240,7 @@ public class HudiScanNode extends HiveScanNode { TablePartitionValues partitionValues; if (snapshotTimestamp.isPresent()) { partitionValues = processor.getSnapshotPartitionValues( - hmsTable, metaClient, snapshotTimestamp.get(), useHiveSyncPartition); + hmsTable, metaClient, snapshotTimestamp.get(), useHiveSyncPartition); } else { partitionValues = processor.getPartitionValues(hmsTable, metaClient, useHiveSyncPartition); } @@ -222,28 +286,24 @@ public class HudiScanNode extends HiveScanNode { return Lists.newArrayList(dummyPartition); } - @Override - public List<Split> getSplits() throws UserException { - HoodieTableMetaClient hudiClient = HiveMetaStoreClientHelper.getHudiClient(hmsTable); - hudiClient.reloadActiveTimeline(); - String basePath = hmsTable.getRemoteTable().getSd().getLocation(); - String inputFormat = hmsTable.getRemoteTable().getSd().getInputFormat(); - String serdeLib = hmsTable.getRemoteTable().getSd().getSerdeInfo().getSerializationLib(); - - TableSchemaResolver schemaUtil = new TableSchemaResolver(hudiClient); - Schema hudiSchema; - try { - hudiSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchema()); - } catch (Exception e) { - throw new RuntimeException("Cannot get hudi table schema."); + private List<Split> getIncrementalSplits() { + if (isCowOrRoTable) { + List<Split> splits = incrementalRelation.collectSplits(); + noLogsSplitNum.addAndGet(splits.size()); + return splits; } + Option<String[]> partitionColumns = hudiClient.getTableConfig().getPartitionFields(); + List<String> partitionNames = partitionColumns.isPresent() ? Arrays.asList(partitionColumns.get()) + : Collections.emptyList(); + return incrementalRelation.collectFileSlices().stream().map(fileSlice -> generateHudiSplit(fileSlice, + HudiPartitionProcessor.parsePartitionValues(partitionNames, fileSlice.getPartitionPath()), + incrementalRelation.getEndTs())).collect(Collectors.toList()); + } - List<String> columnNames = new ArrayList<>(); - List<String> columnTypes = new ArrayList<>(); - for (Schema.Field hudiField : hudiSchema.getFields()) { - columnNames.add(hudiField.name().toLowerCase(Locale.ROOT)); - String columnType = HudiUtils.fromAvroHudiTypeToHiveTypeString(hudiField.schema()); - columnTypes.add(columnType); + @Override + public List<Split> getSplits() throws UserException { + if (incrementalRead && !incrementalRelation.fallbackFullTableScan()) { + return getIncrementalSplits(); } HoodieTimeline timeline = hudiClient.getCommitsAndCompactionTimeline().filterCompletedInstants(); @@ -300,33 +360,9 @@ public class HudiScanNode extends HiveScanNode { new String[0], partition.getPartitionValues())); }); } else { - fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant).forEach(fileSlice -> { - Optional<HoodieBaseFile> baseFile = fileSlice.getBaseFile().toJavaOptional(); - String filePath = baseFile.map(BaseFile::getPath).orElse(""); - long fileSize = baseFile.map(BaseFile::getFileSize).orElse(0L); - - List<String> logs = fileSlice.getLogFiles().map(HoodieLogFile::getPath) - .map(Path::toString) - .collect(Collectors.toList()); - if (logs.isEmpty()) { - noLogsSplitNum.incrementAndGet(); - } - - // no base file, use log file to parse file type - String agencyPath = filePath.isEmpty() ? 
logs.get(0) : filePath; - HudiSplit split = new HudiSplit(new Path(agencyPath), 0, fileSize, fileSize, - new String[0], partition.getPartitionValues()); - split.setTableFormatType(TableFormatType.HUDI); - split.setDataFilePath(filePath); - split.setHudiDeltaLogs(logs); - split.setInputFormat(inputFormat); - split.setSerde(serdeLib); - split.setBasePath(basePath); - split.setHudiColumnNames(columnNames); - split.setHudiColumnTypes(columnTypes); - split.setInstantTime(queryInstant); - splits.add(split); - }); + fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant) + .forEach(fileSlice -> splits.add( + generateHudiSplit(fileSlice, partition.getPartitionValues(), queryInstant))); } countDownLatch.countDown(); })); @@ -338,6 +374,35 @@ public class HudiScanNode extends HiveScanNode { return splits; } + private HudiSplit generateHudiSplit(FileSlice fileSlice, List<String> partitionValues, String queryInstant) { + Optional<HoodieBaseFile> baseFile = fileSlice.getBaseFile().toJavaOptional(); + String filePath = baseFile.map(BaseFile::getPath).orElse(""); + long fileSize = baseFile.map(BaseFile::getFileSize).orElse(0L); + fileSlice.getPartitionPath(); + + List<String> logs = fileSlice.getLogFiles().map(HoodieLogFile::getPath) + .map(Path::toString) + .collect(Collectors.toList()); + if (logs.isEmpty()) { + noLogsSplitNum.incrementAndGet(); + } + + // no base file, use log file to parse file type + String agencyPath = filePath.isEmpty() ? logs.get(0) : filePath; + HudiSplit split = new HudiSplit(new Path(agencyPath), 0, fileSize, fileSize, + new String[0], partitionValues); + split.setTableFormatType(TableFormatType.HUDI); + split.setDataFilePath(filePath); + split.setHudiDeltaLogs(logs); + split.setInputFormat(inputFormat); + split.setSerde(serdeLib); + split.setBasePath(basePath); + split.setHudiColumnNames(columnNames); + split.setHudiColumnTypes(columnTypes); + split.setInstantTime(queryInstant); + return split; + } + @Override public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { return super.getNodeExplainString(prefix, detailLevel) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/IncrementalRelation.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/IncrementalRelation.java new file mode 100644 index 00000000000..4a707064fb6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/IncrementalRelation.java @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.hudi.source; + +import org.apache.doris.spi.Split; + +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.exception.HoodieException; + +import java.util.List; +import java.util.Map; + +public interface IncrementalRelation { + public static String EARLIEST_TIME = "earliest"; + + List<FileSlice> collectFileSlices() throws HoodieException; + + List<Split> collectSplits() throws HoodieException; + + Map<String, String> getHoodieParams(); + + boolean fallbackFullTableScan(); + + boolean isIncludeStartTime(); + + String getStartTs(); + + String getEndTs(); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java new file mode 100644 index 00000000000..c06fcc2a578 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java @@ -0,0 +1,217 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.hudi.source; + +import org.apache.doris.spi.Split; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.GlobPattern; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.model.BaseFile; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineUtils; +import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class MORIncrementalRelation implements IncrementalRelation { + private final Map<String, String> optParams; + private final HoodieTableMetaClient metaClient; + private final HoodieTimeline timeline; + private final HollowCommitHandling hollowCommitHandling; + private String startTimestamp; + private final String endTimestamp; + private final boolean startInstantArchived; + private final boolean endInstantArchived; + private final List<HoodieInstant> includedCommits; + private final List<HoodieCommitMetadata> commitsMetadata; + private final FileStatus[] affectedFilesInCommits; + private final boolean fullTableScan; + private final String globPattern; + private final boolean includeStartTime; + private final String startTs; + private final String endTs; + + + public MORIncrementalRelation(Map<String, String> optParams, Configuration configuration, + HoodieTableMetaClient metaClient) + throws HoodieException, IOException { + this.optParams = optParams; + this.metaClient = metaClient; + timeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants(); + if (timeline.empty()) { + throw new HoodieException("No instants to incrementally pull"); + } + if (!metaClient.getTableConfig().populateMetaFields()) { + throw new HoodieException("Incremental queries are not supported when meta fields are disabled"); + } + hollowCommitHandling = HollowCommitHandling.valueOf( + optParams.getOrDefault("hoodie.read.timeline.holes.resolution.policy", "FAIL")); + + startTimestamp = optParams.get("hoodie.datasource.read.begin.instanttime"); + if (startTimestamp == null) { + throw new HoodieException("Specify the begin instant time to pull from using " + + "option hoodie.datasource.read.begin.instanttime"); + } + if (EARLIEST_TIME.equals(startTimestamp)) { + startTimestamp = "000"; + } + endTimestamp = optParams.getOrDefault("hoodie.datasource.read.end.instanttime", + hollowCommitHandling == HollowCommitHandling.USE_TRANSITION_TIME + ? 
timeline.lastInstant().get().getStateTransitionTime() + : timeline.lastInstant().get().getTimestamp()); + + startInstantArchived = timeline.isBeforeTimelineStarts(startTimestamp); + endInstantArchived = timeline.isBeforeTimelineStarts(endTimestamp); + + includedCommits = getIncludedCommits(); + commitsMetadata = getCommitsMetadata(); + affectedFilesInCommits = HoodieInputFormatUtils.listAffectedFilesForCommits(configuration, + new Path(metaClient.getBasePath()), commitsMetadata); + fullTableScan = shouldFullTableScan(); + if (hollowCommitHandling == HollowCommitHandling.USE_TRANSITION_TIME && fullTableScan) { + throw new HoodieException("Cannot use stateTransitionTime while enables full table scan"); + } + globPattern = optParams.getOrDefault("hoodie.datasource.read.incr.path.glob", ""); + + if (startInstantArchived) { + includeStartTime = false; + startTs = startTimestamp; + } else { + includeStartTime = true; + startTs = includedCommits.isEmpty() ? startTimestamp : includedCommits.get(0).getTimestamp(); + } + endTs = endInstantArchived || includedCommits.isEmpty() ? endTimestamp + : includedCommits.get(includedCommits.size() - 1).getTimestamp(); + } + + @Override + public Map<String, String> getHoodieParams() { + optParams.put("hoodie.datasource.read.incr.operation", "true"); + optParams.put("hoodie.datasource.read.begin.instanttime", startTs); + optParams.put("hoodie.datasource.read.end.instanttime", endTs); + optParams.put("hoodie.datasource.read.incr.includeStartTime", includeStartTime ? "true" : "false"); + return optParams; + } + + private List<HoodieInstant> getIncludedCommits() { + if (!startInstantArchived || !endInstantArchived) { + // If endTimestamp commit is not archived, will filter instants + // before endTimestamp. + if (hollowCommitHandling == HollowCommitHandling.USE_TRANSITION_TIME) { + return timeline.findInstantsInRangeByStateTransitionTime(startTimestamp, endTimestamp).getInstants(); + } else { + return timeline.findInstantsInRange(startTimestamp, endTimestamp).getInstants(); + } + } else { + return timeline.getInstants(); + } + } + + private List<HoodieCommitMetadata> getCommitsMetadata() throws IOException { + List<HoodieCommitMetadata> result = new ArrayList<>(); + for (HoodieInstant commit : includedCommits) { + result.add(TimelineUtils.getCommitMetadata(commit, timeline)); + } + return result; + } + + private boolean shouldFullTableScan() throws IOException { + boolean should = Boolean.parseBoolean( + optParams.getOrDefault("hoodie.datasource.read.incr.fallback.fulltablescan.enable", "false")) && ( + startInstantArchived || endInstantArchived); + if (should) { + return true; + } + for (FileStatus fileStatus : affectedFilesInCommits) { + if (!metaClient.getFs().exists(fileStatus.getPath())) { + return true; + } + } + return false; + } + + @Override + public boolean fallbackFullTableScan() { + return fullTableScan; + } + + @Override + public boolean isIncludeStartTime() { + return includeStartTime; + } + + @Override + public String getStartTs() { + return startTs; + } + + @Override + public String getEndTs() { + return endTs; + } + + @Override + public List<FileSlice> collectFileSlices() throws HoodieException { + if (includedCommits.isEmpty()) { + return Collections.emptyList(); + } else if (fullTableScan) { + throw new HoodieException("Fallback to full table scan"); + } + HoodieTimeline scanTimeline; + if (hollowCommitHandling == HollowCommitHandling.USE_TRANSITION_TIME) { + scanTimeline = metaClient.getCommitsAndCompactionTimeline() + 
.findInstantsInRangeByStateTransitionTime(startTimestamp, endTimestamp); + } else { + scanTimeline = TimelineUtils.handleHollowCommitIfNeeded( + metaClient.getCommitsAndCompactionTimeline(), metaClient, hollowCommitHandling) + .findInstantsInRange(startTimestamp, endTimestamp); + } + String latestCommit = includedCommits.get(includedCommits.size() - 1).getTimestamp(); + HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, scanTimeline, + affectedFilesInCommits); + Stream<FileSlice> fileSlices = HoodieInputFormatUtils.getWritePartitionPaths(commitsMetadata) + .stream().flatMap(relativePartitionPath -> + fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit)); + if ("".equals(globPattern)) { + return fileSlices.collect(Collectors.toList()); + } + GlobPattern globMatcher = new GlobPattern("*" + globPattern); + return fileSlices.filter(fileSlice -> globMatcher.matches(fileSlice.getBaseFile().map(BaseFile::getPath) + .or(fileSlice.getLatestLogFile().map(f -> f.getPath().toString())).get())).collect(Collectors.toList()); + } + + @Override + public List<Split> collectSplits() throws HoodieException { + throw new UnsupportedOperationException(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundRelation.java index 74f85e31651..4514ea05bfb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundRelation.java @@ -17,6 +17,7 @@ package org.apache.doris.nereids.analyzer; +import org.apache.doris.analysis.TableScanParams; import org.apache.doris.nereids.exceptions.UnboundException; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.LogicalProperties; @@ -53,21 +54,36 @@ public class UnboundRelation extends LogicalRelation implements Unbound, BlockFu private final List<String> hints; private final Optional<TableSample> tableSample; private final Optional<String> indexName; + private TableScanParams scanParams; public UnboundRelation(RelationId id, List<String> nameParts) { this(id, nameParts, Optional.empty(), Optional.empty(), ImmutableList.of(), false, ImmutableList.of(), - ImmutableList.of(), Optional.empty(), Optional.empty()); + ImmutableList.of(), Optional.empty(), Optional.empty(), null); } public UnboundRelation(RelationId id, List<String> nameParts, List<String> partNames, boolean isTempPart) { this(id, nameParts, Optional.empty(), Optional.empty(), partNames, isTempPart, ImmutableList.of(), - ImmutableList.of(), Optional.empty(), Optional.empty()); + ImmutableList.of(), Optional.empty(), Optional.empty(), null); } public UnboundRelation(RelationId id, List<String> nameParts, List<String> partNames, boolean isTempPart, List<Long> tabletIds, List<String> hints, Optional<TableSample> tableSample, Optional<String> indexName) { this(id, nameParts, Optional.empty(), Optional.empty(), - partNames, isTempPart, tabletIds, hints, tableSample, indexName); + partNames, isTempPart, tabletIds, hints, tableSample, indexName, null); + } + + public UnboundRelation(RelationId id, List<String> nameParts, List<String> partNames, boolean isTempPart, + List<Long> tabletIds, List<String> hints, Optional<TableSample> tableSample, Optional<String> indexName, + TableScanParams scanParams) { + this(id, nameParts, Optional.empty(), Optional.empty(), + partNames, isTempPart, tabletIds, hints, tableSample, 
indexName, scanParams); + } + + public UnboundRelation(RelationId id, List<String> nameParts, Optional<GroupExpression> groupExpression, + Optional<LogicalProperties> logicalProperties, List<String> partNames, boolean isTempPart, + List<Long> tabletIds, List<String> hints, Optional<TableSample> tableSample, Optional<String> indexName) { + this(id, nameParts, groupExpression, logicalProperties, partNames, + isTempPart, tabletIds, hints, tableSample, indexName, null); } /** @@ -75,7 +91,8 @@ public class UnboundRelation extends LogicalRelation implements Unbound, BlockFu */ public UnboundRelation(RelationId id, List<String> nameParts, Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, List<String> partNames, boolean isTempPart, - List<Long> tabletIds, List<String> hints, Optional<TableSample> tableSample, Optional<String> indexName) { + List<Long> tabletIds, List<String> hints, Optional<TableSample> tableSample, Optional<String> indexName, + TableScanParams scanParams) { super(id, PlanType.LOGICAL_UNBOUND_RELATION, groupExpression, logicalProperties); this.nameParts = ImmutableList.copyOf(Objects.requireNonNull(nameParts, "nameParts should not null")); this.partNames = ImmutableList.copyOf(Objects.requireNonNull(partNames, "partNames should not null")); @@ -84,6 +101,7 @@ public class UnboundRelation extends LogicalRelation implements Unbound, BlockFu this.hints = ImmutableList.copyOf(Objects.requireNonNull(hints, "hints should not be null.")); this.tableSample = tableSample; this.indexName = indexName; + this.scanParams = scanParams; } public List<String> getNameParts() { @@ -104,14 +122,14 @@ public class UnboundRelation extends LogicalRelation implements Unbound, BlockFu public Plan withGroupExpression(Optional<GroupExpression> groupExpression) { return new UnboundRelation(relationId, nameParts, groupExpression, Optional.of(getLogicalProperties()), - partNames, isTempPart, tabletIds, hints, tableSample, indexName); + partNames, isTempPart, tabletIds, hints, tableSample, indexName, null); } @Override public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, List<Plan> children) { return new UnboundRelation(relationId, nameParts, groupExpression, logicalProperties, partNames, - isTempPart, tabletIds, hints, tableSample, indexName); + isTempPart, tabletIds, hints, tableSample, indexName, null); } @Override @@ -165,4 +183,8 @@ public class UnboundRelation extends LogicalRelation implements Unbound, BlockFu public Optional<TableSample> getTableSample() { return tableSample; } + + public TableScanParams getScanParams() { + return scanParams; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 4fe38f409b0..211583f4832 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -21,6 +21,7 @@ import org.apache.doris.analysis.ArithmeticExpr.Operator; import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.StorageBackend; import org.apache.doris.analysis.TableName; +import org.apache.doris.analysis.TableScanParams; import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.AggregateType; import org.apache.doris.catalog.BuiltinAggregateFunctions; @@ -1308,11 +1309,17 @@ public class 
LogicalPlanBuilder extends DorisParserBaseVisitor<Object> { relationHints = ImmutableList.of(); } + TableScanParams scanParams = null; + if (ctx.optScanParams() != null) { + Map<String, String> map = visitPropertyItemList(ctx.optScanParams().properties); + scanParams = new TableScanParams(ctx.optScanParams().funcName.getText(), map); + } + TableSample tableSample = ctx.sample() == null ? null : (TableSample) visit(ctx.sample()); LogicalPlan checkedRelation = LogicalPlanBuilderAssistant.withCheckPolicy( new UnboundRelation(StatementScopeIdGenerator.newRelationId(), tableId, partitionNames, isTempPart, tabletIdLists, relationHints, - Optional.ofNullable(tableSample), indexName)); + Optional.ofNullable(tableSample), indexName, scanParams)); LogicalPlan plan = withTableAlias(checkedRelation, ctx.tableAlias()); for (LateralViewContext lateralViewContext : ctx.lateralView()) { plan = withGenerate(plan, lateralViewContext); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java index d74c487d296..e41c4283d60 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java @@ -249,12 +249,14 @@ public class BindRelation extends OneAnalysisRuleFactory { LogicalView<Plan> logicalView = new LogicalView<>(view, viewBody); return new LogicalSubQueryAlias<>(tableQualifier, logicalView); case HMS_EXTERNAL_TABLE: - if (Config.enable_query_hive_views && ((HMSExternalTable) table).isView()) { - String hiveCatalog = ((HMSExternalTable) table).getCatalog().getName(); - String ddlSql = ((HMSExternalTable) table).getViewText(); + HMSExternalTable hmsTable = (HMSExternalTable) table; + if (Config.enable_query_hive_views && hmsTable.isView()) { + String hiveCatalog = hmsTable.getCatalog().getName(); + String ddlSql = hmsTable.getViewText(); Plan hiveViewPlan = parseAndAnalyzeHiveView(hiveCatalog, ddlSql, cascadesContext); return new LogicalSubQueryAlias<>(tableQualifier, hiveViewPlan); } + hmsTable.setScanParams(unboundRelation.getScanParams()); return new LogicalFileScan(unboundRelation.getRelationId(), (HMSExternalTable) table, tableQualifier, unboundRelation.getTableSample()); case ICEBERG_EXTERNAL_TABLE: diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CheckPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CheckPolicy.java index 24f8396cf21..aa2917ae1e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CheckPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CheckPolicy.java @@ -17,20 +17,22 @@ package org.apache.doris.nereids.rules.analysis; +import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.nereids.analyzer.UnboundRelation; import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy; +import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; import org.apache.doris.nereids.trees.plans.logical.LogicalRelation; import org.apache.doris.nereids.util.ExpressionUtils; import com.google.common.collect.ImmutableList; +import 
java.util.HashSet; import java.util.List; -import java.util.Optional; import java.util.Set; /** @@ -58,11 +60,26 @@ public class CheckPolicy implements AnalysisRuleFactory { return ctx.root.child(); } LogicalRelation relation = (LogicalRelation) child; - Optional<Expression> filter = checkPolicy.getFilter(relation, ctx.connectContext); - if (!filter.isPresent()) { + Set<Expression> combineFilter = new HashSet<>(); + + // replace incremental params as AND expression + if (relation instanceof LogicalFileScan) { + LogicalFileScan fileScan = (LogicalFileScan) relation; + if (fileScan.getTable() instanceof HMSExternalTable) { + HMSExternalTable hmsTable = (HMSExternalTable) fileScan.getTable(); + combineFilter.addAll(hmsTable.generateIncrementalExpression( + fileScan.getLogicalProperties().getOutput())); + } + } + + // row policy + checkPolicy.getFilter(relation, ctx.connectContext) + .ifPresent(expression -> combineFilter.addAll( + ExpressionUtils.extractConjunctionToSet(expression))); + + if (combineFilter.isEmpty()) { return ctx.root.child(); } - Set<Expression> combineFilter = ExpressionUtils.extractConjunctionToSet(filter.get()); if (upperFilter != null) { combineFilter.addAll(upperFilter.getConjuncts()); } diff --git a/regression-test/data/external_table_p2/hive/test_hive_hudi.out b/regression-test/data/external_table_p2/hive/test_hive_hudi.out index 9202749e617..d217a289d8d 100644 --- a/regression-test/data/external_table_p2/hive/test_hive_hudi.out +++ b/regression-test/data/external_table_p2/hive/test_hive_hudi.out @@ -135,6 +135,18 @@ row_4 2021-02-01 4 v_4 20240221111000868 20240221111000868_0_8 e3cf430c-889d-4015-bc98-59bdce1e530c sao_paulo c97347e9-033a-4c19-a033-94ac1de9f892 1695516137016 e3cf430c-889d-4015-bc98-59bdce1e530c rider-F driver-P 34.15 sao_paulo 20240221111000868 20240221111000868_0_4 e96c4396-3fad-413a-a942-4cb36106d721 san_francisco 3efcaa94-3e58-436a-b489-1232731ed088 1695091554788 e96c4396-3fad-413a-a942-4cb36106d721 rider-C driver-M 27.7 san_francisco +-- !incr_mor_table -- +20240312163737521 20240312163737521_0_1 20240312163737521_1_0 pid=p2 92e677af-6487-4213-b42f-ee56c5a2acdc-0_0-86-568_20240312163737521.parquet 4 k4 4.992 p2 +20240312163737521 20240312163737521_1_0 20240312163737521_0_0 pid=p3 22af1878-d8e0-4829-b4af-c7c9693d33f3-0_1-86-569_20240312163737521.parquet 5 k5 5.5 p3 +20240312163613712 20240312163613712_0_1 20240312163541346_0_0 pid=p1 ead436a2-f99b-4d35-8b3e-9d67e4828dd5-0_0-99-636_20240312163738946.parquet 1 k1 1.134 p1 + +-- !inc_cow_table -- +20240312164834145 20240312164834145_0_0 20240312164538551_2_0 pid=p2 b8c2db38-e6b8-41f4-8132-620106e8c7e9-0_0-222-1414_20240312164938557.parquet 3 k3-3 3.77 p2 +20240312164938557 20240312164938557_0_1 20240312164938557_1_0 pid=p2 b8c2db38-e6b8-41f4-8132-620106e8c7e9-0_0-222-1414_20240312164938557.parquet 4 k4 4.992 p2 +20240312164938557 20240312164938557_1_0 20240312164938557_0_0 pid=p3 1335e747-d611-4575-8612-15e491224a0e-0_1-222-1415_20240312164938557.parquet 5 k5 5.5 p3 +20240312164613448 20240312164613448_0_0 20240312164538551_0_0 pid=p1 372f896b-10cb-4803-b109-d467189326b8-0_0-166-1042_20240312164650751.parquet 1 k1 1.37 p1 +20240312164650751 20240312164650751_0_1 20240312164538551_1_0 pid=p1 372f896b-10cb-4803-b109-d467189326b8-0_0-166-1042_20240312164650751.parquet 2 k2 2.2 p1 + -- !skip_merge -- 20230605145009209 20230605145009209_0_0 rowId:row_1 partitionId=2021-01-01/versionId=v_0 65ffc5d9-397a-456e-a735-30f3ad37466f-0_0-33-96_20230605145009209.parquet row_1 2021-01-01 0 bob v_0 toBeDel0 0 1000000 
20230605145403388 20230605145403388_2_0 rowId:row_1 partitionId=2011-11-11/versionId=v_1 dbff8acb-42bc-400c-be33-47d9e0bae9b7-0_2-83-222_20230605145403388.parquet row_1 2011-11-11 1 bob v_1 toBeDel1 0 1000001 diff --git a/regression-test/suites/external_table_p2/hive/test_hive_hudi.groovy b/regression-test/suites/external_table_p2/hive/test_hive_hudi.groovy index d852e604df5..75c22b26cd7 100644 --- a/regression-test/suites/external_table_p2/hive/test_hive_hudi.groovy +++ b/regression-test/suites/external_table_p2/hive/test_hive_hudi.groovy @@ -48,6 +48,12 @@ suite("test_hive_hudi", "p2,external,hive,hudi") { // hudi table created by flink hudi catalog qt_flink_hudi_catalog """select * from hudi_ctl_table order by uuid""" + // incremental read for MOR table + qt_incr_mor_table """select * from incr_mor_partition@incr('beginTime'='20240312163541346')""" + + // incremental read for COW table + qt_inc_cow_table """select * from incr_cow_partition@incr('beginTime'='20240312164538551')""" + + // skip logs sql """drop catalog if exists ${catalog_name};""" sql """
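The regression test above exercises the new syntax end to end (select * from <table>@incr('beginTime'='<instant>')). For orientation, here is a minimal, hypothetical sketch of driving the new MORIncrementalRelation from this patch directly, outside HudiScanNode; the class name, base path, and instant times are placeholders, and it assumes a reachable Hudi MOR table with meta fields populated. For MOR tables collectSplits() is intentionally unsupported: the scan node consumes collectFileSlices() and builds HudiSplits via generateHudiSplit(), while COW / read-optimized tables use the relation's collectSplits() path directly. The option keys mirror those read in the constructor ("earliest" maps to instant 000, and an unset end instant defaults to the last completed instant on the timeline).

// Minimal sketch only; not part of this patch. Base path and instants are placeholders.
import org.apache.doris.datasource.hudi.source.IncrementalRelation;
import org.apache.doris.datasource.hudi.source.MORIncrementalRelation;

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.table.HoodieTableMetaClient;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IncrementalReadSketch {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Hudi 0.x-style meta client builder; the table location is a placeholder.
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
                .setConf(conf)
                .setBasePath("hdfs://ns1/warehouse/incr_mor_partition")
                .build();

        // Same Hoodie keys that MORIncrementalRelation's constructor reads.
        Map<String, String> optParams = new HashMap<>();
        optParams.put("hoodie.datasource.read.begin.instanttime", "20240312163541346"); // placeholder instant
        optParams.put("hoodie.datasource.read.end.instanttime", "20240312163737521");   // optional, placeholder

        IncrementalRelation relation = new MORIncrementalRelation(optParams, conf, metaClient);
        // File slices touched between the begin and end instants;
        // HudiScanNode turns each of these into a HudiSplit.
        List<FileSlice> slices = relation.collectFileSlices();
        System.out.println("incremental file slices: " + slices.size());
    }
}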
