This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 333352a4051 [feature](merge-cloud) Add cloud meta manager rpc
implementation (#29965)
333352a4051 is described below
commit 333352a40511a4ff0427563b390fdc498e55a281
Author: walter <[email protected]>
AuthorDate: Mon Jan 15 21:54:12 2024 +0800
[feature](merge-cloud) Add cloud meta manager rpc implementation (#29965)
Co-authored-by: plat1ko <[email protected]>
Co-authored-by: Gavin Chou <[email protected]>
Co-authored-by: Xin Liao <[email protected]>
Co-authored-by: Xiaocc <[email protected]>
Co-authored-by: deardeng <[email protected]>
Co-authored-by: Lei Zhang <[email protected]>
Co-authored-by: Lightman <[email protected]>
Co-authored-by: Luwei <[email protected]>
Co-authored-by: Yongqiang YANG <[email protected]>
Co-authored-by: YueW <[email protected]>
Co-authored-by: bobhan1 <[email protected]>
---
be/src/cloud/cloud_meta_mgr.cpp | 308 +++++++++++++++++++++++++++++++++-------
be/src/cloud/cloud_meta_mgr.h | 67 +++++----
be/src/cloud/meta_mgr.h | 86 -----------
be/src/common/status.h | 3 +-
be/src/util/s3_util.h | 5 +
5 files changed, 304 insertions(+), 165 deletions(-)
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index e6915eae914..cbe6ab8ae24 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -26,6 +26,7 @@
#include <mutex>
#include <random>
#include <shared_mutex>
+#include <type_traits>
#include <vector>
#include "cloud/config.h"
@@ -46,7 +47,9 @@
namespace doris::cloud {
using namespace ErrorCode;
-bvar::LatencyRecorder g_get_rowset_latency("doris_CloudMetaMgr", "get_rowset");
+static bvar::LatencyRecorder g_get_rowset_latency("doris_CloudMetaMgr",
"get_rowset");
+
+static constexpr int BRPC_RETRY_TIMES = 3;
class MetaServiceProxy {
public:
@@ -166,53 +169,105 @@ private:
std::shared_ptr<MetaService_Stub> _stub;
};
-Status CloudMetaMgr::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr*
tablet_meta) {
- VLOG_DEBUG << "send GetTabletRequest, tablet_id: " << tablet_id;
- TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::get_tablet_meta",
Status::OK(), tablet_id,
- tablet_meta);
+template <typename T, typename... Ts>
+struct is_any : std::disjunction<std::is_same<T, Ts>...> {};
+
+template <typename T, typename... Ts>
+constexpr bool is_any_v = is_any<T, Ts...>::value;
+
+template <typename Request>
+static std::string debug_info(const Request& req) {
+ if constexpr (is_any_v<Request, CommitTxnRequest, AbortTxnRequest,
PrecommitTxnRequest>) {
+ return fmt::format(" txn_id={}", req.txn_id());
+ } else if constexpr (is_any_v<Request, StartTabletJobRequest,
FinishTabletJobRequest>) {
+ return fmt::format(" tablet_id={}", req.job().idx().tablet_id());
+ } else if constexpr (is_any_v<Request, UpdateDeleteBitmapRequest>) {
+ return fmt::format(" tablet_id={}, lock_id={}", req.tablet_id(),
req.lock_id());
+ } else if constexpr (is_any_v<Request, GetDeleteBitmapUpdateLockRequest>) {
+ return fmt::format(" table_id={}, lock_id={}", req.table_id(),
req.lock_id());
+ } else if constexpr (is_any_v<Request, GetTabletRequest>) {
+ return fmt::format(" tablet_id={}", req.tablet_id());
+ } else if constexpr (is_any_v<Request, GetObjStoreInfoRequest>) {
+ return "";
+ } else {
+ static_assert(!sizeof(Request));
+ }
+}
+
+static inline std::default_random_engine make_random_engine() {
+ return std::default_random_engine(
+
static_cast<uint32_t>(std::chrono::steady_clock::now().time_since_epoch().count()));
+}
+template <typename Request, typename Response>
+using MetaServiceMethod = void
(MetaService_Stub::*)(::google::protobuf::RpcController*,
+ const Request*, Response*,
+
::google::protobuf::Closure*);
+
+template <typename Request, typename Response>
+static Status retry_rpc(std::string_view op_name, const Request& req,
Response* res,
+ MetaServiceMethod<Request, Response> method) {
+ static_assert(std::is_base_of_v<::google::protobuf::Message, Request>);
+ static_assert(std::is_base_of_v<::google::protobuf::Message, Response>);
+
+ int retry_times = 0;
+ uint32_t duration_ms = 0;
+ std::string error_msg;
+ std::default_random_engine rng = make_random_engine();
+ std::uniform_int_distribution<uint32_t> u(20, 200);
+ std::uniform_int_distribution<uint32_t> u2(500, 1000);
std::shared_ptr<MetaService_Stub> stub;
RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub));
-
- int tried = 0;
while (true) {
brpc::Controller cntl;
cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
- GetTabletRequest req;
- GetTabletResponse resp;
- req.set_cloud_unique_id(config::cloud_unique_id);
- req.set_tablet_id(tablet_id);
- stub->get_tablet(&cntl, &req, &resp, nullptr);
- int retry_times = config::meta_service_rpc_retry_times;
- if (cntl.Failed()) {
- if (tried++ < retry_times) {
- auto rng = std::default_random_engine(static_cast<uint32_t>(
-
std::chrono::steady_clock::now().time_since_epoch().count()));
- std::uniform_int_distribution<uint32_t> u(20, 200);
- std::uniform_int_distribution<uint32_t> u1(500, 1000);
- uint32_t duration_ms = tried >= 100 ? u(rng) : u1(rng);
-
std::this_thread::sleep_for(std::chrono::milliseconds(duration_ms));
- LOG_INFO("failed to get tablet meta")
- .tag("reason", cntl.ErrorText())
- .tag("tablet_id", tablet_id)
- .tag("tried", tried)
- .tag("sleep", duration_ms);
- continue;
- }
- return Status::RpcError("failed to get tablet meta: {}",
cntl.ErrorText());
+ cntl.set_max_retry(BRPC_RETRY_TIMES);
+ res->Clear();
+ (stub.get()->*method)(&cntl, &req, res, nullptr);
+ if (UNLIKELY(cntl.Failed())) {
+ error_msg = cntl.ErrorText();
+ } else if (res->status().code() == MetaServiceCode::OK) {
+ return Status::OK();
+ } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) {
+ return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to
{}: {}", op_name,
+
res->status().msg());
+ } else {
+ error_msg = res->status().msg();
}
+
+ if (++retry_times > config::meta_service_rpc_retry_times) {
+ break;
+ }
+
+ duration_ms = retry_times <= 100 ? u(rng) : u2(rng);
+ LOG(WARNING) << "failed to " << op_name << debug_info(req) << "
retry_times=" << retry_times
+ << " sleep=" << duration_ms << "ms : " <<
cntl.ErrorText();
+ bthread_usleep(duration_ms * 1000);
+ }
+ return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name,
error_msg);
+}
+
+Status CloudMetaMgr::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr*
tablet_meta) {
+ VLOG_DEBUG << "send GetTabletRequest, tablet_id: " << tablet_id;
+ TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::get_tablet_meta",
Status::OK(), tablet_id,
+ tablet_meta);
+ GetTabletRequest req;
+ GetTabletResponse resp;
+ req.set_cloud_unique_id(config::cloud_unique_id);
+ req.set_tablet_id(tablet_id);
+ Status st = retry_rpc("get tablet meta", req, &resp,
&MetaService_Stub::get_tablet);
+ if (!st.ok()) {
if (resp.status().code() == MetaServiceCode::TABLET_NOT_FOUND) {
return Status::NotFound("failed to get tablet meta: {}",
resp.status().msg());
}
- if (resp.status().code() != MetaServiceCode::OK) {
- return Status::InternalError("failed to get tablet meta: {}",
resp.status().msg());
- }
- *tablet_meta = std::make_shared<TabletMeta>();
- (*tablet_meta)
-
->init_from_pb(cloud_tablet_meta_to_doris(std::move(*resp.mutable_tablet_meta())));
- VLOG_DEBUG << "get tablet meta, tablet_id: " <<
(*tablet_meta)->tablet_id();
- return Status::OK();
+ return st;
}
+
+ *tablet_meta = std::make_shared<TabletMeta>();
+ (*tablet_meta)
+
->init_from_pb(cloud_tablet_meta_to_doris(std::move(*resp.mutable_tablet_meta())));
+ VLOG_DEBUG << "get tablet meta, tablet_id: " <<
(*tablet_meta)->tablet_id();
+ return Status::OK();
}
Status CloudMetaMgr::sync_tablet_rowsets(Tablet* tablet, bool
warmup_delta_data) {
@@ -226,12 +281,12 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(
return Status::NotSupported("CloudMetaMgr::sync_tablet_delete_bitmap is
not implemented");
}
-Status CloudMetaMgr::prepare_rowset(const RowsetMeta* rs_meta, bool is_tmp,
+Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta, bool is_tmp,
RowsetMetaSharedPtr* existed_rs_meta) {
return Status::NotSupported("CloudMetaMgr::prepare_rowset is not
implemented");
}
-Status CloudMetaMgr::commit_rowset(const RowsetMeta* rs_meta, bool is_tmp,
+Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta, bool is_tmp,
RowsetMetaSharedPtr* existed_rs_meta) {
return Status::NotSupported("CloudMetaMgr::commit_rowset is not
implemented");
}
@@ -240,50 +295,195 @@ Status CloudMetaMgr::update_tmp_rowset(const RowsetMeta&
rs_meta) {
return Status::NotSupported("CloudMetaMgr::update_tmp_rowset is not
implemented");
}
-Status CloudMetaMgr::commit_txn(StreamLoadContext* ctx, bool is_2pc) {
- return Status::NotSupported("CloudMetaMgr::commit_txn is not implemented");
+Status CloudMetaMgr::commit_txn(const StreamLoadContext& ctx, bool is_2pc) {
+ VLOG_DEBUG << "commit txn, db_id: " << ctx.db_id << ", txn_id: " <<
ctx.txn_id
+ << ", label: " << ctx.label << ", is_2pc: " << is_2pc;
+ CommitTxnRequest req;
+ CommitTxnResponse res;
+ req.set_cloud_unique_id(config::cloud_unique_id);
+ req.set_db_id(ctx.db_id);
+ req.set_txn_id(ctx.txn_id);
+ req.set_is_2pc(is_2pc);
+ return retry_rpc("commit txn", req, &res, &MetaService_Stub::commit_txn);
}
-Status CloudMetaMgr::abort_txn(StreamLoadContext* ctx) {
- return Status::NotSupported("CloudMetaMgr::abort_txn is not implemented");
+Status CloudMetaMgr::abort_txn(const StreamLoadContext& ctx) {
+ VLOG_DEBUG << "abort txn, db_id: " << ctx.db_id << ", txn_id: " <<
ctx.txn_id
+ << ", label: " << ctx.label;
+ AbortTxnRequest req;
+ AbortTxnResponse res;
+ req.set_cloud_unique_id(config::cloud_unique_id);
+ if (ctx.db_id > 0 && !ctx.label.empty()) {
+ req.set_db_id(ctx.db_id);
+ req.set_label(ctx.label);
+ } else {
+ req.set_txn_id(ctx.txn_id);
+ }
+ return retry_rpc("abort txn", req, &res, &MetaService_Stub::abort_txn);
}
-Status CloudMetaMgr::precommit_txn(StreamLoadContext* ctx) {
- return Status::NotSupported("CloudMetaMgr::precommit_txn is not
implemented");
+Status CloudMetaMgr::precommit_txn(const StreamLoadContext& ctx) {
+ VLOG_DEBUG << "precommit txn, db_id: " << ctx.db_id << ", txn_id: " <<
ctx.txn_id
+ << ", label: " << ctx.label;
+ PrecommitTxnRequest req;
+ PrecommitTxnResponse res;
+ req.set_cloud_unique_id(config::cloud_unique_id);
+ req.set_db_id(ctx.db_id);
+ req.set_txn_id(ctx.txn_id);
+ return retry_rpc("precommit txn", req, &res,
&MetaService_Stub::precommit_txn);
}
Status CloudMetaMgr::get_s3_info(std::vector<std::tuple<std::string, S3Conf>>*
s3_infos) {
- return Status::NotSupported("CloudMetaMgr::get_s3_info is not
implemented");
+ GetObjStoreInfoRequest req;
+ GetObjStoreInfoResponse resp;
+ req.set_cloud_unique_id(config::cloud_unique_id);
+ Status s = retry_rpc("get s3 info", req, &resp,
&MetaService_Stub::get_obj_store_info);
+ if (!s.ok()) {
+ return s;
+ }
+
+ for (const auto& obj_store : resp.obj_info()) {
+ S3Conf s3_conf;
+ s3_conf.ak = obj_store.ak();
+ s3_conf.sk = obj_store.sk();
+ s3_conf.endpoint = obj_store.endpoint();
+ s3_conf.region = obj_store.region();
+ s3_conf.bucket = obj_store.bucket();
+ s3_conf.prefix = obj_store.prefix();
+ s3_conf.sse_enabled = obj_store.sse_enabled();
+ s3_conf.provider = obj_store.provider();
+ s3_infos->emplace_back(obj_store.id(), std::move(s3_conf));
+ }
+ return Status::OK();
}
Status CloudMetaMgr::prepare_tablet_job(const TabletJobInfoPB& job,
StartTabletJobResponse* res) {
- return Status::NotSupported("CloudMetaMgr::prepare_tablet_job is not
implemented");
+ VLOG_DEBUG << "prepare_tablet_job: " << job.ShortDebugString();
+ TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::prepare_tablet_job",
Status::OK(), job, res);
+
+ StartTabletJobRequest req;
+ req.mutable_job()->CopyFrom(job);
+ req.set_cloud_unique_id(config::cloud_unique_id);
+ return retry_rpc("start tablet job", req, res,
&MetaService_Stub::start_tablet_job);
}
Status CloudMetaMgr::commit_tablet_job(const TabletJobInfoPB& job,
FinishTabletJobResponse* res) {
- return Status::NotSupported("CloudMetaMgr::commit_tablet_job is not
implemented");
+ VLOG_DEBUG << "commit_tablet_job: " << job.ShortDebugString();
+ TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_tablet_job",
Status::OK(), job, res);
+
+ FinishTabletJobRequest req;
+ req.mutable_job()->CopyFrom(job);
+ req.set_action(FinishTabletJobRequest::COMMIT);
+ req.set_cloud_unique_id(config::cloud_unique_id);
+ return retry_rpc("commit tablet job", req, res,
&MetaService_Stub::finish_tablet_job);
}
Status CloudMetaMgr::abort_tablet_job(const TabletJobInfoPB& job) {
- return Status::NotSupported("CloudMetaMgr::alter_tablet_job is not
implemented");
+ VLOG_DEBUG << "abort_tablet_job: " << job.ShortDebugString();
+ FinishTabletJobRequest req;
+ FinishTabletJobResponse res;
+ req.mutable_job()->CopyFrom(job);
+ req.set_action(FinishTabletJobRequest::ABORT);
+ req.set_cloud_unique_id(config::cloud_unique_id);
+ return retry_rpc("abort tablet job", req, &res,
&MetaService_Stub::finish_tablet_job);
}
Status CloudMetaMgr::lease_tablet_job(const TabletJobInfoPB& job) {
- return Status::NotSupported("CloudMetaMgr::lease_tablet_job is not
implemented");
+ VLOG_DEBUG << "lease_tablet_job: " << job.ShortDebugString();
+ FinishTabletJobRequest req;
+ FinishTabletJobResponse res;
+ req.mutable_job()->CopyFrom(job);
+ req.set_action(FinishTabletJobRequest::LEASE);
+ req.set_cloud_unique_id(config::cloud_unique_id);
+ return retry_rpc("lease tablet job", req, &res,
&MetaService_Stub::finish_tablet_job);
}
-Status CloudMetaMgr::update_tablet_schema(int64_t tablet_id, const
TabletSchema* tablet_schema) {
- return Status::NotSupported("CloudMetaMgr::update_tablet_schema is not
implemented");
+Status CloudMetaMgr::update_tablet_schema(int64_t tablet_id, const
TabletSchema& tablet_schema) {
+ VLOG_DEBUG << "send UpdateTabletSchemaRequest, tablet_id: " << tablet_id;
+
+ std::shared_ptr<MetaService_Stub> stub;
+ RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub));
+
+ brpc::Controller cntl;
+ cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
+ UpdateTabletSchemaRequest req;
+ UpdateTabletSchemaResponse resp;
+ req.set_cloud_unique_id(config::cloud_unique_id);
+ req.set_tablet_id(tablet_id);
+
+ TabletSchemaPB tablet_schema_pb;
+ tablet_schema.to_schema_pb(&tablet_schema_pb);
+ doris_tablet_schema_to_cloud(req.mutable_tablet_schema(),
std::move(tablet_schema_pb));
+ stub->update_tablet_schema(&cntl, &req, &resp, nullptr);
+ if (cntl.Failed()) {
+ return Status::RpcError("failed to update tablet schema: {}",
cntl.ErrorText());
+ }
+ if (resp.status().code() != MetaServiceCode::OK) {
+ return Status::InternalError("failed to update tablet schema: {}",
resp.status().msg());
+ }
+ VLOG_DEBUG << "succeed to update tablet schema, tablet_id: " << tablet_id;
+ return Status::OK();
}
Status CloudMetaMgr::update_delete_bitmap(const Tablet* tablet, int64_t
lock_id, int64_t initiator,
DeleteBitmap* delete_bitmap) {
- return Status::NotSupported("CloudMetaMgr::update_delete_bitmap is not
implemented");
+ VLOG_DEBUG << "update_delete_bitmap , tablet_id: " << tablet->tablet_id();
+ UpdateDeleteBitmapRequest req;
+ UpdateDeleteBitmapResponse res;
+ req.set_cloud_unique_id(config::cloud_unique_id);
+ req.set_table_id(tablet->table_id());
+ req.set_partition_id(tablet->partition_id());
+ req.set_tablet_id(tablet->tablet_id());
+ req.set_lock_id(lock_id);
+ req.set_initiator(initiator);
+ for (auto iter = delete_bitmap->delete_bitmap.begin();
+ iter != delete_bitmap->delete_bitmap.end(); ++iter) {
+ req.add_rowset_ids(std::get<0>(iter->first).to_string());
+ req.add_segment_ids(std::get<1>(iter->first));
+ req.add_versions(std::get<2>(iter->first));
+ // To save space, convert array and bitmap containers to run containers
+ iter->second.runOptimize();
+ std::string bitmap_data(iter->second.getSizeInBytes(), '\0');
+ iter->second.write(bitmap_data.data());
+ *(req.add_segment_delete_bitmaps()) = std::move(bitmap_data);
+ }
+ auto st = retry_rpc("update delete bitmap", req, &res,
&MetaService_Stub::update_delete_bitmap);
+ if (res.status().code() == MetaServiceCode::LOCK_EXPIRED) {
+ return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>(
+ "lock expired when update delete bitmap, tablet_id: {},
lock_id: {}",
+ tablet->tablet_id(), lock_id);
+ }
+ return st;
}
Status CloudMetaMgr::get_delete_bitmap_update_lock(const Tablet* tablet,
int64_t lock_id,
int64_t initiator) {
- return Status::NotSupported("CloudMetaMgr::get_delete_bitmap_update_lock
is not implemented");
+ VLOG_DEBUG << "get_delete_bitmap_update_lock , tablet_id: " <<
tablet->tablet_id();
+ GetDeleteBitmapUpdateLockRequest req;
+ GetDeleteBitmapUpdateLockResponse res;
+ req.set_cloud_unique_id(config::cloud_unique_id);
+ req.set_table_id(tablet->table_id());
+ req.set_lock_id(lock_id);
+ req.set_initiator(initiator);
+ req.set_expiration(10); // 10s expiration time for compaction and
schema_change
+ int retry_times = 0;
+ Status st;
+ std::default_random_engine rng = make_random_engine();
+ std::uniform_int_distribution<uint32_t> u(500, 2000);
+ do {
+ st = retry_rpc("get delete bitmap update lock", req, &res,
+ &MetaService_Stub::get_delete_bitmap_update_lock);
+ if (res.status().code() != MetaServiceCode::LOCK_CONFLICT) {
+ break;
+ }
+
+ uint32_t duration_ms = u(rng);
+ LOG(WARNING) << "get delete bitmap lock conflict. " << debug_info(req)
+ << " retry_times=" << retry_times << " sleep=" <<
duration_ms
+ << "ms : " << res.status().msg();
+ bthread_usleep(duration_ms * 1000);
+ } while (++retry_times <= 100);
+ return st;
}
} // namespace doris::cloud
diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h
index fe65e0441ff..6557a6eab8a 100644
--- a/be/src/cloud/cloud_meta_mgr.h
+++ b/be/src/cloud/cloud_meta_mgr.h
@@ -16,55 +16,73 @@
// under the License.
#pragma once
-#include "cloud/meta_mgr.h"
+#include <memory>
+#include <string>
+#include <tuple>
+#include <vector>
+
+#include "common/status.h"
#include "olap/rowset/rowset_meta.h"
+#include "util/s3_util.h"
+
+namespace doris {
+
+class DeleteBitmap;
+class StreamLoadContext;
+class Tablet;
+class TabletMeta;
+class TabletSchema;
+class RowsetMeta;
+
+namespace cloud {
-namespace doris::cloud {
+class FinishTabletJobResponse;
+class StartTabletJobResponse;
+class TabletJobInfoPB;
class TabletStatsPB;
class TabletIndexPB;
-class CloudMetaMgr final : public MetaMgr {
+class CloudMetaMgr {
public:
CloudMetaMgr() = default;
- ~CloudMetaMgr() override = default;
+ ~CloudMetaMgr() = default;
CloudMetaMgr(const CloudMetaMgr&) = delete;
CloudMetaMgr& operator=(const CloudMetaMgr&) = delete;
- Status get_tablet_meta(int64_t tablet_id, std::shared_ptr<TabletMeta>*
tablet_meta) override;
+ Status get_tablet_meta(int64_t tablet_id, std::shared_ptr<TabletMeta>*
tablet_meta);
- Status sync_tablet_rowsets(Tablet* tablet, bool warmup_delta_data = false)
override;
+ Status sync_tablet_rowsets(Tablet* tablet, bool warmup_delta_data = false);
- Status prepare_rowset(const RowsetMeta* rs_meta, bool is_tmp,
- std::shared_ptr<RowsetMeta>* existed_rs_meta =
nullptr) override;
+ Status prepare_rowset(const RowsetMeta& rs_meta, bool is_tmp,
+ std::shared_ptr<RowsetMeta>* existed_rs_meta =
nullptr);
- Status commit_rowset(const RowsetMeta* rs_meta, bool is_tmp,
- std::shared_ptr<RowsetMeta>* existed_rs_meta =
nullptr) override;
+ Status commit_rowset(const RowsetMeta& rs_meta, bool is_tmp,
+ std::shared_ptr<RowsetMeta>* existed_rs_meta =
nullptr);
- Status update_tmp_rowset(const RowsetMeta& rs_meta) override;
+ Status update_tmp_rowset(const RowsetMeta& rs_meta);
- Status commit_txn(StreamLoadContext* ctx, bool is_2pc) override;
+ Status commit_txn(const StreamLoadContext& ctx, bool is_2pc);
- Status abort_txn(StreamLoadContext* ctx) override;
+ Status abort_txn(const StreamLoadContext& ctx);
- Status precommit_txn(StreamLoadContext* ctx) override;
+ Status precommit_txn(const StreamLoadContext& ctx);
- Status get_s3_info(std::vector<std::tuple<std::string, S3Conf>>* s3_infos)
override;
+ Status get_s3_info(std::vector<std::tuple<std::string, S3Conf>>* s3_infos);
- Status prepare_tablet_job(const TabletJobInfoPB& job,
StartTabletJobResponse* res) override;
+ Status prepare_tablet_job(const TabletJobInfoPB& job,
StartTabletJobResponse* res);
- Status commit_tablet_job(const TabletJobInfoPB& job,
FinishTabletJobResponse* res) override;
+ Status commit_tablet_job(const TabletJobInfoPB& job,
FinishTabletJobResponse* res);
- Status abort_tablet_job(const TabletJobInfoPB& job) override;
+ Status abort_tablet_job(const TabletJobInfoPB& job);
- Status lease_tablet_job(const TabletJobInfoPB& job) override;
+ Status lease_tablet_job(const TabletJobInfoPB& job);
- Status update_tablet_schema(int64_t tablet_id, const TabletSchema*
tablet_schema) override;
+ Status update_tablet_schema(int64_t tablet_id, const TabletSchema&
tablet_schema);
Status update_delete_bitmap(const Tablet* tablet, int64_t lock_id, int64_t
initiator,
- DeleteBitmap* delete_bitmap) override;
+ DeleteBitmap* delete_bitmap);
- Status get_delete_bitmap_update_lock(const Tablet* tablet, int64_t lock_id,
- int64_t initiator) override;
+ Status get_delete_bitmap_update_lock(const Tablet* tablet, int64_t
lock_id, int64_t initiator);
private:
Status sync_tablet_delete_bitmap(
@@ -73,4 +91,5 @@ private:
const TabletStatsPB& stas, const TabletIndexPB& idx, DeleteBitmap*
delete_bitmap);
};
-} // namespace doris::cloud
+} // namespace cloud
+} // namespace doris
diff --git a/be/src/cloud/meta_mgr.h b/be/src/cloud/meta_mgr.h
deleted file mode 100644
index c573d43ff76..00000000000
--- a/be/src/cloud/meta_mgr.h
+++ /dev/null
@@ -1,86 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#pragma once
-
-#include <memory>
-#include <string>
-#include <tuple>
-#include <vector>
-
-#include "common/status.h"
-#include "util/s3_util.h"
-
-namespace doris {
-class StreamLoadContext;
-class Tablet;
-class TabletMeta;
-class RowsetMeta;
-class TabletSchema;
-class DeleteBitmap;
-
-namespace cloud {
-
-class TabletJobInfoPB;
-class StartTabletJobResponse;
-class FinishTabletJobResponse;
-
-class MetaMgr {
-public:
- virtual ~MetaMgr() = default;
-
- virtual Status open() { return Status::OK(); }
-
- virtual Status get_tablet_meta(int64_t tablet_id,
std::shared_ptr<TabletMeta>* tablet_meta) = 0;
-
- // If `warmup_delta_data` is true, download the new version rowset data in
background
- virtual Status sync_tablet_rowsets(Tablet* tablet, bool warmup_delta_data
= false) = 0;
-
- virtual Status prepare_rowset(const RowsetMeta* rs_meta, bool is_tmp,
- std::shared_ptr<RowsetMeta>* existed_rs_meta
= nullptr) = 0;
-
- virtual Status commit_rowset(const RowsetMeta* rs_meta, bool is_tmp,
- std::shared_ptr<RowsetMeta>* existed_rs_meta
= nullptr) = 0;
-
- virtual Status update_tmp_rowset(const RowsetMeta& rs_meta) = 0;
-
- virtual Status commit_txn(StreamLoadContext* ctx, bool is_2pc) = 0;
-
- virtual Status abort_txn(StreamLoadContext* ctx) = 0;
-
- virtual Status precommit_txn(StreamLoadContext* ctx) = 0;
-
- virtual Status get_s3_info(std::vector<std::tuple<std::string, S3Conf>>*
s3_infos) = 0;
-
- virtual Status prepare_tablet_job(const TabletJobInfoPB& job,
StartTabletJobResponse* res) = 0;
-
- virtual Status commit_tablet_job(const TabletJobInfoPB& job,
FinishTabletJobResponse* res) = 0;
-
- virtual Status abort_tablet_job(const TabletJobInfoPB& job) = 0;
-
- virtual Status lease_tablet_job(const TabletJobInfoPB& job) = 0;
-
- virtual Status update_delete_bitmap(const Tablet* tablet, int64_t lock_id,
int64_t initiator,
- DeleteBitmap* delete_bitmap) = 0;
-
- virtual Status get_delete_bitmap_update_lock(const Tablet* tablet, int64_t
lock_id,
- int64_t initiator) = 0;
-
- virtual Status update_tablet_schema(int64_t tablet_id, const TabletSchema*
tablet_schema) = 0;
-};
-
-} // namespace cloud
-} // namespace doris
diff --git a/be/src/common/status.h b/be/src/common/status.h
index bea168299e1..5199e86d120 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -54,7 +54,8 @@ namespace ErrorCode {
TStatusError(DATA_QUALITY_ERROR, false); \
TStatusError(LABEL_ALREADY_EXISTS, true); \
TStatusError(NOT_AUTHORIZED, true); \
- TStatusError(HTTP_ERROR, true);
+ TStatusError(HTTP_ERROR, true); \
+ TStatusError(DELETE_BITMAP_LOCK_ERROR, false);
// E error_name, error_code, print_stacktrace
#define APPLY_FOR_OLAP_ERROR_CODES(E) \
E(OK, 0, false); \
diff --git a/be/src/util/s3_util.h b/be/src/util/s3_util.h
index 82f5cff8ff9..873f6b06f97 100644
--- a/be/src/util/s3_util.h
+++ b/be/src/util/s3_util.h
@@ -20,6 +20,7 @@
#include <aws/core/Aws.h>
#include <aws/core/client/ClientConfiguration.h>
#include <fmt/format.h>
+#include <gen_cpp/cloud.pb.h>
#include <stdint.h>
#include <map>
@@ -77,6 +78,10 @@ struct S3Conf {
int max_connections = -1;
int request_timeout_ms = -1;
int connect_timeout_ms = -1;
+
+ bool sse_enabled = false;
+ cloud::ObjectStoreInfoPB::Provider provider;
+
bool use_virtual_addressing = true;
std::string to_string() const {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]