This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new a9361eae7f7 branch-3.1: [fix](mtmv) Fix hudi materialized view union
all rewritten plan execute fail because of invalid slot #58643 (#58724)
a9361eae7f7 is described below
commit a9361eae7f7f9dcc11dd26c6fa5626aa8553939f
Author: seawinde <[email protected]>
AuthorDate: Fri Dec 5 17:59:46 2025 +0800
branch-3.1: [fix](mtmv) Fix hudi materialized view union all rewritten plan
execute fail because of invalid slot #58643 (#58724)
picked from #58643
---
.../mv/AbstractMaterializedViewRule.java | 4 +-
.../rules/exploration/mv/PartitionCompensator.java | 42 +++++-
.../exploration/mv/PartitionCompensatorTest.java | 159 +++++++++++++++++++++
.../hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy | 8 +-
4 files changed, 200 insertions(+), 13 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
index 3a6f08f8cc8..c6c44b9fb4a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
@@ -198,7 +198,7 @@ public abstract class AbstractMaterializedViewRule
implements ExplorationRuleFac
* only one materialization every time. Different query pattern should
override the sub logic.
*/
protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext
cascadesContext,
- MaterializationContext materializationContext) {
+ MaterializationContext materializationContext) throws
AnalysisException {
List<Plan> rewriteResults = new ArrayList<>();
StructInfo viewStructInfo = materializationContext.getStructInfo();
MatchMode matchMode = decideMatchMode(queryStructInfo.getRelations(),
viewStructInfo.getRelations());
@@ -295,7 +295,7 @@ public abstract class AbstractMaterializedViewRule
implements ExplorationRuleFac
continue;
}
Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo,
Set<String>>> invalidPartitions;
- if (PartitionCompensator.needUnionRewrite(materializationContext)
+ if (PartitionCompensator.needUnionRewrite(materializationContext,
cascadesContext.getStatementContext())
&& sessionVariable.isEnableMaterializedViewUnionRewrite())
{
MTMV mtmv = ((AsyncMaterializationContext)
materializationContext).getMtmv();
BaseTableInfo relatedTableInfo =
mtmv.getMvPartitionInfo().getRelatedTableInfo();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
index 3c086de0919..f17d5364ff0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
@@ -23,8 +23,10 @@ import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
+import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo;
+import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.trees.plans.Plan;
@@ -151,26 +153,49 @@ public class PartitionCompensator {
/**
* Check if need union compensate or not
+ * If the base table's used partitions are ALL_PARTITIONS or
ALL_PARTITIONS_LIST, union compensation should not be done,
+ * because that means the query reads all partitions of the base table and
partition pruning failed
*/
- public static boolean needUnionRewrite(MaterializationContext
materializationContext) {
+ public static boolean needUnionRewrite(MaterializationContext
materializationContext,
+ StatementContext statementContext)
throws AnalysisException {
if (!(materializationContext instanceof AsyncMaterializationContext)) {
return false;
}
MTMV mtmv = ((AsyncMaterializationContext)
materializationContext).getMtmv();
- PartitionType type = mtmv.getPartitionInfo().getType();
BaseTableInfo relatedTableInfo =
mtmv.getMvPartitionInfo().getRelatedTableInfo();
- return !PartitionType.UNPARTITIONED.equals(type) && relatedTableInfo
!= null;
+ if (relatedTableInfo == null) {
+ return false;
+ }
+ if
(PartitionType.UNPARTITIONED.equals(mtmv.getPartitionInfo().getType())) {
+ return false;
+ }
+ MTMVRelatedTableIf pctTable =
mtmv.getMvPartitionInfo().getRelatedTable();
+ Multimap<List<String>, Pair<RelationId, Set<String>>>
tableUsedPartitionNameMap =
+ statementContext.getTableUsedPartitionNameMap();
+ if (pctTable instanceof ExternalTable && !((ExternalTable)
pctTable).supportInternalPartitionPruned()) {
+ // if the pct table is an external table that does not support internal
partition pruning,
+ // we treat the query as reading all partitions of the pct table; this would
cause union compensation to loop,
+ // so we skip union compensation in this case
+ return false;
+ }
+ Collection<Pair<RelationId, Set<String>>> tableUsedPartitions
+ = tableUsedPartitionNameMap.get(pctTable.getFullQualifiers());
+ // If the base table's used partitions are ALL_PARTITIONS or
ALL_PARTITIONS_LIST,
+ // union compensation should not be done, because that means the query reads all
partitions of the base table
+ // and partition pruning failed
+ return !ALL_PARTITIONS_LIST.equals(tableUsedPartitions)
+ &&
tableUsedPartitions.stream().noneMatch(ALL_PARTITIONS::equals);
}
/**
* Get query used partitions
* this is calculated from tableUsedPartitionNameMap and tables in
statementContext
*
- * @param customRelationIdSet if union compensate occurs, the new query
used partitions is changed,
+ * @param currentUsedRelationIdSet if union compensate occurs, the new
query used partitions is changed,
* so need to get used partitions by relation id set
*/
public static Map<List<String>, Set<String>>
getQueryUsedPartitions(StatementContext statementContext,
- BitSet customRelationIdSet) {
+ BitSet currentUsedRelationIdSet) {
// get table used partitions
// if table is not in statementContext().getTables() which means the
table is partition prune as empty relation
Multimap<List<String>, Pair<RelationId, Set<String>>>
tableUsedPartitionNameMap = statementContext
@@ -195,6 +220,13 @@ public class PartitionCompensator {
queryUsedRelatedTablePartitionsMap.put(queryUsedTable,
null);
continue tableLoop;
}
+ // If currentUsedRelationIdSet is not empty, check the relation id so that
only the partitions used by those relations are collected
+ BitSet usedPartitionRelation = new BitSet();
+
usedPartitionRelation.set(tableUsedPartitionPair.key().asInt());
+ if (!currentUsedRelationIdSet.isEmpty()
+ &&
!currentUsedRelationIdSet.intersects(usedPartitionRelation)) {
+ continue;
+ }
usedPartitionSet.addAll(tableUsedPartitionPair.value());
}
queryUsedRelatedTablePartitionsMap.put(queryUsedTable,
usedPartitionSet);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java
index 25c0a679d8d..5efdff2b12e 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java
@@ -17,16 +17,30 @@
package org.apache.doris.nereids.rules.exploration.mv;
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.PartitionInfo;
+import org.apache.doris.catalog.PartitionType;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.mtmv.BaseTableInfo;
+import org.apache.doris.mtmv.MTMVPartitionInfo;
+import org.apache.doris.mtmv.MTMVRelatedTableIf;
+import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.util.PlanChecker;
import org.apache.doris.utframe.TestWithFeService;
+import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
import java.util.BitSet;
import java.util.List;
@@ -190,4 +204,149 @@ public class PartitionCompensatorTest extends
TestWithFeService {
Assertions.assertEquals(orderTableUsedPartition,
ImmutableSet.of("p1", "p2", "p3", "p4"));
});
}
+
+ @Test
+ public void testNeedUnionRewriteExternalNoPrune() throws Exception {
+ MaterializationContext ctx = mockCtx(
+ PartitionType.LIST,
+ newBaseTableInfo(),
+ Mockito.mock(MTMVRelatedTableIf.class),
+ true);
+ StatementContext sc = Mockito.mock(StatementContext.class);
+
Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create());
+ Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx, sc));
+ }
+
+ @Test
+ public void testNeedUnionRewritePositive() throws Exception {
+ MaterializationContext ctx = mockCtx(
+ PartitionType.LIST,
+ newBaseTableInfo(),
+ Mockito.mock(MTMVRelatedTableIf.class),
+ false);
+ StatementContext sc = Mockito.mock(StatementContext.class);
+
Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create());
+ Assertions.assertTrue(PartitionCompensator.needUnionRewrite(ctx, sc));
+ }
+
+ @Test
+ public void testNotNeedUnionRewriteWhenAllPartitions() throws Exception {
+ BaseTableInfo tableInfo = newBaseTableInfo();
+ MaterializationContext ctx = mockCtx(
+ PartitionType.LIST,
+ tableInfo,
+ Mockito.mock(MTMVRelatedTableIf.class),
+ false);
+ StatementContext sc = Mockito.mock(StatementContext.class);
+
+ ArrayListMultimap<List<String>, Pair<RelationId, Set<String>>> t =
ArrayListMultimap.create();
+ t.put(ImmutableList.of(), PartitionCompensator.ALL_PARTITIONS);
+ Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(t);
+ Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx, sc));
+ }
+
+ @Test
+ public void testGetQueryUsedPartitionsAllAndPartial() {
+ // Prepare qualifiers
+ List<String> lineitemQualifier = ImmutableList.of(
+ "internal", "partition_compensate_test",
"lineitem_list_partition");
+ List<String> ordersQualifier = ImmutableList.of(
+ "internal", "partition_compensate_test",
"orders_list_partition");
+
+ Multimap<List<String>, Pair<RelationId, Set<String>>>
tableUsedPartitionNameMap
+ =
connectContext.getStatementContext().getTableUsedPartitionNameMap();
+ tableUsedPartitionNameMap.clear();
+
+ tableUsedPartitionNameMap.put(lineitemQualifier,
PartitionCompensator.ALL_PARTITIONS);
+
+ RelationId ridA = new RelationId(1);
+ RelationId ridB = new RelationId(2);
+ tableUsedPartitionNameMap.put(ordersQualifier, Pair.of(ridA,
ImmutableSet.of("p1", "p2")));
+ tableUsedPartitionNameMap.put(ordersQualifier, Pair.of(ridB,
ImmutableSet.of("p3")));
+
+ Map<List<String>, Set<String>> result =
PartitionCompensator.getQueryUsedPartitions(
+ connectContext.getStatementContext(), new BitSet());
+ Assertions.assertNull(result.get(lineitemQualifier)); // all partitions
+ Assertions.assertEquals(ImmutableSet.of("p1", "p2", "p3"),
result.get(ordersQualifier));
+
+ BitSet filterRidA = new BitSet();
+ filterRidA.set(ridA.asInt());
+ Map<List<String>, Set<String>> resultRidA =
PartitionCompensator.getQueryUsedPartitions(
+ connectContext.getStatementContext(), filterRidA);
+ Assertions.assertNull(resultRidA.get(lineitemQualifier));
+ Assertions.assertEquals(ImmutableSet.of("p1", "p2"),
resultRidA.get(ordersQualifier));
+
+ BitSet filterRidB = new BitSet();
+ filterRidB.set(ridB.asInt());
+ Map<List<String>, Set<String>> resultRidB =
PartitionCompensator.getQueryUsedPartitions(
+ connectContext.getStatementContext(), filterRidB);
+ Assertions.assertNull(resultRidB.get(lineitemQualifier));
+ Assertions.assertEquals(ImmutableSet.of("p3"),
resultRidB.get(ordersQualifier));
+
+ tableUsedPartitionNameMap.put(ordersQualifier,
PartitionCompensator.ALL_PARTITIONS);
+ Map<List<String>, Set<String>> resultAllOrders =
PartitionCompensator.getQueryUsedPartitions(
+ connectContext.getStatementContext(), new BitSet());
+ Assertions.assertNull(resultAllOrders.get(ordersQualifier));
+ }
+
+ @Test
+ public void testGetQueryUsedPartitionsEmptyCollectionMeansNoPartitions() {
+ List<String> qualifier = ImmutableList.of(
+ "internal", "partition_compensate_test",
"lineitem_list_partition");
+ Multimap<List<String>, Pair<RelationId, Set<String>>>
tableUsedPartitionNameMap
+ =
connectContext.getStatementContext().getTableUsedPartitionNameMap();
+ tableUsedPartitionNameMap.clear();
+ // Put an empty set via a distinct relation id to simulate no
partitions used
+ RelationId rid = new RelationId(3);
+ tableUsedPartitionNameMap.put(qualifier, Pair.of(rid,
ImmutableSet.of()));
+
+ Map<List<String>, Set<String>> result =
PartitionCompensator.getQueryUsedPartitions(
+ connectContext.getStatementContext(), new BitSet());
+ Assertions.assertEquals(ImmutableSet.of(), result.get(qualifier));
+ }
+
+ private static MaterializationContext mockCtx(
+ PartitionType type,
+ BaseTableInfo pctInfo,
+ MTMVRelatedTableIf pctTable,
+ boolean externalNoPrune) throws AnalysisException {
+
+ MTMV mtmv = Mockito.mock(MTMV.class);
+ PartitionInfo pi = Mockito.mock(PartitionInfo.class);
+ Mockito.when(mtmv.getPartitionInfo()).thenReturn(pi);
+ Mockito.when(pi.getType()).thenReturn(type);
+
+ MTMVPartitionInfo mpi = Mockito.mock(MTMVPartitionInfo.class);
+ Mockito.when(mtmv.getMvPartitionInfo()).thenReturn(mpi);
+ Mockito.when(mpi.getRelatedTableInfo()).thenReturn(pctInfo);
+ Mockito.when(mpi.getRelatedTable()).thenReturn(pctTable);
+
+ if (externalNoPrune) {
+ HMSExternalTable ext = Mockito.mock(HMSExternalTable.class);
+
Mockito.when(ext.supportInternalPartitionPruned()).thenReturn(false);
+ Mockito.when(mpi.getRelatedTable()).thenReturn(ext);
+ }
+
+ AsyncMaterializationContext ctx =
Mockito.mock(AsyncMaterializationContext.class);
+ Mockito.when(ctx.getMtmv()).thenReturn(mtmv);
+ return ctx;
+ }
+
+ private static BaseTableInfo newBaseTableInfo() {
+ CatalogIf<?> catalog = Mockito.mock(CatalogIf.class);
+ Mockito.when(catalog.getId()).thenReturn(1L);
+ Mockito.when(catalog.getName()).thenReturn("internal");
+
+ DatabaseIf<?> db = Mockito.mock(DatabaseIf.class);
+ Mockito.when(db.getId()).thenReturn(2L);
+ Mockito.when(db.getFullName()).thenReturn("partition_compensate_test");
+ Mockito.when(db.getCatalog()).thenReturn(catalog);
+
+ TableIf table = Mockito.mock(TableIf.class);
+ Mockito.when(table.getId()).thenReturn(3L);
+ Mockito.when(table.getName()).thenReturn("t");
+ Mockito.when(table.getDatabase()).thenReturn(db);
+
+ return new BaseTableInfo(table);
+ }
}
diff --git
a/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy
b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy
index 70a11d3633a..680f7eaa93d 100644
---
a/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy
+++
b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy
@@ -62,9 +62,7 @@ suite("test_hudi_rewrite_mtmv",
"p2,external,hudi,external_remote,external_remot
waitingMTMVTaskFinishedByMvName(mvName)
order_qt_refresh_one_partition "SELECT * FROM ${mvName} "
- def explainOnePartition = sql """ explain ${mvSql} """
- logger.info("explainOnePartition: " + explainOnePartition.toString())
- assertTrue(explainOnePartition.toString().contains("VUNION"))
+ mv_rewrite_success(mvSql, mvName)
order_qt_refresh_one_partition_rewrite "${mvSql}"
mv_rewrite_success("${mvSql}", "${mvName}")
@@ -79,9 +77,7 @@ suite("test_hudi_rewrite_mtmv",
"p2,external,hudi,external_remote,external_remot
waitingMTMVTaskFinishedByMvName(mvName)
order_qt_refresh_auto "SELECT * FROM ${mvName} "
- def explainAllPartition = sql """ explain ${mvSql}; """
- logger.info("explainAllPartition: " + explainAllPartition.toString())
- assertTrue(explainAllPartition.toString().contains("VOlapScanNode"))
+ mv_rewrite_success(mvSql, mvName)
order_qt_refresh_all_partition_rewrite "${mvSql}"
mv_rewrite_success("${mvSql}", "${mvName}")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]