This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 50eaf238bf7 [fix](hudi) move wrong members in HMSExternalTable #36187 (#36282)
50eaf238bf7 is described below
commit 50eaf238bf721914a3a4227b1fee40f7144c0028
Author: Mingyu Chen <[email protected]>
AuthorDate: Mon Jun 17 11:28:55 2024 +0800
[fix](hudi) move wrong members in HMSExternalTable #36187 (#36282)
pick from branch-2.1 #36187
---
.../doris/datasource/hive/HMSExternalTable.java | 94 ---------
.../hudi/source/HudiCachedPartitionProcessor.java | 3 +-
.../doris/datasource/hudi/source/HudiScanNode.java | 45 ++--
.../glue/translator/PhysicalPlanTranslator.java | 86 +++++---
.../org/apache/doris/nereids/rules/RuleSet.java | 2 +
.../org/apache/doris/nereids/rules/RuleType.java | 1 +
.../doris/nereids/rules/analysis/BindRelation.java | 17 +-
.../doris/nereids/rules/analysis/CheckPolicy.java | 13 +-
.../LogicalFileScanToPhysicalFileScan.java | 3 +-
...java => LogicalHudiScanToPhysicalHudiScan.java} | 34 ++--
.../doris/nereids/stats/StatsCalculator.java | 6 +
.../apache/doris/nereids/trees/plans/PlanType.java | 1 +
.../trees/plans/logical/LogicalFileScan.java | 8 +-
.../trees/plans/logical/LogicalHudiScan.java | 226 +++++++++++++++++++++
.../trees/plans/physical/PhysicalFileScan.java | 44 +++-
...PhysicalFileScan.java => PhysicalHudiScan.java} | 125 +++++-------
.../trees/plans/visitor/RelationVisitor.java | 10 +
.../apache/doris/nereids/util/RelationUtil.java | 16 +-
.../apache/doris/planner/SingleNodePlanner.java | 10 +-
.../apache/doris/external/hms/HmsCatalogTest.java | 5 +
20 files changed, 483 insertions(+), 266 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index 4cee3d311fd..2c879fba503 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -17,7 +17,6 @@
package org.apache.doris.datasource.hive;
-import org.apache.doris.analysis.TableScanParams;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ListPartitionItem;
@@ -31,23 +30,12 @@ import org.apache.doris.common.Config;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.hudi.HudiUtils;
-import org.apache.doris.datasource.hudi.source.COWIncrementalRelation;
-import org.apache.doris.datasource.hudi.source.IncrementalRelation;
-import org.apache.doris.datasource.hudi.source.MORIncrementalRelation;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.mtmv.MTMVSnapshotIf;
import org.apache.doris.mtmv.MTMVTimestampSnapshot;
import org.apache.doris.nereids.exceptions.NotSupportedException;
-import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
-import org.apache.doris.nereids.trees.expressions.Expression;
-import org.apache.doris.nereids.trees.expressions.GreaterThan;
-import org.apache.doris.nereids.trees.expressions.GreaterThanEqual;
-import org.apache.doris.nereids.trees.expressions.LessThanEqual;
-import org.apache.doris.nereids.trees.expressions.Slot;
-import org.apache.doris.nereids.trees.expressions.SlotReference;
-import org.apache.doris.nereids.trees.expressions.literal.StringLiteral;
import org.apache.doris.qe.GlobalVariable;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
@@ -60,7 +48,6 @@ import org.apache.doris.thrift.THiveTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -76,7 +63,6 @@ import
org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -162,10 +148,6 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
// record the event update time when enable hms event listener
protected volatile long eventUpdateTime;
- // for hudi incremental read
- private TableScanParams scanParams = null;
- private IncrementalRelation incrementalRelation = null;
-
public enum DLAType {
UNKNOWN, HIVE, HUDI, ICEBERG
}
@@ -305,82 +287,6 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
.orElse(Collections.emptyList());
}
- public TableScanParams getScanParams() {
- return scanParams;
- }
-
- public void setScanParams(TableScanParams scanParams) {
- if (scanParams != null && scanParams.incrementalRead()) {
- Map<String, String> optParams = getHadoopProperties();
- if (scanParams.getParams().containsKey("beginTime")) {
- optParams.put("hoodie.datasource.read.begin.instanttime",
scanParams.getParams().get("beginTime"));
- }
- if (scanParams.getParams().containsKey("endTime")) {
- optParams.put("hoodie.datasource.read.end.instanttime",
scanParams.getParams().get("endTime"));
- }
- scanParams.getParams().forEach((k, v) -> {
- if (k.startsWith("hoodie.")) {
- optParams.put(k, v);
- }
- });
- HoodieTableMetaClient hudiClient =
HiveMetaStoreClientHelper.getHudiClient(this);
- try {
- boolean isCowOrRoTable = isHoodieCowTable();
- if (isCowOrRoTable) {
- Map<String, String> serd =
remoteTable.getSd().getSerdeInfo().getParameters();
- if ("true".equals(serd.get("hoodie.query.as.ro.table"))
- && remoteTable.getTableName().endsWith("_ro")) {
- // Incremental read RO table as RT table, I don't know
why?
- isCowOrRoTable = false;
- LOG.warn("Execute incremental read on RO table");
- }
- }
- if (isCowOrRoTable) {
- incrementalRelation = new COWIncrementalRelation(
- optParams,
HiveMetaStoreClientHelper.getConfiguration(this), hudiClient);
- } else {
- incrementalRelation = new MORIncrementalRelation(
- optParams,
HiveMetaStoreClientHelper.getConfiguration(this), hudiClient);
- }
- } catch (Exception e) {
- LOG.warn("Failed to create incremental relation", e);
- }
- }
- this.scanParams = scanParams;
- }
-
- public IncrementalRelation getIncrementalRelation() {
- return incrementalRelation;
- }
-
- /**
- * replace incremental params as AND expression
- * incr('beginTime'='20240308110257169', 'endTime'='20240308110677278') =>
- * _hoodie_commit_time >= 20240308110257169 and _hoodie_commit_time <=
'20240308110677278'
- */
- public Set<Expression> generateIncrementalExpression(List<Slot> slots) {
- if (incrementalRelation == null) {
- return Collections.emptySet();
- }
- SlotReference timeField = null;
- for (Slot slot : slots) {
- if ("_hoodie_commit_time".equals(slot.getName())) {
- timeField = (SlotReference) slot;
- break;
- }
- }
- if (timeField == null) {
- return Collections.emptySet();
- }
- StringLiteral upperValue = new
StringLiteral(incrementalRelation.getEndTs());
- StringLiteral lowerValue = new
StringLiteral(incrementalRelation.getStartTs());
- ComparisonPredicate less = new LessThanEqual(timeField, upperValue);
- ComparisonPredicate great = incrementalRelation.isIncludeStartTime()
- ? new GreaterThanEqual(timeField, lowerValue)
- : new GreaterThan(timeField, lowerValue);
- return ImmutableSet.of(great, less);
- }
-
public boolean isHiveTransactionalTable() {
return dlaType == DLAType.HIVE &&
AcidUtils.isTransactionalTable(remoteTable)
&& isSupportedTransactionalFileFormat();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java
index c8220349019..d9c1c208271 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiCachedPartitionProcessor.java
@@ -19,6 +19,7 @@ package org.apache.doris.datasource.hudi.source;
import org.apache.doris.common.CacheFactory;
import org.apache.doris.common.Config;
+import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CacheException;
import org.apache.doris.datasource.TablePartitionValues;
import org.apache.doris.datasource.TablePartitionValues.TablePartitionKey;
@@ -163,7 +164,7 @@ public class HudiCachedPartitionProcessor extends
HudiPartitionProcessor {
}
} catch (Exception e) {
LOG.warn("Failed to get hudi partitions", e);
- throw new CacheException("Failed to get hudi partitions", e);
+ throw new CacheException("Failed to get hudi partitions: " +
Util.getRootCauseMessage(e), e);
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index 61edc333f6c..8f2b3e598b9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -97,9 +97,6 @@ public class HudiScanNode extends HiveScanNode {
private List<String> columnNames;
private List<String> columnTypes;
- private boolean incrementalRead = false;
- private IncrementalRelation incrementalRelation;
-
private boolean partitionInit = false;
private HoodieTimeline timeline;
private Option<String> snapshotTimestamp;
@@ -108,25 +105,32 @@ public class HudiScanNode extends HiveScanNode {
private Iterator<HivePartition> prunedPartitionsIter;
private int numSplitsPerPartition = NUM_SPLITS_PER_PARTITION;
+ private boolean incrementalRead = false;
+ private TableScanParams scanParams;
+ private IncrementalRelation incrementalRelation;
+
/**
* External file scan node for Query Hudi table
* needCheckColumnPriv: Some of ExternalFileScanNode do not need to check
column priv
* eg: s3 tvf
* These scan nodes do not have corresponding catalog/database/table info,
so no need to do priv check
*/
- public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv) {
+ public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv,
+ Optional<TableScanParams> scanParams,
Optional<IncrementalRelation> incrementalRelation) {
super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE,
needCheckColumnPriv);
isCowOrRoTable = hmsTable.isHoodieCowTable();
- if (isCowOrRoTable) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Hudi table {} can read as cow/read optimize table",
hmsTable.getName());
- }
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Hudi table {} is a mor table, and will use JNI to
read data in BE", hmsTable.getName());
+ if (LOG.isDebugEnabled()) {
+ if (isCowOrRoTable) {
+ LOG.debug("Hudi table {} can read as cow/read optimize table",
hmsTable.getFullQualifiers());
+ } else {
+ LOG.debug("Hudi table {} is a mor table, and will use JNI to
read data in BE",
+ hmsTable.getFullQualifiers());
}
}
useHiveSyncPartition = hmsTable.useHiveSyncPartition();
+ this.scanParams = scanParams.orElse(null);
+ this.incrementalRelation = incrementalRelation.orElse(null);
+ this.incrementalRead = (this.scanParams != null &&
this.scanParams.incrementalRead());
}
@Override
@@ -171,17 +175,9 @@ public class HudiScanNode extends HiveScanNode {
columnTypes.add(columnType);
}
- TableScanParams scanParams = desc.getRef().getScanParams();
- if (scanParams != null) {
- throw new UserException("Incremental read should turn on nereids
planner");
- }
- scanParams = hmsTable.getScanParams();
- if (scanParams != null) {
- if (scanParams.incrementalRead()) {
- incrementalRead = true;
- } else {
- throw new UserException("Not support function '" +
scanParams.getParamType() + "' in hudi table");
- }
+ if (scanParams != null && !scanParams.incrementalRead()) {
+ // Only support incremental read
+ throw new UserException("Not support function '" +
scanParams.getParamType() + "' in hudi table");
}
if (incrementalRead) {
if (isCowOrRoTable) {
@@ -191,18 +187,15 @@ public class HudiScanNode extends HiveScanNode {
&&
hmsTable.getRemoteTable().getTableName().endsWith("_ro")) {
// Incremental read RO table as RT table, I don't know
why?
isCowOrRoTable = false;
- LOG.warn("Execute incremental read on RO table");
+ LOG.warn("Execute incremental read on RO table: {}",
hmsTable.getFullQualifiers());
}
} catch (Exception e) {
// ignore
}
}
- incrementalRelation = hmsTable.getIncrementalRelation();
if (incrementalRelation == null) {
throw new UserException("Failed to create incremental
relation");
}
- } else {
- incrementalRelation = null;
}
timeline =
hudiClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 2ef4fc1debf..664233baa88 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -51,6 +51,7 @@ import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.es.source.EsScanNode;
import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.datasource.hive.HMSExternalTable.DLAType;
import org.apache.doris.datasource.hive.source.HiveScanNode;
import org.apache.doris.datasource.hudi.source.HudiScanNode;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
@@ -125,6 +126,7 @@ import
org.apache.doris.nereids.trees.plans.physical.PhysicalGenerate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHudiScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIntersect;
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan;
@@ -209,6 +211,7 @@ import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
import java.util.ArrayList;
import java.util.Collection;
@@ -569,9 +572,6 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
ScanNode scanNode;
if (table instanceof HMSExternalTable) {
switch (((HMSExternalTable) table).getDlaType()) {
- case HUDI:
- scanNode = new HudiScanNode(context.nextPlanNodeId(),
tupleDescriptor, false);
- break;
case ICEBERG:
scanNode = new IcebergScanNode(context.nextPlanNodeId(),
tupleDescriptor, false);
IcebergScanNode icebergScanNode = (IcebergScanNode)
scanNode;
@@ -607,33 +607,7 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
} else {
throw new RuntimeException("do not support table type " +
table.getType());
}
- scanNode.setNereidsId(fileScan.getId());
-
scanNode.addConjuncts(translateToLegacyConjuncts(fileScan.getConjuncts()));
-
scanNode.setPushDownAggNoGrouping(context.getRelationPushAggOp(fileScan.getRelationId()));
-
- TableName tableName = new TableName(null, "", "");
- TableRef ref = new TableRef(tableName, null, null);
- BaseTableRef tableRef = new BaseTableRef(ref, table, tableName);
- tupleDescriptor.setRef(tableRef);
- if (fileScan.getStats() != null) {
- scanNode.setCardinality((long) fileScan.getStats().getRowCount());
- }
- Utils.execWithUncheckedException(scanNode::init);
- context.addScanNode(scanNode, fileScan);
- ScanNode finalScanNode = scanNode;
- context.getRuntimeTranslator().ifPresent(
- runtimeFilterGenerator ->
runtimeFilterGenerator.getContext().getTargetListByScan(fileScan).forEach(
- expr ->
runtimeFilterGenerator.translateRuntimeFilterTarget(expr, finalScanNode,
context)
- )
- );
- context.getTopnFilterContext().translateTarget(fileScan, scanNode,
context);
- Utils.execWithUncheckedException(scanNode::finalizeForNereids);
- // Create PlanFragment
- DataPartition dataPartition = DataPartition.RANDOM;
- PlanFragment planFragment = createPlanFragment(scanNode,
dataPartition, fileScan);
- context.addPlanFragment(planFragment);
- updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), fileScan);
- return planFragment;
+ return getPlanFragmentForPhysicalFileScan(fileScan, context, scanNode,
table, tupleDescriptor);
}
@Override
@@ -680,6 +654,58 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
return planFragment;
}
+ @Override
+ public PlanFragment visitPhysicalHudiScan(PhysicalHudiScan fileScan,
PlanTranslatorContext context) {
+ List<Slot> slots = fileScan.getOutput();
+ ExternalTable table = fileScan.getTable();
+ TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table,
context);
+
+ if (!(table instanceof HMSExternalTable) || ((HMSExternalTable)
table).getDlaType() != DLAType.HUDI) {
+ throw new RuntimeException("Invalid table type for Hudi scan: " +
table.getType());
+ }
+ Preconditions.checkState(fileScan instanceof PhysicalHudiScan,
+ "Invalid physical scan: " + fileScan.getClass().getSimpleName()
+ + " for Hudi table");
+ PhysicalHudiScan hudiScan = (PhysicalHudiScan) fileScan;
+ ScanNode scanNode = new HudiScanNode(context.nextPlanNodeId(),
tupleDescriptor, false,
+ hudiScan.getScanParams(), hudiScan.getIncrementalRelation());
+
+ return getPlanFragmentForPhysicalFileScan(fileScan, context, scanNode,
table, tupleDescriptor);
+ }
+
+ @NotNull
+ private PlanFragment getPlanFragmentForPhysicalFileScan(PhysicalFileScan
fileScan, PlanTranslatorContext context,
+ ScanNode scanNode,
+ ExternalTable table, TupleDescriptor tupleDescriptor) {
+ scanNode.setNereidsId(fileScan.getId());
+
scanNode.addConjuncts(translateToLegacyConjuncts(fileScan.getConjuncts()));
+
scanNode.setPushDownAggNoGrouping(context.getRelationPushAggOp(fileScan.getRelationId()));
+
+ TableName tableName = new TableName(null, "", "");
+ TableRef ref = new TableRef(tableName, null, null);
+ BaseTableRef tableRef = new BaseTableRef(ref, table, tableName);
+ tupleDescriptor.setRef(tableRef);
+ if (fileScan.getStats() != null) {
+ scanNode.setCardinality((long) fileScan.getStats().getRowCount());
+ }
+ Utils.execWithUncheckedException(scanNode::init);
+ context.addScanNode(scanNode, fileScan);
+ ScanNode finalScanNode = scanNode;
+ context.getRuntimeTranslator().ifPresent(
+ runtimeFilterGenerator ->
runtimeFilterGenerator.getContext().getTargetListByScan(fileScan).forEach(
+ expr ->
runtimeFilterGenerator.translateRuntimeFilterTarget(expr, finalScanNode,
context)
+ )
+ );
+ context.getTopnFilterContext().translateTarget(fileScan, scanNode,
context);
+ Utils.execWithUncheckedException(scanNode::finalizeForNereids);
+ // Create PlanFragment
+ DataPartition dataPartition = DataPartition.RANDOM;
+ PlanFragment planFragment = createPlanFragment(scanNode,
dataPartition, fileScan);
+ context.addPlanFragment(planFragment);
+ updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), fileScan);
+ return planFragment;
+ }
+
@Override
public PlanFragment visitPhysicalJdbcScan(PhysicalJdbcScan jdbcScan,
PlanTranslatorContext context) {
List<Slot> slots = jdbcScan.getOutput();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
index a5fb0b8736a..40f4b135837 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
@@ -63,6 +63,7 @@ import
org.apache.doris.nereids.rules.implementation.LogicalFileSinkToPhysicalFi
import
org.apache.doris.nereids.rules.implementation.LogicalFilterToPhysicalFilter;
import
org.apache.doris.nereids.rules.implementation.LogicalGenerateToPhysicalGenerate;
import
org.apache.doris.nereids.rules.implementation.LogicalHiveTableSinkToPhysicalHiveTableSink;
+import
org.apache.doris.nereids.rules.implementation.LogicalHudiScanToPhysicalHudiScan;
import
org.apache.doris.nereids.rules.implementation.LogicalIcebergTableSinkToPhysicalIcebergTableSink;
import
org.apache.doris.nereids.rules.implementation.LogicalIntersectToPhysicalIntersect;
import
org.apache.doris.nereids.rules.implementation.LogicalJdbcScanToPhysicalJdbcScan;
@@ -164,6 +165,7 @@ public class RuleSet {
.add(new LogicalOlapScanToPhysicalOlapScan())
.add(new
LogicalDeferMaterializeOlapScanToPhysicalDeferMaterializeOlapScan())
.add(new LogicalSchemaScanToPhysicalSchemaScan())
+ .add(new LogicalHudiScanToPhysicalHudiScan())
.add(new LogicalFileScanToPhysicalFileScan())
.add(new LogicalJdbcScanToPhysicalJdbcScan())
.add(new LogicalOdbcScanToPhysicalOdbcScan())
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index bde86b61c29..94f11cb2d88 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -429,6 +429,7 @@ public enum RuleType {
LOGICAL_DEFER_MATERIALIZE_OLAP_SCAN_TO_PHYSICAL_DEFER_MATERIALIZE_OLAP_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_SCHEMA_SCAN_TO_PHYSICAL_SCHEMA_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_FILE_SCAN_TO_PHYSICAL_FILE_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
+ LOGICAL_HUDI_SCAN_TO_PHYSICAL_HUDI_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_JDBC_SCAN_TO_PHYSICAL_JDBC_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_ODBC_SCAN_TO_PHYSICAL_ODBC_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_ES_SCAN_TO_PHYSICAL_ES_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
index 444f7f83776..d66b20e36a8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
@@ -29,6 +29,7 @@ import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.es.EsExternalTable;
import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.datasource.hive.HMSExternalTable.DLAType;
import org.apache.doris.nereids.CTEContext;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.SqlCacheContext;
@@ -55,6 +56,7 @@ import
org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.logical.LogicalEsScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalHudiScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalOdbcScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
@@ -267,10 +269,17 @@ public class BindRelation extends OneAnalysisRuleFactory {
Plan hiveViewPlan = parseAndAnalyzeHiveView(hmsTable,
hiveCatalog, ddlSql, cascadesContext);
return new LogicalSubQueryAlias<>(tableQualifier,
hiveViewPlan);
}
- hmsTable.setScanParams(unboundRelation.getScanParams());
- return new
LogicalFileScan(unboundRelation.getRelationId(), (HMSExternalTable) table,
- qualifierWithoutTableName,
unboundRelation.getTableSample(),
- unboundRelation.getTableSnapshot());
+ if (hmsTable.getDlaType() == DLAType.HUDI) {
+ LogicalHudiScan hudiScan = new
LogicalHudiScan(unboundRelation.getRelationId(), hmsTable,
+ qualifierWithoutTableName,
unboundRelation.getTableSample(),
+ unboundRelation.getTableSnapshot());
+ hudiScan = hudiScan.withScanParams(hmsTable,
unboundRelation.getScanParams());
+ return hudiScan;
+ } else {
+ return new
LogicalFileScan(unboundRelation.getRelationId(), (HMSExternalTable) table,
+ qualifierWithoutTableName,
unboundRelation.getTableSample(),
+ unboundRelation.getTableSnapshot());
+ }
case ICEBERG_EXTERNAL_TABLE:
case PAIMON_EXTERNAL_TABLE:
case MAX_COMPUTE_EXTERNAL_TABLE:
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CheckPolicy.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CheckPolicy.java
index 1e7d4dbb09d..94f7c36b108 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CheckPolicy.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CheckPolicy.java
@@ -25,8 +25,8 @@ import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
import
org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy.RelatedPolicy;
-import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalHudiScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalRelation;
import org.apache.doris.nereids.util.ExpressionUtils;
@@ -65,12 +65,11 @@ public class CheckPolicy implements AnalysisRuleFactory {
Set<Expression> combineFilter = new
LinkedHashSet<>();
// replace incremental params as AND expression
- if (relation instanceof LogicalFileScan) {
- LogicalFileScan fileScan = (LogicalFileScan)
relation;
- if (fileScan.getTable() instanceof
HMSExternalTable) {
- HMSExternalTable hmsTable =
(HMSExternalTable) fileScan.getTable();
-
combineFilter.addAll(hmsTable.generateIncrementalExpression(
-
fileScan.getLogicalProperties().getOutput()));
+ if (relation instanceof LogicalHudiScan) {
+ LogicalHudiScan hudiScan = (LogicalHudiScan)
relation;
+ if (hudiScan.getTable() instanceof
HMSExternalTable) {
+
combineFilter.addAll(hudiScan.generateIncrementalExpression(
+
hudiScan.getLogicalProperties().getOutput()));
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java
index 8edb683151e..4946dcd56c3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java
@@ -20,6 +20,7 @@ package org.apache.doris.nereids.rules.implementation;
import org.apache.doris.nereids.properties.DistributionSpecAny;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.trees.plans.logical.LogicalHudiScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
import java.util.Optional;
@@ -30,7 +31,7 @@ import java.util.Optional;
public class LogicalFileScanToPhysicalFileScan extends
OneImplementationRuleFactory {
@Override
public Rule build() {
- return logicalFileScan().then(fileScan ->
+ return logicalFileScan().when(plan -> !(plan instanceof
LogicalHudiScan)).then(fileScan ->
new PhysicalFileScan(
fileScan.getRelationId(),
fileScan.getTable(),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalHudiScanToPhysicalHudiScan.java
similarity index 53%
copy from
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java
copy to
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalHudiScanToPhysicalHudiScan.java
index 8edb683151e..a5d676eab67 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalFileScanToPhysicalFileScan.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalHudiScanToPhysicalHudiScan.java
@@ -20,28 +20,30 @@ package org.apache.doris.nereids.rules.implementation;
import org.apache.doris.nereids.properties.DistributionSpecAny;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHudiScan;
import java.util.Optional;
/**
- * Implementation rule that convert logical FileScan to physical FileScan.
+ * Implementation rule that convert logical HudiScan to physical HudiScan.
*/
-public class LogicalFileScanToPhysicalFileScan extends
OneImplementationRuleFactory {
+public class LogicalHudiScanToPhysicalHudiScan extends
OneImplementationRuleFactory {
@Override
public Rule build() {
- return logicalFileScan().then(fileScan ->
- new PhysicalFileScan(
- fileScan.getRelationId(),
- fileScan.getTable(),
- fileScan.getQualifier(),
- DistributionSpecAny.INSTANCE,
- Optional.empty(),
- fileScan.getLogicalProperties(),
- fileScan.getConjuncts(),
- fileScan.getSelectedPartitions(),
- fileScan.getTableSample(),
- fileScan.getTableSnapshot())
- ).toRule(RuleType.LOGICAL_FILE_SCAN_TO_PHYSICAL_FILE_SCAN_RULE);
+ return logicalHudiScan().then(fileScan ->
+ new PhysicalHudiScan(
+ fileScan.getRelationId(),
+ fileScan.getTable(),
+ fileScan.getQualifier(),
+ DistributionSpecAny.INSTANCE,
+ Optional.empty(),
+ fileScan.getLogicalProperties(),
+ fileScan.getConjuncts(),
+ fileScan.getSelectedPartitions(),
+ fileScan.getTableSample(),
+ fileScan.getTableSnapshot(),
+ fileScan.getScanParams(),
+ fileScan.getIncrementalRelation())
+ ).toRule(RuleType.LOGICAL_HUDI_SCAN_TO_PHYSICAL_HUDI_SCAN_RULE);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
index 04e71b53d03..e42c58ca6b5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
@@ -69,6 +69,7 @@ import
org.apache.doris.nereids.trees.plans.logical.LogicalExcept;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalGenerate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalHudiScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalIntersect;
import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
@@ -318,6 +319,11 @@ public class StatsCalculator extends
DefaultPlanVisitor<Statistics, Void> {
return computeCatalogRelation(fileScan);
}
+ @Override
+ public Statistics visitLogicalHudiScan(LogicalHudiScan fileScan, Void
context) {
+ return computeCatalogRelation(fileScan);
+ }
+
@Override
public Statistics visitLogicalTVFRelation(LogicalTVFRelation tvfRelation,
Void context) {
return tvfRelation.getFunction().computeStats(tvfRelation.getOutput());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
index 13fd64a8798..4df122c9fc3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
@@ -90,6 +90,7 @@ public enum PlanType {
PHYSICAL_EMPTY_RELATION,
PHYSICAL_ES_SCAN,
PHYSICAL_FILE_SCAN,
+ PHYSICAL_HUDI_SCAN,
PHYSICAL_JDBC_SCAN,
PHYSICAL_ODBC_SCAN,
PHYSICAL_ONE_ROW_RELATION,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java
index 06d349fe2a6..8dd47c44b15 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java
@@ -45,14 +45,14 @@ import java.util.Set;
*/
public class LogicalFileScan extends LogicalExternalRelation {
- private final SelectedPartitions selectedPartitions;
- private final Optional<TableSample> tableSample;
- private final Optional<TableSnapshot> tableSnapshot;
+ protected final SelectedPartitions selectedPartitions;
+ protected final Optional<TableSample> tableSample;
+ protected final Optional<TableSnapshot> tableSnapshot;
/**
* Constructor for LogicalFileScan.
*/
- public LogicalFileScan(RelationId id, ExternalTable table, List<String>
qualifier,
+ protected LogicalFileScan(RelationId id, ExternalTable table, List<String>
qualifier,
Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties,
Set<Expression> conjuncts, SelectedPartitions selectedPartitions,
Optional<TableSample> tableSample,
Optional<TableSnapshot> tableSnapshot) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
new file mode 100644
index 00000000000..8659ad3d9c3
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
@@ -0,0 +1,226 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.logical;
+
+import org.apache.doris.analysis.TableScanParams;
+import org.apache.doris.analysis.TableSnapshot;
+import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
+import org.apache.doris.datasource.hudi.source.COWIncrementalRelation;
+import org.apache.doris.datasource.hudi.source.IncrementalRelation;
+import org.apache.doris.datasource.hudi.source.MORIncrementalRelation;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.TableSample;
+import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.GreaterThan;
+import org.apache.doris.nereids.trees.expressions.GreaterThanEqual;
+import org.apache.doris.nereids.trees.expressions.LessThanEqual;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.literal.StringLiteral;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.RelationId;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Logical Hudi scan for Hudi table
+ */
+public class LogicalHudiScan extends LogicalFileScan {
+ private static final Logger LOG =
LogManager.getLogger(LogicalHudiScan.class);
+
+ // for hudi incremental read
+ private final Optional<TableScanParams> scanParams;
+ private final Optional<IncrementalRelation> incrementalRelation;
+
+ /**
+ * Constructor for LogicalHudiScan.
+ */
+ protected LogicalHudiScan(RelationId id, ExternalTable table, List<String>
qualifier,
+ Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties,
+ Set<Expression> conjuncts, SelectedPartitions selectedPartitions,
Optional<TableSample> tableSample,
+ Optional<TableSnapshot> tableSnapshot,
+ Optional<TableScanParams> scanParams,
Optional<IncrementalRelation> incrementalRelation) {
+ super(id, table, qualifier, groupExpression, logicalProperties,
conjuncts,
+ selectedPartitions, tableSample, tableSnapshot);
+ Objects.requireNonNull(scanParams, "scanParams should not null");
+ Objects.requireNonNull(incrementalRelation, "incrementalRelation
should not null");
+ this.scanParams = scanParams;
+ this.incrementalRelation = incrementalRelation;
+ }
+
+ public LogicalHudiScan(RelationId id, ExternalTable table, List<String>
qualifier,
+ Optional<TableSample> tableSample, Optional<TableSnapshot>
tableSnapshot) {
+ this(id, table, qualifier, Optional.empty(), Optional.empty(),
+ Sets.newHashSet(), SelectedPartitions.NOT_PRUNED, tableSample,
tableSnapshot,
+ Optional.empty(), Optional.empty());
+ }
+
+ public Optional<TableScanParams> getScanParams() {
+ return scanParams;
+ }
+
+ public Optional<IncrementalRelation> getIncrementalRelation() {
+ return incrementalRelation;
+ }
+
+ /**
+ * replace incremental params as AND expression
+ * incr('beginTime'='20240308110257169', 'endTime'='20240308110677278') =>
+ * _hoodie_commit_time >= '20240308110257169' and _hoodie_commit_time <=
'20240308110677278'
+ */
+ public Set<Expression> generateIncrementalExpression(List<Slot> slots) {
+ if (!incrementalRelation.isPresent()) {
+ return Collections.emptySet();
+ }
+ SlotReference timeField = null;
+ for (Slot slot : slots) {
+ if ("_hoodie_commit_time".equals(slot.getName())) {
+ timeField = (SlotReference) slot;
+ break;
+ }
+ }
+ if (timeField == null) {
+ return Collections.emptySet();
+ }
+ StringLiteral upperValue = new
StringLiteral(incrementalRelation.get().getEndTs());
+ StringLiteral lowerValue = new
StringLiteral(incrementalRelation.get().getStartTs());
+ ComparisonPredicate less = new LessThanEqual(timeField, upperValue);
+ ComparisonPredicate great =
incrementalRelation.get().isIncludeStartTime()
+ ? new GreaterThanEqual(timeField, lowerValue)
+ : new GreaterThan(timeField, lowerValue);
+ return ImmutableSet.of(great, less);
+ }
+
+ @Override
+ public String toString() {
+ return Utils.toSqlString("LogicalHudiScan",
+ "qualified", qualifiedName(),
+ "output", getOutput()
+ );
+ }
+
+ @Override
+ public LogicalHudiScan withGroupExpression(Optional<GroupExpression>
groupExpression) {
+ return new LogicalHudiScan(relationId, (ExternalTable) table,
qualifier, groupExpression,
+ Optional.of(getLogicalProperties()), conjuncts,
selectedPartitions, tableSample, tableSnapshot,
+ scanParams, incrementalRelation);
+ }
+
+ @Override
+ public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression>
groupExpression,
+ Optional<LogicalProperties> logicalProperties, List<Plan>
children) {
+ return new LogicalHudiScan(relationId, (ExternalTable) table,
qualifier,
+ groupExpression, logicalProperties, conjuncts,
selectedPartitions, tableSample, tableSnapshot,
+ scanParams, incrementalRelation);
+ }
+
+ @Override
+ public LogicalHudiScan withConjuncts(Set<Expression> conjuncts) {
+ return new LogicalHudiScan(relationId, (ExternalTable) table,
qualifier, Optional.empty(),
+ Optional.of(getLogicalProperties()), conjuncts,
selectedPartitions, tableSample, tableSnapshot,
+ scanParams, incrementalRelation);
+ }
+
+ public LogicalHudiScan withSelectedPartitions(SelectedPartitions
selectedPartitions) {
+ return new LogicalHudiScan(relationId, (ExternalTable) table,
qualifier, Optional.empty(),
+ Optional.of(getLogicalProperties()), conjuncts,
selectedPartitions, tableSample, tableSnapshot,
+ scanParams, incrementalRelation);
+ }
+
+ @Override
+ public LogicalHudiScan withRelationId(RelationId relationId) {
+ return new LogicalHudiScan(relationId, (ExternalTable) table,
qualifier, Optional.empty(),
+ Optional.empty(), conjuncts, selectedPartitions, tableSample,
tableSnapshot,
+ scanParams, incrementalRelation);
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitLogicalHudiScan(this, context);
+ }
+
+ /**
+ * Set scan params for incremental read
+ *
+ * @param table should be hudi table
+ * @param scanParams including incremental read params
+ */
+ public LogicalHudiScan withScanParams(HMSExternalTable table,
TableScanParams scanParams) {
+ Optional<IncrementalRelation> newIncrementalRelation =
Optional.empty();
+ Optional<TableScanParams> newScanParams = Optional.empty();
+ if (scanParams != null && scanParams.incrementalRead()) {
+ Map<String, String> optParams = table.getHadoopProperties();
+ if (scanParams.getParams().containsKey("beginTime")) {
+ optParams.put("hoodie.datasource.read.begin.instanttime",
scanParams.getParams().get("beginTime"));
+ }
+ if (scanParams.getParams().containsKey("endTime")) {
+ optParams.put("hoodie.datasource.read.end.instanttime",
scanParams.getParams().get("endTime"));
+ }
+ scanParams.getParams().forEach((k, v) -> {
+ if (k.startsWith("hoodie.")) {
+ optParams.put(k, v);
+ }
+ });
+ HoodieTableMetaClient hudiClient =
HiveMetaStoreClientHelper.getHudiClient(table);
+ try {
+ boolean isCowOrRoTable = table.isHoodieCowTable();
+ if (isCowOrRoTable) {
+ Map<String, String> serd =
table.getRemoteTable().getSd().getSerdeInfo().getParameters();
+ if ("true".equals(serd.get("hoodie.query.as.ro.table"))
+ &&
table.getRemoteTable().getTableName().endsWith("_ro")) {
+ // Incremental read RO table as RT table, I don't know
why?
+ isCowOrRoTable = false;
+ LOG.warn("Execute incremental read on RO table: {}",
table.getFullQualifiers());
+ }
+ }
+ if (isCowOrRoTable) {
+ newIncrementalRelation = Optional.of(new
COWIncrementalRelation(
+ optParams,
HiveMetaStoreClientHelper.getConfiguration(table), hudiClient));
+ } else {
+ newIncrementalRelation = Optional.of(new
MORIncrementalRelation(
+ optParams,
HiveMetaStoreClientHelper.getConfiguration(table), hudiClient));
+ }
+ } catch (Exception e) {
+ throw new AnalysisException(
+ "Failed to create incremental relation for table: " +
table.getFullQualifiers(), e);
+ }
+ }
+ newScanParams = Optional.ofNullable(scanParams);
+ return new LogicalHudiScan(relationId, table, qualifier,
Optional.empty(),
+ Optional.empty(), conjuncts, selectedPartitions, tableSample,
tableSnapshot,
+ newScanParams, newIncrementalRelation);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalFileScan.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalFileScan.java
index 8706db65f1e..639e026949e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalFileScan.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalFileScan.java
@@ -42,11 +42,11 @@ import java.util.Set;
*/
public class PhysicalFileScan extends PhysicalCatalogRelation {
- private final DistributionSpec distributionSpec;
- private final Set<Expression> conjuncts;
- private final SelectedPartitions selectedPartitions;
- private final Optional<TableSample> tableSample;
- private final Optional<TableSnapshot> tableSnapshot;
+ protected final DistributionSpec distributionSpec;
+ protected final Set<Expression> conjuncts;
+ protected final SelectedPartitions selectedPartitions;
+ protected final Optional<TableSample> tableSample;
+ protected final Optional<TableSnapshot> tableSnapshot;
/**
* Constructor for PhysicalFileScan.
@@ -56,7 +56,32 @@ public class PhysicalFileScan extends
PhysicalCatalogRelation {
LogicalProperties logicalProperties, Set<Expression> conjuncts,
SelectedPartitions selectedPartitions, Optional<TableSample>
tableSample,
Optional<TableSnapshot> tableSnapshot) {
- super(id, PlanType.PHYSICAL_FILE_SCAN, table, qualifier,
groupExpression, logicalProperties);
+ this(id, PlanType.PHYSICAL_FILE_SCAN, table, qualifier,
distributionSpec, groupExpression,
+ logicalProperties, conjuncts, selectedPartitions, tableSample,
tableSnapshot);
+ }
+
+ /**
+ * Constructor for PhysicalFileScan.
+ */
+ public PhysicalFileScan(RelationId id, ExternalTable table, List<String>
qualifier,
+ DistributionSpec distributionSpec, Optional<GroupExpression>
groupExpression,
+ LogicalProperties logicalProperties, PhysicalProperties
physicalProperties,
+ Statistics statistics, Set<Expression> conjuncts,
SelectedPartitions selectedPartitions,
+ Optional<TableSample> tableSample, Optional<TableSnapshot>
tableSnapshot) {
+ this(id, PlanType.PHYSICAL_FILE_SCAN, table, qualifier,
distributionSpec, groupExpression,
+ logicalProperties, physicalProperties, statistics, conjuncts,
selectedPartitions, tableSample,
+ tableSnapshot);
+ }
+
+ /**
+ * For hudi file scan to specify the PlanType
+ */
+ protected PhysicalFileScan(RelationId id, PlanType type, ExternalTable
table, List<String> qualifier,
+ DistributionSpec distributionSpec, Optional<GroupExpression>
groupExpression,
+ LogicalProperties logicalProperties, Set<Expression> conjuncts,
+ SelectedPartitions selectedPartitions, Optional<TableSample>
tableSample,
+ Optional<TableSnapshot> tableSnapshot) {
+ super(id, type, table, qualifier, groupExpression, logicalProperties);
this.distributionSpec = distributionSpec;
this.conjuncts = conjuncts;
this.selectedPartitions = selectedPartitions;
@@ -64,15 +89,12 @@ public class PhysicalFileScan extends
PhysicalCatalogRelation {
this.tableSnapshot = tableSnapshot;
}
- /**
- * Constructor for PhysicalFileScan.
- */
- public PhysicalFileScan(RelationId id, ExternalTable table, List<String>
qualifier,
+ protected PhysicalFileScan(RelationId id, PlanType type, ExternalTable
table, List<String> qualifier,
DistributionSpec distributionSpec, Optional<GroupExpression>
groupExpression,
LogicalProperties logicalProperties, PhysicalProperties
physicalProperties,
Statistics statistics, Set<Expression> conjuncts,
SelectedPartitions selectedPartitions,
Optional<TableSample> tableSample, Optional<TableSnapshot>
tableSnapshot) {
- super(id, PlanType.PHYSICAL_FILE_SCAN, table, qualifier,
groupExpression, logicalProperties,
+ super(id, type, table, qualifier, groupExpression, logicalProperties,
physicalProperties, statistics);
this.distributionSpec = distributionSpec;
this.conjuncts = conjuncts;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalFileScan.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHudiScan.java
similarity index 56%
copy from
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalFileScan.java
copy to
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHudiScan.java
index 8706db65f1e..d5bc299a2ba 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalFileScan.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHudiScan.java
@@ -17,8 +17,10 @@
package org.apache.doris.nereids.trees.plans.physical;
+import org.apache.doris.analysis.TableScanParams;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.hudi.source.IncrementalRelation;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.DistributionSpec;
import org.apache.doris.nereids.properties.LogicalProperties;
@@ -34,113 +36,98 @@ import org.apache.doris.nereids.util.Utils;
import org.apache.doris.statistics.Statistics;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
/**
- * Physical file scan for external catalog.
+ * Physical Hudi scan for Hudi table.
*/
-public class PhysicalFileScan extends PhysicalCatalogRelation {
+public class PhysicalHudiScan extends PhysicalFileScan {
- private final DistributionSpec distributionSpec;
- private final Set<Expression> conjuncts;
- private final SelectedPartitions selectedPartitions;
- private final Optional<TableSample> tableSample;
- private final Optional<TableSnapshot> tableSnapshot;
+ // for hudi incremental read
+ private final Optional<TableScanParams> scanParams;
+ private final Optional<IncrementalRelation> incrementalRelation;
/**
- * Constructor for PhysicalFileScan.
+ * Constructor for PhysicalHudiScan.
*/
- public PhysicalFileScan(RelationId id, ExternalTable table, List<String>
qualifier,
+ public PhysicalHudiScan(RelationId id, ExternalTable table, List<String>
qualifier,
DistributionSpec distributionSpec, Optional<GroupExpression>
groupExpression,
LogicalProperties logicalProperties, Set<Expression> conjuncts,
SelectedPartitions selectedPartitions, Optional<TableSample>
tableSample,
- Optional<TableSnapshot> tableSnapshot) {
- super(id, PlanType.PHYSICAL_FILE_SCAN, table, qualifier,
groupExpression, logicalProperties);
- this.distributionSpec = distributionSpec;
- this.conjuncts = conjuncts;
- this.selectedPartitions = selectedPartitions;
- this.tableSample = tableSample;
- this.tableSnapshot = tableSnapshot;
+ Optional<TableSnapshot> tableSnapshot,
+ Optional<TableScanParams> scanParams,
Optional<IncrementalRelation> incrementalRelation) {
+ super(id, PlanType.PHYSICAL_HUDI_SCAN, table, qualifier,
distributionSpec, groupExpression, logicalProperties,
+ conjuncts, selectedPartitions, tableSample, tableSnapshot);
+ Objects.requireNonNull(scanParams, "scanParams should not null");
+ Objects.requireNonNull(incrementalRelation, "incrementalRelation
should not null");
+ this.scanParams = scanParams;
+ this.incrementalRelation = incrementalRelation;
}
/**
- * Constructor for PhysicalFileScan.
+ * Constructor for PhysicalHudiScan.
*/
- public PhysicalFileScan(RelationId id, ExternalTable table, List<String>
qualifier,
+ public PhysicalHudiScan(RelationId id, ExternalTable table, List<String>
qualifier,
DistributionSpec distributionSpec, Optional<GroupExpression>
groupExpression,
LogicalProperties logicalProperties, PhysicalProperties
physicalProperties,
Statistics statistics, Set<Expression> conjuncts,
SelectedPartitions selectedPartitions,
- Optional<TableSample> tableSample, Optional<TableSnapshot>
tableSnapshot) {
- super(id, PlanType.PHYSICAL_FILE_SCAN, table, qualifier,
groupExpression, logicalProperties,
- physicalProperties, statistics);
- this.distributionSpec = distributionSpec;
- this.conjuncts = conjuncts;
- this.selectedPartitions = selectedPartitions;
- this.tableSample = tableSample;
- this.tableSnapshot = tableSnapshot;
+ Optional<TableSample> tableSample, Optional<TableSnapshot>
tableSnapshot,
+ Optional<TableScanParams> scanParams,
Optional<IncrementalRelation> incrementalRelation) {
+ super(id, PlanType.PHYSICAL_HUDI_SCAN, table, qualifier,
distributionSpec, groupExpression, logicalProperties,
+ physicalProperties, statistics, conjuncts, selectedPartitions,
tableSample, tableSnapshot);
+ this.scanParams = scanParams;
+ this.incrementalRelation = incrementalRelation;
}
- public DistributionSpec getDistributionSpec() {
- return distributionSpec;
+ public Optional<TableScanParams> getScanParams() {
+ return scanParams;
}
- public Set<Expression> getConjuncts() {
- return conjuncts;
+ public Optional<IncrementalRelation> getIncrementalRelation() {
+ return incrementalRelation;
}
- public SelectedPartitions getSelectedPartitions() {
- return selectedPartitions;
+ @Override
+ public PhysicalHudiScan withGroupExpression(Optional<GroupExpression>
groupExpression) {
+ return new PhysicalHudiScan(relationId, getTable(), qualifier,
distributionSpec,
+ groupExpression, getLogicalProperties(), conjuncts,
selectedPartitions, tableSample, tableSnapshot,
+ scanParams, incrementalRelation);
}
- public Optional<TableSample> getTableSample() {
- return tableSample;
+ @Override
+ public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression>
groupExpression,
+ Optional<LogicalProperties> logicalProperties, List<Plan>
children) {
+ return new PhysicalHudiScan(relationId, getTable(), qualifier,
distributionSpec,
+ groupExpression, logicalProperties.get(), conjuncts,
selectedPartitions, tableSample, tableSnapshot,
+ scanParams, incrementalRelation);
}
- public Optional<TableSnapshot> getTableSnapshot() {
- return tableSnapshot;
+ @Override
+ public PhysicalHudiScan withPhysicalPropertiesAndStats(PhysicalProperties
physicalProperties,
+ Statistics statistics) {
+ return new PhysicalHudiScan(relationId, getTable(), qualifier,
distributionSpec,
+ groupExpression, getLogicalProperties(), physicalProperties,
statistics, conjuncts,
+ selectedPartitions, tableSample, tableSnapshot,
+ scanParams, incrementalRelation);
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitPhysicalHudiScan(this, context);
}
@Override
public String toString() {
- return Utils.toSqlString("PhysicalFileScan",
+ return Utils.toSqlString("PhysicalHudiScan",
"qualified", Utils.qualifiedName(qualifier, table.getName()),
"output", getOutput(),
"stats", statistics,
"conjuncts", conjuncts,
"selected partitions num",
- selectedPartitions.isPruned ?
selectedPartitions.selectedPartitions.size() : "unknown"
+ selectedPartitions.isPruned ?
selectedPartitions.selectedPartitions.size() : "unknown",
+ "isIncremental", incrementalRelation.isPresent()
);
}
-
- @Override
- public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
- return visitor.visitPhysicalFileScan(this, context);
- }
-
- @Override
- public PhysicalFileScan withGroupExpression(Optional<GroupExpression>
groupExpression) {
- return new PhysicalFileScan(relationId, getTable(), qualifier,
distributionSpec,
- groupExpression, getLogicalProperties(), conjuncts,
selectedPartitions, tableSample, tableSnapshot);
- }
-
- @Override
- public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression>
groupExpression,
- Optional<LogicalProperties> logicalProperties, List<Plan>
children) {
- return new PhysicalFileScan(relationId, getTable(), qualifier,
distributionSpec,
- groupExpression, logicalProperties.get(), conjuncts,
selectedPartitions, tableSample, tableSnapshot);
- }
-
- @Override
- public ExternalTable getTable() {
- return (ExternalTable) table;
- }
-
- @Override
- public PhysicalFileScan withPhysicalPropertiesAndStats(PhysicalProperties
physicalProperties,
- Statistics statistics) {
- return new PhysicalFileScan(relationId, getTable(), qualifier,
distributionSpec,
- groupExpression, getLogicalProperties(), physicalProperties,
statistics, conjuncts,
- selectedPartitions, tableSample, tableSnapshot);
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/RelationVisitor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/RelationVisitor.java
index 046964c351d..0871e3dca37 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/RelationVisitor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/RelationVisitor.java
@@ -26,6 +26,7 @@ import
org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalEsScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalExternalRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalHudiScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalOdbcScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
@@ -40,6 +41,7 @@ import
org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeOla
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHudiScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOdbcScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
@@ -105,6 +107,10 @@ public interface RelationVisitor<R, C> {
return visitLogicalExternalRelation(fileScan, context);
}
+ default R visitLogicalHudiScan(LogicalHudiScan fileScan, C context) {
+ return visitLogicalFileScan(fileScan, context);
+ }
+
default R visitLogicalJdbcScan(LogicalJdbcScan jdbcScan, C context) {
return visitLogicalExternalRelation(jdbcScan, context);
}
@@ -154,6 +160,10 @@ public interface RelationVisitor<R, C> {
return visitPhysicalCatalogRelation(fileScan, context);
}
+ default R visitPhysicalHudiScan(PhysicalHudiScan hudiScan, C context) {
+ return visitPhysicalFileScan(hudiScan, context);
+ }
+
default R visitPhysicalJdbcScan(PhysicalJdbcScan jdbcScan, C context) {
return visitPhysicalCatalogRelation(jdbcScan, context);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/RelationUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/RelationUtil.java
index a625e4490e1..b145338ff81 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/RelationUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/RelationUtil.java
@@ -25,6 +25,7 @@ import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.qe.ConnectContext;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import java.util.List;
@@ -42,13 +43,24 @@ public class RelationUtil {
case 1: { // table
// Use current database name from catalog.
String tableName = nameParts.get(0);
- String catalogName = context.getCurrentCatalog().getName();
+ CatalogIf catalogIf = context.getCurrentCatalog();
+ if (catalogIf == null) {
+ throw new IllegalStateException("Current catalog is not
set.");
+ }
+ String catalogName = catalogIf.getName();
String dbName = context.getDatabase();
+ if (Strings.isNullOrEmpty(dbName)) {
+ throw new IllegalStateException("Current database is not
set.");
+ }
return ImmutableList.of(catalogName, dbName, tableName);
}
case 2: { // db.table
// Use database name from table name parts.
- String catalogName = context.getCurrentCatalog().getName();
+ CatalogIf catalogIf = context.getCurrentCatalog();
+ if (catalogIf == null) {
+ throw new IllegalStateException("Current catalog is not
set.");
+ }
+ String catalogName = catalogIf.getName();
// if the relation is view, nameParts.get(0) is dbName.
String dbName = nameParts.get(0);
String tableName = nameParts.get(1);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index d60ab89c9c7..b91d5378cb3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -98,6 +98,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -1964,7 +1965,14 @@ public class SingleNodePlanner {
TableIf table = tblRef.getDesc().getTable();
switch (((HMSExternalTable) table).getDlaType()) {
case HUDI:
- scanNode = new HudiScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), true);
+ // Old planner does not support hudi incremental read,
+ // so just pass Optional.empty() to HudiScanNode
+ if (tblRef.getScanParams() != null) {
+ throw new UserException("Hudi incremental read is
not supported, "
+ + "please set enable_nereids_planner =
true to enable new optimizer");
+ }
+ scanNode = new HudiScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), true,
+ Optional.empty(), Optional.empty());
break;
case ICEBERG:
scanNode = new IcebergScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), true);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/external/hms/HmsCatalogTest.java
b/fe/fe-core/src/test/java/org/apache/doris/external/hms/HmsCatalogTest.java
index 8dd8421ebc2..b968d27b5b8 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/external/hms/HmsCatalogTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/external/hms/HmsCatalogTest.java
@@ -33,6 +33,7 @@ import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalDatabase;
import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.datasource.hive.HMSExternalTable.DLAType;
import org.apache.doris.nereids.datasets.tpch.AnalyzeCheckTestBase;
import org.apache.doris.qe.SessionVariable;
@@ -133,6 +134,10 @@ public class HmsCatalogTest extends AnalyzeCheckTestBase {
tbl.getDatabase();
minTimes = 0;
result = db;
+
+ tbl.getDlaType();
+ minTimes = 0;
+ result = DLAType.HIVE;
}
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]