This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 4114178040b [branch-2.0](delete) Fix delete job timeout when executing 
`delete from ...` (#40028)
4114178040b is described below

commit 4114178040b055ab0df434445b49af4ae88a7fe9
Author: bobhan1 <[email protected]>
AuthorDate: Wed Aug 28 18:07:48 2024 +0800

    [branch-2.0](delete) Fix delete job timeout when executing `delete from 
...` (#40028)
    
    ## Proposed changes
    
    picks https://github.com/apache/doris/pull/37363,
    https://github.com/apache/doris/pull/37834,
    https://github.com/apache/doris/pull/38043
---
 be/src/olap/delete_handler.cpp                     | 29 ++++++----
 be/src/olap/push_handler.cpp                       |  4 ++
 be/test/olap/delete_handler_test.cpp               | 52 ++++++++---------
 .../java/org/apache/doris/load/DeleteHandler.java  |  6 ++
 .../java/org/apache/doris/master/MasterImpl.java   | 28 ++++++++--
 .../main/java/org/apache/doris/task/PushTask.java  | 12 ++++
 .../test_delete_from_timeout.out                   |  8 +++
 .../test_delete_from_timeout.groovy                | 65 ++++++++++++++++++++++
 8 files changed, 160 insertions(+), 44 deletions(-)

diff --git a/be/src/olap/delete_handler.cpp b/be/src/olap/delete_handler.cpp
index 684fdc5c2a5..4e88eb7c53d 100644
--- a/be/src/olap/delete_handler.cpp
+++ b/be/src/olap/delete_handler.cpp
@@ -36,6 +36,7 @@
 #include "olap/olap_common.h"
 #include "olap/predicate_creator.h"
 #include "olap/utils.h"
+#include "util/debug_points.h"
 
 using apache::thrift::ThriftDebugString;
 using std::vector;
@@ -48,8 +49,12 @@ using namespace ErrorCode;
 Status DeleteHandler::generate_delete_predicate(const TabletSchema& schema,
                                                 const std::vector<TCondition>& 
conditions,
                                                 DeletePredicatePB* del_pred) {
+    DBUG_EXECUTE_IF("DeleteHandler::generate_delete_predicate.inject_failure", 
{
+        return Status::Error<false>(dp->param<int>("error_code"),
+                                    dp->param<std::string>("error_msg"));
+    })
     if (conditions.empty()) {
-        return Status::Error<DELETE_INVALID_PARAMETERS>(
+        return Status::Error<ErrorCode::INVALID_ARGUMENT>(
                 "invalid parameters for store_cond. condition_size={}", 
conditions.size());
     }
 
@@ -57,8 +62,8 @@ Status DeleteHandler::generate_delete_predicate(const 
TabletSchema& schema,
     for (const TCondition& condition : conditions) {
         if (check_condition_valid(schema, condition) != Status::OK()) {
             // Error will print log, no need to do it manually.
-            return Status::Error<DELETE_INVALID_CONDITION>("invalid condition. 
condition={}",
-                                                           
ThriftDebugString(condition));
+            return Status::Error<ErrorCode::INVALID_ARGUMENT>("invalid 
condition. condition={}",
+                                                              
ThriftDebugString(condition));
         }
     }
 
@@ -80,7 +85,7 @@ Status DeleteHandler::generate_delete_predicate(const 
TabletSchema& schema,
             if (TCondition tmp; 
!DeleteHandler::parse_delete_condition(condition_str, &tmp)) {
                 LOG(WARNING) << "failed to parse condition_str, condtion="
                              << ThriftDebugString(condition);
-                return Status::Error<DELETE_INVALID_CONDITION>(
+                return Status::Error<ErrorCode::INVALID_ARGUMENT>(
                         "failed to parse condition_str, condtion={}", 
ThriftDebugString(condition));
             }
             VLOG_NOTICE << __PRETTY_FUNCTION__ << " condition_str: " << 
condition_str;
@@ -172,8 +177,8 @@ Status DeleteHandler::check_condition_valid(const 
TabletSchema& schema, const TC
     // Check whether the column exists
     int32_t field_index = schema.field_index(cond.column_name);
     if (field_index < 0) {
-        return Status::Error<DELETE_INVALID_CONDITION>("field is not existent. 
[field_index={}]",
-                                                       field_index);
+        return Status::Error<ErrorCode::INVALID_ARGUMENT>("field is not 
existent. [field_index={}]",
+                                                          field_index);
     }
 
     // Delete condition should only applied on key columns or duplicate key 
table, and
@@ -182,21 +187,21 @@ Status DeleteHandler::check_condition_valid(const 
TabletSchema& schema, const TC
 
     if (column.type() == FieldType::OLAP_FIELD_TYPE_DOUBLE ||
         column.type() == FieldType::OLAP_FIELD_TYPE_FLOAT) {
-        return Status::Error<DELETE_INVALID_CONDITION>("data type is float or 
double.");
+        return Status::Error<ErrorCode::INVALID_ARGUMENT>("data type is float 
or double.");
     }
 
     // Check operator and operands size are matched.
     if ("*=" != cond.condition_op && "!*=" != cond.condition_op &&
         cond.condition_values.size() != 1) {
-        return Status::Error<DELETE_INVALID_CONDITION>("invalid condition 
value size. [size={}]",
-                                                       
cond.condition_values.size());
+        return Status::Error<ErrorCode::INVALID_ARGUMENT>("invalid condition 
value size. [size={}]",
+                                                          
cond.condition_values.size());
     }
 
     // Check each operand is valid
     for (const auto& condition_value : cond.condition_values) {
         if (!is_condition_value_valid(column, cond.condition_op, 
condition_value)) {
-            return Status::Error<DELETE_INVALID_CONDITION>("invalid condition 
value. [value={}]",
-                                                           condition_value);
+            return Status::Error<ErrorCode::INVALID_ARGUMENT>("invalid 
condition value. [value={}]",
+                                                              condition_value);
         }
     }
 
@@ -267,7 +272,7 @@ Status DeleteHandler::init(TabletSchemaSPtr tablet_schema,
         for (const auto& sub_predicate : delete_condition.sub_predicates()) {
             TCondition condition;
             if (!parse_delete_condition(sub_predicate, &condition)) {
-                return Status::Error<DELETE_INVALID_PARAMETERS>(
+                return Status::Error<ErrorCode::INVALID_ARGUMENT>(
                         "fail to parse condition. condition={}", 
sub_predicate);
             }
             int32_t col_unique_id =
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 8f28ec1e60b..4ac74326a38 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -115,6 +115,10 @@ Status 
PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR
     }
 
     std::shared_lock base_migration_rlock(tablet->get_migration_lock(), 
std::try_to_lock);
+    DBUG_EXECUTE_IF("PushHandler::_do_streaming_ingestion.try_lock_fail", {
+        return Status::Error<TRY_LOCK_FAILED>(
+                "PushHandler::_do_streaming_ingestion get lock failed");
+    })
     if (!base_migration_rlock.owns_lock()) {
         return Status::Error<TRY_LOCK_FAILED>(
                 "PushHandler::_do_streaming_ingestion get lock failed");
diff --git a/be/test/olap/delete_handler_test.cpp 
b/be/test/olap/delete_handler_test.cpp
index 8fb26565ccb..75ecfb4c208 100644
--- a/be/test/olap/delete_handler_test.cpp
+++ b/be/test/olap/delete_handler_test.cpp
@@ -408,7 +408,7 @@ TEST_F(TestDeleteConditionHandler, 
StoreCondInvalidParameters) {
     DeletePredicatePB del_pred;
     Status failed_res = 
DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
                                                                  conditions, 
&del_pred);
-    EXPECT_EQ(Status::Error<DELETE_INVALID_PARAMETERS>(""), failed_res);
+    EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), failed_res);
 }
 
 // 检测过滤条件中指定的列不存在,或者列不符合要求
@@ -424,7 +424,7 @@ TEST_F(TestDeleteConditionHandler, 
StoreCondNonexistentColumn) {
     DeletePredicatePB del_pred;
     Status failed_res = 
DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
                                                                  conditions, 
&del_pred);
-    EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), failed_res);
+    EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), failed_res);
 
     // 'v'是value列
     conditions.clear();
@@ -630,7 +630,7 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) {
     DeletePredicatePB del_pred_1;
     res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(), 
conditions,
                                                    &del_pred_1);
-    EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+    EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
 
     // 测试k1的值越下界,k1类型为int8
     conditions[0].condition_values.clear();
@@ -638,7 +638,7 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) {
     DeletePredicatePB del_pred_2;
     res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(), 
conditions,
                                                    &del_pred_2);
-    EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+    EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
 
     // 测试k2的值越上界,k2类型为int16
     conditions[0].condition_values.clear();
@@ -647,7 +647,7 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) {
     DeletePredicatePB del_pred_3;
     res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(), 
conditions,
                                                    &del_pred_3);
-    EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+    EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
 
     // 测试k2的值越下界,k2类型为int16
     conditions[0].condition_values.clear();
@@ -655,7 +655,7 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) {
     DeletePredicatePB del_pred_4;
     res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(), 
conditions,
                                                    &del_pred_4);
-    EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+    EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
 
     // 测试k3的值越上界,k3类型为int32
     conditions[0].condition_values.clear();
@@ -664,7 +664,7 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) {
     DeletePredicatePB del_pred_5;
     res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(), 
conditions,
                                                    &del_pred_5);
-    EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+    EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
 
     // 测试k3的值越下界,k3类型为int32
     conditions[0].condition_values.clear();
@@ -672,7 +672,7 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) {
     DeletePredicatePB del_pred_6;
     res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(), 
conditions,
                                                    &del_pred_6);
-    EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+    EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
 
     // 测试k4的值越上界,k2类型为int64
     conditions[0].condition_values.clear();
@@ -681,7 +681,7 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) {
     DeletePredicatePB del_pred_7;
     res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(), 
conditions,
                                                    &del_pred_7);
-    EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+    EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
 
     // 测试k4的值越下界,k1类型为int64
     conditions[0].condition_values.clear();
@@ -689,7 +689,7 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) {
     DeletePredicatePB del_pred_8;
     res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(), 
conditions,
                                                    &del_pred_8);
-    EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+    EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
 
     // 测试k5的值越上界,k5类型为int128
     conditions[0].condition_values.clear();
@@ -698,7 +698,7 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) {
     DeletePredicatePB del_pred_9;
     res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(), 
conditions,
                                                    &del_pred_9);
-    EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+    EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
 
     // 测试k5的值越下界,k5类型为int128
     conditions[0].condition_values.clear();
@@ -706,7 +706,7 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) {
     DeletePredicatePB del_pred_10;
     res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(), 
conditions,
                                                    &del_pred_10);
-    EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+    EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
 
     // 测试k9整数部分长度过长,k9类型为decimal, precision=6, frac=3
     conditions[0].condition_values.clear();
@@ -715,7 +715,7 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) {
     DeletePredicatePB del_pred_11;
     res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(), 
conditions,
                                                    &del_pred_11);
-    EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+    EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
 
     // 测试k9小数部分长度过长,k9类型为decimal, precision=6, frac=3
     conditions[0].condition_values.clear();
@@ -723,7 +723,7 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) {
     DeletePredicatePB del_pred_12;
     res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(), 
conditions,
                                                    &del_pred_12);
-    EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+    EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
 
     // 测试k9没有小数部分,但包含小数点
     conditions[0].condition_values.clear();
@@ -731,7 +731,7 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) {
     DeletePredicatePB del_pred_13;
     res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(), 
conditions,
                                                    &del_pred_13);
-    EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+    EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
 
     // 测试k10类型的过滤值不符合对应格式,k10为date
     conditions[0].condition_values.clear();
@@ -740,21 +740,21 @@ TEST_F(TestDeleteConditionHandler2, 
InvalidConditionValue) {
     DeletePredicatePB del_pred_14;
     res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(), 
conditions,
                                                    &del_pred_14);
-    EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+    EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
 
     conditions[0].condition_values.clear();
     conditions[0].condition_values.push_back("2013-64-01");
     DeletePredicatePB del_pred_15;
     res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(), 
conditions,
                                                    &del_pred_15);
-    EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+    EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
 
     conditions[0].condition_values.clear();
     conditions[0].condition_values.push_back("2013-01-40");
     DeletePredicatePB del_pred_16;
     res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(), 
conditions,
                                                    &del_pred_16);
-    EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+    EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
 
     // 测试k11类型的过滤值不符合对应格式,k11为datetime
     conditions[0].condition_values.clear();
@@ -763,42 +763,42 @@ TEST_F(TestDeleteConditionHandler2, 
InvalidConditionValue) {
     DeletePredicatePB del_pred_17;
     res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(), 
conditions,
                                                    &del_pred_17);
-    EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+    EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
 
     conditions[0].condition_values.clear();
     conditions[0].condition_values.push_back("2013-64-01 00:00:00");
     DeletePredicatePB del_pred_18;
     res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(), 
conditions,
                                                    &del_pred_18);
-    EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+    EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
 
     conditions[0].condition_values.clear();
     conditions[0].condition_values.push_back("2013-01-40 00:00:00");
     DeletePredicatePB del_pred_19;
     res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(), 
conditions,
                                                    &del_pred_19);
-    EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+    EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
 
     conditions[0].condition_values.clear();
     conditions[0].condition_values.push_back("2013-01-01 24:00:00");
     DeletePredicatePB del_pred_20;
     res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(), 
conditions,
                                                    &del_pred_20);
-    EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+    EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
 
     conditions[0].condition_values.clear();
     conditions[0].condition_values.push_back("2013-01-01 00:60:00");
     DeletePredicatePB del_pred_21;
     res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(), 
conditions,
                                                    &del_pred_21);
-    EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+    EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
 
     conditions[0].condition_values.clear();
     conditions[0].condition_values.push_back("2013-01-01 00:00:60");
     DeletePredicatePB del_pred_22;
     res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(), 
conditions,
                                                    &del_pred_22);
-    EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+    EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
 
     // 测试k12和k13类型的过滤值过长,k12,k13类型分别为string(64), varchar(64)
     conditions[0].condition_values.clear();
@@ -810,7 +810,7 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) {
     DeletePredicatePB del_pred_23;
     res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(), 
conditions,
                                                    &del_pred_23);
-    EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+    EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
 
     conditions[0].condition_values.clear();
     conditions[0].column_name = "k13";
@@ -821,7 +821,7 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) {
     DeletePredicatePB del_pred_24;
     res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(), 
conditions,
                                                    &del_pred_24);
-    EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+    EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
 }
 
 class TestDeleteHandler : public testing::Test {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java
index 70b3765227a..7dd8bf70556 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java
@@ -406,6 +406,12 @@ public class DeleteHandler implements Writable {
                         break;
                 }
             } else {
+                if (!countDownLatch.getStatus().ok()) {
+                    // encounter some errors that don't need to retry, abort 
directly
+                    LOG.warn("delete job failed, errmsg={}", 
countDownLatch.getStatus().getErrorMsg());
+                    throw new DdlException(String.format("delete job failed, 
errmsg:%s",
+                            countDownLatch.getStatus().getErrorMsg()));
+                }
                 commitJob(deleteJob, db, olapTable, timeoutMs);
             }
         } finally {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index 8efbf46b13e..3b2aeba3ec0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -137,7 +137,8 @@ public class MasterImpl {
                         && taskType != TTaskType.DOWNLOAD && taskType != 
TTaskType.MOVE
                         && taskType != TTaskType.CLONE && taskType != 
TTaskType.PUBLISH_VERSION
                         && taskType != TTaskType.CREATE && taskType != 
TTaskType.UPDATE_TABLET_META_INFO
-                        && taskType != TTaskType.STORAGE_MEDIUM_MIGRATE) {
+                        && taskType != TTaskType.STORAGE_MEDIUM_MIGRATE
+                        && taskType != TTaskType.REALTIME_PUSH) {
                     return result;
                 }
             }
@@ -150,7 +151,6 @@ public class MasterImpl {
                     finishCreateReplica(task, request);
                     break;
                 case REALTIME_PUSH:
-                    checkHasTabletInfo(request);
                     Preconditions.checkState(request.isSetReportVersion());
                     finishRealtimePush(task, request);
                     break;
@@ -289,16 +289,32 @@ public class MasterImpl {
         }
     }
 
-    private void finishRealtimePush(AgentTask task, TFinishTaskRequest 
request) {
-        List<TTabletInfo> finishTabletInfos = request.getFinishTabletInfos();
-        Preconditions.checkState(finishTabletInfos != null && 
!finishTabletInfos.isEmpty());
-
+    private void finishRealtimePush(AgentTask task, TFinishTaskRequest 
request) throws Exception {
         PushTask pushTask = (PushTask) task;
 
         long dbId = pushTask.getDbId();
         long backendId = pushTask.getBackendId();
         long signature = task.getSignature();
         long transactionId = ((PushTask) task).getTransactionId();
+
+        if (request.getTaskStatus().getStatusCode() != TStatusCode.OK) {
+            if (pushTask.getPushType() == TPushType.DELETE) {
+                // we don't need to retry if the returned status code is INVALID_ARGUMENT: the BE reports
+                // invalid delete conditions/parameters (e.g. from DeleteHandler::generate_delete_predicate)
+                // with this code, so re-sending the same push request would fail again
+                if (request.getTaskStatus().getStatusCode() == 
TStatusCode.INVALID_ARGUMENT) {
+                    
pushTask.countDownToZero(request.getTaskStatus().getStatusCode(),
+                            task.getBackendId() + ": " + 
request.getTaskStatus().getErrorMsgs().toString());
+                    AgentTaskQueue.removeTask(backendId, 
TTaskType.REALTIME_PUSH, signature);
+                    LOG.warn("finish push replica error: {}", 
request.getTaskStatus().getErrorMsgs().toString());
+                }
+            }
+            return;
+        }
+
+        checkHasTabletInfo(request);
+        List<TTabletInfo> finishTabletInfos = request.getFinishTabletInfos();
+
         Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
         if (db == null) {
             AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, 
signature);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
index dd301728787..f6c47594079 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
@@ -26,6 +26,7 @@ import org.apache.doris.analysis.LiteralExpr;
 import org.apache.doris.analysis.Predicate;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.common.MarkedCountDownLatch;
+import org.apache.doris.common.Status;
 import org.apache.doris.thrift.TBrokerScanRange;
 import org.apache.doris.thrift.TColumn;
 import org.apache.doris.thrift.TCondition;
@@ -34,6 +35,7 @@ import org.apache.doris.thrift.TPriority;
 import org.apache.doris.thrift.TPushReq;
 import org.apache.doris.thrift.TPushType;
 import org.apache.doris.thrift.TResourceInfo;
+import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TTaskType;
 
 import com.google.common.collect.Maps;
@@ -207,6 +209,16 @@ public class PushTask extends AgentTask {
         }
     }
 
+    // Calling this means one of the tasks has failed: count the latch down to zero to finish the entire task.
+    public void countDownToZero(TStatusCode code, String errMsg) {
+        if (this.latch != null) {
+            latch.countDownToZero(new Status(code, errMsg));
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("PushTask count down to zero. error msg: {}", 
errMsg);
+            }
+        }
+    }
+
     public long getReplicaId() {
         return replicaId;
     }
diff --git 
a/regression-test/data/fault_injection_p0/test_delete_from_timeout.out 
b/regression-test/data/fault_injection_p0/test_delete_from_timeout.out
new file mode 100644
index 00000000000..1703506a5af
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/test_delete_from_timeout.out
@@ -0,0 +1,8 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+false  -9999782574499444.2     -25
+true   99.9    234
+
+-- !sql --
+true   99.9    234
+
diff --git 
a/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy 
b/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy
new file mode 100644
index 00000000000..7b310ded149
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_delete_from_timeout","nonConcurrent") {
+
+    def tableName = "test_delete_from_timeout"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """ CREATE TABLE ${tableName} (
+        `col1` BOOLEAN NOT NULL,
+        `col2` DECIMAL(17, 1) NOT NULL,
+        `col3` INT NOT NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`col1`, `col2`, `col3`)
+        DISTRIBUTED BY HASH(`col1`, `col2`, `col3`) BUCKETS 4
+        PROPERTIES (
+        "replication_allocation" = "tag.location.default: 1")
+        """
+
+    GetDebugPoint().clearDebugPointsForAllBEs()
+
+    try {
+        sql "insert into ${tableName} values(1, 99.9, 234), (false, -9999782574499444.2, -25);"
+        qt_sql "select * from ${tableName} order by col1, col2, col3;"
+        GetDebugPoint().enableDebugPointForAllBEs("DeleteHandler::generate_delete_predicate.inject_failure",
+            [error_code: 33 /* INVALID_ARGUMENT */, error_msg: "invalid parameters for store_cond. condition_size=1"])  // non-retryable: the delete must fail fast with this message instead of timing out
+        test {
+            sql """delete from ${tableName} where col1 = "false" and col2 = "-9999782574499444.2" and col3 = "-25"; """
+            exception "invalid parameters for store_cond. condition_size=1"
+        }
+
+        GetDebugPoint().clearDebugPointsForAllBEs()
+
+        GetDebugPoint().enableDebugPointForAllBEs("PushHandler::_do_streaming_ingestion.try_lock_fail")  // retryable failure: lock acquisition fails until the point is disabled below
+
+        def t1 = Thread.start {  // local (not a script-binding global); lifts the lock-failure injection after 15s
+            sleep(15000)
+            GetDebugPoint().disableDebugPointForAllBEs("PushHandler::_do_streaming_ingestion.try_lock_fail")
+        }
+
+        sql """delete from ${tableName} where col1 = "false" and col3 = "-25"; """
+        t1.join()
+        qt_sql "select * from ${tableName} order by col1, col2, col3;"
+
+    } catch (Exception e) {
+        logger.error("unexpected exception in test_delete_from_timeout", e)
+        throw e  // rethrow so the real failure cause is reported instead of a generic assertion error
+    } finally {
+        GetDebugPoint().clearDebugPointsForAllBEs()
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to