This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 53fdd9ed51b [fix](cloud-mow) Add retry when delete bitmap calculation times out during data load (#40562)
53fdd9ed51b is described below

commit 53fdd9ed51bdc370f58c3fc0ce778c9670f37c70
Author: huanghaibin <[email protected]>
AuthorDate: Thu Sep 19 21:07:15 2024 +0800

    [fix](cloud-mow) Add retry when delete bitmap calculation times out during data load (#40562)
    
    Add a retry when delete bitmap calculation times out during a broker
    load, as stream load already does.
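    
    For reference, a simplified sketch of the retry pattern this change applies
    on commit. The helper name commitWithRetry and the call commitOnce() are
    placeholders; the error code and config name mirror the diff below:
    
        // Hypothetical helper illustrating the retry policy; commitOnce() stands in
        // for the real commit-and-publish call in CloudGlobalTransactionMgr.
        void commitWithRetry() throws UserException {
            int retryTimes = 0;
            while (true) {
                try {
                    commitOnce();          // may throw UserException
                    return;
                } catch (UserException e) {
                    // Only merge-on-write tables surface DELETE_BITMAP_LOCK_ERR;
                    // other errors are not retried.
                    if (e.getErrorCode() != InternalErrorCode.DELETE_BITMAP_LOCK_ERR
                            || ++retryTimes >= Config.mow_calculate_delete_bitmap_retry_times) {
                        throw e;
                    }
                }
            }
        }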
---
 .../cloud/cloud_engine_calc_delete_bitmap_task.cpp |   1 +
 .../main/java/org/apache/doris/common/Config.java  |   2 +-
 .../transaction/CloudGlobalTransactionMgr.java     |  23 +-
 .../apache/doris/common/util/MetaLockUtils.java    |  10 +-
 .../apache/doris/load/loadv2/BrokerLoadJob.java    |  88 +++++---
 .../org/apache/doris/load/loadv2/SparkLoadJob.java |  61 +++--
 .../commands/insert/AbstractInsertExecutor.java    |  21 +-
 .../test_cloud_mow_broker_load_with_retry.out      |   7 +
 .../cloud/test_cloud_mow_insert_with_retry.out     |  15 ++
 .../test_cloud_mow_broker_load_with_retry.groovy   | 251 +++++++++++++++++++++
 .../cloud/test_cloud_mow_insert_timeout.groovy     |   2 +-
 .../cloud/test_cloud_mow_insert_with_retry.groovy  |  86 +++++++
 12 files changed, 492 insertions(+), 75 deletions(-)

diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
index d22af1c1f39..5c369af38a6 100644
--- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
+++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
@@ -95,6 +95,7 @@ Status CloudEngineCalcDeleteBitmapTask::execute() {
     }
     // wait for all finished
     token->wait();
+    DBUG_EXECUTE_IF("CloudEngineCalcDeleteBitmapTask.execute.enable_wait", { sleep(3); });
 
     LOG(INFO) << "finish to calculate delete bitmap on transaction."
              << "transaction_id=" << transaction_id << ", cost(us): " << watch.get_elapse_time_us()
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index be312582a6c..9e9ce4637d3 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2984,7 +2984,7 @@ public class Config extends ConfigBase {
     public static String security_checker_class_name = "";
 
     @ConfField(mutable = true)
-    public static int mow_insert_into_commit_retry_times = 10;
+    public static int mow_calculate_delete_bitmap_retry_times = 10;
 
     @ConfField(mutable = true, description = {"指定S3 Load endpoint白名单, 举例: s3_load_endpoint_white_list=a,b,c",
             "the white list for the s3 load endpoint, if it is empty, no white list will be set,"
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index 9890eaa0b3a..eee2faff6f4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -930,7 +930,28 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
     public boolean commitAndPublishTransaction(DatabaseIf db, List<Table> tableList, long transactionId,
                                                List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis)
             throws UserException {
-        return commitAndPublishTransaction(db, tableList, transactionId, tabletCommitInfos, timeoutMillis, null);
+        int retryTimes = 0;
+        boolean res = false;
+        while (true) {
+            try {
+                res = commitAndPublishTransaction(db, tableList, transactionId, tabletCommitInfos, timeoutMillis, null);
+                break;
+            } catch (UserException e) {
+                LOG.warn("failed to commit txn, txnId={},retryTimes={},exception={}",
+                        transactionId, retryTimes, e);
+                // only mow table will catch DELETE_BITMAP_LOCK_ERR and need to retry
+                if (e.getErrorCode() == InternalErrorCode.DELETE_BITMAP_LOCK_ERR) {
+                    retryTimes++;
+                    if (retryTimes >= Config.mow_calculate_delete_bitmap_retry_times) {
+                        // should throw exception after running out of retry times
+                        throw e;
+                    }
+                } else {
+                    throw e;
+                }
+            }
+        }
+        return res;
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java
index 16afbcecdae..ffd411d0cf3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java
@@ -127,8 +127,14 @@ public class MetaLockUtils {
     }
 
     public static void commitLockTables(List<Table> tableList) {
-        for (Table table : tableList) {
-            table.commitLock();
+        for (int i = 0; i < tableList.size(); i++) {
+            try {
+                tableList.get(i).commitLock();
+            } catch (Exception e) {
+                for (int j = i - 1; j >= 0; j--) {
+                    tableList.get(j).commitUnlock();
+                }
+                throw e;
+            }
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index fb5f06fced5..5e1b085b239 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -29,6 +29,7 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.DataQualityException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.DuplicatedRequestException;
+import org.apache.doris.common.InternalErrorCode;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.QuotaExceedException;
@@ -335,42 +336,59 @@ public class BrokerLoadJob extends BulkLoadJob {
         }
         Database db = null;
         List<Table> tableList = null;
-        try {
-            db = getDb();
-            tableList = db.getTablesOnIdOrderOrThrowException(Lists.newArrayList(fileGroupAggInfo.getAllTableIds()));
-            if (Config.isCloudMode()) {
-                MetaLockUtils.commitLockTables(tableList);
-            } else {
-                MetaLockUtils.writeLockTablesOrMetaException(tableList);
+        int retryTimes = 0;
+        while (true) {
+            try {
+                db = getDb();
+                tableList = db.getTablesOnIdOrderOrThrowException(
+                        Lists.newArrayList(fileGroupAggInfo.getAllTableIds()));
+                if (Config.isCloudMode()) {
+                    MetaLockUtils.commitLockTables(tableList);
+                } else {
+                    MetaLockUtils.writeLockTablesOrMetaException(tableList);
+                }
+            } catch (MetaNotFoundException e) {
+                LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
+                        .add("database_id", dbId)
+                        .add("error_msg", "db has been deleted when job is loading")
+                        .build(), e);
+                cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), true, true);
+                return;
             }
-        } catch (MetaNotFoundException e) {
-            LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
-                    .add("database_id", dbId)
-                    .add("error_msg", "db has been deleted when job is loading")
-                    .build(), e);
-            cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), true, true);
-            return;
-        }
-        try {
-            LOG.info(new LogBuilder(LogKey.LOAD_JOB, id)
-                    .add("txn_id", transactionId)
-                    .add("msg", "Load job try to commit txn")
-                    .build());
-            Env.getCurrentGlobalTransactionMgr().commitTransaction(
-                    dbId, tableList, transactionId, commitInfos, getLoadJobFinalOperation());
-            afterLoadingTaskCommitTransaction(tableList);
-            afterCommit();
-        } catch (UserException e) {
-            LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
-                    .add("database_id", dbId)
-                    .add("error_msg", "Failed to commit txn with error:" + e.getMessage())
-                    .build(), e);
-            cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), true, true);
-        } finally {
-            if (Config.isCloudMode()) {
-                MetaLockUtils.commitUnlockTables(tableList);
-            } else {
-                MetaLockUtils.writeUnlockTables(tableList);
+            try {
+                LOG.info(new LogBuilder(LogKey.LOAD_JOB, id)
+                        .add("txn_id", transactionId)
+                        .add("msg", "Load job try to commit txn")
+                        .build());
+                Env.getCurrentGlobalTransactionMgr().commitTransaction(
+                        dbId, tableList, transactionId, commitInfos, getLoadJobFinalOperation());
+                afterLoadingTaskCommitTransaction(tableList);
+                afterCommit();
+                return;
+            } catch (UserException e) {
+                LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
+                        .add("database_id", dbId)
+                        .add("retry_times", retryTimes)
+                        .add("error_msg", "Failed to commit txn with error:" + e.getMessage())
+                        .build(), e);
+                if (e.getErrorCode() == InternalErrorCode.DELETE_BITMAP_LOCK_ERR) {
+                    retryTimes++;
+                    if (retryTimes >= Config.mow_calculate_delete_bitmap_retry_times) {
+                        LOG.warn("cancelJob {} because up to max retry time,exception {}", id, e);
+                        cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), true,
+                                true);
+                        return;
+                    }
+                } else {
+                    cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), true, true);
+                    return;
+                }
+            } finally {
+                if (Config.isCloudMode()) {
+                    MetaLockUtils.commitUnlockTables(tableList);
+                } else {
+                    MetaLockUtils.writeUnlockTables(tableList);
+                }
             }
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index 1f1c71d7a90..f01f205e96d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -45,9 +45,11 @@ import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.DataQualityException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.DuplicatedRequestException;
+import org.apache.doris.common.InternalErrorCode;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.MetaNotFoundException;
@@ -656,21 +658,50 @@ public class SparkLoadJob extends BulkLoadJob {
     }
 
     private void tryCommitJob() throws UserException {
-        LOG.info(new LogBuilder(LogKey.LOAD_JOB, id).add("txn_id", transactionId)
-                .add("msg", "Load job try to commit txn").build());
-        Database db = getDb();
-        List<Table> tableList = db.getTablesOnIdOrderOrThrowException(
-                Lists.newArrayList(tableToLoadPartitions.keySet()));
-        MetaLockUtils.writeLockTablesOrMetaException(tableList);
-        try {
-            Env.getCurrentGlobalTransactionMgr().commitTransaction(
-                    dbId, tableList, transactionId, commitInfos,
-                    new LoadJobFinalOperation(id, loadingStatus, progress, loadStartTimestamp,
-                                              finishTimestamp, state, failMsg));
-        } catch (TabletQuorumFailedException e) {
-            // retry in next loop
-        } finally {
-            MetaLockUtils.writeUnlockTables(tableList);
+        int retryTimes = 0;
+        while (true) {
+            Database db = getDb();
+            List<Table> tableList = db.getTablesOnIdOrderOrThrowException(
+                    Lists.newArrayList(tableToLoadPartitions.keySet()));
+            if (Config.isCloudMode()) {
+                MetaLockUtils.commitLockTables(tableList);
+            } else {
+                MetaLockUtils.writeLockTablesOrMetaException(tableList);
+            }
+            try {
+                LOG.info(new LogBuilder(LogKey.LOAD_JOB, id).add("txn_id", transactionId)
+                        .add("msg", "Load job try to commit txn").build());
+                Env.getCurrentGlobalTransactionMgr().commitTransaction(
+                        dbId, tableList, transactionId, commitInfos,
+                        new LoadJobFinalOperation(id, loadingStatus, progress, loadStartTimestamp,
+                                finishTimestamp, state, failMsg));
+                return;
+            } catch (TabletQuorumFailedException e) {
+                // retry in next loop
+                return;
+            } catch (UserException e) {
+                LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
+                        .add("txn_id", transactionId)
+                        .add("database_id", dbId)
+                        .add("retry_times", retryTimes)
+                        .add("error_msg", "Failed to commit txn with error:" + e.getMessage())
+                        .build(), e);
+                if (e.getErrorCode() == InternalErrorCode.DELETE_BITMAP_LOCK_ERR) {
+                    retryTimes++;
+                    if (retryTimes >= Config.mow_calculate_delete_bitmap_retry_times) {
+                        LOG.warn("cancelJob {} because up to max retry time, exception {}", id, e);
+                        throw e;
+                    }
+                } else {
+                    throw e;
+                }
+            } finally {
+                if (Config.isCloudMode()) {
+                    MetaLockUtils.commitUnlockTables(tableList);
+                } else {
+                    MetaLockUtils.writeUnlockTables(tableList);
+                }
+            }
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
index cdf74f5e9ac..cafffab295e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
@@ -24,7 +24,6 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.InternalErrorCode;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.nereids.NereidsPlanner;
@@ -193,25 +192,7 @@ public abstract class AbstractInsertExecutor {
             executor.updateProfile(false);
             execImpl(executor, jobId);
             checkStrictModeAndFilterRatio();
-            int retryTimes = 0;
-            while (true) {
-                try {
-                    onComplete();
-                    break;
-                } catch (UserException e) {
-                    LOG.warn("failed to commit txn, txnId={}, jobId={}, retryTimes={}",
-                            getTxnId(), jobId, retryTimes, e);
-                    if (e.getErrorCode() == InternalErrorCode.DELETE_BITMAP_LOCK_ERR) {
-                        retryTimes++;
-                        if (retryTimes >= Config.mow_insert_into_commit_retry_times) {
-                            // should throw exception after running out of retry times
-                            throw e;
-                        }
-                    } else {
-                        throw e;
-                    }
-                }
-            }
+            onComplete();
         } catch (Throwable t) {
             onFail(t);
             // retry insert into from select when meet E-230 in cloud
diff --git 
a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_broker_load_with_retry.out
 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_broker_load_with_retry.out
new file mode 100644
index 00000000000..9369fd5ae32
--- /dev/null
+++ 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_broker_load_with_retry.out
@@ -0,0 +1,7 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select --
+19
+
+-- !select --
+19
+
diff --git 
a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_insert_with_retry.out
 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_insert_with_retry.out
new file mode 100644
index 00000000000..979483692d3
--- /dev/null
+++ 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_insert_with_retry.out
@@ -0,0 +1,15 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+1      1       1
+2      2       2
+
+-- !sql --
+1      1       1
+
+-- !sql --
+1      1       1
+2      2       2
+
+-- !sql --
+1      1       1
+
diff --git 
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_broker_load_with_retry.groovy
 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_broker_load_with_retry.groovy
new file mode 100644
index 00000000000..035a6307d46
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_broker_load_with_retry.groovy
@@ -0,0 +1,251 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_cloud_mow_broker_load_with_retry", "nonConcurrent") {
+    if (!isCloudMode()) {
+        return
+    }
+
+    def s3BucketName = getS3BucketName()
+    def s3Endpoint = getS3Endpoint()
+    def s3Region = getS3Region()
+
+    GetDebugPoint().clearDebugPointsForAllFEs()
+    GetDebugPoint().clearDebugPointsForAllBEs()
+
+    def customFeConfig = [
+            calculate_delete_bitmap_task_timeout_seconds: 2
+    ]
+
+    def table = "tbl_basic"
+    setFeConfigTemporary(customFeConfig) {
+
+        def attributesList = [
+
+        ]
+
+        /* ========================================================== normal 
========================================================== */
+        attributesList.add(new 
LoadAttributes("s3://${s3BucketName}/regression/load/data/basic_data.csv",
+                "${table}", "LINES TERMINATED BY \"\n\"", "COLUMNS TERMINATED 
BY \"|\"", "FORMAT AS \"CSV\"", 
"(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18)",
+                "", "", "", "", ""))
+
+        attributesList.add(new 
LoadAttributes("s3://${s3BucketName}/regression/load/data/basic_data.csv",
+                "${table}", "LINES TERMINATED BY \"\n\"", "COLUMNS TERMINATED 
BY \"|\"", "FORMAT AS \"CSV\"", 
"(K00,K01,K02,K03,K04,K05,K06,K07,K08,K09,K10,K11,K12,K13,K14,K15,K16,K17,K18)",
+                "", "", "", "", ""))
+        def ak = getS3AK()
+        def sk = getS3SK()
+        try {
+            sql """ DROP TABLE IF EXISTS ${table} """
+            sql """
+                CREATE TABLE ${table}
+                (
+                    k00 INT             NOT NULL,
+                    k01 DATE            NOT NULL,
+                    k02 BOOLEAN         NULL,
+                    k03 TINYINT         NULL,
+                    k04 SMALLINT        NULL,
+                    k05 INT             NULL,
+                    k06 BIGINT          NULL,
+                    k07 LARGEINT        NULL,
+                    k08 FLOAT           NULL,
+                    k09 DOUBLE          NULL,
+                    k10 DECIMAL(9,1)           NULL,
+                    k11 DECIMALV3(9,1)         NULL,
+                    k12 DATETIME        NULL,
+                    k13 DATEV2          NULL,
+                    k14 DATETIMEV2      NULL,
+                    k15 CHAR            NULL,
+                    k16 VARCHAR         NULL,
+                    k17 STRING          NULL,
+                    k18 JSON            NULL,
+                    kd01 BOOLEAN         NOT NULL DEFAULT "TRUE",
+                    kd02 TINYINT         NOT NULL DEFAULT "1",
+                    kd03 SMALLINT        NOT NULL DEFAULT "2",
+                    kd04 INT             NOT NULL DEFAULT "3",
+                    kd05 BIGINT          NOT NULL DEFAULT "4",
+                    kd06 LARGEINT        NOT NULL DEFAULT "5",
+                    kd07 FLOAT           NOT NULL DEFAULT "6.0",
+                    kd08 DOUBLE          NOT NULL DEFAULT "7.0",
+                    kd09 DECIMAL         NOT NULL DEFAULT "888888888",
+                    kd10 DECIMALV3       NOT NULL DEFAULT "999999999",
+                    kd11 DATE            NOT NULL DEFAULT "2023-08-24",
+                    kd12 DATETIME        NOT NULL DEFAULT CURRENT_TIMESTAMP,
+                    kd13 DATEV2          NOT NULL DEFAULT "2023-08-24",
+                    kd14 DATETIMEV2      NOT NULL DEFAULT CURRENT_TIMESTAMP,
+                    kd15 CHAR(255)       NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                    kd16 VARCHAR(300)    NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                    kd17 STRING          NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+                    kd18 JSON            NULL,
+        
+                    INDEX idx_inverted_k104 (`k05`) USING INVERTED,
+                    INDEX idx_inverted_k110 (`k11`) USING INVERTED,
+                    INDEX idx_inverted_k113 (`k13`) USING INVERTED,
+                    INDEX idx_inverted_k114 (`k14`) USING INVERTED,
+                    INDEX idx_inverted_k117 (`k17`) USING INVERTED 
PROPERTIES("parser" = "english"),
+                    INDEX idx_ngrambf_k115 (`k15`) USING NGRAM_BF 
PROPERTIES("gram_size"="3", "bf_size"="256"),
+                    INDEX idx_ngrambf_k116 (`k16`) USING NGRAM_BF 
PROPERTIES("gram_size"="3", "bf_size"="256"),
+                    INDEX idx_ngrambf_k117 (`k17`) USING NGRAM_BF 
PROPERTIES("gram_size"="3", "bf_size"="256"),
+        
+                    INDEX idx_bitmap_k104 (`k02`) USING BITMAP,
+                    INDEX idx_bitmap_k110 (`kd01`) USING BITMAP
+        
+                )
+                    UNIQUE KEY(k00)
+                DISTRIBUTED BY HASH(k00) BUCKETS 1
+                PROPERTIES (
+                    "enable_unique_key_merge_on_write" = "true",
+                    "bloom_filter_columns"="k05",
+                    "replication_num" = "1"
+                );
+            """
+            GetDebugPoint().enableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait")
+            def i = 0
+            for (LoadAttributes attributes : attributesList) {
+                def label = "test_s3_load_" + 
UUID.randomUUID().toString().replace("-", "_") + "_" + i
+                attributes.label = label
+                def prop = attributes.getPropertiesStr()
+
+                def sql_str = """
+                    LOAD LABEL $label (
+                        $attributes.dataDesc.mergeType
+                        DATA INFILE("$attributes.dataDesc.path")
+                        INTO TABLE $attributes.dataDesc.tableName
+                        $attributes.dataDesc.columnTermClause
+                        $attributes.dataDesc.lineTermClause
+                        $attributes.dataDesc.formatClause
+                        $attributes.dataDesc.columns
+                        $attributes.dataDesc.columnsFromPathClause
+                        $attributes.dataDesc.columnMappingClause
+                        $attributes.dataDesc.precedingFilterClause
+                        $attributes.dataDesc.orderByClause
+                        $attributes.dataDesc.whereExpr
+                    )
+                    WITH S3 (
+                        "AWS_ACCESS_KEY" = "$ak",
+                        "AWS_SECRET_KEY" = "$sk",
+                        "AWS_ENDPOINT" = "${s3Endpoint}",
+                        "AWS_REGION" = "${s3Region}",
+                        "use_path_style" = "$attributes.usePathStyle",
+                        "provider" = "${getS3Provider()}"
+                    )
+                    ${prop}
+                """
+                logger.info("submit sql: ${sql_str}");
+                sql """${sql_str}"""
+                logger.info("Submit load with label: $label, table: $attributes.dataDesc.tableName, path: $attributes.dataDesc.path")
+
+                def max_try_milli_secs = 600000
+                while (max_try_milli_secs > 0) {
+                    String[][] result = sql """ show load where 
label="$attributes.label" order by createtime desc limit 1; """
+                    if (result[0][2].equals("FINISHED")) {
+                        if (attributes.isExceptFailed) {
+                            assertTrue(false, "load should be failed but was 
success: $result")
+                        }
+                        logger.info("Load FINISHED " + attributes.label + ": 
$result")
+                        break
+                    }
+                    if (result[0][2].equals("CANCELLED")) {
+                        if (attributes.isExceptFailed) {
+                            logger.info("Load FINISHED " + attributes.label)
+                            break
+                        }
+                        assertTrue(false, "load failed: $result")
+                        break
+                    }
+                    Thread.sleep(1000)
+                    max_try_milli_secs -= 1000
+                    if (max_try_milli_secs <= 0) {
+                        assertTrue(false, "load Timeout: $attributes.label")
+                    }
+                }
+                qt_select """ select count(*) from 
$attributes.dataDesc.tableName """
+                ++i
+            }
+        } finally {
+            GetDebugPoint().disableDebugPointForAllFEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait")
+            sql "DROP TABLE IF EXISTS ${table};"
+            GetDebugPoint().clearDebugPointsForAllBEs()
+        }
+    }
+
+}
+
+class DataDesc {
+    public String mergeType = ""
+    public String path
+    public String tableName
+    public String lineTermClause
+    public String columnTermClause
+    public String formatClause
+    public String columns
+    public String columnsFromPathClause
+    public String precedingFilterClause
+    public String columnMappingClause
+    public String whereExpr
+    public String orderByClause
+}
+
+class LoadAttributes {
+    LoadAttributes(String path, String tableName, String lineTermClause, 
String columnTermClause, String formatClause,
+                   String columns, String columnsFromPathClause, String 
precedingFilterClause, String columnMappingClause, String whereExpr, String 
orderByClause, boolean isExceptFailed = false) {
+        this.dataDesc = new DataDesc()
+        this.dataDesc.path = path
+        this.dataDesc.tableName = tableName
+        this.dataDesc.lineTermClause = lineTermClause
+        this.dataDesc.columnTermClause = columnTermClause
+        this.dataDesc.formatClause = formatClause
+        this.dataDesc.columns = columns
+        this.dataDesc.columnsFromPathClause = columnsFromPathClause
+        this.dataDesc.precedingFilterClause = precedingFilterClause
+        this.dataDesc.columnMappingClause = columnMappingClause
+        this.dataDesc.whereExpr = whereExpr
+        this.dataDesc.orderByClause = orderByClause
+
+        this.isExceptFailed = isExceptFailed
+
+        properties = new HashMap<>()
+    }
+
+    LoadAttributes addProperties(String k, String v) {
+        properties.put(k, v)
+        return this
+    }
+
+    String getPropertiesStr() {
+        if (properties.isEmpty()) {
+            return ""
+        }
+        String prop = "PROPERTIES ("
+        properties.forEach (k, v) -> {
+            prop += "\"${k}\" = \"${v}\","
+        }
+        prop = prop.substring(0, prop.size() - 1)
+        prop += ")"
+        return prop
+    }
+
+    LoadAttributes withPathStyle() {
+        usePathStyle = "true"
+        return this
+    }
+
+    public DataDesc dataDesc
+    public Map<String, String> properties
+    public String label
+    public String usePathStyle = "false"
+    public boolean isExceptFailed
+}
\ No newline at end of file
diff --git 
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_insert_timeout.groovy
 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_insert_timeout.groovy
index 23d92f31e5a..7baf18c7722 100644
--- 
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_insert_timeout.groovy
+++ 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_insert_timeout.groovy
@@ -50,7 +50,7 @@ suite("test_cloud_mow_insert_timeout", "nonConcurrent") {
     def customFeConfig = [
         delete_bitmap_lock_expiration_seconds : 5,
         calculate_delete_bitmap_task_timeout_seconds : 2,
-        mow_insert_into_commit_retry_times : 2
+        mow_calculate_delete_bitmap_retry_times : 2
     ]
 
     setFeConfigTemporary(customFeConfig) {
diff --git 
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_insert_with_retry.groovy
 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_insert_with_retry.groovy
new file mode 100644
index 00000000000..f7038b80e42
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_insert_with_retry.groovy
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_cloud_mow_insert_with_retry", "nonConcurrent") {
+    if (!isCloudMode()) {
+        return
+    }
+
+    GetDebugPoint().clearDebugPointsForAllFEs()
+    GetDebugPoint().clearDebugPointsForAllBEs()
+
+    def customFeConfig = [
+            calculate_delete_bitmap_task_timeout_seconds: 2
+    ]
+    def dbName = "regression_test_fault_injection_p0_cloud"
+    def table1 = dbName + ".test_cloud_mow_insert_with_retry"
+    setFeConfigTemporary(customFeConfig) {
+        for (item in ["legacy", "nereids"]) {
+            try {
+                GetDebugPoint().enableDebugPointForAllBEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait")
+                sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+                sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+                        `k1` int NOT NULL,
+                        `c1` int,
+                        `c2` int
+                        )UNIQUE KEY(k1)
+                    DISTRIBUTED BY HASH(k1) BUCKETS 1
+                    PROPERTIES (
+                        "enable_unique_key_merge_on_write" = "true",
+                        "disable_auto_compaction" = "true",
+                        "replication_num" = "1"); """
+                connect(user = context.config.jdbcUser, password = 
context.config.jdbcPassword, url = context.config.jdbcUrl + 
"&useLocalSessionState=true") {
+                    if (item == "nereids") {
+                        sql """ set enable_nereids_planner=true; """
+                        sql """ set enable_fallback_to_original_planner=false; 
"""
+                    } else {
+                        sql """ set enable_nereids_planner = false; """
+                    }
+                    def timeout = 2000
+                    def now = System.currentTimeMillis()
+                    sql "insert into ${table1} values(1,1,1);"
+                    def time_diff = System.currentTimeMillis() - now
+                    logger.info("time_diff:" + time_diff)
+                    assertTrue(time_diff > timeout, "insert or delete should 
take over " + timeout + " ms")
+
+                    now = System.currentTimeMillis()
+                    sql "insert into ${table1} values(2,2,2);"
+                    time_diff = System.currentTimeMillis() - now
+                    logger.info("time_diff:" + time_diff)
+                    assertTrue(time_diff > timeout, "insert or delete should 
take over " + timeout + " ms")
+                    order_qt_sql "select * from ${table1};"
+
+                    now = System.currentTimeMillis()
+                    sql "delete from ${table1} where k1=2;"
+                    time_diff = System.currentTimeMillis() - now
+                    logger.info("time_diff:" + time_diff)
+                    assertTrue(time_diff > timeout, "insert or delete should 
take over " + timeout + " ms")
+                    order_qt_sql "select * from ${table1};"
+                }
+            } catch (Exception e) {
+                logger.info(e.getMessage())
+                throw e
+            } finally {
+                GetDebugPoint().disableDebugPointForAllFEs("CloudEngineCalcDeleteBitmapTask.execute.enable_wait")
+                sql "DROP TABLE IF EXISTS ${table1};"
+                GetDebugPoint().clearDebugPointsForAllBEs()
+            }
+        }
+
+    }
+
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

