This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 128af1a3a88 [fix](load) Add channel records to fix the unknown load_id 
error (#57767)
128af1a3a88 is described below

commit 128af1a3a889f08639e32ce7880aaa9f75f2c807
Author: Refrain <[email protected]>
AuthorDate: Thu Nov 27 11:02:07 2025 +0800

    [fix](load) Add channel records to fix the unknown load_id error (#57767)
    
    Problem Summary:
    
    If a DataQualityError happens, the instance cancels its own load
    channel on the downstream BE, and the LoadChannelMgr removes the
    record for this load_id.
    
    But if a tablet_writer_add_block RPC is still in flight, it triggers
    the "unknown load_id" error, because _get_load_channel cannot find
    the record for this load_id.
---
 be/src/runtime/fragment_mgr.cpp                    |   7 ++
 be/src/runtime/load_channel_mgr.cpp                |  53 ++++++++--
 be/src/runtime/load_channel_mgr.h                  |  19 +++-
 be/src/runtime/memory/cache_policy.h               |   8 +-
 be/src/vec/sink/writer/vtablet_writer.cpp          |   1 +
 gensrc/proto/internal_service.proto                |   1 +
 .../insert/test_insert_rpc_order_problem.groovy    | 114 +++++++++++++++++++++
 7 files changed, 183 insertions(+), 20 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 88bb449fc40..fed6c3931fe 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -367,6 +367,13 @@ Status FragmentMgr::trigger_pipeline_context_report(
 // Also, the reported status will always reflect the most recent execution 
status,
 // including the final status when execution finishes.
 void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
+    DBUG_EXECUTE_IF("FragmentMgr::coordinator_callback.report_delay", {
+        int random_seconds = req.status.is<ErrorCode::DATA_QUALITY_ERROR>() ? 
8 : 2;
+        LOG_INFO("sleep : ").tag("time", random_seconds).tag("query_id", 
print_id(req.query_id));
+        std::this_thread::sleep_for(std::chrono::seconds(random_seconds));
+        LOG_INFO("sleep done").tag("query_id", print_id(req.query_id));
+    });
+
     DCHECK(req.status.ok() || req.done); // if !status.ok() => done
     if (req.coord_addr.hostname == "external") {
         // External query (flink/spark read tablets) not need to report to FE.
diff --git a/be/src/runtime/load_channel_mgr.cpp 
b/be/src/runtime/load_channel_mgr.cpp
index 0bb352d2feb..ec59a13fdc2 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -74,7 +74,7 @@ void LoadChannelMgr::stop() {
 }
 
 Status LoadChannelMgr::init(int64_t process_mem_limit) {
-    _last_success_channels = std::make_unique<LastSuccessChannelCache>(1024);
+    _load_state_channels = std::make_unique<LoadStateChannelCache>(1024);
     RETURN_IF_ERROR(_start_bg_worker());
     return Status::OK();
 }
@@ -117,15 +117,28 @@ Status 
LoadChannelMgr::_get_load_channel(std::shared_ptr<LoadChannel>& channel,
     std::lock_guard<std::mutex> l(_lock);
     auto it = _load_channels.find(load_id);
     if (it == _load_channels.end()) {
-        auto* handle = _last_success_channels->lookup(load_id.to_string());
-        // success only when eos be true
+        Cache::Handle* handle = 
_load_state_channels->lookup(load_id.to_string());
         if (handle != nullptr) {
-            _last_success_channels->release(handle);
-            if (request.has_eos() && request.eos()) {
-                is_eof = true;
-                return Status::OK();
+            // load is cancelled
+            if (auto* value = _load_state_channels->value(handle); value != 
nullptr) {
+                const auto& cancel_reason = 
reinterpret_cast<CacheValue*>(value)->_cancel_reason;
+                _load_state_channels->release(handle);
+                if (!cancel_reason.empty()) {
+                    LOG(INFO) << fmt::format(
+                            "The channel has been cancelled, load_id = {}, 
error = {}",
+                            print_id(load_id), cancel_reason);
+                    return Status::Cancelled(cancel_reason);
+                }
+            } else {
+                // load is success, success only when eos be true
+                _load_state_channels->release(handle);
+                if (request.has_eos() && request.eos()) {
+                    is_eof = true;
+                    return Status::OK();
+                }
             }
         }
+
         return Status::InternalError<false>(
                 "Fail to add batch in load channel: unknown load_id={}. "
                 "This may be due to a BE restart. Please retry the load.",
@@ -179,11 +192,11 @@ void LoadChannelMgr::_finish_load_channel(const UniqueId 
load_id) {
     VLOG_NOTICE << "removing load channel " << load_id << " because it's 
finished";
     {
         std::lock_guard<std::mutex> l(_lock);
-        if (_load_channels.find(load_id) != _load_channels.end()) {
+        if (_load_channels.contains(load_id)) {
             _load_channels.erase(load_id);
         }
-        auto* handle = _last_success_channels->insert(load_id.to_string(), 
nullptr, 1, 1);
-        _last_success_channels->release(handle);
+        auto* handle = _load_state_channels->insert(load_id.to_string(), 
nullptr, 1, 1);
+        _load_state_channels->release(handle);
     }
     VLOG_CRITICAL << "removed load channel " << load_id;
 }
@@ -193,10 +206,28 @@ Status LoadChannelMgr::cancel(const 
PTabletWriterCancelRequest& params) {
     std::shared_ptr<LoadChannel> cancelled_channel;
     {
         std::lock_guard<std::mutex> l(_lock);
-        if (_load_channels.find(load_id) != _load_channels.end()) {
+        if (_load_channels.contains(load_id)) {
             cancelled_channel = _load_channels[load_id];
             _load_channels.erase(load_id);
         }
+        // We just need to record the first cancel msg
+        auto* existing_handle = 
_load_state_channels->lookup(load_id.to_string());
+        if (existing_handle == nullptr) {
+            if (params.has_cancel_reason() && !params.cancel_reason().empty()) 
{
+                std::unique_ptr<CacheValue> cancel_reason_ptr = 
std::make_unique<CacheValue>();
+                cancel_reason_ptr->_cancel_reason = params.cancel_reason();
+                size_t cache_capacity =
+                        cancel_reason_ptr->_cancel_reason.capacity() + 
sizeof(CacheValue);
+                auto* handle = _load_state_channels->insert(
+                        load_id.to_string(), cancel_reason_ptr.get(), 1, 
cache_capacity);
+                cancel_reason_ptr.release();
+                _load_state_channels->release(handle);
+                LOG(INFO) << fmt::format("load_id = {}, record_error reason = 
{}",
+                                         print_id(load_id), 
params.cancel_reason());
+            }
+        } else {
+            _load_state_channels->release(existing_handle);
+        }
     }
 
     if (cancelled_channel != nullptr) {
diff --git a/be/src/runtime/load_channel_mgr.h 
b/be/src/runtime/load_channel_mgr.h
index 74ced68a1dd..989f2ac0530 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -24,6 +24,7 @@
 #include <functional>
 #include <memory>
 #include <mutex>
+#include <string>
 #include <unordered_map>
 #include <utility>
 
@@ -81,20 +82,28 @@ private:
 
     Status _start_bg_worker();
 
-    class LastSuccessChannelCache : public LRUCachePolicy {
+    class LoadStateChannelCache : public LRUCachePolicy {
     public:
-        LastSuccessChannelCache(size_t capacity)
-                : 
LRUCachePolicy(CachePolicy::CacheType::LAST_SUCCESS_CHANNEL_CACHE, capacity,
-                                 LRUCacheType::SIZE, -1, 
DEFAULT_LRU_CACHE_NUM_SHARDS,
+        class CacheValue : public LRUCacheValueBase {
+        public:
+            std::string _cancel_reason;
+        };
+
+        LoadStateChannelCache(size_t capacity)
+                : 
LRUCachePolicy(CachePolicy::CacheType::LOAD_STATE_CHANNEL_CACHE, capacity,
+                                 LRUCacheType::NUMBER, -1, 
DEFAULT_LRU_CACHE_NUM_SHARDS,
                                  DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY, 
false) {}
     };
 
+    using CacheValue = LoadStateChannelCache::CacheValue;
+
 protected:
     // lock protect the load channel map
     std::mutex _lock;
     // load id -> load channel
     std::unordered_map<UniqueId, std::shared_ptr<LoadChannel>> _load_channels;
-    std::unique_ptr<LastSuccessChannelCache> _last_success_channels;
+    // load id window, remember the recently initiated load id, regardless of 
whether they succeed or fail
+    std::unique_ptr<LoadStateChannelCache> _load_state_channels;
 
     MemTableMemoryLimiter* _memtable_memory_limiter = nullptr;
 
diff --git a/be/src/runtime/memory/cache_policy.h 
b/be/src/runtime/memory/cache_policy.h
index c8eb144cffb..3a072de82f1 100644
--- a/be/src/runtime/memory/cache_policy.h
+++ b/be/src/runtime/memory/cache_policy.h
@@ -44,7 +44,7 @@ public:
         POINT_QUERY_ROW_CACHE = 8,
         DELETE_BITMAP_AGG_CACHE = 9,
         TABLET_VERSION_CACHE = 10,
-        LAST_SUCCESS_CHANNEL_CACHE = 11,
+        LOAD_STATE_CHANNEL_CACHE = 11,
         COMMON_OBJ_LRU_CACHE = 12,
         FOR_UT_CACHE_SIZE = 13,
         TABLET_SCHEMA_CACHE = 14,
@@ -83,8 +83,8 @@ public:
             return "MowDeleteBitmapAggCache";
         case CacheType::TABLET_VERSION_CACHE:
             return "MowTabletVersionCache";
-        case CacheType::LAST_SUCCESS_CHANNEL_CACHE:
-            return "LastSuccessChannelCache";
+        case CacheType::LOAD_STATE_CHANNEL_CACHE:
+            return "LoadStateChannelCache ";
         case CacheType::COMMON_OBJ_LRU_CACHE:
             return "CommonObjLRUCache";
         case CacheType::FOR_UT_CACHE_SIZE:
@@ -126,7 +126,7 @@ public:
             {"PointQueryRowCache", CacheType::POINT_QUERY_ROW_CACHE},
             {"MowDeleteBitmapAggCache", CacheType::DELETE_BITMAP_AGG_CACHE},
             {"MowTabletVersionCache", CacheType::TABLET_VERSION_CACHE},
-            {"LastSuccessChannelCache", CacheType::LAST_SUCCESS_CHANNEL_CACHE},
+            {"LoadStateChannelCache ", CacheType::LOAD_STATE_CHANNEL_CACHE},
             {"CommonObjLRUCache", CacheType::COMMON_OBJ_LRU_CACHE},
             {"ForUTCacheSize", CacheType::FOR_UT_CACHE_SIZE},
             {"TabletSchemaCache", CacheType::TABLET_SCHEMA_CACHE},
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp 
b/be/src/vec/sink/writer/vtablet_writer.cpp
index fb864435cdf..62531151f82 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -1189,6 +1189,7 @@ void VNodeChannel::cancel(const std::string& cancel_msg) {
     request->set_allocated_id(&_parent->_load_id);
     request->set_index_id(_index_channel->_index_id);
     request->set_sender_id(_parent->_sender_id);
+    request->set_cancel_reason(cancel_msg);
 
     auto cancel_callback = 
DummyBrpcCallback<PTabletWriterCancelResult>::create_shared();
     auto closure = AutoReleaseClosure<
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 69659aed523..afd62513b83 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -226,6 +226,7 @@ message PTabletWriterCancelRequest {
     required PUniqueId id = 1;
     required int64 index_id = 2;
     required int32 sender_id = 3;
+    optional string cancel_reason = 4;
 };
 
 message PTabletWriterCancelResult {
diff --git 
a/regression-test/suites/load_p0/insert/test_insert_rpc_order_problem.groovy 
b/regression-test/suites/load_p0/insert/test_insert_rpc_order_problem.groovy
new file mode 100644
index 00000000000..6b08d20b992
--- /dev/null
+++ b/regression-test/suites/load_p0/insert/test_insert_rpc_order_problem.groovy
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite('test_insert_rpc_order_problem', 'docker') {
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'enable_debug_points = true',
+        'min_bytes_per_broker_scanner = 100',
+        'parallel_pipeline_task_num = 2'
+    ]
+    options.beConfigs += [
+        'enable_debug_points = true',
+    ]
+    options.beNum = 3
+
+    docker(options) {
+        def tableName = "test_insert_rpc_order_problem"
+        sql """drop table if exists ${tableName}"""
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                L_ORDERKEY    INTEGER NOT NULL,
+                L_PARTKEY     INTEGER NOT NULL,
+                L_SUPPKEY     INTEGER NOT NULL,
+                L_LINENUMBER  INTEGER NOT NULL,
+                L_QUANTITY    DECIMAL(15,2) NOT NULL,
+                L_EXTENDEDPRICE  DECIMAL(15,2) NOT NULL,
+                L_DISCOUNT    DECIMAL(15,2) NOT NULL,
+                L_TAX         DECIMAL(15,2) NOT NULL,
+
+                L_RETURNFLAG  CHAR(1) NOT NULL,
+                L_LINESTATUS  CHAR(1) NOT NULL,
+                L_SHIPDATE    DATE NOT NULL,
+                L_COMMITDATE  DATE NOT NULL,
+                L_RECEIPTDATE DATE NOT NULL,
+                L_SHIPINSTRUCT CHAR(25) NOT NULL,
+                L_SHIPMODE     CHAR(10) NOT NULL,
+                L_COMMENT      VARCHAR(44) NOT NULL
+            )
+            UNIQUE KEY(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER)
+            PARTITION BY RANGE(L_ORDERKEY) (
+                PARTITION p2023 VALUES LESS THAN ("6000010") 
+            )
+            DISTRIBUTED BY HASH(L_ORDERKEY) BUCKETS 3
+            PROPERTIES (
+                "replication_num" = "3"
+            );
+        """
+
+        // Disable LoadStream to use old LoadChannel mechanism
+        sql """ set enable_memtable_on_sink_node = false; """
+        sql """ set parallel_pipeline_task_num = 2; """
+
+        try {
+            GetDebugPoint().enableDebugPointForAllBEs(
+                'FragmentMgr::coordinator_callback.report_delay'
+            )
+            def label = "test_insert_rpc_order_problem"
+
+            sql """
+            LOAD LABEL ${label} (
+                DATA 
INFILE("s3://${getS3BucketName()}/regression/tpch/sf1/{lineitem,lineitem2}.csv.split01.gz")
+                INTO TABLE ${tableName}
+                COLUMNS TERMINATED BY "|"
+                FORMAT AS "CSV"
+            )
+            WITH S3 (
+                "AWS_ACCESS_KEY" = "${getS3AK()}",
+                "AWS_SECRET_KEY" = "${getS3SK()}",
+                "AWS_ENDPOINT" = "${getS3Endpoint()}",
+                "AWS_REGION" = "${getS3Region()}",
+                "provider" = "${getS3Provider()}"
+            ) 
+            """
+
+            def max_try_milli_secs = 600000
+            while (max_try_milli_secs > 0) {
+                String[][] result = sql """ show load where label="$label" 
order by createtime desc limit 1; """
+                logger.info("Load result: " + result[0])
+                if (result[0][2].equals("FINISHED")) {
+                    logger.info("SHOW LOAD : $result")
+                    assertTrue(1 == 2, "should not finished")
+                }
+                if (result[0][2].equals("CANCELLED")) {
+                    def reason = result[0][7]
+                    assertTrue(reason.contains("DATA_QUALITY_ERROR"), "should 
have DATA_QUALITY_ERROR or unknown load_id : $reason")
+                    break
+                }
+                Thread.sleep(1000)
+                max_try_milli_secs -= 1000
+                if(max_try_milli_secs <= 0) {
+                    assertTrue(1 == 2, "load Timeout: $label")
+                }
+            }
+        } finally {
+            
GetDebugPoint().disableDebugPointForAllBEs("FragmentMgr::coordinator_callback.report_delay")
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to