This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a128c14d396 [fix](mtmv) Fix hudi materialized view union all rewritten 
plan execute fail because of invalid slot (#58643)
a128c14d396 is described below

commit a128c14d396d10ac0e85624cb53f7c8cb28cbc85
Author: seawinde <[email protected]>
AuthorDate: Thu Dec 4 14:20:09 2025 +0800

    [fix](mtmv) Fix hudi materialized view union all rewritten plan execute 
fail because of invalid slot (#58643)
    
    ### What problem does this PR solve?
    
    Related PR: #57558 #58413
    
    Problem Summary:
    
    This fix addresses the following three issues:
    1. When invoking the method
    org.apache.doris.nereids.trees.plans.logical.LogicalHudiScan#withRelationId,
    the output needs to be recalculated for the new relation id instead of
    reusing the logical properties of the original scan.
    2. After a union all compensation is applied because some partitions of a
    materialized view are invalid, the next round of transparent rewriting
    of the union all's children should use the query partitions corresponding
    to each child's specific relation id, to prevent infinite loops.
    3. Currently, in the `test_hudi_rewrite_mtmv` test, if the plan
    produced by materialized view transparent rewriting is not selected
    by the CBO, the failure is hard to troubleshoot because `explain memo plan`
    is not used. Therefore, the test now verifies the rewrite with
    `mv_rewrite_success`.
---
 .../mv/AbstractMaterializedViewRule.java           |   2 +-
 .../rules/exploration/mv/PartitionCompensator.java |  41 ++++-
 .../trees/plans/logical/LogicalHudiScan.java       |   2 +-
 .../exploration/mv/PartitionCompensatorTest.java   | 199 +++++++++++++++++++++
 .../hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy   |   8 +-
 5 files changed, 240 insertions(+), 12 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
index 69232d6e261..036ffcd2251 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
@@ -316,7 +316,7 @@ public abstract class AbstractMaterializedViewRule 
implements ExplorationRuleFac
                 continue;
             }
             Pair<Map<BaseTableInfo, Set<String>>, Map<BaseColInfo, 
Set<String>>> invalidPartitions;
-            if (PartitionCompensator.needUnionRewrite(materializationContext)
+            if (PartitionCompensator.needUnionRewrite(materializationContext, 
cascadesContext.getStatementContext())
                     && sessionVariable.isEnableMaterializedViewUnionRewrite()) 
{
                 MTMV mtmv = ((AsyncMaterializationContext) 
materializationContext).getMtmv();
                 Map<List<String>, Set<String>> queryUsedPartitions = 
PartitionCompensator.getQueryUsedPartitions(
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
index 22c3540b5cb..3fe966864d0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
@@ -23,8 +23,10 @@ import org.apache.doris.catalog.PartitionInfo;
 import org.apache.doris.catalog.PartitionType;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Pair;
+import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.mtmv.BaseColInfo;
 import org.apache.doris.mtmv.BaseTableInfo;
+import org.apache.doris.mtmv.MTMVPartitionInfo;
 import org.apache.doris.mtmv.MTMVRelatedTableIf;
 import org.apache.doris.nereids.CascadesContext;
 import org.apache.doris.nereids.StatementContext;
@@ -223,14 +225,38 @@ public class PartitionCompensator {
 
     /**
      * Check if need union compensate or not
+     * If query base table all partitions with ALL_PARTITIONS or 
ALL_PARTITIONS_LIST, should not do union compensate
+     * because it means query all partitions from base table and prune 
partition failed
      */
-    public static boolean needUnionRewrite(MaterializationContext 
materializationContext) {
+    public static boolean needUnionRewrite(MaterializationContext 
materializationContext,
+                                           StatementContext statementContext) 
throws AnalysisException {
         if (!(materializationContext instanceof AsyncMaterializationContext)) {
             return false;
         }
         MTMV mtmv = ((AsyncMaterializationContext) 
materializationContext).getMtmv();
         PartitionType type = mtmv.getPartitionInfo().getType();
-        List<BaseColInfo> pctInfos = mtmv.getMvPartitionInfo().getPctInfos();
+        MTMVPartitionInfo mvPartitionInfo = mtmv.getMvPartitionInfo();
+        List<BaseColInfo> pctInfos = mvPartitionInfo.getPctInfos();
+        Set<MTMVRelatedTableIf> pctTables = mvPartitionInfo.getPctTables();
+        Multimap<List<String>, Pair<RelationId, Set<String>>> 
tableUsedPartitionNameMap =
+                statementContext.getTableUsedPartitionNameMap();
+        for (MTMVRelatedTableIf pctTable : pctTables) {
+            if (pctTable instanceof ExternalTable && !((ExternalTable) 
pctTable).supportInternalPartitionPruned()) {
+                // if pct table is external table and not support internal 
partition pruned,
+                // we consider query all partitions from pct table, this would 
cause loop union compensate,
+                // so we skip union compensate in this case
+                return false;
+            }
+            Collection<Pair<RelationId, Set<String>>> tableUsedPartitions
+                    = 
tableUsedPartitionNameMap.get(pctTable.getFullQualifiers());
+            if (ALL_PARTITIONS_LIST.equals(tableUsedPartitions)
+                    || 
tableUsedPartitions.stream().anyMatch(ALL_PARTITIONS::equals)) {
+                // If query base table all partitions with ALL_PARTITIONS or 
ALL_PARTITIONS_LIST,
+                // should not do union compensate, because it means query all 
partitions from base table
+                // and prune partition failed
+                return false;
+            }
+        }
         return !PartitionType.UNPARTITIONED.equals(type) && 
!pctInfos.isEmpty();
     }
 
@@ -238,11 +264,11 @@ public class PartitionCompensator {
      * Get query used partitions
      * this is calculated from tableUsedPartitionNameMap and tables in 
statementContext
      *
-     * @param customRelationIdSet if union compensate occurs, the new query 
used partitions is changed,
+     * @param currentUsedRelationIdSet if union compensate occurs, the new 
query used partitions is changed,
      *         so need to get used partitions by relation id set
      */
     public static Map<List<String>, Set<String>> 
getQueryUsedPartitions(StatementContext statementContext,
-            BitSet customRelationIdSet) {
+            BitSet currentUsedRelationIdSet) {
         // get table used partitions
         // if table is not in statementContext().getTables() which means the 
table is partition prune as empty relation
         Multimap<List<String>, Pair<RelationId, Set<String>>> 
tableUsedPartitionNameMap = statementContext
@@ -267,6 +293,13 @@ public class PartitionCompensator {
                     queryUsedRelatedTablePartitionsMap.put(queryUsedTable, 
null);
                     continue tableLoop;
                 }
+                // If currentUsedRelationIdSet is not empty, need check 
relation id to get concrete used partitions
+                BitSet usedPartitionRelation = new BitSet();
+                
usedPartitionRelation.set(tableUsedPartitionPair.key().asInt());
+                if (!currentUsedRelationIdSet.isEmpty()
+                        && 
!currentUsedRelationIdSet.intersects(usedPartitionRelation)) {
+                    continue;
+                }
                 usedPartitionSet.addAll(tableUsedPartitionPair.value());
             }
             queryUsedRelatedTablePartitionsMap.put(queryUsedTable, 
usedPartitionSet);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
index 0e10bb79920..ca380dbdddd 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
@@ -183,7 +183,7 @@ public class LogicalHudiScan extends LogicalFileScan {
     public LogicalHudiScan withRelationId(RelationId relationId) {
         return new LogicalHudiScan(relationId, (ExternalTable) table, 
qualifier,
             selectedPartitions, tableSample, tableSnapshot, scanParams, 
incrementalRelation,
-                operativeSlots, virtualColumns, groupExpression, 
Optional.of(getLogicalProperties()),
+                operativeSlots, virtualColumns, groupExpression, 
Optional.empty(),
                 tableAlias, cachedOutputs);
     }
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java
index 672628e7d1e..17d75f93fcf 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java
@@ -17,21 +17,39 @@
 
 package org.apache.doris.nereids.rules.exploration.mv;
 
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.PartitionInfo;
+import org.apache.doris.catalog.PartitionType;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Pair;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.mtmv.BaseColInfo;
+import org.apache.doris.mtmv.BaseTableInfo;
+import org.apache.doris.mtmv.MTMVPartitionInfo;
+import org.apache.doris.mtmv.MTMVRelatedTableIf;
+import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.trees.plans.RelationId;
 import org.apache.doris.nereids.util.PlanChecker;
 import org.apache.doris.utframe.TestWithFeService;
 
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Multimap;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
 import java.util.BitSet;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 public class PartitionCompensatorTest extends TestWithFeService {
 
@@ -191,4 +209,185 @@ public class PartitionCompensatorTest extends 
TestWithFeService {
                             Assertions.assertEquals(orderTableUsedPartition, 
ImmutableSet.of("p1", "p2", "p3", "p4"));
                         });
     }
+
+    @Test
+    public void testNeedUnionRewriteUnpartitionedOrNoPctInfos() throws 
Exception {
+        MaterializationContext ctx1 = mockCtx(
+                PartitionType.UNPARTITIONED,
+                ImmutableList.of(new BaseColInfo("c", newBaseTableInfo())),
+                ImmutableSet.of(),
+                false);
+        StatementContext sc1 = Mockito.mock(StatementContext.class);
+        
Mockito.when(sc1.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create());
+        Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx1, 
sc1));
+
+        MaterializationContext ctx2 = mockCtx(
+                PartitionType.RANGE,
+                Collections.emptyList(),
+                ImmutableSet.of(Mockito.mock(MTMVRelatedTableIf.class)),
+                false);
+        StatementContext sc2 = Mockito.mock(StatementContext.class);
+        
Mockito.when(sc2.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create());
+        Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx2, 
sc2));
+    }
+
+    @Test
+    public void testNeedUnionRewriteEmptyPctTables() throws Exception {
+        MaterializationContext ctx = mockCtx(
+                PartitionType.RANGE,
+                ImmutableList.of(),
+                Collections.emptySet(),
+                false);
+        StatementContext sc = Mockito.mock(StatementContext.class);
+        
Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create());
+        Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx, sc));
+    }
+
+    @Test
+    public void testNeedUnionRewriteExternalNoPrune() throws Exception {
+        MaterializationContext ctx = mockCtx(
+                PartitionType.LIST,
+                ImmutableList.of(new BaseColInfo("c", newBaseTableInfo())),
+                ImmutableSet.of(Mockito.mock(MTMVRelatedTableIf.class)),
+                true);
+        StatementContext sc = Mockito.mock(StatementContext.class);
+        
Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create());
+        Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx, sc));
+    }
+
+    @Test
+    public void testNeedUnionRewritePositive() throws Exception {
+        MaterializationContext ctx = mockCtx(
+                PartitionType.LIST,
+                ImmutableList.of(new BaseColInfo("c", newBaseTableInfo())),
+                ImmutableSet.of(Mockito.mock(MTMVRelatedTableIf.class)),
+                false);
+        StatementContext sc = Mockito.mock(StatementContext.class);
+        
Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create());
+        Assertions.assertTrue(PartitionCompensator.needUnionRewrite(ctx, sc));
+    }
+
+    @Test
+    public void testNotNeedUnionRewriteWhenAllPartitions() throws Exception {
+        BaseTableInfo tableInfo = newBaseTableInfo();
+        MaterializationContext ctx = mockCtx(
+                PartitionType.LIST,
+                ImmutableList.of(new BaseColInfo("c", tableInfo)),
+                ImmutableSet.of(Mockito.mock(MTMVRelatedTableIf.class)),
+                false);
+        StatementContext sc = Mockito.mock(StatementContext.class);
+
+        ArrayListMultimap<List<String>, Pair<RelationId, Set<String>>> t = 
ArrayListMultimap.create();
+        t.put(ImmutableList.of(), PartitionCompensator.ALL_PARTITIONS);
+        Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(t);
+        Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx, sc));
+    }
+
+    @Test
+    public void testGetQueryUsedPartitionsAllAndPartial() {
+        // Prepare qualifiers
+        List<String> lineitemQualifier = ImmutableList.of(
+                "internal", "partition_compensate_test", 
"lineitem_list_partition");
+        List<String> ordersQualifier = ImmutableList.of(
+                "internal", "partition_compensate_test", 
"orders_list_partition");
+
+        Multimap<List<String>, Pair<RelationId, Set<String>>> 
tableUsedPartitionNameMap
+                = 
connectContext.getStatementContext().getTableUsedPartitionNameMap();
+        tableUsedPartitionNameMap.clear();
+
+        tableUsedPartitionNameMap.put(lineitemQualifier, 
PartitionCompensator.ALL_PARTITIONS);
+
+        RelationId ridA = new RelationId(1);
+        RelationId ridB = new RelationId(2);
+        tableUsedPartitionNameMap.put(ordersQualifier, Pair.of(ridA, 
ImmutableSet.of("p1", "p2")));
+        tableUsedPartitionNameMap.put(ordersQualifier, Pair.of(ridB, 
ImmutableSet.of("p3")));
+
+        Map<List<String>, Set<String>> result = 
PartitionCompensator.getQueryUsedPartitions(
+                connectContext.getStatementContext(), new BitSet());
+        Assertions.assertNull(result.get(lineitemQualifier)); // all partitions
+        Assertions.assertEquals(ImmutableSet.of("p1", "p2", "p3"), 
result.get(ordersQualifier));
+
+        BitSet filterRidA = new BitSet();
+        filterRidA.set(ridA.asInt());
+        Map<List<String>, Set<String>> resultRidA = 
PartitionCompensator.getQueryUsedPartitions(
+                connectContext.getStatementContext(), filterRidA);
+        Assertions.assertNull(resultRidA.get(lineitemQualifier));
+        Assertions.assertEquals(ImmutableSet.of("p1", "p2"), 
resultRidA.get(ordersQualifier));
+
+        BitSet filterRidB = new BitSet();
+        filterRidB.set(ridB.asInt());
+        Map<List<String>, Set<String>> resultRidB = 
PartitionCompensator.getQueryUsedPartitions(
+                connectContext.getStatementContext(), filterRidB);
+        Assertions.assertNull(resultRidB.get(lineitemQualifier));
+        Assertions.assertEquals(ImmutableSet.of("p3"), 
resultRidB.get(ordersQualifier));
+
+        tableUsedPartitionNameMap.put(ordersQualifier, 
PartitionCompensator.ALL_PARTITIONS);
+        Map<List<String>, Set<String>> resultAllOrders = 
PartitionCompensator.getQueryUsedPartitions(
+                connectContext.getStatementContext(), new BitSet());
+        Assertions.assertNull(resultAllOrders.get(ordersQualifier));
+    }
+
+    @Test
+    public void testGetQueryUsedPartitionsEmptyCollectionMeansNoPartitions() {
+        List<String> qualifier = ImmutableList.of(
+                "internal", "partition_compensate_test", 
"lineitem_list_partition");
+        Multimap<List<String>, Pair<RelationId, Set<String>>> 
tableUsedPartitionNameMap
+                = 
connectContext.getStatementContext().getTableUsedPartitionNameMap();
+        tableUsedPartitionNameMap.clear();
+        // Put an empty set via a distinct relation id to simulate no 
partitions used
+        RelationId rid = new RelationId(3);
+        tableUsedPartitionNameMap.put(qualifier, Pair.of(rid, 
ImmutableSet.of()));
+
+        Map<List<String>, Set<String>> result = 
PartitionCompensator.getQueryUsedPartitions(
+                connectContext.getStatementContext(), new BitSet());
+        Assertions.assertEquals(ImmutableSet.of(), result.get(qualifier));
+    }
+
+    private static MaterializationContext mockCtx(
+            PartitionType type,
+            List<BaseColInfo> pctInfos,
+            Set<MTMVRelatedTableIf> pctTables,
+            boolean externalNoPrune) throws AnalysisException {
+
+        MTMV mtmv = Mockito.mock(MTMV.class);
+        PartitionInfo pi = Mockito.mock(PartitionInfo.class);
+        Mockito.when(mtmv.getPartitionInfo()).thenReturn(pi);
+        Mockito.when(pi.getType()).thenReturn(type);
+
+        MTMVPartitionInfo mpi = Mockito.mock(MTMVPartitionInfo.class);
+        Mockito.when(mtmv.getMvPartitionInfo()).thenReturn(mpi);
+        Mockito.when(mpi.getPctInfos()).thenReturn(pctInfos);
+        Mockito.when(mpi.getPctTables()).thenReturn(pctTables);
+
+        if (externalNoPrune) {
+            HMSExternalTable ext = Mockito.mock(HMSExternalTable.class);
+            
Mockito.when(ext.supportInternalPartitionPruned()).thenReturn(false);
+            Set<TableIf> tbls = new HashSet<>(pctTables);
+            tbls.add(ext);
+            Mockito.when(mpi.getPctTables()).thenReturn(
+                    
tbls.stream().map(MTMVRelatedTableIf.class::cast).collect(Collectors.toSet()));
+        }
+
+        AsyncMaterializationContext ctx = 
Mockito.mock(AsyncMaterializationContext.class);
+        Mockito.when(ctx.getMtmv()).thenReturn(mtmv);
+        return ctx;
+    }
+
+    private static BaseTableInfo newBaseTableInfo() {
+        CatalogIf<?> catalog = Mockito.mock(CatalogIf.class);
+        Mockito.when(catalog.getId()).thenReturn(1L);
+        Mockito.when(catalog.getName()).thenReturn("internal");
+
+        DatabaseIf<?> db = Mockito.mock(DatabaseIf.class);
+        Mockito.when(db.getId()).thenReturn(2L);
+        Mockito.when(db.getFullName()).thenReturn("partition_compensate_test");
+        Mockito.when(db.getCatalog()).thenReturn(catalog);
+
+        TableIf table = Mockito.mock(TableIf.class);
+        Mockito.when(table.getId()).thenReturn(3L);
+        Mockito.when(table.getName()).thenReturn("t");
+        Mockito.when(table.getDatabase()).thenReturn(db);
+
+        return new BaseTableInfo(table);
+    }
 }
diff --git 
a/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy
 
b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy
index 70a11d3633a..680f7eaa93d 100644
--- 
a/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy
+++ 
b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy
@@ -62,9 +62,7 @@ suite("test_hudi_rewrite_mtmv", 
"p2,external,hudi,external_remote,external_remot
     waitingMTMVTaskFinishedByMvName(mvName)
     order_qt_refresh_one_partition "SELECT * FROM ${mvName} "
 
-    def explainOnePartition = sql """ explain  ${mvSql} """
-    logger.info("explainOnePartition: " + explainOnePartition.toString())
-    assertTrue(explainOnePartition.toString().contains("VUNION"))
+    mv_rewrite_success(mvSql, mvName)
     order_qt_refresh_one_partition_rewrite "${mvSql}"
 
     mv_rewrite_success("${mvSql}", "${mvName}")
@@ -79,9 +77,7 @@ suite("test_hudi_rewrite_mtmv", 
"p2,external,hudi,external_remote,external_remot
     waitingMTMVTaskFinishedByMvName(mvName)
     order_qt_refresh_auto "SELECT * FROM ${mvName} "
 
-    def explainAllPartition = sql """ explain  ${mvSql}; """
-    logger.info("explainAllPartition: " + explainAllPartition.toString())
-    assertTrue(explainAllPartition.toString().contains("VOlapScanNode"))
+    mv_rewrite_success(mvSql, mvName)
     order_qt_refresh_all_partition_rewrite "${mvSql}"
 
     mv_rewrite_success("${mvSql}", "${mvName}")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to