This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 128af1a3a88 [fix](load) Add channel records to fix the unknown load_id
error (#57767)
128af1a3a88 is described below
commit 128af1a3a889f08639e32ce7880aaa9f75f2c807
Author: Refrain <[email protected]>
AuthorDate: Thu Nov 27 11:02:07 2025 +0800
[fix](load) Add channel records to fix the unknown load_id error (#57767)
Problem Summary:
If a DataQualityError happens, this instance cancels itself and the
downstream BE's load channel, and LoadChannelMgr removes the load_id
record.
But if a tablet_writer_add_block RPC is still in flight at that point,
it fails with an "unknown load_id" error, because _get_load_channel can
no longer find the load_id record.
---
be/src/runtime/fragment_mgr.cpp | 7 ++
be/src/runtime/load_channel_mgr.cpp | 53 ++++++++--
be/src/runtime/load_channel_mgr.h | 19 +++-
be/src/runtime/memory/cache_policy.h | 8 +-
be/src/vec/sink/writer/vtablet_writer.cpp | 1 +
gensrc/proto/internal_service.proto | 1 +
.../insert/test_insert_rpc_order_problem.groovy | 114 +++++++++++++++++++++
7 files changed, 183 insertions(+), 20 deletions(-)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 88bb449fc40..fed6c3931fe 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -367,6 +367,13 @@ Status FragmentMgr::trigger_pipeline_context_report(
// Also, the reported status will always reflect the most recent execution
status,
// including the final status when execution finishes.
void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
+ DBUG_EXECUTE_IF("FragmentMgr::coordinator_callback.report_delay", {
+ int delay_seconds = req.status.is<ErrorCode::DATA_QUALITY_ERROR>() ?
8 : 2; // fixed (not random) delay: DATA_QUALITY_ERROR reports are held longer
+ LOG_INFO("sleep : ").tag("time", delay_seconds).tag("query_id",
print_id(req.query_id));
+ std::this_thread::sleep_for(std::chrono::seconds(delay_seconds));
+ LOG_INFO("sleep done").tag("query_id", print_id(req.query_id));
+ });
+
DCHECK(req.status.ok() || req.done); // if !status.ok() => done
if (req.coord_addr.hostname == "external") {
// External query (flink/spark read tablets) not need to report to FE.
diff --git a/be/src/runtime/load_channel_mgr.cpp
b/be/src/runtime/load_channel_mgr.cpp
index 0bb352d2feb..ec59a13fdc2 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -74,7 +74,7 @@ void LoadChannelMgr::stop() {
}
Status LoadChannelMgr::init(int64_t process_mem_limit) {
- _last_success_channels = std::make_unique<LastSuccessChannelCache>(1024);
+ _load_state_channels = std::make_unique<LoadStateChannelCache>(1024);
RETURN_IF_ERROR(_start_bg_worker());
return Status::OK();
}
@@ -117,15 +117,28 @@ Status
LoadChannelMgr::_get_load_channel(std::shared_ptr<LoadChannel>& channel,
std::lock_guard<std::mutex> l(_lock);
auto it = _load_channels.find(load_id);
if (it == _load_channels.end()) {
- auto* handle = _last_success_channels->lookup(load_id.to_string());
- // success only when eos be true
+ Cache::Handle* handle =
_load_state_channels->lookup(load_id.to_string());
if (handle != nullptr) {
- _last_success_channels->release(handle);
- if (request.has_eos() && request.eos()) {
- is_eof = true;
- return Status::OK();
+ // load is cancelled
+ if (auto* value = _load_state_channels->value(handle); value !=
nullptr) {
+ const std::string cancel_reason =
static_cast<CacheValue*>(value)->_cancel_reason; // copy: the cached value must not be used after release()
+ _load_state_channels->release(handle);
+ if (!cancel_reason.empty()) {
+ LOG(INFO) << fmt::format(
+ "The channel has been cancelled, load_id = {},
error = {}",
+ print_id(load_id), cancel_reason);
+ return Status::Cancelled(cancel_reason);
+ }
+ } else {
+ // load is success, success only when eos be true
+ _load_state_channels->release(handle);
+ if (request.has_eos() && request.eos()) {
+ is_eof = true;
+ return Status::OK();
+ }
}
}
+
return Status::InternalError<false>(
"Fail to add batch in load channel: unknown load_id={}. "
"This may be due to a BE restart. Please retry the load.",
@@ -179,11 +192,11 @@ void LoadChannelMgr::_finish_load_channel(const UniqueId
load_id) {
VLOG_NOTICE << "removing load channel " << load_id << " because it's
finished";
{
std::lock_guard<std::mutex> l(_lock);
- if (_load_channels.find(load_id) != _load_channels.end()) {
+ if (_load_channels.contains(load_id)) {
_load_channels.erase(load_id);
}
- auto* handle = _last_success_channels->insert(load_id.to_string(),
nullptr, 1, 1);
- _last_success_channels->release(handle);
+ auto* handle = _load_state_channels->insert(load_id.to_string(),
nullptr, 1, 1);
+ _load_state_channels->release(handle);
}
VLOG_CRITICAL << "removed load channel " << load_id;
}
@@ -193,10 +206,28 @@ Status LoadChannelMgr::cancel(const
PTabletWriterCancelRequest& params) {
std::shared_ptr<LoadChannel> cancelled_channel;
{
std::lock_guard<std::mutex> l(_lock);
- if (_load_channels.find(load_id) != _load_channels.end()) {
+ if (_load_channels.contains(load_id)) {
cancelled_channel = _load_channels[load_id];
_load_channels.erase(load_id);
}
+ // We just need to record the first cancel msg
+ auto* existing_handle =
_load_state_channels->lookup(load_id.to_string());
+ if (existing_handle == nullptr) {
+ if (params.has_cancel_reason() && !params.cancel_reason().empty())
{
+ std::unique_ptr<CacheValue> cancel_reason_ptr =
std::make_unique<CacheValue>();
+ cancel_reason_ptr->_cancel_reason = params.cancel_reason();
+ size_t cache_capacity =
+ cancel_reason_ptr->_cancel_reason.capacity() +
sizeof(CacheValue);
+ auto* handle = _load_state_channels->insert(
+ load_id.to_string(), cancel_reason_ptr.get(), 1,
cache_capacity);
+ cancel_reason_ptr.release();
+ _load_state_channels->release(handle);
+ LOG(INFO) << fmt::format("load_id = {}, record_error reason =
{}",
+ print_id(load_id),
params.cancel_reason());
+ }
+ } else {
+ _load_state_channels->release(existing_handle);
+ }
}
if (cancelled_channel != nullptr) {
diff --git a/be/src/runtime/load_channel_mgr.h
b/be/src/runtime/load_channel_mgr.h
index 74ced68a1dd..989f2ac0530 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -24,6 +24,7 @@
#include <functional>
#include <memory>
#include <mutex>
+#include <string>
#include <unordered_map>
#include <utility>
@@ -81,20 +82,28 @@ private:
Status _start_bg_worker();
- class LastSuccessChannelCache : public LRUCachePolicy {
+ class LoadStateChannelCache : public LRUCachePolicy { // load_id -> state of a finished or cancelled load
public:
- LastSuccessChannelCache(size_t capacity)
- :
LRUCachePolicy(CachePolicy::CacheType::LAST_SUCCESS_CHANNEL_CACHE, capacity,
- LRUCacheType::SIZE, -1,
DEFAULT_LRU_CACHE_NUM_SHARDS,
+ class CacheValue : public LRUCacheValueBase {
+ public:
+ std::string _cancel_reason; // first non-empty cancel reason received for the load
+ };
+
+ explicit LoadStateChannelCache(size_t capacity)
+ :
LRUCachePolicy(CachePolicy::CacheType::LOAD_STATE_CHANNEL_CACHE, capacity,
+ LRUCacheType::NUMBER, -1,
DEFAULT_LRU_CACHE_NUM_SHARDS,
DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY,
false) {}
};
+ using CacheValue = LoadStateChannelCache::CacheValue;
+
protected:
// lock protect the load channel map
std::mutex _lock;
// load id -> load channel
std::unordered_map<UniqueId, std::shared_ptr<LoadChannel>> _load_channels;
- std::unique_ptr<LastSuccessChannelCache> _last_success_channels;
+ // load id window: remembers recently finished load ids and recently cancelled ones that carried a cancel reason
+ std::unique_ptr<LoadStateChannelCache> _load_state_channels;
MemTableMemoryLimiter* _memtable_memory_limiter = nullptr;
diff --git a/be/src/runtime/memory/cache_policy.h
b/be/src/runtime/memory/cache_policy.h
index c8eb144cffb..3a072de82f1 100644
--- a/be/src/runtime/memory/cache_policy.h
+++ b/be/src/runtime/memory/cache_policy.h
@@ -44,7 +44,7 @@ public:
POINT_QUERY_ROW_CACHE = 8,
DELETE_BITMAP_AGG_CACHE = 9,
TABLET_VERSION_CACHE = 10,
- LAST_SUCCESS_CHANNEL_CACHE = 11,
+ LOAD_STATE_CHANNEL_CACHE = 11,
COMMON_OBJ_LRU_CACHE = 12,
FOR_UT_CACHE_SIZE = 13,
TABLET_SCHEMA_CACHE = 14,
@@ -83,8 +83,8 @@ public:
return "MowDeleteBitmapAggCache";
case CacheType::TABLET_VERSION_CACHE:
return "MowTabletVersionCache";
- case CacheType::LAST_SUCCESS_CHANNEL_CACHE:
- return "LastSuccessChannelCache";
+ case CacheType::LOAD_STATE_CHANNEL_CACHE:
+ return "LoadStateChannelCache"; // must match the name -> type map key below
case CacheType::COMMON_OBJ_LRU_CACHE:
return "CommonObjLRUCache";
case CacheType::FOR_UT_CACHE_SIZE:
@@ -126,7 +126,7 @@ public:
{"PointQueryRowCache", CacheType::POINT_QUERY_ROW_CACHE},
{"MowDeleteBitmapAggCache", CacheType::DELETE_BITMAP_AGG_CACHE},
{"MowTabletVersionCache", CacheType::TABLET_VERSION_CACHE},
- {"LastSuccessChannelCache", CacheType::LAST_SUCCESS_CHANNEL_CACHE},
+ {"LoadStateChannelCache", CacheType::LOAD_STATE_CHANNEL_CACHE}, // no trailing space; matches the type -> name string
{"CommonObjLRUCache", CacheType::COMMON_OBJ_LRU_CACHE},
{"ForUTCacheSize", CacheType::FOR_UT_CACHE_SIZE},
{"TabletSchemaCache", CacheType::TABLET_SCHEMA_CACHE},
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp
b/be/src/vec/sink/writer/vtablet_writer.cpp
index fb864435cdf..62531151f82 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -1189,6 +1189,7 @@ void VNodeChannel::cancel(const std::string& cancel_msg) {
request->set_allocated_id(&_parent->_load_id);
request->set_index_id(_index_channel->_index_id);
request->set_sender_id(_parent->_sender_id);
+ request->set_cancel_reason(cancel_msg);
auto cancel_callback =
DummyBrpcCallback<PTabletWriterCancelResult>::create_shared();
auto closure = AutoReleaseClosure<
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index 69659aed523..afd62513b83 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -226,6 +226,7 @@ message PTabletWriterCancelRequest {
required PUniqueId id = 1;
required int64 index_id = 2;
required int32 sender_id = 3;
+ optional string cancel_reason = 4;
};
message PTabletWriterCancelResult {
diff --git
a/regression-test/suites/load_p0/insert/test_insert_rpc_order_problem.groovy
b/regression-test/suites/load_p0/insert/test_insert_rpc_order_problem.groovy
new file mode 100644
index 00000000000..6b08d20b992
--- /dev/null
+++ b/regression-test/suites/load_p0/insert/test_insert_rpc_order_problem.groovy
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite('test_insert_rpc_order_problem', 'docker') {
+ def options = new ClusterOptions()
+ options.feConfigs += [
+ 'enable_debug_points = true',
+ 'min_bytes_per_broker_scanner = 100',
+ 'parallel_pipeline_task_num = 2'
+ ]
+ options.beConfigs += [
+ 'enable_debug_points = true',
+ ]
+ options.beNum = 3
+
+ docker(options) {
+ def tableName = "test_insert_rpc_order_problem"
+ sql """drop table if exists ${tableName}"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ L_ORDERKEY INTEGER NOT NULL,
+ L_PARTKEY INTEGER NOT NULL,
+ L_SUPPKEY INTEGER NOT NULL,
+ L_LINENUMBER INTEGER NOT NULL,
+ L_QUANTITY DECIMAL(15,2) NOT NULL,
+ L_EXTENDEDPRICE DECIMAL(15,2) NOT NULL,
+ L_DISCOUNT DECIMAL(15,2) NOT NULL,
+ L_TAX DECIMAL(15,2) NOT NULL,
+
+ L_RETURNFLAG CHAR(1) NOT NULL,
+ L_LINESTATUS CHAR(1) NOT NULL,
+ L_SHIPDATE DATE NOT NULL,
+ L_COMMITDATE DATE NOT NULL,
+ L_RECEIPTDATE DATE NOT NULL,
+ L_SHIPINSTRUCT CHAR(25) NOT NULL,
+ L_SHIPMODE CHAR(10) NOT NULL,
+ L_COMMENT VARCHAR(44) NOT NULL
+ )
+ UNIQUE KEY(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER)
+ PARTITION BY RANGE(L_ORDERKEY) (
+ PARTITION p2023 VALUES LESS THAN ("6000010")
+ )
+ DISTRIBUTED BY HASH(L_ORDERKEY) BUCKETS 3
+ PROPERTIES (
+ "replication_num" = "3"
+ );
+ """
+
+ // Disable LoadStream to use old LoadChannel mechanism
+ sql """ set enable_memtable_on_sink_node = false; """
+ sql """ set parallel_pipeline_task_num = 2; """
+
+ try {
+ GetDebugPoint().enableDebugPointForAllBEs(
+ 'FragmentMgr::coordinator_callback.report_delay'
+ )
+ def label = "test_insert_rpc_order_problem"
+
+ sql """
+ LOAD LABEL ${label} (
+ DATA
INFILE("s3://${getS3BucketName()}/regression/tpch/sf1/{lineitem,lineitem2}.csv.split01.gz")
+ INTO TABLE ${tableName}
+ COLUMNS TERMINATED BY "|"
+ FORMAT AS "CSV"
+ )
+ WITH S3 (
+ "AWS_ACCESS_KEY" = "${getS3AK()}",
+ "AWS_SECRET_KEY" = "${getS3SK()}",
+ "AWS_ENDPOINT" = "${getS3Endpoint()}",
+ "AWS_REGION" = "${getS3Region()}",
+ "provider" = "${getS3Provider()}"
+ )
+ """
+
+ def max_try_milli_secs = 600000
+ while (max_try_milli_secs > 0) {
+ String[][] result = sql """ show load where label="$label"
order by createtime desc limit 1; """
+ logger.info("Load result: " + result[0])
+ if (result[0][2].equals("FINISHED")) {
+ logger.info("SHOW LOAD : $result")
+ assertTrue(1 == 2, "should not be finished")
+ }
+ if (result[0][2].equals("CANCELLED")) { // the original DQ error must surface, not an unknown load_id error
+ def reason = result[0][7]
+ assertTrue(reason.contains("DATA_QUALITY_ERROR"), "should
fail with DATA_QUALITY_ERROR, not unknown load_id : $reason")
+ break
+ }
+ Thread.sleep(1000)
+ max_try_milli_secs -= 1000
+ if(max_try_milli_secs <= 0) {
+ assertTrue(1 == 2, "load Timeout: $label")
+ }
+ }
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("FragmentMgr::coordinator_callback.report_delay")
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]