This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new bfa3b731b3a branch-3.1: [fix](delete) Delete should count down latch and clear an agent task when failed #57428 (#58230)
bfa3b731b3a is described below

commit bfa3b731b3a041eb44c73bd61d1ce7a0e6a71753
Author: Siyang Tang <[email protected]>
AuthorDate: Mon Dec 15 11:13:11 2025 +0800

    branch-3.1: [fix](delete) Delete should count down latch and clear an agent task when failed #57428 (#58230)
    
    picked from #57428
---
 be/src/olap/push_handler.cpp                       |  6 ++---
 .../java/org/apache/doris/master/MasterImpl.java   | 30 ++++++++++++++--------
 .../main/java/org/apache/doris/task/PushTask.java  | 12 +++++++++
 .../test_delete_from_timeout.groovy                | 10 ++++++++
 4 files changed, 44 insertions(+), 14 deletions(-)

diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index e97639b7d95..07c6b5371e7 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -122,12 +122,10 @@ Status 
PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR
 
     std::shared_lock base_migration_rlock(tablet->get_migration_lock(), 
std::try_to_lock);
     DBUG_EXECUTE_IF("PushHandler::_do_streaming_ingestion.try_lock_fail", {
-        return Status::Error<TRY_LOCK_FAILED>(
-                "PushHandler::_do_streaming_ingestion get lock failed");
+        return Status::ObtainLockFailed("PushHandler::_do_streaming_ingestion 
get lock failed");
     })
     if (!base_migration_rlock.owns_lock()) {
-        return Status::Error<TRY_LOCK_FAILED>(
-                "PushHandler::_do_streaming_ingestion get lock failed");
+        return Status::ObtainLockFailed("PushHandler::_do_streaming_ingestion 
get lock failed");
     }
     PUniqueId load_id;
     load_id.set_hi(0);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index 964bb493d4b..d7fecf826aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -33,6 +33,7 @@ import org.apache.doris.cloud.catalog.CloudTablet;
 import org.apache.doris.cloud.master.CloudReportHandler;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.Status;
 import org.apache.doris.load.DeleteJob;
 import org.apache.doris.load.loadv2.IngestionLoadJob;
 import org.apache.doris.system.Backend;
@@ -319,18 +320,31 @@ public class MasterImpl {
         long backendId = pushTask.getBackendId();
         long signature = task.getSignature();
         long transactionId = ((PushTask) task).getTransactionId();
+        long tableId = pushTask.getTableId();
+        long partitionId = pushTask.getPartitionId();
+        long pushIndexId = pushTask.getIndexId();
+        long pushTabletId = pushTask.getTabletId();
 
         if (request.getTaskStatus().getStatusCode() != TStatusCode.OK) {
             if (pushTask.getPushType() == TPushType.DELETE) {
                 // we don't need to retry if the returned status code is 
DELETE_INVALID_CONDITION
                 // or DELETE_INVALID_PARAMETERS
                 // note that they will be converted to 
TStatusCode.INVALID_ARGUMENT when being sent from be to fe
-                if (request.getTaskStatus().getStatusCode() == 
TStatusCode.INVALID_ARGUMENT) {
-                    
pushTask.countDownToZero(request.getTaskStatus().getStatusCode(),
-                            task.getBackendId() + ": " + 
request.getTaskStatus().getErrorMsgs().toString());
-                    AgentTaskQueue.removeTask(backendId, 
TTaskType.REALTIME_PUSH, signature);
-                    LOG.warn("finish push replica error: {}", 
request.getTaskStatus().getErrorMsgs().toString());
+                TStatus taskStatus = request.getTaskStatus();
+                String msg = task.getBackendId() + ": " + 
taskStatus.getErrorMsgs().toString();
+                LOG.warn("finish push replica, signature={}, error: {}",
+                        signature, taskStatus.getErrorMsgs().toString());
+                if (taskStatus.getStatusCode() == 
TStatusCode.OBTAIN_LOCK_FAILED) {
+                    // retry if obtain lock failed
+                    return;
+                }
+                if (taskStatus.getStatusCode() == 
TStatusCode.INVALID_ARGUMENT) {
+                    pushTask.countDownToZero(taskStatus.getStatusCode(), msg);
+                } else {
+                    pushTask.countDownLatchWithStatus(backendId, pushTabletId,
+                            new Status(taskStatus.getStatusCode(), msg));
                 }
+                AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, 
signature);
             }
             return;
         }
@@ -344,10 +358,6 @@ public class MasterImpl {
             return;
         }
 
-        long tableId = pushTask.getTableId();
-        long partitionId = pushTask.getPartitionId();
-        long pushIndexId = pushTask.getIndexId();
-        long pushTabletId = pushTask.getTabletId();
         // push finish type:
         //                  numOfFinishTabletInfos  tabletId schemaHash
         // Normal:                     1                   /          /
@@ -445,7 +455,7 @@ public class MasterImpl {
             AgentTaskQueue.removeTask(backendId, TTaskType.REALTIME_PUSH, 
signature);
             LOG.warn("finish push replica error", e);
             if (pushTask.getPushType() == TPushType.DELETE) {
-                pushTask.countDownLatch(backendId, pushTabletId);
+                pushTask.countDownLatchWithStatus(backendId, pushTabletId, 
Status.CANCELLED);
             }
         } finally {
             olapTable.writeUnlock();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
index 0dadef4dee1..af98e5bd2b3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
@@ -220,6 +220,18 @@ public class PushTask extends AgentTask {
         }
     }
 
+    public void countDownLatchWithStatus(long backendId, long tabletId, Status 
st) {
+        if (this.latch == null) {
+            return;
+        }
+        if (latch.markedCountDownWithStatus(backendId, tabletId, st)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("pushTask current latch count with status: {}. 
backend: {}, tablet:{}, st::{}",
+                        latch.getCount(), backendId, tabletId, st);
+            }
+        }
+    }
+
     // call this always means one of tasks is failed. count down to zero to 
finish entire task
     public void countDownToZero(TStatusCode code, String errMsg) {
         if (this.latch != null) {
diff --git 
a/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy 
b/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy
index b15309891da..769fc88b053 100644
--- a/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy
+++ b/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy
@@ -46,6 +46,16 @@ suite("test_delete_from_timeout","nonConcurrent") {
 
         GetDebugPoint().clearDebugPointsForAllBEs()
 
+        
GetDebugPoint().enableDebugPointForAllBEs("DeleteHandler::generate_delete_predicate.inject_failure",
+                [error_code: -235 /* TOO MANY VERSIONS */, error_msg: "too 
many versions"])
+
+        test {
+            sql """delete from ${tableName} where col1 = "false" and col2 = 
"-9999782574499444.2" and col3 = "-25"; """
+            exception "too many versions"
+        }
+
+        GetDebugPoint().clearDebugPointsForAllBEs()
+
         
GetDebugPoint().enableDebugPointForAllBEs("PushHandler::_do_streaming_ingestion.try_lock_fail")
 
         def t1 = Thread.start {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to