This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 2255d7aeed3 branch-4.0:[fix](mtmv) Fix hudi materialized view union
all rewritten plan execute fail because of invalid slot (#58643) (#58725)
2255d7aeed3 is described below
commit 2255d7aeed36f9eee328e6360e0604b028490eb2
Author: seawinde <[email protected]>
AuthorDate: Fri Dec 5 12:16:56 2025 +0800
branch-4.0:[fix](mtmv) Fix hudi materialized view union all rewritten plan
execute fail because of invalid slot (#58643) (#58725)
pr: https://github.com/apache/doris/pull/58643
commitId: a128c14d
---
.../mv/AbstractMaterializedViewRule.java | 2 +-
.../rules/exploration/mv/PartitionCompensator.java | 41 ++++-
.../trees/plans/logical/LogicalHudiScan.java | 2 +-
.../exploration/mv/PartitionCompensatorTest.java | 199 +++++++++++++++++++++
.../hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy | 8 +-
5 files changed, 240 insertions(+), 12 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
index 69232d6e261..036ffcd2251 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
@@ -316,7 +316,7 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac
continue;
}
Pair<Map<BaseTableInfo, Set<String>>, Map<BaseColInfo,
Set<String>>> invalidPartitions;
- if (PartitionCompensator.needUnionRewrite(materializationContext)
+ if (PartitionCompensator.needUnionRewrite(materializationContext,
cascadesContext.getStatementContext())
&& sessionVariable.isEnableMaterializedViewUnionRewrite())
{
MTMV mtmv = ((AsyncMaterializationContext)
materializationContext).getMtmv();
Map<List<String>, Set<String>> queryUsedPartitions =
PartitionCompensator.getQueryUsedPartitions(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
index 22c3540b5cb..3fe966864d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
@@ -23,8 +23,10 @@ import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
+import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.mtmv.BaseColInfo;
import org.apache.doris.mtmv.BaseTableInfo;
+import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.StatementContext;
@@ -223,14 +225,38 @@ public class PartitionCompensator {
/**
* Check if need union compensate or not
+ * If query base table all partitions with ALL_PARTITIONS or
ALL_PARTITIONS_LIST, should not do union compensate
+ * because it means query all partitions from base table and prune
partition failed
*/
- public static boolean needUnionRewrite(MaterializationContext
materializationContext) {
+ public static boolean needUnionRewrite(MaterializationContext
materializationContext,
+ StatementContext statementContext)
throws AnalysisException {
if (!(materializationContext instanceof AsyncMaterializationContext)) {
return false;
}
MTMV mtmv = ((AsyncMaterializationContext)
materializationContext).getMtmv();
PartitionType type = mtmv.getPartitionInfo().getType();
- List<BaseColInfo> pctInfos = mtmv.getMvPartitionInfo().getPctInfos();
+ MTMVPartitionInfo mvPartitionInfo = mtmv.getMvPartitionInfo();
+ List<BaseColInfo> pctInfos = mvPartitionInfo.getPctInfos();
+ Set<MTMVRelatedTableIf> pctTables = mvPartitionInfo.getPctTables();
+ Multimap<List<String>, Pair<RelationId, Set<String>>>
tableUsedPartitionNameMap =
+ statementContext.getTableUsedPartitionNameMap();
+ for (MTMVRelatedTableIf pctTable : pctTables) {
+ if (pctTable instanceof ExternalTable && !((ExternalTable)
pctTable).supportInternalPartitionPruned()) {
+ // if pct table is external table and not support internal
partition pruned,
+ // we consider query all partitions from pct table, this would
cause loop union compensate,
+ // so we skip union compensate in this case
+ return false;
+ }
+ Collection<Pair<RelationId, Set<String>>> tableUsedPartitions
+ =
tableUsedPartitionNameMap.get(pctTable.getFullQualifiers());
+ if (ALL_PARTITIONS_LIST.equals(tableUsedPartitions)
+ ||
tableUsedPartitions.stream().anyMatch(ALL_PARTITIONS::equals)) {
+ // If query base table all partitions with ALL_PARTITIONS or
ALL_PARTITIONS_LIST,
+ // should not do union compensate, because it means query all
partitions from base table
+ // and prune partition failed
+ return false;
+ }
+ }
return !PartitionType.UNPARTITIONED.equals(type) &&
!pctInfos.isEmpty();
}
@@ -238,11 +264,11 @@ public class PartitionCompensator {
* Get query used partitions
* this is calculated from tableUsedPartitionNameMap and tables in
statementContext
*
- * @param customRelationIdSet if union compensate occurs, the new query
used partitions is changed,
+ * @param currentUsedRelationIdSet if union compensate occurs, the new
query used partitions is changed,
* so need to get used partitions by relation id set
*/
public static Map<List<String>, Set<String>>
getQueryUsedPartitions(StatementContext statementContext,
- BitSet customRelationIdSet) {
+ BitSet currentUsedRelationIdSet) {
// get table used partitions
// if table is not in statementContext().getTables() which means the
table is partition prune as empty relation
Multimap<List<String>, Pair<RelationId, Set<String>>>
tableUsedPartitionNameMap = statementContext
@@ -267,6 +293,13 @@ public class PartitionCompensator {
queryUsedRelatedTablePartitionsMap.put(queryUsedTable,
null);
continue tableLoop;
}
+ // If currentUsedRelationIdSet is not empty, need check
relation id to get concrete used partitions
+ BitSet usedPartitionRelation = new BitSet();
+
usedPartitionRelation.set(tableUsedPartitionPair.key().asInt());
+ if (!currentUsedRelationIdSet.isEmpty()
+ &&
!currentUsedRelationIdSet.intersects(usedPartitionRelation)) {
+ continue;
+ }
usedPartitionSet.addAll(tableUsedPartitionPair.value());
}
queryUsedRelatedTablePartitionsMap.put(queryUsedTable,
usedPartitionSet);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
index 9a123a04d2b..7f616d7045f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
@@ -161,7 +161,7 @@ public class LogicalHudiScan extends LogicalFileScan {
public LogicalHudiScan withRelationId(RelationId relationId) {
return new LogicalHudiScan(relationId, (ExternalTable) table,
qualifier,
selectedPartitions, tableSample, tableSnapshot, scanParams,
incrementalRelation,
- operativeSlots, virtualColumns, groupExpression,
Optional.of(getLogicalProperties()));
+ operativeSlots, virtualColumns, groupExpression, Optional.empty());
}
@Override
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java
index 672628e7d1e..17d75f93fcf 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java
@@ -17,21 +17,39 @@
package org.apache.doris.nereids.rules.exploration.mv;
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.PartitionInfo;
+import org.apache.doris.catalog.PartitionType;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.mtmv.BaseColInfo;
+import org.apache.doris.mtmv.BaseTableInfo;
+import org.apache.doris.mtmv.MTMVPartitionInfo;
+import org.apache.doris.mtmv.MTMVRelatedTableIf;
+import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.util.PlanChecker;
import org.apache.doris.utframe.TestWithFeService;
+import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
import java.util.BitSet;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
public class PartitionCompensatorTest extends TestWithFeService {
@@ -191,4 +209,185 @@ public class PartitionCompensatorTest extends TestWithFeService {
Assertions.assertEquals(orderTableUsedPartition,
ImmutableSet.of("p1", "p2", "p3", "p4"));
});
}
+
+ @Test
+ public void testNeedUnionRewriteUnpartitionedOrNoPctInfos() throws
Exception {
+ MaterializationContext ctx1 = mockCtx(
+ PartitionType.UNPARTITIONED,
+ ImmutableList.of(new BaseColInfo("c", newBaseTableInfo())),
+ ImmutableSet.of(),
+ false);
+ StatementContext sc1 = Mockito.mock(StatementContext.class);
+
Mockito.when(sc1.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create());
+ Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx1,
sc1));
+
+ MaterializationContext ctx2 = mockCtx(
+ PartitionType.RANGE,
+ Collections.emptyList(),
+ ImmutableSet.of(Mockito.mock(MTMVRelatedTableIf.class)),
+ false);
+ StatementContext sc2 = Mockito.mock(StatementContext.class);
+
Mockito.when(sc2.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create());
+ Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx2,
sc2));
+ }
+
+ @Test
+ public void testNeedUnionRewriteEmptyPctTables() throws Exception {
+ MaterializationContext ctx = mockCtx(
+ PartitionType.RANGE,
+ ImmutableList.of(),
+ Collections.emptySet(),
+ false);
+ StatementContext sc = Mockito.mock(StatementContext.class);
+
Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create());
+ Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx, sc));
+ }
+
+ @Test
+ public void testNeedUnionRewriteExternalNoPrune() throws Exception {
+ MaterializationContext ctx = mockCtx(
+ PartitionType.LIST,
+ ImmutableList.of(new BaseColInfo("c", newBaseTableInfo())),
+ ImmutableSet.of(Mockito.mock(MTMVRelatedTableIf.class)),
+ true);
+ StatementContext sc = Mockito.mock(StatementContext.class);
+
Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create());
+ Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx, sc));
+ }
+
+ @Test
+ public void testNeedUnionRewritePositive() throws Exception {
+ MaterializationContext ctx = mockCtx(
+ PartitionType.LIST,
+ ImmutableList.of(new BaseColInfo("c", newBaseTableInfo())),
+ ImmutableSet.of(Mockito.mock(MTMVRelatedTableIf.class)),
+ false);
+ StatementContext sc = Mockito.mock(StatementContext.class);
+
Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create());
+ Assertions.assertTrue(PartitionCompensator.needUnionRewrite(ctx, sc));
+ }
+
+ @Test
+ public void testNotNeedUnionRewriteWhenAllPartitions() throws Exception {
+ BaseTableInfo tableInfo = newBaseTableInfo();
+ MaterializationContext ctx = mockCtx(
+ PartitionType.LIST,
+ ImmutableList.of(new BaseColInfo("c", tableInfo)),
+ ImmutableSet.of(Mockito.mock(MTMVRelatedTableIf.class)),
+ false);
+ StatementContext sc = Mockito.mock(StatementContext.class);
+
+ ArrayListMultimap<List<String>, Pair<RelationId, Set<String>>> t =
ArrayListMultimap.create();
+ t.put(ImmutableList.of(), PartitionCompensator.ALL_PARTITIONS);
+ Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(t);
+ Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx, sc));
+ }
+
+ @Test
+ public void testGetQueryUsedPartitionsAllAndPartial() {
+ // Prepare qualifiers
+ List<String> lineitemQualifier = ImmutableList.of(
+ "internal", "partition_compensate_test",
"lineitem_list_partition");
+ List<String> ordersQualifier = ImmutableList.of(
+ "internal", "partition_compensate_test",
"orders_list_partition");
+
+ Multimap<List<String>, Pair<RelationId, Set<String>>>
tableUsedPartitionNameMap
+ =
connectContext.getStatementContext().getTableUsedPartitionNameMap();
+ tableUsedPartitionNameMap.clear();
+
+ tableUsedPartitionNameMap.put(lineitemQualifier,
PartitionCompensator.ALL_PARTITIONS);
+
+ RelationId ridA = new RelationId(1);
+ RelationId ridB = new RelationId(2);
+ tableUsedPartitionNameMap.put(ordersQualifier, Pair.of(ridA,
ImmutableSet.of("p1", "p2")));
+ tableUsedPartitionNameMap.put(ordersQualifier, Pair.of(ridB,
ImmutableSet.of("p3")));
+
+ Map<List<String>, Set<String>> result =
PartitionCompensator.getQueryUsedPartitions(
+ connectContext.getStatementContext(), new BitSet());
+ Assertions.assertNull(result.get(lineitemQualifier)); // all partitions
+ Assertions.assertEquals(ImmutableSet.of("p1", "p2", "p3"),
result.get(ordersQualifier));
+
+ BitSet filterRidA = new BitSet();
+ filterRidA.set(ridA.asInt());
+ Map<List<String>, Set<String>> resultRidA =
PartitionCompensator.getQueryUsedPartitions(
+ connectContext.getStatementContext(), filterRidA);
+ Assertions.assertNull(resultRidA.get(lineitemQualifier));
+ Assertions.assertEquals(ImmutableSet.of("p1", "p2"),
resultRidA.get(ordersQualifier));
+
+ BitSet filterRidB = new BitSet();
+ filterRidB.set(ridB.asInt());
+ Map<List<String>, Set<String>> resultRidB =
PartitionCompensator.getQueryUsedPartitions(
+ connectContext.getStatementContext(), filterRidB);
+ Assertions.assertNull(resultRidB.get(lineitemQualifier));
+ Assertions.assertEquals(ImmutableSet.of("p3"),
resultRidB.get(ordersQualifier));
+
+ tableUsedPartitionNameMap.put(ordersQualifier,
PartitionCompensator.ALL_PARTITIONS);
+ Map<List<String>, Set<String>> resultAllOrders =
PartitionCompensator.getQueryUsedPartitions(
+ connectContext.getStatementContext(), new BitSet());
+ Assertions.assertNull(resultAllOrders.get(ordersQualifier));
+ }
+
+ @Test
+ public void testGetQueryUsedPartitionsEmptyCollectionMeansNoPartitions() {
+ List<String> qualifier = ImmutableList.of(
+ "internal", "partition_compensate_test",
"lineitem_list_partition");
+ Multimap<List<String>, Pair<RelationId, Set<String>>>
tableUsedPartitionNameMap
+ =
connectContext.getStatementContext().getTableUsedPartitionNameMap();
+ tableUsedPartitionNameMap.clear();
+ // Put an empty set via a distinct relation id to simulate no
partitions used
+ RelationId rid = new RelationId(3);
+ tableUsedPartitionNameMap.put(qualifier, Pair.of(rid,
ImmutableSet.of()));
+
+ Map<List<String>, Set<String>> result =
PartitionCompensator.getQueryUsedPartitions(
+ connectContext.getStatementContext(), new BitSet());
+ Assertions.assertEquals(ImmutableSet.of(), result.get(qualifier));
+ }
+
+ private static MaterializationContext mockCtx(
+ PartitionType type,
+ List<BaseColInfo> pctInfos,
+ Set<MTMVRelatedTableIf> pctTables,
+ boolean externalNoPrune) throws AnalysisException {
+
+ MTMV mtmv = Mockito.mock(MTMV.class);
+ PartitionInfo pi = Mockito.mock(PartitionInfo.class);
+ Mockito.when(mtmv.getPartitionInfo()).thenReturn(pi);
+ Mockito.when(pi.getType()).thenReturn(type);
+
+ MTMVPartitionInfo mpi = Mockito.mock(MTMVPartitionInfo.class);
+ Mockito.when(mtmv.getMvPartitionInfo()).thenReturn(mpi);
+ Mockito.when(mpi.getPctInfos()).thenReturn(pctInfos);
+ Mockito.when(mpi.getPctTables()).thenReturn(pctTables);
+
+ if (externalNoPrune) {
+ HMSExternalTable ext = Mockito.mock(HMSExternalTable.class);
+
Mockito.when(ext.supportInternalPartitionPruned()).thenReturn(false);
+ Set<TableIf> tbls = new HashSet<>(pctTables);
+ tbls.add(ext);
+ Mockito.when(mpi.getPctTables()).thenReturn(
+
tbls.stream().map(MTMVRelatedTableIf.class::cast).collect(Collectors.toSet()));
+ }
+
+ AsyncMaterializationContext ctx =
Mockito.mock(AsyncMaterializationContext.class);
+ Mockito.when(ctx.getMtmv()).thenReturn(mtmv);
+ return ctx;
+ }
+
+ private static BaseTableInfo newBaseTableInfo() {
+ CatalogIf<?> catalog = Mockito.mock(CatalogIf.class);
+ Mockito.when(catalog.getId()).thenReturn(1L);
+ Mockito.when(catalog.getName()).thenReturn("internal");
+
+ DatabaseIf<?> db = Mockito.mock(DatabaseIf.class);
+ Mockito.when(db.getId()).thenReturn(2L);
+ Mockito.when(db.getFullName()).thenReturn("partition_compensate_test");
+ Mockito.when(db.getCatalog()).thenReturn(catalog);
+
+ TableIf table = Mockito.mock(TableIf.class);
+ Mockito.when(table.getId()).thenReturn(3L);
+ Mockito.when(table.getName()).thenReturn("t");
+ Mockito.when(table.getDatabase()).thenReturn(db);
+
+ return new BaseTableInfo(table);
+ }
}
diff --git a/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy
index 70a11d3633a..680f7eaa93d 100644
--- a/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy
+++ b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy
@@ -62,9 +62,7 @@ suite("test_hudi_rewrite_mtmv", "p2,external,hudi,external_remote,external_remot
waitingMTMVTaskFinishedByMvName(mvName)
order_qt_refresh_one_partition "SELECT * FROM ${mvName} "
- def explainOnePartition = sql """ explain ${mvSql} """
- logger.info("explainOnePartition: " + explainOnePartition.toString())
- assertTrue(explainOnePartition.toString().contains("VUNION"))
+ mv_rewrite_success(mvSql, mvName)
order_qt_refresh_one_partition_rewrite "${mvSql}"
mv_rewrite_success("${mvSql}", "${mvName}")
@@ -79,9 +77,7 @@ suite("test_hudi_rewrite_mtmv", "p2,external,hudi,external_remote,external_remot
waitingMTMVTaskFinishedByMvName(mvName)
order_qt_refresh_auto "SELECT * FROM ${mvName} "
- def explainAllPartition = sql """ explain ${mvSql}; """
- logger.info("explainAllPartition: " + explainAllPartition.toString())
- assertTrue(explainAllPartition.toString().contains("VOlapScanNode"))
+ mv_rewrite_success(mvSql, mvName)
order_qt_refresh_all_partition_rewrite "${mvSql}"
mv_rewrite_success("${mvSql}", "${mvName}")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]