This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 42239d635a [fix](tablet_manager_lock) fix create tablet timeout #20067 (#20069)
42239d635a is described below
commit 42239d635aa7fe6249aabbd256cd70f8f7b1bde4
Author: yujun <[email protected]>
AuthorDate: Sun May 28 23:05:13 2023 +0800
[fix](tablet_manager_lock) fix create tablet timeout #20067 (#20069)
---
be/src/http/action/pad_rowset_action.cpp | 2 ++
be/src/olap/cold_data_compaction.cpp | 2 ++
be/src/olap/compaction.cpp | 1 +
be/src/olap/schema.cpp | 1 +
be/src/olap/schema_change.cpp | 5 ++++
be/src/olap/tablet.cpp | 11 +++++++-
be/src/olap/tablet.h | 2 ++
be/src/olap/tablet_manager.cpp | 26 +++++++++----------
be/src/olap/task/engine_clone_task.cpp | 2 ++
be/src/util/trace.h | 29 ++++++++++++++++++++++
.../apache/doris/datasource/InternalCatalog.java | 2 +-
11 files changed, 67 insertions(+), 16 deletions(-)
diff --git a/be/src/http/action/pad_rowset_action.cpp
b/be/src/http/action/pad_rowset_action.cpp
index 18e0b2568e..d337c0199d 100644
--- a/be/src/http/action/pad_rowset_action.cpp
+++ b/be/src/http/action/pad_rowset_action.cpp
@@ -38,6 +38,7 @@
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "util/time.h"
+#include "util/trace.h"
namespace doris {
@@ -113,6 +114,7 @@ Status PadRowsetAction::_pad_rowset(TabletSharedPtr tablet,
const Version& versi
std::vector<RowsetSharedPtr> to_delete;
{
std::unique_lock wlock(tablet->get_header_lock());
+ SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
tablet->modify_rowsets(to_add, to_delete);
tablet->save_meta();
}
diff --git a/be/src/olap/cold_data_compaction.cpp
b/be/src/olap/cold_data_compaction.cpp
index 6757691d84..9b2979ff3c 100644
--- a/be/src/olap/cold_data_compaction.cpp
+++ b/be/src/olap/cold_data_compaction.cpp
@@ -35,6 +35,7 @@
#include "olap/tablet_meta.h"
#include "runtime/thread_context.h"
#include "util/thread.h"
+#include "util/trace.h"
#include "util/uid_util.h"
namespace doris {
@@ -83,6 +84,7 @@ Status ColdDataCompaction::modify_rowsets(const
Merger::Statistics* stats) {
UniqueId cooldown_meta_id = UniqueId::gen_uid();
{
std::lock_guard wlock(_tablet->get_header_lock());
+ SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
// Merged cooldowned rowsets MUST NOT be managed by version graph,
they will be reclaimed by `remove_unused_remote_files`.
_tablet->delete_rowsets(_input_rowsets, false);
_tablet->add_rowsets({_output_rowset});
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 4634cdbfc7..0b1a702654 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -535,6 +535,7 @@ Status Compaction::modify_rowsets(const Merger::Statistics*
stats) {
{
std::lock_guard<std::mutex>
wrlock_(_tablet->get_rowset_update_lock());
std::lock_guard<std::shared_mutex>
wrlock(_tablet->get_header_lock());
+ SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
// Convert the delete bitmap of the input rowsets to output rowset
for
// incremental data.
diff --git a/be/src/olap/schema.cpp b/be/src/olap/schema.cpp
index 373773fffe..95f6a47a70 100644
--- a/be/src/olap/schema.cpp
+++ b/be/src/olap/schema.cpp
@@ -26,6 +26,7 @@
#include "common/config.h"
#include "runtime/define_primitive_type.h"
+#include "util/trace.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_dictionary.h"
#include "vec/columns/column_map.h"
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 5952108eab..5baedefa42 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -63,6 +63,7 @@
#include "runtime/memory/mem_tracker.h"
#include "runtime/runtime_state.h"
#include "util/defer_op.h"
+#include "util/trace.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/aggregate_functions/aggregate_function_reader.h"
#include "vec/columns/column.h"
@@ -747,6 +748,7 @@ Status
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
std::lock_guard<std::mutex>
base_tablet_lock(base_tablet->get_push_lock());
std::lock_guard<std::mutex>
new_tablet_lock(new_tablet->get_push_lock());
std::lock_guard<std::shared_mutex>
base_tablet_wlock(base_tablet->get_header_lock());
+ SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
std::lock_guard<std::shared_mutex>
new_tablet_wlock(new_tablet->get_header_lock());
do {
@@ -947,6 +949,7 @@ Status
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
// step 3
std::lock_guard<std::mutex>
rwlock(new_tablet->get_rowset_update_lock());
std::lock_guard<std::shared_mutex>
new_wlock(new_tablet->get_header_lock());
+ SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
int64_t new_max_version = new_tablet->max_version().second;
rowsets.clear();
if (max_version < new_max_version) {
@@ -977,6 +980,7 @@ Status
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
} else {
// set state to ready
std::lock_guard<std::shared_mutex>
new_wlock(new_tablet->get_header_lock());
+ SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
res = new_tablet->set_tablet_state(TabletState::TABLET_RUNNING);
if (!res) {
break;
@@ -1053,6 +1057,7 @@ Status
SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
{
// save tablet meta here because rowset meta is not saved during
add rowset
std::lock_guard<std::shared_mutex>
new_wlock(sc_params.new_tablet->get_header_lock());
+ SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
sc_params.new_tablet->save_meta();
}
if (res) {
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 0cda3978ae..9584adffe7 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -127,12 +127,15 @@ class Block;
} // namespace vectorized
using namespace ErrorCode;
+using namespace std::chrono_literals;
using std::pair;
using std::string;
using std::vector;
using io::FileSystemSPtr;
+const std::chrono::seconds TRACE_TABLET_LOCK_THRESHOLD = 10s;
+
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(flush_bytes, MetricUnit::BYTES);
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(flush_finish_count,
MetricUnit::OPERATIONS);
@@ -379,7 +382,7 @@ Status Tablet::revise_tablet_meta(const
std::vector<RowsetSharedPtr>& to_add,
}
RowsetSharedPtr Tablet::get_rowset(const RowsetId& rowset_id) {
- std::lock_guard<std::shared_mutex> wrlock(_meta_lock);
+ std::shared_lock rdlock(_meta_lock);
for (auto& version_rowset : _rs_version_map) {
if (version_rowset.second->rowset_id() == rowset_id) {
return version_rowset.second;
@@ -396,6 +399,7 @@ RowsetSharedPtr Tablet::get_rowset(const RowsetId&
rowset_id) {
Status Tablet::add_rowset(RowsetSharedPtr rowset) {
DCHECK(rowset != nullptr);
std::lock_guard<std::shared_mutex> wrlock(_meta_lock);
+ SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
// If the rowset already exist, just return directly. The rowset_id is an
unique-id,
// we can use it to check this situation.
if (_contains_rowset(rowset->rowset_id())) {
@@ -669,6 +673,7 @@ void Tablet::_delete_stale_rowset_by_version(const Version&
version) {
void Tablet::delete_expired_stale_rowset() {
int64_t now = UnixSeconds();
std::lock_guard<std::shared_mutex> wrlock(_meta_lock);
+ SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
// Compute the end time to delete rowsets, when a expired rowset
createtime less then this time, it will be deleted.
double expired_stale_sweep_endtime =
::difftime(now, config::tablet_rowset_stale_sweep_time_sec);
@@ -1137,6 +1142,7 @@ void
Tablet::_max_continuous_version_from_beginning_unlocked(Version* version, V
void Tablet::calculate_cumulative_point() {
std::lock_guard<std::shared_mutex> wrlock(_meta_lock);
+ SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
int64_t ret_cumulative_point;
_cumulative_compaction_policy->calculate_cumulative_point(
this, _tablet_meta->all_rs_metas(), _cumulative_point,
&ret_cumulative_point);
@@ -1929,6 +1935,7 @@ Status Tablet::_cooldown_data() {
erase_pending_remote_rowset(new_rowset_id.to_string());
{
std::unique_lock meta_rlock(_meta_lock);
+ SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
save_meta();
}
// upload cooldowned rowset meta to remote fs
@@ -2051,6 +2058,7 @@ Status Tablet::_follow_cooldowned_data() {
{
std::lock_guard wlock(_meta_lock);
+ SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
if (tablet_state() != TABLET_RUNNING) {
return Status::InternalError("tablet not running");
}
@@ -2099,6 +2107,7 @@ Status Tablet::_follow_cooldowned_data() {
}
{
std::lock_guard rlock(_meta_lock);
+ SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
save_meta();
}
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 1399b4c253..027806ddc0 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -85,6 +85,8 @@ using TabletSharedPtr = std::shared_ptr<Tablet>;
enum TabletStorageType { STORAGE_TYPE_LOCAL, STORAGE_TYPE_REMOTE,
STORAGE_TYPE_REMOTE_AND_LOCAL };
+extern const std::chrono::seconds TRACE_TABLET_LOCK_THRESHOLD;
+
class Tablet : public BaseTablet {
public:
static TabletSharedPtr create_tablet_from_meta(TabletMetaSharedPtr
tablet_meta,
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 9ede6f0d73..f0494dd3ab 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -480,6 +480,7 @@ Status TabletManager::_drop_tablet_unlocked(TTabletId
tablet_id, TReplicaId repl
if (!keep_files) {
// drop tablet will update tablet meta, should lock
std::lock_guard<std::shared_mutex>
wrlock(to_drop_tablet->get_header_lock());
+ SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
LOG(INFO) << "set tablet to shutdown state and remove it from memory. "
<< "tablet_id=" << tablet_id << ", tablet_path=" <<
to_drop_tablet->tablet_path();
// NOTE: has to update tablet here, but must not update tablet meta
directly.
@@ -1107,20 +1108,17 @@ void
TabletManager::update_root_path_info(std::map<string, DataDirInfo>* path_ma
size_t* tablet_count) {
DCHECK(tablet_count);
*tablet_count = 0;
- for (const auto& tablets_shard : _tablets_shards) {
- std::shared_lock rdlock(tablets_shard.lock);
- for (const auto& item : tablets_shard.tablet_map) {
- TabletSharedPtr tablet = item.second;
- ++(*tablet_count);
- auto iter = path_map->find(tablet->data_dir()->path());
- if (iter == path_map->end()) {
- continue;
- }
- if (iter->second.is_used) {
- iter->second.local_used_capacity +=
tablet->tablet_local_size();
- iter->second.remote_used_capacity +=
tablet->tablet_remote_size();
- }
- }
+ auto filter = [path_map, tablet_count](Tablet* t) -> bool {
+ ++(*tablet_count);
+ auto iter = path_map->find(t->data_dir()->path());
+ return iter != path_map->end() && iter->second.is_used;
+ };
+
+ auto tablets = get_all_tablet(filter);
+ for (const auto& tablet : tablets) {
+ auto& data_dir_info = (*path_map)[tablet->data_dir()->path()];
+ data_dir_info.local_used_capacity += tablet->tablet_local_size();
+ data_dir_info.remote_used_capacity += tablet->tablet_remote_size();
}
}
diff --git a/be/src/olap/task/engine_clone_task.cpp
b/be/src/olap/task/engine_clone_task.cpp
index 34a6b34a8e..2e47c176a1 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -63,6 +63,7 @@
#include "util/network_util.h"
#include "util/stopwatch.hpp"
#include "util/thrift_rpc_helper.h"
+#include "util/trace.h"
using std::set;
using std::stringstream;
@@ -582,6 +583,7 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet, const
std::string& clone_d
std::lock_guard<std::mutex> push_lock(tablet->get_push_lock());
std::lock_guard<std::mutex> rwlock(tablet->get_rowset_update_lock());
std::lock_guard<std::shared_mutex> wrlock(tablet->get_header_lock());
+ SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
if (is_incremental_clone) {
status = _finish_incremental_clone(tablet, cloned_tablet_meta,
committed_version);
} else {
diff --git a/be/src/util/trace.h b/be/src/util/trace.h
index 1ebd602b65..7fe40a2aa2 100644
--- a/be/src/util/trace.h
+++ b/be/src/util/trace.h
@@ -21,15 +21,18 @@
#include <rapidjson/writer.h>
#include <stdint.h>
+#include <chrono>
#include <iosfwd>
#include <string>
#include <utility>
#include <vector>
+#include "common/logging.h"
#include "gutil/ref_counted.h"
#include "gutil/strings/stringpiece.h"
#include "gutil/strings/substitute.h"
#include "gutil/threading/thread_collision_warner.h"
+#include "util/scoped_cleanup.h"
#include "util/spinlock.h"
#include "util/time.h"
#include "util/trace_metrics.h"
@@ -113,6 +116,32 @@ class Trace;
} \
}()
+// If this scope times out, make a simple trace.
+// It will log the cost time only.
+// Timeout is chrono duration struct, eg: 5ms, 100 * 1s.
+#define SCOPED_SIMPLE_TRACE_IF_TIMEOUT(timeout) \
+ SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT(timeout, LOG(WARNING))
+
+// If this scope times out, then put simple trace to the stream.
+// Timeout is chrono duration struct, eg: 5ms, 100 * 1s.
+// For example:
+//
+// std::string tag = "[foo]";
+// SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT(5s, LOG(INFO) << tag);
+//
+#define SCOPED_SIMPLE_TRACE_TO_STREAM_IF_TIMEOUT(timeout, stream)
\
+ using namespace std::chrono_literals;
\
+ auto VARNAME_LINENUM(scoped_simple_trace) = doris::MonotonicMicros();
\
+ SCOPED_CLEANUP({
\
+ auto VARNAME_LINENUM(timeout_us) =
\
+
std::chrono::duration_cast<std::chrono::microseconds>(timeout).count(); \
+ auto VARNAME_LINENUM(cost_us) =
\
+ doris::MonotonicMicros() -
VARNAME_LINENUM(scoped_simple_trace); \
+ if (VARNAME_LINENUM(cost_us) >= VARNAME_LINENUM(timeout_us)) {
\
+ stream << "Simple trace cost(us): " << VARNAME_LINENUM(cost_us);
\
+ }
\
+ })
+
namespace doris {
struct TraceEntry;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 3040fc5d3a..8ffe8b7c5f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -1784,7 +1784,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
}
if (!ok || !countDownLatch.getStatus().ok()) {
- errMsg = "Failed to create partition[" + partitionName + "].
Timeout:" + timeout + " seconds.";
+ errMsg = "Failed to create partition[" + partitionName + "].
Timeout:" + (timeout / 1000) + " seconds.";
// clear tasks
AgentTaskQueue.removeBatchTask(batchTask, TTaskType.CREATE);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org