This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 916fad0c778 branch-4.0: [fix](profile) Fix the issue about load channel profile #58787 (#59094)
916fad0c778 is described below
commit 916fad0c778d460ce774ff9368f603a130e1c11b
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Dec 17 18:04:25 2025 +0800
branch-4.0: [fix](profile) Fix the issue about load channel profile #58787 (#59094)
Cherry-picked from #58787
Co-authored-by: Refrain <[email protected]>
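The change swaps which profile the collector serializes: each task's load_channel_profile(), rather than its runtime_profile(), is now converted to Thrift and merged into the runtime state's load channel profile. For context, this is the patched loop assembled from the post-change lines of the hunk below (the enclosing function signature is omitted; the comments are added here for explanation and are not part of the commit):

    for (const auto& tasks : _tasks) {
        for (const auto& task : tasks) {
            // Skip tasks that expose no load channel profile; the old code
            // tested runtime_profile() here instead.
            if (task.second->load_channel_profile() == nullptr) {
                continue;
            }
            auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();
            // Serialize the task's load channel profile (previously the
            // runtime profile was serialized into this tree by mistake) ...
            task.second->load_channel_profile()->to_thrift(tmp_load_channel_profile.get(),
                                                           _runtime_state->profile_level());
            // ... and merge it into the fragment-level load channel profile.
            _runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
        }
    }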
---
be/src/pipeline/pipeline_fragment_context.cpp | 6 +-
.../query_profile/test_load_channel_profile.out | 4 +
.../query_profile/test_load_channel_profile.groovy | 139 +++++++++++++++++++++
3 files changed, 146 insertions(+), 3 deletions(-)
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index d25cbc964a8..8ab48be87b1 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1985,14 +1985,14 @@ PipelineFragmentContext::collect_realtime_load_channel_profile() const {
     for (const auto& tasks : _tasks) {
         for (const auto& task : tasks) {
-            if (task.second->runtime_profile() == nullptr) {
+            if (task.second->load_channel_profile() == nullptr) {
                 continue;
             }
             auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>();
-            task.second->runtime_profile()->to_thrift(tmp_load_channel_profile.get(),
-                                                      _runtime_state->profile_level());
+            task.second->load_channel_profile()->to_thrift(tmp_load_channel_profile.get(),
+                                                           _runtime_state->profile_level());
             _runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
         }
     }
diff --git a/regression-test/data/query_profile/test_load_channel_profile.out b/regression-test/data/query_profile/test_load_channel_profile.out
new file mode 100644
index 00000000000..117be98cb11
--- /dev/null
+++ b/regression-test/data/query_profile/test_load_channel_profile.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select --
+10
+
diff --git a/regression-test/suites/query_profile/test_load_channel_profile.groovy b/regression-test/suites/query_profile/test_load_channel_profile.groovy
new file mode 100644
index 00000000000..c20217bde72
--- /dev/null
+++ b/regression-test/suites/query_profile/test_load_channel_profile.groovy
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import groovy.json.JsonSlurper
+
+def getProfileList = { masterHTTPAddr ->
+ def dst = 'http://' + masterHTTPAddr
+ def conn = new URL(dst + "/rest/v1/query_profile").openConnection()
+ conn.setRequestMethod("GET")
+    def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
+            (context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8"))
+ conn.setRequestProperty("Authorization", "Basic ${encoding}")
+ return conn.getInputStream().getText()
+}
+
+
+def getProfile = { masterHTTPAddr, id ->
+ def dst = 'http://' + masterHTTPAddr
+    def conn = new URL(dst + "/api/profile/text/?query_id=$id").openConnection()
+ conn.setRequestMethod("GET")
+    def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
+            (context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8"))
+ conn.setRequestProperty("Authorization", "Basic ${encoding}")
+ return conn.getInputStream().getText()
+}
+
+suite('test_load_channel_profile') {
+ sql "set enable_profile=true;"
+ sql "set profile_level=3;"
+ sql "set enable_memtable_on_sink_node=false;"
+
+ def s3Endpoint = getS3Endpoint()
+ def s3Region = getS3Region()
+ sql "drop table if exists t;"
+ sql """
+ CREATE TABLE IF NOT EXISTS t(
+ a INT,
+ b INT
+ ) ENGINE=OLAP
+ DUPLICATE KEY(a)
+ DISTRIBUTED BY RANDOM BUCKETS 10
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ ///////////////////////////////////
+ // load channel profile
+ ///////////////////////////////////
+
+ try {
+ def ak = getS3AK()
+ def sk = getS3SK()
+
+ def s3Uri = "s3://${getS3BucketName()}/load/tvf_compress.csv.lz4"
+ logger.info("Loading from S3 URI: $s3Uri")
+
+ def sql_str = """
+ INSERT INTO t
+            SELECT CAST(split_part(c1, '|', 1) AS INT) AS a, CAST(split_part(c1, '|', 2) AS INT) AS b FROM S3 (
+ "uri" = "$s3Uri",
+ "s3.access_key" = "$ak",
+ "s3.secret_key" = "$sk",
+ "s3.endpoint" = "${s3Endpoint}",
+ "s3.region" = "${s3Region}",
+ "format" = "csv",
+ "compress_type" = "lz4"
+ );
+ """
+ logger.info("submit sql: ${sql_str}");
+ sql """${sql_str}"""
+ logger.info("Insert completed from S3 TVF")
+
+ Thread.sleep(500)
+ qt_select """ select count(*) from t """
+
+ def allFrontends = sql """show frontends;"""
+ logger.info("allFrontends: " + allFrontends)
+ /*
+        - allFrontends: [[fe_2457d42b_68ad_43c4_a888_b3558a365be2, 127.0.0.1, 6917, 5937, 6937, 5927, -1, FOLLOWER, true, 1523277282, true, true, 13436, 2025-01-22 16:39:05, 2025-01-22 21:43:49, true, , doris-0.0.0--03faad7da5, Yes]]
+ */
+ def frontendCounts = allFrontends.size()
+ def masterIP = ""
+ def masterHTTPPort = ""
+
+ for (def i = 0; i < frontendCounts; i++) {
+ def currentFrontend = allFrontends[i]
+ def isMaster = currentFrontend[8]
+ if (isMaster == "true") {
+ masterIP = allFrontends[i][1]
+ masterHTTPPort = allFrontends[i][3]
+ break
+ }
+ }
+ def masterAddress = masterIP + ":" + masterHTTPPort
+ logger.info("masterIP:masterHTTPPort is:${masterAddress}")
+
+ def profileListString = getProfileList(masterAddress)
+ logger.info("profileListString:" + profileListString)
+ def jsonSlurper = new JsonSlurper()
+ def profileList = jsonSlurper.parseText(profileListString)
+
+ def queryId = ""
+        if (profileList.data && profileList.data.rows && profileList.data.rows.size() > 0) {
+            for (def row : profileList.data.rows) {
+                if (row[3] && row[3].toString().toUpperCase().contains("INSERT")) {
+ queryId = row[0]
+ break
+ }
+ }
+ }
+ logger.info("queryId: " + queryId)
+
+ if (queryId) {
+ def profileString = getProfile(masterAddress, queryId)
+ logger.info("profileDataString:" + profileString)
+            assertTrue(profileString.contains("TabletsChannel") || profileString.contains("DeltaWriter") || profileString.contains("MemTableWriter"))
+ } else {
+ logger.warn("No INSERT query found in profile list")
+ }
+ } finally {
+ sql "set enable_profile=false;"
+ }
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]