This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new f96bc62 [feature](balance) Support balance between disks on a single BE (#8553)
f96bc62 is described below
commit f96bc6257324ca329bf55907d36782c4a15aa7b3
Author: yinzhijian <[email protected]>
AuthorDate: Mon Mar 28 10:03:21 2022 +0800
[feature](balance) Support balance between disks on a single BE (#8553)
Currently, the cluster as a whole may be balanced in Doris while the disks
of an individual backend are not. For example, backend A has two disks,
disk1 and disk2: disk1's usage is 98% while disk2's usage is only 40%.
Since disk1 cannot take more data, only one disk of backend A can accept
new data, so the available write throughput of backend A is only half of
its capacity, and this cannot currently be resolved through load or
partition rebalancing.
So we introduce the disk rebalancer. Unlike the other rebalancers (load and
partition), which take care of cluster-wide data balancing, it takes care
of backend-wide (per-disk) data balancing.
[For more details see #8550](https://github.com/apache/incubator-doris/issues/8550)
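With this change, disk rebalancing can also be triggered or cancelled
manually for specific backends via the new admin statements documented in
the SQL reference pages added below, for example:

    ADMIN REBALANCE DISK ON ("192.168.1.1:1234", "192.168.1.2:1234");
    ADMIN CANCEL REBALANCE DISK ON ("192.168.1.1:1234", "192.168.1.2:1234");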
---
be/src/agent/task_worker_pool.cpp | 13 +-
be/src/agent/task_worker_pool.h | 2 +-
be/src/common/config.h | 5 +
be/src/olap/task/engine_storage_migration_task.cpp | 316 ++++++++++++++-----
be/src/olap/task/engine_storage_migration_task.h | 20 +-
be/test/olap/CMakeLists.txt | 1 +
.../olap/engine_storage_migration_task_test.cpp | 302 +++++++++++++++++++
docs/.vuepress/sidebar/en.js | 2 +
docs/.vuepress/sidebar/zh-CN.js | 2 +
.../Administration/ADMIN CANCEL REBALANCE DISK.md | 51 ++++
.../Administration/ADMIN REBALANCE DISK.md | 52 ++++
.../Administration/ADMIN CANCEL REBALANCE DISK.md | 52 ++++
.../Administration/ADMIN REBALANCE DISK.md | 54 ++++
fe/fe-core/src/main/cup/sql_parser.cup | 20 +-
.../analysis/AdminCancelRebalanceDiskStmt.java | 73 +++++
.../doris/analysis/AdminRebalanceDiskStmt.java | 79 +++++
.../apache/doris/clone/BackendLoadStatistic.java | 58 +++-
.../org/apache/doris/clone/DiskRebalancer.java | 334 +++++++++++++++++++++
.../java/org/apache/doris/clone/Rebalancer.java | 33 +-
.../org/apache/doris/clone/TabletSchedCtx.java | 65 ++++
.../org/apache/doris/clone/TabletScheduler.java | 123 +++++++-
.../main/java/org/apache/doris/common/Config.java | 6 +
.../java/org/apache/doris/master/MasterImpl.java | 17 +-
.../org/apache/doris/master/ReportHandler.java | 4 +-
.../main/java/org/apache/doris/qe/DdlExecutor.java | 6 +
.../doris/task/StorageMediaMigrationTask.java | 15 +
fe/fe-core/src/main/jflex/sql_scanner.flex | 2 +
.../analysis/AdminCancelRebalanceDiskStmtTest.java | 82 +++++
.../doris/analysis/AdminRebalanceDiskStmtTest.java | 83 +++++
.../{RebalanceTest.java => DiskRebalanceTest.java} | 228 ++++++--------
.../java/org/apache/doris/clone/RebalanceTest.java | 29 +-
.../org/apache/doris/clone/RebalancerTestUtil.java | 32 +-
.../java/org/apache/doris/task/AgentTaskTest.java | 15 +
33 files changed, 1924 insertions(+), 252 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index 29219fc..a10d2c4 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -941,8 +941,8 @@ void
TaskWorkerPool::_storage_medium_migrate_worker_thread_callback() {
TStatusCode::type status_code = TStatusCode::OK;
// check request and get info
TabletSharedPtr tablet;
- DataDir* dest_store;
- if (_check_migrate_requset(storage_medium_migrate_req, tablet,
&dest_store) !=
+ DataDir* dest_store = nullptr;
+ if (_check_migrate_request(storage_medium_migrate_req, tablet,
&dest_store) !=
OLAP_SUCCESS) {
status_code = TStatusCode::RUNTIME_ERROR;
} else {
@@ -953,7 +953,7 @@ void
TaskWorkerPool::_storage_medium_migrate_worker_thread_callback() {
<< ", signature: " << agent_task_req.signature;
status_code = TStatusCode::RUNTIME_ERROR;
} else {
- LOG(INFO) << "storage media migrate success. status:" << res
<< ","
+ LOG(INFO) << "storage media migrate success. status:" << res
<< ", signature:" << agent_task_req.signature;
}
}
@@ -974,7 +974,7 @@ void
TaskWorkerPool::_storage_medium_migrate_worker_thread_callback() {
}
}
-OLAPStatus TaskWorkerPool::_check_migrate_requset(const
TStorageMediumMigrateReq& req,
+OLAPStatus TaskWorkerPool::_check_migrate_request(const
TStorageMediumMigrateReq& req,
TabletSharedPtr& tablet,
DataDir** dest_store) {
int64_t tablet_id = req.tablet_id;
int32_t schema_hash = req.schema_hash;
@@ -1020,6 +1020,11 @@ OLAPStatus TaskWorkerPool::_check_migrate_requset(const
TStorageMediumMigrateReq
*dest_store = stores[0];
}
+ if (tablet->data_dir()->path() == (*dest_store)->path()) {
+ LOG(INFO) << "tablet is already on specified path. "
+ << "path=" << tablet->data_dir()->path();
+ return OLAP_REQUEST_FAILED;
+ }
// check disk capacity
int64_t tablet_size = tablet->tablet_footprint();
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index 3cffbc7..4181d0c 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -208,7 +208,7 @@ private:
Status _move_dir(const TTabletId tablet_id, const TSchemaHash schema_hash,
const std::string& src, int64_t job_id, bool
overwrite);
- OLAPStatus _check_migrate_requset(const TStorageMediumMigrateReq& req,
TabletSharedPtr& tablet,
+ OLAPStatus _check_migrate_request(const TStorageMediumMigrateReq& req,
TabletSharedPtr& tablet,
DataDir** dest_store);
// random sleep 1~second seconds
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 438a4a3..fe92b6b 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -332,6 +332,11 @@ CONF_Int32(min_tablet_migration_threads, "1");
CONF_Int32(max_tablet_migration_threads, "1");
CONF_mInt32(finished_migration_tasks_size, "10000");
+// If the remaining size is less than this, the remaining rowsets will be forced to complete migration
+CONF_mInt32(migration_remaining_size_threshold_mb, "10");
+// If the task runs longer than this time (in seconds), it will be terminated.
+// tablet max size / migration min speed * factor = 10GB / 1MBps * 2 = 20480 seconds
+CONF_mInt32(migration_task_timeout_secs, "20480");
// Port to start debug webserver on
CONF_Int32(webserver_port, "8040");
diff --git a/be/src/olap/task/engine_storage_migration_task.cpp
b/be/src/olap/task/engine_storage_migration_task.cpp
index 486730a..a3724a2 100644
--- a/be/src/olap/task/engine_storage_migration_task.cpp
+++ b/be/src/olap/task/engine_storage_migration_task.cpp
@@ -17,6 +17,8 @@
#include "olap/task/engine_storage_migration_task.h"
+#include <ctime>
+
#include "olap/snapshot_manager.h"
#include "olap/tablet_meta_manager.h"
@@ -24,146 +26,298 @@ namespace doris {
using std::stringstream;
+const int CHECK_TXNS_MAX_WAIT_TIME_SECS = 60;
+
EngineStorageMigrationTask::EngineStorageMigrationTask(const TabletSharedPtr&
tablet,
DataDir* dest_store)
- : _tablet(tablet), _dest_store(dest_store) {}
+ : _tablet(tablet), _dest_store(dest_store) {
+ _task_start_time = time(nullptr);
+ }
OLAPStatus EngineStorageMigrationTask::execute() {
return _migrate();
}
-OLAPStatus EngineStorageMigrationTask::_migrate() {
- int64_t tablet_id = _tablet->tablet_id();
- int32_t schema_hash = _tablet->schema_hash();
- LOG(INFO) << "begin to process tablet migrate. "
- << "tablet_id=" << tablet_id << ", dest_store=" <<
_dest_store->path();
+OLAPStatus EngineStorageMigrationTask::_get_versions(int32_t start_version,
int32_t* end_version,
+ std::vector<RowsetSharedPtr>
*consistent_rowsets) {
+ ReadLock rdlock(_tablet->get_header_lock());
+ const RowsetSharedPtr last_version = _tablet->rowset_with_max_version();
+ if (last_version == nullptr) {
+ LOG(WARNING) << "failed to get rowset with max version, tablet="
+ << _tablet->full_name();
+ return OLAP_ERR_VERSION_NOT_EXIST;
+ }
- DorisMetrics::instance()->storage_migrate_requests_total->increment(1);
+ *end_version = last_version->end_version();
+ if (*end_version < start_version) {
+ // rowsets are empty
+ VLOG_DEBUG << "consistent rowsets empty. tablet=" <<
_tablet->full_name()
+ << ", start_version=" << start_version << ",
end_version=" << *end_version;
+ return OLAP_SUCCESS;
+ }
+ _tablet->capture_consistent_rowsets(Version(start_version, *end_version),
consistent_rowsets);
+ if (consistent_rowsets->empty()) {
+ LOG(WARNING) << "fail to capture consistent rowsets. tablet=" <<
_tablet->full_name()
+ << ", version=" << *end_version;
+ return OLAP_ERR_VERSION_NOT_EXIST;
+ }
+ return OLAP_SUCCESS;
+}
- // try hold migration lock first
- OLAPStatus res = OLAP_SUCCESS;
- UniqueWriteLock migration_wlock(_tablet->get_migration_lock(),
std::try_to_lock);
- if (!migration_wlock.owns_lock()) {
- return OLAP_ERR_RWLOCK_ERROR;
+bool EngineStorageMigrationTask::_is_timeout() {
+ int64_t time_elapsed = time(nullptr) - _task_start_time;
+ if (time_elapsed > config::migration_task_timeout_secs) {
+ LOG(WARNING) << "migration failed due to timeout, time_eplapsed=" <<
time_elapsed
+ << ", tablet=" << _tablet->full_name();
+ return true;
}
+ return false;
+}
- // check if this tablet has related running txns. if yes, can not do
migration.
+OLAPStatus EngineStorageMigrationTask::_check_running_txns() {
+ // need hold migration lock outside
int64_t partition_id;
std::set<int64_t> transaction_ids;
+ // check if this tablet has related running txns. if yes, can not do
migration.
StorageEngine::instance()->txn_manager()->get_tablet_related_txns(
- tablet_id, schema_hash, _tablet->tablet_uid(), &partition_id,
&transaction_ids);
+ _tablet->tablet_id(), _tablet->schema_hash(),
_tablet->tablet_uid(), &partition_id, &transaction_ids);
if (transaction_ids.size() > 0) {
- LOG(WARNING) << "could not migration because has unfinished txns, "
- << " tablet=" << _tablet->full_name();
return OLAP_ERR_HEADER_HAS_PENDING_DATA;
}
+ return OLAP_SUCCESS;
+}
- std::lock_guard<std::mutex> lock(_tablet->get_push_lock());
- // TODO(ygl): the tablet should not under schema change or rollup or load
+OLAPStatus
EngineStorageMigrationTask::_check_running_txns_until_timeout(UniqueWriteLock*
migration_wlock) {
+ // caller should not hold migration lock, and 'migration_wlock' should not
be nullptr
+ // ownership of the migration_wlock is transferred to the caller if check
succ
+ DCHECK_NE(migration_wlock, nullptr);
+ OLAPStatus res = OLAP_SUCCESS;
+ int try_times = 1;
do {
- std::vector<RowsetSharedPtr> consistent_rowsets;
- {
- ReadLock rdlock(_tablet->get_header_lock());
- // get all versions to be migrate
- const RowsetSharedPtr last_version =
_tablet->rowset_with_max_version();
- if (last_version == nullptr) {
- res = OLAP_ERR_VERSION_NOT_EXIST;
- LOG(WARNING) << "failed to get rowset with max version,
tablet="
- << _tablet->full_name();
- break;
- }
- int32_t end_version = last_version->end_version();
- res = _tablet->capture_consistent_rowsets(Version(0, end_version),
&consistent_rowsets);
- if (consistent_rowsets.empty()) {
- res = OLAP_ERR_VERSION_NOT_EXIST;
- LOG(WARNING) << "fail to capture consistent rowsets. tablet="
<< _tablet->full_name()
- << ", version=" << end_version;
- break;
- }
+ // to avoid invalid loops, the lock is guaranteed to be acquired here
+ UniqueWriteLock wlock(_tablet->get_migration_lock());
+ res = _check_running_txns();
+ if (res == OLAP_SUCCESS) {
+ // transfer the lock to the caller
+ *migration_wlock = std::move(wlock);
+ return res;
+ }
+ LOG(INFO) << "check running txns fail, try again until timeout."
+ << " tablet=" << _tablet->full_name()
+ << ", try times=" << try_times
+ << ", res=" << res;
+ // unlock and sleep for a while, try again
+ wlock.unlock();
+ sleep(std::min(config::sleep_one_second * try_times,
CHECK_TXNS_MAX_WAIT_TIME_SECS));
+ ++try_times;
+ } while (!_is_timeout());
+ return res;
+}
+
+OLAPStatus EngineStorageMigrationTask::_gen_and_write_header_to_hdr_file(
+ uint64_t shard,
+ const std::string& full_path,
+ const std::vector<RowsetSharedPtr>&
consistent_rowsets) {
+ // need hold migration lock and push lock outside
+ OLAPStatus res = OLAP_SUCCESS;
+ int64_t tablet_id = _tablet->tablet_id();
+ int32_t schema_hash = _tablet->schema_hash();
+ TabletMetaSharedPtr new_tablet_meta(new (std::nothrow) TabletMeta());
+ {
+ ReadLock rdlock(_tablet->get_header_lock());
+ _generate_new_header(shard, consistent_rowsets, new_tablet_meta);
+ }
+ std::string new_meta_file = full_path + "/" + std::to_string(tablet_id) +
".hdr";
+ res = new_tablet_meta->save(new_meta_file);
+ if (res != OLAP_SUCCESS) {
+ LOG(WARNING) << "failed to save meta to path: " << new_meta_file;
+ return res;
+ }
+
+ // reset tablet id and rowset id
+ res = TabletMeta::reset_tablet_uid(new_meta_file);
+ if (res != OLAP_SUCCESS) {
+ LOG(WARNING) << "errors while set tablet uid: '" << new_meta_file;
+ return res;
+ }
+ // it will change rowset id and its create time
+ // rowset create time is useful when load tablet from meta to check which
tablet is the tablet to load
+ res = SnapshotManager::instance()->convert_rowset_ids(full_path,
tablet_id, schema_hash);
+ if (res != OLAP_SUCCESS) {
+ LOG(WARNING) << "failed to convert rowset id when do storage migration"
+ << " path = " << full_path;
+ return res;
+ }
+ return res;
+}
+
+OLAPStatus EngineStorageMigrationTask::_reload_tablet(
+ const std::string& full_path) {
+ // need hold migration lock and push lock outside
+ OLAPStatus res = OLAP_SUCCESS;
+ int64_t tablet_id = _tablet->tablet_id();
+ int32_t schema_hash = _tablet->schema_hash();
+ res = StorageEngine::instance()->tablet_manager()->load_tablet_from_dir(
+ _dest_store, tablet_id, schema_hash, full_path, false);
+ if (res != OLAP_SUCCESS) {
+ LOG(WARNING) << "failed to load tablet from new path. tablet_id=" <<
tablet_id
+ << " schema_hash=" << schema_hash << " path = " <<
full_path;
+ return res;
+ }
+
+ // if old tablet finished schema change, then the schema change status of
the new tablet is DONE
+ // else the schema change status of the new tablet is FAILED
+ TabletSharedPtr new_tablet =
+ StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id,
schema_hash);
+ if (new_tablet == nullptr) {
+ LOG(WARNING) << "tablet not found. tablet_id=" << tablet_id
+ << " schema_hash=" << schema_hash;
+ return OLAP_ERR_TABLE_NOT_FOUND;
+ }
+ return res;
+}
+
+// if the size less than threshold, return true
+bool EngineStorageMigrationTask::_is_rowsets_size_less_than_threshold(
+ const std::vector<RowsetSharedPtr>& consistent_rowsets) {
+ size_t total_size = 0;
+ for (const auto& rs : consistent_rowsets) {
+ total_size += rs->index_disk_size() + rs->data_disk_size();
+ }
+ if (total_size < config::migration_remaining_size_threshold_mb) {
+ return true;
+ }
+ return false;
+}
+
+OLAPStatus EngineStorageMigrationTask::_migrate() {
+ int64_t tablet_id = _tablet->tablet_id();
+ LOG(INFO) << "begin to process tablet migrate. "
+ << "tablet_id=" << tablet_id << ", dest_store=" <<
_dest_store->path();
+
+ DorisMetrics::instance()->storage_migrate_requests_total->increment(1);
+ int32_t start_version = 0;
+ int32_t end_version = 0;
+ std::vector<RowsetSharedPtr> consistent_rowsets;
+
+ // try hold migration lock first
+ OLAPStatus res = OLAP_SUCCESS;
+ uint64_t shard = 0;
+ string full_path;
+ {
+ UniqueWriteLock migration_wlock(_tablet->get_migration_lock(),
std::try_to_lock);
+ if (!migration_wlock.owns_lock()) {
+ return OLAP_ERR_RWLOCK_ERROR;
+ }
+
+ // check if this tablet has related running txns. if yes, can not do
migration.
+ res = _check_running_txns();
+ if (res != OLAP_SUCCESS) {
+ LOG(WARNING) << "could not migration because has unfinished txns, "
+ << " tablet=" << _tablet->full_name();
+ return res;
}
- uint64_t shard = 0;
+ std::lock_guard<std::mutex> lock(_tablet->get_push_lock());
+ // get versions to be migrate
+ res = _get_versions(start_version, &end_version, &consistent_rowsets);
+ if (res != OLAP_SUCCESS) {
+ return res;
+ }
+
+ // TODO(ygl): the tablet should not under schema change or rollup or
load
res = _dest_store->get_shard(&shard);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "fail to get shard from store: " <<
_dest_store->path();
- break;
+ return res;
}
FilePathDescStream root_path_desc_s;
root_path_desc_s << _dest_store->path_desc() << DATA_PREFIX << "/" <<
shard;
FilePathDesc full_path_desc =
SnapshotManager::instance()->get_schema_hash_full_path(
_tablet, root_path_desc_s.path_desc());
- string full_path = full_path_desc.filepath;
+ full_path = full_path_desc.filepath;
// if dir already exist then return err, it should not happen.
// should not remove the dir directly, for safety reason.
if (FileUtils::check_exist(full_path)) {
LOG(INFO) << "schema hash path already exist, skip this path. "
- << "full_path=" << full_path;
- res = OLAP_ERR_FILE_ALREADY_EXIST;
- break;
+ << "full_path=" << full_path;
+ return OLAP_ERR_FILE_ALREADY_EXIST;
}
Status st = FileUtils::create_dir(full_path);
if (!st.ok()) {
res = OLAP_ERR_CANNOT_CREATE_DIR;
LOG(WARNING) << "fail to create path. path=" << full_path
- << ", error:" << st.to_string();
- break;
+ << ", error:" << st.to_string();
+ return res;
}
+ }
+ std::vector<RowsetSharedPtr> temp_consistent_rowsets(consistent_rowsets);
+ do {
// migrate all index and data files but header file
- res = _copy_index_and_data_files(full_path, consistent_rowsets);
+ res = _copy_index_and_data_files(full_path, temp_consistent_rowsets);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "fail to copy index and data files when migrate.
res=" << res;
break;
}
-
- // generate new tablet meta and write to hdr file
- TabletMetaSharedPtr new_tablet_meta(new (std::nothrow) TabletMeta());
- {
- ReadLock rdlock(_tablet->get_header_lock());
- _generate_new_header(shard, consistent_rowsets, new_tablet_meta);
- }
- std::string new_meta_file = full_path + "/" +
std::to_string(tablet_id) + ".hdr";
- res = new_tablet_meta->save(new_meta_file);
+ UniqueWriteLock migration_wlock;
+ res = _check_running_txns_until_timeout(&migration_wlock);
if (res != OLAP_SUCCESS) {
- LOG(WARNING) << "failed to save meta to path: " << new_meta_file;
break;
}
-
- // reset tablet id and rowset id
- res = TabletMeta::reset_tablet_uid(new_meta_file);
+ std::lock_guard<std::mutex> lock(_tablet->get_push_lock());
+ start_version = end_version;
+ // clear temp rowsets before get remaining versions
+ temp_consistent_rowsets.clear();
+ // get remaining versions
+ res = _get_versions(end_version + 1, &end_version,
&temp_consistent_rowsets);
if (res != OLAP_SUCCESS) {
- LOG(WARNING) << "errors while set tablet uid: '" << new_meta_file;
break;
}
- // it will change rowset id and its create time
- // rowset create time is useful when load tablet from meta to check
which tablet is the tablet to load
- res = SnapshotManager::instance()->convert_rowset_ids(full_path,
tablet_id, schema_hash);
- if (res != OLAP_SUCCESS) {
- LOG(WARNING) << "failed to convert rowset id when do storage
migration"
- << " path = " << full_path;
- break;
+ if (start_version < end_version) {
+ // we have remaining versions to be migrated
+ consistent_rowsets.insert(consistent_rowsets.end(),
+ temp_consistent_rowsets.begin(),
temp_consistent_rowsets.end());
+ LOG(INFO) << "we have remaining versions to be migrated.
start_version="
+ << start_version << " end_version=" << end_version;
+ // if the remaining size is less than
config::migration_remaining_size_threshold_mb(default 10MB),
+ // we take the lock to complete it to avoid long-term competition
with other tasks
+ if (_is_rowsets_size_less_than_threshold(temp_consistent_rowsets))
{
+ // force to copy the remaining data and index
+ res = _copy_index_and_data_files(full_path,
temp_consistent_rowsets);
+ if (res != OLAP_SUCCESS) {
+ LOG(WARNING) << "fail to copy the remaining index and data
files when migrate. res=" << res;
+ break;
+ }
+ } else {
+ if (_is_timeout()) {
+ res = OLAP_ERR_HEADER_HAS_PENDING_DATA;
+ break;
+ }
+ // there is too much remaining data here.
+ // in order not to affect other tasks, release the lock and
then copy it
+ continue;
+ }
}
- res =
StorageEngine::instance()->tablet_manager()->load_tablet_from_dir(
- _dest_store, tablet_id, schema_hash, full_path, false);
+ // generate new tablet meta and write to hdr file
+ res = _gen_and_write_header_to_hdr_file(shard, full_path,
consistent_rowsets);
if (res != OLAP_SUCCESS) {
- LOG(WARNING) << "failed to load tablet from new path. tablet_id="
<< tablet_id
- << " schema_hash=" << schema_hash << " path = " <<
full_path;
break;
}
-
- // if old tablet finished schema change, then the schema change status
of the new tablet is DONE
- // else the schema change status of the new tablet is FAILED
- TabletSharedPtr new_tablet =
-
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, schema_hash);
- if (new_tablet == nullptr) {
- LOG(WARNING) << "tablet not found. tablet_id=" << tablet_id
- << " schema_hash=" << schema_hash;
- res = OLAP_ERR_TABLE_NOT_FOUND;
+ res = _reload_tablet(full_path);
+ if (res != OLAP_SUCCESS) {
break;
}
- } while (0);
+
+ break;
+ } while (true);
+
+ if (res != OLAP_SUCCESS) {
+ // we should remove the dir directly to avoid filling the disk with junk data, and it's safe to remove
+ FileUtils::remove_all(full_path);
+ }
return res;
}
diff --git a/be/src/olap/task/engine_storage_migration_task.h
b/be/src/olap/task/engine_storage_migration_task.h
index 3009f4c..ffd42a4 100644
--- a/be/src/olap/task/engine_storage_migration_task.h
+++ b/be/src/olap/task/engine_storage_migration_task.h
@@ -36,6 +36,24 @@ public:
private:
OLAPStatus _migrate();
+ // check if task is timeout
+ bool _is_timeout();
+ OLAPStatus _get_versions(int32_t start_version,
+ int32_t* end_version,
+ std::vector<RowsetSharedPtr>
*consistent_rowsets);
+ OLAPStatus _check_running_txns();
+ // caller should not hold migration lock, and 'migration_wlock' should not
be nullptr
+ // ownership of the migration lock is transferred to the caller if check
succ
+ OLAPStatus _check_running_txns_until_timeout(UniqueWriteLock*
migration_wlock);
+
+ // if the size less than threshold, return true
+ bool _is_rowsets_size_less_than_threshold(const
std::vector<RowsetSharedPtr>& consistent_rowsets);
+
+ OLAPStatus _gen_and_write_header_to_hdr_file(
+ uint64_t shard,
+ const std::string& full_path,
+ const std::vector<RowsetSharedPtr>&
consistent_rowsets);
+ OLAPStatus _reload_tablet(const std::string& full_path);
void _generate_new_header(uint64_t new_shard,
const std::vector<RowsetSharedPtr>&
consistent_rowsets,
@@ -52,7 +70,7 @@ private:
TabletSharedPtr _tablet;
// destination data dir
DataDir* _dest_store;
-
+ int64_t _task_start_time;
}; // EngineTask
} // namespace doris
diff --git a/be/test/olap/CMakeLists.txt b/be/test/olap/CMakeLists.txt
index 5e4d15f..81a551f 100644
--- a/be/test/olap/CMakeLists.txt
+++ b/be/test/olap/CMakeLists.txt
@@ -26,6 +26,7 @@ ADD_BE_TEST(row_block_test)
ADD_BE_TEST(row_block_v2_test)
ADD_BE_TEST(bit_field_test)
ADD_BE_TEST(byte_buffer_test)
+ADD_BE_TEST(engine_storage_migration_task_test)
ADD_BE_TEST(run_length_byte_test)
ADD_BE_TEST(run_length_integer_test)
ADD_BE_TEST(stream_index_test)
diff --git a/be/test/olap/engine_storage_migration_task_test.cpp
b/be/test/olap/engine_storage_migration_task_test.cpp
new file mode 100644
index 0000000..68c88f8
--- /dev/null
+++ b/be/test/olap/engine_storage_migration_task_test.cpp
@@ -0,0 +1,302 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/task/engine_storage_migration_task.h"
+
+#include <gtest/gtest.h>
+#include <sys/file.h>
+
+#include <string>
+
+#include "gen_cpp/Descriptors_types.h"
+#include "gen_cpp/PaloInternalService_types.h"
+#include "gen_cpp/Types_types.h"
+#include "olap/delta_writer.h"
+#include "olap/field.h"
+#include "olap/options.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet.h"
+#include "olap/tablet_meta_manager.h"
+#include "olap/utils.h"
+#include "runtime/descriptor_helper.h"
+#include "runtime/exec_env.h"
+#include "runtime/mem_pool.h"
+#include "runtime/mem_tracker.h"
+#include "runtime/tuple.h"
+#include "util/file_utils.h"
+#include "util/logging.h"
+
+namespace doris {
+
+static const uint32_t MAX_PATH_LEN = 1024;
+
+StorageEngine* k_engine = nullptr;
+std::string path1;
+std::string path2;
+
+void set_up() {
+ char buffer[MAX_PATH_LEN];
+ ASSERT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr);
+ path1 = std::string(buffer) + "/data_test_1";
+ path2 = std::string(buffer) + "/data_test_2";
+ config::storage_root_path = path1 + ";" + path2;
+ FileUtils::remove_all(path1);
+ FileUtils::create_dir(path1);
+
+ FileUtils::remove_all(path2);
+ FileUtils::create_dir(path2);
+ std::vector<StorePath> paths;
+ paths.emplace_back(path1, -1);
+ paths.emplace_back(path2, -1);
+
+ doris::EngineOptions options;
+ options.store_paths = paths;
+ Status s = doris::StorageEngine::open(options, &k_engine);
+ ASSERT_TRUE(s.ok()) << s.to_string();
+
+ ExecEnv* exec_env = doris::ExecEnv::GetInstance();
+ exec_env->set_storage_engine(k_engine);
+ k_engine->start_bg_threads();
+}
+
+void tear_down() {
+ if (k_engine != nullptr) {
+ k_engine->stop();
+ delete k_engine;
+ k_engine = nullptr;
+ }
+ ASSERT_EQ(system("rm -rf ./data_test_1"), 0);
+ ASSERT_EQ(system("rm -rf ./data_test_2"), 0);
+ FileUtils::remove_all(std::string(getenv("DORIS_HOME")) + UNUSED_PREFIX);
+}
+
+void create_tablet_request_with_sequence_col(int64_t tablet_id, int32_t
schema_hash,
+ TCreateTabletReq* request) {
+ request->tablet_id = tablet_id;
+ request->__set_version(1);
+ request->tablet_schema.schema_hash = schema_hash;
+ request->tablet_schema.short_key_column_count = 2;
+ request->tablet_schema.keys_type = TKeysType::UNIQUE_KEYS;
+ request->tablet_schema.storage_type = TStorageType::COLUMN;
+ request->tablet_schema.__set_sequence_col_idx(2);
+
+ TColumn k1;
+ k1.column_name = "k1";
+ k1.__set_is_key(true);
+ k1.column_type.type = TPrimitiveType::TINYINT;
+ request->tablet_schema.columns.push_back(k1);
+
+ TColumn k2;
+ k2.column_name = "k2";
+ k2.__set_is_key(true);
+ k2.column_type.type = TPrimitiveType::SMALLINT;
+ request->tablet_schema.columns.push_back(k2);
+
+ TColumn sequence_col;
+ sequence_col.column_name = SEQUENCE_COL;
+ sequence_col.__set_is_key(false);
+ sequence_col.column_type.type = TPrimitiveType::INT;
+ sequence_col.__set_aggregation_type(TAggregationType::REPLACE);
+ request->tablet_schema.columns.push_back(sequence_col);
+
+ TColumn v1;
+ v1.column_name = "v1";
+ v1.__set_is_key(false);
+ v1.column_type.type = TPrimitiveType::DATETIME;
+ v1.__set_aggregation_type(TAggregationType::REPLACE);
+ request->tablet_schema.columns.push_back(v1);
+}
+
+TDescriptorTable create_descriptor_tablet_with_sequence_col() {
+ TDescriptorTableBuilder dtb;
+ TTupleDescriptorBuilder tuple_builder;
+
+ tuple_builder.add_slot(
+
TSlotDescriptorBuilder().type(TYPE_TINYINT).column_name("k1").column_pos(0).build());
+ tuple_builder.add_slot(
+
TSlotDescriptorBuilder().type(TYPE_SMALLINT).column_name("k2").column_pos(1).build());
+ tuple_builder.add_slot(TSlotDescriptorBuilder()
+ .type(TYPE_INT)
+ .column_name(SEQUENCE_COL)
+ .column_pos(2)
+ .build());
+ tuple_builder.add_slot(
+
TSlotDescriptorBuilder().type(TYPE_DATETIME).column_name("v1").column_pos(3).build());
+ tuple_builder.build(&dtb);
+
+ return dtb.desc_tbl();
+}
+
+class TestEngineStorageMigrationTask : public ::testing::Test {
+public:
+ TestEngineStorageMigrationTask() {}
+ ~TestEngineStorageMigrationTask() {}
+
+ void SetUp() {
+ // Create local data dir for StorageEngine.
+ std::cout << "setup" << std::endl;
+ }
+
+ void TearDown() {
+ // Remove all dir.
+ std::cout << "tear down" << std::endl;
+ //doris::tear_down();
+ //ASSERT_EQ(OLAP_SUCCESS, remove_all_dir(config::storage_root_path));
+ }
+};
+
+TEST_F(TestEngineStorageMigrationTask, write_and_migration) {
+ TCreateTabletReq request;
+ create_tablet_request_with_sequence_col(10005, 270068377, &request);
+ OLAPStatus res = k_engine->create_tablet(request);
+ ASSERT_EQ(OLAP_SUCCESS, res);
+
+ TDescriptorTable tdesc_tbl = create_descriptor_tablet_with_sequence_col();
+ ObjectPool obj_pool;
+ DescriptorTbl* desc_tbl = nullptr;
+ DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
+ TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+ const std::vector<SlotDescriptor*>& slots = tuple_desc->slots();
+
+ PUniqueId load_id;
+ load_id.set_hi(0);
+ load_id.set_lo(0);
+ WriteRequest write_req = {10005, 270068377, WriteType::LOAD, 20003,
+ 30003, load_id, tuple_desc,
&(tuple_desc->slots())};
+ DeltaWriter* delta_writer = nullptr;
+ DeltaWriter::open(&write_req, &delta_writer);
+ ASSERT_NE(delta_writer, nullptr);
+
+ MemTracker tracker;
+ MemPool pool(&tracker);
+ // Tuple 1
+ {
+ Tuple* tuple =
reinterpret_cast<Tuple*>(pool.allocate(tuple_desc->byte_size()));
+ memset(tuple, 0, tuple_desc->byte_size());
+ *(int8_t*)(tuple->get_slot(slots[0]->tuple_offset())) = 123;
+ *(int16_t*)(tuple->get_slot(slots[1]->tuple_offset())) = 456;
+ *(int32_t*)(tuple->get_slot(slots[2]->tuple_offset())) = 1;
+ ((DateTimeValue*)(tuple->get_slot(slots[3]->tuple_offset())))
+ ->from_date_str("2020-07-16 19:39:43", 19);
+
+ res = delta_writer->write(tuple);
+ ASSERT_EQ(OLAP_SUCCESS, res);
+ }
+
+ res = delta_writer->close();
+ ASSERT_EQ(OLAP_SUCCESS, res);
+ res = delta_writer->close_wait(nullptr, false);
+ ASSERT_EQ(OLAP_SUCCESS, res);
+
+ // publish version success
+ TabletSharedPtr tablet =
+ k_engine->tablet_manager()->get_tablet(write_req.tablet_id,
write_req.schema_hash);
+ std::cout << "before publish, tablet row nums:" << tablet->num_rows() <<
std::endl;
+ OlapMeta* meta = tablet->data_dir()->get_meta();
+ Version version;
+ version.first = tablet->rowset_with_max_version()->end_version() + 1;
+ version.second = tablet->rowset_with_max_version()->end_version() + 1;
+ std::cout << "start to add rowset version:" << version.first << "-" <<
version.second
+ << std::endl;
+ std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
+ StorageEngine::instance()->txn_manager()->get_txn_related_tablets(
+ write_req.txn_id, write_req.partition_id, &tablet_related_rs);
+ for (auto& tablet_rs : tablet_related_rs) {
+ std::cout << "start to publish txn" << std::endl;
+ RowsetSharedPtr rowset = tablet_rs.second;
+ res = k_engine->txn_manager()->publish_txn(meta,
write_req.partition_id, write_req.txn_id,
+ write_req.tablet_id,
write_req.schema_hash,
+ tablet_rs.first.tablet_uid,
version);
+ ASSERT_EQ(OLAP_SUCCESS, res);
+ std::cout << "start to add inc rowset:" << rowset->rowset_id()
+ << ", num rows:" << rowset->num_rows() << ", version:" <<
rowset->version().first
+ << "-" << rowset->version().second << std::endl;
+ res = tablet->add_inc_rowset(rowset);
+ ASSERT_EQ(OLAP_SUCCESS, res);
+ }
+ ASSERT_EQ(1, tablet->num_rows());
+ // sleep 1 second so that the migrated tablet has a different time from the current tablet
+ sleep(1);
+
+ // test case 1
+ // prepare
+ DataDir* dest_store = nullptr;
+ if (tablet->data_dir()->path() == path1) {
+ dest_store = StorageEngine::instance()->get_store(path2);
+ } else if (tablet->data_dir()->path() == path2) {
+ dest_store = StorageEngine::instance()->get_store(path1);
+ }
+ ASSERT_NE(dest_store, nullptr);
+ std::cout << "dest store:" << dest_store->path() << std::endl;
+ // migrating
+ EngineStorageMigrationTask engine_task(tablet, dest_store);
+ res = engine_task.execute();
+ ASSERT_EQ(OLAP_SUCCESS, res);
+ // reget the tablet from manager after migration
+ auto tablet_id = 10005;
+ auto schema_hash = 270068377;
+ TabletSharedPtr tablet2 =
k_engine->tablet_manager()->get_tablet(tablet_id, schema_hash);
+ // check path
+ ASSERT_EQ(tablet2->data_dir()->path(), dest_store->path());
+ // check rows
+ ASSERT_EQ(1, tablet2->num_rows());
+ // tablet2 should not equal to tablet
+ ASSERT_NE(tablet2, tablet);
+
+ // test case 2
+ // migrate tablet2 back to the tablet's path
+ // sleep 1 second for update time
+ sleep(1);
+ dest_store =
StorageEngine::instance()->get_store(tablet->data_dir()->path());
+ ASSERT_NE(dest_store, nullptr);
+ ASSERT_NE(dest_store->path(), tablet2->data_dir()->path());
+ std::cout << "dest store:" << dest_store->path() << std::endl;
+ EngineStorageMigrationTask engine_task2(tablet2, dest_store);
+ res = engine_task2.execute();
+ ASSERT_EQ(OLAP_SUCCESS, res);
+ TabletSharedPtr tablet3 =
k_engine->tablet_manager()->get_tablet(tablet_id, schema_hash);
+ // check path
+ ASSERT_EQ(tablet3->data_dir()->path(), tablet->data_dir()->path());
+ // check rows
+ ASSERT_EQ(1, tablet3->num_rows());
+ // tablet3 should not be equal to tablet2 or tablet
+ ASSERT_NE(tablet3, tablet2);
+ ASSERT_NE(tablet3, tablet);
+ // test case 2 end
+
+ res = k_engine->tablet_manager()->drop_tablet(tablet_id, schema_hash);
+ ASSERT_EQ(OLAP_SUCCESS, res);
+ delete delta_writer;
+}
+
+} // namespace doris
+
+int main(int argc, char** argv) {
+ std::string conffile = std::string(getenv("DORIS_HOME")) + "/conf/be.conf";
+ if (!doris::config::init(conffile.c_str(), false)) {
+ fprintf(stderr, "error read config file. \n");
+ return -1;
+ }
+ int ret = doris::OLAP_SUCCESS;
+ testing::InitGoogleTest(&argc, argv);
+ doris::CpuInfo::init();
+ doris::set_up();
+ ret = RUN_ALL_TESTS();
+ doris::tear_down();
+ google::protobuf::ShutdownProtobufLibrary();
+ return ret;
+}
diff --git a/docs/.vuepress/sidebar/en.js b/docs/.vuepress/sidebar/en.js
index 896805f..b96c51e 100644
--- a/docs/.vuepress/sidebar/en.js
+++ b/docs/.vuepress/sidebar/en.js
@@ -578,10 +578,12 @@ module.exports = [
directoryPath: "Administration/",
initialOpenGroupIndex: -1,
children: [
+ "ADMIN CANCEL REBALANCE DISK",
"ADMIN CANCEL REPAIR",
"ADMIN CLEAN TRASH",
"ADMIN CHECK TABLET",
"ADMIN COMPACT",
+ "ADMIN REBALANCE DISK",
"ADMIN REPAIR",
"ADMIN SET CONFIG",
"ADMIN SET REPLICA STATUS",
diff --git a/docs/.vuepress/sidebar/zh-CN.js b/docs/.vuepress/sidebar/zh-CN.js
index 30c5fc2..fc7c6bd 100644
--- a/docs/.vuepress/sidebar/zh-CN.js
+++ b/docs/.vuepress/sidebar/zh-CN.js
@@ -591,10 +591,12 @@ module.exports = [
directoryPath: "Administration/",
initialOpenGroupIndex: -1,
children: [
+ "ADMIN CANCEL REBALANCE DISK",
"ADMIN CANCEL REPAIR",
"ADMIN CLEAN TRASH",
"ADMIN CHECK TABLET",
"ADMIN COMPACT",
+ "ADMIN REBALANCE DISK",
"ADMIN REPAIR",
"ADMIN SET CONFIG",
"ADMIN SET REPLICA STATUS",
diff --git a/docs/en/sql-reference/sql-statements/Administration/ADMIN CANCEL
REBALANCE DISK.md b/docs/en/sql-reference/sql-statements/Administration/ADMIN
CANCEL REBALANCE DISK.md
new file mode 100644
index 0000000..475e266
--- /dev/null
+++ b/docs/en/sql-reference/sql-statements/Administration/ADMIN CANCEL
REBALANCE DISK.md
@@ -0,0 +1,51 @@
+---
+{
+ "title": "ADMIN CANCEL REBALANCE DISK",
+ "language": "en"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# ADMIN CANCEL REBALANCE DISK
+## Description
+
+This statement is used to cancel high-priority disk rebalancing for the specified backends.
+
+Grammar:
+
+ADMIN CANCEL REBALANCE DISK [ON ("BackendHost1:BackendHeartBeatPort1",
"BackendHost2:BackendHeartBeatPort2", ...)];
+
+Explain:
+
+1. This statement only indicates that the system will no longer rebalance the disks of the specified backends with high priority. The system will still rebalance disks through its default scheduling.
+
+## example
+
+1. Cancel high-priority disk rebalancing for all backends in the cluster
+
+ADMIN CANCEL REBALANCE DISK;
+
+2. Cancel high-priority disk rebalancing for the specified backends
+
+ADMIN CANCEL REBALANCE DISK ON ("192.168.1.1:1234", "192.168.1.2:1234");
+
+## keyword
+ADMIN,CANCEL,REBALANCE DISK
diff --git a/docs/en/sql-reference/sql-statements/Administration/ADMIN
REBALANCE DISK.md b/docs/en/sql-reference/sql-statements/Administration/ADMIN
REBALANCE DISK.md
new file mode 100644
index 0000000..6e1c1aa
--- /dev/null
+++ b/docs/en/sql-reference/sql-statements/Administration/ADMIN REBALANCE
DISK.md
@@ -0,0 +1,52 @@
+---
+{
+ "title": "ADMIN REBALANCE DISK",
+ "language": "en"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# ADMIN REBALANCE DISK
+## Description
+
+This statement is used to try to rebalance the disks of the specified backends with high priority, regardless of whether the cluster is balanced.
+
+Grammar:
+
+ADMIN REBALANCE DISK [ON ("BackendHost1:BackendHeartBeatPort1",
"BackendHost2:BackendHeartBeatPort2", ...)];
+
+Explain:
+
+1. This statement only means that the system attempts to rebalance the disks of the specified backends with high priority, regardless of whether the cluster is balanced.
+2. The default timeout is 24 hours. After the timeout, the system will no longer rebalance the disks of the specified backends with high priority, and the command needs to be issued again.
+
+## example
+
+1. Attempt to rebalance disks of all backends
+
+ADMIN REBALANCE DISK;
+
+2. Attempt to rebalance disks of the specified backends
+
+ADMIN REBALANCE DISK ON ("192.168.1.1:1234", "192.168.1.2:1234");
+
+## keyword
+ADMIN,REBALANCE,DISK
diff --git a/docs/zh-CN/sql-reference/sql-statements/Administration/ADMIN
CANCEL REBALANCE DISK.md
b/docs/zh-CN/sql-reference/sql-statements/Administration/ADMIN CANCEL REBALANCE
DISK.md
new file mode 100644
index 0000000..e697810
--- /dev/null
+++ b/docs/zh-CN/sql-reference/sql-statements/Administration/ADMIN CANCEL
REBALANCE DISK.md
@@ -0,0 +1,52 @@
+---
+{
+ "title": "ADMIN CANCEL REBALANCE DISK",
+ "language": "zh-CN"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# ADMIN CANCEL REBALANCE DISK
+## description
+
+ 该语句用于取消优先均衡BE的磁盘
+
+ 语法:
+
+ ADMIN CANCEL REBALANCE DISK [ON ("BackendHost1:BackendHeartBeatPort1",
"BackendHost2:BackendHeartBeatPort2", ...)];
+
+ 说明:
+
+ 1. 该语句仅表示系统不再优先均衡指定BE的磁盘数据。系统仍会以默认调度方式均衡BE的磁盘数据。
+
+## example
+
+ 1. 取消集群所有BE的优先磁盘均衡
+
+ ADMIN CANCEL REBALANCE DISK;
+
+ 2. 取消指定BE的优先磁盘均衡
+
+ ADMIN CANCEL REBALANCE DISK ON ("192.168.1.1:1234",
"192.168.1.2:1234");
+
+## keyword
+ ADMIN,CANCEL,REBALANCE,DISK
+
diff --git a/docs/zh-CN/sql-reference/sql-statements/Administration/ADMIN
REBALANCE DISK.md
b/docs/zh-CN/sql-reference/sql-statements/Administration/ADMIN REBALANCE DISK.md
new file mode 100644
index 0000000..0bb78f5
--- /dev/null
+++ b/docs/zh-CN/sql-reference/sql-statements/Administration/ADMIN REBALANCE
DISK.md
@@ -0,0 +1,54 @@
+---
+{
+ "title": "ADMIN REBALANCE DISK",
+ "language": "zh-CN"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# ADMIN REBALANCE DISK
+## description
+
+ 该语句用于尝试优先均衡指定的BE磁盘数据
+
+ 语法:
+
+ ADMIN REBALANCE DISK [ON ("BackendHost1:BackendHeartBeatPort1",
"BackendHost2:BackendHeartBeatPort2", ...)];
+
+ 说明:
+
+ 1. 该语句表示让系统尝试优先均衡指定BE的磁盘数据,不受限于集群是否均衡。
+ 2. 默认的 timeout 是 24小时。超时意味着系统将不再优先均衡指定的BE磁盘数据。需要重新使用该命令设置。
+ 3. 指定BE的磁盘数据均衡后,该BE的优先级将会失效。
+
+## example
+
+ 1. 尝试优先均衡集群内的所有BE
+
+ ADMIN REBALANCE DISK;
+
+ 2. 尝试优先均衡指定BE
+
+ ADMIN REBALANCE DISK ON ("192.168.1.1:1234", "192.168.1.2:1234");
+
+## keyword
+ ADMIN,REBALANCE,DISK
+
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup
b/fe/fe-core/src/main/cup/sql_parser.cup
index da30388..2bf9da4 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -241,7 +241,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE,
KW_ALIAS, KW_ALL, KW_A
KW_COLLATE, KW_COLLATION, KW_COLUMN, KW_COLUMNS, KW_COMMENT, KW_COMMIT,
KW_COMMITTED, KW_COMPACT,
KW_CONFIG, KW_CONNECTION, KW_CONNECTION_ID, KW_CONSISTENT, KW_CONVERT,
KW_COUNT, KW_CREATE, KW_CREATION, KW_CROSS, KW_CUBE, KW_CURRENT,
KW_CURRENT_USER,
KW_DATA, KW_DATABASE, KW_DATABASES, KW_DATE, KW_DATETIME, KW_DAY,
KW_DECIMAL, KW_DECOMMISSION, KW_DEFAULT, KW_DESC, KW_DESCRIBE,
- KW_DELETE, KW_UPDATE, KW_DISTINCT, KW_DISTINCTPC, KW_DISTINCTPCSA,
KW_DISTRIBUTED, KW_DISTRIBUTION, KW_DYNAMIC, KW_BUCKETS, KW_DIV, KW_DOUBLE,
KW_DROP, KW_DROPP, KW_DUPLICATE,
+ KW_DELETE, KW_UPDATE, KW_DISK, KW_DISTINCT, KW_DISTINCTPC,
KW_DISTINCTPCSA, KW_DISTRIBUTED, KW_DISTRIBUTION, KW_DYNAMIC, KW_BUCKETS,
KW_DIV, KW_DOUBLE, KW_DROP, KW_DROPP, KW_DUPLICATE,
KW_ELSE, KW_ENABLE, KW_ENCRYPTKEY, KW_ENCRYPTKEYS, KW_END, KW_ENGINE,
KW_ENGINES, KW_ENTER, KW_ERRORS, KW_EVENTS, KW_EXCEPT, KW_EXCLUDE,
KW_EXISTS, KW_EXPORT, KW_EXTENDED, KW_EXTERNAL, KW_EXTRACT,
KW_FALSE, KW_FEATURE, KW_FOLLOWER, KW_FOLLOWING, KW_FREE, KW_FROM,
KW_FIELDS, KW_FILE, KW_FILTER, KW_FIRST, KW_FLOAT, KW_FOR, KW_FORCE, KW_FORMAT,
KW_FRONTEND, KW_FRONTENDS, KW_FULL, KW_FUNCTION, KW_FUNCTIONS,
@@ -260,7 +260,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE,
KW_ALIAS, KW_ALL, KW_A
KW_PLUGIN, KW_PLUGINS,
KW_PROC, KW_PROCEDURE, KW_PROCESSLIST, KW_PROFILE, KW_PROPERTIES,
KW_PROPERTY,
KW_QUERY, KW_QUOTA,
- KW_RANDOM, KW_RANGE, KW_READ, KW_RECOVER, KW_REFRESH, KW_REGEXP,
KW_RELEASE, KW_RENAME,
+ KW_RANDOM, KW_RANGE, KW_READ, KW_REBALANCE, KW_RECOVER, KW_REFRESH,
KW_REGEXP, KW_RELEASE, KW_RENAME,
KW_REPAIR, KW_REPEATABLE, KW_REPOSITORY, KW_REPOSITORIES, KW_REPLACE,
KW_REPLACE_IF_NOT_NULL, KW_REPLICA, KW_RESOURCE, KW_RESOURCES, KW_RESTORE,
KW_RETURNS, KW_RESUME, KW_REVOKE,
KW_RIGHT, KW_ROLE, KW_ROLES, KW_ROLLBACK, KW_ROLLUP, KW_ROUTINE, KW_ROW,
KW_ROWS,
KW_S3, KW_SCHEMA, KW_SCHEMAS, KW_SECOND, KW_SELECT, KW_SEMI,
KW_SERIALIZABLE, KW_SESSION, KW_SET, KW_SETS, KW_SHOW, KW_SIGNED,
@@ -5297,6 +5297,22 @@ admin_stmt ::=
{:
RESULT = new AdminCheckTabletsStmt(tabletIds, properties);
:}
+ | KW_ADMIN KW_REBALANCE KW_DISK KW_ON LPAREN string_list:backends RPAREN
+ {:
+ RESULT = new AdminRebalanceDiskStmt(backends);
+ :}
+ | KW_ADMIN KW_REBALANCE KW_DISK
+ {:
+ RESULT = new AdminRebalanceDiskStmt(null);
+ :}
+ | KW_ADMIN KW_CANCEL KW_REBALANCE KW_DISK KW_ON LPAREN
string_list:backends RPAREN
+ {:
+ RESULT = new AdminCancelRebalanceDiskStmt(backends);
+ :}
+ | KW_ADMIN KW_CANCEL KW_REBALANCE KW_DISK
+ {:
+ RESULT = new AdminCancelRebalanceDiskStmt(null);
+ :}
| KW_ADMIN KW_CLEAN KW_TRASH KW_ON LPAREN string_list:backends RPAREN
{:
RESULT = new AdminCleanTrashStmt(backends);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmt.java
new file mode 100644
index 0000000..3e9bab3
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmt.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.system.Backend;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AdminCancelRebalanceDiskStmt extends DdlStmt {
+ private List<Backend> backends = Lists.newArrayList();
+
+ public AdminCancelRebalanceDiskStmt(List<String> backends) {
+ ImmutableMap<Long, Backend> backendsInfo =
Catalog.getCurrentSystemInfo().getIdToBackend();
+ Map<String, Long> backendsID = new HashMap<String, Long>();
+ for (Backend backend : backendsInfo.values()) {
+ backendsID.put(String.valueOf(backend.getHost()) + ":" +
String.valueOf(backend.getHeartbeatPort()), backend.getId());
+ }
+ if (backends == null) {
+ for (Backend backend : backendsInfo.values()) {
+ this.backends.add(backend);
+ }
+ } else {
+ for (String backend : backends) {
+ if (backendsID.get(backend) != null) {
+
this.backends.add(backendsInfo.get(backendsID.get(backend)));
+ backendsID.remove(backend); // avoid repetition
+ }
+ }
+ }
+ }
+
+ public List<Backend> getBackends() {
+ return backends;
+ }
+
+ @Override
+ public void analyze(Analyzer analyzer) throws AnalysisException {
+ if
(!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(),
PrivPredicate.ADMIN)) {
+
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
"ADMIN");
+ }
+ }
+
+ @Override
+ public RedirectStatus getRedirectStatus() {
+ return RedirectStatus.NO_FORWARD;
+ }
+}
\ No newline at end of file
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminRebalanceDiskStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminRebalanceDiskStmt.java
new file mode 100644
index 0000000..c8f0aa6
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminRebalanceDiskStmt.java
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.system.Backend;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AdminRebalanceDiskStmt extends DdlStmt {
+ private List<Backend> backends = Lists.newArrayList();
+ private long timeoutS = 0;
+
+ public AdminRebalanceDiskStmt(List<String> backends) {
+ ImmutableMap<Long, Backend> backendsInfo =
Catalog.getCurrentSystemInfo().getIdToBackend();
+ Map<String, Long> backendsID = new HashMap<String, Long>();
+ for (Backend backend : backendsInfo.values()) {
+ backendsID.put(String.valueOf(backend.getHost()) + ":" +
String.valueOf(backend.getHeartbeatPort()), backend.getId());
+ }
+ if (backends == null) {
+ for (Backend backend : backendsInfo.values()) {
+ this.backends.add(backend);
+ }
+ } else {
+ for (String backend : backends) {
+ if (backendsID.get(backend) != null) {
+
this.backends.add(backendsInfo.get(backendsID.get(backend)));
+ backendsID.remove(backend); // avoid repetition
+ }
+ }
+ }
+ timeoutS = 24 * 3600; // default 24 hours
+ }
+
+ public List<Backend> getBackends() {
+ return backends;
+ }
+
+ public long getTimeoutS() {
+ return timeoutS;
+ }
+
+ @Override
+ public void analyze(Analyzer analyzer) throws AnalysisException {
+ if
(!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(),
PrivPredicate.ADMIN)) {
+
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
"ADMIN");
+ }
+ }
+
+ @Override
+ public RedirectStatus getRedirectStatus() {
+ return RedirectStatus.NO_FORWARD;
+ }
+}
\ No newline at end of file
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
index 8c575d8..f01c017 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/BackendLoadStatistic.java
@@ -246,8 +246,8 @@ public class BackendLoadStatistic {
}
}
- LOG.debug("classify path by load. storage: {} avg used percent: {}.
low/mid/high: {}/{}/{}",
- avgUsedPercent, medium, lowCounter, midCounter, highCounter);
+ LOG.debug("classify path by load. be id: {} storage: {} avg used
percent: {}. low/mid/high: {}/{}/{}",
+ beId, medium, avgUsedPercent, lowCounter, midCounter,
highCounter);
}
public void calcScore(Map<TStorageMedium, Double>
avgClusterUsedCapacityPercentMap,
@@ -315,6 +315,60 @@ public class BackendLoadStatistic {
return status;
}
+    /*
+     * Check whether the backend can become more balanced if we migrate a tablet with size 'tabletSize'
+     * from 'srcPath' to 'destPath':
+     * 1. Recalculate the load scores of the src and dest paths after migrating the tablet.
+     * 2. If the sum of the differences between the new scores and the average score becomes smaller,
+     *    we consider it more balanced.
+     */
+ public boolean isMoreBalanced(long srcPath, long destPath, long tabletId,
long tabletSize,
+ TStorageMedium medium) {
+ long totalCapacity = 0;
+ long totalUsedCapacity = 0;
+ RootPathLoadStatistic srcPathStat = null;
+ RootPathLoadStatistic destPathStat = null;
+ for (RootPathLoadStatistic pathStat : pathStatistics) {
+ if (pathStat.getStorageMedium() == medium) {
+ totalCapacity += pathStat.getCapacityB();
+ totalUsedCapacity += pathStat.getUsedCapacityB();
+ if (pathStat.getPathHash() == srcPath) {
+ srcPathStat = pathStat;
+ } else if (pathStat.getPathHash() == destPath) {
+ destPathStat = pathStat;
+ }
+ }
+ }
+ if (srcPathStat == null || destPathStat == null) {
+ LOG.info("migrate {}(size: {}) from {} to {} failed, medium: {},
src or dest path stat does not exist.",
+ tabletId, tabletSize, srcPath, destPath, medium);
+ return false;
+ }
+ double avgUsedPercent = totalCapacity == 0 ? 0.0 : totalUsedCapacity /
(double) totalCapacity;
+ double currentSrcPathScore = srcPathStat.getCapacityB() == 0
+ ? 0.0 : srcPathStat.getUsedCapacityB() / (double)
srcPathStat.getCapacityB();
+ double currentDestPathScore = destPathStat.getCapacityB() == 0
+ ? 0.0 : destPathStat.getUsedCapacityB() / (double)
destPathStat.getCapacityB();
+
+ double newSrcPathScore = srcPathStat.getCapacityB() == 0
+ ? 0.0 : (srcPathStat.getUsedCapacityB() - tabletSize) / (double)
srcPathStat.getCapacityB();
+ double newDestPathScore = destPathStat.getCapacityB() == 0
+ ? 0.0 : (destPathStat.getUsedCapacityB() + tabletSize) / (double)
destPathStat.getCapacityB();
+
+ double currentDiff = Math.abs(currentSrcPathScore - avgUsedPercent)
+ + Math.abs(currentDestPathScore - avgUsedPercent);
+ double newDiff = Math.abs(newSrcPathScore - avgUsedPercent) +
Math.abs(newDestPathScore - avgUsedPercent);
+
+ LOG.debug("after migrate {}(size: {}) from {} to {}, medium: {}, the
load score changed."
+ + " src: {} -> {}, dest: {}->{}, average score: {}.
current diff: {}, new diff: {},"
+ + " more balanced: {}",
+ tabletId, tabletSize, srcPath, destPath, medium,
currentSrcPathScore, newSrcPathScore,
+ currentDestPathScore, newDestPathScore, avgUsedPercent,
currentDiff, newDiff,
+ (newDiff < currentDiff));
+
+ return newDiff < currentDiff;
+ }
+
public boolean hasAvailDisk() {
for (RootPathLoadStatistic rootPathLoadStatistic : pathStatistics) {
if (rootPathLoadStatistic.getDiskState() == DiskState.ONLINE) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
new file mode 100644
index 0000000..f32e31a
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
@@ -0,0 +1,334 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.clone.SchedException.Status;
+import org.apache.doris.clone.TabletSchedCtx.Priority;
+import org.apache.doris.clone.TabletSchedCtx.BalanceType;
+import org.apache.doris.clone.TabletScheduler.PathSlot;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TStorageMedium;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/*
+ * This DiskRebalancer is different from the other Rebalancers, which take care of cluster-wide data balancing.
+ * This DiskRebalancer chooses a backend and moves tablets from one disk to another.
+ * DiskRebalancer strategy:
+ * 1. only works while the cluster is balanced (which means the cluster has no high or low load backends)
+ * 1.1 if the user has given prio backends, then select tablets from the prio backends no matter whether the cluster is balanced or not.
+ * 2. select alternative tablets from mid load backends, and return them to the tablet scheduler.
+ * 3. given a tablet which has a src path(disk), find a path(disk) to migrate to.
+ */
+public class DiskRebalancer extends Rebalancer {
+ private static final Logger LOG =
LogManager.getLogger(DiskRebalancer.class);
+
+ public DiskRebalancer(SystemInfoService infoService, TabletInvertedIndex
invertedIndex) {
+ super(infoService, invertedIndex);
+ }
+
+ public List<BackendLoadStatistic>
filterByPrioBackends(List<BackendLoadStatistic> bes) {
+ List<BackendLoadStatistic> stats = Lists.newArrayList();
+ for (BackendLoadStatistic backend : bes) {
+ long backendId = backend.getBeId();
+ Long timeoutS = prioBackends.getOrDefault(backendId, 0L);
+ if (timeoutS != 0) {
+ if (timeoutS > System.currentTimeMillis()) {
+ // remove backends from prio if timeout
+ prioBackends.remove(backendId);
+ continue;
+ }
+ stats.add(backend);
+ }
+ }
+ return stats;
+ }
+
+ // true means BE has low and high paths for balance after reclassification
+ private boolean checkAndReclassifyPaths(Set<Long> pathLow, Set<Long>
pathMid, Set<Long> pathHigh) {
+ if (pathLow.isEmpty() && pathHigh.isEmpty()) {
+ // balanced
+ return false;
+ }
+ if (pathLow.isEmpty()) {
+ // mid => low
+ pathLow.addAll(pathMid);
+ } else if (pathHigh.isEmpty()) {
+ // mid => high
+ pathHigh.addAll(pathMid);
+ }
+ if (pathLow.isEmpty() || pathHigh.isEmpty()) {
+ // check again
+ return false;
+ }
+ return true;
+ }
+
+ /*
+ * Try to select alternative tablets to balance the disks.
+ * 1. Classify the backends into low, mid and high classes by load score.
+ * 2. Try to select tablets from mid load backends.
+ * 1. Here we only select alternative tablets, without considering the selected tablets' status,
+ * or whether they are beneficial for balance (all of this will be checked in the tablet scheduler)
+ * 2. Only select tablets from 'mid' backends.
+ * 3. Only select tablets from 'high' paths.
+ * 3. Try to select tablets from prio backends.
+ *
+ * Here we only select tablets from mid load nodes and do not set their dest; all of this will be set
+ * when the tablet is being scheduled in the tablet scheduler.
+ *
+ * NOTICE that we may select any available tablets here, ignoring their state.
+ * The state will be checked when they are scheduled in the tablet scheduler.
+ */
+ @Override
+ protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
+ ClusterLoadStatistic clusterStat, TStorageMedium medium) {
+ String clusterName = clusterStat.getClusterName();
+ List<TabletSchedCtx> alternativeTablets = Lists.newArrayList();
+
+ // get classification of backends
+ List<BackendLoadStatistic> lowBEs = Lists.newArrayList();
+ List<BackendLoadStatistic> midBEs = Lists.newArrayList();
+ List<BackendLoadStatistic> highBEs = Lists.newArrayList();
+ clusterStat.getBackendStatisticByClass(lowBEs, midBEs, highBEs,
medium);
+
+ if (!(lowBEs.isEmpty() && highBEs.isEmpty())) {
+ // the cluster is not balanced
+ if (prioBackends.isEmpty()) {
+ LOG.info("cluster is not balanced: {} with medium: {}. skip",
clusterName, medium);
+ return alternativeTablets;
+ } else {
+ // prioBEs are not empty, we only schedule prioBEs' disk
balance task
+ midBEs.addAll(lowBEs);
+ midBEs.addAll(highBEs);
+ midBEs = filterByPrioBackends(midBEs);
+ }
+ }
+
+ // first we should check whether any mid load backends are available.
+ // if no mid load backend is available, we should not start balancing
+ if (midBEs.stream().noneMatch(BackendLoadStatistic::isAvailable)) {
+ LOG.info("all mid load backends are dead: {} with medium: {}. skip",
+ midBEs.stream().mapToLong(BackendLoadStatistic::getBeId).toArray(), medium);
+ return alternativeTablets;
+ }
+
+ if (midBEs.stream().noneMatch(BackendLoadStatistic::hasAvailDisk)) {
+ LOG.info("all mid load backends {} have no available disk with medium: {}. skip",
+ midBEs.stream().mapToLong(BackendLoadStatistic::getBeId).toArray(), medium);
+ return alternativeTablets;
+ }
+
+ Set<Long> unbalancedBEs = Sets.newHashSet();
+ // choose tablets from backends randomly.
+ Collections.shuffle(midBEs);
+ for (int i = midBEs.size() - 1; i >= 0; i--) {
+ BackendLoadStatistic beStat = midBEs.get(i);
+
+ // classify the paths.
+ Set<Long> pathLow = Sets.newHashSet();
+ Set<Long> pathMid = Sets.newHashSet();
+ Set<Long> pathHigh = Sets.newHashSet();
+ // we only select tablets from available high load paths
+ beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, medium);
+ // check if BE has low and high paths for balance after
reclassification
+ if (!checkAndReclassifyPaths(pathLow, pathMid, pathHigh)) {
+ continue;
+ }
+
+ // get all tablets on this backend, and shuffle them for random
selection
+ List<Long> tabletIds =
invertedIndex.getTabletIdsByBackendIdAndStorageMedium(beStat.getBeId(), medium);
+ Collections.shuffle(tabletIds);
+
+ // for each path, we try to select at most
BALANCE_SLOT_NUM_FOR_PATH tablets
+ Map<Long, Integer> remainingPaths = Maps.newHashMap();
+ for (Long pathHash : pathHigh) {
+ remainingPaths.put(pathHash,
TabletScheduler.BALANCE_SLOT_NUM_FOR_PATH);
+ }
+
+ if (remainingPaths.isEmpty()) {
+ return alternativeTablets;
+ }
+
+ // select tablet from shuffled tablets
+ for (Long tabletId : tabletIds) {
+ Replica replica = invertedIndex.getReplica(tabletId,
beStat.getBeId());
+ if (replica == null) {
+ continue;
+ }
+ // ignore empty replicas as they do not make the disk more balanced (in terms of disk usage)
+ if (replica.getDataSize() == 0) {
+ continue;
+ }
+
+ // check if the replica is on a 'high' path,
+ // and only select it if the number of selected tablets on this path
+ // does not exceed the limit (BALANCE_SLOT_NUM_FOR_PATH).
+ long replicaPathHash = replica.getPathHash();
+ if (remainingPaths.containsKey(replicaPathHash)) {
+ TabletMeta tabletMeta =
invertedIndex.getTabletMeta(tabletId);
+ if (tabletMeta == null) {
+ continue;
+ }
+
+ TabletSchedCtx tabletCtx = new
TabletSchedCtx(TabletSchedCtx.Type.BALANCE, clusterName,
+ tabletMeta.getDbId(), tabletMeta.getTableId(),
tabletMeta.getPartitionId(),
+ tabletMeta.getIndexId(), tabletId, null /* replica
alloc is not used for balance*/,
+ System.currentTimeMillis());
+ // we set the temp src here to simplify the completeSchedCtx method, and to avoid taking a slot here
+ tabletCtx.setTempSrc(replica);
+ tabletCtx.setTag(clusterStat.getTag());
+ if (prioBackends.containsKey(beStat.getBeId())) {
+ // priority of balance task of prio BE is NORMAL
+ tabletCtx.setOrigPriority(Priority.NORMAL);
+ } else {
+ // balance task's default priority is LOW
+ tabletCtx.setOrigPriority(Priority.LOW);
+ }
+ // we must set balanceType to DISK_BALANCE to create a migration task
+ tabletCtx.setBalanceType(BalanceType.DISK_BALANCE);
+
+ alternativeTablets.add(tabletCtx);
+ unbalancedBEs.add(beStat.getBeId());
+ // update remaining paths
+ int remaining = remainingPaths.get(replicaPathHash) - 1;
+ if (remaining <= 0) {
+ remainingPaths.remove(replicaPathHash);
+ } else {
+ remainingPaths.put(replicaPathHash, remaining);
+ }
+ }
+ }
+ } // end for mid backends
+
+ // remove balanced BEs from prio backends
+ prioBackends.keySet().removeIf(id -> !unbalancedBEs.contains(id));
+ LOG.info("select alternative tablets for cluster: {}, medium: {}, num:
{}, detail: {}",
+ clusterName, medium, alternativeTablets.size(),
+
alternativeTablets.stream().mapToLong(TabletSchedCtx::getTabletId).toArray());
+ return alternativeTablets;
+ }
+
+ /*
+ * Create a StorageMediaMigrationTask for this selected tablet for balance.
+ * 1. Check if the cluster is balanced. If not, the balance will be cancelled.
+ * 2. Check if the src replica is still on a high load path. If not, the balance will be cancelled.
+ * 3. Select a low load path on this backend as the destination.
+ */
+ @Override
+ public void completeSchedCtx(TabletSchedCtx tabletCtx, Map<Long, PathSlot>
backendsWorkingSlots) throws SchedException {
+ ClusterLoadStatistic clusterStat =
statisticMap.get(tabletCtx.getCluster(), tabletCtx.getTag());
+ if (clusterStat == null) {
+ throw new SchedException(Status.UNRECOVERABLE, "cluster does not
exist");
+ }
+ if (tabletCtx.getTempSrcBackendId() == -1 ||
tabletCtx.getTempSrcPathHash() == -1) {
+ throw new SchedException(Status.UNRECOVERABLE,
+ "src does not appear to be set correctly, something goes
wrong");
+ }
+ Replica replica = invertedIndex.getReplica(tabletCtx.getTabletId(),
tabletCtx.getTempSrcBackendId());
+ // check src replica still there
+ if (replica == null || replica.getPathHash() !=
tabletCtx.getTempSrcPathHash()) {
+ throw new SchedException(Status.UNRECOVERABLE, "src replica may be
rebalanced");
+ }
+ // ignore empty replicas as they do not make the disk more balanced
+ if (replica.getDataSize() == 0) {
+ throw new SchedException(Status.UNRECOVERABLE, "size of src
replica is zero");
+ }
+ // check src slot
+ PathSlot slot = backendsWorkingSlots.get(replica.getBackendId());
+ if (slot == null) {
+ LOG.debug("BE does not have slot: {}", replica.getBackendId());
+ throw new SchedException(Status.UNRECOVERABLE, "unable to take src
slot");
+ }
+ long pathHash = slot.takeBalanceSlot(replica.getPathHash());
+ if (pathHash == -1) {
+ throw new SchedException(Status.UNRECOVERABLE, "unable to take src
slot");
+ }
+ // after taking the src slot, we can set the src replica now
+ tabletCtx.setSrc(replica);
+
+ BackendLoadStatistic beStat =
clusterStat.getBackendLoadStatistic(replica.getBackendId());
+ if (!beStat.isAvailable()) {
+ throw new SchedException(Status.UNRECOVERABLE, "the backend is not
available");
+ }
+ // classify the paths.
+ // If src path is 'high', then we can select path from 'low' and 'mid'
+ // If src path is 'mid', then we can only select path from 'low'
+ // If src path is 'low', then we have nothing to do
+ Set<Long> pathLow = Sets.newHashSet();
+ Set<Long> pathMid = Sets.newHashSet();
+ Set<Long> pathHigh = Sets.newHashSet();
+ beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh,
tabletCtx.getStorageMedium());
+ if (pathHigh.contains(replica.getPathHash())) {
+ pathLow.addAll(pathMid);
+ } else if (!pathMid.contains(replica.getPathHash())) {
+ throw new SchedException(Status.UNRECOVERABLE, "src path is low
load");
+ }
+ // check if this migration task can make the be's disks more balanced.
+ List<RootPathLoadStatistic> availPaths = Lists.newArrayList();
+ BalanceStatus bs;
+ if ((bs = beStat.isFit(tabletCtx.getTabletSize(),
tabletCtx.getStorageMedium(), availPaths,
+ false /* not supplement */)) != BalanceStatus.OK) {
+ LOG.debug("tablet not fit in BE {}, reason: {}", beStat.getBeId(),
bs.getErrMsgs());
+ throw new SchedException(Status.UNRECOVERABLE, "tablet not fit in
BE");
+ }
+ // Select a low load path as destination.
+ boolean setDest = false;
+ for (RootPathLoadStatistic stat : availPaths) {
+ // check if avail path is src path
+ if (stat.getPathHash() == replica.getPathHash()) {
+ continue;
+ }
+ // check if avail path is low path
+ if (!pathLow.contains(stat.getPathHash())) {
+ LOG.debug("the path :{} is not low load", stat.getPathHash());
+ continue;
+ }
+ if (!beStat.isMoreBalanced(tabletCtx.getSrcPathHash(),
stat.getPathHash(),
+ tabletCtx.getTabletId(), tabletCtx.getTabletSize(),
tabletCtx.getStorageMedium())) {
+ LOG.debug("the path :{} can not make more balance",
stat.getPathHash());
+ continue;
+ }
+ long destPathHash = slot.takeBalanceSlot(stat.getPathHash());
+ if (destPathHash == -1) {
+ throw new SchedException(Status.UNRECOVERABLE, "unable to take
dest slot");
+ }
+ tabletCtx.setDest(beStat.getBeId(), destPathHash, stat.getPath());
+ setDest = true;
+ break;
+ }
+
+ if (!setDest) {
+ throw new SchedException(Status.UNRECOVERABLE, "unable to find low
load path");
+ }
+ }
+}
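As a small illustration of the reclassification step in checkAndReclassifyPaths(), the
self-contained sketch below (not part of the patch; path hashes are invented) shows how a
backend with only 'mid' and 'high' paths still ends up with a usable low/high split, while
a backend with only 'mid' paths is skipped:

    import java.util.HashSet;
    import java.util.Set;

    // Sketch of the reclassification rule: when a backend has no 'low' paths, its 'mid'
    // paths are treated as 'low' so a high -> low move is still possible; when it has
    // neither 'low' nor 'high' paths, it is already balanced and skipped.
    public class ReclassifySketch {
        static boolean checkAndReclassify(Set<Long> low, Set<Long> mid, Set<Long> high) {
            if (low.isEmpty() && high.isEmpty()) {
                return false; // balanced, nothing to move
            }
            if (low.isEmpty()) {
                low.addAll(mid);   // mid => low
            } else if (high.isEmpty()) {
                high.addAll(mid);  // mid => high
            }
            return !low.isEmpty() && !high.isEmpty();
        }

        public static void main(String[] args) {
            Set<Long> low = new HashSet<>();               // no low paths on this backend
            Set<Long> mid = new HashSet<>(Set.of(1001L));  // invented path hashes
            Set<Long> high = new HashSet<>(Set.of(1002L));
            System.out.println(checkAndReclassify(low, mid, high)); // true
            System.out.println(low);                                // [1001]: mid path now acts as 'low'
        }
    }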
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
index 1fca406..a7177c2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/Rebalancer.java
@@ -20,11 +20,13 @@ package org.apache.doris.clone;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.clone.TabletScheduler.PathSlot;
import org.apache.doris.resource.Tag;
+import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
-import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTask;
import org.apache.doris.thrift.TStorageMedium;
import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Maps;
import com.google.common.collect.Lists;
import com.google.common.collect.Table;
@@ -50,6 +52,8 @@ public abstract class Rebalancer {
protected Table<String, Tag, ClusterLoadStatistic> statisticMap =
HashBasedTable.create();
protected TabletInvertedIndex invertedIndex;
protected SystemInfoService infoService;
+ // be id -> end time of prio
+ protected Map<Long, Long> prioBackends = Maps.newConcurrentMap();
public Rebalancer(SystemInfoService infoService, TabletInvertedIndex
invertedIndex) {
this.infoService = infoService;
@@ -71,10 +75,14 @@ public abstract class Rebalancer {
protected abstract List<TabletSchedCtx> selectAlternativeTabletsForCluster(
ClusterLoadStatistic clusterStat, TStorageMedium medium);
- public void createBalanceTask(TabletSchedCtx tabletCtx, Map<Long,
PathSlot> backendsWorkingSlots,
- AgentBatchTask batchTask) throws
SchedException {
+ public AgentTask createBalanceTask(TabletSchedCtx tabletCtx, Map<Long,
PathSlot> backendsWorkingSlots)
+ throws SchedException {
completeSchedCtx(tabletCtx, backendsWorkingSlots);
- batchTask.addTask(tabletCtx.createCloneReplicaAndTask());
+ if (tabletCtx.getBalanceType() ==
TabletSchedCtx.BalanceType.BE_BALANCE) {
+ return tabletCtx.createCloneReplicaAndTask();
+ } else {
+ return tabletCtx.createStorageMediaMigrationTask();
+ }
}
// Before createCloneReplicaAndTask, we need to complete the
TabletSchedCtx.
@@ -93,4 +101,21 @@ public abstract class Rebalancer {
public void updateLoadStatistic(Table<String, Tag, ClusterLoadStatistic>
statisticMap) {
this.statisticMap = statisticMap;
}
+
+ public void addPrioBackends(List<Backend> backends, long timeoutS) {
+ long currentTimeMillis = System.currentTimeMillis();
+ for (Backend backend : backends) {
+ prioBackends.put(backend.getId(), currentTimeMillis + timeoutS);
+ }
+ }
+
+ public void removePrioBackends(List<Backend> backends) {
+ for (Backend backend : backends) {
+ prioBackends.remove(backend.getId());
+ }
+ }
+
+ public boolean hasPrioBackends() {
+ return !prioBackends.isEmpty();
+ }
}
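The prioBackends map added here stores, per backend id, the point in time until which the
backend should be treated as prioritized for disk balance. A toy sketch of that behavior
follows (not part of the patch; the helper names addPrio/isPrio are invented, and a
millisecond timeout is assumed for brevity):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Toy model of the prio-backend bookkeeping: a backend id maps to a deadline,
    // and entries whose deadline has passed are dropped on the next lookup.
    public class PrioBackendSketch {
        private final Map<Long, Long> prioBackends = new ConcurrentHashMap<>();

        // invented helper: register a backend for prioritized disk balance
        void addPrio(long backendId, long timeoutMs) {
            prioBackends.put(backendId, System.currentTimeMillis() + timeoutMs);
        }

        // invented helper: true while the backend is still inside its prio window
        boolean isPrio(long backendId) {
            Long deadline = prioBackends.get(backendId);
            if (deadline == null) {
                return false;
            }
            if (deadline < System.currentTimeMillis()) {
                prioBackends.remove(backendId); // expired, drop it
                return false;
            }
            return true;
        }

        public static void main(String[] args) throws InterruptedException {
            PrioBackendSketch s = new PrioBackendSketch();
            s.addPrio(10001L, 50L);               // hypothetical backend id, 50 ms window
            System.out.println(s.isPrio(10001L)); // true
            Thread.sleep(60L);
            System.out.println(s.isPrio(10001L)); // false: expired and removed
        }
    }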
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
index 6610b48..60e8080 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -40,6 +40,7 @@ import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.CloneTask;
+import org.apache.doris.task.StorageMediaMigrationTask;
import org.apache.doris.thrift.TBackend;
import org.apache.doris.thrift.TFinishTaskRequest;
import org.apache.doris.thrift.TStatusCode;
@@ -108,6 +109,10 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
BALANCE, REPAIR
}
+ public enum BalanceType {
+ BE_BALANCE, DISK_BALANCE
+ }
+
public enum Priority {
LOW,
NORMAL,
@@ -141,6 +146,7 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
}
private Type type;
+ private BalanceType balanceType;
/*
* origPriority is the origin priority being set when this tablet being
added to scheduler.
@@ -193,11 +199,16 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
private Replica srcReplica = null;
private long srcPathHash = -1;
+ // for disk balance: keep the src path and avoid taking a slot in selectAlternativeTabletsForCluster
+ private Replica tempSrcReplica = null;
private long destBackendId = -1;
private long destPathHash = -1;
+ // for disk balance to set migration task's datadir
+ private String destPath = null;
private String errMsg = null;
private CloneTask cloneTask = null;
+ private StorageMediaMigrationTask storageMediaMigrationTask = null;
// statistics gathered from clone task report
// the total size of clone files and the total cost time in ms.
@@ -227,6 +238,7 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
this.infoService = Catalog.getCurrentSystemInfo();
this.state = State.PENDING;
this.replicaAlloc = replicaAlloc;
+ this.balanceType = BalanceType.BE_BALANCE;
}
public ReplicaAllocation getReplicaAlloc() {
@@ -249,6 +261,14 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
return type;
}
+ public void setBalanceType(BalanceType type) {
+ this.balanceType = type;
+ }
+
+ public BalanceType getBalanceType() {
+ return balanceType;
+ }
+
public Priority getOrigPriority() {
return origPriority;
}
@@ -380,6 +400,11 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
this.destBackendId = destBeId;
this.destPathHash = destPathHash;
}
+
+ public void setDest(Long destBeId, long destPathHash, String destPath) {
+ setDest(destBeId, destPathHash);
+ this.destPath = destPath;
+ }
public void setErrMsg(String errMsg) {
this.errMsg = errMsg;
@@ -414,6 +439,24 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
this.srcPathHash = srcReplica.getPathHash();
}
+ public void setTempSrc(Replica srcReplica) {
+ this.tempSrcReplica = srcReplica;
+ }
+
+ public long getTempSrcBackendId() {
+ if (tempSrcReplica != null) {
+ return tempSrcReplica.getBackendId();
+ }
+ return -1;
+ }
+
+ public long getTempSrcPathHash() {
+ if (tempSrcReplica != null) {
+ return tempSrcReplica.getPathHash();
+ }
+ return -1;
+ }
+
public long getDestBackendId() {
return destBackendId;
}
@@ -422,6 +465,10 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
return destPathHash;
}
+ public String getDestPath() {
+ return destPath;
+ }
+
// database lock should be held.
public long getTabletSize() {
long max = Long.MIN_VALUE;
@@ -687,6 +734,9 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
}
}
+ if (storageMediaMigrationTask != null) {
+
AgentTaskQueue.removeTask(storageMediaMigrationTask.getBackendId(),
TTaskType.STORAGE_MEDIUM_MIGRATE, storageMediaMigrationTask.getSignature());
+ }
if (cloneTask != null) {
AgentTaskQueue.removeTask(cloneTask.getBackendId(),
TTaskType.CLONE, cloneTask.getSignature());
@@ -729,13 +779,28 @@ public class TabletSchedCtx implements
Comparable<TabletSchedCtx> {
this.srcPathHash = -1;
this.destBackendId = -1;
this.destPathHash = -1;
+ this.destPath = null;
this.cloneTask = null;
+ this.storageMediaMigrationTask = null;
}
}
public void deleteReplica(Replica replica) {
tablet.deleteReplicaByBackendId(replica.getBackendId());
}
+
+ public StorageMediaMigrationTask createStorageMediaMigrationTask() throws
SchedException {
+ storageMediaMigrationTask = new
StorageMediaMigrationTask(getSrcBackendId(), getTabletId(),
+ getSchemaHash(), getStorageMedium());
+ if (destPath == null || destPath.isEmpty()) {
+ throw new SchedException(Status.UNRECOVERABLE,
+ "backend " + srcReplica.getBackendId() + ", dest path is
empty");
+ }
+ storageMediaMigrationTask.setDataDir(destPath);
+ this.taskTimeoutMs = getApproximateTimeoutMs();
+ this.state = State.RUNNING;
+ return storageMediaMigrationTask;
+ }
// database lock should be held.
public CloneTask createCloneReplicaAndTask() throws SchedException {
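The tempSrcReplica field above supports a two-phase flow: during selection only a
provisional source is remembered (no balance slot is taken), and the real source is set
later, once a slot has been acquired during scheduling. A toy model of that idea (not the
patch's classes; ReplicaRef and all ids are invented):

    // Toy model of the temp-src idea: selection only records a provisional source and
    // takes no slot; the real source is set later, when the tablet is actually scheduled.
    public class TempSrcSketch {
        record ReplicaRef(long backendId, long pathHash) {} // stand-in for a Replica

        private ReplicaRef tempSrc; // set during selection, no slot taken
        private ReplicaRef src;     // set in the scheduling phase after a slot is taken

        void setTempSrc(ReplicaRef r) { this.tempSrc = r; }
        long getTempSrcBackendId() { return tempSrc == null ? -1 : tempSrc.backendId(); }
        long getTempSrcPathHash() { return tempSrc == null ? -1 : tempSrc.pathHash(); }
        void setSrc(ReplicaRef r) { this.src = r; }

        public static void main(String[] args) {
            TempSrcSketch ctx = new TempSrcSketch();
            System.out.println(ctx.getTempSrcBackendId());  // -1: nothing selected yet
            ctx.setTempSrc(new ReplicaRef(10001L, 12345L));  // invented ids
            System.out.println(ctx.getTempSrcBackendId());  // 10001
        }
    }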
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 1f80cad..68ab63a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -17,6 +17,8 @@
package org.apache.doris.clone;
+import org.apache.doris.analysis.AdminCancelRebalanceDiskStmt;
+import org.apache.doris.analysis.AdminRebalanceDiskStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.ColocateTableIndex.GroupId;
@@ -51,7 +53,9 @@ import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.CloneTask;
import org.apache.doris.task.DropReplicaTask;
+import org.apache.doris.task.StorageMediaMigrationTask;
import org.apache.doris.thrift.TFinishTaskRequest;
+import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.transaction.DatabaseTransactionMgr;
import org.apache.doris.transaction.TransactionState;
@@ -101,13 +105,14 @@ public class TabletScheduler extends MasterDaemon {
private static final long SCHEDULE_INTERVAL_MS = 1000; // 1s
- public static final int BALANCE_SLOT_NUM_FOR_PATH = 2;
+ // 1 slot to reduce unnecessary balance tasks and to provide a more accurate estimate of capacity
+ public static final int BALANCE_SLOT_NUM_FOR_PATH = 1;
/*
* Tablet is added to pendingTablets as well it's id in allTabletIds.
* TabletScheduler will take tablet from pendingTablets but will not
remove it's id from allTabletIds when
* handling a tablet.
- * Tablet' id can only be removed after the clone task is done(timeout,
cancelled or finished).
+ * Tablet' id can only be removed after the clone task or migration task
is done(timeout, cancelled or finished).
* So if a tablet's id is still in allTabletIds, TabletChecker can not add
tablet to TabletScheduler.
*
* pendingTablets + runningTablets = allTabletIds
@@ -135,6 +140,7 @@ public class TabletScheduler extends MasterDaemon {
private ColocateTableIndex colocateTableIndex;
private TabletSchedulerStat stat;
private Rebalancer rebalancer;
+ private Rebalancer diskRebalancer;
// result of adding a tablet to pendingTablets
public enum AddResult {
@@ -157,6 +163,8 @@ public class TabletScheduler extends MasterDaemon {
} else {
this.rebalancer = new BeLoadRebalancer(infoService, invertedIndex);
}
+ // if the rebalancer can not get a new task, then use diskRebalancer to get one
+ this.diskRebalancer = new DiskRebalancer(infoService, invertedIndex);
}
public TabletSchedulerStat getStat() {
@@ -244,6 +252,14 @@ public class TabletScheduler extends MasterDaemon {
return allTabletIds.contains(tabletId);
}
+ public synchronized void rebalanceDisk(AdminRebalanceDiskStmt stmt) {
+ diskRebalancer.addPrioBackends(stmt.getBackends(), stmt.getTimeoutS());
+ }
+
+ public synchronized void cancelRebalanceDisk(AdminCancelRebalanceDiskStmt
stmt) {
+ diskRebalancer.removePrioBackends(stmt.getBackends());
+ }
+
/**
* Iterate current tablets, change their priority to VERY_HIGH if
necessary.
*/
@@ -300,6 +316,7 @@ public class TabletScheduler extends MasterDaemon {
updateClusterLoadStatistic();
rebalancer.updateLoadStatistic(statisticMap);
+ diskRebalancer.updateLoadStatistic(statisticMap);
adjustPriorities();
@@ -463,7 +480,6 @@ public class TabletScheduler extends MasterDaemon {
* Try to schedule a single tablet.
*/
private void scheduleTablet(TabletSchedCtx tabletCtx, AgentBatchTask
batchTask) throws SchedException {
- LOG.debug("schedule tablet: {}, type: {}, status: {}",
tabletCtx.getTabletId(), tabletCtx.getType(), tabletCtx.getTabletStatus());
long currentTime = System.currentTimeMillis();
tabletCtx.setLastSchedTime(currentTime);
tabletCtx.setLastVisitedTime(currentTime);
@@ -561,6 +577,11 @@ public class TabletScheduler extends MasterDaemon {
throw new SchedException(Status.UNRECOVERABLE, "tablet is
unhealthy when doing balance");
}
+ // to make disk balance more accurate, we only schedule a tablet when we have the latest stat info about its disk
+ if (tabletCtx.getType() == TabletSchedCtx.Type.BALANCE &&
+ tabletCtx.getBalanceType() ==
TabletSchedCtx.BalanceType.DISK_BALANCE) {
+ checkDiskBalanceLastSuccTime(tabletCtx.getTempSrcBackendId(),
tabletCtx.getTempSrcPathHash());
+ }
// we do not concern priority here.
// once we take the tablet out of priority queue, priority is
meaningless.
tabletCtx.setTablet(tablet);
@@ -574,6 +595,25 @@ public class TabletScheduler extends MasterDaemon {
}
}
+ private void checkDiskBalanceLastSuccTime(long beId, long pathHash) throws
SchedException {
+ PathSlot pathSlot = backendsWorkingSlots.get(beId);
+ if (pathSlot == null) {
+ throw new SchedException(Status.UNRECOVERABLE, "path slot does not
exist");
+ }
+ long succTime = pathSlot.getDiskBalanceLastSuccTime(pathHash);
+ if (succTime > lastStatUpdateTime) {
+ throw new SchedException(Status.UNRECOVERABLE, "stat info is
outdated");
+ }
+ }
+
+ public void updateDiskBalanceLastSuccTime(long beId, long pathHash) {
+ PathSlot pathSlot = backendsWorkingSlots.get(beId);
+ if (pathSlot == null) {
+ return;
+ }
+ pathSlot.updateDiskBalanceLastSuccTime(pathHash);
+ }
+
private void handleTabletByTypeAndStatus(TabletStatus status,
TabletSchedCtx tabletCtx, AgentBatchTask batchTask)
throws SchedException {
if (tabletCtx.getType() == Type.REPAIR) {
@@ -1189,6 +1229,21 @@ public class TabletScheduler extends MasterDaemon {
for (TabletSchedCtx tabletCtx : alternativeTablets) {
addTablet(tabletCtx, false);
}
+ if (Config.disable_disk_balance) {
+ LOG.info("disk balance is disabled. skip selecting tablets for
disk balance");
+ return;
+ }
+ List<TabletSchedCtx> diskBalanceTablets = Lists.newArrayList();
+ // if the default rebalancer can not get new tasks, or the user has given prio BEs, then use the disk rebalancer to get tasks
+ if (diskRebalancer.hasPrioBackends() || alternativeTablets.isEmpty()) {
+ diskBalanceTablets = diskRebalancer.selectAlternativeTablets();
+ }
+ for (TabletSchedCtx tabletCtx : diskBalanceTablets) {
+ // add the task if it comes from a prio backend or the cluster is balanced
+ if (alternativeTablets.isEmpty() || tabletCtx.getOrigPriority() ==
TabletSchedCtx.Priority.NORMAL) {
+ addTablet(tabletCtx, false);
+ }
+ }
}
/**
@@ -1196,7 +1251,18 @@ public class TabletScheduler extends MasterDaemon {
*/
private void doBalance(TabletSchedCtx tabletCtx, AgentBatchTask batchTask)
throws SchedException {
stat.counterBalanceSchedule.incrementAndGet();
- rebalancer.createBalanceTask(tabletCtx, backendsWorkingSlots,
batchTask);
+ AgentTask task = null;
+ if (tabletCtx.getBalanceType() ==
TabletSchedCtx.BalanceType.DISK_BALANCE) {
+ task = diskRebalancer.createBalanceTask(tabletCtx,
backendsWorkingSlots);
+ checkDiskBalanceLastSuccTime(tabletCtx.getSrcBackendId(),
tabletCtx.getSrcPathHash());
+ checkDiskBalanceLastSuccTime(tabletCtx.getDestBackendId(),
tabletCtx.getDestPathHash());
+ } else if (tabletCtx.getBalanceType() ==
TabletSchedCtx.BalanceType.BE_BALANCE) {
+ task = rebalancer.createBalanceTask(tabletCtx,
backendsWorkingSlots);
+ } else {
+ throw new SchedException(Status.UNRECOVERABLE,
+ "unknown balance type: " +
tabletCtx.getBalanceType().toString());
+ }
+ batchTask.addTask(task);
}
// choose a path on a backend which is fit for the tablet
@@ -1347,7 +1413,7 @@ public class TabletScheduler extends MasterDaemon {
// get next batch of tablets from queue.
private synchronized List<TabletSchedCtx> getNextTabletCtxBatch() {
List<TabletSchedCtx> list = Lists.newArrayList();
- int count = Math.max(MIN_BATCH_NUM, getCurrentAvailableSlotNum());
+ int count = Math.min(MIN_BATCH_NUM, getCurrentAvailableSlotNum());
while (count > 0) {
TabletSchedCtx tablet = pendingTablets.poll();
if (tablet == null) {
@@ -1368,6 +1434,29 @@ public class TabletScheduler extends MasterDaemon {
return total;
}
+ public boolean finishStorageMediaMigrationTask(StorageMediaMigrationTask
migrationTask,
+ TFinishTaskRequest request) {
+ long tabletId = migrationTask.getTabletId();
+ TabletSchedCtx tabletCtx = takeRunningTablets(tabletId);
+ if (tabletCtx == null) {
+ // tablet does not exist, the task may be created by
ReportHandler.tabletReport(ssd => hdd)
+ LOG.warn("tablet info does not exist: {}", tabletId);
+ return true;
+ }
+ if (tabletCtx.getBalanceType() !=
TabletSchedCtx.BalanceType.DISK_BALANCE) {
+ // this should not happen
+ LOG.warn("task type is not as excepted. tablet {}", tabletId);
+ return true;
+ }
+ if (request.getTaskStatus().getStatusCode() == TStatusCode.OK) {
+ // if a task succeeded, the stat must be refreshed before scheduling a new task
+ updateDiskBalanceLastSuccTime(tabletCtx.getSrcBackendId(),
tabletCtx.getSrcPathHash());
+ updateDiskBalanceLastSuccTime(tabletCtx.getDestBackendId(),
tabletCtx.getDestPathHash());
+ }
+ // we need this call to free the slot taken by this migration task
+ finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED,
"finished");
+ return true;
+ }
/**
* return true if we want to remove the clone task from AgentTaskQueue
*/
@@ -1379,6 +1468,11 @@ public class TabletScheduler extends MasterDaemon {
// tablet does not exist, no need to keep task.
return true;
}
+ if (tabletCtx.getBalanceType() ==
TabletSchedCtx.BalanceType.DISK_BALANCE) {
+ // this should not happen
+ LOG.warn("task type is not as excepted. tablet {}", tabletId);
+ return true;
+ }
Preconditions.checkState(tabletCtx.getState() ==
TabletSchedCtx.State.RUNNING, tabletCtx.getState());
try {
@@ -1706,6 +1800,22 @@ public class TabletScheduler extends MasterDaemon {
slot.balanceSlot++;
slot.rectify();
}
+
+ public synchronized void updateDiskBalanceLastSuccTime(long pathHash) {
+ Slot slot = pathSlots.get(pathHash);
+ if (slot == null) {
+ return;
+ }
+ slot.diskBalanceLastSuccTime = System.currentTimeMillis();
+ }
+
+ public synchronized long getDiskBalanceLastSuccTime(long pathHash) {
+ Slot slot = pathSlots.get(pathHash);
+ if (slot == null) {
+ return 0L;
+ }
+ return slot.diskBalanceLastSuccTime;
+ }
}
public List<List<String>> getSlotsInfo() {
@@ -1726,6 +1836,9 @@ public class TabletScheduler extends MasterDaemon {
public long totalCopySize = 0;
public long totalCopyTimeMs = 0;
+ // for disk balance
+ public long diskBalanceLastSuccTime = 0;
+
public Slot(int total) {
this.total = total;
this.available = total;
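The diskBalanceLastSuccTime bookkeeping above gates new disk-balance tasks on fresh
statistics: a path that just finished a successful migration is not scheduled again until
the load statistics have been refreshed. A minimal sketch of that check (not part of the
patch; timestamps are made up):

    // Minimal sketch of the staleness gate: a path may only get a new disk-balance task
    // after the load statistics have been refreshed since its last successful migration.
    public class FreshnessSketch {
        static boolean canSchedule(long diskBalanceLastSuccTime, long lastStatUpdateTime) {
            return diskBalanceLastSuccTime <= lastStatUpdateTime;
        }

        public static void main(String[] args) {
            long statUpdate = 1000L;  // made-up timestamps (ms)
            long succBefore = 900L;   // migration finished before the stats were refreshed
            long succAfter = 1100L;   // migration finished after the stats were refreshed
            System.out.println(canSchedule(succBefore, statUpdate)); // true
            System.out.println(canSchedule(succAfter, statUpdate));  // false: stats are stale
        }
    }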
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index a9a8b9e..1551676 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1133,6 +1133,12 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static boolean disable_balance = false;
+ /**
+ * if set to true, TabletScheduler will not do disk balance.
+ */
+ @ConfField(mutable = true, masterOnly = true)
+ public static boolean disable_disk_balance = false;
+
// if the number of scheduled tablets in TabletScheduler exceed
max_scheduling_tablets
// skip checking.
@ConfField(mutable = true, masterOnly = true)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index 5230f9f..f0ae6a3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -47,6 +47,7 @@ import org.apache.doris.task.DownloadTask;
import org.apache.doris.task.PublishVersionTask;
import org.apache.doris.task.PushTask;
import org.apache.doris.task.SnapshotTask;
+import org.apache.doris.task.StorageMediaMigrationTask;
import org.apache.doris.task.UpdateTabletMetaInfoTask;
import org.apache.doris.task.UploadTask;
import org.apache.doris.thrift.TBackend;
@@ -115,8 +116,8 @@ public class MasterImpl {
AgentTask task = AgentTaskQueue.getTask(backendId, taskType,
signature);
if (task == null) {
- if (taskType != TTaskType.DROP && taskType !=
TTaskType.STORAGE_MEDIUM_MIGRATE
- && taskType != TTaskType.RELEASE_SNAPSHOT && taskType !=
TTaskType.CLEAR_TRANSACTION_TASK) {
+ if (taskType != TTaskType.DROP && taskType !=
TTaskType.RELEASE_SNAPSHOT
+ && taskType != TTaskType.CLEAR_TRANSACTION_TASK) {
String errMsg = "cannot find task. type: " + taskType + ",
backendId: " + backendId
+ ", signature: " + signature;
LOG.warn(errMsg);
@@ -137,7 +138,8 @@ public class MasterImpl {
if (taskType != TTaskType.MAKE_SNAPSHOT && taskType !=
TTaskType.UPLOAD
&& taskType != TTaskType.DOWNLOAD && taskType !=
TTaskType.MOVE
&& taskType != TTaskType.CLONE && taskType !=
TTaskType.PUBLISH_VERSION
- && taskType != TTaskType.CREATE && taskType !=
TTaskType.UPDATE_TABLET_META_INFO) {
+ && taskType != TTaskType.CREATE && taskType !=
TTaskType.UPDATE_TABLET_META_INFO
+ && taskType != TTaskType.STORAGE_MEDIUM_MIGRATE) {
return result;
}
}
@@ -175,6 +177,9 @@ public class MasterImpl {
case CLONE:
finishClone(task, request);
break;
+ case STORAGE_MEDIUM_MIGRATE:
+ finishStorageMediumMigrate(task, request);
+ break;
case CHECK_CONSISTENCY:
finishConsistencyCheck(task, request);
break;
@@ -699,6 +704,12 @@ public class MasterImpl {
AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.CLONE,
task.getSignature());
}
+ private void finishStorageMediumMigrate(AgentTask task, TFinishTaskRequest
request) {
+ StorageMediaMigrationTask migrationTask = (StorageMediaMigrationTask)
task;
+
Catalog.getCurrentCatalog().getTabletScheduler().finishStorageMediaMigrationTask(migrationTask,
request);
+ AgentTaskQueue.removeTask(task.getBackendId(),
TTaskType.STORAGE_MEDIUM_MIGRATE, task.getSignature());
+ }
+
private void finishConsistencyCheck(AgentTask task, TFinishTaskRequest
request) {
CheckConsistencyTask checkConsistencyTask = (CheckConsistencyTask)
task;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index ea4abd9..e1f1051 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -360,10 +360,12 @@ public class ReportHandler extends Daemon {
// 1. CREATE
// 2. SYNC DELETE
// 3. CHECK_CONSISTENCY
+ // 4. STORAGE_MEDIUM_MIGRATE
if (task.getTaskType() == TTaskType.CREATE
|| (task.getTaskType() == TTaskType.PUSH && ((PushTask)
task).getPushType() == TPushType.DELETE
&& ((PushTask) task).isSyncDelete())
- || task.getTaskType() == TTaskType.CHECK_CONSISTENCY) {
+ || task.getTaskType() == TTaskType.CHECK_CONSISTENCY
+ || task.getTaskType() == TTaskType.STORAGE_MEDIUM_MIGRATE)
{
continue;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index c66349a..c35a27b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -21,6 +21,8 @@ import org.apache.doris.analysis.AdminCancelRepairTableStmt;
import org.apache.doris.analysis.AdminCheckTabletsStmt;
import org.apache.doris.analysis.AdminCleanTrashStmt;
import org.apache.doris.analysis.AdminCompactTableStmt;
+import org.apache.doris.analysis.AdminCancelRebalanceDiskStmt;
+import org.apache.doris.analysis.AdminRebalanceDiskStmt;
import org.apache.doris.analysis.AdminRepairTableStmt;
import org.apache.doris.analysis.AdminSetConfigStmt;
import org.apache.doris.analysis.AdminSetReplicaStatusStmt;
@@ -281,6 +283,10 @@ public class DdlExecutor {
catalog.getSyncJobManager().stopSyncJob((StopSyncJobStmt) ddlStmt);
} else if (ddlStmt instanceof AdminCleanTrashStmt) {
catalog.cleanTrash((AdminCleanTrashStmt) ddlStmt);
+ } else if (ddlStmt instanceof AdminRebalanceDiskStmt) {
+
catalog.getTabletScheduler().rebalanceDisk((AdminRebalanceDiskStmt) ddlStmt);
+ } else if (ddlStmt instanceof AdminCancelRebalanceDiskStmt) {
+
catalog.getTabletScheduler().cancelRebalanceDisk((AdminCancelRebalanceDiskStmt)
ddlStmt);
} else if (ddlStmt instanceof CreateSqlBlockRuleStmt) {
catalog.getSqlBlockRuleMgr().createSqlBlockRule((CreateSqlBlockRuleStmt)
ddlStmt);
} else if (ddlStmt instanceof AlterSqlBlockRuleStmt) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/task/StorageMediaMigrationTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/StorageMediaMigrationTask.java
index 72aef5a..1ddbde7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/task/StorageMediaMigrationTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/task/StorageMediaMigrationTask.java
@@ -21,10 +21,14 @@ import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageMediumMigrateReq;
import org.apache.doris.thrift.TTaskType;
+import com.google.common.base.Strings;
+
public class StorageMediaMigrationTask extends AgentTask {
private int schemaHash;
private TStorageMedium toStorageMedium;
+ // if dataDir is specified, the toStorageMedium is meaningless
+ private String dataDir;
public StorageMediaMigrationTask(long backendId, long tabletId, int
schemaHash,
TStorageMedium toStorageMedium) {
@@ -36,9 +40,20 @@ public class StorageMediaMigrationTask extends AgentTask {
public TStorageMediumMigrateReq toThrift() {
TStorageMediumMigrateReq request = new
TStorageMediumMigrateReq(tabletId, schemaHash, toStorageMedium);
+ if (!Strings.isNullOrEmpty(dataDir)) {
+ request.setDataDir(dataDir);
+ }
return request;
}
+ public String getDataDir() {
+ return dataDir;
+ }
+
+ public void setDataDir(String dataDir) {
+ this.dataDir = dataDir;
+ }
+
public int getSchemaHash() {
return schemaHash;
}
diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex
b/fe/fe-core/src/main/jflex/sql_scanner.flex
index a2eeba9..7ab3c7e 100644
--- a/fe/fe-core/src/main/jflex/sql_scanner.flex
+++ b/fe/fe-core/src/main/jflex/sql_scanner.flex
@@ -171,6 +171,7 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("distinctpcsa", new
Integer(SqlParserSymbols.KW_DISTINCTPCSA));
keywordMap.put("distributed", new
Integer(SqlParserSymbols.KW_DISTRIBUTED));
keywordMap.put("distribution", new
Integer(SqlParserSymbols.KW_DISTRIBUTION));
+ keywordMap.put("disk", new Integer(SqlParserSymbols.KW_DISK));
keywordMap.put("dynamic", new Integer(SqlParserSymbols.KW_DYNAMIC));
keywordMap.put("div", new Integer(SqlParserSymbols.KW_DIV));
keywordMap.put("double", new Integer(SqlParserSymbols.KW_DOUBLE));
@@ -319,6 +320,7 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("range", new Integer(SqlParserSymbols.KW_RANGE));
keywordMap.put("read", new Integer(SqlParserSymbols.KW_READ));
keywordMap.put("real", new Integer(SqlParserSymbols.KW_DOUBLE));
+ keywordMap.put("rebalance", new
Integer(SqlParserSymbols.KW_REBALANCE));
keywordMap.put("recover", new Integer(SqlParserSymbols.KW_RECOVER));
keywordMap.put("refresh", new Integer(SqlParserSymbols.KW_REFRESH));
keywordMap.put("regexp", new Integer(SqlParserSymbols.KW_REGEXP));
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmtTest.java
b/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmtTest.java
new file mode 100644
index 0000000..8e92567
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminCancelRebalanceDiskStmtTest.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.clone.RebalancerTestUtil;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.mysql.privilege.MockedAuth;
+import org.apache.doris.mysql.privilege.PaloAuth;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+import mockit.Mocked;
+
+public class AdminCancelRebalanceDiskStmtTest {
+
+ private static Analyzer analyzer;
+
+ @Mocked
+ private PaloAuth auth;
+ @Mocked
+ private ConnectContext ctx;
+
+ @Before()
+ public void setUp() {
+ Config.disable_cluster_feature = false;
+ analyzer = AccessTestUtil.fetchAdminAnalyzer(true);
+ MockedAuth.mockedAuth(auth);
+ MockedAuth.mockedConnectContext(ctx, "root", "192.168.1.1");
+
+ List<Long> beIds = Lists.newArrayList(10001L, 10002L, 10003L, 10004L);
+ beIds.forEach(id ->
Catalog.getCurrentSystemInfo().addBackend(RebalancerTestUtil.createBackend(id,
2048, 0)));
+ }
+
+ @Test
+ public void testParticularBackends() throws AnalysisException {
+ List<String> backends = Lists.newArrayList(
+ "192.168.0.10003:9051", "192.168.0.10004:9051",
"192.168.0.10005:9051", "192.168.0.10006:9051");
+ final AdminCancelRebalanceDiskStmt stmt = new
AdminCancelRebalanceDiskStmt(backends);
+ stmt.analyze(analyzer);
+ Assert.assertEquals(2, stmt.getBackends().size());
+ }
+
+ @Test
+ public void testEmpty() throws AnalysisException {
+ List<String> backends = Lists.newArrayList();
+ final AdminCancelRebalanceDiskStmt stmt = new
AdminCancelRebalanceDiskStmt(backends);
+ stmt.analyze(analyzer);
+ Assert.assertEquals(0, stmt.getBackends().size());
+ }
+
+ @Test
+ public void testNull() throws AnalysisException {
+ final AdminCancelRebalanceDiskStmt stmt = new
AdminCancelRebalanceDiskStmt(null);
+ stmt.analyze(analyzer);
+ Assert.assertEquals(4, stmt.getBackends().size());
+ }
+
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminRebalanceDiskStmtTest.java
b/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminRebalanceDiskStmtTest.java
new file mode 100644
index 0000000..e83693c
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/analysis/AdminRebalanceDiskStmtTest.java
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.clone.RebalancerTestUtil;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.mysql.privilege.MockedAuth;
+import org.apache.doris.mysql.privilege.PaloAuth;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+//import java.util.ArrayList;
+import java.util.List;
+
+import mockit.Mocked;
+
+public class AdminRebalanceDiskStmtTest {
+
+ private static Analyzer analyzer;
+
+ @Mocked
+ private PaloAuth auth;
+ @Mocked
+ private ConnectContext ctx;
+
+ @Before()
+ public void setUp() {
+ Config.disable_cluster_feature = false;
+ analyzer = AccessTestUtil.fetchAdminAnalyzer(true);
+ MockedAuth.mockedAuth(auth);
+ MockedAuth.mockedConnectContext(ctx, "root", "192.168.1.1");
+
+ List<Long> beIds = Lists.newArrayList(10001L, 10002L, 10003L, 10004L);
+ beIds.forEach(id ->
Catalog.getCurrentSystemInfo().addBackend(RebalancerTestUtil.createBackend(id,
2048, 0)));
+ }
+
+ @Test
+ public void testParticularBackends() throws AnalysisException {
+ List<String> backends = Lists.newArrayList(
+ "192.168.0.10003:9051", "192.168.0.10004:9051",
"192.168.0.10005:9051", "192.168.0.10006:9051");
+ final AdminRebalanceDiskStmt stmt = new
AdminRebalanceDiskStmt(backends);
+ stmt.analyze(analyzer);
+ Assert.assertEquals(2, stmt.getBackends().size());
+ }
+
+ @Test
+ public void testEmpty() throws AnalysisException {
+ List<String> backends = Lists.newArrayList();
+ final AdminRebalanceDiskStmt stmt = new
AdminRebalanceDiskStmt(backends);
+ stmt.analyze(analyzer);
+ Assert.assertEquals(0, stmt.getBackends().size());
+ }
+
+ @Test
+ public void testNull() throws AnalysisException {
+ final AdminRebalanceDiskStmt stmt = new AdminRebalanceDiskStmt(null);
+ stmt.analyze(analyzer);
+ Assert.assertEquals(4, stmt.getBackends().size());
+ }
+
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java
similarity index 54%
copy from fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java
copy to fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java
index fc4dd7d..1d7cb00 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java
@@ -21,35 +21,32 @@ import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DataProperty;
import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.RangePartitionInfo;
-import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.Pair;
-import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.clone.TabletScheduler.PathSlot;
import org.apache.doris.resource.Tag;
+import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTask;
-import org.apache.doris.task.CloneTask;
-import org.apache.doris.thrift.TFinishTaskRequest;
-import org.apache.doris.thrift.TStatus;
-import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.task.StorageMediaMigrationTask;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageType;
-import org.apache.doris.thrift.TTabletInfo;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Table;
+import com.google.common.collect.Maps;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
@@ -61,7 +58,8 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.Map;
+//import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
@@ -69,10 +67,10 @@ import java.util.stream.LongStream;
import mockit.Delegate;
import mockit.Expectations;
import mockit.Mocked;
-import static com.google.common.collect.MoreCollectors.onlyElement;
+//import static com.google.common.collect.MoreCollectors.onlyElement;
-public class RebalanceTest {
- private static final Logger LOG =
LogManager.getLogger(RebalanceTest.class);
+public class DiskRebalanceTest {
+ private static final Logger LOG =
LogManager.getLogger(DiskRebalanceTest.class);
@Mocked
private Catalog catalog;
@@ -134,9 +132,32 @@ public class RebalanceTest {
// Test mock validation
Assert.assertEquals(111,
Catalog.getCurrentGlobalTransactionMgr().getTransactionIDGenerator().getNextTransactionId());
Assert.assertTrue(Catalog.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(1,
2, Lists.newArrayList(3L)));
+ }
+
+ private void generateStatisticMap() {
+ ClusterLoadStatistic loadStatistic = new
ClusterLoadStatistic(SystemInfoService.DEFAULT_CLUSTER,
+ Tag.DEFAULT_BACKEND_TAG, systemInfoService, invertedIndex);
+ loadStatistic.init();
+ statisticMap = HashBasedTable.create();
+ statisticMap.put(SystemInfoService.DEFAULT_CLUSTER,
Tag.DEFAULT_BACKEND_TAG, loadStatistic);
+ }
+
+ private void createPartitionsForTable(OlapTable olapTable,
MaterializedIndex index, Long partitionCount) {
+ // partition id start from 31
+ LongStream.range(0, partitionCount).forEach(idx -> {
+ long id = 31 + idx;
+ Partition partition = new Partition(id, "p" + idx, index, new
HashDistributionInfo());
+ olapTable.addPartition(partition);
+ olapTable.getPartitionInfo().addPartition(id, new
DataProperty(TStorageMedium.HDD),
+ ReplicaAllocation.DEFAULT_ALLOCATION, false);
+ });
+ }
- List<Long> beIds = Lists.newArrayList(10001L, 10002L, 10003L, 10004L);
- beIds.forEach(id ->
systemInfoService.addBackend(RebalancerTestUtil.createBackend(id, 2048, 0)));
+ @Test
+ public void testDiskRebalancerWithSameUsageDisk() {
+ // init system
+ List<Long> beIds = Lists.newArrayList(10001L, 10002L, 10003L);
+ beIds.forEach(id ->
systemInfoService.addBackend(RebalancerTestUtil.createBackend(id, 2048,
Lists.newArrayList(512L,512L), 2)));
olapTable = new OlapTable(2, "fake table", new ArrayList<>(),
KeysType.DUP_KEYS,
new RangePartitionInfo(), new HashDistributionInfo());
@@ -149,6 +170,7 @@ public class RebalanceTest {
0, 0, (short) 0, TStorageType.COLUMN, KeysType.DUP_KEYS);
// Tablet distribution: we add them to olapTable & build invertedIndex
manually
+ // all tablets are on the first path of their backend
RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p0",
TStorageMedium.HDD,
50000, Lists.newArrayList(10001L, 10002L, 10003L));
@@ -157,57 +179,64 @@ public class RebalanceTest {
RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p2",
TStorageMedium.HDD,
70000, Lists.newArrayList(10001L, 10002L, 10003L));
+
+ // case start
+ Configurator.setLevel("org.apache.doris.clone.DiskRebalancer",
Level.DEBUG);
- // be4(10004) doesn't have any replica
-
+ Rebalancer rebalancer = new
DiskRebalancer(Catalog.getCurrentSystemInfo(),
Catalog.getCurrentInvertedIndex());
generateStatisticMap();
+ rebalancer.updateLoadStatistic(statisticMap);
+ List<TabletSchedCtx> alternativeTablets =
rebalancer.selectAlternativeTablets();
+ // check alternativeTablets;
+ Assert.assertTrue(alternativeTablets.isEmpty());
}
- private void generateStatisticMap() {
- ClusterLoadStatistic loadStatistic = new
ClusterLoadStatistic(SystemInfoService.DEFAULT_CLUSTER,
- Tag.DEFAULT_BACKEND_TAG, systemInfoService, invertedIndex);
- loadStatistic.init();
- statisticMap = HashBasedTable.create();
- statisticMap.put(SystemInfoService.DEFAULT_CLUSTER,
Tag.DEFAULT_BACKEND_TAG, loadStatistic);
- }
+ @Test
+ public void testDiskRebalancerWithDiffUsageDisk() {
+ // init system
+ systemInfoService.addBackend(RebalancerTestUtil.createBackend(10001L,
2048, Lists.newArrayList(1024L), 1));
+ systemInfoService.addBackend(RebalancerTestUtil.createBackend(10002L,
2048, Lists.newArrayList(1024L, 512L), 2));
+ systemInfoService.addBackend(RebalancerTestUtil.createBackend(10003L,
2048, Lists.newArrayList(1024L, 512L, 513L), 3));
- private void createPartitionsForTable(OlapTable olapTable,
MaterializedIndex index, Long partitionCount) {
- // partition id start from 31
- LongStream.range(0, partitionCount).forEach(idx -> {
- long id = 31 + idx;
- Partition partition = new Partition(id, "p" + idx, index, new
HashDistributionInfo());
- olapTable.addPartition(partition);
- olapTable.getPartitionInfo().addPartition(id, new
DataProperty(TStorageMedium.HDD),
- ReplicaAllocation.DEFAULT_ALLOCATION, false);
- });
- }
+ olapTable = new OlapTable(2, "fake table", new ArrayList<>(),
KeysType.DUP_KEYS,
+ new RangePartitionInfo(), new HashDistributionInfo());
+ db.createTable(olapTable);
- @Test
- public void testPartitionRebalancer() {
- Configurator.setLevel("org.apache.doris.clone.PartitionRebalancer",
Level.DEBUG);
+ // 1 table, 3 partitions p0,p1,p2
+ MaterializedIndex materializedIndex = new
MaterializedIndex(olapTable.getId(), null);
+ createPartitionsForTable(olapTable, materializedIndex, 3L);
+ olapTable.setIndexMeta(materializedIndex.getId(), "fake index",
Lists.newArrayList(new Column()),
+ 0, 0, (short) 0, TStorageType.COLUMN, KeysType.DUP_KEYS);
- // Disable scheduler's rebalancer adding balance task, add balance
tasks manually
- Config.disable_balance = true;
- // generate statistic map again to create skewmap
- Config.tablet_rebalancer_type = "partition";
- generateStatisticMap();
- // Create a new scheduler & checker for redundant tablets handling
- // Call runAfterCatalogReady manually instead of starting daemon thread
- TabletSchedulerStat stat = new TabletSchedulerStat();
- PartitionRebalancer rebalancer = new
PartitionRebalancer(Catalog.getCurrentSystemInfo(),
Catalog.getCurrentInvertedIndex());
- TabletScheduler tabletScheduler = new TabletScheduler(catalog,
systemInfoService, invertedIndex, stat, "");
- // The rebalancer inside the scheduler will use this rebalancer, for
getToDeleteReplicaId
- Deencapsulation.setField(tabletScheduler, "rebalancer", rebalancer);
+ // Tablet distribution: we add them to olapTable & build invertedIndex
manually
+ // all tablets are on the first path of their backend
+ RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p0",
TStorageMedium.HDD,
+ 50000, Lists.newArrayList(10001L, 10002L, 10003L),
Lists.newArrayList(0L, 100L, 300L));
+
+ RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p1",
TStorageMedium.HDD,
+ 60000, Lists.newArrayList(10001L, 10002L, 10003L),
Lists.newArrayList(50L, 0L, 200L));
+
+ RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p2",
TStorageMedium.HDD,
+ 70000, Lists.newArrayList(10001L, 10002L, 10003L),
Lists.newArrayList(100L, 200L, 0L));
- TabletChecker tabletChecker = new TabletChecker(catalog,
systemInfoService, tabletScheduler, stat);
+ // case start
+ Configurator.setLevel("org.apache.doris.clone.DiskRebalancer",
Level.DEBUG);
+ Rebalancer rebalancer = new
DiskRebalancer(Catalog.getCurrentSystemInfo(),
Catalog.getCurrentInvertedIndex());
+ generateStatisticMap();
rebalancer.updateLoadStatistic(statisticMap);
List<TabletSchedCtx> alternativeTablets =
rebalancer.selectAlternativeTablets();
+ // check alternativeTablets;
+ Assert.assertEquals(2, alternativeTablets.size());
+ Map<Long, PathSlot> backendsWorkingSlots = Maps.newConcurrentMap();
+ for (Backend be :
Catalog.getCurrentSystemInfo().getClusterBackends(SystemInfoService.DEFAULT_CLUSTER))
{
+ if (!backendsWorkingSlots.containsKey(be.getId())) {
+ List<Long> pathHashes =
be.getDisks().values().stream().map(DiskInfo::getPathHash).collect(Collectors.toList());
+ PathSlot slot = new PathSlot(pathHashes,
Config.schedule_slot_num_per_path);
+ backendsWorkingSlots.put(be.getId(), slot);
+ }
+ }
- // Run once for update slots info, scheduler won't select balance cuz
balance is disabled
- tabletScheduler.runAfterCatalogReady();
-
- AgentBatchTask batchTask = new AgentBatchTask();
for (TabletSchedCtx tabletCtx : alternativeTablets) {
LOG.info("try to schedule tablet {}", tabletCtx.getTabletId());
try {
@@ -217,96 +246,17 @@ public class RebalanceTest {
tabletCtx.setSchemaHash(olapTable.getSchemaHashByIndexId(tabletCtx.getIndexId()));
tabletCtx.setTabletStatus(Tablet.TabletStatus.HEALTHY); //
rebalance tablet should be healthy first
- // createCloneReplicaAndTask, create replica will change
invertedIndex too.
- rebalancer.createBalanceTask(tabletCtx,
tabletScheduler.getBackendsWorkingSlots(), batchTask);
- } catch (SchedException e) {
- LOG.warn("schedule tablet {} failed: {}",
tabletCtx.getTabletId(), e.getMessage());
- }
- }
-
- // Show debug info of MoveInProgressMap detail
- rebalancer.updateLoadStatistic(statisticMap);
- rebalancer.selectAlternativeTablets();
-
- // Get created tasks, and finish them manually
- List<AgentTask> tasks = batchTask.getAllTasks();
- List<Long> needCheckTablets = tasks.stream().map(AgentTask::getTabletId).collect(Collectors.toList());
- LOG.info("created tasks for tablet: {}", needCheckTablets);
- needCheckTablets.forEach(t -> Assert.assertEquals(4, invertedIndex.getReplicasByTabletId(t).size()));
-
-// // If clone task execution is too slow, tabletChecker may want to delete the CLONE replica.
-// tabletChecker.runAfterCatalogReady();
-// Assert.assertTrue(tabletScheduler.containsTablet(50000));
-// // tabletScheduler handle redundant
-// tabletScheduler.runAfterCatalogReady();
-
- for (Long tabletId : needCheckTablets) {
- TabletSchedCtx tabletSchedCtx = alternativeTablets.stream().filter(ctx -> ctx.getTabletId() == tabletId).collect(onlyElement());
- AgentTask task = tasks.stream().filter(t -> t.getTabletId() == tabletId).collect(onlyElement());
-
- LOG.info("try to finish tabletCtx {}", tabletId);
- try {
- TFinishTaskRequest fakeReq = new TFinishTaskRequest();
- fakeReq.task_status = new TStatus(TStatusCode.OK);
- fakeReq.finish_tablet_infos = Lists.newArrayList(new TTabletInfo(tabletSchedCtx.getTabletId(), 5, 1, 0, 0, 0));
- tabletSchedCtx.finishCloneTask((CloneTask) task, fakeReq);
+ AgentTask task = rebalancer.createBalanceTask(tabletCtx, backendsWorkingSlots);
+ if (tabletCtx.getTabletSize() == 0) {
+ Assert.fail("no exception");
+ } else {
+ Assert.assertTrue(task instanceof StorageMediaMigrationTask);
+ }
} catch (SchedException e) {
- e.printStackTrace();
+ LOG.info("schedule tablet {} failed: {}",
tabletCtx.getTabletId(), e.getMessage());
}
}
-
- // NeedCheckTablets are redundant, TabletChecker will add them to TabletScheduler
- tabletChecker.runAfterCatalogReady();
- needCheckTablets.forEach(t -> Assert.assertEquals(4, invertedIndex.getReplicasByTabletId(t).size()));
- needCheckTablets.forEach(t -> Assert.assertTrue(tabletScheduler.containsTablet(t)));
-
- // TabletScheduler handle redundant tablet
- tabletScheduler.runAfterCatalogReady();
-
- // One replica is set to DECOMMISSION, still 4 replicas
- needCheckTablets.forEach(t -> {
- List<Replica> replicas = invertedIndex.getReplicasByTabletId(t);
- Assert.assertEquals(4, replicas.size());
- Replica decommissionedReplica = replicas.stream().filter(r -> r.getState() == Replica.ReplicaState.DECOMMISSION).collect(onlyElement());
- // expected watermarkTxnId is 111
- Assert.assertEquals(111, decommissionedReplica.getWatermarkTxnId());
- });
-
- // Delete replica should change invertedIndex too
- tabletScheduler.runAfterCatalogReady();
- needCheckTablets.forEach(t -> Assert.assertEquals(3, invertedIndex.getReplicasByTabletId(t).size()));
-
- // Check moves completed
- rebalancer.selectAlternativeTablets();
- rebalancer.updateLoadStatistic(statisticMap);
- AtomicLong succeeded = Deencapsulation.getField(rebalancer, "counterBalanceMoveSucceeded");
- Assert.assertEquals(needCheckTablets.size(), succeeded.get());
}
- @Test
- public void testMoveInProgressMap() {
- Configurator.setLevel("org.apache.doris.clone.MovesInProgressCache",
Level.DEBUG);
- MovesCacheMap m = new MovesCacheMap();
- m.updateMapping(statisticMap, 3);
- m.getCache(SystemInfoService.DEFAULT_CLUSTER, Tag.DEFAULT_BACKEND_TAG, TStorageMedium.HDD).get().put(1L, new Pair<>(null, -1L));
- m.getCache(SystemInfoService.DEFAULT_CLUSTER, Tag.DEFAULT_BACKEND_TAG, TStorageMedium.SSD).get().put(2L, new Pair<>(null, -1L));
- m.getCache(SystemInfoService.DEFAULT_CLUSTER, Tag.DEFAULT_BACKEND_TAG, TStorageMedium.SSD).get().put(3L, new Pair<>(null, -1L));
- // Maintenance won't clean up the entries of cache
- m.maintain();
- Assert.assertEquals(3, m.size());
-
- // Reset the expireAfterAccess, the whole cache map will be cleared.
- m.updateMapping(statisticMap, 1);
- Assert.assertEquals(0, m.size());
-
- m.getCache(SystemInfoService.DEFAULT_CLUSTER, Tag.DEFAULT_BACKEND_TAG, TStorageMedium.SSD).get().put(3L, new Pair<>(null, -1L));
- try {
- Thread.sleep(1000);
- m.maintain();
- Assert.assertEquals(0, m.size());
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
}
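For readers skimming the hunks above: the new DiskRebalanceTest exercises the disk rebalancer end to end. Condensed into a sketch (this reuses the identifiers and calls from the test above; it is not extra code in the commit):

    // One PathSlot per backend, built from the path hashes of its disks.
    Map<Long, PathSlot> backendsWorkingSlots = Maps.newConcurrentMap();
    for (Backend be : Catalog.getCurrentSystemInfo().getClusterBackends(SystemInfoService.DEFAULT_CLUSTER)) {
        List<Long> pathHashes = be.getDisks().values().stream()
                .map(DiskInfo::getPathHash).collect(Collectors.toList());
        backendsWorkingSlots.putIfAbsent(be.getId(), new PathSlot(pathHashes, Config.schedule_slot_num_per_path));
    }

    // DiskRebalancer picks tablets on backends whose disks are skewed, then emits a
    // StorageMediaMigrationTask that moves data between paths on the same BE.
    Rebalancer rebalancer = new DiskRebalancer(Catalog.getCurrentSystemInfo(), Catalog.getCurrentInvertedIndex());
    rebalancer.updateLoadStatistic(statisticMap);
    for (TabletSchedCtx ctx : rebalancer.selectAlternativeTablets()) {
        try {
            AgentTask task = rebalancer.createBalanceTask(ctx, backendsWorkingSlots);
            Assert.assertTrue(task instanceof StorageMediaMigrationTask);
        } catch (SchedException e) {
            // the test above expects this only for tablets whose recorded size is still 0
        }
    }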
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java
index fc4dd7d..e94fa86 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java
@@ -36,6 +36,7 @@ import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.resource.Tag;
+import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTask;
@@ -183,6 +184,31 @@ public class RebalanceTest {
}
@Test
+ public void testPrioBackends() {
+ Rebalancer rebalancer = new DiskRebalancer(Catalog.getCurrentSystemInfo(), Catalog.getCurrentInvertedIndex());
+ // add
+ {
+ List<Backend> backends = Lists.newArrayList();
+ for (int i = 0; i < 3; i++) {
+ backends.add(RebalancerTestUtil.createBackend(10086 + i, 2048, 0));
+ }
+ rebalancer.addPrioBackends(backends, 1000);
+ Assert.assertTrue(rebalancer.hasPrioBackends());
+ }
+
+ // remove
+ for (int i = 0; i < 3; i++) {
+ List<Backend> backends = Lists.newArrayList(RebalancerTestUtil.createBackend(10086 + i, 2048, 0));
+ rebalancer.removePrioBackends(backends);
+ if (i == 2) {
+ Assert.assertFalse(rebalancer.hasPrioBackends());
+ } else {
+ Assert.assertTrue(rebalancer.hasPrioBackends());
+ }
+ }
+ }
+
+ @Test
public void testPartitionRebalancer() {
Configurator.setLevel("org.apache.doris.clone.PartitionRebalancer",
Level.DEBUG);
@@ -218,7 +244,8 @@ public class RebalanceTest {
tabletCtx.setTabletStatus(Tablet.TabletStatus.HEALTHY); // rebalance tablet should be healthy first
// createCloneReplicaAndTask, create replica will change invertedIndex too.
- rebalancer.createBalanceTask(tabletCtx, tabletScheduler.getBackendsWorkingSlots(), batchTask);
+ AgentTask task = rebalancer.createBalanceTask(tabletCtx, tabletScheduler.getBackendsWorkingSlots());
+ batchTask.addTask(task);
} catch (SchedException e) {
LOG.warn("schedule tablet {} failed: {}",
tabletCtx.getTabletId(), e.getMessage());
}
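The RebalanceTest changes above track two interface changes on Rebalancer: createBalanceTask(...) now returns the AgentTask instead of appending it to a caller-supplied AgentBatchTask, and the rebalancer gains addPrioBackends / removePrioBackends / hasPrioBackends for backends flagged by ADMIN REBALANCE DISK. Roughly, a caller adapts like this (a sketch based only on the calls visible in this diff):

    AgentBatchTask batchTask = new AgentBatchTask();
    try {
        AgentTask task = rebalancer.createBalanceTask(tabletCtx, tabletScheduler.getBackendsWorkingSlots());
        batchTask.addTask(task); // the caller now adds the returned task to the batch itself
    } catch (SchedException e) {
        LOG.warn("schedule tablet {} failed: {}", tabletCtx.getTabletId(), e.getMessage());
    }

    // Priority backends, as exercised by testPrioBackends above (the second argument, 1000,
    // is simply the value the test passes; its unit is defined by the Rebalancer API):
    rebalancer.addPrioBackends(backends, 1000);
    Assert.assertTrue(rebalancer.hasPrioBackends());
    rebalancer.removePrioBackends(backends);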
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
index 4b3e4c6..5e02a51 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
@@ -19,6 +19,7 @@ package org.apache.doris.clone;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
+import com.google.common.collect.Lists;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.MaterializedIndex;
@@ -40,14 +41,20 @@ public class RebalancerTestUtil {
// Add only one path, PathHash:id
public static Backend createBackend(long id, long totalCap, long usedCap) {
+ return createBackend(id, totalCap, Lists.newArrayList(usedCap), 1);
+ }
+ // the size of usedCaps should equal diskNum
+ public static Backend createBackend(long id, long totalCap, List<Long> usedCaps, int diskNum) {
// ip:port won't be checked
Backend be = new Backend(id, "192.168.0." + id, 9051);
Map<String, DiskInfo> disks = Maps.newHashMap();
- DiskInfo diskInfo = new DiskInfo("/path1");
- diskInfo.setPathHash(id);
- diskInfo.setTotalCapacityB(totalCap);
- diskInfo.setDataUsedCapacityB(usedCap);
- disks.put(diskInfo.getRootPath(), diskInfo);
+ for (int i = 0; i < diskNum; i++) {
+ DiskInfo diskInfo = new DiskInfo("/path" + (i + 1));
+ diskInfo.setPathHash(id + i);
+ diskInfo.setTotalCapacityB(totalCap);
+ diskInfo.setDataUsedCapacityB(usedCaps.get(i));
+ disks.put(diskInfo.getRootPath(), diskInfo);
+ }
be.setDisks(ImmutableMap.copyOf(disks));
be.setAlive(true);
be.setOwnerClusterName(SystemInfoService.DEFAULT_CLUSTER);
@@ -59,28 +66,37 @@ public class RebalancerTestUtil {
// Only use the partition's baseIndex for simplicity
public static void createTablet(TabletInvertedIndex invertedIndex, Database db, OlapTable olapTable, String partitionName, TStorageMedium medium,
int tabletId, List<Long> beIds) {
+ createTablet(invertedIndex, db, olapTable, partitionName, medium, tabletId, beIds, null);
+ }
+ public static void createTablet(TabletInvertedIndex invertedIndex, Database db, OlapTable olapTable, String partitionName, TStorageMedium medium,
+ int tabletId, List<Long> beIds, List<Long> replicaSizes) {
Partition partition = olapTable.getPartition(partitionName);
MaterializedIndex baseIndex = partition.getBaseIndex();
int schemaHash = olapTable.getSchemaHashByIndexId(baseIndex.getId());
TabletMeta tabletMeta = new TabletMeta(db.getId(), olapTable.getId(), partition.getId(), baseIndex.getId(),
- schemaHash, medium);
+ schemaHash, medium);
Tablet tablet = new Tablet(tabletId);
// add tablet to olapTable
olapTable.getPartition("p0").getBaseIndex().addTablet(tablet,
tabletMeta);
- createReplicasAndAddToIndex(invertedIndex, tabletMeta, tablet, beIds);
+ createReplicasAndAddToIndex(invertedIndex, tabletMeta, tablet, beIds,
replicaSizes);
}
// Create replicas on backends which are numbered in beIds.
// The tablet & replicas will be added to invertedIndex.
- public static void createReplicasAndAddToIndex(TabletInvertedIndex invertedIndex, TabletMeta tabletMeta, Tablet tablet, List<Long> beIds) {
+ public static void createReplicasAndAddToIndex(TabletInvertedIndex invertedIndex, TabletMeta tabletMeta,
+ Tablet tablet, List<Long> beIds, List<Long> replicaSizes) {
invertedIndex.addTablet(tablet.getId(), tabletMeta);
IntStream.range(0, beIds.size()).forEach(i -> {
Replica replica = new Replica(tablet.getId() + i, beIds.get(i), Replica.ReplicaState.NORMAL, 1, tabletMeta.getOldSchemaHash());
// We've set pathHash to beId for simplicity
replica.setPathHash(beIds.get(i));
+ if (replicaSizes != null) {
+ // for the disk rebalancer, every beId corresponds to a replicaSize
+ replica.updateStat(replicaSizes.get(i), 0);
+ }
// isRestore set true, to avoid modifying Catalog.getCurrentInvertedIndex
tablet.addReplica(replica, true);
invertedIndex.addReplica(tablet.getId(), replica);
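The RebalancerTestUtil additions above give tests multi-disk backends and per-replica data sizes, which is what the disk rebalancer keys on. A usage sketch (values are illustrative, modeled on the tests above):

    // A backend with 3 disks of 2048 total capacity each and uneven used capacity
    // (the helper above assigns path hashes id, id+1, id+2).
    Backend be = RebalancerTestUtil.createBackend(10001L, 2048, Lists.newArrayList(1900L, 100L, 50L), 3);

    // A tablet whose replicas carry explicit sizes, one per backend id, so the
    // disk rebalancer can compute per-path usage.
    RebalancerTestUtil.createTablet(invertedIndex, db, olapTable, "p0", TStorageMedium.HDD,
            50000, Lists.newArrayList(10001L, 10002L, 10003L), Lists.newArrayList(0L, 100L, 300L));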
diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
index 375c86f..c1e7d42 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
@@ -89,6 +89,7 @@ public class AgentTaskTest {
private AgentTask rollupTask;
private AgentTask schemaChangeTask;
private AgentTask cancelDeleteTask;
+ private AgentTask storageMediaMigrationTask;
@Before
public void setUp() throws AnalysisException {
@@ -140,6 +141,11 @@ public class AgentTaskTest {
new SchemaChangeTask(null, backendId1, dbId, tableId, partitionId, indexId1,
tabletId1, replicaId1, columns, schemaHash2, schemaHash1,
shortKeyNum, storageType, null, 0, TKeysType.AGG_KEYS);
+
+ // storageMediaMigrationTask
+ storageMediaMigrationTask =
+ new StorageMediaMigrationTask(backendId1, tabletId1, schemaHash1, TStorageMedium.HDD);
+ ((StorageMediaMigrationTask) storageMediaMigrationTask).setDataDir("/home/a");
}
@Test
@@ -211,6 +217,15 @@ public class AgentTaskTest {
Assert.assertEquals(TTaskType.SCHEMA_CHANGE, request6.getTaskType());
Assert.assertEquals(schemaChangeTask.getSignature(), request6.getSignature());
Assert.assertNotNull(request6.getAlterTabletReq());
+
+ // storageMediaMigrationTask
+ TAgentTaskRequest request7 =
+ (TAgentTaskRequest) toAgentTaskRequest.invoke(agentBatchTask, storageMediaMigrationTask);
+ Assert.assertEquals(TTaskType.STORAGE_MEDIUM_MIGRATE, request7.getTaskType());
+ Assert.assertEquals(storageMediaMigrationTask.getSignature(), request7.getSignature());
+ Assert.assertNotNull(request7.getStorageMediumMigrateReq());
+ Assert.assertTrue(request7.getStorageMediumMigrateReq().isSetDataDir());
+ Assert.assertEquals(request7.getStorageMediumMigrateReq().getDataDir(), "/home/a");
}
@Test
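Finally, the AgentTaskTest addition above verifies that a StorageMediaMigrationTask created for disk rebalancing carries the destination data dir through to the thrift request. In outline (a restatement of the assertions above, not new test code):

    StorageMediaMigrationTask task =
            new StorageMediaMigrationTask(backendId1, tabletId1, schemaHash1, TStorageMedium.HDD);
    task.setDataDir("/home/a"); // disk rebalance targets a specific path on the same backend

    // After AgentBatchTask serializes it (toAgentTaskRequest in the test), the request is expected to satisfy:
    //   request.getTaskType() == TTaskType.STORAGE_MEDIUM_MIGRATE
    //   request.getStorageMediumMigrateReq().isSetDataDir()
    //   request.getStorageMediumMigrateReq().getDataDir().equals("/home/a")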
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]