This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a5972409d7e [Fix](mv) Fix ConcurrentModificationException in
PartitionCompensator (#61145)
a5972409d7e is described below
commit a5972409d7ec42b19ca759c2f319b96766245b8b
Author: Sahil Devgon <[email protected]>
AuthorDate: Mon Mar 16 23:08:51 2026 +0530
[Fix](mv) Fix ConcurrentModificationException in PartitionCompensator
(#61145)
### What problem does this PR solve?
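This PR fixes a ConcurrentModificationException in PartitionCompensator.
The merge of the partition-name sets (forEach + replaceAll) previously ran
inside the loop; it now runs once, after the loop and the
allCompensateIsNull check.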
Co-authored-by: sahil-devgon <[email protected]>
---
.../rules/exploration/mv/PartitionCompensator.java | 19 ++--
.../exploration/mv/PartitionCompensatorTest.java | 123 +++++++++++++++++++++
2 files changed, 133 insertions(+), 9 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
index 3fe966864d0..4c26ab47cec 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensator.java
@@ -139,19 +139,20 @@ public class PartitionCompensator {
.computeIfAbsent(baseTableNeedUnionTable.key(), k ->
new HashSet<>())
.addAll(baseTableNeedUnionTable.value());
}
- // merge all partition to delete or union
- Set<String> needRemovePartitionSet = new HashSet<>();
-
mvPartitionNeedRemoveNameMap.values().forEach(needRemovePartitionSet::addAll);
- mvPartitionNeedRemoveNameMap.replaceAll((k, v) ->
needRemovePartitionSet);
-
- // consider multi base table partition name not same, how to handle it?
- Set<String> needUnionPartitionSet = new HashSet<>();
-
baseTablePartitionNeedUnionNameMap.values().forEach(needUnionPartitionSet::addAll);
- baseTablePartitionNeedUnionNameMap.replaceAll((k, v) ->
needUnionPartitionSet);
}
if (allCompensateIsNull) {
return null;
}
+ // merge all partition to delete or union
+ Set<String> needRemovePartitionSet = new HashSet<>();
+
mvPartitionNeedRemoveNameMap.values().forEach(needRemovePartitionSet::addAll);
+ mvPartitionNeedRemoveNameMap.replaceAll((k, v) ->
needRemovePartitionSet);
+
+ // consider multi base table partition name not same, how to handle it?
+ Set<String> needUnionPartitionSet = new HashSet<>();
+
baseTablePartitionNeedUnionNameMap.values().forEach(needUnionPartitionSet::addAll);
+ baseTablePartitionNeedUnionNameMap.replaceAll((k, v) ->
needUnionPartitionSet);
+
return Pair.of(mvPartitionNeedRemoveNameMap,
baseTablePartitionNeedUnionNameMap);
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java
index 17d75f93fcf..3c1b2bb9519 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/mv/PartitionCompensatorTest.java
@@ -19,6 +19,7 @@ package org.apache.doris.nereids.rules.exploration.mv;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.MTMV;
+import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.TableIf;
@@ -30,7 +31,9 @@ import org.apache.doris.mtmv.BaseColInfo;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
+import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.util.PlanChecker;
import org.apache.doris.utframe.TestWithFeService;
@@ -41,10 +44,13 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import java.util.BitSet;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -373,6 +379,123 @@ public class PartitionCompensatorTest extends
TestWithFeService {
return ctx;
}
+ // Regression test for ConcurrentModificationException when merging partition maps
+ // across multiple related tables. The bug was caused by calling replaceAll() inside
+ // the for-loop after forEach() had already replaced map values with a shared Set
+ // reference, causing a self-modification on the second iteration.
+ // This test calls calcInvalidPartitions() directly with two related tables so that
+ // the exact code path containing the bug is exercised end-to-end.
+ @SuppressWarnings("unchecked")
+ @Test
+ public void
testCalcInvalidPartitionsNoConcurrentModificationWithTwoRelatedTables()
+ throws Exception {
+ // Shared catalog/db for both related base tables
+ CatalogIf<?> baseCatalog = Mockito.mock(CatalogIf.class);
+ Mockito.when(baseCatalog.getName()).thenReturn("cat");
+ Mockito.when(baseCatalog.getId()).thenReturn(1L);
+ DatabaseIf<?> baseDb = Mockito.mock(DatabaseIf.class);
+ Mockito.when(baseDb.getFullName()).thenReturn("db");
+ Mockito.when(baseDb.getId()).thenReturn(2L);
+ Mockito.when(baseDb.getCatalog()).thenReturn(baseCatalog);
+
+ MTMVRelatedTableIf relatedTable1 = mockRelatedTableIf(
+ "t1", 10L, ImmutableList.of("cat", "db", "t1"), baseDb);
+ MTMVRelatedTableIf relatedTable2 = mockRelatedTableIf(
+ "t2", 20L, ImmutableList.of("cat", "db", "t2"), baseDb);
+
+ // Two MV valid partitions: mv_p1 maps to t1_p1, mv_p2 maps to t2_p2
+ Partition mvP1 = Mockito.mock(Partition.class);
+ Mockito.when(mvP1.getId()).thenReturn(101L);
+ Mockito.when(mvP1.getName()).thenReturn("mv_p1");
+ Partition mvP2 = Mockito.mock(Partition.class);
+ Mockito.when(mvP2.getId()).thenReturn(102L);
+ Mockito.when(mvP2.getName()).thenReturn("mv_p2");
+
+ Map<String, Set<String>> mappingForTable1 = new HashMap<>();
+ mappingForTable1.put("mv_p1", ImmutableSet.of("t1_p1"));
+ Map<String, Set<String>> mappingForTable2 = new HashMap<>();
+ mappingForTable2.put("mv_p2", ImmutableSet.of("t2_p2"));
+ Map<MTMVRelatedTableIf, Map<String, Set<String>>> partitionMappings =
new HashMap<>();
+ partitionMappings.put(relatedTable1, mappingForTable1);
+ partitionMappings.put(relatedTable2, mappingForTable2);
+
+ BaseColInfo colInfo1 = new BaseColInfo("date_col", new
BaseTableInfo(relatedTable1));
+ BaseColInfo colInfo2 = new BaseColInfo("date_col", new
BaseTableInfo(relatedTable2));
+
+ // Separate catalog/db for the MV itself
+ CatalogIf<?> mvCatalog = Mockito.mock(CatalogIf.class);
+ Mockito.when(mvCatalog.getName()).thenReturn("internal");
+ Mockito.when(mvCatalog.getId()).thenReturn(1L);
+ DatabaseIf<?> mvDb = Mockito.mock(DatabaseIf.class);
+ Mockito.when(mvDb.getFullName()).thenReturn("mv_db");
+ Mockito.when(mvDb.getId()).thenReturn(3L);
+ Mockito.when(mvDb.getCatalog()).thenReturn(mvCatalog);
+
+ MTMV mtmv = Mockito.mock(MTMV.class);
+ Mockito.when(mtmv.getName()).thenReturn("mv1");
+ Mockito.when(mtmv.getId()).thenReturn(100L);
+ Mockito.when(mtmv.getDatabase()).thenReturn(mvDb);
+ PartitionInfo mvPartitionInfo = Mockito.mock(PartitionInfo.class);
+ Mockito.when(mtmv.getPartitionInfo()).thenReturn(mvPartitionInfo);
+
Mockito.when(mvPartitionInfo.getType()).thenReturn(PartitionType.RANGE);
+ MTMVPartitionInfo mvPctInfo = Mockito.mock(MTMVPartitionInfo.class);
+ Mockito.when(mtmv.getMvPartitionInfo()).thenReturn(mvPctInfo);
+
Mockito.when(mvPctInfo.getPctTables()).thenReturn(ImmutableSet.of(relatedTable1,
relatedTable2));
+
Mockito.when(mvPctInfo.getPctInfos()).thenReturn(ImmutableList.of(colInfo1,
colInfo2));
+ // All MV partitions contain data
+
Mockito.when(mtmv.selectNonEmptyPartitionIds(ArgumentMatchers.any())).thenReturn(ImmutableList.of(1L));
+
+ AsyncMaterializationContext matCtx =
Mockito.mock(AsyncMaterializationContext.class);
+ Mockito.when(matCtx.getMtmv()).thenReturn(mtmv);
+
Mockito.when(matCtx.calculatePartitionMappings()).thenReturn(partitionMappings);
+
+ // StatementContext: the MV's two valid partitions are available for rewrite
+ Map<BaseTableInfo, Collection<Partition>> canRewriteMap = new
HashMap<>();
+ canRewriteMap.put(new BaseTableInfo(mtmv), ImmutableList.of(mvP1,
mvP2));
+ StatementContext stmtCtx = Mockito.mock(StatementContext.class);
+
Mockito.when(stmtCtx.getMvCanRewritePartitionsMap()).thenReturn(canRewriteMap);
+
+ CascadesContext cascadesCtx = Mockito.mock(CascadesContext.class);
+ Mockito.when(cascadesCtx.getStatementContext()).thenReturn(stmtCtx);
+
+ // Rewritten plan has no MV scans, so mvNeedRemovePartitionNameSet stays empty
+ Plan rewrittenPlan = Mockito.mock(Plan.class);
+
Mockito.when(rewrittenPlan.collectToList(ArgumentMatchers.any())).thenReturn(Collections.emptyList());
+
+ // Each table contributes one covered and one uncovered partition:
+ // t1 uses {t1_p1 (covered by mv_p1), t1_p2 (not covered)}
+ // t2 uses {t2_p1 (not covered), t2_p2 (covered by mv_p2)}
+ Map<List<String>, Set<String>> queryUsedPartitions = new HashMap<>();
+ queryUsedPartitions.put(ImmutableList.of("cat", "db", "t1"),
+ ImmutableSet.of("t1_p1", "t1_p2"));
+ queryUsedPartitions.put(ImmutableList.of("cat", "db", "t2"),
+ ImmutableSet.of("t2_p1", "t2_p2"));
+
+ // Must not throw ConcurrentModificationException when two related tables each
+ // contribute entries that require the post-loop merge in calcInvalidPartitions()
+ Pair<Map<BaseTableInfo, Set<String>>, Map<BaseColInfo, Set<String>>>
result =
+ Assertions.assertDoesNotThrow(() ->
+ PartitionCompensator.calcInvalidPartitions(
+ queryUsedPartitions, rewrittenPlan, matCtx,
cascadesCtx));
+
+ // The uncovered partitions from both tables should be merged into one unified set
+ Assertions.assertNotNull(result);
+ Set<String> expectedUnion = ImmutableSet.of("t1_p2", "t2_p1");
+ result.value().values()
+ .forEach(v -> Assertions.assertEquals(expectedUnion, v));
+ }
+
+ @SuppressWarnings("unchecked")
+ private static MTMVRelatedTableIf mockRelatedTableIf(
+ String tableName, long tableId, List<String> qualifiers,
DatabaseIf<?> db) {
+ MTMVRelatedTableIf table = Mockito.mock(MTMVRelatedTableIf.class);
+ Mockito.when(table.getName()).thenReturn(tableName);
+ Mockito.when(table.getId()).thenReturn(tableId);
+ Mockito.when(table.getDatabase()).thenReturn(db);
+ Mockito.when(table.getFullQualifiers()).thenReturn(qualifiers);
+ return table;
+ }
+
private static BaseTableInfo newBaseTableInfo() {
CatalogIf<?> catalog = Mockito.mock(CatalogIf.class);
Mockito.when(catalog.getId()).thenReturn(1L);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]