This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e157c2c254 [feature-wip](remote-storage) step3: Support remote storage, only for be, add migration_task_v2 (#8806)
e157c2c254 is described below

commit e157c2c254a48c53c0ff07a17af6f52fc6b8d0c5
Author: pengxiangyu <[email protected]>
AuthorDate: Fri Apr 22 22:38:10 2022 +0800

    [feature-wip](remote-storage) step3: Support remote storage, only for be, add migration_task_v2 (#8806)
    
    1. Add TStorageMigrationReqV2 and EngineStorageMigrationTaskV2 to support the migration action
    2. Change TabletManager::create_tablet() for remote storage
    3. Change TabletManager::try_delete_unused_tablet_path() for remote storage
---
 be/src/agent/agent_server.cpp                      |   2 +
 be/src/agent/agent_server.h                        |   2 +
 be/src/agent/task_worker_pool.cpp                  | 128 ++++++
 be/src/agent/task_worker_pool.h                    |   8 +-
 be/src/common/config.h                             |   1 +
 be/src/common/status.h                             |   1 +
 be/src/olap/CMakeLists.txt                         |   2 +
 be/src/olap/rowset/alpha_rowset.cpp                |   2 +-
 be/src/olap/rowset/alpha_rowset.h                  |   2 +-
 be/src/olap/rowset/alpha_rowset_writer.cpp         |   5 +
 be/src/olap/rowset/alpha_rowset_writer.h           |   1 +
 be/src/olap/rowset/beta_rowset.cpp                 |  16 +-
 be/src/olap/rowset/beta_rowset.h                   |   5 +-
 be/src/olap/rowset/beta_rowset_writer.cpp          |  34 ++
 be/src/olap/rowset/beta_rowset_writer.h            |   2 +
 be/src/olap/rowset/rowset.h                        |   9 +-
 be/src/olap/rowset/rowset_writer.h                 |   2 +
 be/src/olap/schema_change.cpp                      |   4 +-
 be/src/olap/storage_engine.cpp                     |  68 ++-
 be/src/olap/storage_engine.h                       |   5 +-
 be/src/olap/storage_migration_v2.cpp               | 464 +++++++++++++++++++++
 be/src/olap/storage_migration_v2.h                 |  79 ++++
 be/src/olap/tablet.cpp                             |   6 +-
 be/src/olap/tablet_manager.cpp                     |  67 ++-
 be/src/olap/tablet_meta.cpp                        |   7 +
 be/src/olap/tablet_meta.h                          |   5 +
 be/src/olap/task/engine_storage_migration_task.cpp |   2 +-
 .../olap/task/engine_storage_migration_task_v2.cpp |  60 +++
 .../olap/task/engine_storage_migration_task_v2.h   |  44 ++
 be/src/runtime/exec_env.h                          |   1 -
 be/src/util/doris_metrics.cpp                      |   4 +
 be/src/util/doris_metrics.h                        |   3 +
 be/src/util/file_utils.h                           |   2 -
 be/src/util/storage_backend_mgr.cpp                |  47 ++-
 gensrc/thrift/AgentService.thrift                  |   9 +
 gensrc/thrift/Types.thrift                         |   3 +-
 36 files changed, 1041 insertions(+), 61 deletions(-)

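[Editor's note] For readers skimming the diff below: the new task type is wired through the BE agent in three steps -- AgentServer registers a STORAGE_MEDIUM_MIGRATE_V2 worker pool, TaskWorkerPool dispatches each TStorageMigrationReqV2 to an EngineStorageMigrationTaskV2, and StorageMigrationV2Handler converts the historical rowsets. The standalone C++ sketch below only models that dispatch loop; the struct and function names here (StorageMigrationReqV2, FinishTaskReport, execute_migration, handle_task) are illustrative stand-ins, not the real Doris classes, and only the control flow (timeout check -> execute engine task -> build finish report) mirrors the patch.

// Simplified, self-contained model of the STORAGE_MEDIUM_MIGRATE_V2 dispatch
// added in task_worker_pool.cpp. All types are stubs for illustration.
#include <cstdint>
#include <ctime>
#include <iostream>
#include <string>

struct StorageMigrationReqV2 {        // stand-in for TStorageMigrationReqV2
    int64_t base_tablet_id = 0;
    int64_t new_tablet_id = 0;
    int64_t recv_time = 0;            // when the task entered the queue
};

struct FinishTaskReport {             // stand-in for TFinishTaskRequest
    int64_t signature = 0;
    bool ok = false;
    std::string msg;
};

// Stand-in for EngineStorageMigrationTaskV2::execute(); the real task delegates
// to StorageMigrationV2Handler::process_storage_migration_v2().
bool execute_migration(const StorageMigrationReqV2& req) {
    std::cout << "migrating tablet " << req.base_tablet_id
              << " -> " << req.new_tablet_id << "\n";
    return true;
}

FinishTaskReport handle_task(const StorageMigrationReqV2& req, int64_t signature,
                             int64_t timeout_seconds) {
    FinishTaskReport report;
    report.signature = signature;
    // Same guard as the worker callback: skip tasks that sat in the queue too long.
    if (std::time(nullptr) - req.recv_time > timeout_seconds) {
        report.msg = "task timeout, skipped";
        return report;
    }
    report.ok = execute_migration(req);
    report.msg = report.ok ? "StorageMediumMigrationV2 success"
                           : "StorageMediumMigrationV2 failed";
    return report;
}

int main() {
    StorageMigrationReqV2 req{10001, 10002, static_cast<int64_t>(std::time(nullptr))};
    FinishTaskReport r = handle_task(req, 42, 600);
    std::cout << r.msg << " (signature " << r.signature << ")\n";
    return 0;
}
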
diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index bf536213a7..6e754f941a 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -90,6 +90,7 @@ AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info)
     CREATE_AND_START_THREAD(REPORT_DISK_STATE, _report_disk_state_workers);
     CREATE_AND_START_THREAD(REPORT_OLAP_TABLE, _report_tablet_workers);
     CREATE_AND_START_POOL(SUBMIT_TABLE_COMPACTION, _submit_table_compaction_workers);
+    CREATE_AND_START_POOL(STORAGE_MEDIUM_MIGRATE_V2, _storage_medium_migrate_v2_workers);
 #undef CREATE_AND_START_POOL
 #undef CREATE_AND_START_THREAD
 
@@ -152,6 +153,7 @@ void AgentServer::submit_tasks(TAgentResult& agent_result,
             HANDLE_TYPE(TTaskType::UPDATE_TABLET_META_INFO, _update_tablet_meta_info_workers,
                         update_tablet_meta_info_req);
             HANDLE_TYPE(TTaskType::COMPACTION, _submit_table_compaction_workers, compaction_req);
+            HANDLE_TYPE(TTaskType::STORAGE_MEDIUM_MIGRATE_V2, _storage_medium_migrate_v2_workers, storage_migration_req_v2);
 
         case TTaskType::REALTIME_PUSH:
         case TTaskType::PUSH:
diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h
index 8f8c9da072..09144f3dee 100644
--- a/be/src/agent/agent_server.h
+++ b/be/src/agent/agent_server.h
@@ -89,6 +89,8 @@ private:
 
     std::unique_ptr<TaskWorkerPool> _submit_table_compaction_workers;
 
+    std::unique_ptr<TaskWorkerPool> _storage_medium_migrate_v2_workers;
+
     std::unique_ptr<TopicSubscriber> _topic_subscriber;
 };
 
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 23620861c2..6b22584153 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -44,6 +44,7 @@
 #include "olap/task/engine_clone_task.h"
 #include "olap/task/engine_publish_version_task.h"
 #include "olap/task/engine_storage_migration_task.h"
+#include "olap/task/engine_storage_migration_task_v2.h"
 #include "olap/utils.h"
 #include "runtime/exec_env.h"
 #include "runtime/snapshot_loader.h"
@@ -194,6 +195,11 @@ void TaskWorkerPool::start() {
         cb = std::bind<void>(&TaskWorkerPool::_submit_table_compaction_worker_thread_callback,
                              this);
         break;
+    case TaskWorkerType::STORAGE_MEDIUM_MIGRATE_V2:
+        _worker_count = 1;
+        cb = std::bind<void>(&TaskWorkerPool::_storage_medium_migrate_v2_worker_thread_callback,
+                             this);
+        break;
     default:
         // pass
         break;
@@ -1650,4 +1656,126 @@ void TaskWorkerPool::_submit_table_compaction_worker_thread_callback() {
     }
 }
 
+void TaskWorkerPool::_storage_medium_migrate_v2_worker_thread_callback() {
+    while (_is_work) {
+        TAgentTaskRequest agent_task_req;
+        {
+            std::unique_lock<std::mutex> worker_thread_lock(_worker_thread_lock);
+            while (_is_work && _tasks.empty()) {
+                _worker_thread_condition_variable.wait(worker_thread_lock);
+            }
+            if (!_is_work) {
+                return;
+            }
+
+            agent_task_req = _tasks.front();
+            _tasks.pop_front();
+        }
+        int64_t signature = agent_task_req.signature;
+        LOG(INFO) << "get migration table v2 task, signature: " << agent_task_req.signature;
+        bool is_task_timeout = false;
+        if (agent_task_req.__isset.recv_time) {
+            int64_t time_elapsed = time(nullptr) - agent_task_req.recv_time;
+            if (time_elapsed > config::report_task_interval_seconds * 20) {
+                LOG(INFO) << "task elapsed " << time_elapsed
+                          << " seconds since it is inserted to queue, it is timeout";
+                is_task_timeout = true;
+            }
+        }
+        if (!is_task_timeout) {
+            TFinishTaskRequest finish_task_request;
+            TTaskType::type task_type = agent_task_req.task_type;
+            switch (task_type) {
+                case TTaskType::STORAGE_MEDIUM_MIGRATE_V2:
+                    _storage_medium_migrate_v2(agent_task_req, signature, task_type, &finish_task_request);
+                    break;
+                default:
+                    // pass
+                    break;
+            }
+            _finish_task(finish_task_request);
+        }
+        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
+    }
+}
+
+void TaskWorkerPool::_storage_medium_migrate_v2(const TAgentTaskRequest& agent_task_req, int64_t signature,
+        const TTaskType::type task_type, TFinishTaskRequest* finish_task_request) {
+    Status status = Status::OK();
+    TStatus task_status;
+    std::vector<string> error_msgs;
+
+    string process_name;
+    switch (task_type) {
+        case TTaskType::STORAGE_MEDIUM_MIGRATE_V2:
+            process_name = "StorageMediumMigrationV2";
+            break;
+        default:
+            std::string task_name;
+            EnumToString(TTaskType, task_type, task_name);
+            LOG(WARNING) << "Storage medium migration v2 type invalid. type: " << task_name
+                         << ", signature: " << signature;
+            status = Status::NotSupported("Storage medium migration v2 type invalid");
+            break;
+    }
+
+    // Check last storage medium migration v2 status, if failed delete tablet file
+    // Do not need to adjust delete success or not
+    // Because if delete failed task will failed
+    TTabletId new_tablet_id;
+    TSchemaHash new_schema_hash = 0;
+    if (status.ok()) {
+        new_tablet_id = agent_task_req.storage_migration_req_v2.new_tablet_id;
+        new_schema_hash = agent_task_req.storage_migration_req_v2.new_schema_hash;
+        EngineStorageMigrationTaskV2 engine_task(agent_task_req.storage_migration_req_v2);
+        Status sc_status = _env->storage_engine()->execute_task(&engine_task);
+        if (!sc_status.ok()) {
+            if (sc_status == Status::OLAPInternalError(OLAP_ERR_DATA_QUALITY_ERR)) {
+                error_msgs.push_back("The data quality does not satisfy, please check your data. ");
+            }
+            status = Status::DataQualityError("The data quality does not satisfy");
+        } else {
+            status = Status::OK();
+        }
+    }
+
+    if (status.ok()) {
+        ++_s_report_version;
+        LOG(INFO) << process_name << " finished. signature: " << signature;
+    }
+
+    // Return result to fe
+    finish_task_request->__set_backend(_backend);
+    finish_task_request->__set_report_version(_s_report_version);
+    finish_task_request->__set_task_type(task_type);
+    finish_task_request->__set_signature(signature);
+
+    std::vector<TTabletInfo> finish_tablet_infos;
+    if (status.ok()) {
+        TTabletInfo tablet_info;
+        status = _get_tablet_info(new_tablet_id, new_schema_hash, signature, &tablet_info);
+
+        if (!status.ok()) {
+            LOG(WARNING) << process_name << " success, but get new tablet info failed."
+                         << "tablet_id: " << new_tablet_id << ", schema_hash: " << new_schema_hash
+                         << ", signature: " << signature;
+        } else {
+            finish_tablet_infos.push_back(tablet_info);
+        }
+    }
+
+    if (status.ok()) {
+        finish_task_request->__set_finish_tablet_infos(finish_tablet_infos);
+        LOG(INFO) << process_name << " success. signature: " << signature;
+        error_msgs.push_back(process_name + " success");
+    } else {
+        LOG(WARNING) << process_name << " failed. signature: " << signature;
+        error_msgs.push_back(process_name + " failed");
+        error_msgs.push_back("status: " + status.to_string());
+    }
+    task_status.__set_status_code(status.code());
+    task_status.__set_error_msgs(error_msgs);
+    finish_task_request->__set_task_status(task_status);
+}
+
 } // namespace doris
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index d03afb1790..e413211357 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -68,7 +68,8 @@ public:
         MOVE,
         RECOVER_TABLET,
         UPDATE_TABLET_META_INFO,
-        SUBMIT_TABLE_COMPACTION
+        SUBMIT_TABLE_COMPACTION,
+        STORAGE_MEDIUM_MIGRATE_V2
     };
 
     enum ReportType { TASK, DISK, TABLET };
@@ -124,6 +125,8 @@ public:
             return "UPDATE_TABLET_META_INFO";
         case SUBMIT_TABLE_COMPACTION:
             return "SUBMIT_TABLE_COMPACTION";
+        case STORAGE_MEDIUM_MIGRATE_V2:
+            return "STORAGE_MEDIUM_MIGRATE_V2";
         default:
             return "Unknown";
         }
@@ -187,6 +190,7 @@ private:
     void _move_dir_thread_callback();
     void _update_tablet_meta_worker_thread_callback();
     void _submit_table_compaction_worker_thread_callback();
+    void _storage_medium_migrate_v2_worker_thread_callback();
 
     void _alter_tablet(const TAgentTaskRequest& alter_tablet_request, int64_t signature,
                        const TTaskType::type task_type, TFinishTaskRequest* finish_task_request);
@@ -203,6 +207,8 @@ private:
     // random sleep 1~second seconds
     void _random_sleep(int second);
 
+    void _storage_medium_migrate_v2(const TAgentTaskRequest& agent_task_req, int64_t signature,
+            const TTaskType::type task_type, TFinishTaskRequest* finish_task_request);
 private:
     std::string _name;
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index a93229cdf8..5bc62f0123 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -189,6 +189,7 @@ CONF_mInt64(column_dictionary_key_ratio_threshold, "0");
 CONF_mInt64(column_dictionary_key_size_threshold, "0");
 // memory_limitation_per_thread_for_schema_change_bytes unit bytes
 CONF_mInt64(memory_limitation_per_thread_for_schema_change_bytes, "2147483648");
+CONF_mInt64(memory_limitation_per_thread_for_storage_migration_bytes, "100000000");
 
 // the clean interval of file descriptor cache and segment cache
 CONF_mInt32(cache_clean_interval, "1800");
diff --git a/be/src/common/status.h b/be/src/common/status.h
index ec74be3ef0..ad884cee5e 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -231,6 +231,7 @@ namespace doris {
     M(OLAP_ERR_ROWSET_READ_FAILED, -3111, "", true) \
     M(OLAP_ERR_ROWSET_INVALID_STATE_TRANSITION, -3112, "", true) \
     M(OLAP_ERR_STRING_OVERFLOW_IN_VEC_ENGINE, -3113, "", true) \
+    M(OLAP_ERR_ROWSET_ADD_MIGRATION_V2, -3114, "", true) \
 
 enum ErrorCode {
 #define M(NAME, ERRORCODE, DESC, STACKTRACEENABLED) NAME = ERRORCODE,
diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt
index 4298d48bf9..657ebe3518 100644
--- a/be/src/olap/CMakeLists.txt
+++ b/be/src/olap/CMakeLists.txt
@@ -71,6 +71,7 @@ add_library(Olap STATIC
     version_graph.cpp
     schema.cpp
     schema_change.cpp
+    storage_migration_v2.cpp
     serialize.cpp
     storage_engine.cpp
     data_dir.cpp
@@ -116,6 +117,7 @@ add_library(Olap STATIC
     task/engine_checksum_task.cpp
     task/engine_clone_task.cpp
     task/engine_storage_migration_task.cpp
+    task/engine_storage_migration_task_v2.cpp
     task/engine_publish_version_task.cpp
     task/engine_alter_tablet_task.cpp
     column_vector.cpp
diff --git a/be/src/olap/rowset/alpha_rowset.cpp b/be/src/olap/rowset/alpha_rowset.cpp
index 80a0a5eebf..62bac2929a 100644
--- a/be/src/olap/rowset/alpha_rowset.cpp
+++ b/be/src/olap/rowset/alpha_rowset.cpp
@@ -100,7 +100,7 @@ Status AlphaRowset::link_files_to(const FilePathDesc& dir_desc, RowsetId new_row
     return Status::OK();
 }
 
-Status AlphaRowset::copy_files_to(const std::string& dir) {
+Status AlphaRowset::copy_files_to(const std::string& dir, const RowsetId& new_rowset_id) {
     for (auto& segment_group : _segment_groups) {
         Status status = segment_group->copy_files_to(dir);
         if (!status.ok()) {
diff --git a/be/src/olap/rowset/alpha_rowset.h b/be/src/olap/rowset/alpha_rowset.h
index d902e0ca97..73651f920f 100644
--- a/be/src/olap/rowset/alpha_rowset.h
+++ b/be/src/olap/rowset/alpha_rowset.h
@@ -49,7 +49,7 @@ public:
 
     Status link_files_to(const FilePathDesc& dir_desc, RowsetId new_rowset_id) override;
 
-    Status copy_files_to(const std::string& dir) override;
+    Status copy_files_to(const std::string& dir, const RowsetId& new_rowset_id) override;
 
     Status convert_from_old_files(const std::string& snapshot_path,
                                       std::vector<std::string>* success_files);
diff --git a/be/src/olap/rowset/alpha_rowset_writer.cpp b/be/src/olap/rowset/alpha_rowset_writer.cpp
index 1e0e9c50a8..03cded4aab 100644
--- a/be/src/olap/rowset/alpha_rowset_writer.cpp
+++ b/be/src/olap/rowset/alpha_rowset_writer.cpp
@@ -133,6 +133,11 @@ Status AlphaRowsetWriter::add_rowset_for_linked_schema_change(
     return Status::OK();
 }
 
+Status AlphaRowsetWriter::add_rowset_for_migration(RowsetSharedPtr rowset) {
+    LOG(WARNING) << "alpha_rowset_writer doesn't support add_rowset_for_migration";
+    return Status::NotSupported("alpha_rowset_writer doesn't support add_rowset_for_migration");
+}
+
 Status AlphaRowsetWriter::flush() {
     if (_writer_state == WRITER_FLUSHED) {
         return Status::OK();
diff --git a/be/src/olap/rowset/alpha_rowset_writer.h b/be/src/olap/rowset/alpha_rowset_writer.h
index 590e7b517e..9928aad948 100644
--- a/be/src/olap/rowset/alpha_rowset_writer.h
+++ b/be/src/olap/rowset/alpha_rowset_writer.h
@@ -43,6 +43,7 @@ public:
     Status add_rowset_for_linked_schema_change(RowsetSharedPtr rowset,
                                                    const SchemaMapping& schema_mapping) override;
 
+    Status add_rowset_for_migration(RowsetSharedPtr rowset) override;
     Status flush() override;
 
     // get a rowset
diff --git a/be/src/olap/rowset/beta_rowset.cpp b/be/src/olap/rowset/beta_rowset.cpp
index b9cd3aee8a..38a85d3a72 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -132,9 +132,9 @@ Status BetaRowset::link_files_to(const FilePathDesc& dir_desc, RowsetId new_rows
     return Status::OK();
 }
 
-Status BetaRowset::copy_files_to(const std::string& dir) {
+Status BetaRowset::copy_files_to(const std::string& dir, const RowsetId& new_rowset_id) {
     for (int i = 0; i < num_segments(); ++i) {
-        FilePathDesc dst_path_desc = segment_file_path(dir, rowset_id(), i);
+        FilePathDesc dst_path_desc = segment_file_path(dir, new_rowset_id, i);
         Status status = Env::Default()->path_exists(dst_path_desc.filepath);
         if (status.ok()) {
             LOG(WARNING) << "file already exist: " << dst_path_desc.filepath;
@@ -154,7 +154,8 @@ Status BetaRowset::copy_files_to(const std::string& dir) {
     return Status::OK();
 }
 
-Status BetaRowset::upload_files_to(const FilePathDesc& dir_desc) {
+Status BetaRowset::upload_files_to(const FilePathDesc& dir_desc,
+        const RowsetId& new_rowset_id, bool delete_src) {
     std::shared_ptr<StorageBackend> storage_backend = StorageBackendMgr::instance()->
             get_storage_backend(dir_desc.storage_name);
     if (storage_backend == nullptr) {
@@ -162,13 +163,12 @@ Status BetaRowset::upload_files_to(const FilePathDesc& dir_desc) {
         return Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
     }
     for (int i = 0; i < num_segments(); ++i) {
-        FilePathDesc dst_path_desc = segment_file_path(dir_desc, rowset_id(), i);
+        FilePathDesc dst_path_desc = segment_file_path(dir_desc, new_rowset_id, i);
         Status status = storage_backend->exist(dst_path_desc.remote_path);
         if (status.ok()) {
             LOG(WARNING) << "file already exist: " << dst_path_desc.remote_path;
             return Status::OLAPInternalError(OLAP_ERR_FILE_ALREADY_EXIST);
-        }
-        if (!status.is_not_found()) {
+        } else if (!status.is_not_found()) {
             LOG(WARNING) << "file check exist error: " << dst_path_desc.remote_path;
             return Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
         }
@@ -179,6 +179,10 @@ Status BetaRowset::upload_files_to(const FilePathDesc& dir_desc) {
                          << dst_path_desc.remote_path << ", errno=" << Errno::no();
             return Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
         }
+        if (delete_src && !Env::Default()->delete_file(src_path_desc.filepath).ok()) {
+            LOG(WARNING) << "fail to delete local file: " << src_path_desc.filepath << ", errno=" << Errno::no();
+            return Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
+        }
         LOG(INFO) << "succeed to upload file. from " << src_path_desc.filepath << " to "
                   << dst_path_desc.remote_path;
     }
diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h
index d7e38389bd..ffb6467c57 100644
--- a/be/src/olap/rowset/beta_rowset.h
+++ b/be/src/olap/rowset/beta_rowset.h
@@ -50,9 +50,10 @@ public:
 
     Status link_files_to(const FilePathDesc& dir_desc, RowsetId new_rowset_id) override;
 
-    Status copy_files_to(const std::string& dir) override;
+    Status copy_files_to(const std::string& dir, const RowsetId& new_rowset_id) override;
 
-    Status upload_files_to(const FilePathDesc& dir_desc) override;
+    Status upload_files_to(const FilePathDesc& dir_desc,
+                           const RowsetId& new_rowset_id, bool delete_src = false) override;
 
     // only applicable to alpha rowset, no op here
     Status remove_old_files(std::vector<std::string>* files_to_remove) override {
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index 5c6b35c438..43b7c61004 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -141,6 +141,40 @@ Status BetaRowsetWriter::add_rowset_for_linked_schema_change(
     return add_rowset(rowset);
 }
 
+Status BetaRowsetWriter::add_rowset_for_migration(RowsetSharedPtr rowset) {
+    Status res = Status::OK();
+    assert(rowset->rowset_meta()->rowset_type() == BETA_ROWSET);
+    if (!rowset->rowset_path_desc().is_remote() && !_context.path_desc.is_remote()) {
+        res = rowset->copy_files_to(_context.path_desc.filepath, _context.rowset_id);
+        if (!res.ok()) {
+            LOG(WARNING) << "copy_files failed. src: " << rowset->rowset_path_desc().filepath
+                         << ", dest: " << _context.path_desc.filepath;
+            return res;
+        }
+    } else if (!rowset->rowset_path_desc().is_remote() && _context.path_desc.is_remote()) {
+        res = rowset->upload_files_to(_context.path_desc, _context.rowset_id);
+        if (!res.ok()) {
+            LOG(WARNING) << "upload_files failed. src: " << rowset->rowset_path_desc().debug_string()
+                         << ", dest: " << _context.path_desc.debug_string();
+            return res;
+        }
+    } else {
+        LOG(WARNING) << "add_rowset_for_migration failed. storage_medium is invalid. src: "
+                << rowset->rowset_path_desc().debug_string() << ", dest: " << _context.path_desc.debug_string();
+        return Status::OLAPInternalError(OLAP_ERR_ROWSET_ADD_MIGRATION_V2);
+    }
+
+    _num_rows_written += rowset->num_rows();
+    _total_data_size += rowset->rowset_meta()->data_disk_size();
+    _total_index_size += rowset->rowset_meta()->index_disk_size();
+    _num_segment += rowset->num_segments();
+    // TODO update zonemap
+    if (rowset->rowset_meta()->has_delete_predicate()) {
+        _rowset_meta->set_delete_predicate(rowset->rowset_meta()->delete_predicate());
+    }
+    return Status::OK();
+}
+
 Status BetaRowsetWriter::flush() {
     if (_segment_writer != nullptr) {
         RETURN_NOT_OK(_flush_segment_writer(&_segment_writer));
diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h
index 23f7d207c1..8f9b54b51e 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -49,6 +49,8 @@ public:
     Status add_rowset_for_linked_schema_change(RowsetSharedPtr rowset,
                                                    const SchemaMapping& schema_mapping) override;
 
+    Status add_rowset_for_migration(RowsetSharedPtr rowset) override;
+
     Status flush() override;
 
     // Return the file size flushed to disk in "flush_size"
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 5674c3703a..2e91f98588 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -196,9 +196,10 @@ public:
     virtual Status link_files_to(const FilePathDesc& dir_desc, RowsetId new_rowset_id) = 0;
 
     // copy all files to `dir`
-    virtual Status copy_files_to(const std::string& dir) = 0;
+    virtual Status copy_files_to(const std::string& dir, const RowsetId& new_rowset_id) = 0;
 
-    virtual Status upload_files_to(const FilePathDesc& dir_desc) { return Status::OK(); }
+    virtual Status upload_files_to(const FilePathDesc& dir_desc,
+                const RowsetId&, bool delete_src = false) { return Status::OK(); }
 
     virtual Status remove_old_files(std::vector<std::string>* files_to_remove) = 0;
 
@@ -216,6 +217,10 @@ public:
 
     bool contains_version(Version version) { return rowset_meta()->version().contains(version); }
 
+    FilePathDesc rowset_path_desc() {
+        return _rowset_path_desc;
+    }
+
     static bool comparator(const RowsetSharedPtr& left, const RowsetSharedPtr& right) {
         return left->end_version() < right->end_version();
     }
diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h
index 43d1837362..6fb290e3dc 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -50,6 +50,8 @@ public:
     virtual Status add_rowset_for_linked_schema_change(RowsetSharedPtr rowset,
                                                            const SchemaMapping& schema_mapping) = 0;
 
+    virtual Status add_rowset_for_migration(RowsetSharedPtr rowset) = 0;
+
     // explicit flush all buffered rows into segment file.
     // note that `add_row` could also trigger flush when certain conditions are met
     virtual Status flush() = 0;
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 11b280be7b..742987f064 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -1461,8 +1461,8 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
     {
         std::lock_guard<std::mutex> base_tablet_lock(base_tablet->get_push_lock());
         std::lock_guard<std::mutex> new_tablet_lock(new_tablet->get_push_lock());
-        std::lock_guard<std::shared_mutex> base_tablet_rdlock(base_tablet->get_header_lock());
-        std::lock_guard<std::shared_mutex> new_tablet_rdlock(new_tablet->get_header_lock());
+        std::lock_guard<std::shared_mutex> base_tablet_wlock(base_tablet->get_header_lock());
+        std::lock_guard<std::shared_mutex> new_tablet_wlock(new_tablet->get_header_lock());
         // check if the tablet has alter task
         // if it has alter task, it means it is under old alter process
         size_t num_cols = base_tablet->tablet_schema().num_columns();
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 65359ea61d..9816b1b6b5 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -26,6 +26,7 @@
 #include <boost/algorithm/string.hpp>
 #include <boost/algorithm/string/classification.hpp>
 #include <boost/algorithm/string/split.hpp>
+#include <boost/algorithm/string/trim.hpp>
 #include <cstdio>
 #include <filesystem>
 #include <new>
@@ -36,6 +37,7 @@
 #include "agent/cgroups_mgr.h"
 #include "agent/task_worker_pool.h"
 #include "env/env.h"
+#include "env/env_util.h"
 #include "olap/base_compaction.h"
 #include "olap/cumulative_compaction.h"
 #include "olap/data_dir.h"
@@ -58,6 +60,8 @@
 #include "util/file_utils.h"
 #include "util/pretty_printer.h"
 #include "util/scoped_cleanup.h"
+#include "util/storage_backend.h"
+#include "util/storage_backend_mgr.h"
 #include "util/time.h"
 #include "util/trace.h"
 
@@ -120,6 +124,8 @@ StorageEngine::StorageEngine(const EngineOptions& options)
                                                          MemTrackerLevel::OVERVIEW)),
           _schema_change_mem_tracker(MemTracker::create_tracker(
                   -1, "StorageEngine::SchemaChange", nullptr, MemTrackerLevel::OVERVIEW)),
+          _storage_migration_mem_tracker(MemTracker::create_tracker(
+                  -1, "StorageEngine::StorageMigration", nullptr, MemTrackerLevel::OVERVIEW)),
           _clone_mem_tracker(MemTracker::create_tracker(-1, "StorageEngine::Clone", nullptr,
                                                         MemTrackerLevel::OVERVIEW)),
           _batch_load_mem_tracker(MemTracker::create_tracker(-1, "StorageEngine::BatchLoad",
@@ -480,7 +486,9 @@ std::vector<DataDir*> StorageEngine::get_stores_for_create_tablet(
         for (auto& it : _store_map) {
             if (it.second->is_used()) {
                 if (_available_storage_medium_type_count == 1 ||
-                    it.second->storage_medium() == storage_medium) {
+                    it.second->storage_medium() == storage_medium ||
+                    (it.second->storage_medium() == TStorageMedium::REMOTE_CACHE
+                            && FilePathDesc::is_remote(storage_medium))) {
                     stores.push_back(it.second);
                 }
             }
@@ -684,18 +692,20 @@ Status StorageEngine::start_trash_sweep(double* usage, bool ignore_guard) {
         tmp_usage = std::max(tmp_usage, curr_usage);
 
         Status curr_res = Status::OK();
-        string snapshot_path = info.path_desc.filepath + SNAPSHOT_PREFIX;
-        curr_res = _do_sweep(snapshot_path, local_now, snapshot_expire);
+        FilePathDesc snapshot_path_desc(info.path_desc.filepath + SNAPSHOT_PREFIX);
+        curr_res = _do_sweep(snapshot_path_desc, local_now, snapshot_expire);
         if (!curr_res.ok()) {
-            LOG(WARNING) << "failed to sweep snapshot. path=" << snapshot_path
+            LOG(WARNING) << "failed to sweep snapshot. path=" << snapshot_path_desc.filepath
                          << ", err_code=" << curr_res;
             res = curr_res;
         }
 
-        string trash_path = info.path_desc.filepath + TRASH_PREFIX;
-        curr_res = _do_sweep(trash_path, local_now, curr_usage > guard_space ? 0 : trash_expire);
+        FilePathDescStream trash_path_desc_s;
+        trash_path_desc_s << info.path_desc << TRASH_PREFIX;
+        FilePathDesc trash_path_desc = trash_path_desc_s.path_desc();
+        curr_res = _do_sweep(trash_path_desc, local_now, curr_usage > guard_space ? 0 : trash_expire);
         if (!curr_res.ok()) {
-            LOG(WARNING) << "failed to sweep trash. [path=%s" << trash_path
+            LOG(WARNING) << "failed to sweep trash. path=" << trash_path_desc.filepath
                          << ", err_code=" << curr_res;
             res = curr_res;
         }
@@ -804,10 +814,10 @@ void StorageEngine::_clean_unused_txns() {
     }
 }
 
-Status StorageEngine::_do_sweep(const string& scan_root, const time_t& local_now,
+Status StorageEngine::_do_sweep(const FilePathDesc& scan_root_desc, const time_t& local_now,
                                     const int32_t expire) {
     Status res = Status::OK();
-    if (!FileUtils::check_exist(scan_root)) {
+    if (!FileUtils::check_exist(scan_root_desc.filepath)) {
         // dir not existed. no need to sweep trash.
         return res;
     }
@@ -815,7 +825,7 @@ Status StorageEngine::_do_sweep(const string& scan_root, const time_t& local_now
     try {
         // Sort pathes by name, that is by delete time.
         std::vector<path> sorted_pathes;
-        std::copy(directory_iterator(path(scan_root)), directory_iterator(),
+        std::copy(directory_iterator(path(scan_root_desc.filepath)), directory_iterator(),
                   std::back_inserter(sorted_pathes));
         std::sort(sorted_pathes.begin(), sorted_pathes.end());
         for (const auto& sorted_path : sorted_pathes) {
@@ -840,10 +850,42 @@ Status StorageEngine::_do_sweep(const string& scan_root, const time_t& local_now
 
             string path_name = sorted_path.string();
             if (difftime(local_now, mktime(&local_tm_create)) >= actual_expire) {
+                std::string storage_name_path = path_name + "/" + STORAGE_NAME;
+                if (scan_root_desc.is_remote() && FileUtils::check_exist(storage_name_path)) {
+                    FilePathDesc remote_path_desc = scan_root_desc;
+                    if (!env_util::read_file_to_string(Env::Default(), storage_name_path, &(remote_path_desc.storage_name)).ok()) {
+                        LOG(WARNING) << "read storage_name failed: " << storage_name_path;
+                        continue;
+                    }
+                    boost::algorithm::trim(remote_path_desc.storage_name);
+                    std::shared_ptr<StorageBackend> storage_backend = StorageBackendMgr::instance()->
+                            get_storage_backend(remote_path_desc.storage_name);
+                    // if storage_backend is nullptr, the remote storage is invalid.
+                    // Only the local path need to be removed.
+                    if (storage_backend != nullptr) {
+                        std::string remote_root_path;
+                        if (!StorageBackendMgr::instance()->get_root_path(
+                                remote_path_desc.storage_name, &remote_root_path)) {
+                            LOG(WARNING) << "read storage root_path failed: " << remote_path_desc.storage_name;
+                            continue;
+                        }
+                        remote_path_desc.remote_path = remote_root_path + TRASH_PREFIX;
+                        std::filesystem::path local_path(path_name);
+                        std::stringstream remote_file_stream;
+                        remote_file_stream << remote_path_desc.remote_path << "/" << local_path.filename().string();
+                        Status ret = storage_backend->rmdir(remote_file_stream.str());
+                        if (!ret.ok()) {
+                            LOG(WARNING) << "fail to remove file or directory. path=" << remote_file_stream.str()
+                                         << ", error=" << ret.to_string();
+                            res = Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
+                            continue;
+                        }
+                    }
+                }
                 Status ret = FileUtils::remove_all(path_name);
                 if (!ret.ok()) {
-                    LOG(WARNING) << "fail to remove file or directory. path=" << path_name
-                                 << ", error=" << ret.to_string();
+                    LOG(WARNING) << "fail to remove file or directory. path_desc: "
+                                 << scan_root_desc.debug_string() << ", error=" << ret.to_string();
                     res = Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
                     continue;
                 }
@@ -853,7 +895,7 @@ Status StorageEngine::_do_sweep(const string& scan_root, const time_t& local_now
             }
         }
     } catch (...) {
-        LOG(WARNING) << "Exception occur when scan directory. path=" << scan_root;
+        LOG(WARNING) << "Exception occur when scan directory. path_desc=" << scan_root_desc.debug_string();
         res = Status::OLAPInternalError(OLAP_ERR_IO_ERROR);
     }
 
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 0cc0076007..5b6760252f 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -182,6 +182,7 @@ public:
     std::shared_ptr<MemTracker> compaction_mem_tracker() { return _compaction_mem_tracker; }
     std::shared_ptr<MemTracker> tablet_mem_tracker() { return _tablet_mem_tracker; }
     std::shared_ptr<MemTracker> schema_change_mem_tracker() { return _schema_change_mem_tracker; }
+    std::shared_ptr<MemTracker> storage_migration_mem_tracker() { return _storage_migration_mem_tracker; }
     std::shared_ptr<MemTracker> clone_mem_tracker() { return _clone_mem_tracker; }
     std::shared_ptr<MemTracker> batch_load_mem_tracker() { return _batch_load_mem_tracker; }
     std::shared_ptr<MemTracker> consistency_mem_tracker() { return _consistency_mem_tracker; }
@@ -214,7 +215,7 @@ private:
 
     void _clean_unused_rowset_metas();
 
-    Status _do_sweep(const std::string& scan_root, const time_t& local_tm_now,
+    Status _do_sweep(const FilePathDesc& scan_root_desc, const time_t& local_tm_now,
                          const int32_t expire);
 
     // All these xxx_callback() functions are for Background threads
@@ -327,6 +328,8 @@ private:
     std::shared_ptr<MemTracker> _tablet_mem_tracker;
     // Count the memory consumption of all SchemaChange tasks.
     std::shared_ptr<MemTracker> _schema_change_mem_tracker;
+    // Count the memory consumption of all StorageMigration tasks.
+    std::shared_ptr<MemTracker> _storage_migration_mem_tracker;
     // Count the memory consumption of all EngineCloneTask.
     // Note: Memory that does not contain make/release snapshots.
     std::shared_ptr<MemTracker> _clone_mem_tracker;
diff --git a/be/src/olap/storage_migration_v2.cpp b/be/src/olap/storage_migration_v2.cpp
new file mode 100644
index 0000000000..a0c5716bc5
--- /dev/null
+++ b/be/src/olap/storage_migration_v2.cpp
@@ -0,0 +1,464 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/storage_migration_v2.h"
+
+#include <pthread.h>
+#include <signal.h>
+
+#include <algorithm>
+#include <vector>
+#include "rapidjson/document.h"
+#include "rapidjson/prettywriter.h"
+#include "rapidjson/stringbuffer.h"
+
+#include "agent/cgroups_mgr.h"
+#include "common/resource_tls.h"
+#include "env/env_util.h"
+#include "olap/merger.h"
+#include "olap/row.h"
+#include "olap/row_block.h"
+#include "olap/row_cursor.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/rowset_id_generator.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet.h"
+#include "olap/wrapper_field.h"
+#include "runtime/exec_env.h"
+#include "runtime/thread_context.h"
+#include "util/defer_op.h"
+
+using std::deque;
+using std::list;
+using std::nothrow;
+using std::pair;
+using std::string;
+using std::stringstream;
+using std::vector;
+
+namespace doris {
+
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(storage_migration_mem_consumption, MetricUnit::BYTES, "",
+                                   mem_consumption, Labels({{"type", "storage_migration"}}));
+
+
+StorageMigrationV2Handler::StorageMigrationV2Handler()
+        : _mem_tracker(MemTracker::create_tracker(
+                -1, "StorageMigrationV2Handler", StorageEngine::instance()->storage_migration_mem_tracker())) {
+    REGISTER_HOOK_METRIC(storage_migration_mem_consumption,
+                         [this]() { return _mem_tracker->consumption(); });
+}
+
+StorageMigrationV2Handler::~StorageMigrationV2Handler() {
+    DEREGISTER_HOOK_METRIC(storage_migration_mem_consumption);
+}
+
+Status StorageMigrationV2Handler::process_storage_migration_v2(const TStorageMigrationReqV2& request) {
+    LOG(INFO) << "begin to do request storage_migration: base_tablet_id=" << request.base_tablet_id
+              << ", new_tablet_id=" << request.new_tablet_id
+              << ", migration_version=" << request.migration_version;
+
+    TabletSharedPtr base_tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
+            request.base_tablet_id, request.base_schema_hash);
+    if (base_tablet == nullptr) {
+        LOG(WARNING) << "fail to find base tablet. base_tablet=" << request.base_tablet_id;
+        return Status::OLAPInternalError(OLAP_ERR_TABLE_NOT_FOUND);
+    }
+    // Lock schema_change_lock util schema change info is stored in tablet header
+    std::unique_lock<std::mutex> schema_change_lock(base_tablet->get_schema_change_lock(), std::try_to_lock);
+    if (!schema_change_lock.owns_lock()) {
+        LOG(WARNING) << "failed to obtain schema change lock. "
+                     << "base_tablet=" << request.base_tablet_id;
+        return Status::OLAPInternalError(OLAP_ERR_TRY_LOCK_FAILED);
+    }
+
+    Status res = _do_process_storage_migration_v2(request);
+    LOG(INFO) << "finished storage_migration process, res=" << res;
+    return res;
+}
+
+Status StorageMigrationV2Handler::_do_process_storage_migration_v2(const TStorageMigrationReqV2& request) {
+    Status res = Status::OK();
+    TabletSharedPtr base_tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
+            request.base_tablet_id, request.base_schema_hash);
+    if (base_tablet == nullptr) {
+        LOG(WARNING) << "fail to find base tablet. base_tablet=" << request.base_tablet_id
+                     << ", base_schema_hash=" << request.base_schema_hash;
+        return Status::OLAPInternalError(OLAP_ERR_TABLE_NOT_FOUND);
+    }
+
+    // new tablet has to exist
+    TabletSharedPtr new_tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
+            request.new_tablet_id, request.new_schema_hash);
+            request.new_tablet_id, request.new_schema_hash);
+    if (new_tablet == nullptr) {
+        LOG(WARNING) << "fail to find new tablet."
+                     << " new_tablet=" << request.new_tablet_id
+                     << ", new_schema_hash=" << request.new_schema_hash;
+        return Status::OLAPInternalError(OLAP_ERR_TABLE_NOT_FOUND);
+    }
+
+    // check if tablet's state is not_ready, if it is ready, it means the tablet already finished
+    // check whether the tablet's max continuous version == request.version
+    if (new_tablet->tablet_state() != TABLET_NOTREADY) {
+        res = _validate_migration_result(new_tablet, request);
+        LOG(INFO) << "tablet's state=" << new_tablet->tablet_state()
+                  << " the convert job already finished, check its version"
+                  << " res=" << res;
+        return res;
+    }
+
+    LOG(INFO) << "finish to validate storage_migration request. begin to migrate data from base tablet "
+                 "to new tablet"
+              << " base_tablet=" << base_tablet->full_name()
+              << " new_tablet=" << new_tablet->full_name();
+
+    std::shared_lock base_migration_rlock(base_tablet->get_migration_lock(), std::try_to_lock);
+    if (!base_migration_rlock.owns_lock()) {
+        return Status::OLAPInternalError(OLAP_ERR_RWLOCK_ERROR);
+    }
+    std::shared_lock new_migration_rlock(new_tablet->get_migration_lock(), std::try_to_lock);
+    if (!new_migration_rlock.owns_lock()) {
+        return Status::OLAPInternalError(OLAP_ERR_RWLOCK_ERROR);
+    }
+
+    std::vector<Version> versions_to_be_changed;
+    std::vector<RowsetReaderSharedPtr> rs_readers;
+    // delete handlers for new tablet
+    DeleteHandler delete_handler;
+    std::vector<ColumnId> return_columns;
+
+    // begin to find deltas to convert from base tablet to new tablet so that
+    // obtain base tablet and new tablet's push lock and header write lock to prevent loading data
+    {
+        std::lock_guard<std::mutex> base_tablet_lock(base_tablet->get_push_lock());
+        std::lock_guard<std::mutex> new_tablet_lock(new_tablet->get_push_lock());
+        std::lock_guard<std::shared_mutex> base_tablet_wlock(base_tablet->get_header_lock());
+        std::lock_guard<std::shared_mutex> new_tablet_wlock(new_tablet->get_header_lock());
+        // check if the tablet has alter task
+        // if it has alter task, it means it is under old alter process
+        size_t num_cols = base_tablet->tablet_schema().num_columns();
+        return_columns.resize(num_cols);
+        for (int i = 0; i < num_cols; ++i) {
+            return_columns[i] = i;
+        }
+
+        // reader_context is stack variables, it's lifetime should keep the same
+        // with rs_readers
+        RowsetReaderContext reader_context;
+        reader_context.reader_type = READER_ALTER_TABLE;
+        reader_context.tablet_schema = &base_tablet->tablet_schema();
+        reader_context.need_ordered_result = true;
+        reader_context.delete_handler = &delete_handler;
+        reader_context.return_columns = &return_columns;
+        // for schema change, seek_columns is the same to return_columns
+        reader_context.seek_columns = &return_columns;
+        reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx();
+
+        do {
+            // get history data to be converted and it will check if there is hold in base tablet
+            res = _get_versions_to_be_changed(base_tablet, &versions_to_be_changed);
+            if (!res.ok()) {
+                LOG(WARNING) << "fail to get version to be changed. res=" << res;
+                break;
+            }
+
+            // should check the max_version >= request.migration_version, if not the convert is useless
+            RowsetSharedPtr max_rowset = base_tablet->rowset_with_max_version();
+            if (max_rowset == nullptr || max_rowset->end_version() < request.migration_version) {
+                LOG(WARNING) << "base tablet's max version="
+                             << (max_rowset == nullptr ? 0 : max_rowset->end_version())
+                             << " is less than request version=" << request.migration_version;
+                res = Status::OLAPInternalError(OLAP_ERR_VERSION_NOT_EXIST);
+                break;
+            }
+            // before calculating version_to_be_changed,
+            // remove all data from new tablet, prevent to rewrite data(those double pushed when wait)
+            LOG(INFO) << "begin to remove all data from new tablet to prevent rewrite."
+                      << " new_tablet=" << new_tablet->full_name();
+            std::vector<RowsetSharedPtr> rowsets_to_delete;
+            std::vector<std::pair<Version, RowsetSharedPtr>> version_rowsets;
+            new_tablet->acquire_version_and_rowsets(&version_rowsets);
+            for (auto& pair : version_rowsets) {
+                if (pair.first.second <= max_rowset->end_version()) {
+                    rowsets_to_delete.push_back(pair.second);
+                }
+            }
+            std::vector<RowsetSharedPtr> empty_vec;
+            new_tablet->modify_rowsets(empty_vec, rowsets_to_delete);
+            // inherit cumulative_layer_point from base_tablet
+            // check if new_tablet.ce_point > base_tablet.ce_point?
+            new_tablet->set_cumulative_layer_point(-1);
+            // save tablet meta
+            new_tablet->save_meta();
+            for (auto& rowset : rowsets_to_delete) {
+                // do not call rowset.remove directly, using gc thread to delete it
+                StorageEngine::instance()->add_unused_rowset(rowset);
+            }
+
+            // init one delete handler
+            int32_t end_version = -1;
+            for (auto& version : versions_to_be_changed) {
+                if (version.second > end_version) {
+                    end_version = version.second;
+                }
+            }
+
+            res = delete_handler.init(base_tablet->tablet_schema(), base_tablet->delete_predicates(),
+                                      end_version);
+            if (!res.ok()) {
+                LOG(WARNING) << "init delete handler failed. base_tablet=" << base_tablet->full_name()
+                             << ", end_version=" << end_version;
+
+                // release delete handlers which have been inited successfully.
+                delete_handler.finalize();
+                break;
+            }
+
+            // acquire data sources correspond to history versions
+            base_tablet->capture_rs_readers(versions_to_be_changed, &rs_readers);
+            if (rs_readers.size() < 1) {
+                LOG(WARNING) << "fail to acquire all data sources. "
+                             << "version_num=" << versions_to_be_changed.size()
+                             << ", data_source_num=" << rs_readers.size();
+                res = Status::OLAPInternalError(OLAP_ERR_ALTER_DELTA_DOES_NOT_EXISTS);
+                break;
+            }
+
+            for (auto& rs_reader : rs_readers) {
+                res = rs_reader->init(&reader_context);
+                if (!res.ok()) {
+                    LOG(WARNING) << "failed to init rowset reader: " << base_tablet->full_name();
+                    break;
+                }
+            }
+
+        } while (0);
+    }
+
+    do {
+        if (!res.ok()) {
+            break;
+        }
+        StorageMigrationParams sm_params;
+        sm_params.base_tablet = base_tablet;
+        sm_params.new_tablet = new_tablet;
+        sm_params.ref_rowset_readers = rs_readers;
+        sm_params.delete_handler = &delete_handler;
+
+        res = _convert_historical_rowsets(sm_params);
+        if (!res.ok()) {
+            break;
+        }
+        // set state to ready
+        std::lock_guard<std::shared_mutex> new_wlock(new_tablet->get_header_lock());
+        res = new_tablet->set_tablet_state(TabletState::TABLET_RUNNING);
+        if (!res.ok()) {
+            break;
+        }
+        new_tablet->save_meta();
+    } while (0);
+
+    if (res.ok()) {
+        // _validate_migration_result should be outside the above while loop.
+        // to avoid requiring the header lock twice.
+        res = _validate_migration_result(new_tablet, request);
+    }
+
+    // if failed convert history data, then just remove the new tablet
+    if (!res.ok()) {
+        LOG(WARNING) << "failed to alter tablet. base_tablet=" << base_tablet->full_name()
+                     << ", drop new_tablet=" << new_tablet->full_name();
+        // do not drop the new tablet and its data. GC thread will
+    }
+
+    return res;
+}
+
+Status StorageMigrationV2Handler::_get_versions_to_be_changed(
+        TabletSharedPtr base_tablet, std::vector<Version>* versions_to_be_changed) {
+    RowsetSharedPtr rowset = base_tablet->rowset_with_max_version();
+    if (rowset == nullptr) {
+        LOG(WARNING) << "Tablet has no version. base_tablet=" << base_tablet->full_name();
+        return Status::OLAPInternalError(OLAP_ERR_ALTER_DELTA_DOES_NOT_EXISTS);
+    }
+
+    std::vector<Version> span_versions;
+    RETURN_NOT_OK(base_tablet->capture_consistent_versions(Version(0, rowset->version().second),
+                                                           &span_versions));
+    versions_to_be_changed->insert(versions_to_be_changed->end(), span_versions.begin(),
+                                   span_versions.end());
+
+    return Status::OK();
+}
+
+Status StorageMigrationV2Handler::_convert_historical_rowsets(const StorageMigrationParams& sm_params) {
+    LOG(INFO) << "begin to convert historical rowsets for new_tablet from base_tablet."
+              << " base_tablet=" << sm_params.base_tablet->full_name()
+              << ", new_tablet=" << sm_params.new_tablet->full_name();
+
+    // find end version
+    int32_t end_version = -1;
+    for (size_t i = 0; i < sm_params.ref_rowset_readers.size(); ++i) {
+        if (sm_params.ref_rowset_readers[i]->version().second > end_version) {
+            end_version = sm_params.ref_rowset_readers[i]->version().second;
+        }
+    }
+
+    Status res = Status::OK();
+    for (auto& rs_reader : sm_params.ref_rowset_readers) {
+        VLOG_TRACE << "begin to convert a history rowset. version=" << rs_reader->version().first
+                   << "-" << rs_reader->version().second;
+
+        TabletSharedPtr new_tablet = sm_params.new_tablet;
+
+        RowsetWriterContext writer_context;
+        writer_context.rowset_id = StorageEngine::instance()->next_rowset_id();
+        writer_context.tablet_uid = new_tablet->tablet_uid();
+        writer_context.tablet_id = new_tablet->tablet_id();
+        writer_context.partition_id = new_tablet->partition_id();
+        writer_context.tablet_schema_hash = new_tablet->schema_hash();
+        // linked schema change can't change rowset type, therefore we preserve rowset type in schema change now
+        writer_context.rowset_type = rs_reader->rowset()->rowset_meta()->rowset_type();
+        if (sm_params.new_tablet->tablet_meta()->preferred_rowset_type() == BETA_ROWSET) {
+            writer_context.rowset_type = BETA_ROWSET;
+        }
+        writer_context.path_desc = new_tablet->tablet_path_desc();
+        writer_context.tablet_schema = &(new_tablet->tablet_schema());
+        writer_context.rowset_state = VISIBLE;
+        writer_context.version = rs_reader->version();
+        writer_context.segments_overlap = rs_reader->rowset()->rowset_meta()->segments_overlap();
+
+        std::unique_ptr<RowsetWriter> rowset_writer;
+        Status status = RowsetFactory::create_rowset_writer(writer_context, &rowset_writer);
+        if (!status.ok()) {
+            res = Status::OLAPInternalError(OLAP_ERR_ROWSET_BUILDER_INIT);
+            goto PROCESS_ALTER_EXIT;
+        }
+
+        if ((res = _generate_rowset_writer(sm_params.base_tablet->tablet_path_desc(),
+                sm_params.new_tablet->tablet_path_desc(),
+                rs_reader, rowset_writer.get(), new_tablet)) != OLAP_SUCCESS) {
+            LOG(WARNING) << "failed to add_rowset. version=" << rs_reader->version().first << "-"
+                         << rs_reader->version().second;
+            new_tablet->data_dir()->remove_pending_ids(
+                    ROWSET_ID_PREFIX + rowset_writer->rowset_id().to_string());
+            goto PROCESS_ALTER_EXIT;
+        }
+        new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
+                                                   rowset_writer->rowset_id().to_string());
+        // Add the new version of the data to the header
+        // In order to prevent the occurrence of deadlock, we must first lock the old table, and then lock the new table
+        std::lock_guard<std::mutex> lock(sm_params.new_tablet->get_push_lock());
+        RowsetSharedPtr new_rowset = rowset_writer->build();
+        if (new_rowset == nullptr) {
+            LOG(WARNING) << "failed to build rowset, exit alter process";
+            goto PROCESS_ALTER_EXIT;
+        }
+        res = sm_params.new_tablet->add_rowset(new_rowset, false);
+        if (res == Status::OLAPInternalError(OLAP_ERR_PUSH_VERSION_ALREADY_EXIST)) {
+            LOG(WARNING) << "version already exist, version revert occurred. "
+                         << "tablet=" << sm_params.new_tablet->full_name() << ", version='"
+                         << rs_reader->version().first << "-" << rs_reader->version().second;
+            StorageEngine::instance()->add_unused_rowset(new_rowset);
+            res = Status::OK();
+        } else if (!res.ok()) {
+            LOG(WARNING) << "failed to register new version. "
+                         << " tablet=" << sm_params.new_tablet->full_name()
+                         << ", version=" << rs_reader->version().first << "-"
+                         << rs_reader->version().second;
+            StorageEngine::instance()->add_unused_rowset(new_rowset);
+            goto PROCESS_ALTER_EXIT;
+        } else {
+            VLOG_NOTICE << "register new version. tablet=" << sm_params.new_tablet->full_name()
+                        << ", version=" << rs_reader->version().first << "-"
+                        << rs_reader->version().second;
+        }
+
+        VLOG_TRACE << "succeed to convert a history version."
+                   << " version=" << rs_reader->version().first << "-"
+                   << rs_reader->version().second;
+    }
+    // XXX:The SchemaChange state should not be canceled at this time, because the new Delta has to be converted to the old and new Schema version
+    PROCESS_ALTER_EXIT : {
+    // save tablet meta here because rowset meta is not saved during add rowset
+    std::lock_guard<std::shared_mutex> new_wlock(sm_params.new_tablet->get_header_lock());
+    sm_params.new_tablet->save_meta();
+}
+    if (res.ok()) {
+        Version test_version(0, end_version);
+        res = sm_params.new_tablet->check_version_integrity(test_version);
+    }
+
+    LOG(INFO) << "finish converting rowsets for new_tablet from base_tablet. "
+              << "base_tablet=" << sm_params.base_tablet->full_name()
+              << ", new_tablet=" << sm_params.new_tablet->full_name();
+    return res;
+}
+
+Status StorageMigrationV2Handler::_validate_migration_result(TabletSharedPtr new_tablet,
+                                                             const TStorageMigrationReqV2& request) {
+    Version max_continuous_version = {-1, 0};
+    new_tablet->max_continuous_version_from_beginning(&max_continuous_version);
+    LOG(INFO) << "find max continuous version of tablet=" << new_tablet->full_name()
+              << ", start_version=" << max_continuous_version.first
+              << ", end_version=" << max_continuous_version.second;
+    if (max_continuous_version.second < request.migration_version) {
+        return Status::OLAPInternalError(OLAP_ERR_VERSION_NOT_EXIST);
+    }
+
+    std::vector<std::pair<Version, RowsetSharedPtr>> version_rowsets;
+    {
+        std::shared_lock rdlock(new_tablet->get_header_lock(), std::try_to_lock);
+        new_tablet->acquire_version_and_rowsets(&version_rowsets);
+    }
+    for (auto& pair : version_rowsets) {
+        RowsetSharedPtr rowset = pair.second;
+        if (!rowset->check_file_exist()) {
+            return Status::OLAPInternalError(OLAP_ERR_FILE_NOT_EXIST);
+        }
+    }
+    return Status::OK();
+}
+
+Status StorageMigrationV2Handler::_generate_rowset_writer(
+        const FilePathDesc& src_desc, const FilePathDesc& dst_desc,
+        RowsetReaderSharedPtr rowset_reader, RowsetWriter* new_rowset_writer, TabletSharedPtr new_tablet) {
+    if (!src_desc.is_remote() && dst_desc.is_remote()) {
+        string remote_file_param_path = dst_desc.filepath + REMOTE_FILE_PARAM;
+        rapidjson::StringBuffer strbuf;
+        rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
+        writer.StartObject();
+        writer.Key(TABLET_UID.c_str());
+        writer.String(TabletUid(new_tablet->tablet_uid()).to_string().c_str());
+        writer.Key(STORAGE_NAME.c_str());
+        writer.String(dst_desc.storage_name.c_str());
+        writer.EndObject();
+        Status st = env_util::write_string_to_file(
+                Env::Default(), Slice(std::string(strbuf.GetString())), remote_file_param_path);
+        // strbuf.GetString() format: {"tablet_uid": "a84cfb67d3ad3d62-87fd8b3ae9bdad84", "storage_name": "s3_name"}
+        if (!st.ok()) {
+            LOG(WARNING) << "fail to write tablet_uid and storage_name. path=" << remote_file_param_path
+                         << ", error:" << st.to_string();
+            return Status::OLAPInternalError(OLAP_ERR_COPY_FILE_ERROR);
+        }
+        LOG(INFO) << "write storage_param successfully: " << remote_file_param_path;
+    }
+
+    return new_rowset_writer->add_rowset_for_migration(rowset_reader->rowset());
+}
+
+} // namespace doris
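
Note on the sidecar written by _generate_rowset_writer() above: for a local-to-remote migration it drops a small JSON parameter file at dst_desc.filepath + REMOTE_FILE_PARAM so that later cleanup can find the remote copy. An illustrative rendering of its contents (field values are taken from the format comment in the code; the pretty-printed layout is an assumption about rapidjson::PrettyWriter output):

    {
        "tablet_uid": "a84cfb67d3ad3d62-87fd8b3ae9bdad84",
        "storage_name": "s3_name"
    }

The same file is parsed back in TabletManager::try_delete_unused_tablet_path() later in this patch to locate the remote data that should be moved to trash.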
diff --git a/be/src/olap/storage_migration_v2.h b/be/src/olap/storage_migration_v2.h
new file mode 100644
index 0000000000..47ca08d7e3
--- /dev/null
+++ b/be/src/olap/storage_migration_v2.h
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_OLAP_STORAGE_MIGRATION_V2_H
+#define DORIS_BE_SRC_OLAP_STORAGE_MIGRATION_V2_H
+
+#include <deque>
+#include <functional>
+#include <queue>
+#include <utility>
+#include <vector>
+
+#include "gen_cpp/AgentService_types.h"
+#include "olap/column_mapping.h"
+#include "olap/delete_handler.h"
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/rowset_writer.h"
+#include "olap/tablet.h"
+
+namespace doris {
+
+class StorageMigrationV2Handler {
+public:
+    static StorageMigrationV2Handler* instance() {
+        static StorageMigrationV2Handler instance;
+        return &instance;
+    }
+
+    // schema change v2, it will not set alter task in base tablet
+    Status process_storage_migration_v2(const TStorageMigrationReqV2& request);
+
+private:
+
+    Status _get_versions_to_be_changed(TabletSharedPtr base_tablet,
+                                       std::vector<Version>* versions_to_be_changed);
+
+    struct StorageMigrationParams {
+        TabletSharedPtr base_tablet;
+        TabletSharedPtr new_tablet;
+        std::vector<RowsetReaderSharedPtr> ref_rowset_readers;
+        DeleteHandler* delete_handler = nullptr;
+    };
+
+    Status _do_process_storage_migration_v2(const TStorageMigrationReqV2& request);
+
+    Status _validate_migration_result(TabletSharedPtr new_tablet, const TStorageMigrationReqV2& request);
+
+    Status _convert_historical_rowsets(const StorageMigrationParams& sm_params);
+
+    Status _generate_rowset_writer(
+            const FilePathDesc& src_desc, const FilePathDesc& dst_desc,
+            RowsetReaderSharedPtr rowset_reader, RowsetWriter* new_rowset_writer, TabletSharedPtr new_tablet);
+
+private:
+    StorageMigrationV2Handler();
+    virtual ~StorageMigrationV2Handler();
+    StorageMigrationV2Handler(const StorageMigrationV2Handler&) = delete;
+    StorageMigrationV2Handler& operator=(const StorageMigrationV2Handler&) = delete;
+
+    std::shared_ptr<MemTracker> _mem_tracker;
+};
+
+} // namespace doris
+
+#endif // DORIS_BE_SRC_OLAP_STORAGE_MIGRATION_V2_H
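
A minimal caller-side sketch of the new handler, with a hypothetical, already-populated request (the ids and version below are placeholders; the __set_* accessors are the standard Thrift-generated setters for optional fields, and real dispatch goes through the agent task machinery rather than a direct call):

    TStorageMigrationReqV2 req;
    req.__set_base_tablet_id(10001);        // placeholder tablet id
    req.__set_new_tablet_id(10002);         // placeholder tablet id
    req.__set_base_schema_hash(368169781);  // placeholder schema hash
    req.__set_new_schema_hash(368169781);
    req.__set_migration_version(5);         // migrate all rowsets up to version 5

    Status st = StorageMigrationV2Handler::instance()->process_storage_migration_v2(req);
    if (!st.ok()) {
        LOG(WARNING) << "storage migration v2 failed: " << st.to_string();
    }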
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 31624dee34..e0d9068a86 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1279,7 +1279,11 @@ void Tablet::build_tablet_report_info(TTabletInfo* tablet_info) {
     // Useless but it is a required field in TTabletInfo
     tablet_info->version_hash = 0;
     tablet_info->__set_partition_id(_tablet_meta->partition_id());
-    tablet_info->__set_storage_medium(_data_dir->storage_medium());
+    if (FilePathDesc::is_remote(_data_dir->storage_medium())) {
+        tablet_info->__set_storage_medium(fs::fs_util::get_t_storage_medium(_tablet_meta->storage_medium()));
+    } else {
+        tablet_info->__set_storage_medium(_data_dir->storage_medium());
+    }
     tablet_info->__set_version_count(_tablet_meta->version_count());
     tablet_info->__set_path_hash(_data_dir->path_hash());
     tablet_info->__set_is_in_memory(_tablet_meta->tablet_schema().is_in_memory());
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index de1a1f9cc7..4e8019e5c0 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -28,6 +28,9 @@
 #include <cstdio>
 #include <cstdlib>
 #include <filesystem>
+#include "rapidjson/document.h"
+#include "rapidjson/prettywriter.h"
+#include "rapidjson/stringbuffer.h"
 
 #include "env/env.h"
 #include "env/env_util.h"
@@ -220,6 +223,20 @@ Status TabletManager::create_tablet(const TCreateTabletReq& request,
     int64_t tablet_id = request.tablet_id;
     LOG(INFO) << "begin to create tablet. tablet_id=" << tablet_id;
 
+    if (FilePathDesc::is_remote(request.storage_medium)) {
+        FilePathDesc path_desc;
+        path_desc.storage_medium = request.storage_medium;
+        path_desc.storage_name = request.storage_param.storage_name;
+        StorageParamPB storage_param;
+        Status st = StorageBackendMgr::instance()->get_storage_param(
+                request.storage_param.storage_name, &storage_param);
+        if (!st.ok() || storage_param.DebugString()
+                                != fs::fs_util::get_storage_param_pb(request.storage_param).DebugString()) {
+            LOG(INFO) << "remote storage needs to be created or updated. storage_name: "
+                      << request.storage_param.storage_name;
+            RETURN_NOT_OK_STATUS_WITH_WARN(
+                    StorageBackendMgr::instance()->create_remote_storage(
+                            fs::fs_util::get_storage_param_pb(request.storage_param)),
+                    "remote storage create failed. storage_name: " + request.storage_param.storage_name);
+        }
+    }
+
     std::lock_guard<std::shared_mutex> wrlock(_get_tablets_shard_lock(tablet_id));
     TRACE("got tablets shard lock");
     // Make create_tablet operation to be idempotent:
@@ -250,8 +267,11 @@ Status TabletManager::create_tablet(const TCreateTabletReq& request,
         // If we are doing schema-change, we should use the same data dir
         // TODO(lingbin): A little trick here, the directory should be determined before
         // entering this method
-        stores.clear();
-        stores.push_back(base_tablet->data_dir());
+        if (request.storage_medium == base_tablet->data_dir()->path_desc().storage_medium
+                || (FilePathDesc::is_remote(request.storage_medium) && base_tablet->data_dir()->is_remote())) {
+            stores.clear();
+            stores.push_back(base_tablet->data_dir());
+        }
     }
 
     // set alter type to schema-change. it is useless
@@ -1014,8 +1034,49 @@ void TabletManager::try_delete_unused_tablet_path(DataDir* data_dir, TTabletId t
     if (Env::Default()->path_exists(schema_hash_path).ok()) {
         LOG(INFO) << "start to move tablet to trash. tablet_path = " << schema_hash_path;
         FilePathDesc segment_desc(schema_hash_path);
+        string remote_file_param_path = schema_hash_path + REMOTE_FILE_PARAM;
+        if (data_dir->is_remote() && FileUtils::check_exist(remote_file_param_path)) {
+            // it means you must remove remote file for this segment first
+            string json_buf;
+            Status s = env_util::read_file_to_string(Env::Default(), remote_file_param_path, &json_buf);
+            if (!s.ok()) {
+                LOG(WARNING) << "delete unused file error when reading remote_file_param_path: "
+                             << remote_file_param_path;
+                return;
+            }
+            // json_buf format: {"tablet_uid": "a84cfb67d3ad3d62-87fd8b3ae9bdad84", "storage_name": "s3_name"}
+            std::string storage_name;
+            std::string tablet_uid;
+            rapidjson::Document dom;
+            if (!dom.Parse(json_buf.c_str()).HasParseError()) {
+                if (dom.HasMember(TABLET_UID.c_str()) && dom[TABLET_UID.c_str()].IsString()
+                    && dom.HasMember(STORAGE_NAME.c_str()) && dom[STORAGE_NAME.c_str()].IsString()) {
+                    storage_name = dom[STORAGE_NAME.c_str()].GetString();
+                    tablet_uid = dom[TABLET_UID.c_str()].GetString();
+                }
+            }
+            if (!tablet_uid.empty() && !storage_name.empty()) {
+                segment_desc.storage_name = storage_name;
+                StorageParamPB storage_param;
+                if (!StorageBackendMgr::instance()->get_storage_param(storage_name, &storage_param).ok()) {
+                    LOG(WARNING) << "storage_name is invalid: " << storage_name;
+                    return;
+                }
+
+                // the remote file may exist; check it and move it to trash
+                std::filesystem::path local_segment_path(schema_hash_path);
+                std::stringstream remote_file_stream;
+                remote_file_stream << data_dir->path_desc().remote_path << DATA_PREFIX
+                                   << "/" << local_segment_path.parent_path().parent_path().filename().string()  // shard
+                                   << "/" << local_segment_path.parent_path().filename().string()                // tablet_id
+                                   << "/" << local_segment_path.filename().string()                              // schema_hash
+                                   << "/" << tablet_uid;
+                segment_desc.storage_medium = fs::fs_util::get_t_storage_medium(storage_param.storage_medium());
+                segment_desc.remote_path = remote_file_stream.str();
+            }
+        }
         Status rm_st = data_dir->move_to_trash(segment_desc);
-        if (rm_st != Status::OK()) {
+        if (!rm_st.ok()) {
             LOG(WARNING) << "fail to move dir to trash. dir=" << schema_hash_path;
         } else {
             LOG(INFO) << "move path " << schema_hash_path << " to trash successfully";
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 060119cae1..68f8ca9eb2 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -406,6 +406,7 @@ void TabletMeta::init_from_pb(const TabletMetaPB& tablet_meta_pb) {
     }
 
     _remote_storage_name = tablet_meta_pb.remote_storage_name();
+    _storage_medium = tablet_meta_pb.storage_medium();
 }
 
 void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
@@ -450,6 +451,10 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
     if (_preferred_rowset_type == BETA_ROWSET) {
         tablet_meta_pb->set_preferred_rowset_type(_preferred_rowset_type);
     }
+
+    tablet_meta_pb->set_remote_storage_name(_remote_storage_name);
+    tablet_meta_pb->set_storage_medium(_storage_medium);
+
 }
 
 uint32_t TabletMeta::mem_size() const {
@@ -663,6 +668,8 @@ bool operator==(const TabletMeta& a, const TabletMeta& b) {
     }
     if (a._in_restore_mode != b._in_restore_mode) return false;
     if (a._preferred_rowset_type != b._preferred_rowset_type) return false;
+    if (a._storage_medium != b._storage_medium) return false;
+    if (a._remote_storage_name != b._remote_storage_name) return false;
     return true;
 }
 
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 73dfc68743..bc883c103b 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -175,6 +175,10 @@ public:
         return _remote_storage_name;
     }
 
+    StorageMediumPB storage_medium() const {
+        return _storage_medium;
+    }
+
 private:
     Status _save_meta(DataDir* data_dir);
     void _init_column_from_tcolumn(uint32_t unique_id, const TColumn& tcolumn, ColumnPB* column);
@@ -209,6 +213,7 @@ private:
     bool _in_restore_mode = false;
     RowsetTypePB _preferred_rowset_type = BETA_ROWSET;
     std::string _remote_storage_name;
+    StorageMediumPB _storage_medium = StorageMediumPB::HDD;
 
     std::shared_mutex _meta_lock;
 };
diff --git a/be/src/olap/task/engine_storage_migration_task.cpp b/be/src/olap/task/engine_storage_migration_task.cpp
index 2e55fbb6e1..3af4e189b7 100644
--- a/be/src/olap/task/engine_storage_migration_task.cpp
+++ b/be/src/olap/task/engine_storage_migration_task.cpp
@@ -341,7 +341,7 @@ Status EngineStorageMigrationTask::_copy_index_and_data_files(
         const string& full_path, const std::vector<RowsetSharedPtr>& consistent_rowsets) const {
     Status status = Status::OK();
     for (const auto& rs : consistent_rowsets) {
-        status = rs->copy_files_to(full_path);
+        status = rs->copy_files_to(full_path, rs->rowset_id());
         if (!status.ok()) {
             Status ret = FileUtils::remove_all(full_path);
             if (!ret.ok()) {
diff --git a/be/src/olap/task/engine_storage_migration_task_v2.cpp b/be/src/olap/task/engine_storage_migration_task_v2.cpp
new file mode 100644
index 0000000000..5e865c9849
--- /dev/null
+++ b/be/src/olap/task/engine_storage_migration_task_v2.cpp
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/task/engine_storage_migration_task_v2.h"
+
+#include "olap/storage_migration_v2.h"
+#include "runtime/mem_tracker.h"
+
+namespace doris {
+
+using std::to_string;
+
+EngineStorageMigrationTaskV2::EngineStorageMigrationTaskV2(const TStorageMigrationReqV2& request)
+        : _storage_migration_req(request) {
+    _mem_tracker = MemTracker::create_tracker(
+            config::memory_limitation_per_thread_for_storage_migration_bytes,
+            fmt::format("EngineStorageMigrationTaskV2: {}-{}",
+                        std::to_string(_storage_migration_req.base_tablet_id),
+                        std::to_string(_storage_migration_req.new_tablet_id)),
+            StorageEngine::instance()->storage_migration_mem_tracker(), MemTrackerLevel::TASK);
+}
+
+Status EngineStorageMigrationTaskV2::execute() {
+    DorisMetrics::instance()->storage_migrate_v2_requests_total->increment(1);
+    StorageMigrationV2Handler* storage_migration_v2_handler = StorageMigrationV2Handler::instance();
+    Status res = storage_migration_v2_handler->process_storage_migration_v2(_storage_migration_req);
+
+    if (!res.ok()) {
+        LOG(WARNING) << "failed to do storage migration task. res=" << res
+                     << " base_tablet_id=" << _storage_migration_req.base_tablet_id
+                     << ", base_schema_hash=" << _storage_migration_req.base_schema_hash
+                     << ", new_tablet_id=" << _storage_migration_req.new_tablet_id
+                     << ", new_schema_hash=" << _storage_migration_req.new_schema_hash;
+        DorisMetrics::instance()->storage_migrate_v2_requests_failed->increment(1);
+        return res;
+    }
+
+    LOG(INFO) << "succeed to do storage migration v2. res=" << res
+              << " base_tablet_id=" << _storage_migration_req.base_tablet_id
+              << ", base_schema_hash=" << _storage_migration_req.base_schema_hash
+              << ", new_tablet_id=" << _storage_migration_req.new_tablet_id
+              << ", new_schema_hash=" << _storage_migration_req.new_schema_hash;
+    return res;
+} // execute
+
+} // namespace doris
diff --git a/be/src/olap/task/engine_storage_migration_task_v2.h b/be/src/olap/task/engine_storage_migration_task_v2.h
new file mode 100644
index 0000000000..81d57ff636
--- /dev/null
+++ b/be/src/olap/task/engine_storage_migration_task_v2.h
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_OLAP_TASK_ENGINE_STORAGE_MIGRATION_TASK_V2_H
+#define DORIS_BE_SRC_OLAP_TASK_ENGINE_STORAGE_MIGRATION_TASK_V2_H
+
+#include "gen_cpp/AgentService_types.h"
+#include "olap/olap_define.h"
+#include "olap/task/engine_task.h"
+
+namespace doris {
+
+// Storage engine task for storage migration v2.
+// "Engine" is used as the task prefix to prevent a duplicate name with the agent task.
+class EngineStorageMigrationTaskV2 : public EngineTask {
+public:
+    virtual Status execute();
+
+public:
+    EngineStorageMigrationTaskV2(const TStorageMigrationReqV2& request);
+    ~EngineStorageMigrationTaskV2() {}
+
+private:
+    const TStorageMigrationReqV2& _storage_migration_req;
+
+    std::shared_ptr<MemTracker> _mem_tracker;
+}; // EngineStorageMigrationTaskV2
+
+} // namespace doris
+#endif //DORIS_BE_SRC_OLAP_TASK_ENGINE_STORAGE_MIGRATION_TASK_V2_H
\ No newline at end of file
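
A minimal execution sketch for the new engine task (hedged: `req` stands for a TStorageMigrationReqV2 already extracted from the agent request, and the task keeps only a const reference to it, so the request must outlive the task):

    TStorageMigrationReqV2 req;  // assumed to be populated by the agent layer
    EngineStorageMigrationTaskV2 engine_task(req);
    Status res = engine_task.execute();  // delegates to StorageMigrationV2Handler and bumps the v2 metrics
    if (!res.ok()) {
        LOG(WARNING) << "engine storage migration v2 failed: " << res.to_string();
    }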
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 36c0528ca1..7caf55ac8b 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -153,7 +153,6 @@ public:
 
     const std::vector<StorePath>& store_paths() const { return _store_paths; }
     size_t store_path_to_index(const std::string& path) { return _store_path_map[path]; }
-    void set_store_paths(const std::vector<StorePath>& paths) { _store_paths = paths; }
     StorageEngine* storage_engine() { return _storage_engine; }
     void set_storage_engine(StorageEngine* storage_engine) { _storage_engine = storage_engine; }
 
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index a1d781a944..0163815c76 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -64,6 +64,8 @@ DEFINE_ENGINE_COUNTER_METRIC(schema_change_requests_failed, schema_change, faile
 DEFINE_ENGINE_COUNTER_METRIC(create_rollup_requests_total, create_rollup, total);
 DEFINE_ENGINE_COUNTER_METRIC(create_rollup_requests_failed, create_rollup, failed);
 DEFINE_ENGINE_COUNTER_METRIC(storage_migrate_requests_total, storage_migrate, total);
+DEFINE_ENGINE_COUNTER_METRIC(storage_migrate_v2_requests_total, storage_migrate_v2, total);
+DEFINE_ENGINE_COUNTER_METRIC(storage_migrate_v2_requests_failed, storage_migrate_v2, failed);
 DEFINE_ENGINE_COUNTER_METRIC(delete_requests_total, delete, total);
 DEFINE_ENGINE_COUNTER_METRIC(delete_requests_failed, delete, failed);
 DEFINE_ENGINE_COUNTER_METRIC(clone_requests_total, clone, total);
@@ -208,6 +210,8 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, create_rollup_requests_total);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, create_rollup_requests_failed);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, storage_migrate_requests_total);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, storage_migrate_v2_requests_total);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, storage_migrate_v2_requests_failed);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, delete_requests_total);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, delete_requests_failed);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, clone_requests_total);
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index ca5d05cc8c..aa59d1770b 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -76,6 +76,8 @@ public:
     IntCounter* create_rollup_requests_total;
     IntCounter* create_rollup_requests_failed;
     IntCounter* storage_migrate_requests_total;
+    IntCounter* storage_migrate_v2_requests_total;
+    IntCounter* storage_migrate_v2_requests_failed;
     IntCounter* delete_requests_total;
     IntCounter* delete_requests_failed;
     IntCounter* clone_requests_total;
@@ -190,6 +192,7 @@ public:
     UIntGauge* load_channel_mem_consumption;
     UIntGauge* query_mem_consumption;
     UIntGauge* schema_change_mem_consumption;
+    UIntGauge* storage_migration_mem_consumption;
     UIntGauge* tablet_meta_mem_consumption;
 
     // Cache metrics
diff --git a/be/src/util/file_utils.h b/be/src/util/file_utils.h
index 0e56ec8eb2..14d1742ae6 100644
--- a/be/src/util/file_utils.h
+++ b/be/src/util/file_utils.h
@@ -53,8 +53,6 @@ public:
     static Status create_dir(const std::string& dir_path, Env* env);
 
     // Delete file recursively.
-    static Status remove_all(const std::string& dir_path, TStorageMedium::type store);
-
     static Status remove_all(const std::string& dir_path);
 
     static Status remove(const std::string& path);
diff --git a/be/src/util/storage_backend_mgr.cpp b/be/src/util/storage_backend_mgr.cpp
index ab907e114f..53a49c1d5d 100644
--- a/be/src/util/storage_backend_mgr.cpp
+++ b/be/src/util/storage_backend_mgr.cpp
@@ -111,25 +111,25 @@ Status StorageBackendMgr::_create_remote_storage_internal(const StorageParamPB&
     }
     std::map<std::string, std::string> storage_prop;
     switch (storage_param_pb.storage_medium()) {
-    case StorageMediumPB::S3:
-    default:
-        S3StorageParamPB s3_storage_param = storage_param_pb.s3_storage_param();
-        if (s3_storage_param.s3_ak().empty() || s3_storage_param.s3_sk().empty() ||
-            s3_storage_param.s3_endpoint().empty() || s3_storage_param.s3_region().empty()) {
-            return Status::InternalError("s3_storage_param param is invalid");
-        }
-        storage_prop[S3_AK] = s3_storage_param.s3_ak();
-        storage_prop[S3_SK] = s3_storage_param.s3_sk();
-        storage_prop[S3_ENDPOINT] = s3_storage_param.s3_endpoint();
-        storage_prop[S3_REGION] = s3_storage_param.s3_region();
-        storage_prop[S3_MAX_CONN_SIZE] = s3_storage_param.s3_max_conn();
-        storage_prop[S3_REQUEST_TIMEOUT_MS] = s3_storage_param.s3_request_timeout_ms();
-        storage_prop[S3_CONN_TIMEOUT_MS] = s3_storage_param.s3_conn_timeout_ms();
-
-        if (!ClientFactory::is_s3_conf_valid(storage_prop)) {
-            return Status::InternalError("s3_storage_param is invalid");
-        }
-        _storage_backend_map[storage_name] = std::make_shared<S3StorageBackend>(storage_prop);
+        case StorageMediumPB::S3:
+        default:
+            S3StorageParamPB s3_storage_param = storage_param_pb.s3_storage_param();
+            if (s3_storage_param.s3_ak().empty() || s3_storage_param.s3_sk().empty()
+                || s3_storage_param.s3_endpoint().empty() || s3_storage_param.s3_region().empty()) {
+                return Status::InternalError("s3_storage_param param is invalid");
+            }
+            storage_prop[S3_AK] = s3_storage_param.s3_ak();
+            storage_prop[S3_SK] = s3_storage_param.s3_sk();
+            storage_prop[S3_ENDPOINT] = s3_storage_param.s3_endpoint();
+            storage_prop[S3_REGION] = s3_storage_param.s3_region();
+            storage_prop[S3_MAX_CONN_SIZE] = s3_storage_param.s3_max_conn();
+            storage_prop[S3_REQUEST_TIMEOUT_MS] = s3_storage_param.s3_request_timeout_ms();
+            storage_prop[S3_CONN_TIMEOUT_MS] = s3_storage_param.s3_conn_timeout_ms();
+
+            if (!ClientFactory::is_s3_conf_valid(storage_prop)) {
+                return Status::InternalError("s3_storage_param is invalid");
+            }
+            _storage_backend_map[storage_name] = std::make_shared<S3StorageBackend>(storage_prop);
     }
     _storage_param_map[storage_name] = storage_param_pb;
     _storage_backend_active_time[storage_name] = time(nullptr);
@@ -168,10 +168,11 @@ Status StorageBackendMgr::get_root_path(const std::string& storage_name, std::st
 
 std::string StorageBackendMgr::get_root_path_from_param(const StorageParamPB& storage_param) {
     switch (storage_param.storage_medium()) {
-    case StorageMediumPB::S3:
-    default: {
-        return storage_param.s3_storage_param().root_path();
-    }
+        case StorageMediumPB::S3:
+        default:
+        {
+            return storage_param.s3_storage_param().root_path();
+        }
     }
 }
 
diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift
index ae5113920c..1eb01e8c5f 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -144,6 +144,14 @@ struct TAlterMaterializedViewParam {
     3: optional Exprs.TExpr mv_expr
 }
 
+struct TStorageMigrationReqV2 {
+    1: optional Types.TTabletId base_tablet_id
+    2: optional Types.TTabletId new_tablet_id
+    3: optional Types.TSchemaHash base_schema_hash
+    4: optional Types.TSchemaHash new_schema_hash
+    5: optional Types.TVersion migration_version
+}
+
 struct TClusterInfo {
     1: required string user
     2: required string password
@@ -350,6 +358,7 @@ struct TAgentTaskRequest {
     25: optional i64 recv_time // time the task is inserted to queue
     26: optional TUpdateTabletMetaInfoReq update_tablet_meta_info_req
     27: optional TCompactionReq compaction_req
+    28: optional TStorageMigrationReqV2 storage_migration_req_v2
 }
 
 struct TAgentResult {
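
On the BE side, the new optional field rides inside TAgentTaskRequest; a hedged guard sketch for a worker that wants to pick it up (assumes the standard Thrift-generated __isset flags and the existing task_type field, plus the STORAGE_MEDIUM_MIGRATE_V2 value added to TTaskType just below):

    if (agent_task_req.task_type == TTaskType::STORAGE_MEDIUM_MIGRATE_V2
            && agent_task_req.__isset.storage_migration_req_v2) {
        const TStorageMigrationReqV2& req = agent_task_req.storage_migration_req_v2;
        // hand `req` to EngineStorageMigrationTaskV2, as sketched after the task header above
    }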
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 2cd74860f0..95f3cc4c1e 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -187,7 +187,8 @@ enum TTaskType {
     ALTER,
     INSTALL_PLUGIN,
     UNINSTALL_PLUGIN,
-    COMPACTION
+    COMPACTION,
+    STORAGE_MEDIUM_MIGRATE_V2
 }
 
 enum TStmtType {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
