This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new a9361eae7f7 branch-3.1: [fix](mtmv) Fix hudi materialized view union all rewritten plan execute fail because of invalid slot #58643 (#58724)
a9361eae7f7 is described below

commit a9361eae7f7f9dcc11dd26c6fa5626aa8553939f
Author: seawinde <[email protected]>
AuthorDate: Fri Dec 5 17:59:46 2025 +0800

    branch-3.1: [fix](mtmv) Fix hudi materialized view union all rewritten plan execute fail because of invalid slot #58643 (#58724)
    
    picked from #58643
---
 .../mv/AbstractMaterializedViewRule.java           |   4 +-
 .../rules/exploration/mv/PartitionCompensator.java |  42 +++++-
 .../exploration/mv/PartitionCompensatorTest.java   | 159 +++++++++++++++++++++
 .../hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy   |   8 +-
 4 files changed, 200 insertions(+), 13 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
index 3a6f08f8cc8..c6c44b9fb4a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
@@ -198,7 +198,7 @@ public abstract class AbstractMaterializedViewRule 
implements ExplorationRuleFac
      * only one materialization every time. Different query pattern should 
override the sub logic.
      */
     protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext 
cascadesContext,
-            MaterializationContext materializationContext) {
+            MaterializationContext materializationContext) throws 
AnalysisException {
         List<Plan> rewriteResults = new ArrayList<>();
         StructInfo viewStructInfo = materializationContext.getStructInfo();
         MatchMode matchMode = decideMatchMode(queryStructInfo.getRelations(), 
viewStructInfo.getRelations());
@@ -295,7 +295,7 @@ public abstract class AbstractMaterializedViewRule 
implements ExplorationRuleFac
                 continue;
             }
             Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo, 
Set<String>>> invalidPartitions;
-            if (PartitionCompensator.needUnionRewrite(materializationContext)
+            if (PartitionCompensator.needUnionRewrite(materializationContext, 
cascadesContext.getStatementContext())
                     && sessionVariable.isEnableMaterializedViewUnionRewrite()) 
{
                 MTMV mtmv = ((AsyncMaterializationContext) 
materializationContext).getMtmv();
                 BaseTableInfo relatedTableInfo = 
mtmv.getMvPartitionInfo().getRelatedTableInfo();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
index 3c086de0919..f17d5364ff0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
@@ -23,8 +23,10 @@ import org.apache.doris.catalog.PartitionInfo;
 import org.apache.doris.catalog.PartitionType;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Pair;
+import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.mtmv.BaseTableInfo;
 import org.apache.doris.mtmv.MTMVPartitionInfo;
+import org.apache.doris.mtmv.MTMVRelatedTableIf;
 import org.apache.doris.nereids.CascadesContext;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.trees.plans.Plan;
@@ -151,26 +153,49 @@ public class PartitionCompensator {
 
     /**
      * Check if need union compensate or not
+     * If query base table all partitions with ALL_PARTITIONS or 
ALL_PARTITIONS_LIST, should not do union compensate
+     * because it means query all partitions from base table and prune 
partition failed
      */
-    public static boolean needUnionRewrite(MaterializationContext 
materializationContext) {
+    public static boolean needUnionRewrite(MaterializationContext 
materializationContext,
+                                           StatementContext statementContext) 
throws AnalysisException {
         if (!(materializationContext instanceof AsyncMaterializationContext)) {
             return false;
         }
         MTMV mtmv = ((AsyncMaterializationContext) 
materializationContext).getMtmv();
-        PartitionType type = mtmv.getPartitionInfo().getType();
         BaseTableInfo relatedTableInfo = 
mtmv.getMvPartitionInfo().getRelatedTableInfo();
-        return !PartitionType.UNPARTITIONED.equals(type) && relatedTableInfo 
!= null;
+        if (relatedTableInfo == null) {
+            return false;
+        }
+        if 
(PartitionType.UNPARTITIONED.equals(mtmv.getPartitionInfo().getType())) {
+            return false;
+        }
+        MTMVRelatedTableIf pctTable = 
mtmv.getMvPartitionInfo().getRelatedTable();
+        Multimap<List<String>, Pair<RelationId, Set<String>>> 
tableUsedPartitionNameMap =
+                statementContext.getTableUsedPartitionNameMap();
+        if (pctTable instanceof ExternalTable && !((ExternalTable) 
pctTable).supportInternalPartitionPruned()) {
+            // if pct table is external table and not support internal 
partition pruned,
+            // we consider query all partitions from pct table, this would 
cause loop union compensate,
+            // so we skip union compensate in this case
+            return false;
+        }
+        Collection<Pair<RelationId, Set<String>>> tableUsedPartitions
+                = tableUsedPartitionNameMap.get(pctTable.getFullQualifiers());
+        // If query base table all partitions with ALL_PARTITIONS or 
ALL_PARTITIONS_LIST,
+        // should not do union compensate, because it means query all 
partitions from base table
+        // and prune partition failed
+        return !ALL_PARTITIONS_LIST.equals(tableUsedPartitions)
+                && 
tableUsedPartitions.stream().noneMatch(ALL_PARTITIONS::equals);
     }
 
     /**
      * Get query used partitions
      * this is calculated from tableUsedPartitionNameMap and tables in 
statementContext
      *
-     * @param customRelationIdSet if union compensate occurs, the new query 
used partitions is changed,
+     * @param currentUsedRelationIdSet if union compensate occurs, the new 
query used partitions is changed,
      *         so need to get used partitions by relation id set
      */
     public static Map<List<String>, Set<String>> 
getQueryUsedPartitions(StatementContext statementContext,
-            BitSet customRelationIdSet) {
+            BitSet currentUsedRelationIdSet) {
         // get table used partitions
         // if table is not in statementContext().getTables() which means the 
table is partition prune as empty relation
         Multimap<List<String>, Pair<RelationId, Set<String>>> 
tableUsedPartitionNameMap = statementContext
@@ -195,6 +220,13 @@ public class PartitionCompensator {
                     queryUsedRelatedTablePartitionsMap.put(queryUsedTable, 
null);
                     continue tableLoop;
                 }
+                // If currentUsedRelationIdSet is not empty, need check 
relation id to get concrete used partitions
+                BitSet usedPartitionRelation = new BitSet();
+                
usedPartitionRelation.set(tableUsedPartitionPair.key().asInt());
+                if (!currentUsedRelationIdSet.isEmpty()
+                        && 
!currentUsedRelationIdSet.intersects(usedPartitionRelation)) {
+                    continue;
+                }
                 usedPartitionSet.addAll(tableUsedPartitionPair.value());
             }
             queryUsedRelatedTablePartitionsMap.put(queryUsedTable, 
usedPartitionSet);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java
index 25c0a679d8d..5efdff2b12e 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java
@@ -17,16 +17,30 @@
 
 package org.apache.doris.nereids.rules.exploration.mv;
 
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.PartitionInfo;
+import org.apache.doris.catalog.PartitionType;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Pair;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.mtmv.BaseTableInfo;
+import org.apache.doris.mtmv.MTMVPartitionInfo;
+import org.apache.doris.mtmv.MTMVRelatedTableIf;
+import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.trees.plans.RelationId;
 import org.apache.doris.nereids.util.PlanChecker;
 import org.apache.doris.utframe.TestWithFeService;
 
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Multimap;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
 import java.util.BitSet;
 import java.util.List;
@@ -190,4 +204,149 @@ public class PartitionCompensatorTest extends 
TestWithFeService {
                             Assertions.assertEquals(orderTableUsedPartition, 
ImmutableSet.of("p1", "p2", "p3", "p4"));
                         });
     }
+
+    @Test
+    public void testNeedUnionRewriteExternalNoPrune() throws Exception {
+        MaterializationContext ctx = mockCtx(
+                PartitionType.LIST,
+                newBaseTableInfo(),
+                Mockito.mock(MTMVRelatedTableIf.class),
+                true);
+        StatementContext sc = Mockito.mock(StatementContext.class);
+        
Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create());
+        Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx, sc));
+    }
+
+    @Test
+    public void testNeedUnionRewritePositive() throws Exception {
+        MaterializationContext ctx = mockCtx(
+                PartitionType.LIST,
+                newBaseTableInfo(),
+                Mockito.mock(MTMVRelatedTableIf.class),
+                false);
+        StatementContext sc = Mockito.mock(StatementContext.class);
+        
Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(ArrayListMultimap.create());
+        Assertions.assertTrue(PartitionCompensator.needUnionRewrite(ctx, sc));
+    }
+
+    @Test
+    public void testNotNeedUnionRewriteWhenAllPartitions() throws Exception {
+        BaseTableInfo tableInfo = newBaseTableInfo();
+        MaterializationContext ctx = mockCtx(
+                PartitionType.LIST,
+                tableInfo,
+                Mockito.mock(MTMVRelatedTableIf.class),
+                false);
+        StatementContext sc = Mockito.mock(StatementContext.class);
+
+        ArrayListMultimap<List<String>, Pair<RelationId, Set<String>>> t = 
ArrayListMultimap.create();
+        t.put(ImmutableList.of(), PartitionCompensator.ALL_PARTITIONS);
+        Mockito.when(sc.getTableUsedPartitionNameMap()).thenReturn(t);
+        Assertions.assertFalse(PartitionCompensator.needUnionRewrite(ctx, sc));
+    }
+
+    @Test
+    public void testGetQueryUsedPartitionsAllAndPartial() {
+        // Prepare qualifiers
+        List<String> lineitemQualifier = ImmutableList.of(
+                "internal", "partition_compensate_test", 
"lineitem_list_partition");
+        List<String> ordersQualifier = ImmutableList.of(
+                "internal", "partition_compensate_test", 
"orders_list_partition");
+
+        Multimap<List<String>, Pair<RelationId, Set<String>>> 
tableUsedPartitionNameMap
+                = 
connectContext.getStatementContext().getTableUsedPartitionNameMap();
+        tableUsedPartitionNameMap.clear();
+
+        tableUsedPartitionNameMap.put(lineitemQualifier, 
PartitionCompensator.ALL_PARTITIONS);
+
+        RelationId ridA = new RelationId(1);
+        RelationId ridB = new RelationId(2);
+        tableUsedPartitionNameMap.put(ordersQualifier, Pair.of(ridA, 
ImmutableSet.of("p1", "p2")));
+        tableUsedPartitionNameMap.put(ordersQualifier, Pair.of(ridB, 
ImmutableSet.of("p3")));
+
+        Map<List<String>, Set<String>> result = 
PartitionCompensator.getQueryUsedPartitions(
+                connectContext.getStatementContext(), new BitSet());
+        Assertions.assertNull(result.get(lineitemQualifier)); // all partitions
+        Assertions.assertEquals(ImmutableSet.of("p1", "p2", "p3"), 
result.get(ordersQualifier));
+
+        BitSet filterRidA = new BitSet();
+        filterRidA.set(ridA.asInt());
+        Map<List<String>, Set<String>> resultRidA = 
PartitionCompensator.getQueryUsedPartitions(
+                connectContext.getStatementContext(), filterRidA);
+        Assertions.assertNull(resultRidA.get(lineitemQualifier));
+        Assertions.assertEquals(ImmutableSet.of("p1", "p2"), 
resultRidA.get(ordersQualifier));
+
+        BitSet filterRidB = new BitSet();
+        filterRidB.set(ridB.asInt());
+        Map<List<String>, Set<String>> resultRidB = 
PartitionCompensator.getQueryUsedPartitions(
+                connectContext.getStatementContext(), filterRidB);
+        Assertions.assertNull(resultRidB.get(lineitemQualifier));
+        Assertions.assertEquals(ImmutableSet.of("p3"), 
resultRidB.get(ordersQualifier));
+
+        tableUsedPartitionNameMap.put(ordersQualifier, 
PartitionCompensator.ALL_PARTITIONS);
+        Map<List<String>, Set<String>> resultAllOrders = 
PartitionCompensator.getQueryUsedPartitions(
+                connectContext.getStatementContext(), new BitSet());
+        Assertions.assertNull(resultAllOrders.get(ordersQualifier));
+    }
+
+    @Test
+    public void testGetQueryUsedPartitionsEmptyCollectionMeansNoPartitions() {
+        List<String> qualifier = ImmutableList.of(
+                "internal", "partition_compensate_test", 
"lineitem_list_partition");
+        Multimap<List<String>, Pair<RelationId, Set<String>>> 
tableUsedPartitionNameMap
+                = 
connectContext.getStatementContext().getTableUsedPartitionNameMap();
+        tableUsedPartitionNameMap.clear();
+        // Put an empty set via a distinct relation id to simulate no 
partitions used
+        RelationId rid = new RelationId(3);
+        tableUsedPartitionNameMap.put(qualifier, Pair.of(rid, 
ImmutableSet.of()));
+
+        Map<List<String>, Set<String>> result = 
PartitionCompensator.getQueryUsedPartitions(
+                connectContext.getStatementContext(), new BitSet());
+        Assertions.assertEquals(ImmutableSet.of(), result.get(qualifier));
+    }
+
+    private static MaterializationContext mockCtx(
+            PartitionType type,
+            BaseTableInfo pctInfo,
+            MTMVRelatedTableIf pctTable,
+            boolean externalNoPrune) throws AnalysisException {
+
+        MTMV mtmv = Mockito.mock(MTMV.class);
+        PartitionInfo pi = Mockito.mock(PartitionInfo.class);
+        Mockito.when(mtmv.getPartitionInfo()).thenReturn(pi);
+        Mockito.when(pi.getType()).thenReturn(type);
+
+        MTMVPartitionInfo mpi = Mockito.mock(MTMVPartitionInfo.class);
+        Mockito.when(mtmv.getMvPartitionInfo()).thenReturn(mpi);
+        Mockito.when(mpi.getRelatedTableInfo()).thenReturn(pctInfo);
+        Mockito.when(mpi.getRelatedTable()).thenReturn(pctTable);
+
+        if (externalNoPrune) {
+            HMSExternalTable ext = Mockito.mock(HMSExternalTable.class);
+            
Mockito.when(ext.supportInternalPartitionPruned()).thenReturn(false);
+            Mockito.when(mpi.getRelatedTable()).thenReturn(ext);
+        }
+
+        AsyncMaterializationContext ctx = 
Mockito.mock(AsyncMaterializationContext.class);
+        Mockito.when(ctx.getMtmv()).thenReturn(mtmv);
+        return ctx;
+    }
+
+    private static BaseTableInfo newBaseTableInfo() {
+        CatalogIf<?> catalog = Mockito.mock(CatalogIf.class);
+        Mockito.when(catalog.getId()).thenReturn(1L);
+        Mockito.when(catalog.getName()).thenReturn("internal");
+
+        DatabaseIf<?> db = Mockito.mock(DatabaseIf.class);
+        Mockito.when(db.getId()).thenReturn(2L);
+        Mockito.when(db.getFullName()).thenReturn("partition_compensate_test");
+        Mockito.when(db.getCatalog()).thenReturn(catalog);
+
+        TableIf table = Mockito.mock(TableIf.class);
+        Mockito.when(table.getId()).thenReturn(3L);
+        Mockito.when(table.getName()).thenReturn("t");
+        Mockito.when(table.getDatabase()).thenReturn(db);
+
+        return new BaseTableInfo(table);
+    }
 }
diff --git a/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy
index 70a11d3633a..680f7eaa93d 100644
--- a/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy
+++ b/regression-test/suites/external_table_p2/hudi/hudi_mtmv/test_hudi_rewrite_mtmv.groovy
@@ -62,9 +62,7 @@ suite("test_hudi_rewrite_mtmv", 
"p2,external,hudi,external_remote,external_remot
     waitingMTMVTaskFinishedByMvName(mvName)
     order_qt_refresh_one_partition "SELECT * FROM ${mvName} "
 
-    def explainOnePartition = sql """ explain  ${mvSql} """
-    logger.info("explainOnePartition: " + explainOnePartition.toString())
-    assertTrue(explainOnePartition.toString().contains("VUNION"))
+    mv_rewrite_success(mvSql, mvName)
     order_qt_refresh_one_partition_rewrite "${mvSql}"
 
     mv_rewrite_success("${mvSql}", "${mvName}")
@@ -79,9 +77,7 @@ suite("test_hudi_rewrite_mtmv", 
"p2,external,hudi,external_remote,external_remot
     waitingMTMVTaskFinishedByMvName(mvName)
     order_qt_refresh_auto "SELECT * FROM ${mvName} "
 
-    def explainAllPartition = sql """ explain  ${mvSql}; """
-    logger.info("explainAllPartition: " + explainAllPartition.toString())
-    assertTrue(explainAllPartition.toString().contains("VOlapScanNode"))
+    mv_rewrite_success(mvSql, mvName)
     order_qt_refresh_all_partition_rewrite "${mvSql}"
 
     mv_rewrite_success("${mvSql}", "${mvName}")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to