This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f9c7c035cfd [Fix](merge-on-write) Fix FE may use the staled response 
to wrongly commit txn (#39018)
f9c7c035cfd is described below

commit f9c7c035cfdeba58635b102976637761582398cc
Author: bobhan1 <[email protected]>
AuthorDate: Sat Aug 10 09:31:56 2024 +0800

    [Fix](merge-on-write) Fix FE may use the staled response to wrongly commit 
txn (#39018)
    
    ## Problem
    
    Consider the following scenarios for merge-on-write tables in cloud mode:
    ### Scenario 1: Load-Load Conflict
    1. load txn1 tries to commit version n and gets the delete bitmap update
    lock
    2. load txn1 begins calculating the delete bitmap on BEs; this is a heavy
    computation that takes a long time
    3. load txn2 tries to commit version n and gets the delete bitmap update
    lock because load txn1's delete bitmap update lock has expired
    4. load txn1's delete bitmap update lock expires and load txn2 gets the
    delete bitmap update lock
    5. load txn2 commits successfully with version n and releases the delete
    bitmap update lock
    6. load txn1 fails to commit due to a timeout of the delete bitmap
    calculation
    7. load txn1 retries the commit process with version n+1, gets the delete
    bitmap update lock and sends a delete bitmap calculation task to BEs
    8. BE fails to register this new calculation task because a task with
    the same signature (txn_id) is still running in the task_worker_pool
    9. BE finishes the previous delete bitmap calculation (the one started
    for version n) and reports success status to FE
    10. load txn1 commits successfully with version n+1
    
    As a result, load txn1's delete bitmap was never calculated against
    version n written by load txn2.
    ### Scenario 2: Load-Compaction Conflict
    1. load txn tries to commit and gets the delete bitmap update lock
    2. load txn collects rowset_ids and submits a delete bitmap calculation
    task for the diff rowsets to the thread pool. But the thread pool is full,
    so the task is queued in the thread pool.
    3. load txn's delete bitmap update lock expires and a compaction job on
    the same tablet finishes successfully.
    4. load txn fails to commit due to a timeout of the delete bitmap
    calculation
    5. load txn retries the commit process, gets the delete bitmap update lock
    and sends a delete bitmap calculation task to BEs
    6. BE fails to register this new calculation task because a task with
    the same signature (txn_id) is still running in the task_worker_pool
    7. BE finishes the previous (queued) delete bitmap calculation and
    reports success status to FE
    8. load txn commits successfully
    
    As a result, load txn's delete bitmap was never calculated against the
    rowset produced by the compaction.
    
    ## Solution
    The root cause of the above failures is that when the commit process is
    retried (possibly many times), FE may use a stale success response from
    BEs for a previous attempt and commit the txn. One solution is for FE to
    attach a unique id to each delete bitmap calculation task sent to BE, and
    for BE to echo it in the response so FE can check whether the response
    belongs to the latest task. However, if the delete bitmap calculation
    always takes longer than the delete bitmap calculation timeout, FE will
    retry the commit process indefinitely, causing a livelock.
    
    This PR makes BE echo the compaction stats (to detect load-compaction
    conflicts) and the versions (to detect load-load conflicts) from the task
    request back in its response, and makes FE compare them with those of the
    current task. This tells FE whether any compaction or load finished after
    the current load acquired the delete bitmap update lock, which can happen
    because that lock may expire. If so, the current txn should retry or
    abort. If not, the current txn can commit successfully.
---
 be/src/agent/task_worker_pool.cpp                  |   1 +
 .../cloud/cloud_engine_calc_delete_bitmap_task.cpp |  16 ++-
 be/src/cloud/cloud_meta_mgr.cpp                    |  23 +++-
 be/src/cloud/cloud_tablet.cpp                      |   9 +-
 be/src/cloud/cloud_txn_delete_bitmap_cache.cpp     |  19 ++-
 be/src/cloud/cloud_txn_delete_bitmap_cache.h       |  11 +-
 be/src/olap/base_tablet.cpp                        |  28 +++--
 be/src/olap/base_tablet.h                          |   1 -
 be/src/olap/tablet_meta.cpp                        |  10 ++
 be/src/olap/tablet_meta.h                          |   2 +
 be/src/olap/txn_manager.h                          |  32 +++--
 .../java/org/apache/doris/master/MasterImpl.java   |  11 ++
 .../apache/doris/task/CalcDeleteBitmapTask.java    |   4 +
 gensrc/thrift/MasterService.thrift                 |   2 +
 ...oud_mow_stale_resp_load_compaction_conflict.out |  16 +++
 ...est_cloud_mow_stale_resp_load_load_conflict.out |  16 +++
 ..._mow_stale_resp_load_compaction_conflict.groovy | 129 +++++++++++++++++++++
 ..._cloud_mow_stale_resp_load_load_conflict.groovy |  97 ++++++++++++++++
 18 files changed, 391 insertions(+), 36 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index 7bbd602f571..8a6378794e9 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -2052,6 +2052,7 @@ void calc_delete_bitmap_callback(CloudStorageEngine& 
engine, const TAgentTaskReq
     finish_task_request.__set_signature(req.signature);
     finish_task_request.__set_report_version(s_report_version);
     finish_task_request.__set_error_tablet_ids(error_tablet_ids);
+    
finish_task_request.__set_resp_partitions(calc_delete_bitmap_req.partitions);
 
     finish_task(finish_task_request);
     remove_task_info(req.task_type, req.signature);
diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp 
b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
index 5d1a957d14d..b6c9aa318f3 100644
--- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
+++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
@@ -186,9 +186,10 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
     std::shared_ptr<PartialUpdateInfo> partial_update_info;
     std::shared_ptr<PublishStatus> publish_status;
     int64_t txn_expiration;
+    TxnPublishInfo previous_publish_info;
     Status status = _engine.txn_delete_bitmap_cache().get_tablet_txn_info(
             _transaction_id, _tablet_id, &rowset, &delete_bitmap, &rowset_ids, 
&txn_expiration,
-            &partial_update_info, &publish_status);
+            &partial_update_info, &publish_status, &previous_publish_info);
     if (status != Status::OK()) {
         LOG(WARNING) << "failed to get tablet txn info. tablet_id=" << 
_tablet_id
                      << ", txn_id=" << _transaction_id << ", status=" << 
status;
@@ -204,8 +205,19 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
     txn_info.rowset_ids = rowset_ids;
     txn_info.partial_update_info = partial_update_info;
     txn_info.publish_status = publish_status;
+    txn_info.publish_info = {.publish_version = _version,
+                             .base_compaction_cnt = _ms_base_compaction_cnt,
+                             .cumulative_compaction_cnt = 
_ms_cumulative_compaction_cnt,
+                             .cumulative_point = _ms_cumulative_point};
     auto update_delete_bitmap_time_us = 0;
-    if (txn_info.publish_status && (*(txn_info.publish_status) == 
PublishStatus::SUCCEED)) {
+    if (txn_info.publish_status && (*(txn_info.publish_status) == 
PublishStatus::SUCCEED) &&
+        _version == previous_publish_info.publish_version &&
+        _ms_base_compaction_cnt == previous_publish_info.base_compaction_cnt &&
+        _ms_cumulative_compaction_cnt == 
previous_publish_info.cumulative_compaction_cnt &&
+        _ms_cumulative_point == previous_publish_info.cumulative_point) {
+        // if version or compaction stats can't match, it means that this is a 
retry and there are
+        // compaction or other loads finished successfully on the same tablet. 
So the previous publish
+        // is stale and we should re-calculate the delete bitmap
         LOG(INFO) << "tablet=" << _tablet_id << ",txn=" << _transaction_id
                   << ",publish_status=SUCCEED,not need to recalculate and 
update delete_bitmap.";
     } else {
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index e743ea9b12c..88725b17786 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -559,14 +559,29 @@ bool 
CloudMetaMgr::sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, int64
         }
         txn_processed.insert(txn_id);
         DeleteBitmapPtr tmp_delete_bitmap;
-        RowsetIdUnorderedSet tmp_rowset_ids;
         std::shared_ptr<PublishStatus> publish_status =
                 std::make_shared<PublishStatus>(PublishStatus::INIT);
         CloudStorageEngine& engine = 
ExecEnv::GetInstance()->storage_engine().to_cloud();
         Status status = engine.txn_delete_bitmap_cache().get_delete_bitmap(
-                txn_id, tablet->tablet_id(), &tmp_delete_bitmap, 
&tmp_rowset_ids, &publish_status);
-        if (status.ok() && *(publish_status.get()) == PublishStatus::SUCCEED) {
-            delete_bitmap->merge(*tmp_delete_bitmap);
+                txn_id, tablet->tablet_id(), &tmp_delete_bitmap, nullptr, 
&publish_status);
+        // CloudMetaMgr::sync_tablet_delete_bitmap_by_cache() is called after 
we sync rowsets from meta services.
+        // If the control flows reaches here, it's gauranteed that the rowsets 
is commited in meta services, so we can
+        // use the delete bitmap from cache directly if *publish_status == 
PublishStatus::SUCCEED without checking other
+        // stats(version or compaction stats)
+        if (status.ok() && *publish_status == PublishStatus::SUCCEED) {
+            // tmp_delete_bitmap contains sentinel marks, we should remove it 
before merge it to delete bitmap.
+            // Also, the version of delete bitmap key in tmp_delete_bitmap is 
DeleteBitmap::TEMP_VERSION_COMMON,
+            // we should replace it with the rowset's real version
+            DCHECK(rs_meta.start_version() == rs_meta.end_version());
+            int64_t rowset_version = rs_meta.start_version();
+            for (const auto& [delete_bitmap_key, bitmap_value] : 
tmp_delete_bitmap->delete_bitmap) {
+                // skip sentinel mark, which is used for delete bitmap 
correctness check
+                if (std::get<1>(delete_bitmap_key) != 
DeleteBitmap::INVALID_SEGMENT_ID) {
+                    delete_bitmap->merge({std::get<0>(delete_bitmap_key),
+                                          std::get<1>(delete_bitmap_key), 
rowset_version},
+                                         bitmap_value);
+                }
+            }
             
engine.txn_delete_bitmap_cache().remove_unused_tablet_txn_info(txn_id,
                                                                            
tablet->tablet_id());
         } else {
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 2c6b841be54..7f308ddb7be 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -680,8 +680,13 @@ Status CloudTablet::save_delete_bitmap(const 
TabletTxnInfo* txn_info, int64_t tx
 
     RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(
             *this, txn_id, COMPACTION_DELETE_BITMAP_LOCK_ID, 
new_delete_bitmap.get()));
-    _engine.txn_delete_bitmap_cache().update_tablet_txn_info(
-            txn_id, tablet_id(), new_delete_bitmap, cur_rowset_ids, 
PublishStatus::SUCCEED);
+
+    // store the delete bitmap with sentinel marks in txn_delete_bitmap_cache 
because if the txn is retried for some reason,
+    // it will use the delete bitmap from txn_delete_bitmap_cache when 
re-calculating the delete bitmap, during which it will do
+    // delete bitmap correctness check. If we store the new_delete_bitmap, the 
delete bitmap correctness check will fail
+    _engine.txn_delete_bitmap_cache().update_tablet_txn_info(txn_id, 
tablet_id(), delete_bitmap,
+                                                             cur_rowset_ids, 
PublishStatus::SUCCEED,
+                                                             
txn_info->publish_info);
 
     return Status::OK();
 }
diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp 
b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
index 583992e76f7..c6a3b54edc3 100644
--- a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
+++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
@@ -27,6 +27,7 @@
 #include "cpp/sync_point.h"
 #include "olap/olap_common.h"
 #include "olap/tablet_meta.h"
+#include "olap/txn_manager.h"
 
 namespace doris {
 
@@ -54,7 +55,7 @@ Status CloudTxnDeleteBitmapCache::get_tablet_txn_info(
         TTransactionId transaction_id, int64_t tablet_id, RowsetSharedPtr* 
rowset,
         DeleteBitmapPtr* delete_bitmap, RowsetIdUnorderedSet* rowset_ids, 
int64_t* txn_expiration,
         std::shared_ptr<PartialUpdateInfo>* partial_update_info,
-        std::shared_ptr<PublishStatus>* publish_status) {
+        std::shared_ptr<PublishStatus>* publish_status, TxnPublishInfo* 
previous_publish_info) {
     {
         std::shared_lock<std::shared_mutex> rlock(_rwlock);
         TxnKey key(transaction_id, tablet_id);
@@ -68,6 +69,7 @@ Status CloudTxnDeleteBitmapCache::get_tablet_txn_info(
         *txn_expiration = iter->second.txn_expiration;
         *partial_update_info = iter->second.partial_update_info;
         *publish_status = iter->second.publish_status;
+        *previous_publish_info = iter->second.publish_info;
     }
     RETURN_IF_ERROR(
             get_delete_bitmap(transaction_id, tablet_id, delete_bitmap, 
rowset_ids, nullptr));
@@ -96,7 +98,9 @@ Status CloudTxnDeleteBitmapCache::get_delete_bitmap(
             handle == nullptr ? nullptr : 
reinterpret_cast<DeleteBitmapCacheValue*>(value(handle));
     if (val) {
         *delete_bitmap = val->delete_bitmap;
-        *rowset_ids = val->rowset_ids;
+        if (rowset_ids) {
+            *rowset_ids = val->rowset_ids;
+        }
         // must call release handle to reduce the reference count,
         // otherwise there will be memory leak
         release(handle);
@@ -153,12 +157,17 @@ void 
CloudTxnDeleteBitmapCache::update_tablet_txn_info(TTransactionId transactio
                                                        int64_t tablet_id,
                                                        DeleteBitmapPtr 
delete_bitmap,
                                                        const 
RowsetIdUnorderedSet& rowset_ids,
-                                                       PublishStatus 
publish_status) {
+                                                       PublishStatus 
publish_status,
+                                                       TxnPublishInfo 
publish_info) {
     {
         std::unique_lock<std::shared_mutex> wlock(_rwlock);
         TxnKey txn_key(transaction_id, tablet_id);
-        CHECK(_txn_map.count(txn_key) > 0);
-        *(_txn_map[txn_key].publish_status.get()) = publish_status;
+        CHECK(_txn_map.contains(txn_key));
+        TxnVal& txn_val = _txn_map[txn_key];
+        *(txn_val.publish_status) = publish_status;
+        if (publish_status == PublishStatus::SUCCEED) {
+            txn_val.publish_info = publish_info;
+        }
     }
     std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id);
     CacheKey key(key_str);
diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.h 
b/be/src/cloud/cloud_txn_delete_bitmap_cache.h
index 5012db6b8e5..75577ae2e3f 100644
--- a/be/src/cloud/cloud_txn_delete_bitmap_cache.h
+++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.h
@@ -42,7 +42,8 @@ public:
                                RowsetSharedPtr* rowset, DeleteBitmapPtr* 
delete_bitmap,
                                RowsetIdUnorderedSet* rowset_ids, int64_t* 
txn_expiration,
                                std::shared_ptr<PartialUpdateInfo>* 
partial_update_info,
-                               std::shared_ptr<PublishStatus>* publish_status);
+                               std::shared_ptr<PublishStatus>* publish_status,
+                               TxnPublishInfo* previous_publish_info);
 
     void set_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id,
                              DeleteBitmapPtr delete_bitmap, const 
RowsetIdUnorderedSet& rowset_ids,
@@ -52,12 +53,16 @@ public:
     void update_tablet_txn_info(TTransactionId transaction_id, int64_t 
tablet_id,
                                 DeleteBitmapPtr delete_bitmap,
                                 const RowsetIdUnorderedSet& rowset_ids,
-                                PublishStatus publish_status);
+                                PublishStatus publish_status, TxnPublishInfo 
publish_info = {});
 
     void remove_expired_tablet_txn_info();
 
     void remove_unused_tablet_txn_info(TTransactionId transaction_id, int64_t 
tablet_id);
 
+    // !!!ATTENTION!!!: the delete bitmap stored in CloudTxnDeleteBitmapCache 
contains sentinel marks,
+    // and the version in BitmapKey is DeleteBitmap::TEMP_VERSION_COMMON.
+    // when using delete bitmap from this cache, the caller should manually 
remove these marks if don't need it
+    // and should replace versions in BitmapKey by the correct version
     Status get_delete_bitmap(TTransactionId transaction_id, int64_t tablet_id,
                              DeleteBitmapPtr* delete_bitmap, 
RowsetIdUnorderedSet* rowset_ids,
                              std::shared_ptr<PublishStatus>* publish_status);
@@ -88,6 +93,8 @@ private:
         int64_t txn_expiration;
         std::shared_ptr<PartialUpdateInfo> partial_update_info;
         std::shared_ptr<PublishStatus> publish_status = nullptr;
+        // used to determine if the retry needs to re-calculate the delete 
bitmap
+        TxnPublishInfo publish_info;
         TxnVal() : txn_expiration(0) {};
         TxnVal(RowsetSharedPtr rowset_, int64_t txn_expiration_,
                std::shared_ptr<PartialUpdateInfo> partial_update_info_,
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 4ca36684383..0fb12dd074f 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -1208,17 +1208,6 @@ Status 
BaseTablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap
     return Status::OK();
 }
 
-void BaseTablet::_remove_sentinel_mark_from_delete_bitmap(DeleteBitmapPtr 
delete_bitmap) {
-    for (auto it = delete_bitmap->delete_bitmap.begin(), end = 
delete_bitmap->delete_bitmap.end();
-         it != end;) {
-        if (std::get<1>(it->first) == DeleteBitmap::INVALID_SEGMENT_ID) {
-            it = delete_bitmap->delete_bitmap.erase(it);
-        } else {
-            ++it;
-        }
-    }
-}
-
 Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, 
TabletTxnInfo* txn_info,
                                         int64_t txn_id, int64_t 
txn_expiration) {
     SCOPED_BVAR_LATENCY(g_tablet_update_delete_bitmap_latency);
@@ -1296,6 +1285,21 @@ Status BaseTablet::update_delete_bitmap(const 
BaseTabletSPtr& self, TabletTxnInf
         }
     }
 
+    DBUG_EXECUTE_IF("BaseTablet::update_delete_bitmap.enable_spin_wait", {
+        auto token = dp->param<std::string>("token", "invalid_token");
+        while 
(DebugPoints::instance()->is_enable("BaseTablet::update_delete_bitmap.block")) {
+            auto block_dp = DebugPoints::instance()->get_debug_point(
+                    "BaseTablet::update_delete_bitmap.block");
+            if (block_dp) {
+                auto wait_token = block_dp->param<std::string>("wait_token", 
"");
+                if (wait_token != token) {
+                    break;
+                }
+            }
+            std::this_thread::sleep_for(std::chrono::milliseconds(50));
+        }
+    });
+
     if (!rowsets_skip_alignment.empty()) {
         auto token = self->calc_delete_bitmap_executor()->create_token();
         // set rowset_writer to nullptr to skip the alignment process
@@ -1544,7 +1548,7 @@ Status BaseTablet::update_delete_bitmap_without_lock(
         if (!st.ok()) {
             LOG(WARNING) << fmt::format("delete bitmap correctness check 
failed in publish phase!");
         }
-        self->_remove_sentinel_mark_from_delete_bitmap(delete_bitmap);
+        delete_bitmap->remove_sentinel_marks();
     }
     for (auto& iter : delete_bitmap->delete_bitmap) {
         self->_tablet_meta->delete_bitmap().merge(
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index f958d398fd5..d329c786fc9 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -289,7 +289,6 @@ protected:
     static void _rowset_ids_difference(const RowsetIdUnorderedSet& cur,
                                        const RowsetIdUnorderedSet& pre,
                                        RowsetIdUnorderedSet* to_add, 
RowsetIdUnorderedSet* to_del);
-    static void _remove_sentinel_mark_from_delete_bitmap(DeleteBitmapPtr 
delete_bitmap);
 
     Status _capture_consistent_rowsets_unlocked(const std::vector<Version>& 
version_path,
                                                 std::vector<RowsetSharedPtr>* 
rowsets) const;
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index a3526781ddd..ed9a446551d 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -1080,6 +1080,16 @@ bool DeleteBitmap::contains_agg_without_cache(const 
BitmapKey& bmk, uint32_t row
     return false;
 }
 
+void DeleteBitmap::remove_sentinel_marks() {
+    for (auto it = delete_bitmap.begin(), end = delete_bitmap.end(); it != 
end;) {
+        if (std::get<1>(it->first) == DeleteBitmap::INVALID_SEGMENT_ID) {
+            it = delete_bitmap.erase(it);
+        } else {
+            ++it;
+        }
+    }
+}
+
 int DeleteBitmap::set(const BitmapKey& bmk, const roaring::Roaring& 
segment_delete_bitmap) {
     std::lock_guard l(lock);
     auto [_, inserted] = delete_bitmap.insert_or_assign(bmk, 
segment_delete_bitmap);
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 32c6fde568c..bb6b5b8cd51 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -516,6 +516,8 @@ public:
      */
     std::shared_ptr<roaring::Roaring> get_agg(const BitmapKey& bmk) const;
 
+    void remove_sentinel_marks();
+
     class AggCachePolicy : public LRUCachePolicyTrackingManual {
     public:
         AggCachePolicy(size_t capacity)
diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h
index 5a0a74c76a2..5944bbf0fc3 100644
--- a/be/src/olap/txn_manager.h
+++ b/be/src/olap/txn_manager.h
@@ -63,6 +63,13 @@ enum class TxnState {
 };
 enum class PublishStatus { INIT = 0, PREPARE = 1, SUCCEED = 2 };
 
+struct TxnPublishInfo {
+    int64_t publish_version {-1};
+    int64_t base_compaction_cnt {-1};
+    int64_t cumulative_compaction_cnt {-1};
+    int64_t cumulative_point {-1};
+};
+
 struct TabletTxnInfo {
     PUniqueId load_id;
     RowsetSharedPtr rowset;
@@ -74,24 +81,33 @@ struct TabletTxnInfo {
     int64_t creation_time;
     bool ingest {false};
     std::shared_ptr<PartialUpdateInfo> partial_update_info;
+
+    // for cloud only, used to determine if a retry 
CloudTabletCalcDeleteBitmapTask
+    // needs to re-calculate the delete bitmap
     std::shared_ptr<PublishStatus> publish_status;
-    TxnState state {TxnState::PREPARED};
+    TxnPublishInfo publish_info;
 
+    TxnState state {TxnState::PREPARED};
     TabletTxnInfo() = default;
 
     TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset)
-            : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()) {}
+            : load_id(std::move(load_id)),
+              rowset(std::move(rowset)),
+              creation_time(UnixSeconds()) {}
 
     TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool ingest_arg)
-            : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()), 
ingest(ingest_arg) {}
+            : load_id(std::move(load_id)),
+              rowset(std::move(rowset)),
+              creation_time(UnixSeconds()),
+              ingest(ingest_arg) {}
 
     TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool 
merge_on_write,
-                  DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& 
ids)
-            : load_id(load_id),
-              rowset(rowset),
+                  DeleteBitmapPtr delete_bitmap, RowsetIdUnorderedSet ids)
+            : load_id(std::move(load_id)),
+              rowset(std::move(rowset)),
               unique_key_merge_on_write(merge_on_write),
-              delete_bitmap(delete_bitmap),
-              rowset_ids(ids),
+              delete_bitmap(std::move(delete_bitmap)),
+              rowset_ids(std::move(ids)),
               creation_time(UnixSeconds()) {}
 
     void prepare() { state = TxnState::PREPARED; }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index 0eef0c684d6..4e01f3a5058 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -671,6 +671,17 @@ public class MasterImpl {
                         "backend: " + task.getBackendId() + ", 
error_tablet_size: "
                                 + request.getErrorTabletIdsSize() + ", 
err_msg: "
                                 + 
request.getTaskStatus().getErrorMsgs().toString());
+            } else if (request.isSetRespPartitions()
+                    && 
calcDeleteBitmapTask.isFinishRequestStale(request.getRespPartitions())) {
+                LOG.warn("get staled response from backend: {}, report 
version: {}. calcDeleteBitmapTask's"
+                        + "partitionInfos: {}. response's partitionInfos: {}", 
task.getBackendId(),
+                                request.getReportVersion(),
+                                        
calcDeleteBitmapTask.getCalcDeleteBimapPartitionInfos().toString(),
+                                                
request.getRespPartitions().toString());
+                // DELETE_BITMAP_LOCK_ERROR will be retried
+                
calcDeleteBitmapTask.countDownToZero(TStatusCode.DELETE_BITMAP_LOCK_ERROR,
+                        "get staled response from backend " + 
task.getBackendId() + ", report version: "
+                                + request.getReportVersion());
             } else {
                 calcDeleteBitmapTask.countDownLatch(task.getBackendId(), 
calcDeleteBitmapTask.getTransactionId());
                 if (LOG.isDebugEnabled()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/task/CalcDeleteBitmapTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/CalcDeleteBitmapTask.java
index 4188cf61849..49a653c7a32 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/CalcDeleteBitmapTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/CalcDeleteBitmapTask.java
@@ -79,6 +79,10 @@ public class CalcDeleteBitmapTask extends AgentTask  {
         }
     }
 
+    public boolean isFinishRequestStale(List<TCalcDeleteBitmapPartitionInfo> 
respPartitionInfos) {
+        return !respPartitionInfos.equals(partitionInfos);
+    }
+
     public void setLatch(MarkedCountDownLatch<Long, Long> latch) {
         this.latch = latch;
     }
diff --git a/gensrc/thrift/MasterService.thrift 
b/gensrc/thrift/MasterService.thrift
index 1db7a109f55..ecedf0ee1af 100644
--- a/gensrc/thrift/MasterService.thrift
+++ b/gensrc/thrift/MasterService.thrift
@@ -72,6 +72,8 @@ struct TFinishTaskRequest {
     17: optional map<Types.TTabletId, Types.TVersion> succ_tablets
     18: optional map<i64, i64> table_id_to_delta_num_rows
     19: optional map<i64, map<i64, i64>> 
table_id_to_tablet_id_to_delta_num_rows
+    // for Cloud mow table only, used by FE to check if the response is for 
the latest request
+    20: optional list<AgentService.TCalcDeleteBitmapPartitionInfo> 
resp_partitions;
 }
 
 struct TTablet {
diff --git 
a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_compaction_conflict.out
 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_compaction_conflict.out
new file mode 100644
index 00000000000..09882a909b3
--- /dev/null
+++ 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_compaction_conflict.out
@@ -0,0 +1,16 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+1      1       1
+2      2       2
+3      3       3
+
+-- !sql --
+1      1       1
+2      2       2
+3      3       3
+
+-- !sql --
+1      999     999
+2      888     888
+3      3       3
+
diff --git 
a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_load_conflict.out
 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_load_conflict.out
new file mode 100644
index 00000000000..6fd2178fd94
--- /dev/null
+++ 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_load_conflict.out
@@ -0,0 +1,16 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+1      1       1
+2      2       2
+3      3       3
+
+-- !sql --
+1      666     666
+2      555     555
+3      3       3
+
+-- !sql --
+1      999     999
+2      888     888
+3      3       3
+
diff --git 
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_compaction_conflict.groovy
 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_compaction_conflict.groovy
new file mode 100644
index 00000000000..8f4fa45700b
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_compaction_conflict.groovy
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.junit.Assert
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+
+// Regression test (cloud mode, merge-on-write) for FE committing a txn with a
+// stale calculate-delete-bitmap response in a load-compaction conflict:
+//   1. the first load takes the delete bitmap update lock, and its delete
+//      bitmap calculation on BE is blocked by a debug point;
+//   2. after that lock expires, a full compaction runs on the tablet;
+//   3. the first load's calculation times out and committing its txn is retried;
+//   4. once released, the first load must still produce the expected data.
+suite("test_cloud_mow_stale_resp_load_compaction_conflict", "nonConcurrent") {
+    if (!isCloudMode()) {
+        return
+    }
+
+    def customFeConfig = [
+        delete_bitmap_lock_expiration_seconds : 10,
+        calculate_delete_bitmap_task_timeout_seconds : 15,
+    ]
+    // The waits below are derived from the FE config so that they stay in sync with it.
+    def lockExpirationMs = customFeConfig.delete_bitmap_lock_expiration_seconds * 1000
+    def calcTimeoutMs = customFeConfig.calculate_delete_bitmap_task_timeout_seconds * 1000
+
+    setFeConfigTemporary(customFeConfig) {
+
+        def table1 = "test_cloud_mow_stale_resp_load_compaction_conflict"
+        sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+        sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+                `k1` int NOT NULL,
+                `c1` int,
+                `c2` int
+                )UNIQUE KEY(k1)
+            DISTRIBUTED BY HASH(k1) BUCKETS 1
+            PROPERTIES (
+                "enable_unique_key_merge_on_write" = "true",
+                "disable_auto_compaction" = "true",
+                "replication_num" = "1"); """
+
+        sql "insert into ${table1} values(1,1,1);"
+        sql "insert into ${table1} values(2,2,2);"
+        sql "insert into ${table1} values(3,3,3);"
+        sql "sync;"
+        order_qt_sql "select * from ${table1};"
+
+        // Find the table's only tablet and the backend hosting it: compaction is
+        // triggered through that backend's HTTP port.
+        def beNodes = sql_return_maparray("show backends;")
+        def tabletStat = sql_return_maparray("show tablets from ${table1};").get(0)
+        def tabletBackendId = tabletStat.BackendId
+        def tabletId = tabletStat.TabletId
+        def tabletBackend = beNodes.find { it.BackendId == tabletBackendId }
+        Assert.assertNotNull(tabletBackend)
+        logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}");
+
+        try {
+            GetDebugPoint().clearDebugPointsForAllFEs()
+            GetDebugPoint().clearDebugPointsForAllBEs()
+
+            // Block the first load's delete bitmap calculation on BE. Presumably the
+            // "block" debug point spins while the load's token equals wait_token
+            // ("token1"); re-enabling it without wait_token below releases the load.
+            GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.enable_spin_wait", [token: "token1"])
+            GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.block", [wait_token: "token1"])
+
+            // The first load runs in a background thread because it stays blocked;
+            // any failure is captured and rethrown after join() instead of being lost.
+            def firstLoadError = null
+            def t1 = Thread.start {
+                try {
+                    sql "insert into ${table1} values(1,999,999),(2,888,888);"
+                } catch (Throwable ex) {
+                    firstLoadError = ex
+                }
+            }
+
+            // Wait until the first load's delete bitmap update lock has expired (with a
+            // 1s margin) so that the compaction can take the delete bitmap update lock.
+            Thread.sleep(lockExpirationMs + 1000)
+
+            // Trigger a full compaction on the tablet.
+            logger.info("trigger full compaction on BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}")
+            def (code, out, err) = be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId)
+            logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+            Assert.assertEquals(0, code)
+            def compactJson = parseJson(out.trim())
+            Assert.assertEquals("success", compactJson.status.toLowerCase())
+
+            // Wait for the full compaction to complete.
+            Awaitility.await().atMost(3, TimeUnit.SECONDS).pollDelay(200, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS).until(
+                {
+                    (code, out, err) = be_get_compaction_status(tabletBackend.Host, tabletBackend.HttpPort, tabletId)
+                    logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
+                    Assert.assertEquals(0, code)
+                    def compactionStatus = parseJson(out.trim())
+                    Assert.assertEquals("success", compactionStatus.status.toLowerCase())
+                    return !compactionStatus.run_status
+                }
+            )
+            order_qt_sql "select * from ${table1};"
+
+            // Keep waiting until the first load's delete bitmap calculation times out
+            // (Config.calculate_delete_bitmap_task_timeout_seconds), so that the
+            // coordinator BE retries committing the first load's txn.
+            Thread.sleep(calcTimeoutMs)
+
+            // Release the first load and let it finish.
+            GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.block")
+            t1.join()
+            if (firstLoadError != null) {
+                throw firstLoadError
+            }
+
+            Thread.sleep(1000)
+
+            order_qt_sql "select * from ${table1};"
+        } catch (Exception e) {
+            logger.info(e.getMessage())
+            throw e
+        } finally {
+            GetDebugPoint().clearDebugPointsForAllBEs()
+        }
+
+        sql "DROP TABLE IF EXISTS ${table1};"
+    }
+}
diff --git 
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_load_conflict.groovy
 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_load_conflict.groovy
new file mode 100644
index 00000000000..377ff70cf21
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stale_resp_load_load_conflict.groovy
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Regression test (cloud mode, merge-on-write) for FE committing a txn with a
+// stale calculate-delete-bitmap response in a load-load conflict:
+//   1. the first load takes the delete bitmap update lock, and its delete
+//      bitmap calculation on BE is blocked by a debug point;
+//   2. after that lock expires, a second load takes the lock and commits;
+//   3. the first load's calculation times out and committing its txn is retried;
+//   4. once released, the first load must still produce the expected data.
+suite("test_cloud_mow_stale_resp_load_load_conflict", "nonConcurrent") {
+    if (!isCloudMode()) {
+        return
+    }
+
+    def customFeConfig = [
+        delete_bitmap_lock_expiration_seconds : 10,
+        calculate_delete_bitmap_task_timeout_seconds : 15,
+    ]
+    // The waits below are derived from the FE config so that they stay in sync with it.
+    def lockExpirationMs = customFeConfig.delete_bitmap_lock_expiration_seconds * 1000
+    def calcTimeoutMs = customFeConfig.calculate_delete_bitmap_task_timeout_seconds * 1000
+
+    setFeConfigTemporary(customFeConfig) {
+
+        def table1 = "test_cloud_mow_stale_resp_load_load_conflict"
+        sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+        sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+                `k1` int NOT NULL,
+                `c1` int,
+                `c2` int
+                )UNIQUE KEY(k1)
+            DISTRIBUTED BY HASH(k1) BUCKETS 1
+            PROPERTIES (
+                "enable_unique_key_merge_on_write" = "true",
+                "disable_auto_compaction" = "true",
+                "replication_num" = "1"); """
+
+        sql "insert into ${table1} values(1,1,1);"
+        sql "insert into ${table1} values(2,2,2);"
+        sql "insert into ${table1} values(3,3,3);"
+        sql "sync;"
+        order_qt_sql "select * from ${table1};"
+
+        try {
+            GetDebugPoint().clearDebugPointsForAllFEs()
+            GetDebugPoint().clearDebugPointsForAllBEs()
+
+            // Block the first load's delete bitmap calculation on BE. Presumably the
+            // "block" debug point spins while the load's token equals wait_token
+            // ("token1"); re-enabling it without wait_token below releases the load.
+            GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.enable_spin_wait", [token: "token1"])
+            GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.block", [wait_token: "token1"])
+
+            // The first load runs in a background thread because it stays blocked;
+            // any failure is captured and rethrown after join() instead of being lost.
+            def firstLoadError = null
+            def t1 = Thread.start {
+                try {
+                    sql "insert into ${table1} values(1,999,999),(2,888,888);"
+                } catch (Throwable ex) {
+                    firstLoadError = ex
+                }
+            }
+
+            // Wait until the first load's delete bitmap update lock has expired (with a
+            // 1s margin) so that the second load can take the delete bitmap update lock.
+            Thread.sleep(lockExpirationMs + 1000)
+
+            // The second load is tagged with its own token ("token2"); presumably this
+            // keeps it from being held by the "block" debug point, so it can commit.
+            GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.enable_spin_wait", [token: "token2"])
+            Thread.sleep(200)
+
+            sql "insert into ${table1}(k1,c1,c2) values(1,666,666),(2,555,555);"
+
+            order_qt_sql "select * from ${table1};"
+
+            // Keep waiting until the first load's delete bitmap calculation times out
+            // (Config.calculate_delete_bitmap_task_timeout_seconds), so that the
+            // coordinator BE retries committing the first load's txn.
+            Thread.sleep(calcTimeoutMs)
+
+            // Release the first load and let it finish.
+            GetDebugPoint().enableDebugPointForAllBEs("BaseTablet::update_delete_bitmap.block")
+            t1.join()
+            if (firstLoadError != null) {
+                throw firstLoadError
+            }
+
+            Thread.sleep(1000)
+
+            order_qt_sql "select * from ${table1};"
+        } catch (Exception e) {
+            logger.info(e.getMessage())
+            throw e
+        } finally {
+            GetDebugPoint().clearDebugPointsForAllBEs()
+        }
+
+        sql "DROP TABLE IF EXISTS ${table1};"
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to