This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 547c18d28b [feature](load) support CLEAN LABEL stmt (#11362)
547c18d28b is described below
commit 547c18d28b45bcf4388dab9c3206c30206404dc9
Author: Mingyu Chen <[email protected]>
AuthorDate: Mon Aug 1 10:43:33 2022 +0800
[feature](load) support CLEAN LABEL stmt (#11362)
---
.../Load/CLEAN-LABEL.md | 62 +++++++++++
.../Load/CLEAN-LABEL.md | 62 +++++++++++
fe/fe-core/src/main/cup/sql_parser.cup | 15 ++-
.../org/apache/doris/analysis/CleanLabelStmt.java | 72 +++++++++++++
.../java/org/apache/doris/analysis/InsertStmt.java | 11 +-
.../org/apache/doris/journal/JournalEntity.java | 6 ++
.../org/apache/doris/load/loadv2/LoadManager.java | 98 ++++++++++++++++-
.../doris/persist/CleanLabelOperationLog.java | 60 +++++++++++
.../java/org/apache/doris/persist/EditLog.java | 9 ++
.../org/apache/doris/persist/OperationType.java | 1 +
.../main/java/org/apache/doris/qe/DdlExecutor.java | 3 +
.../doris/transaction/DatabaseTransactionMgr.java | 49 +++++++++
.../data/load/clean_label/test_clean_label.out | 18 ++++
.../load/clean_label/test_clean_label.groovy | 116 +++++++++++++++++++++
14 files changed, 569 insertions(+), 13 deletions(-)
diff --git
a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CLEAN-LABEL.md
b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CLEAN-LABEL.md
new file mode 100644
index 0000000000..8f7abe201e
--- /dev/null
+++
b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CLEAN-LABEL.md
@@ -0,0 +1,62 @@
+---
+{
+ "title": "CLEAN-LABEL",
+ "language": "en"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## CLEAN-LABEL
+
+### Name
+
+CLEAN LABEL
+
+### Description
+
+Manually cleans up the labels of historical load jobs. Only load jobs that have already finished are removed; after cleanup, their labels can be reused.
+
+Syntax:
+
+```sql
+CLEAN LABEL [label] FROM db;
+```
+
+### Example
+
+1. Clean label label1 from database db1
+
+ ```sql
+ CLEAN LABEL label1 FROM db1;
+ ```
+
+2. Clean all labels from database db1
+
+ ```sql
+ CLEAN LABEL FROM db1;
+ ```
+
+### Keywords
+
+ CLEAN, LABEL
+
+### Best Practice
+
diff --git
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CLEAN-LABEL.md
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CLEAN-LABEL.md
new file mode 100644
index 0000000000..83d9a90600
--- /dev/null
+++
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CLEAN-LABEL.md
@@ -0,0 +1,62 @@
+---
+{
+ "title": "CLEAN-LABEL",
+ "language": "zh-CN"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## CLEAN-LABEL
+
+### Name
+
+CLEAN LABEL
+
+### Description
+
+用于手动清理历史导入作业的 Label。清理后,Label 可以重复使用。
+
+语法:
+
+```sql
+CLEAN LABEL [label] FROM db;
+```
+
+### Example
+
+1. 清理 db1 中,Label 为 label1 的导入作业。
+
+ ```sql
+ CLEAN LABEL label1 FROM db1;
+ ```
+
+2. 清理 db1 中所有历史 Label。
+
+ ```sql
+ CLEAN LABEL FROM db1;
+ ```
+
+### Keywords
+
+ CLEAN, LABEL
+
+### Best Practice
+
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup
b/fe/fe-core/src/main/cup/sql_parser.cup
index 985bd1d15d..d938b10201 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -304,7 +304,7 @@ nonterminal StatementBase stmt, show_stmt, show_param,
help_stmt, load_stmt,
use_stmt, kill_stmt, drop_stmt, recover_stmt, grant_stmt, revoke_stmt,
create_stmt, set_stmt, sync_stmt, cancel_stmt, cancel_param, delete_stmt,
link_stmt, migrate_stmt, switch_stmt, enter_stmt, transaction_stmt,
unsupported_stmt, export_stmt, admin_stmt, truncate_stmt,
import_columns_stmt, import_delete_on_stmt, import_sequence_stmt,
import_where_stmt, install_plugin_stmt, uninstall_plugin_stmt,
- import_preceding_filter_stmt, unlock_tables_stmt, lock_tables_stmt,
refresh_stmt;
+ import_preceding_filter_stmt, unlock_tables_stmt, lock_tables_stmt,
refresh_stmt, clean_stmt;
nonterminal String transaction_label;
nonterminal ImportColumnDesc import_column_desc;
@@ -755,6 +755,8 @@ stmt ::=
{: RESULT = stmt; :}
| refresh_stmt:stmt
{: RESULT = stmt; :}
+ | clean_stmt:stmt
+ {: RESULT = stmt; :}
| /* empty: query only has comments */
{:
RESULT = new EmptyStmt();
@@ -794,6 +796,17 @@ refresh_stmt ::=
:}
;
+clean_stmt ::= // CLEAN LABEL statement; both alternatives build a CleanLabelStmt(db, label)
+ KW_CLEAN KW_LABEL from_or_in ident:db // form without a label, e.g. CLEAN LABEL FROM db
+ {:
+ RESULT = new CleanLabelStmt(db, null); // no label was given, so null is passed as the label
+ :}
+ | KW_CLEAN KW_LABEL ident:label from_or_in ident:db // form with a label, e.g. CLEAN LABEL my_label FROM db
+ {:
+ RESULT = new CleanLabelStmt(db, label);
+ :}
+ ;
+
// plugin statement
install_plugin_stmt ::=
KW_INSTALL KW_PLUGIN KW_FROM ident_or_text:source opt_properties:properties
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CleanLabelStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CleanLabelStmt.java
new file mode 100644
index 0000000000..3fe587e5f0
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CleanLabelStmt.java
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.system.SystemInfoService;
+
+import com.google.common.base.Strings;
+
+/**
+ * CLEAN LABEL FROM db;
+ * CLEAN LABEL my_label FROM db;
+ */
+public class CleanLabelStmt extends DdlStmt {
+ private String db;
+ private String label;
+
+ public CleanLabelStmt(String db, String label) {
+ this.db = db;
+ this.label = label;
+ }
+
+ public String getDb() {
+ return db;
+ }
+
+ public String getLabel() {
+ return label;
+ }
+
+ @Override
+ public void analyze(Analyzer analyzer) throws UserException {
+ super.analyze(analyzer);
+ db = ClusterNamespace.getFullName(SystemInfoService.DEFAULT_CLUSTER,
db);
+ label = Strings.nullToEmpty(label);
+ // check auth
+ if (!Env.getCurrentEnv().getAuth().checkDbPriv(ConnectContext.get(),
db, PrivPredicate.LOAD)) {
+
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
"LOAD");
+ }
+ }
+
+ @Override
+ public String toSql() {
+ return "CLEAN LABEL" + (Strings.isNullOrEmpty(label) ? " " : " " +
label) + " FROM " + db;
+ }
+
+ @Override
+ public RedirectStatus getRedirectStatus() {
+ return RedirectStatus.FORWARD_WITH_SYNC;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
index 28fbf8999a..bb6ece1e98 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
@@ -99,7 +99,6 @@ public class InsertStmt extends DdlStmt {
private Boolean isRepartition;
private boolean isStreaming = false;
private String label = null;
- private boolean isUserSpecifiedLabel = false;
private Map<Long, Integer> indexIdToSchemaHash = null;
@@ -142,10 +141,6 @@ public class InsertStmt extends DdlStmt {
this.planHints = hints;
this.targetColumnNames = cols;
- if (!Strings.isNullOrEmpty(label)) {
- isUserSpecifiedLabel = true;
- }
-
this.isValuesOrConstantSelect = (queryStmt instanceof SelectStmt
&& ((SelectStmt) queryStmt).getTableRefs().isEmpty());
}
@@ -248,10 +243,6 @@ public class InsertStmt extends DdlStmt {
return label;
}
- public boolean isUserSpecifiedLabel() {
- return isUserSpecifiedLabel;
- }
-
public DataSink getDataSink() {
return dataSink;
}
@@ -306,7 +297,7 @@ public class InsertStmt extends DdlStmt {
// create label and begin transaction
long timeoutSecond =
ConnectContext.get().getSessionVariable().getQueryTimeoutS();
if (Strings.isNullOrEmpty(label)) {
- label = "insert_" +
DebugUtil.printId(analyzer.getContext().queryId());
+ label = "insert_" +
DebugUtil.printId(analyzer.getContext().queryId()).replace("-", "_");
}
if (!isExplain() && !isTransactionBegin) {
if (targetTable instanceof OlapTable) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 943315def0..19556ce614 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -57,6 +57,7 @@ import org.apache.doris.persist.BackendTabletsInfo;
import org.apache.doris.persist.BatchDropInfo;
import org.apache.doris.persist.BatchModifyPartitionsInfo;
import org.apache.doris.persist.BatchRemoveTransactionsOperation;
+import org.apache.doris.persist.CleanLabelOperationLog;
import org.apache.doris.persist.ClusterInfo;
import org.apache.doris.persist.ColocatePersistInfo;
import org.apache.doris.persist.ConsistencyCheckInfo;
@@ -679,6 +680,11 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
+ case OperationType.OP_CLEAN_LABEL: {
+ data = CleanLabelOperationLog.read(in);
+ isRead = true;
+ break;
+ }
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 4e29c2775f..92ec49ddbf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -18,6 +18,7 @@
package org.apache.doris.load.loadv2;
import org.apache.doris.analysis.CancelLoadStmt;
+import org.apache.doris.analysis.CleanLabelStmt;
import org.apache.doris.analysis.CompoundPredicate.Operator;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.catalog.Database;
@@ -39,7 +40,9 @@ import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.FailMsg.CancelType;
import org.apache.doris.load.Load;
+import org.apache.doris.persist.CleanLabelOperationLog;
import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.transaction.DatabaseTransactionMgr;
import org.apache.doris.transaction.TransactionState;
import com.google.common.annotations.VisibleForTesting;
@@ -573,8 +576,8 @@ public class LoadManager implements Writable {
Map<String, List<LoadJob>> labelToLoadJobs =
dbIdToLabelToLoadJobs.get(dbId);
if (labelToLoadJobs.containsKey(label)) {
List<LoadJob> labelLoadJobs = labelToLoadJobs.get(label);
- Optional<LoadJob> loadJobOptional =
- labelLoadJobs.stream().filter(entity ->
entity.getState() != JobState.CANCELLED).findFirst();
+ Optional<LoadJob> loadJobOptional = labelLoadJobs.stream()
+ .filter(entity -> entity.getState() !=
JobState.CANCELLED).findFirst();
if (loadJobOptional.isPresent()) {
LOG.warn("Failed to add load job when label {} has been
used.", label);
throw new LabelAlreadyUsedException(label);
@@ -583,6 +586,97 @@ public class LoadManager implements Writable {
}
}
+    /**
+     * Executes a CLEAN LABEL statement: looks up the target database by name and cleans
+     * the requested label(s) in it. The operation is not a replay, so it is written to the edit log.
+     *
+     * @param stmt statement carrying the database name and the label (empty means all labels)
+     * @throws DdlException if the database lookup fails
+     */
+    public void cleanLabel(CleanLabelStmt stmt) throws DdlException {
+        Database targetDb = Env.getCurrentInternalCatalog().getDbOrDdlException(stmt.getDb());
+        cleanLabelInternal(targetDb.getId(), stmt.getLabel(), false);
+    }
+
+ public void replayCleanLabel(CleanLabelOperationLog log) {
+ cleanLabelInternal(log.getDbId(), log.getLabel(), true);
+ }
+
+    /**
+     * Clean the label(s) of the given database.
+     * Only load jobs which are already completed are removed.
+     * 1. Remove from LoadManager
+     * 2. Remove from DatabaseTransactionMgr
+     * 3. Write the edit log (skipped when replaying)
+     *
+     * <p>Steps 2 and 3 are executed even if LoadManager holds no job for the database or the
+     * label, because the label may still be recorded in the DatabaseTransactionMgr.
+     *
+     * @param dbId id of the database to clean
+     * @param label label to clean; null or empty means all labels of the database
+     * @param isReplay true when invoked from edit log replay
+     */
+    private void cleanLabelInternal(long dbId, String label, boolean isReplay) {
+        // 1. Remove from LoadManager
+        int counter = 0;
+        writeLock();
+        try {
+            Map<String, List<LoadJob>> labelToJob = dbIdToLabelToLoadJobs.get(dbId);
+            // Do not return early when nothing is found here: the transaction manager
+            // cleanup and the edit log below must still happen.
+            if (labelToJob != null) {
+                if (Strings.isNullOrEmpty(label)) {
+                    // clean all labels in this db
+                    Iterator<Map.Entry<String, List<LoadJob>>> iter = labelToJob.entrySet().iterator();
+                    while (iter.hasNext()) {
+                        List<LoadJob> jobs = iter.next().getValue();
+                        counter += cleanCompletedJobs(jobs);
+                        if (jobs.isEmpty()) {
+                            iter.remove();
+                        }
+                    }
+                } else {
+                    List<LoadJob> jobs = labelToJob.get(label);
+                    if (jobs != null) {
+                        counter += cleanCompletedJobs(jobs);
+                        if (jobs.isEmpty()) {
+                            labelToJob.remove(label);
+                        }
+                    }
+                }
+            }
+        } finally {
+            writeUnlock();
+        }
+        LOG.info("clean {} labels on db {} with label '{}' in load mgr.", counter, dbId, label);
+
+        // 2. Remove from DatabaseTransactionMgr
+        try {
+            DatabaseTransactionMgr dbTxnMgr = Env.getCurrentGlobalTransactionMgr().getDatabaseTransactionMgr(dbId);
+            dbTxnMgr.cleanLabel(label);
+        } catch (AnalysisException e) {
+            // Best effort: we don't want to throw any exception here, but leave a trace in the log.
+            LOG.warn("failed to clean label '{}' on db {} in database transaction mgr.", label, dbId, e);
+        }
+
+        // 3. Log
+        if (!isReplay) {
+            CleanLabelOperationLog log = new CleanLabelOperationLog(dbId, label);
+            Env.getCurrentEnv().getEditLog().logCleanLabel(log);
+        }
+        LOG.info("finished to clean label on db {} with label {}. is replay: {}", dbId, label, isReplay);
+    }
+
+    /**
+     * Removes every completed job from the given list and from idToLoadJob.
+     * Must be called with the write lock held.
+     *
+     * @return the number of jobs removed
+     */
+    private int cleanCompletedJobs(List<LoadJob> jobs) {
+        int removed = 0;
+        Iterator<LoadJob> iter = jobs.iterator();
+        while (iter.hasNext()) {
+            LoadJob job = iter.next();
+            if (!job.isCompleted()) {
+                continue;
+            }
+            iter.remove();
+            idToLoadJob.remove(job.getId());
+            ++removed;
+        }
+        return removed;
+    }
+
private void readLock() {
lock.readLock().lock();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/CleanLabelOperationLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/CleanLabelOperationLog.java
new file mode 100644
index 0000000000..7410e81d6a
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/persist/CleanLabelOperationLog.java
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.persist;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * For clean label
+ */
+public class CleanLabelOperationLog implements Writable {
+ @SerializedName(value = "dbId")
+ private long dbId;
+ @SerializedName(value = "label")
+ private String label;
+
+ public CleanLabelOperationLog(long dbId, String label) {
+ this.dbId = dbId;
+ this.label = label;
+ }
+
+ public long getDbId() {
+ return dbId;
+ }
+
+ public String getLabel() {
+ return label;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, GsonUtils.GSON.toJson(this));
+ }
+
+ public static CleanLabelOperationLog read(DataInput in) throws IOException
{
+ return GsonUtils.GSON.fromJson(Text.readString(in),
CleanLabelOperationLog.class);
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index e33c3e3788..9da6cdd656 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -863,6 +863,11 @@ public class EditLog {
env.getSchemaChangeHandler().replayModifyTableAddOrDropColumns(info);
break;
}
+ case OperationType.OP_CLEAN_LABEL: {
+ final CleanLabelOperationLog log =
(CleanLabelOperationLog) journal.getData();
+ env.getLoadManager().replayCleanLabel(log);
+ break;
+ }
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);
@@ -1497,4 +1502,8 @@ public class EditLog {
public void logModifyTableAddOrDropColumns(TableAddOrDropColumnsInfo info)
{
logEdit(OperationType.OP_MODIFY_TABLE_ADD_OR_DROP_COLUMNS, info);
}
+
+ public void logCleanLabel(CleanLabelOperationLog log) { // records a clean-label operation in the edit log
+ logEdit(OperationType.OP_CLEAN_LABEL, log); // replayed through the OP_CLEAN_LABEL case, which calls LoadManager.replayCleanLabel
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 168d88ec96..411bf056b4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -83,6 +83,7 @@ public class OperationType {
public static final short OP_LOAD_CANCEL = 35;
public static final short OP_EXPORT_CREATE = 36;
public static final short OP_EXPORT_UPDATE_STATE = 37;
+ public static final short OP_CLEAN_LABEL = 38;
@Deprecated
public static final short OP_FINISH_SYNC_DELETE = 40;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index 5264852214..2c0b426d3b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -47,6 +47,7 @@ import org.apache.doris.analysis.CancelAlterSystemStmt;
import org.apache.doris.analysis.CancelAlterTableStmt;
import org.apache.doris.analysis.CancelBackupStmt;
import org.apache.doris.analysis.CancelLoadStmt;
+import org.apache.doris.analysis.CleanLabelStmt;
import org.apache.doris.analysis.CreateCatalogStmt;
import org.apache.doris.analysis.CreateClusterStmt;
import org.apache.doris.analysis.CreateDataSyncJobStmt;
@@ -315,6 +316,8 @@ public class DdlExecutor {
env.getDataSourceMgr().alterCatalogName((AlterCatalogNameStmt)
ddlStmt);
} else if (ddlStmt instanceof AlterCatalogPropertyStmt) {
env.getDataSourceMgr().alterCatalogProps((AlterCatalogPropertyStmt) ddlStmt);
+ } else if (ddlStmt instanceof CleanLabelStmt) {
+ env.getCurrentEnv().getLoadManager().cleanLabel((CleanLabelStmt)
ddlStmt);
} else {
throw new DdlException("Unknown statement.");
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 83e437f5ca..4d5b2fd8e9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -57,6 +57,7 @@ import org.apache.doris.thrift.TUniqueId;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -1723,4 +1724,52 @@ public class DatabaseTransactionMgr {
transactionState.write(out);
}
}
+
+    /**
+     * Removes the transactions recorded under the given label, as far as they are present in
+     * idToFinalStatusTransactionState, so that the label can be used again.
+     * Transaction ids that are not found in that map are left untouched.
+     *
+     * @param label the label to clean; null or empty means every label of this database
+     */
+    public void cleanLabel(String label) {
+        Set<Long> removedTxnIds = Sets.newHashSet();
+        writeLock();
+        try {
+            if (!Strings.isNullOrEmpty(label)) {
+                // single label
+                Set<Long> idsOfLabel = labelToTxnIds.get(label);
+                if (idsOfLabel == null) {
+                    return;
+                }
+                for (Iterator<Long> it = idsOfLabel.iterator(); it.hasNext(); ) {
+                    long txnId = it.next();
+                    if (idToFinalStatusTransactionState.remove(txnId) == null) {
+                        continue;
+                    }
+                    it.remove();
+                    removedTxnIds.add(txnId);
+                }
+                if (idsOfLabel.isEmpty()) {
+                    labelToTxnIds.remove(label);
+                }
+            } else {
+                // all labels
+                Iterator<Map.Entry<String, Set<Long>>> labelIter = labelToTxnIds.entrySet().iterator();
+                while (labelIter.hasNext()) {
+                    Set<Long> idsOfLabel = labelIter.next().getValue();
+                    for (Iterator<Long> it = idsOfLabel.iterator(); it.hasNext(); ) {
+                        long txnId = it.next();
+                        if (idToFinalStatusTransactionState.remove(txnId) == null) {
+                            continue;
+                        }
+                        it.remove();
+                        removedTxnIds.add(txnId);
+                    }
+                    if (idsOfLabel.isEmpty()) {
+                        labelIter.remove();
+                    }
+                }
+            }
+            // Also drop the removed transactions from both final-status deques,
+            // so that we can keep consistency in meta image
+            finalStatusTransactionStateDequeShort.removeIf(txn -> removedTxnIds.contains(txn.getTransactionId()));
+            finalStatusTransactionStateDequeLong.removeIf(txn -> removedTxnIds.contains(txn.getTransactionId()));
+        } finally {
+            writeUnlock();
+        }
+        LOG.info("clean {} labels on db {} with label '{}' in database transaction mgr.", removedTxnIds.size(), dbId,
+                label);
+    }
}
diff --git a/regression-test/data/load/clean_label/test_clean_label.out
b/regression-test/data/load/clean_label/test_clean_label.out
new file mode 100644
index 0000000000..6d03950a8b
--- /dev/null
+++ b/regression-test/data/load/clean_label/test_clean_label.out
@@ -0,0 +1,18 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select --
+1 2
+1 2
+1 2
+1 2
+
+-- !select --
+1 2
+1 2
+1 2
+1 2
+1 2
+1 2
+1 2
+1 2
+1 2
+
diff --git a/regression-test/suites/load/clean_label/test_clean_label.groovy
b/regression-test/suites/load/clean_label/test_clean_label.groovy
new file mode 100644
index 0000000000..b80b7b3278
--- /dev/null
+++ b/regression-test/suites/load/clean_label/test_clean_label.groovy
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_clean_label", "load") {
+ // define a sql table
+ def testTable = "tbl_test_clean_label"
+
+    // Creates the test table if it does not exist yet and checks the result of the DDL.
+    //   testTablex:             name of the table to create
+    //   enable_vectorized_flag: when true, `set enable_vectorized_engine = true` is issued first
+    def create_test_table = {testTablex, enable_vectorized_flag ->
+        if (enable_vectorized_flag) {
+            sql """ set enable_vectorized_engine = true """
+        }
+
+        // Use the closure parameter rather than the outer variable, so the helper
+        // creates whichever table the caller asks for.
+        def result1 = sql """
+            CREATE TABLE IF NOT EXISTS ${testTablex} (
+              `k1` INT(11) NULL COMMENT "",
+              `k2` INT(11) NULL COMMENT ""
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+            PROPERTIES (
+                "replication_allocation" = "tag.location.default: 1",
+                "storage_format" = "V2"
+            )
+        """
+
+        // The DDL is expected to return 1 row with 1 column; the only value is the updated row count
+        assertTrue(result1.size() == 1)
+        assertTrue(result1[0].size() == 1)
+        assertTrue(result1[0][0] == 0, "Create table should update 0 rows")
+    }
+
+ // case1: a used label is rejected until CLEAN LABEL removes it, first for one label, then for the whole db
+ try {
+ sql "DROP TABLE IF EXISTS ${testTable}"
+ create_test_table.call(testTable, true)
+
+ test { // first use of labels clean_label_test1 .. clean_label_test4 (four inserts)
+ sql "insert into ${testTable} with label clean_label_test1 select
1, 2;"
+ }
+
+ test {
+ sql "insert into ${testTable} with label clean_label_test2 select
1, 2;"
+ }
+
+ test {
+ sql "insert into ${testTable} with label clean_label_test3 select
1, 2;"
+ }
+
+ test {
+ sql "insert into ${testTable} with label clean_label_test4 select
1, 2;"
+ }
+
+ qt_select "select * from ${testTable} order by k1"
+
+ test { // clean_label_test4 was already used above, so this insert is expected to fail with a Label error
+ sql "insert into ${testTable} with label clean_label_test4 select
1, 2;"
+ exception "errCode = 2, detailMessage = Label"
+ }
+
+ test { // clean only clean_label_test4
+ sql "clean label clean_label_test4 from
regression_test_load_clean_label;"
+ }
+
+ test { // clean_label_test4 has been cleaned, so it is used again here
+ sql "insert into ${testTable} with label clean_label_test4 select
1, 2;"
+ }
+
+ test { // clean_label_test1 has not been cleaned, so this insert is still expected to fail
+ sql "insert into ${testTable} with label clean_label_test1 select
1, 2;"
+ exception "errCode = 2, detailMessage = Label"
+ }
+
+ test { // clean without a label: targets every label of the database
+ sql "clean label from regression_test_load_clean_label;"
+ }
+
+ test { // all four labels are used once more after the full clean
+ sql "insert into ${testTable} with label clean_label_test1 select
1, 2;"
+ }
+
+ test {
+ sql "insert into ${testTable} with label clean_label_test2 select
1, 2;"
+ }
+
+ test {
+ sql "insert into ${testTable} with label clean_label_test3 select
1, 2;"
+ }
+
+ test {
+ sql "insert into ${testTable} with label clean_label_test4 select
1, 2;"
+ }
+
+ qt_select "select * from ${testTable} order by k1;"
+
+ test { // clean the labels created by this case before finishing
+ sql "clean label from regression_test_load_clean_label;"
+ }
+
+ } finally {
+ try_sql("DROP TABLE IF EXISTS ${testTable}")
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]