This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 597b047cf1b branch-4.0: [fix](cloud)Fix `create dynamic table` race
with `insert overwrite` #59489 (#59884)
597b047cf1b is described below
commit 597b047cf1be58df17255713c5514d559a230e50
Author: deardeng <[email protected]>
AuthorDate: Sat Jan 17 18:48:35 2026 +0800
branch-4.0: [fix](cloud)Fix `create dynamic table` race with `insert
overwrite` #59489 (#59884)
cherry pick from #59489
---
.../main/java/org/apache/doris/alter/Alter.java | 2 +-
.../main/java/org/apache/doris/catalog/Env.java | 13 +--
.../doris/clone/DynamicPartitionScheduler.java | 74 +++++++++++------
.../apache/doris/datasource/InternalCatalog.java | 29 +++----
.../org/apache/doris/mtmv/MTMVPartitionUtil.java | 2 +-
.../java/org/apache/doris/persist/EditLog.java | 22 +++++-
.../apache/doris/service/FrontendServiceImpl.java | 2 +-
.../different_serialize.groovy} | 2 +-
...eate_partition_and_insert_overwrite_race.groovy | 92 ++++++++++++++++++++++
9 files changed, 189 insertions(+), 49 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
index 1c3f94cce68..dafb0cc4f33 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
@@ -688,7 +688,7 @@ public class Alter {
DynamicPartitionUtil.checkAlterAllowed(
(OlapTable) db.getTableOrMetaException(tableName,
TableType.OLAP));
}
- Env.getCurrentEnv().addPartition(db, tableName,
(AddPartitionClause) alterClause, false, 0, true);
+ Env.getCurrentEnv().addPartition(db, tableName,
(AddPartitionClause) alterClause, false, 0, true, null);
} else if (alterClause instanceof AddPartitionLikeClause) {
if (!((AddPartitionLikeClause)
alterClause).getIsTempPartition()) {
DynamicPartitionUtil.checkAlterAllowed(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 191b808d20f..67a004e45cf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -3489,14 +3489,17 @@ public class Env {
* @param isCreateTable this call is for creating table
* @param generatedPartitionId the preset partition id for the partition
to add
* @param writeEditLog whether to write an edit log for this addition
- * @return PartitionPersistInfo to be written to editlog. It may be null
if no partitions added.
+ * @param batchPartitions output parameter, used to batch write edit log outside
this function; may be null only when writeEditLog is true.
+ * first is editlog PartitionPersistInfo, second is the added Partition
* @throws DdlException
*/
- public PartitionPersistInfo addPartition(Database db, String tableName,
AddPartitionClause addPartitionClause,
+ public void addPartition(Database db, String tableName, AddPartitionClause
addPartitionClause,
boolean isCreateTable, long
generatedPartitionId,
- boolean writeEditLog) throws
DdlException {
- return getInternalCatalog().addPartition(db, tableName,
addPartitionClause,
- isCreateTable, generatedPartitionId, writeEditLog);
+ boolean writeEditLog,
+ List<Pair<PartitionPersistInfo,
Partition>> batchPartitions)
+ throws DdlException {
+ getInternalCatalog().addPartition(db, tableName, addPartitionClause,
+ isCreateTable, generatedPartitionId, writeEditLog,
batchPartitions);
}
public void addMultiPartitions(Database db, String tableName,
AlterMultiPartitionClause multiPartitionClause)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
index de63ce1745c..3e0aa29c1c4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
@@ -776,7 +776,7 @@ public class DynamicPartitionScheduler extends MasterDaemon
{
cloudBatchBeforeCreatePartitions(executeFirstTime,
addPartitionClauses, olapTable, indexIds,
db, tableName, generatedPartitionIds);
- List<PartitionPersistInfo> partsInfo = new ArrayList<>();
+ List<Pair<PartitionPersistInfo, Partition>> batchPartsInfo =
new ArrayList<>();
for (int i = 0; i < addPartitionClauses.size(); i++) {
try {
boolean needWriteEditLog = true;
@@ -785,15 +785,10 @@ public class DynamicPartitionScheduler extends
MasterDaemon {
if (Config.isCloudMode()) {
needWriteEditLog = !executeFirstTime;
}
- PartitionPersistInfo info =
- Env.getCurrentEnv().addPartition(db,
tableName, addPartitionClauses.get(i),
+ Env.getCurrentEnv().addPartition(db, tableName,
addPartitionClauses.get(i),
executeFirstTime,
executeFirstTime && Config.isCloudMode() ?
generatedPartitionIds.get(i) : 0,
- needWriteEditLog);
- if (info == null) {
- throw new Exception("null persisted partition
returned");
- }
- partsInfo.add(info);
+ needWriteEditLog, batchPartsInfo);
clearCreatePartitionFailedMsg(olapTable.getId());
} catch (Exception e) {
recordCreatePartitionFailedMsg(db.getFullName(),
tableName, e.getMessage(), olapTable.getId());
@@ -804,7 +799,7 @@ public class DynamicPartitionScheduler extends MasterDaemon
{
}
}
}
- cloudBatchAfterCreatePartitions(executeFirstTime, partsInfo,
+ cloudBatchAfterCreatePartitions(executeFirstTime,
batchPartsInfo,
addPartitionClauses, db, olapTable, indexIds,
tableName);
// ATTN: Breaking up dynamic partition table scheduling,
consuming peak CPU consumption
@@ -824,15 +819,16 @@ public class DynamicPartitionScheduler extends
MasterDaemon {
}
}
- private void cloudBatchAfterCreatePartitions(boolean executeFirstTime,
List<PartitionPersistInfo> partsInfo,
-
ArrayList<AddPartitionClause> addPartitionClauses, Database db,
- OlapTable olapTable,
List<Long> indexIds,
- String tableName)
throws DdlException {
+ private void cloudBatchAfterCreatePartitions(boolean executeFirstTime,
+
List<Pair<PartitionPersistInfo, Partition>> batchPartsInfo,
+ ArrayList<AddPartitionClause>
addPartitionClauses, Database db,
+ OlapTable olapTable,
List<Long> indexIds,
+ String tableName) throws
DdlException {
if (Config.isNotCloudMode()) {
return;
}
- List<Long> succeedPartitionIds =
partsInfo.stream().map(partitionPersistInfo
- ->
partitionPersistInfo.getPartition().getId()).collect(Collectors.toList());
+ List<Long> succeedPartitionIds =
batchPartsInfo.stream().map(partitionInfo
+ ->
partitionInfo.first.getPartition().getId()).collect(Collectors.toList());
if (!executeFirstTime || addPartitionClauses.isEmpty()) {
LOG.info("cloud commit rpc in batch, {}-{}", !executeFirstTime,
addPartitionClauses.size());
return;
@@ -849,7 +845,7 @@ public class DynamicPartitionScheduler extends MasterDaemon
{
succeedPartitionIds, indexIds, true /* isCreateTable */,
false /* isBatchCommit */);
LOG.info("begin write edit log to add partitions in batch, "
+ "numPartitions: {}, db: {}, table: {}, tableId: {}",
- partsInfo.size(), db.getFullName(), tableName,
olapTable.getId());
+ batchPartsInfo.size(), db.getFullName(), tableName,
olapTable.getId());
// ATTN: here, edit log must after commit cloud partition,
// prevent commit RPC failure from causing data loss
if
(DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.before.logEditPartitions"))
{
@@ -857,20 +853,48 @@ public class DynamicPartitionScheduler extends
MasterDaemon {
// committed, but not log edit
throw new Exception("debug point
FE.DynamicPartitionScheduler.before.commitCloudPartition");
}
- for (int i = 0; i < partsInfo.size(); i++) {
-
Env.getCurrentEnv().getEditLog().logAddPartition(partsInfo.get(i));
- if
(DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.in.logEditPartitions")) {
- if (i == partsInfo.size() / 2) {
- LOG.info("debug point
FE.DynamicPartitionScheduler.in.logEditPartitions, throw e");
- // committed, but log some edit, others failed
- throw new Exception("debug point
FE.DynamicPartitionScheduler"
- + ".in.commitCloudPartition");
+
+ for (int i = 0; i < batchPartsInfo.size(); i++) {
+ // get table write lock to add partition, edit log and modify
table state must be atomic
+ olapTable.writeLockOrDdlException();
+ try {
+ boolean isTempPartition =
addPartitionClauses.get(i).isTempPartition();
+ Partition toAddPartition = batchPartsInfo.get(i).second;
+ String partitionName = toAddPartition.getName();
+ // ATTN: Check here to see if the newly created dynamic
+ // partition has already been added by another process.
+ // If it has, do not add this dynamic partition again,
+ // and call `onErasePartition` to clean up any remaining
information.
+ Partition checkIsAdded =
olapTable.getPartition(partitionName, isTempPartition);
+ if (checkIsAdded != null) {
+ LOG.warn("dynamic partition has been added, skip it. "
+ + "db: {}, table: {}, partition: {}, tableId:
{}",
+ db.getFullName(), tableName, partitionName,
olapTable.getId());
+ Env.getCurrentEnv().onErasePartition(toAddPartition);
+ continue;
+ }
+ if (isTempPartition) {
+ olapTable.addTempPartition(toAddPartition);
+ } else {
+ olapTable.addPartition(toAddPartition);
}
+
+
Env.getCurrentEnv().getEditLog().logAddPartition(batchPartsInfo.get(i).first);
+ if
(DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.in.logEditPartitions")) {
+ if (i == batchPartsInfo.size() / 2) {
+ LOG.info("debug point
FE.DynamicPartitionScheduler.in.logEditPartitions, throw e");
+ // committed, but log some edit, others failed
+ throw new Exception("debug point
FE.DynamicPartitionScheduler"
+ + ".in.commitCloudPartition");
+ }
+ }
+ } finally {
+ olapTable.writeUnlock();
}
}
LOG.info("finish write edit log to add partitions in batch, "
+ "numPartitions: {}, db: {}, table: {}, tableId: {}",
- partsInfo.size(), db.getFullName(), tableName,
olapTable.getId());
+ batchPartsInfo.size(), db.getFullName(), tableName,
olapTable.getId());
} catch (Exception e) {
LOG.warn("cloud in commit step, dbName {}, tableName {}, tableId
{} exception {}",
db.getFullName(), tableName, olapTable.getId(),
e.getMessage());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 90b7f8cd9e9..e5f4fc74c56 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -1358,7 +1358,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
} finally {
table.readUnlock();
}
- addPartition(db, tableName, clause, false, 0, true);
+ addPartition(db, tableName, clause, false, 0, true, null);
} catch (UserException e) {
throw new DdlException("Failed to ADD PARTITION " +
addPartitionLikeClause.getPartitionName()
@@ -1447,9 +1447,11 @@ public class InternalCatalog implements
CatalogIf<Database> {
}
}
- public PartitionPersistInfo addPartition(Database db, String tableName,
AddPartitionClause addPartitionClause,
+ public void addPartition(Database db, String tableName, AddPartitionClause
addPartitionClause,
boolean isCreateTable, long
generatedPartitionId,
- boolean writeEditLog) throws
DdlException {
+ boolean writeEditLog,
+ List<Pair<PartitionPersistInfo,
Partition>> batchPartitions)
+ throws DdlException {
// in cloud mode, isCreateTable == true, create dynamic partition use,
so partitionId must have been generated.
// isCreateTable == false, other case, partitionId generate in below,
must be set 0
if (!FeConstants.runningUnitTest && Config.isCloudMode()
@@ -1478,7 +1480,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
if (singlePartitionDesc.isSetIfNotExists()) {
LOG.info("table[{}] add partition[{}] which already
exists", olapTable.getName(), partitionName);
if
(!DebugPointUtil.isEnable("InternalCatalog.addPartition.noCheckExists")) {
- return null;
+ return;
}
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_SAME_NAME_PARTITION,
partitionName);
@@ -1645,7 +1647,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
db, tableName, olapTable, partitionName, singlePartitionDesc);
if (ownerFutureOr.isErr()) {
if (ownerFutureOr.unwrapErr() == null) {
- return null;
+ return;
} else {
throw ownerFutureOr.unwrapErr();
}
@@ -1701,7 +1703,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
LOG.info("table[{}] add partition[{}] which already
exists", olapTable.getName(), partitionName);
if (singlePartitionDesc.isSetIfNotExists()) {
failedCleanCallback.run();
- return null;
+ return;
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_SAME_NAME_PARTITION,
partitionName);
}
@@ -1759,12 +1761,6 @@ public class InternalCatalog implements
CatalogIf<Database> {
// update partition info
partitionInfo.handleNewSinglePartitionDesc(singlePartitionDesc, partitionId,
isTempPartition);
- if (isTempPartition) {
- olapTable.addTempPartition(partition);
- } else {
- olapTable.addPartition(partition);
- }
-
// log
PartitionPersistInfo info = null;
if (partitionInfo.getType() == PartitionType.RANGE) {
@@ -1790,11 +1786,16 @@ public class InternalCatalog implements
CatalogIf<Database> {
}
if (writeEditLog) {
Env.getCurrentEnv().getEditLog().logAddPartition(info);
+ if (isTempPartition) {
+ olapTable.addTempPartition(partition);
+ } else {
+ olapTable.addPartition(partition);
+ }
LOG.info("succeed in creating partition[{}], temp: {}",
partitionId, isTempPartition);
} else {
+ batchPartitions.add(Pair.of(info, partition));
LOG.info("postpone creating partition[{}], temp: {}",
partitionId, isTempPartition);
}
- return info;
} finally {
olapTable.writeUnlock();
}
@@ -1840,7 +1841,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
for (SinglePartitionDesc singlePartitionDesc : singlePartitionDescs) {
AddPartitionClause addPartitionClause = new
AddPartitionClause(singlePartitionDesc, null,
multiPartitionClause.getProperties(), false);
- addPartition(db, tableName, addPartitionClause, false, 0, true);
+ addPartition(db, tableName, addPartitionClause, false, 0, true,
null);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
index a92c75c7a43..f84e339a512 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
@@ -404,7 +404,7 @@ public class MTMVPartitionUtil {
AddPartitionClause addPartitionClause = new
AddPartitionClause(singlePartitionDesc,
mtmv.getDefaultDistributionInfo().toDistributionDesc(),
partitionProperties, false);
Env.getCurrentEnv().addPartition((Database) mtmv.getDatabase(),
mtmv.getName(), addPartitionClause,
- false, 0, true);
+ false, 0, true, null);
}
/**
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 8302cd896be..95885f92286 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -1679,7 +1679,21 @@ public class EditLog {
}
public long logAddPartition(PartitionPersistInfo info) {
+ if (DebugPointUtil.isEnable("FE.logAddPartition.slow")) {
+ DebugPointUtil.DebugPoint debugPoint =
DebugPointUtil.getDebugPoint("FE.logAddPartition.slow");
+ String pName = debugPoint.param("pName", "");
+ if (info.getPartition().getName().equals(pName)) {
+ int sleepMs = debugPoint.param("sleep", 1000);
+ LOG.info("logAddPartition debug point hit, pName {}, sleep {}
s", pName, sleepMs);
+ try {
+ Thread.sleep(sleepMs);
+ } catch (InterruptedException e) {
+ LOG.warn("sleep interrupted", e);
+ }
+ }
+ }
long logId = logEdit(OperationType.OP_ADD_PARTITION, info);
+ LOG.info("log add partition, logId:{}, info: {}", logId,
info.toJson());
AddPartitionRecord record = new AddPartitionRecord(logId, info);
Env.getCurrentEnv().getBinlogManager().addAddPartitionRecord(record);
return logId;
@@ -1687,6 +1701,7 @@ public class EditLog {
public long logDropPartition(DropPartitionInfo info) {
long logId = logEdit(OperationType.OP_DROP_PARTITION, info);
+ LOG.info("log drop partition, logId:{}, info: {}", logId,
info.toJson());
Env.getCurrentEnv().getBinlogManager().addDropPartitionRecord(info,
logId);
return logId;
}
@@ -1697,6 +1712,7 @@ public class EditLog {
public void logRecoverPartition(RecoverInfo info) {
long logId = logEdit(OperationType.OP_RECOVER_PARTITION, info);
+ LOG.info("log recover partition, logId:{}, info: {}", logId,
info.toJson());
Env.getCurrentEnv().getBinlogManager().addRecoverTableRecord(info,
logId);
}
@@ -1715,6 +1731,7 @@ public class EditLog {
public void logDropTable(DropInfo info) {
long logId = logEdit(OperationType.OP_DROP_TABLE, info);
+ LOG.info("log drop table, logId : {}, infos: {}", logId, info);
if (Strings.isNullOrEmpty(info.getCtl()) ||
info.getCtl().equals(InternalCatalog.INTERNAL_CATALOG_NAME)) {
DropTableRecord record = new DropTableRecord(logId, info);
Env.getCurrentEnv().getBinlogManager().addDropTableRecord(record);
@@ -1727,11 +1744,13 @@ public class EditLog {
public void logRecoverTable(RecoverInfo info) {
long logId = logEdit(OperationType.OP_RECOVER_TABLE, info);
+ LOG.info("log recover table, logId : {}, infos: {}", logId, info);
Env.getCurrentEnv().getBinlogManager().addRecoverTableRecord(info,
logId);
}
public void logDropRollup(DropInfo info) {
long logId = logEdit(OperationType.OP_DROP_ROLLUP, info);
+ LOG.info("log drop rollup, logId : {}, infos: {}", logId, info);
Env.getCurrentEnv().getBinlogManager().addDropRollup(info, logId);
}
@@ -1848,7 +1867,8 @@ public class EditLog {
}
public void logDatabaseRename(DatabaseInfo databaseInfo) {
- logEdit(OperationType.OP_RENAME_DB, databaseInfo);
+ long logId = logEdit(OperationType.OP_RENAME_DB, databaseInfo);
+ LOG.info("log database rename, logId : {}, infos: {}", logId,
databaseInfo);
}
public void logTableRename(TableInfo tableInfo) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index caca155df12..2ed46f4882e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -3785,7 +3785,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
for (AddPartitionClause addPartitionClause :
addPartitionClauseMap.values()) {
try {
// here maybe check and limit created partitions num
- Env.getCurrentEnv().addPartition(db, olapTable.getName(),
addPartitionClause, false, 0, true);
+ Env.getCurrentEnv().addPartition(db, olapTable.getName(),
addPartitionClause, false, 0, true, null);
} catch (DdlException e) {
LOG.warn(e);
errorStatus.setErrorMsgs(
diff --git
a/regression-test/suites/cloud_p0/diffrent_serialize/diffrent_serialize.groovy
b/regression-test/suites/cloud_p0/different_serialize/different_serialize.groovy
similarity index 99%
rename from
regression-test/suites/cloud_p0/diffrent_serialize/diffrent_serialize.groovy
rename to
regression-test/suites/cloud_p0/different_serialize/different_serialize.groovy
index 682a8fff9ad..9433c97e130 100644
---
a/regression-test/suites/cloud_p0/diffrent_serialize/diffrent_serialize.groovy
+++
b/regression-test/suites/cloud_p0/different_serialize/different_serialize.groovy
@@ -17,7 +17,7 @@
import org.codehaus.groovy.runtime.IOGroovyMethods
-suite ("diffrent_serialize_cloud") {
+suite ("different_serialize_cloud") {
sql """ DROP TABLE IF EXISTS d_table; """
diff --git
a/regression-test/suites/cloud_p0/partition/test_create_partition_and_insert_overwrite_race.groovy
b/regression-test/suites/cloud_p0/partition/test_create_partition_and_insert_overwrite_race.groovy
new file mode 100644
index 00000000000..58c259db816
--- /dev/null
+++
b/regression-test/suites/cloud_p0/partition/test_create_partition_and_insert_overwrite_race.groovy
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+import com.mysql.cj.jdbc.StatementImpl
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+import org.apache.doris.regression.suite.SuiteCluster
+
+suite("test_create_partition_and_insert_overwrite_race", 'p0, docker') {
+ if (!isCloudMode()) {
+ return
+ }
+ def options = new ClusterOptions()
+ options.enableDebugPoints()
+ // one master, one observer
+ options.setFeNum(2)
+ options.feConfigs.add('sys_log_verbose_modules=org')
+ options.setBeNum(3)
+ options.cloudMode = true
+
+ // 1. connect to observer
+ options.connectToFollower = true
+ docker(options) {
+ sql """set enable_sql_cache=false"""
+ def tbl = 'test_create_partition_and_insert_overwrite_race_tbl'
+ def tbl2 = 'test_create_partition_and_insert_overwrite_race_tbl2'
+ def createTableSql = { String tableName ->
+ sql """
+ CREATE TABLE ${tableName} (
+ order_id BIGINT,
+ create_dt datetime,
+ username VARCHAR(20)
+ )
+ DUPLICATE KEY(order_id)
+ PARTITION BY RANGE(create_dt) ()
+ DISTRIBUTED BY HASH(order_id) BUCKETS 10
+ PROPERTIES(
+ "dynamic_partition.enable" = "true",
+ "dynamic_partition.time_unit" = "DAY",
+ "dynamic_partition.start" = "-5",
+ "dynamic_partition.end" = "5",
+ "dynamic_partition.prefix" = "p",
+ "dynamic_partition.create_history_partition" = "true"
+ );
+ """
+ }
+
+ createTableSql(tbl)
+ createTableSql(tbl2)
+
+ // Generate insert statements with dates: current date -2, -1, 0, +1, +2 days
+ def now = new Date()
+ def dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd")
+ for (def i = -2; i <= 2; i++) {
+ def targetDate = new Date(now.time + i * 24 * 60 * 60 * 1000L)
+ def dateStr = dateFormat.format(targetDate)
+ def hour = String.format("%02d", Math.abs(i) + 1)
+ def insertDate = "${dateStr} ${hour}:00:00"
+ sql """insert into ${tbl2} values (${i + 3}, '${insertDate}',
'test')"""
+ }
+
+ sql """DROP TABLE ${tbl}"""
+ def partitionNameFormat = new java.text.SimpleDateFormat("yyyyMMdd")
+ def currentPartitionName = "p" + partitionNameFormat.format(now)
+ cluster.injectDebugPoints(NodeType.FE,
['FE.logAddPartition.slow':[pName:currentPartitionName, sleep:50 * 1000]])
+ def futrue = thread {
+ for (def i = 0; i < 55; i++) {
+ try_sql """INSERT OVERWRITE TABLE ${tbl} partition(*) select *
from ${tbl2}"""
+ sleep(1 * 1000)
+ cluster.checkFeIsAlive(2, true)
+ }
+ }
+ def future1 = thread {
+ createTableSql(tbl)
+ }
+ futrue.get()
+ future1.get()
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]