This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ce9a20a375 [enhancement](merge-on-write) format logs about MoW and add more stats for publish (#20853)
ce9a20a375 is described below
commit ce9a20a375001372881b3d7080397d57548aa4ec
Author: zhannngchen <[email protected]>
AuthorDate: Sat Jun 17 23:14:28 2023 +0800
[enhancement](merge-on-write) format logs about MoW and add more stats for publish (#20853)
---
be/src/agent/task_worker_pool.cpp | 5 ++-
be/src/olap/delta_writer.cpp | 13 +++---
be/src/olap/memtable.cpp | 10 +++++
be/src/olap/olap_common.h | 6 ++-
be/src/olap/tablet.cpp | 47 +++++++++++++++-------
be/src/olap/tablet.h | 4 +-
be/src/olap/task/engine_publish_version_task.cpp | 26 ++++++++----
be/src/olap/task/engine_publish_version_task.h | 23 ++++++++++-
be/src/olap/txn_manager.cpp | 38 ++++++++++++-----
be/src/olap/txn_manager.h | 12 +++---
be/test/olap/delta_writer_test.cpp | 7 +++-
.../olap/engine_storage_migration_task_test.cpp | 4 +-
be/test/olap/remote_rowset_gc_test.cpp | 4 +-
be/test/olap/tablet_cooldown_test.cpp | 4 +-
be/test/olap/txn_manager_test.cpp | 7 +++-
15 files changed, 151 insertions(+), 59 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index fd2c8a17dd..100ac2fd4a 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1469,7 +1469,7 @@ void
PublishVersionTaskPool::_publish_version_worker_thread_callback() {
_tasks.push_back(agent_task_req);
_worker_thread_condition_variable.notify_one();
}
- LOG_EVERY_SECOND(INFO) << "wait for previous publish version
task to be done"
+ LOG_EVERY_SECOND(INFO) << "wait for previous publish version
task to be done, "
<< "transaction_id: " <<
publish_version_req.transaction_id;
break;
} else {
@@ -1521,7 +1521,8 @@ void
PublishVersionTaskPool::_publish_version_worker_thread_callback() {
LOG_INFO("successfully publish version")
.tag("signature", agent_task_req.signature)
.tag("transaction_id", publish_version_req.transaction_id)
- .tag("tablets_num", succ_tablet_ids.size());
+ .tag("tablets_num", succ_tablet_ids.size())
+ .tag("cost(s)", time(nullptr) - agent_task_req.recv_time);
}
status.to_thrift(&finish_task_request.task_status);
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 6789e12e19..394950a193 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -203,8 +203,8 @@ Status DeltaWriter::init() {
context.tablet_id = _tablet->table_id();
context.tablet = _tablet;
context.write_type = DataWriteType::TYPE_DIRECT;
- context.mow_context =
- std::make_shared<MowContext>(_cur_max_version, _rowset_ids,
_delete_bitmap);
+ context.mow_context = std::make_shared<MowContext>(_cur_max_version,
_req.txn_id, _rowset_ids,
+ _delete_bitmap);
RETURN_IF_ERROR(_tablet->create_rowset_writer(context, &_rowset_writer));
_schema.reset(new Schema(_tablet_schema));
@@ -347,7 +347,8 @@ void DeltaWriter::_reset_mem_table() {
_mem_table_insert_trackers.push_back(mem_table_insert_tracker);
_mem_table_flush_trackers.push_back(mem_table_flush_tracker);
}
- auto mow_context = std::make_shared<MowContext>(_cur_max_version,
_rowset_ids, _delete_bitmap);
+ auto mow_context = std::make_shared<MowContext>(_cur_max_version,
_req.txn_id, _rowset_ids,
+ _delete_bitmap);
_mem_table.reset(new MemTable(_tablet, _schema.get(),
_tablet_schema.get(), _req.slots,
_req.tuple_desc, _rowset_writer.get(),
mow_context,
mem_table_insert_tracker,
mem_table_flush_tracker));
@@ -454,9 +455,9 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes&
slave_tablet_nodes,
_delete_bitmap));
}
int64_t cur_max_version = _tablet->max_version().second;
-
RETURN_IF_ERROR(_tablet->commit_phase_update_delete_bitmap(_cur_rowset,
_rowset_ids,
-
_delete_bitmap, cur_max_version,
- segments,
_rowset_writer.get()));
+ RETURN_IF_ERROR(_tablet->commit_phase_update_delete_bitmap(
+ _cur_rowset, _rowset_ids, _delete_bitmap, cur_max_version,
segments, _req.txn_id,
+ _rowset_writer.get()));
_rowset_ids = _tablet->all_rs_id(cur_max_version);
}
Status res = _storage_engine->txn_manager()->commit_txn(_req.partition_id,
_tablet, _req.txn_id,
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 9a6e5687d2..2cdc440848 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -459,9 +459,19 @@ Status MemTable::_generate_delete_bitmap(int32_t
segment_id) {
SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) {
return Status::OK();
}
+
+ OlapStopWatch watch;
RETURN_IF_ERROR(_tablet->calc_delete_bitmap(rowset, segments,
&_mow_context->rowset_ids,
_mow_context->delete_bitmap,
_mow_context->max_version));
+ size_t total_rows = std::accumulate(
+ segments.begin(), segments.end(), 0,
+ [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum
+= s->num_rows(); });
+ LOG(INFO) << "[Memtable Flush] construct delete bitmap tablet: " <<
tablet_id()
+ << ", rowset_ids: " << _mow_context->rowset_ids.size()
+ << ", cur max_version: " << _mow_context->max_version
+ << ", transaction_id: " << _mow_context->txn_id
+ << ", cost: " << watch.get_elapse_time_us() << "(us), total
rows: " << total_rows;
return Status::OK();
}
diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h
index d3d9bf3122..1ebf306b41 100644
--- a/be/src/olap/olap_common.h
+++ b/be/src/olap/olap_common.h
@@ -461,9 +461,11 @@ using RowsetIdUnorderedSet = std::unordered_set<RowsetId,
HashOfRowsetId>;
class DeleteBitmap;
// merge on write context
struct MowContext {
- MowContext(int64_t version, const RowsetIdUnorderedSet& ids,
std::shared_ptr<DeleteBitmap> db)
- : max_version(version), rowset_ids(ids), delete_bitmap(db) {}
+ MowContext(int64_t version, int64_t txnid, const RowsetIdUnorderedSet& ids,
+ std::shared_ptr<DeleteBitmap> db)
+ : max_version(version), txn_id(txnid), rowset_ids(ids),
delete_bitmap(db) {}
int64_t max_version;
+ int64_t txn_id;
const RowsetIdUnorderedSet& rowset_ids;
std::shared_ptr<DeleteBitmap> delete_bitmap;
};
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index fb9d0bdadd..6a3749ca4f 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -3009,11 +3009,6 @@ Status Tablet::calc_delete_bitmap(RowsetSharedPtr rowset,
for (auto seg_delete_bitmap : seg_delete_bitmaps) {
delete_bitmap->merge(*seg_delete_bitmap);
}
-
- LOG(INFO) << "construct delete bitmap tablet: " << tablet_id() << "
rowset: " << rowset_id
- << " dummy_version: " << end_version + 1
- << " bitmap num: " << delete_bitmap->delete_bitmap.size()
- << " cost: " << watch.get_elapse_time_us() << "(us)";
return Status::OK();
}
@@ -3170,8 +3165,18 @@ Status Tablet::update_delete_bitmap_without_lock(const
RowsetSharedPtr& rowset)
RowsetIdUnorderedSet cur_rowset_ids = all_rs_id(cur_version - 1);
DeleteBitmapPtr delete_bitmap =
std::make_shared<DeleteBitmap>(tablet_id());
RETURN_IF_ERROR(calc_delete_bitmap_between_segments(rowset, segments,
delete_bitmap));
+
+ OlapStopWatch watch;
RETURN_IF_ERROR(
calc_delete_bitmap(rowset, segments, &cur_rowset_ids,
delete_bitmap, cur_version - 1));
+ size_t total_rows = std::accumulate(
+ segments.begin(), segments.end(), 0,
+ [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum
+= s->num_rows(); });
+ LOG(INFO) << "[Schema Change or Clone] construct delete bitmap tablet: "
<< tablet_id()
+ << ", rowset_ids: " << cur_rowset_ids.size() << ", cur
max_version: " << cur_version
+ << ", transaction_id: " << -1 << ", cost: " <<
watch.get_elapse_time_us()
+ << "(us), total rows: " << total_rows;
+
for (auto iter = delete_bitmap->delete_bitmap.begin();
iter != delete_bitmap->delete_bitmap.end(); ++iter) {
_tablet_meta->delete_bitmap().merge(
@@ -3184,7 +3189,8 @@ Status Tablet::update_delete_bitmap_without_lock(const
RowsetSharedPtr& rowset)
Status Tablet::commit_phase_update_delete_bitmap(
const RowsetSharedPtr& rowset, const RowsetIdUnorderedSet&
pre_rowset_ids,
DeleteBitmapPtr delete_bitmap, const int64_t& cur_version,
- const std::vector<segment_v2::SegmentSharedPtr>& segments,
RowsetWriter* rowset_writer) {
+ const std::vector<segment_v2::SegmentSharedPtr>& segments, int64_t
txn_id,
+ RowsetWriter* rowset_writer) {
RowsetIdUnorderedSet cur_rowset_ids;
RowsetIdUnorderedSet rowset_ids_to_add;
RowsetIdUnorderedSet rowset_ids_to_del;
@@ -3192,22 +3198,28 @@ Status Tablet::commit_phase_update_delete_bitmap(
std::shared_lock meta_rlock(_meta_lock);
cur_rowset_ids = all_rs_id(cur_version);
_rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, &rowset_ids_to_add,
&rowset_ids_to_del);
- if (!rowset_ids_to_add.empty() || !rowset_ids_to_del.empty()) {
- LOG(INFO) << "rowset_ids_to_add: " << rowset_ids_to_add.size()
- << ", rowset_ids_to_del: " << rowset_ids_to_del.size();
- }
for (const auto& to_del : rowset_ids_to_del) {
delete_bitmap->remove({to_del, 0, 0}, {to_del, UINT32_MAX, INT64_MAX});
}
+ OlapStopWatch watch;
RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, &rowset_ids_to_add,
delete_bitmap,
cur_version, rowset_writer));
+ size_t total_rows = std::accumulate(
+ segments.begin(), segments.end(), 0,
+ [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum
+= s->num_rows(); });
+ LOG(INFO) << "[Before Commit] construct delete bitmap tablet: " <<
tablet_id()
+ << ", rowset_ids to add: " << rowset_ids_to_add.size()
+ << ", rowset_ids to del: " << rowset_ids_to_del.size()
+ << ", cur max_version: " << cur_version << ", transaction_id: "
<< txn_id
+ << ", cost: " << watch.get_elapse_time_us() << "(us), total
rows: " << total_rows;
return Status::OK();
}
Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset,
const RowsetIdUnorderedSet& pre_rowset_ids,
- DeleteBitmapPtr delete_bitmap,
RowsetWriter* rowset_writer) {
+ DeleteBitmapPtr delete_bitmap, int64_t
txn_id,
+ RowsetWriter* rowset_writer) {
RowsetIdUnorderedSet cur_rowset_ids;
RowsetIdUnorderedSet rowset_ids_to_add;
RowsetIdUnorderedSet rowset_ids_to_del;
@@ -3227,16 +3239,21 @@ Status Tablet::update_delete_bitmap(const
RowsetSharedPtr& rowset,
}
cur_rowset_ids = all_rs_id(cur_version - 1);
_rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, &rowset_ids_to_add,
&rowset_ids_to_del);
- if (!rowset_ids_to_add.empty() || !rowset_ids_to_del.empty()) {
- LOG(INFO) << "rowset_ids_to_add: " << rowset_ids_to_add.size()
- << ", rowset_ids_to_del: " << rowset_ids_to_del.size();
- }
for (const auto& to_del : rowset_ids_to_del) {
delete_bitmap->remove({to_del, 0, 0}, {to_del, UINT32_MAX, INT64_MAX});
}
+ OlapStopWatch watch;
RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, &rowset_ids_to_add,
delete_bitmap,
cur_version - 1, rowset_writer));
+ size_t total_rows = std::accumulate(
+ segments.begin(), segments.end(), 0,
+ [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum
+= s->num_rows(); });
+ LOG(INFO) << "[Publish] construct delete bitmap tablet: " << tablet_id()
+ << ", rowset_ids to add: " << rowset_ids_to_add.size()
+ << ", rowset_ids to del: " << rowset_ids_to_del.size()
+ << ", cur max_version: " << cur_version << ", transaction_id: "
<< txn_id
+ << ", cost: " << watch.get_elapse_time_us() << "(us), total
rows: " << total_rows;
// update version without write lock, compaction and publish_txn
// will update delete bitmap, handle compaction with _rowset_update_lock
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 50eb3469f1..3c3a010397 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -458,12 +458,12 @@ public:
Status commit_phase_update_delete_bitmap(
const RowsetSharedPtr& rowset, const RowsetIdUnorderedSet&
pre_rowset_ids,
DeleteBitmapPtr delete_bitmap, const int64_t& cur_version,
- const std::vector<segment_v2::SegmentSharedPtr>& segments,
+ const std::vector<segment_v2::SegmentSharedPtr>& segments, int64_t
txn_id,
RowsetWriter* rowset_writer = nullptr);
Status update_delete_bitmap(const RowsetSharedPtr& rowset,
const RowsetIdUnorderedSet& pre_rowset_ids,
- DeleteBitmapPtr delete_bitmap,
+ DeleteBitmapPtr delete_bitmap, int64_t txn_id,
RowsetWriter* rowset_writer = nullptr);
void calc_compaction_output_rowset_delete_bitmap(
const std::vector<RowsetSharedPtr>& input_rowsets,
diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp
index d8fd5c1bbe..bca689960a 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -142,11 +142,12 @@ Status EnginePublishVersionTask::finish() {
}
if (tablet_state == TabletState::TABLET_RUNNING &&
version.first != max_version.second + 1) {
- VLOG_NOTICE << "uniq key with merge-on-write version not
continuous, current "
- "max "
- "version="
- << max_version.second << ", publish_version="
<< version.first
- << " tablet_id=" << tablet->tablet_id();
+ LOG_EVERY_SECOND(INFO)
+ << "uniq key with merge-on-write version not
continuous, "
+ "current max version="
+ << max_version.second << ", publish_version=" <<
version.first
+ << ", tablet_id=" << tablet->tablet_id()
+ << ", transaction_id=" <<
_publish_version_req.transaction_id;
// If a tablet migrates out and back, the previously failed
// publish task may retry on the new tablet, so check
// whether the version exists. if not exist, then set
@@ -219,16 +220,19 @@
TabletPublishTxnTask::TabletPublishTxnTask(EnginePublishVersionTask* engine_task
_partition_id(partition_id),
_transaction_id(transaction_id),
_version(version),
- _tablet_info(tablet_info) {}
+ _tablet_info(tablet_info) {
+ _stats.submit_time_us = MonotonicMicros();
+}
void TabletPublishTxnTask::handle() {
+ _stats.schedule_time_us = MonotonicMicros() - _stats.submit_time_us;
Defer defer {[&] {
if (_engine_publish_version_task->finish_task() == 1) {
_engine_publish_version_task->notify();
}
}};
auto publish_status =
StorageEngine::instance()->txn_manager()->publish_txn(
- _partition_id, _tablet, _transaction_id, _version);
+ _partition_id, _tablet, _transaction_id, _version, &_stats);
if (publish_status != Status::OK()) {
LOG(WARNING) << "failed to publish version. rowset_id=" <<
_rowset->rowset_id()
<< ", tablet_id=" << _tablet_info.tablet_id << ",
txn_id=" << _transaction_id
@@ -238,7 +242,9 @@ void TabletPublishTxnTask::handle() {
}
// add visible rowset to tablet
+ int64_t t1 = MonotonicMicros();
publish_status = _tablet->add_inc_rowset(_rowset);
+ _stats.add_inc_rowset_us = MonotonicMicros() - t1;
if (publish_status != Status::OK() &&
!publish_status.is<PUSH_VERSION_ALREADY_EXIST>()) {
LOG(WARNING) << "fail to add visible rowset to tablet. rowset_id=" <<
_rowset->rowset_id()
<< ", tablet_id=" << _tablet_info.tablet_id << ",
txn_id=" << _transaction_id
@@ -247,10 +253,14 @@ void TabletPublishTxnTask::handle() {
return;
}
_engine_publish_version_task->add_succ_tablet_id(_tablet_info.tablet_id);
+ int64_t cost_us = MonotonicMicros() - _stats.submit_time_us;
+ // print stats if publish cost > 500ms
LOG(INFO) << "publish version successfully on tablet"
<< ", table_id=" << _tablet->table_id() << ", tablet=" <<
_tablet->full_name()
<< ", transaction_id=" << _transaction_id << ", version=" <<
_version.first
- << ", num_rows=" << _rowset->num_rows() << ", res=" <<
publish_status;
+ << ", num_rows=" << _rowset->num_rows() << ", res=" <<
publish_status
+ << ", cost: " << cost_us << "(us) "
+ << (cost_us > 500 * 1000 ? _stats.to_string() : "");
}
} // namespace doris
diff --git a/be/src/olap/task/engine_publish_version_task.h b/be/src/olap/task/engine_publish_version_task.h
index 7c163839bd..efc7c8a4cf 100644
--- a/be/src/olap/task/engine_publish_version_task.h
+++ b/be/src/olap/task/engine_publish_version_task.h
@@ -31,18 +31,38 @@
#include "olap/rowset/rowset.h"
#include "olap/tablet.h"
#include "olap/task/engine_task.h"
+#include "util/time.h"
namespace doris {
class EnginePublishVersionTask;
class TPublishVersionRequest;
+struct TabletPublishStatistics {
+ int64_t submit_time_us = 0;
+ int64_t schedule_time_us = 0;
+ int64_t lock_wait_time_us = 0;
+ int64_t save_meta_time_us = 0;
+ int64_t calc_delete_bitmap_time_us = 0;
+ int64_t partial_update_write_segment_us = 0;
+ int64_t add_inc_rowset_us = 0;
+
+ std::string to_string() {
+ return fmt::format(
+ "[Publish Statistics: schedule time(us): {}, lock wait
time(us): {}, save meta "
+ "time(us): {}, calc delete bitmap time(us): {}, partial update
write segment "
+ "time(us): {}, add inc rowset time(us): {}]",
+ schedule_time_us, lock_wait_time_us, save_meta_time_us,
calc_delete_bitmap_time_us,
+ partial_update_write_segment_us, add_inc_rowset_us);
+ }
+};
+
class TabletPublishTxnTask {
public:
TabletPublishTxnTask(EnginePublishVersionTask* engine_task,
TabletSharedPtr tablet,
RowsetSharedPtr rowset, int64_t partition_id, int64_t
transaction_id,
Version version, const TabletInfo& tablet_info);
- ~TabletPublishTxnTask() {}
+ ~TabletPublishTxnTask() = default;
void handle();
@@ -55,6 +75,7 @@ private:
int64_t _transaction_id;
Version _version;
TabletInfo _tablet_info;
+ TabletPublishStatistics _stats;
};
class EnginePublishVersionTask : public EngineTask {
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index f7e56fd785..ad799868aa 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -42,6 +42,7 @@
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "olap/tablet_meta.h"
+#include "olap/task/engine_publish_version_task.h"
#include "util/time.h"
namespace doris {
@@ -78,7 +79,7 @@ TxnManager::TxnManager(int32_t txn_map_shard_size, int32_t
txn_shard_size)
_txn_map_locks = new std::shared_mutex[_txn_map_shard_size];
_txn_tablet_maps = new txn_tablet_map_t[_txn_map_shard_size];
_txn_partition_maps = new txn_partition_map_t[_txn_map_shard_size];
- _txn_mutex = new std::mutex[_txn_shard_size];
+ _txn_mutex = new std::shared_mutex[_txn_shard_size];
_txn_tablet_delta_writer_map = new
txn_tablet_delta_writer_map_t[_txn_map_shard_size];
_txn_tablet_delta_writer_map_locks = new
std::shared_mutex[_txn_map_shard_size];
}
@@ -165,9 +166,11 @@ Status TxnManager::commit_txn(TPartitionId partition_id,
const TabletSharedPtr&
}
Status TxnManager::publish_txn(TPartitionId partition_id, const
TabletSharedPtr& tablet,
- TTransactionId transaction_id, const Version&
version) {
+ TTransactionId transaction_id, const Version&
version,
+ TabletPublishStatistics* stats) {
return publish_txn(tablet->data_dir()->get_meta(), partition_id,
transaction_id,
- tablet->tablet_id(), tablet->schema_hash(),
tablet->tablet_uid(), version);
+ tablet->tablet_id(), tablet->schema_hash(),
tablet->tablet_uid(), version,
+ stats);
}
// delete the txn from manager if it is not committed(not have a valid rowset)
@@ -192,7 +195,7 @@ void TxnManager::set_txn_related_delete_bitmap(TPartitionId
partition_id,
pair<int64_t, int64_t> key(partition_id, transaction_id);
TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
- std::unique_lock<std::mutex> txn_lock(_get_txn_lock(transaction_id));
+ std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id));
{
// get tx
std::lock_guard<std::shared_mutex>
wrlock(_get_txn_map_lock(transaction_id));
@@ -236,7 +239,7 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId
partition_id,
return Status::Error<ROWSET_INVALID>();
}
- std::unique_lock<std::mutex> txn_lock(_get_txn_lock(transaction_id));
+ std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id));
// this while loop just run only once, just for if break
do {
// get tx
@@ -322,21 +325,24 @@ Status TxnManager::commit_txn(OlapMeta* meta,
TPartitionId partition_id,
// remove a txn from txn manager
Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
TTransactionId transaction_id, TTabletId
tablet_id,
- SchemaHash schema_hash, TabletUid tablet_uid,
- const Version& version) {
+ SchemaHash schema_hash, TabletUid tablet_uid,
const Version& version,
+ TabletPublishStatistics* stats) {
auto tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
if (tablet == nullptr) {
return Status::OK();
}
+ DCHECK(stats != nullptr);
pair<int64_t, int64_t> key(partition_id, transaction_id);
TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
RowsetSharedPtr rowset = nullptr;
TabletTxnInfo tablet_txn_info;
+ int64_t t1 = MonotonicMicros();
/// Step 1: get rowset, tablet_txn_info by key
{
- std::unique_lock<std::mutex> txn_rlock(_get_txn_lock(transaction_id));
+ std::shared_lock txn_rlock(_get_txn_lock(transaction_id));
std::shared_lock txn_map_rlock(_get_txn_map_lock(transaction_id));
+ stats->lock_wait_time_us += MonotonicMicros() - t1;
txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
if (auto it = txn_tablet_map.find(key); it != txn_tablet_map.end()) {
@@ -368,9 +374,12 @@ Status TxnManager::publish_txn(OlapMeta* meta,
TPartitionId partition_id,
std::unique_ptr<RowsetWriter> rowset_writer;
_create_transient_rowset_writer(tablet, rowset, &rowset_writer);
+ int64_t t2 = MonotonicMicros();
RETURN_IF_ERROR(tablet->update_delete_bitmap(rowset,
tablet_txn_info.rowset_ids,
-
tablet_txn_info.delete_bitmap,
+
tablet_txn_info.delete_bitmap, transaction_id,
rowset_writer.get()));
+ int64_t t3 = MonotonicMicros();
+ stats->calc_delete_bitmap_time_us = t3 - t2;
if (rowset->tablet_schema()->is_partial_update()) {
// build rowset writer and merge transient rowset
RETURN_IF_ERROR(rowset_writer->flush());
@@ -380,8 +389,11 @@ Status TxnManager::publish_txn(OlapMeta* meta,
TPartitionId partition_id,
// erase segment cache cause we will add a segment to rowset
SegmentLoader::instance()->erase_segment(rowset->rowset_id());
}
+ stats->partial_update_write_segment_us = MonotonicMicros() - t3;
+ int64_t t4 = MonotonicMicros();
std::shared_lock rlock(tablet->get_header_lock());
tablet->save_meta();
+ stats->save_meta_time_us = MonotonicMicros() - t4;
}
/// Step 3: add to binlog
@@ -397,8 +409,10 @@ Status TxnManager::publish_txn(OlapMeta* meta,
TPartitionId partition_id,
}
/// Step 4: save meta
+ int64_t t5 = MonotonicMicros();
auto status = RowsetMetaManager::save(meta, tablet_uid,
rowset->rowset_id(),
rowset->rowset_meta()->get_rowset_pb(), enable_binlog);
+ stats->save_meta_time_us += MonotonicMicros() - t5;
if (!status.ok()) {
LOG(WARNING) << "save committed rowset failed. when publish txn
rowset_id:"
<< rowset->rowset_id() << ", tablet id: " << tablet_id
@@ -415,14 +429,16 @@ Status TxnManager::publish_txn(OlapMeta* meta,
TPartitionId partition_id,
/// Step 5: remove tablet_info from tnx_tablet_map
// txn_tablet_map[key] empty, remove key from txn_tablet_map
- std::unique_lock<std::mutex> txn_lock(_get_txn_lock(transaction_id));
+ int64_t t6 = MonotonicMicros();
+ std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id));
std::lock_guard<std::shared_mutex>
wrlock(_get_txn_map_lock(transaction_id));
+ stats->lock_wait_time_us += MonotonicMicros() - t6;
txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
if (auto it = txn_tablet_map.find(key); it != txn_tablet_map.end()) {
it->second.erase(tablet_info);
VLOG_NOTICE << "publish txn successfully."
<< " partition_id: " << key.first << ", txn_id: " <<
key.second
- << ", tablet: " << tablet_info.to_string()
+ << ", tablet_id: " << tablet_info.tablet_id
<< ", rowsetid: " << rowset->rowset_id() << ", version: "
<< version.first
<< "," << version.second;
if (it->second.empty()) {
diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h
index e2ed4290f9..36be3b03f5 100644
--- a/be/src/olap/txn_manager.h
+++ b/be/src/olap/txn_manager.h
@@ -48,6 +48,7 @@
namespace doris {
class DeltaWriter;
class OlapMeta;
+struct TabletPublishStatistics;
struct TabletTxnInfo {
PUniqueId load_id;
@@ -106,7 +107,8 @@ public:
const RowsetSharedPtr& rowset_ptr, bool is_recovery);
Status publish_txn(TPartitionId partition_id, const TabletSharedPtr&
tablet,
- TTransactionId transaction_id, const Version& version);
+ TTransactionId transaction_id, const Version& version,
+ TabletPublishStatistics* stats);
// delete the txn from manager if it is not committed(not have a valid
rowset)
Status rollback_txn(TPartitionId partition_id, const TabletSharedPtr&
tablet,
@@ -124,7 +126,7 @@ public:
// not persist rowset meta because
Status publish_txn(OlapMeta* meta, TPartitionId partition_id,
TTransactionId transaction_id,
TTabletId tablet_id, SchemaHash schema_hash, TabletUid
tablet_uid,
- const Version& version);
+ const Version& version, TabletPublishStatistics* stats);
// delete the txn from manager if it is not committed(not have a valid
rowset)
Status rollback_txn(TPartitionId partition_id, TTransactionId
transaction_id,
@@ -201,7 +203,7 @@ private:
txn_partition_map_t& _get_txn_partition_map(TTransactionId transactionId);
- inline std::mutex& _get_txn_lock(TTransactionId transactionId);
+ inline std::shared_mutex& _get_txn_lock(TTransactionId transactionId);
std::shared_mutex& _get_txn_tablet_delta_writer_map_lock(TTransactionId
transactionId);
@@ -231,7 +233,7 @@ private:
std::shared_mutex* _txn_map_locks;
- std::mutex* _txn_mutex;
+ std::shared_mutex* _txn_mutex;
txn_tablet_delta_writer_map_t* _txn_tablet_delta_writer_map;
std::shared_mutex* _txn_tablet_delta_writer_map_locks;
@@ -251,7 +253,7 @@ inline TxnManager::txn_partition_map_t&
TxnManager::_get_txn_partition_map(
return _txn_partition_maps[transactionId & (_txn_map_shard_size - 1)];
}
-inline std::mutex& TxnManager::_get_txn_lock(TTransactionId transactionId) {
+inline std::shared_mutex& TxnManager::_get_txn_lock(TTransactionId
transactionId) {
return _txn_mutex[transactionId & (_txn_shard_size - 1)];
}
diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp
index 1085a09879..f1d5f49619 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -47,6 +47,7 @@
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "olap/tablet_manager.h"
+#include "olap/task/engine_publish_version_task.h"
#include "olap/txn_manager.h"
#include "runtime/decimalv2_value.h"
#include "runtime/define_primitive_type.h"
@@ -610,9 +611,10 @@ TEST_F(TestDeltaWriter, vec_write) {
for (auto& tablet_rs : tablet_related_rs) {
std::cout << "start to publish txn" << std::endl;
RowsetSharedPtr rowset = tablet_rs.second;
+ TabletPublishStatistics stats;
res = k_engine->txn_manager()->publish_txn(meta,
write_req.partition_id, write_req.txn_id,
write_req.tablet_id,
write_req.schema_hash,
- tablet_rs.first.tablet_uid,
version);
+ tablet_rs.first.tablet_uid,
version, &stats);
ASSERT_TRUE(res.ok());
std::cout << "start to add inc rowset:" << rowset->rowset_id()
<< ", num rows:" << rowset->num_rows() << ", version:" <<
rowset->version().first
@@ -725,9 +727,10 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
std::cout << "start to publish txn" << std::endl;
RowsetSharedPtr rowset = tablet_related_rs.begin()->second;
+ TabletPublishStatistics pstats;
res = k_engine->txn_manager()->publish_txn(
meta, write_req.partition_id, write_req.txn_id,
write_req.tablet_id,
- write_req.schema_hash,
tablet_related_rs.begin()->first.tablet_uid, version);
+ write_req.schema_hash,
tablet_related_rs.begin()->first.tablet_uid, version, &pstats);
ASSERT_TRUE(res.ok());
std::cout << "start to add inc rowset:" << rowset->rowset_id()
<< ", num rows:" << rowset->num_rows() << ", version:" <<
rowset->version().first
diff --git a/be/test/olap/engine_storage_migration_task_test.cpp b/be/test/olap/engine_storage_migration_task_test.cpp
index e4dfe784d2..79fbf26203 100644
--- a/be/test/olap/engine_storage_migration_task_test.cpp
+++ b/be/test/olap/engine_storage_migration_task_test.cpp
@@ -46,6 +46,7 @@
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "olap/tablet_manager.h"
+#include "olap/task/engine_publish_version_task.h"
#include "olap/txn_manager.h"
#include "runtime/define_primitive_type.h"
#include "runtime/descriptor_helper.h"
@@ -206,9 +207,10 @@ TEST_F(TestEngineStorageMigrationTask,
write_and_migration) {
write_req.txn_id, write_req.partition_id, &tablet_related_rs);
for (auto& tablet_rs : tablet_related_rs) {
RowsetSharedPtr rowset = tablet_rs.second;
+ TabletPublishStatistics stats;
res = k_engine->txn_manager()->publish_txn(meta,
write_req.partition_id, write_req.txn_id,
tablet->tablet_id(),
tablet->schema_hash(),
- tablet->tablet_uid(),
version);
+ tablet->tablet_uid(),
version, &stats);
EXPECT_EQ(Status::OK(), res);
res = tablet->add_inc_rowset(rowset);
EXPECT_EQ(Status::OK(), res);
diff --git a/be/test/olap/remote_rowset_gc_test.cpp b/be/test/olap/remote_rowset_gc_test.cpp
index 49a70b56a5..a3b21a52c9 100644
--- a/be/test/olap/remote_rowset_gc_test.cpp
+++ b/be/test/olap/remote_rowset_gc_test.cpp
@@ -50,6 +50,7 @@
#include "olap/tablet.h"
#include "olap/tablet_manager.h"
#include "olap/tablet_meta.h"
+#include "olap/task/engine_publish_version_task.h"
#include "olap/txn_manager.h"
#include "runtime/define_primitive_type.h"
#include "runtime/descriptor_helper.h"
@@ -212,9 +213,10 @@ TEST_F(RemoteRowsetGcTest, normal) {
write_req.txn_id, write_req.partition_id, &tablet_related_rs);
for (auto& tablet_rs : tablet_related_rs) {
RowsetSharedPtr rowset = tablet_rs.second;
+ TabletPublishStatistics stats;
st = k_engine->txn_manager()->publish_txn(meta,
write_req.partition_id, write_req.txn_id,
write_req.tablet_id,
write_req.schema_hash,
- tablet_rs.first.tablet_uid,
version);
+ tablet_rs.first.tablet_uid,
version, &stats);
ASSERT_EQ(Status::OK(), st);
st = tablet->add_inc_rowset(rowset);
ASSERT_EQ(Status::OK(), st);
diff --git a/be/test/olap/tablet_cooldown_test.cpp b/be/test/olap/tablet_cooldown_test.cpp
index 52b23c68f2..038605f7cb 100644
--- a/be/test/olap/tablet_cooldown_test.cpp
+++ b/be/test/olap/tablet_cooldown_test.cpp
@@ -57,6 +57,7 @@
#include "olap/tablet.h"
#include "olap/tablet_manager.h"
#include "olap/tablet_meta.h"
+#include "olap/task/engine_publish_version_task.h"
#include "olap/txn_manager.h"
#include "runtime/define_primitive_type.h"
#include "runtime/descriptor_helper.h"
@@ -405,9 +406,10 @@ void createTablet(StorageEngine* engine, TabletSharedPtr*
tablet, int64_t replic
&tablet_related_rs);
for (auto& tablet_rs : tablet_related_rs) {
RowsetSharedPtr rowset = tablet_rs.second;
+ TabletPublishStatistics stats;
st = engine->txn_manager()->publish_txn(meta, write_req.partition_id,
write_req.txn_id,
(*tablet)->tablet_id(),
(*tablet)->schema_hash(),
- (*tablet)->tablet_uid(),
version);
+ (*tablet)->tablet_uid(),
version, &stats);
ASSERT_EQ(Status::OK(), st);
st = (*tablet)->add_inc_rowset(rowset);
ASSERT_EQ(Status::OK(), st);
diff --git a/be/test/olap/txn_manager_test.cpp b/be/test/olap/txn_manager_test.cpp
index 2932ae4aa8..146836cdc2 100644
--- a/be/test/olap/txn_manager_test.cpp
+++ b/be/test/olap/txn_manager_test.cpp
@@ -40,6 +40,7 @@
#include "olap/rowset/rowset_meta_manager.h"
#include "olap/storage_engine.h"
#include "olap/tablet_schema.h"
+#include "olap/task/engine_publish_version_task.h"
#include "util/uid_util.h"
using ::testing::_;
@@ -280,8 +281,9 @@ TEST_F(TxnManagerTest, PublishVersionSuccessful) {
schema_hash, _tablet_uid, load_id,
_rowset, false);
EXPECT_TRUE(status == Status::OK());
Version new_version(10, 11);
+ TabletPublishStatistics stats;
status = _txn_mgr->publish_txn(_meta, partition_id, transaction_id,
tablet_id, schema_hash,
- _tablet_uid, new_version);
+ _tablet_uid, new_version, &stats);
EXPECT_TRUE(status == Status::OK());
RowsetMetaSharedPtr rowset_meta(new RowsetMeta());
@@ -298,8 +300,9 @@ TEST_F(TxnManagerTest, PublishVersionSuccessful) {
TEST_F(TxnManagerTest, PublishNotExistedTxn) {
Version new_version(10, 11);
auto not_exist_txn = transaction_id + 1000;
+ TabletPublishStatistics stats;
Status status = _txn_mgr->publish_txn(_meta, partition_id, not_exist_txn,
tablet_id,
- schema_hash, _tablet_uid,
new_version);
+ schema_hash, _tablet_uid,
new_version, &stats);
EXPECT_EQ(status, Status::OK());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]