This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new bfa3b731b3a branch-3.1: [fix](delete) Delete should count down latch
and clear an agent task when failed #57428 (#58230)
bfa3b731b3a is described below
commit bfa3b731b3a041eb44c73bd61d1ce7a0e6a71753
Author: Siyang Tang <[email protected]>
AuthorDate: Mon Dec 15 11:13:11 2025 +0800
branch-3.1: [fix](delete) Delete should count down latch and clear an agent
task when failed #57428 (#58230)
picked from #57428
---
be/src/olap/push_handler.cpp | 6 ++---
.../java/org/apache/doris/master/MasterImpl.java | 30 ++++++++++++++--------
.../main/java/org/apache/doris/task/PushTask.java | 12 +++++++++
.../test_delete_from_timeout.groovy | 10 ++++++++
4 files changed, 44 insertions(+), 14 deletions(-)
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index e97639b7d95..07c6b5371e7 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -122,12 +122,10 @@ Status
PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR
std::shared_lock base_migration_rlock(tablet->get_migration_lock(),
std::try_to_lock);
DBUG_EXECUTE_IF("PushHandler::_do_streaming_ingestion.try_lock_fail", {
- return Status::Error<TRY_LOCK_FAILED>(
- "PushHandler::_do_streaming_ingestion get lock failed");
+ return Status::ObtainLockFailed("PushHandler::_do_streaming_ingestion
get lock failed");
})
if (!base_migration_rlock.owns_lock()) {
- return Status::Error<TRY_LOCK_FAILED>(
- "PushHandler::_do_streaming_ingestion get lock failed");
+ return Status::ObtainLockFailed("PushHandler::_do_streaming_ingestion
get lock failed");
}
PUniqueId load_id;
load_id.set_hi(0);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index 964bb493d4b..d7fecf826aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -33,6 +33,7 @@ import org.apache.doris.cloud.catalog.CloudTablet;
import org.apache.doris.cloud.master.CloudReportHandler;
import org.apache.doris.common.Config;
import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.Status;
import org.apache.doris.load.DeleteJob;
import org.apache.doris.load.loadv2.IngestionLoadJob;
import org.apache.doris.system.Backend;
@@ -319,18 +320,31 @@ public class MasterImpl {
long backendId = pushTask.getBackendId();
long signature = task.getSignature();
long transactionId = ((PushTask) task).getTransactionId();
+ long tableId = pushTask.getTableId();
+ long partitionId = pushTask.getPartitionId();
+ long pushIndexId = pushTask.getIndexId();
+ long pushTabletId = pushTask.getTabletId();
if (request.getTaskStatus().getStatusCode() != TStatusCode.OK) {
if (pushTask.getPushType() == TPushType.DELETE) {
// we don't need to retry if the returned status code is
DELETE_INVALID_CONDITION
// or DELETE_INVALID_PARAMETERS
// note that they will be converted to
TStatusCode.INVALID_ARGUMENT when being sent from be to fe
- if (request.getTaskStatus().getStatusCode() ==
TStatusCode.INVALID_ARGUMENT) {
-
pushTask.countDownToZero(request.getTaskStatus().getStatusCode(),
- task.getBackendId() + ": " +
request.getTaskStatus().getErrorMsgs().toString());
- AgentTaskQueue.removeTask(backendId,
TTaskType.REALTIME_PUSH, signature);
- LOG.warn("finish push replica error: {}",
request.getTaskStatus().getErrorMsgs().toString());
+ TStatus taskStatus = request.getTaskStatus();
+ String msg = task.getBackendId() + ": " +
taskStatus.getErrorMsgs().toString();
+ LOG.warn("finish push replica, signature={}, error: {}",
+ signature, taskStatus.getErrorMsgs().toString());
+ if (taskStatus.getStatusCode() ==
TStatusCode.OBTAIN_LOCK_FAILED) {
+ // retry if obtain lock failed
+ return;
+ }
+ if (taskStatus.getStatusCode() ==
TStatusCode.INVALID_ARGUMENT) {
+ pushTask.countDownToZero(taskStatus.getStatusCode(), msg);
+ } else {
+ pushTask.countDownLatchWithStatus(backendId, pushTabletId,
+ new Status(taskStatus.getStatusCode(), msg));
}
+ AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH,
signature);
}
return;
}
@@ -344,10 +358,6 @@ public class MasterImpl {
return;
}
- long tableId = pushTask.getTableId();
- long partitionId = pushTask.getPartitionId();
- long pushIndexId = pushTask.getIndexId();
- long pushTabletId = pushTask.getTabletId();
// push finish type:
// numOfFinishTabletInfos tabletId schemaHash
// Normal: 1 / /
@@ -445,7 +455,7 @@ public class MasterImpl {
AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH,
signature);
LOG.warn("finish push replica error", e);
if (pushTask.getPushType() == TPushType.DELETE) {
- pushTask.countDownLatch(backendId, pushTabletId);
+ pushTask.countDownLatchWithStatus(backendId, pushTabletId,
Status.CANCELLED);
}
} finally {
olapTable.writeUnlock();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
index 0dadef4dee1..af98e5bd2b3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
@@ -220,6 +220,18 @@ public class PushTask extends AgentTask {
}
}
+ public void countDownLatchWithStatus(long backendId, long tabletId, Status
st) {
+ if (this.latch == null) {
+ return;
+ }
+ if (latch.markedCountDownWithStatus(backendId, tabletId, st)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("pushTask current latch count with status: {}.
backend: {}, tablet:{}, st::{}",
+ latch.getCount(), backendId, tabletId, st);
+ }
+ }
+ }
+
// call this always means one of tasks is failed. count down to zero to
finish entire task
public void countDownToZero(TStatusCode code, String errMsg) {
if (this.latch != null) {
diff --git
a/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy
b/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy
index b15309891da..769fc88b053 100644
--- a/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy
+++ b/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy
@@ -46,6 +46,16 @@ suite("test_delete_from_timeout","nonConcurrent") {
GetDebugPoint().clearDebugPointsForAllBEs()
+
GetDebugPoint().enableDebugPointForAllBEs("DeleteHandler::generate_delete_predicate.inject_failure",
+ [error_code: -235 /* TOO MANY VERSIONS */, error_msg: "too
many versions"])
+
+ test {
+ sql """delete from ${tableName} where col1 = "false" and col2 =
"-9999782574499444.2" and col3 = "-25"; """
+ exception "too many versions"
+ }
+
+ GetDebugPoint().clearDebugPointsForAllBEs()
+
GetDebugPoint().enableDebugPointForAllBEs("PushHandler::_do_streaming_ingestion.try_lock_fail")
def t1 = Thread.start {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]