morningman commented on code in PR #40558:
URL: https://github.com/apache/doris/pull/40558#discussion_r1759653217
##########
fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java:
##########
@@ -270,6 +279,50 @@ private boolean rollback(long taskId) {
return InsertOverwriteUtil.dropPartitions(olapTable,
task.getTempPartitionNames());
}
+ /**
+ * If the current table id has a running insert overwrite, throw an
exception.
+ * If not, record it in runningTables
+ *
+ * @param dbId Run the dbId for insert overwrite
+ * @param tableId Run the tableId for insert overwrite
+ */
+ public void recordRunningTableOrException(long dbId, long tableId) {
+ runningLock.writeLock().lock();
+ try {
+ if (runningTables.containsKey(dbId) &&
runningTables.get(dbId).contains(tableId)) {
+ throw new AnalysisException(
+ String.format("insert overwrite is running on db: %s,
table: %s", dbId, tableId));
Review Comment:
```suggestion
String.format("Not allowed running Insert Overwrite
on same table: %s.%s", dbId, tableId));
```
And I suggest to use db/table name in error msg
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java:
##########
@@ -157,35 +160,103 @@ public void run(ConnectContext ctx, StmtExecutor
executor) throws Exception {
// Do not create temp partition on FE
partitionNames = new ArrayList<>();
}
+ InsertOverwriteManager insertOverwriteManager =
Env.getCurrentEnv().getInsertOverwriteManager();
+
insertOverwriteManager.recordRunningTableOrException(targetTable.getDatabase().getId(),
targetTable.getId());
+ isRunning = true;
long taskId = 0;
try {
if (isAutoDetectOverwrite()) {
// taskId here is a group id. it contains all replace tasks
made and registered in rpc process.
- taskId =
Env.getCurrentEnv().getInsertOverwriteManager().registerTaskGroup();
+ taskId = insertOverwriteManager.registerTaskGroup();
// When inserting, BE will call to replace partition by
FrontendService. FE will register new temp
// partitions and return. for transactional, the replacement
will really occur when insert successed,
// i.e. `insertInto` finished. then we call taskGroupSuccess
to make replacement.
insertInto(ctx, executor, taskId);
-
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId,
(OlapTable) targetTable);
+ insertOverwriteManager.taskGroupSuccess(taskId, (OlapTable)
targetTable);
} else {
List<String> tempPartitionNames =
InsertOverwriteUtil.generateTempPartitionNames(partitionNames);
- taskId = Env.getCurrentEnv().getInsertOverwriteManager()
+ if (isCancelled) {
+ LOG.info("insert overwrite isCancelled before
registerTask, queryId: {}", ctx.getQueryIdentifier());
Review Comment:
```suggestion
LOG.info("insert overwrite is cancelled before
registerTask, queryId: {}", ctx.getQueryIdentifier());
```
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java:
##########
@@ -157,35 +160,103 @@ public void run(ConnectContext ctx, StmtExecutor
executor) throws Exception {
// Do not create temp partition on FE
partitionNames = new ArrayList<>();
}
+ InsertOverwriteManager insertOverwriteManager =
Env.getCurrentEnv().getInsertOverwriteManager();
+
insertOverwriteManager.recordRunningTableOrException(targetTable.getDatabase().getId(),
targetTable.getId());
+ isRunning = true;
long taskId = 0;
try {
if (isAutoDetectOverwrite()) {
// taskId here is a group id. it contains all replace tasks
made and registered in rpc process.
- taskId =
Env.getCurrentEnv().getInsertOverwriteManager().registerTaskGroup();
+ taskId = insertOverwriteManager.registerTaskGroup();
// When inserting, BE will call to replace partition by
FrontendService. FE will register new temp
// partitions and return. for transactional, the replacement
will really occur when insert successed,
// i.e. `insertInto` finished. then we call taskGroupSuccess
to make replacement.
insertInto(ctx, executor, taskId);
-
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId,
(OlapTable) targetTable);
+ insertOverwriteManager.taskGroupSuccess(taskId, (OlapTable)
targetTable);
} else {
List<String> tempPartitionNames =
InsertOverwriteUtil.generateTempPartitionNames(partitionNames);
- taskId = Env.getCurrentEnv().getInsertOverwriteManager()
+ if (isCancelled) {
+ LOG.info("insert overwrite isCancelled before
registerTask, queryId: {}", ctx.getQueryIdentifier());
+ return;
+ }
+ taskId = insertOverwriteManager
.registerTask(targetTable.getDatabase().getId(),
targetTable.getId(), tempPartitionNames);
+ if (isCancelled) {
+ LOG.info("insert overwrite isCancelled before
addTempPartitions, queryId: {}",
Review Comment:
```suggestion
LOG.info("insert overwrite is cancelled before
addTempPartitions, queryId: {}",
```
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java:
##########
@@ -157,35 +160,103 @@ public void run(ConnectContext ctx, StmtExecutor
executor) throws Exception {
// Do not create temp partition on FE
partitionNames = new ArrayList<>();
}
+ InsertOverwriteManager insertOverwriteManager =
Env.getCurrentEnv().getInsertOverwriteManager();
+
insertOverwriteManager.recordRunningTableOrException(targetTable.getDatabase().getId(),
targetTable.getId());
+ isRunning = true;
long taskId = 0;
try {
if (isAutoDetectOverwrite()) {
// taskId here is a group id. it contains all replace tasks
made and registered in rpc process.
- taskId =
Env.getCurrentEnv().getInsertOverwriteManager().registerTaskGroup();
+ taskId = insertOverwriteManager.registerTaskGroup();
// When inserting, BE will call to replace partition by
FrontendService. FE will register new temp
// partitions and return. for transactional, the replacement
will really occur when insert successed,
// i.e. `insertInto` finished. then we call taskGroupSuccess
to make replacement.
insertInto(ctx, executor, taskId);
-
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId,
(OlapTable) targetTable);
+ insertOverwriteManager.taskGroupSuccess(taskId, (OlapTable)
targetTable);
} else {
List<String> tempPartitionNames =
InsertOverwriteUtil.generateTempPartitionNames(partitionNames);
- taskId = Env.getCurrentEnv().getInsertOverwriteManager()
+ if (isCancelled) {
+ LOG.info("insert overwrite isCancelled before
registerTask, queryId: {}", ctx.getQueryIdentifier());
+ return;
+ }
+ taskId = insertOverwriteManager
.registerTask(targetTable.getDatabase().getId(),
targetTable.getId(), tempPartitionNames);
+ if (isCancelled) {
+ LOG.info("insert overwrite isCancelled before
addTempPartitions, queryId: {}",
+ ctx.getQueryIdentifier());
+ // not need deal temp partition
+ insertOverwriteManager.taskSuccess(taskId);
+ return;
+ }
InsertOverwriteUtil.addTempPartitions(targetTable,
partitionNames, tempPartitionNames);
+ if (isCancelled) {
+ LOG.info("insert overwrite isCancelled before insertInto,
queryId: {}", ctx.getQueryIdentifier());
+ insertOverwriteManager.taskFail(taskId);
+ return;
+ }
insertInto(ctx, executor, tempPartitionNames);
+ if (isCancelled) {
+ LOG.info("insert overwrite isCancelled before
replacePartition, queryId: {}",
Review Comment:
ditto
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java:
##########
@@ -157,35 +160,103 @@ public void run(ConnectContext ctx, StmtExecutor
executor) throws Exception {
// Do not create temp partition on FE
partitionNames = new ArrayList<>();
}
+ InsertOverwriteManager insertOverwriteManager =
Env.getCurrentEnv().getInsertOverwriteManager();
+
insertOverwriteManager.recordRunningTableOrException(targetTable.getDatabase().getId(),
targetTable.getId());
+ isRunning = true;
long taskId = 0;
try {
if (isAutoDetectOverwrite()) {
// taskId here is a group id. it contains all replace tasks
made and registered in rpc process.
- taskId =
Env.getCurrentEnv().getInsertOverwriteManager().registerTaskGroup();
+ taskId = insertOverwriteManager.registerTaskGroup();
// When inserting, BE will call to replace partition by
FrontendService. FE will register new temp
// partitions and return. for transactional, the replacement
will really occur when insert successed,
// i.e. `insertInto` finished. then we call taskGroupSuccess
to make replacement.
insertInto(ctx, executor, taskId);
-
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId,
(OlapTable) targetTable);
+ insertOverwriteManager.taskGroupSuccess(taskId, (OlapTable)
targetTable);
} else {
List<String> tempPartitionNames =
InsertOverwriteUtil.generateTempPartitionNames(partitionNames);
- taskId = Env.getCurrentEnv().getInsertOverwriteManager()
+ if (isCancelled) {
+ LOG.info("insert overwrite isCancelled before
registerTask, queryId: {}", ctx.getQueryIdentifier());
+ return;
+ }
+ taskId = insertOverwriteManager
.registerTask(targetTable.getDatabase().getId(),
targetTable.getId(), tempPartitionNames);
+ if (isCancelled) {
+ LOG.info("insert overwrite isCancelled before
addTempPartitions, queryId: {}",
+ ctx.getQueryIdentifier());
+ // not need deal temp partition
+ insertOverwriteManager.taskSuccess(taskId);
+ return;
+ }
InsertOverwriteUtil.addTempPartitions(targetTable,
partitionNames, tempPartitionNames);
+ if (isCancelled) {
+ LOG.info("insert overwrite isCancelled before insertInto,
queryId: {}", ctx.getQueryIdentifier());
Review Comment:
ditto
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java:
##########
@@ -157,35 +160,103 @@ public void run(ConnectContext ctx, StmtExecutor
executor) throws Exception {
// Do not create temp partition on FE
partitionNames = new ArrayList<>();
}
+ InsertOverwriteManager insertOverwriteManager =
Env.getCurrentEnv().getInsertOverwriteManager();
+
insertOverwriteManager.recordRunningTableOrException(targetTable.getDatabase().getId(),
targetTable.getId());
+ isRunning = true;
long taskId = 0;
try {
if (isAutoDetectOverwrite()) {
// taskId here is a group id. it contains all replace tasks
made and registered in rpc process.
- taskId =
Env.getCurrentEnv().getInsertOverwriteManager().registerTaskGroup();
+ taskId = insertOverwriteManager.registerTaskGroup();
// When inserting, BE will call to replace partition by
FrontendService. FE will register new temp
// partitions and return. for transactional, the replacement
will really occur when insert successed,
// i.e. `insertInto` finished. then we call taskGroupSuccess
to make replacement.
insertInto(ctx, executor, taskId);
-
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId,
(OlapTable) targetTable);
+ insertOverwriteManager.taskGroupSuccess(taskId, (OlapTable)
targetTable);
} else {
List<String> tempPartitionNames =
InsertOverwriteUtil.generateTempPartitionNames(partitionNames);
- taskId = Env.getCurrentEnv().getInsertOverwriteManager()
+ if (isCancelled) {
+ LOG.info("insert overwrite isCancelled before
registerTask, queryId: {}", ctx.getQueryIdentifier());
+ return;
+ }
+ taskId = insertOverwriteManager
.registerTask(targetTable.getDatabase().getId(),
targetTable.getId(), tempPartitionNames);
+ if (isCancelled) {
+ LOG.info("insert overwrite isCancelled before
addTempPartitions, queryId: {}",
+ ctx.getQueryIdentifier());
+ // not need deal temp partition
+ insertOverwriteManager.taskSuccess(taskId);
+ return;
+ }
InsertOverwriteUtil.addTempPartitions(targetTable,
partitionNames, tempPartitionNames);
+ if (isCancelled) {
+ LOG.info("insert overwrite isCancelled before insertInto,
queryId: {}", ctx.getQueryIdentifier());
+ insertOverwriteManager.taskFail(taskId);
+ return;
+ }
insertInto(ctx, executor, tempPartitionNames);
+ if (isCancelled) {
+ LOG.info("insert overwrite isCancelled before
replacePartition, queryId: {}",
+ ctx.getQueryIdentifier());
+ insertOverwriteManager.taskFail(taskId);
+ return;
+ }
InsertOverwriteUtil.replacePartition(targetTable,
partitionNames, tempPartitionNames);
-
Env.getCurrentEnv().getInsertOverwriteManager().taskSuccess(taskId);
+ if (isCancelled) {
+ LOG.info("insert overwrite isCancelled before taskSuccess,
do nothing, queryId: {}",
+ ctx.getQueryIdentifier());
+ }
+ insertOverwriteManager.taskSuccess(taskId);
}
} catch (Exception e) {
LOG.warn("insert into overwrite failed with task(or group) id " +
taskId);
if (isAutoDetectOverwrite()) {
-
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupFail(taskId);
+ insertOverwriteManager.taskGroupFail(taskId);
} else {
-
Env.getCurrentEnv().getInsertOverwriteManager().taskFail(taskId);
+ insertOverwriteManager.taskFail(taskId);
}
throw e;
} finally {
ConnectContext.get().setSkipAuth(false);
+ insertOverwriteManager
+ .dropRunningRecord(targetTable.getDatabase().getId(),
targetTable.getId());
+ isRunning = false;
+ }
+ }
+
+ /**
+ * cancel insert overwrite
+ */
+ public void cancel() {
+ this.isCancelled = true;
+ }
+
+ /**
+ * wait insert overwrite not running
+ */
+ public void waitNotRunning() {
+ long waitMaxTimeMills = 10 * 1000L;
+ long waitTime = 0;
+ while (true) {
Review Comment:
You can use `Awaitility`, just like using it in regression test case
##########
regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy:
##########
@@ -1275,7 +1275,29 @@ class Suite implements GroovyInterceptable {
}
logger.info("The state of ${showTasks} is ${status}")
Thread.sleep(1000);
- } while (timeoutTimestamp > System.currentTimeMillis() && (status ==
'PENDING' || status == 'RUNNING' || status == 'NULL'))
+ } while (timeoutTimestamp > System.currentTimeMillis() && (status ==
'PENDING' || status == 'RUNNING' || status == 'NULL'))
+ if (status != "SUCCESS") {
+ logger.info("status is not success")
+ }
+ Assert.assertEquals("SUCCESS", status)
+ }
+
+ void waitingMTMVTaskFinishedByMvNameAllowCancel(String mvName) {
+ Thread.sleep(2000);
+ String showTasks = "select
TaskId,JobId,JobName,MvId,Status,MvName,MvDatabaseName,ErrorMsg from
tasks('type'='mv') where MvName = '${mvName}' order by CreateTime ASC"
+ String status = "NULL"
+ List<List<Object>> result
+ long startTime = System.currentTimeMillis()
+ long timeoutTimestamp = startTime + 5 * 60 * 1000 // 5 min
Review Comment:
Use `Awaitility`
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java:
##########
@@ -157,35 +160,103 @@ public void run(ConnectContext ctx, StmtExecutor
executor) throws Exception {
// Do not create temp partition on FE
partitionNames = new ArrayList<>();
}
+ InsertOverwriteManager insertOverwriteManager =
Env.getCurrentEnv().getInsertOverwriteManager();
+
insertOverwriteManager.recordRunningTableOrException(targetTable.getDatabase().getId(),
targetTable.getId());
+ isRunning = true;
long taskId = 0;
try {
if (isAutoDetectOverwrite()) {
// taskId here is a group id. it contains all replace tasks
made and registered in rpc process.
- taskId =
Env.getCurrentEnv().getInsertOverwriteManager().registerTaskGroup();
+ taskId = insertOverwriteManager.registerTaskGroup();
// When inserting, BE will call to replace partition by
FrontendService. FE will register new temp
// partitions and return. for transactional, the replacement
will really occur when insert successed,
// i.e. `insertInto` finished. then we call taskGroupSuccess
to make replacement.
insertInto(ctx, executor, taskId);
-
Env.getCurrentEnv().getInsertOverwriteManager().taskGroupSuccess(taskId,
(OlapTable) targetTable);
+ insertOverwriteManager.taskGroupSuccess(taskId, (OlapTable)
targetTable);
} else {
List<String> tempPartitionNames =
InsertOverwriteUtil.generateTempPartitionNames(partitionNames);
- taskId = Env.getCurrentEnv().getInsertOverwriteManager()
+ if (isCancelled) {
+ LOG.info("insert overwrite isCancelled before
registerTask, queryId: {}", ctx.getQueryIdentifier());
+ return;
+ }
+ taskId = insertOverwriteManager
.registerTask(targetTable.getDatabase().getId(),
targetTable.getId(), tempPartitionNames);
+ if (isCancelled) {
+ LOG.info("insert overwrite isCancelled before
addTempPartitions, queryId: {}",
+ ctx.getQueryIdentifier());
+ // not need deal temp partition
+ insertOverwriteManager.taskSuccess(taskId);
+ return;
+ }
InsertOverwriteUtil.addTempPartitions(targetTable,
partitionNames, tempPartitionNames);
+ if (isCancelled) {
+ LOG.info("insert overwrite isCancelled before insertInto,
queryId: {}", ctx.getQueryIdentifier());
+ insertOverwriteManager.taskFail(taskId);
+ return;
+ }
insertInto(ctx, executor, tempPartitionNames);
+ if (isCancelled) {
+ LOG.info("insert overwrite isCancelled before
replacePartition, queryId: {}",
+ ctx.getQueryIdentifier());
+ insertOverwriteManager.taskFail(taskId);
+ return;
+ }
InsertOverwriteUtil.replacePartition(targetTable,
partitionNames, tempPartitionNames);
-
Env.getCurrentEnv().getInsertOverwriteManager().taskSuccess(taskId);
+ if (isCancelled) {
+ LOG.info("insert overwrite isCancelled before taskSuccess,
do nothing, queryId: {}",
Review Comment:
ditto
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]