This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 03c8cae0dbc branch-4.0: (cloud) Hold table write lock across
first-time dynamic partition setup to prevent CREATE MV race #62755 (#62862)
03c8cae0dbc is described below
commit 03c8cae0dbc336cc2ae4881602a85b68fe01a818
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu May 7 17:13:25 2026 +0800
branch-4.0: (cloud) Hold table write lock across first-time dynamic
partition setup to prevent CREATE MV race #62755 (#62862)
Cherry-picked from #62755
Co-authored-by: deardeng <[email protected]>
---
.../apache/doris/datasource/InternalCatalog.java | 104 +++++++++------
.../test_create_table_and_create_mv_race.groovy | 143 +++++++++++++++++++++
2 files changed, 208 insertions(+), 39 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 02f9ab3c672..82ac3d992d5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -3188,6 +3188,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
}
Pair<Boolean, Boolean> result;
+ boolean holdTableLock = false;
db.writeLockOrDdlException();
try {
// db name not changed
@@ -3196,51 +3197,76 @@ public class InternalCatalog implements
CatalogIf<Database> {
}
// register table, write create table edit log
result = db.createTableWithoutLock(olapTable, false,
createTableInfo.isIfNotExists());
+ if (!result.second) {
+ olapTable.writeLock();
+ holdTableLock = true;
+ }
} finally {
db.writeUnlock();
}
- if (!result.first) {
-
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableShowName);
- }
+ try {
+ if (!result.first) {
+
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableShowName);
+ }
- if (result.second) { // table already exists
- if (Env.getCurrentColocateIndex().isColocateTable(tableId)) {
- // if this is a colocate table, its table id is already
added to colocate group
- // so we should remove the tableId here
- Env.getCurrentColocateIndex().removeTable(tableId);
+ if (result.second) { // table already exists
+ if
(Env.getCurrentColocateIndex().isColocateTable(tableId)) {
+ // if this is a colocate table, its table id is
already added to colocate group
+ // so we should remove the tableId here
+ Env.getCurrentColocateIndex().removeTable(tableId);
+ }
+ for (Long tabletId : tabletIdSet) {
+ Env.getCurrentInvertedIndex().deleteTablet(tabletId);
+ }
+ LOG.info("duplicate create table[{};{}] in db[{};{}], skip
next steps",
+ tableName, tableId, db.getName(), db.getId());
+ } else {
+ // if table not exists, then db.createTableWithLock will
write an editlog.
+ hadLogEditCreateTable = true;
+
+ // we have added these index to memory, only need to
persist here
+ if
(Env.getCurrentColocateIndex().isColocateTable(tableId)) {
+ GroupId groupId =
Env.getCurrentColocateIndex().getGroup(tableId);
+ Map<Tag, List<List<Long>>> backendsPerBucketSeq =
Env.getCurrentColocateIndex()
+ .getBackendsPerBucketSeq(groupId);
+ ColocatePersistInfo info =
ColocatePersistInfo.createForAddTable(groupId, tableId,
+ backendsPerBucketSeq);
+
Env.getCurrentEnv().getEditLog().logColocateAddTable(info);
+ }
+ LOG.info("successfully create table[{};{}] in db[{};{}]",
+ tableName, tableId, db.getName(), db.getId());
+
+ if
(DebugPointUtil.isEnable("FE.createOlapTable.beforeFirstTimeDynamicPartition"))
{
+ long sleepMs = DebugPointUtil.getDebugParamOrDefault(
+
"FE.createOlapTable.beforeFirstTimeDynamicPartition", "sleepMs", 0L);
+ if (sleepMs > 0) {
+ LOG.info("debug point
FE.createOlapTable.beforeFirstTimeDynamicPartition, sleep {}ms",
+ sleepMs);
+ try {
+ Thread.sleep(sleepMs);
+ } catch (InterruptedException ignore) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ Env.getCurrentEnv().getDynamicPartitionScheduler()
+ .executeDynamicPartitionFirstTime(db.getId(),
olapTable.getId());
+ // register or remove table from DynamicPartition after
table created
+
DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(),
olapTable, false);
+ Env.getCurrentEnv().getDynamicPartitionScheduler()
+ .createOrUpdateRuntimeInfo(tableId,
DynamicPartitionScheduler.LAST_UPDATE_TIME,
+ TimeUtils.getCurrentFormatTime());
}
- for (Long tabletId : tabletIdSet) {
- Env.getCurrentInvertedIndex().deleteTablet(tabletId);
+
+ if (DebugPointUtil.isEnable("FE.createOlapTable.exception")) {
+ LOG.info("debug point FE.createOlapTable.exception, throw
e");
+ throw new DdlException("debug point
FE.createOlapTable.exception");
+ }
+ } finally {
+ if (holdTableLock) {
+ olapTable.writeUnlock();
}
- LOG.info("duplicate create table[{};{}] in db[{};{}], skip
next steps",
- tableName, tableId, db.getName(), db.getId());
- } else {
- // if table not exists, then db.createTableWithLock will write
an editlog.
- hadLogEditCreateTable = true;
-
- // we have added these index to memory, only need to persist
here
- if (Env.getCurrentColocateIndex().isColocateTable(tableId)) {
- GroupId groupId =
Env.getCurrentColocateIndex().getGroup(tableId);
- Map<Tag, List<List<Long>>> backendsPerBucketSeq =
Env.getCurrentColocateIndex()
- .getBackendsPerBucketSeq(groupId);
- ColocatePersistInfo info =
ColocatePersistInfo.createForAddTable(groupId, tableId,
- backendsPerBucketSeq);
- Env.getCurrentEnv().getEditLog().logColocateAddTable(info);
- }
- LOG.info("successfully create table[{};{}] in db[{};{}]",
- tableName, tableId, db.getName(), db.getId());
- Env.getCurrentEnv().getDynamicPartitionScheduler()
- .executeDynamicPartitionFirstTime(db.getId(),
olapTable.getId());
- // register or remove table from DynamicPartition after table
created
-
DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(),
olapTable, false);
- Env.getCurrentEnv().getDynamicPartitionScheduler()
- .createOrUpdateRuntimeInfo(tableId,
DynamicPartitionScheduler.LAST_UPDATE_TIME,
- TimeUtils.getCurrentFormatTime());
- }
-
- if (DebugPointUtil.isEnable("FE.createOlapTable.exception")) {
- LOG.info("debug point FE.createOlapTable.exception, throw e");
- throw new DdlException("debug point
FE.createOlapTable.exception");
}
} catch (DdlException e) {
LOG.warn("create table failed {} - {}", tabletIdSet,
e.getMessage());
diff --git
a/regression-test/suites/cloud_p0/partition/test_create_table_and_create_mv_race.groovy
b/regression-test/suites/cloud_p0/partition/test_create_table_and_create_mv_race.groovy
new file mode 100644
index 00000000000..47c71f0c1c2
--- /dev/null
+++
b/regression-test/suites/cloud_p0/partition/test_create_table_and_create_mv_race.groovy
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+
+// Regression test for the cloud-specific race where a concurrent
+// CREATE MATERIALIZED VIEW slips in after OP_CREATE_TABLE is journaled
+// but before first-time dynamic partition entries are journaled, causing
+// the rollup job to reference partitions that never appear in the journal.
+// Replay later NPEs at RollupJobV2.addTabletToInvertedIndex.
+//
+// The fix holds olapTable.writeLock across the whole first-time dynamic
+// partition setup so CREATE MV must wait until the table is fully built.
+suite("test_create_table_and_create_mv_race", 'p0, docker') {
+ if (!isCloudMode()) {
+ return
+ }
+ def options = new ClusterOptions()
+ options.enableDebugPoints()
+ options.setFeNum(3)
+ options.feConfigs.add('sys_log_verbose_modules=org')
+ options.setBeNum(1)
+ options.cloudMode = true
+
+ docker(options) {
+ sql """set enable_sql_cache=false"""
+
+ def tbl = 'test_create_table_and_create_mv_race_tbl'
+ def mvName = 'test_create_table_and_create_mv_race_mv'
+
+ sql "DROP TABLE IF EXISTS ${tbl}"
+
+ // Widen the race window: slow down first-time dynamic partition
+ // so the concurrent CREATE MV has time to arrive.
+ // With the fix, CREATE MV will block on olapTable.writeLock() and
+ // only run after the table is fully built.
+ //
+ // params serialize to HTTP query strings on the wire, so values
+ // must be String-convertible (Groovy handles the coercion).
+ cluster.injectDebugPoints(NodeType.FE, [
+ 'FE.createOlapTable.beforeFirstTimeDynamicPartition':
[sleepMs: '10000']
+ ])
+
+ try {
+ def createDoneAt = new java.util.concurrent.atomic.AtomicLong(0L)
+ def mvDoneAt = new java.util.concurrent.atomic.AtomicLong(0L)
+
+ def createFuture = thread('create-table') {
+ sql """
+ CREATE TABLE ${tbl} (
+ user_id BIGINT,
+ create_dt datetime,
+ amount BIGINT
+ )
+ DUPLICATE KEY(user_id, create_dt)
+ PARTITION BY RANGE(create_dt) ()
+ DISTRIBUTED BY HASH(user_id) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "dynamic_partition.enable" = "true",
+ "dynamic_partition.time_unit" = "DAY",
+ "dynamic_partition.start" = "-2",
+ "dynamic_partition.end" = "2",
+ "dynamic_partition.prefix" = "p",
+ "dynamic_partition.create_history_partition" = "true"
+ )
+ """
+ createDoneAt.set(System.currentTimeMillis())
+ }
+
+ // Give CREATE TABLE time to reach the injected sleep.
+ sleep(2000)
+
+ def mvFuture = thread('create-mv') {
+ // If CREATE MV fires before the table exists in the
namespace, retry a few times.
+ def attempts = 30
+ while (attempts-- > 0) {
+ try {
+ sql """
+ CREATE MATERIALIZED VIEW ${mvName} AS
+ SELECT user_id AS mv_user_id, sum(amount) AS
mv_sum_amount
+ FROM ${tbl}
+ GROUP BY user_id
+ """
+ break
+ } catch (Exception e) {
+ def msg = e.getMessage() ?: ''
+ if (msg.contains("Unknown table") ||
msg.contains("does not exist")) {
+ sleep(200)
+ continue
+ }
+ throw e
+ }
+ }
+ mvDoneAt.set(System.currentTimeMillis())
+ }
+
+ createFuture.get()
+ mvFuture.get()
+
+ // Correctness invariant: CREATE MV must have finished AFTER
CREATE TABLE.
+ // If the lock is missing, CREATE MV would return first (it's
cheap) while
+ // CREATE TABLE is still inside the injected 10s sleep.
+ def ct = createDoneAt.get()
+ def mt = mvDoneAt.get()
+ assert ct > 0 && mt > 0, "both futures should have completed"
+ assert mt >= ct, "CREATE MV (${mt}) must finish after CREATE TABLE
(${ct})," +
+ " otherwise the lock fix is missing and MV raced ahead of
first-time dynamic partition setup"
+ } finally {
+ cluster.clearFrontendDebugPoints()
+ }
+
+ cluster.checkFeIsAlive(1, true)
+
+ // Verify the table has all dynamic partitions.
+ // dynamic_partition.start=-2, end=2, create_history_partition=true →
5 partitions expected
+ def partitions = sql "SHOW PARTITIONS FROM ${tbl}"
+ assert partitions.size() >= 3,
+ "dynamic_partition.start=-2/end=2 should have produced at
least 3 partitions, got ${partitions.size()}"
+
+ // Sanity: the MV exists and an end-to-end insert works.
+ def now = new Date()
+ def dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd")
+ def today = dateFormat.format(now)
+ sql "INSERT INTO ${tbl} VALUES (1, '${today} 12:00:00', 100)"
+ def cnt = sql "SELECT count(*) FROM ${tbl}"
+ assert cnt[0][0] == 1L, "expected 1 row, got ${cnt[0][0]}"
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]