This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit ec43f65235a7b247302f3a2ce959b68a48b1ec7f
Author: Ashin Gau <[email protected]>
AuthorDate: Thu Mar 14 18:30:18 2024 +0800

    [feature](hudi) support hudi incremental read (#32052)
    
    * [feature](hudi) support incremental read for hudi table
    
    * fix jdk17 java options
---
 conf/be.conf                                       |   2 +-
 docs/en/docs/lakehouse/multi-catalog/hudi.md       |  33 ++-
 docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md    |  37 ++-
 .../java/org/apache/doris/hudi/HudiJniScanner.java |  12 +-
 .../org/apache/doris/hudi/BaseSplitReader.scala    |   8 +-
 .../apache/doris/hudi/HoodieRecordIterator.scala   |  67 ++++--
 .../doris/hudi/MORIncrementalSplitReader.scala     |  86 +++++++
 .../apache/doris/hudi/MORSnapshotSplitReader.scala |   2 +-
 .../antlr4/org/apache/doris/nereids/DorisParser.g4 |   6 +-
 fe/fe-core/src/main/cup/sql_parser.cup             |  16 +-
 .../java/org/apache/doris/analysis/TableRef.java   |  14 ++
 .../org/apache/doris/analysis/TableScanParams.java |  46 ++++
 .../doris/datasource/hive/HMSExternalTable.java    | 100 +++++++-
 .../hudi/source/COWIncrementalRelation.java        | 254 +++++++++++++++++++++
 .../doris/datasource/hudi/source/HudiScanNode.java | 175 +++++++++-----
 .../hudi/source/IncrementalRelation.java           |  44 ++++
 .../hudi/source/MORIncrementalRelation.java        | 217 ++++++++++++++++++
 .../doris/nereids/analyzer/UnboundRelation.java    |  34 ++-
 .../doris/nereids/parser/LogicalPlanBuilder.java   |   9 +-
 .../doris/nereids/rules/analysis/BindRelation.java |   8 +-
 .../doris/nereids/rules/analysis/CheckPolicy.java  |  25 +-
 .../data/external_table_p2/hive/test_hive_hudi.out |  12 +
 .../external_table_p2/hive/test_hive_hudi.groovy   |   6 +
 23 files changed, 1095 insertions(+), 118 deletions(-)

diff --git a/conf/be.conf b/conf/be.conf
index 6ceec00a421..4d8677aece7 100644
--- a/conf/be.conf
+++ b/conf/be.conf
@@ -95,4 +95,4 @@ ssl_private_key_path = "$DORIS_HOME/conf/key.pem"
 # Default to turn off aws sdk log, because aws sdk errors that need to be cared will be output through Doris logs
 aws_log_level=0
 ## If you are not running in aws cloud, you can disable EC2 metadata
-AWS_EC2_METADATA_DISABLED=true
\ No newline at end of file
+AWS_EC2_METADATA_DISABLED=true
diff --git a/docs/en/docs/lakehouse/multi-catalog/hudi.md b/docs/en/docs/lakehouse/multi-catalog/hudi.md
index a52c2370ced..3a5420319b0 100644
--- a/docs/en/docs/lakehouse/multi-catalog/hudi.md
+++ b/docs/en/docs/lakehouse/multi-catalog/hudi.md
@@ -33,8 +33,8 @@ under the License.
 
 |  Table Type   | Supported Query types  |
 |  ----  | ----  |
-| Copy On Write  | Snapshot Query + Time Travel |
-| Merge On Read  | Snapshot Queries + Read Optimized Queries + Time Travel |
+| Copy On Write  | Snapshot Query, Time Travel, Incremental Read |
+| Merge On Read  | Snapshot Queries, Read Optimized Queries, Time Travel, Incremental Read |
 
 2. Doris supports Hive Metastore(Including catalogs compatible with Hive MetaStore, like [AWS Glue](./hive.md)/[Alibaba DLF](./dlf.md)) Catalogs.
 
@@ -94,16 +94,29 @@ Users can view the performance of Java SDK through [profile](../../admin-manual/ht
 
 ## Time Travel
 
-Supports reading snapshots specified in Hudi table.
-
-Every write operation to the Hudi table will generate a new snapshot.
-
-By default, query requests will only read the latest version of the snapshot.
+Every write operation to a Hudi table generates a new snapshot. [Time Travel](https://hudi.apache.org/docs/0.14.0/quick-start-guide/#timetravel) supports reading a specified snapshot of a Hudi table. By default, query requests only read the latest version of the snapshot.
 
 You can use the `FOR TIME AS OF` statement, based on the time of the snapshot to read historical version data. Examples are as follows:
+```
+SELECT * FROM hudi_tbl FOR TIME AS OF "2022-10-07 17:20:37";
+SELECT * FROM hudi_tbl FOR TIME AS OF "20221007172037";
+SELECT * FROM hudi_tbl FOR TIME AS OF "2022-10-07";
+```
+Hudi tables do not support the `FOR VERSION AS OF` statement. Using this syntax to query a Hudi table will throw an error.
 
-`SELECT * FROM hudi_tbl FOR TIME AS OF "2022-10-07 17:20:37";`
+## Incremental Read
+Incremental Read obtains a set of records that changed between a start and end commit time, providing you with the "latest state" for each such record as of the end commit time.
 
-`SELECT * FROM hudi_tbl FOR TIME AS OF "20221007172037";`
+Doris uses `@incr` syntax to support Incremental Read:
+```
+SELECT * from hudi_table@incr('beginTime'='xxx', ['endTime'='xxx'], ['hoodie.read.timeline.holes.resolution.policy'='FAIL'], ...);
+```
+`beginTime` is required; its format is consistent with [hudi_table_changes](https://hudi.apache.org/docs/0.14.0/quick-start-guide/#incremental-query), and it also supports "earliest". `endTime` is optional and defaults to the latest commit time. The remaining optional parameters can be any of the [Spark Read Options](https://hudi.apache.org/docs/0.14.0/configurations#Read-Options).
 
-Hudi table does not support the `FOR VERSION AS OF` statement. Using this 
syntax to query the Hudi table will throw an error.
+Incremental Read requires the Nereids Planner to be enabled. Doris translates `@incr` into `predicates` and pushes them down to `VHUDI_SCAN_NODE`:
+```
+|   0:VHUDI_SCAN_NODE(113)                                                                                             |
+|      table: lineitem_mor                                                                                             |
+|      predicates: (_hoodie_commit_time[#0] >= '20240311151019723'), (_hoodie_commit_time[#0] <= '20240311151606605') |
+|      inputSplitNum=1, totalFileSize=13099711, scanRanges=1                                                           |
+```
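
For reference, a concrete instance of the incremental read documented above could look like the sketch below. The table name is a placeholder and the commit timestamps simply reuse the values from the sample plan; the second statement relies on the documented `'earliest'` value and the default end time (latest commit).
```
SELECT * FROM hudi_table@incr('beginTime'='20240311151019723', 'endTime'='20240311151606605');
SELECT * FROM hudi_table@incr('beginTime'='earliest');
```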
diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md b/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md
index 38bb26d3bc7..b7f2776e38d 100644
--- a/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md
+++ b/docs/zh-CN/docs/lakehouse/multi-catalog/hudi.md
@@ -29,12 +29,12 @@ under the License.
 
 ## 使用限制
 
-1. Hudi 表支持的查询类型如下,后续将支持 Incremental Query。
+1. Hudi 表支持的查询类型如下,后续将支持 CDC。
 
 |  表类型   | 支持的查询类型  |
 |  ----  | ----  |
-| Copy On Write  | Snapshot Query + Time Travel |
-| Merge On Read  | Snapshot Queries + Read Optimized Queries + Time Travel |
+| Copy On Write  | Snapshot Query, Time Travel, Incremental Read |
+| Merge On Read  | Snapshot Queries, Read Optimized Queries, Time Travel, Incremental Read |
 
 2. 目前支持 Hive Metastore 和兼容 Hive Metastore 类型(例如[AWS Glue](./hive.md)/[Alibaba DLF](./dlf.md))的 Catalog。
 
@@ -96,16 +96,29 @@ Doris 使用 parquet native reader 读取 COW 表的数据文件,使用 Java S
 
 ## Time Travel
 
-支持读取 Hudi 表指定的 Snapshot。
+每一次对 Hudi 表的写操作都会产生一个新的快照,Time Travel 支持读取 Hudi 表指定的 Snapshot。默认情况下,查询请求只会读取最新版本的快照。
 
-每一次对 Hudi 表的写操作都会产生一个新的快照。
-
-默认情况下,查询请求只会读取最新版本的快照。
-
-可以使用 `FOR TIME AS OF` 语句,根据快照的时间([时间格式](https://hudi.apache.org/docs/quick-start-guide#time-travel-query)和Hudi官网保持一致)读取历史版本的数据。示例如下:
+可以使用 `FOR TIME AS OF` 语句,根据快照的时间([时间格式](https://hudi.apache.org/docs/0.14.0/quick-start-guide/#timetravel)和Hudi官网保持一致)读取历史版本的数据。示例如下:
+```
+SELECT * FROM hudi_tbl FOR TIME AS OF "2022-10-07 17:20:37";
+SELECT * FROM hudi_tbl FOR TIME AS OF "20221007172037";
+SELECT * FROM hudi_tbl FOR TIME AS OF "2022-10-07";
+```
+Hudi 表不支持 `FOR VERSION AS OF` 语句,使用该语法查询 Hudi 表将抛错。
 
-`SELECT * FROM hudi_tbl FOR TIME AS OF "2022-10-07 17:20:37";`
+## Incremental Read
+Incremental Read 可以查询在 startTime 和 endTime 之间变化的数据,返回的结果集是数据在 endTime 的最终状态。
 
-`SELECT * FROM hudi_tbl FOR TIME AS OF "20221007172037";`
+Doris 提供了 `@incr` 语法支持 Incremental Read:
+```
+SELECT * from hudi_table@incr('beginTime'='xxx', ['endTime'='xxx'], ['hoodie.read.timeline.holes.resolution.policy'='FAIL'], ...);
+```
+`beginTime` 是必须的,时间格式和 hudi 官网 [hudi_table_changes](https://hudi.apache.org/docs/0.14.0/quick-start-guide/#incremental-query) 保持一致,支持 "earliest"。`endTime` 选填,默认最新commitTime。兼容 [Spark Read Options](https://hudi.apache.org/docs/0.14.0/configurations#Read-Options)。
 
-Hudi 表不支持 `FOR VERSION AS OF` 语句,使用该语法查询 Hudi 表将抛错。
+支持 Incremental Read 需要开启[新优化器](../../query-acceleration/nereids.md),新优化器默认打开。通过 `desc` 查看执行计划,可以发现 Doris 将 `@incr` 转化为 `predicates` 下推给 `VHUDI_SCAN_NODE`:
+```
+|   0:VHUDI_SCAN_NODE(113)                                                                                             |
+|      table: lineitem_mor                                                                                             |
+|      predicates: (_hoodie_commit_time[#0] >= '20240311151019723'), (_hoodie_commit_time[#0] <= '20240311151606605') |
+|      inputSplitNum=1, totalFileSize=13099711, scanRanges=1                                                           |
+```
diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
index 0a1b69fcfb0..a284c7adcdd 100644
--- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
+++ b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
@@ -160,9 +160,15 @@ public class HudiJniScanner extends JniScanner {
             cleanResolverLock.readLock().lock();
             try {
                 lastUpdateTime.set(System.currentTimeMillis());
-                recordIterator = HadoopUGI.ugiDoAs(
-                        AuthenticationConfig.getKerberosConfig(split.hadoopConf()), () -> new MORSnapshotSplitReader(
-                                split).buildScanIterator(new Filter[0]));
+                if (split.incrementalRead()) {
+                    recordIterator = HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(
+                                    split.hadoopConf()),
+                            () -> new MORIncrementalSplitReader(split).buildScanIterator(new Filter[0]));
+                } else {
+                    recordIterator = HadoopUGI.ugiDoAs(AuthenticationConfig.getKerberosConfig(
+                                    split.hadoopConf()),
+                            () -> new MORSnapshotSplitReader(split).buildScanIterator(new Filter[0]));
+                }
                 if (AVRO_RESOLVER_CACHE != null && AVRO_RESOLVER_CACHE.get() != null) {
                     cachedResolvers.computeIfAbsent(Thread.currentThread().getId(),
                             threadId -> AVRO_RESOLVER_CACHE.get());
diff --git 
a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
 
b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
index a730f2cd1b2..8229064163d 100644
--- 
a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
+++ 
b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
@@ -125,6 +125,10 @@ class HoodieSplit(private val params: jutil.Map[String, String]) {
     conf
   }
 
+  def incrementalRead: Boolean = {
+    "true".equalsIgnoreCase(optParams.getOrElse("hoodie.datasource.read.incr.operation", "false"))
+  }
+
   // NOTE: In cases when Hive Metastore is used as catalog and the table is 
partitioned, schema in the HMS might contain
   //       Hive-specific partitioning columns created specifically for HMS to 
handle partitioning appropriately. In that
   //       case  we opt in to not be providing catalog's schema, and instead 
force Hudi relations to fetch the schema
@@ -169,6 +173,8 @@ abstract class BaseSplitReader(val split: HoodieSplit) {
 
   protected val tableInformation: HoodieTableInformation = cache.get(split)
 
+  protected val timeline: HoodieTimeline = tableInformation.timeline
+
   protected val sparkSession: SparkSession = tableInformation.sparkSession
   protected val sqlContext: SQLContext = sparkSession.sqlContext
   imbueConfigs(sqlContext)
@@ -578,8 +584,6 @@ abstract class BaseSplitReader(val split: HoodieSplit) {
     )
   }
 
-  protected val timeline: HoodieTimeline = tableInformation.timeline
-
   protected def embedInternalSchema(conf: Configuration, internalSchemaOpt: 
Option[InternalSchema]): Configuration = {
     val internalSchema = 
internalSchemaOpt.getOrElse(InternalSchema.getEmptyInternalSchema)
     val querySchemaString = SerDeHelper.toJson(internalSchema)
diff --git 
a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/HoodieRecordIterator.scala
 
b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/HoodieRecordIterator.scala
index 6e2b7b31e54..f393e9e1246 100644
--- 
a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/HoodieRecordIterator.scala
+++ 
b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/HoodieRecordIterator.scala
@@ -20,11 +20,14 @@ package org.apache.doris.hudi
 import org.apache.hadoop.conf.Configuration
 import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, projectReader}
 import org.apache.hudi.MergeOnReadSnapshotRelation.isProjectionCompatible
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.{DataSourceReadOptions, HoodieMergeOnReadFileSplit, 
HoodieTableSchema, HoodieTableState, LogFileIterator, RecordMergingFileIterator}
 import 
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
 import org.apache.spark.sql.catalyst.InternalRow
 
 import java.io.Closeable
+import java.util.function.Predicate
 
 /**
  * Class holding base-file readers for 3 different use-cases:
@@ -84,29 +87,61 @@ class HoodieMORRecordIterator(config: Configuration,
                               requiredSchema: HoodieTableSchema,
                               tableState: HoodieTableState,
                               mergeType: String,
-                              fileSplit: HoodieMergeOnReadFileSplit) extends 
Iterator[InternalRow] with Closeable {
+                              fileSplit: HoodieMergeOnReadFileSplit,
+                              includeStartTime: Boolean = false,
+                              startTimestamp: String = null,
+                              endTimestamp: String = null) extends 
Iterator[InternalRow] with Closeable {
   protected val maxCompactionMemoryInBytes: Long = config.getLongBytes(
     "hoodie.compaction.memory", 512 * 1024 * 1024)
 
-  protected val recordIterator: Iterator[InternalRow] = fileSplit match {
-    case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
-      val projectedReader = 
projectReader(fileReaders.requiredSchemaReaderSkipMerging, 
requiredSchema.structTypeSchema)
-      projectedReader(dataFileOnlySplit.dataFile.get)
+  protected val recordIterator: Iterator[InternalRow] = {
+    val iter = fileSplit match {
+      case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
+        val projectedReader = 
projectReader(fileReaders.requiredSchemaReaderSkipMerging, 
requiredSchema.structTypeSchema)
+        projectedReader(dataFileOnlySplit.dataFile.get)
 
-    case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
-      new LogFileIterator(logFileOnlySplit, tableSchema, requiredSchema, 
tableState, config)
+      case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
+        new LogFileIterator(logFileOnlySplit, tableSchema, requiredSchema, 
tableState, config)
 
-    case split => mergeType match {
-      case DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL =>
-        // val reader = fileReaders.requiredSchemaReaderSkipMerging
-        // new SkipMergeIterator(split, reader, tableSchema, requiredSchema, 
tableState, config)
-        throw new UnsupportedOperationException("Skip merge is optimized by 
native read")
+      case split => mergeType match {
+        case DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL =>
+          val reader = fileReaders.requiredSchemaReaderSkipMerging
+          new SkipMergeIterator(split, reader, tableSchema, requiredSchema, 
tableState, config)
 
-      case DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL =>
-        val reader = pickBaseFileReader()
-        new RecordMergingFileIterator(split, reader, tableSchema, 
requiredSchema, tableState, config)
+        case DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL =>
+          val reader = pickBaseFileReader()
+          new RecordMergingFileIterator(split, reader, tableSchema, 
requiredSchema, tableState, config)
 
-      case _ => throw new UnsupportedOperationException(s"Not supported merge 
type ($mergeType)")
+        case _ => throw new UnsupportedOperationException(s"Not supported 
merge type ($mergeType)")
+      }
+    }
+
+    val commitTimeMetadataFieldIdx = 
requiredSchema.structTypeSchema.fieldNames.indexOf(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
+    val needsFiltering = commitTimeMetadataFieldIdx >= 0 && 
!StringUtils.isNullOrEmpty(startTimestamp) && 
!StringUtils.isNullOrEmpty(endTimestamp)
+    if (needsFiltering) {
+      val filterT: Predicate[InternalRow] = 
getCommitTimeFilter(includeStartTime, commitTimeMetadataFieldIdx)
+      iter.filter(filterT.test)
+    }
+    else {
+      iter
+    }
+  }
+
+  private def getCommitTimeFilter(includeStartTime: Boolean, 
commitTimeMetadataFieldIdx: Int): Predicate[InternalRow] = {
+    if (includeStartTime) {
+      new Predicate[InternalRow] {
+        override def test(row: InternalRow): Boolean = {
+          val commitTime = row.getString(commitTimeMetadataFieldIdx)
+          commitTime >= startTimestamp && commitTime <= endTimestamp
+        }
+      }
+    } else {
+      new Predicate[InternalRow] {
+        override def test(row: InternalRow): Boolean = {
+          val commitTime = row.getString(commitTimeMetadataFieldIdx)
+          commitTime > startTimestamp && commitTime <= endTimestamp
+        }
+      }
     }
   }
 
diff --git 
a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORIncrementalSplitReader.scala
 
b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORIncrementalSplitReader.scala
new file mode 100644
index 00000000000..73c87e29034
--- /dev/null
+++ 
b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORIncrementalSplitReader.scala
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.hudi
+
+import org.apache.hudi.HoodieTableSchema
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sources._
+
+/**
+ * Reference to Apache Hudi
+ * see <a 
href="https://github.com/apache/hudi/blob/release-0.14.1/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala";>MergeOnReadIncrementalRelation</a>
+ */
+class MORIncrementalSplitReader(override val split: HoodieSplit) extends 
MORSnapshotSplitReader(split) with IncrementalSplitReaderTrait {
+
+  override protected def composeIterator(tableSchema: HoodieTableSchema,
+                                         requiredSchema: HoodieTableSchema,
+                                         requestedColumns: Array[String],
+                                         filters: Array[Filter]): 
Iterator[InternalRow] = {
+    // The only required filters are ones that make sure we're only fetching 
records that
+    // fall into incremental span of the timeline being queried
+    val requiredFilters = incrementalSpanRecordFilters
+    val optionalFilters = filters
+    val readers = createBaseFileReaders(tableSchema, requiredSchema, 
requestedColumns, requiredFilters, optionalFilters)
+
+    new HoodieMORRecordIterator(split.hadoopConf,
+      readers,
+      tableSchema,
+      requiredSchema,
+      tableState,
+      mergeType,
+      getFileSplit(),
+      includeStartTime = includeStartTime,
+      startTimestamp = startTs,
+      endTimestamp = endTs)
+  }
+
+}
+
+/**
+ * Reference to Apache Hudi
+ * see <a 
href="https://github.com/apache/hudi/blob/release-0.14.1/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala";>HoodieIncrementalRelationTrait</a>
+ */
+trait IncrementalSplitReaderTrait extends BaseSplitReader {
+  protected val includeStartTime: Boolean = 
"true".equalsIgnoreCase(optParams("hoodie.datasource.read.incr.includeStartTime"))
+  protected val startTs: String = 
optParams("hoodie.datasource.read.begin.instanttime")
+  protected val endTs: String = 
optParams("hoodie.datasource.read.end.instanttime")
+
+  // Record filters making sure that only records w/in the requested bounds 
are being fetched as part of the
+  // scan collected by this relation
+  protected lazy val incrementalSpanRecordFilters: Seq[Filter] = {
+    val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
+
+    val largerThanFilter = if (includeStartTime) {
+      GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, startTs)
+    } else {
+      GreaterThan(HoodieRecord.COMMIT_TIME_METADATA_FIELD, startTs)
+    }
+
+    val lessThanFilter = 
LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, endTs)
+
+    Seq(isNotNullFilter, largerThanFilter, lessThanFilter)
+  }
+
+  override lazy val mandatoryFields: Seq[String] = {
+    // NOTE: These columns are required for the Incremental flow to be able to handle the rows properly, even in
+    //       cases when no columns are requested to be fetched (for ex, when using {@code count()} API)
+    Seq(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++
+      preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
+  }
+}
diff --git 
a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORSnapshotSplitReader.scala
 
b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORSnapshotSplitReader.scala
index e9958b231e7..07e236082ce 100644
--- 
a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORSnapshotSplitReader.scala
+++ 
b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/MORSnapshotSplitReader.scala
@@ -74,7 +74,7 @@ class MORSnapshotSplitReader(override val split: HoodieSplit) 
extends BaseSplitR
       getFileSplit())
   }
 
-  private def getFileSplit(): HoodieMergeOnReadFileSplit = {
+  protected def getFileSplit(): HoodieMergeOnReadFileSplit = {
     val logFiles = split.deltaFilePaths.map(new HoodieLogFile(_))
       
.sorted(Ordering.comparatorToOrdering(HoodieLogFile.getLogFileComparator)).toList
     val partitionedBaseFile = if (split.dataFilePath.isEmpty) {
diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 
b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index 089e5569e8d..acc6eecb51a 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -477,8 +477,12 @@ identifierSeq
     : ident+=errorCapturingIdentifier (COMMA ident+=errorCapturingIdentifier)*
     ;
 
+optScanParams
+    : ATSIGN funcName=identifier LEFT_PAREN (properties=propertyItemList)? RIGHT_PAREN
+    ;
+
 relationPrimary
-    : multipartIdentifier materializedViewName? specifiedPartition?
+    : multipartIdentifier optScanParams? materializedViewName? specifiedPartition?
        tabletList? tableAlias sample? relationHint? lateralView*           #tableName
    | LEFT_PAREN query RIGHT_PAREN tableAlias lateralView*                 #aliasedQuery
     | tvfName=identifier LEFT_PAREN
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup 
b/fe/fe-core/src/main/cup/sql_parser.cup
index bec106761e3..f907b762407 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -839,6 +839,7 @@ nonterminal String opt_job_starts;
 nonterminal String opt_job_ends;
 nonterminal String job_at_time;
 nonterminal ColocateGroupName colocate_group_name;
+nonterminal TableScanParams opt_scan_params_ref;
 
 nonterminal LoadTask.MergeType opt_merge_type, opt_with_merge_type;
 
@@ -5762,6 +5763,17 @@ colocate_group_name ::=
     :}
     ;
 
+opt_scan_params_ref ::=
+    /* empty */
+    {:
+        RESULT = null;
+    :}
+    | AT ident:func_name LPAREN opt_key_value_map_in_paren:properties RPAREN
+    {:
+        RESULT = new TableScanParams(func_name, properties);
+    :}
+    ;
+
 encryptkey_name ::=
     ident:name
     {:
@@ -5882,9 +5894,9 @@ base_table_ref_list ::=
   ;
 
 base_table_ref ::=
-    table_name:name opt_table_snapshot:tableSnapshot opt_partition_names:partitionNames opt_tablet_list:tabletIds opt_table_alias:alias opt_table_sample:tableSample opt_common_hints:commonHints
+    table_name:name  opt_scan_params_ref:scanParams opt_table_snapshot:tableSnapshot opt_partition_names:partitionNames opt_tablet_list:tabletIds opt_table_alias:alias opt_table_sample:tableSample opt_common_hints:commonHints
     {:
-        RESULT = new TableRef(name, alias, partitionNames, tabletIds, tableSample, commonHints, tableSnapshot);
+        RESULT = new TableRef(name, alias, partitionNames, tabletIds, tableSample, commonHints, tableSnapshot, scanParams);
     :}
     ;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
index 0ade6730c0d..13821a510c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
@@ -140,6 +140,8 @@ public class TableRef implements ParseNode, Writable {
 
     private TableSnapshot tableSnapshot;
 
+    private TableScanParams scanParams;
+
     // END: Members that need to be reset()
     // ///////////////////////////////////////
 
@@ -169,6 +171,12 @@ public class TableRef implements ParseNode, Writable {
 
     public TableRef(TableName name, String alias, PartitionNames 
partitionNames, ArrayList<Long> sampleTabletIds,
                     TableSample tableSample, ArrayList<String> commonHints, 
TableSnapshot tableSnapshot) {
+        this(name, alias, partitionNames, sampleTabletIds, tableSample, 
commonHints, tableSnapshot, null);
+    }
+
+    public TableRef(TableName name, String alias, PartitionNames 
partitionNames,
+                    ArrayList<Long> sampleTabletIds, TableSample tableSample, 
ArrayList<String> commonHints,
+                    TableSnapshot tableSnapshot, TableScanParams scanParams) {
         this.name = name;
         if (alias != null) {
             if (Env.isStoredTableNamesLowerCase()) {
@@ -184,6 +192,7 @@ public class TableRef implements ParseNode, Writable {
         this.tableSample = tableSample;
         this.commonHints = commonHints;
         this.tableSnapshot = tableSnapshot;
+        this.scanParams = scanParams;
         isAnalyzed = false;
     }
 
@@ -204,6 +213,7 @@ public class TableRef implements ParseNode, Writable {
         onClause = (other.onClause != null) ? other.onClause.clone().reset() : 
null;
         partitionNames = (other.partitionNames != null) ? new 
PartitionNames(other.partitionNames) : null;
         tableSnapshot = (other.tableSnapshot != null) ? new 
TableSnapshot(other.tableSnapshot) : null;
+        scanParams = other.scanParams;
         tableSample = (other.tableSample != null) ? new 
TableSample(other.tableSample) : null;
         commonHints = other.commonHints;
 
@@ -333,6 +343,10 @@ public class TableRef implements ParseNode, Writable {
         return desc != null;
     }
 
+    public TableScanParams getScanParams() {
+        return scanParams;
+    }
+
     /**
      * This method should only be called after the TableRef has been analyzed.
      */
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableScanParams.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableScanParams.java
new file mode 100644
index 00000000000..ab1491ccc3a
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableScanParams.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+
+public class TableScanParams {
+    public static String INCREMENTAL_READ = "incr";
+
+    private final String paramType;
+    private final Map<String, String> params;
+
+    public TableScanParams(String paramType, Map<String, String> params) {
+        this.paramType = paramType;
+        this.params = params == null ? ImmutableMap.of() : 
ImmutableMap.copyOf(params);
+    }
+
+    public String getParamType() {
+        return paramType;
+    }
+
+    public Map<String, String> getParams() {
+        return params;
+    }
+
+    public boolean incrementalRead() {
+        return INCREMENTAL_READ.equals(paramType);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index 50eac67deff..a788d9e57bc 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.datasource.hive;
 
+import org.apache.doris.analysis.TableScanParams;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.ListPartitionItem;
@@ -28,12 +29,23 @@ import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.hudi.HudiUtils;
+import org.apache.doris.datasource.hudi.source.COWIncrementalRelation;
+import org.apache.doris.datasource.hudi.source.IncrementalRelation;
+import org.apache.doris.datasource.hudi.source.MORIncrementalRelation;
 import org.apache.doris.datasource.iceberg.IcebergUtils;
 import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot;
 import org.apache.doris.mtmv.MTMVRelatedTableIf;
 import org.apache.doris.mtmv.MTMVSnapshotIf;
 import org.apache.doris.mtmv.MTMVTimestampSnapshot;
 import org.apache.doris.nereids.exceptions.NotSupportedException;
+import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.GreaterThan;
+import org.apache.doris.nereids.trees.expressions.GreaterThanEqual;
+import org.apache.doris.nereids.trees.expressions.LessThanEqual;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.literal.StringLiteral;
 import org.apache.doris.statistics.AnalysisInfo;
 import org.apache.doris.statistics.BaseAnalysisTask;
 import org.apache.doris.statistics.ColumnStatistic;
@@ -45,6 +57,7 @@ import org.apache.doris.thrift.THiveTable;
 import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableType;
 
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -61,6 +74,7 @@ import 
org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -68,6 +82,7 @@ import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.time.LocalDate;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -147,6 +162,10 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
     // record the event update time when enable hms event listener
     protected volatile long eventUpdateTime;
 
+    // for hudi incremental read
+    private TableScanParams scanParams = null;
+    private IncrementalRelation incrementalRelation = null;
+
     public enum DLAType {
         UNKNOWN, HIVE, HUDI, ICEBERG
     }
@@ -224,7 +243,10 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
             return false;
         }
         String inputFormatName = remoteTable.getSd().getInputFormat();
-        return 
"org.apache.hudi.hadoop.HoodieParquetInputFormat".equals(inputFormatName);
+        Map<String, String> params = remoteTable.getParameters();
+        return 
"org.apache.hudi.hadoop.HoodieParquetInputFormat".equals(inputFormatName)
+                || 
"skip_merge".equals(getCatalogProperties().get("hoodie.datasource.merge.type"))
+                || (params != null && 
"COPY_ON_WRITE".equalsIgnoreCase(params.get("flink.table.type")));
     }
 
     /**
@@ -281,6 +303,82 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
         return partitionColumns;
     }
 
+    public TableScanParams getScanParams() {
+        return scanParams;
+    }
+
+    public void setScanParams(TableScanParams scanParams) {
+        if (scanParams != null && scanParams.incrementalRead()) {
+            Map<String, String> optParams = getHadoopProperties();
+            if (scanParams.getParams().containsKey("beginTime")) {
+                optParams.put("hoodie.datasource.read.begin.instanttime", 
scanParams.getParams().get("beginTime"));
+            }
+            if (scanParams.getParams().containsKey("endTime")) {
+                optParams.put("hoodie.datasource.read.end.instanttime", 
scanParams.getParams().get("endTime"));
+            }
+            scanParams.getParams().forEach((k, v) -> {
+                if (k.startsWith("hoodie.")) {
+                    optParams.put(k, v);
+                }
+            });
+            HoodieTableMetaClient hudiClient = 
HiveMetaStoreClientHelper.getHudiClient(this);
+            try {
+                boolean isCowOrRoTable = isHoodieCowTable();
+                if (isCowOrRoTable) {
+                    Map<String, String> serd = 
remoteTable.getSd().getSerdeInfo().getParameters();
+                    if ("true".equals(serd.get("hoodie.query.as.ro.table"))
+                            && remoteTable.getTableName().endsWith("_ro")) {
+                        // Incremental read RO table as RT table, I don't know 
why?
+                        isCowOrRoTable = false;
+                        LOG.warn("Execute incremental read on RO table");
+                    }
+                }
+                if (isCowOrRoTable) {
+                    incrementalRelation = new COWIncrementalRelation(
+                            optParams, 
HiveMetaStoreClientHelper.getConfiguration(this), hudiClient);
+                } else {
+                    incrementalRelation = new MORIncrementalRelation(
+                            optParams, 
HiveMetaStoreClientHelper.getConfiguration(this), hudiClient);
+                }
+            } catch (Exception e) {
+                LOG.warn("Failed to create incremental relation", e);
+            }
+        }
+        this.scanParams = scanParams;
+    }
+
+    public IncrementalRelation getIncrementalRelation() {
+        return incrementalRelation;
+    }
+
+    /**
+     * replace incremental params as AND expression
+     * incr('beginTime'='20240308110257169', 'endTime'='20240308110677278') =>
+     * _hoodie_commit_time >= 20240308110257169 and _hoodie_commit_time <= 
'20240308110677278'
+     */
+    public Set<Expression> generateIncrementalExpression(List<Slot> slots) {
+        if (incrementalRelation == null) {
+            return Collections.emptySet();
+        }
+        SlotReference timeField = null;
+        for (Slot slot : slots) {
+            if ("_hoodie_commit_time".equals(slot.getName())) {
+                timeField = (SlotReference) slot;
+                break;
+            }
+        }
+        if (timeField == null) {
+            return Collections.emptySet();
+        }
+        StringLiteral upperValue = new 
StringLiteral(incrementalRelation.getEndTs());
+        StringLiteral lowerValue = new 
StringLiteral(incrementalRelation.getStartTs());
+        ComparisonPredicate less = new LessThanEqual(timeField, upperValue);
+        ComparisonPredicate great = incrementalRelation.isIncludeStartTime()
+                ? new GreaterThanEqual(timeField, lowerValue)
+                : new GreaterThan(timeField, lowerValue);
+        return ImmutableSet.of(great, less);
+    }
+
     public boolean isHiveTransactionalTable() {
         return dlaType == DLAType.HIVE && 
AcidUtils.isTransactionalTable(remoteTable)
                 && isSupportedTransactionalFileFormat();
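
For orientation, the mapping described in the `generateIncrementalExpression` javadoc above boils down to a pair of bounds on the commit-time metadata column. The sketch below only restates that javadoc's example values; it illustrates the generated predicates, not an exact SQL rewrite of an incremental read.
```
-- incr('beginTime'='20240308110257169', 'endTime'='20240308110677278') yields:
_hoodie_commit_time >= '20240308110257169' AND _hoodie_commit_time <= '20240308110677278'
-- when isIncludeStartTime() is false, the lower bound becomes a strict '>' comparison
```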
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java
new file mode 100644
index 00000000000..fa24dc53e56
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/COWIncrementalRelation.java
@@ -0,0 +1,254 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hudi.source;
+
+import org.apache.doris.datasource.FileSplit;
+import org.apache.doris.spi.Split;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.GlobPattern;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
+import 
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class COWIncrementalRelation implements IncrementalRelation {
+    private final Map<String, String> optParams;
+    private final HoodieTableMetaClient metaClient;
+    private final HollowCommitHandling hollowCommitHandling;
+    private final boolean startInstantArchived;
+    private final boolean endInstantArchived;
+    private final boolean fullTableScan;
+    private final FileSystem fs;
+    private final Map<String, HoodieWriteStat> fileToWriteStat;
+    private final Collection<String> filteredRegularFullPaths;
+    private final Collection<String> filteredMetaBootstrapFullPaths;
+
+    private final boolean includeStartTime;
+    private final String startTs;
+    private final String endTs;
+
+    public COWIncrementalRelation(Map<String, String> optParams, Configuration 
configuration,
+            HoodieTableMetaClient metaClient)
+            throws HoodieException, IOException {
+        this.optParams = optParams;
+        this.metaClient = metaClient;
+        hollowCommitHandling = HollowCommitHandling.valueOf(
+                
optParams.getOrDefault("hoodie.read.timeline.holes.resolution.policy", "FAIL"));
+        HoodieTimeline commitTimeline = 
TimelineUtils.handleHollowCommitIfNeeded(
+                metaClient.getCommitTimeline().filterCompletedInstants(), 
metaClient, hollowCommitHandling);
+        if (commitTimeline.empty()) {
+            throw new HoodieException("No instants to incrementally pull");
+        }
+        if (!metaClient.getTableConfig().populateMetaFields()) {
+            throw new HoodieException("Incremental queries are not supported 
when meta fields are disabled");
+        }
+        HoodieInstant lastInstant = commitTimeline.lastInstant().get();
+        String startInstantTime = 
optParams.get("hoodie.datasource.read.begin.instanttime");
+        if (startInstantTime == null) {
+            throw new HoodieException("Specify the begin instant time to pull 
from using "
+                    + "option hoodie.datasource.read.begin.instanttime");
+        }
+        if (EARLIEST_TIME.equals(startInstantTime)) {
+            startInstantTime = "000";
+        }
+        String endInstantTime = 
optParams.getOrDefault("hoodie.datasource.read.end.instanttime",
+                lastInstant.getTimestamp());
+        startInstantArchived = 
commitTimeline.isBeforeTimelineStarts(startInstantTime);
+        endInstantArchived = 
commitTimeline.isBeforeTimelineStarts(endInstantTime);
+
+        HoodieTimeline commitsTimelineToReturn;
+        if (hollowCommitHandling == HollowCommitHandling.USE_TRANSITION_TIME) {
+            commitsTimelineToReturn = 
commitTimeline.findInstantsInRangeByStateTransitionTime(startInstantTime,
+                    lastInstant.getStateTransitionTime());
+        } else {
+            commitsTimelineToReturn = 
commitTimeline.findInstantsInRange(startInstantTime, 
lastInstant.getTimestamp());
+        }
+        List<HoodieInstant> commitsToReturn = 
commitsTimelineToReturn.getInstants();
+
+        // todo: support configuration hoodie.datasource.read.incr.filters
+        Path basePath = metaClient.getBasePathV2();
+        Map<String, String> regularFileIdToFullPath = new HashMap<>();
+        Map<String, String> metaBootstrapFileIdToFullPath = new HashMap<>();
+        HoodieTimeline replacedTimeline = 
commitsTimelineToReturn.getCompletedReplaceTimeline();
+        Map<String, String> replacedFile = new HashMap<>();
+        for (HoodieInstant instant : replacedTimeline.getInstants()) {
+            
HoodieReplaceCommitMetadata.fromBytes(metaClient.getActiveTimeline().getInstantDetails(instant).get(),
+                    
HoodieReplaceCommitMetadata.class).getPartitionToReplaceFileIds().forEach(
+                        (key, value) -> value.forEach(
+                            e -> replacedFile.put(e, 
FSUtils.getPartitionPath(basePath, key).toString())));
+        }
+
+        fileToWriteStat = new HashMap<>();
+        for (HoodieInstant commit : commitsToReturn) {
+            HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(
+                    commitTimeline.getInstantDetails(commit).get(), 
HoodieCommitMetadata.class);
+            metadata.getPartitionToWriteStats().forEach((partition, stats) -> {
+                for (HoodieWriteStat stat : stats) {
+                    fileToWriteStat.put(FSUtils.getPartitionPath(basePath, 
stat.getPath()).toString(), stat);
+                }
+            });
+            if 
(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS.equals(commit.getTimestamp())) {
+                metadata.getFileIdAndFullPaths(basePath).forEach((k, v) -> {
+                    if (!(replacedFile.containsKey(k) && 
v.startsWith(replacedFile.get(k)))) {
+                        metaBootstrapFileIdToFullPath.put(k, v);
+                    }
+                });
+            } else {
+                metadata.getFileIdAndFullPaths(basePath).forEach((k, v) -> {
+                    if (!(replacedFile.containsKey(k) && 
v.startsWith(replacedFile.get(k)))) {
+                        regularFileIdToFullPath.put(k, v);
+                    }
+                });
+            }
+        }
+
+        if (!metaBootstrapFileIdToFullPath.isEmpty()) {
+            // filter out meta bootstrap files that have had more commits since metadata bootstrap
+            metaBootstrapFileIdToFullPath.entrySet().removeIf(e -> regularFileIdToFullPath.containsKey(e.getKey()));
+        }
+        String pathGlobPattern = 
optParams.getOrDefault("hoodie.datasource.read.incr.path.glob", "");
+        if ("".equals(pathGlobPattern)) {
+            filteredRegularFullPaths = regularFileIdToFullPath.values();
+            filteredMetaBootstrapFullPaths = 
metaBootstrapFileIdToFullPath.values();
+        } else {
+            GlobPattern globMatcher = new GlobPattern("*" + pathGlobPattern);
+            filteredRegularFullPaths = 
regularFileIdToFullPath.values().stream().filter(globMatcher::matches)
+                    .collect(Collectors.toList());
+            filteredMetaBootstrapFullPaths = 
metaBootstrapFileIdToFullPath.values().stream()
+                    .filter(globMatcher::matches).collect(Collectors.toList());
+
+        }
+
+        fs = basePath.getFileSystem(configuration);
+        fullTableScan = shouldFullTableScan();
+        includeStartTime = !fullTableScan;
+        if (fullTableScan || commitsToReturn.isEmpty()) {
+            startTs = startInstantTime;
+            endTs = endInstantTime;
+        } else {
+            startTs = commitsToReturn.get(0).getTimestamp();
+            endTs = commitsToReturn.get(commitsToReturn.size() - 
1).getTimestamp();
+        }
+    }
+
+    private boolean shouldFullTableScan() throws HoodieException, IOException {
+        boolean fallbackToFullTableScan = Boolean.parseBoolean(
+                
optParams.getOrDefault("hoodie.datasource.read.incr.fallback.fulltablescan.enable",
 "false"));
+        if (fallbackToFullTableScan && (startInstantArchived || 
endInstantArchived)) {
+            if (hollowCommitHandling == 
HollowCommitHandling.USE_TRANSITION_TIME) {
+                throw new HoodieException("Cannot use stateTransitionTime 
while enables full table scan");
+            }
+            return true;
+        }
+        if (fallbackToFullTableScan) {
+            for (String path : filteredMetaBootstrapFullPaths) {
+                if (!fs.exists(new Path(path))) {
+                    return true;
+                }
+            }
+            for (String path : filteredRegularFullPaths) {
+                if (!fs.exists(new Path(path))) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public List<FileSlice> collectFileSlices() throws HoodieException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<Split> collectSplits() throws HoodieException {
+        if (fullTableScan) {
+            throw new HoodieException("Fallback to full table scan");
+        }
+        if (filteredRegularFullPaths.isEmpty() && 
filteredMetaBootstrapFullPaths.isEmpty()) {
+            return Collections.emptyList();
+        }
+        List<Split> splits = new ArrayList<>();
+        Option<String[]> partitionColumns = 
metaClient.getTableConfig().getPartitionFields();
+        List<String> partitionNames = partitionColumns.isPresent() ? 
Arrays.asList(partitionColumns.get())
+                : Collections.emptyList();
+        for (String baseFile : filteredMetaBootstrapFullPaths) {
+            HoodieWriteStat stat = fileToWriteStat.get(baseFile);
+            splits.add(new FileSplit(new Path(baseFile), 0, 
stat.getFileSizeInBytes(), stat.getFileSizeInBytes(),
+                    new String[0],
+                    
HudiPartitionProcessor.parsePartitionValues(partitionNames, 
stat.getPartitionPath())));
+        }
+        for (String baseFile : filteredRegularFullPaths) {
+            HoodieWriteStat stat = fileToWriteStat.get(baseFile);
+            splits.add(new FileSplit(new Path(baseFile), 0, 
stat.getFileSizeInBytes(), stat.getFileSizeInBytes(),
+                    new String[0],
+                    
HudiPartitionProcessor.parsePartitionValues(partitionNames, 
stat.getPartitionPath())));
+        }
+        return splits;
+    }
+
+    @Override
+    public Map<String, String> getHoodieParams() {
+        optParams.put("hoodie.datasource.read.incr.operation", "true");
+        optParams.put("hoodie.datasource.read.begin.instanttime", startTs);
+        optParams.put("hoodie.datasource.read.end.instanttime", endTs);
+        optParams.put("hoodie.datasource.read.incr.includeStartTime", 
includeStartTime ? "true" : "false");
+        return optParams;
+    }
+
+    @Override
+    public boolean fallbackFullTableScan() {
+        return fullTableScan;
+    }
+
+    @Override
+    public boolean isIncludeStartTime() {
+        return includeStartTime;
+    }
+
+    @Override
+    public String getStartTs() {
+        return startTs;
+    }
+
+    @Override
+    public String getEndTs() {
+        return endTs;
+    }
+}
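
Since `shouldFullTableScan` above honours `hoodie.datasource.read.incr.fallback.fulltablescan.enable`, and `HMSExternalTable.setScanParams` forwards any `hoodie.*` key supplied to `@incr`, the fallback could plausibly be requested straight from SQL. This is only a sketch under those assumptions; the table name is a placeholder.
```
SELECT * FROM hudi_table@incr(
    'beginTime'='earliest',
    'hoodie.datasource.read.incr.fallback.fulltablescan.enable'='true');
```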
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index dfbb12e8584..58068c575d7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.datasource.hudi.source;
 
+import org.apache.doris.analysis.TableScanParams;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.PartitionItem;
@@ -50,6 +51,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -63,6 +65,7 @@ import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -78,12 +81,22 @@ public class HudiScanNode extends HiveScanNode {
 
     private static final Logger LOG = LogManager.getLogger(HudiScanNode.class);
 
-    private final boolean isCowOrRoTable;
+    private boolean isCowOrRoTable;
 
     private final AtomicLong noLogsSplitNum = new AtomicLong(0);
 
     private final boolean useHiveSyncPartition;
 
+    private HoodieTableMetaClient hudiClient;
+    private String basePath;
+    private String inputFormat;
+    private String serdeLib;
+    private List<String> columnNames;
+    private List<String> columnTypes;
+
+    private boolean incrementalRead = false;
+    private IncrementalRelation incrementalRelation;
+
     /**
      * External file scan node for Query Hudi table
      * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check 
column priv
@@ -92,10 +105,7 @@ public class HudiScanNode extends HiveScanNode {
      */
     public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv) {
         super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, 
needCheckColumnPriv);
-        Map<String, String> paras = hmsTable.getRemoteTable().getParameters();
-        isCowOrRoTable = hmsTable.isHoodieCowTable()
-                || 
"skip_merge".equals(hmsTable.getCatalogProperties().get("hoodie.datasource.merge.type"))
-                || (paras != null && 
"COPY_ON_WRITE".equalsIgnoreCase(paras.get("flink.table.type")));
+        isCowOrRoTable = hmsTable.isHoodieCowTable();
         if (isCowOrRoTable) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Hudi table {} can read as cow/read optimize table", 
hmsTable.getName());
@@ -129,12 +139,66 @@ public class HudiScanNode extends HiveScanNode {
         computeColumnsFilter();
         initBackendPolicy();
         initSchemaParams();
+
+        hudiClient = HiveMetaStoreClientHelper.getHudiClient(hmsTable);
+        hudiClient.reloadActiveTimeline();
+        basePath = hmsTable.getRemoteTable().getSd().getLocation();
+        inputFormat = hmsTable.getRemoteTable().getSd().getInputFormat();
+        serdeLib = 
hmsTable.getRemoteTable().getSd().getSerdeInfo().getSerializationLib();
+        columnNames = new ArrayList<>();
+        columnTypes = new ArrayList<>();
+        TableSchemaResolver schemaUtil = new TableSchemaResolver(hudiClient);
+        Schema hudiSchema;
+        try {
+            hudiSchema = 
HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchema());
+        } catch (Exception e) {
+            throw new UserException("Cannot get hudi table schema.");
+        }
+        for (Schema.Field hudiField : hudiSchema.getFields()) {
+            columnNames.add(hudiField.name().toLowerCase(Locale.ROOT));
+            String columnType = 
HudiUtils.fromAvroHudiTypeToHiveTypeString(hudiField.schema());
+            columnTypes.add(columnType);
+        }
+
+        TableScanParams scanParams = desc.getRef().getScanParams();
+        if (scanParams != null) {
+            throw new UserException("Incremental read should turn on nereids 
planner");
+        }
+        scanParams = hmsTable.getScanParams();
+        if (scanParams != null) {
+            if (scanParams.incrementalRead()) {
+                incrementalRead = true;
+            } else {
+                throw new UserException("Not support function '" + 
scanParams.getParamType() + "' in hudi table");
+            }
+        }
+        if (incrementalRead) {
+            if (isCowOrRoTable) {
+                try {
+                    Map<String, String> serd = hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters();
+                    if ("true".equals(serd.get("hoodie.query.as.ro.table"))
+                            && hmsTable.getRemoteTable().getTableName().endsWith("_ro")) {
+                        // Incremental read treats the RO table as a RT table; the reason is unclear.
+                        isCowOrRoTable = false;
+                        LOG.warn("Execute incremental read on RO table");
+                    }
+                } catch (Exception e) {
+                    // ignore
+                }
+            }
+            incrementalRelation = hmsTable.getIncrementalRelation();
+            if (incrementalRelation == null) {
+                throw new UserException("Failed to create incremental relation");
+            }
+        } else {
+            incrementalRelation = null;
+        }
     }
 
     @Override
     protected Map<String, String> getLocationProperties() throws UserException 
{
-        if (isCowOrRoTable) {
-            return super.getLocationProperties();
+        if (incrementalRead) {
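+            // For incremental reads, hand the resolved Hudi read options (begin/end instant
+            // time, etc.) to the reader instead of only the Hadoop client properties.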
+            return incrementalRelation.getHoodieParams();
         } else {
             // HudiJniScanner uses hadoop client to read data.
             return hmsTable.getHadoopProperties();
@@ -176,7 +240,7 @@ public class HudiScanNode extends HiveScanNode {
             TablePartitionValues partitionValues;
             if (snapshotTimestamp.isPresent()) {
                 partitionValues = processor.getSnapshotPartitionValues(
-                    hmsTable, metaClient, snapshotTimestamp.get(), useHiveSyncPartition);
+                        hmsTable, metaClient, snapshotTimestamp.get(), useHiveSyncPartition);
             } else {
                 partitionValues = processor.getPartitionValues(hmsTable, 
metaClient, useHiveSyncPartition);
             }
@@ -222,28 +286,24 @@ public class HudiScanNode extends HiveScanNode {
         return Lists.newArrayList(dummyPartition);
     }
 
-    @Override
-    public List<Split> getSplits() throws UserException {
-        HoodieTableMetaClient hudiClient = 
HiveMetaStoreClientHelper.getHudiClient(hmsTable);
-        hudiClient.reloadActiveTimeline();
-        String basePath = hmsTable.getRemoteTable().getSd().getLocation();
-        String inputFormat = 
hmsTable.getRemoteTable().getSd().getInputFormat();
-        String serdeLib = 
hmsTable.getRemoteTable().getSd().getSerdeInfo().getSerializationLib();
-
-        TableSchemaResolver schemaUtil = new TableSchemaResolver(hudiClient);
-        Schema hudiSchema;
-        try {
-            hudiSchema = 
HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchema());
-        } catch (Exception e) {
-            throw new RuntimeException("Cannot get hudi table schema.");
+    private List<Split> getIncrementalSplits() {
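+        // COW/RO path: the incremental relation already produced base-file splits.
+        // MOR path: collect the merged file slices in the commit range and turn each one
+        // into a split that carries its base file and delta log files.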
+        if (isCowOrRoTable) {
+            List<Split> splits = incrementalRelation.collectSplits();
+            noLogsSplitNum.addAndGet(splits.size());
+            return splits;
         }
+        Option<String[]> partitionColumns = hudiClient.getTableConfig().getPartitionFields();
+        List<String> partitionNames = partitionColumns.isPresent() ? Arrays.asList(partitionColumns.get())
+                : Collections.emptyList();
+        return incrementalRelation.collectFileSlices().stream().map(fileSlice -> generateHudiSplit(fileSlice,
+                HudiPartitionProcessor.parsePartitionValues(partitionNames, fileSlice.getPartitionPath()),
+                incrementalRelation.getEndTs())).collect(Collectors.toList());
+    }
 
-        List<String> columnNames = new ArrayList<>();
-        List<String> columnTypes = new ArrayList<>();
-        for (Schema.Field hudiField : hudiSchema.getFields()) {
-            columnNames.add(hudiField.name().toLowerCase(Locale.ROOT));
-            String columnType = 
HudiUtils.fromAvroHudiTypeToHiveTypeString(hudiField.schema());
-            columnTypes.add(columnType);
+    @Override
+    public List<Split> getSplits() throws UserException {
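+        // Incremental read: unless the relation decided to fall back to a full table scan,
+        // return only the splits touched by the requested commit range; otherwise fall
+        // through to the normal snapshot split generation below.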
+        if (incrementalRead && !incrementalRelation.fallbackFullTableScan()) {
+            return getIncrementalSplits();
         }
 
        HoodieTimeline timeline = hudiClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
@@ -300,33 +360,9 @@ public class HudiScanNode extends HiveScanNode {
                             new String[0], partition.getPartitionValues()));
                 });
             } else {
-                
fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, 
queryInstant).forEach(fileSlice -> {
-                    Optional<HoodieBaseFile> baseFile = 
fileSlice.getBaseFile().toJavaOptional();
-                    String filePath = 
baseFile.map(BaseFile::getPath).orElse("");
-                    long fileSize = 
baseFile.map(BaseFile::getFileSize).orElse(0L);
-
-                    List<String> logs = 
fileSlice.getLogFiles().map(HoodieLogFile::getPath)
-                            .map(Path::toString)
-                            .collect(Collectors.toList());
-                    if (logs.isEmpty()) {
-                        noLogsSplitNum.incrementAndGet();
-                    }
-
-                    // no base file, use log file to parse file type
-                    String agencyPath = filePath.isEmpty() ? logs.get(0) : 
filePath;
-                    HudiSplit split = new HudiSplit(new Path(agencyPath), 0, 
fileSize, fileSize,
-                            new String[0], partition.getPartitionValues());
-                    split.setTableFormatType(TableFormatType.HUDI);
-                    split.setDataFilePath(filePath);
-                    split.setHudiDeltaLogs(logs);
-                    split.setInputFormat(inputFormat);
-                    split.setSerde(serdeLib);
-                    split.setBasePath(basePath);
-                    split.setHudiColumnNames(columnNames);
-                    split.setHudiColumnTypes(columnTypes);
-                    split.setInstantTime(queryInstant);
-                    splits.add(split);
-                });
+                fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant)
+                        .forEach(fileSlice -> splits.add(
+                                generateHudiSplit(fileSlice, partition.getPartitionValues(), queryInstant)));
             }
             countDownLatch.countDown();
         }));
@@ -338,6 +374,35 @@ public class HudiScanNode extends HiveScanNode {
         return splits;
     }
 
+    private HudiSplit generateHudiSplit(FileSlice fileSlice, List<String> partitionValues, String queryInstant) {
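+        // Build one split per file slice: the base parquet file (if any) plus its delta
+        // log files, tagged with the instant time the reader should merge up to.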
+        Optional<HoodieBaseFile> baseFile = fileSlice.getBaseFile().toJavaOptional();
+        String filePath = baseFile.map(BaseFile::getPath).orElse("");
+        long fileSize = baseFile.map(BaseFile::getFileSize).orElse(0L);
+
+        List<String> logs = fileSlice.getLogFiles().map(HoodieLogFile::getPath)
+                .map(Path::toString)
+                .collect(Collectors.toList());
+        if (logs.isEmpty()) {
+            noLogsSplitNum.incrementAndGet();
+        }
+
+        // no base file, use log file to parse file type
+        String agencyPath = filePath.isEmpty() ? logs.get(0) : filePath;
+        HudiSplit split = new HudiSplit(new Path(agencyPath), 0, fileSize, fileSize,
+                new String[0], partitionValues);
+        split.setTableFormatType(TableFormatType.HUDI);
+        split.setDataFilePath(filePath);
+        split.setHudiDeltaLogs(logs);
+        split.setInputFormat(inputFormat);
+        split.setSerde(serdeLib);
+        split.setBasePath(basePath);
+        split.setHudiColumnNames(columnNames);
+        split.setHudiColumnTypes(columnTypes);
+        split.setInstantTime(queryInstant);
+        return split;
+    }
+
     @Override
     public String getNodeExplainString(String prefix, TExplainLevel 
detailLevel) {
         return super.getNodeExplainString(prefix, detailLevel)
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/IncrementalRelation.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/IncrementalRelation.java
new file mode 100644
index 00000000000..4a707064fb6
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/IncrementalRelation.java
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hudi.source;
+
+import org.apache.doris.spi.Split;
+
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.exception.HoodieException;
+
+import java.util.List;
+import java.util.Map;
+
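+/**
+ * Abstraction over Hudi incremental queries shared by the COW and MOR implementations:
+ * it resolves the commit range [getStartTs(), getEndTs()] and exposes either ready-made
+ * splits (COW) or merged file slices (MOR) to HudiScanNode.
+ */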
+public interface IncrementalRelation {
+    String EARLIEST_TIME = "earliest";
+
+    List<FileSlice> collectFileSlices() throws HoodieException;
+
+    List<Split> collectSplits() throws HoodieException;
+
+    Map<String, String> getHoodieParams();
+
+    boolean fallbackFullTableScan();
+
+    boolean isIncludeStartTime();
+
+    String getStartTs();
+
+    String getEndTs();
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java
new file mode 100644
index 00000000000..c06fcc2a578
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/MORIncrementalRelation.java
@@ -0,0 +1,217 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hudi.source;
+
+import org.apache.doris.spi.Split;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.GlobPattern;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
+import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
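+/**
+ * Incremental relation for Merge-On-Read tables. It resolves the requested commit range
+ * from hoodie.datasource.read.begin/end.instanttime and collects the file slices written
+ * in that range.
+ */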
+public class MORIncrementalRelation implements IncrementalRelation {
+    private final Map<String, String> optParams;
+    private final HoodieTableMetaClient metaClient;
+    private final HoodieTimeline timeline;
+    private final HollowCommitHandling hollowCommitHandling;
+    private String startTimestamp;
+    private final String endTimestamp;
+    private final boolean startInstantArchived;
+    private final boolean endInstantArchived;
+    private final List<HoodieInstant> includedCommits;
+    private final List<HoodieCommitMetadata> commitsMetadata;
+    private final FileStatus[] affectedFilesInCommits;
+    private final boolean fullTableScan;
+    private final String globPattern;
+    private final boolean includeStartTime;
+    private final String startTs;
+    private final String endTs;
+
+
+    public MORIncrementalRelation(Map<String, String> optParams, Configuration configuration,
+            HoodieTableMetaClient metaClient)
+            throws HoodieException, IOException {
+        this.optParams = optParams;
+        this.metaClient = metaClient;
+        timeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
+        if (timeline.empty()) {
+            throw new HoodieException("No instants to incrementally pull");
+        }
+        if (!metaClient.getTableConfig().populateMetaFields()) {
+            throw new HoodieException("Incremental queries are not supported 
when meta fields are disabled");
+        }
+        hollowCommitHandling = HollowCommitHandling.valueOf(
+                optParams.getOrDefault("hoodie.read.timeline.holes.resolution.policy", "FAIL"));
+
+        startTimestamp = optParams.get("hoodie.datasource.read.begin.instanttime");
+        if (startTimestamp == null) {
+            throw new HoodieException("Specify the begin instant time to pull 
from using "
+                    + "option hoodie.datasource.read.begin.instanttime");
+        }
+        if (EARLIEST_TIME.equals(startTimestamp)) {
+            startTimestamp = "000";
+        }
+        endTimestamp = optParams.getOrDefault("hoodie.datasource.read.end.instanttime",
+                hollowCommitHandling == HollowCommitHandling.USE_TRANSITION_TIME
+                        ? timeline.lastInstant().get().getStateTransitionTime()
+                        : timeline.lastInstant().get().getTimestamp());
+
+        startInstantArchived = timeline.isBeforeTimelineStarts(startTimestamp);
+        endInstantArchived = timeline.isBeforeTimelineStarts(endTimestamp);
+
+        includedCommits = getIncludedCommits();
+        commitsMetadata = getCommitsMetadata();
+        affectedFilesInCommits = HoodieInputFormatUtils.listAffectedFilesForCommits(configuration,
+                new Path(metaClient.getBasePath()), commitsMetadata);
+        fullTableScan = shouldFullTableScan();
+        if (hollowCommitHandling == HollowCommitHandling.USE_TRANSITION_TIME && fullTableScan) {
+            throw new HoodieException("Cannot use stateTransitionTime when full table scan is enabled");
+        }
+        globPattern = optParams.getOrDefault("hoodie.datasource.read.incr.path.glob", "");
+
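+        // Normalize the effective [startTs, endTs] range: when the requested start commit is
+        // already archived, the range excludes the start time; otherwise it is snapped to the
+        // first/last included commit on the active timeline.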
+        if (startInstantArchived) {
+            includeStartTime = false;
+            startTs = startTimestamp;
+        } else {
+            includeStartTime = true;
+            startTs = includedCommits.isEmpty() ? startTimestamp : includedCommits.get(0).getTimestamp();
+        }
+        endTs = endInstantArchived || includedCommits.isEmpty() ? endTimestamp
+                : includedCommits.get(includedCommits.size() - 1).getTimestamp();
+    }
+
+    @Override
+    public Map<String, String> getHoodieParams() {
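+        // Expose the resolved incremental range as Hudi read options; HudiScanNode returns
+        // these as the scan's location properties.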
+        optParams.put("hoodie.datasource.read.incr.operation", "true");
+        optParams.put("hoodie.datasource.read.begin.instanttime", startTs);
+        optParams.put("hoodie.datasource.read.end.instanttime", endTs);
+        optParams.put("hoodie.datasource.read.incr.includeStartTime", 
includeStartTime ? "true" : "false");
+        return optParams;
+    }
+
+    private List<HoodieInstant> getIncludedCommits() {
+        if (!startInstantArchived || !endInstantArchived) {
+            // If the start or end commit is still on the active timeline, only return
+            // the instants inside [startTimestamp, endTimestamp].
+            if (hollowCommitHandling == HollowCommitHandling.USE_TRANSITION_TIME) {
+                return timeline.findInstantsInRangeByStateTransitionTime(startTimestamp, endTimestamp)
+                        .getInstants();
+            } else {
+                return timeline.findInstantsInRange(startTimestamp, endTimestamp).getInstants();
+            }
+        } else {
+            return timeline.getInstants();
+        }
+    }
+
+    private List<HoodieCommitMetadata> getCommitsMetadata() throws IOException {
+        List<HoodieCommitMetadata> result = new ArrayList<>();
+        for (HoodieInstant commit : includedCommits) {
+            result.add(TimelineUtils.getCommitMetadata(commit, timeline));
+        }
+        return result;
+    }
+
+    private boolean shouldFullTableScan() throws IOException {
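+        // Fall back to a full table scan when the fallback option is enabled and the requested
+        // range touches archived commits, or when any file touched by the range no longer exists.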
+        boolean should = Boolean.parseBoolean(
+                optParams.getOrDefault("hoodie.datasource.read.incr.fallback.fulltablescan.enable", "false"))
+                && (startInstantArchived || endInstantArchived);
+        if (should) {
+            return true;
+        }
+        for (FileStatus fileStatus : affectedFilesInCommits) {
+            if (!metaClient.getFs().exists(fileStatus.getPath())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public boolean fallbackFullTableScan() {
+        return fullTableScan;
+    }
+
+    @Override
+    public boolean isIncludeStartTime() {
+        return includeStartTime;
+    }
+
+    @Override
+    public String getStartTs() {
+        return startTs;
+    }
+
+    @Override
+    public String getEndTs() {
+        return endTs;
+    }
+
+    @Override
+    public List<FileSlice> collectFileSlices() throws HoodieException {
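+        // Restrict the file system view to the commits in [startTimestamp, endTimestamp] and
+        // return the latest merged file slice of every partition written in that range,
+        // optionally filtered by the user-supplied path glob.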
+        if (includedCommits.isEmpty()) {
+            return Collections.emptyList();
+        } else if (fullTableScan) {
+            throw new HoodieException("Fallback to full table scan");
+        }
+        HoodieTimeline scanTimeline;
+        if (hollowCommitHandling == HollowCommitHandling.USE_TRANSITION_TIME) {
+            scanTimeline = metaClient.getCommitsAndCompactionTimeline()
+                    .findInstantsInRangeByStateTransitionTime(startTimestamp, endTimestamp);
+        } else {
+            scanTimeline = TimelineUtils.handleHollowCommitIfNeeded(
+                            metaClient.getCommitsAndCompactionTimeline(), metaClient, hollowCommitHandling)
+                    .findInstantsInRange(startTimestamp, endTimestamp);
+        }
+        String latestCommit = includedCommits.get(includedCommits.size() - 1).getTimestamp();
+        HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, scanTimeline,
+                affectedFilesInCommits);
+        Stream<FileSlice> fileSlices = HoodieInputFormatUtils.getWritePartitionPaths(commitsMetadata)
+                .stream().flatMap(relativePartitionPath ->
+                        fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit));
+        if ("".equals(globPattern)) {
+            return fileSlices.collect(Collectors.toList());
+        }
+        GlobPattern globMatcher = new GlobPattern("*" + globPattern);
+        return fileSlices.filter(fileSlice -> globMatcher.matches(fileSlice.getBaseFile().map(BaseFile::getPath)
+                .or(fileSlice.getLatestLogFile().map(f -> f.getPath().toString())).get()))
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public List<Split> collectSplits() throws HoodieException {
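+        // MOR incremental reads are expressed as file slices (see collectFileSlices);
+        // only the COW implementation produces ready-made splits.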
+        throw new UnsupportedOperationException();
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundRelation.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundRelation.java
index 74f85e31651..4514ea05bfb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundRelation.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundRelation.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.nereids.analyzer;
 
+import org.apache.doris.analysis.TableScanParams;
 import org.apache.doris.nereids.exceptions.UnboundException;
 import org.apache.doris.nereids.memo.GroupExpression;
 import org.apache.doris.nereids.properties.LogicalProperties;
@@ -53,21 +54,36 @@ public class UnboundRelation extends LogicalRelation 
implements Unbound, BlockFu
     private final List<String> hints;
     private final Optional<TableSample> tableSample;
     private final Optional<String> indexName;
+    private TableScanParams scanParams;
 
     public UnboundRelation(RelationId id, List<String> nameParts) {
         this(id, nameParts, Optional.empty(), Optional.empty(), 
ImmutableList.of(), false, ImmutableList.of(),
-                ImmutableList.of(), Optional.empty(), Optional.empty());
+                ImmutableList.of(), Optional.empty(), Optional.empty(), null);
     }
 
     public UnboundRelation(RelationId id, List<String> nameParts, List<String> 
partNames, boolean isTempPart) {
         this(id, nameParts, Optional.empty(), Optional.empty(), partNames, 
isTempPart, ImmutableList.of(),
-                ImmutableList.of(), Optional.empty(), Optional.empty());
+                ImmutableList.of(), Optional.empty(), Optional.empty(), null);
     }
 
     public UnboundRelation(RelationId id, List<String> nameParts, List<String> 
partNames, boolean isTempPart,
             List<Long> tabletIds, List<String> hints, Optional<TableSample> 
tableSample, Optional<String> indexName) {
         this(id, nameParts, Optional.empty(), Optional.empty(),
-                partNames, isTempPart, tabletIds, hints, tableSample, 
indexName);
+                partNames, isTempPart, tabletIds, hints, tableSample, 
indexName, null);
+    }
+
+    public UnboundRelation(RelationId id, List<String> nameParts, List<String> partNames, boolean isTempPart,
+            List<Long> tabletIds, List<String> hints, Optional<TableSample> tableSample, Optional<String> indexName,
+            TableScanParams scanParams) {
+        this(id, nameParts, Optional.empty(), Optional.empty(),
+                partNames, isTempPart, tabletIds, hints, tableSample, indexName, scanParams);
+    }
+
+    public UnboundRelation(RelationId id, List<String> nameParts, Optional<GroupExpression> groupExpression,
+            Optional<LogicalProperties> logicalProperties, List<String> partNames, boolean isTempPart,
+            List<Long> tabletIds, List<String> hints, Optional<TableSample> tableSample, Optional<String> indexName) {
+        this(id, nameParts, groupExpression, logicalProperties, partNames,
+                isTempPart, tabletIds, hints, tableSample, indexName, null);
     }
 
     /**
@@ -75,7 +91,8 @@ public class UnboundRelation extends LogicalRelation 
implements Unbound, BlockFu
      */
     public UnboundRelation(RelationId id, List<String> nameParts, 
Optional<GroupExpression> groupExpression,
             Optional<LogicalProperties> logicalProperties, List<String> 
partNames, boolean isTempPart,
-            List<Long> tabletIds, List<String> hints, Optional<TableSample> 
tableSample, Optional<String> indexName) {
+            List<Long> tabletIds, List<String> hints, Optional<TableSample> 
tableSample, Optional<String> indexName,
+            TableScanParams scanParams) {
         super(id, PlanType.LOGICAL_UNBOUND_RELATION, groupExpression, 
logicalProperties);
         this.nameParts = 
ImmutableList.copyOf(Objects.requireNonNull(nameParts, "nameParts should not 
null"));
         this.partNames = 
ImmutableList.copyOf(Objects.requireNonNull(partNames, "partNames should not 
null"));
@@ -84,6 +101,7 @@ public class UnboundRelation extends LogicalRelation 
implements Unbound, BlockFu
         this.hints = ImmutableList.copyOf(Objects.requireNonNull(hints, "hints 
should not be null."));
         this.tableSample = tableSample;
         this.indexName = indexName;
+        this.scanParams = scanParams;
     }
 
     public List<String> getNameParts() {
@@ -104,14 +122,14 @@ public class UnboundRelation extends LogicalRelation 
implements Unbound, BlockFu
     public Plan withGroupExpression(Optional<GroupExpression> groupExpression) 
{
         return new UnboundRelation(relationId, nameParts,
                 groupExpression, Optional.of(getLogicalProperties()),
-                partNames, isTempPart, tabletIds, hints, tableSample, 
indexName);
+                partNames, isTempPart, tabletIds, hints, tableSample, indexName, scanParams);
     }
 
     @Override
     public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> 
groupExpression,
             Optional<LogicalProperties> logicalProperties, List<Plan> 
children) {
         return new UnboundRelation(relationId, nameParts, groupExpression, 
logicalProperties, partNames,
-                isTempPart, tabletIds, hints, tableSample, indexName);
+                isTempPart, tabletIds, hints, tableSample, indexName, null);
     }
 
     @Override
@@ -165,4 +183,8 @@ public class UnboundRelation extends LogicalRelation 
implements Unbound, BlockFu
     public Optional<TableSample> getTableSample() {
         return tableSample;
     }
+
+    public TableScanParams getScanParams() {
+        return scanParams;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index 4fe38f409b0..211583f4832 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.ArithmeticExpr.Operator;
 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.analysis.TableName;
+import org.apache.doris.analysis.TableScanParams;
 import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.AggregateType;
 import org.apache.doris.catalog.BuiltinAggregateFunctions;
@@ -1308,11 +1309,17 @@ public class LogicalPlanBuilder extends 
DorisParserBaseVisitor<Object> {
             relationHints = ImmutableList.of();
         }
 
+        TableScanParams scanParams = null;
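+        // @funcName('key'='value', ...) after a table name (e.g. @incr for Hudi incremental
+        // read) is parsed into TableScanParams and carried on the UnboundRelation.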
+        if (ctx.optScanParams() != null) {
+            Map<String, String> map = visitPropertyItemList(ctx.optScanParams().properties);
+            scanParams = new TableScanParams(ctx.optScanParams().funcName.getText(), map);
+        }
+
         TableSample tableSample = ctx.sample() == null ? null : (TableSample) 
visit(ctx.sample());
         LogicalPlan checkedRelation = 
LogicalPlanBuilderAssistant.withCheckPolicy(
                 new UnboundRelation(StatementScopeIdGenerator.newRelationId(),
                         tableId, partitionNames, isTempPart, tabletIdLists, 
relationHints,
-                        Optional.ofNullable(tableSample), indexName));
+                        Optional.ofNullable(tableSample), indexName, scanParams));
         LogicalPlan plan = withTableAlias(checkedRelation, ctx.tableAlias());
         for (LateralViewContext lateralViewContext : ctx.lateralView()) {
             plan = withGenerate(plan, lateralViewContext);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
index d74c487d296..e41c4283d60 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
@@ -249,12 +249,14 @@ public class BindRelation extends OneAnalysisRuleFactory {
                 LogicalView<Plan> logicalView = new LogicalView<>(view, 
viewBody);
                 return new LogicalSubQueryAlias<>(tableQualifier, logicalView);
             case HMS_EXTERNAL_TABLE:
-                if (Config.enable_query_hive_views && ((HMSExternalTable) 
table).isView()) {
-                    String hiveCatalog = ((HMSExternalTable) 
table).getCatalog().getName();
-                    String ddlSql = ((HMSExternalTable) table).getViewText();
+                HMSExternalTable hmsTable = (HMSExternalTable) table;
+                if (Config.enable_query_hive_views && hmsTable.isView()) {
+                    String hiveCatalog = hmsTable.getCatalog().getName();
+                    String ddlSql = hmsTable.getViewText();
                     Plan hiveViewPlan = parseAndAnalyzeHiveView(hiveCatalog, 
ddlSql, cascadesContext);
                     return new LogicalSubQueryAlias<>(tableQualifier, 
hiveViewPlan);
                 }
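+                // Attach @incr(...) scan params so that HudiScanNode can plan an incremental read.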
+                hmsTable.setScanParams(unboundRelation.getScanParams());
                 return new LogicalFileScan(unboundRelation.getRelationId(), 
(HMSExternalTable) table, tableQualifier,
                     unboundRelation.getTableSample());
             case ICEBERG_EXTERNAL_TABLE:
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CheckPolicy.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CheckPolicy.java
index 24f8396cf21..aa2917ae1e1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CheckPolicy.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CheckPolicy.java
@@ -17,20 +17,22 @@
 
 package org.apache.doris.nereids.rules.analysis;
 
+import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.nereids.analyzer.UnboundRelation;
 import org.apache.doris.nereids.rules.Rule;
 import org.apache.doris.nereids.rules.RuleType;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
 import org.apache.doris.nereids.trees.plans.logical.LogicalRelation;
 import org.apache.doris.nereids.util.ExpressionUtils;
 
 import com.google.common.collect.ImmutableList;
 
+import java.util.HashSet;
 import java.util.List;
-import java.util.Optional;
 import java.util.Set;
 
 /**
@@ -58,11 +60,26 @@ public class CheckPolicy implements AnalysisRuleFactory {
                                 return ctx.root.child();
                             }
                             LogicalRelation relation = (LogicalRelation) child;
-                            Optional<Expression> filter = 
checkPolicy.getFilter(relation, ctx.connectContext);
-                            if (!filter.isPresent()) {
+                            Set<Expression> combineFilter = new HashSet<>();
+
+                            // Turn the incremental read params into extra conjuncts on the scan
+                            // (ANDed with the other filters below).
+                            if (relation instanceof LogicalFileScan) {
+                                LogicalFileScan fileScan = (LogicalFileScan) relation;
+                                if (fileScan.getTable() instanceof HMSExternalTable) {
+                                    HMSExternalTable hmsTable = (HMSExternalTable) fileScan.getTable();
+                                    combineFilter.addAll(hmsTable.generateIncrementalExpression(
+                                            fileScan.getLogicalProperties().getOutput()));
+                                }
+                            }
+
+                            // row policy
+                            checkPolicy.getFilter(relation, ctx.connectContext)
+                                    .ifPresent(expression -> combineFilter.addAll(
+                                            ExpressionUtils.extractConjunctionToSet(expression)));
+
+                            if (combineFilter.isEmpty()) {
                                 return ctx.root.child();
                             }
-                            Set<Expression> combineFilter = ExpressionUtils.extractConjunctionToSet(filter.get());
                             if (upperFilter != null) {
                                 combineFilter.addAll(upperFilter.getConjuncts());
                             }
diff --git a/regression-test/data/external_table_p2/hive/test_hive_hudi.out 
b/regression-test/data/external_table_p2/hive/test_hive_hudi.out
index 9202749e617..d217a289d8d 100644
--- a/regression-test/data/external_table_p2/hive/test_hive_hudi.out
+++ b/regression-test/data/external_table_p2/hive/test_hive_hudi.out
@@ -135,6 +135,18 @@ row_4      2021-02-01      4       v_4
 20240221111000868      20240221111000868_0_8   
e3cf430c-889d-4015-bc98-59bdce1e530c    sao_paulo       
c97347e9-033a-4c19-a033-94ac1de9f892    1695516137016   
e3cf430c-889d-4015-bc98-59bdce1e530c    rider-F driver-P        34.15   
sao_paulo
 20240221111000868      20240221111000868_0_4   
e96c4396-3fad-413a-a942-4cb36106d721    san_francisco   
3efcaa94-3e58-436a-b489-1232731ed088    1695091554788   
e96c4396-3fad-413a-a942-4cb36106d721    rider-C driver-M        27.7    
san_francisco
 
+-- !incr_mor_table --
+20240312163737521      20240312163737521_0_1   20240312163737521_1_0   pid=p2  
92e677af-6487-4213-b42f-ee56c5a2acdc-0_0-86-568_20240312163737521.parquet       
4       k4      4.992   p2
+20240312163737521      20240312163737521_1_0   20240312163737521_0_0   pid=p3  
22af1878-d8e0-4829-b4af-c7c9693d33f3-0_1-86-569_20240312163737521.parquet       
5       k5      5.5     p3
+20240312163613712      20240312163613712_0_1   20240312163541346_0_0   pid=p1  
ead436a2-f99b-4d35-8b3e-9d67e4828dd5-0_0-99-636_20240312163738946.parquet       
1       k1      1.134   p1
+
+-- !inc_cow_table --
+20240312164834145      20240312164834145_0_0   20240312164538551_2_0   pid=p2  
b8c2db38-e6b8-41f4-8132-620106e8c7e9-0_0-222-1414_20240312164938557.parquet     
3       k3-3    3.77    p2
+20240312164938557      20240312164938557_0_1   20240312164938557_1_0   pid=p2  
b8c2db38-e6b8-41f4-8132-620106e8c7e9-0_0-222-1414_20240312164938557.parquet     
4       k4      4.992   p2
+20240312164938557      20240312164938557_1_0   20240312164938557_0_0   pid=p3  
1335e747-d611-4575-8612-15e491224a0e-0_1-222-1415_20240312164938557.parquet     
5       k5      5.5     p3
+20240312164613448      20240312164613448_0_0   20240312164538551_0_0   pid=p1  
372f896b-10cb-4803-b109-d467189326b8-0_0-166-1042_20240312164650751.parquet     
1       k1      1.37    p1
+20240312164650751      20240312164650751_0_1   20240312164538551_1_0   pid=p1  
372f896b-10cb-4803-b109-d467189326b8-0_0-166-1042_20240312164650751.parquet     
2       k2      2.2     p1
+
 -- !skip_merge --
 20230605145009209      20230605145009209_0_0   rowId:row_1     
partitionId=2021-01-01/versionId=v_0    
65ffc5d9-397a-456e-a735-30f3ad37466f-0_0-33-96_20230605145009209.parquet        
row_1   2021-01-01      0       bob     v_0     toBeDel0        0       1000000
 20230605145403388      20230605145403388_2_0   rowId:row_1     
partitionId=2011-11-11/versionId=v_1    
dbff8acb-42bc-400c-be33-47d9e0bae9b7-0_2-83-222_20230605145403388.parquet       
row_1   2011-11-11      1       bob     v_1     toBeDel1        0       1000001
diff --git 
a/regression-test/suites/external_table_p2/hive/test_hive_hudi.groovy 
b/regression-test/suites/external_table_p2/hive/test_hive_hudi.groovy
index d852e604df5..75c22b26cd7 100644
--- a/regression-test/suites/external_table_p2/hive/test_hive_hudi.groovy
+++ b/regression-test/suites/external_table_p2/hive/test_hive_hudi.groovy
@@ -48,6 +48,12 @@ suite("test_hive_hudi", "p2,external,hive,hudi") {
         // hudi table created by flink hudi catalog
         qt_flink_hudi_catalog """select * from hudi_ctl_table order by uuid"""
 
+        // incremental read for MOR table
+        qt_incr_mor_table """select * from incr_mor_partition@incr('beginTime'='20240312163541346')"""
+
+        // incremental read for COW table
+        qt_inc_cow_table """select * from incr_cow_partition@incr('beginTime'='20240312164538551')"""
+
         // skip logs
         sql """drop catalog if exists ${catalog_name};"""
         sql """


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

