This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 9b14720107b [improve](stream-load) add observability on receiving HTTP request (#41136)
9b14720107b is described below
commit 9b14720107bb02d1ab940086e3a2d7f35c2a3f0f
Author: hui lai <[email protected]>
AuthorDate: Thu Sep 26 11:48:34 2024 +0800
[improve](stream-load) add observability on receiving HTTP request (#41136)
pick https://github.com/apache/doris/pull/30432 and
https://github.com/apache/doris/pull/40735
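For reference, the new ReceiveDataTimeMs field is derived from two accumulated counters: receive_and_read_data_cost_nanos (wall-clock time from on_header() until the last chunk has been read) minus read_data_cost_nanos (time actually spent reading chunk data), approximating how long the BE waited for the client/network to deliver the body. Below is a minimal standalone sketch of that bookkeeping; now_nanos(), TimingCtx and the free function on_chunk_data() are illustrative stand-ins for Doris' MonotonicNanos(), StreamLoadContext and StreamLoadAction::on_chunk_data(), not APIs introduced by this patch.

// Minimal, self-contained sketch of the timing accounting added here.
#include <chrono>
#include <cstdint>
#include <iostream>

static int64_t now_nanos() {
    return std::chrono::duration_cast<std::chrono::nanoseconds>(
                   std::chrono::steady_clock::now().time_since_epoch())
            .count();
}

struct TimingCtx {
    int64_t begin_receive_and_read_data_cost_nanos = 0;  // set when headers arrive
    int64_t receive_and_read_data_cost_nanos = 0;        // wall time since headers arrived
    int64_t read_data_cost_nanos = 0;                    // time spent reading chunk data
};

// Called per HTTP chunk, mirroring the accounting in StreamLoadAction::on_chunk_data().
void on_chunk_data(TimingCtx& ctx, int64_t start_read_data_time) {
    int64_t read_data_time = now_nanos() - start_read_data_time;
    int64_t last_total = ctx.receive_and_read_data_cost_nanos;
    ctx.read_data_cost_nanos += read_data_time;
    ctx.receive_and_read_data_cost_nanos =
            now_nanos() - ctx.begin_receive_and_read_data_cost_nanos;
    // Time attributable to waiting for this chunk to arrive on the wire.
    int64_t receive_ms =
            (ctx.receive_and_read_data_cost_nanos - last_total - read_data_time) / 1000000;
    std::cout << "receive latency for this chunk: " << receive_ms << " ms\n";
}

int main() {
    TimingCtx ctx;
    ctx.begin_receive_and_read_data_cost_nanos = now_nanos();
    on_chunk_data(ctx, now_nanos());
    // Per-load receive time, reported as ReceiveDataTimeMs in the result JSON.
    std::cout << "ReceiveDataTimeMs = "
              << (ctx.receive_and_read_data_cost_nanos - ctx.read_data_cost_nanos) / 1000000
              << "\n";
}

The per-chunk value fed into the g_stream_load_receive_data_latency_ms bvar uses the same subtraction, scoped to a single chunk: time elapsed since the previous chunk finished, minus the time spent reading the current chunk.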
---------
Co-authored-by: HHoflittlefish777 <[email protected]>
---
be/src/http/action/stream_load.cpp | 14 +++++++++++++-
be/src/runtime/stream_load/stream_load_context.cpp | 2 ++
be/src/runtime/stream_load/stream_load_context.h | 2 ++
.../runtime/stream_load/stream_load_executor.cpp | 22 +++++++++++++++++++++-
4 files changed, 38 insertions(+), 2 deletions(-)
diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp
index 1b0eadd47c3..3557d750ed4 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -75,6 +75,8 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_requests_total, MetricUnit::
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_duration_ms, MetricUnit::MILLISECONDS);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_current_processing, MetricUnit::REQUESTS);
+bvar::LatencyRecorder g_stream_load_receive_data_latency_ms("stream_load_receive_data_latency_ms");
+
static constexpr size_t MIN_CHUNK_SIZE = 64 * 1024;
static const string CHUNK = "chunked";
@@ -188,8 +190,10 @@ int StreamLoadAction::on_header(HttpRequest* req) {
LOG(INFO) << "new income streaming load request." << ctx->brief() << ",
db=" << ctx->db
<< ", tbl=" << ctx->table;
+ ctx->begin_receive_and_read_data_cost_nanos = MonotonicNanos();
auto st = _on_header(req, ctx);
+
if (!st.ok()) {
ctx->status = std::move(st);
if (ctx->need_rollback) {
@@ -340,7 +344,15 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) {
}
ctx->receive_bytes += remove_bytes;
}
- ctx->read_data_cost_nanos += (MonotonicNanos() - start_read_data_time);
+    int64_t read_data_time = MonotonicNanos() - start_read_data_time;
+    int64_t last_receive_and_read_data_cost_nanos = ctx->receive_and_read_data_cost_nanos;
+    ctx->read_data_cost_nanos += read_data_time;
+    ctx->receive_and_read_data_cost_nanos =
+            MonotonicNanos() - ctx->begin_receive_and_read_data_cost_nanos;
+    g_stream_load_receive_data_latency_ms
+            << (ctx->receive_and_read_data_cost_nanos - last_receive_and_read_data_cost_nanos -
+                read_data_time) /
+                       1000000;
}
void StreamLoadAction::free_handler_ctx(std::shared_ptr<void> param) {
diff --git a/be/src/runtime/stream_load/stream_load_context.cpp b/be/src/runtime/stream_load/stream_load_context.cpp
index f381ba097db..2e9aa4fad9f 100644
--- a/be/src/runtime/stream_load/stream_load_context.cpp
+++ b/be/src/runtime/stream_load/stream_load_context.cpp
@@ -100,6 +100,8 @@ std::string StreamLoadContext::to_json() const {
writer.Int64(read_data_cost_nanos / 1000000);
writer.Key("WriteDataTimeMs");
writer.Int(write_data_cost_nanos / 1000000);
+ writer.Key("ReceiveDataTimeMs");
+    writer.Int((receive_and_read_data_cost_nanos - read_data_cost_nanos) / 1000000);
writer.Key("CommitAndPublishTimeMs");
writer.Int64(commit_and_publish_txn_cost_nanos / 1000000);
diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h
index 0e821956470..334e9ad6af2 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -194,6 +194,8 @@ public:
int64_t pre_commit_txn_cost_nanos = 0;
int64_t read_data_cost_nanos = 0;
int64_t write_data_cost_nanos = 0;
+ int64_t receive_and_read_data_cost_nanos = 0;
+ int64_t begin_receive_and_read_data_cost_nanos = 0;
std::string error_url = "";
// if label already be used, set existing job's status here
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp
index 19d25e9ffa1..75386d7aa6b 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -69,7 +69,7 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
// submit this params
#ifndef BE_TEST
ctx->start_write_data_nanos = MonotonicNanos();
- LOG(INFO) << "begin to execute job. label=" << ctx->label << ", txn_id="
<< ctx->txn_id
+ LOG(INFO) << "begin to execute stream load. label=" << ctx->label << ",
txn_id=" << ctx->txn_id
<< ", query_id=" <<
print_id(ctx->put_result.params.params.query_id);
Status st;
if (ctx->put_result.__isset.params) {
@@ -144,6 +144,16 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
this->commit_txn(ctx.get());
}
}
+
+ LOG(INFO) << "finished to execute stream load. label=" <<
ctx->label
+ << ", txn_id=" << ctx->txn_id
+ << ", query_id=" <<
print_id(ctx->put_result.params.params.query_id)
+ << ", receive_data_cost_ms="
+ << (ctx->receive_and_read_data_cost_nanos -
+ ctx->read_data_cost_nanos) /
+ 1000000
+ << ", read_data_cost_ms=" <<
ctx->read_data_cost_nanos / 1000000
+ << ", write_data_cost_ms=" <<
ctx->write_data_cost_nanos / 1000000;
});
} else {
st = _exec_env->fragment_mgr()->exec_plan_fragment(
@@ -217,6 +227,16 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
this->commit_txn(ctx.get());
}
}
+
+ LOG(INFO) << "finished to execute stream load. label=" <<
ctx->label
+ << ", txn_id=" << ctx->txn_id
+ << ", query_id=" <<
print_id(ctx->put_result.params.params.query_id)
+ << ", receive_data_cost_ms="
+ << (ctx->receive_and_read_data_cost_nanos -
+ ctx->read_data_cost_nanos) /
+ 1000000
+ << ", read_data_cost_ms=" <<
ctx->read_data_cost_nanos / 1000000
+ << ", write_data_cost_ms=" <<
ctx->write_data_cost_nanos / 1000000;
});
}
if (!st.ok()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]