This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new a2a87f2ecb5 branch-3.0-pick: [Fix](cloud-mow) Handle tablet delete bitmap calculation task's return value unifiedly in CloudEngineCalcDeleteBitmapTask::execute #50019 (#50045)
a2a87f2ecb5 is described below

commit a2a87f2ecb5b3198e884977efaff49eef7e5441a
Author: bobhan1 <[email protected]>
AuthorDate: Tue Apr 22 11:17:47 2025 +0800

    branch-3.0-pick: [Fix](cloud-mow) Handle tablet delete bitmap calculation task's return value unifiedly in CloudEngineCalcDeleteBitmapTask::execute #50019 (#50045)
    
    pick https://github.com/apache/doris/pull/50019
---
 .../cloud/cloud_engine_calc_delete_bitmap_task.cpp |  41 ++++----
 .../cloud/cloud_engine_calc_delete_bitmap_task.h   |   4 +-
 ..._cloud_multi_segments_re_calc_in_publish.groovy | 105 +++++++++++++++------
 3 files changed, 97 insertions(+), 53 deletions(-)

diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp 
b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
index 39c0575c8b1..a55d4f42f66 100644
--- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
+++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
@@ -81,7 +81,7 @@ Status CloudEngineCalcDeleteBitmapTask::execute() {
         for (size_t i = 0; i < partition.tablet_ids.size(); i++) {
             auto tablet_id = partition.tablet_ids[i];
             auto tablet_calc_delete_bitmap_ptr = 
std::make_shared<CloudTabletCalcDeleteBitmapTask>(
-                    _engine, this, tablet_id, transaction_id, version);
+                    _engine, tablet_id, transaction_id, version);
             if (has_compaction_stats) {
                 tablet_calc_delete_bitmap_ptr->set_compaction_stats(
                         partition.base_compaction_cnts[i], 
partition.cumulative_compaction_cnts[i],
@@ -90,10 +90,13 @@ Status CloudEngineCalcDeleteBitmapTask::execute() {
             if (has_tablet_states) {
                 
tablet_calc_delete_bitmap_ptr->set_tablet_state(partition.tablet_states[i]);
             }
-            auto submit_st = token->submit_func([=]() {
+            auto submit_st = token->submit_func([tablet_id, 
tablet_calc_delete_bitmap_ptr, this]() {
                 auto st = tablet_calc_delete_bitmap_ptr->handle();
-                if (!st.ok()) {
+                if (st.ok()) {
+                    add_succ_tablet_id(tablet_id);
+                } else {
                     LOG(WARNING) << "handle calc delete bitmap fail, st=" << 
st.to_string();
+                    add_error_tablet_id(tablet_id, st);
                 }
             });
             VLOG_DEBUG << "submit TabletCalcDeleteBitmapTask for tablet=" << 
tablet_id;
@@ -113,11 +116,11 @@ Status CloudEngineCalcDeleteBitmapTask::execute() {
     return _res;
 }
 
-CloudTabletCalcDeleteBitmapTask::CloudTabletCalcDeleteBitmapTask(
-        CloudStorageEngine& engine, CloudEngineCalcDeleteBitmapTask* 
engine_task, int64_t tablet_id,
-        int64_t transaction_id, int64_t version)
+CloudTabletCalcDeleteBitmapTask::CloudTabletCalcDeleteBitmapTask(CloudStorageEngine&
 engine,
+                                                                 int64_t 
tablet_id,
+                                                                 int64_t 
transaction_id,
+                                                                 int64_t 
version)
         : _engine(engine),
-          _engine_calc_delete_bitmap_task(engine_task),
           _tablet_id(tablet_id),
           _transaction_id(transaction_id),
           _version(version) {
@@ -146,10 +149,8 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
     std::shared_ptr<CloudTablet> tablet = 
std::dynamic_pointer_cast<CloudTablet>(base_tablet);
     if (tablet == nullptr) {
         LOG(WARNING) << "can't get tablet when calculate delete bitmap. 
tablet_id=" << _tablet_id;
-        auto error_st = Status::Error<ErrorCode::PUSH_TABLE_NOT_EXIST>(
+        return Status::Error<ErrorCode::PUSH_TABLE_NOT_EXIST>(
                 "can't get tablet when calculate delete bitmap. tablet_id={}", 
_tablet_id);
-        _engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, 
error_st);
-        return error_st;
     }
     int64_t max_version = tablet->max_version_unlocked();
     int64_t t2 = MonotonicMicros();
@@ -177,18 +178,14 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
         if (!sync_st.ok()) {
             LOG(WARNING) << "failed to sync rowsets. tablet_id=" << _tablet_id
                          << ", txn_id=" << _transaction_id << ", status=" << 
sync_st;
-            _engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, 
sync_st);
             return sync_st;
         }
         if (tablet->tablet_state() != TABLET_RUNNING) [[unlikely]] {
-            _engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id);
             LOG(INFO) << "tablet is under alter process, delete bitmap will be 
calculated later, "
                          "tablet_id: "
                       << _tablet_id << " txn_id: " << _transaction_id
                       << ", request_version=" << _version;
-            return Status::Error<ErrorCode::INVALID_TABLET_STATE>(
-                    "invalid tablet state {}. tablet_id={}", 
tablet->tablet_state(),
-                    tablet->tablet_id());
+            return Status::OK();
         }
     }
     auto sync_rowset_time_us = MonotonicMicros() - t2;
@@ -200,10 +197,7 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
             LOG(WARNING) << "version not continuous, current max version=" << 
max_version
                          << ", request_version=" << _version << " tablet_id=" 
<< _tablet_id;
         }
-        auto error_st =
-                Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, 
false>("version not continuous");
-        _engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, 
error_st);
-        return error_st;
+        return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, 
false>("version not continuous");
     }
 
     RowsetSharedPtr rowset;
@@ -219,7 +213,6 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
     if (status != Status::OK()) {
         LOG(WARNING) << "failed to get tablet txn info. tablet_id=" << 
_tablet_id
                      << ", txn_id=" << _transaction_id << ", status=" << 
status;
-        _engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, 
status);
         return status;
     }
 
@@ -270,6 +263,10 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
             // delete bitmap cache missed, should re-calculate delete bitmaps 
between segments
             std::vector<segment_v2::SegmentSharedPtr> segments;
             
RETURN_IF_ERROR(std::static_pointer_cast<BetaRowset>(rowset)->load_segments(&segments));
+            
DBUG_EXECUTE_IF("_handle_rowset.inject.before.calc_between_segments", {
+                LOG_INFO("inject error when 
CloudTabletCalcDeleteBitmapTask::_handle_rowset");
+                return Status::MemoryLimitExceeded("injected 
MemoryLimitExceeded error");
+            });
             RETURN_IF_ERROR(
                     tablet->calc_delete_bitmap_between_segments(rowset, 
segments, delete_bitmap));
         }
@@ -281,11 +278,9 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
         LOG(WARNING) << "failed to calculate delete bitmap. rowset_id=" << 
rowset->rowset_id()
                      << ", tablet_id=" << _tablet_id << ", txn_id=" << 
_transaction_id
                      << ", status=" << status;
-        _engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, 
status);
         return status;
     }
 
-    _engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id);
     LOG(INFO) << "calculate delete bitmap successfully on tablet"
               << ", table_id=" << tablet->table_id() << ", transaction_id=" << 
_transaction_id
               << ", tablet_id=" << tablet->tablet_id() << ", num_rows=" << 
rowset->num_rows()
@@ -293,7 +288,7 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
               << ", sync_rowset_time_us=" << sync_rowset_time_us
               << ", update_delete_bitmap_time_us=" << 
update_delete_bitmap_time_us
               << ", res=" << status;
-    return status;
+    return Status::OK();
 }
 
 #include "common/compile_check_end.h"
diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h 
b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h
index 62bd91b0a8a..cbff46a0b2c 100644
--- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h
+++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h
@@ -33,8 +33,7 @@ class MemTrackerLimiter;
 
 class CloudTabletCalcDeleteBitmapTask {
 public:
-    CloudTabletCalcDeleteBitmapTask(CloudStorageEngine& engine,
-                                    CloudEngineCalcDeleteBitmapTask* 
engine_task, int64_t tablet_id,
+    CloudTabletCalcDeleteBitmapTask(CloudStorageEngine& engine, int64_t 
tablet_id,
                                     int64_t transaction_id, int64_t version);
     ~CloudTabletCalcDeleteBitmapTask() = default;
 
@@ -46,7 +45,6 @@ public:
 
 private:
     CloudStorageEngine& _engine;
-    CloudEngineCalcDeleteBitmapTask* _engine_calc_delete_bitmap_task;
 
     int64_t _tablet_id;
     int64_t _transaction_id;
diff --git 
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_multi_segments_re_calc_in_publish.groovy
 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_multi_segments_re_calc_in_publish.groovy
index b741a6e9986..e6166d6c41a 100644
--- 
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_multi_segments_re_calc_in_publish.groovy
+++ 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_multi_segments_re_calc_in_publish.groovy
@@ -58,6 +58,36 @@ suite("test_cloud_multi_segments_re_calc_in_publish", 
"nonConcurrent") {
         assert lastRowsetSegmentNum == Integer.parseInt(segmentNumStr)
     }
 
+    def loadMultiSegmentData = { tableName, rows, succ, String err="" ->
+        // load data that will have multi segments and there are duplicate 
keys between segments
+        String content = ""
+        (1..rows).each {
+            content += "${it},${it},${it}\n"
+        }
+        content += content
+        streamLoad {
+            table "${tableName}"
+            set 'column_separator', ','
+            inputStream new ByteArrayInputStream(content.getBytes())
+            time 30000
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                def json = parseJson(result)
+                if (succ) {
+                    assert "success" == json.Status.toLowerCase()
+                    assert rows*2 == json.NumberTotalRows
+                    assert 0 == json.NumberFilteredRows
+                } else {
+                    assert "fail" == json.Status.toLowerCase()
+                    assert json.Message.contains(err)
+                }
+            }
+        }
+    }
+ 
     // to cause multi segments
     def customBeConfig = [
         doris_scanner_row_bytes : 1
@@ -75,33 +105,7 @@ suite("test_cloud_multi_segments_re_calc_in_publish", 
"nonConcurrent") {
 
             Thread.sleep(1000)
 
-            def t1 = Thread.start {
-                // load data that will have multi segments and there are 
duplicate keys between segments
-                String content = ""
-                (1..4096).each {
-                    content += "${it},${it},${it}\n"
-                }
-                content += content
-                streamLoad {
-                    table "${table1}"
-                    set 'column_separator', ','
-                    inputStream new ByteArrayInputStream(content.getBytes())
-                    time 30000 // limit inflight 10s
-
-                    check { result, exception, startTime, endTime ->
-                        if (exception != null) {
-                            throw exception
-                        }
-                        def json = parseJson(result)
-                        assert "success" == json.Status.toLowerCase()
-                        assert 8192 == json.NumberTotalRows
-                        assert 0 == json.NumberFilteredRows
-                    }
-                }
-            }
-
-
-            t1.join()
+            loadMultiSegmentData(table1, 4096, true)
 
             GetDebugPoint().clearDebugPointsForAllBEs()
             Thread.sleep(2000)
@@ -120,4 +124,51 @@ suite("test_cloud_multi_segments_re_calc_in_publish", 
"nonConcurrent") {
             GetDebugPoint().clearDebugPointsForAllFEs()
         }
     }
+
+    // abnormal case, fail when calc between segments
+    def table2 = "test_cloud_multi_segments_re_calc_in_publish_fail"
+    sql "DROP TABLE IF EXISTS ${table2} FORCE;"
+    sql """ CREATE TABLE IF NOT EXISTS ${table2} (
+            `k1` int NOT NULL,
+            `c1` int,
+            `c2` int
+            )UNIQUE KEY(k1)
+        DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES (
+            "enable_unique_key_merge_on_write" = "true",
+            "disable_auto_compaction" = "true",
+            "replication_num" = "1"); """
+
+    sql "insert into ${table2} values(99999,99999,99999);"
+    sql "insert into ${table2} values(88888,88888,88888);"
+    sql "insert into ${table2} values(77777,77777,77777);"
+    sql "sync;"
+    qt_sql "select * from ${table2} order by k1;"
+
+    setBeConfigTemporary(customBeConfig) {
+        try {
+            GetDebugPoint().enableDebugPointForAllBEs("MemTable.need_flush")
+            
GetDebugPoint().enableDebugPointForAllBEs("CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss")
+
+            // fail in calc_delete_bitmap_between_segments
+            
GetDebugPoint().enableDebugPointForAllBEs("_handle_rowset.inject.before.calc_between_segments")
+
+            Thread.sleep(1000)
+
+            loadMultiSegmentData(table2, 4096, false, "injected 
MemoryLimitExceeded error")
+
+            GetDebugPoint().clearDebugPointsForAllBEs()
+            Thread.sleep(2000)
+
+            qt_sql "select count() from ${table2};"
+
+            qt_dup_key_count "select count() from (select k1,count() as cnt 
from ${table2} group by k1 having cnt > 1) A;"
+        } catch(Exception e) {
+            logger.info(e.getMessage())
+            throw e
+        } finally {
+            GetDebugPoint().clearDebugPointsForAllBEs()
+            GetDebugPoint().clearDebugPointsForAllFEs()
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to