This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 5a76dbf2332 [fix](profile) Fix the issues of too many writer profile
results (#59863)
5a76dbf2332 is described below
commit 5a76dbf2332e0868caea5b1220e6386e48148093
Author: Refrain <[email protected]>
AuthorDate: Thu Jan 15 17:47:51 2026 +0800
[fix](profile) Fix the issues of too many writer profile results (#59863)
---
be/src/olap/delta_writer_v2.cpp | 8 +-
.../query_profile/test_writer_profile.groovy | 194 +++++++++++++++++++++
2 files changed, 200 insertions(+), 2 deletions(-)
diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index ada1ccba5a1..8cf815aa7c0 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -186,10 +186,14 @@ Status DeltaWriterV2::close_wait(int32_t& num_segments,
RuntimeProfile* profile)
DCHECK(_is_init)
<< "delta writer is supposed be to initialized before close_wait()
being called";
- if (profile != nullptr) {
+ if (_state->profile_level() >= 2 && profile != nullptr) {
_update_profile(profile);
}
- RETURN_IF_ERROR(_memtable_writer->close_wait(profile));
+ if (_state->profile_level() >= 2) {
+ RETURN_IF_ERROR(_memtable_writer->close_wait(profile));
+ } else {
+ RETURN_IF_ERROR(_memtable_writer->close_wait());
+ }
num_segments = _rowset_writer->next_segment_id();
_delta_written_success = true;
diff --git a/regression-test/suites/query_profile/test_writer_profile.groovy
b/regression-test/suites/query_profile/test_writer_profile.groovy
new file mode 100644
index 00000000000..f3e0f60b817
--- /dev/null
+++ b/regression-test/suites/query_profile/test_writer_profile.groovy
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import groovy.json.JsonSlurper
+
+// Fetches the query-profile listing from a frontend over HTTP.
+//
+// masterHTTPAddr: "host:httpPort" of the frontend to query.
+// Returns the raw response body of GET /rest/v1/query_profile as a String.
+// Authenticates with HTTP Basic credentials taken from context.config
+// (feHttpUser / feHttpPassword).
+def getProfileList = { masterHTTPAddr ->
+    def dst = 'http://' + masterHTTPAddr
+    def conn = new URL(dst + "/rest/v1/query_profile").openConnection()
+    conn.setRequestMethod("GET")
+    // A null password is sent as an empty string so the header stays well-formed.
+    def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
+            (context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8"))
+    conn.setRequestProperty("Authorization", "Basic ${encoding}")
+    // Decode explicitly instead of relying on the JVM default charset.
+    // NOTE(review): assumes the frontend answers in UTF-8 - confirm.
+    return conn.getInputStream().getText("UTF-8")
+}
+
+
+// Fetches the text form of a single query profile from a frontend over HTTP.
+//
+// masterHTTPAddr: "host:httpPort" of the frontend to query.
+// id:             profile/query id, passed as the query_id URL parameter.
+// Returns the raw response body of GET /api/profile/text/ as a String.
+// Authenticates with HTTP Basic credentials taken from context.config
+// (feHttpUser / feHttpPassword).
+def getProfile = { masterHTTPAddr, id ->
+    def dst = 'http://' + masterHTTPAddr
+    def conn = new URL(dst + "/api/profile/text/?query_id=$id").openConnection()
+    conn.setRequestMethod("GET")
+    // A null password is sent as an empty string so the header stays well-formed.
+    def encoding = Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" +
+            (context.config.feHttpPassword == null ? "" : context.config.feHttpPassword)).getBytes("UTF-8"))
+    conn.setRequestProperty("Authorization", "Basic ${encoding}")
+    // Decode explicitly instead of relying on the JVM default charset.
+    // NOTE(review): assumes the frontend answers in UTF-8 - confirm.
+    return conn.getInputStream().getText("UTF-8")
+}
+
+// Checks that the DeltaWriterV2 / MemTableWriter sections show up in the
+// profile of an INSERT ... SELECT FROM S3 load only when profile_level is 2:
+//   phase 1 (profile_level=1): the profile text must contain neither name;
+//   phase 2 (profile_level=2): the profile text must contain both names.
+suite('test_writer_profile', "nonConcurrent") {
+    // The suite is skipped entirely in cloud mode.
+    if (isCloudMode()) {
+        return
+    }
+
+    sql "set enable_profile=true;"
+
+    def s3Endpoint = getS3Endpoint()
+    def s3Region = getS3Region()
+    def ak = getS3AK()
+    def sk = getS3SK()
+    def s3Uri = "s3://${getS3BucketName()}/load/data by line.json"
+
+    sql "drop table if exists t;"
+    sql """
+        CREATE TABLE t(
+            a INT,
+            b INT
+        )
+        DUPLICATE KEY(a)
+        PROPERTIES("replication_num" = "1");
+    """
+
+    // The same load statement is executed in both phases.
+    def sql_str = """
+        INSERT INTO t
+        SELECT * FROM S3(
+            "uri" = "$s3Uri",
+            "s3.access_key" = "$ak",
+            "s3.secret_key" = "$sk",
+            "s3.endpoint" = "${s3Endpoint}",
+            "s3.region" = "${s3Region}",
+            "format" = "json",
+            "read_json_by_line" = "true"
+        );
+    """
+
+    // Returns "ip:httpPort" of the master frontend, read from `show frontends`.
+    def findMasterAddress = {
+        def allFrontends = sql """show frontends;"""
+        logger.info("allFrontends: " + allFrontends)
+        /*
+         Sample row:
+         [fe_2457d42b_68ad_43c4_a888_b3558a365be2, 127.0.0.1, 6917, 5937, 6937, 5927, -1,
+          FOLLOWER, true, 1523277282, true, true, 13436, 2025-01-22 16:39:05,
+          2025-01-22 21:43:49, true, , doris-0.0.0--03faad7da5, Yes]
+         Column 1 is used as the IP, column 3 as the HTTP port, column 8 as the master flag.
+        */
+        def masterIP = ""
+        def masterHTTPPort = ""
+        for (def frontend : allFrontends) {
+            if (frontend[8] == "true") {
+                masterIP = frontend[1]
+                masterHTTPPort = frontend[3]
+                break
+            }
+        }
+        // Fail here with a clear message instead of issuing a request to ":".
+        assertTrue(masterIP != "" && masterHTTPPort != "", "No master frontend found in show frontends")
+        def masterAddress = masterIP + ":" + masterHTTPPort
+        logger.info("masterIP:masterHTTPPort is:${masterAddress}")
+        return masterAddress
+    }
+
+    // Returns the rows of the frontend profile listing, or an empty list when
+    // the response carries no data/rows.
+    def fetchProfileRows = { masterAddress ->
+        def profileList = new JsonSlurper().parseText(getProfileList(masterAddress))
+        return (profileList.data && profileList.data.rows) ? profileList.data.rows : []
+    }
+
+    // Runs the load once and returns the text of the profile it produced.
+    def runInsertAndFetchProfile = {
+        def masterAddress = findMasterAddress()
+        // Remember the profiles that already exist so that an older INSERT
+        // (e.g. the one from the previous phase) is never mistaken for this one.
+        def knownIds = fetchProfileRows(masterAddress).collect { it['Profile ID'] } as Set
+
+        sql """${sql_str}"""
+
+        // Give the frontend a moment to register the profile of the finished load.
+        Thread.sleep(500)
+
+        def queryId = ""
+        for (def row : fetchProfileRows(masterAddress)) {
+            def taskType = row['Task Type']
+            def sqlStatement = row['Sql Statement']
+            if (taskType == "LOAD" && sqlStatement
+                    && sqlStatement.toString().toUpperCase().contains("INSERT")
+                    && !knownIds.contains(row['Profile ID'])) {
+                queryId = row['Profile ID']
+                break
+            }
+        }
+
+        assertTrue(queryId != null && queryId != "", "No INSERT query found in profile list")
+        def profileString = getProfile(masterAddress, queryId)
+        logger.info(profileString)
+        return profileString
+    }
+
+    // Phase 1: writer details must be absent at profile_level 1. The level is
+    // set explicitly so the phase does not depend on the session's current value.
+    sql "set profile_level=1;"
+    try {
+        def profileString = runInsertAndFetchProfile()
+        assertFalse(profileString.contains("DeltaWriterV2"), "should not contain DeltaWriterV2")
+        assertFalse(profileString.contains("MemTableWriter"), "should not contain MemTableWriter")
+    } finally {
+        sql "set enable_profile=false;"
+    }
+
+    // Phase 2: writer details must be present at profile_level 2.
+    sql "set enable_profile=true;"
+    sql "set profile_level=2;"
+    try {
+        def profileString = runInsertAndFetchProfile()
+        assertTrue(profileString.contains("DeltaWriterV2"), "should contain DeltaWriterV2")
+        assertTrue(profileString.contains("MemTableWriter"), "should contain MemTableWriter")
+    } finally {
+        // Restore the session settings touched by this suite.
+        sql "set enable_profile=false;"
+        sql "set profile_level=1;"
+    }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]