This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 52fbc0233de [improvement](merge-on-write) Optimize publish when there
are missing versions (#28012) (#28964)
52fbc0233de is described below
commit 52fbc0233defcb63f5a07d54c865de57f506f8b8
Author: Xin Liao <[email protected]>
AuthorDate: Tue Dec 26 12:23:10 2023 +0800
[improvement](merge-on-write) Optimize publish when there are missing
versions (#28012) (#28964)
1. Do not retry publishing on the BE when there are too many missing versions;
just add the task to the async publish queue instead.
2. To reduce memory consumption, clean up the tasks when there are too many
async publish tasks.
---
be/src/agent/task_worker_pool.cpp | 8 +-
be/src/common/config.cpp | 3 +
be/src/common/config.h | 3 +
be/src/olap/olap_server.cpp | 120 ++++++++++++++---------
be/src/olap/storage_engine.h | 5 +-
be/src/olap/task/engine_publish_version_task.cpp | 13 ++-
be/test/olap/storage_engine_test.cpp | 104 ++++++++++++++++++--
7 files changed, 197 insertions(+), 59 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index 05486be044b..c48092d863e 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1543,6 +1543,11 @@ void
PublishVersionTaskPool::_publish_version_worker_thread_callback() {
if (status.ok()) {
break;
} else if (status.is<PUBLISH_VERSION_NOT_CONTINUOUS>()) {
+ // There are too many missing versions, so the task has already been added
+ // to the async publish queue; there is no need to retry here.
+ if (discontinuous_version_tablets.empty()) {
+ break;
+ }
int64_t time_elapsed = time(nullptr) -
agent_task_req.recv_time;
if (time_elapsed > config::publish_version_task_timeout_s) {
LOG(INFO) << "task elapsed " << time_elapsed
@@ -1567,7 +1572,8 @@ void
PublishVersionTaskPool::_publish_version_worker_thread_callback() {
++retry_time;
}
}
- if (status.is<PUBLISH_VERSION_NOT_CONTINUOUS>() && !is_task_timeout) {
+ if (status.is<PUBLISH_VERSION_NOT_CONTINUOUS>() &&
!discontinuous_version_tablets.empty() &&
+ !is_task_timeout) {
continue;
}
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 6acb373928b..055fd13b46e 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1074,6 +1074,9 @@ DEFINE_mInt64(LZ4_HC_compression_level, "9");
DEFINE_mBool(enable_merge_on_write_correctness_check, "true");
// rowid conversion correctness check when compaction for mow table
DEFINE_mBool(enable_rowid_conversion_correctness_check, "false");
+// When the number of missing versions is more than this value, do not directly
+// retry the publish and handle it through async publish.
+DEFINE_mInt32(mow_publish_max_discontinuous_version_num, "20");
// The secure path with user files, used in the `local` table function.
DEFINE_mString(user_files_secure_path, "${DORIS_HOME}");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 24a7340063d..7d4eaffda64 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1124,6 +1124,9 @@ DECLARE_mInt64(LZ4_HC_compression_level);
DECLARE_mBool(enable_merge_on_write_correctness_check);
// rowid conversion correctness check when compaction for mow table
DECLARE_mBool(enable_rowid_conversion_correctness_check);
+// When the number of missing versions is more than this value, do not directly
+// retry the publish and handle it through async publish.
+DECLARE_mInt32(mow_publish_max_discontinuous_version_num);
// The secure path with user files, used in the `local` table function.
DECLARE_mString(user_files_secure_path);
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index aa11277a0eb..258606f5489 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -32,6 +32,7 @@
#include <mutex>
#include <ostream>
#include <random>
+#include <shared_mutex>
#include <string>
#include <type_traits>
#include <unordered_set>
@@ -240,7 +241,7 @@ Status StorageEngine::start_bg_threads() {
.build(&_tablet_publish_txn_thread_pool);
RETURN_IF_ERROR(Thread::create(
- "StorageEngine", "aync_publish_version_thread",
+ "StorageEngine", "async_publish_version_thread",
[this]() { this->_async_publish_callback(); },
&_async_publish_thread));
LOG(INFO) << "async publish thread started";
@@ -1236,6 +1237,20 @@ void StorageEngine::add_async_publish_task(int64_t
partition_id, int64_t tablet_
int64_t publish_version, int64_t
transaction_id,
bool is_recovery) {
if (!is_recovery) {
+ bool exists = false;
+ {
+ std::shared_lock<std::shared_mutex> rlock(_async_publish_lock);
+ if (auto tablet_iter = _async_publish_tasks.find(tablet_id);
+ tablet_iter != _async_publish_tasks.end()) {
+ if (auto iter = tablet_iter->second.find(publish_version);
+ iter != tablet_iter->second.end()) {
+ exists = true;
+ }
+ }
+ }
+ if (exists) {
+ return;
+ }
TabletSharedPtr tablet = tablet_manager()->get_tablet(tablet_id);
if (tablet == nullptr) {
LOG(INFO) << "tablet may be dropped when add async publish task,
tablet_id: "
@@ -1252,12 +1267,12 @@ void StorageEngine::add_async_publish_task(int64_t
partition_id, int64_t tablet_
LOG(INFO) << "add pending publish task, tablet_id: " << tablet_id
<< " version: " << publish_version << " txn_id:" <<
transaction_id
<< " is_recovery: " << is_recovery;
- std::lock_guard<std::mutex> lock(_async_publish_mutex);
+ std::unique_lock<std::shared_mutex> wlock(_async_publish_lock);
_async_publish_tasks[tablet_id][publish_version] = {transaction_id,
partition_id};
}
int64_t StorageEngine::get_pending_publish_min_version(int64_t tablet_id) {
- std::lock_guard<std::mutex> lock(_async_publish_mutex);
+ std::shared_lock<std::shared_mutex> rlock(_async_publish_lock);
auto iter = _async_publish_tasks.find(tablet_id);
if (iter == _async_publish_tasks.end()) {
return INT64_MAX;
@@ -1268,58 +1283,67 @@ int64_t
StorageEngine::get_pending_publish_min_version(int64_t tablet_id) {
return iter->second.begin()->first;
}
-void StorageEngine::_async_publish_callback() {
- while
(!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(30)) &&
- !k_doris_exit) {
- // tablet, publish_version
- std::vector<std::pair<TabletSharedPtr, int64_t>> need_removed_tasks;
- {
- std::lock_guard<std::mutex> lock(_async_publish_mutex);
- for (auto tablet_iter = _async_publish_tasks.begin();
- tablet_iter != _async_publish_tasks.end();) {
- if (tablet_iter->second.empty()) {
- tablet_iter = _async_publish_tasks.erase(tablet_iter);
- continue;
- }
- int64_t tablet_id = tablet_iter->first;
- TabletSharedPtr tablet =
tablet_manager()->get_tablet(tablet_id);
- if (!tablet) {
- LOG(WARNING) << "tablet does not exist when async publush,
tablet_id: "
- << tablet_id;
- tablet_iter = _async_publish_tasks.erase(tablet_iter);
- continue;
- }
+void StorageEngine::_process_async_publish() {
+ // tablet, publish_version
+ std::vector<std::pair<TabletSharedPtr, int64_t>> need_removed_tasks;
+ {
+ std::unique_lock<std::shared_mutex> wlock(_async_publish_lock);
+ for (auto tablet_iter = _async_publish_tasks.begin();
+ tablet_iter != _async_publish_tasks.end();) {
+ if (tablet_iter->second.empty()) {
+ tablet_iter = _async_publish_tasks.erase(tablet_iter);
+ continue;
+ }
+ int64_t tablet_id = tablet_iter->first;
+ TabletSharedPtr tablet = tablet_manager()->get_tablet(tablet_id);
+ if (!tablet) {
+ LOG(WARNING) << "tablet does not exist when async publush,
tablet_id: "
+ << tablet_id;
+ tablet_iter = _async_publish_tasks.erase(tablet_iter);
+ continue;
+ }
- auto task_iter = tablet_iter->second.begin();
- int64_t version = task_iter->first;
- int64_t transaction_id = task_iter->second.first;
- int64_t partition_id = task_iter->second.second;
- int64_t max_version = tablet->max_version().second;
+ auto task_iter = tablet_iter->second.begin();
+ int64_t version = task_iter->first;
+ int64_t transaction_id = task_iter->second.first;
+ int64_t partition_id = task_iter->second.second;
+ int64_t max_version = tablet->max_version().second;
- if (version <= max_version) {
+ if (version <= max_version) {
+ need_removed_tasks.emplace_back(tablet, version);
+ tablet_iter->second.erase(task_iter);
+ tablet_iter++;
+ continue;
+ }
+ if (version != max_version + 1) {
+ // Keep only the most recent versions
+ while (tablet_iter->second.size() >
config::max_tablet_version_num) {
need_removed_tasks.emplace_back(tablet, version);
- tablet_iter->second.erase(task_iter);
- tablet_iter++;
- continue;
+ task_iter = tablet_iter->second.erase(task_iter);
+ version = task_iter->first;
}
- if (version != max_version + 1) {
- tablet_iter++;
- continue;
- }
-
- auto async_publish_task =
std::make_shared<AsyncTabletPublishTask>(
- tablet, partition_id, transaction_id, version);
-
StorageEngine::instance()->tablet_publish_txn_thread_pool()->submit_func(
- [=]() { async_publish_task->handle(); });
- tablet_iter->second.erase(task_iter);
- need_removed_tasks.emplace_back(tablet, version);
tablet_iter++;
+ continue;
}
+
+ auto async_publish_task = std::make_shared<AsyncTabletPublishTask>(
+ tablet, partition_id, transaction_id, version);
+ static_cast<void>(_tablet_publish_txn_thread_pool->submit_func(
+ [=]() { async_publish_task->handle(); }));
+ tablet_iter->second.erase(task_iter);
+ need_removed_tasks.emplace_back(tablet, version);
+ tablet_iter++;
}
- for (auto& [tablet, publish_version] : need_removed_tasks) {
- TabletMetaManager::remove_pending_publish_info(tablet->data_dir(),
tablet->tablet_id(),
- publish_version);
- }
+ }
+ for (auto& [tablet, publish_version] : need_removed_tasks) {
+ static_cast<void>(TabletMetaManager::remove_pending_publish_info(
+ tablet->data_dir(), tablet->tablet_id(), publish_version));
+ }
+}
+
+void StorageEngine::_async_publish_callback() {
+ while
(!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(30))) {
+ _process_async_publish();
}
}
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 0a20416cb96..7215e2bb484 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -30,6 +30,7 @@
#include <memory>
#include <mutex>
#include <set>
+#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>
@@ -344,6 +345,8 @@ private:
void _async_publish_callback();
+ void _process_async_publish();
+
Status _persist_broken_paths();
private:
@@ -495,7 +498,7 @@ private:
std::map<int64_t, std::map<int64_t, std::pair<int64_t, int64_t>>>
_async_publish_tasks;
// aync publish for discontinuous versions of merge_on_write table
scoped_refptr<Thread> _async_publish_thread;
- std::mutex _async_publish_mutex;
+ std::shared_mutex _async_publish_lock;
bool _clear_segment_cache = false;
diff --git a/be/src/olap/task/engine_publish_version_task.cpp
b/be/src/olap/task/engine_publish_version_task.cpp
index 3504d8a9280..e1a151c1973 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -179,8 +179,17 @@ Status EnginePublishVersionTask::finish() {
}
auto handle_version_not_continuous = [&]() {
add_error_tablet_id(tablet_info.tablet_id);
- _discontinuous_version_tablets->emplace_back(
- partition_id, tablet_info.tablet_id,
version.first);
+ // When there are too many missing versions, do not
directly retry the
+ // publish and handle it through async publish.
+ if (max_version +
config::mow_publish_max_discontinuous_version_num <
+ version.first) {
+ StorageEngine::instance()->add_async_publish_task(
+ partition_id, tablet_info.tablet_id,
version.first,
+ _publish_version_req.transaction_id,
false);
+ } else {
+ _discontinuous_version_tablets->emplace_back(
+ partition_id, tablet_info.tablet_id,
version.first);
+ }
res = Status::Error<PUBLISH_VERSION_NOT_CONTINUOUS>(
"check_version_exist failed");
int64_t missed_version = max_version + 1;
diff --git a/be/test/olap/storage_engine_test.cpp
b/be/test/olap/storage_engine_test.cpp
index ebf572ef5f0..eb1fdc4e8c2 100644
--- a/be/test/olap/storage_engine_test.cpp
+++ b/be/test/olap/storage_engine_test.cpp
@@ -21,17 +21,16 @@
#include <gmock/gmock-matchers.h>
#include <gtest/gtest-message.h>
#include <gtest/gtest-test-part.h>
+#include <gtest/gtest.h>
#include <filesystem>
#include "common/status.h"
#include "gtest/gtest_pred_impl.h"
-#include "testutil/test_util.h"
-
-using ::testing::_;
-using ::testing::Return;
-using ::testing::SetArgPointee;
-using std::string;
+#include "io/fs/local_file_system.h"
+#include "olap/data_dir.h"
+#include "olap/tablet_manager.h"
+#include "util/threadpool.h"
namespace doris {
using namespace config;
@@ -39,14 +38,29 @@ using namespace config;
class StorageEngineTest : public testing::Test {
public:
virtual void SetUp() {
+ _engine_data_path = "./be/test/olap/test_data/converter_test_data/tmp";
+ EXPECT_TRUE(
+
io::global_local_filesystem()->delete_and_create_directory(_engine_data_path).ok());
+ EXPECT_TRUE(
+
io::global_local_filesystem()->create_directory(_engine_data_path +
"/meta").ok());
+ _data_dir.reset(new DataDir(_engine_data_path, 100000000));
+ static_cast<void>(_data_dir->init());
+
EngineOptions options;
+ options.backend_uid = UniqueId::gen_uid();
_storage_engine.reset(new StorageEngine(options));
+ ExecEnv::GetInstance()->set_storage_engine(_storage_engine.get());
}
- virtual void TearDown() {}
+ virtual void TearDown() {
+
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_engine_data_path).ok());
+ ExecEnv::GetInstance()->set_storage_engine(nullptr);
+ }
std::unique_ptr<StorageEngine> _storage_engine;
+ std::string _engine_data_path;
+ std::unique_ptr<DataDir> _data_dir;
};
TEST_F(StorageEngineTest, TestBrokenDisk) {
@@ -86,4 +100,80 @@ TEST_F(StorageEngineTest, TestBrokenDisk) {
}
}
+TEST_F(StorageEngineTest, TestAsyncPublish) {
+ auto st = ThreadPoolBuilder("TabletPublishTxnThreadPool")
+ .set_min_threads(config::tablet_publish_txn_max_thread)
+ .set_max_threads(config::tablet_publish_txn_max_thread)
+
.build(&_storage_engine->tablet_publish_txn_thread_pool());
+ EXPECT_EQ(st, Status::OK());
+
+ int64_t partition_id = 1;
+ int64_t tablet_id = 111;
+
+ TColumnType col_type;
+ col_type.__set_type(TPrimitiveType::SMALLINT);
+ TColumn col1;
+ col1.__set_column_name("col1");
+ col1.__set_column_type(col_type);
+ col1.__set_is_key(true);
+ std::vector<TColumn> cols;
+ cols.push_back(col1);
+ TTabletSchema tablet_schema;
+ tablet_schema.__set_short_key_column_count(1);
+ tablet_schema.__set_schema_hash(3333);
+ tablet_schema.__set_keys_type(TKeysType::AGG_KEYS);
+ tablet_schema.__set_storage_type(TStorageType::COLUMN);
+ tablet_schema.__set_columns(cols);
+ TCreateTabletReq create_tablet_req;
+ create_tablet_req.__set_tablet_schema(tablet_schema);
+ create_tablet_req.__set_tablet_id(tablet_id);
+ create_tablet_req.__set_version(10);
+
+ std::vector<DataDir*> data_dirs;
+ data_dirs.push_back(_data_dir.get());
+ RuntimeProfile profile("CreateTablet");
+ st = _storage_engine->tablet_manager()->create_tablet(create_tablet_req,
data_dirs, &profile);
+ EXPECT_EQ(st, Status::OK());
+ TabletSharedPtr tablet =
_storage_engine->tablet_manager()->get_tablet(tablet_id);
+ EXPECT_EQ(tablet->max_version().second, 10);
+
+ for (int64_t i = 5; i < 12; ++i) {
+ _storage_engine->add_async_publish_task(partition_id, tablet_id, i, i,
false);
+ }
+ EXPECT_EQ(_storage_engine->_async_publish_tasks[tablet_id].size(), 7);
+ EXPECT_EQ(_storage_engine->get_pending_publish_min_version(tablet_id), 5);
+ for (int64_t i = 1; i < 8; ++i) {
+ _storage_engine->_process_async_publish();
+ EXPECT_EQ(_storage_engine->_async_publish_tasks[tablet_id].size(), 7 -
i);
+ }
+ _storage_engine->_process_async_publish();
+ EXPECT_EQ(_storage_engine->_async_publish_tasks.size(), 0);
+
+ for (int64_t i = 100; i < config::max_tablet_version_num + 120; ++i) {
+ _storage_engine->add_async_publish_task(partition_id, tablet_id, i, i,
false);
+ }
+ EXPECT_EQ(_storage_engine->_async_publish_tasks[tablet_id].size(),
+ config::max_tablet_version_num + 20);
+
+ for (int64_t i = 90; i < 120; ++i) {
+ _storage_engine->add_async_publish_task(partition_id, tablet_id, i, i,
false);
+ }
+ EXPECT_EQ(_storage_engine->_async_publish_tasks[tablet_id].size(),
+ config::max_tablet_version_num + 30);
+ EXPECT_EQ(_storage_engine->get_pending_publish_min_version(tablet_id), 90);
+
+ _storage_engine->_process_async_publish();
+ EXPECT_EQ(_storage_engine->_async_publish_tasks[tablet_id].size(),
+ config::max_tablet_version_num);
+ EXPECT_EQ(_storage_engine->get_pending_publish_min_version(tablet_id),
120);
+
+ st = _storage_engine->tablet_manager()->drop_tablet(tablet_id, 0, false);
+ EXPECT_EQ(st, Status::OK());
+
+ EXPECT_EQ(_storage_engine->_async_publish_tasks[tablet_id].size(),
+ config::max_tablet_version_num);
+ _storage_engine->_process_async_publish();
+ EXPECT_EQ(_storage_engine->_async_publish_tasks.size(), 0);
+}
+
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]