This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 4114178040b [branch-2.0](delete) Fix delete job timeout when executing
`delete from ...` (#40028)
4114178040b is described below
commit 4114178040b055ab0df434445b49af4ae88a7fe9
Author: bobhan1 <[email protected]>
AuthorDate: Wed Aug 28 18:07:48 2024 +0800
[branch-2.0](delete) Fix delete job timeout when executing `delete from
...` (#40028)
## Proposed changes
picks https://github.com/apache/doris/pull/37363,
https://github.com/apache/doris/pull/37834,
https://github.com/apache/doris/pull/38043
---
be/src/olap/delete_handler.cpp | 29 ++++++----
be/src/olap/push_handler.cpp | 4 ++
be/test/olap/delete_handler_test.cpp | 52 ++++++++---------
.../java/org/apache/doris/load/DeleteHandler.java | 6 ++
.../java/org/apache/doris/master/MasterImpl.java | 28 ++++++++--
.../main/java/org/apache/doris/task/PushTask.java | 12 ++++
.../test_delete_from_timeout.out | 8 +++
.../test_delete_from_timeout.groovy | 65 ++++++++++++++++++++++
8 files changed, 160 insertions(+), 44 deletions(-)
diff --git a/be/src/olap/delete_handler.cpp b/be/src/olap/delete_handler.cpp
index 684fdc5c2a5..4e88eb7c53d 100644
--- a/be/src/olap/delete_handler.cpp
+++ b/be/src/olap/delete_handler.cpp
@@ -36,6 +36,7 @@
#include "olap/olap_common.h"
#include "olap/predicate_creator.h"
#include "olap/utils.h"
+#include "util/debug_points.h"
using apache::thrift::ThriftDebugString;
using std::vector;
@@ -48,8 +49,12 @@ using namespace ErrorCode;
Status DeleteHandler::generate_delete_predicate(const TabletSchema& schema,
const std::vector<TCondition>&
conditions,
DeletePredicatePB* del_pred) {
+ DBUG_EXECUTE_IF("DeleteHandler::generate_delete_predicate.inject_failure",
{
+ return Status::Error<false>(dp->param<int>("error_code"),
+ dp->param<std::string>("error_msg"));
+ })
if (conditions.empty()) {
- return Status::Error<DELETE_INVALID_PARAMETERS>(
+ return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"invalid parameters for store_cond. condition_size={}",
conditions.size());
}
@@ -57,8 +62,8 @@ Status DeleteHandler::generate_delete_predicate(const
TabletSchema& schema,
for (const TCondition& condition : conditions) {
if (check_condition_valid(schema, condition) != Status::OK()) {
// Error will print log, no need to do it manually.
- return Status::Error<DELETE_INVALID_CONDITION>("invalid condition.
condition={}",
-
ThriftDebugString(condition));
+ return Status::Error<ErrorCode::INVALID_ARGUMENT>("invalid
condition. condition={}",
+
ThriftDebugString(condition));
}
}
@@ -80,7 +85,7 @@ Status DeleteHandler::generate_delete_predicate(const
TabletSchema& schema,
if (TCondition tmp;
!DeleteHandler::parse_delete_condition(condition_str, &tmp)) {
LOG(WARNING) << "failed to parse condition_str, condtion="
<< ThriftDebugString(condition);
- return Status::Error<DELETE_INVALID_CONDITION>(
+ return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"failed to parse condition_str, condtion={}",
ThriftDebugString(condition));
}
VLOG_NOTICE << __PRETTY_FUNCTION__ << " condition_str: " <<
condition_str;
@@ -172,8 +177,8 @@ Status DeleteHandler::check_condition_valid(const
TabletSchema& schema, const TC
// Check whether the column exists
int32_t field_index = schema.field_index(cond.column_name);
if (field_index < 0) {
- return Status::Error<DELETE_INVALID_CONDITION>("field is not existent.
[field_index={}]",
- field_index);
+ return Status::Error<ErrorCode::INVALID_ARGUMENT>("field is not
existent. [field_index={}]",
+ field_index);
}
// Delete condition should only applied on key columns or duplicate key
table, and
@@ -182,21 +187,21 @@ Status DeleteHandler::check_condition_valid(const
TabletSchema& schema, const TC
if (column.type() == FieldType::OLAP_FIELD_TYPE_DOUBLE ||
column.type() == FieldType::OLAP_FIELD_TYPE_FLOAT) {
- return Status::Error<DELETE_INVALID_CONDITION>("data type is float or
double.");
+ return Status::Error<ErrorCode::INVALID_ARGUMENT>("data type is float
or double.");
}
// Check operator and operands size are matched.
if ("*=" != cond.condition_op && "!*=" != cond.condition_op &&
cond.condition_values.size() != 1) {
- return Status::Error<DELETE_INVALID_CONDITION>("invalid condition
value size. [size={}]",
-
cond.condition_values.size());
+ return Status::Error<ErrorCode::INVALID_ARGUMENT>("invalid condition
value size. [size={}]",
+
cond.condition_values.size());
}
// Check each operand is valid
for (const auto& condition_value : cond.condition_values) {
if (!is_condition_value_valid(column, cond.condition_op,
condition_value)) {
- return Status::Error<DELETE_INVALID_CONDITION>("invalid condition
value. [value={}]",
- condition_value);
+ return Status::Error<ErrorCode::INVALID_ARGUMENT>("invalid
condition value. [value={}]",
+ condition_value);
}
}
@@ -267,7 +272,7 @@ Status DeleteHandler::init(TabletSchemaSPtr tablet_schema,
for (const auto& sub_predicate : delete_condition.sub_predicates()) {
TCondition condition;
if (!parse_delete_condition(sub_predicate, &condition)) {
- return Status::Error<DELETE_INVALID_PARAMETERS>(
+ return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"fail to parse condition. condition={}",
sub_predicate);
}
int32_t col_unique_id =
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 8f28ec1e60b..4ac74326a38 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -115,6 +115,10 @@ Status
PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR
}
std::shared_lock base_migration_rlock(tablet->get_migration_lock(),
std::try_to_lock);
+ DBUG_EXECUTE_IF("PushHandler::_do_streaming_ingestion.try_lock_fail", {
+ return Status::Error<TRY_LOCK_FAILED>(
+ "PushHandler::_do_streaming_ingestion get lock failed");
+ })
if (!base_migration_rlock.owns_lock()) {
return Status::Error<TRY_LOCK_FAILED>(
"PushHandler::_do_streaming_ingestion get lock failed");
diff --git a/be/test/olap/delete_handler_test.cpp
b/be/test/olap/delete_handler_test.cpp
index 8fb26565ccb..75ecfb4c208 100644
--- a/be/test/olap/delete_handler_test.cpp
+++ b/be/test/olap/delete_handler_test.cpp
@@ -408,7 +408,7 @@ TEST_F(TestDeleteConditionHandler,
StoreCondInvalidParameters) {
DeletePredicatePB del_pred;
Status failed_res =
DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions,
&del_pred);
- EXPECT_EQ(Status::Error<DELETE_INVALID_PARAMETERS>(""), failed_res);
+ EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), failed_res);
}
// 检测过滤条件中指定的列不存在,或者列不符合要求
@@ -424,7 +424,7 @@ TEST_F(TestDeleteConditionHandler,
StoreCondNonexistentColumn) {
DeletePredicatePB del_pred;
Status failed_res =
DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions,
&del_pred);
- EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), failed_res);
+ EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), failed_res);
// 'v'是value列
conditions.clear();
@@ -630,7 +630,7 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) {
DeletePredicatePB del_pred_1;
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions,
&del_pred_1);
- EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+ EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
// 测试k1的值越下界,k1类型为int8
conditions[0].condition_values.clear();
@@ -638,7 +638,7 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) {
DeletePredicatePB del_pred_2;
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions,
&del_pred_2);
- EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+ EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
// 测试k2的值越上界,k2类型为int16
conditions[0].condition_values.clear();
@@ -647,7 +647,7 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) {
DeletePredicatePB del_pred_3;
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions,
&del_pred_3);
- EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+ EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
// 测试k2的值越下界,k2类型为int16
conditions[0].condition_values.clear();
@@ -655,7 +655,7 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) {
DeletePredicatePB del_pred_4;
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions,
&del_pred_4);
- EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+ EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
// 测试k3的值越上界,k3类型为int32
conditions[0].condition_values.clear();
@@ -664,7 +664,7 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) {
DeletePredicatePB del_pred_5;
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions,
&del_pred_5);
- EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+ EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
// 测试k3的值越下界,k3类型为int32
conditions[0].condition_values.clear();
@@ -672,7 +672,7 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) {
DeletePredicatePB del_pred_6;
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions,
&del_pred_6);
- EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+ EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
// 测试k4的值越上界,k2类型为int64
conditions[0].condition_values.clear();
@@ -681,7 +681,7 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) {
DeletePredicatePB del_pred_7;
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions,
&del_pred_7);
- EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+ EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
// 测试k4的值越下界,k1类型为int64
conditions[0].condition_values.clear();
@@ -689,7 +689,7 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) {
DeletePredicatePB del_pred_8;
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions,
&del_pred_8);
- EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+ EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
// 测试k5的值越上界,k5类型为int128
conditions[0].condition_values.clear();
@@ -698,7 +698,7 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) {
DeletePredicatePB del_pred_9;
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions,
&del_pred_9);
- EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+ EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
// 测试k5的值越下界,k5类型为int128
conditions[0].condition_values.clear();
@@ -706,7 +706,7 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) {
DeletePredicatePB del_pred_10;
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions,
&del_pred_10);
- EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+ EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
// 测试k9整数部分长度过长,k9类型为decimal, precision=6, frac=3
conditions[0].condition_values.clear();
@@ -715,7 +715,7 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) {
DeletePredicatePB del_pred_11;
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions,
&del_pred_11);
- EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+ EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
// 测试k9小数部分长度过长,k9类型为decimal, precision=6, frac=3
conditions[0].condition_values.clear();
@@ -723,7 +723,7 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) {
DeletePredicatePB del_pred_12;
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions,
&del_pred_12);
- EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+ EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
// 测试k9没有小数部分,但包含小数点
conditions[0].condition_values.clear();
@@ -731,7 +731,7 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) {
DeletePredicatePB del_pred_13;
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions,
&del_pred_13);
- EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+ EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
// 测试k10类型的过滤值不符合对应格式,k10为date
conditions[0].condition_values.clear();
@@ -740,21 +740,21 @@ TEST_F(TestDeleteConditionHandler2,
InvalidConditionValue) {
DeletePredicatePB del_pred_14;
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions,
&del_pred_14);
- EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+ EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
conditions[0].condition_values.clear();
conditions[0].condition_values.push_back("2013-64-01");
DeletePredicatePB del_pred_15;
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions,
&del_pred_15);
- EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+ EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
conditions[0].condition_values.clear();
conditions[0].condition_values.push_back("2013-01-40");
DeletePredicatePB del_pred_16;
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions,
&del_pred_16);
- EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+ EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
// 测试k11类型的过滤值不符合对应格式,k11为datetime
conditions[0].condition_values.clear();
@@ -763,42 +763,42 @@ TEST_F(TestDeleteConditionHandler2,
InvalidConditionValue) {
DeletePredicatePB del_pred_17;
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions,
&del_pred_17);
- EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+ EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
conditions[0].condition_values.clear();
conditions[0].condition_values.push_back("2013-64-01 00:00:00");
DeletePredicatePB del_pred_18;
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions,
&del_pred_18);
- EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+ EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
conditions[0].condition_values.clear();
conditions[0].condition_values.push_back("2013-01-40 00:00:00");
DeletePredicatePB del_pred_19;
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions,
&del_pred_19);
- EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+ EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
conditions[0].condition_values.clear();
conditions[0].condition_values.push_back("2013-01-01 24:00:00");
DeletePredicatePB del_pred_20;
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions,
&del_pred_20);
- EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+ EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
conditions[0].condition_values.clear();
conditions[0].condition_values.push_back("2013-01-01 00:60:00");
DeletePredicatePB del_pred_21;
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions,
&del_pred_21);
- EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+ EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
conditions[0].condition_values.clear();
conditions[0].condition_values.push_back("2013-01-01 00:00:60");
DeletePredicatePB del_pred_22;
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions,
&del_pred_22);
- EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+ EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
// 测试k12和k13类型的过滤值过长,k12,k13类型分别为string(64), varchar(64)
conditions[0].condition_values.clear();
@@ -810,7 +810,7 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) {
DeletePredicatePB del_pred_23;
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions,
&del_pred_23);
- EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+ EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
conditions[0].condition_values.clear();
conditions[0].column_name = "k13";
@@ -821,7 +821,7 @@ TEST_F(TestDeleteConditionHandler2, InvalidConditionValue) {
DeletePredicatePB del_pred_24;
res = DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(),
conditions,
&del_pred_24);
- EXPECT_EQ(Status::Error<DELETE_INVALID_CONDITION>(""), res);
+ EXPECT_EQ(Status::Error<INVALID_ARGUMENT>(""), res);
}
class TestDeleteHandler : public testing::Test {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java
index 70b3765227a..7dd8bf70556 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java
@@ -406,6 +406,12 @@ public class DeleteHandler implements Writable {
break;
}
} else {
+ if (!countDownLatch.getStatus().ok()) {
+ // encountered an error that does not need to be retried, abort
directly
+ LOG.warn("delete job failed, errmsg={}",
countDownLatch.getStatus().getErrorMsg());
+ throw new DdlException(String.format("delete job failed,
errmsg:%s",
+ countDownLatch.getStatus().getErrorMsg()));
+ }
commitJob(deleteJob, db, olapTable, timeoutMs);
}
} finally {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index 8efbf46b13e..3b2aeba3ec0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -137,7 +137,8 @@ public class MasterImpl {
&& taskType != TTaskType.DOWNLOAD && taskType !=
TTaskType.MOVE
&& taskType != TTaskType.CLONE && taskType !=
TTaskType.PUBLISH_VERSION
&& taskType != TTaskType.CREATE && taskType !=
TTaskType.UPDATE_TABLET_META_INFO
- && taskType != TTaskType.STORAGE_MEDIUM_MIGRATE) {
+ && taskType != TTaskType.STORAGE_MEDIUM_MIGRATE
+ && taskType != TTaskType.REALTIME_PUSH) {
return result;
}
}
@@ -150,7 +151,6 @@ public class MasterImpl {
finishCreateReplica(task, request);
break;
case REALTIME_PUSH:
- checkHasTabletInfo(request);
Preconditions.checkState(request.isSetReportVersion());
finishRealtimePush(task, request);
break;
@@ -289,16 +289,32 @@ public class MasterImpl {
}
}
- private void finishRealtimePush(AgentTask task, TFinishTaskRequest
request) {
- List<TTabletInfo> finishTabletInfos = request.getFinishTabletInfos();
- Preconditions.checkState(finishTabletInfos != null &&
!finishTabletInfos.isEmpty());
-
+ private void finishRealtimePush(AgentTask task, TFinishTaskRequest
request) throws Exception {
PushTask pushTask = (PushTask) task;
long dbId = pushTask.getDbId();
long backendId = pushTask.getBackendId();
long signature = task.getSignature();
long transactionId = ((PushTask) task).getTransactionId();
+
+ if (request.getTaskStatus().getStatusCode() != TStatusCode.OK) {
+ if (pushTask.getPushType() == TPushType.DELETE) {
+ // we don't need to retry if the returned status code is
DELETE_INVALID_CONDITION
+ // or DELETE_INVALID_PARAMETERS
+ // note that they will be converted to
TStatusCode.INVALID_ARGUMENT when being sent from be to fe
+ if (request.getTaskStatus().getStatusCode() ==
TStatusCode.INVALID_ARGUMENT) {
+
pushTask.countDownToZero(request.getTaskStatus().getStatusCode(),
+ task.getBackendId() + ": " +
request.getTaskStatus().getErrorMsgs().toString());
+ AgentTaskQueue.removeTask(backendId,
TTaskType.REALTIME_PUSH, signature);
+ LOG.warn("finish push replica error: {}",
request.getTaskStatus().getErrorMsgs().toString());
+ }
+ }
+ return;
+ }
+
+ checkHasTabletInfo(request);
+ List<TTabletInfo> finishTabletInfos = request.getFinishTabletInfos();
+
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db == null) {
AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH,
signature);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
index dd301728787..f6c47594079 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
@@ -26,6 +26,7 @@ import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.Predicate;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.common.MarkedCountDownLatch;
+import org.apache.doris.common.Status;
import org.apache.doris.thrift.TBrokerScanRange;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TCondition;
@@ -34,6 +35,7 @@ import org.apache.doris.thrift.TPriority;
import org.apache.doris.thrift.TPushReq;
import org.apache.doris.thrift.TPushType;
import org.apache.doris.thrift.TResourceInfo;
+import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TTaskType;
import com.google.common.collect.Maps;
@@ -207,6 +209,16 @@ public class PushTask extends AgentTask {
}
}
+ // calling this always means one of the tasks has failed. count down to zero to
finish the entire task
+ public void countDownToZero(TStatusCode code, String errMsg) {
+ if (this.latch != null) {
+ latch.countDownToZero(new Status(code, errMsg));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("PushTask count down to zero. error msg: {}",
errMsg);
+ }
+ }
+ }
+
public long getReplicaId() {
return replicaId;
}
diff --git
a/regression-test/data/fault_injection_p0/test_delete_from_timeout.out
b/regression-test/data/fault_injection_p0/test_delete_from_timeout.out
new file mode 100644
index 00000000000..1703506a5af
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/test_delete_from_timeout.out
@@ -0,0 +1,8 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+false -9999782574499444.2 -25
+true 99.9 234
+
+-- !sql --
+true 99.9 234
+
diff --git
a/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy
b/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy
new file mode 100644
index 00000000000..7b310ded149
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_delete_from_timeout","nonConcurrent") {
+
+ def tableName = "test_delete_from_timeout"
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """ CREATE TABLE ${tableName} (
+ `col1` BOOLEAN NOT NULL,
+ `col2` DECIMAL(17, 1) NOT NULL,
+ `col3` INT NOT NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`col1`, `col2`, `col3`)
+ DISTRIBUTED BY HASH(`col1`, `col2`, `col3`) BUCKETS 4
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1")
+ """
+
+ GetDebugPoint().clearDebugPointsForAllBEs()
+
+ try {
+ sql "insert into ${tableName} values(1, 99.9, 234), (false,
-9999782574499444.2, -25);"
+ qt_sql "select * from ${tableName} order by col1, col2, col3;"
+
GetDebugPoint().enableDebugPointForAllBEs("DeleteHandler::generate_delete_predicate.inject_failure",
+ [error_code: 33 /* INVALID_ARGUMENT */, error_msg: "invalid
parameters for store_cond. condition_size=1"])
+ test {
+ sql """delete from ${tableName} where col1 = "false" and col2 =
"-9999782574499444.2" and col3 = "-25"; """
+ exception "invalid parameters for store_cond. condition_size=1"
+ }
+
+ GetDebugPoint().clearDebugPointsForAllBEs()
+
+
GetDebugPoint().enableDebugPointForAllBEs("PushHandler::_do_streaming_ingestion.try_lock_fail")
+
+ t1 = Thread.start {
+ sleep(15000)
+
GetDebugPoint().disableDebugPointForAllBEs("PushHandler::_do_streaming_ingestion.try_lock_fail")
+ }
+
+ sql """delete from ${tableName} where col1 = "false" and col3 = "-25";
"""
+ t1.join()
+ qt_sql "select * from ${tableName} order by col1, col2, col3;"
+
+ } catch (Exception e) {
+ logger.info(e.getMessage())
+ assertTrue(false)
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]