This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 822f2b1255aa584ec64f91bf2a3ebe68b42f8aee
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Thu Feb 1 18:45:22 2024 +0800

     [improve](stream-load) add observability on receiving HTTP request #30432
---
 be/src/http/action/stream_load.cpp                  | 14 +++++++++++++-
 be/src/runtime/stream_load/stream_load_context.h    |  2 ++
 be/src/runtime/stream_load/stream_load_executor.cpp | 10 +++++++++-
 3 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index cf3cd469fad..09e6376df4c 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -77,6 +77,8 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_requests_total, MetricUnit::
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_duration_ms, MetricUnit::MILLISECONDS);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_current_processing, MetricUnit::REQUESTS);
 
+bvar::LatencyRecorder g_stream_load_receive_data_latency_ms("stream_load_receive_data_latency_ms");
+
 static constexpr size_t MIN_CHUNK_SIZE = 64 * 1024;
 static const string CHUNK = "chunked";
 
@@ -195,9 +197,11 @@ int StreamLoadAction::on_header(HttpRequest* req) {
 
     LOG(INFO) << "new income streaming load request." << ctx->brief() << ", 
db=" << ctx->db
               << ", tbl=" << ctx->table << ", group_commit=" << 
ctx->group_commit;
+    ctx->begin_receive_and_read_data_cost_nanos = MonotonicNanos();
 
     if (st.ok()) {
         st = _on_header(req, ctx);
+        LOG(INFO) << "finished to handle HTTP header, " << ctx->brief();
     }
     if (!st.ok()) {
         ctx->status = std::move(st);
@@ -350,7 +354,15 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) {
         }
         ctx->receive_bytes += remove_bytes;
     }
-    ctx->read_data_cost_nanos += (MonotonicNanos() - start_read_data_time);
+    int64_t read_data_time = MonotonicNanos() - start_read_data_time;
+    int64_t last_receive_and_read_data_cost_nanos = ctx->receive_and_read_data_cost_nanos;
+    ctx->read_data_cost_nanos += read_data_time;
+    ctx->receive_and_read_data_cost_nanos =
+            MonotonicNanos() - ctx->begin_receive_and_read_data_cost_nanos;
+    g_stream_load_receive_data_latency_ms
+            << (ctx->receive_and_read_data_cost_nanos - last_receive_and_read_data_cost_nanos -
+                read_data_time) /
+                       1000000;
 }
 
 void StreamLoadAction::free_handler_ctx(std::shared_ptr<void> param) {
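
Reader note (not part of the commit): the on_chunk_data() change above derives a per-chunk "receive" latency as the wall-clock time elapsed since the previous chunk finished (tracked through receive_and_read_data_cost_nanos) minus the time spent actually reading the current chunk, and streams that value, in milliseconds, into the new bvar. A rough standalone sketch of the same bookkeeping, using std::chrono in place of Doris' MonotonicNanos() and a plain return value instead of bvar::LatencyRecorder; the names ChunkTimingCtx and record_chunk are made up for illustration:

#include <chrono>
#include <cstdint>
#include <iostream>

// Hypothetical stand-in for Doris' MonotonicNanos().
static int64_t monotonic_nanos() {
    return std::chrono::duration_cast<std::chrono::nanoseconds>(
                   std::chrono::steady_clock::now().time_since_epoch())
            .count();
}

// Slimmed-down, hypothetical context holding only the fields this patch adds or uses.
struct ChunkTimingCtx {
    int64_t begin_receive_and_read_data_cost_nanos = 0;  // stamped once when the header is handled
    int64_t receive_and_read_data_cost_nanos = 0;        // total elapsed since that stamp
    int64_t read_data_cost_nanos = 0;                    // cumulative time spent reading chunks
};

// Mirrors the bookkeeping in on_chunk_data(): the per-chunk "receive" latency is the
// elapsed time since the previous chunk, minus the time spent reading this one.
static int64_t record_chunk(ChunkTimingCtx* ctx, int64_t start_read_data_time) {
    int64_t now = monotonic_nanos();
    int64_t read_data_time = now - start_read_data_time;
    int64_t last_total = ctx->receive_and_read_data_cost_nanos;
    ctx->read_data_cost_nanos += read_data_time;
    ctx->receive_and_read_data_cost_nanos = now - ctx->begin_receive_and_read_data_cost_nanos;
    // In the patch this value is pushed into g_stream_load_receive_data_latency_ms.
    return (ctx->receive_and_read_data_cost_nanos - last_total - read_data_time) / 1000000;
}

int main() {
    ChunkTimingCtx ctx;
    ctx.begin_receive_and_read_data_cost_nanos = monotonic_nanos();  // as on_header() now does
    int64_t start_read = monotonic_nanos();  // taken at the top of each chunk read
    std::cout << "receive latency (ms): " << record_chunk(&ctx, start_read) << std::endl;
}
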
diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h
index 3f1f6b92431..376228e022d 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -208,6 +208,8 @@ public:
     int64_t pre_commit_txn_cost_nanos = 0;
     int64_t read_data_cost_nanos = 0;
     int64_t write_data_cost_nanos = 0;
+    int64_t receive_and_read_data_cost_nanos = 0;
+    int64_t begin_receive_and_read_data_cost_nanos = 0;
 
     std::string error_url = "";
     // if label already be used, set existing job's status here
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp
index 386db8c8497..54ad204a879 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -68,7 +68,7 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
 // submit this params
 #ifndef BE_TEST
     ctx->start_write_data_nanos = MonotonicNanos();
-    LOG(INFO) << "begin to execute job. label=" << ctx->label << ", txn_id=" 
<< ctx->txn_id
+    LOG(INFO) << "begin to execute stream load. label=" << ctx->label << ", 
txn_id=" << ctx->txn_id
               << ", query_id=" << 
print_id(ctx->put_result.params.params.query_id);
     Status st;
     auto exec_fragment = [ctx, this](RuntimeState* state, Status* status) {
@@ -148,6 +148,14 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
                 static_cast<void>(this->commit_txn(ctx.get()));
             }
         }
+
+        LOG(INFO) << "finished to execute stream load. label=" << ctx->label
+                  << ", txn_id=" << ctx->txn_id
+                  << ", query_id=" << 
print_id(ctx->put_result.params.params.query_id)
+                  << ", receive_data_cost_ms="
+                  << (ctx->receive_and_read_data_cost_nanos - ctx->read_data_cost_nanos) / 1000000
+                  << ", read_data_cost_ms=" << ctx->read_data_cost_nanos / 1000000
+                  << ", write_data_cost_ms=" << ctx->write_data_cost_nanos / 1000000;
     };
 
     if (ctx->put_result.__isset.params) {
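
Reader note (not part of the commit): the new summary line in stream_load_executor.cpp reports receive_data_cost_ms as the total receive-and-read window minus the cumulative read time, with all three costs converted from nanoseconds to milliseconds. A tiny illustration of that conversion with made-up numbers (none of these values come from a real load):

#include <cstdint>
#include <iostream>

int main() {
    // Made-up example values in nanoseconds, only to show the arithmetic and unit conversion.
    int64_t receive_and_read_data_cost_nanos = 5'000'000'000;  // header handled -> last chunk read
    int64_t read_data_cost_nanos = 2'000'000'000;              // time spent inside the reads
    int64_t write_data_cost_nanos = 1'500'000'000;             // time spent executing the plan

    // Same arithmetic as the added LOG(INFO): prints 3000 / 2000 / 1500 ms for these inputs.
    std::cout << "receive_data_cost_ms="
              << (receive_and_read_data_cost_nanos - read_data_cost_nanos) / 1000000
              << ", read_data_cost_ms=" << read_data_cost_nanos / 1000000
              << ", write_data_cost_ms=" << write_data_cost_nanos / 1000000 << std::endl;
}
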

