This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
     new e157c2c254 [feature-wip](remote-storage) step3: Support remote storage, only for be, add migration_task_v2 (#8806)
e157c2c254 is described below
commit e157c2c254a48c53c0ff07a17af6f52fc6b8d0c5
Author: pengxiangyu <[email protected]>
AuthorDate: Fri Apr 22 22:38:10 2022 +0800
[feature-wip](remote-storage) step3: Support remote storage, only for be, add migration_task_v2 (#8806)

1. Add TStorageMigrationReqV2 and EngineStorageMigrationTaskV2 to support the migration action (a sketch of the request fields follows this list)
2. Change TabletManager::create_tablet() for remote storage
3. Change TabletManager::try_delete_unused_tablet_path() for remote storage
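The Thrift definition added in gensrc/thrift/AgentService.thrift is not reproduced in this excerpt, so the following is only an illustrative C++ mirror of the fields the BE code reads from TStorageMigrationReqV2 in the diff below (base/new tablet ids, schema hashes, and the migration version); the exact generated types and any further fields are assumptions.

    // Hypothetical mirror of the request fields consumed by the BE in this patch.
    // The real definition is the Thrift-generated TStorageMigrationReqV2 struct.
    #include <cstdint>

    struct StorageMigrationReqV2Sketch {
        int64_t base_tablet_id = 0;     // tablet whose rowsets are migrated
        int32_t base_schema_hash = 0;
        int64_t new_tablet_id = 0;      // tablet created on the target storage medium
        int32_t new_schema_hash = 0;
        int64_t migration_version = 0;  // versions up to this point are converted
    };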
---
be/src/agent/agent_server.cpp | 2 +
be/src/agent/agent_server.h | 2 +
be/src/agent/task_worker_pool.cpp | 128 ++++++
be/src/agent/task_worker_pool.h | 8 +-
be/src/common/config.h | 1 +
be/src/common/status.h | 1 +
be/src/olap/CMakeLists.txt | 2 +
be/src/olap/rowset/alpha_rowset.cpp | 2 +-
be/src/olap/rowset/alpha_rowset.h | 2 +-
be/src/olap/rowset/alpha_rowset_writer.cpp | 5 +
be/src/olap/rowset/alpha_rowset_writer.h | 1 +
be/src/olap/rowset/beta_rowset.cpp | 16 +-
be/src/olap/rowset/beta_rowset.h | 5 +-
be/src/olap/rowset/beta_rowset_writer.cpp | 34 ++
be/src/olap/rowset/beta_rowset_writer.h | 2 +
be/src/olap/rowset/rowset.h | 9 +-
be/src/olap/rowset/rowset_writer.h | 2 +
be/src/olap/schema_change.cpp | 4 +-
be/src/olap/storage_engine.cpp | 68 ++-
be/src/olap/storage_engine.h | 5 +-
be/src/olap/storage_migration_v2.cpp | 464 +++++++++++++++++++++
be/src/olap/storage_migration_v2.h | 79 ++++
be/src/olap/tablet.cpp | 6 +-
be/src/olap/tablet_manager.cpp | 67 ++-
be/src/olap/tablet_meta.cpp | 7 +
be/src/olap/tablet_meta.h | 5 +
be/src/olap/task/engine_storage_migration_task.cpp | 2 +-
.../olap/task/engine_storage_migration_task_v2.cpp | 60 +++
.../olap/task/engine_storage_migration_task_v2.h | 44 ++
be/src/runtime/exec_env.h | 1 -
be/src/util/doris_metrics.cpp | 4 +
be/src/util/doris_metrics.h | 3 +
be/src/util/file_utils.h | 2 -
be/src/util/storage_backend_mgr.cpp | 47 ++-
gensrc/thrift/AgentService.thrift | 9 +
gensrc/thrift/Types.thrift | 3 +-
36 files changed, 1041 insertions(+), 61 deletions(-)
diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index bf536213a7..6e754f941a 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -90,6 +90,7 @@ AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info)
     CREATE_AND_START_THREAD(REPORT_DISK_STATE, _report_disk_state_workers);
     CREATE_AND_START_THREAD(REPORT_OLAP_TABLE, _report_tablet_workers);
     CREATE_AND_START_POOL(SUBMIT_TABLE_COMPACTION, _submit_table_compaction_workers);
+    CREATE_AND_START_POOL(STORAGE_MEDIUM_MIGRATE_V2, _storage_medium_migrate_v2_workers);
 #undef CREATE_AND_START_POOL
 #undef CREATE_AND_START_THREAD
@@ -152,6 +153,7 @@ void AgentServer::submit_tasks(TAgentResult& agent_result,
        HANDLE_TYPE(TTaskType::UPDATE_TABLET_META_INFO, _update_tablet_meta_info_workers, update_tablet_meta_info_req);
        HANDLE_TYPE(TTaskType::COMPACTION, _submit_table_compaction_workers, compaction_req);
+       HANDLE_TYPE(TTaskType::STORAGE_MEDIUM_MIGRATE_V2, _storage_medium_migrate_v2_workers, storage_migration_req_v2);
    case TTaskType::REALTIME_PUSH:
    case TTaskType::PUSH:
diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h
index 8f8c9da072..09144f3dee 100644
--- a/be/src/agent/agent_server.h
+++ b/be/src/agent/agent_server.h
@@ -89,6 +89,8 @@ private:
std::unique_ptr<TaskWorkerPool> _submit_table_compaction_workers;
+ std::unique_ptr<TaskWorkerPool> _storage_medium_migrate_v2_workers;
+
std::unique_ptr<TopicSubscriber> _topic_subscriber;
};
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 23620861c2..6b22584153 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -44,6 +44,7 @@
#include "olap/task/engine_clone_task.h"
#include "olap/task/engine_publish_version_task.h"
#include "olap/task/engine_storage_migration_task.h"
+#include "olap/task/engine_storage_migration_task_v2.h"
#include "olap/utils.h"
#include "runtime/exec_env.h"
#include "runtime/snapshot_loader.h"
@@ -194,6 +195,11 @@ void TaskWorkerPool::start() {
        cb = std::bind<void>(&TaskWorkerPool::_submit_table_compaction_worker_thread_callback,
                             this);
        break;
+    case TaskWorkerType::STORAGE_MEDIUM_MIGRATE_V2:
+        _worker_count = 1;
+        cb = std::bind<void>(&TaskWorkerPool::_storage_medium_migrate_v2_worker_thread_callback,
+                             this);
+        break;
default:
// pass
break;
@@ -1650,4 +1656,126 @@ void TaskWorkerPool::_submit_table_compaction_worker_thread_callback() {
}
}
+void TaskWorkerPool::_storage_medium_migrate_v2_worker_thread_callback() {
+ while (_is_work) {
+ TAgentTaskRequest agent_task_req;
+ {
+            std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
+ while (_is_work && _tasks.empty()) {
+ _worker_thread_condition_variable.wait(worker_thread_lock);
+ }
+ if (!_is_work) {
+ return;
+ }
+
+ agent_task_req = _tasks.front();
+ _tasks.pop_front();
+ }
+ int64_t signature = agent_task_req.signature;
+        LOG(INFO) << "get migration table v2 task, signature: " << agent_task_req.signature;
+ bool is_task_timeout = false;
+ if (agent_task_req.__isset.recv_time) {
+ int64_t time_elapsed = time(nullptr) - agent_task_req.recv_time;
+ if (time_elapsed > config::report_task_interval_seconds * 20) {
+                LOG(INFO) << "task elapsed " << time_elapsed
+                          << " seconds since it is inserted to queue, it is timeout";
+ is_task_timeout = true;
+ }
+ }
+ if (!is_task_timeout) {
+ TFinishTaskRequest finish_task_request;
+ TTaskType::type task_type = agent_task_req.task_type;
+ switch (task_type) {
+ case TTaskType::STORAGE_MEDIUM_MIGRATE_V2:
+                _storage_medium_migrate_v2(agent_task_req, signature, task_type, &finish_task_request);
+ break;
+ default:
+ // pass
+ break;
+ }
+ _finish_task(finish_task_request);
+ }
+ _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
+ }
+}
+
+void TaskWorkerPool::_storage_medium_migrate_v2(const TAgentTaskRequest& agent_task_req, int64_t signature,
+                                                const TTaskType::type task_type, TFinishTaskRequest* finish_task_request) {
+ Status status = Status::OK();
+ TStatus task_status;
+ std::vector<string> error_msgs;
+
+ string process_name;
+ switch (task_type) {
+ case TTaskType::STORAGE_MEDIUM_MIGRATE_V2:
+ process_name = "StorageMediumMigrationV2";
+ break;
+ default:
+ std::string task_name;
+ EnumToString(TTaskType, task_type, task_name);
+        LOG(WARNING) << "Storage medium migration v2 type invalid. type: " << task_name
+                     << ", signature: " << signature;
+        status = Status::NotSupported("Storage medium migration v2 type invalid");
+        break;
+    }
+
+    // Check last storage medium migration v2 status, if failed delete tablet file
+    // Do not need to adjust delete success or not
+    // Because if delete failed task will failed
+ TTabletId new_tablet_id;
+ TSchemaHash new_schema_hash = 0;
+ if (status.ok()) {
+ new_tablet_id = agent_task_req.storage_migration_req_v2.new_tablet_id;
+        new_schema_hash = agent_task_req.storage_migration_req_v2.new_schema_hash;
+        EngineStorageMigrationTaskV2 engine_task(agent_task_req.storage_migration_req_v2);
+ Status sc_status = _env->storage_engine()->execute_task(&engine_task);
+ if (!sc_status.ok()) {
+            if (sc_status == Status::OLAPInternalError(OLAP_ERR_DATA_QUALITY_ERR)) {
+                error_msgs.push_back("The data quality does not satisfy, please check your data. ");
+            }
+            status = Status::DataQualityError("The data quality does not satisfy");
+ } else {
+ status = Status::OK();
+ }
+ }
+
+ if (status.ok()) {
+ ++_s_report_version;
+ LOG(INFO) << process_name << " finished. signature: " << signature;
+ }
+
+ // Return result to fe
+ finish_task_request->__set_backend(_backend);
+ finish_task_request->__set_report_version(_s_report_version);
+ finish_task_request->__set_task_type(task_type);
+ finish_task_request->__set_signature(signature);
+
+ std::vector<TTabletInfo> finish_tablet_infos;
+ if (status.ok()) {
+ TTabletInfo tablet_info;
+        status = _get_tablet_info(new_tablet_id, new_schema_hash, signature, &tablet_info);
+
+        if (!status.ok()) {
+            LOG(WARNING) << process_name << " success, but get new tablet info failed."
+                         << "tablet_id: " << new_tablet_id << ", schema_hash: " << new_schema_hash
+                         << ", signature: " << signature;
+ } else {
+ finish_tablet_infos.push_back(tablet_info);
+ }
+ }
+
+ if (status.ok()) {
+ finish_task_request->__set_finish_tablet_infos(finish_tablet_infos);
+ LOG(INFO) << process_name << " success. signature: " << signature;
+ error_msgs.push_back(process_name + " success");
+ } else {
+ LOG(WARNING) << process_name << " failed. signature: " << signature;
+ error_msgs.push_back(process_name + " failed");
+ error_msgs.push_back("status: " + status.to_string());
+ }
+ task_status.__set_status_code(status.code());
+ task_status.__set_error_msgs(error_msgs);
+ finish_task_request->__set_task_status(task_status);
+}
+
} // namespace doris
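The callback added above follows the pool's existing queue-plus-condition-variable pattern: block until a task is queued or shutdown is requested, pop one task under the lock, then handle it outside the lock. A minimal, self-contained sketch of that pattern (illustrative names only, not the Doris TaskWorkerPool API):

    #include <condition_variable>
    #include <deque>
    #include <functional>
    #include <iostream>
    #include <mutex>
    #include <thread>

    int main() {
        std::mutex mtx;
        std::condition_variable cv;
        std::deque<std::function<void()>> tasks;
        bool is_work = true;

        std::thread worker([&] {
            while (true) {
                std::function<void()> task;
                {
                    std::unique_lock<std::mutex> lock(mtx);
                    cv.wait(lock, [&] { return !is_work || !tasks.empty(); });
                    if (tasks.empty()) {
                        return;  // shutdown requested and queue drained
                    }
                    task = std::move(tasks.front());
                    tasks.pop_front();
                }
                task();  // run outside the lock, as the worker callbacks do
            }
        });

        {
            std::lock_guard<std::mutex> lock(mtx);
            tasks.push_back([] { std::cout << "handle STORAGE_MEDIUM_MIGRATE_V2 task\n"; });
        }
        cv.notify_one();

        {
            std::lock_guard<std::mutex> lock(mtx);
            is_work = false;
        }
        cv.notify_all();
        worker.join();
        return 0;
    }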
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index d03afb1790..e413211357 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -68,7 +68,8 @@ public:
MOVE,
RECOVER_TABLET,
UPDATE_TABLET_META_INFO,
- SUBMIT_TABLE_COMPACTION
+ SUBMIT_TABLE_COMPACTION,
+ STORAGE_MEDIUM_MIGRATE_V2
};
enum ReportType { TASK, DISK, TABLET };
@@ -124,6 +125,8 @@ public:
return "UPDATE_TABLET_META_INFO";
case SUBMIT_TABLE_COMPACTION:
return "SUBMIT_TABLE_COMPACTION";
+ case STORAGE_MEDIUM_MIGRATE_V2:
+ return "STORAGE_MEDIUM_MIGRATE_V2";
default:
return "Unknown";
}
@@ -187,6 +190,7 @@ private:
void _move_dir_thread_callback();
void _update_tablet_meta_worker_thread_callback();
void _submit_table_compaction_worker_thread_callback();
+ void _storage_medium_migrate_v2_worker_thread_callback();
     void _alter_tablet(const TAgentTaskRequest& alter_tablet_request, int64_t signature,
                        const TTaskType::type task_type, TFinishTaskRequest* finish_task_request);
@@ -203,6 +207,8 @@ private:
// random sleep 1~second seconds
void _random_sleep(int second);
+    void _storage_medium_migrate_v2(const TAgentTaskRequest& agent_task_req, int64_t signature,
+                                    const TTaskType::type task_type, TFinishTaskRequest* finish_task_request);
private:
std::string _name;
diff --git a/be/src/common/config.h b/be/src/common/config.h
index a93229cdf8..5bc62f0123 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -189,6 +189,7 @@ CONF_mInt64(column_dictionary_key_ratio_threshold, "0");
CONF_mInt64(column_dictionary_key_size_threshold, "0");
// memory_limitation_per_thread_for_schema_change_bytes unit bytes
CONF_mInt64(memory_limitation_per_thread_for_schema_change_bytes, "2147483648");
+CONF_mInt64(memory_limitation_per_thread_for_storage_migration_bytes, "100000000");
// the clean interval of file descriptor cache and segment cache
CONF_mInt32(cache_clean_interval, "1800");
diff --git a/be/src/common/status.h b/be/src/common/status.h
index ec74be3ef0..ad884cee5e 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -231,6 +231,7 @@ namespace doris {
M(OLAP_ERR_ROWSET_READ_FAILED, -3111, "", true) \
M(OLAP_ERR_ROWSET_INVALID_STATE_TRANSITION, -3112, "", true) \
M(OLAP_ERR_STRING_OVERFLOW_IN_VEC_ENGINE, -3113, "", true) \
+ M(OLAP_ERR_ROWSET_ADD_MIGRATION_V2, -3114, "", true) \
enum ErrorCode {
#define M(NAME, ERRORCODE, DESC, STACKTRACEENABLED) NAME = ERRORCODE,
diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt
index 4298d48bf9..657ebe3518 100644
--- a/be/src/olap/CMakeLists.txt
+++ b/be/src/olap/CMakeLists.txt
@@ -71,6 +71,7 @@ add_library(Olap STATIC
version_graph.cpp
schema.cpp
schema_change.cpp
+ storage_migration_v2.cpp
serialize.cpp
storage_engine.cpp
data_dir.cpp
@@ -116,6 +117,7 @@ add_library(Olap STATIC
task/engine_checksum_task.cpp
task/engine_clone_task.cpp
task/engine_storage_migration_task.cpp
+ task/engine_storage_migration_task_v2.cpp
task/engine_publish_version_task.cpp
task/engine_alter_tablet_task.cpp
column_vector.cpp
diff --git a/be/src/olap/rowset/alpha_rowset.cpp b/be/src/olap/rowset/alpha_rowset.cpp
index 80a0a5eebf..62bac2929a 100644
--- a/be/src/olap/rowset/alpha_rowset.cpp
+++ b/be/src/olap/rowset/alpha_rowset.cpp
@@ -100,7 +100,7 @@ Status AlphaRowset::link_files_to(const FilePathDesc&
dir_desc, RowsetId new_row
return Status::OK();
}
-Status AlphaRowset::copy_files_to(const std::string& dir) {
+Status AlphaRowset::copy_files_to(const std::string& dir, const RowsetId& new_rowset_id) {
for (auto& segment_group : _segment_groups) {
Status status = segment_group->copy_files_to(dir);
if (!status.ok()) {
diff --git a/be/src/olap/rowset/alpha_rowset.h b/be/src/olap/rowset/alpha_rowset.h
index d902e0ca97..73651f920f 100644
--- a/be/src/olap/rowset/alpha_rowset.h
+++ b/be/src/olap/rowset/alpha_rowset.h
@@ -49,7 +49,7 @@ public:
Status link_files_to(const FilePathDesc& dir_desc, RowsetId new_rowset_id)
override;
- Status copy_files_to(const std::string& dir) override;
+    Status copy_files_to(const std::string& dir, const RowsetId& new_rowset_id) override;
Status convert_from_old_files(const std::string& snapshot_path,
std::vector<std::string>* success_files);
diff --git a/be/src/olap/rowset/alpha_rowset_writer.cpp b/be/src/olap/rowset/alpha_rowset_writer.cpp
index 1e0e9c50a8..03cded4aab 100644
--- a/be/src/olap/rowset/alpha_rowset_writer.cpp
+++ b/be/src/olap/rowset/alpha_rowset_writer.cpp
@@ -133,6 +133,11 @@ Status
AlphaRowsetWriter::add_rowset_for_linked_schema_change(
return Status::OK();
}
+Status AlphaRowsetWriter::add_rowset_for_migration(RowsetSharedPtr rowset) {
+    LOG(WARNING) << "alpha_rowset_writer doesn't support add_rowset_for_migration";
+    return Status::NotSupported("alpha_rowset_writer doesn't support add_rowset_for_migration");
+}
+
Status AlphaRowsetWriter::flush() {
if (_writer_state == WRITER_FLUSHED) {
return Status::OK();
diff --git a/be/src/olap/rowset/alpha_rowset_writer.h b/be/src/olap/rowset/alpha_rowset_writer.h
index 590e7b517e..9928aad948 100644
--- a/be/src/olap/rowset/alpha_rowset_writer.h
+++ b/be/src/olap/rowset/alpha_rowset_writer.h
@@ -43,6 +43,7 @@ public:
Status add_rowset_for_linked_schema_change(RowsetSharedPtr rowset,
const SchemaMapping&
schema_mapping) override;
+ Status add_rowset_for_migration(RowsetSharedPtr rowset) override;
Status flush() override;
// get a rowset
diff --git a/be/src/olap/rowset/beta_rowset.cpp b/be/src/olap/rowset/beta_rowset.cpp
index b9cd3aee8a..38a85d3a72 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -132,9 +132,9 @@ Status BetaRowset::link_files_to(const FilePathDesc&
dir_desc, RowsetId new_rows
return Status::OK();
}
-Status BetaRowset::copy_files_to(const std::string& dir) {
+Status BetaRowset::copy_files_to(const std::string& dir, const RowsetId& new_rowset_id) {
for (int i = 0; i < num_segments(); ++i) {
- FilePathDesc dst_path_desc = segment_file_path(dir, rowset_id(), i);
+ FilePathDesc dst_path_desc = segment_file_path(dir, new_rowset_id, i);
Status status = Env::Default()->path_exists(dst_path_desc.filepath);
if (status.ok()) {
LOG(WARNING) << "file already exist: " << dst_path_desc.filepath;
@@ -154,7 +154,8 @@ Status BetaRowset::copy_files_to(const std::string& dir) {
return Status::OK();
}
-Status BetaRowset::upload_files_to(const FilePathDesc& dir_desc) {
+Status BetaRowset::upload_files_to(const FilePathDesc& dir_desc,
+ const RowsetId& new_rowset_id, bool delete_src) {
     std::shared_ptr<StorageBackend> storage_backend = StorageBackendMgr::instance()->
             get_storage_backend(dir_desc.storage_name);
if (storage_backend == nullptr) {
@@ -162,13 +163,12 @@ Status BetaRowset::upload_files_to(const FilePathDesc&
dir_desc) {
return Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
}
for (int i = 0; i < num_segments(); ++i) {
-        FilePathDesc dst_path_desc = segment_file_path(dir_desc, rowset_id(), i);
+        FilePathDesc dst_path_desc = segment_file_path(dir_desc, new_rowset_id, i);
Status status = storage_backend->exist(dst_path_desc.remote_path);
if (status.ok()) {
LOG(WARNING) << "file already exist: " <<
dst_path_desc.remote_path;
return Status::OLAPInternalError(OLAP_ERR_FILE_ALREADY_EXIST);
- }
- if (!status.is_not_found()) {
+ } else if (!status.is_not_found()) {
LOG(WARNING) << "file check exist error: " <<
dst_path_desc.remote_path;
return Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
}
@@ -179,6 +179,10 @@ Status BetaRowset::upload_files_to(const FilePathDesc&
dir_desc) {
<< dst_path_desc.remote_path << ", errno=" <<
Errno::no();
return Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
}
+        if (delete_src && !Env::Default()->delete_file(src_path_desc.filepath).ok()) {
+            LOG(WARNING) << "fail to delete local file: " << src_path_desc.filepath << ", errno=" << Errno::no();
+            return Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
+        }
LOG(INFO) << "succeed to upload file. from " << src_path_desc.filepath
<< " to "
<< dst_path_desc.remote_path;
}
diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h
index d7e38389bd..ffb6467c57 100644
--- a/be/src/olap/rowset/beta_rowset.h
+++ b/be/src/olap/rowset/beta_rowset.h
@@ -50,9 +50,10 @@ public:
Status link_files_to(const FilePathDesc& dir_desc, RowsetId new_rowset_id)
override;
-    Status copy_files_to(const std::string& dir) override;
+    Status copy_files_to(const std::string& dir, const RowsetId& new_rowset_id) override;
-    Status upload_files_to(const FilePathDesc& dir_desc) override;
+    Status upload_files_to(const FilePathDesc& dir_desc,
+                           const RowsetId& new_rowset_id, bool delete_src = false) override;
// only applicable to alpha rowset, no op here
Status remove_old_files(std::vector<std::string>* files_to_remove)
override {
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index 5c6b35c438..43b7c61004 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -141,6 +141,40 @@ Status BetaRowsetWriter::add_rowset_for_linked_schema_change(
     return add_rowset(rowset);
 }
+Status BetaRowsetWriter::add_rowset_for_migration(RowsetSharedPtr rowset) {
+    Status res = Status::OK();
+    assert(rowset->rowset_meta()->rowset_type() == BETA_ROWSET);
+    if (!rowset->rowset_path_desc().is_remote() && !_context.path_desc.is_remote()) {
+        res = rowset->copy_files_to(_context.path_desc.filepath, _context.rowset_id);
+        if (!res.ok()) {
+            LOG(WARNING) << "copy_files failed. src: " << rowset->rowset_path_desc().filepath
+                         << ", dest: " << _context.path_desc.filepath;
+            return res;
+        }
+    } else if (!rowset->rowset_path_desc().is_remote() && _context.path_desc.is_remote()) {
+        res = rowset->upload_files_to(_context.path_desc, _context.rowset_id);
+        if (!res.ok()) {
+            LOG(WARNING) << "upload_files failed. src: " << rowset->rowset_path_desc().debug_string()
+                         << ", dest: " << _context.path_desc.debug_string();
+            return res;
+        }
+    } else {
+        LOG(WARNING) << "add_rowset_for_migration failed. storage_medium is invalid. src: "
+                     << rowset->rowset_path_desc().debug_string() << ", dest: " << _context.path_desc.debug_string();
+        return Status::OLAPInternalError(OLAP_ERR_ROWSET_ADD_MIGRATION_V2);
+    }
+
+    _num_rows_written += rowset->num_rows();
+    _total_data_size += rowset->rowset_meta()->data_disk_size();
+    _total_index_size += rowset->rowset_meta()->index_disk_size();
+    _num_segment += rowset->num_segments();
+    // TODO update zonemap
+    if (rowset->rowset_meta()->has_delete_predicate()) {
+        _rowset_meta->set_delete_predicate(rowset->rowset_meta()->delete_predicate());
+    }
+    return Status::OK();
+}
+
Status BetaRowsetWriter::flush() {
if (_segment_writer != nullptr) {
RETURN_NOT_OK(_flush_segment_writer(&_segment_writer));
diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h
index 23f7d207c1..8f9b54b51e 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -49,6 +49,8 @@ public:
Status add_rowset_for_linked_schema_change(RowsetSharedPtr rowset,
const SchemaMapping&
schema_mapping) override;
+ Status add_rowset_for_migration(RowsetSharedPtr rowset) override;
+
Status flush() override;
// Return the file size flushed to disk in "flush_size"
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 5674c3703a..2e91f98588 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -196,9 +196,10 @@ public:
virtual Status link_files_to(const FilePathDesc& dir_desc, RowsetId
new_rowset_id) = 0;
// copy all files to `dir`
-    virtual Status copy_files_to(const std::string& dir) = 0;
+    virtual Status copy_files_to(const std::string& dir, const RowsetId& new_rowset_id) = 0;
-    virtual Status upload_files_to(const FilePathDesc& dir_desc) { return Status::OK(); }
+    virtual Status upload_files_to(const FilePathDesc& dir_desc,
+                                   const RowsetId&, bool delete_src = false) { return Status::OK(); }
virtual Status remove_old_files(std::vector<std::string>* files_to_remove)
= 0;
@@ -216,6 +217,10 @@ public:
bool contains_version(Version version) { return
rowset_meta()->version().contains(version); }
+ FilePathDesc rowset_path_desc() {
+ return _rowset_path_desc;
+ }
+
static bool comparator(const RowsetSharedPtr& left, const RowsetSharedPtr&
right) {
return left->end_version() < right->end_version();
}
diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h
index 43d1837362..6fb290e3dc 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -50,6 +50,8 @@ public:
virtual Status add_rowset_for_linked_schema_change(RowsetSharedPtr rowset,
const
SchemaMapping& schema_mapping) = 0;
+ virtual Status add_rowset_for_migration(RowsetSharedPtr rowset) = 0;
+
// explicit flush all buffered rows into segment file.
// note that `add_row` could also trigger flush when certain conditions
are met
virtual Status flush() = 0;
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 11b280be7b..742987f064 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -1461,8 +1461,8 @@ Status
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
{
std::lock_guard<std::mutex>
base_tablet_lock(base_tablet->get_push_lock());
std::lock_guard<std::mutex>
new_tablet_lock(new_tablet->get_push_lock());
- std::lock_guard<std::shared_mutex>
base_tablet_rdlock(base_tablet->get_header_lock());
- std::lock_guard<std::shared_mutex>
new_tablet_rdlock(new_tablet->get_header_lock());
+ std::lock_guard<std::shared_mutex>
base_tablet_wlock(base_tablet->get_header_lock());
+ std::lock_guard<std::shared_mutex>
new_tablet_wlock(new_tablet->get_header_lock());
// check if the tablet has alter task
// if it has alter task, it means it is under old alter process
size_t num_cols = base_tablet->tablet_schema().num_columns();
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 65359ea61d..9816b1b6b5 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -26,6 +26,7 @@
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/classification.hpp>
#include <boost/algorithm/string/split.hpp>
+#include <boost/algorithm/string/trim.hpp>
#include <cstdio>
#include <filesystem>
#include <new>
@@ -36,6 +37,7 @@
#include "agent/cgroups_mgr.h"
#include "agent/task_worker_pool.h"
#include "env/env.h"
+#include "env/env_util.h"
#include "olap/base_compaction.h"
#include "olap/cumulative_compaction.h"
#include "olap/data_dir.h"
@@ -58,6 +60,8 @@
#include "util/file_utils.h"
#include "util/pretty_printer.h"
#include "util/scoped_cleanup.h"
+#include "util/storage_backend.h"
+#include "util/storage_backend_mgr.h"
#include "util/time.h"
#include "util/trace.h"
@@ -120,6 +124,8 @@ StorageEngine::StorageEngine(const EngineOptions& options)
MemTrackerLevel::OVERVIEW)),
_schema_change_mem_tracker(MemTracker::create_tracker(
-1, "StorageEngine::SchemaChange", nullptr,
MemTrackerLevel::OVERVIEW)),
+ _storage_migration_mem_tracker(MemTracker::create_tracker(
+ -1, "StorageEngine::StorageMigration", nullptr,
MemTrackerLevel::OVERVIEW)),
_clone_mem_tracker(MemTracker::create_tracker(-1,
"StorageEngine::Clone", nullptr,
MemTrackerLevel::OVERVIEW)),
_batch_load_mem_tracker(MemTracker::create_tracker(-1,
"StorageEngine::BatchLoad",
@@ -480,7 +486,9 @@ std::vector<DataDir*>
StorageEngine::get_stores_for_create_tablet(
for (auto& it : _store_map) {
if (it.second->is_used()) {
if (_available_storage_medium_type_count == 1 ||
- it.second->storage_medium() == storage_medium) {
+ it.second->storage_medium() == storage_medium ||
+ (it.second->storage_medium() ==
TStorageMedium::REMOTE_CACHE
+ && FilePathDesc::is_remote(storage_medium))) {
stores.push_back(it.second);
}
}
@@ -684,18 +692,20 @@ Status StorageEngine::start_trash_sweep(double* usage,
bool ignore_guard) {
tmp_usage = std::max(tmp_usage, curr_usage);
Status curr_res = Status::OK();
- string snapshot_path = info.path_desc.filepath + SNAPSHOT_PREFIX;
- curr_res = _do_sweep(snapshot_path, local_now, snapshot_expire);
+ FilePathDesc snapshot_path_desc(info.path_desc.filepath +
SNAPSHOT_PREFIX);
+ curr_res = _do_sweep(snapshot_path_desc, local_now, snapshot_expire);
if (!curr_res.ok()) {
- LOG(WARNING) << "failed to sweep snapshot. path=" << snapshot_path
+ LOG(WARNING) << "failed to sweep snapshot. path=" <<
snapshot_path_desc.filepath
<< ", err_code=" << curr_res;
res = curr_res;
}
- string trash_path = info.path_desc.filepath + TRASH_PREFIX;
- curr_res = _do_sweep(trash_path, local_now, curr_usage > guard_space ?
0 : trash_expire);
+ FilePathDescStream trash_path_desc_s;
+ trash_path_desc_s << info.path_desc << TRASH_PREFIX;
+ FilePathDesc trash_path_desc = trash_path_desc_s.path_desc();
+ curr_res = _do_sweep(trash_path_desc, local_now, curr_usage >
guard_space ? 0 : trash_expire);
if (!curr_res.ok()) {
- LOG(WARNING) << "failed to sweep trash. [path=%s" << trash_path
+ LOG(WARNING) << "failed to sweep trash. path=" <<
trash_path_desc.filepath
<< ", err_code=" << curr_res;
res = curr_res;
}
@@ -804,10 +814,10 @@ void StorageEngine::_clean_unused_txns() {
}
}
-Status StorageEngine::_do_sweep(const string& scan_root, const time_t&
local_now,
+Status StorageEngine::_do_sweep(const FilePathDesc& scan_root_desc, const
time_t& local_now,
const int32_t expire) {
Status res = Status::OK();
- if (!FileUtils::check_exist(scan_root)) {
+ if (!FileUtils::check_exist(scan_root_desc.filepath)) {
// dir not existed. no need to sweep trash.
return res;
}
@@ -815,7 +825,7 @@ Status StorageEngine::_do_sweep(const string& scan_root,
const time_t& local_now
try {
// Sort pathes by name, that is by delete time.
std::vector<path> sorted_pathes;
- std::copy(directory_iterator(path(scan_root)), directory_iterator(),
+ std::copy(directory_iterator(path(scan_root_desc.filepath)),
directory_iterator(),
std::back_inserter(sorted_pathes));
std::sort(sorted_pathes.begin(), sorted_pathes.end());
for (const auto& sorted_path : sorted_pathes) {
@@ -840,10 +850,42 @@ Status StorageEngine::_do_sweep(const string& scan_root,
const time_t& local_now
string path_name = sorted_path.string();
if (difftime(local_now, mktime(&local_tm_create)) >=
actual_expire) {
+ std::string storage_name_path = path_name + "/" + STORAGE_NAME;
+ if (scan_root_desc.is_remote() &&
FileUtils::check_exist(storage_name_path)) {
+ FilePathDesc remote_path_desc = scan_root_desc;
+ if (!env_util::read_file_to_string(Env::Default(),
storage_name_path, &(remote_path_desc.storage_name)).ok()) {
+ LOG(WARNING) << "read storage_name failed: " <<
storage_name_path;
+ continue;
+ }
+ boost::algorithm::trim(remote_path_desc.storage_name);
+ std::shared_ptr<StorageBackend> storage_backend =
StorageBackendMgr::instance()->
+ get_storage_backend(remote_path_desc.storage_name);
+ // if storage_backend is nullptr, the remote storage is
invalid.
+ // Only the local path need to be removed.
+ if (storage_backend != nullptr) {
+ std::string remote_root_path;
+ if (!StorageBackendMgr::instance()->get_root_path(
+ remote_path_desc.storage_name,
&remote_root_path)) {
+ LOG(WARNING) << "read storage root_path failed: "
<< remote_path_desc.storage_name;
+ continue;
+ }
+ remote_path_desc.remote_path = remote_root_path +
TRASH_PREFIX;
+ std::filesystem::path local_path(path_name);
+ std::stringstream remote_file_stream;
+ remote_file_stream << remote_path_desc.remote_path <<
"/" << local_path.filename().string();
+ Status ret =
storage_backend->rmdir(remote_file_stream.str());
+ if (!ret.ok()) {
+ LOG(WARNING) << "fail to remove file or directory.
path=" << remote_file_stream.str()
+ << ", error=" << ret.to_string();
+ res = Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
+ continue;
+ }
+ }
+ }
Status ret = FileUtils::remove_all(path_name);
if (!ret.ok()) {
- LOG(WARNING) << "fail to remove file or directory. path="
<< path_name
- << ", error=" << ret.to_string();
+ LOG(WARNING) << "fail to remove file or directory.
path_desc: "
+ << scan_root_desc.debug_string() << ",
error=" << ret.to_string();
res = Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
continue;
}
@@ -853,7 +895,7 @@ Status StorageEngine::_do_sweep(const string& scan_root,
const time_t& local_now
}
}
} catch (...) {
- LOG(WARNING) << "Exception occur when scan directory. path=" <<
scan_root;
+ LOG(WARNING) << "Exception occur when scan directory. path_desc=" <<
scan_root_desc.debug_string();
res = Status::OLAPInternalError(OLAP_ERR_IO_ERROR);
}
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 0cc0076007..5b6760252f 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -182,6 +182,7 @@ public:
std::shared_ptr<MemTracker> compaction_mem_tracker() { return
_compaction_mem_tracker; }
std::shared_ptr<MemTracker> tablet_mem_tracker() { return
_tablet_mem_tracker; }
std::shared_ptr<MemTracker> schema_change_mem_tracker() { return
_schema_change_mem_tracker; }
+ std::shared_ptr<MemTracker> storage_migration_mem_tracker() { return
_storage_migration_mem_tracker; }
std::shared_ptr<MemTracker> clone_mem_tracker() { return
_clone_mem_tracker; }
std::shared_ptr<MemTracker> batch_load_mem_tracker() { return
_batch_load_mem_tracker; }
std::shared_ptr<MemTracker> consistency_mem_tracker() { return
_consistency_mem_tracker; }
@@ -214,7 +215,7 @@ private:
void _clean_unused_rowset_metas();
- Status _do_sweep(const std::string& scan_root, const time_t& local_tm_now,
+ Status _do_sweep(const FilePathDesc& scan_root_desc, const time_t&
local_tm_now,
const int32_t expire);
// All these xxx_callback() functions are for Background threads
@@ -327,6 +328,8 @@ private:
std::shared_ptr<MemTracker> _tablet_mem_tracker;
// Count the memory consumption of all SchemaChange tasks.
std::shared_ptr<MemTracker> _schema_change_mem_tracker;
+ // Count the memory consumption of all StorageMigration tasks.
+ std::shared_ptr<MemTracker> _storage_migration_mem_tracker;
// Count the memory consumption of all EngineCloneTask.
// Note: Memory that does not contain make/release snapshots.
std::shared_ptr<MemTracker> _clone_mem_tracker;
diff --git a/be/src/olap/storage_migration_v2.cpp b/be/src/olap/storage_migration_v2.cpp
new file mode 100644
index 0000000000..a0c5716bc5
--- /dev/null
+++ b/be/src/olap/storage_migration_v2.cpp
@@ -0,0 +1,464 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/storage_migration_v2.h"
+
+#include <pthread.h>
+#include <signal.h>
+
+#include <algorithm>
+#include <vector>
+#include "rapidjson/document.h"
+#include "rapidjson/prettywriter.h"
+#include "rapidjson/stringbuffer.h"
+
+#include "agent/cgroups_mgr.h"
+#include "common/resource_tls.h"
+#include "env/env_util.h"
+#include "olap/merger.h"
+#include "olap/row.h"
+#include "olap/row_block.h"
+#include "olap/row_cursor.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/rowset_id_generator.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet.h"
+#include "olap/wrapper_field.h"
+#include "runtime/exec_env.h"
+#include "runtime/thread_context.h"
+#include "util/defer_op.h"
+
+using std::deque;
+using std::list;
+using std::nothrow;
+using std::pair;
+using std::string;
+using std::stringstream;
+using std::vector;
+
+namespace doris {
+
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(storage_migration_mem_consumption,
MetricUnit::BYTES, "",
+ mem_consumption, Labels({{"type",
"storage_migration"}}));
+
+
+StorageMigrationV2Handler::StorageMigrationV2Handler()
+ : _mem_tracker(MemTracker::create_tracker(
+ -1, "StorageMigrationV2Handler",
StorageEngine::instance()->storage_migration_mem_tracker())) {
+ REGISTER_HOOK_METRIC(storage_migration_mem_consumption,
+ [this]() { return _mem_tracker->consumption(); });
+}
+
+StorageMigrationV2Handler::~StorageMigrationV2Handler() {
+ DEREGISTER_HOOK_METRIC(storage_migration_mem_consumption);
+}
+
+Status StorageMigrationV2Handler::process_storage_migration_v2(const
TStorageMigrationReqV2& request) {
+ LOG(INFO) << "begin to do request storage_migration: base_tablet_id=" <<
request.base_tablet_id
+ << ", new_tablet_id=" << request.new_tablet_id
+ << ", migration_version=" << request.migration_version;
+
+ TabletSharedPtr base_tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(
+ request.base_tablet_id, request.base_schema_hash);
+ if (base_tablet == nullptr) {
+ LOG(WARNING) << "fail to find base tablet. base_tablet=" <<
request.base_tablet_id;
+ return Status::OLAPInternalError(OLAP_ERR_TABLE_NOT_FOUND);
+ }
+ // Lock schema_change_lock util schema change info is stored in tablet
header
+ std::unique_lock<std::mutex>
schema_change_lock(base_tablet->get_schema_change_lock(), std::try_to_lock);
+ if (!schema_change_lock.owns_lock()) {
+ LOG(WARNING) << "failed to obtain schema change lock. "
+ << "base_tablet=" << request.base_tablet_id;
+ return Status::OLAPInternalError(OLAP_ERR_TRY_LOCK_FAILED);
+ }
+
+ Status res = _do_process_storage_migration_v2(request);
+ LOG(INFO) << "finished storage_migration process, res=" << res;
+ return res;
+}
+
+Status StorageMigrationV2Handler::_do_process_storage_migration_v2(const
TStorageMigrationReqV2& request) {
+ Status res = Status::OK();
+ TabletSharedPtr base_tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(
+ request.base_tablet_id, request.base_schema_hash);
+ if (base_tablet == nullptr) {
+ LOG(WARNING) << "fail to find base tablet. base_tablet=" <<
request.base_tablet_id
+ << ", base_schema_hash=" << request.base_schema_hash;
+ return Status::OLAPInternalError(OLAP_ERR_TABLE_NOT_FOUND);
+ }
+
+ // new tablet has to exist
+ TabletSharedPtr new_tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(
+ request.new_tablet_id, request.new_schema_hash);
+ if (new_tablet == nullptr) {
+ LOG(WARNING) << "fail to find new tablet."
+ << " new_tablet=" << request.new_tablet_id
+ << ", new_schema_hash=" << request.new_schema_hash;
+ return Status::OLAPInternalError(OLAP_ERR_TABLE_NOT_FOUND);
+ }
+
+ // check if tablet's state is not_ready, if it is ready, it means the
tablet already finished
+ // check whether the tablet's max continuous version == request.version
+ if (new_tablet->tablet_state() != TABLET_NOTREADY) {
+ res = _validate_migration_result(new_tablet, request);
+ LOG(INFO) << "tablet's state=" << new_tablet->tablet_state()
+ << " the convert job already finished, check its version"
+ << " res=" << res;
+ return res;
+ }
+
+ LOG(INFO) << "finish to validate storage_migration request. begin to
migrate data from base tablet "
+ "to new tablet"
+ << " base_tablet=" << base_tablet->full_name()
+ << " new_tablet=" << new_tablet->full_name();
+
+ std::shared_lock base_migration_rlock(base_tablet->get_migration_lock(),
std::try_to_lock);
+ if (!base_migration_rlock.owns_lock()) {
+ return Status::OLAPInternalError(OLAP_ERR_RWLOCK_ERROR);
+ }
+ std::shared_lock new_migration_rlock(new_tablet->get_migration_lock(),
std::try_to_lock);
+ if (!new_migration_rlock.owns_lock()) {
+ return Status::OLAPInternalError(OLAP_ERR_RWLOCK_ERROR);
+ }
+
+ std::vector<Version> versions_to_be_changed;
+ std::vector<RowsetReaderSharedPtr> rs_readers;
+ // delete handlers for new tablet
+ DeleteHandler delete_handler;
+ std::vector<ColumnId> return_columns;
+
+ // begin to find deltas to convert from base tablet to new tablet so that
+ // obtain base tablet and new tablet's push lock and header write lock to
prevent loading data
+ {
+ std::lock_guard<std::mutex>
base_tablet_lock(base_tablet->get_push_lock());
+ std::lock_guard<std::mutex>
new_tablet_lock(new_tablet->get_push_lock());
+ std::lock_guard<std::shared_mutex>
base_tablet_wlock(base_tablet->get_header_lock());
+ std::lock_guard<std::shared_mutex>
new_tablet_wlock(new_tablet->get_header_lock());
+ // check if the tablet has alter task
+ // if it has alter task, it means it is under old alter process
+ size_t num_cols = base_tablet->tablet_schema().num_columns();
+ return_columns.resize(num_cols);
+ for (int i = 0; i < num_cols; ++i) {
+ return_columns[i] = i;
+ }
+
+ // reader_context is stack variables, it's lifetime should keep the
same
+ // with rs_readers
+ RowsetReaderContext reader_context;
+ reader_context.reader_type = READER_ALTER_TABLE;
+ reader_context.tablet_schema = &base_tablet->tablet_schema();
+ reader_context.need_ordered_result = true;
+ reader_context.delete_handler = &delete_handler;
+ reader_context.return_columns = &return_columns;
+ // for schema change, seek_columns is the same to return_columns
+ reader_context.seek_columns = &return_columns;
+ reader_context.sequence_id_idx =
reader_context.tablet_schema->sequence_col_idx();
+
+ do {
+ // get history data to be converted and it will check if there is
hold in base tablet
+ res = _get_versions_to_be_changed(base_tablet,
&versions_to_be_changed);
+ if (!res.ok()) {
+ LOG(WARNING) << "fail to get version to be changed. res=" <<
res;
+ break;
+ }
+
+ // should check the max_version >= request.migration_version, if
not the convert is useless
+ RowsetSharedPtr max_rowset =
base_tablet->rowset_with_max_version();
+ if (max_rowset == nullptr || max_rowset->end_version() <
request.migration_version) {
+ LOG(WARNING) << "base tablet's max version="
+ << (max_rowset == nullptr ? 0 :
max_rowset->end_version())
+ << " is less than request version=" <<
request.migration_version;
+ res = Status::OLAPInternalError(OLAP_ERR_VERSION_NOT_EXIST);
+ break;
+ }
+ // before calculating version_to_be_changed,
+ // remove all data from new tablet, prevent to rewrite data(those
double pushed when wait)
+ LOG(INFO) << "begin to remove all data from new tablet to prevent
rewrite."
+ << " new_tablet=" << new_tablet->full_name();
+ std::vector<RowsetSharedPtr> rowsets_to_delete;
+ std::vector<std::pair<Version, RowsetSharedPtr>> version_rowsets;
+ new_tablet->acquire_version_and_rowsets(&version_rowsets);
+ for (auto& pair : version_rowsets) {
+ if (pair.first.second <= max_rowset->end_version()) {
+ rowsets_to_delete.push_back(pair.second);
+ }
+ }
+ std::vector<RowsetSharedPtr> empty_vec;
+ new_tablet->modify_rowsets(empty_vec, rowsets_to_delete);
+ // inherit cumulative_layer_point from base_tablet
+ // check if new_tablet.ce_point > base_tablet.ce_point?
+ new_tablet->set_cumulative_layer_point(-1);
+ // save tablet meta
+ new_tablet->save_meta();
+ for (auto& rowset : rowsets_to_delete) {
+ // do not call rowset.remove directly, using gc thread to
delete it
+ StorageEngine::instance()->add_unused_rowset(rowset);
+ }
+
+ // init one delete handler
+ int32_t end_version = -1;
+ for (auto& version : versions_to_be_changed) {
+ if (version.second > end_version) {
+ end_version = version.second;
+ }
+ }
+
+ res = delete_handler.init(base_tablet->tablet_schema(),
base_tablet->delete_predicates(),
+ end_version);
+ if (!res.ok()) {
+ LOG(WARNING) << "init delete handler failed. base_tablet=" <<
base_tablet->full_name()
+ << ", end_version=" << end_version;
+
+ // release delete handlers which have been inited successfully.
+ delete_handler.finalize();
+ break;
+ }
+
+ // acquire data sources correspond to history versions
+ base_tablet->capture_rs_readers(versions_to_be_changed,
&rs_readers);
+ if (rs_readers.size() < 1) {
+ LOG(WARNING) << "fail to acquire all data sources. "
+ << "version_num=" << versions_to_be_changed.size()
+ << ", data_source_num=" << rs_readers.size();
+ res =
Status::OLAPInternalError(OLAP_ERR_ALTER_DELTA_DOES_NOT_EXISTS);
+ break;
+ }
+
+ for (auto& rs_reader : rs_readers) {
+ res = rs_reader->init(&reader_context);
+ if (!res.ok()) {
+ LOG(WARNING) << "failed to init rowset reader: " <<
base_tablet->full_name();
+ break;
+ }
+ }
+
+ } while (0);
+ }
+
+ do {
+ if (!res.ok()) {
+ break;
+ }
+ StorageMigrationParams sm_params;
+ sm_params.base_tablet = base_tablet;
+ sm_params.new_tablet = new_tablet;
+ sm_params.ref_rowset_readers = rs_readers;
+ sm_params.delete_handler = &delete_handler;
+
+ res = _convert_historical_rowsets(sm_params);
+ if (!res.ok()) {
+ break;
+ }
+ // set state to ready
+ std::lock_guard<std::shared_mutex>
new_wlock(new_tablet->get_header_lock());
+ res = new_tablet->set_tablet_state(TabletState::TABLET_RUNNING);
+ if (!res.ok()) {
+ break;
+ }
+ new_tablet->save_meta();
+ } while (0);
+
+ if (res.ok()) {
+ // _validate_migration_result should be outside the above while loop.
+ // to avoid requiring the header lock twice.
+ res = _validate_migration_result(new_tablet, request);
+ }
+
+ // if failed convert history data, then just remove the new tablet
+ if (!res.ok()) {
+ LOG(WARNING) << "failed to alter tablet. base_tablet=" <<
base_tablet->full_name()
+ << ", drop new_tablet=" << new_tablet->full_name();
+ // do not drop the new tablet and its data. GC thread will
+ }
+
+ return res;
+}
+
+Status StorageMigrationV2Handler::_get_versions_to_be_changed(
+ TabletSharedPtr base_tablet, std::vector<Version>*
versions_to_be_changed) {
+ RowsetSharedPtr rowset = base_tablet->rowset_with_max_version();
+ if (rowset == nullptr) {
+ LOG(WARNING) << "Tablet has no version. base_tablet=" <<
base_tablet->full_name();
+ return Status::OLAPInternalError(OLAP_ERR_ALTER_DELTA_DOES_NOT_EXISTS);
+ }
+
+ std::vector<Version> span_versions;
+ RETURN_NOT_OK(base_tablet->capture_consistent_versions(Version(0,
rowset->version().second),
+ &span_versions));
+ versions_to_be_changed->insert(versions_to_be_changed->end(),
span_versions.begin(),
+ span_versions.end());
+
+ return Status::OK();
+}
+
+Status StorageMigrationV2Handler::_convert_historical_rowsets(const
StorageMigrationParams& sm_params) {
+ LOG(INFO) << "begin to convert historical rowsets for new_tablet from
base_tablet."
+ << " base_tablet=" << sm_params.base_tablet->full_name()
+ << ", new_tablet=" << sm_params.new_tablet->full_name();
+
+ // find end version
+ int32_t end_version = -1;
+ for (size_t i = 0; i < sm_params.ref_rowset_readers.size(); ++i) {
+ if (sm_params.ref_rowset_readers[i]->version().second > end_version) {
+ end_version = sm_params.ref_rowset_readers[i]->version().second;
+ }
+ }
+
+ Status res = Status::OK();
+ for (auto& rs_reader : sm_params.ref_rowset_readers) {
+ VLOG_TRACE << "begin to convert a history rowset. version=" <<
rs_reader->version().first
+ << "-" << rs_reader->version().second;
+
+ TabletSharedPtr new_tablet = sm_params.new_tablet;
+
+ RowsetWriterContext writer_context;
+ writer_context.rowset_id = StorageEngine::instance()->next_rowset_id();
+ writer_context.tablet_uid = new_tablet->tablet_uid();
+ writer_context.tablet_id = new_tablet->tablet_id();
+ writer_context.partition_id = new_tablet->partition_id();
+ writer_context.tablet_schema_hash = new_tablet->schema_hash();
+ // linked schema change can't change rowset type, therefore we
preserve rowset type in schema change now
+ writer_context.rowset_type =
rs_reader->rowset()->rowset_meta()->rowset_type();
+ if (sm_params.new_tablet->tablet_meta()->preferred_rowset_type() ==
BETA_ROWSET) {
+ writer_context.rowset_type = BETA_ROWSET;
+ }
+ writer_context.path_desc = new_tablet->tablet_path_desc();
+ writer_context.tablet_schema = &(new_tablet->tablet_schema());
+ writer_context.rowset_state = VISIBLE;
+ writer_context.version = rs_reader->version();
+ writer_context.segments_overlap =
rs_reader->rowset()->rowset_meta()->segments_overlap();
+
+ std::unique_ptr<RowsetWriter> rowset_writer;
+ Status status = RowsetFactory::create_rowset_writer(writer_context,
&rowset_writer);
+ if (!status.ok()) {
+ res = Status::OLAPInternalError(OLAP_ERR_ROWSET_BUILDER_INIT);
+ goto PROCESS_ALTER_EXIT;
+ }
+
+ if ((res =
_generate_rowset_writer(sm_params.base_tablet->tablet_path_desc(),
+ sm_params.new_tablet->tablet_path_desc(),
+ rs_reader, rowset_writer.get(), new_tablet)) != OLAP_SUCCESS) {
+ LOG(WARNING) << "failed to add_rowset. version=" <<
rs_reader->version().first << "-"
+ << rs_reader->version().second;
+ new_tablet->data_dir()->remove_pending_ids(
+ ROWSET_ID_PREFIX + rowset_writer->rowset_id().to_string());
+ goto PROCESS_ALTER_EXIT;
+ }
+ new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
+
rowset_writer->rowset_id().to_string());
+ // Add the new version of the data to the header
+ // In order to prevent the occurrence of deadlock, we must first lock
the old table, and then lock the new table
+ std::lock_guard<std::mutex>
lock(sm_params.new_tablet->get_push_lock());
+ RowsetSharedPtr new_rowset = rowset_writer->build();
+ if (new_rowset == nullptr) {
+ LOG(WARNING) << "failed to build rowset, exit alter process";
+ goto PROCESS_ALTER_EXIT;
+ }
+ res = sm_params.new_tablet->add_rowset(new_rowset, false);
+ if (res ==
Status::OLAPInternalError(OLAP_ERR_PUSH_VERSION_ALREADY_EXIST)) {
+ LOG(WARNING) << "version already exist, version revert occurred. "
+ << "tablet=" << sm_params.new_tablet->full_name() <<
", version='"
+ << rs_reader->version().first << "-" <<
rs_reader->version().second;
+ StorageEngine::instance()->add_unused_rowset(new_rowset);
+ res = Status::OK();
+ } else if (!res.ok()) {
+ LOG(WARNING) << "failed to register new version. "
+ << " tablet=" << sm_params.new_tablet->full_name()
+ << ", version=" << rs_reader->version().first << "-"
+ << rs_reader->version().second;
+ StorageEngine::instance()->add_unused_rowset(new_rowset);
+ goto PROCESS_ALTER_EXIT;
+ } else {
+ VLOG_NOTICE << "register new version. tablet=" <<
sm_params.new_tablet->full_name()
+ << ", version=" << rs_reader->version().first << "-"
+ << rs_reader->version().second;
+ }
+
+ VLOG_TRACE << "succeed to convert a history version."
+ << " version=" << rs_reader->version().first << "-"
+ << rs_reader->version().second;
+ }
+ // XXX:The SchemaChange state should not be canceled at this time, because
the new Delta has to be converted to the old and new Schema version
+ PROCESS_ALTER_EXIT : {
+ // save tablet meta here because rowset meta is not saved during add rowset
+ std::lock_guard<std::shared_mutex>
new_wlock(sm_params.new_tablet->get_header_lock());
+ sm_params.new_tablet->save_meta();
+}
+ if (res.ok()) {
+ Version test_version(0, end_version);
+ res = sm_params.new_tablet->check_version_integrity(test_version);
+ }
+
+ LOG(INFO) << "finish converting rowsets for new_tablet from base_tablet. "
+ << "base_tablet=" << sm_params.base_tablet->full_name()
+ << ", new_tablet=" << sm_params.new_tablet->full_name();
+ return res;
+}
+
+Status StorageMigrationV2Handler::_validate_migration_result(TabletSharedPtr
new_tablet,
+ const
TStorageMigrationReqV2& request) {
+ Version max_continuous_version = {-1, 0};
+ new_tablet->max_continuous_version_from_beginning(&max_continuous_version);
+ LOG(INFO) << "find max continuous version of tablet=" <<
new_tablet->full_name()
+ << ", start_version=" << max_continuous_version.first
+ << ", end_version=" << max_continuous_version.second;
+ if (max_continuous_version.second < request.migration_version) {
+ return Status::OLAPInternalError(OLAP_ERR_VERSION_NOT_EXIST);
+ }
+
+ std::vector<std::pair<Version, RowsetSharedPtr>> version_rowsets;
+ {
+ std::shared_lock rdlock(new_tablet->get_header_lock(),
std::try_to_lock);
+ new_tablet->acquire_version_and_rowsets(&version_rowsets);
+ }
+ for (auto& pair : version_rowsets) {
+ RowsetSharedPtr rowset = pair.second;
+ if (!rowset->check_file_exist()) {
+ return Status::OLAPInternalError(OLAP_ERR_FILE_NOT_EXIST);
+ }
+ }
+ return Status::OK();
+}
+
+Status StorageMigrationV2Handler::_generate_rowset_writer(
+ const FilePathDesc& src_desc, const FilePathDesc& dst_desc,
+ RowsetReaderSharedPtr rowset_reader, RowsetWriter* new_rowset_writer,
TabletSharedPtr new_tablet) {
+ if (!src_desc.is_remote() && dst_desc.is_remote()) {
+ string remote_file_param_path = dst_desc.filepath + REMOTE_FILE_PARAM;
+ rapidjson::StringBuffer strbuf;
+ rapidjson::PrettyWriter <rapidjson::StringBuffer> writer(strbuf);
+ writer.StartObject();
+ writer.Key(TABLET_UID.c_str());
+ writer.String(TabletUid(new_tablet->tablet_uid()).to_string().c_str());
+ writer.Key(STORAGE_NAME.c_str());
+ writer.String(dst_desc.storage_name.c_str());
+ writer.EndObject();
+        Status st = env_util::write_string_to_file(
+                Env::Default(), Slice(std::string(strbuf.GetString())), remote_file_param_path);
+        // strbuf.GetString() format: {"tablet_uid": "a84cfb67d3ad3d62-87fd8b3ae9bdad84", "storage_name": "s3_name"}
+        if (!st.ok()) {
+            LOG(WARNING) << "fail to write tablet_uid and storage_name. path=" << remote_file_param_path
+                         << ", error:" << st.to_string();
+            return Status::OLAPInternalError(OLAP_ERR_COPY_FILE_ERROR);
+        }
+        LOG(INFO) << "write storage_param successfully: " << remote_file_param_path;
+    }
+
+    return new_rowset_writer->add_rowset_for_migration(rowset_reader->rowset());
+}
+
+} // namespace doris
diff --git a/be/src/olap/storage_migration_v2.h b/be/src/olap/storage_migration_v2.h
new file mode 100644
index 0000000000..47ca08d7e3
--- /dev/null
+++ b/be/src/olap/storage_migration_v2.h
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_OLAP_STORAGE_MIGRATION_V2_H
+#define DORIS_BE_SRC_OLAP_STORAGE_MIGRATION_V2_H
+
+#include <deque>
+#include <functional>
+#include <queue>
+#include <utility>
+#include <vector>
+
+#include "gen_cpp/AgentService_types.h"
+#include "olap/column_mapping.h"
+#include "olap/delete_handler.h"
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/rowset_writer.h"
+#include "olap/tablet.h"
+
+namespace doris {
+
+class StorageMigrationV2Handler {
+public:
+ static StorageMigrationV2Handler* instance() {
+ static StorageMigrationV2Handler instance;
+ return &instance;
+ }
+
+ // schema change v2, it will not set alter task in base tablet
+ Status process_storage_migration_v2(const TStorageMigrationReqV2& request);
+
+private:
+
+ Status _get_versions_to_be_changed(TabletSharedPtr base_tablet,
+ std::vector<Version>*
versions_to_be_changed);
+
+ struct StorageMigrationParams {
+ TabletSharedPtr base_tablet;
+ TabletSharedPtr new_tablet;
+ std::vector<RowsetReaderSharedPtr> ref_rowset_readers;
+ DeleteHandler* delete_handler = nullptr;
+ };
+
+ Status _do_process_storage_migration_v2(const TStorageMigrationReqV2&
request);
+
+ Status _validate_migration_result(TabletSharedPtr new_tablet, const
TStorageMigrationReqV2& request);
+
+ Status _convert_historical_rowsets(const StorageMigrationParams&
sm_params);
+
+ Status _generate_rowset_writer(
+ const FilePathDesc& src_desc, const FilePathDesc& dst_desc,
+ RowsetReaderSharedPtr rowset_reader, RowsetWriter*
new_rowset_writer, TabletSharedPtr new_tablet);
+
+private:
+ StorageMigrationV2Handler();
+ virtual ~StorageMigrationV2Handler();
+ StorageMigrationV2Handler(const StorageMigrationV2Handler&) = delete;
+ StorageMigrationV2Handler& operator=(const StorageMigrationV2Handler&) =
delete;
+
+ std::shared_ptr<MemTracker> _mem_tracker;
+};
+
+} // namespace doris
+
+#endif // DORIS_BE_SRC_OLAP_STORAGE_MIGRATION_V2_H
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 31624dee34..e0d9068a86 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1279,7 +1279,11 @@ void Tablet::build_tablet_report_info(TTabletInfo*
tablet_info) {
// Useless but it is a required filed in TTabletInfo
tablet_info->version_hash = 0;
tablet_info->__set_partition_id(_tablet_meta->partition_id());
- tablet_info->__set_storage_medium(_data_dir->storage_medium());
+ if (FilePathDesc::is_remote(_data_dir->storage_medium())) {
+
tablet_info->__set_storage_medium(fs::fs_util::get_t_storage_medium(_tablet_meta->storage_medium()));
+ } else {
+ tablet_info->__set_storage_medium(_data_dir->storage_medium());
+ }
tablet_info->__set_version_count(_tablet_meta->version_count());
tablet_info->__set_path_hash(_data_dir->path_hash());
tablet_info->__set_is_in_memory(_tablet_meta->tablet_schema().is_in_memory());
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index de1a1f9cc7..4e8019e5c0 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -28,6 +28,9 @@
#include <cstdio>
#include <cstdlib>
#include <filesystem>
+#include "rapidjson/document.h"
+#include "rapidjson/prettywriter.h"
+#include "rapidjson/stringbuffer.h"
#include "env/env.h"
#include "env/env_util.h"
@@ -220,6 +223,20 @@ Status TabletManager::create_tablet(const
TCreateTabletReq& request,
int64_t tablet_id = request.tablet_id;
LOG(INFO) << "begin to create tablet. tablet_id=" << tablet_id;
+ if (FilePathDesc::is_remote(request.storage_medium)) {
+ FilePathDesc path_desc;
+ path_desc.storage_medium = request.storage_medium;
+ path_desc.storage_name = request.storage_param.storage_name;
+ StorageParamPB storage_param;
+ Status st =
StorageBackendMgr::instance()->get_storage_param(request.storage_param.storage_name,
&storage_param);
+ if (!st.ok() || storage_param.DebugString() !=
fs::fs_util::get_storage_param_pb(request.storage_param).DebugString()) {
+ LOG(INFO) << "remote storage need to change, create it.
storage_name: " << request.storage_param.storage_name;
+
RETURN_NOT_OK_STATUS_WITH_WARN(StorageBackendMgr::instance()->create_remote_storage(
+ fs::fs_util::get_storage_param_pb(request.storage_param)),
+ "remote storage create failed. storage_name: " +
request.storage_param.storage_name);
+ }
+ }
+
std::lock_guard<std::shared_mutex>
wrlock(_get_tablets_shard_lock(tablet_id));
TRACE("got tablets shard lock");
// Make create_tablet operation to be idempotent:
@@ -250,8 +267,11 @@ Status TabletManager::create_tablet(const
TCreateTabletReq& request,
// If we are doing schema-change, we should use the same data dir
// TODO(lingbin): A litter trick here, the directory should be
determined before
// entering this method
- stores.clear();
- stores.push_back(base_tablet->data_dir());
+ if (request.storage_medium ==
base_tablet->data_dir()->path_desc().storage_medium
+ || (FilePathDesc::is_remote(request.storage_medium) &&
base_tablet->data_dir()->is_remote())) {
+ stores.clear();
+ stores.push_back(base_tablet->data_dir());
+ }
}
// set alter type to schema-change. it is useless
@@ -1014,8 +1034,49 @@ void
TabletManager::try_delete_unused_tablet_path(DataDir* data_dir, TTabletId t
if (Env::Default()->path_exists(schema_hash_path).ok()) {
LOG(INFO) << "start to move tablet to trash. tablet_path = " <<
schema_hash_path;
FilePathDesc segment_desc(schema_hash_path);
+ string remote_file_param_path = schema_hash_path + REMOTE_FILE_PARAM;
+ if (data_dir->is_remote() &&
FileUtils::check_exist(remote_file_param_path)) {
+            // this means the remote files for this segment must be removed first
+ string json_buf;
+ Status s = env_util::read_file_to_string(Env::Default(),
remote_file_param_path, &json_buf);
+ if (!s.ok()) {
+ LOG(WARNING) << "delete unused file error when read
remote_file_param_path: "
+ << remote_file_param_path;
+ return;
+ }
+ // json_buf format: {"tablet_uid":
"a84cfb67d3ad3d62-87fd8b3ae9bdad84", "storage_name": "s3_name"}
+            std::string storage_name;
+            std::string tablet_uid;
+ rapidjson::Document dom;
+ if (!dom.Parse(json_buf.c_str()).HasParseError()) {
+ if (dom.HasMember(TABLET_UID.c_str()) &&
dom[TABLET_UID.c_str()].IsString()
+ && dom.HasMember(STORAGE_NAME.c_str()) &&
dom[STORAGE_NAME.c_str()].IsString()) {
+ storage_name = dom[STORAGE_NAME.c_str()].GetString();
+ tablet_uid = dom[TABLET_UID.c_str()].GetString();
+ }
+ }
+ if (!tablet_uid.empty() && !storage_name.empty()) {
+ segment_desc.storage_name = storage_name;
+ StorageParamPB storage_param;
+                if (!StorageBackendMgr::instance()->get_storage_param(storage_name, &storage_param).ok()) {
+ LOG(WARNING) << "storage_name is invalid: " <<
storage_name;
+ return;
+ }
+
+                // remote files may exist; check and move them to trash
+ std::filesystem::path local_segment_path(schema_hash_path);
+ std::stringstream remote_file_stream;
+                remote_file_stream << data_dir->path_desc().remote_path << DATA_PREFIX
+                        << "/" << local_segment_path.parent_path().parent_path().filename().string()  // shard
+                        << "/" << local_segment_path.parent_path().filename().string()  // tablet_path
+                        << "/" << local_segment_path.filename().string()  // segment_path
+ << "/" << tablet_uid;
+ segment_desc.storage_medium =
fs::fs_util::get_t_storage_medium(storage_param.storage_medium());
+ segment_desc.remote_path = remote_file_stream.str();
+ }
+ }
Status rm_st = data_dir->move_to_trash(segment_desc);
- if (rm_st != Status::OK()) {
+ if (!rm_st.ok()) {
LOG(WARNING) << "fail to move dir to trash. dir=" <<
schema_hash_path;
} else {
LOG(INFO) << "move path " << schema_hash_path << " to trash
successfully";
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 060119cae1..68f8ca9eb2 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -406,6 +406,7 @@ void TabletMeta::init_from_pb(const TabletMetaPB&
tablet_meta_pb) {
}
_remote_storage_name = tablet_meta_pb.remote_storage_name();
+ _storage_medium = tablet_meta_pb.storage_medium();
}
void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
@@ -450,6 +451,10 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
if (_preferred_rowset_type == BETA_ROWSET) {
tablet_meta_pb->set_preferred_rowset_type(_preferred_rowset_type);
}
+
+ tablet_meta_pb->set_remote_storage_name(_remote_storage_name);
+ tablet_meta_pb->set_storage_medium(_storage_medium);
+
}
uint32_t TabletMeta::mem_size() const {
@@ -663,6 +668,8 @@ bool operator==(const TabletMeta& a, const TabletMeta& b) {
}
if (a._in_restore_mode != b._in_restore_mode) return false;
if (a._preferred_rowset_type != b._preferred_rowset_type) return false;
+ if (a._storage_medium != b._storage_medium) return false;
+ if (a._remote_storage_name != b._remote_storage_name) return false;
return true;
}
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 73dfc68743..bc883c103b 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -175,6 +175,10 @@ public:
return _remote_storage_name;
}
+ StorageMediumPB storage_medium() const {
+ return _storage_medium;
+ }
+
private:
Status _save_meta(DataDir* data_dir);
void _init_column_from_tcolumn(uint32_t unique_id, const TColumn& tcolumn,
ColumnPB* column);
@@ -209,6 +213,7 @@ private:
bool _in_restore_mode = false;
RowsetTypePB _preferred_rowset_type = BETA_ROWSET;
std::string _remote_storage_name;
+    StorageMediumPB _storage_medium = StorageMediumPB::HDD;
std::shared_mutex _meta_lock;
};
diff --git a/be/src/olap/task/engine_storage_migration_task.cpp
b/be/src/olap/task/engine_storage_migration_task.cpp
index 2e55fbb6e1..3af4e189b7 100644
--- a/be/src/olap/task/engine_storage_migration_task.cpp
+++ b/be/src/olap/task/engine_storage_migration_task.cpp
@@ -341,7 +341,7 @@ Status
EngineStorageMigrationTask::_copy_index_and_data_files(
const string& full_path, const std::vector<RowsetSharedPtr>&
consistent_rowsets) const {
Status status = Status::OK();
for (const auto& rs : consistent_rowsets) {
- status = rs->copy_files_to(full_path);
+ status = rs->copy_files_to(full_path, rs->rowset_id());
if (!status.ok()) {
Status ret = FileUtils::remove_all(full_path);
if (!ret.ok()) {
diff --git a/be/src/olap/task/engine_storage_migration_task_v2.cpp
b/be/src/olap/task/engine_storage_migration_task_v2.cpp
new file mode 100644
index 0000000000..5e865c9849
--- /dev/null
+++ b/be/src/olap/task/engine_storage_migration_task_v2.cpp
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/task/engine_storage_migration_task_v2.h"
+
+#include "olap/storage_migration_v2.h"
+#include "runtime/mem_tracker.h"
+
+namespace doris {
+
+using std::to_string;
+
+EngineStorageMigrationTaskV2::EngineStorageMigrationTaskV2(const
TStorageMigrationReqV2& request)
+ : _storage_migration_req(request) {
+ _mem_tracker = MemTracker::create_tracker(
+ config::memory_limitation_per_thread_for_storage_migration_bytes,
+ fmt::format("EngineStorageMigrationTaskV2: {}-{}",
+ std::to_string(_storage_migration_req.base_tablet_id),
+ std::to_string(_storage_migration_req.new_tablet_id)),
+ StorageEngine::instance()->storage_migration_mem_tracker(),
MemTrackerLevel::TASK);
+}
+
+Status EngineStorageMigrationTaskV2::execute() {
+ DorisMetrics::instance()->storage_migrate_v2_requests_total->increment(1);
+ StorageMigrationV2Handler* storage_migration_v2_handler =
StorageMigrationV2Handler::instance();
+ Status res =
storage_migration_v2_handler->process_storage_migration_v2(_storage_migration_req);
+
+ if (!res.ok()) {
+ LOG(WARNING) << "failed to do storage migration task. res=" << res
+ << " base_tablet_id=" <<
_storage_migration_req.base_tablet_id
+ << ", base_schema_hash=" <<
_storage_migration_req.base_schema_hash
+ << ", new_tablet_id=" <<
_storage_migration_req.new_tablet_id
+ << ", new_schema_hash=" <<
_storage_migration_req.new_schema_hash;
+
DorisMetrics::instance()->storage_migrate_v2_requests_failed->increment(1);
+ return res;
+ }
+
+ LOG(INFO) << "success to create new storage migration v2. res=" << res
+ << " base_tablet_id=" << _storage_migration_req.base_tablet_id
<< ", base_schema_hash"
+ << _storage_migration_req.base_schema_hash
+ << ", new_tablet_id=" << _storage_migration_req.new_tablet_id
+ << ", new_schema_hash=" <<
_storage_migration_req.new_schema_hash;
+ return res;
+} // execute
+
+} // namespace doris
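
EngineStorageMigrationTaskV2::execute() above is deliberately thin: increment the request counter, delegate to the singleton StorageMigrationV2Handler, and on failure increment the failure counter and log. A standalone sketch of that control flow with stand-in counters and a placeholder handler (not the DorisMetrics or handler API):

#include <atomic>
#include <cstdint>
#include <iostream>
#include <string>

struct Status {
    bool ok_ = true;
    std::string msg;
    bool ok() const { return ok_; }
};

// Stand-ins for the counters this commit registers in DorisMetrics.
std::atomic<int64_t> g_storage_migrate_v2_requests_total{0};
std::atomic<int64_t> g_storage_migrate_v2_requests_failed{0};

// Placeholder for StorageMigrationV2Handler::process_storage_migration_v2().
Status process_storage_migration_v2(int64_t base_tablet_id, int64_t new_tablet_id) {
    (void)base_tablet_id;
    (void)new_tablet_id;
    return {};
}

Status execute_migration_v2(int64_t base_tablet_id, int64_t new_tablet_id) {
    g_storage_migrate_v2_requests_total++;
    Status res = process_storage_migration_v2(base_tablet_id, new_tablet_id);
    if (!res.ok()) {
        g_storage_migrate_v2_requests_failed++;
        std::cerr << "failed to do storage migration task. base_tablet_id=" << base_tablet_id
                  << ", new_tablet_id=" << new_tablet_id << "\n";
    }
    return res;
}

int main() {
    execute_migration_v2(10001, 10002);
    std::cout << "total=" << g_storage_migrate_v2_requests_total.load()
              << " failed=" << g_storage_migrate_v2_requests_failed.load() << "\n";
    return 0;
}
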
diff --git a/be/src/olap/task/engine_storage_migration_task_v2.h
b/be/src/olap/task/engine_storage_migration_task_v2.h
new file mode 100644
index 0000000000..81d57ff636
--- /dev/null
+++ b/be/src/olap/task/engine_storage_migration_task_v2.h
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_OLAP_TASK_ENGINE_STORAGE_MIGRATION_TASK_V2_H
+#define DORIS_BE_SRC_OLAP_TASK_ENGINE_STORAGE_MIGRATION_TASK_V2_H
+
+#include "gen_cpp/AgentService_types.h"
+#include "olap/olap_define.h"
+#include "olap/task/engine_task.h"
+
+namespace doris {
+
+// storage-engine-side task for storage migration v2
+// add "Engine" as task prefix to prevent duplicate name with agent task
+class EngineStorageMigrationTaskV2 : public EngineTask {
+public:
+ virtual Status execute();
+
+public:
+ EngineStorageMigrationTaskV2(const TStorageMigrationReqV2& request);
+ ~EngineStorageMigrationTaskV2() {}
+
+private:
+ const TStorageMigrationReqV2& _storage_migration_req;
+
+ std::shared_ptr<MemTracker> _mem_tracker;
+}; // EngineStorageMigrationTaskV2
+
+} // namespace doris
+#endif //DORIS_BE_SRC_OLAP_TASK_ENGINE_STORAGE_MIGRATION_TASK_V2_H
\ No newline at end of file
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 36c0528ca1..7caf55ac8b 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -153,7 +153,6 @@ public:
const std::vector<StorePath>& store_paths() const { return _store_paths; }
size_t store_path_to_index(const std::string& path) { return
_store_path_map[path]; }
- void set_store_paths(const std::vector<StorePath>& paths) { _store_paths =
paths; }
StorageEngine* storage_engine() { return _storage_engine; }
void set_storage_engine(StorageEngine* storage_engine) { _storage_engine =
storage_engine; }
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index a1d781a944..0163815c76 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -64,6 +64,8 @@ DEFINE_ENGINE_COUNTER_METRIC(schema_change_requests_failed,
schema_change, faile
DEFINE_ENGINE_COUNTER_METRIC(create_rollup_requests_total, create_rollup,
total);
DEFINE_ENGINE_COUNTER_METRIC(create_rollup_requests_failed, create_rollup,
failed);
DEFINE_ENGINE_COUNTER_METRIC(storage_migrate_requests_total, storage_migrate,
total);
+DEFINE_ENGINE_COUNTER_METRIC(storage_migrate_v2_requests_total,
storage_migrate_v2, total);
+DEFINE_ENGINE_COUNTER_METRIC(storage_migrate_v2_requests_failed,
storage_migrate_v2, failed);
DEFINE_ENGINE_COUNTER_METRIC(delete_requests_total, delete, total);
DEFINE_ENGINE_COUNTER_METRIC(delete_requests_failed, delete, failed);
DEFINE_ENGINE_COUNTER_METRIC(clone_requests_total, clone, total);
@@ -208,6 +210,8 @@ DorisMetrics::DorisMetrics() :
_metric_registry(_s_registry_name) {
INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
create_rollup_requests_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
create_rollup_requests_failed);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
storage_migrate_requests_total);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
storage_migrate_v2_requests_total);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
storage_migrate_v2_requests_failed);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, delete_requests_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, delete_requests_failed);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, clone_requests_total);
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index ca5d05cc8c..aa59d1770b 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -76,6 +76,8 @@ public:
IntCounter* create_rollup_requests_total;
IntCounter* create_rollup_requests_failed;
IntCounter* storage_migrate_requests_total;
+ IntCounter* storage_migrate_v2_requests_total;
+ IntCounter* storage_migrate_v2_requests_failed;
IntCounter* delete_requests_total;
IntCounter* delete_requests_failed;
IntCounter* clone_requests_total;
@@ -190,6 +192,7 @@ public:
UIntGauge* load_channel_mem_consumption;
UIntGauge* query_mem_consumption;
UIntGauge* schema_change_mem_consumption;
+ UIntGauge* storage_migration_mem_consumption;
UIntGauge* tablet_meta_mem_consumption;
// Cache metrics
diff --git a/be/src/util/file_utils.h b/be/src/util/file_utils.h
index 0e56ec8eb2..14d1742ae6 100644
--- a/be/src/util/file_utils.h
+++ b/be/src/util/file_utils.h
@@ -53,8 +53,6 @@ public:
static Status create_dir(const std::string& dir_path, Env* env);
// Delete file recursively.
- static Status remove_all(const std::string& dir_path, TStorageMedium::type
store);
-
static Status remove_all(const std::string& dir_path);
static Status remove(const std::string& path);
diff --git a/be/src/util/storage_backend_mgr.cpp
b/be/src/util/storage_backend_mgr.cpp
index ab907e114f..53a49c1d5d 100644
--- a/be/src/util/storage_backend_mgr.cpp
+++ b/be/src/util/storage_backend_mgr.cpp
@@ -111,25 +111,25 @@ Status
StorageBackendMgr::_create_remote_storage_internal(const StorageParamPB&
}
std::map<std::string, std::string> storage_prop;
switch (storage_param_pb.storage_medium()) {
- case StorageMediumPB::S3:
- default:
- S3StorageParamPB s3_storage_param =
storage_param_pb.s3_storage_param();
- if (s3_storage_param.s3_ak().empty() ||
s3_storage_param.s3_sk().empty() ||
- s3_storage_param.s3_endpoint().empty() ||
s3_storage_param.s3_region().empty()) {
- return Status::InternalError("s3_storage_param param is invalid");
- }
- storage_prop[S3_AK] = s3_storage_param.s3_ak();
- storage_prop[S3_SK] = s3_storage_param.s3_sk();
- storage_prop[S3_ENDPOINT] = s3_storage_param.s3_endpoint();
- storage_prop[S3_REGION] = s3_storage_param.s3_region();
- storage_prop[S3_MAX_CONN_SIZE] = s3_storage_param.s3_max_conn();
- storage_prop[S3_REQUEST_TIMEOUT_MS] =
s3_storage_param.s3_request_timeout_ms();
- storage_prop[S3_CONN_TIMEOUT_MS] =
s3_storage_param.s3_conn_timeout_ms();
-
- if (!ClientFactory::is_s3_conf_valid(storage_prop)) {
- return Status::InternalError("s3_storage_param is invalid");
- }
- _storage_backend_map[storage_name] =
std::make_shared<S3StorageBackend>(storage_prop);
+ case StorageMediumPB::S3:
+ default:
+ S3StorageParamPB s3_storage_param =
storage_param_pb.s3_storage_param();
+ if (s3_storage_param.s3_ak().empty() ||
s3_storage_param.s3_sk().empty()
+ || s3_storage_param.s3_endpoint().empty() ||
s3_storage_param.s3_region().empty()) {
+ return Status::InternalError("s3_storage_param param is
invalid");
+ }
+ storage_prop[S3_AK] = s3_storage_param.s3_ak();
+ storage_prop[S3_SK] = s3_storage_param.s3_sk();
+ storage_prop[S3_ENDPOINT] = s3_storage_param.s3_endpoint();
+ storage_prop[S3_REGION] = s3_storage_param.s3_region();
+ storage_prop[S3_MAX_CONN_SIZE] = s3_storage_param.s3_max_conn();
+ storage_prop[S3_REQUEST_TIMEOUT_MS] =
s3_storage_param.s3_request_timeout_ms();
+ storage_prop[S3_CONN_TIMEOUT_MS] =
s3_storage_param.s3_conn_timeout_ms();
+
+ if (!ClientFactory::is_s3_conf_valid(storage_prop)) {
+ return Status::InternalError("s3_storage_param is invalid");
+ }
+ _storage_backend_map[storage_name] =
std::make_shared<S3StorageBackend>(storage_prop);
}
_storage_param_map[storage_name] = storage_param_pb;
_storage_backend_active_time[storage_name] = time(nullptr);
@@ -168,10 +168,11 @@ Status StorageBackendMgr::get_root_path(const
std::string& storage_name, std::st
std::string StorageBackendMgr::get_root_path_from_param(const StorageParamPB&
storage_param) {
switch (storage_param.storage_medium()) {
- case StorageMediumPB::S3:
- default: {
- return storage_param.s3_storage_param().root_path();
- }
+ case StorageMediumPB::S3:
+ default:
+ {
+ return storage_param.s3_storage_param().root_path();
+ }
}
}
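
The re-indented switch above flattens S3StorageParamPB into a string-keyed property map and rejects it when any mandatory field is missing, before handing the map to S3StorageBackend. A hedged sketch of that assemble-and-validate step; the literal key strings below are assumptions standing in for the S3_AK/S3_SK/S3_ENDPOINT/... constants, and std::optional replaces Status::InternalError for brevity:

#include <iostream>
#include <map>
#include <optional>
#include <string>

// Stand-in for S3StorageParamPB.
struct S3Param {
    std::string ak, sk, endpoint, region;
    std::string max_conn = "50", request_timeout_ms = "3000", conn_timeout_ms = "1000";
};

// Returns std::nullopt when the configuration is invalid, mirroring the
// "s3_storage_param param is invalid" branch above.
std::optional<std::map<std::string, std::string>> build_s3_properties(const S3Param& p) {
    if (p.ak.empty() || p.sk.empty() || p.endpoint.empty() || p.region.empty()) {
        return std::nullopt;
    }
    return std::map<std::string, std::string>{
            {"AWS_ACCESS_KEY", p.ak},                          // S3_AK in the real code (key value assumed)
            {"AWS_SECRET_KEY", p.sk},                          // S3_SK
            {"AWS_ENDPOINT", p.endpoint},                      // S3_ENDPOINT
            {"AWS_REGION", p.region},                          // S3_REGION
            {"AWS_MAX_CONN_SIZE", p.max_conn},                 // S3_MAX_CONN_SIZE
            {"AWS_REQUEST_TIMEOUT_MS", p.request_timeout_ms},  // S3_REQUEST_TIMEOUT_MS
            {"AWS_CONNECTION_TIMEOUT_MS", p.conn_timeout_ms},  // S3_CONN_TIMEOUT_MS
    };
}

int main() {
    auto props = build_s3_properties({"ak", "sk", "s3.example.com", "us-east-1"});
    std::cout << (props ? "valid, " + std::to_string(props->size()) + " keys" : std::string("invalid")) << "\n";
    return 0;
}
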
diff --git a/gensrc/thrift/AgentService.thrift
b/gensrc/thrift/AgentService.thrift
index ae5113920c..1eb01e8c5f 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -144,6 +144,14 @@ struct TAlterMaterializedViewParam {
3: optional Exprs.TExpr mv_expr
}
+struct TStorageMigrationReqV2 {
+ 1: optional Types.TTabletId base_tablet_id
+ 2: optional Types.TTabletId new_tablet_id
+ 3: optional Types.TSchemaHash base_schema_hash
+ 4: optional Types.TSchemaHash new_schema_hash
+ 5: optional Types.TVersion migration_version
+}
+
struct TClusterInfo {
1: required string user
2: required string password
@@ -350,6 +358,7 @@ struct TAgentTaskRequest {
25: optional i64 recv_time // time the task is inserted to queue
26: optional TUpdateTabletMetaInfoReq update_tablet_meta_info_req
27: optional TCompactionReq compaction_req
+ 28: optional TStorageMigrationReqV2 storage_migration_req_v2
}
struct TAgentResult {
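
The two AgentService.thrift hunks add the request struct and thread it through TAgentTaskRequest as field 28. On the C++ side, the generated bindings (gen_cpp/AgentService_types.h) expose a __set_<field>() helper per optional field, so a caller could populate the request roughly like this; the helper function itself is illustrative, not part of the commit:

#include "gen_cpp/AgentService_types.h"
#include "gen_cpp/Types_types.h"

namespace doris {

// Builds a TStorageMigrationReqV2 with the fields declared in AgentService.thrift.
// Each __set_<field>() assigns the value and flips the matching __isset bit.
TStorageMigrationReqV2 make_storage_migration_req_v2(TTabletId base_tablet_id,
                                                     TTabletId new_tablet_id,
                                                     TSchemaHash base_schema_hash,
                                                     TSchemaHash new_schema_hash,
                                                     TVersion migration_version) {
    TStorageMigrationReqV2 req;
    req.__set_base_tablet_id(base_tablet_id);
    req.__set_new_tablet_id(new_tablet_id);
    req.__set_base_schema_hash(base_schema_hash);
    req.__set_new_schema_hash(new_schema_hash);
    req.__set_migration_version(migration_version);
    return req;
}

} // namespace doris

Such a request is what the agent worker pool hands to EngineStorageMigrationTaskV2 after dispatching on TTaskType::STORAGE_MEDIUM_MIGRATE_V2.
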
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 2cd74860f0..95f3cc4c1e 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -187,7 +187,8 @@ enum TTaskType {
ALTER,
INSTALL_PLUGIN,
UNINSTALL_PLUGIN,
- COMPACTION
+ COMPACTION,
+ STORAGE_MEDIUM_MIGRATE_V2
}
enum TStmtType {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]