This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch feature/read-uncommitted-phase1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 473aa6a8a69906fa31d673f0640a09a1cdeb1b16 Author: Yongqiang YANG <[email protected]> AuthorDate: Sat Feb 7 16:51:57 2026 -0800 [feature](read-uncommitted) Add READ UNCOMMITTED isolation level for unpublished rowsets (Phase 1) Enable queries to read uncommitted but committed-to-storage rowsets via SET read_uncommitted = true. Supports DUP_KEYS and UNIQUE_KEYS tables in both local and cloud modes. Includes async cross-uncommitted dedup for MoW tables with compaction and schema change awareness. --- be/src/cloud/cloud_rowset_builder.cpp | 17 ++ be/src/cloud/cloud_storage_engine.cpp | 5 + be/src/cloud/cloud_storage_engine.h | 5 + be/src/cloud/cloud_txn_delete_bitmap_cache.cpp | 10 + be/src/olap/base_tablet.cpp | 9 + be/src/olap/compaction.cpp | 8 + be/src/olap/rowset_builder.cpp | 16 + be/src/olap/storage_engine.cpp | 5 + be/src/olap/storage_engine.h | 5 + be/src/olap/txn_manager.cpp | 13 + be/src/olap/uncommitted_rowset_registry.cpp | 333 +++++++++++++++++++++ be/src/olap/uncommitted_rowset_registry.h | 121 ++++++++ be/src/pipeline/exec/olap_scan_operator.cpp | 65 ++++ be/src/runtime/runtime_state.h | 4 + .../java/org/apache/doris/qe/SessionVariable.java | 17 +- gensrc/thrift/PaloInternalService.thrift | 2 + 16 files changed, 633 insertions(+), 2 deletions(-) diff --git a/be/src/cloud/cloud_rowset_builder.cpp b/be/src/cloud/cloud_rowset_builder.cpp index 8ef15424a3d..b9c528af88e 100644 --- a/be/src/cloud/cloud_rowset_builder.cpp +++ b/be/src/cloud/cloud_rowset_builder.cpp @@ -19,6 +19,7 @@ #include "cloud/cloud_meta_mgr.h" #include "cloud/cloud_storage_engine.h" +#include "olap/uncommitted_rowset_registry.h" #include "cloud/cloud_tablet.h" #include "cloud/cloud_tablet_mgr.h" #include "olap/storage_policy.h" @@ -157,6 +158,22 @@ Status CloudRowsetBuilder::set_txn_related_delete_bitmap() { _req.txn_id, _tablet->tablet_id(), _delete_bitmap, *_rowset_ids, _rowset, _req.txn_expiration, _partial_update_info); } + + // Register uncommitted rowset for READ UNCOMMITTED visibility + if (auto* registry = _engine.uncommitted_rowset_registry()) { + auto entry = std::make_shared<UncommittedRowsetEntry>(); + entry->rowset = _rowset; + entry->transaction_id = _req.txn_id; + entry->partition_id = _req.partition_id; + entry->tablet_id = _tablet->tablet_id(); + entry->unique_key_merge_on_write = _tablet->enable_unique_key_merge_on_write(); + entry->creation_time = _rowset->creation_time(); + if (entry->unique_key_merge_on_write && _delete_bitmap) { + entry->committed_delete_bitmap = std::make_shared<DeleteBitmap>(*_delete_bitmap); + } + registry->register_rowset(std::move(entry)); + } + return Status::OK(); } #include "common/compile_check_end.h" diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp index 126e9701a79..177c56e2f31 100644 --- a/be/src/cloud/cloud_storage_engine.cpp +++ b/be/src/cloud/cloud_storage_engine.cpp @@ -58,6 +58,7 @@ #include "olap/cumulative_compaction_time_series_policy.h" #include "olap/memtable_flush_executor.h" #include "olap/storage_policy.h" +#include "olap/uncommitted_rowset_registry.h" #include "runtime/memory/cache_manager.h" #include "util/parse_util.h" #include "util/time.h" @@ -231,6 +232,10 @@ Status CloudStorageEngine::open() { _cloud_snapshot_mgr = std::make_unique<CloudSnapshotMgr>(*this); + _uncommitted_rowset_registry = std::make_unique<UncommittedRowsetRegistry>(); + RETURN_IF_ERROR(_uncommitted_rowset_registry->init( + std::max(1, config::calc_delete_bitmap_max_thread / 2))); + RETURN_NOT_OK_STATUS_WITH_WARN( init_stream_load_recorder(ExecEnv::GetInstance()->store_paths()[0].path), "init StreamLoadRecorder failed"); diff --git a/be/src/cloud/cloud_storage_engine.h b/be/src/cloud/cloud_storage_engine.h index 9c27e164bba..3a435e10d90 100644 --- a/be/src/cloud/cloud_storage_engine.h +++ b/be/src/cloud/cloud_storage_engine.h @@ -51,6 +51,7 @@ class CloudWarmUpManager; class CloudCompactionStopToken; class CloudSnapshotMgr; class CloudIndexChangeCompaction; +class UncommittedRowsetRegistry; class CloudStorageEngine final : public BaseStorageEngine { public: @@ -91,6 +92,9 @@ public: CloudTabletMgr& tablet_mgr() const { return *_tablet_mgr; } CloudSnapshotMgr& cloud_snapshot_mgr() { return *_cloud_snapshot_mgr; } + UncommittedRowsetRegistry* uncommitted_rowset_registry() { + return _uncommitted_rowset_registry.get(); + } CloudTxnDeleteBitmapCache& txn_delete_bitmap_cache() const { return *_txn_delete_bitmap_cache; } @@ -225,6 +229,7 @@ private: std::unique_ptr<ThreadPool> _sync_load_for_tablets_thread_pool; std::unique_ptr<ThreadPool> _warmup_cache_async_thread_pool; std::unique_ptr<CloudSnapshotMgr> _cloud_snapshot_mgr; + std::unique_ptr<UncommittedRowsetRegistry> _uncommitted_rowset_registry; // FileSystem with latest shared storage info, new data will be written to this fs. mutable std::mutex _latest_fs_mtx; diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp index ab04f4eff16..de3ecc56b10 100644 --- a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp +++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp @@ -29,6 +29,7 @@ #include "olap/olap_common.h" #include "olap/tablet_meta.h" #include "olap/txn_manager.h" +#include "olap/uncommitted_rowset_registry.h" namespace doris { @@ -264,6 +265,10 @@ void CloudTxnDeleteBitmapCache::remove_expired_tablet_txn_info() { .tag("tablet_id", iter->second.tablet_id); _empty_rowset_markers.erase(marker_iter); } + // Unregister from UncommittedRowsetRegistry on expiration + if (auto* registry = get_uncommitted_rowset_registry()) { + registry->unregister_rowset(iter->second.tablet_id, iter->second.txn_id); + } _expiration_txn.erase(iter); } } @@ -283,6 +288,11 @@ void CloudTxnDeleteBitmapCache::remove_unused_tablet_txn_info(TTransactionId tra erase(cache_key); _txn_map.erase(txn_key); } + + // Unregister from UncommittedRowsetRegistry + if (auto* registry = get_uncommitted_rowset_registry()) { + registry->unregister_rowset(tablet_id, transaction_id); + } } void CloudTxnDeleteBitmapCache::mark_empty_rowset(TTransactionId txn_id, int64_t tablet_id, diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 241e531462d..0a6e43506c4 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -48,6 +48,7 @@ #include "olap/rowset/segment_v2/column_reader.h" #include "olap/tablet_fwd.h" #include "olap/txn_manager.h" +#include "olap/uncommitted_rowset_registry.h" #include "service/point_query_executor.h" #include "util/bvar_helper.h" #include "util/debug_points.h" @@ -169,6 +170,14 @@ Status BaseTablet::set_tablet_state(TabletState state) { "could not change tablet state from shutdown to {}", state); } _tablet_meta->set_tablet_state(state); + + // Notify UncommittedRowsetRegistry when tablet leaves RUNNING state + if (state != TABLET_RUNNING) { + if (auto* registry = get_uncommitted_rowset_registry()) { + registry->on_tablet_state_change(tablet_id(), state); + } + } + return Status::OK(); } diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 4f3ed34939d..30e7ca86df2 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -69,6 +69,7 @@ #include "olap/rowset/segment_v2/inverted_index_desc.h" #include "olap/rowset/segment_v2/inverted_index_fs_directory.h" #include "olap/storage_engine.h" +#include "olap/uncommitted_rowset_registry.h" #include "olap/storage_policy.h" #include "olap/tablet.h" #include "olap/tablet_meta.h" @@ -1359,6 +1360,13 @@ Status CompactionMixin::modify_rowsets() { tablet()->merge_delete_bitmap(output_rowset_delete_bitmap); RETURN_IF_ERROR(tablet()->modify_rowsets(output_rowsets, _input_rowsets, true)); } + + // Notify UncommittedRowsetRegistry that published rowsets changed. + // Cross-uncommitted delete bitmaps may reference rows that moved due to + // compaction, so they must be re-computed. + if (auto* registry = _engine.uncommitted_rowset_registry()) { + registry->on_compaction_completed(_tablet->tablet_id()); + } } else { std::lock_guard<std::shared_mutex> wrlock(_tablet->get_header_lock()); SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD); diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index f250f3bef10..fffa8c49f7a 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -34,6 +34,7 @@ #include "io/fs/file_system.h" #include "io/fs/file_writer.h" // IWYU pragma: keep #include "olap/calc_delete_bitmap_executor.h" +#include "olap/uncommitted_rowset_registry.h" #include "olap/olap_define.h" #include "olap/partial_update_info.h" #include "olap/rowset/beta_rowset.h" @@ -347,6 +348,21 @@ Status RowsetBuilder::commit_txn() { _delete_bitmap, *_rowset_ids, _partial_update_info); } + // Register uncommitted rowset for READ UNCOMMITTED visibility + if (auto* registry = _engine.uncommitted_rowset_registry()) { + auto entry = std::make_shared<UncommittedRowsetEntry>(); + entry->rowset = _rowset; + entry->transaction_id = _req.txn_id; + entry->partition_id = _req.partition_id; + entry->tablet_id = tablet()->tablet_id(); + entry->unique_key_merge_on_write = _tablet->enable_unique_key_merge_on_write(); + entry->creation_time = _rowset->creation_time(); + if (entry->unique_key_merge_on_write && _delete_bitmap) { + entry->committed_delete_bitmap = std::make_shared<DeleteBitmap>(*_delete_bitmap); + } + registry->register_rowset(std::move(entry)); + } + _is_committed = true; return Status::OK(); } diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 3a64a82d1ab..21462f44cc5 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -71,6 +71,7 @@ #include "olap/tablet_meta.h" #include "olap/tablet_meta_manager.h" #include "olap/txn_manager.h" +#include "olap/uncommitted_rowset_registry.h" #include "runtime/client_cache.h" #include "runtime/stream_load/stream_load_recorder.h" #include "util/doris_metrics.h" @@ -313,6 +314,10 @@ Status StorageEngine::_open() { ? config::calc_delete_bitmap_for_load_max_thread : std::max(1, CpuInfo::num_cores() / 2)); + _uncommitted_rowset_registry = std::make_unique<UncommittedRowsetRegistry>(); + RETURN_IF_ERROR(_uncommitted_rowset_registry->init( + std::max(1, config::calc_delete_bitmap_max_thread / 2))); + _parse_default_rowset_type(); return Status::OK(); diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 335f4a45f21..05d081d0b69 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -73,6 +73,7 @@ class ReportWorker; class CreateTabletRRIdxCache; struct DirInfo; class SnapshotManager; +class UncommittedRowsetRegistry; class WorkloadGroup; using SegCompactionCandidates = std::vector<segment_v2::SegmentSharedPtr>; @@ -311,6 +312,9 @@ public: TabletManager* tablet_manager() { return _tablet_manager.get(); } TxnManager* txn_manager() { return _txn_manager.get(); } SnapshotManager* snapshot_mgr() { return _snapshot_mgr.get(); } + UncommittedRowsetRegistry* uncommitted_rowset_registry() { + return _uncommitted_rowset_registry.get(); + } // Rowset garbage collection helpers bool check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id); PendingRowsetSet& pending_local_rowsets() { return _pending_local_rowsets; } @@ -533,6 +537,7 @@ private: std::unique_ptr<TabletManager> _tablet_manager; std::unique_ptr<TxnManager> _txn_manager; + std::unique_ptr<UncommittedRowsetRegistry> _uncommitted_rowset_registry; // Used to control the migration from segment_v1 to segment_v2, can be deleted in futrue. // Type of new loaded data diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp index d760e891527..c0b20167e86 100644 --- a/be/src/olap/txn_manager.cpp +++ b/be/src/olap/txn_manager.cpp @@ -46,6 +46,7 @@ #include "olap/schema_change.h" #include "olap/segment_loader.h" #include "olap/storage_engine.h" +#include "olap/uncommitted_rowset_registry.h" #include "olap/tablet_manager.h" #include "olap/tablet_meta.h" #include "olap/tablet_meta_manager.h" @@ -614,6 +615,12 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, stats->lock_wait_time_us += MonotonicMicros() - t6; _remove_txn_tablet_info_unlocked(partition_id, transaction_id, tablet_id, tablet_uid, txn_lock, wrlock); + + // Unregister from UncommittedRowsetRegistry on publish + if (auto* registry = _engine.uncommitted_rowset_registry()) { + registry->unregister_rowset(tablet_id, transaction_id); + } + VLOG_NOTICE << "publish txn successfully." << " partition_id: " << key.first << ", txn_id: " << key.second << ", tablet_id: " << tablet_info.tablet_id << ", rowsetid: " << rowset->rowset_id() @@ -736,6 +743,12 @@ Status TxnManager::delete_txn(OlapMeta* meta, TPartitionId partition_id, g_tablet_txn_info_txn_partitions_count << -1; _clear_txn_partition_map_unlocked(transaction_id, partition_id); } + + // Unregister from UncommittedRowsetRegistry on delete + if (auto* registry = _engine.uncommitted_rowset_registry()) { + registry->unregister_rowset(tablet_id, transaction_id); + } + return st; } diff --git a/be/src/olap/uncommitted_rowset_registry.cpp b/be/src/olap/uncommitted_rowset_registry.cpp new file mode 100644 index 00000000000..b96aedd2af2 --- /dev/null +++ b/be/src/olap/uncommitted_rowset_registry.cpp @@ -0,0 +1,333 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/uncommitted_rowset_registry.h" + +#include <algorithm> + +#include "cloud/cloud_storage_engine.h" +#include "cloud/config.h" +#include "common/logging.h" +#include "olap/base_tablet.h" +#include "olap/calc_delete_bitmap_executor.h" +#include "olap/rowset/beta_rowset.h" +#include "olap/rowset/segment_v2/segment.h" +#include "olap/storage_engine.h" +#include "runtime/exec_env.h" + +namespace doris { + +UncommittedRowsetRegistry::UncommittedRowsetRegistry() = default; + +UncommittedRowsetRegistry::~UncommittedRowsetRegistry() { + if (_dedup_thread_pool) { + _dedup_thread_pool->shutdown(); + } +} + +Status UncommittedRowsetRegistry::init(int dedup_threads) { + return ThreadPoolBuilder("UncommittedDedupPool") + .set_min_threads(1) + .set_max_threads(dedup_threads) + .build(&_dedup_thread_pool); +} + +std::shared_ptr<std::mutex> UncommittedRowsetRegistry::_get_tablet_dedup_mutex( + Shard& shard, int64_t tablet_id) { + auto it = shard.tablet_dedup_mutex.find(tablet_id); + if (it == shard.tablet_dedup_mutex.end()) { + auto mtx = std::make_shared<std::mutex>(); + shard.tablet_dedup_mutex[tablet_id] = mtx; + return mtx; + } + return it->second; +} + +void UncommittedRowsetRegistry::register_rowset(std::shared_ptr<UncommittedRowsetEntry> entry) { + int64_t tablet_id = entry->tablet_id; + auto& shard = _get_shard(tablet_id); + + { + std::lock_guard wlock(shard.lock); + auto& entries = shard.entries[tablet_id]; + entries.push_back(entry); + // Keep entries sorted by creation_time for correct dedup ordering + std::sort(entries.begin(), entries.end(), + [](const auto& a, const auto& b) { + return a->creation_time < b->creation_time; + }); + } + + // For DUP_KEYS tables, no dedup needed — mark ready immediately + if (!entry->unique_key_merge_on_write) { + entry->dedup_ready.store(true, std::memory_order_release); + return; + } + + _submit_dedup_task(tablet_id, entry); +} + +void UncommittedRowsetRegistry::unregister_rowset(int64_t tablet_id, int64_t transaction_id) { + auto& shard = _get_shard(tablet_id); + std::lock_guard wlock(shard.lock); + + auto it = shard.entries.find(tablet_id); + if (it == shard.entries.end()) { + return; + } + + auto& entries = it->second; + entries.erase(std::remove_if(entries.begin(), entries.end(), + [transaction_id](const auto& e) { + return e->transaction_id == transaction_id; + }), + entries.end()); + + if (entries.empty()) { + shard.entries.erase(it); + shard.tablet_dedup_mutex.erase(tablet_id); + } +} + +void UncommittedRowsetRegistry::get_ready_rowsets( + int64_t tablet_id, std::vector<std::shared_ptr<UncommittedRowsetEntry>>* result) { + auto& shard = _get_shard(tablet_id); + std::shared_lock rlock(shard.lock); + + auto it = shard.entries.find(tablet_id); + if (it == shard.entries.end()) { + return; + } + + for (const auto& entry : it->second) { + if (entry->dedup_ready.load(std::memory_order_acquire)) { + result->push_back(entry); + } + } +} + +void UncommittedRowsetRegistry::on_compaction_completed(int64_t tablet_id) { + auto& shard = _get_shard(tablet_id); + bool has_mow_entries = false; + + { + std::shared_lock rlock(shard.lock); + auto it = shard.entries.find(tablet_id); + if (it == shard.entries.end()) { + return; + } + + // Invalidate all cross-delete bitmaps and mark not ready + for (auto& entry : it->second) { + if (entry->unique_key_merge_on_write) { + entry->dedup_ready.store(false, std::memory_order_release); + entry->cross_delete_bitmap.reset(); + has_mow_entries = true; + } + } + } + + // Re-compute dedup for all MoW entries + if (has_mow_entries) { + _recompute_all_dedup(tablet_id); + } +} + +void UncommittedRowsetRegistry::on_tablet_state_change(int64_t tablet_id, + TabletState new_state) { + if (new_state == TABLET_RUNNING) { + return; // Only clear on non-running states + } + + auto& shard = _get_shard(tablet_id); + std::lock_guard wlock(shard.lock); + shard.entries.erase(tablet_id); + shard.tablet_dedup_mutex.erase(tablet_id); +} + +void UncommittedRowsetRegistry::_submit_dedup_task( + int64_t tablet_id, std::shared_ptr<UncommittedRowsetEntry> entry) { + if (!_dedup_thread_pool) { + // No thread pool available, mark ready without cross-bitmap + entry->dedup_ready.store(true, std::memory_order_release); + return; + } + + // Get the per-tablet dedup mutex + std::shared_ptr<std::mutex> dedup_mutex; + { + auto& shard = _get_shard(tablet_id); + std::lock_guard wlock(shard.lock); + dedup_mutex = _get_tablet_dedup_mutex(shard, tablet_id); + } + + // Collect earlier uncommitted rowsets for this tablet + std::vector<RowsetSharedPtr> earlier_rowsets; + { + auto& shard = _get_shard(tablet_id); + std::shared_lock rlock(shard.lock); + auto it = shard.entries.find(tablet_id); + if (it != shard.entries.end()) { + for (const auto& other : it->second) { + if (other->creation_time < entry->creation_time && + other->unique_key_merge_on_write) { + earlier_rowsets.push_back(other->rowset); + } + } + } + } + + // If no earlier uncommitted MoW rowsets, no cross-dedup needed + if (earlier_rowsets.empty()) { + entry->dedup_ready.store(true, std::memory_order_release); + return; + } + + // Submit async task to compute cross-uncommitted delete bitmap + auto st = _dedup_thread_pool->submit_func( + [this, tablet_id, entry, earlier_rowsets = std::move(earlier_rowsets), + dedup_mutex]() mutable { + // Serialize dedup per tablet + std::lock_guard tablet_lock(*dedup_mutex); + + // Check if entry was unregistered while waiting + { + auto& shard = _get_shard(tablet_id); + std::shared_lock rlock(shard.lock); + auto it = shard.entries.find(tablet_id); + if (it == shard.entries.end()) { + return; + } + bool found = false; + for (const auto& e : it->second) { + if (e->transaction_id == entry->transaction_id) { + found = true; + break; + } + } + if (!found) { + return; + } + } + + // Get tablet for bitmap computation + auto tablet_result = + ExecEnv::GetInstance()->storage_engine().get_tablet(tablet_id); + if (!tablet_result.has_value()) { + LOG(WARNING) << "Failed to get tablet " << tablet_id + << " for cross-uncommitted dedup, marking ready without " + "cross-bitmap"; + entry->dedup_ready.store(true, std::memory_order_release); + return; + } + auto tablet = tablet_result.value(); + + // Load segments from the rowset + auto* beta_rowset = dynamic_cast<BetaRowset*>(entry->rowset.get()); + if (!beta_rowset) { + entry->dedup_ready.store(true, std::memory_order_release); + return; + } + std::vector<segment_v2::SegmentSharedPtr> segments; + auto load_st = beta_rowset->load_segments(&segments); + if (!load_st.ok()) { + LOG(WARNING) << "Failed to load segments for cross-uncommitted dedup, " + "tablet_id=" + << tablet_id << " txn_id=" << entry->transaction_id + << ": " << load_st; + entry->dedup_ready.store(true, std::memory_order_release); + return; + } + + if (segments.empty()) { + entry->dedup_ready.store(true, std::memory_order_release); + return; + } + + // Use CalcDeleteBitmapExecutor for the actual computation + auto* calc_executor = + ExecEnv::GetInstance()->storage_engine().calc_delete_bitmap_executor(); + if (!calc_executor) { + entry->dedup_ready.store(true, std::memory_order_release); + return; + } + auto token = calc_executor->create_token(); + + // Compute cross-uncommitted delete bitmap + auto cross_bitmap = std::make_shared<DeleteBitmap>(tablet_id); + auto calc_st = BaseTablet::calc_delete_bitmap( + tablet, entry->rowset, segments, earlier_rowsets, cross_bitmap, + DeleteBitmap::TEMP_VERSION_COMMON, token.get()); + if (calc_st.ok()) { + calc_st = token->wait(); + } + + if (calc_st.ok()) { + entry->cross_delete_bitmap = cross_bitmap; + } else { + LOG(WARNING) << "Failed to compute cross-uncommitted delete bitmap, " + "tablet_id=" + << tablet_id << " txn_id=" << entry->transaction_id + << ": " << calc_st; + } + + entry->dedup_ready.store(true, std::memory_order_release); + }); + + if (!st.ok()) { + LOG(WARNING) << "Failed to submit cross-uncommitted dedup task, tablet_id=" << tablet_id + << " txn_id=" << entry->transaction_id << ": " << st; + entry->dedup_ready.store(true, std::memory_order_release); + } +} + +void UncommittedRowsetRegistry::_recompute_all_dedup(int64_t tablet_id) { + std::vector<std::shared_ptr<UncommittedRowsetEntry>> mow_entries; + + { + auto& shard = _get_shard(tablet_id); + std::shared_lock rlock(shard.lock); + auto it = shard.entries.find(tablet_id); + if (it == shard.entries.end()) { + return; + } + for (auto& entry : it->second) { + if (entry->unique_key_merge_on_write) { + mow_entries.push_back(entry); + } + } + } + + // Re-submit dedup tasks in order (they will serialize via tablet dedup mutex) + for (auto& entry : mow_entries) { + _submit_dedup_task(tablet_id, entry); + } +} + +UncommittedRowsetRegistry* get_uncommitted_rowset_registry() { + auto* env = ExecEnv::GetInstance(); + if (!env) { + return nullptr; + } + if (config::is_cloud_mode()) { + return env->storage_engine().to_cloud().uncommitted_rowset_registry(); + } else { + return env->storage_engine().to_local().uncommitted_rowset_registry(); + } +} + +} // namespace doris diff --git a/be/src/olap/uncommitted_rowset_registry.h b/be/src/olap/uncommitted_rowset_registry.h new file mode 100644 index 00000000000..e2a07634cc3 --- /dev/null +++ b/be/src/olap/uncommitted_rowset_registry.h @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <atomic> +#include <memory> +#include <mutex> +#include <shared_mutex> +#include <unordered_map> +#include <vector> + +#include "olap/rowset/rowset.h" +#include "olap/tablet_fwd.h" +#include "olap/tablet_meta.h" +#include "util/threadpool.h" + +namespace doris { + +// Represents a single uncommitted rowset tracked by the registry. +// The committed_delete_bitmap is computed at commit time (vs published rowsets). +// The cross_delete_bitmap is computed asynchronously (vs earlier uncommitted rowsets). +struct UncommittedRowsetEntry { + RowsetSharedPtr rowset; + int64_t transaction_id; + int64_t partition_id; + int64_t tablet_id; + // Delete bitmap computed at commit phase against published rowsets (layer 2) + DeleteBitmapPtr committed_delete_bitmap; + // Delete bitmap computed async against earlier uncommitted rowsets (layer 3) + DeleteBitmapPtr cross_delete_bitmap; + bool unique_key_merge_on_write; + int64_t creation_time; // for ordering: later wins in dedup + std::atomic<bool> dedup_ready {false}; // only serve to queries when true +}; + +// UncommittedRowsetRegistry tracks uncommitted rowsets across all tablets on this BE. +// It supports the READ UNCOMMITTED isolation level by making uncommitted data visible +// to queries that opt in via `SET read_uncommitted = true`. +// +// Thread safety: Uses sharded locks for the main map and per-tablet mutexes for +// serializing async dedup computation. +class UncommittedRowsetRegistry { +public: + UncommittedRowsetRegistry(); + ~UncommittedRowsetRegistry(); + + Status init(int dedup_threads); + + // === Write Path === + + // Register an uncommitted rowset. For MoW unique key tables, this triggers + // async dedup computation against earlier uncommitted rowsets. + void register_rowset(std::shared_ptr<UncommittedRowsetEntry> entry); + + // Unregister on publish or rollback. Removes the entry and any associated state. + void unregister_rowset(int64_t tablet_id, int64_t transaction_id); + + // === Read Path === + + // Get all dedup-ready uncommitted rowsets for a tablet. + void get_ready_rowsets(int64_t tablet_id, + std::vector<std::shared_ptr<UncommittedRowsetEntry>>* result); + + // === Compaction Path === + + // Called after compaction modifies published rowsets. Invalidates cross-delete + // bitmaps for all uncommitted rowsets on this tablet and re-computes them. + void on_compaction_completed(int64_t tablet_id); + + // === Schema Change Path === + + // Called when a tablet transitions to a non-running state. Clears all entries. + void on_tablet_state_change(int64_t tablet_id, TabletState new_state); + +private: + static constexpr int SHARD_COUNT = 16; + + struct Shard { + mutable std::shared_mutex lock; + // tablet_id -> list of uncommitted entries (ordered by creation_time) + std::unordered_map<int64_t, std::vector<std::shared_ptr<UncommittedRowsetEntry>>> entries; + // Per-tablet mutex for serializing dedup computation + std::unordered_map<int64_t, std::shared_ptr<std::mutex>> tablet_dedup_mutex; + }; + + Shard& _get_shard(int64_t tablet_id) { return _shards[tablet_id % SHARD_COUNT]; } + const Shard& _get_shard(int64_t tablet_id) const { return _shards[tablet_id % SHARD_COUNT]; } + + // Get or create the per-tablet dedup mutex (must hold shard write lock) + std::shared_ptr<std::mutex> _get_tablet_dedup_mutex(Shard& shard, int64_t tablet_id); + + // Submit async dedup task for one entry against earlier uncommitted rowsets + void _submit_dedup_task(int64_t tablet_id, std::shared_ptr<UncommittedRowsetEntry> entry); + + // Recompute all cross-delete bitmaps for a tablet (after compaction) + void _recompute_all_dedup(int64_t tablet_id); + + Shard _shards[SHARD_COUNT]; + std::unique_ptr<ThreadPool> _dedup_thread_pool; +}; + +// Helper function to get the UncommittedRowsetRegistry from the current storage engine. +// Works for both local and cloud mode. Returns nullptr if not available. +UncommittedRowsetRegistry* get_uncommitted_rowset_registry(); + +} // namespace doris diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index 44fe060066a..cb8aecaae6d 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -33,6 +33,7 @@ #include "olap/rowset/segment_v2/ann_index/ann_topn_runtime.h" #include "olap/storage_engine.h" #include "olap/tablet_manager.h" +#include "olap/uncommitted_rowset_registry.h" #include "pipeline/exec/scan_operator.h" #include "pipeline/query_cache/query_cache.h" #include "runtime/runtime_state.h" @@ -744,6 +745,70 @@ Status OlapScanLocalState::prepare(RuntimeState* state) { print_id(PipelineXLocalState<>::_state->query_id())); } } + // Inject uncommitted rowsets for READ UNCOMMITTED isolation level + if (_state->read_uncommitted()) { + auto* registry = get_uncommitted_rowset_registry(); + if (registry) { + for (size_t i = 0; i < _scan_ranges.size(); i++) { + auto& tablet = _tablets[i].tablet; + // Only DUP_KEYS and UNIQUE_KEYS supported + if (tablet->keys_type() != DUP_KEYS && + tablet->keys_type() != UNIQUE_KEYS) { + continue; + } + // Skip tablets not in RUNNING state + if (tablet->tablet_state() != TABLET_RUNNING) { + continue; + } + + std::vector<std::shared_ptr<UncommittedRowsetEntry>> ready_entries; + registry->get_ready_rowsets(tablet->tablet_id(), &ready_entries); + if (ready_entries.empty()) { + continue; + } + + bool bitmap_copied = false; + auto ensure_bitmap_copy = [&]() { + if (!bitmap_copied) { + if (_read_sources[i].delete_bitmap) { + _read_sources[i].delete_bitmap = + std::make_shared<DeleteBitmap>(*_read_sources[i].delete_bitmap); + } else { + _read_sources[i].delete_bitmap = + std::make_shared<DeleteBitmap>(tablet->tablet_id()); + } + bitmap_copied = true; + } + }; + + for (auto& entry : ready_entries) { + RowsetReaderSharedPtr rs_reader; + auto st = entry->rowset->create_reader(&rs_reader); + if (!st.ok()) { + LOG(WARNING) << "Failed to create reader for uncommitted rowset, " + "tablet_id=" + << tablet->tablet_id() + << " txn_id=" << entry->transaction_id << ": " << st; + continue; + } + _read_sources[i].rs_splits.emplace_back(std::move(rs_reader)); + + bool is_mow = entry->unique_key_merge_on_write; + // Merge committed-vs-published bitmap (layer 2) + if (is_mow && entry->committed_delete_bitmap) { + ensure_bitmap_copy(); + _read_sources[i].delete_bitmap->merge(*entry->committed_delete_bitmap); + } + // Merge cross-uncommitted bitmap (layer 3) + if (is_mow && entry->cross_delete_bitmap) { + ensure_bitmap_copy(); + _read_sources[i].delete_bitmap->merge(*entry->cross_delete_bitmap); + } + } + } + } + } + timer.stop(); double cost_secs = static_cast<double>(timer.elapsed_time()) / NANOS_PER_SEC; if (cost_secs > 1) { diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 70234928c64..3901c6d3eeb 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -436,6 +436,10 @@ public: return _query_options.__isset.skip_missing_version && _query_options.skip_missing_version; } + bool read_uncommitted() const { + return _query_options.__isset.read_uncommitted && _query_options.read_uncommitted; + } + int64_t data_queue_max_blocks() const { return _query_options.__isset.data_queue_max_blocks ? _query_options.data_queue_max_blocks : 1; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 9468a75ce3b..2c129f680cc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -447,6 +447,8 @@ public class SessionVariable implements Serializable, Writable { public static final String SKIP_BAD_TABLET = "skip_bad_tablet"; + public static final String READ_UNCOMMITTED = "read_uncommitted"; + public static final String ENABLE_PUSH_DOWN_NO_GROUP_AGG = "enable_push_down_no_group_agg"; public static final String ENABLE_CBO_STATISTICS = "enable_cbo_statistics"; @@ -770,7 +772,8 @@ public class SessionVariable implements Serializable, Writable { SKIP_STORAGE_ENGINE_MERGE, SKIP_MISSING_VERSION, SKIP_BAD_TABLET, - SHOW_HIDDEN_COLUMNS + SHOW_HIDDEN_COLUMNS, + READ_UNCOMMITTED ); public static final String ENABLE_STATS = "enable_stats"; @@ -2059,6 +2062,14 @@ public class SessionVariable implements Serializable, Writable { affectQueryResultInExecution = true) public boolean skipBadTablet = false; + @VariableMgr.VarAttr(name = READ_UNCOMMITTED, needForward = true, + varType = VariableAnnotation.EXPERIMENTAL, + description = {"启用 READ UNCOMMITTED 隔离级别,允许查询读取未提交的 rowset 数据(实验特性)", + "Enable READ UNCOMMITTED isolation level to allow queries to read " + + "uncommitted rowset data (experimental)"}, + affectQueryResultInPlan = true, affectQueryResultInExecution = true) + public boolean readUncommitted = false; + // This variable is used to avoid FE fallback to the original parser. When we execute SQL in regression tests // for nereids, fallback will cause the Doris return the correct result although the syntax is unsupported // in nereids for some mistaken modification. You should set it on the @@ -3589,7 +3600,7 @@ public class SessionVariable implements Serializable, Writable { public boolean isInDebugMode() { return showHiddenColumns || skipDeleteBitmap || skipDeletePredicate || skipDeleteSign || skipStorageEngineMerge - || skipMissingVersion || skipBadTablet; + || skipMissingVersion || skipBadTablet || readUncommitted; } public String printDebugModeVariables() { @@ -5067,6 +5078,8 @@ public class SessionVariable implements Serializable, Writable { tResult.setSkipDeleteBitmap(skipDeleteBitmap); + tResult.setReadUncommitted(readUncommitted); + tResult.setEnableFileCache(enableFileCache); tResult.setEnablePageCache(enablePageCache); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 6709520641b..455e7bf3653 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -426,6 +426,8 @@ struct TQueryOptions { 185: optional bool enable_parquet_file_page_cache = true; + 186: optional bool read_uncommitted = false; + 195: optional bool enable_left_semi_direct_return_opt; // For cloud, to control if the content would be written into file cache --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
