This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new a2a87f2ecb5 branch-3.0-pick: [Fix](cloud-mow) Handle tablet delete bitmap calculation task's return value uniformly in CloudEngineCalcDeleteBitmapTask::execute #50019 (#50045)
a2a87f2ecb5 is described below
commit a2a87f2ecb5b3198e884977efaff49eef7e5441a
Author: bobhan1 <[email protected]>
AuthorDate: Tue Apr 22 11:17:47 2025 +0800
    branch-3.0-pick: [Fix](cloud-mow) Handle tablet delete bitmap calculation task's return value uniformly in CloudEngineCalcDeleteBitmapTask::execute #50019 (#50045)

    pick https://github.com/apache/doris/pull/50019
---
.../cloud/cloud_engine_calc_delete_bitmap_task.cpp | 41 ++++----
.../cloud/cloud_engine_calc_delete_bitmap_task.h | 4 +-
..._cloud_multi_segments_re_calc_in_publish.groovy | 105 +++++++++++++++------
3 files changed, 97 insertions(+), 53 deletions(-)
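
Editor's note on the change: before this patch, CloudTabletCalcDeleteBitmapTask::handle() both returned a Status and called back into the parent CloudEngineCalcDeleteBitmapTask to record per-tablet success or failure, so every early-return path had to repeat that bookkeeping (and the alter-table path even recorded the tablet as succeeded while returning INVALID_TABLET_STATE). After the patch, handle() only returns a Status, and the lambda submitted from CloudEngineCalcDeleteBitmapTask::execute() interprets that Status in a single place via add_succ_tablet_id()/add_error_tablet_id(). The standalone C++ sketch below illustrates that pattern; Status, EngineTask and calc_delete_bitmap_for_tablet are simplified stand-ins invented for this note, not the actual Doris types.

// Editor's sketch (simplified, not the actual Doris classes): the worker
// reports its outcome only through the returned Status; the submitting side
// turns that Status into per-tablet success/error bookkeeping in one place.
#include <cstdint>
#include <iostream>
#include <map>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

struct Status {
    static Status OK() { return Status(); }
    static Status Error(std::string msg) {
        Status st;
        st.ok_ = false;
        st.msg_ = std::move(msg);
        return st;
    }
    bool ok() const { return ok_; }
    const std::string& to_string() const { return msg_; }
    bool ok_ = true;
    std::string msg_;
};

// Stand-in for CloudTabletCalcDeleteBitmapTask::handle(): no callback into the
// parent task, just a Status describing what happened.
Status calc_delete_bitmap_for_tablet(int64_t tablet_id) {
    if (tablet_id % 2 == 0) {
        return Status::Error("simulated failure, tablet_id=" + std::to_string(tablet_id));
    }
    return Status::OK();
}

class EngineTask {
public:
    void execute(const std::vector<int64_t>& tablet_ids) {
        for (int64_t tablet_id : tablet_ids) {
            // Single decision point, mirroring the lambda passed to submit_func():
            // interpret the returned Status here instead of scattering
            // add_succ/add_error calls across the worker's early returns.
            Status st = calc_delete_bitmap_for_tablet(tablet_id);
            if (st.ok()) {
                add_succ_tablet_id(tablet_id);
            } else {
                std::cerr << "handle calc delete bitmap fail, st=" << st.to_string() << '\n';
                add_error_tablet_id(tablet_id, st);
            }
        }
    }

private:
    void add_succ_tablet_id(int64_t tablet_id) {
        std::lock_guard<std::mutex> lock(_mtx);
        _succ_tablet_ids.push_back(tablet_id);
    }
    void add_error_tablet_id(int64_t tablet_id, const Status& st) {
        std::lock_guard<std::mutex> lock(_mtx);
        _error_tablet_ids.emplace(tablet_id, st);
    }

    std::mutex _mtx; // the real code runs the workers on a thread pool, so guard the containers
    std::vector<int64_t> _succ_tablet_ids;
    std::map<int64_t, Status> _error_tablet_ids;
};

int main() {
    EngineTask task;
    task.execute({1, 2, 3, 4}); // tablets 2 and 4 hit the simulated failure path
    return 0;
}

With the bookkeeping next to the submit call, a path that intentionally skips work (the TABLET_RUNNING branch in the diff below) can simply return Status::OK() and still be counted as succeeded.
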
diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
index 39c0575c8b1..a55d4f42f66 100644
--- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
+++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
@@ -81,7 +81,7 @@ Status CloudEngineCalcDeleteBitmapTask::execute() {
         for (size_t i = 0; i < partition.tablet_ids.size(); i++) {
             auto tablet_id = partition.tablet_ids[i];
             auto tablet_calc_delete_bitmap_ptr = std::make_shared<CloudTabletCalcDeleteBitmapTask>(
-                    _engine, this, tablet_id, transaction_id, version);
+                    _engine, tablet_id, transaction_id, version);
             if (has_compaction_stats) {
                 tablet_calc_delete_bitmap_ptr->set_compaction_stats(
                         partition.base_compaction_cnts[i], partition.cumulative_compaction_cnts[i],
@@ -90,10 +90,13 @@ Status CloudEngineCalcDeleteBitmapTask::execute() {
             if (has_tablet_states) {
                 tablet_calc_delete_bitmap_ptr->set_tablet_state(partition.tablet_states[i]);
             }
-            auto submit_st = token->submit_func([=]() {
+            auto submit_st = token->submit_func([tablet_id, tablet_calc_delete_bitmap_ptr, this]() {
                 auto st = tablet_calc_delete_bitmap_ptr->handle();
-                if (!st.ok()) {
+                if (st.ok()) {
+                    add_succ_tablet_id(tablet_id);
+                } else {
                     LOG(WARNING) << "handle calc delete bitmap fail, st=" << st.to_string();
+                    add_error_tablet_id(tablet_id, st);
                 }
             });
             VLOG_DEBUG << "submit TabletCalcDeleteBitmapTask for tablet=" << tablet_id;
@@ -113,11 +116,11 @@ Status CloudEngineCalcDeleteBitmapTask::execute() {
     return _res;
 }
 
-CloudTabletCalcDeleteBitmapTask::CloudTabletCalcDeleteBitmapTask(
-        CloudStorageEngine& engine, CloudEngineCalcDeleteBitmapTask* engine_task, int64_t tablet_id,
-        int64_t transaction_id, int64_t version)
+CloudTabletCalcDeleteBitmapTask::CloudTabletCalcDeleteBitmapTask(CloudStorageEngine& engine,
+                                                                 int64_t tablet_id,
+                                                                 int64_t transaction_id,
+                                                                 int64_t version)
         : _engine(engine),
-          _engine_calc_delete_bitmap_task(engine_task),
           _tablet_id(tablet_id),
           _transaction_id(transaction_id),
           _version(version) {
@@ -146,10 +149,8 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
     std::shared_ptr<CloudTablet> tablet = std::dynamic_pointer_cast<CloudTablet>(base_tablet);
     if (tablet == nullptr) {
         LOG(WARNING) << "can't get tablet when calculate delete bitmap. tablet_id=" << _tablet_id;
-        auto error_st = Status::Error<ErrorCode::PUSH_TABLE_NOT_EXIST>(
+        return Status::Error<ErrorCode::PUSH_TABLE_NOT_EXIST>(
                 "can't get tablet when calculate delete bitmap. tablet_id={}", _tablet_id);
-        _engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, error_st);
-        return error_st;
     }
     int64_t max_version = tablet->max_version_unlocked();
     int64_t t2 = MonotonicMicros();
@@ -177,18 +178,14 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
         if (!sync_st.ok()) {
             LOG(WARNING) << "failed to sync rowsets. tablet_id=" << _tablet_id
                          << ", txn_id=" << _transaction_id << ", status=" << sync_st;
-            _engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, sync_st);
             return sync_st;
         }
         if (tablet->tablet_state() != TABLET_RUNNING) [[unlikely]] {
-            _engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id);
             LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, "
                          "tablet_id: "
                       << _tablet_id << " txn_id: " << _transaction_id
                       << ", request_version=" << _version;
-            return Status::Error<ErrorCode::INVALID_TABLET_STATE>(
-                    "invalid tablet state {}. tablet_id={}", tablet->tablet_state(),
-                    tablet->tablet_id());
+            return Status::OK();
         }
     }
     auto sync_rowset_time_us = MonotonicMicros() - t2;
@@ -200,10 +197,7 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
LOG(WARNING) << "version not continuous, current max version=" <<
max_version
<< ", request_version=" << _version << " tablet_id="
<< _tablet_id;
}
- auto error_st =
- Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR,
false>("version not continuous");
- _engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id,
error_st);
- return error_st;
+ return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR,
false>("version not continuous");
}
RowsetSharedPtr rowset;
@@ -219,7 +213,6 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
     if (status != Status::OK()) {
         LOG(WARNING) << "failed to get tablet txn info. tablet_id=" << _tablet_id
                      << ", txn_id=" << _transaction_id << ", status=" << status;
-        _engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, status);
         return status;
     }
 
@@ -270,6 +263,10 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
         // delete bitmap cache missed, should re-calculate delete bitmaps between segments
         std::vector<segment_v2::SegmentSharedPtr> segments;
         RETURN_IF_ERROR(std::static_pointer_cast<BetaRowset>(rowset)->load_segments(&segments));
+        DBUG_EXECUTE_IF("_handle_rowset.inject.before.calc_between_segments", {
+            LOG_INFO("inject error when CloudTabletCalcDeleteBitmapTask::_handle_rowset");
+            return Status::MemoryLimitExceeded("injected MemoryLimitExceeded error");
+        });
         RETURN_IF_ERROR(
                 tablet->calc_delete_bitmap_between_segments(rowset, segments, delete_bitmap));
     }
@@ -281,11 +278,9 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
LOG(WARNING) << "failed to calculate delete bitmap. rowset_id=" <<
rowset->rowset_id()
<< ", tablet_id=" << _tablet_id << ", txn_id=" <<
_transaction_id
<< ", status=" << status;
- _engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id,
status);
return status;
}
- _engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id);
LOG(INFO) << "calculate delete bitmap successfully on tablet"
<< ", table_id=" << tablet->table_id() << ", transaction_id=" <<
_transaction_id
<< ", tablet_id=" << tablet->tablet_id() << ", num_rows=" <<
rowset->num_rows()
@@ -293,7 +288,7 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
<< ", sync_rowset_time_us=" << sync_rowset_time_us
<< ", update_delete_bitmap_time_us=" <<
update_delete_bitmap_time_us
<< ", res=" << status;
- return status;
+ return Status::OK();
}
#include "common/compile_check_end.h"
diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h
index 62bd91b0a8a..cbff46a0b2c 100644
--- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h
+++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h
@@ -33,8 +33,7 @@ class MemTrackerLimiter;
 
 class CloudTabletCalcDeleteBitmapTask {
 public:
-    CloudTabletCalcDeleteBitmapTask(CloudStorageEngine& engine,
-                                    CloudEngineCalcDeleteBitmapTask* engine_task, int64_t tablet_id,
+    CloudTabletCalcDeleteBitmapTask(CloudStorageEngine& engine, int64_t tablet_id,
                                     int64_t transaction_id, int64_t version);
     ~CloudTabletCalcDeleteBitmapTask() = default;
 
@@ -46,7 +45,6 @@ public:
 
 private:
     CloudStorageEngine& _engine;
-    CloudEngineCalcDeleteBitmapTask* _engine_calc_delete_bitmap_task;
 
     int64_t _tablet_id;
     int64_t _transaction_id;
diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_multi_segments_re_calc_in_publish.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_multi_segments_re_calc_in_publish.groovy
index b741a6e9986..e6166d6c41a 100644
--- a/regression-test/suites/fault_injection_p0/cloud/test_cloud_multi_segments_re_calc_in_publish.groovy
+++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_multi_segments_re_calc_in_publish.groovy
@@ -58,6 +58,36 @@ suite("test_cloud_multi_segments_re_calc_in_publish", "nonConcurrent") {
         assert lastRowsetSegmentNum == Integer.parseInt(segmentNumStr)
     }
 
+    def loadMultiSegmentData = { tableName, rows, succ, String err="" ->
+        // load data that will have multi segments and there are duplicate keys between segments
+        String content = ""
+        (1..rows).each {
+            content += "${it},${it},${it}\n"
+        }
+        content += content
+        streamLoad {
+            table "${tableName}"
+            set 'column_separator', ','
+            inputStream new ByteArrayInputStream(content.getBytes())
+            time 30000
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                def json = parseJson(result)
+                if (succ) {
+                    assert "success" == json.Status.toLowerCase()
+                    assert rows*2 == json.NumberTotalRows
+                    assert 0 == json.NumberFilteredRows
+                } else {
+                    assert "fail" == json.Status.toLowerCase()
+                    assert json.Message.contains(err)
+                }
+            }
+        }
+    }
+
     // to cause multi segments
     def customBeConfig = [
         doris_scanner_row_bytes : 1
@@ -75,33 +105,7 @@ suite("test_cloud_multi_segments_re_calc_in_publish", "nonConcurrent") {
 
         Thread.sleep(1000)
 
-        def t1 = Thread.start {
-            // load data that will have multi segments and there are duplicate keys between segments
-            String content = ""
-            (1..4096).each {
-                content += "${it},${it},${it}\n"
-            }
-            content += content
-            streamLoad {
-                table "${table1}"
-                set 'column_separator', ','
-                inputStream new ByteArrayInputStream(content.getBytes())
-                time 30000 // limit inflight 10s
-
-                check { result, exception, startTime, endTime ->
-                    if (exception != null) {
-                        throw exception
-                    }
-                    def json = parseJson(result)
-                    assert "success" == json.Status.toLowerCase()
-                    assert 8192 == json.NumberTotalRows
-                    assert 0 == json.NumberFilteredRows
-                }
-            }
-        }
-
-
-        t1.join()
+        loadMultiSegmentData(table1, 4096, true)
 
         GetDebugPoint().clearDebugPointsForAllBEs()
         Thread.sleep(2000)
@@ -120,4 +124,51 @@ suite("test_cloud_multi_segments_re_calc_in_publish", "nonConcurrent") {
             GetDebugPoint().clearDebugPointsForAllFEs()
         }
     }
+
+    // abnormal case, fail when calc between segments
+    def table2 = "test_cloud_multi_segments_re_calc_in_publish_fail"
+    sql "DROP TABLE IF EXISTS ${table2} FORCE;"
+    sql """ CREATE TABLE IF NOT EXISTS ${table2} (
+            `k1` int NOT NULL,
+            `c1` int,
+            `c2` int
+            )UNIQUE KEY(k1)
+        DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES (
+            "enable_unique_key_merge_on_write" = "true",
+            "disable_auto_compaction" = "true",
+            "replication_num" = "1"); """
+
+    sql "insert into ${table2} values(99999,99999,99999);"
+    sql "insert into ${table2} values(88888,88888,88888);"
+    sql "insert into ${table2} values(77777,77777,77777);"
+    sql "sync;"
+    qt_sql "select * from ${table2} order by k1;"
+
+    setBeConfigTemporary(customBeConfig) {
+        try {
+            GetDebugPoint().enableDebugPointForAllBEs("MemTable.need_flush")
+            GetDebugPoint().enableDebugPointForAllBEs("CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss")
+
+            // fail in calc_delete_bitmap_between_segments
+            GetDebugPoint().enableDebugPointForAllBEs("_handle_rowset.inject.before.calc_between_segments")
+
+            Thread.sleep(1000)
+
+            loadMultiSegmentData(table2, 4096, false, "injected MemoryLimitExceeded error")
+
+            GetDebugPoint().clearDebugPointsForAllBEs()
+            Thread.sleep(2000)
+
+            qt_sql "select count() from ${table2};"
+
+            qt_dup_key_count "select count() from (select k1,count() as cnt from ${table2} group by k1 having cnt > 1) A;"
+        } catch(Exception e) {
+            logger.info(e.getMessage())
+            throw e
+        } finally {
+            GetDebugPoint().clearDebugPointsForAllBEs()
+            GetDebugPoint().clearDebugPointsForAllFEs()
+        }
+    }
 }
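
Editor's note on the fault-injection hook used by the new regression case: DBUG_EXECUTE_IF executes its block only when the named debug point has been enabled, which the suite does through GetDebugPoint().enableDebugPointForAllBEs("_handle_rowset.inject.before.calc_between_segments"); that is how the injected MemoryLimitExceeded status surfaces in the stream load's failure check, and qt_dup_key_count then inspects the table for duplicate keys after the failed load. The C++ below is a minimal sketch of that general pattern under invented names (DebugPointRegistry, SKETCH_EXECUTE_IF, handle_rowset); it is not the actual Doris DBUG_EXECUTE_IF implementation.

// Editor's sketch of a name-keyed fault-injection hook (invented names; not the
// actual Doris DBUG_EXECUTE_IF machinery). The injected block runs only when a
// test has enabled the named point, so normal runs are unaffected.
#include <iostream>
#include <mutex>
#include <set>
#include <string>

class DebugPointRegistry {
public:
    static DebugPointRegistry& instance() {
        static DebugPointRegistry reg;
        return reg;
    }
    void enable(const std::string& name) {
        std::lock_guard<std::mutex> lock(_mtx);
        _enabled.insert(name);
    }
    bool is_enabled(const std::string& name) {
        std::lock_guard<std::mutex> lock(_mtx);
        return _enabled.count(name) > 0;
    }

private:
    std::mutex _mtx;
    std::set<std::string> _enabled;
};

// Expands to nothing observable unless the point is enabled; the block may
// early-return from the enclosing function, which is how the patch injects a
// failure status before calc_delete_bitmap_between_segments.
#define SKETCH_EXECUTE_IF(name, block)                         \
    do {                                                       \
        if (DebugPointRegistry::instance().is_enabled(name)) { \
            block;                                             \
        }                                                      \
    } while (0)

int handle_rowset() {
    SKETCH_EXECUTE_IF("_handle_rowset.inject.before.calc_between_segments", {
        std::cerr << "injected failure before calc between segments\n";
        return -1; // the real code returns Status::MemoryLimitExceeded(...)
    });
    return 0; // normal path: go on to calculate delete bitmaps between segments
}

int main() {
    std::cout << handle_rowset() << '\n'; // 0: point not enabled
    DebugPointRegistry::instance().enable("_handle_rowset.inject.before.calc_between_segments");
    std::cout << handle_rowset() << '\n'; // -1: injected failure path taken
    return 0;
}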