This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 547c18d28b [feature](load) support CLEAN LABEL stmt (#11362)
547c18d28b is described below

commit 547c18d28b45bcf4388dab9c3206c30206404dc9
Author: Mingyu Chen <[email protected]>
AuthorDate: Mon Aug 1 10:43:33 2022 +0800

    [feature](load) support CLEAN LABEL stmt (#11362)
---
 .../Load/CLEAN-LABEL.md                            |  62 +++++++++++
 .../Load/CLEAN-LABEL.md                            |  62 +++++++++++
 fe/fe-core/src/main/cup/sql_parser.cup             |  15 ++-
 .../org/apache/doris/analysis/CleanLabelStmt.java  |  72 +++++++++++++
 .../java/org/apache/doris/analysis/InsertStmt.java |  11 +-
 .../org/apache/doris/journal/JournalEntity.java    |   6 ++
 .../org/apache/doris/load/loadv2/LoadManager.java  |  98 ++++++++++++++++-
 .../doris/persist/CleanLabelOperationLog.java      |  60 +++++++++++
 .../java/org/apache/doris/persist/EditLog.java     |   9 ++
 .../org/apache/doris/persist/OperationType.java    |   1 +
 .../main/java/org/apache/doris/qe/DdlExecutor.java |   3 +
 .../doris/transaction/DatabaseTransactionMgr.java  |  49 +++++++++
 .../data/load/clean_label/test_clean_label.out     |  18 ++++
 .../load/clean_label/test_clean_label.groovy       | 116 +++++++++++++++++++++
 14 files changed, 569 insertions(+), 13 deletions(-)

diff --git 
a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CLEAN-LABEL.md
 
b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CLEAN-LABEL.md
new file mode 100644
index 0000000000..8f7abe201e
--- /dev/null
+++ 
b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CLEAN-LABEL.md
@@ -0,0 +1,62 @@
+---
+{
+    "title": "CLEAN-LABEL",
+    "language": "en"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## CLEAN-LABEL
+
+### Name
+
+CLEAN LABEL
+
+### Description
+
+Manually cleans up the labels of historical load jobs. After a label is cleaned, it can be reused.
+
+Syntax:
+
+```sql
+CLEAN LABEL [label] FROM db;
+```
+
+### Example
+
+1. Clean label label1 from database db1
+
+       ```sql
+       CLEAN LABEL label1 FROM db1;
+       ```
+
+2. Clean all labels from database db1
+
+       ```sql
+       CLEAN LABEL FROM db1;
+       ```
+
+### Keywords
+
+    CLEAN, LABEL
+
+### Best Practice
+
diff --git 
a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CLEAN-LABEL.md
 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CLEAN-LABEL.md
new file mode 100644
index 0000000000..83d9a90600
--- /dev/null
+++ 
b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/CLEAN-LABEL.md
@@ -0,0 +1,62 @@
+---
+{
+    "title": "CLEAN-LABEL",
+    "language": "zh-CN"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## CLEAN-LABEL
+
+### Name
+
+CLEAN LABEL
+
+### Description
+
+用于手动清理历史导入作业的 Label。清理后,Label 可以重复使用。
+
+语法:
+
+```sql
+CLEAN LABEL [label] FROM db;
+```
+
+### Example
+
+1. 清理 db1 中,Label 为 label1 的导入作业。
+
+       ```sql
+       CLEAN LABEL label1 FROM db1;
+       ```
+
+2. 清理 db1 中所有历史 Label。
+
+       ```sql
+       CLEAN LABEL FROM db1;
+       ```
+
+### Keywords
+
+    CLEAN, LABEL
+
+### Best Practice
+
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup 
b/fe/fe-core/src/main/cup/sql_parser.cup
index 985bd1d15d..d938b10201 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -304,7 +304,7 @@ nonterminal StatementBase stmt, show_stmt, show_param, 
help_stmt, load_stmt,
     use_stmt, kill_stmt, drop_stmt, recover_stmt, grant_stmt, revoke_stmt, 
create_stmt, set_stmt, sync_stmt, cancel_stmt, cancel_param, delete_stmt,
     link_stmt, migrate_stmt, switch_stmt, enter_stmt, transaction_stmt, 
unsupported_stmt, export_stmt, admin_stmt, truncate_stmt,
     import_columns_stmt, import_delete_on_stmt, import_sequence_stmt, 
import_where_stmt, install_plugin_stmt, uninstall_plugin_stmt,
-    import_preceding_filter_stmt, unlock_tables_stmt, lock_tables_stmt, 
refresh_stmt;
+    import_preceding_filter_stmt, unlock_tables_stmt, lock_tables_stmt, 
refresh_stmt, clean_stmt;
 
 nonterminal String transaction_label;
 nonterminal ImportColumnDesc import_column_desc;
@@ -755,6 +755,8 @@ stmt ::=
     {: RESULT = stmt; :}
     | refresh_stmt:stmt
     {: RESULT = stmt; :}
+    | clean_stmt:stmt
+    {: RESULT = stmt; :}
     | /* empty: query only has comments */
     {:
         RESULT = new EmptyStmt();
@@ -794,6 +796,17 @@ refresh_stmt ::=
     :}
     ;
 
+// CLEAN LABEL statement.
+// First alternative: no label given, so CleanLabelStmt is built with a null label.
+// Second alternative: an explicit label identifier precedes the database.
+// NOTE(review): from_or_in presumably accepts either FROM or IN - confirm against its definition.
+clean_stmt ::=
+    KW_CLEAN KW_LABEL from_or_in ident:db
+    {:
+        RESULT = new CleanLabelStmt(db, null);
+    :}
+    | KW_CLEAN KW_LABEL ident:label from_or_in ident:db
+    {:
+        RESULT = new CleanLabelStmt(db, label);
+    :}
+    ;
+
 // plugin statement
 install_plugin_stmt ::=
     KW_INSTALL KW_PLUGIN KW_FROM ident_or_text:source opt_properties:properties
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CleanLabelStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CleanLabelStmt.java
new file mode 100644
index 0000000000..3fe587e5f0
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CleanLabelStmt.java
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.system.SystemInfoService;
+
+import com.google.common.base.Strings;
+
+/**
+ * CLEAN LABEL FROM db;
+ * CLEAN LABEL my_label FROM db;
+ */
+public class CleanLabelStmt extends DdlStmt {
+    private String db;
+    private String label;
+
+    public CleanLabelStmt(String db, String label) {
+        this.db = db;
+        this.label = label;
+    }
+
+    public String getDb() {
+        return db;
+    }
+
+    public String getLabel() {
+        return label;
+    }
+
+    @Override
+    public void analyze(Analyzer analyzer) throws UserException {
+        super.analyze(analyzer);
+        db = ClusterNamespace.getFullName(SystemInfoService.DEFAULT_CLUSTER, 
db);
+        label = Strings.nullToEmpty(label);
+        // check auth
+        if (!Env.getCurrentEnv().getAuth().checkDbPriv(ConnectContext.get(), 
db, PrivPredicate.LOAD)) {
+            
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, 
"LOAD");
+        }
+    }
+
+    @Override
+    public String toSql() {
+        return "CLEAN LABEL" + (Strings.isNullOrEmpty(label) ? " " : " " + 
label) + " FROM " + db;
+    }
+
+    @Override
+    public RedirectStatus getRedirectStatus() {
+        return RedirectStatus.FORWARD_WITH_SYNC;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
index 28fbf8999a..bb6ece1e98 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
@@ -99,7 +99,6 @@ public class InsertStmt extends DdlStmt {
     private Boolean isRepartition;
     private boolean isStreaming = false;
     private String label = null;
-    private boolean isUserSpecifiedLabel = false;
 
     private Map<Long, Integer> indexIdToSchemaHash = null;
 
@@ -142,10 +141,6 @@ public class InsertStmt extends DdlStmt {
         this.planHints = hints;
         this.targetColumnNames = cols;
 
-        if (!Strings.isNullOrEmpty(label)) {
-            isUserSpecifiedLabel = true;
-        }
-
         this.isValuesOrConstantSelect = (queryStmt instanceof SelectStmt
                 && ((SelectStmt) queryStmt).getTableRefs().isEmpty());
     }
@@ -248,10 +243,6 @@ public class InsertStmt extends DdlStmt {
         return label;
     }
 
-    public boolean isUserSpecifiedLabel() {
-        return isUserSpecifiedLabel;
-    }
-
     public DataSink getDataSink() {
         return dataSink;
     }
@@ -306,7 +297,7 @@ public class InsertStmt extends DdlStmt {
         // create label and begin transaction
         long timeoutSecond = 
ConnectContext.get().getSessionVariable().getQueryTimeoutS();
         if (Strings.isNullOrEmpty(label)) {
-            label = "insert_" + 
DebugUtil.printId(analyzer.getContext().queryId());
+            label = "insert_" + 
DebugUtil.printId(analyzer.getContext().queryId()).replace("-", "_");
         }
         if (!isExplain() && !isTransactionBegin) {
             if (targetTable instanceof OlapTable) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java 
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 943315def0..19556ce614 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -57,6 +57,7 @@ import org.apache.doris.persist.BackendTabletsInfo;
 import org.apache.doris.persist.BatchDropInfo;
 import org.apache.doris.persist.BatchModifyPartitionsInfo;
 import org.apache.doris.persist.BatchRemoveTransactionsOperation;
+import org.apache.doris.persist.CleanLabelOperationLog;
 import org.apache.doris.persist.ClusterInfo;
 import org.apache.doris.persist.ColocatePersistInfo;
 import org.apache.doris.persist.ConsistencyCheckInfo;
@@ -679,6 +680,11 @@ public class JournalEntity implements Writable {
                 isRead = true;
                 break;
             }
+            case OperationType.OP_CLEAN_LABEL: {
+                data = CleanLabelOperationLog.read(in);
+                isRead = true;
+                break;
+            }
             default: {
                 IOException e = new IOException();
                 LOG.error("UNKNOWN Operation Type {}", opCode, e);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 4e29c2775f..92ec49ddbf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -18,6 +18,7 @@
 package org.apache.doris.load.loadv2;
 
 import org.apache.doris.analysis.CancelLoadStmt;
+import org.apache.doris.analysis.CleanLabelStmt;
 import org.apache.doris.analysis.CompoundPredicate.Operator;
 import org.apache.doris.analysis.LoadStmt;
 import org.apache.doris.catalog.Database;
@@ -39,7 +40,9 @@ import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.FailMsg;
 import org.apache.doris.load.FailMsg.CancelType;
 import org.apache.doris.load.Load;
+import org.apache.doris.persist.CleanLabelOperationLog;
 import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.transaction.DatabaseTransactionMgr;
 import org.apache.doris.transaction.TransactionState;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -573,8 +576,8 @@ public class LoadManager implements Writable {
             Map<String, List<LoadJob>> labelToLoadJobs = 
dbIdToLabelToLoadJobs.get(dbId);
             if (labelToLoadJobs.containsKey(label)) {
                 List<LoadJob> labelLoadJobs = labelToLoadJobs.get(label);
-                Optional<LoadJob> loadJobOptional =
-                        labelLoadJobs.stream().filter(entity -> 
entity.getState() != JobState.CANCELLED).findFirst();
+                Optional<LoadJob> loadJobOptional = labelLoadJobs.stream()
+                        .filter(entity -> entity.getState() != 
JobState.CANCELLED).findFirst();
                 if (loadJobOptional.isPresent()) {
                     LOG.warn("Failed to add load job when label {} has been 
used.", label);
                     throw new LabelAlreadyUsedException(label);
@@ -583,6 +586,97 @@ public class LoadManager implements Writable {
         }
     }
 
+    /**
+     * Entry point for the CLEAN LABEL statement: resolves the database named in the statement
+     * and cleans the label carried by the statement in that database.
+     *
+     * @param stmt the statement to execute
+     * @throws DdlException if the database lookup fails
+     */
+    public void cleanLabel(CleanLabelStmt stmt) throws DdlException {
+        Database targetDb = Env.getCurrentInternalCatalog().getDbOrDdlException(stmt.getDb());
+        cleanLabelInternal(targetDb.getId(), stmt.getLabel(), false);
+    }
+
+    public void replayCleanLabel(CleanLabelOperationLog log) {
+        cleanLabelInternal(log.getDbId(), log.getLabel(), true);
+    }
+
+    /**
+     * Clean the label with given database and label.
+     * It will only remove the load jobs which are already done.
+     * 1. Remove from LoadManager
+     * 2. Remove from DatabaseTransactionMgr
+     * 3. Write the edit log (skipped on replay)
+     *
+     * <p>Steps 2 and 3 run even when LoadManager holds no job for the database or the label.
+     * The transaction manager keeps its own label map, so a label can still be registered there
+     * when no load job is recorded here; returning early would leave that label uncleaned and
+     * the operation unlogged.
+     *
+     * @param dbId id of the database whose labels are cleaned
+     * @param label the label to clean; null or empty means every label in the database
+     * @param isReplay true when called from edit log replay, in which case no edit log is written
+     */
+    private void cleanLabelInternal(long dbId, String label, boolean isReplay) {
+        // 1. Remove from LoadManager
+        int counter = 0;
+        writeLock();
+        try {
+            Map<String, List<LoadJob>> labelToJob = dbIdToLabelToLoadJobs.get(dbId);
+            if (labelToJob != null) {
+                if (Strings.isNullOrEmpty(label)) {
+                    // clean all labels in this db
+                    Iterator<Map.Entry<String, List<LoadJob>>> iter = labelToJob.entrySet().iterator();
+                    while (iter.hasNext()) {
+                        List<LoadJob> jobs = iter.next().getValue();
+                        counter += removeCompletedJobsOfLabel(jobs);
+                        if (jobs.isEmpty()) {
+                            iter.remove();
+                        }
+                    }
+                } else {
+                    List<LoadJob> jobs = labelToJob.get(label);
+                    if (jobs != null) {
+                        counter += removeCompletedJobsOfLabel(jobs);
+                        if (jobs.isEmpty()) {
+                            labelToJob.remove(label);
+                        }
+                    }
+                }
+            }
+        } finally {
+            writeUnlock();
+        }
+        LOG.info("clean {} labels on db {} with label '{}' in load mgr.", counter, dbId, label);
+
+        // 2. Remove from DatabaseTransactionMgr
+        try {
+            DatabaseTransactionMgr dbTxnMgr = Env.getCurrentGlobalTransactionMgr().getDatabaseTransactionMgr(dbId);
+            dbTxnMgr.cleanLabel(label);
+        } catch (AnalysisException e) {
+            // Best effort: we don't want to throw any exception here, but leave a trace of the failure.
+            LOG.warn("failed to get database transaction mgr of db {} when cleaning label '{}'", dbId, label, e);
+        }
+
+        // 3. Log
+        if (!isReplay) {
+            CleanLabelOperationLog log = new CleanLabelOperationLog(dbId, label);
+            Env.getCurrentEnv().getEditLog().logCleanLabel(log);
+        }
+        LOG.info("finished to clean label on db {} with label {}. is replay: {}", dbId, label, isReplay);
+    }
+
+    /**
+     * Removes every completed job from the given list and from idToLoadJob.
+     * Jobs that are not completed are left untouched. The caller must hold the write lock.
+     *
+     * @param jobs the jobs registered under one label; modified in place
+     * @return the number of jobs removed
+     */
+    private int removeCompletedJobsOfLabel(List<LoadJob> jobs) {
+        int removed = 0;
+        Iterator<LoadJob> iter = jobs.iterator();
+        while (iter.hasNext()) {
+            LoadJob job = iter.next();
+            if (!job.isCompleted()) {
+                continue;
+            }
+            iter.remove();
+            idToLoadJob.remove(job.getId());
+            ++removed;
+        }
+        return removed;
+    }
+
     private void readLock() {
         lock.readLock().lock();
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/CleanLabelOperationLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/CleanLabelOperationLog.java
new file mode 100644
index 0000000000..7410e81d6a
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/persist/CleanLabelOperationLog.java
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.persist;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * For clean label
+ */
+public class CleanLabelOperationLog implements Writable {
+    @SerializedName(value = "dbId")
+    private long dbId;
+    @SerializedName(value = "label")
+    private String label;
+
+    public CleanLabelOperationLog(long dbId, String label) {
+        this.dbId = dbId;
+        this.label = label;
+    }
+
+    public long getDbId() {
+        return dbId;
+    }
+
+    public String getLabel() {
+        return label;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        Text.writeString(out, GsonUtils.GSON.toJson(this));
+    }
+
+    public static CleanLabelOperationLog read(DataInput in) throws IOException 
{
+        return GsonUtils.GSON.fromJson(Text.readString(in), 
CleanLabelOperationLog.class);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index e33c3e3788..9da6cdd656 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -863,6 +863,11 @@ public class EditLog {
                     
env.getSchemaChangeHandler().replayModifyTableAddOrDropColumns(info);
                     break;
                 }
+                case OperationType.OP_CLEAN_LABEL: {
+                    final CleanLabelOperationLog log = 
(CleanLabelOperationLog) journal.getData();
+                    env.getLoadManager().replayCleanLabel(log);
+                    break;
+                }
                 default: {
                     IOException e = new IOException();
                     LOG.error("UNKNOWN Operation Type {}", opCode, e);
@@ -1497,4 +1502,8 @@ public class EditLog {
     public void logModifyTableAddOrDropColumns(TableAddOrDropColumnsInfo info) 
{
         logEdit(OperationType.OP_MODIFY_TABLE_ADD_OR_DROP_COLUMNS, info);
     }
+
+    public void logCleanLabel(CleanLabelOperationLog log) {
+        logEdit(OperationType.OP_CLEAN_LABEL, log);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 168d88ec96..411bf056b4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -83,6 +83,7 @@ public class OperationType {
     public static final short OP_LOAD_CANCEL = 35;
     public static final short OP_EXPORT_CREATE = 36;
     public static final short OP_EXPORT_UPDATE_STATE = 37;
+    public static final short OP_CLEAN_LABEL = 38;
 
     @Deprecated
     public static final short OP_FINISH_SYNC_DELETE = 40;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index 5264852214..2c0b426d3b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -47,6 +47,7 @@ import org.apache.doris.analysis.CancelAlterSystemStmt;
 import org.apache.doris.analysis.CancelAlterTableStmt;
 import org.apache.doris.analysis.CancelBackupStmt;
 import org.apache.doris.analysis.CancelLoadStmt;
+import org.apache.doris.analysis.CleanLabelStmt;
 import org.apache.doris.analysis.CreateCatalogStmt;
 import org.apache.doris.analysis.CreateClusterStmt;
 import org.apache.doris.analysis.CreateDataSyncJobStmt;
@@ -315,6 +316,8 @@ public class DdlExecutor {
             env.getDataSourceMgr().alterCatalogName((AlterCatalogNameStmt) 
ddlStmt);
         } else if (ddlStmt instanceof AlterCatalogPropertyStmt) {
             
env.getDataSourceMgr().alterCatalogProps((AlterCatalogPropertyStmt) ddlStmt);
+        } else if (ddlStmt instanceof CleanLabelStmt) {
+            env.getCurrentEnv().getLoadManager().cleanLabel((CleanLabelStmt) 
ddlStmt);
         } else {
             throw new DdlException("Unknown statement.");
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 83e437f5ca..4d5b2fd8e9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -57,6 +57,7 @@ import org.apache.doris.thrift.TUniqueId;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -1723,4 +1724,52 @@ public class DatabaseTransactionMgr {
             transactionState.write(out);
         }
     }
+
+    /**
+     * Removes the final-status transactions registered under the given label, or under every
+     * label when the label is null or empty. Transaction ids that are not present in
+     * idToFinalStatusTransactionState are kept. A label left with no transaction ids is dropped
+     * from labelToTxnIds. If a specific label is given but unknown, the method returns without
+     * doing anything.
+     *
+     * @param label the label to clean, or null/empty for all labels
+     */
+    public void cleanLabel(String label) {
+        Set<Long> removedTxnIds = Sets.newHashSet();
+        writeLock();
+        try {
+            if (!Strings.isNullOrEmpty(label)) {
+                Set<Long> txnIds = labelToTxnIds.get(label);
+                if (txnIds == null) {
+                    return;
+                }
+                txnIds.removeIf(txnId -> dropFinalStatusTxn(txnId, removedTxnIds));
+                if (txnIds.isEmpty()) {
+                    labelToTxnIds.remove(label);
+                }
+            } else {
+                Iterator<Map.Entry<String, Set<Long>>> entryIter = labelToTxnIds.entrySet().iterator();
+                while (entryIter.hasNext()) {
+                    Set<Long> txnIds = entryIter.next().getValue();
+                    txnIds.removeIf(txnId -> dropFinalStatusTxn(txnId, removedTxnIds));
+                    if (txnIds.isEmpty()) {
+                        entryIter.remove();
+                    }
+                }
+            }
+            // remove from finalStatusTransactionStateDequeShort and finalStatusTransactionStateDequeLong
+            // So that we can keep consistency in meta image
+            finalStatusTransactionStateDequeShort.removeIf(txn -> removedTxnIds.contains(txn.getTransactionId()));
+            finalStatusTransactionStateDequeLong.removeIf(txn -> removedTxnIds.contains(txn.getTransactionId()));
+        } finally {
+            writeUnlock();
+        }
+        LOG.info("clean {} labels on db {} with label '{}' in database transaction mgr.", removedTxnIds.size(), dbId,
+                label);
+    }
+
+    /**
+     * Removes one transaction from idToFinalStatusTransactionState and, when it was present,
+     * records its id in removedTxnIds. The caller must hold the write lock.
+     *
+     * @return true if the transaction was present and has been removed
+     */
+    private boolean dropFinalStatusTxn(long txnId, Set<Long> removedTxnIds) {
+        if (idToFinalStatusTransactionState.remove(txnId) == null) {
+            return false;
+        }
+        removedTxnIds.add(txnId);
+        return true;
+    }
 }
diff --git a/regression-test/data/load/clean_label/test_clean_label.out 
b/regression-test/data/load/clean_label/test_clean_label.out
new file mode 100644
index 0000000000..6d03950a8b
--- /dev/null
+++ b/regression-test/data/load/clean_label/test_clean_label.out
@@ -0,0 +1,18 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select --
+1      2
+1      2
+1      2
+1      2
+
+-- !select --
+1      2
+1      2
+1      2
+1      2
+1      2
+1      2
+1      2
+1      2
+1      2
+
diff --git a/regression-test/suites/load/clean_label/test_clean_label.groovy 
b/regression-test/suites/load/clean_label/test_clean_label.groovy
new file mode 100644
index 0000000000..b80b7b3278
--- /dev/null
+++ b/regression-test/suites/load/clean_label/test_clean_label.groovy
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_clean_label", "load") {
+    // define a sql table
+    def testTable = "tbl_test_clean_label"
+
+    // Creates the given table, optionally enabling the vectorized engine first.
+    // The table name parameter is now actually used instead of the outer variable.
+    def create_test_table = { tableName, enable_vectorized_flag ->
+        if (enable_vectorized_flag) {
+            sql """ set enable_vectorized_engine = true """
+        }
+
+        def result1 = sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+              `k1` INT(11) NULL COMMENT "",
+              `k2` INT(11) NULL COMMENT ""
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+            PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1",
+            "storage_format" = "V2"
+            )
+            """
+
+        // DDL/DML return 1 row and 1 column, the only value is update row count
+        assertTrue(result1.size() == 1)
+        assertTrue(result1[0].size() == 1)
+        assertTrue(result1[0][0] == 0, "Create table should update 0 rows")
+    }
+
+    // Inserts the row (1, 2) under the given label and expects the insert to succeed.
+    def insert_with_label = { label ->
+        test {
+            sql "insert into ${testTable} with label ${label} select 1, 2;"
+        }
+    }
+
+    // Inserts under a label that is still in use and expects the label error.
+    def expect_label_in_use = { label ->
+        test {
+            sql "insert into ${testTable} with label ${label} select 1, 2;"
+            exception "errCode = 2, detailMessage = Label"
+        }
+    }
+
+    // case1: 
+    try {
+        sql "DROP TABLE IF EXISTS ${testTable}"
+        create_test_table.call(testTable, true)
+
+        // four inserts, each under its own label
+        insert_with_label.call("clean_label_test1")
+        insert_with_label.call("clean_label_test2")
+        insert_with_label.call("clean_label_test3")
+        insert_with_label.call("clean_label_test4")
+
+        qt_select "select * from ${testTable} order by k1"
+
+        // a used label is rejected until it is cleaned
+        expect_label_in_use.call("clean_label_test4")
+
+        test {
+            sql "clean label clean_label_test4 from regression_test_load_clean_label;"
+        }
+
+        insert_with_label.call("clean_label_test4")
+
+        // cleaning one label must not free the others
+        expect_label_in_use.call("clean_label_test1")
+
+        test {
+            sql "clean label from regression_test_load_clean_label;"
+        }
+
+        // after cleaning all labels, every label can be used again
+        insert_with_label.call("clean_label_test1")
+        insert_with_label.call("clean_label_test2")
+        insert_with_label.call("clean_label_test3")
+        insert_with_label.call("clean_label_test4")
+
+        qt_select "select * from ${testTable} order by k1;"
+
+        test {
+            sql "clean label from regression_test_load_clean_label;"
+        }
+
+    } finally {
+        try_sql("DROP TABLE IF EXISTS ${testTable}")
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to