w41ter commented on code in PR #26323:
URL: https://github.com/apache/doris/pull/26323#discussion_r1381326802
##########
be/src/service/backend_service.cpp:
##########
@@ -79,6 +81,283 @@ class TTransportException;
namespace doris {
+namespace {
+constexpr uint64_t kMaxTimeoutMs = 3000; // 3s
+struct IngestBinlogArg {
+ int64_t txn_id;
+ int64_t partition_id;
+ int64_t local_tablet_id;
+ TabletSharedPtr local_tablet;
+ TIngestBinlogRequest request;
+ TStatus* tstatus;
+};
+
+void _ingest_binlog(IngestBinlogArg* arg) {
+ auto txn_id = arg->txn_id;
+ auto partition_id = arg->partition_id;
+ auto local_tablet_id = arg->local_tablet_id;
+ const auto& local_tablet = arg->local_tablet;
+ const auto& local_tablet_uid = local_tablet->tablet_uid();
+
+ auto& request = arg->request;
+
+ TStatus tstatus;
+ Defer defer {[=, &tstatus, ingest_binlog_tstatus = arg->tstatus]() {
+ LOG(INFO) << "ingest binlog. result: " <<
apache::thrift::ThriftDebugString(tstatus);
+ if (tstatus.status_code != TStatusCode::OK) {
+ // abort txn
+ StorageEngine::instance()->txn_manager()->abort_txn(partition_id,
txn_id,
+
local_tablet_id, local_tablet_uid);
+ }
+
+ if (ingest_binlog_tstatus) {
+ *ingest_binlog_tstatus = std::move(tstatus);
+ }
+ }};
+
+ auto set_tstatus = [&tstatus](TStatusCode::type code, std::string
error_msg) {
+ tstatus.__set_status_code(code);
+ tstatus.__isset.error_msgs = true;
+ tstatus.error_msgs.push_back(std::move(error_msg));
+ };
+
+ // Step 3: get binlog info
+ auto binlog_api_url = fmt::format("http://{}:{}/api/_binlog/_download",
request.remote_host,
+ request.remote_port);
+ constexpr int max_retry = 3;
+
+ auto get_binlog_info_url =
+ fmt::format("{}?method={}&tablet_id={}&binlog_version={}",
binlog_api_url,
+ "get_binlog_info", request.remote_tablet_id,
request.binlog_version);
+ std::string binlog_info;
+ auto get_binlog_info_cb = [&get_binlog_info_url, &binlog_info](HttpClient*
client) {
+ RETURN_IF_ERROR(client->init(get_binlog_info_url));
+ client->set_timeout_ms(kMaxTimeoutMs);
+ return client->execute(&binlog_info);
+ };
+ auto status = HttpClient::execute_with_retry(max_retry, 1,
get_binlog_info_cb);
+ if (!status.ok()) {
+ LOG(WARNING) << "failed to get binlog info from " <<
get_binlog_info_url
+ << ", status=" << status.to_string();
+ status.to_thrift(&tstatus);
+ return;
+ }
+
+ std::vector<std::string> binlog_info_parts = strings::Split(binlog_info,
":");
+ // TODO(Drogon): check binlog info content is right
+ DCHECK(binlog_info_parts.size() == 2);
+ const std::string& remote_rowset_id = binlog_info_parts[0];
+ int64_t num_segments = std::stoll(binlog_info_parts[1]);
+
+ // Step 4: get rowset meta
+ auto get_rowset_meta_url = fmt::format(
+ "{}?method={}&tablet_id={}&rowset_id={}&binlog_version={}",
binlog_api_url,
+ "get_rowset_meta", request.remote_tablet_id, remote_rowset_id,
request.binlog_version);
+ std::string rowset_meta_str;
+ auto get_rowset_meta_cb = [&get_rowset_meta_url,
&rowset_meta_str](HttpClient* client) {
+ RETURN_IF_ERROR(client->init(get_rowset_meta_url));
+ client->set_timeout_ms(kMaxTimeoutMs);
+ return client->execute(&rowset_meta_str);
+ };
+ status = HttpClient::execute_with_retry(max_retry, 1, get_rowset_meta_cb);
+ if (!status.ok()) {
+ LOG(WARNING) << "failed to get rowset meta from " <<
get_rowset_meta_url
+ << ", status=" << status.to_string();
+ status.to_thrift(&tstatus);
+ return;
+ }
+ RowsetMetaPB rowset_meta_pb;
+ if (!rowset_meta_pb.ParseFromString(rowset_meta_str)) {
+ LOG(WARNING) << "failed to parse rowset meta from " <<
get_rowset_meta_url;
+ status = Status::InternalError("failed to parse rowset meta");
+ status.to_thrift(&tstatus);
+ return;
+ }
+ // rewrite rowset meta
+ rowset_meta_pb.set_tablet_id(local_tablet_id);
+ rowset_meta_pb.set_partition_id(partition_id);
+
rowset_meta_pb.set_tablet_schema_hash(local_tablet->tablet_meta()->schema_hash());
+ rowset_meta_pb.set_txn_id(txn_id);
+ rowset_meta_pb.set_rowset_state(RowsetStatePB::COMMITTED);
+ auto rowset_meta = std::make_shared<RowsetMeta>();
+ if (!rowset_meta->init_from_pb(rowset_meta_pb)) {
+ LOG(WARNING) << "failed to init rowset meta from " <<
get_rowset_meta_url;
+ status = Status::InternalError("failed to init rowset meta");
+ status.to_thrift(&tstatus);
+ return;
+ }
+ RowsetId new_rowset_id = StorageEngine::instance()->next_rowset_id();
+ rowset_meta->set_rowset_id(new_rowset_id);
+ rowset_meta->set_tablet_uid(local_tablet->tablet_uid());
+
+ // Step 5: get all segment files
+ // Step 5.1: get all segment files size
+ std::vector<std::string> segment_file_urls;
+ segment_file_urls.reserve(num_segments);
+ std::vector<uint64_t> segment_file_sizes;
+ segment_file_sizes.reserve(num_segments);
+ for (int64_t segment_index = 0; segment_index < num_segments;
++segment_index) {
+ auto get_segment_file_size_url = fmt::format(
+ "{}?method={}&tablet_id={}&rowset_id={}&segment_index={}",
binlog_api_url,
+ "get_segment_file", request.remote_tablet_id,
remote_rowset_id, segment_index);
+ uint64_t segment_file_size;
+ auto get_segment_file_size_cb = [&get_segment_file_size_url,
+ &segment_file_size](HttpClient*
client) {
+ RETURN_IF_ERROR(client->init(get_segment_file_size_url));
+ client->set_timeout_ms(kMaxTimeoutMs);
+ RETURN_IF_ERROR(client->head());
+ return client->get_content_length(&segment_file_size);
+ };
+
+ status = HttpClient::execute_with_retry(max_retry, 1,
get_segment_file_size_cb);
+ if (!status.ok()) {
+ LOG(WARNING) << "failed to get segment file size from " <<
get_segment_file_size_url
+ << ", status=" << status.to_string();
+ status.to_thrift(&tstatus);
+ return;
+ }
+
+ segment_file_sizes.push_back(segment_file_size);
+ segment_file_urls.push_back(std::move(get_segment_file_size_url));
+ }
+
+ // Step 5.2: check data capacity
+ uint64_t total_size = std::accumulate(segment_file_sizes.begin(),
segment_file_sizes.end(), 0);
+ if (!local_tablet->can_add_binlog(total_size)) {
+ LOG(WARNING) << "failed to add binlog, no enough space, total_size="
<< total_size
+ << ", tablet=" << local_tablet->tablet_id();
+ status = Status::InternalError("no enough space");
+ status.to_thrift(&tstatus);
+ return;
+ }
+
+ // Step 5.3: get all segment files
+ for (int64_t segment_index = 0; segment_index < num_segments;
++segment_index) {
+ auto segment_file_size = segment_file_sizes[segment_index];
+ auto get_segment_file_url = segment_file_urls[segment_index];
+
+ uint64_t estimate_timeout =
+ segment_file_size / config::download_low_speed_limit_kbps /
1024;
+ if (estimate_timeout < config::download_low_speed_time) {
+ estimate_timeout = config::download_low_speed_time;
+ }
+
+ auto local_segment_path = BetaRowset::segment_file_path(
+ local_tablet->tablet_path(), rowset_meta->rowset_id(),
segment_index);
+ LOG(INFO) << fmt::format("download segment file from {} to {}",
get_segment_file_url,
+ local_segment_path);
+ auto get_segment_file_cb = [&get_segment_file_url,
&local_segment_path, segment_file_size,
+ estimate_timeout](HttpClient* client) {
+ RETURN_IF_ERROR(client->init(get_segment_file_url));
+ client->set_timeout_ms(estimate_timeout * 1000);
+ RETURN_IF_ERROR(client->download(local_segment_path));
+
+ std::error_code ec;
+ // Check file length
+ uint64_t local_file_size =
std::filesystem::file_size(local_segment_path, ec);
+ if (ec) {
+ LOG(WARNING) << "download file error" << ec.message();
+ return Status::IOError("can't retrive file_size of {}, due to
{}",
+ local_segment_path, ec.message());
+ }
+ if (local_file_size != segment_file_size) {
+ LOG(WARNING) << "download file length error"
+ << ", get_segment_file_url=" <<
get_segment_file_url
+ << ", file_size=" << segment_file_size
+ << ", local_file_size=" << local_file_size;
+ return Status::InternalError("downloaded file size is not
equal");
+ }
+ chmod(local_segment_path.c_str(), S_IRUSR | S_IWUSR);
Review Comment:
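Please check the return value of `chmod`: it returns -1 and sets `errno` on failure, and that failure is silently ignored here.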
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]