This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d3714bf5561 [fix](cloud) cloud mode support txn load for mow tables 
(#41932)
d3714bf5561 is described below

commit d3714bf55619fffed08250687f614dcaea271458
Author: meiyi <[email protected]>
AuthorDate: Tue Nov 26 22:55:56 2024 +0800

    [fix](cloud) cloud mode support txn load for mow tables (#41932)
---
 .../cloud/cloud_engine_calc_delete_bitmap_task.cpp | 106 ++++++++++--
 .../cloud/cloud_engine_calc_delete_bitmap_task.h   |   9 +-
 be/src/cloud/cloud_tablet.cpp                      |   6 +-
 be/src/cloud/cloud_tablet.h                        |   3 +-
 be/src/olap/base_tablet.cpp                        |  60 +++++--
 be/src/olap/base_tablet.h                          |  15 +-
 be/src/olap/calc_delete_bitmap_executor.cpp        |   6 +-
 be/src/olap/calc_delete_bitmap_executor.h          |   3 +-
 be/src/olap/tablet.cpp                             |   2 +-
 be/src/olap/tablet.h                               |   3 +-
 be/src/olap/tablet_meta.h                          |   1 +
 be/src/olap/txn_manager.h                          |   6 +
 cloud/src/meta-service/meta_service_txn.cpp        | 181 +++++++++------------
 .../main/java/org/apache/doris/common/Config.java  |   3 +
 .../transaction/CloudGlobalTransactionMgr.java     | 161 ++++++++++++------
 .../apache/doris/transaction/TransactionEntry.java |   9 -
 gensrc/thrift/AgentService.thrift                  |   1 +
 .../suites/insert_p0/transaction/txn_insert.groovy |  11 +-
 .../txn_insert_concurrent_insert_mow.groovy        | 135 +++++++++++++++
 .../txn_insert_concurrent_insert_aggregate.groovy  |   2 +-
 .../txn_insert_concurrent_insert_duplicate.groovy  |   2 +-
 .../txn_insert_concurrent_insert_mor.groovy        |   2 +-
 .../txn_insert_concurrent_insert_mow.groovy        |  13 +-
 .../txn_insert_concurrent_insert_ud.groovy         |   7 +-
 .../txn_insert_concurrent_insert_update.groovy     |   2 +-
 .../txn_insert_with_schema_change.groovy           |   2 +-
 26 files changed, 511 insertions(+), 240 deletions(-)

diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp 
b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
index 6abc3958650..7391449b73f 100644
--- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
+++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
@@ -75,7 +75,7 @@ Status CloudEngineCalcDeleteBitmapTask::execute() {
         for (size_t i = 0; i < partition.tablet_ids.size(); i++) {
             auto tablet_id = partition.tablet_ids[i];
             auto tablet_calc_delete_bitmap_ptr = 
std::make_shared<CloudTabletCalcDeleteBitmapTask>(
-                    _engine, this, tablet_id, transaction_id, version);
+                    _engine, this, tablet_id, transaction_id, version, 
partition.sub_txn_ids);
             if (has_compaction_stats) {
                 tablet_calc_delete_bitmap_ptr->set_compaction_stats(
                         partition.base_compaction_cnts[i], 
partition.cumulative_compaction_cnts[i],
@@ -107,12 +107,13 @@ Status CloudEngineCalcDeleteBitmapTask::execute() {
 
 CloudTabletCalcDeleteBitmapTask::CloudTabletCalcDeleteBitmapTask(
         CloudStorageEngine& engine, CloudEngineCalcDeleteBitmapTask* 
engine_task, int64_t tablet_id,
-        int64_t transaction_id, int64_t version)
+        int64_t transaction_id, int64_t version, const std::vector<int64_t>& 
sub_txn_ids)
         : _engine(engine),
           _engine_calc_delete_bitmap_task(engine_task),
           _tablet_id(tablet_id),
           _transaction_id(transaction_id),
-          _version(version) {
+          _version(version),
+          _sub_txn_ids(sub_txn_ids) {
     _mem_tracker = MemTrackerLimiter::create_shared(
             MemTrackerLimiter::Type::OTHER,
             fmt::format("CloudTabletCalcDeleteBitmapTask#_transaction_id={}", 
_transaction_id));
@@ -189,6 +190,60 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
         return error_st;
     }
 
+    int64_t t3 = MonotonicMicros();
+    Status status;
+    if (_sub_txn_ids.empty()) {
+        status = _handle_rowset(tablet, _version);
+    } else {
+        std::stringstream ss;
+        for (const auto& sub_txn_id : _sub_txn_ids) {
+            ss << sub_txn_id << ", ";
+        }
+        LOG(INFO) << "start calc delete bitmap for txn_id=" << _transaction_id 
<< ", sub_txn_ids=["
+                  << ss.str() << "], table_id=" << tablet->table_id()
+                  << ", partition_id=" << tablet->partition_id() << ", 
tablet_id=" << _tablet_id
+                  << ", start_version=" << _version;
+        std::vector<RowsetSharedPtr> invisible_rowsets;
+        DeleteBitmapPtr tablet_delete_bitmap =
+                
std::make_shared<DeleteBitmap>(tablet->tablet_meta()->delete_bitmap());
+        for (int i = 0; i < _sub_txn_ids.size(); ++i) {
+            int64_t sub_txn_id = _sub_txn_ids[i];
+            int64_t version = _version + i;
+            LOG(INFO) << "start calc delete bitmap for txn_id=" << 
_transaction_id
+                      << ", sub_txn_id=" << sub_txn_id << ", table_id=" << 
tablet->table_id()
+                      << ", partition_id=" << tablet->partition_id() << ", 
tablet_id=" << _tablet_id
+                      << ", start_version=" << _version << ", cur_version=" << 
version;
+            status = _handle_rowset(tablet, version, sub_txn_id, 
&invisible_rowsets,
+                                    tablet_delete_bitmap);
+            if (!status.ok()) {
+                LOG(INFO) << "failed to calculate delete bitmap on tablet"
+                          << ", table_id=" << tablet->table_id()
+                          << ", transaction_id=" << _transaction_id << ", 
sub_txn_id=" << sub_txn_id
+                          << ", tablet_id=" << tablet->tablet_id() << ", start 
version=" << _version
+                          << ", cur_version=" << version << ", status=" << 
status;
+                return status;
+            }
+            DCHECK(invisible_rowsets.size() == i + 1);
+        }
+    }
+    auto total_update_delete_bitmap_time_us = MonotonicMicros() - t3;
+    LOG(INFO) << "calculate delete bitmap successfully on tablet"
+              << ", table_id=" << tablet->table_id() << ", transaction_id=" << 
_transaction_id
+              << ", tablet_id=" << tablet->tablet_id()
+              << ", get_tablet_time_us=" << get_tablet_time_us
+              << ", sync_rowset_time_us=" << sync_rowset_time_us
+              << ", total_update_delete_bitmap_time_us=" << 
total_update_delete_bitmap_time_us
+              << ", res=" << status;
+    return status;
+}
+
+Status CloudTabletCalcDeleteBitmapTask::_handle_rowset(
+        std::shared_ptr<CloudTablet> tablet, int64_t version, int64_t 
sub_txn_id,
+        std::vector<RowsetSharedPtr>* invisible_rowsets,
+        DeleteBitmapPtr tablet_delete_bitmap) const {
+    int64_t transaction_id = sub_txn_id == -1 ? _transaction_id : sub_txn_id;
+    std::string txn_str = "txn_id=" + std::to_string(_transaction_id) +
+                          (sub_txn_id == -1 ? "" : ", sub_txn_id=" + 
std::to_string(sub_txn_id));
     RowsetSharedPtr rowset;
     DeleteBitmapPtr delete_bitmap;
     RowsetIdUnorderedSet rowset_ids;
@@ -197,59 +252,76 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
     int64_t txn_expiration;
     TxnPublishInfo previous_publish_info;
     Status status = _engine.txn_delete_bitmap_cache().get_tablet_txn_info(
-            _transaction_id, _tablet_id, &rowset, &delete_bitmap, &rowset_ids, 
&txn_expiration,
+            transaction_id, _tablet_id, &rowset, &delete_bitmap, &rowset_ids, 
&txn_expiration,
             &partial_update_info, &publish_status, &previous_publish_info);
     if (status != Status::OK()) {
-        LOG(WARNING) << "failed to get tablet txn info. tablet_id=" << 
_tablet_id
-                     << ", txn_id=" << _transaction_id << ", status=" << 
status;
+        LOG(WARNING) << "failed to get tablet txn info. tablet_id=" << 
_tablet_id << ", " << txn_str
+                     << ", status=" << status;
         _engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, 
status);
         return status;
     }
 
     int64_t t3 = MonotonicMicros();
-    rowset->set_version(Version(_version, _version));
+    rowset->set_version(Version(version, version));
     TabletTxnInfo txn_info;
     txn_info.rowset = rowset;
     txn_info.delete_bitmap = delete_bitmap;
     txn_info.rowset_ids = rowset_ids;
     txn_info.partial_update_info = partial_update_info;
     txn_info.publish_status = publish_status;
-    txn_info.publish_info = {.publish_version = _version,
+    txn_info.publish_info = {.publish_version = version,
                              .base_compaction_cnt = _ms_base_compaction_cnt,
                              .cumulative_compaction_cnt = 
_ms_cumulative_compaction_cnt,
                              .cumulative_point = _ms_cumulative_point};
     auto update_delete_bitmap_time_us = 0;
     if (txn_info.publish_status && (*(txn_info.publish_status) == 
PublishStatus::SUCCEED) &&
-        _version == previous_publish_info.publish_version &&
+        version == previous_publish_info.publish_version &&
         _ms_base_compaction_cnt == previous_publish_info.base_compaction_cnt &&
         _ms_cumulative_compaction_cnt == 
previous_publish_info.cumulative_compaction_cnt &&
         _ms_cumulative_point == previous_publish_info.cumulative_point) {
         // if version or compaction stats can't match, it means that this is a 
retry and there are
         // compaction or other loads finished successfully on the same tablet. 
So the previous publish
         // is stale and we should re-calculate the delete bitmap
-        LOG(INFO) << "tablet=" << _tablet_id << ",txn=" << _transaction_id
+        LOG(INFO) << "tablet=" << _tablet_id << ", " << txn_str
                   << ",publish_status=SUCCEED,not need to recalculate and 
update delete_bitmap.";
     } else {
-        status = CloudTablet::update_delete_bitmap(tablet, &txn_info, 
_transaction_id,
-                                                   txn_expiration);
+        if (invisible_rowsets == nullptr) {
+            status = CloudTablet::update_delete_bitmap(tablet, &txn_info, 
transaction_id,
+                                                       txn_expiration);
+        } else {
+            txn_info.is_txn_load = true;
+            txn_info.invisible_rowsets = *invisible_rowsets;
+            txn_info.lock_id = _transaction_id;
+            txn_info.next_visible_version = _version;
+            status = CloudTablet::update_delete_bitmap(tablet, &txn_info, 
transaction_id,
+                                                       txn_expiration, 
tablet_delete_bitmap);
+        }
         update_delete_bitmap_time_us = MonotonicMicros() - t3;
     }
     if (status != Status::OK()) {
         LOG(WARNING) << "failed to calculate delete bitmap. rowset_id=" << 
rowset->rowset_id()
-                     << ", tablet_id=" << _tablet_id << ", txn_id=" << 
_transaction_id
-                     << ", status=" << status;
+                     << ", tablet_id=" << _tablet_id << ", " << txn_str << ", 
status=" << status;
         _engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, 
status);
         return status;
     }
 
     _engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id);
     LOG(INFO) << "calculate delete bitmap successfully on tablet"
-              << ", table_id=" << tablet->table_id() << ", transaction_id=" << 
_transaction_id
+              << ", table_id=" << tablet->table_id() << ", " << txn_str
               << ", tablet_id=" << tablet->tablet_id() << ", num_rows=" << 
rowset->num_rows()
-              << ", get_tablet_time_us=" << get_tablet_time_us
-              << ", sync_rowset_time_us=" << sync_rowset_time_us
               << ", update_delete_bitmap_time_us=" << 
update_delete_bitmap_time_us
               << ", res=" << status;
+    if (invisible_rowsets != nullptr) {
+        invisible_rowsets->push_back(rowset);
+        // see CloudTablet::save_delete_bitmap
+        auto dm = txn_info.delete_bitmap->delete_bitmap;
+        for (auto it = dm.begin(); it != dm.end(); ++it) {
+            if (std::get<1>(it->first) != DeleteBitmap::INVALID_SEGMENT_ID) {
+                tablet_delete_bitmap->merge(
+                        {std::get<0>(it->first), std::get<1>(it->first), 
version}, it->second);
+            }
+        }
+    }
     return status;
 }
 
diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h 
b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h
index e3733d3e696..c70a9cfa390 100644
--- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h
+++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h
@@ -34,7 +34,8 @@ class CloudTabletCalcDeleteBitmapTask {
 public:
     CloudTabletCalcDeleteBitmapTask(CloudStorageEngine& engine,
                                     CloudEngineCalcDeleteBitmapTask* 
engine_task, int64_t tablet_id,
-                                    int64_t transaction_id, int64_t version);
+                                    int64_t transaction_id, int64_t version,
+                                    const std::vector<int64_t>& sub_txn_ids);
     ~CloudTabletCalcDeleteBitmapTask() = default;
 
     void set_compaction_stats(int64_t ms_base_compaction_cnt, int64_t 
ms_cumulative_compaction_cnt,
@@ -43,12 +44,18 @@ public:
     Status handle() const;
 
 private:
+    Status _handle_rowset(std::shared_ptr<CloudTablet> tablet, int64_t version,
+                          int64_t sub_txn_id = -1,
+                          std::vector<RowsetSharedPtr>* invisible_rowsets = 
nullptr,
+                          DeleteBitmapPtr tablet_delete_bitmap = nullptr) 
const;
+
     CloudStorageEngine& _engine;
     CloudEngineCalcDeleteBitmapTask* _engine_calc_delete_bitmap_task;
 
     int64_t _tablet_id;
     int64_t _transaction_id;
     int64_t _version;
+    std::vector<int64_t> _sub_txn_ids;
 
     int64_t _ms_base_compaction_cnt {-1};
     int64_t _ms_cumulative_compaction_cnt {-1};
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index c88b073e964..267c204c0e6 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -690,7 +690,8 @@ CalcDeleteBitmapExecutor* 
CloudTablet::calc_delete_bitmap_executor() {
 
 Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t 
txn_id,
                                        DeleteBitmapPtr delete_bitmap, 
RowsetWriter* rowset_writer,
-                                       const RowsetIdUnorderedSet& 
cur_rowset_ids) {
+                                       const RowsetIdUnorderedSet& 
cur_rowset_ids,
+                                       int64_t lock_id) {
     RowsetSharedPtr rowset = txn_info->rowset;
     int64_t cur_version = rowset->start_version();
     // update delete bitmap info, in order to avoid recalculation when trying 
again
@@ -714,8 +715,9 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* 
txn_info, int64_t tx
         }
     }
 
+    auto ms_lock_id = lock_id == -1 ? txn_id : lock_id;
     RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(
-            *this, txn_id, COMPACTION_DELETE_BITMAP_LOCK_ID, 
new_delete_bitmap.get()));
+            *this, ms_lock_id, COMPACTION_DELETE_BITMAP_LOCK_ID, 
new_delete_bitmap.get()));
 
     // store the delete bitmap with sentinel marks in txn_delete_bitmap_cache 
because if the txn is retried for some reason,
     // it will use the delete bitmap from txn_delete_bitmap_cache when 
re-calculating the delete bitmap, during which it will do
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index 0fde2f5b1d9..80038e569ba 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -170,7 +170,8 @@ public:
 
     Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
                               DeleteBitmapPtr delete_bitmap, RowsetWriter* 
rowset_writer,
-                              const RowsetIdUnorderedSet& cur_rowset_ids) 
override;
+                              const RowsetIdUnorderedSet& cur_rowset_ids,
+                              int64_t lock_id = -1) override;
 
     Status calc_delete_bitmap_for_compaction(const 
std::vector<RowsetSharedPtr>& input_rowsets,
                                              const RowsetSharedPtr& 
output_rowset,
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 2e70e4586cc..a499a27b07f 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -450,7 +450,8 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, 
TabletSchema* latest
                                   RowLocation* row_location, uint32_t version,
                                   
std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
                                   RowsetSharedPtr* rowset, bool with_rowid,
-                                  std::string* encoded_seq_value, 
OlapReaderStatistics* stats) {
+                                  std::string* encoded_seq_value, 
OlapReaderStatistics* stats,
+                                  DeleteBitmapPtr delete_bitmap) {
     SCOPED_BVAR_LATENCY(g_tablet_lookup_rowkey_latency);
     size_t seq_col_length = 0;
     // use the latest tablet schema to decide if the tablet has sequence 
column currently
@@ -467,6 +468,8 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, 
TabletSchema* latest
             Slice(encoded_key.get_data(), encoded_key.get_size() - 
seq_col_length - rowid_length);
     RowLocation loc;
 
+    auto tablet_delete_bitmap =
+            delete_bitmap == nullptr ? _tablet_meta->delete_bitmap_ptr() : 
delete_bitmap;
     for (size_t i = 0; i < specified_rowsets.size(); i++) {
         const auto& rs = specified_rowsets[i];
         const auto& segments_key_bounds = 
rs->rowset_meta()->get_segments_key_bounds();
@@ -501,7 +504,7 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, 
TabletSchema* latest
             if (!s.ok() && !s.is<KEY_ALREADY_EXISTS>()) {
                 return s;
             }
-            if (s.ok() && 
_tablet_meta->delete_bitmap().contains_agg_without_cache(
+            if (s.ok() && tablet_delete_bitmap->contains_agg_without_cache(
                                   {loc.rowset_id, loc.segment_id, version}, 
loc.row_id)) {
                 // if has sequence col, we continue to compare the sequence_id 
of
                 // all rowsets, util we find an existing key.
@@ -535,7 +538,8 @@ Status BaseTablet::calc_delete_bitmap(const BaseTabletSPtr& 
tablet, RowsetShared
                                       const 
std::vector<segment_v2::SegmentSharedPtr>& segments,
                                       const std::vector<RowsetSharedPtr>& 
specified_rowsets,
                                       DeleteBitmapPtr delete_bitmap, int64_t 
end_version,
-                                      CalcDeleteBitmapToken* token, 
RowsetWriter* rowset_writer) {
+                                      CalcDeleteBitmapToken* token, 
RowsetWriter* rowset_writer,
+                                      DeleteBitmapPtr tablet_delete_bitmap) {
     auto rowset_id = rowset->rowset_id();
     if (specified_rowsets.empty() || segments.empty()) {
         LOG(INFO) << "skip to construct delete bitmap tablet: " << 
tablet->tablet_id()
@@ -548,10 +552,11 @@ Status BaseTablet::calc_delete_bitmap(const 
BaseTabletSPtr& tablet, RowsetShared
         const auto& seg = segment;
         if (token != nullptr) {
             RETURN_IF_ERROR(token->submit(tablet, rowset, seg, 
specified_rowsets, end_version,
-                                          delete_bitmap, rowset_writer));
+                                          delete_bitmap, rowset_writer, 
tablet_delete_bitmap));
         } else {
             RETURN_IF_ERROR(tablet->calc_segment_delete_bitmap(
-                    rowset, segment, specified_rowsets, delete_bitmap, 
end_version, rowset_writer));
+                    rowset, segment, specified_rowsets, delete_bitmap, 
end_version, rowset_writer,
+                    tablet_delete_bitmap));
         }
     }
 
@@ -562,7 +567,8 @@ Status 
BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
                                               const 
segment_v2::SegmentSharedPtr& seg,
                                               const 
std::vector<RowsetSharedPtr>& specified_rowsets,
                                               DeleteBitmapPtr delete_bitmap, 
int64_t end_version,
-                                              RowsetWriter* rowset_writer) {
+                                              RowsetWriter* rowset_writer,
+                                              DeleteBitmapPtr 
tablet_delete_bitmap) {
     OlapStopWatch watch;
     auto rowset_id = rowset->rowset_id();
     Version dummy_version(end_version + 1, end_version + 1);
@@ -676,9 +682,16 @@ Status 
BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
             }
 
             RowsetSharedPtr rowset_find;
-            auto st = lookup_row_key(key, rowset_schema.get(), true, 
specified_rowsets, &loc,
-                                     cast_set<uint32_t>(dummy_version.first - 
1), segment_caches,
-                                     &rowset_find);
+            Status st = Status::OK();
+            if (tablet_delete_bitmap == nullptr) {
+                st = lookup_row_key(key, rowset_schema.get(), true, 
specified_rowsets, &loc,
+                                    cast_set<uint32_t>(dummy_version.first - 
1), segment_caches,
+                                    &rowset_find);
+            } else {
+                st = lookup_row_key(key, rowset_schema.get(), true, 
specified_rowsets, &loc,
+                                    cast_set<uint32_t>(dummy_version.first - 
1), segment_caches,
+                                    &rowset_find, true, nullptr, nullptr, 
tablet_delete_bitmap);
+            }
             bool expected_st = st.ok() || st.is<KEY_NOT_FOUND>() || 
st.is<KEY_ALREADY_EXISTS>();
             // It's a defensive DCHECK, we need to exclude some common errors 
to avoid core-dump
             // while stress test
@@ -1351,7 +1364,8 @@ Status 
BaseTablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap
 }
 
 Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, 
TabletTxnInfo* txn_info,
-                                        int64_t txn_id, int64_t 
txn_expiration) {
+                                        int64_t txn_id, int64_t txn_expiration,
+                                        DeleteBitmapPtr tablet_delete_bitmap) {
     SCOPED_BVAR_LATENCY(g_tablet_update_delete_bitmap_latency);
     RowsetIdUnorderedSet cur_rowset_ids;
     RowsetIdUnorderedSet rowset_ids_to_add;
@@ -1380,6 +1394,8 @@ Status BaseTablet::update_delete_bitmap(const 
BaseTabletSPtr& self, TabletTxnInf
     auto t1 = watch.get_elapse_time_us();
 
     {
+        int64_t next_visible_version = txn_info->is_txn_load ? 
txn_info->next_visible_version
+                                                             : 
txn_info->rowset->start_version();
         std::shared_lock meta_rlock(self->_meta_lock);
         // tablet is under alter process. The delete bitmap will be calculated 
after conversion.
         if (self->tablet_state() == TABLET_NOTREADY) {
@@ -1387,7 +1403,7 @@ Status BaseTablet::update_delete_bitmap(const 
BaseTabletSPtr& self, TabletTxnInf
                       << self->tablet_id();
             return Status::OK();
         }
-        RETURN_IF_ERROR(self->get_all_rs_id_unlocked(cur_version - 1, 
&cur_rowset_ids));
+        RETURN_IF_ERROR(self->get_all_rs_id_unlocked(next_visible_version - 1, 
&cur_rowset_ids));
     }
     auto t2 = watch.get_elapse_time_us();
 
@@ -1402,6 +1418,15 @@ Status BaseTablet::update_delete_bitmap(const 
BaseTabletSPtr& self, TabletTxnInf
         std::shared_lock meta_rlock(self->_meta_lock);
         specified_rowsets = self->get_rowset_by_ids(&rowset_ids_to_add);
     }
+    if (txn_info->is_txn_load) {
+        for (auto invisible_rowset : txn_info->invisible_rowsets) {
+            specified_rowsets.emplace_back(invisible_rowset);
+        }
+        std::sort(specified_rowsets.begin(), specified_rowsets.end(),
+                  [](RowsetSharedPtr& lhs, RowsetSharedPtr& rhs) {
+                      return lhs->end_version() > rhs->end_version();
+                  });
+    }
     auto t3 = watch.get_elapse_time_us();
 
     // If a rowset is produced by compaction before the commit phase of the 
partial update load
@@ -1446,7 +1471,8 @@ Status BaseTablet::update_delete_bitmap(const 
BaseTabletSPtr& self, TabletTxnInf
         auto token = self->calc_delete_bitmap_executor()->create_token();
         // set rowset_writer to nullptr to skip the alignment process
         RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments, 
rowsets_skip_alignment,
-                                           delete_bitmap, cur_version - 1, 
token.get(), nullptr));
+                                           delete_bitmap, cur_version - 1, 
token.get(), nullptr,
+                                           tablet_delete_bitmap));
         RETURN_IF_ERROR(token->wait());
     }
 
@@ -1454,13 +1480,14 @@ Status BaseTablet::update_delete_bitmap(const 
BaseTabletSPtr& self, TabletTxnInf
     // Otherwise, it will be submitted to the thread pool for calculation.
     if (segments.size() <= 1) {
         RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments, 
specified_rowsets, delete_bitmap,
-                                           cur_version - 1, nullptr, 
transient_rs_writer.get()));
+                                           cur_version - 1, nullptr, 
transient_rs_writer.get(),
+                                           tablet_delete_bitmap));
 
     } else {
         auto token = self->calc_delete_bitmap_executor()->create_token();
         RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments, 
specified_rowsets, delete_bitmap,
-                                           cur_version - 1, token.get(),
-                                           transient_rs_writer.get()));
+                                           cur_version - 1, token.get(), 
transient_rs_writer.get(),
+                                           tablet_delete_bitmap));
         RETURN_IF_ERROR(token->wait());
     }
 
@@ -1511,8 +1538,9 @@ Status BaseTablet::update_delete_bitmap(const 
BaseTabletSPtr& self, TabletTxnInf
             segments.begin(), segments.end(), 0,
             [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum 
+= s->num_rows(); });
     auto t5 = watch.get_elapse_time_us();
+    int64_t lock_id = txn_info->is_txn_load ? txn_info->lock_id : -1;
     RETURN_IF_ERROR(self->save_delete_bitmap(txn_info, txn_id, delete_bitmap,
-                                             transient_rs_writer.get(), 
cur_rowset_ids));
+                                             transient_rs_writer.get(), 
cur_rowset_ids, lock_id));
     LOG(INFO) << "[Publish] construct delete bitmap tablet: " << 
self->tablet_id()
               << ", rowset_ids to add: " << rowset_ids_to_add.size()
               << ", rowset_ids to del: " << rowset_ids_to_del.size()
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index b6fc953e460..f961f4c49ee 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -156,7 +156,8 @@ public:
                           std::vector<std::unique_ptr<SegmentCacheHandle>>& 
segment_caches,
                           RowsetSharedPtr* rowset = nullptr, bool with_rowid = 
true,
                           std::string* encoded_seq_value = nullptr,
-                          OlapReaderStatistics* stats = nullptr);
+                          OlapReaderStatistics* stats = nullptr,
+                          DeleteBitmapPtr tablet_delete_bitmap = nullptr);
 
     // calc delete bitmap when flush memtable, use a fake version to calc
     // For example, cur max version is 5, and we use version 6 to calc but
@@ -169,13 +170,15 @@ public:
                                      const std::vector<RowsetSharedPtr>& 
specified_rowsets,
                                      DeleteBitmapPtr delete_bitmap, int64_t 
version,
                                      CalcDeleteBitmapToken* token,
-                                     RowsetWriter* rowset_writer = nullptr);
+                                     RowsetWriter* rowset_writer = nullptr,
+                                     DeleteBitmapPtr tablet_delete_bitmap = 
nullptr);
 
     Status calc_segment_delete_bitmap(RowsetSharedPtr rowset,
                                       const segment_v2::SegmentSharedPtr& seg,
                                       const std::vector<RowsetSharedPtr>& 
specified_rowsets,
                                       DeleteBitmapPtr delete_bitmap, int64_t 
end_version,
-                                      RowsetWriter* rowset_writer);
+                                      RowsetWriter* rowset_writer,
+                                      DeleteBitmapPtr tablet_delete_bitmap = 
nullptr);
 
     Status calc_delete_bitmap_between_segments(
             RowsetSharedPtr rowset, const 
std::vector<segment_v2::SegmentSharedPtr>& segments,
@@ -235,11 +238,13 @@ public:
             int64_t txn_expiration = 0) = 0;
 
     static Status update_delete_bitmap(const BaseTabletSPtr& self, 
TabletTxnInfo* txn_info,
-                                       int64_t txn_id, int64_t txn_expiration 
= 0);
+                                       int64_t txn_id, int64_t txn_expiration 
= 0,
+                                       DeleteBitmapPtr tablet_delete_bitmap = 
nullptr);
 
     virtual Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t 
txn_id,
                                       DeleteBitmapPtr delete_bitmap, 
RowsetWriter* rowset_writer,
-                                      const RowsetIdUnorderedSet& 
cur_rowset_ids) = 0;
+                                      const RowsetIdUnorderedSet& 
cur_rowset_ids,
+                                      int64_t lock_id = -1) = 0;
     virtual CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() = 0;
 
     void calc_compaction_output_rowset_delete_bitmap(
diff --git a/be/src/olap/calc_delete_bitmap_executor.cpp 
b/be/src/olap/calc_delete_bitmap_executor.cpp
index 3983dc0a986..e45f9801f68 100644
--- a/be/src/olap/calc_delete_bitmap_executor.cpp
+++ b/be/src/olap/calc_delete_bitmap_executor.cpp
@@ -34,7 +34,8 @@ Status CalcDeleteBitmapToken::submit(BaseTabletSPtr tablet, 
RowsetSharedPtr cur_
                                      const segment_v2::SegmentSharedPtr& 
cur_segment,
                                      const std::vector<RowsetSharedPtr>& 
target_rowsets,
                                      int64_t end_version, DeleteBitmapPtr 
delete_bitmap,
-                                     RowsetWriter* rowset_writer) {
+                                     RowsetWriter* rowset_writer,
+                                     DeleteBitmapPtr tablet_delete_bitmap) {
     {
         std::shared_lock rlock(_lock);
         RETURN_IF_ERROR(_status);
@@ -44,7 +45,8 @@ Status CalcDeleteBitmapToken::submit(BaseTabletSPtr tablet, 
RowsetSharedPtr cur_
     return _thread_token->submit_func([=, this]() {
         SCOPED_ATTACH_TASK(_query_thread_context);
         auto st = tablet->calc_segment_delete_bitmap(cur_rowset, cur_segment, 
target_rowsets,
-                                                     delete_bitmap, 
end_version, rowset_writer);
+                                                     delete_bitmap, 
end_version, rowset_writer,
+                                                     tablet_delete_bitmap);
         if (!st.ok()) {
             LOG(WARNING) << "failed to calc segment delete bitmap, tablet_id: "
                          << tablet->tablet_id() << " rowset: " << 
cur_rowset->rowset_id()
diff --git a/be/src/olap/calc_delete_bitmap_executor.h 
b/be/src/olap/calc_delete_bitmap_executor.h
index fa1e79b7fea..288108b0497 100644
--- a/be/src/olap/calc_delete_bitmap_executor.h
+++ b/be/src/olap/calc_delete_bitmap_executor.h
@@ -52,7 +52,8 @@ public:
     Status submit(BaseTabletSPtr tablet, RowsetSharedPtr cur_rowset,
                   const segment_v2::SegmentSharedPtr& cur_segment,
                   const std::vector<RowsetSharedPtr>& target_rowsets, int64_t 
end_version,
-                  DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer);
+                  DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer,
+                  DeleteBitmapPtr tablet_delete_bitmap);
 
     // wait all tasks in token to be completed.
     Status wait();
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 8a0e23e75b8..0d04984d0e0 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2490,7 +2490,7 @@ CalcDeleteBitmapExecutor* 
Tablet::calc_delete_bitmap_executor() {
 
 Status Tablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t 
txn_id,
                                   DeleteBitmapPtr delete_bitmap, RowsetWriter* 
rowset_writer,
-                                  const RowsetIdUnorderedSet& cur_rowset_ids) {
+                                  const RowsetIdUnorderedSet& cur_rowset_ids, 
int64_t lock_id) {
     RowsetSharedPtr rowset = txn_info->rowset;
     int64_t cur_version = rowset->start_version();
 
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index f5866c67641..0b7d758ab8f 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -417,7 +417,8 @@ public:
     CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() override;
     Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
                               DeleteBitmapPtr delete_bitmap, RowsetWriter* 
rowset_writer,
-                              const RowsetIdUnorderedSet& cur_rowset_ids) 
override;
+                              const RowsetIdUnorderedSet& cur_rowset_ids,
+                              int64_t lock_id = -1) override;
 
     void merge_delete_bitmap(const DeleteBitmap& delete_bitmap);
     bool check_all_rowset_segment();
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index d56e529e42b..fb0895604a1 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -236,6 +236,7 @@ public:
     static void init_column_from_tcolumn(uint32_t unique_id, const TColumn& 
tcolumn,
                                          ColumnPB* column);
 
+    DeleteBitmapPtr delete_bitmap_ptr() { return _delete_bitmap; }
     DeleteBitmap& delete_bitmap() { return *_delete_bitmap; }
 
     bool enable_unique_key_merge_on_write() const { return 
_enable_unique_key_merge_on_write; }
diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h
index 88ee97c5f6a..1994dec9494 100644
--- a/be/src/olap/txn_manager.h
+++ b/be/src/olap/txn_manager.h
@@ -87,6 +87,12 @@ struct TabletTxnInfo {
     std::shared_ptr<PublishStatus> publish_status;
     TxnPublishInfo publish_info;
 
+    // for cloud only, used to calculate delete bitmap for txn load
+    bool is_txn_load = false;
+    std::vector<RowsetSharedPtr> invisible_rowsets;
+    int64_t lock_id;
+    int64_t next_visible_version;
+
     TxnState state {TxnState::PREPARED};
     TabletTxnInfo() = default;
 
diff --git a/cloud/src/meta-service/meta_service_txn.cpp 
b/cloud/src/meta-service/meta_service_txn.cpp
index 32f6b56f51a..58930f6edfc 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -912,6 +912,69 @@ void update_tablet_stats(const StatsTabletKeyInfo& info, 
const TabletStats& stat
     }
 }
 
+// process mow table, check lock and remove pending key
+void process_mow_when_commit_txn(
+        const CommitTxnRequest* request, const std::string& instance_id, 
MetaServiceCode& code,
+        std::string& msg, std::unique_ptr<Transaction>& txn,
+        std::unordered_map<int64_t, std::vector<int64_t>>& 
table_id_tablet_ids) {
+    int64_t txn_id = request->txn_id();
+    std::stringstream ss;
+    std::vector<std::string> lock_keys;
+    lock_keys.reserve(request->mow_table_ids().size());
+    for (auto table_id : request->mow_table_ids()) {
+        lock_keys.push_back(meta_delete_bitmap_update_lock_key({instance_id, 
table_id, -1}));
+    }
+    std::vector<std::optional<std::string>> lock_values;
+    TxnErrorCode err = txn->batch_get(&lock_values, lock_keys);
+    if (err != TxnErrorCode::TXN_OK) {
+        ss << "failed to get delete bitmap update lock key info, instance_id=" 
<< instance_id
+           << " err=" << err;
+        msg = ss.str();
+        code = cast_as<ErrCategory::READ>(err);
+        LOG(WARNING) << msg << " txn_id=" << txn_id;
+        return;
+    }
+    size_t total_locks = lock_keys.size();
+    for (size_t i = 0; i < total_locks; i++) {
+        int64_t table_id = request->mow_table_ids(i);
+        // When the key does not exist, it means the lock has been acquired
+        // by another transaction and successfully committed.
+        if (!lock_values[i].has_value()) {
+            ss << "get delete bitmap update lock info, lock is expired"
+               << " table_id=" << table_id << " key=" << hex(lock_keys[i]);
+            code = MetaServiceCode::LOCK_EXPIRED;
+            msg = ss.str();
+            LOG(WARNING) << msg << " txn_id=" << txn_id;
+            return;
+        }
+
+        DeleteBitmapUpdateLockPB lock_info;
+        if (!lock_info.ParseFromString(lock_values[i].value())) [[unlikely]] {
+            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+            msg = "failed to parse DeleteBitmapUpdateLockPB";
+            LOG(WARNING) << msg << " txn_id=" << txn_id;
+            return;
+        }
+        if (lock_info.lock_id() != request->txn_id()) {
+            msg = "lock is expired";
+            code = MetaServiceCode::LOCK_EXPIRED;
+            return;
+        }
+        txn->remove(lock_keys[i]);
+        LOG(INFO) << "xxx remove delete bitmap lock, lock_key=" << 
hex(lock_keys[i])
+                  << " txn_id=" << txn_id;
+
+        for (auto tablet_id : table_id_tablet_ids[table_id]) {
+            std::string pending_key = 
meta_pending_delete_bitmap_key({instance_id, tablet_id});
+            txn->remove(pending_key);
+            LOG(INFO) << "xxx remove delete bitmap pending key, pending_key=" 
<< hex(pending_key)
+                      << " txn_id=" << txn_id;
+        }
+    }
+    lock_keys.clear();
+    lock_values.clear();
+}
+
 /**
  * 0. Extract txn_id from request
  * 1. Get db id from TxnKv with txn_id
@@ -1173,61 +1236,11 @@ void commit_txn_immediately(
             stats.num_segs += i.num_segments();
         } // for tmp_rowsets_meta
 
-        // process mow table, check lock and remove pending key
-        std::vector<std::string> lock_keys;
-        lock_keys.reserve(request->mow_table_ids().size());
-        for (auto table_id : request->mow_table_ids()) {
-            
lock_keys.push_back(meta_delete_bitmap_update_lock_key({instance_id, table_id, 
-1}));
-        }
-        std::vector<std::optional<std::string>> lock_values;
-        err = txn->batch_get(&lock_values, lock_keys);
-        if (err != TxnErrorCode::TXN_OK) {
-            ss << "failed to get delete bitmap update lock key info, 
instance_id=" << instance_id
-               << " err=" << err;
-            msg = ss.str();
-            code = cast_as<ErrCategory::READ>(err);
-            LOG(WARNING) << msg << " txn_id=" << txn_id;
+        process_mow_when_commit_txn(request, instance_id, code, msg, txn, 
table_id_tablet_ids);
+        if (code != MetaServiceCode::OK) {
+            LOG(WARNING) << "process mow failed, txn_id=" << txn_id << " 
code=" << code;
             return;
         }
-        size_t total_locks = lock_keys.size();
-        for (size_t i = 0; i < total_locks; i++) {
-            int64_t table_id = request->mow_table_ids(i);
-            // When the key does not exist, it means the lock has been acquired
-            // by another transaction and successfully committed.
-            if (!lock_values[i].has_value()) {
-                ss << "get delete bitmap update lock info, lock is expired"
-                   << " table_id=" << table_id << " key=" << hex(lock_keys[i]);
-                code = MetaServiceCode::LOCK_EXPIRED;
-                msg = ss.str();
-                LOG(WARNING) << msg << " txn_id=" << txn_id;
-                return;
-            }
-
-            DeleteBitmapUpdateLockPB lock_info;
-            if (!lock_info.ParseFromString(lock_values[i].value())) 
[[unlikely]] {
-                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-                msg = "failed to parse DeleteBitmapUpdateLockPB";
-                LOG(WARNING) << msg << " txn_id=" << txn_id;
-                return;
-            }
-            if (lock_info.lock_id() != request->txn_id()) {
-                msg = "lock is expired";
-                code = MetaServiceCode::LOCK_EXPIRED;
-                return;
-            }
-            txn->remove(lock_keys[i]);
-            LOG(INFO) << "xxx remove delete bitmap lock, lock_key=" << 
hex(lock_keys[i])
-                      << " txn_id=" << txn_id;
-
-            for (auto tablet_id : table_id_tablet_ids[table_id]) {
-                std::string pending_key = 
meta_pending_delete_bitmap_key({instance_id, tablet_id});
-                txn->remove(pending_key);
-                LOG(INFO) << "xxx remove delete bitmap pending key, 
pending_key="
-                          << hex(pending_key) << " txn_id=" << txn_id;
-            }
-        }
-        lock_keys.clear();
-        lock_values.clear();
 
         // Save rowset meta
         for (auto& i : rowsets) {
@@ -1810,62 +1823,12 @@ void commit_txn_eventually(
             response->add_versions(i.second + 1);
         }
 
-        // process mow table, check lock and remove pending key
-        std::vector<std::string> lock_keys;
-        lock_keys.reserve(request->mow_table_ids().size());
-        for (auto table_id : request->mow_table_ids()) {
-            
lock_keys.push_back(meta_delete_bitmap_update_lock_key({instance_id, table_id, 
-1}));
-        }
-        std::vector<std::optional<std::string>> lock_values;
-        err = txn->batch_get(&lock_values, lock_keys);
-        if (err != TxnErrorCode::TXN_OK) {
-            ss << "failed to get delete bitmap update lock key info, 
instance_id=" << instance_id
-               << " err=" << err;
-            msg = ss.str();
-            code = cast_as<ErrCategory::READ>(err);
-            LOG(WARNING) << msg << " txn_id=" << txn_id;
+        process_mow_when_commit_txn(request, instance_id, code, msg, txn, 
table_id_tablet_ids);
+        if (code != MetaServiceCode::OK) {
+            LOG(WARNING) << "process mow failed, txn_id=" << txn_id << " 
code=" << code;
             return;
         }
 
-        for (size_t i = 0; i < lock_keys.size(); i++) {
-            int64_t table_id = request->mow_table_ids(i);
-            // When the key does not exist, it means the lock has been acquired
-            // by another transaction and successfully committed.
-            if (!lock_values[i].has_value()) {
-                ss << "get delete bitmap update lock info, lock is expired"
-                   << " table_id=" << table_id << " key=" << hex(lock_keys[i]);
-                code = MetaServiceCode::LOCK_EXPIRED;
-                msg = ss.str();
-                LOG(WARNING) << msg << " txn_id=" << txn_id;
-                return;
-            }
-
-            DeleteBitmapUpdateLockPB lock_info;
-            if (!lock_info.ParseFromString(lock_values[i].value())) 
[[unlikely]] {
-                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-                msg = "failed to parse DeleteBitmapUpdateLockPB";
-                LOG(WARNING) << msg << " txn_id=" << txn_id;
-                return;
-            }
-            if (lock_info.lock_id() != request->txn_id()) {
-                msg = "lock is expired";
-                code = MetaServiceCode::LOCK_EXPIRED;
-                return;
-            }
-            txn->remove(lock_keys[i]);
-            LOG(INFO) << "xxx remove delete bitmap lock, lock_key=" << 
hex(lock_keys[i])
-                      << " txn_id=" << txn_id;
-
-            for (auto tablet_id : table_id_tablet_ids[table_id]) {
-                std::string pending_key = 
meta_pending_delete_bitmap_key({instance_id, tablet_id});
-                txn->remove(pending_key);
-                LOG(INFO) << "xxx remove delete bitmap pending key, 
pending_key="
-                          << hex(pending_key) << " txn_id=" << txn_id;
-            }
-        }
-        lock_keys.clear();
-        lock_values.clear();
-
         // Save table versions
         for (auto& i : table_id_tablet_ids) {
             std::string ver_key = table_version_key({instance_id, db_id, 
i.first});
@@ -2282,6 +2245,12 @@ void commit_txn_with_sub_txn(const CommitTxnRequest* 
request, CommitTxnResponse*
         } // for tmp_rowsets_meta
     }
 
+    process_mow_when_commit_txn(request, instance_id, code, msg, txn, 
table_id_tablet_ids);
+    if (code != MetaServiceCode::OK) {
+        LOG(WARNING) << "process mow failed, txn_id=" << txn_id << " code=" << 
code;
+        return;
+    }
+
     // Save rowset meta
     for (auto& i : rowsets) {
         size_t rowset_size = i.first.size() + i.second.size();
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index be0390db584..72e6438999c 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3188,6 +3188,9 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, description = {"存算分离模式下calculate delete bitmap 
task 超时时间,默认15s"})
     public static int calculate_delete_bitmap_task_timeout_seconds = 15;
 
+    @ConfField(mutable = true, description = {"存算分离模式下事务导入calculate delete 
bitmap task 超时时间,默认300s"})
+    public static int 
calculate_delete_bitmap_task_timeout_seconds_for_transaction_load = 300;
+
     @ConfField(mutable = true, description = {"存算分离模式下commit阶段等锁超时时间,默认5s"})
     public static int try_commit_lock_timeout_seconds = 5;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index 1a80058759b..3c56f7dc56a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -99,6 +99,7 @@ import org.apache.doris.task.CalcDeleteBitmapTask;
 import org.apache.doris.thrift.TCalcDeleteBitmapPartitionInfo;
 import org.apache.doris.thrift.TStatus;
 import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.thrift.TTabletCommitInfo;
 import org.apache.doris.thrift.TTaskType;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.thrift.TWaitingTxnStatusRequest;
@@ -492,7 +493,7 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
                             
transactionState.getTransactionStatus().toString());
                 }
             }
-            calcDeleteBitmapForMow(dbId, mowTableList, transactionId, 
tabletCommitInfos);
+            calcDeleteBitmapForMow(dbId, mowTableList, transactionId, 
tabletCommitInfos, null);
         }
 
         CommitTxnRequest.Builder builder = CommitTxnRequest.newBuilder();
@@ -529,12 +530,17 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         }
 
         final CommitTxnRequest commitTxnRequest = builder.build();
+        executeCommitTxnRequest(commitTxnRequest, transactionId, is2PC, 
txnCommitAttachment);
+    }
+
+    private void executeCommitTxnRequest(CommitTxnRequest commitTxnRequest, 
long transactionId, boolean is2PC,
+            TxnCommitAttachment txnCommitAttachment) throws UserException {
         boolean txnOperated = false;
         TransactionState txnState = null;
         TxnStateChangeCallback cb = null;
         long callbackId = 0L;
         try {
-            txnState = commitTxn(commitTxnRequest, transactionId, is2PC, dbId, 
tableList);
+            txnState = commitTxn(commitTxnRequest, transactionId, is2PC);
             txnOperated = true;
             if 
(DebugPointUtil.isEnable("CloudGlobalTransactionMgr.commitTransaction.timeout"))
 {
                 throw new 
UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR,
@@ -558,8 +564,8 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         }
     }
 
-    private TransactionState commitTxn(CommitTxnRequest commitTxnRequest, long 
transactionId, boolean is2PC, long dbId,
-            List<Table> tableList) throws UserException {
+    private TransactionState commitTxn(CommitTxnRequest commitTxnRequest, long 
transactionId, boolean is2PC)
+            throws UserException {
         CommitTxnResponse commitTxnResponse = null;
         TransactionState txnState = null;
         int retryTime = 0;
@@ -648,9 +654,9 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
     }
 
     private void calcDeleteBitmapForMow(long dbId, List<OlapTable> tableList, 
long transactionId,
-            List<TabletCommitInfo> tabletCommitInfos)
+            List<TabletCommitInfo> tabletCommitInfos, 
List<SubTransactionState> subTransactionStates)
             throws UserException {
-        Map<Long, Map<Long, List<Long>>> backendToPartitionTablets = 
Maps.newHashMap();
+        Map<Long, Map<Long, Set<Long>>> backendToPartitionTablets = 
Maps.newHashMap();
         Map<Long, Partition> partitions = Maps.newHashMap();
         Map<Long, Set<Long>> tableToPartitions = Maps.newHashMap();
         Map<Long, List<Long>> tableToTabletList = Maps.newHashMap();
@@ -661,6 +667,8 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
             throw new UserException("The partition info is empty, table may be 
dropped, txnid=" + transactionId);
         }
 
+        Map<Long, List<Long>> partitionToSubTxnIds = 
getPartitionSubTxnIds(subTransactionStates, tableToTabletList,
+                tabletToTabletMeta);
         Map<Long, Long> baseCompactionCnts = Maps.newHashMap();
         Map<Long, Long> cumulativeCompactionCnts = Maps.newHashMap();
         Map<Long, Long> cumulativePoints = Maps.newHashMap();
@@ -670,9 +678,11 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
 
         Map<Long, List<TCalcDeleteBitmapPartitionInfo>> 
backendToPartitionInfos = getCalcDeleteBitmapInfo(
                 backendToPartitionTablets, partitionVersions, 
baseCompactionCnts, cumulativeCompactionCnts,
-                        cumulativePoints);
+                cumulativePoints, partitionToSubTxnIds);
         try {
-            sendCalcDeleteBitmaptask(dbId, transactionId, 
backendToPartitionInfos);
+            sendCalcDeleteBitmaptask(dbId, transactionId, 
backendToPartitionInfos,
+                    subTransactionStates == null ? 
Config.calculate_delete_bitmap_task_timeout_seconds
+                            : 
Config.calculate_delete_bitmap_task_timeout_seconds_for_transaction_load);
         } catch (UserException e) {
             LOG.warn("failed to sendCalcDeleteBitmaptask for txn=" + 
transactionId + ",exception=" + e.getMessage());
             removeDeleteBitmapUpdateLock(tableToPartitions, transactionId);
@@ -680,11 +690,34 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         }
     }
 
+    private Map<Long, List<Long>> 
getPartitionSubTxnIds(List<SubTransactionState> subTransactionStates,
+            Map<Long, List<Long>> tableToTabletList, Map<Long, TabletMeta> 
tabletToTabletMeta) {
+        if (subTransactionStates == null) {
+            return null;
+        }
+        Map<Long, List<Long>> partitionToSubTxnIds = Maps.newHashMap();
+        for (SubTransactionState subTransactionState : subTransactionStates) {
+            if 
(!tableToTabletList.containsKey(subTransactionState.getTable().getId())) {
+                // skip non mow table
+                continue;
+            }
+            for (TTabletCommitInfo ci : 
subTransactionState.getTabletCommitInfos()) {
+                TabletMeta tabletMeta = 
tabletToTabletMeta.get(ci.getTabletId());
+                long partitionId = tabletMeta.getPartitionId();
+                List<Long> subTxnIds = 
partitionToSubTxnIds.computeIfAbsent(partitionId, k -> Lists.newArrayList());
+                if 
(!subTxnIds.contains(subTransactionState.getSubTransactionId())) {
+                    subTxnIds.add(subTransactionState.getSubTransactionId());
+                }
+            }
+        }
+        return partitionToSubTxnIds;
+    }
+
     private void getPartitionInfo(List<OlapTable> tableList,
             List<TabletCommitInfo> tabletCommitInfos,
             Map<Long, Set<Long>> tableToParttions,
             Map<Long, Partition> partitions,
-            Map<Long, Map<Long, List<Long>>> backendToPartitionTablets,
+            Map<Long, Map<Long, Set<Long>>> backendToPartitionTablets,
             Map<Long, List<Long>> tableToTabletList,
             Map<Long, TabletMeta> tabletToTabletMeta) {
         Map<Long, OlapTable> tableMap = Maps.newHashMap();
@@ -697,18 +730,22 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         TabletInvertedIndex tabletInvertedIndex = 
Env.getCurrentEnv().getTabletInvertedIndex();
         List<TabletMeta> tabletMetaList = 
tabletInvertedIndex.getTabletMetaList(tabletIds);
         for (int i = 0; i < tabletMetaList.size(); i++) {
+            long tabletId = tabletIds.get(i);
+            if (tabletToTabletMeta.containsKey(tabletId)) {
+                continue;
+            }
             TabletMeta tabletMeta = tabletMetaList.get(i);
             long tableId = tabletMeta.getTableId();
             if (!tableMap.containsKey(tableId)) {
                 continue;
             }
 
-            tabletToTabletMeta.put(tabletIds.get(i), tabletMeta);
+            tabletToTabletMeta.put(tabletId, tabletMeta);
 
-            if (!tableToTabletList.containsKey(tableId)) {
-                tableToTabletList.put(tableId, Lists.newArrayList());
+            List<Long> tableTabletIds = 
tableToTabletList.computeIfAbsent(tableId, k -> Lists.newArrayList());
+            if (!tableTabletIds.contains(tabletId)) {
+                tableTabletIds.add(tabletId);
             }
-            tableToTabletList.get(tableId).add(tabletIds.get(i));
 
             long partitionId = tabletMeta.getPartitionId();
             long backendId = tabletCommitInfos.get(i).getBackendId();
@@ -721,11 +758,11 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
             if (!backendToPartitionTablets.containsKey(backendId)) {
                 backendToPartitionTablets.put(backendId, Maps.newHashMap());
             }
-            Map<Long, List<Long>> partitionToTablets = 
backendToPartitionTablets.get(backendId);
+            Map<Long, Set<Long>> partitionToTablets = 
backendToPartitionTablets.get(backendId);
             if (!partitionToTablets.containsKey(partitionId)) {
-                partitionToTablets.put(partitionId, Lists.newArrayList());
+                partitionToTablets.put(partitionId, Sets.newHashSet());
             }
-            partitionToTablets.get(partitionId).add(tabletIds.get(i));
+            partitionToTablets.get(partitionId).add(tabletId);
             partitions.putIfAbsent(partitionId, 
tableMap.get(tableId).getPartition(partitionId));
         }
     }
@@ -741,18 +778,18 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
     }
 
     private Map<Long, List<TCalcDeleteBitmapPartitionInfo>> 
getCalcDeleteBitmapInfo(
-            Map<Long, Map<Long, List<Long>>> backendToPartitionTablets, 
Map<Long, Long> partitionVersions,
+            Map<Long, Map<Long, Set<Long>>> backendToPartitionTablets, 
Map<Long, Long> partitionVersions,
                     Map<Long, Long> baseCompactionCnts, Map<Long, Long> 
cumulativeCompactionCnts,
-                            Map<Long, Long> cumulativePoints) {
+                            Map<Long, Long> cumulativePoints, Map<Long, 
List<Long>> partitionToSubTxnIds) {
         Map<Long, List<TCalcDeleteBitmapPartitionInfo>> 
backendToPartitionInfos = Maps.newHashMap();
-        for (Map.Entry<Long, Map<Long, List<Long>>> entry : 
backendToPartitionTablets.entrySet()) {
+        for (Map.Entry<Long, Map<Long, Set<Long>>> entry : 
backendToPartitionTablets.entrySet()) {
             List<TCalcDeleteBitmapPartitionInfo> partitionInfos = 
Lists.newArrayList();
-            for (Map.Entry<Long, List<Long>> partitionToTablets : 
entry.getValue().entrySet()) {
+            for (Map.Entry<Long, Set<Long>> partitionToTablets : 
entry.getValue().entrySet()) {
                 Long partitionId = partitionToTablets.getKey();
-                List<Long> tabletList = partitionToTablets.getValue();
+                Set<Long> tabletList = partitionToTablets.getValue();
                 TCalcDeleteBitmapPartitionInfo partitionInfo = new 
TCalcDeleteBitmapPartitionInfo(partitionId,
                         partitionVersions.get(partitionId),
-                        tabletList);
+                        Lists.newArrayList(tabletList));
                 if (!baseCompactionCnts.isEmpty() && 
!cumulativeCompactionCnts.isEmpty()
                         && !cumulativePoints.isEmpty()) {
                     List<Long> reqBaseCompactionCnts = Lists.newArrayList();
@@ -766,6 +803,13 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
                     partitionInfo.setBaseCompactionCnts(reqBaseCompactionCnts);
                     
partitionInfo.setCumulativeCompactionCnts(reqCumulativeCompactionCnts);
                     partitionInfo.setCumulativePoints(reqCumulativePoints);
+                    if (partitionToSubTxnIds != null) {
+                        List<Long> subTxnIds = 
partitionToSubTxnIds.get(partitionId);
+                        if (subTxnIds != null && !subTxnIds.isEmpty()) {
+                            partitionInfo.setSubTxnIds(subTxnIds);
+                            LOG.debug("partitionId: {}, subTxnIds: {}", 
partitionId, subTxnIds);
+                        }
+                    }
                 }
                 partitionInfos.add(partitionInfo);
             }
@@ -926,8 +970,8 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
     }
 
     private void sendCalcDeleteBitmaptask(long dbId, long transactionId,
-            Map<Long, List<TCalcDeleteBitmapPartitionInfo>> 
backendToPartitionInfos)
-            throws UserException {
+            Map<Long, List<TCalcDeleteBitmapPartitionInfo>> 
backendToPartitionInfos,
+            long calculateDeleteBitmapTaskTimeoutSeconds) throws UserException 
{
         if (backendToPartitionInfos.isEmpty()) {
             return;
         }
@@ -948,13 +992,14 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
             // not check return value, because the add will success
             AgentTaskQueue.addTask(task);
             batchTask.addTask(task);
-            LOG.info("send calculate delete bitmap task to be {}, txn_id {}", 
entry.getKey(), transactionId);
+            LOG.info("send calculate delete bitmap task to be {}, txn_id {}, 
partitionInfos={}", entry.getKey(),
+                    transactionId, entry.getValue());
         }
         AgentTaskExecutor.submit(batchTask);
 
         boolean ok;
         try {
-            ok = 
countDownLatch.await(Config.calculate_delete_bitmap_task_timeout_seconds, 
TimeUnit.SECONDS);
+            ok = countDownLatch.await(calculateDeleteBitmapTaskTimeoutSeconds, 
TimeUnit.SECONDS);
         } catch (InterruptedException e) {
             LOG.warn("InterruptedException: ", e);
             ok = false;
@@ -1043,14 +1088,42 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
                     "disable_load_job is set to true, all load jobs are not 
allowed");
         }
         LOG.info("try to commit transaction, txnId: {}, subTxnStates: {}", 
transactionId, subTransactionStates);
+
+        Preconditions.checkState(db instanceof Database);
+        List<Long> tableIdList = subTransactionStates.stream().map(s -> 
s.getTable().getId()).distinct()
+                .collect(Collectors.toList());
+        List<Table> tableList = ((Database) 
db).getTablesOnIdOrderOrThrowException(tableIdList);
+        beforeCommitTransaction(tableList, transactionId, timeoutMillis);
+        try {
+            commitTransactionWithSubTxns(db.getId(), tableList, transactionId, 
subTransactionStates);
+        } finally {
+            decreaseWaitingLockCount(tableList);
+            MetaLockUtils.commitUnlockTables(tableList);
+        }
+        return true;
+    }
+
+    private void commitTransactionWithSubTxns(long dbId, List<Table> 
tableList, long transactionId,
+            List<SubTransactionState> subTransactionStates) throws 
UserException {
+        List<TabletCommitInfo> tabletCommitInfos = 
subTransactionStates.stream().map(
+                        
SubTransactionState::getTabletCommitInfos).flatMap(Collection::stream)
+                .map(c -> new TabletCommitInfo(c.getTabletId(), 
c.getBackendId())).collect(Collectors.toList());
+        List<OlapTable> mowTableList = getMowTableList(tableList, 
tabletCommitInfos);
+        if (!mowTableList.isEmpty()) {
+            calcDeleteBitmapForMow(dbId, mowTableList, transactionId, 
tabletCommitInfos, subTransactionStates);
+        }
+
         cleanSubTransactions(transactionId);
         CommitTxnRequest.Builder builder = CommitTxnRequest.newBuilder();
-        builder.setDbId(db.getId())
+        builder.setDbId(dbId)
                 .setTxnId(transactionId)
                 .setIs2Pc(false)
                 .setCloudUniqueId(Config.cloud_unique_id)
                 .setIsTxnLoad(true)
                 .setEnableTxnLazyCommit(Config.enable_cloud_txn_lazy_commit);
+        for (OlapTable olapTable : mowTableList) {
+            builder.addMowTableIds(olapTable.getId());
+        }
         // add sub txn infos
         for (SubTransactionState subTransactionState : subTransactionStates) {
             
builder.addSubTxnInfos(SubTxnInfo.newBuilder().setSubTxnId(subTransactionState.getSubTransactionId())
@@ -1064,31 +1137,12 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         }
 
         final CommitTxnRequest commitTxnRequest = builder.build();
-        TransactionState txnState = null;
-        boolean txnOperated = false;
-        try {
-            txnState = commitTxn(commitTxnRequest, transactionId, false, 
db.getId(),
-                    
subTransactionStates.stream().map(SubTransactionState::getTable)
-                        .collect(Collectors.toList()));
-            txnOperated = true;
-        } finally {
-            if (txnState != null) {
-                TxnStateChangeCallback cb = 
callbackFactory.getCallback(txnState.getCallbackId());
-                if (cb != null) {
-                    LOG.info("commitTxn, run txn callback, transactionId:{} 
callbackId:{}, txnState:{}",
-                            txnState.getTransactionId(), 
txnState.getCallbackId(), txnState);
-                    cb.afterCommitted(txnState, txnOperated);
-                    cb.afterVisible(txnState, txnOperated);
-                }
-            }
-        }
-        return true;
+        executeCommitTxnRequest(commitTxnRequest, transactionId, false, null);
     }
 
-    @Override
-    public boolean commitAndPublishTransaction(DatabaseIf db, List<Table> 
tableList, long transactionId,
-                                               List<TabletCommitInfo> 
tabletCommitInfos, long timeoutMillis,
-                                               TxnCommitAttachment 
txnCommitAttachment) throws UserException {
+    // add some log and get commit lock, mainly used for mow tables
+    private void beforeCommitTransaction(List<Table> tableList, long 
transactionId, long timeoutMillis)
+            throws UserException {
         for (int i = 0; i < tableList.size(); i++) {
             long tableId = tableList.get(i).getId();
             LOG.info("start commit txn=" + transactionId + ",table=" + 
tableId);
@@ -1107,6 +1161,13 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
                     "get table cloud commit lock timeout, tableList=("
                             + StringUtils.join(tableList, ",") + ")");
         }
+    }
+
+    @Override
+    public boolean commitAndPublishTransaction(DatabaseIf db, List<Table> 
tableList, long transactionId,
+                                               List<TabletCommitInfo> 
tabletCommitInfos, long timeoutMillis,
+                                               TxnCommitAttachment 
txnCommitAttachment) throws UserException {
+        beforeCommitTransaction(tableList, transactionId, timeoutMillis);
         try {
             commitTransaction(db.getId(), tableList, transactionId, 
tabletCommitInfos, txnCommitAttachment);
         } finally {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
index 25c4ff4b3b2..8a75dd0d0c2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
@@ -21,8 +21,6 @@ import org.apache.doris.analysis.RedirectStatus;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.KeysType;
-import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.cloud.transaction.CloudGlobalTransactionMgr;
@@ -206,13 +204,6 @@ public class TransactionEntry {
             throw new AnalysisException(
                     "Transaction insert can not insert into values and insert 
into select at the same time");
         }
-        if (Config.isCloudMode()) {
-            OlapTable olapTable = (OlapTable) table;
-            if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS && 
olapTable.getEnableUniqueKeyMergeOnWrite()) {
-                throw new UserException(
-                        "Transaction load is not supported for merge on write 
unique keys table in cloud mode");
-            }
-        }
         DatabaseIf database = table.getDatabase();
         if (!isTransactionBegan) {
             long timeoutSecond = ConnectContext.get().getExecTimeout();
diff --git a/gensrc/thrift/AgentService.thrift 
b/gensrc/thrift/AgentService.thrift
index abffd176ef8..fdbf4483bf8 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -440,6 +440,7 @@ struct TCalcDeleteBitmapPartitionInfo {
     4: optional list<i64> base_compaction_cnts
     5: optional list<i64> cumulative_compaction_cnts
     6: optional list<i64> cumulative_points
+    7: optional list<i64> sub_txn_ids
 }
 
 struct TCalcDeleteBitmapRequest {
diff --git a/regression-test/suites/insert_p0/transaction/txn_insert.groovy 
b/regression-test/suites/insert_p0/transaction/txn_insert.groovy
index 6653c05740e..a4868ca6b11 100644
--- a/regression-test/suites/insert_p0/transaction/txn_insert.groovy
+++ b/regression-test/suites/insert_p0/transaction/txn_insert.groovy
@@ -584,19 +584,10 @@ suite("txn_insert") {
             } catch (Exception e) {
                 logger.info("exception: " + e)
                 sql """ rollback """
-                if (isCloudMode()) {
-                    assertTrue(e.getMessage().contains("Transaction load is 
not supported for merge on write unique keys table in cloud mode"))
-                } else {
-                    assertTrue(false, "should not reach here")
-                }
+                assertTrue(false, "should not reach here")
             }
         }
 
-        // the following cases are not supported in cloud mode
-        if (isCloudMode()) {
-            break
-        }
-
         // 16. update stmt(mow table)
         if (use_nereids_planner) {
             def ut_table = "txn_insert_ut"
diff --git 
a/regression-test/suites/insert_p0/transaction/txn_insert_concurrent_insert_mow.groovy
 
b/regression-test/suites/insert_p0/transaction/txn_insert_concurrent_insert_mow.groovy
new file mode 100644
index 00000000000..8e6de4dd9e9
--- /dev/null
+++ 
b/regression-test/suites/insert_p0/transaction/txn_insert_concurrent_insert_mow.groovy
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.sql.Connection
+import java.sql.DriverManager
+import java.sql.Statement
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.CompletableFuture
+
+suite("txn_insert_concurrent_insert_mow") {
+    def tableName = "txn_insert_concurrent_insert_mow"
+    List<String> errors = new ArrayList<>()
+
+    for (int i = 0; i < 3; i++) {
+        def table_name = "${tableName}_${i}"
+        sql """ drop table if exists ${table_name} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${table_name} (
+                L_ORDERKEY    INTEGER NOT NULL,
+                L_PARTKEY     INTEGER NOT NULL,
+                L_SUPPKEY     INTEGER NOT NULL,
+                L_LINENUMBER  INTEGER NOT NULL,
+                L_QUANTITY    DECIMAL(15,2) NOT NULL,
+                L_EXTENDEDPRICE  DECIMAL(15,2) NOT NULL,
+                L_DISCOUNT    DECIMAL(15,2) NOT NULL,
+                L_TAX         DECIMAL(15,2) NOT NULL,
+                L_RETURNFLAG  CHAR(1) NOT NULL,
+                L_LINESTATUS  CHAR(1) NOT NULL,
+                L_SHIPDATE    DATE NOT NULL,
+                L_COMMITDATE  DATE NOT NULL,
+                L_RECEIPTDATE DATE NOT NULL,
+                L_SHIPINSTRUCT CHAR(25) NOT NULL,
+                L_SHIPMODE     CHAR(10) NOT NULL,
+                L_COMMENT      VARCHAR(44) NOT NULL
+            )
+            UNIQUE KEY(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER)
+            DISTRIBUTED BY HASH(L_ORDERKEY) BUCKETS 3
+            PROPERTIES (
+                "replication_num" = "1"
+            );
+        """
+
+        if (i == 0) {
+            continue
+        }
+
+        streamLoad {
+            table table_name
+            set 'column_separator', '|'
+            set 'compress_type', 'GZ'
+            set 'columns', "l_orderkey, l_partkey, l_suppkey, l_linenumber, 
l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag,l_linestatus, 
l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment,temp"
+            file """${getS3Url()}/regression/tpch/sf0.1/lineitem.tbl.gz"""
+
+            time 10000 // limit inflight 10s
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+                assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+            }
+        }
+    }
+    sql """ sync """
+
+    def dbName = "regression_test_insert_p0_transaction"
+    def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, 
dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
+    logger.info("url: ${url}")
+
+    def sqls = [
+            "begin",
+            "insert into ${tableName}_0 select * from ${tableName}_1 where 
L_ORDERKEY < 30000;",
+            "insert into ${tableName}_1 select * from ${tableName}_2 where 
L_ORDERKEY > 500000;",
+            "insert into ${tableName}_0 select * from ${tableName}_2 where 
L_ORDERKEY < 30000;",
+            "commit"
+    ]
+    def txn_insert = { ->
+        try (Connection conn = DriverManager.getConnection(url, 
context.config.jdbcUser, context.config.jdbcPassword);
+             Statement stmt = conn.createStatement()) {
+            for (def sql : sqls) {
+                logger.info(Thread.currentThread().getName() + " execute sql: 
" + sql)
+                stmt.execute(sql)
+            }
+            logger.info("finish txn insert for " + 
Thread.currentThread().getName())
+        } catch (Throwable e) {
+            logger.error("txn insert failed", e)
+            errors.add("txn insert failed " + e.getMessage())
+        }
+    }
+
+    List<CompletableFuture<Void>> futures = new ArrayList<>()
+    for (int i = 0; i < 10; i++) {
+        CompletableFuture<Void> future = CompletableFuture.runAsync(txn_insert)
+        futures.add(future)
+    }
+    CompletableFuture<?>[] futuresArray = futures.toArray(new 
CompletableFuture[0])
+    CompletableFuture.allOf(futuresArray).get(10, TimeUnit.MINUTES)
+    sql """ sync """
+
+    logger.info("error num: " + errors.size() + ", errors: " + errors)
+
+    def result = sql """ select count() from ${tableName}_0 """
+    logger.info("result: ${result}")
+    assertEquals(30209, result[0][0])
+    result = sql """ select count() from ${tableName}_1 """
+    logger.info("result: ${result}")
+    assertEquals(600572, result[0][0])
+
+    def db_name = "regression_test_insert_p0_transaction"
+    def tables = sql """ show tables from $db_name """
+    logger.info("tables: $tables")
+    for (def table_info : tables) {
+        def table_name = table_info[0]
+        if (table_name.startsWith(tableName)) {
+            check_table_version_continuous(db_name, table_name)
+        }
+    }
+}
diff --git 
a/regression-test/suites/insert_p2/txn_insert_concurrent_insert_aggregate.groovy
 
b/regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_aggregate.groovy
similarity index 99%
rename from 
regression-test/suites/insert_p2/txn_insert_concurrent_insert_aggregate.groovy
rename to 
regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_aggregate.groovy
index 1fbe18bf212..fa57c550118 100644
--- 
a/regression-test/suites/insert_p2/txn_insert_concurrent_insert_aggregate.groovy
+++ 
b/regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_aggregate.groovy
@@ -81,7 +81,7 @@ suite("txn_insert_concurrent_insert_aggregate") {
     }
     sql """ sync """
 
-    def dbName = "regression_test_insert_p2"
+    def dbName = "regression_test_insert_p2_transaction"
     def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, 
dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
     logger.info("url: ${url}")
 
diff --git 
a/regression-test/suites/insert_p2/txn_insert_concurrent_insert_duplicate.groovy
 
b/regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_duplicate.groovy
similarity index 99%
rename from 
regression-test/suites/insert_p2/txn_insert_concurrent_insert_duplicate.groovy
rename to 
regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_duplicate.groovy
index 048a07fb817..e771078f1fb 100644
--- 
a/regression-test/suites/insert_p2/txn_insert_concurrent_insert_duplicate.groovy
+++ 
b/regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_duplicate.groovy
@@ -81,7 +81,7 @@ suite("txn_insert_concurrent_insert_duplicate") {
     }
     sql """ sync """
 
-    def dbName = "regression_test_insert_p2"
+    def dbName = "regression_test_insert_p2_transaction"
     def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, 
dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
     logger.info("url: ${url}")
 
diff --git 
a/regression-test/suites/insert_p2/txn_insert_concurrent_insert_mor.groovy 
b/regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_mor.groovy
similarity index 99%
rename from 
regression-test/suites/insert_p2/txn_insert_concurrent_insert_mor.groovy
rename to 
regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_mor.groovy
index c67119328ef..418992835d0 100644
--- a/regression-test/suites/insert_p2/txn_insert_concurrent_insert_mor.groovy
+++ 
b/regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_mor.groovy
@@ -82,7 +82,7 @@ suite("txn_insert_concurrent_insert_mor") {
     }
     sql """ sync """
 
-    def dbName = "regression_test_insert_p2"
+    def dbName = "regression_test_insert_p2_transaction"
     def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, 
dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
     logger.info("url: ${url}")
 
diff --git 
a/regression-test/suites/insert_p2/txn_insert_concurrent_insert_mow.groovy 
b/regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_mow.groovy
similarity index 95%
rename from 
regression-test/suites/insert_p2/txn_insert_concurrent_insert_mow.groovy
rename to 
regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_mow.groovy
index f8a971db75e..4d6e297cac9 100644
--- a/regression-test/suites/insert_p2/txn_insert_concurrent_insert_mow.groovy
+++ 
b/regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_mow.groovy
@@ -22,11 +22,6 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.CompletableFuture
 
 suite("txn_insert_concurrent_insert_mow") {
-    if (isCloudMode()) {
-        logger.info("cloud txn load does not support mow")
-        return
-    }
-
     def tableName = "txn_insert_concurrent_insert_mow"
     List<String> errors = new ArrayList<>()
 
@@ -85,8 +80,12 @@ suite("txn_insert_concurrent_insert_mow") {
         }
     }
     sql """ sync """
+    def t2_row_count = 6001215
+    def result = sql """ select count() from ${tableName}_2 """
+    logger.info("${tableName}_2 row count: ${result}, expected: 
${t2_row_count}")
+    assertEquals(t2_row_count, result[0][0] as int)
 
-    def dbName = "regression_test_insert_p2"
+    def dbName = "regression_test_insert_p2_transaction"
     def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, 
dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
     logger.info("url: ${url}")
 
@@ -156,7 +155,7 @@ suite("txn_insert_concurrent_insert_mow") {
     logger.info("error num: " + errors.size() + ", errors: " + errors)
 
     def t0_row_count = 6001215
-    def result = sql """ select count() from ${tableName}_0 """
+    result = sql """ select count() from ${tableName}_0 """
     logger.info("${tableName}_0 row count: ${result}, expected: 
${t0_row_count}")
 
     def t1_row_count = 2999666
diff --git 
a/regression-test/suites/insert_p2/txn_insert_concurrent_insert_ud.groovy 
b/regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_ud.groovy
similarity index 98%
rename from 
regression-test/suites/insert_p2/txn_insert_concurrent_insert_ud.groovy
rename to 
regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_ud.groovy
index a524703f9ef..d0b27641c64 100644
--- a/regression-test/suites/insert_p2/txn_insert_concurrent_insert_ud.groovy
+++ 
b/regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_ud.groovy
@@ -23,11 +23,6 @@ import java.util.concurrent.CompletableFuture
 
 // test update and delete command
 suite("txn_insert_concurrent_insert_ud") {
-    if (isCloudMode()) {
-        logger.info("cloud txn load does not support mow")
-        return
-    }
-
     def tableName = "txn_insert_concurrent_insert_ud"
     List<String> errors = new ArrayList<>()
 
@@ -88,7 +83,7 @@ suite("txn_insert_concurrent_insert_ud") {
     }
     sql """ sync """
 
-    def dbName = "regression_test_insert_p2"
+    def dbName = "regression_test_insert_p2_transaction"
     def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, 
dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
     logger.info("url: ${url}")
 
diff --git 
a/regression-test/suites/insert_p2/txn_insert_concurrent_insert_update.groovy 
b/regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_update.groovy
similarity index 99%
rename from 
regression-test/suites/insert_p2/txn_insert_concurrent_insert_update.groovy
rename to 
regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_update.groovy
index b467a87de82..eba69918660 100644
--- 
a/regression-test/suites/insert_p2/txn_insert_concurrent_insert_update.groovy
+++ 
b/regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_update.groovy
@@ -84,7 +84,7 @@ suite("txn_insert_concurrent_insert_update") {
     }
     sql """ sync """
 
-    def dbName = "regression_test_insert_p2"
+    def dbName = "regression_test_insert_p2_transaction"
     def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, 
dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
     logger.info("url: ${url}")
 
diff --git 
a/regression-test/suites/insert_p2/txn_insert_with_schema_change.groovy 
b/regression-test/suites/insert_p2/transaction/txn_insert_with_schema_change.groovy
similarity index 99%
rename from 
regression-test/suites/insert_p2/txn_insert_with_schema_change.groovy
rename to 
regression-test/suites/insert_p2/transaction/txn_insert_with_schema_change.groovy
index 56692b68d37..34b859b5e80 100644
--- a/regression-test/suites/insert_p2/txn_insert_with_schema_change.groovy
+++ 
b/regression-test/suites/insert_p2/transaction/txn_insert_with_schema_change.groovy
@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit
 // schema change and modify replica num
 suite("txn_insert_with_schema_change") {
     def tableName = "txn_insert_with_schema_change"
-    def dbName = "regression_test_insert_p2"
+    def dbName = "regression_test_insert_p2_transaction"
     def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, 
dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
     logger.info("url: ${url}")
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to