github-actions[bot] commented on code in PR #26421:
URL: https://github.com/apache/doris/pull/26421#discussion_r1385914349


##########
be/src/exec/tablet_info.h:
##########
@@ -162,9 +165,78 @@
     int64_t version() const { return _t_param.version; }
 
     // return true if we found this block_row in partition
-    bool find_partition(BlockRow* block_row, const VOlapTablePartition** 
partition) const;
+    //TODO: use virtual function to refactor it
+    ALWAYS_INLINE bool find_partition(vectorized::Block* block, int row,
+                                      VOlapTablePartition*& partition) const {
+        auto it = _is_in_partition ? _partitions_map->find(std::tuple {block, 
row, true})
+                                   : _partitions_map->upper_bound(std::tuple 
{block, row, true});
+        // for list partition it might result in default partition
+        if (_is_in_partition) {
+            partition = (it != _partitions_map->end()) ? it->second : 
_default_partition;
+            it = _partitions_map->end();
+        }
+        if (it != _partitions_map->end() &&
+            _part_contains(it->second, std::tuple {block, row, true})) {
+            partition = it->second;
+        }
+        return (partition != nullptr);
+    }
+
+    ALWAYS_INLINE void find_tablets(

Review Comment:
   warning: method 'find_tablets' can be made static 
[readability-convert-member-functions-to-static]
   
   ```suggestion
       static ALWAYS_INLINE void find_tablets(
   ```
   
   be/src/exec/tablet_info.h:189:
   ```diff
   -             std::map<int64_t, int64_t>* partition_tablets_buffer = 
nullptr) const {
   +             std::map<int64_t, int64_t>* partition_tablets_buffer = 
nullptr) {
   ```
   



##########
be/src/exec/tablet_info.h:
##########
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <butil/fast_rand.h>

Review Comment:
   warning: 'butil/fast_rand.h' file not found [clang-diagnostic-error]
   ```cpp
   #include <butil/fast_rand.h>
            ^
   ```
   



##########
be/src/http/http_channel.cpp:
##########
@@ -18,6 +18,7 @@
 #include "http/http_channel.h"
 
 #include <event2/buffer.h>

Review Comment:
   warning: 'event2/buffer.h' file not found [clang-diagnostic-error]
   ```cpp
   #include <event2/buffer.h>
            ^
   ```
   



##########
be/src/olap/txn_manager.cpp:
##########
@@ -162,6 +163,29 @@
                        tablet->tablet_id(), tablet->tablet_uid(), version, 
stats);
 }
 
+void TxnManager::abort_txn(TPartitionId partition_id, TTransactionId 
transaction_id,
+                           TTabletId tablet_id, TabletUid tablet_uid) {
+    pair<int64_t, int64_t> key(partition_id, transaction_id);
+    TabletInfo tablet_info(tablet_id, tablet_uid);
+
+    std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id));
+
+    auto& txn_tablet_map = _get_txn_tablet_map(transaction_id);
+    auto it = txn_tablet_map.find(key);
+    if (it == txn_tablet_map.end()) {
+        return;
+    }
+
+    auto& tablet_txn_info_map = it->second;
+    if (auto tablet_txn_info_iter = tablet_txn_info_map.find(tablet_info);
+        tablet_txn_info_iter == tablet_txn_info_map.end()) {
+        return;
+    } else {
+        auto& txn_info = tablet_txn_info_iter->second;
+        txn_info.abort();
+    }

Review Comment:
   warning: do not use 'else' after 'return' [readability-else-after-return]
   
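   ```suggestion
       }
       auto& txn_info = tablet_txn_info_iter->second;
       txn_info.abort();
   ```
   
   Note: `tablet_txn_info_iter` is declared in the `if` init-statement, so it goes out of scope at the end of the `if`/`else`. Dropping the `else` as suggested would therefore not compile on its own. To apply this fix, also hoist the declaration above the `if`:
   - `auto tablet_txn_info_iter = tablet_txn_info_map.find(tablet_info);`
   - then `if (tablet_txn_info_iter == tablet_txn_info_map.end()) { return; }`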
   



##########
be/src/service/backend_service.cpp:
##########
@@ -79,6 +81,283 @@
 
 namespace doris {
 
+namespace {
+constexpr uint64_t kMaxTimeoutMs = 3000; // 3s
+struct IngestBinlogArg {
+    int64_t txn_id;
+    int64_t partition_id;
+    int64_t local_tablet_id;
+    TabletSharedPtr local_tablet;
+    TIngestBinlogRequest request;
+    TStatus* tstatus;
+};
+
+void _ingest_binlog(IngestBinlogArg* arg) {
+    auto txn_id = arg->txn_id;
+    auto partition_id = arg->partition_id;
+    auto local_tablet_id = arg->local_tablet_id;
+    const auto& local_tablet = arg->local_tablet;
+    const auto& local_tablet_uid = local_tablet->tablet_uid();
+
+    auto& request = arg->request;
+
+    TStatus tstatus;
+    Defer defer {[=, &tstatus, ingest_binlog_tstatus = arg->tstatus]() {
+        LOG(INFO) << "ingest binlog. result: " << 
apache::thrift::ThriftDebugString(tstatus);
+        if (tstatus.status_code != TStatusCode::OK) {
+            // abort txn
+            StorageEngine::instance()->txn_manager()->abort_txn(partition_id, 
txn_id,
+                                                                
local_tablet_id, local_tablet_uid);
+        }
+
+        if (ingest_binlog_tstatus) {
+            *ingest_binlog_tstatus = std::move(tstatus);
+        }
+    }};
+
+    auto set_tstatus = [&tstatus](TStatusCode::type code, std::string 
error_msg) {
+        tstatus.__set_status_code(code);
+        tstatus.__isset.error_msgs = true;
+        tstatus.error_msgs.push_back(std::move(error_msg));
+    };
+
+    // Step 3: get binlog info
+    auto binlog_api_url = fmt::format("http://{}:{}/api/_binlog/_download";, 
request.remote_host,
+                                      request.remote_port);
+    constexpr int max_retry = 3;
+
+    auto get_binlog_info_url =
+            fmt::format("{}?method={}&tablet_id={}&binlog_version={}", 
binlog_api_url,
+                        "get_binlog_info", request.remote_tablet_id, 
request.binlog_version);
+    std::string binlog_info;
+    auto get_binlog_info_cb = [&get_binlog_info_url, &binlog_info](HttpClient* 
client) {
+        RETURN_IF_ERROR(client->init(get_binlog_info_url));
+        client->set_timeout_ms(kMaxTimeoutMs);
+        return client->execute(&binlog_info);
+    };
+    auto status = HttpClient::execute_with_retry(max_retry, 1, 
get_binlog_info_cb);
+    if (!status.ok()) {
+        LOG(WARNING) << "failed to get binlog info from " << 
get_binlog_info_url
+                     << ", status=" << status.to_string();
+        status.to_thrift(&tstatus);
+        return;
+    }
+
+    std::vector<std::string> binlog_info_parts = strings::Split(binlog_info, 
":");
+    // TODO(Drogon): check binlog info content is right
+    DCHECK(binlog_info_parts.size() == 2);
+    const std::string& remote_rowset_id = binlog_info_parts[0];
+    int64_t num_segments = std::stoll(binlog_info_parts[1]);
+
+    // Step 4: get rowset meta
+    auto get_rowset_meta_url = fmt::format(
+            "{}?method={}&tablet_id={}&rowset_id={}&binlog_version={}", 
binlog_api_url,
+            "get_rowset_meta", request.remote_tablet_id, remote_rowset_id, 
request.binlog_version);
+    std::string rowset_meta_str;
+    auto get_rowset_meta_cb = [&get_rowset_meta_url, 
&rowset_meta_str](HttpClient* client) {
+        RETURN_IF_ERROR(client->init(get_rowset_meta_url));
+        client->set_timeout_ms(kMaxTimeoutMs);
+        return client->execute(&rowset_meta_str);
+    };
+    status = HttpClient::execute_with_retry(max_retry, 1, get_rowset_meta_cb);
+    if (!status.ok()) {
+        LOG(WARNING) << "failed to get rowset meta from " << 
get_rowset_meta_url
+                     << ", status=" << status.to_string();
+        status.to_thrift(&tstatus);
+        return;
+    }
+    RowsetMetaPB rowset_meta_pb;
+    if (!rowset_meta_pb.ParseFromString(rowset_meta_str)) {
+        LOG(WARNING) << "failed to parse rowset meta from " << 
get_rowset_meta_url;
+        status = Status::InternalError("failed to parse rowset meta");
+        status.to_thrift(&tstatus);
+        return;
+    }
+    // rewrite rowset meta
+    rowset_meta_pb.set_tablet_id(local_tablet_id);
+    rowset_meta_pb.set_partition_id(partition_id);
+    
rowset_meta_pb.set_tablet_schema_hash(local_tablet->tablet_meta()->schema_hash());
+    rowset_meta_pb.set_txn_id(txn_id);
+    rowset_meta_pb.set_rowset_state(RowsetStatePB::COMMITTED);
+    auto rowset_meta = std::make_shared<RowsetMeta>();
+    if (!rowset_meta->init_from_pb(rowset_meta_pb)) {
+        LOG(WARNING) << "failed to init rowset meta from " << 
get_rowset_meta_url;
+        status = Status::InternalError("failed to init rowset meta");
+        status.to_thrift(&tstatus);
+        return;
+    }
+    RowsetId new_rowset_id = StorageEngine::instance()->next_rowset_id();
+    rowset_meta->set_rowset_id(new_rowset_id);
+    rowset_meta->set_tablet_uid(local_tablet->tablet_uid());
+
+    // Step 5: get all segment files
+    // Step 5.1: get all segment files size
+    std::vector<std::string> segment_file_urls;
+    segment_file_urls.reserve(num_segments);
+    std::vector<uint64_t> segment_file_sizes;
+    segment_file_sizes.reserve(num_segments);
+    for (int64_t segment_index = 0; segment_index < num_segments; 
++segment_index) {
+        auto get_segment_file_size_url = fmt::format(
+                "{}?method={}&tablet_id={}&rowset_id={}&segment_index={}", 
binlog_api_url,
+                "get_segment_file", request.remote_tablet_id, 
remote_rowset_id, segment_index);
+        uint64_t segment_file_size;
+        auto get_segment_file_size_cb = [&get_segment_file_size_url,
+                                         &segment_file_size](HttpClient* 
client) {
+            RETURN_IF_ERROR(client->init(get_segment_file_size_url));
+            client->set_timeout_ms(kMaxTimeoutMs);
+            RETURN_IF_ERROR(client->head());
+            return client->get_content_length(&segment_file_size);
+        };
+
+        status = HttpClient::execute_with_retry(max_retry, 1, 
get_segment_file_size_cb);
+        if (!status.ok()) {
+            LOG(WARNING) << "failed to get segment file size from " << 
get_segment_file_size_url
+                         << ", status=" << status.to_string();
+            status.to_thrift(&tstatus);
+            return;
+        }
+
+        segment_file_sizes.push_back(segment_file_size);
+        segment_file_urls.push_back(std::move(get_segment_file_size_url));
+    }
+
+    // Step 5.2: check data capacity
+    uint64_t total_size = std::accumulate(segment_file_sizes.begin(), 
segment_file_sizes.end(), 0);
+    if (!local_tablet->can_add_binlog(total_size)) {
+        LOG(WARNING) << "failed to add binlog, no enough space, total_size=" 
<< total_size
+                     << ", tablet=" << local_tablet->tablet_id();
+        status = Status::InternalError("no enough space");
+        status.to_thrift(&tstatus);
+        return;
+    }
+
+    // Step 5.3: get all segment files
+    for (int64_t segment_index = 0; segment_index < num_segments; 
++segment_index) {
+        auto segment_file_size = segment_file_sizes[segment_index];
+        auto get_segment_file_url = segment_file_urls[segment_index];
+
+        uint64_t estimate_timeout =
+                segment_file_size / config::download_low_speed_limit_kbps / 
1024;
+        if (estimate_timeout < config::download_low_speed_time) {
+            estimate_timeout = config::download_low_speed_time;
+        }
+
+        auto local_segment_path = BetaRowset::segment_file_path(
+                local_tablet->tablet_path(), rowset_meta->rowset_id(), 
segment_index);
+        LOG(INFO) << fmt::format("download segment file from {} to {}", 
get_segment_file_url,
+                                 local_segment_path);
+        auto get_segment_file_cb = [&get_segment_file_url, 
&local_segment_path, segment_file_size,
+                                    estimate_timeout](HttpClient* client) {
+            RETURN_IF_ERROR(client->init(get_segment_file_url));
+            client->set_timeout_ms(estimate_timeout * 1000);

Review Comment:
   warning: 1000 is a magic number; consider replacing it with a named constant 
[readability-magic-numbers]
   ```cpp
               client->set_timeout_ms(estimate_timeout * 1000);
                                                         ^
   ```
   



##########
be/src/service/backend_service.cpp:
##########
@@ -452,6 +743,12 @@
         set_tstatus(TStatusCode::ANALYSIS_ERROR, error_msg);
         return;
     }
+    if (!request.__isset.local_tablet_id) {
+        auto error_msg = "local_tablet_id is empty";

Review Comment:
   warning: 'auto error_msg' can be declared as 'const auto *error_msg' 
[readability-qualified-auto]
   
   ```suggestion
           const auto *error_msg = "local_tablet_id is empty";
   ```
   



##########
be/src/service/http_service.cpp:
##########
@@ -59,6 +62,30 @@
 #include "util/doris_metrics.h"
 
 namespace doris {
+namespace {
+std::shared_ptr<bufferevent_rate_limit_group> get_rate_limit_group(event_base* 
event_base) {
+    auto rate_limit = config::download_binlog_rate_limit_kbs;
+    if (rate_limit <= 0) {
+        return nullptr;
+    }
+
+    auto max_value = std::numeric_limits<int32_t>::max() / 1024 * 10;
+    if (rate_limit > max_value) {
+        LOG(WARNING) << "rate limit is too large, set to max value.";
+        rate_limit = max_value;
+    }
+    struct timeval cfg_tick = {0, 100 * 1000}; // 100ms

Review Comment:
   warning: 1000 is a magic number; consider replacing it with a named constant 
[readability-magic-numbers]
   ```cpp
       struct timeval cfg_tick = {0, 100 * 1000}; // 100ms
                                           ^
   ```
   



##########
be/src/service/http_service.cpp:
##########
@@ -59,6 +62,30 @@
 #include "util/doris_metrics.h"
 
 namespace doris {
+namespace {
+std::shared_ptr<bufferevent_rate_limit_group> get_rate_limit_group(event_base* 
event_base) {
+    auto rate_limit = config::download_binlog_rate_limit_kbs;
+    if (rate_limit <= 0) {
+        return nullptr;
+    }
+
+    auto max_value = std::numeric_limits<int32_t>::max() / 1024 * 10;
+    if (rate_limit > max_value) {
+        LOG(WARNING) << "rate limit is too large, set to max value.";
+        rate_limit = max_value;
+    }
+    struct timeval cfg_tick = {0, 100 * 1000}; // 100ms
+    rate_limit = rate_limit / 10 * 1024;       // convert to KB/S

Review Comment:
   warning: 10 is a magic number; consider replacing it with a named constant 
[readability-magic-numbers]
   ```cpp
       rate_limit = rate_limit / 10 * 1024;       // convert to KB/S
                                 ^
   ```
   



##########
be/src/http/action/download_action.cpp:
##########
@@ -33,13 +33,20 @@
 #include "runtime/exec_env.h"
 
 namespace doris {
-
-const std::string FILE_PARAMETER = "file";
-const std::string TOKEN_PARAMETER = "token";
-
-DownloadAction::DownloadAction(ExecEnv* exec_env, const 
std::vector<std::string>& allow_dirs,
-                               int32_t num_workers)
-        : _exec_env(exec_env), _download_type(NORMAL), 
_num_workers(num_workers) {
+namespace {
+static const std::string FILE_PARAMETER = "file";

Review Comment:
   warning: 'FILE_PARAMETER' is a static definition in anonymous namespace; 
static is redundant here [readability-static-definition-in-anonymous-namespace]
   
   ```suggestion
   const std::string FILE_PARAMETER = "file";
   ```
   



##########
be/src/http/action/download_action.cpp:
##########
@@ -33,13 +33,20 @@
 #include "runtime/exec_env.h"
 
 namespace doris {
-
-const std::string FILE_PARAMETER = "file";
-const std::string TOKEN_PARAMETER = "token";
-
-DownloadAction::DownloadAction(ExecEnv* exec_env, const 
std::vector<std::string>& allow_dirs,
-                               int32_t num_workers)
-        : _exec_env(exec_env), _download_type(NORMAL), 
_num_workers(num_workers) {
+namespace {
+static const std::string FILE_PARAMETER = "file";
+static const std::string TOKEN_PARAMETER = "token";

Review Comment:
   warning: 'TOKEN_PARAMETER' is a static definition in anonymous namespace; 
static is redundant here [readability-static-definition-in-anonymous-namespace]
   
   ```suggestion
   const std::string TOKEN_PARAMETER = "token";
   ```
   



##########
be/src/olap/txn_manager.cpp:
##########
@@ -162,6 +163,29 @@ Status TxnManager::publish_txn(TPartitionId partition_id, 
const TabletSharedPtr&
                        tablet->tablet_id(), tablet->tablet_uid(), version, 
stats);
 }
 
+void TxnManager::abort_txn(TPartitionId partition_id, TTransactionId 
transaction_id,

Review Comment:
   warning: method 'abort_txn' can be made static 
[readability-convert-member-functions-to-static]
   
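   ```suggestion
   static void TxnManager::abort_txn(TPartitionId partition_id, TTransactionId transaction_id,
   ```
   
   Note: `static` is not allowed on an out-of-class member function definition, so this suggestion does not compile as written. If the change is wanted, the keyword belongs on the declaration inside the class definition in the header. The body also calls `_get_txn_map_lock` and `_get_txn_tablet_map`, so confirm those helpers do not use instance state before making `abort_txn` static.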
   



##########
be/src/http/action/download_action.cpp:
##########
@@ -33,13 +33,20 @@
 #include "runtime/exec_env.h"
 
 namespace doris {
-
-const std::string FILE_PARAMETER = "file";
-const std::string TOKEN_PARAMETER = "token";
-
-DownloadAction::DownloadAction(ExecEnv* exec_env, const 
std::vector<std::string>& allow_dirs,
-                               int32_t num_workers)
-        : _exec_env(exec_env), _download_type(NORMAL), 
_num_workers(num_workers) {
+namespace {
+static const std::string FILE_PARAMETER = "file";
+static const std::string TOKEN_PARAMETER = "token";
+static const std::string CHANNEL_PARAMETER = "channel";

Review Comment:
   warning: 'CHANNEL_PARAMETER' is a static definition in anonymous namespace; 
static is redundant here [readability-static-definition-in-anonymous-namespace]
   
   ```suggestion
   const std::string CHANNEL_PARAMETER = "channel";
   ```
   



##########
be/src/runtime/load_stream_writer.cpp:
##########
@@ -121,7 +121,15 @@ Status LoadStreamWriter::close_segment(uint32_t segid) {
     return Status::OK();
 }
 
-Status LoadStreamWriter::add_segment(uint32_t segid, SegmentStatistics& stat) {
+Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& 
stat) {

Review Comment:
   warning: method 'add_segment' can be made static 
[readability-convert-member-functions-to-static]
   
   be/src/runtime/load_stream_writer.h:73:
   ```diff
   -     Status add_segment(uint32_t segid, const SegmentStatistics& stat);
   +     static Status add_segment(uint32_t segid, const SegmentStatistics& 
stat);
   ```
   



##########
be/src/http/action/download_action.cpp:
##########
@@ -33,13 +33,20 @@
 #include "runtime/exec_env.h"
 
 namespace doris {
-
-const std::string FILE_PARAMETER = "file";
-const std::string TOKEN_PARAMETER = "token";
-
-DownloadAction::DownloadAction(ExecEnv* exec_env, const 
std::vector<std::string>& allow_dirs,
-                               int32_t num_workers)
-        : _exec_env(exec_env), _download_type(NORMAL), 
_num_workers(num_workers) {
+namespace {
+static const std::string FILE_PARAMETER = "file";
+static const std::string TOKEN_PARAMETER = "token";
+static const std::string CHANNEL_PARAMETER = "channel";
+static const std::string CHANNEL_INGEST_BINLOG_TYPE = "ingest_binlog";

Review Comment:
   warning: 'CHANNEL_INGEST_BINLOG_TYPE' is a static definition in anonymous 
namespace; static is redundant here 
[readability-static-definition-in-anonymous-namespace]
   
   ```suggestion
   const std::string CHANNEL_INGEST_BINLOG_TYPE = "ingest_binlog";
   ```
   



##########
be/src/runtime/result_buffer_mgr.cpp:
##########
@@ -109,21 +109,21 @@
     return std::shared_ptr<BufferControlBlock>();
 }
 
-void ResultBufferMgr::register_row_descriptor(const TUniqueId& query_id,
-                                              const RowDescriptor& row_desc) {
-    std::unique_lock<std::shared_mutex> wlock(_row_descriptor_map_lock);
-    _row_descriptor_map.insert(std::make_pair(query_id, row_desc));
+void ResultBufferMgr::register_arrow_schema(const TUniqueId& query_id,
+                                            const 
std::shared_ptr<arrow::Schema>& arrow_schema) {
+    std::unique_lock<std::shared_mutex> wlock(_arrow_schema_map_lock);
+    _arrow_schema_map.insert(std::make_pair(query_id, arrow_schema));
 }
 
-RowDescriptor ResultBufferMgr::find_row_descriptor(const TUniqueId& query_id) {
-    std::shared_lock<std::shared_mutex> rlock(_row_descriptor_map_lock);
-    RowDescriptorMap::iterator iter = _row_descriptor_map.find(query_id);
+std::shared_ptr<arrow::Schema> ResultBufferMgr::find_arrow_schema(const 
TUniqueId& query_id) {

Review Comment:
   warning: method 'find_arrow_schema' can be made static 
[readability-convert-member-functions-to-static]
   
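   ```suggestion
   static std::shared_ptr<arrow::Schema> ResultBufferMgr::find_arrow_schema(const TUniqueId& query_id) {
   ```
   
   Note: `static` is not allowed on an out-of-class member function definition, so this suggestion does not compile as written. If the change is wanted, the keyword belongs on the declaration inside the class definition in the header. Verify the body does not read instance members (for example the arrow-schema map and its lock) before applying.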
   



##########
be/src/service/backend_service.cpp:
##########
@@ -79,6 +81,283 @@ class TTransportException;
 
 namespace doris {
 
+namespace {
+constexpr uint64_t kMaxTimeoutMs = 3000; // 3s
+struct IngestBinlogArg {
+    int64_t txn_id;
+    int64_t partition_id;
+    int64_t local_tablet_id;
+    TabletSharedPtr local_tablet;
+    TIngestBinlogRequest request;
+    TStatus* tstatus;
+};
+
+void _ingest_binlog(IngestBinlogArg* arg) {
+    auto txn_id = arg->txn_id;
+    auto partition_id = arg->partition_id;
+    auto local_tablet_id = arg->local_tablet_id;
+    const auto& local_tablet = arg->local_tablet;
+    const auto& local_tablet_uid = local_tablet->tablet_uid();
+
+    auto& request = arg->request;
+
+    TStatus tstatus;
+    Defer defer {[=, &tstatus, ingest_binlog_tstatus = arg->tstatus]() {
+        LOG(INFO) << "ingest binlog. result: " << 
apache::thrift::ThriftDebugString(tstatus);
+        if (tstatus.status_code != TStatusCode::OK) {
+            // abort txn
+            StorageEngine::instance()->txn_manager()->abort_txn(partition_id, 
txn_id,
+                                                                
local_tablet_id, local_tablet_uid);
+        }
+
+        if (ingest_binlog_tstatus) {
+            *ingest_binlog_tstatus = std::move(tstatus);
+        }
+    }};
+
+    auto set_tstatus = [&tstatus](TStatusCode::type code, std::string 
error_msg) {
+        tstatus.__set_status_code(code);
+        tstatus.__isset.error_msgs = true;
+        tstatus.error_msgs.push_back(std::move(error_msg));
+    };
+
+    // Step 3: get binlog info
+    auto binlog_api_url = fmt::format("http://{}:{}/api/_binlog/_download";, 
request.remote_host,
+                                      request.remote_port);
+    constexpr int max_retry = 3;
+
+    auto get_binlog_info_url =
+            fmt::format("{}?method={}&tablet_id={}&binlog_version={}", 
binlog_api_url,
+                        "get_binlog_info", request.remote_tablet_id, 
request.binlog_version);
+    std::string binlog_info;
+    auto get_binlog_info_cb = [&get_binlog_info_url, &binlog_info](HttpClient* 
client) {
+        RETURN_IF_ERROR(client->init(get_binlog_info_url));
+        client->set_timeout_ms(kMaxTimeoutMs);
+        return client->execute(&binlog_info);
+    };
+    auto status = HttpClient::execute_with_retry(max_retry, 1, 
get_binlog_info_cb);
+    if (!status.ok()) {
+        LOG(WARNING) << "failed to get binlog info from " << 
get_binlog_info_url
+                     << ", status=" << status.to_string();
+        status.to_thrift(&tstatus);
+        return;
+    }
+
+    std::vector<std::string> binlog_info_parts = strings::Split(binlog_info, 
":");
+    // TODO(Drogon): check binlog info content is right
+    DCHECK(binlog_info_parts.size() == 2);
+    const std::string& remote_rowset_id = binlog_info_parts[0];
+    int64_t num_segments = std::stoll(binlog_info_parts[1]);
+
+    // Step 4: get rowset meta
+    auto get_rowset_meta_url = fmt::format(
+            "{}?method={}&tablet_id={}&rowset_id={}&binlog_version={}", 
binlog_api_url,
+            "get_rowset_meta", request.remote_tablet_id, remote_rowset_id, 
request.binlog_version);
+    std::string rowset_meta_str;
+    auto get_rowset_meta_cb = [&get_rowset_meta_url, 
&rowset_meta_str](HttpClient* client) {
+        RETURN_IF_ERROR(client->init(get_rowset_meta_url));
+        client->set_timeout_ms(kMaxTimeoutMs);
+        return client->execute(&rowset_meta_str);
+    };
+    status = HttpClient::execute_with_retry(max_retry, 1, get_rowset_meta_cb);
+    if (!status.ok()) {
+        LOG(WARNING) << "failed to get rowset meta from " << 
get_rowset_meta_url
+                     << ", status=" << status.to_string();
+        status.to_thrift(&tstatus);
+        return;
+    }
+    RowsetMetaPB rowset_meta_pb;
+    if (!rowset_meta_pb.ParseFromString(rowset_meta_str)) {
+        LOG(WARNING) << "failed to parse rowset meta from " << 
get_rowset_meta_url;
+        status = Status::InternalError("failed to parse rowset meta");
+        status.to_thrift(&tstatus);
+        return;
+    }
+    // rewrite rowset meta
+    rowset_meta_pb.set_tablet_id(local_tablet_id);
+    rowset_meta_pb.set_partition_id(partition_id);
+    
rowset_meta_pb.set_tablet_schema_hash(local_tablet->tablet_meta()->schema_hash());
+    rowset_meta_pb.set_txn_id(txn_id);
+    rowset_meta_pb.set_rowset_state(RowsetStatePB::COMMITTED);
+    auto rowset_meta = std::make_shared<RowsetMeta>();
+    if (!rowset_meta->init_from_pb(rowset_meta_pb)) {
+        LOG(WARNING) << "failed to init rowset meta from " << 
get_rowset_meta_url;
+        status = Status::InternalError("failed to init rowset meta");
+        status.to_thrift(&tstatus);
+        return;
+    }
+    RowsetId new_rowset_id = StorageEngine::instance()->next_rowset_id();
+    rowset_meta->set_rowset_id(new_rowset_id);
+    rowset_meta->set_tablet_uid(local_tablet->tablet_uid());
+
+    // Step 5: get all segment files
+    // Step 5.1: get all segment files size
+    std::vector<std::string> segment_file_urls;
+    segment_file_urls.reserve(num_segments);
+    std::vector<uint64_t> segment_file_sizes;
+    segment_file_sizes.reserve(num_segments);
+    for (int64_t segment_index = 0; segment_index < num_segments; 
++segment_index) {
+        auto get_segment_file_size_url = fmt::format(
+                "{}?method={}&tablet_id={}&rowset_id={}&segment_index={}", 
binlog_api_url,
+                "get_segment_file", request.remote_tablet_id, 
remote_rowset_id, segment_index);
+        uint64_t segment_file_size;
+        auto get_segment_file_size_cb = [&get_segment_file_size_url,
+                                         &segment_file_size](HttpClient* 
client) {
+            RETURN_IF_ERROR(client->init(get_segment_file_size_url));
+            client->set_timeout_ms(kMaxTimeoutMs);
+            RETURN_IF_ERROR(client->head());
+            return client->get_content_length(&segment_file_size);
+        };
+
+        status = HttpClient::execute_with_retry(max_retry, 1, 
get_segment_file_size_cb);
+        if (!status.ok()) {
+            LOG(WARNING) << "failed to get segment file size from " << 
get_segment_file_size_url
+                         << ", status=" << status.to_string();
+            status.to_thrift(&tstatus);
+            return;
+        }
+
+        segment_file_sizes.push_back(segment_file_size);
+        segment_file_urls.push_back(std::move(get_segment_file_size_url));
+    }
+
+    // Step 5.2: check data capacity
+    uint64_t total_size = std::accumulate(segment_file_sizes.begin(), 
segment_file_sizes.end(), 0);
+    if (!local_tablet->can_add_binlog(total_size)) {
+        LOG(WARNING) << "failed to add binlog, no enough space, total_size=" 
<< total_size
+                     << ", tablet=" << local_tablet->tablet_id();
+        status = Status::InternalError("no enough space");
+        status.to_thrift(&tstatus);
+        return;
+    }
+
+    // Step 5.3: get all segment files
+    for (int64_t segment_index = 0; segment_index < num_segments; 
++segment_index) {
+        auto segment_file_size = segment_file_sizes[segment_index];
+        auto get_segment_file_url = segment_file_urls[segment_index];
+
+        uint64_t estimate_timeout =
+                segment_file_size / config::download_low_speed_limit_kbps / 
1024;

Review Comment:
   warning: 1024 is a magic number; consider replacing it with a named constant 
[readability-magic-numbers]
   ```cpp
                   segment_file_size / config::download_low_speed_limit_kbps / 
1024;
                                                                               ^
   ```
   



##########
be/src/runtime/result_buffer_mgr.cpp:
##########
@@ -109,21 +109,21 @@ std::shared_ptr<BufferControlBlock> 
ResultBufferMgr::find_control_block(const TU
     return std::shared_ptr<BufferControlBlock>();
 }
 
-void ResultBufferMgr::register_row_descriptor(const TUniqueId& query_id,
-                                              const RowDescriptor& row_desc) {
-    std::unique_lock<std::shared_mutex> wlock(_row_descriptor_map_lock);
-    _row_descriptor_map.insert(std::make_pair(query_id, row_desc));
+void ResultBufferMgr::register_arrow_schema(const TUniqueId& query_id,

Review Comment:
   warning: method 'register_arrow_schema' can be made static 
[readability-convert-member-functions-to-static]
   
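   ```suggestion
   static void ResultBufferMgr::register_arrow_schema(const TUniqueId& query_id,
   ```
   
   Note: `static` is not allowed on an out-of-class member function definition, so this suggestion does not compile as written. In addition, the body of `register_arrow_schema` locks `_arrow_schema_map_lock` and inserts into `_arrow_schema_map`. If those are instance members, as their naming suggests, this warning is a false positive and should not be applied.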
   



##########
be/src/service/http_service.cpp:
##########
@@ -59,6 +62,30 @@
 #include "util/doris_metrics.h"
 
 namespace doris {
+namespace {
+std::shared_ptr<bufferevent_rate_limit_group> get_rate_limit_group(event_base* 
event_base) {
+    auto rate_limit = config::download_binlog_rate_limit_kbs;
+    if (rate_limit <= 0) {
+        return nullptr;
+    }
+
+    auto max_value = std::numeric_limits<int32_t>::max() / 1024 * 10;

Review Comment:
   warning: 1024 is a magic number; consider replacing it with a named constant 
[readability-magic-numbers]
   ```cpp
       auto max_value = std::numeric_limits<int32_t>::max() / 1024 * 10;
                                                              ^
   ```
   



##########
be/src/service/backend_service.cpp:
##########
@@ -486,239 +783,105 @@
         return;
     }
 
-    // Step 3: get binlog info
-    auto binlog_api_url = fmt::format("http://{}:{}/api/_binlog/_download", request.remote_host,
-                                      request.remote_port);
-    constexpr int max_retry = 3;
+    bool is_async = (_ingest_binlog_workers != nullptr);
+    result.__set_is_async(is_async);
 
-    auto get_binlog_info_url =
-            fmt::format("{}?method={}&tablet_id={}&binlog_version={}", 
binlog_api_url,
-                        "get_binlog_info", request.remote_tablet_id, 
request.binlog_version);
-    std::string binlog_info;
-    auto get_binlog_info_cb = [&get_binlog_info_url, &binlog_info](HttpClient* 
client) {
-        RETURN_IF_ERROR(client->init(get_binlog_info_url));
-        client->set_timeout_ms(kMaxTimeoutMs);
-        return client->execute(&binlog_info);
-    };
-    status = HttpClient::execute_with_retry(max_retry, 1, get_binlog_info_cb);
-    if (!status.ok()) {
-        LOG(WARNING) << "failed to get binlog info from " << 
get_binlog_info_url
-                     << ", status=" << status.to_string();
-        status.to_thrift(&tstatus);
-        return;
-    }
+    auto ingest_binlog_func = [=, tstatus = &tstatus]() {
+        IngestBinlogArg ingest_binlog_arg = {
+                .txn_id = txn_id,
+                .partition_id = partition_id,
+                .local_tablet_id = local_tablet_id,
+                .local_tablet = local_tablet,
 
-    std::vector<std::string> binlog_info_parts = strings::Split(binlog_info, 
":");
-    // TODO(Drogon): check binlog info content is right
-    DCHECK(binlog_info_parts.size() == 2);
-    const std::string& remote_rowset_id = binlog_info_parts[0];
-    int64_t num_segments = std::stoll(binlog_info_parts[1]);
+                .request = std::move(request),
+                .tstatus = is_async ? nullptr : tstatus,
+        };
 
-    // Step 4: get rowset meta
-    auto get_rowset_meta_url = fmt::format(
-            "{}?method={}&tablet_id={}&rowset_id={}&binlog_version={}", 
binlog_api_url,
-            "get_rowset_meta", request.remote_tablet_id, remote_rowset_id, 
request.binlog_version);
-    std::string rowset_meta_str;
-    auto get_rowset_meta_cb = [&get_rowset_meta_url, 
&rowset_meta_str](HttpClient* client) {
-        RETURN_IF_ERROR(client->init(get_rowset_meta_url));
-        client->set_timeout_ms(kMaxTimeoutMs);
-        return client->execute(&rowset_meta_str);
+        _ingest_binlog(&ingest_binlog_arg);
     };
-    status = HttpClient::execute_with_retry(max_retry, 1, get_rowset_meta_cb);
-    if (!status.ok()) {
-        LOG(WARNING) << "failed to get rowset meta from " << 
get_rowset_meta_url
-                     << ", status=" << status.to_string();
-        status.to_thrift(&tstatus);
-        return;
-    }
-    RowsetMetaPB rowset_meta_pb;
-    if (!rowset_meta_pb.ParseFromString(rowset_meta_str)) {
-        LOG(WARNING) << "failed to parse rowset meta from " << 
get_rowset_meta_url;
-        status = Status::InternalError("failed to parse rowset meta");
-        status.to_thrift(&tstatus);
-        return;
-    }
-    // rewrite rowset meta
-    rowset_meta_pb.set_tablet_id(local_tablet_id);
-    rowset_meta_pb.set_partition_id(partition_id);
-    
rowset_meta_pb.set_tablet_schema_hash(local_tablet->tablet_meta()->schema_hash());
-    rowset_meta_pb.set_txn_id(txn_id);
-    rowset_meta_pb.set_rowset_state(RowsetStatePB::COMMITTED);
-    auto rowset_meta = std::make_shared<RowsetMeta>();
-    if (!rowset_meta->init_from_pb(rowset_meta_pb)) {
-        LOG(WARNING) << "failed to init rowset meta from " << 
get_rowset_meta_url;
-        status = Status::InternalError("failed to init rowset meta");
-        status.to_thrift(&tstatus);
-        return;
-    }
-    RowsetId new_rowset_id = StorageEngine::instance()->next_rowset_id();
-    rowset_meta->set_rowset_id(new_rowset_id);
-    rowset_meta->set_tablet_uid(local_tablet->tablet_uid());
 
-    // Step 5: get all segment files
-    // Step 5.1: get all segment files size
-    std::vector<std::string> segment_file_urls;
-    segment_file_urls.reserve(num_segments);
-    std::vector<uint64_t> segment_file_sizes;
-    segment_file_sizes.reserve(num_segments);
-    for (int64_t segment_index = 0; segment_index < num_segments; 
++segment_index) {
-        auto get_segment_file_size_url = fmt::format(
-                "{}?method={}&tablet_id={}&rowset_id={}&segment_index={}", 
binlog_api_url,
-                "get_segment_file", request.remote_tablet_id, 
remote_rowset_id, segment_index);
-        uint64_t segment_file_size;
-        auto get_segment_file_size_cb = [&get_segment_file_size_url,
-                                         &segment_file_size](HttpClient* 
client) {
-            RETURN_IF_ERROR(client->init(get_segment_file_size_url));
-            client->set_timeout_ms(kMaxTimeoutMs);
-            RETURN_IF_ERROR(client->head());
-            return client->get_content_length(&segment_file_size);
-        };
-
-        status = HttpClient::execute_with_retry(max_retry, 1, 
get_segment_file_size_cb);
+    if (is_async) {
+        status = 
_ingest_binlog_workers->submit_func(std::move(ingest_binlog_func));
         if (!status.ok()) {
-            LOG(WARNING) << "failed to get segment file size from " << 
get_segment_file_size_url
-                         << ", status=" << status.to_string();
             status.to_thrift(&tstatus);
             return;
         }
-
-        segment_file_sizes.push_back(segment_file_size);
-        segment_file_urls.push_back(std::move(get_segment_file_size_url));
-    }
-
-    // Step 5.2: check data capacity
-    uint64_t total_size = std::accumulate(segment_file_sizes.begin(), 
segment_file_sizes.end(), 0);
-    if (!local_tablet->can_add_binlog(total_size)) {
-        LOG(WARNING) << "failed to add binlog, no enough space, total_size=" 
<< total_size
-                     << ", tablet=" << local_tablet->tablet_id();
-        status = Status::InternalError("no enough space");
-        status.to_thrift(&tstatus);
-        return;
+    } else {
+        ingest_binlog_func();
     }
+}
 
-    // Step 5.3: get all segment files
-    for (int64_t segment_index = 0; segment_index < num_segments; 
++segment_index) {
-        auto segment_file_size = segment_file_sizes[segment_index];
-        auto get_segment_file_url = segment_file_urls[segment_index];
-
-        uint64_t estimate_timeout =
-                segment_file_size / config::download_low_speed_limit_kbps / 
1024;
-        if (estimate_timeout < config::download_low_speed_time) {
-            estimate_timeout = config::download_low_speed_time;
-        }
-
-        auto local_segment_path = BetaRowset::segment_file_path(
-                local_tablet->tablet_path(), rowset_meta->rowset_id(), 
segment_index);
-        LOG(INFO) << fmt::format("download segment file from {} to {}", 
get_segment_file_url,
-                                 local_segment_path);
-        auto get_segment_file_cb = [&get_segment_file_url, 
&local_segment_path, segment_file_size,
-                                    estimate_timeout](HttpClient* client) {
-            RETURN_IF_ERROR(client->init(get_segment_file_url));
-            client->set_timeout_ms(estimate_timeout * 1000);
-            RETURN_IF_ERROR(client->download(local_segment_path));
+void BackendService::query_ingest_binlog(TQueryIngestBinlogResult& result,
+                                         const TQueryIngestBinlogRequest& 
request) {
+    LOG(INFO) << "query ingest binlog. request: " << 
apache::thrift::ThriftDebugString(request);
 
-            std::error_code ec;
-            // Check file length
-            uint64_t local_file_size = 
std::filesystem::file_size(local_segment_path, ec);
-            if (ec) {
-                LOG(WARNING) << "download file error" << ec.message();
-                return Status::IOError("can't retrive file_size of {}, due to 
{}",
-                                       local_segment_path, ec.message());
-            }
-            if (local_file_size != segment_file_size) {
-                LOG(WARNING) << "download file length error"
-                             << ", get_segment_file_url=" << 
get_segment_file_url
-                             << ", file_size=" << segment_file_size
-                             << ", local_file_size=" << local_file_size;
-                return Status::InternalError("downloaded file size is not 
equal");
-            }
-            chmod(local_segment_path.c_str(), S_IRUSR | S_IWUSR);
-            return Status::OK();
-        };
+    auto set_result = [&](TIngestBinlogStatus::type status, std::string 
error_msg) {
+        result.__set_status(status);
+        result.__set_err_msg(std::move(error_msg));
+    };
 
-        auto status = HttpClient::execute_with_retry(max_retry, 1, 
get_segment_file_cb);
-        if (!status.ok()) {
-            LOG(WARNING) << "failed to get segment file from " << 
get_segment_file_url
-                         << ", status=" << status.to_string();
-            status.to_thrift(&tstatus);
-            return;
-        }
+    /// Check args: txn_id, partition_id, tablet_id, load_id
+    if (!request.__isset.txn_id) {
+        auto error_msg = "txn_id is empty";
+        LOG(WARNING) << error_msg;
+        set_result(TIngestBinlogStatus::ANALYSIS_ERROR, error_msg);
+        return;
     }
-
-    // Step 6: create rowset && calculate delete bitmap && commit
-    // Step 6.1: create rowset
-    RowsetSharedPtr rowset;
-    status = RowsetFactory::create_rowset(local_tablet->tablet_schema(),
-                                          local_tablet->tablet_path(), 
rowset_meta, &rowset);
-
-    if (!status) {
-        LOG(WARNING) << "failed to create rowset from rowset meta for remote 
tablet"
-                     << ". rowset_id: " << rowset_meta_pb.rowset_id()
-                     << ", rowset_type: " << rowset_meta_pb.rowset_type()
-                     << ", remote_tablet_id=" << rowset_meta_pb.tablet_id() << 
", txn_id=" << txn_id
-                     << ", status=" << status.to_string();
-        status.to_thrift(&tstatus);
+    if (!request.__isset.partition_id) {
+        auto error_msg = "partition_id is empty";
+        LOG(WARNING) << error_msg;
+        set_result(TIngestBinlogStatus::ANALYSIS_ERROR, error_msg);
         return;
     }
-
-    // Step 6.2 calculate delete bitmap before commit
-    auto calc_delete_bitmap_token =
-            
StorageEngine::instance()->calc_delete_bitmap_executor()->create_token();
-    DeleteBitmapPtr delete_bitmap = 
std::make_shared<DeleteBitmap>(local_tablet_id);
-    RowsetIdUnorderedSet pre_rowset_ids;
-    if (local_tablet->enable_unique_key_merge_on_write()) {
-        auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset.get());
-        std::vector<segment_v2::SegmentSharedPtr> segments;
-        status = beta_rowset->load_segments(&segments);
-        if (!status) {
-            LOG(WARNING) << "failed to load segments from rowset"
-                         << ". rowset_id: " << beta_rowset->rowset_id() << ", 
txn_id=" << txn_id
-                         << ", status=" << status.to_string();
-            status.to_thrift(&tstatus);
-            return;
-        }
-        if (segments.size() > 1) {
-            // calculate delete bitmap between segments
-            status = local_tablet->calc_delete_bitmap_between_segments(rowset, 
segments,
-                                                                       
delete_bitmap);
-            if (!status) {
-                LOG(WARNING) << "failed to calculate delete bitmap"
-                             << ". tablet_id: " << local_tablet->tablet_id()
-                             << ". rowset_id: " << rowset->rowset_id() << ", 
txn_id=" << txn_id
-                             << ", status=" << status.to_string();
-                status.to_thrift(&tstatus);
-                return;
-            }
-        }
-
-        static_cast<void>(local_tablet->commit_phase_update_delete_bitmap(
-                rowset, pre_rowset_ids, delete_bitmap, segments, txn_id,
-                calc_delete_bitmap_token.get(), nullptr));
-        static_cast<void>(calc_delete_bitmap_token->wait());
+    if (!request.__isset.tablet_id) {
+        auto error_msg = "tablet_id is empty";

Review Comment:
   warning: 'auto error_msg' can be declared as 'const auto *error_msg' 
[readability-qualified-auto]
   
   ```suggestion
           const auto *error_msg = "tablet_id is empty";
   ```
   



##########
be/src/service/backend_service.cpp:
##########
@@ -486,239 +783,105 @@
         return;
     }
 
-    // Step 3: get binlog info
-    auto binlog_api_url = fmt::format("http://{}:{}/api/_binlog/_download", request.remote_host,
-                                      request.remote_port);
-    constexpr int max_retry = 3;
+    bool is_async = (_ingest_binlog_workers != nullptr);
+    result.__set_is_async(is_async);
 
-    auto get_binlog_info_url =
-            fmt::format("{}?method={}&tablet_id={}&binlog_version={}", 
binlog_api_url,
-                        "get_binlog_info", request.remote_tablet_id, 
request.binlog_version);
-    std::string binlog_info;
-    auto get_binlog_info_cb = [&get_binlog_info_url, &binlog_info](HttpClient* 
client) {
-        RETURN_IF_ERROR(client->init(get_binlog_info_url));
-        client->set_timeout_ms(kMaxTimeoutMs);
-        return client->execute(&binlog_info);
-    };
-    status = HttpClient::execute_with_retry(max_retry, 1, get_binlog_info_cb);
-    if (!status.ok()) {
-        LOG(WARNING) << "failed to get binlog info from " << 
get_binlog_info_url
-                     << ", status=" << status.to_string();
-        status.to_thrift(&tstatus);
-        return;
-    }
+    auto ingest_binlog_func = [=, tstatus = &tstatus]() {
+        IngestBinlogArg ingest_binlog_arg = {
+                .txn_id = txn_id,
+                .partition_id = partition_id,
+                .local_tablet_id = local_tablet_id,
+                .local_tablet = local_tablet,
 
-    std::vector<std::string> binlog_info_parts = strings::Split(binlog_info, 
":");
-    // TODO(Drogon): check binlog info content is right
-    DCHECK(binlog_info_parts.size() == 2);
-    const std::string& remote_rowset_id = binlog_info_parts[0];
-    int64_t num_segments = std::stoll(binlog_info_parts[1]);
+                .request = std::move(request),
+                .tstatus = is_async ? nullptr : tstatus,
+        };
 
-    // Step 4: get rowset meta
-    auto get_rowset_meta_url = fmt::format(
-            "{}?method={}&tablet_id={}&rowset_id={}&binlog_version={}", 
binlog_api_url,
-            "get_rowset_meta", request.remote_tablet_id, remote_rowset_id, 
request.binlog_version);
-    std::string rowset_meta_str;
-    auto get_rowset_meta_cb = [&get_rowset_meta_url, 
&rowset_meta_str](HttpClient* client) {
-        RETURN_IF_ERROR(client->init(get_rowset_meta_url));
-        client->set_timeout_ms(kMaxTimeoutMs);
-        return client->execute(&rowset_meta_str);
+        _ingest_binlog(&ingest_binlog_arg);
     };
-    status = HttpClient::execute_with_retry(max_retry, 1, get_rowset_meta_cb);
-    if (!status.ok()) {
-        LOG(WARNING) << "failed to get rowset meta from " << 
get_rowset_meta_url
-                     << ", status=" << status.to_string();
-        status.to_thrift(&tstatus);
-        return;
-    }
-    RowsetMetaPB rowset_meta_pb;
-    if (!rowset_meta_pb.ParseFromString(rowset_meta_str)) {
-        LOG(WARNING) << "failed to parse rowset meta from " << 
get_rowset_meta_url;
-        status = Status::InternalError("failed to parse rowset meta");
-        status.to_thrift(&tstatus);
-        return;
-    }
-    // rewrite rowset meta
-    rowset_meta_pb.set_tablet_id(local_tablet_id);
-    rowset_meta_pb.set_partition_id(partition_id);
-    
rowset_meta_pb.set_tablet_schema_hash(local_tablet->tablet_meta()->schema_hash());
-    rowset_meta_pb.set_txn_id(txn_id);
-    rowset_meta_pb.set_rowset_state(RowsetStatePB::COMMITTED);
-    auto rowset_meta = std::make_shared<RowsetMeta>();
-    if (!rowset_meta->init_from_pb(rowset_meta_pb)) {
-        LOG(WARNING) << "failed to init rowset meta from " << 
get_rowset_meta_url;
-        status = Status::InternalError("failed to init rowset meta");
-        status.to_thrift(&tstatus);
-        return;
-    }
-    RowsetId new_rowset_id = StorageEngine::instance()->next_rowset_id();
-    rowset_meta->set_rowset_id(new_rowset_id);
-    rowset_meta->set_tablet_uid(local_tablet->tablet_uid());
 
-    // Step 5: get all segment files
-    // Step 5.1: get all segment files size
-    std::vector<std::string> segment_file_urls;
-    segment_file_urls.reserve(num_segments);
-    std::vector<uint64_t> segment_file_sizes;
-    segment_file_sizes.reserve(num_segments);
-    for (int64_t segment_index = 0; segment_index < num_segments; 
++segment_index) {
-        auto get_segment_file_size_url = fmt::format(
-                "{}?method={}&tablet_id={}&rowset_id={}&segment_index={}", 
binlog_api_url,
-                "get_segment_file", request.remote_tablet_id, 
remote_rowset_id, segment_index);
-        uint64_t segment_file_size;
-        auto get_segment_file_size_cb = [&get_segment_file_size_url,
-                                         &segment_file_size](HttpClient* 
client) {
-            RETURN_IF_ERROR(client->init(get_segment_file_size_url));
-            client->set_timeout_ms(kMaxTimeoutMs);
-            RETURN_IF_ERROR(client->head());
-            return client->get_content_length(&segment_file_size);
-        };
-
-        status = HttpClient::execute_with_retry(max_retry, 1, 
get_segment_file_size_cb);
+    if (is_async) {
+        status = 
_ingest_binlog_workers->submit_func(std::move(ingest_binlog_func));
         if (!status.ok()) {
-            LOG(WARNING) << "failed to get segment file size from " << 
get_segment_file_size_url
-                         << ", status=" << status.to_string();
             status.to_thrift(&tstatus);
             return;
         }
-
-        segment_file_sizes.push_back(segment_file_size);
-        segment_file_urls.push_back(std::move(get_segment_file_size_url));
-    }
-
-    // Step 5.2: check data capacity
-    uint64_t total_size = std::accumulate(segment_file_sizes.begin(), 
segment_file_sizes.end(), 0);
-    if (!local_tablet->can_add_binlog(total_size)) {
-        LOG(WARNING) << "failed to add binlog, no enough space, total_size=" 
<< total_size
-                     << ", tablet=" << local_tablet->tablet_id();
-        status = Status::InternalError("no enough space");
-        status.to_thrift(&tstatus);
-        return;
+    } else {
+        ingest_binlog_func();
     }
+}
 
-    // Step 5.3: get all segment files
-    for (int64_t segment_index = 0; segment_index < num_segments; 
++segment_index) {
-        auto segment_file_size = segment_file_sizes[segment_index];
-        auto get_segment_file_url = segment_file_urls[segment_index];
-
-        uint64_t estimate_timeout =
-                segment_file_size / config::download_low_speed_limit_kbps / 
1024;
-        if (estimate_timeout < config::download_low_speed_time) {
-            estimate_timeout = config::download_low_speed_time;
-        }
-
-        auto local_segment_path = BetaRowset::segment_file_path(
-                local_tablet->tablet_path(), rowset_meta->rowset_id(), 
segment_index);
-        LOG(INFO) << fmt::format("download segment file from {} to {}", 
get_segment_file_url,
-                                 local_segment_path);
-        auto get_segment_file_cb = [&get_segment_file_url, 
&local_segment_path, segment_file_size,
-                                    estimate_timeout](HttpClient* client) {
-            RETURN_IF_ERROR(client->init(get_segment_file_url));
-            client->set_timeout_ms(estimate_timeout * 1000);
-            RETURN_IF_ERROR(client->download(local_segment_path));
+void BackendService::query_ingest_binlog(TQueryIngestBinlogResult& result,

Review Comment:
   warning: method 'query_ingest_binlog' can be made static 
[readability-convert-member-functions-to-static]
   
   ```suggestion
   static void BackendService::query_ingest_binlog(TQueryIngestBinlogResult& 
result,
   ```
   



##########
be/src/service/http_service.cpp:
##########
@@ -59,6 +62,30 @@
 #include "util/doris_metrics.h"
 
 namespace doris {
+namespace {
+std::shared_ptr<bufferevent_rate_limit_group> get_rate_limit_group(event_base* 
event_base) {
+    auto rate_limit = config::download_binlog_rate_limit_kbs;
+    if (rate_limit <= 0) {
+        return nullptr;
+    }
+
+    auto max_value = std::numeric_limits<int32_t>::max() / 1024 * 10;

Review Comment:
   warning: 10 is a magic number; consider replacing it with a named constant 
[readability-magic-numbers]
   ```cpp
       auto max_value = std::numeric_limits<int32_t>::max() / 1024 * 10;
                                                                     ^
   ```
   



##########
be/src/service/backend_service.cpp:
##########
@@ -486,239 +783,105 @@
         return;
     }
 
-    // Step 3: get binlog info
-    auto binlog_api_url = fmt::format("http://{}:{}/api/_binlog/_download", request.remote_host,
-                                      request.remote_port);
-    constexpr int max_retry = 3;
+    bool is_async = (_ingest_binlog_workers != nullptr);
+    result.__set_is_async(is_async);
 
-    auto get_binlog_info_url =
-            fmt::format("{}?method={}&tablet_id={}&binlog_version={}", 
binlog_api_url,
-                        "get_binlog_info", request.remote_tablet_id, 
request.binlog_version);
-    std::string binlog_info;
-    auto get_binlog_info_cb = [&get_binlog_info_url, &binlog_info](HttpClient* 
client) {
-        RETURN_IF_ERROR(client->init(get_binlog_info_url));
-        client->set_timeout_ms(kMaxTimeoutMs);
-        return client->execute(&binlog_info);
-    };
-    status = HttpClient::execute_with_retry(max_retry, 1, get_binlog_info_cb);
-    if (!status.ok()) {
-        LOG(WARNING) << "failed to get binlog info from " << 
get_binlog_info_url
-                     << ", status=" << status.to_string();
-        status.to_thrift(&tstatus);
-        return;
-    }
+    auto ingest_binlog_func = [=, tstatus = &tstatus]() {
+        IngestBinlogArg ingest_binlog_arg = {
+                .txn_id = txn_id,
+                .partition_id = partition_id,
+                .local_tablet_id = local_tablet_id,
+                .local_tablet = local_tablet,
 
-    std::vector<std::string> binlog_info_parts = strings::Split(binlog_info, 
":");
-    // TODO(Drogon): check binlog info content is right
-    DCHECK(binlog_info_parts.size() == 2);
-    const std::string& remote_rowset_id = binlog_info_parts[0];
-    int64_t num_segments = std::stoll(binlog_info_parts[1]);
+                .request = std::move(request),
+                .tstatus = is_async ? nullptr : tstatus,
+        };
 
-    // Step 4: get rowset meta
-    auto get_rowset_meta_url = fmt::format(
-            "{}?method={}&tablet_id={}&rowset_id={}&binlog_version={}", 
binlog_api_url,
-            "get_rowset_meta", request.remote_tablet_id, remote_rowset_id, 
request.binlog_version);
-    std::string rowset_meta_str;
-    auto get_rowset_meta_cb = [&get_rowset_meta_url, 
&rowset_meta_str](HttpClient* client) {
-        RETURN_IF_ERROR(client->init(get_rowset_meta_url));
-        client->set_timeout_ms(kMaxTimeoutMs);
-        return client->execute(&rowset_meta_str);
+        _ingest_binlog(&ingest_binlog_arg);
     };
-    status = HttpClient::execute_with_retry(max_retry, 1, get_rowset_meta_cb);
-    if (!status.ok()) {
-        LOG(WARNING) << "failed to get rowset meta from " << 
get_rowset_meta_url
-                     << ", status=" << status.to_string();
-        status.to_thrift(&tstatus);
-        return;
-    }
-    RowsetMetaPB rowset_meta_pb;
-    if (!rowset_meta_pb.ParseFromString(rowset_meta_str)) {
-        LOG(WARNING) << "failed to parse rowset meta from " << 
get_rowset_meta_url;
-        status = Status::InternalError("failed to parse rowset meta");
-        status.to_thrift(&tstatus);
-        return;
-    }
-    // rewrite rowset meta
-    rowset_meta_pb.set_tablet_id(local_tablet_id);
-    rowset_meta_pb.set_partition_id(partition_id);
-    
rowset_meta_pb.set_tablet_schema_hash(local_tablet->tablet_meta()->schema_hash());
-    rowset_meta_pb.set_txn_id(txn_id);
-    rowset_meta_pb.set_rowset_state(RowsetStatePB::COMMITTED);
-    auto rowset_meta = std::make_shared<RowsetMeta>();
-    if (!rowset_meta->init_from_pb(rowset_meta_pb)) {
-        LOG(WARNING) << "failed to init rowset meta from " << 
get_rowset_meta_url;
-        status = Status::InternalError("failed to init rowset meta");
-        status.to_thrift(&tstatus);
-        return;
-    }
-    RowsetId new_rowset_id = StorageEngine::instance()->next_rowset_id();
-    rowset_meta->set_rowset_id(new_rowset_id);
-    rowset_meta->set_tablet_uid(local_tablet->tablet_uid());
 
-    // Step 5: get all segment files
-    // Step 5.1: get all segment files size
-    std::vector<std::string> segment_file_urls;
-    segment_file_urls.reserve(num_segments);
-    std::vector<uint64_t> segment_file_sizes;
-    segment_file_sizes.reserve(num_segments);
-    for (int64_t segment_index = 0; segment_index < num_segments; 
++segment_index) {
-        auto get_segment_file_size_url = fmt::format(
-                "{}?method={}&tablet_id={}&rowset_id={}&segment_index={}", 
binlog_api_url,
-                "get_segment_file", request.remote_tablet_id, 
remote_rowset_id, segment_index);
-        uint64_t segment_file_size;
-        auto get_segment_file_size_cb = [&get_segment_file_size_url,
-                                         &segment_file_size](HttpClient* 
client) {
-            RETURN_IF_ERROR(client->init(get_segment_file_size_url));
-            client->set_timeout_ms(kMaxTimeoutMs);
-            RETURN_IF_ERROR(client->head());
-            return client->get_content_length(&segment_file_size);
-        };
-
-        status = HttpClient::execute_with_retry(max_retry, 1, 
get_segment_file_size_cb);
+    if (is_async) {
+        status = 
_ingest_binlog_workers->submit_func(std::move(ingest_binlog_func));
         if (!status.ok()) {
-            LOG(WARNING) << "failed to get segment file size from " << 
get_segment_file_size_url
-                         << ", status=" << status.to_string();
             status.to_thrift(&tstatus);
             return;
         }
-
-        segment_file_sizes.push_back(segment_file_size);
-        segment_file_urls.push_back(std::move(get_segment_file_size_url));
-    }
-
-    // Step 5.2: check data capacity
-    uint64_t total_size = std::accumulate(segment_file_sizes.begin(), 
segment_file_sizes.end(), 0);
-    if (!local_tablet->can_add_binlog(total_size)) {
-        LOG(WARNING) << "failed to add binlog, no enough space, total_size=" 
<< total_size
-                     << ", tablet=" << local_tablet->tablet_id();
-        status = Status::InternalError("no enough space");
-        status.to_thrift(&tstatus);
-        return;
+    } else {
+        ingest_binlog_func();
     }
+}
 
-    // Step 5.3: get all segment files
-    for (int64_t segment_index = 0; segment_index < num_segments; 
++segment_index) {
-        auto segment_file_size = segment_file_sizes[segment_index];
-        auto get_segment_file_url = segment_file_urls[segment_index];
-
-        uint64_t estimate_timeout =
-                segment_file_size / config::download_low_speed_limit_kbps / 
1024;
-        if (estimate_timeout < config::download_low_speed_time) {
-            estimate_timeout = config::download_low_speed_time;
-        }
-
-        auto local_segment_path = BetaRowset::segment_file_path(
-                local_tablet->tablet_path(), rowset_meta->rowset_id(), 
segment_index);
-        LOG(INFO) << fmt::format("download segment file from {} to {}", 
get_segment_file_url,
-                                 local_segment_path);
-        auto get_segment_file_cb = [&get_segment_file_url, 
&local_segment_path, segment_file_size,
-                                    estimate_timeout](HttpClient* client) {
-            RETURN_IF_ERROR(client->init(get_segment_file_url));
-            client->set_timeout_ms(estimate_timeout * 1000);
-            RETURN_IF_ERROR(client->download(local_segment_path));
+void BackendService::query_ingest_binlog(TQueryIngestBinlogResult& result,
+                                         const TQueryIngestBinlogRequest& 
request) {
+    LOG(INFO) << "query ingest binlog. request: " << 
apache::thrift::ThriftDebugString(request);
 
-            std::error_code ec;
-            // Check file length
-            uint64_t local_file_size = 
std::filesystem::file_size(local_segment_path, ec);
-            if (ec) {
-                LOG(WARNING) << "download file error" << ec.message();
-                return Status::IOError("can't retrive file_size of {}, due to 
{}",
-                                       local_segment_path, ec.message());
-            }
-            if (local_file_size != segment_file_size) {
-                LOG(WARNING) << "download file length error"
-                             << ", get_segment_file_url=" << 
get_segment_file_url
-                             << ", file_size=" << segment_file_size
-                             << ", local_file_size=" << local_file_size;
-                return Status::InternalError("downloaded file size is not 
equal");
-            }
-            chmod(local_segment_path.c_str(), S_IRUSR | S_IWUSR);
-            return Status::OK();
-        };
+    auto set_result = [&](TIngestBinlogStatus::type status, std::string 
error_msg) {
+        result.__set_status(status);
+        result.__set_err_msg(std::move(error_msg));
+    };
 
-        auto status = HttpClient::execute_with_retry(max_retry, 1, 
get_segment_file_cb);
-        if (!status.ok()) {
-            LOG(WARNING) << "failed to get segment file from " << 
get_segment_file_url
-                         << ", status=" << status.to_string();
-            status.to_thrift(&tstatus);
-            return;
-        }
+    /// Check args: txn_id, partition_id, tablet_id, load_id
+    if (!request.__isset.txn_id) {
+        auto error_msg = "txn_id is empty";

Review Comment:
   warning: 'auto error_msg' can be declared as 'const auto *error_msg' 
[readability-qualified-auto]
   
   ```suggestion
           const auto *error_msg = "txn_id is empty";
   ```
   



##########
be/src/service/http_service.cpp:
##########
@@ -59,6 +62,30 @@
 #include "util/doris_metrics.h"
 
 namespace doris {
+namespace {
+std::shared_ptr<bufferevent_rate_limit_group> get_rate_limit_group(event_base* 
event_base) {
+    auto rate_limit = config::download_binlog_rate_limit_kbs;
+    if (rate_limit <= 0) {
+        return nullptr;
+    }
+
+    auto max_value = std::numeric_limits<int32_t>::max() / 1024 * 10;
+    if (rate_limit > max_value) {
+        LOG(WARNING) << "rate limit is too large, set to max value.";
+        rate_limit = max_value;
+    }
+    struct timeval cfg_tick = {0, 100 * 1000}; // 100ms

Review Comment:
   warning: 100 is a magic number; consider replacing it with a named constant 
[readability-magic-numbers]
   ```cpp
       struct timeval cfg_tick = {0, 100 * 1000}; // 100ms
                                     ^
   ```
   



##########
be/src/service/backend_service.cpp:
##########
@@ -486,239 +783,105 @@
         return;
     }
 
-    // Step 3: get binlog info
-    auto binlog_api_url = fmt::format("http://{}:{}/api/_binlog/_download";, 
request.remote_host,
-                                      request.remote_port);
-    constexpr int max_retry = 3;
+    bool is_async = (_ingest_binlog_workers != nullptr);
+    result.__set_is_async(is_async);
 
-    auto get_binlog_info_url =
-            fmt::format("{}?method={}&tablet_id={}&binlog_version={}", 
binlog_api_url,
-                        "get_binlog_info", request.remote_tablet_id, 
request.binlog_version);
-    std::string binlog_info;
-    auto get_binlog_info_cb = [&get_binlog_info_url, &binlog_info](HttpClient* 
client) {
-        RETURN_IF_ERROR(client->init(get_binlog_info_url));
-        client->set_timeout_ms(kMaxTimeoutMs);
-        return client->execute(&binlog_info);
-    };
-    status = HttpClient::execute_with_retry(max_retry, 1, get_binlog_info_cb);
-    if (!status.ok()) {
-        LOG(WARNING) << "failed to get binlog info from " << 
get_binlog_info_url
-                     << ", status=" << status.to_string();
-        status.to_thrift(&tstatus);
-        return;
-    }
+    auto ingest_binlog_func = [=, tstatus = &tstatus]() {
+        IngestBinlogArg ingest_binlog_arg = {
+                .txn_id = txn_id,
+                .partition_id = partition_id,
+                .local_tablet_id = local_tablet_id,
+                .local_tablet = local_tablet,
 
-    std::vector<std::string> binlog_info_parts = strings::Split(binlog_info, 
":");
-    // TODO(Drogon): check binlog info content is right
-    DCHECK(binlog_info_parts.size() == 2);
-    const std::string& remote_rowset_id = binlog_info_parts[0];
-    int64_t num_segments = std::stoll(binlog_info_parts[1]);
+                .request = std::move(request),
+                .tstatus = is_async ? nullptr : tstatus,
+        };
 
-    // Step 4: get rowset meta
-    auto get_rowset_meta_url = fmt::format(
-            "{}?method={}&tablet_id={}&rowset_id={}&binlog_version={}", 
binlog_api_url,
-            "get_rowset_meta", request.remote_tablet_id, remote_rowset_id, 
request.binlog_version);
-    std::string rowset_meta_str;
-    auto get_rowset_meta_cb = [&get_rowset_meta_url, 
&rowset_meta_str](HttpClient* client) {
-        RETURN_IF_ERROR(client->init(get_rowset_meta_url));
-        client->set_timeout_ms(kMaxTimeoutMs);
-        return client->execute(&rowset_meta_str);
+        _ingest_binlog(&ingest_binlog_arg);
     };
-    status = HttpClient::execute_with_retry(max_retry, 1, get_rowset_meta_cb);
-    if (!status.ok()) {
-        LOG(WARNING) << "failed to get rowset meta from " << 
get_rowset_meta_url
-                     << ", status=" << status.to_string();
-        status.to_thrift(&tstatus);
-        return;
-    }
-    RowsetMetaPB rowset_meta_pb;
-    if (!rowset_meta_pb.ParseFromString(rowset_meta_str)) {
-        LOG(WARNING) << "failed to parse rowset meta from " << 
get_rowset_meta_url;
-        status = Status::InternalError("failed to parse rowset meta");
-        status.to_thrift(&tstatus);
-        return;
-    }
-    // rewrite rowset meta
-    rowset_meta_pb.set_tablet_id(local_tablet_id);
-    rowset_meta_pb.set_partition_id(partition_id);
-    
rowset_meta_pb.set_tablet_schema_hash(local_tablet->tablet_meta()->schema_hash());
-    rowset_meta_pb.set_txn_id(txn_id);
-    rowset_meta_pb.set_rowset_state(RowsetStatePB::COMMITTED);
-    auto rowset_meta = std::make_shared<RowsetMeta>();
-    if (!rowset_meta->init_from_pb(rowset_meta_pb)) {
-        LOG(WARNING) << "failed to init rowset meta from " << 
get_rowset_meta_url;
-        status = Status::InternalError("failed to init rowset meta");
-        status.to_thrift(&tstatus);
-        return;
-    }
-    RowsetId new_rowset_id = StorageEngine::instance()->next_rowset_id();
-    rowset_meta->set_rowset_id(new_rowset_id);
-    rowset_meta->set_tablet_uid(local_tablet->tablet_uid());
 
-    // Step 5: get all segment files
-    // Step 5.1: get all segment files size
-    std::vector<std::string> segment_file_urls;
-    segment_file_urls.reserve(num_segments);
-    std::vector<uint64_t> segment_file_sizes;
-    segment_file_sizes.reserve(num_segments);
-    for (int64_t segment_index = 0; segment_index < num_segments; 
++segment_index) {
-        auto get_segment_file_size_url = fmt::format(
-                "{}?method={}&tablet_id={}&rowset_id={}&segment_index={}", 
binlog_api_url,
-                "get_segment_file", request.remote_tablet_id, 
remote_rowset_id, segment_index);
-        uint64_t segment_file_size;
-        auto get_segment_file_size_cb = [&get_segment_file_size_url,
-                                         &segment_file_size](HttpClient* 
client) {
-            RETURN_IF_ERROR(client->init(get_segment_file_size_url));
-            client->set_timeout_ms(kMaxTimeoutMs);
-            RETURN_IF_ERROR(client->head());
-            return client->get_content_length(&segment_file_size);
-        };
-
-        status = HttpClient::execute_with_retry(max_retry, 1, 
get_segment_file_size_cb);
+    if (is_async) {
+        status = 
_ingest_binlog_workers->submit_func(std::move(ingest_binlog_func));
         if (!status.ok()) {
-            LOG(WARNING) << "failed to get segment file size from " << 
get_segment_file_size_url
-                         << ", status=" << status.to_string();
             status.to_thrift(&tstatus);
             return;
         }
-
-        segment_file_sizes.push_back(segment_file_size);
-        segment_file_urls.push_back(std::move(get_segment_file_size_url));
-    }
-
-    // Step 5.2: check data capacity
-    uint64_t total_size = std::accumulate(segment_file_sizes.begin(), 
segment_file_sizes.end(), 0);
-    if (!local_tablet->can_add_binlog(total_size)) {
-        LOG(WARNING) << "failed to add binlog, no enough space, total_size=" 
<< total_size
-                     << ", tablet=" << local_tablet->tablet_id();
-        status = Status::InternalError("no enough space");
-        status.to_thrift(&tstatus);
-        return;
+    } else {
+        ingest_binlog_func();
     }
+}
 
-    // Step 5.3: get all segment files
-    for (int64_t segment_index = 0; segment_index < num_segments; 
++segment_index) {
-        auto segment_file_size = segment_file_sizes[segment_index];
-        auto get_segment_file_url = segment_file_urls[segment_index];
-
-        uint64_t estimate_timeout =
-                segment_file_size / config::download_low_speed_limit_kbps / 
1024;
-        if (estimate_timeout < config::download_low_speed_time) {
-            estimate_timeout = config::download_low_speed_time;
-        }
-
-        auto local_segment_path = BetaRowset::segment_file_path(
-                local_tablet->tablet_path(), rowset_meta->rowset_id(), 
segment_index);
-        LOG(INFO) << fmt::format("download segment file from {} to {}", 
get_segment_file_url,
-                                 local_segment_path);
-        auto get_segment_file_cb = [&get_segment_file_url, 
&local_segment_path, segment_file_size,
-                                    estimate_timeout](HttpClient* client) {
-            RETURN_IF_ERROR(client->init(get_segment_file_url));
-            client->set_timeout_ms(estimate_timeout * 1000);
-            RETURN_IF_ERROR(client->download(local_segment_path));
+void BackendService::query_ingest_binlog(TQueryIngestBinlogResult& result,
+                                         const TQueryIngestBinlogRequest& 
request) {
+    LOG(INFO) << "query ingest binlog. request: " << 
apache::thrift::ThriftDebugString(request);
 
-            std::error_code ec;
-            // Check file length
-            uint64_t local_file_size = 
std::filesystem::file_size(local_segment_path, ec);
-            if (ec) {
-                LOG(WARNING) << "download file error" << ec.message();
-                return Status::IOError("can't retrive file_size of {}, due to 
{}",
-                                       local_segment_path, ec.message());
-            }
-            if (local_file_size != segment_file_size) {
-                LOG(WARNING) << "download file length error"
-                             << ", get_segment_file_url=" << 
get_segment_file_url
-                             << ", file_size=" << segment_file_size
-                             << ", local_file_size=" << local_file_size;
-                return Status::InternalError("downloaded file size is not 
equal");
-            }
-            chmod(local_segment_path.c_str(), S_IRUSR | S_IWUSR);
-            return Status::OK();
-        };
+    auto set_result = [&](TIngestBinlogStatus::type status, std::string 
error_msg) {
+        result.__set_status(status);
+        result.__set_err_msg(std::move(error_msg));
+    };
 
-        auto status = HttpClient::execute_with_retry(max_retry, 1, 
get_segment_file_cb);
-        if (!status.ok()) {
-            LOG(WARNING) << "failed to get segment file from " << 
get_segment_file_url
-                         << ", status=" << status.to_string();
-            status.to_thrift(&tstatus);
-            return;
-        }
+    /// Check args: txn_id, partition_id, tablet_id, load_id
+    if (!request.__isset.txn_id) {
+        auto error_msg = "txn_id is empty";
+        LOG(WARNING) << error_msg;
+        set_result(TIngestBinlogStatus::ANALYSIS_ERROR, error_msg);
+        return;
     }
-
-    // Step 6: create rowset && calculate delete bitmap && commit
-    // Step 6.1: create rowset
-    RowsetSharedPtr rowset;
-    status = RowsetFactory::create_rowset(local_tablet->tablet_schema(),
-                                          local_tablet->tablet_path(), 
rowset_meta, &rowset);
-
-    if (!status) {
-        LOG(WARNING) << "failed to create rowset from rowset meta for remote 
tablet"
-                     << ". rowset_id: " << rowset_meta_pb.rowset_id()
-                     << ", rowset_type: " << rowset_meta_pb.rowset_type()
-                     << ", remote_tablet_id=" << rowset_meta_pb.tablet_id() << 
", txn_id=" << txn_id
-                     << ", status=" << status.to_string();
-        status.to_thrift(&tstatus);
+    if (!request.__isset.partition_id) {
+        auto error_msg = "partition_id is empty";
+        LOG(WARNING) << error_msg;
+        set_result(TIngestBinlogStatus::ANALYSIS_ERROR, error_msg);
         return;
     }
-
-    // Step 6.2 calculate delete bitmap before commit
-    auto calc_delete_bitmap_token =
-            
StorageEngine::instance()->calc_delete_bitmap_executor()->create_token();
-    DeleteBitmapPtr delete_bitmap = 
std::make_shared<DeleteBitmap>(local_tablet_id);
-    RowsetIdUnorderedSet pre_rowset_ids;
-    if (local_tablet->enable_unique_key_merge_on_write()) {
-        auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset.get());
-        std::vector<segment_v2::SegmentSharedPtr> segments;
-        status = beta_rowset->load_segments(&segments);
-        if (!status) {
-            LOG(WARNING) << "failed to load segments from rowset"
-                         << ". rowset_id: " << beta_rowset->rowset_id() << ", 
txn_id=" << txn_id
-                         << ", status=" << status.to_string();
-            status.to_thrift(&tstatus);
-            return;
-        }
-        if (segments.size() > 1) {
-            // calculate delete bitmap between segments
-            status = local_tablet->calc_delete_bitmap_between_segments(rowset, 
segments,
-                                                                       
delete_bitmap);
-            if (!status) {
-                LOG(WARNING) << "failed to calculate delete bitmap"
-                             << ". tablet_id: " << local_tablet->tablet_id()
-                             << ". rowset_id: " << rowset->rowset_id() << ", 
txn_id=" << txn_id
-                             << ", status=" << status.to_string();
-                status.to_thrift(&tstatus);
-                return;
-            }
-        }
-
-        static_cast<void>(local_tablet->commit_phase_update_delete_bitmap(
-                rowset, pre_rowset_ids, delete_bitmap, segments, txn_id,
-                calc_delete_bitmap_token.get(), nullptr));
-        static_cast<void>(calc_delete_bitmap_token->wait());
+    if (!request.__isset.tablet_id) {
+        auto error_msg = "tablet_id is empty";
+        LOG(WARNING) << error_msg;
+        set_result(TIngestBinlogStatus::ANALYSIS_ERROR, error_msg);
+        return;
     }
-
-    // Step 6.3: commit txn
-    Status commit_txn_status = 
StorageEngine::instance()->txn_manager()->commit_txn(
-            local_tablet->data_dir()->get_meta(), rowset_meta->partition_id(),
-            rowset_meta->txn_id(), rowset_meta->tablet_id(), 
local_tablet->tablet_uid(),
-            rowset_meta->load_id(), rowset, false);
-    if (!commit_txn_status && 
!commit_txn_status.is<ErrorCode::PUSH_TRANSACTION_ALREADY_EXIST>()) {
-        auto err_msg = fmt::format(
-                "failed to commit txn for remote tablet. rowset_id: {}, 
remote_tablet_id={}, "
-                "txn_id={}, status={}",
-                rowset_meta->rowset_id().to_string(), rowset_meta->tablet_id(),
-                rowset_meta->txn_id(), commit_txn_status.to_string());
-        LOG(WARNING) << err_msg;
-        set_tstatus(TStatusCode::RUNTIME_ERROR, std::move(err_msg));
+    if (!request.__isset.load_id) {
+        auto error_msg = "load_id is empty";

Review Comment:
   warning: 'auto error_msg' can be declared as 'const auto *error_msg' 
[readability-qualified-auto]
   
   ```suggestion
           const auto *error_msg = "load_id is empty";
   ```
   



##########
be/src/service/backend_service.cpp:
##########
@@ -486,239 +783,105 @@
         return;
     }
 
-    // Step 3: get binlog info
-    auto binlog_api_url = fmt::format("http://{}:{}/api/_binlog/_download";, 
request.remote_host,
-                                      request.remote_port);
-    constexpr int max_retry = 3;
+    bool is_async = (_ingest_binlog_workers != nullptr);
+    result.__set_is_async(is_async);
 
-    auto get_binlog_info_url =
-            fmt::format("{}?method={}&tablet_id={}&binlog_version={}", 
binlog_api_url,
-                        "get_binlog_info", request.remote_tablet_id, 
request.binlog_version);
-    std::string binlog_info;
-    auto get_binlog_info_cb = [&get_binlog_info_url, &binlog_info](HttpClient* 
client) {
-        RETURN_IF_ERROR(client->init(get_binlog_info_url));
-        client->set_timeout_ms(kMaxTimeoutMs);
-        return client->execute(&binlog_info);
-    };
-    status = HttpClient::execute_with_retry(max_retry, 1, get_binlog_info_cb);
-    if (!status.ok()) {
-        LOG(WARNING) << "failed to get binlog info from " << 
get_binlog_info_url
-                     << ", status=" << status.to_string();
-        status.to_thrift(&tstatus);
-        return;
-    }
+    auto ingest_binlog_func = [=, tstatus = &tstatus]() {
+        IngestBinlogArg ingest_binlog_arg = {
+                .txn_id = txn_id,
+                .partition_id = partition_id,
+                .local_tablet_id = local_tablet_id,
+                .local_tablet = local_tablet,
 
-    std::vector<std::string> binlog_info_parts = strings::Split(binlog_info, 
":");
-    // TODO(Drogon): check binlog info content is right
-    DCHECK(binlog_info_parts.size() == 2);
-    const std::string& remote_rowset_id = binlog_info_parts[0];
-    int64_t num_segments = std::stoll(binlog_info_parts[1]);
+                .request = std::move(request),
+                .tstatus = is_async ? nullptr : tstatus,
+        };
 
-    // Step 4: get rowset meta
-    auto get_rowset_meta_url = fmt::format(
-            "{}?method={}&tablet_id={}&rowset_id={}&binlog_version={}", 
binlog_api_url,
-            "get_rowset_meta", request.remote_tablet_id, remote_rowset_id, 
request.binlog_version);
-    std::string rowset_meta_str;
-    auto get_rowset_meta_cb = [&get_rowset_meta_url, 
&rowset_meta_str](HttpClient* client) {
-        RETURN_IF_ERROR(client->init(get_rowset_meta_url));
-        client->set_timeout_ms(kMaxTimeoutMs);
-        return client->execute(&rowset_meta_str);
+        _ingest_binlog(&ingest_binlog_arg);
     };
-    status = HttpClient::execute_with_retry(max_retry, 1, get_rowset_meta_cb);
-    if (!status.ok()) {
-        LOG(WARNING) << "failed to get rowset meta from " << 
get_rowset_meta_url
-                     << ", status=" << status.to_string();
-        status.to_thrift(&tstatus);
-        return;
-    }
-    RowsetMetaPB rowset_meta_pb;
-    if (!rowset_meta_pb.ParseFromString(rowset_meta_str)) {
-        LOG(WARNING) << "failed to parse rowset meta from " << 
get_rowset_meta_url;
-        status = Status::InternalError("failed to parse rowset meta");
-        status.to_thrift(&tstatus);
-        return;
-    }
-    // rewrite rowset meta
-    rowset_meta_pb.set_tablet_id(local_tablet_id);
-    rowset_meta_pb.set_partition_id(partition_id);
-    
rowset_meta_pb.set_tablet_schema_hash(local_tablet->tablet_meta()->schema_hash());
-    rowset_meta_pb.set_txn_id(txn_id);
-    rowset_meta_pb.set_rowset_state(RowsetStatePB::COMMITTED);
-    auto rowset_meta = std::make_shared<RowsetMeta>();
-    if (!rowset_meta->init_from_pb(rowset_meta_pb)) {
-        LOG(WARNING) << "failed to init rowset meta from " << 
get_rowset_meta_url;
-        status = Status::InternalError("failed to init rowset meta");
-        status.to_thrift(&tstatus);
-        return;
-    }
-    RowsetId new_rowset_id = StorageEngine::instance()->next_rowset_id();
-    rowset_meta->set_rowset_id(new_rowset_id);
-    rowset_meta->set_tablet_uid(local_tablet->tablet_uid());
 
-    // Step 5: get all segment files
-    // Step 5.1: get all segment files size
-    std::vector<std::string> segment_file_urls;
-    segment_file_urls.reserve(num_segments);
-    std::vector<uint64_t> segment_file_sizes;
-    segment_file_sizes.reserve(num_segments);
-    for (int64_t segment_index = 0; segment_index < num_segments; 
++segment_index) {
-        auto get_segment_file_size_url = fmt::format(
-                "{}?method={}&tablet_id={}&rowset_id={}&segment_index={}", 
binlog_api_url,
-                "get_segment_file", request.remote_tablet_id, 
remote_rowset_id, segment_index);
-        uint64_t segment_file_size;
-        auto get_segment_file_size_cb = [&get_segment_file_size_url,
-                                         &segment_file_size](HttpClient* 
client) {
-            RETURN_IF_ERROR(client->init(get_segment_file_size_url));
-            client->set_timeout_ms(kMaxTimeoutMs);
-            RETURN_IF_ERROR(client->head());
-            return client->get_content_length(&segment_file_size);
-        };
-
-        status = HttpClient::execute_with_retry(max_retry, 1, 
get_segment_file_size_cb);
+    if (is_async) {
+        status = 
_ingest_binlog_workers->submit_func(std::move(ingest_binlog_func));
         if (!status.ok()) {
-            LOG(WARNING) << "failed to get segment file size from " << 
get_segment_file_size_url
-                         << ", status=" << status.to_string();
             status.to_thrift(&tstatus);
             return;
         }
-
-        segment_file_sizes.push_back(segment_file_size);
-        segment_file_urls.push_back(std::move(get_segment_file_size_url));
-    }
-
-    // Step 5.2: check data capacity
-    uint64_t total_size = std::accumulate(segment_file_sizes.begin(), 
segment_file_sizes.end(), 0);
-    if (!local_tablet->can_add_binlog(total_size)) {
-        LOG(WARNING) << "failed to add binlog, no enough space, total_size=" 
<< total_size
-                     << ", tablet=" << local_tablet->tablet_id();
-        status = Status::InternalError("no enough space");
-        status.to_thrift(&tstatus);
-        return;
+    } else {
+        ingest_binlog_func();
     }
+}
 
-    // Step 5.3: get all segment files
-    for (int64_t segment_index = 0; segment_index < num_segments; 
++segment_index) {
-        auto segment_file_size = segment_file_sizes[segment_index];
-        auto get_segment_file_url = segment_file_urls[segment_index];
-
-        uint64_t estimate_timeout =
-                segment_file_size / config::download_low_speed_limit_kbps / 
1024;
-        if (estimate_timeout < config::download_low_speed_time) {
-            estimate_timeout = config::download_low_speed_time;
-        }
-
-        auto local_segment_path = BetaRowset::segment_file_path(
-                local_tablet->tablet_path(), rowset_meta->rowset_id(), 
segment_index);
-        LOG(INFO) << fmt::format("download segment file from {} to {}", 
get_segment_file_url,
-                                 local_segment_path);
-        auto get_segment_file_cb = [&get_segment_file_url, 
&local_segment_path, segment_file_size,
-                                    estimate_timeout](HttpClient* client) {
-            RETURN_IF_ERROR(client->init(get_segment_file_url));
-            client->set_timeout_ms(estimate_timeout * 1000);
-            RETURN_IF_ERROR(client->download(local_segment_path));
+void BackendService::query_ingest_binlog(TQueryIngestBinlogResult& result,
+                                         const TQueryIngestBinlogRequest& 
request) {
+    LOG(INFO) << "query ingest binlog. request: " << 
apache::thrift::ThriftDebugString(request);
 
-            std::error_code ec;
-            // Check file length
-            uint64_t local_file_size = 
std::filesystem::file_size(local_segment_path, ec);
-            if (ec) {
-                LOG(WARNING) << "download file error" << ec.message();
-                return Status::IOError("can't retrive file_size of {}, due to 
{}",
-                                       local_segment_path, ec.message());
-            }
-            if (local_file_size != segment_file_size) {
-                LOG(WARNING) << "download file length error"
-                             << ", get_segment_file_url=" << 
get_segment_file_url
-                             << ", file_size=" << segment_file_size
-                             << ", local_file_size=" << local_file_size;
-                return Status::InternalError("downloaded file size is not 
equal");
-            }
-            chmod(local_segment_path.c_str(), S_IRUSR | S_IWUSR);
-            return Status::OK();
-        };
+    auto set_result = [&](TIngestBinlogStatus::type status, std::string 
error_msg) {
+        result.__set_status(status);
+        result.__set_err_msg(std::move(error_msg));
+    };
 
-        auto status = HttpClient::execute_with_retry(max_retry, 1, 
get_segment_file_cb);
-        if (!status.ok()) {
-            LOG(WARNING) << "failed to get segment file from " << 
get_segment_file_url
-                         << ", status=" << status.to_string();
-            status.to_thrift(&tstatus);
-            return;
-        }
+    /// Check args: txn_id, partition_id, tablet_id, load_id
+    if (!request.__isset.txn_id) {
+        auto error_msg = "txn_id is empty";
+        LOG(WARNING) << error_msg;
+        set_result(TIngestBinlogStatus::ANALYSIS_ERROR, error_msg);
+        return;
     }
-
-    // Step 6: create rowset && calculate delete bitmap && commit
-    // Step 6.1: create rowset
-    RowsetSharedPtr rowset;
-    status = RowsetFactory::create_rowset(local_tablet->tablet_schema(),
-                                          local_tablet->tablet_path(), 
rowset_meta, &rowset);
-
-    if (!status) {
-        LOG(WARNING) << "failed to create rowset from rowset meta for remote 
tablet"
-                     << ". rowset_id: " << rowset_meta_pb.rowset_id()
-                     << ", rowset_type: " << rowset_meta_pb.rowset_type()
-                     << ", remote_tablet_id=" << rowset_meta_pb.tablet_id() << 
", txn_id=" << txn_id
-                     << ", status=" << status.to_string();
-        status.to_thrift(&tstatus);
+    if (!request.__isset.partition_id) {
+        auto error_msg = "partition_id is empty";

Review Comment:
   warning: 'auto error_msg' can be declared as 'const auto *error_msg' 
[readability-qualified-auto]
   
   ```suggestion
           const auto *error_msg = "partition_id is empty";
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to