This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 09f7287cda5 [Opt](cloud) Notify BE to make committed rowset visible
directly after txn commit without fetching from meta service (#59754)
09f7287cda5 is described below
commit 09f7287cda507b1e7809036244859d38d6a54244
Author: bobhan1 <[email protected]>
AuthorDate: Wed Mar 18 10:54:56 2026 +0800
[Opt](cloud) Notify BE to make committed rowset visible directly after txn
commit without fetching from meta service (#59754)
### What problem does this PR solve?
## Test
1 FE + 3 BE, a 400-bucket merge-on-write (MoW) table, 10 concurrent stream loads
before:
<img width="3218" height="1300" alt="image"
src="https://github.com/user-attachments/assets/3d4fe51d-4fae-484b-a257-a6b5ec010970"
/>
after:
<img width="3206" height="1304" alt="image"
src="https://github.com/user-attachments/assets/e7230b3c-999d-407b-aa57-783492995e5b"
/>
---
Problem Summary
In cloud storage mode, after a load transaction commits rowsets to Meta
Service (MS), BE nodes need to fetch the committed rowset metadata from
MS during subsequent sync_rowsets() operations. This introduces
additional latency and MS query overhead, especially for high-frequency
import scenarios.
This PR implements a notification mechanism in which FE/BE directly
notify BE nodes about committed rowsets after the transaction commits,
allowing BE to update tablet metadata immediately without fetching it from
MS.
Solution
1. CloudCommittedRSMgr (BE): A new in-memory manager that caches the
metadata of committed temporary rowsets after they are committed to MS
- Stores rowset meta with an expiration time
- Supports efficient lookup by (txn_id, tablet_id)
- Handles empty rowset markers
- Runs a background thread that cleans up expired entries
2. Notification Flow:
- When FE commits a load transaction to MS, it sends a
MakeCloudTmpRsVisibleTask to the involved BE nodes
- BE receives the task with the final version/visible_ts information
- BE promotes the cached rowset metadata directly into the tablet meta
- BE can forward the notification to FE if needed (for tablets not on the
original BE)
3. Configuration:
- FE: enable_cloud_notify_be_after_load_txn_commit (default: false)
- BE: enable_cloud_make_rs_visible_on_be (default: false)
- BE: cloud_make_committed_rs_visible_worker_count (default: 4)
- BE: cloud_mow_sync_rowsets_when_load_txn_begin (default: true)
Benefits
- Reduces Meta Service query pressure by avoiding redundant rowset
metadata fetches
- Improves import latency by making rowsets visible faster
- Optimizes cloud storage mode performance for high-frequency imports
---
be/src/agent/agent_server.cpp | 6 +
be/src/agent/task_worker_pool.cpp | 52 +++
be/src/agent/task_worker_pool.h | 3 +
be/src/cloud/cloud_committed_rs_mgr.cpp | 142 +++++++
be/src/cloud/cloud_committed_rs_mgr.h | 87 +++++
be/src/cloud/cloud_delete_task.cpp | 10 +-
be/src/cloud/cloud_delta_writer.cpp | 4 +-
be/src/cloud/cloud_delta_writer.h | 2 +-
be/src/cloud/cloud_meta_mgr.cpp | 23 +-
be/src/cloud/cloud_meta_mgr.h | 4 +-
be/src/cloud/cloud_rowset_builder.cpp | 21 +-
be/src/cloud/cloud_rowset_builder.h | 2 +-
be/src/cloud/cloud_storage_engine.cpp | 3 +
be/src/cloud/cloud_storage_engine.h | 4 +
be/src/cloud/cloud_tablet.cpp | 169 +++++++++
be/src/cloud/cloud_tablet.h | 29 ++
be/src/cloud/cloud_tablet_mgr.cpp | 4 +
be/src/cloud/cloud_tablets_channel.cpp | 4 +-
be/src/cloud/cloud_txn_delete_bitmap_cache.cpp | 41 +++
be/src/cloud/cloud_txn_delete_bitmap_cache.h | 4 +
be/src/common/config.cpp | 6 +
be/src/common/config.h | 6 +
be/src/storage/rowset/rowset_meta.h | 12 +-
be/test/cloud/cloud_committed_rs_mgr_test.cpp | 408 +++++++++++++++++++++
be/test/cloud/cloud_tablet_test.cpp | 365 ++++++++++++++++++
.../main/java/org/apache/doris/common/Config.java | 3 +
.../transaction/CloudGlobalTransactionMgr.java | 138 ++++++-
.../apache/doris/service/FrontendServiceImpl.java | 2 +-
.../java/org/apache/doris/task/AgentBatchTask.java | 10 +
.../doris/task/MakeCloudTmpRsVisibleTask.java | 73 ++++
.../doris/transaction/GlobalTransactionMgr.java | 2 +-
.../transaction/GlobalTransactionMgrIface.java | 2 +-
gensrc/thrift/AgentService.thrift | 9 +
gensrc/thrift/Types.thrift | 3 +-
...loud_dup_forward_notify_be_after_txn_commit.out | 20 +
.../test_cloud_dup_notify_be_after_txn_commit.out | 36 ++
...t_cloud_empty_rs_notify_be_after_txn_commit.out | 38 ++
.../test_cloud_mow_notify_be_after_txn_commit.out | 14 +
.../pipeline/cloud_p0/conf/be_custom.conf | 3 +
.../pipeline/cloud_p0/conf/fe_custom.conf | 4 +-
.../pipeline/cloud_p1/conf/be_custom.conf | 3 +
.../pipeline/cloud_p1/conf/fe_custom.conf | 2 +
...d_dup_forward_notify_be_after_txn_commit.groovy | 131 +++++++
...est_cloud_dup_notify_be_after_txn_commit.groovy | 161 ++++++++
...loud_empty_rs_notify_be_after_txn_commit.groovy | 191 ++++++++++
...est_cloud_mow_notify_be_after_txn_commit.groovy | 84 +++++
46 files changed, 2307 insertions(+), 33 deletions(-)
diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index a60686637fd..b13af97e717 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -240,6 +240,12 @@ void AgentServer::cloud_start_workers(CloudStorageEngine&
engine, ExecEnv* exec_
"ALTER_INVERTED_INDEX", config::alter_index_worker_count,
[&engine](auto&& task) { return alter_cloud_index_callback(engine,
task); });
+ _workers[TTaskType::MAKE_CLOUD_COMMITTED_RS_VISIBLE] =
std::make_unique<TaskWorkerPool>(
+ "MAKE_CLOUD_COMMITTED_RS_VISIBLE",
config::cloud_make_committed_rs_visible_worker_count,
+ [&engine](auto&& task) {
+ return make_cloud_committed_rs_visible_callback(engine, task);
+ });
+
_report_workers.push_back(std::make_unique<ReportWorker>(
"REPORT_TASK", _cluster_info, config::report_task_interval_seconds,
[&cluster_info = _cluster_info] {
report_task_callback(cluster_info); }));
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index 415c8b6742a..2753a527a4b 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -51,6 +51,7 @@
#include "cloud/cloud_schema_change_job.h"
#include "cloud/cloud_snapshot_loader.h"
#include "cloud/cloud_snapshot_mgr.h"
+#include "cloud/cloud_tablet.h"
#include "cloud/cloud_tablet_mgr.h"
#include "cloud/config.h"
#include "common/config.h"
@@ -2393,6 +2394,57 @@ void calc_delete_bitmap_callback(CloudStorageEngine&
engine, const TAgentTaskReq
remove_task_info(req.task_type, req.signature);
}
+void make_cloud_committed_rs_visible_callback(CloudStorageEngine& engine,
+ const TAgentTaskRequest& req) {
+ if (!config::enable_cloud_make_rs_visible_on_be) {
+ return;
+ }
+ LOG(INFO) << "begin to make cloud tmp rs visible, txn_id="
+ << req.make_cloud_tmp_rs_visible_req.txn_id
+ << ", tablet_count=" <<
req.make_cloud_tmp_rs_visible_req.tablet_ids.size();
+
+ const auto& make_visible_req = req.make_cloud_tmp_rs_visible_req;
+ auto& tablet_mgr = engine.tablet_mgr();
+
+ int64_t txn_id = make_visible_req.txn_id;
+ int64_t version_update_time_ms =
make_visible_req.__isset.version_update_time_ms
+ ?
make_visible_req.version_update_time_ms
+ : 0;
+
+ // Process each tablet involved in this transaction on this BE
+ for (int64_t tablet_id : make_visible_req.tablet_ids) {
+ auto tablet_result =
+ tablet_mgr.get_tablet(tablet_id, /* warmup_data */ false,
+ /* sync_delete_bitmap */ false,
+ /* sync_stats */ nullptr, /*
force_use_only_cached */ true,
+ /* cache_on_miss */ false);
+ if (!tablet_result.has_value()) {
+ continue;
+ }
+ auto cloud_tablet = tablet_result.value();
+
+ int64_t partition_id = cloud_tablet->partition_id();
+ auto version_iter =
make_visible_req.partition_version_map.find(partition_id);
+ if (version_iter == make_visible_req.partition_version_map.end()) {
+ continue;
+ }
+ int64_t visible_version = version_iter->second;
+ DBUG_EXECUTE_IF("make_cloud_committed_rs_visible_callback.block", {
+ auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
+ auto target_table_id = dp->param<int64_t>("table_id", -1);
+ auto version = dp->param<int64_t>("version", -1);
+ if ((target_tablet_id == tablet_id || target_table_id ==
cloud_tablet->table_id()) &&
+ version == visible_version) {
+ DBUG_BLOCK
+ }
+ });
+ cloud_tablet->try_make_committed_rs_visible(txn_id, visible_version,
+ version_update_time_ms);
+ }
+ LOG(INFO) << "make cloud tmp rs visible finished, txn_id=" << txn_id
+ << ", processed_tablets=" << make_visible_req.tablet_ids.size();
+}
+
void clean_trash_callback(StorageEngine& engine, const TAgentTaskRequest& req)
{
LOG(INFO) << "clean trash start";
DBUG_EXECUTE_IF("clean_trash_callback_sleep", { sleep(100); })
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index 300e1daa606..e5782055267 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -205,6 +205,9 @@ void report_tablet_callback(CloudStorageEngine& engine,
const ClusterInfo* clust
void calc_delete_bitmap_callback(CloudStorageEngine& engine, const
TAgentTaskRequest& req);
+void make_cloud_committed_rs_visible_callback(CloudStorageEngine& engine,
+ const TAgentTaskRequest& req);
+
void report_index_policy_callback(const ClusterInfo* cluster_info);
} // namespace doris
diff --git a/be/src/cloud/cloud_committed_rs_mgr.cpp
b/be/src/cloud/cloud_committed_rs_mgr.cpp
new file mode 100644
index 00000000000..3d96b7ca7f7
--- /dev/null
+++ b/be/src/cloud/cloud_committed_rs_mgr.cpp
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_committed_rs_mgr.h"
+
+#include <chrono>
+
+#include "cloud/config.h"
+#include "common/logging.h"
+#include "storage/rowset/rowset_meta.h"
+#include "util/thread.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+CloudCommittedRSMgr::CloudCommittedRSMgr() : _stop_latch(1) {}
+
+CloudCommittedRSMgr::~CloudCommittedRSMgr() {
+ _stop_latch.count_down();
+ if (_clean_thread) {
+ _clean_thread->join();
+ }
+}
+
+Status CloudCommittedRSMgr::init() {
+ auto st = Thread::create(
+ "CloudCommittedRSMgr", "clean_committed_rs_thread",
+ [this]() { this->_clean_thread_callback(); }, &_clean_thread);
+ if (!st.ok()) {
+ LOG(WARNING) << "failed to create thread for CloudCommittedRSMgr,
error: " << st;
+ }
+ return st;
+}
+
+void CloudCommittedRSMgr::add_committed_rowset(int64_t txn_id, int64_t
tablet_id,
+ RowsetMetaSharedPtr rowset_meta,
+ int64_t expiration_time) {
+ int64_t txn_expiration_min =
+
duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
+ .count() +
+ config::tablet_txn_info_min_expired_seconds;
+ expiration_time = std::max(txn_expiration_min, expiration_time);
+ std::unique_lock<std::shared_mutex> wlock(_rwlock);
+ TxnTabletKey key(txn_id, tablet_id);
+ _committed_rs_map.insert_or_assign(key, CommittedRowsetValue(rowset_meta,
expiration_time));
+ _expiration_map.emplace(expiration_time, key);
+ LOG(INFO) << "add pending rowset, txn_id=" << txn_id << ", tablet_id=" <<
tablet_id
+ << ", rowset_id=" << rowset_meta->rowset_id().to_string()
+ << ", expiration_time=" << expiration_time;
+}
+
+Result<std::pair<RowsetMetaSharedPtr, int64_t>>
CloudCommittedRSMgr::get_committed_rowset(
+ int64_t txn_id, int64_t tablet_id) {
+ std::shared_lock<std::shared_mutex> rlock(_rwlock);
+ TxnTabletKey key(txn_id, tablet_id);
+ if (auto it = _empty_rowset_markers.find(key); it !=
_empty_rowset_markers.end()) {
+ return std::make_pair(nullptr, it->second);
+ }
+ auto iter = _committed_rs_map.find(key);
+ if (iter == _committed_rs_map.end()) {
+ return ResultError(Status::Error<ErrorCode::NOT_FOUND>(
+ "committed rowset not found, txn_id={}, tablet_id={}", txn_id,
tablet_id));
+ }
+ return std::make_pair(iter->second.rowset_meta,
iter->second.expiration_time);
+}
+
+void CloudCommittedRSMgr::remove_committed_rowset(int64_t txn_id, int64_t
tablet_id) {
+ std::unique_lock<std::shared_mutex> wlock(_rwlock);
+ _committed_rs_map.erase({txn_id, tablet_id});
+}
+
+void CloudCommittedRSMgr::remove_expired_committed_rowsets() {
+ std::unique_lock<std::shared_mutex> wlock(_rwlock);
+ int64_t current_time = std::chrono::duration_cast<std::chrono::seconds>(
+
std::chrono::system_clock::now().time_since_epoch())
+ .count();
+
+ while (!_expiration_map.empty()) {
+ auto iter = _expiration_map.begin();
+ if (!_committed_rs_map.contains(iter->second) &&
+ !_empty_rowset_markers.contains(iter->second)) {
+ _expiration_map.erase(iter);
+ continue;
+ }
+ int64_t expiration_time = iter->first;
+ if (expiration_time > current_time) {
+ break;
+ }
+
+ auto key = iter->second;
+ _expiration_map.erase(iter);
+
+ auto it_rs = _committed_rs_map.find(key);
+ if (it_rs != _committed_rs_map.end() && it_rs->second.expiration_time
== expiration_time) {
+ _committed_rs_map.erase(it_rs);
+ LOG(INFO) << "clean expired pending cloud rowset, txn_id=" <<
key.txn_id
+ << ", tablet_id=" << key.tablet_id << ",
expiration_time=" << expiration_time;
+ }
+ auto it_empty = _empty_rowset_markers.find(key);
+ if (it_empty != _empty_rowset_markers.end() && it_empty->second ==
expiration_time) {
+ _empty_rowset_markers.erase(it_empty);
+ LOG(INFO) << "clean expired empty rowset marker, txn_id=" <<
key.txn_id
+ << ", tablet_id=" << key.tablet_id << ",
expiration_time=" << expiration_time;
+ }
+ }
+}
+
+void CloudCommittedRSMgr::mark_empty_rowset(int64_t txn_id, int64_t tablet_id,
+ int64_t txn_expiration) {
+ int64_t txn_expiration_min =
+
duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
+ .count() +
+ config::tablet_txn_info_min_expired_seconds;
+ txn_expiration = std::max(txn_expiration_min, txn_expiration);
+
+ std::unique_lock<std::shared_mutex> wlock(_rwlock);
+ TxnTabletKey txn_key(txn_id, tablet_id);
+ _empty_rowset_markers.insert_or_assign(txn_key, txn_expiration);
+ _expiration_map.emplace(txn_expiration, txn_key);
+}
+
+void CloudCommittedRSMgr::_clean_thread_callback() {
+ do {
+ remove_expired_committed_rowsets();
+ } while (!_stop_latch.wait_for(
+
std::chrono::seconds(config::remove_expired_tablet_txn_info_interval_seconds)));
+}
+#include "common/compile_check_end.h"
+} // namespace doris
diff --git a/be/src/cloud/cloud_committed_rs_mgr.h
b/be/src/cloud/cloud_committed_rs_mgr.h
new file mode 100644
index 00000000000..33af0ba3979
--- /dev/null
+++ b/be/src/cloud/cloud_committed_rs_mgr.h
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <shared_mutex>
+
+#include "common/status.h"
+#include "storage/rowset/rowset_fwd.h"
+#include "util/countdown_latch.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+class Thread;
+
+// Manages temporary rowset meta for cloud storage transactions in memory.
+// This cache stores rowset meta produced during import operations after they
+// are committed to MS. After the load txn was committed in MS finally, FE/BE
will
+// notifies the final version/visible_ts, BE can update and promote these
+// temporary rowsets to the tablet meta without fetching from MS in later
sync_rowsets().
+class CloudCommittedRSMgr {
+public:
+ CloudCommittedRSMgr();
+ ~CloudCommittedRSMgr();
+
+ Status init();
+
+ void add_committed_rowset(int64_t txn_id, int64_t tablet_id,
RowsetMetaSharedPtr rowset_meta,
+ int64_t expiration_time);
+
+ Result<std::pair<RowsetMetaSharedPtr, int64_t>>
get_committed_rowset(int64_t txn_id,
+
int64_t tablet_id);
+
+ void remove_committed_rowset(int64_t txn_id, int64_t tablet_id);
+
+ void remove_expired_committed_rowsets();
+
+ void mark_empty_rowset(int64_t txn_id, int64_t tablet_id, int64_t
txn_expiration);
+
+private:
+ void _clean_thread_callback();
+
+ struct TxnTabletKey {
+ int64_t txn_id;
+ int64_t tablet_id;
+
+ TxnTabletKey(int64_t txn_id_, int64_t tablet_id_)
+ : txn_id(txn_id_), tablet_id(tablet_id_) {}
+
+ auto operator<=>(const TxnTabletKey&) const = default;
+ };
+
+ struct CommittedRowsetValue {
+ RowsetMetaSharedPtr rowset_meta;
+ int64_t expiration_time; // seconds since epoch
+
+ CommittedRowsetValue(RowsetMetaSharedPtr rowset_meta_, int64_t
expiration_time_)
+ : rowset_meta(std::move(rowset_meta_)),
expiration_time(expiration_time_) {}
+ };
+
+ // Map: <txn_id, tablet_id> -> <rowset_meta, expiration_time>
+ std::map<TxnTabletKey, CommittedRowsetValue> _committed_rs_map;
+ // Multimap for efficient expiration cleanup: expiration_time -> <txn_id,
tablet_id>
+ std::multimap<int64_t, TxnTabletKey> _expiration_map;
+ std::map<TxnTabletKey, int64_t /* expiration_time */>
_empty_rowset_markers;
+ std::shared_mutex _rwlock;
+ std::shared_ptr<Thread> _clean_thread;
+ CountDownLatch _stop_latch;
+};
+#include "common/compile_check_end.h"
+} // namespace doris
diff --git a/be/src/cloud/cloud_delete_task.cpp
b/be/src/cloud/cloud_delete_task.cpp
index 0b51da4eafd..dc3d991df58 100644
--- a/be/src/cloud/cloud_delete_task.cpp
+++ b/be/src/cloud/cloud_delete_task.cpp
@@ -103,19 +103,27 @@ Status CloudDeleteTask::execute(CloudStorageEngine&
engine, const TPushReq& requ
}
st = engine.meta_mgr().commit_rowset(*rowset->rowset_meta(), "");
+ if (!st.ok()) {
+ LOG(WARNING) << "failed to commit rowset, status=" << st.to_string();
+ return st;
+ }
// Update tablet stats
tablet->fetch_add_approximate_num_rowsets(1);
tablet->fetch_add_approximate_cumu_num_rowsets(1);
// TODO(liaoxin) delete operator don't send calculate delete bitmap task
from fe,
- // then we don't need to set_txn_related_delete_bitmap here.
+ // then we don't need to set_txn_related_info here.
if (tablet->enable_unique_key_merge_on_write()) {
DeleteBitmapPtr delete_bitmap =
std::make_shared<DeleteBitmap>(tablet->tablet_id());
RowsetIdUnorderedSet rowset_ids;
engine.txn_delete_bitmap_cache().set_tablet_txn_info(
request.transaction_id, tablet->tablet_id(), delete_bitmap,
rowset_ids, rowset,
request.timeout, nullptr);
+ } else {
+ if (config::enable_cloud_make_rs_visible_on_be) {
+ engine.meta_mgr().cache_committed_rowset(rowset->rowset_meta(),
context.txn_expiration);
+ }
}
return st;
diff --git a/be/src/cloud/cloud_delta_writer.cpp
b/be/src/cloud/cloud_delta_writer.cpp
index 299182b45cb..d51d5d8b576 100644
--- a/be/src/cloud/cloud_delta_writer.cpp
+++ b/be/src/cloud/cloud_delta_writer.cpp
@@ -141,8 +141,8 @@ Status CloudDeltaWriter::_commit_empty_rowset() {
return _engine.meta_mgr().commit_rowset(*rowset_meta(), "");
}
-Status CloudDeltaWriter::set_txn_related_delete_bitmap() {
- return rowset_builder()->set_txn_related_delete_bitmap();
+Status CloudDeltaWriter::set_txn_related_info() {
+ return rowset_builder()->set_txn_related_info();
}
} // namespace doris
diff --git a/be/src/cloud/cloud_delta_writer.h
b/be/src/cloud/cloud_delta_writer.h
index 846149137b1..614bfd0f16a 100644
--- a/be/src/cloud/cloud_delta_writer.h
+++ b/be/src/cloud/cloud_delta_writer.h
@@ -51,7 +51,7 @@ public:
Status commit_rowset();
- Status set_txn_related_delete_bitmap();
+ Status set_txn_related_info();
std::shared_ptr<ResourceContext> resource_context() { return
_resource_ctx; }
private:
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index f51b58bb757..bfa58af43d4 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -565,6 +565,14 @@ Status
CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
using namespace std::chrono;
TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::sync_tablet_rowsets",
Status::OK(), tablet);
+ DBUG_EXECUTE_IF("CloudMetaMgr::sync_tablet_rowsets.before.inject_error", {
+ auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
+ auto target_table_id = dp->param<int64_t>("table_id", -1);
+ if (target_tablet_id == tablet->tablet_id() || target_table_id ==
tablet->table_id()) {
+ return Status::InternalError(
+ "[sync_tablet_rowsets_unlocked] injected error for
testing");
+ }
+ });
MetaServiceProxy* proxy;
RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
@@ -800,7 +808,7 @@ Status
CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
}
}
-bool CloudMetaMgr::sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet,
int64_t old_max_version,
+bool CloudMetaMgr::sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet,
std::ranges::range
auto&& rs_metas,
DeleteBitmap*
delete_bitmap) {
std::set<int64_t> txn_processed;
@@ -958,7 +966,7 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet*
tablet, int64_t old_
}
if (!full_sync && config::enable_sync_tablet_delete_bitmap_by_cache &&
- sync_tablet_delete_bitmap_by_cache(tablet, old_max_version, rs_metas,
delete_bitmap)) {
+ sync_tablet_delete_bitmap_by_cache(tablet, rs_metas, delete_bitmap)) {
if (sync_stats) {
sync_stats->get_local_delete_bitmap_rowsets_num += rs_metas.size();
}
@@ -1376,6 +1384,17 @@ Status CloudMetaMgr::commit_rowset(RowsetMeta& rs_meta,
const std::string& job_i
return st;
}
+void CloudMetaMgr::cache_committed_rowset(RowsetMetaSharedPtr rs_meta, int64_t
expiration_time) {
+ // For load-generated rowsets (job_id is empty), add to pending rowset
manager
+ // so FE can notify BE to promote them later
+
+ // TODO(bobhan1): copy rs_meta?
+ int64_t txn_id = rs_meta->txn_id();
+ int64_t tablet_id = rs_meta->tablet_id();
+
ExecEnv::GetInstance()->storage_engine().to_cloud().committed_rs_mgr().add_committed_rowset(
+ txn_id, tablet_id, std::move(rs_meta), expiration_time);
+}
+
Status CloudMetaMgr::update_tmp_rowset(const RowsetMeta& rs_meta) {
VLOG_DEBUG << "update committed rowset, tablet_id: " << rs_meta.tablet_id()
<< ", rowset_id: " << rs_meta.rowset_id();
diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h
index 2713ce12921..5bf54dde548 100644
--- a/be/src/cloud/cloud_meta_mgr.h
+++ b/be/src/cloud/cloud_meta_mgr.h
@@ -83,6 +83,7 @@ public:
Status commit_rowset(RowsetMeta& rs_meta, const std::string& job_id,
std::shared_ptr<RowsetMeta>* existed_rs_meta =
nullptr);
+ void cache_committed_rowset(RowsetMetaSharedPtr rs_meta, int64_t
expiration_time);
Status update_tmp_rowset(const RowsetMeta& rs_meta);
@@ -176,8 +177,7 @@ public:
std::string* my_cluster_id = nullptr);
private:
- bool sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, int64_t
old_max_version,
- std::ranges::range auto&& rs_metas,
+ bool sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet,
std::ranges::range auto&& rs_metas,
DeleteBitmap* delete_bitmap);
Status sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t
old_max_version,
diff --git a/be/src/cloud/cloud_rowset_builder.cpp
b/be/src/cloud/cloud_rowset_builder.cpp
index 0ce4829d9fe..1f73d11d20e 100644
--- a/be/src/cloud/cloud_rowset_builder.cpp
+++ b/be/src/cloud/cloud_rowset_builder.cpp
@@ -43,10 +43,12 @@ Status CloudRowsetBuilder::init() {
std::shared_ptr<MowContext> mow_context;
if (_tablet->enable_unique_key_merge_on_write()) {
- auto st =
std::static_pointer_cast<CloudTablet>(_tablet)->sync_rowsets();
- // sync_rowsets will return INVALID_TABLET_STATE when tablet is under
alter
- if (!st.ok() && !st.is<ErrorCode::INVALID_TABLET_STATE>()) {
- return st;
+ if (config::cloud_mow_sync_rowsets_when_load_txn_begin) {
+ auto st =
std::static_pointer_cast<CloudTablet>(_tablet)->sync_rowsets();
+ // sync_rowsets will return INVALID_TABLET_STATE when tablet is
under alter
+ if (!st.ok() && !st.is<ErrorCode::INVALID_TABLET_STATE>()) {
+ return st;
+ }
}
RETURN_IF_ERROR(init_mow_context(mow_context));
}
@@ -130,7 +132,7 @@ const RowsetMetaSharedPtr&
CloudRowsetBuilder::rowset_meta() {
return _rowset_writer->rowset_meta();
}
-Status CloudRowsetBuilder::set_txn_related_delete_bitmap() {
+Status CloudRowsetBuilder::set_txn_related_info() {
if (_tablet->enable_unique_key_merge_on_write()) {
// For empty rowsets when skip_writing_empty_rowset_metadata=true,
// store only a lightweight marker instead of full rowset info.
@@ -156,6 +158,15 @@ Status CloudRowsetBuilder::set_txn_related_delete_bitmap()
{
_engine.txn_delete_bitmap_cache().set_tablet_txn_info(
_req.txn_id, _tablet->tablet_id(), _delete_bitmap,
*_rowset_ids, _rowset,
_req.txn_expiration, _partial_update_info);
+ } else {
+ if (config::enable_cloud_make_rs_visible_on_be) {
+ if (_skip_writing_rowset_metadata) {
+ _engine.committed_rs_mgr().mark_empty_rowset(_req.txn_id,
_tablet->tablet_id(),
+
_req.txn_expiration);
+ } else {
+ _engine.meta_mgr().cache_committed_rowset(rowset_meta(),
_req.txn_expiration);
+ }
+ }
}
return Status::OK();
}
diff --git a/be/src/cloud/cloud_rowset_builder.h
b/be/src/cloud/cloud_rowset_builder.h
index 3384f235167..cec8cfed979 100644
--- a/be/src/cloud/cloud_rowset_builder.h
+++ b/be/src/cloud/cloud_rowset_builder.h
@@ -37,7 +37,7 @@ public:
const RowsetMetaSharedPtr& rowset_meta();
- Status set_txn_related_delete_bitmap();
+ Status set_txn_related_info();
void set_skip_writing_rowset_metadata(bool skip) {
_skip_writing_rowset_metadata = skip; }
diff --git a/be/src/cloud/cloud_storage_engine.cpp
b/be/src/cloud/cloud_storage_engine.cpp
index 432c1fde72f..927d5bef343 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -223,6 +223,9 @@ Status CloudStorageEngine::open() {
: config::delete_bitmap_agg_cache_capacity);
RETURN_IF_ERROR(_txn_delete_bitmap_cache->init());
+ _committed_rs_mgr = std::make_unique<CloudCommittedRSMgr>();
+ RETURN_IF_ERROR(_committed_rs_mgr->init());
+
_file_cache_block_downloader =
std::make_unique<io::FileCacheBlockDownloader>(*this);
_cloud_warm_up_manager = std::make_unique<CloudWarmUpManager>(*this);
diff --git a/be/src/cloud/cloud_storage_engine.h
b/be/src/cloud/cloud_storage_engine.h
index 7d0a3e61296..68626ec0d9e 100644
--- a/be/src/cloud/cloud_storage_engine.h
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -24,6 +24,7 @@
//#include "cloud/cloud_cumulative_compaction.h"
//#include "cloud/cloud_base_compaction.h"
//#include "cloud/cloud_full_compaction.h"
+#include "cloud/cloud_committed_rs_mgr.h"
#include "cloud/cloud_cumulative_compaction_policy.h"
#include "cloud/cloud_tablet.h"
#include "cloud/cloud_txn_delete_bitmap_cache.h"
@@ -94,6 +95,8 @@ public:
CloudTxnDeleteBitmapCache& txn_delete_bitmap_cache() const { return
*_txn_delete_bitmap_cache; }
+ CloudCommittedRSMgr& committed_rs_mgr() const { return *_committed_rs_mgr;
}
+
ThreadPool& calc_tablet_delete_bitmap_task_thread_pool() const {
return *_calc_tablet_delete_bitmap_task_thread_pool;
}
@@ -214,6 +217,7 @@ private:
std::unique_ptr<cloud::CloudMetaMgr> _meta_mgr;
std::unique_ptr<CloudTabletMgr> _tablet_mgr;
std::unique_ptr<CloudTxnDeleteBitmapCache> _txn_delete_bitmap_cache;
+ std::unique_ptr<CloudCommittedRSMgr> _committed_rs_mgr;
std::unique_ptr<ThreadPool> _calc_tablet_delete_bitmap_task_thread_pool;
std::unique_ptr<ThreadPool> _sync_delete_bitmap_thread_pool;
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 9936ac374f6..faa9a82ae6a 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -27,6 +27,7 @@
#include <rapidjson/rapidjson.h>
#include <rapidjson/stringbuffer.h>
+#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdint>
@@ -41,6 +42,7 @@
#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet_mgr.h"
#include "cloud/cloud_warm_up_manager.h"
+#include "cloud/config.h"
#include "common/cast_set.h"
#include "common/config.h"
#include "common/logging.h"
@@ -1829,5 +1831,172 @@ void
CloudTablet::_add_rowsets_directly(std::vector<RowsetSharedPtr>& rowsets,
_tablet_meta->add_rowsets_unchecked(rowsets);
}
+// Prunes `_visible_pending_rs_map`:
+//  1. drops entries whose version is already covered by the tablet's max version;
+//  2. drops entries whose expiration time (seconds since epoch) has passed;
+//  3. if more than `max_version_config()` entries remain, evicts the highest versions
+//     first, so the entries closest to being applied are kept.
+// A negative `max_version_config()` leaves the number of entries unbounded.
+void CloudTablet::clear_unused_visible_pending_rowsets() {
+    const int64_t cur_max_version = max_version().second;
+    const int32_t max_version_count = max_version_config();
+    const int64_t current_time = std::chrono::duration_cast<std::chrono::seconds>(
+                                         std::chrono::system_clock::now().time_since_epoch())
+                                         .count();
+
+    std::lock_guard<std::mutex> wlock(_visible_pending_rs_lock);
+    std::erase_if(_visible_pending_rs_map, [&](const auto& entry) {
+        const auto& [version, pending] = entry;
+        return version <= cur_max_version || pending.expiration_time < current_time;
+    });
+
+    // Handle the signed limit explicitly instead of comparing size_t with int32_t.
+    if (max_version_count < 0) {
+        return;
+    }
+    const auto max_entries = static_cast<size_t>(max_version_count);
+    while (_visible_pending_rs_map.size() > max_entries) {
+        // std::map is ordered by version, so rbegin() holds the highest pending version.
+        // Copy the key so erase() does not read from the node it is destroying.
+        const int64_t highest_version = _visible_pending_rs_map.rbegin()->first;
+        _visible_pending_rs_map.erase(highest_version);
+    }
+}
+
+// Makes the rowset committed by txn `txn_id` on this tablet visible at `visible_version`.
+// MoW tablets are delegated to try_make_committed_rs_visible_for_mow(). For other tablets,
+// the rowset meta cached in `CloudCommittedRSMgr` is stamped with the visible version and
+// version-update time, queued as a pending-visible rowset, and then every contiguous
+// pending rowset is applied. A null cached meta denotes an empty rowset. The cache entry
+// is removed afterwards. Nothing happens when the cache has no entry for (txn, tablet).
+void CloudTablet::try_make_committed_rs_visible(int64_t txn_id, int64_t visible_version,
+                                                int64_t version_update_time_ms) {
+    if (enable_unique_key_merge_on_write()) {
+        // MoW tablets take their committed rowset from `CloudTxnDeleteBitmapCache`
+        // instead of `CommittedRowsetManager`.
+        try_make_committed_rs_visible_for_mow(txn_id, visible_version, version_update_time_ms);
+        return;
+    }
+
+    auto& rs_mgr = _engine.committed_rs_mgr();
+    const int64_t tid = tablet_id();
+    auto committed = rs_mgr.get_committed_rowset(txn_id, tid);
+    if (!committed.has_value()) {
+        return;
+    }
+
+    RowsetMetaSharedPtr meta = committed.value().first;
+    const int64_t expire_at = committed.value().second;
+    const bool empty_rowset = meta == nullptr;
+    if (!empty_rowset) {
+        meta->set_cloud_fields_after_visible(visible_version, version_update_time_ms);
+    }
+
+    {
+        std::lock_guard<std::mutex> guard(_visible_pending_rs_lock);
+        _visible_pending_rs_map.emplace(visible_version,
+                                        VisiblePendingRowset {meta, expire_at, empty_rowset});
+    }
+    apply_visible_pending_rowsets();
+    rs_mgr.remove_committed_rowset(txn_id, tid);
+}
+
+// MoW counterpart of try_make_committed_rs_visible(). The committed rowset and its delete
+// bitmap come from `CloudTxnDeleteBitmapCache`. They are applied only when
+// `visible_version == _max_version + 1`; otherwise nothing is applied. In every case the
+// cached txn info for (txn_id, this tablet) is removed on exit. A null cached rowset
+// denotes an empty rowset: a placeholder is created from the tablet's rowset with the
+// highest start version.
+void CloudTablet::try_make_committed_rs_visible_for_mow(int64_t txn_id, int64_t visible_version,
+                                                        int64_t version_update_time_ms) {
+    Defer defer {[&] {
+        _engine.txn_delete_bitmap_cache().remove_unused_tablet_txn_info(txn_id, tablet_id());
+    }};
+    auto res = _engine.txn_delete_bitmap_cache().get_rowset_and_delete_bitmap(txn_id, tablet_id());
+    if (!res.has_value()) {
+        return;
+    }
+    auto [rowset, delete_bitmap] = res.value();
+    bool is_empty_rowset = (rowset == nullptr);
+    {
+        std::unique_lock lock {_sync_meta_lock};
+        std::unique_lock meta_wlock {_meta_lock};
+        if (_max_version + 1 != visible_version) {
+            // Not the next version, so it cannot be appended directly.
+            return;
+        }
+        if (is_empty_rowset) {
+            Versions existing_versions;
+            for (const auto& [_, rs] : tablet_meta()->all_rs_metas()) {
+                existing_versions.emplace_back(rs->version());
+            }
+            if (existing_versions.empty()) {
+                return;
+            }
+            auto max_version = std::ranges::max(existing_versions, {}, &Version::first);
+            auto prev_rowset = get_rowset_by_version(max_version);
+            if (prev_rowset == nullptr) {
+                // Without a template rowset we cannot build the placeholder; bail out
+                // instead of dereferencing null.
+                LOG(WARNING) << "mow: no template rowset to create empty rowset, txn_id="
+                             << txn_id << ", tablet_id=" << tablet_id()
+                             << ", version=" << visible_version;
+                return;
+            }
+            auto st = _engine.meta_mgr().create_empty_rowset_for_hole(
+                    this, visible_version, prev_rowset->rowset_meta(), &rowset);
+            if (!st.ok()) {
+                LOG(WARNING) << "mow: failed to create empty rowset, txn_id=" << txn_id
+                             << ", tablet_id=" << tablet_id() << ", version=" << visible_version
+                             << ", error=" << st;
+                return;
+            }
+        } else {
+            for (const auto& [delete_bitmap_key, bitmap_value] : delete_bitmap->delete_bitmap) {
+                // skip sentinel mark, which is used for delete bitmap correctness check
+                if (std::get<1>(delete_bitmap_key) != DeleteBitmap::INVALID_SEGMENT_ID) {
+                    tablet_meta()->delete_bitmap().merge(
+                            {std::get<0>(delete_bitmap_key), std::get<1>(delete_bitmap_key),
+                             visible_version},
+                            bitmap_value);
+                }
+            }
+        }
+        rowset->rowset_meta()->set_cloud_fields_after_visible(visible_version,
+                                                              version_update_time_ms);
+        add_rowsets({rowset}, false, meta_wlock, true);
+    }
+    LOG(INFO) << "mow added visible pending rowset, txn_id=" << txn_id
+              << ", tablet_id=" << tablet_id() << ", version=" << visible_version
+              << ", rowset_id=" << rowset->rowset_id().to_string();
+}
+
+// Applies pending-visible rowsets to the tablet meta in version order, starting at
+// `_max_version + 1`. It stops at the first version gap or at the first entry that cannot
+// be turned into a rowset; everything gathered before that point is still added. Empty
+// pending entries are materialized with `create_empty_rowset_for_hole`, using the
+// previously gathered rowset (or the tablet's rowset with the highest start version) as
+// the template. Entries left unapplied stay pending for a later attempt.
+void CloudTablet::apply_visible_pending_rowsets() {
+    // Declared first so it runs last, after the locks below have been released:
+    // clear_unused_visible_pending_rowsets() acquires `_visible_pending_rs_lock` itself.
+    Defer defer {[&] { clear_unused_visible_pending_rowsets(); }};
+
+    std::unique_lock lock(_sync_meta_lock);
+    std::unique_lock<std::shared_mutex> meta_wlock(_meta_lock);
+    int64_t next_version = _max_version + 1;
+    std::vector<RowsetSharedPtr> to_add;
+    std::lock_guard<std::mutex> pending_lock(_visible_pending_rs_lock);
+    for (auto it = _visible_pending_rs_map.upper_bound(_max_version);
+         it != _visible_pending_rs_map.end(); ++it) {
+        const int64_t version = it->first;
+        if (version != next_version) {
+            break; // version gap: wait for the missing notification
+        }
+
+        auto& pending_rs = it->second;
+        RowsetSharedPtr rowset;
+        if (pending_rs.is_empty_rowset) {
+            RowsetSharedPtr prev_rowset {nullptr};
+            if (!to_add.empty()) {
+                prev_rowset = to_add.back();
+            } else {
+                Versions existing_versions;
+                for (const auto& [_, rs] : tablet_meta()->all_rs_metas()) {
+                    existing_versions.emplace_back(rs->version());
+                }
+                if (existing_versions.empty()) {
+                    break;
+                }
+                auto max_version = std::ranges::max(existing_versions, {}, &Version::first);
+                prev_rowset = get_rowset_by_version(max_version);
+            }
+            if (prev_rowset == nullptr) {
+                LOG(WARNING) << "no template rowset to create empty rowset for pending version"
+                             << ", tablet_id=" << tablet_id() << ", version=" << version;
+                break;
+            }
+            auto st = _engine.meta_mgr().create_empty_rowset_for_hole(
+                    this, version, prev_rowset->rowset_meta(), &rowset);
+            if (!st.ok()) {
+                LOG(WARNING) << "failed to create empty rowset for pending version, tablet_id="
+                             << tablet_id() << ", version=" << version << ", error=" << st;
+                // Keep the rowsets already gathered for lower versions instead of
+                // discarding them.
+                break;
+            }
+        } else {
+            auto st = RowsetFactory::create_rowset(nullptr, "", pending_rs.rowset_meta, &rowset);
+            if (!st.ok()) {
+                LOG(WARNING) << "failed to create rowset from pending rowset meta, tablet_id="
+                             << tablet_id() << ", version=" << version
+                             << ", rowset_id=" << pending_rs.rowset_meta->rowset_id().to_string()
+                             << ", error=" << st;
+                break;
+            }
+        }
+        to_add.push_back(std::move(rowset));
+        next_version++;
+    }
+    if (!to_add.empty()) {
+        add_rowsets(to_add, false, meta_wlock, true);
+        LOG_INFO(
+                "applied_visible_pending_rowsets, tablet_id={}, new_max_version={}, "
+                "count={}, new_rowsets={}",
+                tablet_id(), _max_version, to_add.size(),
+                fmt::join(to_add | std::views::transform([](const RowsetSharedPtr& rs) {
+                              return fmt::format("{}{}", rs->rowset_id().to_string(),
+                                                 rs->version().to_string());
+                          }),
+                          ","));
+    }
+}
+
#include "common/compile_check_end.h"
+
} // namespace doris
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index e7a9b13e851..889d92808f2 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -368,6 +368,18 @@ public:
void add_warmed_up_rowset(const RowsetId& rowset_id);
+    // Try to apply visible pending rowsets to tablet meta in version order, stopping at
+    // the first version gap. This should be called after receiving an FE notification or
+    // when new rowsets are added. Returns nothing; rowsets that cannot be applied yet
+    // remain pending.
+ void apply_visible_pending_rowsets();
+
+ void try_make_committed_rs_visible(int64_t txn_id, int64_t visible_version,
+ int64_t version_update_time_ms);
+ void try_make_committed_rs_visible_for_mow(int64_t txn_id, int64_t
visible_version,
+ int64_t version_update_time_ms);
+
+ void clear_unused_visible_pending_rowsets();
+
std::string rowset_warmup_digest() const {
std::string res;
auto add_log = [&](const RowsetSharedPtr& rs) {
@@ -507,6 +519,23 @@ private:
mutable std::shared_mutex _cluster_info_mutex;
std::string _last_active_cluster_id;
int64_t _last_active_time_ms {0};
+
+    // Map: version -> VisiblePendingRowset {is_empty_rowset, expiration_time, rowset_meta}.
+    // Holds rowsets that FE has notified as visible but that are not yet added to the
+    // tablet meta, because of out-of-order notifications or a version gap.
+    struct VisiblePendingRowset {
+        // True when the txn produced no data; a placeholder rowset fills the version.
+        const bool is_empty_rowset;
+        // Seconds since epoch after which the entry may be pruned.
+        const int64_t expiration_time;
+        // Null for empty rowsets.
+        RowsetMetaSharedPtr rowset_meta;
+
+        VisiblePendingRowset(RowsetMetaSharedPtr meta, int64_t expire_at, bool empty = false)
+                : is_empty_rowset(empty), expiration_time(expire_at), rowset_meta(std::move(meta)) {}
+    };
+ mutable std::mutex _visible_pending_rs_lock;
+ std::map<int64_t, VisiblePendingRowset> _visible_pending_rs_map;
};
using CloudTabletSPtr = std::shared_ptr<CloudTablet>;
diff --git a/be/src/cloud/cloud_tablet_mgr.cpp
b/be/src/cloud/cloud_tablet_mgr.cpp
index 8c5f14fd7dc..3e979864138 100644
--- a/be/src/cloud/cloud_tablet_mgr.cpp
+++ b/be/src/cloud/cloud_tablet_mgr.cpp
@@ -360,6 +360,10 @@ void CloudTabletMgr::vacuum_stale_rowsets(const
CountDownLatch& stop_latch) {
<< ", tablet_id=" <<
tablet_id_with_max_useless_rowset_version_count;
}
}
+ {
+ _tablet_map->traverse(
+ [](auto&& tablet) {
tablet->clear_unused_visible_pending_rowsets(); });
+ }
}
std::vector<std::weak_ptr<CloudTablet>> CloudTabletMgr::get_weak_tablets() {
diff --git a/be/src/cloud/cloud_tablets_channel.cpp
b/be/src/cloud/cloud_tablets_channel.cpp
index 173b8654696..ff01f2c5858 100644
--- a/be/src/cloud/cloud_tablets_channel.cpp
+++ b/be/src/cloud/cloud_tablets_channel.cpp
@@ -250,9 +250,9 @@ Status CloudTabletsChannel::close(LoadChannel* parent,
const PTabletWriterAddBlo
}
}
- // 6. set txn related delete bitmap if necessary
+ // 6. set txn related info if necessary
for (auto it = writers_to_commit.begin(); it != writers_to_commit.end();) {
- auto st = (*it)->set_txn_related_delete_bitmap();
+ auto st = (*it)->set_txn_related_info();
if (!st.ok()) {
_add_error_tablet(tablet_errors, (*it)->tablet_id(), st);
_close_status = std::move(st);
diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
index f5a210feebc..b6349067b87 100644
--- a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
+++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
@@ -27,6 +27,7 @@
#include "common/status.h"
#include "cpp/sync_point.h"
#include "storage/olap_common.h"
+#include "storage/rowset/rowset_fwd.h"
#include "storage/tablet/tablet_meta.h"
#include "storage/txn/txn_manager.h"
@@ -96,6 +97,46 @@ Status CloudTxnDeleteBitmapCache::get_tablet_txn_info(
return st;
}
+// Returns the committed rowset and its delete bitmap for (transaction_id, tablet_id).
+// An empty-rowset marker yields {nullptr, nullptr}. An error is returned when the txn
+// info is missing, has not been published successfully, or its delete bitmap is not in
+// the cache.
+Result<std::pair<RowsetSharedPtr, DeleteBitmapPtr>>
+CloudTxnDeleteBitmapCache::get_rowset_and_delete_bitmap(TTransactionId transaction_id,
+                                                        int64_t tablet_id) {
+    RowsetSharedPtr rowset;
+    {
+        std::shared_lock<std::shared_mutex> rlock(_rwlock);
+        TxnKey txn_key(transaction_id, tablet_id);
+        if (_empty_rowset_markers.contains(txn_key)) {
+            return std::make_pair(nullptr, nullptr);
+        }
+        auto iter = _txn_map.find(txn_key);
+        if (iter == _txn_map.end()) {
+            return ResultError(Status::InternalError<false>(
+                    "tablet txn info not found, txn_id={}, tablet_id={}", transaction_id,
+                    tablet_id));
+        }
+        if (!(iter->second.publish_status &&
+              *(iter->second.publish_status) == PublishStatus::SUCCEED)) {
+            return ResultError(Status::InternalError<false>(
+                    "tablet txn not published successfully, txn_id={}, tablet_id={}",
+                    transaction_id, tablet_id));
+        }
+        rowset = iter->second.rowset;
+    }
+
+    std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id);
+    CacheKey key(key_str);
+    Cache::Handle* handle = lookup(key);
+    // Capture the handle returned by lookup() by value so it is released even if the
+    // debug point below overwrites `handle` with nullptr.
+    Defer release_handle {[this, handle] {
+        if (handle != nullptr) {
+            release(handle);
+        }
+    }};
+
+    DBUG_EXECUTE_IF("CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss", {
+        handle = nullptr;
+        LOG(INFO) << "CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss, make cache missed "
+                     "when get delete bitmap, txn_id:"
+                  << transaction_id << ", tablet_id: " << tablet_id;
+    });
+    DeleteBitmapCacheValue* val =
+            handle == nullptr ? nullptr : reinterpret_cast<DeleteBitmapCacheValue*>(value(handle));
+    if (!val) {
+        return ResultError(Status::InternalError<false>(
+                "delete bitmap not found in cache, txn_id={}, tablet_id={}", transaction_id,
+                tablet_id));
+    }
+    return std::make_pair(rowset, val->delete_bitmap);
+}
+
Status CloudTxnDeleteBitmapCache::get_delete_bitmap(
TTransactionId transaction_id, int64_t tablet_id, DeleteBitmapPtr*
delete_bitmap,
RowsetIdUnorderedSet* rowset_ids, std::shared_ptr<PublishStatus>*
publish_status) {
diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.h
b/be/src/cloud/cloud_txn_delete_bitmap_cache.h
index 4274cb1b439..7cf6c27ecd8 100644
--- a/be/src/cloud/cloud_txn_delete_bitmap_cache.h
+++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.h
@@ -77,6 +77,10 @@ public:
DeleteBitmapPtr* delete_bitmap,
RowsetIdUnorderedSet* rowset_ids,
std::shared_ptr<PublishStatus>* publish_status);
+ // the caller should guarantee that the txn `transaction_id` has been
published successfully in MS
+ Result<std::pair<RowsetSharedPtr, DeleteBitmapPtr>>
get_rowset_and_delete_bitmap(
+ TTransactionId transaction_id, int64_t tablet_id);
+
private:
void _clean_thread_callback();
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 9d692c4ef0e..97c64a9b7a5 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -251,6 +251,8 @@ DEFINE_Int32(num_query_ctx_map_partitions, "128");
DEFINE_Int32(make_snapshot_worker_count, "5");
// the count of thread to release snapshot
DEFINE_Int32(release_snapshot_worker_count, "5");
+// the count of thread to make committed rowsets visible in cloud mode
+DEFINE_Int32(cloud_make_committed_rs_visible_worker_count, "16");
// report random wait a little time to avoid FE receiving multiple be reports
at the same time.
// do not set it to false for production environment
DEFINE_mBool(report_random_wait, "true");
@@ -1712,6 +1714,10 @@ DEFINE_mInt32(concurrency_stats_dump_interval_ms, "100");
DEFINE_Validator(concurrency_stats_dump_interval_ms,
[](const int32_t config) -> bool { return config >= 10; });
+// Default: true. NOTE(review): presumably controls whether cloud MoW tablets sync rowsets
+// from the meta service when a load txn begins; TODO confirm against its readers.
+DEFINE_mBool(cloud_mow_sync_rowsets_when_load_txn_begin, "true");
+
+// Default: false. NOTE(review): presumably gates the BE-side path that makes committed
+// rowsets visible from locally cached rowset meta instead of fetching them from the meta
+// service; TODO confirm against its readers.
+DEFINE_mBool(enable_cloud_make_rs_visible_on_be, "false");
+
// clang-format off
#ifdef BE_TEST
// test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index b1f76c32fea..e3fcefcb6ac 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -308,6 +308,8 @@ DECLARE_Int32(download_worker_count);
DECLARE_Int32(make_snapshot_worker_count);
// the count of thread to release snapshot
DECLARE_Int32(release_snapshot_worker_count);
+// the count of thread to make committed rowsets visible in cloud mode
+DECLARE_Int32(cloud_make_committed_rs_visible_worker_count);
// report random wait a little time to avoid FE receiving multiple be reports
at the same time.
// do not set it to false for production environment
DECLARE_mBool(report_random_wait);
@@ -1792,6 +1794,10 @@ DECLARE_mString(aws_credentials_provider_version);
DECLARE_mBool(enable_concurrency_stats_dump);
DECLARE_mInt32(concurrency_stats_dump_interval_ms);
+// NOTE(review): presumably controls whether cloud MoW tablets sync rowsets from the meta
+// service when a load txn begins; TODO confirm.
+DECLARE_mBool(cloud_mow_sync_rowsets_when_load_txn_begin);
+
+// NOTE(review): presumably gates making committed rowsets visible directly on BE from
+// locally cached rowset meta; TODO confirm.
+DECLARE_mBool(enable_cloud_make_rs_visible_on_be);
+
#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
diff --git a/be/src/storage/rowset/rowset_meta.h
b/be/src/storage/rowset/rowset_meta.h
index 90b21ed1aa6..83a908613c5 100644
--- a/be/src/storage/rowset/rowset_meta.h
+++ b/be/src/storage/rowset/rowset_meta.h
@@ -395,11 +395,9 @@ public:
}
return system_clock::from_time_t(newest_write_timestamp());
}
-#ifdef BE_TEST
+    // Sets visible_ts_ms on the underlying RowsetMetaPB; used by set_cloud_fields_after_visible().
void set_visible_ts_ms(int64_t visible_ts_ms) {
_rowset_meta_pb.set_visible_ts_ms(visible_ts_ms);
}
-#endif
void set_tablet_schema(const TabletSchemaSPtr& tablet_schema);
void set_tablet_schema(const TabletSchemaPB& tablet_schema);
@@ -462,6 +460,16 @@ public:
[algorithm]() -> Result<EncryptionAlgorithmPB> { return
algorithm; });
}
+    // Applies locally the rowset-meta changes that meta-service makes when a txn commits:
+    // the version becomes [visible_version, visible_version], and visible_ts_ms is set to
+    // `version_update_time_ms` when it is positive (a non-positive value leaves it as is).
+    // !!ATTENTION!!: update this if meta-service starts modifying more rowset meta fields
+    // in commit_txn.
+    void set_cloud_fields_after_visible(int64_t visible_version, int64_t version_update_time_ms) {
+        const Version visible {visible_version, visible_version};
+        set_version(visible);
+        if (version_update_time_ms <= 0) {
+            return;
+        }
+        set_visible_ts_ms(version_update_time_ms);
+    }
+
private:
bool _deserialize_from_pb(std::string_view value);
diff --git a/be/test/cloud/cloud_committed_rs_mgr_test.cpp
b/be/test/cloud/cloud_committed_rs_mgr_test.cpp
new file mode 100644
index 00000000000..9df6597b8a6
--- /dev/null
+++ b/be/test/cloud/cloud_committed_rs_mgr_test.cpp
@@ -0,0 +1,408 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_committed_rs_mgr.h"
+
+#include <gtest/gtest.h>
+
+#include <chrono>
+#include <memory>
+#include <thread>
+
+#include "cloud/config.h"
+#include "storage/rowset/rowset_meta.h"
+
+namespace doris {
+
+// Fixture for CloudCommittedRSMgr. The manager is built without init(), so no background
+// cleanup runs and the tests drive expiration explicitly.
+class CloudCommittedRSMgrTest : public testing::Test {
+protected:
+    void SetUp() override {
+        // init() is intentionally not called: it would start the background cleanup thread.
+        _mgr = std::make_unique<CloudCommittedRSMgr>();
+    }
+
+    void TearDown() override { _mgr.reset(); }
+
+    // Builds a rowset meta for (tablet_id, txn_id). The rowset id comes from
+    // `rowset_id_val` when it is non-zero, otherwise from `txn_id`.
+    RowsetMetaSharedPtr create_rowset_meta(int64_t tablet_id, int64_t txn_id,
+                                           int64_t rowset_id_val = 0) {
+        RowsetId id;
+        id.init(rowset_id_val != 0 ? rowset_id_val : txn_id);
+
+        RowsetMetaPB pb;
+        pb.set_tablet_id(tablet_id);
+        pb.set_txn_id(txn_id);
+        pb.set_num_segments(1);
+        pb.set_num_rows(100);
+        pb.set_total_disk_size(1024);
+        pb.set_data_disk_size(512);
+        pb.set_rowset_id(0);
+        pb.set_rowset_id_v2(id.to_string());
+
+        auto meta = std::make_shared<RowsetMeta>();
+        meta->init_from_pb(pb);
+        return meta;
+    }
+
+    // Current system_clock time, truncated to whole seconds since the epoch.
+    int64_t current_time_seconds() {
+        const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
+        return std::chrono::duration_cast<std::chrono::seconds>(since_epoch).count();
+    }
+
+protected:
+    std::unique_ptr<CloudCommittedRSMgr> _mgr;
+};
+
+TEST_F(CloudCommittedRSMgrTest, TestAddAndGetCommittedRowset) {
+    const int64_t txn_id = 1000;
+    const int64_t tablet_id = 2000;
+    auto rowset_meta = create_rowset_meta(tablet_id, txn_id);
+    const int64_t expiration_time = current_time_seconds() + 3600;
+
+    _mgr->add_committed_rowset(txn_id, tablet_id, rowset_meta, expiration_time);
+
+    // The stored meta and expiration must round-trip unchanged.
+    auto found = _mgr->get_committed_rowset(txn_id, tablet_id);
+    ASSERT_TRUE(found.has_value());
+    const auto& [meta, expiration] = found.value();
+    ASSERT_NE(meta, nullptr);
+    EXPECT_EQ(meta->tablet_id(), tablet_id);
+    EXPECT_EQ(meta->txn_id(), txn_id);
+    EXPECT_EQ(meta->rowset_id().to_string(), rowset_meta->rowset_id().to_string());
+    EXPECT_EQ(expiration, expiration_time);
+}
+
+TEST_F(CloudCommittedRSMgrTest, TestGetNonExistentRowset) {
+    // Nothing was added, so the lookup must fail with NOT_FOUND.
+    const int64_t txn_id = 1000;
+    const int64_t tablet_id = 2000;
+    auto missing = _mgr->get_committed_rowset(txn_id, tablet_id);
+    EXPECT_FALSE(missing.has_value());
+    EXPECT_TRUE(missing.error().is<ErrorCode::NOT_FOUND>());
+}
+
+TEST_F(CloudCommittedRSMgrTest, TestRemoveCommittedRowset) {
+    const int64_t txn_id = 1000;
+    const int64_t tablet_id = 2000;
+    auto rowset_meta = create_rowset_meta(tablet_id, txn_id);
+    const int64_t expiration_time = current_time_seconds() + 3600;
+    _mgr->add_committed_rowset(txn_id, tablet_id, rowset_meta, expiration_time);
+
+    // Present before removal...
+    auto lookup = _mgr->get_committed_rowset(txn_id, tablet_id);
+    ASSERT_TRUE(lookup.has_value());
+
+    _mgr->remove_committed_rowset(txn_id, tablet_id);
+
+    // ...and gone afterwards.
+    lookup = _mgr->get_committed_rowset(txn_id, tablet_id);
+    EXPECT_FALSE(lookup.has_value());
+}
+
+TEST_F(CloudCommittedRSMgrTest, TestRemoveExpiredCommittedRowsets) {
+    // Use a negative minimum expiration so the past expiration time below is not raised
+    // by the manager's minimum-expiration floor (see TestMinExpirationTime). The original
+    // value is restored when this scope exits, including on an early return.
+    struct MinExpiredSecondsRestorer {
+        decltype(config::tablet_txn_info_min_expired_seconds) saved;
+        ~MinExpiredSecondsRestorer() { config::tablet_txn_info_min_expired_seconds = saved; }
+    } restorer {config::tablet_txn_info_min_expired_seconds};
+    config::tablet_txn_info_min_expired_seconds = -100;
+
+    int64_t current_time = current_time_seconds();
+
+    // Add expired rowset
+    int64_t txn_id_1 = 1000;
+    int64_t tablet_id_1 = 2000;
+    auto rowset_meta_1 = create_rowset_meta(tablet_id_1, txn_id_1);
+    int64_t expiration_time_1 = current_time - 10; // Already expired
+    _mgr->add_committed_rowset(txn_id_1, tablet_id_1, rowset_meta_1, expiration_time_1);
+
+    // Add non-expired rowset
+    int64_t txn_id_2 = 1001;
+    int64_t tablet_id_2 = 2001;
+    auto rowset_meta_2 = create_rowset_meta(tablet_id_2, txn_id_2);
+    int64_t expiration_time_2 = current_time + 3600; // Not expired
+    _mgr->add_committed_rowset(txn_id_2, tablet_id_2, rowset_meta_2, expiration_time_2);
+
+    // Verify both exist
+    EXPECT_TRUE(_mgr->get_committed_rowset(txn_id_1, tablet_id_1).has_value());
+    EXPECT_TRUE(_mgr->get_committed_rowset(txn_id_2, tablet_id_2).has_value());
+
+    // Remove expired rowsets
+    _mgr->remove_expired_committed_rowsets();
+
+    // Verify expired rowset is removed
+    EXPECT_FALSE(_mgr->get_committed_rowset(txn_id_1, tablet_id_1).has_value());
+    // Verify non-expired rowset still exists
+    EXPECT_TRUE(_mgr->get_committed_rowset(txn_id_2, tablet_id_2).has_value());
+}
+
+TEST_F(CloudCommittedRSMgrTest, TestMarkAndCheckEmptyRowset) {
+    const int64_t txn_id = 1000;
+    const int64_t tablet_id = 2000;
+    const int64_t txn_expiration = current_time_seconds() + 3600;
+
+    // Before marking there is nothing to find.
+    auto lookup = _mgr->get_committed_rowset(txn_id, tablet_id);
+    EXPECT_FALSE(lookup.has_value());
+
+    _mgr->mark_empty_rowset(txn_id, tablet_id, txn_expiration);
+
+    // An empty-rowset marker is reported as a hit whose rowset meta is null.
+    lookup = _mgr->get_committed_rowset(txn_id, tablet_id);
+    ASSERT_TRUE(lookup.has_value());
+    const auto& [meta, expiration] = lookup.value();
+    EXPECT_EQ(meta, nullptr);
+    EXPECT_EQ(expiration, txn_expiration);
+}
+
+TEST_F(CloudCommittedRSMgrTest, TestEmptyRowsetExpiration) {
+    // Use a negative minimum expiration so the past expiration time below is not raised
+    // by the manager's minimum-expiration floor (see TestMinExpirationTime). The original
+    // value is restored when this scope exits, so the ASSERT_TRUE below cannot leak the
+    // modified value into later tests.
+    struct MinExpiredSecondsRestorer {
+        decltype(config::tablet_txn_info_min_expired_seconds) saved;
+        ~MinExpiredSecondsRestorer() { config::tablet_txn_info_min_expired_seconds = saved; }
+    } restorer {config::tablet_txn_info_min_expired_seconds};
+    config::tablet_txn_info_min_expired_seconds = -100;
+
+    int64_t current_time = current_time_seconds();
+
+    // Mark as empty with past expiration
+    int64_t txn_id = 1000;
+    int64_t tablet_id = 2000;
+    int64_t txn_expiration = current_time - 10; // Already expired
+    _mgr->mark_empty_rowset(txn_id, tablet_id, txn_expiration);
+
+    // Verify it's marked
+    auto result = _mgr->get_committed_rowset(txn_id, tablet_id);
+    ASSERT_TRUE(result.has_value());
+    EXPECT_EQ(result.value().first, nullptr);
+
+    // Remove expired rowsets
+    _mgr->remove_expired_committed_rowsets();
+
+    // Verify it's removed
+    result = _mgr->get_committed_rowset(txn_id, tablet_id);
+    EXPECT_FALSE(result.has_value());
+}
+
+TEST_F(CloudCommittedRSMgrTest, TestMultipleRowsets) {
+    constexpr int kNumPairs = 10;
+    const int64_t expiration_time = current_time_seconds() + 3600;
+
+    // Register one rowset per (txn 1000 + i, tablet 2000 + i) pair.
+    for (int i = 0; i < kNumPairs; ++i) {
+        const int64_t txn_id = 1000 + i;
+        const int64_t tablet_id = 2000 + i;
+        auto rowset_meta = create_rowset_meta(tablet_id, txn_id);
+        _mgr->add_committed_rowset(txn_id, tablet_id, rowset_meta, expiration_time);
+    }
+
+    // Every pair must be retrievable with its own tablet and txn ids.
+    for (int i = 0; i < kNumPairs; ++i) {
+        const int64_t txn_id = 1000 + i;
+        const int64_t tablet_id = 2000 + i;
+        auto found = _mgr->get_committed_rowset(txn_id, tablet_id);
+        ASSERT_TRUE(found.has_value());
+        const RowsetMetaSharedPtr& meta = found.value().first;
+        EXPECT_EQ(meta->tablet_id(), tablet_id);
+        EXPECT_EQ(meta->txn_id(), txn_id);
+    }
+}
+
+TEST_F(CloudCommittedRSMgrTest, TestSameTransactionDifferentTablets) {
+    constexpr int kNumTablets = 5;
+    const int64_t txn_id = 1000;
+    const int64_t expiration_time = current_time_seconds() + 3600;
+
+    // One txn touching tablets 2000..2004.
+    for (int i = 0; i < kNumTablets; ++i) {
+        const int64_t tablet_id = 2000 + i;
+        auto rowset_meta = create_rowset_meta(tablet_id, txn_id);
+        _mgr->add_committed_rowset(txn_id, tablet_id, rowset_meta, expiration_time);
+    }
+
+    // Entries are keyed by (txn, tablet), so each tablet resolves independently.
+    for (int i = 0; i < kNumTablets; ++i) {
+        const int64_t tablet_id = 2000 + i;
+        auto found = _mgr->get_committed_rowset(txn_id, tablet_id);
+        ASSERT_TRUE(found.has_value());
+        const RowsetMetaSharedPtr& meta = found.value().first;
+        EXPECT_EQ(meta->tablet_id(), tablet_id);
+        EXPECT_EQ(meta->txn_id(), txn_id);
+    }
+}
+
+TEST_F(CloudCommittedRSMgrTest, TestMinExpirationTime) {
+    // Raise the minimum expiration to 100 seconds. The original value is restored when
+    // this scope exits, so the ASSERT_TRUE below cannot leak the modified value into
+    // later tests.
+    struct MinExpiredSecondsRestorer {
+        decltype(config::tablet_txn_info_min_expired_seconds) saved;
+        ~MinExpiredSecondsRestorer() { config::tablet_txn_info_min_expired_seconds = saved; }
+    } restorer {config::tablet_txn_info_min_expired_seconds};
+    config::tablet_txn_info_min_expired_seconds = 100;
+
+    int64_t txn_id = 1000;
+    int64_t tablet_id = 2000;
+    auto rowset_meta = create_rowset_meta(tablet_id, txn_id);
+
+    // Try to set expiration time less than min
+    int64_t current_time = current_time_seconds();
+    int64_t short_expiration = current_time + 10; // Less than min
+
+    _mgr->add_committed_rowset(txn_id, tablet_id, rowset_meta, short_expiration);
+
+    // Get and verify expiration is at least min
+    auto result = _mgr->get_committed_rowset(txn_id, tablet_id);
+    ASSERT_TRUE(result.has_value());
+    auto [retrieved_meta, retrieved_expiration] = result.value();
+    EXPECT_GE(retrieved_expiration, current_time + config::tablet_txn_info_min_expired_seconds);
+}
+
+TEST_F(CloudCommittedRSMgrTest, TestMixedRowsetsAndEmptyMarkers) {
+    // Indices [0, 5) get real rowsets and [5, 10) get empty-rowset markers;
+    // index i maps to txn 1000 + i and tablet 2000 + i.
+    constexpr int kFirstEmpty = 5;
+    constexpr int kTotal = 10;
+    const int64_t expiration_time = current_time_seconds() + 3600;
+
+    for (int i = 0; i < kTotal; ++i) {
+        const int64_t txn_id = 1000 + i;
+        const int64_t tablet_id = 2000 + i;
+        if (i < kFirstEmpty) {
+            auto rowset_meta = create_rowset_meta(tablet_id, txn_id);
+            _mgr->add_committed_rowset(txn_id, tablet_id, rowset_meta, expiration_time);
+        } else {
+            _mgr->mark_empty_rowset(txn_id, tablet_id, expiration_time);
+        }
+    }
+
+    // Real rowsets come back with a meta; markers come back with a null meta.
+    for (int i = 0; i < kTotal; ++i) {
+        const int64_t txn_id = 1000 + i;
+        const int64_t tablet_id = 2000 + i;
+        auto found = _mgr->get_committed_rowset(txn_id, tablet_id);
+        ASSERT_TRUE(found.has_value());
+        if (i < kFirstEmpty) {
+            EXPECT_NE(found.value().first, nullptr);
+        } else {
+            EXPECT_EQ(found.value().first, nullptr);
+        }
+    }
+}
+
+TEST_F(CloudCommittedRSMgrTest, TestExpiredRowsetsCleanupWithMixedTypes) {
+    // Use a negative minimum expiration, as the other expiration tests do. With 0, the
+    // manager's minimum-expiration floor (see TestMinExpirationTime) may lift the
+    // "expired" entries to about `now`, making the outcome depend on timing. The original
+    // value is restored when this scope exits.
+    struct MinExpiredSecondsRestorer {
+        decltype(config::tablet_txn_info_min_expired_seconds) saved;
+        ~MinExpiredSecondsRestorer() { config::tablet_txn_info_min_expired_seconds = saved; }
+    } restorer {config::tablet_txn_info_min_expired_seconds};
+    config::tablet_txn_info_min_expired_seconds = -100;
+
+    int64_t current_time = current_time_seconds();
+
+    // Add expired normal rowset
+    int64_t txn_id_1 = 1000;
+    int64_t tablet_id_1 = 2000;
+    auto rowset_meta_1 = create_rowset_meta(tablet_id_1, txn_id_1);
+    _mgr->add_committed_rowset(txn_id_1, tablet_id_1, rowset_meta_1, current_time - 10);
+
+    // Add expired empty marker
+    int64_t txn_id_2 = 1001;
+    int64_t tablet_id_2 = 2001;
+    _mgr->mark_empty_rowset(txn_id_2, tablet_id_2, current_time - 10);
+
+    // Add non-expired normal rowset
+    int64_t txn_id_3 = 1002;
+    int64_t tablet_id_3 = 2002;
+    auto rowset_meta_3 = create_rowset_meta(tablet_id_3, txn_id_3);
+    _mgr->add_committed_rowset(txn_id_3, tablet_id_3, rowset_meta_3, current_time + 3600);
+
+    // Add non-expired empty marker
+    int64_t txn_id_4 = 1003;
+    int64_t tablet_id_4 = 2003;
+    _mgr->mark_empty_rowset(txn_id_4, tablet_id_4, current_time + 3600);
+
+    // Verify all exist
+    EXPECT_TRUE(_mgr->get_committed_rowset(txn_id_1, tablet_id_1).has_value());
+    auto result_2 = _mgr->get_committed_rowset(txn_id_2, tablet_id_2);
+    EXPECT_TRUE(result_2.has_value());
+    EXPECT_EQ(result_2.value().first, nullptr);
+    EXPECT_TRUE(_mgr->get_committed_rowset(txn_id_3, tablet_id_3).has_value());
+    auto result_4 = _mgr->get_committed_rowset(txn_id_4, tablet_id_4);
+    EXPECT_TRUE(result_4.has_value());
+    EXPECT_EQ(result_4.value().first, nullptr);
+
+    // Remove expired
+    _mgr->remove_expired_committed_rowsets();
+
+    // Verify expired are removed
+    EXPECT_FALSE(_mgr->get_committed_rowset(txn_id_1, tablet_id_1).has_value());
+    EXPECT_FALSE(_mgr->get_committed_rowset(txn_id_2, tablet_id_2).has_value());
+
+    // Verify non-expired still exist
+    EXPECT_TRUE(_mgr->get_committed_rowset(txn_id_3, tablet_id_3).has_value());
+    result_4 = _mgr->get_committed_rowset(txn_id_4, tablet_id_4);
+    EXPECT_TRUE(result_4.has_value());
+    EXPECT_EQ(result_4.value().first, nullptr);
+}
+
+TEST_F(CloudCommittedRSMgrTest, TestUpdateSameRowset) {
+    const int64_t txn_id = 1000;
+    const int64_t tablet_id = 2000;
+    const int64_t first_expiration = current_time_seconds() + 1800;
+    const int64_t second_expiration = current_time_seconds() + 3600;
+
+    // First add for (txn_id, tablet_id).
+    auto first_meta = create_rowset_meta(tablet_id, txn_id, 10001);
+    _mgr->add_committed_rowset(txn_id, tablet_id, first_meta, first_expiration);
+
+    auto lookup = _mgr->get_committed_rowset(txn_id, tablet_id);
+    ASSERT_TRUE(lookup.has_value());
+    EXPECT_EQ(lookup.value().first->rowset_id().to_string(), first_meta->rowset_id().to_string());
+
+    // A second add for the same key, with a different rowset and expiration, is
+    // expected to replace the first entry.
+    auto second_meta = create_rowset_meta(tablet_id, txn_id, 10002);
+    _mgr->add_committed_rowset(txn_id, tablet_id, second_meta, second_expiration);
+
+    lookup = _mgr->get_committed_rowset(txn_id, tablet_id);
+    ASSERT_TRUE(lookup.has_value());
+    EXPECT_EQ(lookup.value().first->rowset_id().to_string(),
+              second_meta->rowset_id().to_string());
+}
+
+} // namespace doris
diff --git a/be/test/cloud/cloud_tablet_test.cpp
b/be/test/cloud/cloud_tablet_test.cpp
index 7f989037e2b..2fee56fbac8 100644
--- a/be/test/cloud/cloud_tablet_test.cpp
+++ b/be/test/cloud/cloud_tablet_test.cpp
@@ -940,4 +940,369 @@ TEST_F(CloudTabletSyncMetaTest,
TestSyncMetaMultipleProperties) {
sp->disable_processing();
sp->clear_all_call_backs();
}
+// Fixture for CloudTablet::apply_visible_pending_rowsets(): owns a cloud engine and a tablet,
+// plus helpers to seed the tablet with visible rowsets and with visible-pending entries.
+class CloudTabletApplyVisiblePendingTest : public testing::Test {
+public:
+    CloudTabletApplyVisiblePendingTest() : _engine(CloudStorageEngine(EngineOptions {})) {}
+
+    void SetUp() override {
+        _tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}},
+                                          UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK,
+                                          TCompressionType::LZ4F));
+        _tablet = std::make_shared<CloudTablet>(_engine,
+                                                std::make_shared<TabletMeta>(*_tablet_meta));
+    }
+
+    void TearDown() override {}
+
+    // Creates a BETA rowset covering `version`; returns nullptr if RowsetFactory fails.
+    RowsetSharedPtr create_rowset(Version version, int num_segments = 1) {
+        return build_rowset(new_rowset_meta(version, num_segments));
+    }
+
+    // Creates the meta of a single-version pending rowset [version, version] with one segment.
+    RowsetMetaSharedPtr create_pending_rowset_meta(int64_t version) {
+        return new_rowset_meta(Version(version, version), 1);
+    }
+
+    // Create a rowset whose RowsetMeta carries a valid TabletSchema,
+    // required as template for create_empty_rowset_for_hole.
+    RowsetSharedPtr create_rowset_with_schema(Version version, int num_segments = 1) {
+        auto rs_meta = new_rowset_meta(version, num_segments);
+        TabletSchemaPB schema_pb = make_single_int_key_schema_pb();
+        rs_meta->set_tablet_schema(schema_pb);
+        return build_rowset(rs_meta);
+    }
+
+    // Adds `rowsets` to the tablet as already-visible rowsets, holding the header write lock.
+    void add_initial_rowsets(const std::vector<RowsetSharedPtr>& rowsets) {
+        std::unique_lock<std::shared_mutex> meta_wlock(_tablet->get_header_lock());
+        _tablet->add_rowsets(std::vector<RowsetSharedPtr>(rowsets), false, meta_wlock, false);
+    }
+
+    // Inserts an entry for `version` into the tablet's visible-pending map under its lock.
+    // Tests pass rowset_meta == nullptr together with is_empty == true for an empty rowset.
+    void add_pending_rowset(int64_t version, RowsetMetaSharedPtr rowset_meta,
+                            int64_t expiration_time = INT64_MAX, bool is_empty = false) {
+        std::lock_guard<std::mutex> lock(_tablet->_visible_pending_rs_lock);
+        _tablet->_visible_pending_rs_map.emplace(
+                version, CloudTablet::VisiblePendingRowset {std::move(rowset_meta),
+                                                            expiration_time, is_empty});
+    }
+
+    // Number of entries currently held in the tablet's visible-pending map.
+    size_t pending_rs_count() const {
+        std::lock_guard<std::mutex> lock(_tablet->_visible_pending_rs_lock);
+        return _tablet->_visible_pending_rs_map.size();
+    }
+
+protected:
+    // Minimal DUP_KEYS schema with one non-nullable INT key column "k1".
+    static TabletSchemaPB make_single_int_key_schema_pb() {
+        TabletSchemaPB schema_pb;
+        schema_pb.set_keys_type(KeysType::DUP_KEYS);
+        auto* col = schema_pb.add_column();
+        col->set_unique_id(0);
+        col->set_name("k1");
+        col->set_type("INT");
+        col->set_is_key(true);
+        col->set_is_nullable(false);
+        return schema_pb;
+    }
+
+    // Declared first so it is constructed before and destroyed after `_tablet`, which is
+    // built from it in SetUp() and may keep a reference to it.
+    CloudStorageEngine _engine;
+    TabletMetaSharedPtr _tablet_meta;
+    std::shared_ptr<CloudTablet> _tablet;
+
+private:
+    // Fresh BETA rowset meta for `version`, with a new rowset id taken from the engine.
+    RowsetMetaSharedPtr new_rowset_meta(Version version, int num_segments) {
+        auto rs_meta = std::make_shared<RowsetMeta>();
+        rs_meta->set_rowset_type(BETA_ROWSET);
+        rs_meta->set_version(version);
+        rs_meta->set_rowset_id(_engine.next_rowset_id());
+        rs_meta->set_num_segments(num_segments);
+        return rs_meta;
+    }
+
+    // Wraps `rs_meta` in a Rowset via RowsetFactory; nullptr if creation fails.
+    static RowsetSharedPtr build_rowset(const RowsetMetaSharedPtr& rs_meta) {
+        RowsetSharedPtr rowset;
+        Status st = RowsetFactory::create_rowset(nullptr, "", rs_meta, &rowset);
+        return st.ok() ? rowset : nullptr;
+    }
+};
+
+// With an empty visible-pending map, applying is a no-op: only the initial rowset remains.
+TEST_F(CloudTabletApplyVisiblePendingTest, TestApplyNoPendingRowsets) {
+    auto base = create_rowset(Version(0, 1));
+    ASSERT_NE(base, nullptr);
+    add_initial_rowsets({base});
+    EXPECT_EQ(_tablet->max_version_unlocked(), 1);
+
+    _tablet->apply_visible_pending_rowsets();
+
+    const auto& visible = _tablet->rowset_map();
+    EXPECT_EQ(_tablet->max_version_unlocked(), 1);
+    EXPECT_EQ(visible.size(), 1);
+    EXPECT_TRUE(visible.contains(Version(0, 1)));
+}
+
+// A single non-empty pending rowset directly after max_version is promoted into the tablet.
+TEST_F(CloudTabletApplyVisiblePendingTest, TestApplySingleConsecutiveRowset) {
+    auto base = create_rowset(Version(0, 1));
+    ASSERT_NE(base, nullptr);
+    add_initial_rowsets({base});
+    EXPECT_EQ(_tablet->max_version_unlocked(), 1);
+
+    constexpr int64_t kPendingVersion = 2;
+    add_pending_rowset(kPendingVersion, create_pending_rowset_meta(kPendingVersion));
+
+    _tablet->apply_visible_pending_rowsets();
+
+    EXPECT_EQ(_tablet->max_version_unlocked(), kPendingVersion);
+    const auto& visible = _tablet->rowset_map();
+    EXPECT_EQ(visible.size(), 2);
+    EXPECT_TRUE(visible.contains(Version(0, 1)));
+    EXPECT_TRUE(visible.contains(Version(kPendingVersion, kPendingVersion)));
+}
+
+// An unbroken chain of pending rowsets after max_version is promoted in a single apply.
+TEST_F(CloudTabletApplyVisiblePendingTest, TestApplyMultipleConsecutiveRowsets) {
+    auto base = create_rowset(Version(0, 1));
+    ASSERT_NE(base, nullptr);
+    add_initial_rowsets({base});
+    EXPECT_EQ(_tablet->max_version_unlocked(), 1);
+
+    const std::vector<int64_t> pending_versions {2, 3, 4};
+    for (int64_t version : pending_versions) {
+        add_pending_rowset(version, create_pending_rowset_meta(version));
+    }
+
+    _tablet->apply_visible_pending_rowsets();
+
+    EXPECT_EQ(_tablet->max_version_unlocked(), 4);
+    const auto& visible = _tablet->rowset_map();
+    EXPECT_EQ(visible.size(), 4);
+    EXPECT_TRUE(visible.contains(Version(0, 1)));
+    for (int64_t version : pending_versions) {
+        EXPECT_TRUE(visible.contains(Version(version, version)));
+    }
+}
+
+// A pending rowset that does not directly follow max_version (version 2 missing) is not applied.
+TEST_F(CloudTabletApplyVisiblePendingTest, TestApplyWithVersionGap) {
+    auto base = create_rowset(Version(0, 1));
+    ASSERT_NE(base, nullptr);
+    add_initial_rowsets({base});
+    EXPECT_EQ(_tablet->max_version_unlocked(), 1);
+
+    add_pending_rowset(3, create_pending_rowset_meta(3)); // version 2 deliberately skipped
+
+    _tablet->apply_visible_pending_rowsets();
+
+    const auto& visible = _tablet->rowset_map();
+    EXPECT_EQ(_tablet->max_version_unlocked(), 1);
+    EXPECT_EQ(visible.size(), 1);
+    EXPECT_TRUE(visible.contains(Version(0, 1)));
+    EXPECT_FALSE(visible.contains(Version(3, 3)));
+}
+
+// With pending versions 2, 3 and 5 (4 missing), only the consecutive prefix 2..3 is applied.
+TEST_F(CloudTabletApplyVisiblePendingTest, TestApplyPartialConsecutive) {
+    auto base = create_rowset(Version(0, 1));
+    ASSERT_NE(base, nullptr);
+    add_initial_rowsets({base});
+    EXPECT_EQ(_tablet->max_version_unlocked(), 1);
+
+    const int64_t pending_versions[] = {2, 3, 5};
+    for (int64_t version : pending_versions) {
+        add_pending_rowset(version, create_pending_rowset_meta(version));
+    }
+
+    _tablet->apply_visible_pending_rowsets();
+
+    EXPECT_EQ(_tablet->max_version_unlocked(), 3);
+    const auto& visible = _tablet->rowset_map();
+    EXPECT_EQ(visible.size(), 3);
+    for (const Version& applied : {Version(0, 1), Version(2, 2), Version(3, 3)}) {
+        EXPECT_TRUE(visible.contains(applied));
+    }
+    EXPECT_FALSE(visible.contains(Version(5, 5)));
+}
+
+// Pending versions already covered by an existing rowset ([2-5]) are not applied again.
+TEST_F(CloudTabletApplyVisiblePendingTest, TestApplyPendingBelowMaxVersion) {
+    auto rs_0_1 = create_rowset(Version(0, 1));
+    auto rs_2_5 = create_rowset(Version(2, 5));
+    ASSERT_NE(rs_0_1, nullptr);
+    ASSERT_NE(rs_2_5, nullptr);
+    add_initial_rowsets({rs_0_1, rs_2_5});
+    EXPECT_EQ(_tablet->max_version_unlocked(), 5);
+
+    // Versions 3 and 4 both fall inside the existing [2-5] range.
+    for (int64_t stale_version = 3; stale_version <= 4; ++stale_version) {
+        add_pending_rowset(stale_version, create_pending_rowset_meta(stale_version));
+    }
+
+    _tablet->apply_visible_pending_rowsets();
+
+    EXPECT_EQ(_tablet->max_version_unlocked(), 5);
+    const auto& visible = _tablet->rowset_map();
+    EXPECT_EQ(visible.size(), 2);
+    EXPECT_TRUE(visible.contains(Version(0, 1)));
+    EXPECT_TRUE(visible.contains(Version(2, 5)));
+    EXPECT_FALSE(visible.contains(Version(3, 3)));
+    EXPECT_FALSE(visible.contains(Version(4, 4)));
+}
+
+// On a tablet without rowsets (max_version == -1) a pending rowset at version 0 is applied.
+TEST_F(CloudTabletApplyVisiblePendingTest, TestApplyWithNoInitialRowsets) {
+    EXPECT_EQ(_tablet->max_version_unlocked(), -1);
+
+    constexpr int64_t kFirstVersion = 0;
+    add_pending_rowset(kFirstVersion, create_pending_rowset_meta(kFirstVersion));
+
+    _tablet->apply_visible_pending_rowsets();
+
+    const auto& visible = _tablet->rowset_map();
+    EXPECT_EQ(_tablet->max_version_unlocked(), kFirstVersion);
+    EXPECT_EQ(visible.size(), 1);
+    EXPECT_TRUE(visible.contains(Version(kFirstVersion, kFirstVersion)));
+}
+
+// Successive apply calls each promote the pending rowset added since the previous call.
+TEST_F(CloudTabletApplyVisiblePendingTest, TestApplyMultipleCalls) {
+    auto base = create_rowset(Version(0, 1));
+    ASSERT_NE(base, nullptr);
+    add_initial_rowsets({base});
+    EXPECT_EQ(_tablet->max_version_unlocked(), 1);
+
+    // One round per version: add it as pending, apply, and check it became visible.
+    for (int64_t version = 2; version <= 3; ++version) {
+        add_pending_rowset(version, create_pending_rowset_meta(version));
+        _tablet->apply_visible_pending_rowsets();
+        EXPECT_EQ(_tablet->max_version_unlocked(), version);
+        EXPECT_TRUE(_tablet->rowset_map().contains(Version(version, version)));
+    }
+
+    const auto& visible = _tablet->rowset_map();
+    EXPECT_EQ(visible.size(), 3);
+    for (const Version& v : {Version(0, 1), Version(2, 2), Version(3, 3)}) {
+        EXPECT_TRUE(visible.contains(v));
+    }
+}
+
+// A gap that blocks one apply is closed once the missing version arrives; the next apply
+// then promotes the whole chain.
+TEST_F(CloudTabletApplyVisiblePendingTest, TestApplyGapResolvedLater) {
+    auto base = create_rowset(Version(0, 1));
+    ASSERT_NE(base, nullptr);
+    add_initial_rowsets({base});
+    EXPECT_EQ(_tablet->max_version_unlocked(), 1);
+
+    // Phase 1: only version 3 is pending, so version 2 is a gap and nothing is applied.
+    add_pending_rowset(3, create_pending_rowset_meta(3));
+    _tablet->apply_visible_pending_rowsets();
+    EXPECT_EQ(_tablet->max_version_unlocked(), 1);
+    EXPECT_FALSE(_tablet->rowset_map().contains(Version(3, 3)));
+
+    // Phase 2: version 2 fills the gap.
+    add_pending_rowset(2, create_pending_rowset_meta(2));
+    _tablet->apply_visible_pending_rowsets();
+
+    EXPECT_EQ(_tablet->max_version_unlocked(), 3);
+    const auto& visible = _tablet->rowset_map();
+    EXPECT_EQ(visible.size(), 3);
+    for (const Version& v : {Version(0, 1), Version(2, 2), Version(3, 3)}) {
+        EXPECT_TRUE(visible.contains(v));
+    }
+}
+
+// After apply_visible_pending_rowsets(), the pending entries it promoted (version <= the new
+// max version) are gone from the pending map, while an entry still blocked by a gap remains.
+TEST_F(CloudTabletApplyVisiblePendingTest, TestClearAfterApply) {
+    auto base = create_rowset(Version(0, 1));
+    ASSERT_NE(base, nullptr);
+    add_initial_rowsets({base});
+
+    // 2 and 3 follow the base rowset; 5 cannot be applied because 4 is missing.
+    for (int64_t version : {2, 3, 5}) {
+        add_pending_rowset(version, create_pending_rowset_meta(version));
+    }
+    EXPECT_EQ(pending_rs_count(), 3);
+
+    _tablet->apply_visible_pending_rowsets();
+
+    EXPECT_EQ(_tablet->max_version_unlocked(), 3);
+    const auto& visible = _tablet->rowset_map();
+    EXPECT_EQ(visible.size(), 3);
+    for (const Version& v : {Version(0, 1), Version(2, 2), Version(3, 3)}) {
+        EXPECT_TRUE(visible.contains(v));
+    }
+    EXPECT_FALSE(visible.contains(Version(5, 5)));
+    // Only the gapped, unexpired version 5 is left pending.
+    EXPECT_EQ(pending_rs_count(), 1);
+}
+
+// An empty pending rowset needs a previous rowset as its template; on a tablet with no
+// rowsets there is none, so apply stops without adding anything.
+TEST_F(CloudTabletApplyVisiblePendingTest, TestApplyEmptyRowsetNoExistingVersions) {
+    EXPECT_EQ(_tablet->max_version_unlocked(), -1);
+
+    add_pending_rowset(/*version=*/0, /*rowset_meta=*/nullptr, /*expiration_time=*/INT64_MAX,
+                       /*is_empty=*/true);
+
+    _tablet->apply_visible_pending_rowsets();
+
+    EXPECT_EQ(_tablet->max_version_unlocked(), -1);
+    EXPECT_EQ(_tablet->rowset_map().size(), 0);
+}
+
+// An empty pending rowset right after an existing rowset that carries a schema is
+// materialized through create_empty_rowset_for_hole and becomes visible.
+TEST_F(CloudTabletApplyVisiblePendingTest, TestApplyEmptyRowsetWithExistingVersion) {
+    auto template_rowset = create_rowset_with_schema(Version(0, 1));
+    ASSERT_NE(template_rowset, nullptr);
+    add_initial_rowsets({template_rowset});
+    EXPECT_EQ(_tablet->max_version_unlocked(), 1);
+
+    add_pending_rowset(/*version=*/2, /*rowset_meta=*/nullptr, /*expiration_time=*/INT64_MAX,
+                       /*is_empty=*/true);
+
+    _tablet->apply_visible_pending_rowsets();
+
+    EXPECT_EQ(_tablet->max_version_unlocked(), 2);
+    const auto& visible = _tablet->rowset_map();
+    EXPECT_EQ(visible.size(), 2);
+    EXPECT_TRUE(visible.contains(Version(0, 1)));
+    EXPECT_TRUE(visible.contains(Version(2, 2)));
+}
+
+// A non-empty pending rowset (version 2) followed by an empty one (version 3): both are
+// applied, and the empty one uses the just-applied rowset (to_add.back()) as its template.
+TEST_F(CloudTabletApplyVisiblePendingTest, TestApplyNonEmptyThenEmptyRowset) {
+    auto base = create_rowset_with_schema(Version(0, 1));
+    ASSERT_NE(base, nullptr);
+    add_initial_rowsets({base});
+    EXPECT_EQ(_tablet->max_version_unlocked(), 1);
+
+    // Single non-nullable INT key column "k1", DUP_KEYS.
+    auto key_schema = [] {
+        TabletSchemaPB pb;
+        pb.set_keys_type(KeysType::DUP_KEYS);
+        auto* key_col = pb.add_column();
+        key_col->set_unique_id(0);
+        key_col->set_name("k1");
+        key_col->set_type("INT");
+        key_col->set_is_key(true);
+        key_col->set_is_nullable(false);
+        return pb;
+    }();
+
+    // Version 2 carries a schema so it can act as the template for the empty version 3.
+    auto non_empty_meta = create_pending_rowset_meta(2);
+    non_empty_meta->set_tablet_schema(key_schema);
+    add_pending_rowset(2, std::move(non_empty_meta));
+    add_pending_rowset(/*version=*/3, /*rowset_meta=*/nullptr, /*expiration_time=*/INT64_MAX,
+                       /*is_empty=*/true);
+
+    _tablet->apply_visible_pending_rowsets();
+
+    EXPECT_EQ(_tablet->max_version_unlocked(), 3);
+    const auto& visible = _tablet->rowset_map();
+    EXPECT_EQ(visible.size(), 3);
+    EXPECT_TRUE(visible.contains(Version(0, 1)));
+    EXPECT_TRUE(visible.contains(Version(2, 2)));
+    EXPECT_TRUE(visible.contains(Version(3, 3)));
+}
+
} // namespace doris
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 15135eb0f60..ed5539d9388 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3346,6 +3346,9 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static long mow_get_ms_lock_retry_backoff_interval = 80;
+    // When true, after a load txn is committed to meta-service, FE sends (best effort)
+    // MAKE_CLOUD_COMMITTED_RS_VISIBLE agent tasks to the BEs that wrote the txn's tablets,
+    // so they can make the committed rowsets visible without fetching them from meta-service.
+    // Disabled by default.
+    @ConfField(mutable = true, masterOnly = true)
+    public static boolean enable_notify_be_after_load_txn_commit = false;
+
// ATTN: DONOT add any config not related to cloud mode here
// ATTN: DONOT add any config not related to cloud mode here
// ATTN: DONOT add any config not related to cloud mode here
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index 4fe2e9cc61a..043128d02fd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -101,6 +101,7 @@ import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.CalcDeleteBitmapTask;
+import org.apache.doris.task.MakeCloudTmpRsVisibleTask;
import org.apache.doris.thrift.TCalcDeleteBitmapPartitionInfo;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
@@ -494,11 +495,20 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
/**
* Post process of commitTxn
- * 1. update some stats
- * 2. produce event for further processes like async MV
+ * 1. notify BEs to make temporary rowsets visible
+ * 2. update some stats
+ * 3. produce event for further processes like async MV
* @param commitTxnResponse commit txn call response from meta-service
+ * @param tabletCommitInfos tablet commit infos containing backend and
tablet mapping
*/
- public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse) {
+ public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse,
List<TabletCommitInfo> tabletCommitInfos) {
+ // ========================================
+ // notify BEs to make temporary rowsets visible
+ // ========================================
+ if (tabletCommitInfos != null) {
+ notifyBesMakeTmpRsVisible(commitTxnResponse, tabletCommitInfos);
+ }
+
// ========================================
// update some table stats
// ========================================
@@ -723,11 +733,12 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
}
final CommitTxnRequest commitTxnRequest = builder.build();
- executeCommitTxnRequest(commitTxnRequest, transactionId, is2PC,
txnCommitAttachment);
+ executeCommitTxnRequest(commitTxnRequest, transactionId, is2PC,
txnCommitAttachment, tabletCommitInfos);
}
private void executeCommitTxnRequest(CommitTxnRequest commitTxnRequest,
long transactionId, boolean is2PC,
- TxnCommitAttachment txnCommitAttachment) throws UserException {
+ TxnCommitAttachment txnCommitAttachment, List<TabletCommitInfo>
tabletCommitInfos)
+ throws UserException {
if (DebugPointUtil.isEnable("FE.mow.commit.exception")) {
LOG.info("debug point FE.mow.commit.exception, throw e");
throw new UserException(InternalErrorCode.INTERNAL_ERR, "debug
point FE.mow.commit.exception");
@@ -750,7 +761,7 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
try {
- txnState = commitTxn(commitTxnRequest, transactionId, is2PC);
+ txnState = commitTxn(commitTxnRequest, transactionId, is2PC,
tabletCommitInfos);
txnOperated = true;
if
(DebugPointUtil.isEnable("CloudGlobalTransactionMgr.commitTransaction.timeout"))
{
throw new
UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR,
@@ -789,8 +800,8 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
}
}
- private TransactionState commitTxn(CommitTxnRequest commitTxnRequest, long
transactionId, boolean is2PC)
- throws UserException {
+ private TransactionState commitTxn(CommitTxnRequest commitTxnRequest, long
transactionId, boolean is2PC,
+ List<TabletCommitInfo> tabletCommitInfos) throws UserException {
checkCommitInfo(commitTxnRequest);
CommitTxnResponse commitTxnResponse = null;
@@ -850,7 +861,7 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
MetricRepo.COUNTER_TXN_SUCCESS.increase(1L);
MetricRepo.HISTO_TXN_EXEC_LATENCY.update(txnState.getCommitTime()
- txnState.getPrepareTime());
}
- afterCommitTxnResp(commitTxnResponse);
+ afterCommitTxnResp(commitTxnResponse, tabletCommitInfos);
return txnState;
}
@@ -1639,7 +1650,7 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
}
final CommitTxnRequest commitTxnRequest = builder.build();
- executeCommitTxnRequest(commitTxnRequest, transactionId, false, null);
+ executeCommitTxnRequest(commitTxnRequest, transactionId, false, null,
null);
}
private List<Table> getTablesNeedCommitLock(List<Table> tableList) {
@@ -2705,4 +2716,111 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
private void clearTxnLastSignature(long dbId, long txnId) {
txnLastSignatureMap.remove(txnId);
}
+
+    /**
+     * Notify BEs to make temporary cloud rowsets visible after transaction commit.
+     * Called from afterCommitTxnResp so that BEs can promote the committed rowsets cached in
+     * CloudCommittedRSMgr into tablet meta.
+     *
+     * Best effort: it does nothing unless Config.enable_notify_be_after_load_txn_commit is true,
+     * and every failure (including reading the response itself) is logged and swallowed so it
+     * can never affect the already-successful commit.
+     *
+     * @param commitTxnResponse commit txn response from meta-service
+     * @param tabletCommitInfos tablet commit infos containing backend and tablet mapping
+     */
+    private void notifyBesMakeTmpRsVisible(CommitTxnResponse commitTxnResponse,
+                                           List<TabletCommitInfo> tabletCommitInfos) {
+        if (tabletCommitInfos == null || tabletCommitInfos.isEmpty()
+                || !Config.enable_notify_be_after_load_txn_commit) {
+            return;
+        }
+
+        // Read inside the try below; kept here so the catch block can log it without
+        // touching commitTxnResponse again.
+        long txnId = -1;
+        try {
+            txnId = commitTxnResponse.getTxnInfo().getTxnId();
+            if (DebugPointUtil.isEnable("notifyBesMakeTmpRsVisible.skip")) {
+                LOG.info("skip sendMakeCloudTmpRsVisibleTasks, txn_id: {}", txnId);
+                return;
+            }
+
+            // Convert TabletCommitInfo to TTabletCommitInfo
+            List<TTabletCommitInfo> tTabletCommitInfos =
+                    Lists.newArrayListWithCapacity(tabletCommitInfos.size());
+            for (TabletCommitInfo commitInfo : tabletCommitInfos) {
+                TTabletCommitInfo tCommitInfo = new TTabletCommitInfo();
+                tCommitInfo.setTabletId(commitInfo.getTabletId());
+                tCommitInfo.setBackendId(commitInfo.getBackendId());
+                tTabletCommitInfos.add(tCommitInfo);
+            }
+
+            // partition_ids and versions are read as parallel lists of the commit response;
+            // refuse to build the map if versions is shorter, rather than failing on an index.
+            int totalPartitionNum = commitTxnResponse.getPartitionIdsList().size();
+            int totalVersionNum = commitTxnResponse.getVersionsList().size();
+            if (totalVersionNum < totalPartitionNum) {
+                LOG.warn("skip notifyBesMakeTmpRsVisible, partition ids and versions mismatch, txn_id: {}, "
+                        + "partition_num: {}, version_num: {}", txnId, totalPartitionNum, totalVersionNum);
+                return;
+            }
+            Map<Long, Long> partitionVersionMap = Maps.newHashMap();
+            for (int idx = 0; idx < totalPartitionNum; ++idx) {
+                long partitionId = commitTxnResponse.getPartitionIds(idx);
+                long version = commitTxnResponse.getVersions(idx);
+                partitionVersionMap.put(partitionId, version);
+            }
+
+            long updateVersionVisibleTime = commitTxnResponse.getVersionUpdateTimeMs();
+
+            // Send tasks to notify BEs
+            sendMakeCloudTmpRsVisibleTasks(txnId, tTabletCommitInfos,
+                    partitionVersionMap, updateVersionVisibleTime);
+        } catch (Throwable t) {
+            // According to normal logic, no exceptions will be thrown,
+            // but in order to avoid bugs affecting the original logic, all exceptions are caught
+            LOG.warn("notifyBesMakeTmpRsVisible failed, txn_id: {}", txnId, t);
+        }
+    }
+
+    /**
+     * Submit one MakeCloudTmpRsVisibleTask per backend so each BE promotes the rowsets it
+     * committed for {@code txnId} from CloudCommittedRSMgr into tablet meta.
+     *
+     * Fire-and-forget: the batch is handed to AgentTaskExecutor with no retry and no guarantee
+     * that every BE receives it.
+     *
+     * @param txnId transaction id
+     * @param commitInfos tablet commit infos; each contributes its tablet to its backend's task
+     * @param partitionVersionMap partition id to committed version, shared by every task
+     * @param updateVersionVisibleTime visible time for the version
+     */
+    public void sendMakeCloudTmpRsVisibleTasks(long txnId,
+                                               List<TTabletCommitInfo> commitInfos,
+                                               Map<Long, Long> partitionVersionMap,
+                                               long updateVersionVisibleTime) {
+        boolean nothingToSend = commitInfos == null || commitInfos.isEmpty();
+        if (nothingToSend) {
+            LOG.info("no commit infos to send make cloud tmp rs visible tasks, txn_id: {}", txnId);
+            return;
+        }
+
+        // backend id -> ids of the tablets written by this txn on that backend
+        Map<Long, List<Long>> tabletIdsByBackend = Maps.newHashMap();
+        commitInfos.forEach(info -> tabletIdsByBackend
+                .computeIfAbsent(info.getBackendId(), k -> Lists.newArrayList())
+                .add(info.getTabletId()));
+
+        final int backendCount = tabletIdsByBackend.size();
+        final int tabletCount = commitInfos.size();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("send make cloud tmp rs visible tasks, txn_id: {}, backend_count: {}, total_tablets: {}",
+                    txnId, backendCount, tabletCount);
+        }
+
+        // All per-backend tasks travel in a single batch.
+        AgentBatchTask batch = new AgentBatchTask();
+        tabletIdsByBackend.forEach((backendId, tabletIds) -> {
+            batch.addTask(new MakeCloudTmpRsVisibleTask(
+                    backendId, txnId, tabletIds, partitionVersionMap, updateVersionVisibleTime));
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("add make cloud tmp rs visible task, txn_id: {}, backend_id: {}, tablet_count: {}",
+                        txnId, backendId, tabletIds.size());
+            }
+        });
+
+        AgentTaskExecutor.submit(batch);
+        LOG.info("sent make cloud tmp rs visible tasks, txn_id: {}, backend_count: {}, total_tablets: {}",
+                txnId, backendCount, tabletCount);
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index c537c42c4c7..56040809b5b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -5098,7 +5098,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
return new TStatus(TStatusCode.INVALID_ARGUMENT);
}
CommitTxnResponse commitTxnResponse =
CommitTxnResponse.parseFrom(receivedProtobufBytes);
-
Env.getCurrentGlobalTransactionMgr().afterCommitTxnResp(commitTxnResponse);
+
Env.getCurrentGlobalTransactionMgr().afterCommitTxnResp(commitTxnResponse,
null);
} catch (InvalidProtocolBufferException e) {
// Handle the exception, log it, or take appropriate action
e.printStackTrace();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
index 6ae8c9014bb..6b0928eb49d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
@@ -41,6 +41,7 @@ import org.apache.doris.thrift.TCreateTabletReq;
import org.apache.doris.thrift.TDownloadReq;
import org.apache.doris.thrift.TDropTabletReq;
import org.apache.doris.thrift.TGcBinlogReq;
+import org.apache.doris.thrift.TMakeCloudTmpRsVisibleRequest;
import org.apache.doris.thrift.TMoveDirReq;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPublishVersionRequest;
@@ -499,6 +500,15 @@ public class AgentBatchTask implements Runnable {
tAgentTaskRequest.setCleanUdfCacheReq(request);
return tAgentTaskRequest;
}
+ case MAKE_CLOUD_COMMITTED_RS_VISIBLE: {
+ MakeCloudTmpRsVisibleTask makeCloudTmpRsVisibleTask =
(MakeCloudTmpRsVisibleTask) task;
+ TMakeCloudTmpRsVisibleRequest request =
makeCloudTmpRsVisibleTask.toThrift();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(request.toString());
+ }
+ tAgentTaskRequest.setMakeCloudTmpRsVisibleReq(request);
+ return tAgentTaskRequest;
+ }
default:
if (LOG.isDebugEnabled()) {
LOG.debug("could not find task type for task [{}]", task);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/task/MakeCloudTmpRsVisibleTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/MakeCloudTmpRsVisibleTask.java
new file mode 100644
index 00000000000..62626c90ba5
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/task/MakeCloudTmpRsVisibleTask.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.task;
+
+import org.apache.doris.thrift.TMakeCloudTmpRsVisibleRequest;
+import org.apache.doris.thrift.TTaskType;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Agent task asking one BE to make the rowsets committed by a cloud load transaction visible:
+ * the BE promotes the committed rowset metas it holds in memory into tablet meta instead of
+ * fetching them from meta-service. Sent after FE has committed the transaction to MS.
+ */
+public class MakeCloudTmpRsVisibleTask extends AgentTask {
+    // Transaction whose committed rowsets should become visible.
+    private final long txnId;
+    // Tablets on the target BE that are involved in the transaction.
+    private final List<Long> involvedTabletIds;
+    // partition_id -> version
+    private final Map<Long, Long> partitionIdToVersion;
+    // Sent to the BE as version_update_time_ms.
+    private final long versionUpdateTimeMs;
+
+    public MakeCloudTmpRsVisibleTask(long backendId, long txnId,
+                                     List<Long> tabletIds,
+                                     Map<Long, Long> partitionVersionMap,
+                                     long updateVersionVisibleTime) {
+        // -1 placeholders: the task is not bound to a single db/table/partition/index/tablet
+        // (assumed AgentTask parameter order; confirm against AgentTask's constructor).
+        super(null, backendId, TTaskType.MAKE_CLOUD_COMMITTED_RS_VISIBLE,
+                -1L, -1L, -1L, -1L, -1L, txnId, System.currentTimeMillis());
+        this.txnId = txnId;
+        this.involvedTabletIds = tabletIds;
+        this.partitionIdToVersion = partitionVersionMap;
+        this.versionUpdateTimeMs = updateVersionVisibleTime;
+    }
+
+    public long getTxnId() {
+        return txnId;
+    }
+
+    public List<Long> getTabletIds() {
+        return involvedTabletIds;
+    }
+
+    public Map<Long, Long> getPartitionVersionMap() {
+        return partitionIdToVersion;
+    }
+
+    public long getUpdateVersionVisibleTime() {
+        return versionUpdateTimeMs;
+    }
+
+    /** Builds the thrift request carried by this task's TAgentTaskRequest. */
+    public TMakeCloudTmpRsVisibleRequest toThrift() {
+        return new TMakeCloudTmpRsVisibleRequest()
+                .setTxnId(txnId)
+                .setTabletIds(involvedTabletIds)
+                .setPartitionVersionMap(partitionIdToVersion)
+                .setVersionUpdateTimeMs(versionUpdateTimeMs);
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 614448891a2..b5e30e9893d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -230,7 +230,7 @@ public class GlobalTransactionMgr implements
GlobalTransactionMgrIface {
}
@Override
- public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse) {
+ public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse,
List<TabletCommitInfo> tabletCommitInfos) {
}
/**
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
index d56193fe683..b611ff4e588 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
@@ -210,7 +210,7 @@ public interface GlobalTransactionMgrIface extends Writable
{
public void
replayBatchRemoveTransactionV2(BatchRemoveTransactionsOperationV2 operation)
throws Exception;
- public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse);
+ public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse,
List<TabletCommitInfo> tabletCommitInfos);
public void addSubTransaction(long dbId, long transactionId, long
subTransactionId);
diff --git a/gensrc/thrift/AgentService.thrift
b/gensrc/thrift/AgentService.thrift
index 51bd59e7ae7..c8c65513dc2 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -568,6 +568,14 @@ struct TPushCooldownConfReq {
1: required list<TCooldownConf> cooldown_confs
}
+// Request to make temporary cloud rowsets visible.
+// Carried by a MAKE_CLOUD_COMMITTED_RS_VISIBLE agent task that FE sends after a cloud load txn
+// is committed to meta-service, so the BE can make the txn's committed rowsets visible
+// without fetching them from meta-service.
+struct TMakeCloudTmpRsVisibleRequest {
+    1: required i64 txn_id
+    // tablets on this BE involved in the transaction
+    2: required list<Types.TTabletId> tablet_ids
+    // partition id -> version assigned to that partition by the commit
+    3: required map<Types.TPartitionId, Types.TVersion> partition_version_map
+    // version update time taken from the meta-service commit response (ms, per the field name)
+    4: optional i64 version_update_time_ms
+}
+
struct TAgentTaskRequest {
1: required TAgentServiceVersion protocol_version
2: required Types.TTaskType task_type
@@ -610,6 +618,7 @@ struct TAgentTaskRequest {
// For cloud
1000: optional TCalcDeleteBitmapRequest calc_delete_bitmap_req
+ 1001: optional TMakeCloudTmpRsVisibleRequest make_cloud_tmp_rs_visible_req
}
struct TAgentResult {
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 87be26403d6..c58148197aa 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -249,7 +249,8 @@ enum TTaskType {
PUSH_INDEX_POLICY = 35,
// CLOUD
- CALCULATE_DELETE_BITMAP = 1000
+ CALCULATE_DELETE_BITMAP = 1000,
+ MAKE_CLOUD_COMMITTED_RS_VISIBLE = 1001
}
// level of verboseness for "explain" output
diff --git
a/regression-test/data/fault_injection_p0/cloud/test_cloud_dup_forward_notify_be_after_txn_commit.out
b/regression-test/data/fault_injection_p0/cloud/test_cloud_dup_forward_notify_be_after_txn_commit.out
new file mode 100644
index 00000000000..086ffb310ef
--- /dev/null
+++
b/regression-test/data/fault_injection_p0/cloud/test_cloud_dup_forward_notify_be_after_txn_commit.out
@@ -0,0 +1,20 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !1_1 --
+1 1 1
+
+-- !1_2 --
+1 1 1
+1 1 1
+1 1 1
+1 1 1
+
+-- !2_1 --
+1 1 1
+1 1 1
+1 1 1
+1 1 1
+1 1 1
+1 1 1
+1 1 1
+1 1 1
+
diff --git
a/regression-test/data/fault_injection_p0/cloud/test_cloud_dup_notify_be_after_txn_commit.out
b/regression-test/data/fault_injection_p0/cloud/test_cloud_dup_notify_be_after_txn_commit.out
new file mode 100644
index 00000000000..7274597e33c
--- /dev/null
+++
b/regression-test/data/fault_injection_p0/cloud/test_cloud_dup_notify_be_after_txn_commit.out
@@ -0,0 +1,36 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !1_1 --
+1 1 1
+2 1 1
+3 1 1
+
+-- !1_2 --
+1 1 1
+1 10 10
+1 20 20
+2 1 1
+2 20 20
+3 1 1
+3 30 30
+4 10 10
+5 20 20
+5 30 30
+6 30 30
+
+-- !2_1 --
+1 1 1
+1 10 10
+1 20 20
+2 1 1
+2 20 20
+3 1 1
+3 30 30
+4 10 10
+5 20 20
+5 30 30
+6 30 30
+100 100 100
+100 100 100
+100 100 100
+100 100 100
+
diff --git
a/regression-test/data/fault_injection_p0/cloud/test_cloud_empty_rs_notify_be_after_txn_commit.out
b/regression-test/data/fault_injection_p0/cloud/test_cloud_empty_rs_notify_be_after_txn_commit.out
new file mode 100644
index 00000000000..aae048f27ee
--- /dev/null
+++
b/regression-test/data/fault_injection_p0/cloud/test_cloud_empty_rs_notify_be_after_txn_commit.out
@@ -0,0 +1,38 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !1_1 --
+1 1 1
+
+-- !1_2 --
+1 1 1
+1 1 1
+2 2 2
+3 3 3
+3 3 3
+
+-- !2_1 --
+1 1 1
+1 1 1
+1 1 1
+1 1 1
+1 1 1
+1 1 1
+2 2 2
+2 2 2
+2 2 2
+3 3 3
+3 3 3
+3 3 3
+3 3 3
+3 3 3
+3 3 3
+3 3 3
+3 3 3
+
+-- !1_1 --
+1 1 1
+
+-- !1_2 --
+1 1 1
+2 2 2
+3 3 3
+
diff --git
a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_notify_be_after_txn_commit.out
b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_notify_be_after_txn_commit.out
new file mode 100644
index 00000000000..fe055936213
--- /dev/null
+++
b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_notify_be_after_txn_commit.out
@@ -0,0 +1,14 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !1 --
+1 1 1
+2 1 1
+3 1 1
+
+-- !2 --
+1 20 20
+2 20 20
+3 30 30
+4 10 10
+5 30 30
+6 30 30
+
diff --git a/regression-test/pipeline/cloud_p0/conf/be_custom.conf
b/regression-test/pipeline/cloud_p0/conf/be_custom.conf
index a5da49e5604..01753481dc3 100644
--- a/regression-test/pipeline/cloud_p0/conf/be_custom.conf
+++ b/regression-test/pipeline/cloud_p0/conf/be_custom.conf
@@ -64,3 +64,6 @@ enable_python_udf_support=true
python_env_mode=conda
python_conda_root_path=/opt/miniconda3
max_python_process_num=64
+
+enable_cloud_make_rs_visible_on_be=true
+cloud_mow_sync_rowsets_when_load_txn_begin=false
diff --git a/regression-test/pipeline/cloud_p0/conf/fe_custom.conf
b/regression-test/pipeline/cloud_p0/conf/fe_custom.conf
index 704116ce6aa..601a7015223 100644
--- a/regression-test/pipeline/cloud_p0/conf/fe_custom.conf
+++ b/regression-test/pipeline/cloud_p0/conf/fe_custom.conf
@@ -46,4 +46,6 @@ workload_group_max_num = 25
enable_advance_next_id = true
check_table_lock_leaky = true
-enable_outfile_to_local=true
\ No newline at end of file
+enable_outfile_to_local=true
+
+enable_notify_be_after_load_txn_commit=true
\ No newline at end of file
diff --git a/regression-test/pipeline/cloud_p1/conf/be_custom.conf
b/regression-test/pipeline/cloud_p1/conf/be_custom.conf
index f8bdabcb15f..0b9d27e98a7 100644
--- a/regression-test/pipeline/cloud_p1/conf/be_custom.conf
+++ b/regression-test/pipeline/cloud_p1/conf/be_custom.conf
@@ -46,3 +46,6 @@ enable_python_udf_support=true
python_env_mode=conda
python_conda_root_path=/opt/miniconda3
max_python_process_num=64
+
+enable_cloud_make_rs_visible_on_be=true
+cloud_mow_sync_rowsets_when_load_txn_begin=false
diff --git a/regression-test/pipeline/cloud_p1/conf/fe_custom.conf
b/regression-test/pipeline/cloud_p1/conf/fe_custom.conf
index b91a4ed6d38..157f3a07a09 100644
--- a/regression-test/pipeline/cloud_p1/conf/fe_custom.conf
+++ b/regression-test/pipeline/cloud_p1/conf/fe_custom.conf
@@ -37,3 +37,5 @@ enable_advance_next_id = true
arrow_flight_sql_port = 8081
enable_job_schedule_second_for_test = true
+
+enable_notify_be_after_load_txn_commit=true
diff --git
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_dup_forward_notify_be_after_txn_commit.groovy
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_dup_forward_notify_be_after_txn_commit.groovy
new file mode 100644
index 00000000000..ba016c7f05a
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_dup_forward_notify_be_after_txn_commit.groovy
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_cloud_dup_forward_notify_be_after_txn_commit", "nonConcurrent") {
+ if (!isCloudMode()) {
+ return
+ }
+ // Verifies that, with stream-load txns committed directly on BE, rowsets become visible on BE without sync_rowsets, also when notify rpcs arrive out of order.
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+
+ def getTabletAndBackend = { def tableName ->
+ def backends = sql_return_maparray('show backends')
+ def tabletStats = sql_return_maparray("show tablets from
${tableName};")
+ assert tabletStats.size() == 1
+ def tabletId = tabletStats[0].TabletId
+ def tabletBackendId = tabletStats[0].BackendId
+ def tabletBackend
+ for (def be : backends) {
+ if (be.BackendId == tabletBackendId) {
+ tabletBackend = be
+ break;
+ }
+ }
+ logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with
backendId=${tabletBackend.BackendId}");
+ return [tabletId, tabletBackend]
+ }
+
+ def customFeConfig = [
+ enable_notify_be_after_load_txn_commit: true
+ ]
+ def customBeConfig = [
+ enable_cloud_make_rs_visible_on_be : true,
+ cloud_mow_sync_rowsets_when_load_txn_begin : false,
+ enable_stream_load_commit_txn_on_be : true // commit txn to MS
directly on BE
+ ]
+
+ def getTabletRowsets = {def tableName ->
+ def tablets = sql_return_maparray """ show tablets from ${tableName};
"""
+ assert tablets.size() == 1
+ String compactionUrl = tablets[0]["CompactionStatus"]
+ def (code, out, err) = curl("GET", compactionUrl)
+ def tabletJson = parseJson(out.trim())
+ assert tabletJson.rowsets instanceof List
+ return tabletJson.rowsets
+ }
+
+ def executeStreamLoad = { def tableName ->
+ String data = """1,1,1"""
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', ','
+ set 'format', 'csv'
+ inputStream new ByteArrayInputStream(data.getBytes())
+ time 10000
+ }
+ }
+
+ setFeConfigTemporary(customFeConfig) {
+ setBeConfigTemporary(customBeConfig) {
+ try {
+ def table1 =
"test_cloud_dup_forward_notify_be_after_txn_commit"
+ sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+ sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+ `k1` int NOT NULL,
+ `c1` int,
+ `c2` int
+ )duplicate KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "disable_auto_compaction" = "true",
+ "replication_num" = "1"); """
+ def (tabletId, tabletBackend) = getTabletAndBackend(table1)
+
+ executeStreamLoad(table1) // ver=2
+ qt_1_1 "select * from ${table1} order by k1;"
+ // skip the FE-side notify tasks; presumably the notify then has to come from the BE that committed the txn (confirm)
+
GetDebugPoint().enableDebugPointForAllFEs("sendMakeCloudTmpRsVisibleTasks.skip");
+ // inject an error into ordinary sync_rowsets calls for this tablet
+
GetDebugPoint().enableDebugPointForAllBEs("CloudMetaMgr::sync_tablet_rowsets.before.inject_error",
["tablet_id": tabletId])
+
+ // 1. test that after turning on the notify feature, rowsets will
become visible on BE without sync_rowsets
+ executeStreamLoad(table1) // ver=3
+ executeStreamLoad(table1) // ver=4
+ executeStreamLoad(table1) // ver=5
+ sleep(500)
+ assert getTabletRowsets(table1).size() == 5
+ qt_1_2 "select * from ${table1} order by k1;"
+ assert getTabletRowsets(table1).size() == 5
+
+ // 2. test notify rpcs that arrive out of order
+ // block the notify rpc for version 8 of this tablet
+
GetDebugPoint().enableDebugPointForAllBEs("make_cloud_committed_rs_visible_callback.block",
["tablet_id": tabletId, "version": 8])
+ executeStreamLoad(table1) // ver=6
+ executeStreamLoad(table1) // ver=7
+ sleep(500)
+ assert getTabletRowsets(table1).size() == 7
+ executeStreamLoad(table1) // ver=8
+ executeStreamLoad(table1) // ver=9
+ // because the rowset of version 8 is missing, neither version 8 nor version
9 is added to BE's tablet meta yet
+ sleep(500)
+ assert getTabletRowsets(table1).size() == 7
+
GetDebugPoint().disableDebugPointForAllBEs("make_cloud_committed_rs_visible_callback.block")
+ sleep(500)
+ assert getTabletRowsets(table1).size() == 9
+ qt_2_1 "select * from ${table1} order by k1;"
+
+ } catch (Exception e) {
+ logger.info(e.getMessage())
+ throw e
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+ }
+ }
+}
diff --git
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_dup_notify_be_after_txn_commit.groovy
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_dup_notify_be_after_txn_commit.groovy
new file mode 100644
index 00000000000..ad3ad1a0766
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_dup_notify_be_after_txn_commit.groovy
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_cloud_dup_notify_be_after_txn_commit", "nonConcurrent") {
+ if (!isCloudMode()) {
+ return
+ }
+ // Verifies that, once the notify feature is on, inserted rowsets of a duplicate-key table become visible on BE without sync_rowsets, also when notify rpcs arrive out of order.
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+
+ def getTabletAndBackend = { def tableName ->
+ def backends = sql_return_maparray('show backends')
+ def tabletStats = sql_return_maparray("show tablets from
${tableName};")
+ assert tabletStats.size() == 1
+ def tabletId = tabletStats[0].TabletId
+ def tabletBackendId = tabletStats[0].BackendId
+ def tabletBackend
+ for (def be : backends) {
+ if (be.BackendId == tabletBackendId) {
+ tabletBackend = be
+ break;
+ }
+ }
+ logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with
backendId=${tabletBackend.BackendId}");
+ return [tabletId, tabletBackend]
+ }
+
+ def feConfig1 = [
+ enable_notify_be_after_load_txn_commit: false
+ ]
+ def beConfig1 = [
+ enable_cloud_make_rs_visible_on_be : false,
+ cloud_mow_sync_rowsets_when_load_txn_begin : false
+ ]
+ // 0. with the notify feature disabled, check that the injected error in sync_rowsets makes the query fail
+ setFeConfigTemporary(feConfig1) {
+ setBeConfigTemporary(beConfig1) {
+ try {
+ def table1 = "test_cloud_dup_notify_be_after_txn_commit_error"
+ sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+ sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+ `k1` int NOT NULL,
+ `c1` int,
+ `c2` int
+ )duplicate KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "disable_auto_compaction" = "true",
+ "replication_num" = "1"); """
+ def (tabletId, tabletBackend) = getTabletAndBackend(table1)
+ sql "insert into ${table1} values(1,1,1),(2,1,1),(3,1,1);"
+
+ // inject an error into ordinary sync_rowsets calls for this tablet
+
GetDebugPoint().enableDebugPointForAllBEs("CloudMetaMgr::sync_tablet_rowsets.before.inject_error",
["tablet_id": tabletId])
+ // ensure that the injected error causes the query to fail
+ test {
+ sql "select * from ${table1} order by k1;"
+ exception "[sync_tablet_rowsets_unlocked] injected error
for testing"
+ }
+
+ } catch (Exception e) {
+ logger.info(e.getMessage())
+ throw e
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+ }
+ }
+
+ def customFeConfig = [
+ enable_notify_be_after_load_txn_commit: true
+ ]
+ def customBeConfig = [
+ enable_cloud_make_rs_visible_on_be : true,
+ cloud_mow_sync_rowsets_when_load_txn_begin : false
+ ]
+
+ def getTabletRowsets = {def tableName ->
+ def tablets = sql_return_maparray """ show tablets from ${tableName};
"""
+ assert tablets.size() == 1
+ String compactionUrl = tablets[0]["CompactionStatus"]
+ def (code, out, err) = curl("GET", compactionUrl)
+ def tabletJson = parseJson(out.trim())
+ assert tabletJson.rowsets instanceof List
+ return tabletJson.rowsets
+ }
+
+ setFeConfigTemporary(customFeConfig) {
+ setBeConfigTemporary(customBeConfig) {
+ try {
+ def table1 = "test_cloud_dup_notify_be_after_txn_commit"
+ sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+ sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+ `k1` int NOT NULL,
+ `c1` int,
+ `c2` int
+ )duplicate KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "disable_auto_compaction" = "true",
+ "replication_num" = "1"); """
+ def (tabletId, tabletBackend) = getTabletAndBackend(table1)
+
+ sql "insert into ${table1} values(1,1,1),(2,1,1),(3,1,1);" //
ver=2
+ qt_1_1 "select * from ${table1} order by k1;"
+
+ // inject an error into ordinary sync_rowsets calls for this tablet
+
GetDebugPoint().enableDebugPointForAllBEs("CloudMetaMgr::sync_tablet_rowsets.before.inject_error",
["tablet_id": tabletId])
+
+ // 1. test that after turning on the notify feature, rowsets will
become visible on BE without sync_rowsets
+ sql "insert into ${table1} values(1,10,10),(4,10,10);" // ver=3
+ sql "insert into ${table1}
values(2,20,20),(5,20,20),(1,20,20);" // ver=4
+ sql "insert into ${table1}
values(3,30,30),(6,30,30),(5,30,30);" // ver=5
+ sleep(500)
+ assert getTabletRowsets(table1).size() == 5
+ qt_1_2 "select * from ${table1} order by k1;"
+ assert getTabletRowsets(table1).size() == 5
+
+ // 2. test notify rpcs that arrive out of order
+ // block the notify rpc for version 8 of this tablet
+
GetDebugPoint().enableDebugPointForAllBEs("make_cloud_committed_rs_visible_callback.block",
["tablet_id": tabletId, "version": 8])
+ sql "insert into ${table1} values(100,100,100);" // ver=6
+ sql "insert into ${table1} values(100,100,100);" // ver=7
+ sleep(500)
+ assert getTabletRowsets(table1).size() == 7
+ sql "insert into ${table1} values(100,100,100);" // ver=8
+ sql "insert into ${table1} values(100,100,100);" // ver=9
+ // because the rowset of version 8 is missing, neither version 8 nor version
9 is added to BE's tablet meta yet
+ sleep(500)
+ assert getTabletRowsets(table1).size() == 7
+
GetDebugPoint().disableDebugPointForAllBEs("make_cloud_committed_rs_visible_callback.block")
+ sleep(500)
+ assert getTabletRowsets(table1).size() == 9
+ qt_2_1 "select * from ${table1} order by k1;"
+
+ } catch (Exception e) {
+ logger.info(e.getMessage())
+ throw e
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+ }
+ }
+}
diff --git
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_empty_rs_notify_be_after_txn_commit.groovy
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_empty_rs_notify_be_after_txn_commit.groovy
new file mode 100644
index 00000000000..754c2affd82
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_empty_rs_notify_be_after_txn_commit.groovy
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_cloud_empty_rs_notify_be_after_txn_commit", "nonConcurrent") {
+ if (!isCloudMode()) {
+ return
+ }
+ // Same visibility checks with skip_writing_empty_rowset_metadata enabled, on 2-bucket tables where single-row inserts leave one tablet with an empty rowset.
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+
+ def getTabletAndBackend = { def tableName, int index ->
+ def backends = sql_return_maparray('show backends')
+ def tabletStats = sql_return_maparray("show tablets from
${tableName};")
+ assert tabletStats.size() > index
+ def tabletId = tabletStats[index].TabletId
+ def tabletBackendId = tabletStats[index].BackendId
+ def tabletBackend
+ for (def be : backends) {
+ if (be.BackendId == tabletBackendId) {
+ tabletBackend = be
+ break;
+ }
+ }
+ logger.info("tablet ${tabletId},index=${index} on backend
${tabletBackend.Host} with backendId=${tabletBackend.BackendId}");
+ return [tabletId, tabletBackend]
+ }
+
+ def getTableId = {def tabletId ->
+ def info = sql_return_maparray """ show tablet ${tabletId}; """
+ assert info.size() == 1
+ return info[0]["TableId"]
+ }
+
+ def customFeConfig = [
+ enable_notify_be_after_load_txn_commit: true
+ ]
+ def customBeConfig = [
+ enable_cloud_make_rs_visible_on_be : true,
+ cloud_mow_sync_rowsets_when_load_txn_begin : false,
+ skip_writing_empty_rowset_metadata : true // enable the empty rowset optimization
+ ]
+
+ def getTabletRowsets = {def tabletId ->
+ def info = sql_return_maparray """ show tablet ${tabletId}; """
+ assert info.size() == 1
+ def detail = sql_return_maparray """${info[0]["DetailCmd"]}"""
+ assert detail instanceof List
+ assert detail.size() == 1
+ def compactionUrl = detail[0]["CompactionStatus"]
+ def (code, out, err) = curl("GET", compactionUrl)
+ def tabletJson = parseJson(out.trim())
+ assert tabletJson.rowsets instanceof List
+ return tabletJson.rowsets
+ }
+
+ // duplicate table
+ setFeConfigTemporary(customFeConfig) {
+ setBeConfigTemporary(customBeConfig) {
+ try {
+ def table1 =
"test_cloud_dup_empty_rs_notify_be_after_txn_commit"
+ sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+ sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+ `k1` int NOT NULL,
+ `c1` int,
+ `c2` int
+ )duplicate KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 2
+ PROPERTIES (
+ "disable_auto_compaction" = "true",
+ "replication_num" = "1"); """
+ def (tablet1Id, tablet1Backend) = getTabletAndBackend(table1,
0)
+ def (tablet2Id, tablet2Backend) = getTabletAndBackend(table1,
1)
+
+ def tableId = getTableId(tablet1Id)
+
+ sql "insert into ${table1} values(1,1,1);" // ver=2
+ qt_1_1 "select * from ${table1} order by k1;"
+
+ // inject an error into ordinary sync_rowsets calls for all tablets of this table
+
GetDebugPoint().enableDebugPointForAllBEs("CloudMetaMgr::sync_tablet_rowsets.before.inject_error",
["table_id": tableId])
+
+ // 1. test that after turning on the notify feature, rowsets will
become visible on BE without sync_rowsets
+ sql "insert into ${table1} values(1,1,1);" // ver=3
+ sql "insert into ${table1} values(2,2,2);" // ver=4
+ sql "insert into ${table1} values(3,3,3);" // ver=5
+ sql "insert into ${table1} values(3,3,3);" // ver=6
+ sleep(500)
+ assert getTabletRowsets(tablet1Id).size() == 6
+ assert getTabletRowsets(tablet2Id).size() == 6
+ qt_1_2 "select * from ${table1} order by k1;"
+ assert getTabletRowsets(tablet1Id).size() == 6
+ assert getTabletRowsets(tablet2Id).size() == 6
+
+ // the 12 inserts below are versions 7..18; while version 7 is blocked none of them should become visible
+ // 2. test notify rpcs that arrive out of order: block the notify rpc for version 7
+
GetDebugPoint().enableDebugPointForAllBEs("make_cloud_committed_rs_visible_callback.block",
["table_id": tableId, "version": 7])
+ sql "insert into ${table1} values(1,1,1);"
+ sql "insert into ${table1} values(1,1,1);"
+ sql "insert into ${table1} values(3,3,3);"
+ sql "insert into ${table1} values(3,3,3);"
+ sql "insert into ${table1} values(1,1,1);"
+ sql "insert into ${table1} values(1,1,1);"
+ sql "insert into ${table1} values(3,3,3);"
+ sql "insert into ${table1} values(3,3,3);"
+ sql "insert into ${table1} values(2,2,2);"
+ sql "insert into ${table1} values(2,2,2);"
+ sql "insert into ${table1} values(3,3,3);"
+ sql "insert into ${table1} values(3,3,3);"
+ sleep(500)
+ assert getTabletRowsets(tablet1Id).size() == 6
+ assert getTabletRowsets(tablet2Id).size() == 6
+
GetDebugPoint().disableDebugPointForAllBEs("make_cloud_committed_rs_visible_callback.block")
+ sleep(500)
+ assert getTabletRowsets(tablet1Id).size() == 18
+ assert getTabletRowsets(tablet2Id).size() == 18
+ qt_2_1 "select * from ${table1} order by k1;"
+
+ } catch (Exception e) {
+ logger.info(e.getMessage())
+ throw e
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+ }
+ }
+
+
+ // merge-on-write unique key table
+ setFeConfigTemporary(customFeConfig) {
+ setBeConfigTemporary(customBeConfig) {
+ try {
+ def table1 =
"test_cloud_mow_empty_rs_notify_be_after_txn_commit"
+ sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+ sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+ `k1` int NOT NULL,
+ `c1` int,
+ `c2` int
+ )unique KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 2
+ PROPERTIES (
+ "disable_auto_compaction" = "true",
+ "enable_unique_key_merge_on_write" = "true",
+ "replication_num" = "1"); """
+ def (tablet1Id, tablet1Backend) = getTabletAndBackend(table1,
0)
+ def (tablet2Id, tablet2Backend) = getTabletAndBackend(table1,
1)
+
+ def tableId = getTableId(tablet1Id)
+
+ sql "insert into ${table1} values(1,1,1);" // ver=2
+ qt_1_1 "select * from ${table1} order by k1;"
+
+ // inject an error into ordinary sync_rowsets calls for all tablets of this table
+
GetDebugPoint().enableDebugPointForAllBEs("CloudMetaMgr::sync_tablet_rowsets.before.inject_error",
["table_id": tableId])
+
+ // 1. test that after turning on the notify feature, rowsets will
become visible on BE without sync_rowsets
+ sql "insert into ${table1} values(1,1,1);" // ver=3
+ sql "insert into ${table1} values(2,2,2);" // ver=4
+ sql "insert into ${table1} values(3,3,3);" // ver=5
+ sql "insert into ${table1} values(3,3,3);" // ver=6
+ sleep(500)
+ assert getTabletRowsets(tablet1Id).size() == 6
+ assert getTabletRowsets(tablet2Id).size() == 6
+ qt_1_2 "select * from ${table1} order by k1;"
+ assert getTabletRowsets(tablet1Id).size() == 6
+ assert getTabletRowsets(tablet2Id).size() == 6
+ } catch (Exception e) {
+ logger.info(e.getMessage())
+ throw e
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+ }
+ }
+}
diff --git
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_notify_be_after_txn_commit.groovy
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_notify_be_after_txn_commit.groovy
new file mode 100644
index 00000000000..3b3754d330b
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_notify_be_after_txn_commit.groovy
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_cloud_mow_notify_be_after_txn_commit", "nonConcurrent") {
+ if (!isCloudMode()) {
+ return
+ }
+ // Verifies that, once the notify feature is on, loads into a merge-on-write table are queryable while ordinary sync_rowsets calls fail.
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+
+ def customFeConfig = [
+ enable_notify_be_after_load_txn_commit: true
+ ]
+ def customBeConfig = [
+ enable_cloud_make_rs_visible_on_be : true,
+ cloud_mow_sync_rowsets_when_load_txn_begin : false
+ ]
+
+ setFeConfigTemporary(customFeConfig) {
+ setBeConfigTemporary(customBeConfig) {
+ try {
+ def table1 = "test_cloud_mow_notify_be_after_txn_commit"
+ sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+ sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+ `k1` int NOT NULL,
+ `c1` int,
+ `c2` int
+ )UNIQUE KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "enable_unique_key_merge_on_write" = "true",
+ "disable_auto_compaction" = "true",
+ "replication_num" = "1"); """
+ def backends = sql_return_maparray('show backends')
+ def tabletStats = sql_return_maparray("show tablets from
${table1};")
+ assert tabletStats.size() == 1
+ def tabletId = tabletStats[0].TabletId
+ def tabletBackendId = tabletStats[0].BackendId
+ def tabletBackend
+ for (def be : backends) {
+ if (be.BackendId == tabletBackendId) {
+ tabletBackend = be
+ break;
+ }
+ }
+ logger.info("tablet ${tabletId} on backend
${tabletBackend.Host} with backendId=${tabletBackend.BackendId}");
+
+ sql "insert into ${table1} values(1,1,1),(2,1,1),(3,1,1);"
+ qt_1 "select * from ${table1} order by k1;"
+
+ // inject an error into ordinary sync_rowsets calls for this tablet
+
GetDebugPoint().enableDebugPointForAllBEs("CloudMetaMgr::sync_tablet_rowsets.before.inject_error",
["tablet_id": tabletId])
+ // the following loads must reach the query below without going through sync_rowsets
+ sql "insert into ${table1} values(1,10,10),(4,10,10);"
+ sql "insert into ${table1}
values(2,20,20),(5,20,20),(1,20,20);"
+ sql "insert into ${table1}
values(3,30,30),(6,30,30),(5,30,30);"
+
+ qt_2 "select * from ${table1} order by k1;"
+
+ } catch (Exception e) {
+ logger.info(e.getMessage())
+ throw e
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]