This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 2255d7aeed3 branch-4.0:[fix](mtmv) Fix hudi materialized view union 
all rewritten plan execute fail because of invalid slot (#58643) (#58725)
2255d7aeed3 is described below

commit 2255d7aeed36f9eee328e6360e0604b028490eb2
Author: seawinde <[email protected]>
AuthorDate: Fri Dec 5 12:16:56 2025 +0800

    branch-4.0:[fix](mtmv) Fix hudi materialized view union all rewritten plan 
execute fail because of invalid slot (#58643) (#58725)
    
    pr: https://github.com/apache/doris/pull/58643
    commitId: a128c14d
---
 .../mv/AbstractMaterializedViewRule.java           |   2 +-
 .../rules/exploration/mv/PartitionCompensator.java |  41 ++++-
 .../trees/plans/logical/LogicalHudiScan.java       |   2 +-
 .../exploration/mv/PartitionCompensatorTest.java   | 199 +++++++++++++++++++++
 .../hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy   |   8 +-
 5 files changed, 240 insertions(+), 12 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
index 69232d6e261..036ffcd2251 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
@@ -316,7 +316,7 @@ public abstract class AbstractMaterializedViewRule 
implements ExplorationRuleFac
                 continue;
             }
             Pair<Map<BaseTableInfo, Set<String>>, Map<BaseColInfo, 
Set<String>>> invalidPartitions;
-            if (PartitionCompensator.needUnionRewrite(materializationContext)
+            if (PartitionCompensator.needUnionRewrite(materializationContext, 
cascadesContext.getStatementContext())
                     && sessionVariable.isEnableMaterializedViewUnionRewrite()) 
{
                 MTMV mtmv = ((AsyncMaterializationContext) 
materializationContext).getMtmv();
                 Map<List<String>, Set<String>> queryUsedPartitions = 
PartitionCompensator.getQueryUsedPartitions(
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
index 22c3540b5cb..3fe966864d0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
@@ -23,8 +23,10 @@ import org.apache.doris.catalog.PartitionInfo;
 import org.apache.doris.catalog.PartitionType;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Pair;
+import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.mtmv.BaseColInfo;
 import org.apache.doris.mtmv.BaseTableInfo;
+import org.apache.doris.mtmv.MTMVPartitionInfo;
 import org.apache.doris.mtmv.MTMVRelatedTableIf;
 import org.apache.doris.nereids.CascadesContext;
 import org.apache.doris.nereids.StatementContext;
@@ -223,14 +225,38 @@ public class PartitionCompensator {
 
     /**
      * Check if need union compensate or not
+     * If query base table all partitions with ALL_PARTITIONS or 
ALL_PARTITIONS_LIST, should not do union compensate
+     * because it means query all partitions from base table and prune 
partition failed
      */
-    public static boolean needUnionRewrite(MaterializationContext 
materializationContext) {
+    public static boolean needUnionRewrite(MaterializationContext 
materializationContext,
+                                           StatementContext statementContext) 
throws AnalysisException {
         if (!(materializationContext instanceof AsyncMaterializationContext)) {
             return false;
         }
         MTMV mtmv = ((AsyncMaterializationContext) 
materializationContext).getMtmv();
         PartitionType type = mtmv.getPartitionInfo().getType();
-        List<BaseColInfo> pctInfos = mtmv.getMvPartitionInfo().getPctInfos();
+        MTMVPartitionInfo mvPartitionInfo = mtmv.getMvPartitionInfo();
+        List<BaseColInfo> pctInfos = mvPartitionInfo.getPctInfos();
+        Set<MTMVRelatedTableIf> pctTables = mvPartitionInfo.getPctTables();
+        Multimap<List<String>, Pair<RelationId, Set<String>>> 
tableUsedPartitionNameMap =
+                statementContext.getTableUsedPartitionNameMap();
+        for (MTMVRelatedTableIf pctTable : pctTables) {
+            if (pctTable instanceof ExternalTable && !((ExternalTable) 
pctTable).supportInternalPartitionPruned()) {
+                // if pct table is external table and not support internal 
partition pruned,
+                // we consider query all partitions from pct table, this would 
cause loop union compensate,
+                // so we skip union compensate in this case
+                return false;
+            }
+            Collection<Pair<RelationId, Set<String>>> tableUsedPartitions
+                    = 
tableUsedPartitionNameMap.get(pctTable.getFullQualifiers());
+            if (ALL_PARTITIONS_LIST.equals(tableUsedPartitions)
+                    || 
tableUsedPartitions.stream().anyMatch(ALL_PARTITIONS::equals)) {
+                // If query base table all partitions with ALL_PARTITIONS or 
ALL_PARTITIONS_LIST,
+                // should not do union compensate, because it means query all 
partitions from base table
+                // and prune partition failed
+                return false;
+            }
+        }
         return !PartitionType.UNPARTITIONED.equals(type) && 
!pctInfos.isEmpty();
     }
 
@@ -238,11 +264,11 @@ public class PartitionCompensator {
      * Get query used partitions
      * this is calculated from tableUsedPartitionNameMap and tables in 
statementContext
      *
-     * @param customRelationIdSet if union compensate occurs, the new query 
used partitions is changed,
+     * @param currentUsedRelationIdSet if union compensate occurs, the new 
query used partitions is changed,
      *         so need to get used partitions by relation id set
      */
     public static Map<List<String>, Set<String>> 
getQueryUsedPartitions(StatementContext statementContext,
-            BitSet customRelationIdSet) {
+            BitSet currentUsedRelationIdSet) {
         // get table used partitions
         // if table is not in statementContext().getTables() which means the 
table is partition prune as empty relation
         Multimap<List<String>, Pair<RelationId, Set<String>>> 
tableUsedPartitionNameMap = statementContext
@@ -267,6 +293,13 @@ public class PartitionCompensator {
                     queryUsedRelatedTablePartitionsMap.put(queryUsedTable, 
null);
                     continue tableLoop;
                 }
+                // If currentUsedRelationIdSet is not empty, need check 
relation id to get concrete used partitions
+                BitSet usedPartitionRelation = new BitSet();
+                
usedPartitionRelation.set(tableUsedPartitionPair.key().asInt());
+                if (!currentUsedRelationIdSet.isEmpty()
+                        && 
!currentUsedRelationIdSet.intersects(usedPartitionRelation)) {
+                    continue;
+                }
                 usedPartitionSet.addAll(tableUsedPartitionPair.value());
             }
             queryUsedRelatedTablePartitionsMap.put(queryUsedTable, 
usedPartitionSet);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
index 9a123a04d2b..7f616d7045f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHudiScan.java
@@ -161,7 +161,7 @@ public class LogicalHudiScan extends LogicalFileScan {
     public LogicalHudiScan withRelationId(RelationId relationId) {
         return new LogicalHudiScan(relationId, (ExternalTable) table, 
qualifier,
             selectedPartitions, tableSample, tableSnapshot, scanParams, 
incrementalRelation,
-            operativeSlots, virtualColumns, groupExpression, 
Optional.of(getLogicalProperties()));
+            operativeSlots, virtualColumns, groupExpression, Optional.empty());
     }
 
     @Override
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java
index 672628e7d1e..17d75f93fcf 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java
@@ -17,21 +17,39 @@
 
 package org.apache.doris.nereids.rules.exploration.mv;
 
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.PartitionInfo;
+import org.apache.doris.catalog.PartitionType;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Pair;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.mtmv.BaseColInfo;
+import org.apache.doris.mtmv.BaseTableInfo;
+import org.apache.doris.mtmv.MTMVPartitionInfo;
+import org.apache.doris.mtmv.MTMVRelatedTableIf;
+import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.trees.plans.RelationId;
 import org.apache.doris.nereids.util.PlanChecker;
 import org.apache.doris.utframe.TestWithFeService;
 
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Multimap;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
 import java.util.BitSet;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 public class PartitionCompensatorTest extends TestWithFeService {
 
@@ -191,4 +209,185 @@ public class PartitionCompensatorTest extends 
TestWithFeService {
                             Assertions.assertEquals(orderTableUsedPartition, 
ImmutableSet.of("p1", "p2", "p3", "p4"));
                         });
     }
+
+    @Test
+    public void testNeedUnionRewriteUnpartitionedOrNoPctInfos() throws 
Exception {
+        MaterializationContext ctx1 = mockCtx(
+                PartitionType.UNPARTITIONED,
+                ImmutableList.of(new BaseColInfo("c", newBaseTableInfo())),
+                ImmutableSet.of(),
+                false);
+        StatementContext sc1 = Mockito.mock(StatementContext.class);
+        
Mockito.when(sc1.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create());
+        Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx1, 
sc1));
+
+        MaterializationContext ctx2 = mockCtx(
+                PartitionType.RANGE,
+                Collections.emptyList(),
+                ImmutableSet.of(Mockito.mock(MTMVRelatedTableIf.class)),
+                false);
+        StatementContext sc2 = Mockito.mock(StatementContext.class);
+        
Mockito.when(sc2.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create());
+        Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx2, 
sc2));
+    }
+
+    @Test
+    public void testNeedUnionRewriteEmptyPctTables() throws Exception {
+        MaterializationContext ctx = mockCtx(
+                PartitionType.RANGE,
+                ImmutableList.of(),
+                Collections.emptySet(),
+                false);
+        StatementContext sc = Mockito.mock(StatementContext.class);
+        
Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create());
+        Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx, sc));
+    }
+
+    @Test
+    public void testNeedUnionRewriteExternalNoPrune() throws Exception {
+        MaterializationContext ctx = mockCtx(
+                PartitionType.LIST,
+                ImmutableList.of(new BaseColInfo("c", newBaseTableInfo())),
+                ImmutableSet.of(Mockito.mock(MTMVRelatedTableIf.class)),
+                true);
+        StatementContext sc = Mockito.mock(StatementContext.class);
+        
Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create());
+        Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx, sc));
+    }
+
+    @Test
+    public void testNeedUnionRewritePositive() throws Exception {
+        MaterializationContext ctx = mockCtx(
+                PartitionType.LIST,
+                ImmutableList.of(new BaseColInfo("c", newBaseTableInfo())),
+                ImmutableSet.of(Mockito.mock(MTMVRelatedTableIf.class)),
+                false);
+        StatementContext sc = Mockito.mock(StatementContext.class);
+        
Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create());
+        Assertions.assertTrue(PartitionCompensator.needUnionRewrite(ctx, sc));
+    }
+
+    @Test
+    public void testNotNeedUnionRewriteWhenAllPartitions() throws Exception {
+        BaseTableInfo tableInfo = newBaseTableInfo();
+        MaterializationContext ctx = mockCtx(
+                PartitionType.LIST,
+                ImmutableList.of(new BaseColInfo("c", tableInfo)),
+                ImmutableSet.of(Mockito.mock(MTMVRelatedTableIf.class)),
+                false);
+        StatementContext sc = Mockito.mock(StatementContext.class);
+
+        ArrayListMultimap<List<String>, Pair<RelationId, Set<String>>> t = 
ArrayListMultimap.create();
+        t.put(ImmutableList.of(), PartitionCompensator.ALL_PARTITIONS);
+        Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(t);
+        Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx, sc));
+    }
+
+    @Test
+    public void testGetQueryUsedPartitionsAllAndPartial() {
+        // Prepare qualifiers
+        List<String> lineitemQualifier = ImmutableList.of(
+                "internal", "partition_compensate_test", 
"lineitem_list_partition");
+        List<String> ordersQualifier = ImmutableList.of(
+                "internal", "partition_compensate_test", 
"orders_list_partition");
+
+        Multimap<List<String>, Pair<RelationId, Set<String>>> 
tableUsedPartitionNameMap
+                = 
connectContext.getStatementContext().getTableUsedPartitionNameMap();
+        tableUsedPartitionNameMap.clear();
+
+        tableUsedPartitionNameMap.put(lineitemQualifier, 
PartitionCompensator.ALL_PARTITIONS);
+
+        RelationId ridA = new RelationId(1);
+        RelationId ridB = new RelationId(2);
+        tableUsedPartitionNameMap.put(ordersQualifier, Pair.of(ridA, 
ImmutableSet.of("p1", "p2")));
+        tableUsedPartitionNameMap.put(ordersQualifier, Pair.of(ridB, 
ImmutableSet.of("p3")));
+
+        Map<List<String>, Set<String>> result = 
PartitionCompensator.getQueryUsedPartitions(
+                connectContext.getStatementContext(), new BitSet());
+        Assertions.assertNull(result.get(lineitemQualifier)); // all partitions
+        Assertions.assertEquals(ImmutableSet.of("p1", "p2", "p3"), 
result.get(ordersQualifier));
+
+        BitSet filterRidA = new BitSet();
+        filterRidA.set(ridA.asInt());
+        Map<List<String>, Set<String>> resultRidA = 
PartitionCompensator.getQueryUsedPartitions(
+                connectContext.getStatementContext(), filterRidA);
+        Assertions.assertNull(resultRidA.get(lineitemQualifier));
+        Assertions.assertEquals(ImmutableSet.of("p1", "p2"), 
resultRidA.get(ordersQualifier));
+
+        BitSet filterRidB = new BitSet();
+        filterRidB.set(ridB.asInt());
+        Map<List<String>, Set<String>> resultRidB = 
PartitionCompensator.getQueryUsedPartitions(
+                connectContext.getStatementContext(), filterRidB);
+        Assertions.assertNull(resultRidB.get(lineitemQualifier));
+        Assertions.assertEquals(ImmutableSet.of("p3"), 
resultRidB.get(ordersQualifier));
+
+        tableUsedPartitionNameMap.put(ordersQualifier, 
PartitionCompensator.ALL_PARTITIONS);
+        Map<List<String>, Set<String>> resultAllOrders = 
PartitionCompensator.getQueryUsedPartitions(
+                connectContext.getStatementContext(), new BitSet());
+        Assertions.assertNull(resultAllOrders.get(ordersQualifier));
+    }
+
+    @Test
+    public void testGetQueryUsedPartitionsEmptyCollectionMeansNoPartitions() {
+        List<String> qualifier = ImmutableList.of(
+                "internal", "partition_compensate_test", 
"lineitem_list_partition");
+        Multimap<List<String>, Pair<RelationId, Set<String>>> 
tableUsedPartitionNameMap
+                = 
connectContext.getStatementContext().getTableUsedPartitionNameMap();
+        tableUsedPartitionNameMap.clear();
+        // Put an empty set via a distinct relation id to simulate no 
partitions used
+        RelationId rid = new RelationId(3);
+        tableUsedPartitionNameMap.put(qualifier, Pair.of(rid, 
ImmutableSet.of()));
+
+        Map<List<String>, Set<String>> result = 
PartitionCompensator.getQueryUsedPartitions(
+                connectContext.getStatementContext(), new BitSet());
+        Assertions.assertEquals(ImmutableSet.of(), result.get(qualifier));
+    }
+
+    private static MaterializationContext mockCtx(
+            PartitionType type,
+            List<BaseColInfo> pctInfos,
+            Set<MTMVRelatedTableIf> pctTables,
+            boolean externalNoPrune) throws AnalysisException {
+
+        MTMV mtmv = Mockito.mock(MTMV.class);
+        PartitionInfo pi = Mockito.mock(PartitionInfo.class);
+        Mockito.when(mtmv.getPartitionInfo()).thenReturn(pi);
+        Mockito.when(pi.getType()).thenReturn(type);
+
+        MTMVPartitionInfo mpi = Mockito.mock(MTMVPartitionInfo.class);
+        Mockito.when(mtmv.getMvPartitionInfo()).thenReturn(mpi);
+        Mockito.when(mpi.getPctInfos()).thenReturn(pctInfos);
+        Mockito.when(mpi.getPctTables()).thenReturn(pctTables);
+
+        if (externalNoPrune) {
+            HMSExternalTable ext = Mockito.mock(HMSExternalTable.class);
+            
Mockito.when(ext.supportInternalPartitionPruned()).thenReturn(false);
+            Set<TableIf> tbls = new HashSet<>(pctTables);
+            tbls.add(ext);
+            Mockito.when(mpi.getPctTables()).thenReturn(
+                    
tbls.stream().map(MTMVRelatedTableIf.class::cast).collect(Collectors.toSet()));
+        }
+
+        AsyncMaterializationContext ctx = 
Mockito.mock(AsyncMaterializationContext.class);
+        Mockito.when(ctx.getMtmv()).thenReturn(mtmv);
+        return ctx;
+    }
+
+    private static BaseTableInfo newBaseTableInfo() {
+        CatalogIf<?> catalog = Mockito.mock(CatalogIf.class);
+        Mockito.when(catalog.getId()).thenReturn(1L);
+        Mockito.when(catalog.getName()).thenReturn("internal");
+
+        DatabaseIf<?> db = Mockito.mock(DatabaseIf.class);
+        Mockito.when(db.getId()).thenReturn(2L);
+        Mockito.when(db.getFullName()).thenReturn("partition_compensate_test");
+        Mockito.when(db.getCatalog()).thenReturn(catalog);
+
+        TableIf table = Mockito.mock(TableIf.class);
+        Mockito.when(table.getId()).thenReturn(3L);
+        Mockito.when(table.getName()).thenReturn("t");
+        Mockito.when(table.getDatabase()).thenReturn(db);
+
+        return new BaseTableInfo(table);
+    }
 }
diff --git 
a/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy
 
b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy
index 70a11d3633a..680f7eaa93d 100644
--- 
a/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy
+++ 
b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy
@@ -62,9 +62,7 @@ suite("test_hudi_rewrite_mtmv", 
"p2,external,hudi,external_remote,external_remot
     waitingMTMVTaskFinishedByMvName(mvName)
     order_qt_refresh_one_partition "SELECT * FROM ${mvName} "
 
-    def explainOnePartition = sql """ explain  ${mvSql} """
-    logger.info("explainOnePartition: " + explainOnePartition.toString())
-    assertTrue(explainOnePartition.toString().contains("VUNION"))
+    mv_rewrite_success(mvSql, mvName)
     order_qt_refresh_one_partition_rewrite "${mvSql}"
 
     mv_rewrite_success("${mvSql}", "${mvName}")
@@ -79,9 +77,7 @@ suite("test_hudi_rewrite_mtmv", 
"p2,external,hudi,external_remote,external_remot
     waitingMTMVTaskFinishedByMvName(mvName)
     order_qt_refresh_auto "SELECT * FROM ${mvName} "
 
-    def explainAllPartition = sql """ explain  ${mvSql}; """
-    logger.info("explainAllPartition: " + explainAllPartition.toString())
-    assertTrue(explainAllPartition.toString().contains("VOlapScanNode"))
+    mv_rewrite_success(mvSql, mvName)
     order_qt_refresh_all_partition_rewrite "${mvSql}"
 
     mv_rewrite_success("${mvSql}", "${mvName}")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to