This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 333352a4051 [feature](merge-cloud) Add cloud meta manager rpc implementation (#29965)
333352a4051 is described below

commit 333352a40511a4ff0427563b390fdc498e55a281
Author: walter <[email protected]>
AuthorDate: Mon Jan 15 21:54:12 2024 +0800

    [feature](merge-cloud) Add cloud meta manager rpc implementation (#29965)
    
    Co-authored-by: plat1ko <[email protected]>
    Co-authored-by: Gavin Chou <[email protected]>
    Co-authored-by: Xin Liao <[email protected]>
    Co-authored-by: Xiaocc <[email protected]>
    Co-authored-by: deardeng <[email protected]>
    Co-authored-by: Lei Zhang <[email protected]>
    Co-authored-by: Lightman <[email protected]>
    Co-authored-by: Luwei <[email protected]>
    Co-authored-by: Yongqiang YANG <[email protected]>
    Co-authored-by: YueW <[email protected]>
    Co-authored-by: bobhan1 <[email protected]>
---
 be/src/cloud/cloud_meta_mgr.cpp | 308 +++++++++++++++++++++++++++++++++-------
 be/src/cloud/cloud_meta_mgr.h   |  67 +++++----
 be/src/cloud/meta_mgr.h         |  86 -----------
 be/src/common/status.h          |   3 +-
 be/src/util/s3_util.h           |   5 +
 5 files changed, 304 insertions(+), 165 deletions(-)

diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index e6915eae914..cbe6ab8ae24 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -26,6 +26,7 @@
 #include <mutex>
 #include <random>
 #include <shared_mutex>
+#include <type_traits>
 #include <vector>
 
 #include "cloud/config.h"
@@ -46,7 +47,9 @@
 namespace doris::cloud {
 using namespace ErrorCode;
 
-bvar::LatencyRecorder g_get_rowset_latency("doris_CloudMetaMgr", "get_rowset");
+static bvar::LatencyRecorder g_get_rowset_latency("doris_CloudMetaMgr", 
"get_rowset");
+
+static constexpr int BRPC_RETRY_TIMES = 3;
 
 class MetaServiceProxy {
 public:
@@ -166,53 +169,105 @@ private:
     std::shared_ptr<MetaService_Stub> _stub;
 };
 
-Status CloudMetaMgr::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* 
tablet_meta) {
-    VLOG_DEBUG << "send GetTabletRequest, tablet_id: " << tablet_id;
-    TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::get_tablet_meta", 
Status::OK(), tablet_id,
-                                      tablet_meta);
+template <typename T, typename... Ts>
+struct is_any : std::disjunction<std::is_same<T, Ts>...> {};
+
+template <typename T, typename... Ts>
+constexpr bool is_any_v = is_any<T, Ts...>::value;
+
+template <typename Request>
+static std::string debug_info(const Request& req) {
+    if constexpr (is_any_v<Request, CommitTxnRequest, AbortTxnRequest, 
PrecommitTxnRequest>) {
+        return fmt::format(" txn_id={}", req.txn_id());
+    } else if constexpr (is_any_v<Request, StartTabletJobRequest, 
FinishTabletJobRequest>) {
+        return fmt::format(" tablet_id={}", req.job().idx().tablet_id());
+    } else if constexpr (is_any_v<Request, UpdateDeleteBitmapRequest>) {
+        return fmt::format(" tablet_id={}, lock_id={}", req.tablet_id(), 
req.lock_id());
+    } else if constexpr (is_any_v<Request, GetDeleteBitmapUpdateLockRequest>) {
+        return fmt::format(" table_id={}, lock_id={}", req.table_id(), 
req.lock_id());
+    } else if constexpr (is_any_v<Request, GetTabletRequest>) {
+        return fmt::format(" tablet_id={}", req.tablet_id());
+    } else if constexpr (is_any_v<Request, GetObjStoreInfoRequest>) {
+        return "";
+    } else {
+        static_assert(!sizeof(Request));
+    }
+}
+
+static inline std::default_random_engine make_random_engine() {
+    return std::default_random_engine(
+            
static_cast<uint32_t>(std::chrono::steady_clock::now().time_since_epoch().count()));
+}
 
+template <typename Request, typename Response>
+using MetaServiceMethod = void 
(MetaService_Stub::*)(::google::protobuf::RpcController*,
+                                                     const Request*, Response*,
+                                                     
::google::protobuf::Closure*);
+
+template <typename Request, typename Response>
+static Status retry_rpc(std::string_view op_name, const Request& req, 
Response* res,
+                        MetaServiceMethod<Request, Response> method) {
+    static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
+    static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
+
+    int retry_times = 0;
+    uint32_t duration_ms = 0;
+    std::string error_msg;
+    std::default_random_engine rng = make_random_engine();
+    std::uniform_int_distribution<uint32_t> u(20, 200);
+    std::uniform_int_distribution<uint32_t> u2(500, 1000);
     std::shared_ptr<MetaService_Stub> stub;
     RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub));
-
-    int tried = 0;
     while (true) {
         brpc::Controller cntl;
         cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
-        GetTabletRequest req;
-        GetTabletResponse resp;
-        req.set_cloud_unique_id(config::cloud_unique_id);
-        req.set_tablet_id(tablet_id);
-        stub->get_tablet(&cntl, &req, &resp, nullptr);
-        int retry_times = config::meta_service_rpc_retry_times;
-        if (cntl.Failed()) {
-            if (tried++ < retry_times) {
-                auto rng = std::default_random_engine(static_cast<uint32_t>(
-                        
std::chrono::steady_clock::now().time_since_epoch().count()));
-                std::uniform_int_distribution<uint32_t> u(20, 200);
-                std::uniform_int_distribution<uint32_t> u1(500, 1000);
-                uint32_t duration_ms = tried >= 100 ? u(rng) : u1(rng);
-                
std::this_thread::sleep_for(std::chrono::milliseconds(duration_ms));
-                LOG_INFO("failed to get tablet meta")
-                        .tag("reason", cntl.ErrorText())
-                        .tag("tablet_id", tablet_id)
-                        .tag("tried", tried)
-                        .tag("sleep", duration_ms);
-                continue;
-            }
-            return Status::RpcError("failed to get tablet meta: {}", 
cntl.ErrorText());
+        cntl.set_max_retry(BRPC_RETRY_TIMES);
+        res->Clear();
+        (stub.get()->*method)(&cntl, &req, res, nullptr);
+        if (UNLIKELY(cntl.Failed())) {
+            error_msg = cntl.ErrorText();
+        } else if (res->status().code() == MetaServiceCode::OK) {
+            return Status::OK();
+        } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
+            return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to 
{}: {}", op_name,
+                                                                   
res->status().msg());
+        } else {
+            error_msg = res->status().msg();
         }
+
+        if (++retry_times > config::meta_service_rpc_retry_times) {
+            break;
+        }
+
+        duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
+        LOG(WARNING) << "failed to " << op_name << debug_info(req) << " 
retry_times=" << retry_times
+                     << " sleep=" << duration_ms << "ms : " << 
cntl.ErrorText();
+        bthread_usleep(duration_ms * 1000);
+    }
+    return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, 
error_msg);
+}
+
+Status CloudMetaMgr::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* 
tablet_meta) {
+    VLOG_DEBUG << "send GetTabletRequest, tablet_id: " << tablet_id;
+    TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::get_tablet_meta", 
Status::OK(), tablet_id,
+                                      tablet_meta);
+    GetTabletRequest req;
+    GetTabletResponse resp;
+    req.set_cloud_unique_id(config::cloud_unique_id);
+    req.set_tablet_id(tablet_id);
+    Status st = retry_rpc("get tablet meta", req, &resp, 
&MetaService_Stub::get_tablet);
+    if (!st.ok()) {
         if (resp.status().code() == MetaServiceCode::TABLET_NOT_FOUND) {
             return Status::NotFound("failed to get tablet meta: {}", 
resp.status().msg());
         }
-        if (resp.status().code() != MetaServiceCode::OK) {
-            return Status::InternalError("failed to get tablet meta: {}", 
resp.status().msg());
-        }
-        *tablet_meta = std::make_shared<TabletMeta>();
-        (*tablet_meta)
-                
->init_from_pb(cloud_tablet_meta_to_doris(std::move(*resp.mutable_tablet_meta())));
-        VLOG_DEBUG << "get tablet meta, tablet_id: " << 
(*tablet_meta)->tablet_id();
-        return Status::OK();
+        return st;
     }
+
+    *tablet_meta = std::make_shared<TabletMeta>();
+    (*tablet_meta)
+            
->init_from_pb(cloud_tablet_meta_to_doris(std::move(*resp.mutable_tablet_meta())));
+    VLOG_DEBUG << "get tablet meta, tablet_id: " << 
(*tablet_meta)->tablet_id();
+    return Status::OK();
 }
 
 Status CloudMetaMgr::sync_tablet_rowsets(Tablet* tablet, bool 
warmup_delta_data) {
@@ -226,12 +281,12 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(
     return Status::NotSupported("CloudMetaMgr::sync_tablet_delete_bitmap is 
not implemented");
 }
 
-Status CloudMetaMgr::prepare_rowset(const RowsetMeta* rs_meta, bool is_tmp,
+Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta, bool is_tmp,
                                     RowsetMetaSharedPtr* existed_rs_meta) {
     return Status::NotSupported("CloudMetaMgr::prepare_rowset is not 
implemented");
 }
 
-Status CloudMetaMgr::commit_rowset(const RowsetMeta* rs_meta, bool is_tmp,
+Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta, bool is_tmp,
                                    RowsetMetaSharedPtr* existed_rs_meta) {
     return Status::NotSupported("CloudMetaMgr::commit_rowset is not 
implemented");
 }
@@ -240,50 +295,195 @@ Status CloudMetaMgr::update_tmp_rowset(const RowsetMeta& 
rs_meta) {
     return Status::NotSupported("CloudMetaMgr::update_tmp_rowset is not 
implemented");
 }
 
-Status CloudMetaMgr::commit_txn(StreamLoadContext* ctx, bool is_2pc) {
-    return Status::NotSupported("CloudMetaMgr::commit_txn is not implemented");
+Status CloudMetaMgr::commit_txn(const StreamLoadContext& ctx, bool is_2pc) {
+    VLOG_DEBUG << "commit txn, db_id: " << ctx.db_id << ", txn_id: " << 
ctx.txn_id
+               << ", label: " << ctx.label << ", is_2pc: " << is_2pc;
+    CommitTxnRequest req;
+    CommitTxnResponse res;
+    req.set_cloud_unique_id(config::cloud_unique_id);
+    req.set_db_id(ctx.db_id);
+    req.set_txn_id(ctx.txn_id);
+    req.set_is_2pc(is_2pc);
+    return retry_rpc("commit txn", req, &res, &MetaService_Stub::commit_txn);
 }
 
-Status CloudMetaMgr::abort_txn(StreamLoadContext* ctx) {
-    return Status::NotSupported("CloudMetaMgr::abort_txn is not implemented");
+Status CloudMetaMgr::abort_txn(const StreamLoadContext& ctx) {
+    VLOG_DEBUG << "abort txn, db_id: " << ctx.db_id << ", txn_id: " << 
ctx.txn_id
+               << ", label: " << ctx.label;
+    AbortTxnRequest req;
+    AbortTxnResponse res;
+    req.set_cloud_unique_id(config::cloud_unique_id);
+    if (ctx.db_id > 0 && !ctx.label.empty()) {
+        req.set_db_id(ctx.db_id);
+        req.set_label(ctx.label);
+    } else {
+        req.set_txn_id(ctx.txn_id);
+    }
+    return retry_rpc("abort txn", req, &res, &MetaService_Stub::abort_txn);
 }
 
-Status CloudMetaMgr::precommit_txn(StreamLoadContext* ctx) {
-    return Status::NotSupported("CloudMetaMgr::precommit_txn is not 
implemented");
+Status CloudMetaMgr::precommit_txn(const StreamLoadContext& ctx) {
+    VLOG_DEBUG << "precommit txn, db_id: " << ctx.db_id << ", txn_id: " << 
ctx.txn_id
+               << ", label: " << ctx.label;
+    PrecommitTxnRequest req;
+    PrecommitTxnResponse res;
+    req.set_cloud_unique_id(config::cloud_unique_id);
+    req.set_db_id(ctx.db_id);
+    req.set_txn_id(ctx.txn_id);
+    return retry_rpc("precommit txn", req, &res, 
&MetaService_Stub::precommit_txn);
 }
 
 Status CloudMetaMgr::get_s3_info(std::vector<std::tuple<std::string, S3Conf>>* 
s3_infos) {
-    return Status::NotSupported("CloudMetaMgr::get_s3_info is not 
implemented");
+    GetObjStoreInfoRequest req;
+    GetObjStoreInfoResponse resp;
+    req.set_cloud_unique_id(config::cloud_unique_id);
+    Status s = retry_rpc("get s3 info", req, &resp, 
&MetaService_Stub::get_obj_store_info);
+    if (!s.ok()) {
+        return s;
+    }
+
+    for (const auto& obj_store : resp.obj_info()) {
+        S3Conf s3_conf;
+        s3_conf.ak = obj_store.ak();
+        s3_conf.sk = obj_store.sk();
+        s3_conf.endpoint = obj_store.endpoint();
+        s3_conf.region = obj_store.region();
+        s3_conf.bucket = obj_store.bucket();
+        s3_conf.prefix = obj_store.prefix();
+        s3_conf.sse_enabled = obj_store.sse_enabled();
+        s3_conf.provider = obj_store.provider();
+        s3_infos->emplace_back(obj_store.id(), std::move(s3_conf));
+    }
+    return Status::OK();
 }
 
 Status CloudMetaMgr::prepare_tablet_job(const TabletJobInfoPB& job, 
StartTabletJobResponse* res) {
-    return Status::NotSupported("CloudMetaMgr::prepare_tablet_job is not 
implemented");
+    VLOG_DEBUG << "prepare_tablet_job: " << job.ShortDebugString();
+    TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::prepare_tablet_job", 
Status::OK(), job, res);
+
+    StartTabletJobRequest req;
+    req.mutable_job()->CopyFrom(job);
+    req.set_cloud_unique_id(config::cloud_unique_id);
+    return retry_rpc("start tablet job", req, res, 
&MetaService_Stub::start_tablet_job);
 }
 
 Status CloudMetaMgr::commit_tablet_job(const TabletJobInfoPB& job, 
FinishTabletJobResponse* res) {
-    return Status::NotSupported("CloudMetaMgr::commit_tablet_job is not 
implemented");
+    VLOG_DEBUG << "commit_tablet_job: " << job.ShortDebugString();
+    TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_tablet_job", 
Status::OK(), job, res);
+
+    FinishTabletJobRequest req;
+    req.mutable_job()->CopyFrom(job);
+    req.set_action(FinishTabletJobRequest::COMMIT);
+    req.set_cloud_unique_id(config::cloud_unique_id);
+    return retry_rpc("commit tablet job", req, res, 
&MetaService_Stub::finish_tablet_job);
 }
 
 Status CloudMetaMgr::abort_tablet_job(const TabletJobInfoPB& job) {
-    return Status::NotSupported("CloudMetaMgr::alter_tablet_job is not 
implemented");
+    VLOG_DEBUG << "abort_tablet_job: " << job.ShortDebugString();
+    FinishTabletJobRequest req;
+    FinishTabletJobResponse res;
+    req.mutable_job()->CopyFrom(job);
+    req.set_action(FinishTabletJobRequest::ABORT);
+    req.set_cloud_unique_id(config::cloud_unique_id);
+    return retry_rpc("abort tablet job", req, &res, 
&MetaService_Stub::finish_tablet_job);
 }
 
 Status CloudMetaMgr::lease_tablet_job(const TabletJobInfoPB& job) {
-    return Status::NotSupported("CloudMetaMgr::lease_tablet_job is not 
implemented");
+    VLOG_DEBUG << "lease_tablet_job: " << job.ShortDebugString();
+    FinishTabletJobRequest req;
+    FinishTabletJobResponse res;
+    req.mutable_job()->CopyFrom(job);
+    req.set_action(FinishTabletJobRequest::LEASE);
+    req.set_cloud_unique_id(config::cloud_unique_id);
+    return retry_rpc("lease tablet job", req, &res, 
&MetaService_Stub::finish_tablet_job);
 }
 
-Status CloudMetaMgr::update_tablet_schema(int64_t tablet_id, const 
TabletSchema* tablet_schema) {
-    return Status::NotSupported("CloudMetaMgr::update_tablet_schema is not 
implemented");
+Status CloudMetaMgr::update_tablet_schema(int64_t tablet_id, const 
TabletSchema& tablet_schema) {
+    VLOG_DEBUG << "send UpdateTabletSchemaRequest, tablet_id: " << tablet_id;
+
+    std::shared_ptr<MetaService_Stub> stub;
+    RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub));
+
+    brpc::Controller cntl;
+    cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
+    UpdateTabletSchemaRequest req;
+    UpdateTabletSchemaResponse resp;
+    req.set_cloud_unique_id(config::cloud_unique_id);
+    req.set_tablet_id(tablet_id);
+
+    TabletSchemaPB tablet_schema_pb;
+    tablet_schema.to_schema_pb(&tablet_schema_pb);
+    doris_tablet_schema_to_cloud(req.mutable_tablet_schema(), 
std::move(tablet_schema_pb));
+    stub->update_tablet_schema(&cntl, &req, &resp, nullptr);
+    if (cntl.Failed()) {
+        return Status::RpcError("failed to update tablet schema: {}", 
cntl.ErrorText());
+    }
+    if (resp.status().code() != MetaServiceCode::OK) {
+        return Status::InternalError("failed to update tablet schema: {}", 
resp.status().msg());
+    }
+    VLOG_DEBUG << "succeed to update tablet schema, tablet_id: " << tablet_id;
+    return Status::OK();
 }
 
 Status CloudMetaMgr::update_delete_bitmap(const Tablet* tablet, int64_t 
lock_id, int64_t initiator,
                                           DeleteBitmap* delete_bitmap) {
-    return Status::NotSupported("CloudMetaMgr::update_delete_bitmap is not 
implemented");
+    VLOG_DEBUG << "update_delete_bitmap , tablet_id: " << tablet->tablet_id();
+    UpdateDeleteBitmapRequest req;
+    UpdateDeleteBitmapResponse res;
+    req.set_cloud_unique_id(config::cloud_unique_id);
+    req.set_table_id(tablet->table_id());
+    req.set_partition_id(tablet->partition_id());
+    req.set_tablet_id(tablet->tablet_id());
+    req.set_lock_id(lock_id);
+    req.set_initiator(initiator);
+    for (auto iter = delete_bitmap->delete_bitmap.begin();
+         iter != delete_bitmap->delete_bitmap.end(); ++iter) {
+        req.add_rowset_ids(std::get<0>(iter->first).to_string());
+        req.add_segment_ids(std::get<1>(iter->first));
+        req.add_versions(std::get<2>(iter->first));
+        // To save space, convert array and bitmap containers to run containers
+        iter->second.runOptimize();
+        std::string bitmap_data(iter->second.getSizeInBytes(), '\0');
+        iter->second.write(bitmap_data.data());
+        *(req.add_segment_delete_bitmaps()) = std::move(bitmap_data);
+    }
+    auto st = retry_rpc("update delete bitmap", req, &res, 
&MetaService_Stub::update_delete_bitmap);
+    if (res.status().code() == MetaServiceCode::LOCK_EXPIRED) {
+        return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>(
+                "lock expired when update delete bitmap, tablet_id: {}, 
lock_id: {}",
+                tablet->tablet_id(), lock_id);
+    }
+    return st;
 }
 
 Status CloudMetaMgr::get_delete_bitmap_update_lock(const Tablet* tablet, 
int64_t lock_id,
                                                    int64_t initiator) {
-    return Status::NotSupported("CloudMetaMgr::get_delete_bitmap_update_lock 
is not implemented");
+    VLOG_DEBUG << "get_delete_bitmap_update_lock , tablet_id: " << 
tablet->tablet_id();
+    GetDeleteBitmapUpdateLockRequest req;
+    GetDeleteBitmapUpdateLockResponse res;
+    req.set_cloud_unique_id(config::cloud_unique_id);
+    req.set_table_id(tablet->table_id());
+    req.set_lock_id(lock_id);
+    req.set_initiator(initiator);
+    req.set_expiration(10); // 10s expiration time for compaction and 
schema_change
+    int retry_times = 0;
+    Status st;
+    std::default_random_engine rng = make_random_engine();
+    std::uniform_int_distribution<uint32_t> u(500, 2000);
+    do {
+        st = retry_rpc("get delete bitmap update lock", req, &res,
+                       &MetaService_Stub::get_delete_bitmap_update_lock);
+        if (res.status().code() != MetaServiceCode::LOCK_CONFLICT) {
+            break;
+        }
+
+        uint32_t duration_ms = u(rng);
+        LOG(WARNING) << "get delete bitmap lock conflict. " << debug_info(req)
+                     << " retry_times=" << retry_times << " sleep=" << 
duration_ms
+                     << "ms : " << res.status().msg();
+        bthread_usleep(duration_ms * 1000);
+    } while (++retry_times <= 100);
+    return st;
 }
 
 } // namespace doris::cloud
diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h
index fe65e0441ff..6557a6eab8a 100644
--- a/be/src/cloud/cloud_meta_mgr.h
+++ b/be/src/cloud/cloud_meta_mgr.h
@@ -16,55 +16,73 @@
 // under the License.
 #pragma once
 
-#include "cloud/meta_mgr.h"
+#include <memory>
+#include <string>
+#include <tuple>
+#include <vector>
+
+#include "common/status.h"
 #include "olap/rowset/rowset_meta.h"
+#include "util/s3_util.h"
+
+namespace doris {
+
+class DeleteBitmap;
+class StreamLoadContext;
+class Tablet;
+class TabletMeta;
+class TabletSchema;
+class RowsetMeta;
+
+namespace cloud {
 
-namespace doris::cloud {
+class FinishTabletJobResponse;
+class StartTabletJobResponse;
+class TabletJobInfoPB;
 class TabletStatsPB;
 class TabletIndexPB;
 
-class CloudMetaMgr final : public MetaMgr {
+class CloudMetaMgr {
 public:
     CloudMetaMgr() = default;
-    ~CloudMetaMgr() override = default;
+    ~CloudMetaMgr() = default;
     CloudMetaMgr(const CloudMetaMgr&) = delete;
     CloudMetaMgr& operator=(const CloudMetaMgr&) = delete;
 
-    Status get_tablet_meta(int64_t tablet_id, std::shared_ptr<TabletMeta>* 
tablet_meta) override;
+    Status get_tablet_meta(int64_t tablet_id, std::shared_ptr<TabletMeta>* 
tablet_meta);
 
-    Status sync_tablet_rowsets(Tablet* tablet, bool warmup_delta_data = false) 
override;
+    Status sync_tablet_rowsets(Tablet* tablet, bool warmup_delta_data = false);
 
-    Status prepare_rowset(const RowsetMeta* rs_meta, bool is_tmp,
-                          std::shared_ptr<RowsetMeta>* existed_rs_meta = 
nullptr) override;
+    Status prepare_rowset(const RowsetMeta& rs_meta, bool is_tmp,
+                          std::shared_ptr<RowsetMeta>* existed_rs_meta = 
nullptr);
 
-    Status commit_rowset(const RowsetMeta* rs_meta, bool is_tmp,
-                         std::shared_ptr<RowsetMeta>* existed_rs_meta = 
nullptr) override;
+    Status commit_rowset(const RowsetMeta& rs_meta, bool is_tmp,
+                         std::shared_ptr<RowsetMeta>* existed_rs_meta = 
nullptr);
 
-    Status update_tmp_rowset(const RowsetMeta& rs_meta) override;
+    Status update_tmp_rowset(const RowsetMeta& rs_meta);
 
-    Status commit_txn(StreamLoadContext* ctx, bool is_2pc) override;
+    Status commit_txn(const StreamLoadContext& ctx, bool is_2pc);
 
-    Status abort_txn(StreamLoadContext* ctx) override;
+    Status abort_txn(const StreamLoadContext& ctx);
 
-    Status precommit_txn(StreamLoadContext* ctx) override;
+    Status precommit_txn(const StreamLoadContext& ctx);
 
-    Status get_s3_info(std::vector<std::tuple<std::string, S3Conf>>* s3_infos) 
override;
+    Status get_s3_info(std::vector<std::tuple<std::string, S3Conf>>* s3_infos);
 
-    Status prepare_tablet_job(const TabletJobInfoPB& job, 
StartTabletJobResponse* res) override;
+    Status prepare_tablet_job(const TabletJobInfoPB& job, 
StartTabletJobResponse* res);
 
-    Status commit_tablet_job(const TabletJobInfoPB& job, 
FinishTabletJobResponse* res) override;
+    Status commit_tablet_job(const TabletJobInfoPB& job, 
FinishTabletJobResponse* res);
 
-    Status abort_tablet_job(const TabletJobInfoPB& job) override;
+    Status abort_tablet_job(const TabletJobInfoPB& job);
 
-    Status lease_tablet_job(const TabletJobInfoPB& job) override;
+    Status lease_tablet_job(const TabletJobInfoPB& job);
 
-    Status update_tablet_schema(int64_t tablet_id, const TabletSchema* 
tablet_schema) override;
+    Status update_tablet_schema(int64_t tablet_id, const TabletSchema& 
tablet_schema);
 
     Status update_delete_bitmap(const Tablet* tablet, int64_t lock_id, int64_t 
initiator,
-                                DeleteBitmap* delete_bitmap) override;
+                                DeleteBitmap* delete_bitmap);
 
-    Status get_delete_bitmap_update_lock(const Tablet* tablet, int64_t lock_id,
-                                         int64_t initiator) override;
+    Status get_delete_bitmap_update_lock(const Tablet* tablet, int64_t 
lock_id, int64_t initiator);
 
 private:
     Status sync_tablet_delete_bitmap(
@@ -73,4 +91,5 @@ private:
             const TabletStatsPB& stas, const TabletIndexPB& idx, DeleteBitmap* 
delete_bitmap);
 };
 
-} // namespace doris::cloud
+} // namespace cloud
+} // namespace doris
diff --git a/be/src/cloud/meta_mgr.h b/be/src/cloud/meta_mgr.h
deleted file mode 100644
index c573d43ff76..00000000000
--- a/be/src/cloud/meta_mgr.h
+++ /dev/null
@@ -1,86 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#pragma once
-
-#include <memory>
-#include <string>
-#include <tuple>
-#include <vector>
-
-#include "common/status.h"
-#include "util/s3_util.h"
-
-namespace doris {
-class StreamLoadContext;
-class Tablet;
-class TabletMeta;
-class RowsetMeta;
-class TabletSchema;
-class DeleteBitmap;
-
-namespace cloud {
-
-class TabletJobInfoPB;
-class StartTabletJobResponse;
-class FinishTabletJobResponse;
-
-class MetaMgr {
-public:
-    virtual ~MetaMgr() = default;
-
-    virtual Status open() { return Status::OK(); }
-
-    virtual Status get_tablet_meta(int64_t tablet_id, 
std::shared_ptr<TabletMeta>* tablet_meta) = 0;
-
-    // If `warmup_delta_data` is true, download the new version rowset data in 
background
-    virtual Status sync_tablet_rowsets(Tablet* tablet, bool warmup_delta_data 
= false) = 0;
-
-    virtual Status prepare_rowset(const RowsetMeta* rs_meta, bool is_tmp,
-                                  std::shared_ptr<RowsetMeta>* existed_rs_meta 
= nullptr) = 0;
-
-    virtual Status commit_rowset(const RowsetMeta* rs_meta, bool is_tmp,
-                                 std::shared_ptr<RowsetMeta>* existed_rs_meta 
= nullptr) = 0;
-
-    virtual Status update_tmp_rowset(const RowsetMeta& rs_meta) = 0;
-
-    virtual Status commit_txn(StreamLoadContext* ctx, bool is_2pc) = 0;
-
-    virtual Status abort_txn(StreamLoadContext* ctx) = 0;
-
-    virtual Status precommit_txn(StreamLoadContext* ctx) = 0;
-
-    virtual Status get_s3_info(std::vector<std::tuple<std::string, S3Conf>>* 
s3_infos) = 0;
-
-    virtual Status prepare_tablet_job(const TabletJobInfoPB& job, 
StartTabletJobResponse* res) = 0;
-
-    virtual Status commit_tablet_job(const TabletJobInfoPB& job, 
FinishTabletJobResponse* res) = 0;
-
-    virtual Status abort_tablet_job(const TabletJobInfoPB& job) = 0;
-
-    virtual Status lease_tablet_job(const TabletJobInfoPB& job) = 0;
-
-    virtual Status update_delete_bitmap(const Tablet* tablet, int64_t lock_id, 
int64_t initiator,
-                                        DeleteBitmap* delete_bitmap) = 0;
-
-    virtual Status get_delete_bitmap_update_lock(const Tablet* tablet, int64_t 
lock_id,
-                                                 int64_t initiator) = 0;
-
-    virtual Status update_tablet_schema(int64_t tablet_id, const TabletSchema* 
tablet_schema) = 0;
-};
-
-} // namespace cloud
-} // namespace doris
diff --git a/be/src/common/status.h b/be/src/common/status.h
index bea168299e1..5199e86d120 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -54,7 +54,8 @@ namespace ErrorCode {
     TStatusError(DATA_QUALITY_ERROR, false);             \
     TStatusError(LABEL_ALREADY_EXISTS, true);            \
     TStatusError(NOT_AUTHORIZED, true);                  \
-    TStatusError(HTTP_ERROR, true);
+    TStatusError(HTTP_ERROR, true);                      \
+    TStatusError(DELETE_BITMAP_LOCK_ERROR, false);
 // E error_name, error_code, print_stacktrace
 #define APPLY_FOR_OLAP_ERROR_CODES(E)                        \
     E(OK, 0, false);                                         \
diff --git a/be/src/util/s3_util.h b/be/src/util/s3_util.h
index 82f5cff8ff9..873f6b06f97 100644
--- a/be/src/util/s3_util.h
+++ b/be/src/util/s3_util.h
@@ -20,6 +20,7 @@
 #include <aws/core/Aws.h>
 #include <aws/core/client/ClientConfiguration.h>
 #include <fmt/format.h>
+#include <gen_cpp/cloud.pb.h>
 #include <stdint.h>
 
 #include <map>
@@ -77,6 +78,10 @@ struct S3Conf {
     int max_connections = -1;
     int request_timeout_ms = -1;
     int connect_timeout_ms = -1;
+
+    bool sse_enabled = false;
+    cloud::ObjectStoreInfoPB::Provider provider;
+
     bool use_virtual_addressing = true;
 
     std::string to_string() const {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to