This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 5a76dbf2332 [fix](profile) Fix the issues of too many writer profile 
results (#59863)
5a76dbf2332 is described below

commit 5a76dbf2332e0868caea5b1220e6386e48148093
Author: Refrain <[email protected]>
AuthorDate: Thu Jan 15 17:47:51 2026 +0800

    [fix](profile) Fix the issues of too many writer profile results (#59863)
---
 be/src/olap/delta_writer_v2.cpp                    |   8 +-
 .../query_profile/test_writer_profile.groovy       | 194 +++++++++++++++++++++
 2 files changed, 200 insertions(+), 2 deletions(-)

diff --git a/be/src/olap/delta_writer_v2.cpp b/be/src/olap/delta_writer_v2.cpp
index ada1ccba5a1..8cf815aa7c0 100644
--- a/be/src/olap/delta_writer_v2.cpp
+++ b/be/src/olap/delta_writer_v2.cpp
@@ -186,10 +186,14 @@ Status DeltaWriterV2::close_wait(int32_t& num_segments, 
RuntimeProfile* profile)
     DCHECK(_is_init)
             << "delta writer is supposed be to initialized before close_wait() 
being called";
 
-    if (profile != nullptr) {
+    if (_state->profile_level() >= 2 && profile != nullptr) {
         _update_profile(profile);
     }
-    RETURN_IF_ERROR(_memtable_writer->close_wait(profile));
+    if (_state->profile_level() >= 2) {
+        RETURN_IF_ERROR(_memtable_writer->close_wait(profile));
+    } else {
+        RETURN_IF_ERROR(_memtable_writer->close_wait());
+    }
     num_segments = _rowset_writer->next_segment_id();
 
     _delta_written_success = true;
diff --git a/regression-test/suites/query_profile/test_writer_profile.groovy 
b/regression-test/suites/query_profile/test_writer_profile.groovy
new file mode 100644
index 00000000000..f3e0f60b817
--- /dev/null
+++ b/regression-test/suites/query_profile/test_writer_profile.groovy
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import groovy.json.JsonSlurper
+
+def getProfileList = { masterHTTPAddr ->
+    def dst = 'http://' + masterHTTPAddr
+    def conn = new URL(dst + "/rest/v1/query_profile").openConnection()
+    conn.setRequestMethod("GET")
+    def encoding = 
Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" + 
+            (context.config.feHttpPassword == null ? "" : 
context.config.feHttpPassword)).getBytes("UTF-8"))
+    conn.setRequestProperty("Authorization", "Basic ${encoding}")
+    return conn.getInputStream().getText()
+}
+
+
+def getProfile = { masterHTTPAddr, id ->
+    def dst = 'http://' + masterHTTPAddr
+    def conn = new URL(dst + 
"/api/profile/text/?query_id=$id").openConnection()
+    conn.setRequestMethod("GET")
+    def encoding = 
Base64.getEncoder().encodeToString((context.config.feHttpUser + ":" + 
+            (context.config.feHttpPassword == null ? "" : 
context.config.feHttpPassword)).getBytes("UTF-8"))
+    conn.setRequestProperty("Authorization", "Basic ${encoding}")
+    return conn.getInputStream().getText()
+}
+
+suite('test_writer_profile', "nonConcurrent") {
+    // Skip this suite entirely when running in cloud mode.
+    if (isCloudMode()) {
+        return
+    }
+    
+    sql "set enable_profile=true;"
+
+    def s3Endpoint = getS3Endpoint()
+    def s3Region = getS3Region()
+    def ak = getS3AK()
+    def sk = getS3SK()
+    def s3Uri = "s3://${getS3BucketName()}/load/data by line.json"
+
+    sql "drop table if exists t;"
+    sql """
+        CREATE TABLE t(
+            a INT,
+            b INT
+        )
+        DUPLICATE KEY(a) 
+        PROPERTIES("replication_num" = "1");
+    """
+
+    try {
+        def sql_str = """
+            INSERT INTO t
+            SELECT * FROM S3(
+                "uri" = "$s3Uri",
+                "s3.access_key" = "$ak",
+                "s3.secret_key" = "$sk",
+                "s3.endpoint" = "${s3Endpoint}",
+                "s3.region" = "${s3Region}",
+                "format" = "json",
+                "read_json_by_line" = "true"
+            );
+        """
+        sql """${sql_str}"""
+
+        Thread.sleep(500)
+
+        def allFrontends = sql """show frontends;"""
+        logger.info("allFrontends: " + allFrontends)
+        /* Sample result; the loop below reads index 1 as the IP, index 3 as the HTTP port and index 8 as the is-master flag.
+        - allFrontends: [[fe_2457d42b_68ad_43c4_a888_b3558a365be2, 127.0.0.1, 
6917, 5937, 6937, 5927, -1, FOLLOWER, true, 1523277282, true, true, 13436, 
2025-01-22 16:39:05, 2025-01-22 21:43:49, true, , doris-0.0.0--03faad7da5, Yes]]
+        */
+        def frontendCounts = allFrontends.size()
+        def masterIP = ""
+        def masterHTTPPort = ""
+
+        for (def i = 0; i < frontendCounts; i++) {
+            def currentFrontend = allFrontends[i]
+            def isMaster = currentFrontend[8]
+            if (isMaster == "true") {
+                masterIP = allFrontends[i][1]
+                masterHTTPPort = allFrontends[i][3]
+                break
+            }
+        }
+        def masterAddress = masterIP + ":" + masterHTTPPort
+        logger.info("masterIP:masterHTTPPort is:${masterAddress}")
+
+        def profileListString = getProfileList(masterAddress)
+        def jsonSlurper = new JsonSlurper()
+        def profileList = jsonSlurper.parseText(profileListString)
+        
+        def queryId = ""
+        if (profileList.data && profileList.data.rows && 
profileList.data.rows.size() > 0) {
+            for (def row : profileList.data.rows) {
+                def taskType = row."Task Type" ?: row['Task Type']
+                def sqlStatement = row."Sql Statement" ?: row['Sql Statement']
+                if (taskType == "LOAD" && sqlStatement && 
sqlStatement.toString().toUpperCase().contains("INSERT")) {
+                    queryId = row."Profile ID" ?: row['Profile ID']
+                    break
+                }
+            }
+        }
+
+        assertTrue(queryId != null && queryId != "", "No INSERT query found in 
profile list")
+        def profileString = getProfile(masterAddress, queryId)
+        logger.info(profileString)
+        assertFalse(profileString.contains("DeltaWriterV2"), "should not 
contain DeltaWriterV2")
+        assertFalse(profileString.contains("MemTableWriter"), "should not 
contain MemTableWriter")
+    } finally {
+        sql "set enable_profile=false;"   
+    }
+
+
+    sql "set enable_profile=true;"   
+    sql "set profile_level=2;"
+
+    try {
+        def sql_str = """
+            INSERT INTO t
+            SELECT * FROM S3(
+                "uri" = "$s3Uri",
+                "s3.access_key" = "$ak",
+                "s3.secret_key" = "$sk",
+                "s3.endpoint" = "${s3Endpoint}",
+                "s3.region" = "${s3Region}",
+                "format" = "json",
+                "read_json_by_line" = "true"
+            );
+        """
+        sql """${sql_str}"""
+
+        Thread.sleep(500)
+
+        def allFrontends = sql """show frontends;"""
+        logger.info("allFrontends: " + allFrontends)
+        /* Sample result; the loop below reads index 1 as the IP, index 3 as the HTTP port and index 8 as the is-master flag.
+        - allFrontends: [[fe_2457d42b_68ad_43c4_a888_b3558a365be2, 127.0.0.1, 
6917, 5937, 6937, 5927, -1, FOLLOWER, true, 1523277282, true, true, 13436, 
2025-01-22 16:39:05, 2025-01-22 21:43:49, true, , doris-0.0.0--03faad7da5, Yes]]
+        */
+        def frontendCounts = allFrontends.size()
+        def masterIP = ""
+        def masterHTTPPort = ""
+
+        for (def i = 0; i < frontendCounts; i++) {
+            def currentFrontend = allFrontends[i]
+            def isMaster = currentFrontend[8]
+            if (isMaster == "true") {
+                masterIP = allFrontends[i][1]
+                masterHTTPPort = allFrontends[i][3]
+                break
+            }
+        }
+        def masterAddress = masterIP + ":" + masterHTTPPort
+        logger.info("masterIP:masterHTTPPort is:${masterAddress}")
+
+        def profileListString = getProfileList(masterAddress)
+        def jsonSlurper = new JsonSlurper()
+        def profileList = jsonSlurper.parseText(profileListString)
+        
+        def queryId = ""
+        if (profileList.data && profileList.data.rows && 
profileList.data.rows.size() > 0) {
+            for (def row : profileList.data.rows) {
+                def taskType = row."Task Type" ?: row['Task Type']
+                def sqlStatement = row."Sql Statement" ?: row['Sql Statement']
+                if (taskType == "LOAD" && sqlStatement && 
sqlStatement.toString().toUpperCase().contains("INSERT")) {
+                    queryId = row."Profile ID" ?: row['Profile ID']
+                    break
+                }
+            }
+        }
+        assertTrue(queryId != null && queryId != "", "No INSERT query found in 
profile list")
+        def profileString = getProfile(masterAddress, queryId)
+        logger.info(profileString)
+        assertTrue(profileString.contains("DeltaWriterV2"), "should contain 
DeltaWriterV2")
+        assertTrue(profileString.contains("MemTableWriter"), "should contain 
MemTableWriter")
+    } finally {
+        sql "set enable_profile=false;"
+        sql "set profile_level=1;"
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to