This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 7a24719f482 [enhance](mtmv) Refuse to execute insert overwrite on the
same table … (#40992)
7a24719f482 is described below
commit 7a24719f4821e733173b27b088dfb6b5f7d250fd
Author: zhangdong <[email protected]>
AuthorDate: Thu Sep 19 22:18:46 2024 +0800
[enhance](mtmv) Refuse to execute insert overwrite on the same table …
(#40992)
…(#40558)
pick: https://github.com/apache/doris/pull/40558
---
.../insertoverwrite/InsertOverwriteManager.java | 58 ++++++++++++++++
.../apache/doris/job/extensions/mtmv/MTMVTask.java | 4 ++
.../insert/InsertOverwriteTableCommand.java | 71 ++++++++++++++++++--
.../java/org/apache/doris/qe/StmtExecutor.java | 21 ++++++
.../InsertOverwriteManagerTest.java | 77 ++++++++++++++++++++++
.../org/apache/doris/regression/suite/Suite.groovy | 24 ++++++-
.../suites/mtmv_p0/test_alter_job_mtmv.groovy | 67 +++++++++++++++++++
7 files changed, 315 insertions(+), 7 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java
b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java
index 81524ae0208..a00107c76a7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java
@@ -17,17 +17,21 @@
package org.apache.doris.insertoverwrite;
+import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.MasterDaemon;
import
org.apache.doris.insertoverwrite.InsertOverwriteLog.InsertOverwriteOpType;
+import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -40,7 +44,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
public class InsertOverwriteManager extends MasterDaemon implements Writable {
private static final Logger LOG =
LogManager.getLogger(InsertOverwriteManager.class);
@@ -62,6 +68,11 @@ public class InsertOverwriteManager extends MasterDaemon
implements Writable {
@SerializedName(value = "partitionPairs")
private Map<Long, Map<Long, Long>> partitionPairs =
Maps.newConcurrentMap();
+ // Ids of the tables that currently have a running insert overwrite
+ // dbId ==> Set<tableId>
+ private Map<Long, Set<Long>> runningTables = Maps.newHashMap();
+ private ReentrantReadWriteLock runningLock = new
ReentrantReadWriteLock(true);
+
public InsertOverwriteManager() {
super("InsertOverwriteDropDirtyPartitions", CLEAN_INTERVAL_SECOND *
1000);
}
@@ -270,6 +281,53 @@ public class InsertOverwriteManager extends MasterDaemon
implements Writable {
return InsertOverwriteUtil.dropPartitions(olapTable,
task.getTempPartitionNames());
}
+ /**
+ * If the current table id has a running insert overwrite, throw an
exception.
+ * If not, record it in runningTables
+ *
+ * @param db the database that contains the table being overwritten
+ * @param table the table on which insert overwrite is about to run
+ */
+ public void recordRunningTableOrException(DatabaseIf db, TableIf table) {
+ long dbId = db.getId();
+ long tableId = table.getId();
+ runningLock.writeLock().lock();
+ try {
+ if (runningTables.containsKey(dbId) &&
runningTables.get(dbId).contains(tableId)) {
+ throw new AnalysisException(
+ String.format("Not allowed running Insert Overwrite on
same table: %s.%s", db.getFullName(),
+ table.getName()));
+ }
+ if (runningTables.containsKey(dbId)) {
+ runningTables.get(dbId).add(tableId);
+ } else {
+ runningTables.put(dbId, Sets.newHashSet(tableId));
+ }
+ } finally {
+ runningLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Remove from running records
+ *
+ * @param dbId id of the database whose running record should be removed
+ * @param tableId id of the table whose running record should be removed
+ */
+ public void dropRunningRecord(long dbId, long tableId) {
+ runningLock.writeLock().lock();
+ try {
+ if (runningTables.containsKey(dbId) &&
runningTables.get(dbId).contains(tableId)) {
+ runningTables.get(dbId).remove(tableId);
+ if (runningTables.get(dbId).size() == 0) {
+ runningTables.remove(dbId);
+ }
+ }
+ } finally {
+ runningLock.writeLock().unlock();
+ }
+ }
+
/**
* replay logs
*
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index 1fa42236c61..b515a03c125 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -237,6 +237,10 @@ public class MTMVTask extends AbstractTask {
ctx.setQueryId(queryId);
ctx.getState().setNereids(true);
command.run(ctx, executor);
+ if (getStatus() == TaskStatus.CANCELED) {
+ // Throwing an exception to interrupt subsequent partition update
tasks
+ throw new JobException("task is CANCELED");
+ }
if (ctx.getState().getStateType() != MysqlStateType.OK) {
throw new JobException(ctx.getState().getErrorMessage());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
index 75b80ade581..d2dc409d2f0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
@@ -27,6 +27,7 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.util.InternalDatabaseUtil;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.insertoverwrite.InsertOverwriteManager;
import org.apache.doris.insertoverwrite.InsertOverwriteUtil;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -59,11 +60,14 @@ import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.awaitility.Awaitility;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* insert into select command implementation
@@ -80,6 +84,8 @@ public class InsertOverwriteTableCommand extends Command
implements ForwardWithS
private LogicalPlan logicalQuery;
private Optional<String> labelName;
private final Optional<LogicalPlan> cte;
+ private AtomicBoolean isCancelled = new AtomicBoolean(false);
+ private AtomicBoolean isRunning = new AtomicBoolean(false);
/**
* constructor
@@ -165,35 +171,88 @@ public class InsertOverwriteTableCommand extends Command
implements ForwardWithS
// Do not create temp partition on FE
partitionNames = new ArrayList<>();
}
+ InsertOverwriteManager insertOverwriteManager =
Env.getCurrentEnv().getInsertOverwriteManager();
+
insertOverwriteManager.recordRunningTableOrException(targetTable.getDatabase(),
targetTable);
+ isRunning.set(true);
long taskId = 0;
try {
if (isAutoDetectOverwrite()) {
// taskId here is a group id. it contains all replace tasks
made and registered in rpc process.
- taskId =
Env.getCurrentEnv().getInsertOverwriteManager().registerTaskGroup();
+ taskId = insertOverwriteManager.registerTaskGroup();
// When inserting, BE will call to replace partition by
FrontendService. FE will register new temp
// partitions and return. for transactional, the replacement
will really occur when insert successed,
// i.e. `insertInto` finished. then we call taskGroupSuccess
to make replacement.
insertInto(ctx, executor, taskId);
-
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId,
(OlapTable) targetTable);
+ insertOverwriteManager.taskGroupSuccess(taskId, (OlapTable)
targetTable);
} else {
List<String> tempPartitionNames =
InsertOverwriteUtil.generateTempPartitionNames(partitionNames);
- taskId = Env.getCurrentEnv().getInsertOverwriteManager()
+ if (isCancelled.get()) {
+ LOG.info("insert overwrite is cancelled before
registerTask, queryId: {}",
+ ctx.getQueryIdentifier());
+ return;
+ }
+ taskId = insertOverwriteManager
.registerTask(targetTable.getDatabase().getId(),
targetTable.getId(), tempPartitionNames);
+ if (isCancelled.get()) {
+ LOG.info("insert overwrite is cancelled before
addTempPartitions, queryId: {}",
+ ctx.getQueryIdentifier());
+ // no need to deal with temp partitions, none have been added yet
+ insertOverwriteManager.taskSuccess(taskId);
+ return;
+ }
InsertOverwriteUtil.addTempPartitions(targetTable,
partitionNames, tempPartitionNames);
+ if (isCancelled.get()) {
+ LOG.info("insert overwrite is cancelled before insertInto,
queryId: {}", ctx.getQueryIdentifier());
+ insertOverwriteManager.taskFail(taskId);
+ return;
+ }
insertInto(ctx, executor, tempPartitionNames);
+ if (isCancelled.get()) {
+ LOG.info("insert overwrite is cancelled before
replacePartition, queryId: {}",
+ ctx.getQueryIdentifier());
+ insertOverwriteManager.taskFail(taskId);
+ return;
+ }
InsertOverwriteUtil.replacePartition(targetTable,
partitionNames, tempPartitionNames);
-
Env.getCurrentEnv().getInsertOverwriteManager().taskSuccess(taskId);
+ if (isCancelled.get()) {
+ LOG.info("insert overwrite is cancelled before
taskSuccess, do nothing, queryId: {}",
+ ctx.getQueryIdentifier());
+ }
+ insertOverwriteManager.taskSuccess(taskId);
}
} catch (Exception e) {
LOG.warn("insert into overwrite failed with task(or group) id " +
taskId);
if (isAutoDetectOverwrite()) {
-
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupFail(taskId);
+ insertOverwriteManager.taskGroupFail(taskId);
} else {
-
Env.getCurrentEnv().getInsertOverwriteManager().taskFail(taskId);
+ insertOverwriteManager.taskFail(taskId);
}
throw e;
} finally {
ConnectContext.get().setSkipAuth(false);
+ insertOverwriteManager
+ .dropRunningRecord(targetTable.getDatabase().getId(),
targetTable.getId());
+ isRunning.set(false);
+ }
+ }
+
+ /**
+ * cancel insert overwrite
+ */
+ public void cancel() {
+ this.isCancelled.set(true);
+ }
+
+ /**
+ * wait until insert overwrite is no longer running (gives up after 10 seconds)
+ */
+ public void waitNotRunning() {
+ long waitMaxTimeSecond = 10L;
+ try {
+ Awaitility.await().atMost(waitMaxTimeSecond,
TimeUnit.SECONDS).untilFalse(isRunning);
+ } catch (Exception e) {
+ LOG.warn("waiting time exceeds {} second, stop wait, labelName:
{}", waitMaxTimeSecond,
+ labelName.isPresent() ? labelName.get() : "", e);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 7efbadbf78a..027be4c6eed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1471,6 +1471,11 @@ public class StmtExecutor {
// Because this is called by other thread
public void cancel() {
+ Optional<InsertOverwriteTableCommand> insertOverwriteTableCommand =
getInsertOverwriteTableCommand();
+ if (insertOverwriteTableCommand.isPresent()) {
+ // If BE scheduling has not been triggered yet, cancel the
scheduling first
+ insertOverwriteTableCommand.get().cancel();
+ }
Coordinator coordRef = coord;
if (coordRef != null) {
coordRef.cancel();
@@ -1481,6 +1486,22 @@ public class StmtExecutor {
if (parsedStmt instanceof AnalyzeTblStmt || parsedStmt instanceof
AnalyzeDBStmt) {
Env.getCurrentEnv().getAnalysisManager().cancelSyncTask(context);
}
+ if (insertOverwriteTableCommand.isPresent()) {
+ // Wait for the command to finish running or finish cancelling
+ insertOverwriteTableCommand.get().waitNotRunning();
+ }
+ }
+
+ private Optional<InsertOverwriteTableCommand>
getInsertOverwriteTableCommand() {
+ if (parsedStmt instanceof LogicalPlanAdapter) {
+ LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter)
parsedStmt;
+ LogicalPlan logicalPlan = logicalPlanAdapter.getLogicalPlan();
+ if (logicalPlan instanceof InsertOverwriteTableCommand) {
+ InsertOverwriteTableCommand insertOverwriteTableCommand =
(InsertOverwriteTableCommand) logicalPlan;
+ return Optional.of(insertOverwriteTableCommand);
+ }
+ }
+ return Optional.empty();
}
// Because this is called by other thread
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteManagerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteManagerTest.java
new file mode 100644
index 00000000000..4bf6c9f12d5
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteManagerTest.java
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.insertoverwrite;
+
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+
+import mockit.Expectations;
+import mockit.Mocked;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class InsertOverwriteManagerTest {
+ @Mocked
+ private DatabaseIf db;
+
+ @Mocked
+ private TableIf table;
+
+ @Before
+ public void setUp()
+ throws NoSuchMethodException, SecurityException,
AnalysisException, DdlException, MetaNotFoundException {
+
+ new Expectations() {
+ {
+ db.getId();
+ minTimes = 0;
+ result = 1L;
+
+ db.getFullName();
+ minTimes = 0;
+ result = "db1";
+
+ table.getId();
+ minTimes = 0;
+ result = 2L;
+
+ table.getName();
+ minTimes = 0;
+ result = "table1";
+ }
+ };
+ }
+
+ @Test
+ public void testParallel() {
+ InsertOverwriteManager manager = new InsertOverwriteManager();
+ manager.recordRunningTableOrException(db, table);
+ try {
+ manager.recordRunningTableOrException(db, table);
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains("Not allowed"));
+ }
+ manager.dropRunningRecord(db.getId(), table.getId());
+ manager.recordRunningTableOrException(db, table);
+ }
+
+}
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 65b298ae009..e76de35799d 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -1166,7 +1166,29 @@ class Suite implements GroovyInterceptable {
}
logger.info("The state of ${showTasks} is ${status}")
Thread.sleep(1000);
- } while (timeoutTimestamp > System.currentTimeMillis() && (status ==
'PENDING' || status == 'RUNNING' || status == 'NULL'))
+ } while (timeoutTimestamp > System.currentTimeMillis() && (status ==
'PENDING' || status == 'RUNNING' || status == 'NULL'))
+ if (status != "SUCCESS") {
+ logger.info("status is not success")
+ }
+ Assert.assertEquals("SUCCESS", status)
+ }
+
+ void waitingMTMVTaskFinishedByMvNameAllowCancel(String mvName) {
+ Thread.sleep(2000);
+ String showTasks = "select
TaskId,JobId,JobName,MvId,Status,MvName,MvDatabaseName,ErrorMsg from
tasks('type'='mv') where MvName = '${mvName}' order by CreateTime ASC"
+ String status = "NULL"
+ List<List<Object>> result
+ long startTime = System.currentTimeMillis()
+ long timeoutTimestamp = startTime + 5 * 60 * 1000 // 5 min
+ do {
+ result = sql(showTasks)
+ logger.info("result: " + result.toString())
+ if (!result.isEmpty()) {
+ status = result.last().get(4)
+ }
+ logger.info("The state of ${showTasks} is ${status}")
+ Thread.sleep(1000);
+ } while (timeoutTimestamp > System.currentTimeMillis() && (status ==
'PENDING' || status == 'RUNNING' || status == 'NULL' || status == 'CANCELED'))
if (status != "SUCCESS") {
logger.info("status is not success")
}
diff --git a/regression-test/suites/mtmv_p0/test_alter_job_mtmv.groovy
b/regression-test/suites/mtmv_p0/test_alter_job_mtmv.groovy
new file mode 100644
index 00000000000..fa1618d5bf5
--- /dev/null
+++ b/regression-test/suites/mtmv_p0/test_alter_job_mtmv.groovy
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.junit.Assert;
+
+suite("test_alter_job_mtmv") {
+ String suiteName = "test_alter_job_mtmv"
+ String tableName = "${suiteName}_table"
+ String mvName = "${suiteName}_mv"
+ sql """drop table if exists `${tableName}`"""
+ sql """drop materialized view if exists ${mvName};"""
+
+ sql """
+ CREATE TABLE `${tableName}` (
+ `user_id` LARGEINT NOT NULL COMMENT '\"用户id\"',
+ `date` DATE NOT NULL COMMENT '\"数据灌入日期时间\"',
+ `num` SMALLINT NOT NULL COMMENT '\"数量\"'
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`user_id`, `date`, `num`)
+ COMMENT 'OLAP'
+ PARTITION BY RANGE(`date`)
+ (PARTITION p201701_1000 VALUES [('0000-01-01'), ('2017-02-01')),
+ PARTITION p201702_2000 VALUES [('2017-02-01'), ('2017-03-01')),
+ PARTITION p201703_all VALUES [('2017-03-01'), ('2017-04-01')))
+ DISTRIBUTED BY HASH(`user_id`) BUCKETS 2
+ PROPERTIES ('replication_num' = '1') ;
+ """
+ sql """
+ insert into ${tableName}
values(1,"2017-01-15",1),(1,"2017-02-15",2),(1,"2017-03-15",3);
+ """
+
+ //This is an immediately built materialized view that cancels running
tasks and creates new ones after updating job information.
+ // Due to the uncertainty of the case, there may be several situations
here:
+ // 1. The task has not been created yet, so it has not been cancelled
+ // 2. The task has been completed, so there was no cancellation
+ // 3. The task has been created but not yet completed
+ // But regardless of the status of the previous case,
+ // this case is used to ensure that the newly launched task can run
successfully after modifying the materialized view
+ sql """
+ CREATE MATERIALIZED VIEW ${mvName}
+ REFRESH COMPLETE ON MANUAL
+ partition by(`date`)
+ DISTRIBUTED BY RANDOM BUCKETS 2
+ PROPERTIES ('replication_num' = '1')
+ AS
+ SELECT * FROM ${tableName};
+ """
+ sql """alter MATERIALIZED VIEW ${mvName} refresh COMPLETE on commit; """
+ waitingMTMVTaskFinishedByMvNameAllowCancel(mvName)
+
+ sql """drop table if exists `${tableName}`"""
+ sql """drop materialized view if exists ${mvName};"""
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]