This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 15d4404edec [feature](binlog) Add ingest_binlog/http_get_snapshot
limit download speed && Add async ingest_binlog (#26323) (#26733)
15d4404edec is described below
commit 15d4404edecc5ce6dcee917bfcc64d20e9d78230
Author: Jack Drogon <[email protected]>
AuthorDate: Sun Nov 12 19:21:22 2023 +0800
[feature](binlog) Add ingest_binlog/http_get_snapshot limit download speed
&& Add async ingest_binlog (#26323) (#26733)
---
be/src/common/config.cpp | 6 +
be/src/common/config.h | 5 +
be/src/http/action/download_action.cpp | 29 +-
be/src/http/action/download_action.h | 9 +-
be/src/http/action/download_binlog_action.cpp | 12 +-
be/src/http/action/download_binlog_action.h | 7 +-
be/src/http/ev_http_server.cpp | 66 +--
be/src/http/ev_http_server.h | 5 +
be/src/http/http_channel.cpp | 13 +-
be/src/http/http_channel.h | 4 +-
be/src/http/utils.cpp | 5 +-
be/src/http/utils.h | 5 +-
be/src/olap/txn_manager.cpp | 157 ++++--
be/src/olap/txn_manager.h | 32 +-
be/src/runtime/snapshot_loader.cpp | 2 +-
be/src/service/backend_service.cpp | 602 +++++++++++++--------
be/src/service/backend_service.h | 5 +
be/src/service/http_service.cpp | 40 +-
be/src/service/http_service.h | 4 +
.../org/apache/doris/common/GenericPoolTest.java | 8 +
.../apache/doris/utframe/MockedBackendFactory.java | 8 +
gensrc/thrift/BackendService.thrift | 23 +
.../apache/doris/regression/suite/Syncer.groovy | 3 +-
.../suites/ccr_syncer_p0/test_ingest_binlog.groovy | 4 +-
24 files changed, 716 insertions(+), 338 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index bd3e5f046ac..22ec143c6da 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1095,6 +1095,12 @@ DEFINE_String(default_tzfiles_path,
"${DORIS_HOME}/zoneinfo");
// avoid accepting error or too large package causing OOM,default 20000000(20M)
DEFINE_Int32(be_thrift_max_pkg_bytes, "20000000");
+// Ingest binlog work pool size, -1 is disable, 0 is hardware concurrency
+DEFINE_Int32(ingest_binlog_work_pool_size, "-1");
+
+// Download binlog rate limit, unit is KB/s, 0 means no limit
+DEFINE_Int32(download_binlog_rate_limit_kbs, "0");
+
// clang-format off
#ifdef BE_TEST
// test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index b200ae2d6cf..b2fa2563816 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1151,6 +1151,11 @@ DECLARE_String(default_tzfiles_path);
// the max package bytes be thrift server can receive
// avoid accepting error or too large package causing OOM,default 20000000(20M)
DECLARE_Int32(be_thrift_max_pkg_bytes);
+// Ingest binlog work pool size
+DECLARE_Int32(ingest_binlog_work_pool_size);
+
+// Download binlog rate limit, unit is KB/s
+DECLARE_Int32(download_binlog_rate_limit_kbs);
#ifdef BE_TEST
// test s3
diff --git a/be/src/http/action/download_action.cpp
b/be/src/http/action/download_action.cpp
index 3041dc348de..720b9d65fa3 100644
--- a/be/src/http/action/download_action.cpp
+++ b/be/src/http/action/download_action.cpp
@@ -32,13 +32,20 @@
#include "runtime/exec_env.h"
namespace doris {
-
-const std::string FILE_PARAMETER = "file";
-const std::string TOKEN_PARAMETER = "token";
-
-DownloadAction::DownloadAction(ExecEnv* exec_env, const
std::vector<std::string>& allow_dirs,
- int32_t num_workers)
- : _exec_env(exec_env), _download_type(NORMAL),
_num_workers(num_workers) {
+namespace {
+static const std::string FILE_PARAMETER = "file";
+static const std::string TOKEN_PARAMETER = "token";
+static const std::string CHANNEL_PARAMETER = "channel";
+static const std::string CHANNEL_INGEST_BINLOG_TYPE = "ingest_binlog";
+} // namespace
+
+DownloadAction::DownloadAction(ExecEnv* exec_env,
+ std::shared_ptr<bufferevent_rate_limit_group>
rate_limit_group,
+ const std::vector<std::string>& allow_dirs,
int32_t num_workers)
+ : _exec_env(exec_env),
+ _download_type(NORMAL),
+ _num_workers(num_workers),
+ _rate_limit_group(std::move(rate_limit_group)) {
for (auto& dir : allow_dirs) {
std::string p;
Status st = io::global_local_filesystem()->canonicalize(dir, &p);
@@ -105,7 +112,13 @@ void DownloadAction::handle_normal(HttpRequest* req, const
std::string& file_par
if (is_dir) {
do_dir_response(file_param, req);
} else {
- do_file_response(file_param, req);
+ const auto& channel = req->param(CHANNEL_PARAMETER);
+ bool ingest_binlog = (channel == CHANNEL_INGEST_BINLOG_TYPE);
+ if (ingest_binlog) {
+ do_file_response(file_param, req, _rate_limit_group.get());
+ } else {
+ do_file_response(file_param, req);
+ }
}
}
diff --git a/be/src/http/action/download_action.h
b/be/src/http/action/download_action.h
index d8e468d9585..3aab1a0d314 100644
--- a/be/src/http/action/download_action.h
+++ b/be/src/http/action/download_action.h
@@ -24,6 +24,8 @@
#include "http/http_handler.h"
#include "util/threadpool.h"
+struct bufferevent_rate_limit_group;
+
namespace doris {
class ExecEnv;
@@ -36,8 +38,9 @@ class HttpRequest;
// We use parameter named 'file' to specify the static resource path, it is an
absolute path.
class DownloadAction : public HttpHandler {
public:
- DownloadAction(ExecEnv* exec_env, const std::vector<std::string>&
allow_dirs,
- int32_t num_workers = 0);
+ DownloadAction(ExecEnv* exec_env,
+ std::shared_ptr<bufferevent_rate_limit_group>
rate_limit_group,
+ const std::vector<std::string>& allow_dirs, int32_t
num_workers = 0);
// for load error
DownloadAction(ExecEnv* exec_env, const std::string& error_log_root_dir);
@@ -67,6 +70,8 @@ private:
std::string _error_log_root_dir;
int32_t _num_workers;
std::unique_ptr<ThreadPool> _download_workers;
+
+ std::shared_ptr<bufferevent_rate_limit_group> _rate_limit_group {nullptr};
}; // end class DownloadAction
} // end namespace doris
diff --git a/be/src/http/action/download_binlog_action.cpp
b/be/src/http/action/download_binlog_action.cpp
index a23d5ec109f..697512b2a30 100644
--- a/be/src/http/action/download_binlog_action.cpp
+++ b/be/src/http/action/download_binlog_action.cpp
@@ -21,8 +21,10 @@
#include <fmt/ranges.h>
#include <cstdint>
+#include <limits>
#include <stdexcept>
#include <string_view>
+#include <utility>
#include <vector>
#include "common/config.h"
@@ -96,7 +98,7 @@ void handle_get_binlog_info(HttpRequest* req) {
}
/// handle get segment file, need tablet_id, rowset_id && index
-void handle_get_segment_file(HttpRequest* req) {
+void handle_get_segment_file(HttpRequest* req, bufferevent_rate_limit_group*
rate_limit_group) {
// Step 1: get download file path
std::string segment_file_path;
try {
@@ -125,7 +127,7 @@ void handle_get_segment_file(HttpRequest* req) {
LOG(WARNING) << "file not exist, file path: " << segment_file_path;
return;
}
- do_file_response(segment_file_path, req);
+ do_file_response(segment_file_path, req, rate_limit_group);
}
void handle_get_rowset_meta(HttpRequest* req) {
@@ -149,7 +151,9 @@ void handle_get_rowset_meta(HttpRequest* req) {
} // namespace
-DownloadBinlogAction::DownloadBinlogAction(ExecEnv* exec_env) :
_exec_env(exec_env) {}
+DownloadBinlogAction::DownloadBinlogAction(
+ ExecEnv* exec_env, std::shared_ptr<bufferevent_rate_limit_group>
rate_limit_group)
+ : _exec_env(exec_env), _rate_limit_group(std::move(rate_limit_group))
{}
void DownloadBinlogAction::handle(HttpRequest* req) {
VLOG_CRITICAL << "accept one download binlog request " <<
req->debug_string();
@@ -178,7 +182,7 @@ void DownloadBinlogAction::handle(HttpRequest* req) {
if (method == "get_binlog_info") {
handle_get_binlog_info(req);
} else if (method == "get_segment_file") {
- handle_get_segment_file(req);
+ handle_get_segment_file(req, _rate_limit_group.get());
} else if (method == "get_rowset_meta") {
handle_get_rowset_meta(req);
} else {
diff --git a/be/src/http/action/download_binlog_action.h
b/be/src/http/action/download_binlog_action.h
index 3cbd9b9e5b0..77a2ed08780 100644
--- a/be/src/http/action/download_binlog_action.h
+++ b/be/src/http/action/download_binlog_action.h
@@ -17,12 +17,15 @@
#pragma once
+#include <memory>
#include <string>
#include <vector>
#include "common/status.h"
#include "http/http_handler.h"
+struct bufferevent_rate_limit_group;
+
namespace doris {
class ExecEnv;
@@ -30,7 +33,8 @@ class HttpRequest;
class DownloadBinlogAction : public HttpHandler {
public:
- DownloadBinlogAction(ExecEnv* exec_env);
+ DownloadBinlogAction(ExecEnv* exec_env,
+ std::shared_ptr<bufferevent_rate_limit_group>
rate_limit_group);
virtual ~DownloadBinlogAction() = default;
void handle(HttpRequest* req) override;
@@ -40,6 +44,7 @@ private:
private:
ExecEnv* _exec_env;
+ std::shared_ptr<bufferevent_rate_limit_group> _rate_limit_group {nullptr};
};
} // namespace doris
diff --git a/be/src/http/ev_http_server.cpp b/be/src/http/ev_http_server.cpp
index b0743baee38..1bbd2c0e178 100644
--- a/be/src/http/ev_http_server.cpp
+++ b/be/src/http/ev_http_server.cpp
@@ -84,7 +84,17 @@ static int on_connection(struct evhttp_request* req, void*
param) {
EvHttpServer::EvHttpServer(int port, int num_workers)
: _port(port), _num_workers(num_workers), _real_port(0) {
_host = BackendOptions::get_service_bind_address();
+
+ evthread_use_pthreads();
DCHECK_GT(_num_workers, 0);
+ _event_bases.resize(_num_workers);
+ for (int i = 0; i < _num_workers; ++i) {
+ std::shared_ptr<event_base> base(event_base_new(),
+ [](event_base* base) {
event_base_free(base); });
+ CHECK(base != nullptr) << "Couldn't create an event_base.";
+ std::lock_guard lock(_event_bases_lock);
+ _event_bases[i] = base;
+ }
}
EvHttpServer::EvHttpServer(const std::string& host, int port, int num_workers)
@@ -103,38 +113,32 @@ void EvHttpServer::start() {
// bind to
auto s = _bind();
CHECK(s.ok()) << s.to_string();
- ThreadPoolBuilder("EvHttpServer")
- .set_min_threads(_num_workers)
- .set_max_threads(_num_workers)
- .build(&_workers);
-
- evthread_use_pthreads();
- _event_bases.resize(_num_workers);
+ static_cast<void>(ThreadPoolBuilder("EvHttpServer")
+ .set_min_threads(_num_workers)
+ .set_max_threads(_num_workers)
+ .build(&_workers));
for (int i = 0; i < _num_workers; ++i) {
- CHECK(_workers->submit_func([this, i]() {
- std::shared_ptr<event_base> base(event_base_new(),
[](event_base* base) {
- event_base_free(base);
- });
- CHECK(base != nullptr) << "Couldn't create an
event_base.";
- {
- std::lock_guard<std::mutex>
lock(_event_bases_lock);
- _event_bases[i] = base;
- }
-
- /* Create a new evhttp object to handle requests. */
- std::shared_ptr<evhttp> http(evhttp_new(base.get()),
- [](evhttp* http) {
evhttp_free(http); });
- CHECK(http != nullptr) << "Couldn't create an
evhttp.";
-
- auto res = evhttp_accept_socket(http.get(),
_server_fd);
- CHECK(res >= 0) << "evhttp accept socket failed,
res=" << res;
-
- evhttp_set_newreqcb(http.get(), on_connection, this);
- evhttp_set_gencb(http.get(), on_request, this);
-
- event_base_dispatch(base.get());
- })
- .ok());
+ auto status = _workers->submit_func([this, i]() {
+ std::shared_ptr<event_base> base;
+ {
+ std::lock_guard lock(_event_bases_lock);
+ base = _event_bases[i];
+ }
+
+ /* Create a new evhttp object to handle requests. */
+ std::shared_ptr<evhttp> http(evhttp_new(base.get()),
+ [](evhttp* http) { evhttp_free(http);
});
+ CHECK(http != nullptr) << "Couldn't create an evhttp.";
+
+ auto res = evhttp_accept_socket(http.get(), _server_fd);
+ CHECK(res >= 0) << "evhttp accept socket failed, res=" << res;
+
+ evhttp_set_newreqcb(http.get(), on_connection, this);
+ evhttp_set_gencb(http.get(), on_request, this);
+
+ event_base_dispatch(base.get());
+ });
+ CHECK(status.ok());
}
}
diff --git a/be/src/http/ev_http_server.h b/be/src/http/ev_http_server.h
index e7ad1c052ab..d74a8cb4efd 100644
--- a/be/src/http/ev_http_server.h
+++ b/be/src/http/ev_http_server.h
@@ -55,6 +55,11 @@ public:
// get real port
int get_real_port() const { return _real_port; }
+ std::vector<std::shared_ptr<event_base>> get_event_bases() {
+ std::lock_guard lock(_event_bases_lock);
+ return _event_bases;
+ }
+
private:
Status _bind();
HttpHandler* _find_handler(HttpRequest* req);
diff --git a/be/src/http/http_channel.cpp b/be/src/http/http_channel.cpp
index 5727ba3902e..96679195316 100644
--- a/be/src/http/http_channel.cpp
+++ b/be/src/http/http_channel.cpp
@@ -18,6 +18,7 @@
#include "http/http_channel.h"
#include <event2/buffer.h>
+#include <event2/bufferevent.h>
#include <event2/http.h>
#include <algorithm>
@@ -69,11 +70,17 @@ void HttpChannel::send_reply(HttpRequest* request,
HttpStatus status, const std:
evbuffer_free(evb);
}
-void HttpChannel::send_file(HttpRequest* request, int fd, size_t off, size_t
size) {
+void HttpChannel::send_file(HttpRequest* request, int fd, size_t off, size_t
size,
+ bufferevent_rate_limit_group* rate_limit_group) {
auto evb = evbuffer_new();
evbuffer_add_file(evb, fd, off, size);
- evhttp_send_reply(request->get_evhttp_request(), HttpStatus::OK,
- default_reason(HttpStatus::OK).c_str(), evb);
+ auto* evhttp_request = request->get_evhttp_request();
+ if (rate_limit_group) {
+ auto* evhttp_connection =
evhttp_request_get_connection(evhttp_request);
+ auto* buffer_event =
evhttp_connection_get_bufferevent(evhttp_connection);
+ bufferevent_add_to_rate_limit_group(buffer_event, rate_limit_group);
+ }
+ evhttp_send_reply(evhttp_request, HttpStatus::OK,
default_reason(HttpStatus::OK).c_str(), evb);
evbuffer_free(evb);
}
diff --git a/be/src/http/http_channel.h b/be/src/http/http_channel.h
index 478f013af82..ee1e6c0888f 100644
--- a/be/src/http/http_channel.h
+++ b/be/src/http/http_channel.h
@@ -23,6 +23,7 @@
#include "http/http_status.h"
+struct bufferevent_rate_limit_group;
namespace doris {
class HttpRequest;
@@ -43,7 +44,8 @@ public:
static void send_reply(HttpRequest* request, HttpStatus status, const
std::string& content);
- static void send_file(HttpRequest* request, int fd, size_t off, size_t
size);
+ static void send_file(HttpRequest* request, int fd, size_t off, size_t
size,
+ bufferevent_rate_limit_group* rate_limit_group =
nullptr);
static bool compress_content(const std::string& accept_encoding, const
std::string& input,
std::string* output);
diff --git a/be/src/http/utils.cpp b/be/src/http/utils.cpp
index fe11738d5ab..3dd0b839022 100644
--- a/be/src/http/utils.cpp
+++ b/be/src/http/utils.cpp
@@ -120,7 +120,8 @@ std::string get_content_type(const std::string& file_name) {
return "";
}
-void do_file_response(const std::string& file_path, HttpRequest* req) {
+void do_file_response(const std::string& file_path, HttpRequest* req,
+ bufferevent_rate_limit_group* rate_limit_group) {
if (file_path.find("..") != std::string::npos) {
LOG(WARNING) << "Not allowed to read relative path: " << file_path;
HttpChannel::send_error(req, HttpStatus::FORBIDDEN);
@@ -161,7 +162,7 @@ void do_file_response(const std::string& file_path,
HttpRequest* req) {
return;
}
- HttpChannel::send_file(req, fd, 0, file_size);
+ HttpChannel::send_file(req, fd, 0, file_size, rate_limit_group);
}
void do_dir_response(const std::string& dir_path, HttpRequest* req) {
diff --git a/be/src/http/utils.h b/be/src/http/utils.h
index 5928039c492..2d1e13fbe4e 100644
--- a/be/src/http/utils.h
+++ b/be/src/http/utils.h
@@ -22,6 +22,8 @@
#include "common/utils.h"
#include "http/http_request.h"
+struct bufferevent_rate_limit_group;
+
namespace doris {
struct AuthInfo;
@@ -34,7 +36,8 @@ bool parse_basic_auth(const HttpRequest& req, std::string*
user, std::string* pa
bool parse_basic_auth(const HttpRequest& req, AuthInfo* auth);
-void do_file_response(const std::string& dir_path, HttpRequest* req);
+void do_file_response(const std::string& dir_path, HttpRequest* req,
+ bufferevent_rate_limit_group* rate_limit_group =
nullptr);
void do_dir_response(const std::string& dir_path, HttpRequest* req);
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index 7d4e1757252..a57fba4f7b2 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -142,6 +142,7 @@ Status TxnManager::prepare_txn(TPartitionId partition_id,
TTransactionId transac
// case 1: user start a new txn, rowset = null
// case 2: loading txn from meta env
TabletTxnInfo load_info(load_id, nullptr, ingest);
+ load_info.prepare();
txn_tablet_map[key][tablet_info] = load_info;
_insert_txn_partition_map_unlocked(transaction_id, partition_id);
VLOG_NOTICE << "add transaction to engine successfully."
@@ -166,6 +167,29 @@ Status TxnManager::publish_txn(TPartitionId partition_id,
const TabletSharedPtr&
stats);
}
+void TxnManager::abort_txn(TPartitionId partition_id, TTransactionId
transaction_id,
+ TTabletId tablet_id, SchemaHash schema_hash,
TabletUid tablet_uid) {
+ pair<int64_t, int64_t> key(partition_id, transaction_id);
+ TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
+
+ std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id));
+
+ auto& txn_tablet_map = _get_txn_tablet_map(transaction_id);
+ auto it = txn_tablet_map.find(key);
+ if (it == txn_tablet_map.end()) {
+ return;
+ }
+
+ auto& tablet_txn_info_map = it->second;
+ if (auto tablet_txn_info_iter = tablet_txn_info_map.find(tablet_info);
+ tablet_txn_info_iter == tablet_txn_info_map.end()) {
+ return;
+ } else {
+ auto& txn_info = tablet_txn_info_iter->second;
+ txn_info.abort();
+ }
+}
+
// delete the txn from manager if it is not committed(not have a valid rowset)
Status TxnManager::rollback_txn(TPartitionId partition_id, const
TabletSharedPtr& tablet,
TTransactionId transaction_id) {
@@ -223,6 +247,7 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId
partition_id,
<< " partition_id=" << partition_id << " transaction_id="
<< transaction_id
<< " tablet_id=" << tablet_id;
}
+
pair<int64_t, int64_t> key(partition_id, transaction_id);
TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
if (rowset_ptr == nullptr) {
@@ -252,28 +277,30 @@ Status TxnManager::commit_txn(OlapMeta* meta,
TPartitionId partition_id,
// case 1: user commit rowset, then the load id must be equal
TabletTxnInfo& load_info = load_itr->second;
// check if load id is equal
- if (load_info.load_id.hi() == load_id.hi() && load_info.load_id.lo()
== load_id.lo() &&
- load_info.rowset != nullptr &&
- load_info.rowset->rowset_id() == rowset_ptr->rowset_id()) {
- // find a rowset with same rowset id, then it means a duplicate
call
+ if (load_info.rowset == nullptr) {
+ break;
+ }
+
+ if (load_info.load_id.hi() != load_id.hi() || load_info.load_id.lo()
!= load_id.lo()) {
+ break;
+ }
+
+ // find a rowset with same rowset id, then it means a duplicate call
+ if (load_info.rowset->rowset_id() == rowset_ptr->rowset_id()) {
LOG(INFO) << "find rowset exists when commit transaction to
engine."
<< "partition_id: " << key.first << ", transaction_id: "
<< key.second
<< ", tablet: " << tablet_info.to_string()
<< ", rowset_id: " << load_info.rowset->rowset_id();
return Status::OK();
- } else if (load_info.load_id.hi() == load_id.hi() &&
- load_info.load_id.lo() == load_id.lo() && load_info.rowset
!= nullptr &&
- load_info.rowset->rowset_id() != rowset_ptr->rowset_id()) {
- // find a rowset with different rowset id, then it should not
happen, just return errors
- return Status::Error<PUSH_TRANSACTION_ALREADY_EXIST>(
- "find rowset exists when commit transaction to engine. but
rowset ids are not "
- "same. partition_id: {}, transaction_id: {}, tablet: {},
exist rowset_id: {}, "
- "new rowset_id: {}",
- key.first, key.second, tablet_info.to_string(),
- load_info.rowset->rowset_id().to_string(),
rowset_ptr->rowset_id().to_string());
- } else {
- break;
}
+
+ // find a rowset with different rowset id, then it should not happen,
just return errors
+ return Status::Error<PUSH_TRANSACTION_ALREADY_EXIST>(
+ "find rowset exists when commit transaction to engine. but
rowset ids are not "
+ "same. partition_id: {}, transaction_id: {}, tablet: {}, exist
rowset_id: {}, new "
+ "rowset_id: {}",
+ key.first, key.second, tablet_info.to_string(),
+ load_info.rowset->rowset_id().to_string(),
rowset_ptr->rowset_id().to_string());
} while (false);
// if not in recovery mode, then should persist the meta to meta env
@@ -301,6 +328,8 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId
partition_id,
load_info.delete_bitmap.reset(new
DeleteBitmap(tablet->tablet_id()));
}
}
+ load_info.commit();
+
txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
txn_tablet_map[key][tablet_info] = load_info;
_insert_txn_partition_map_unlocked(transaction_id, partition_id);
@@ -453,30 +482,36 @@ Status TxnManager::rollback_txn(TPartitionId
partition_id, TTransactionId transa
TTabletId tablet_id, SchemaHash schema_hash,
TabletUid tablet_uid) {
pair<int64_t, int64_t> key(partition_id, transaction_id);
TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
+
std::lock_guard<std::shared_mutex>
wrlock(_get_txn_map_lock(transaction_id));
txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
+
auto it = txn_tablet_map.find(key);
- if (it != txn_tablet_map.end()) {
- auto load_itr = it->second.find(tablet_info);
- if (load_itr != it->second.end()) {
- // found load for txn,tablet
- // case 1: user commit rowset, then the load id must be equal
- TabletTxnInfo& load_info = load_itr->second;
- if (load_info.rowset != nullptr) {
- return Status::Error<TRANSACTION_ALREADY_COMMITTED>(
- "if rowset is not null, it means other thread may
commit the rowset should "
- "not delete txn any more");
- }
- }
- it->second.erase(tablet_info);
- LOG(INFO) << "rollback transaction from engine successfully."
- << " partition_id: " << key.first << ", transaction_id: " <<
key.second
- << ", tablet: " << tablet_info.to_string();
- if (it->second.empty()) {
- txn_tablet_map.erase(it);
- _clear_txn_partition_map_unlocked(transaction_id, partition_id);
+ if (it == txn_tablet_map.end()) {
+ return Status::OK();
+ }
+
+ auto& tablet_txn_info_map = it->second;
+ if (auto load_itr = tablet_txn_info_map.find(tablet_info);
+ load_itr != tablet_txn_info_map.end()) {
+ // found load for txn,tablet
+ // case 1: user commit rowset, then the load id must be equal
+ TabletTxnInfo& load_info = load_itr->second;
+ if (load_info.rowset != nullptr) {
+ return Status::Error<TRANSACTION_ALREADY_COMMITTED>(
+ "if rowset is not null, it means other thread may commit
the rowset should "
+ "not delete txn any more");
}
}
+
+ tablet_txn_info_map.erase(tablet_info);
+ LOG(INFO) << "rollback transaction from engine successfully."
+ << " partition_id: " << key.first << ", transaction_id: " <<
key.second
+ << ", tablet: " << tablet_info.to_string();
+ if (tablet_txn_info_map.empty()) {
+ txn_tablet_map.erase(it);
+ _clear_txn_partition_map_unlocked(transaction_id, partition_id);
+ }
return Status::OK();
}
@@ -651,18 +686,6 @@ void TxnManager::get_all_commit_tablet_txn_info_by_tablet(
}
}
-bool TxnManager::has_txn(TPartitionId partition_id, TTransactionId
transaction_id,
- TTabletId tablet_id, SchemaHash schema_hash,
TabletUid tablet_uid) {
- pair<int64_t, int64_t> key(partition_id, transaction_id);
- TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
- std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id));
- txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
- auto it = txn_tablet_map.find(key);
- bool found = it != txn_tablet_map.end() && it->second.find(tablet_info) !=
it->second.end();
-
- return found;
-}
-
void TxnManager::build_expire_txn_map(std::map<TabletInfo,
std::vector<int64_t>>* expire_txn_map) {
int64_t now = UnixSeconds();
// traverse the txn map, and get all expired txns
@@ -672,13 +695,15 @@ void
TxnManager::build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>>
auto txn_id = it.first.second;
for (auto& t_map : it.second) {
double diff = difftime(now, t_map.second.creation_time);
- if (diff >= config::pending_data_expire_time_sec) {
- (*expire_txn_map)[t_map.first].push_back(txn_id);
- if (VLOG_IS_ON(3)) {
- VLOG_NOTICE << "find expired txn."
- << " tablet=" << t_map.first.to_string()
- << " transaction_id=" << txn_id << "
exist_sec=" << diff;
- }
+ if (diff < config::pending_data_expire_time_sec) {
+ continue;
+ }
+
+ (*expire_txn_map)[t_map.first].push_back(txn_id);
+ if (VLOG_IS_ON(3)) {
+ VLOG_NOTICE << "find expired txn."
+ << " tablet=" << t_map.first.to_string()
+ << " transaction_id=" << txn_id << "
exist_sec=" << diff;
}
}
}
@@ -797,4 +822,28 @@ void TxnManager::update_tablet_version_txn(int64_t
tablet_id, int64_t version, i
_tablet_version_cache->release(handle);
}
+TxnState TxnManager::get_txn_state(TPartitionId partition_id, TTransactionId
transaction_id,
+ TTabletId tablet_id, SchemaHash schema_hash,
+ TabletUid tablet_uid) {
+ pair<int64_t, int64_t> key(partition_id, transaction_id);
+ TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
+
+ std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id));
+
+ auto& txn_tablet_map = _get_txn_tablet_map(transaction_id);
+ auto it = txn_tablet_map.find(key);
+ if (it == txn_tablet_map.end()) {
+ return TxnState::NOT_FOUND;
+ }
+
+ auto& tablet_txn_info_map = it->second;
+ if (auto tablet_txn_info_iter = tablet_txn_info_map.find(tablet_info);
+ tablet_txn_info_iter == tablet_txn_info_map.end()) {
+ return TxnState::NOT_FOUND;
+ } else {
+ const auto& txn_info = tablet_txn_info_iter->second;
+ return txn_info.state;
+ }
+}
+
} // namespace doris
diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h
index 7483b9ff37e..0a8d0ee3026 100644
--- a/be/src/olap/txn_manager.h
+++ b/be/src/olap/txn_manager.h
@@ -51,6 +51,15 @@ class DeltaWriter;
class OlapMeta;
struct TabletPublishStatistics;
+enum class TxnState {
+ NOT_FOUND = 0,
+ PREPARED = 1,
+ COMMITTED = 2,
+ ROLLEDBACK = 3,
+ ABORTED = 4,
+ DELETED = 5,
+};
+
struct TabletTxnInfo {
PUniqueId load_id;
RowsetSharedPtr rowset;
@@ -61,6 +70,9 @@ struct TabletTxnInfo {
int64_t creation_time;
bool ingest {false};
std::shared_ptr<PartialUpdateInfo> partial_update_info;
+ TxnState state {TxnState::PREPARED};
+
+ TabletTxnInfo() = default;
TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset)
: load_id(load_id), rowset(rowset), creation_time(UnixSeconds()) {}
@@ -77,7 +89,14 @@ struct TabletTxnInfo {
rowset_ids(ids),
creation_time(UnixSeconds()) {}
- TabletTxnInfo() {}
+ void prepare() { state = TxnState::PREPARED; }
+ void commit() { state = TxnState::COMMITTED; }
+ void rollback() { state = TxnState::ROLLEDBACK; }
+ void abort() {
+ if (state == TxnState::PREPARED) {
+ state = TxnState::ABORTED;
+ }
+ }
};
struct CommitTabletTxnInfo {
@@ -149,6 +168,10 @@ public:
TTabletId tablet_id, SchemaHash schema_hash, TabletUid
tablet_uid,
const Version& version, TabletPublishStatistics* stats);
+ // only abort not committed txn
+ void abort_txn(TPartitionId partition_id, TTransactionId transaction_id,
TTabletId tablet_id,
+ SchemaHash schema_hash, TabletUid tablet_uid);
+
// delete the txn from manager if it is not committed(not have a valid
rowset)
Status rollback_txn(TPartitionId partition_id, TTransactionId
transaction_id,
TTabletId tablet_id, SchemaHash schema_hash, TabletUid
tablet_uid);
@@ -167,10 +190,6 @@ public:
void get_all_related_tablets(std::set<TabletInfo>* tablet_infos);
- // Just check if the txn exists.
- bool has_txn(TPartitionId partition_id, TTransactionId transaction_id,
TTabletId tablet_id,
- SchemaHash schema_hash, TabletUid tablet_uid);
-
// Get all expired txns and save them in expire_txn_map.
// This is currently called before reporting all tablet info, to avoid
iterating txn map for every tablets.
void build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>>*
expire_txn_map);
@@ -199,6 +218,9 @@ public:
int64_t get_txn_by_tablet_version(int64_t tablet_id, int64_t version);
void update_tablet_version_txn(int64_t tablet_id, int64_t version, int64_t
txn_id);
+ TxnState get_txn_state(TPartitionId partition_id, TTransactionId
transaction_id,
+ TTabletId tablet_id, SchemaHash schema_hash,
TabletUid tablet_uid);
+
private:
using TxnKey = std::pair<int64_t, int64_t>; // partition_id,
transaction_id;
diff --git a/be/src/runtime/snapshot_loader.cpp
b/be/src/runtime/snapshot_loader.cpp
index 44f641c173d..af15371ab3c 100644
--- a/be/src/runtime/snapshot_loader.cpp
+++ b/be/src/runtime/snapshot_loader.cpp
@@ -467,7 +467,7 @@ Status SnapshotLoader::remote_http_download(
for (const auto& filename : filename_list) {
std::string remote_file_url = fmt::format(
- "http://{}:{}/api/_tablet/_download?token={}&file={}/{}",
+
"http://{}:{}/api/_tablet/_download?token={}&file={}/{}&channel=ingest_binlog",
remote_tablet_snapshot.remote_be_addr.hostname,
remote_tablet_snapshot.remote_be_addr.port,
remote_tablet_snapshot.remote_token,
remote_tablet_snapshot.remote_snapshot_path, filename);
diff --git a/be/src/service/backend_service.cpp
b/be/src/service/backend_service.cpp
index 11094e20ece..a0fa2066dfc 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -36,6 +36,7 @@
#include <memory>
#include <ostream>
#include <string>
+#include <thread>
#include <utility>
#include <vector>
@@ -63,6 +64,7 @@
#include "runtime/stream_load/stream_load_recorder.h"
#include "util/arrow/row_batch.h"
#include "util/defer_op.h"
+#include "util/threadpool.h"
#include "util/thrift_server.h"
#include "util/uid_util.h"
@@ -79,6 +81,284 @@ class TTransportException;
namespace doris {
+namespace {
+constexpr uint64_t kMaxTimeoutMs = 3000; // 3s
+struct IngestBinlogArg {
+ int64_t txn_id;
+ int64_t partition_id;
+ int64_t local_tablet_id;
+ TabletSharedPtr local_tablet;
+ TIngestBinlogRequest request;
+ TStatus* tstatus;
+};
+
+void _ingest_binlog(IngestBinlogArg* arg) {
+ auto txn_id = arg->txn_id;
+ auto partition_id = arg->partition_id;
+ auto local_tablet_id = arg->local_tablet_id;
+ const auto& local_tablet = arg->local_tablet;
+ const auto& local_tablet_uid = local_tablet->tablet_uid();
+
+ auto& request = arg->request;
+
+ TStatus tstatus;
+ Defer defer {[=, &tstatus, ingest_binlog_tstatus = arg->tstatus]() {
+ LOG(INFO) << "ingest binlog. result: " <<
apache::thrift::ThriftDebugString(tstatus);
+ if (tstatus.status_code != TStatusCode::OK) {
+ // abort txn
+ StorageEngine::instance()->txn_manager()->abort_txn(
+ partition_id, txn_id, local_tablet_id,
local_tablet->schema_hash(),
+ local_tablet_uid);
+ }
+
+ if (ingest_binlog_tstatus) {
+ *ingest_binlog_tstatus = std::move(tstatus);
+ }
+ }};
+
+ auto set_tstatus = [&tstatus](TStatusCode::type code, std::string
error_msg) {
+ tstatus.__set_status_code(code);
+ tstatus.__isset.error_msgs = true;
+ tstatus.error_msgs.push_back(std::move(error_msg));
+ };
+
+ // Step 3: get binlog info
+ auto binlog_api_url = fmt::format("http://{}:{}/api/_binlog/_download",
request.remote_host,
+ request.remote_port);
+ constexpr int max_retry = 3;
+
+ auto get_binlog_info_url =
+ fmt::format("{}?method={}&tablet_id={}&binlog_version={}",
binlog_api_url,
+ "get_binlog_info", request.remote_tablet_id,
request.binlog_version);
+ std::string binlog_info;
+ auto get_binlog_info_cb = [&get_binlog_info_url, &binlog_info](HttpClient*
client) {
+ RETURN_IF_ERROR(client->init(get_binlog_info_url));
+ client->set_timeout_ms(kMaxTimeoutMs);
+ return client->execute(&binlog_info);
+ };
+ auto status = HttpClient::execute_with_retry(max_retry, 1,
get_binlog_info_cb);
+ if (!status.ok()) {
+ LOG(WARNING) << "failed to get binlog info from " <<
get_binlog_info_url
+ << ", status=" << status.to_string();
+ status.to_thrift(&tstatus);
+ return;
+ }
+
+ std::vector<std::string> binlog_info_parts = strings::Split(binlog_info,
":");
+ // TODO(Drogon): check binlog info content is right
+ DCHECK(binlog_info_parts.size() == 2);
+ const std::string& remote_rowset_id = binlog_info_parts[0];
+ int64_t num_segments = std::stoll(binlog_info_parts[1]);
+
+ // Step 4: get rowset meta
+ auto get_rowset_meta_url = fmt::format(
+ "{}?method={}&tablet_id={}&rowset_id={}&binlog_version={}",
binlog_api_url,
+ "get_rowset_meta", request.remote_tablet_id, remote_rowset_id,
request.binlog_version);
+ std::string rowset_meta_str;
+ auto get_rowset_meta_cb = [&get_rowset_meta_url,
&rowset_meta_str](HttpClient* client) {
+ RETURN_IF_ERROR(client->init(get_rowset_meta_url));
+ client->set_timeout_ms(kMaxTimeoutMs);
+ return client->execute(&rowset_meta_str);
+ };
+ status = HttpClient::execute_with_retry(max_retry, 1, get_rowset_meta_cb);
+ if (!status.ok()) {
+ LOG(WARNING) << "failed to get rowset meta from " <<
get_rowset_meta_url
+ << ", status=" << status.to_string();
+ status.to_thrift(&tstatus);
+ return;
+ }
+ RowsetMetaPB rowset_meta_pb;
+ if (!rowset_meta_pb.ParseFromString(rowset_meta_str)) {
+ LOG(WARNING) << "failed to parse rowset meta from " <<
get_rowset_meta_url;
+ status = Status::InternalError("failed to parse rowset meta");
+ status.to_thrift(&tstatus);
+ return;
+ }
+ // rewrite rowset meta
+ rowset_meta_pb.set_tablet_id(local_tablet_id);
+ rowset_meta_pb.set_partition_id(partition_id);
+
rowset_meta_pb.set_tablet_schema_hash(local_tablet->tablet_meta()->schema_hash());
+ rowset_meta_pb.set_txn_id(txn_id);
+ rowset_meta_pb.set_rowset_state(RowsetStatePB::COMMITTED);
+ auto rowset_meta = std::make_shared<RowsetMeta>();
+ if (!rowset_meta->init_from_pb(rowset_meta_pb)) {
+ LOG(WARNING) << "failed to init rowset meta from " <<
get_rowset_meta_url;
+ status = Status::InternalError("failed to init rowset meta");
+ status.to_thrift(&tstatus);
+ return;
+ }
+ RowsetId new_rowset_id = StorageEngine::instance()->next_rowset_id();
+ rowset_meta->set_rowset_id(new_rowset_id);
+ rowset_meta->set_tablet_uid(local_tablet->tablet_uid());
+
+ // Step 5: get all segment files
+ // Step 5.1: get all segment files size
+ std::vector<std::string> segment_file_urls;
+ segment_file_urls.reserve(num_segments);
+ std::vector<uint64_t> segment_file_sizes;
+ segment_file_sizes.reserve(num_segments);
+ for (int64_t segment_index = 0; segment_index < num_segments;
++segment_index) {
+ auto get_segment_file_size_url = fmt::format(
+ "{}?method={}&tablet_id={}&rowset_id={}&segment_index={}",
binlog_api_url,
+ "get_segment_file", request.remote_tablet_id,
remote_rowset_id, segment_index);
+ uint64_t segment_file_size;
+ auto get_segment_file_size_cb = [&get_segment_file_size_url,
+ &segment_file_size](HttpClient*
client) {
+ RETURN_IF_ERROR(client->init(get_segment_file_size_url));
+ client->set_timeout_ms(kMaxTimeoutMs);
+ RETURN_IF_ERROR(client->head());
+ return client->get_content_length(&segment_file_size);
+ };
+
+ status = HttpClient::execute_with_retry(max_retry, 1,
get_segment_file_size_cb);
+ if (!status.ok()) {
+ LOG(WARNING) << "failed to get segment file size from " <<
get_segment_file_size_url
+ << ", status=" << status.to_string();
+ status.to_thrift(&tstatus);
+ return;
+ }
+
+ segment_file_sizes.push_back(segment_file_size);
+ segment_file_urls.push_back(std::move(get_segment_file_size_url));
+ }
+
+ // Step 5.2: check data capacity
+ uint64_t total_size = std::accumulate(segment_file_sizes.begin(),
segment_file_sizes.end(), 0);
+ if (!local_tablet->can_add_binlog(total_size)) {
+        LOG(WARNING) << "failed to add binlog, not enough space, total_size="
<< total_size
+                     << ", tablet=" << local_tablet->tablet_id();
+        status = Status::InternalError("not enough space");
+ status.to_thrift(&tstatus);
+ return;
+ }
+
+ // Step 5.3: get all segment files
+ for (int64_t segment_index = 0; segment_index < num_segments;
++segment_index) {
+ auto segment_file_size = segment_file_sizes[segment_index];
+ auto get_segment_file_url = segment_file_urls[segment_index];
+
+ uint64_t estimate_timeout =
+ segment_file_size / config::download_low_speed_limit_kbps /
1024;
+ if (estimate_timeout < config::download_low_speed_time) {
+ estimate_timeout = config::download_low_speed_time;
+ }
+
+ auto local_segment_path = BetaRowset::segment_file_path(
+ local_tablet->tablet_path(), rowset_meta->rowset_id(),
segment_index);
+ LOG(INFO) << fmt::format("download segment file from {} to {}",
get_segment_file_url,
+ local_segment_path);
+ auto get_segment_file_cb = [&get_segment_file_url,
&local_segment_path, segment_file_size,
+ estimate_timeout](HttpClient* client) {
+ RETURN_IF_ERROR(client->init(get_segment_file_url));
+ client->set_timeout_ms(estimate_timeout * 1000);
+ RETURN_IF_ERROR(client->download(local_segment_path));
+
+ std::error_code ec;
+ // Check file length
+ uint64_t local_file_size =
std::filesystem::file_size(local_segment_path, ec);
+ if (ec) {
+ LOG(WARNING) << "download file error" << ec.message();
+                return Status::IOError("can't retrieve file_size of {}, due to
{}",
+ local_segment_path, ec.message());
+ }
+ if (local_file_size != segment_file_size) {
+ LOG(WARNING) << "download file length error"
+ << ", get_segment_file_url=" <<
get_segment_file_url
+ << ", file_size=" << segment_file_size
+ << ", local_file_size=" << local_file_size;
+ return Status::InternalError("downloaded file size is not
equal");
+ }
+ chmod(local_segment_path.c_str(), S_IRUSR | S_IWUSR);
+ return Status::OK();
+ };
+
+ auto status = HttpClient::execute_with_retry(max_retry, 1,
get_segment_file_cb);
+ if (!status.ok()) {
+ LOG(WARNING) << "failed to get segment file from " <<
get_segment_file_url
+ << ", status=" << status.to_string();
+ status.to_thrift(&tstatus);
+ return;
+ }
+ }
+
+ // Step 6: create rowset && calculate delete bitmap && commit
+ // Step 6.1: create rowset
+ RowsetSharedPtr rowset;
+ status = RowsetFactory::create_rowset(local_tablet->tablet_schema(),
+ local_tablet->tablet_path(),
rowset_meta, &rowset);
+
+ if (!status) {
+ LOG(WARNING) << "failed to create rowset from rowset meta for remote
tablet"
+ << ". rowset_id: " << rowset_meta_pb.rowset_id()
+ << ", rowset_type: " << rowset_meta_pb.rowset_type()
+ << ", remote_tablet_id=" << rowset_meta_pb.tablet_id() <<
", txn_id=" << txn_id
+ << ", status=" << status.to_string();
+ status.to_thrift(&tstatus);
+ return;
+ }
+
+ // Step 6.2 calculate delete bitmap before commit
+ auto calc_delete_bitmap_token =
+
StorageEngine::instance()->calc_delete_bitmap_executor()->create_token();
+ DeleteBitmapPtr delete_bitmap =
std::make_shared<DeleteBitmap>(local_tablet_id);
+ RowsetIdUnorderedSet pre_rowset_ids;
+ if (local_tablet->enable_unique_key_merge_on_write()) {
+ auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset.get());
+ std::vector<segment_v2::SegmentSharedPtr> segments;
+ status = beta_rowset->load_segments(&segments);
+ if (!status) {
+ LOG(WARNING) << "failed to load segments from rowset"
+ << ". rowset_id: " << beta_rowset->rowset_id() << ",
txn_id=" << txn_id
+ << ", status=" << status.to_string();
+ status.to_thrift(&tstatus);
+ return;
+ }
+ if (segments.size() > 1) {
+ // calculate delete bitmap between segments
+ status = local_tablet->calc_delete_bitmap_between_segments(rowset,
segments,
+
delete_bitmap);
+ if (!status) {
+ LOG(WARNING) << "failed to calculate delete bitmap"
+ << ". tablet_id: " << local_tablet->tablet_id()
+ << ". rowset_id: " << rowset->rowset_id() << ",
txn_id=" << txn_id
+ << ", status=" << status.to_string();
+ status.to_thrift(&tstatus);
+ return;
+ }
+ }
+
+ static_cast<void>(local_tablet->commit_phase_update_delete_bitmap(
+ rowset, pre_rowset_ids, delete_bitmap, segments, txn_id,
+ calc_delete_bitmap_token.get(), nullptr));
+ static_cast<void>(calc_delete_bitmap_token->wait());
+ }
+
+ // Step 6.3: commit txn
+ Status commit_txn_status =
StorageEngine::instance()->txn_manager()->commit_txn(
+ local_tablet->data_dir()->get_meta(), rowset_meta->partition_id(),
+ rowset_meta->txn_id(), rowset_meta->tablet_id(),
local_tablet->schema_hash(),
+ local_tablet->tablet_uid(), rowset_meta->load_id(), rowset, false);
+ if (!commit_txn_status &&
!commit_txn_status.is<ErrorCode::PUSH_TRANSACTION_ALREADY_EXIST>()) {
+ auto err_msg = fmt::format(
+ "failed to commit txn for remote tablet. rowset_id: {},
remote_tablet_id={}, "
+ "txn_id={}, status={}",
+ rowset_meta->rowset_id().to_string(), rowset_meta->tablet_id(),
+ rowset_meta->txn_id(), commit_txn_status.to_string());
+ LOG(WARNING) << err_msg;
+ set_tstatus(TStatusCode::RUNTIME_ERROR, std::move(err_msg));
+ return;
+ }
+
+ if (local_tablet->enable_unique_key_merge_on_write()) {
+
StorageEngine::instance()->txn_manager()->set_txn_related_delete_bitmap(
+ partition_id, txn_id, local_tablet_id,
local_tablet->schema_hash(),
+ local_tablet->tablet_uid(), true, delete_bitmap,
pre_rowset_ids, nullptr);
+ }
+
+ tstatus.__set_status_code(TStatusCode::OK);
+}
+} // namespace
+
using apache::thrift::TException;
using apache::thrift::TProcessor;
using apache::thrift::TMultiplexedProcessor;
@@ -89,18 +369,32 @@ BackendService::BackendService(ExecEnv* exec_env)
: _exec_env(exec_env), _agent_server(new AgentServer(exec_env,
*exec_env->master_info())) {}
Status BackendService::create_service(ExecEnv* exec_env, int port,
ThriftServer** server) {
- std::shared_ptr<BackendService> handler(new BackendService(exec_env));
+ auto service = std::make_shared<BackendService>(exec_env);
// TODO: do we want a BoostThreadFactory?
// TODO: we want separate thread factories here, so that fe requests can't
starve
// be requests
- std::shared_ptr<ThreadFactory> thread_factory(new ThreadFactory());
-
- std::shared_ptr<TProcessor> be_processor(new
BackendServiceProcessor(handler));
+ // std::shared_ptr<TProcessor> be_processor =
std::make_shared<BackendServiceProcessor>(service);
+ auto be_processor = std::make_shared<BackendServiceProcessor>(service);
*server = new ThriftServer("backend", be_processor, port,
config::be_service_threads);
LOG(INFO) << "Doris BackendService listening on " << port;
+ auto thread_num = config::ingest_binlog_work_pool_size;
+ if (thread_num < 0) {
+        LOG(INFO) << fmt::format("ingest binlog thread pool size is {}, so we
will run in sync mode",
+ thread_num);
+ return Status::OK();
+ }
+
+ if (thread_num == 0) {
+ thread_num = std::thread::hardware_concurrency();
+ }
+ static_cast<void>(doris::ThreadPoolBuilder("IngestBinlog")
+ .set_min_threads(thread_num)
+ .set_max_threads(thread_num * 2)
+ .build(&(service->_ingest_binlog_workers)));
+ LOG(INFO) << fmt::format("ingest binlog thread pool size is {}, in async
mode", thread_num);
return Status::OK();
}
@@ -390,8 +684,6 @@ void BackendService::ingest_binlog(TIngestBinlogResult&
result,
const TIngestBinlogRequest& request) {
LOG(INFO) << "ingest binlog. request: " <<
apache::thrift::ThriftDebugString(request);
- constexpr uint64_t kMaxTimeoutMs = 1000;
-
TStatus tstatus;
Defer defer {[&result, &tstatus]() {
result.__set_status(tstatus);
@@ -446,6 +738,12 @@ void BackendService::ingest_binlog(TIngestBinlogResult&
result,
set_tstatus(TStatusCode::ANALYSIS_ERROR, error_msg);
return;
}
+ if (!request.__isset.local_tablet_id) {
+ auto error_msg = "local_tablet_id is empty";
+ LOG(WARNING) << error_msg;
+ set_tstatus(TStatusCode::ANALYSIS_ERROR, error_msg);
+ return;
+ }
if (!request.__isset.load_id) {
auto error_msg = "load_id is empty";
LOG(WARNING) << error_msg;
@@ -480,240 +778,106 @@ void BackendService::ingest_binlog(TIngestBinlogResult&
result,
return;
}
- // Step 3: get binlog info
- auto binlog_api_url = fmt::format("http://{}:{}/api/_binlog/_download",
request.remote_host,
- request.remote_port);
- constexpr int max_retry = 3;
+ bool is_async = (_ingest_binlog_workers != nullptr);
+ result.__set_is_async(is_async);
- auto get_binlog_info_url =
- fmt::format("{}?method={}&tablet_id={}&binlog_version={}",
binlog_api_url,
- "get_binlog_info", request.remote_tablet_id,
request.binlog_version);
- std::string binlog_info;
- auto get_binlog_info_cb = [&get_binlog_info_url, &binlog_info](HttpClient*
client) {
- RETURN_IF_ERROR(client->init(get_binlog_info_url));
- client->set_timeout_ms(kMaxTimeoutMs);
- return client->execute(&binlog_info);
- };
- status = HttpClient::execute_with_retry(max_retry, 1, get_binlog_info_cb);
- if (!status.ok()) {
- LOG(WARNING) << "failed to get binlog info from " <<
get_binlog_info_url
- << ", status=" << status.to_string();
- status.to_thrift(&tstatus);
- return;
- }
+ auto ingest_binlog_func = [=, tstatus = &tstatus]() {
+ IngestBinlogArg ingest_binlog_arg = {
+ .txn_id = txn_id,
+ .partition_id = partition_id,
+ .local_tablet_id = local_tablet_id,
+ .local_tablet = local_tablet,
- std::vector<std::string> binlog_info_parts = strings::Split(binlog_info,
":");
- // TODO(Drogon): check binlog info content is right
- DCHECK(binlog_info_parts.size() == 2);
- const std::string& remote_rowset_id = binlog_info_parts[0];
- int64_t num_segments = std::stoll(binlog_info_parts[1]);
+ .request = std::move(request),
+ .tstatus = is_async ? nullptr : tstatus,
+ };
- // Step 4: get rowset meta
- auto get_rowset_meta_url = fmt::format(
- "{}?method={}&tablet_id={}&rowset_id={}&binlog_version={}",
binlog_api_url,
- "get_rowset_meta", request.remote_tablet_id, remote_rowset_id,
request.binlog_version);
- std::string rowset_meta_str;
- auto get_rowset_meta_cb = [&get_rowset_meta_url,
&rowset_meta_str](HttpClient* client) {
- RETURN_IF_ERROR(client->init(get_rowset_meta_url));
- client->set_timeout_ms(kMaxTimeoutMs);
- return client->execute(&rowset_meta_str);
+ _ingest_binlog(&ingest_binlog_arg);
};
- status = HttpClient::execute_with_retry(max_retry, 1, get_rowset_meta_cb);
- if (!status.ok()) {
- LOG(WARNING) << "failed to get rowset meta from " <<
get_rowset_meta_url
- << ", status=" << status.to_string();
- status.to_thrift(&tstatus);
- return;
- }
- RowsetMetaPB rowset_meta_pb;
- if (!rowset_meta_pb.ParseFromString(rowset_meta_str)) {
- LOG(WARNING) << "failed to parse rowset meta from " <<
get_rowset_meta_url;
- status = Status::InternalError("failed to parse rowset meta");
- status.to_thrift(&tstatus);
- return;
- }
- // rewrite rowset meta
- rowset_meta_pb.set_tablet_id(local_tablet_id);
- rowset_meta_pb.set_partition_id(partition_id);
-
rowset_meta_pb.set_tablet_schema_hash(local_tablet->tablet_meta()->schema_hash());
- rowset_meta_pb.set_txn_id(txn_id);
- rowset_meta_pb.set_rowset_state(RowsetStatePB::COMMITTED);
- auto rowset_meta = std::make_shared<RowsetMeta>();
- if (!rowset_meta->init_from_pb(rowset_meta_pb)) {
- LOG(WARNING) << "failed to init rowset meta from " <<
get_rowset_meta_url;
- status = Status::InternalError("failed to init rowset meta");
- status.to_thrift(&tstatus);
- return;
- }
- RowsetId new_rowset_id = StorageEngine::instance()->next_rowset_id();
- rowset_meta->set_rowset_id(new_rowset_id);
- rowset_meta->set_tablet_uid(local_tablet->tablet_uid());
- // Step 5: get all segment files
- // Step 5.1: get all segment files size
- std::vector<std::string> segment_file_urls;
- segment_file_urls.reserve(num_segments);
- std::vector<uint64_t> segment_file_sizes;
- segment_file_sizes.reserve(num_segments);
- for (int64_t segment_index = 0; segment_index < num_segments;
++segment_index) {
- auto get_segment_file_size_url = fmt::format(
- "{}?method={}&tablet_id={}&rowset_id={}&segment_index={}",
binlog_api_url,
- "get_segment_file", request.remote_tablet_id,
remote_rowset_id, segment_index);
- uint64_t segment_file_size;
- auto get_segment_file_size_cb = [&get_segment_file_size_url,
- &segment_file_size](HttpClient*
client) {
- RETURN_IF_ERROR(client->init(get_segment_file_size_url));
- client->set_timeout_ms(kMaxTimeoutMs);
- RETURN_IF_ERROR(client->head());
- return client->get_content_length(&segment_file_size);
- };
-
- status = HttpClient::execute_with_retry(max_retry, 1,
get_segment_file_size_cb);
+ if (is_async) {
+ status =
_ingest_binlog_workers->submit_func(std::move(ingest_binlog_func));
if (!status.ok()) {
- LOG(WARNING) << "failed to get segment file size from " <<
get_segment_file_size_url
- << ", status=" << status.to_string();
status.to_thrift(&tstatus);
return;
}
-
- segment_file_sizes.push_back(segment_file_size);
- segment_file_urls.push_back(std::move(get_segment_file_size_url));
- }
-
- // Step 5.2: check data capacity
- uint64_t total_size = std::accumulate(segment_file_sizes.begin(),
segment_file_sizes.end(), 0);
- if (!local_tablet->can_add_binlog(total_size)) {
- LOG(WARNING) << "failed to add binlog, no enough space, total_size="
<< total_size
- << ", tablet=" << local_tablet->full_name();
- status = Status::InternalError("no enough space");
- status.to_thrift(&tstatus);
- return;
+ } else {
+ ingest_binlog_func();
}
+}
- // Step 5.3: get all segment files
- for (int64_t segment_index = 0; segment_index < num_segments;
++segment_index) {
- auto segment_file_size = segment_file_sizes[segment_index];
- auto get_segment_file_url = segment_file_urls[segment_index];
-
- uint64_t estimate_timeout =
- segment_file_size / config::download_low_speed_limit_kbps /
1024;
- if (estimate_timeout < config::download_low_speed_time) {
- estimate_timeout = config::download_low_speed_time;
- }
-
- std::string local_segment_path =
- fmt::format("{}/{}_{}.dat", local_tablet->tablet_path(),
- rowset_meta->rowset_id().to_string(),
segment_index);
- LOG(INFO) << fmt::format("download segment file from {} to {}",
get_segment_file_url,
- local_segment_path);
- auto get_segment_file_cb = [&get_segment_file_url,
&local_segment_path, segment_file_size,
- estimate_timeout](HttpClient* client) {
- RETURN_IF_ERROR(client->init(get_segment_file_url));
- client->set_timeout_ms(estimate_timeout * 1000);
- RETURN_IF_ERROR(client->download(local_segment_path));
+void BackendService::query_ingest_binlog(TQueryIngestBinlogResult& result,
+ const TQueryIngestBinlogRequest&
request) {
+ LOG(INFO) << "query ingest binlog. request: " <<
apache::thrift::ThriftDebugString(request);
- std::error_code ec;
- // Check file length
- uint64_t local_file_size =
std::filesystem::file_size(local_segment_path, ec);
- if (ec) {
- LOG(WARNING) << "download file error" << ec.message();
- return Status::IOError("can't retrive file_size of {}, due to
{}",
- local_segment_path, ec.message());
- }
- if (local_file_size != segment_file_size) {
- LOG(WARNING) << "download file length error"
- << ", get_segment_file_url=" <<
get_segment_file_url
- << ", file_size=" << segment_file_size
- << ", local_file_size=" << local_file_size;
- return Status::InternalError("downloaded file size is not
equal");
- }
- chmod(local_segment_path.c_str(), S_IRUSR | S_IWUSR);
- return Status::OK();
- };
+ auto set_result = [&](TIngestBinlogStatus::type status, std::string
error_msg) {
+ result.__set_status(status);
+ result.__set_err_msg(std::move(error_msg));
+ };
- auto status = HttpClient::execute_with_retry(max_retry, 1,
get_segment_file_cb);
- if (!status.ok()) {
- LOG(WARNING) << "failed to get segment file from " <<
get_segment_file_url
- << ", status=" << status.to_string();
- status.to_thrift(&tstatus);
- return;
- }
+ /// Check args: txn_id, partition_id, tablet_id, load_id
+ if (!request.__isset.txn_id) {
+ auto error_msg = "txn_id is empty";
+ LOG(WARNING) << error_msg;
+ set_result(TIngestBinlogStatus::ANALYSIS_ERROR, error_msg);
+ return;
}
-
- // Step 6: create rowset && calculate delete bitmap && commit
- // Step 6.1: create rowset
- RowsetSharedPtr rowset;
- status = RowsetFactory::create_rowset(local_tablet->tablet_schema(),
- local_tablet->tablet_path(),
rowset_meta, &rowset);
-
- if (!status) {
- LOG(WARNING) << "failed to create rowset from rowset meta for remote
tablet"
- << ". rowset_id: " << rowset_meta_pb.rowset_id()
- << ", rowset_type: " << rowset_meta_pb.rowset_type()
- << ", remote_tablet_id=" << rowset_meta_pb.tablet_id() <<
", txn_id=" << txn_id
- << ", status=" << status.to_string();
- status.to_thrift(&tstatus);
+ if (!request.__isset.partition_id) {
+ auto error_msg = "partition_id is empty";
+ LOG(WARNING) << error_msg;
+ set_result(TIngestBinlogStatus::ANALYSIS_ERROR, error_msg);
return;
}
-
- // Step 6.2 calculate delete bitmap before commit
- auto calc_delete_bitmap_token =
-
StorageEngine::instance()->calc_delete_bitmap_executor()->create_token();
- DeleteBitmapPtr delete_bitmap =
std::make_shared<DeleteBitmap>(local_tablet_id);
- RowsetIdUnorderedSet pre_rowset_ids;
- if (local_tablet->enable_unique_key_merge_on_write()) {
- auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset.get());
- std::vector<segment_v2::SegmentSharedPtr> segments;
- status = beta_rowset->load_segments(&segments);
- if (!status) {
- LOG(WARNING) << "failed to load segments from rowset"
- << ". rowset_id: " << beta_rowset->rowset_id() << ",
txn_id=" << txn_id
- << ", status=" << status.to_string();
- status.to_thrift(&tstatus);
- return;
- }
- if (segments.size() > 1) {
- // calculate delete bitmap between segments
- status = local_tablet->calc_delete_bitmap_between_segments(rowset,
segments,
-
delete_bitmap);
- if (!status) {
- LOG(WARNING) << "failed to calculate delete bitmap"
- << ". tablet_id: " << local_tablet->tablet_id()
- << ". rowset_id: " << rowset->rowset_id() << ",
txn_id=" << txn_id
- << ", status=" << status.to_string();
- status.to_thrift(&tstatus);
- return;
- }
- }
-
- local_tablet->commit_phase_update_delete_bitmap(rowset,
pre_rowset_ids, delete_bitmap,
- segments, txn_id,
-
calc_delete_bitmap_token.get(), nullptr);
- calc_delete_bitmap_token->wait();
+ if (!request.__isset.tablet_id) {
+ auto error_msg = "tablet_id is empty";
+ LOG(WARNING) << error_msg;
+ set_result(TIngestBinlogStatus::ANALYSIS_ERROR, error_msg);
+ return;
}
-
- // Step 6.3: commit txn
- Status commit_txn_status =
StorageEngine::instance()->txn_manager()->commit_txn(
- local_tablet->data_dir()->get_meta(), rowset_meta->partition_id(),
- rowset_meta->txn_id(), rowset_meta->tablet_id(),
rowset_meta->tablet_schema_hash(),
- local_tablet->tablet_uid(), rowset_meta->load_id(), rowset, false);
- if (!commit_txn_status &&
!commit_txn_status.is<ErrorCode::PUSH_TRANSACTION_ALREADY_EXIST>()) {
- auto err_msg = fmt::format(
- "failed to commit txn for remote tablet. rowset_id: {},
remote_tablet_id={}, "
- "txn_id={}, status={}",
- rowset_meta->rowset_id().to_string(), rowset_meta->tablet_id(),
- rowset_meta->txn_id(), commit_txn_status.to_string());
- LOG(WARNING) << err_msg;
- set_tstatus(TStatusCode::RUNTIME_ERROR, std::move(err_msg));
+ if (!request.__isset.load_id) {
+ auto error_msg = "load_id is empty";
+ LOG(WARNING) << error_msg;
+ set_result(TIngestBinlogStatus::ANALYSIS_ERROR, error_msg);
return;
}
- if (local_tablet->enable_unique_key_merge_on_write()) {
-
StorageEngine::instance()->txn_manager()->set_txn_related_delete_bitmap(
- partition_id, txn_id, local_tablet_id,
local_tablet->schema_hash(),
- local_tablet->tablet_uid(), true, delete_bitmap,
pre_rowset_ids, nullptr);
+ auto partition_id = request.partition_id;
+ auto txn_id = request.txn_id;
+ auto tablet_id = request.tablet_id;
+
+ // Step 1: get local tablet
+ auto local_tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
+ if (local_tablet == nullptr) {
+ auto error_msg = fmt::format("tablet {} not found", tablet_id);
+ LOG(WARNING) << error_msg;
+ set_result(TIngestBinlogStatus::NOT_FOUND, std::move(error_msg));
+ return;
}
- tstatus.__set_status_code(TStatusCode::OK);
+ // Step 2: get txn state
+ auto schema_hash = local_tablet->schema_hash();
+ auto tablet_uid = local_tablet->tablet_uid();
+ auto txn_state = StorageEngine::instance()->txn_manager()->get_txn_state(
+ partition_id, txn_id, tablet_id, schema_hash, tablet_uid);
+ switch (txn_state) {
+ case TxnState::NOT_FOUND:
+ result.__set_status(TIngestBinlogStatus::NOT_FOUND);
+ break;
+ case TxnState::PREPARED:
+ result.__set_status(TIngestBinlogStatus::DOING);
+ break;
+ case TxnState::COMMITTED:
+ result.__set_status(TIngestBinlogStatus::OK);
+ break;
+ case TxnState::ROLLEDBACK:
+ result.__set_status(TIngestBinlogStatus::FAILED);
+ break;
+ case TxnState::ABORTED:
+ result.__set_status(TIngestBinlogStatus::FAILED);
+ break;
+ case TxnState::DELETED:
+ result.__set_status(TIngestBinlogStatus::FAILED);
+ break;
+ }
}
} // namespace doris
diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h
index 1cc952a0092..f6b52b275d4 100644
--- a/be/src/service/backend_service.h
+++ b/be/src/service/backend_service.h
@@ -58,6 +58,7 @@ class TTransmitDataParams;
class TUniqueId;
class TIngestBinlogRequest;
class TIngestBinlogResult;
+class ThreadPool;
// This class just forward rpc for actual handler
// make this class because we can bind multiple service on single point
@@ -131,10 +132,14 @@ public:
void ingest_binlog(TIngestBinlogResult& result, const
TIngestBinlogRequest& request) override;
+ void query_ingest_binlog(TQueryIngestBinlogResult& result,
+ const TQueryIngestBinlogRequest& request)
override;
+
private:
Status start_plan_fragment_execution(const TExecPlanFragmentParams&
exec_params);
ExecEnv* _exec_env;
std::unique_ptr<AgentServer> _agent_server;
+ std::unique_ptr<ThreadPool> _ingest_binlog_workers;
};
} // namespace doris
diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp
index 25d261e53f8..556d101b05c 100644
--- a/be/src/service/http_service.cpp
+++ b/be/src/service/http_service.cpp
@@ -17,6 +17,9 @@
#include "service/http_service.h"
+#include <event2/bufferevent.h>
+#include <event2/http.h>
+
#include <algorithm>
#include <string>
#include <vector>
@@ -57,6 +60,30 @@
#include "util/doris_metrics.h"
namespace doris {
+namespace {
+std::shared_ptr<bufferevent_rate_limit_group> get_rate_limit_group(event_base*
event_base) {
+ auto rate_limit = config::download_binlog_rate_limit_kbs;
+ if (rate_limit <= 0) {
+ return nullptr;
+ }
+
+ auto max_value = std::numeric_limits<int32_t>::max() / 1024 * 10;
+ if (rate_limit > max_value) {
+ LOG(WARNING) << "rate limit is too large, set to max value.";
+ rate_limit = max_value;
+ }
+ struct timeval cfg_tick = {0, 100 * 1000}; // 100ms
+    rate_limit = rate_limit / 10 * 1024; // convert KB/s to bytes per 100ms tick
+
+ auto token_bucket = std::unique_ptr<ev_token_bucket_cfg,
decltype(&ev_token_bucket_cfg_free)>(
+ ev_token_bucket_cfg_new(rate_limit, rate_limit * 2, rate_limit,
rate_limit * 2,
+ &cfg_tick),
+ ev_token_bucket_cfg_free);
+ return std::shared_ptr<bufferevent_rate_limit_group>(
+ bufferevent_rate_limit_group_new(event_base, token_bucket.get()),
+ bufferevent_rate_limit_group_free);
+}
+} // namespace
HttpService::HttpService(ExecEnv* env, int port, int num_threads)
: _env(env),
@@ -68,6 +95,9 @@ HttpService::~HttpService() {}
Status HttpService::start() {
add_default_path_handlers(_web_page_handler.get());
+ auto event_base = _ev_http_server->get_event_bases()[0];
+ _rate_limit_group = get_rate_limit_group(event_base.get());
+
// register load
StreamLoadAction* streamload_action = _pool.add(new
StreamLoadAction(_env));
_ev_http_server->register_handler(HttpMethod::PUT,
"/api/{db}/{table}/_load",
@@ -85,18 +115,19 @@ Status HttpService::start() {
for (auto& path : _env->store_paths()) {
allow_paths.emplace_back(path.path);
}
- DownloadAction* download_action = _pool.add(new DownloadAction(_env,
allow_paths));
+ DownloadAction* download_action = _pool.add(new DownloadAction(_env,
nullptr, allow_paths));
_ev_http_server->register_handler(HttpMethod::HEAD, "/api/_download_load",
download_action);
_ev_http_server->register_handler(HttpMethod::GET, "/api/_download_load",
download_action);
- DownloadAction* tablet_download_action = _pool.add(new
DownloadAction(_env, allow_paths));
+ DownloadAction* tablet_download_action =
+ _pool.add(new DownloadAction(_env, _rate_limit_group,
allow_paths));
_ev_http_server->register_handler(HttpMethod::HEAD,
"/api/_tablet/_download",
tablet_download_action);
_ev_http_server->register_handler(HttpMethod::GET,
"/api/_tablet/_download",
tablet_download_action);
if (config::enable_single_replica_load) {
DownloadAction* single_replica_download_action = _pool.add(new
DownloadAction(
- _env, allow_paths,
config::single_replica_load_download_num_workers));
+ _env, nullptr, allow_paths,
config::single_replica_load_download_num_workers));
_ev_http_server->register_handler(HttpMethod::HEAD,
"/api/_single_replica/_download",
single_replica_download_action);
_ev_http_server->register_handler(HttpMethod::GET,
"/api/_single_replica/_download",
@@ -110,7 +141,8 @@ Status HttpService::start() {
_ev_http_server->register_handler(HttpMethod::HEAD, "/api/_load_error_log",
error_log_download_action);
- DownloadBinlogAction* download_binlog_action = _pool.add(new
DownloadBinlogAction(_env));
+ DownloadBinlogAction* download_binlog_action =
+ _pool.add(new DownloadBinlogAction(_env, _rate_limit_group));
_ev_http_server->register_handler(HttpMethod::GET,
"/api/_binlog/_download",
download_binlog_action);
_ev_http_server->register_handler(HttpMethod::HEAD,
"/api/_binlog/_download",
diff --git a/be/src/service/http_service.h b/be/src/service/http_service.h
index 8e08782f0de..251987a6f87 100644
--- a/be/src/service/http_service.h
+++ b/be/src/service/http_service.h
@@ -22,6 +22,8 @@
#include "common/object_pool.h"
#include "common/status.h"
+struct bufferevent_rate_limit_group;
+
namespace doris {
class ExecEnv;
@@ -46,6 +48,8 @@ private:
std::unique_ptr<EvHttpServer> _ev_http_server;
std::unique_ptr<WebPageHandler> _web_page_handler;
+
+ std::shared_ptr<bufferevent_rate_limit_group> _rate_limit_group {nullptr};
};
} // namespace doris
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
index 3ab76732f61..075fc6d6598 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java
@@ -32,6 +32,8 @@ import org.apache.doris.thrift.TExportTaskRequest;
import org.apache.doris.thrift.TIngestBinlogRequest;
import org.apache.doris.thrift.TIngestBinlogResult;
import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TQueryIngestBinlogRequest;
+import org.apache.doris.thrift.TQueryIngestBinlogResult;
import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.doris.thrift.TScanBatchResult;
import org.apache.doris.thrift.TScanCloseParams;
@@ -222,6 +224,12 @@ public class GenericPoolTest {
public TIngestBinlogResult ingestBinlog(TIngestBinlogRequest
ingestBinlogRequest) throws TException {
return null;
}
+
+ @Override
+ public TQueryIngestBinlogResult
queryIngestBinlog(TQueryIngestBinlogRequest queryIngestBinlogRequest)
+ throws TException {
+ return null;
+ }
}
@Test
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
index bf2a7964b91..7d261790066 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java
@@ -50,6 +50,8 @@ import org.apache.doris.thrift.TIngestBinlogRequest;
import org.apache.doris.thrift.TIngestBinlogResult;
import org.apache.doris.thrift.TMasterInfo;
import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TQueryIngestBinlogRequest;
+import org.apache.doris.thrift.TQueryIngestBinlogResult;
import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.doris.thrift.TScanBatchResult;
import org.apache.doris.thrift.TScanCloseParams;
@@ -363,6 +365,12 @@ public class MockedBackendFactory {
public TIngestBinlogResult ingestBinlog(TIngestBinlogRequest
ingestBinlogRequest) throws TException {
return null;
}
+
+ @Override
+ public TQueryIngestBinlogResult
queryIngestBinlog(TQueryIngestBinlogRequest queryIngestBinlogRequest)
+ throws TException {
+ return null;
+ }
}
// The default Brpc service.
diff --git a/gensrc/thrift/BackendService.thrift
b/gensrc/thrift/BackendService.thrift
index 3d77eab4cad..eb9ad6a64d8 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -136,6 +136,28 @@ struct TIngestBinlogRequest {
struct TIngestBinlogResult {
1: optional Status.TStatus status;
+ 2: optional bool is_async;
+}
+
+struct TQueryIngestBinlogRequest {
+ 1: optional i64 txn_id;
+ 2: optional i64 partition_id;
+ 3: optional i64 tablet_id;
+ 4: optional Types.TUniqueId load_id;
+}
+
+enum TIngestBinlogStatus {
+ ANALYSIS_ERROR,
+ UNKNOWN,
+ NOT_FOUND,
+ OK,
+ FAILED,
+ DOING
+}
+
+struct TQueryIngestBinlogResult {
+ 1: optional TIngestBinlogStatus status;
+ 2: optional string err_msg;
}
service BackendService {
@@ -193,4 +215,5 @@ service BackendService {
TCheckStorageFormatResult check_storage_format();
TIngestBinlogResult ingest_binlog(1: TIngestBinlogRequest
ingest_binlog_request);
+ TQueryIngestBinlogResult query_ingest_binlog(1: TQueryIngestBinlogRequest
query_ingest_binlog_request);
}
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
index a808db7accf..fff5c1c26cf 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Syncer.groovy
@@ -72,7 +72,6 @@ class Syncer {
}
private Boolean checkBinlog(TBinlog binlog, String table, Boolean update) {
-
// step 1: check binlog availability
if (binlog == null) {
return false
@@ -735,6 +734,7 @@ class Syncer {
if (!binlogRecords.contains(srcPartition.key)) {
continue
}
+
Iterator srcTabletIter =
srcPartition.value.tabletMeta.iterator()
Iterator tarTabletIter =
tarPartition.value.tabletMeta.iterator()
@@ -771,6 +771,7 @@ class Syncer {
logger.info("request -> ${request}")
TIngestBinlogResult result =
tarClient.client.ingestBinlog(request)
if (!checkIngestBinlog(result)) {
+ logger.error("Ingest binlog error! result: ${result}")
return false
}
diff --git a/regression-test/suites/ccr_syncer_p0/test_ingest_binlog.groovy
b/regression-test/suites/ccr_syncer_p0/test_ingest_binlog.groovy
index cef48715aeb..3004e344cc9 100644
--- a/regression-test/suites/ccr_syncer_p0/test_ingest_binlog.groovy
+++ b/regression-test/suites/ccr_syncer_p0/test_ingest_binlog.groovy
@@ -22,6 +22,7 @@ suite("test_ingest_binlog") {
logger.info("fe enable_feature_binlog is false, skip case
test_ingest_binlog")
return
}
+
def tableName = "tbl_ingest_binlog"
def insert_num = 5
def test_num = 0
@@ -102,6 +103,7 @@ suite("test_ingest_binlog") {
logger.info("=== Test 2.2: Wrong binlog version case ===")
// -1 means use the number of syncer.context
// Boolean ingestBinlog(long fakePartitionId = -1, long fakeVersion = -1)
+ // use fakeVersion = 1; version 1 is the first version of a doris BE tablet,
so it has no binlog — expect only an HTTP error
assertTrue(syncer.ingestBinlog(-1, 1) == false)
@@ -120,4 +122,4 @@ suite("test_ingest_binlog") {
// End Test 2
syncer.closeBackendClients()
-}
\ No newline at end of file
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]