This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit ee36b2f70d51dedb69c7f4e81ab9b219b765a180
Author: morningman <morning...@163.com>
AuthorDate: Mon Apr 8 13:45:53 2024 +0800

    [branch-2.1](opt)(profile) parallel serialize fragment and add detail schedule profile #33376 #33379
---
 .../doris/common/profile/SummaryProfile.java       | 282 ++++++++++-----------
 .../org/apache/doris/common/util/BrokerUtil.java   |   3 +-
 .../main/java/org/apache/doris/qe/Coordinator.java |  80 ++++--
 .../java/org/apache/doris/qe/StmtExecutor.java     |   1 +
 .../org/apache/doris/rpc/BackendServiceProxy.java  |  17 +-
 .../ExternalFileTableValuedFunction.java           |   2 +-
 6 files changed, 217 insertions(+), 168 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index 0d07e865d02..3ca627f4d99 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -62,14 +62,18 @@ public class SummaryProfile {
     public static final String INIT_SCAN_NODE_TIME = "Init Scan Node Time";
     public static final String FINALIZE_SCAN_NODE_TIME = "Finalize Scan Node Time";
     public static final String GET_SPLITS_TIME = "Get Splits Time";
-    public static final String GET_PARTITIONS_TIME = "Get PARTITIONS Time";
-    public static final String GET_PARTITION_FILES_TIME = "Get PARTITION FILES Time";
+    public static final String GET_PARTITIONS_TIME = "Get Partitions Time";
+    public static final String GET_PARTITION_FILES_TIME = "Get Partition Files Time";
     public static final String CREATE_SCAN_RANGE_TIME = "Create Scan Range Time";
     public static final String PLAN_TIME = "Plan Time";
     public static final String SCHEDULE_TIME = "Schedule Time";
+    public static final String ASSIGN_FRAGMENT_TIME = "Fragment Assign Time";
+    public static final String FRAGMENT_SERIALIZE_TIME = "Fragment Serialize Time";
+    public static final String SEND_FRAGMENT_PHASE1_TIME = "Fragment RPC Phase1 Time";
+    public static final String SEND_FRAGMENT_PHASE2_TIME = "Fragment RPC Phase2 Time";
+    public static final String WAIT_FETCH_RESULT_TIME = "Wait and Fetch Result Time";
     public static final String FETCH_RESULT_TIME = "Fetch Result Time";
     public static final String WRITE_RESULT_TIME = "Write Result Time";
-    public static final String WAIT_FETCH_RESULT_TIME = "Wait and Fetch Result Time";
 
     public static final String PARSE_SQL_TIME = "Parse SQL Time";
     public static final String NEREIDS_ANALYSIS_TIME = "Nereids Analysis Time";
@@ -77,34 +81,75 @@ public class SummaryProfile {
     public static final String NEREIDS_OPTIMIZE_TIME = "Nereids Optimize Time";
     public static final String NEREIDS_TRANSLATE_TIME = "Nereids Translate Time";
 
+    public static final String FRAGMENT_COMPRESSED_SIZE = "Fragment Compressed Size";
+    public static final String FRAGMENT_RPC_COUNT = "Fragment RPC Count";
+
     // These info will display on FE's web ui table, every one will be displayed as
     // a column, so that should not
     // add many columns here. Add to ExcecutionSummary list.
     public static final ImmutableList<String> SUMMARY_KEYS = ImmutableList.of(PROFILE_ID, TASK_TYPE,
             START_TIME, END_TIME, TOTAL_TIME, TASK_STATE, USER, DEFAULT_DB, SQL_STATEMENT);
 
+    // The display order of execution summary items.
     public static final ImmutableList<String> EXECUTION_SUMMARY_KEYS = ImmutableList.of(
-            PARSE_SQL_TIME, NEREIDS_ANALYSIS_TIME, NEREIDS_REWRITE_TIME, NEREIDS_OPTIMIZE_TIME, NEREIDS_TRANSLATE_TIME,
-            WORKLOAD_GROUP, ANALYSIS_TIME,
-            PLAN_TIME, JOIN_REORDER_TIME, CREATE_SINGLE_NODE_TIME, QUERY_DISTRIBUTED_TIME,
-            INIT_SCAN_NODE_TIME, FINALIZE_SCAN_NODE_TIME, GET_SPLITS_TIME, GET_PARTITIONS_TIME,
-            GET_PARTITION_FILES_TIME, CREATE_SCAN_RANGE_TIME, SCHEDULE_TIME, FETCH_RESULT_TIME,
-            WRITE_RESULT_TIME, WAIT_FETCH_RESULT_TIME, DORIS_VERSION, IS_NEREIDS, IS_PIPELINE,
-            IS_CACHED, TOTAL_INSTANCES_NUM, INSTANCES_NUM_PER_BE, PARALLEL_FRAGMENT_EXEC_INSTANCE, TRACE_ID);
+            PARSE_SQL_TIME,
+            NEREIDS_ANALYSIS_TIME,
+            NEREIDS_REWRITE_TIME,
+            NEREIDS_OPTIMIZE_TIME,
+            NEREIDS_TRANSLATE_TIME,
+            WORKLOAD_GROUP,
+            ANALYSIS_TIME,
+            PLAN_TIME,
+            JOIN_REORDER_TIME,
+            CREATE_SINGLE_NODE_TIME,
+            QUERY_DISTRIBUTED_TIME,
+            INIT_SCAN_NODE_TIME,
+            FINALIZE_SCAN_NODE_TIME,
+            GET_SPLITS_TIME,
+            GET_PARTITIONS_TIME,
+            GET_PARTITION_FILES_TIME,
+            CREATE_SCAN_RANGE_TIME,
+            SCHEDULE_TIME,
+            ASSIGN_FRAGMENT_TIME,
+            FRAGMENT_SERIALIZE_TIME,
+            SEND_FRAGMENT_PHASE1_TIME,
+            SEND_FRAGMENT_PHASE2_TIME,
+            FRAGMENT_COMPRESSED_SIZE,
+            FRAGMENT_RPC_COUNT,
+            WAIT_FETCH_RESULT_TIME,
+            FETCH_RESULT_TIME,
+            WRITE_RESULT_TIME,
+            DORIS_VERSION,
+            IS_NEREIDS,
+            IS_PIPELINE,
+            IS_CACHED,
+            TOTAL_INSTANCES_NUM,
+            INSTANCES_NUM_PER_BE,
+            PARALLEL_FRAGMENT_EXEC_INSTANCE,
+            TRACE_ID);
 
     // Ident of each item. Default is 0, which doesn't need to present in this Map.
     // Please set this map for new profile items if they need ident.
-    public static ImmutableMap<String, Integer> EXECUTION_SUMMARY_KEYS_IDENTATION = ImmutableMap.of(
-            JOIN_REORDER_TIME, 1,
-            CREATE_SINGLE_NODE_TIME, 1,
-            QUERY_DISTRIBUTED_TIME, 1,
-            INIT_SCAN_NODE_TIME, 1,
-            FINALIZE_SCAN_NODE_TIME, 1,
-            GET_SPLITS_TIME, 2,
-            GET_PARTITIONS_TIME, 3,
-            GET_PARTITION_FILES_TIME, 3,
-            CREATE_SCAN_RANGE_TIME, 2
-    );
+    public static ImmutableMap<String, Integer> EXECUTION_SUMMARY_KEYS_IDENTATION
+            = ImmutableMap.<String, Integer>builder()
+            .put(JOIN_REORDER_TIME, 1)
+            .put(CREATE_SINGLE_NODE_TIME, 1)
+            .put(QUERY_DISTRIBUTED_TIME, 1)
+            .put(INIT_SCAN_NODE_TIME, 1)
+            .put(FINALIZE_SCAN_NODE_TIME, 1)
+            .put(GET_SPLITS_TIME, 2)
+            .put(GET_PARTITIONS_TIME, 3)
+            .put(GET_PARTITION_FILES_TIME, 3)
+            .put(CREATE_SCAN_RANGE_TIME, 2)
+            .put(FETCH_RESULT_TIME, 1)
+            .put(WRITE_RESULT_TIME, 1)
+            .put(ASSIGN_FRAGMENT_TIME, 1)
+            .put(FRAGMENT_SERIALIZE_TIME, 1)
+            .put(SEND_FRAGMENT_PHASE1_TIME, 1)
+            .put(SEND_FRAGMENT_PHASE2_TIME, 1)
+            .put(FRAGMENT_COMPRESSED_SIZE, 1)
+            .put(FRAGMENT_RPC_COUNT, 1)
+            .build();
 
     private RuntimeProfile summaryProfile;
     private RuntimeProfile executionSummaryProfile;
@@ -136,6 +181,12 @@ public class SummaryProfile {
     private long createScanRangeFinishTime = -1;
     // Plan end time
     private long queryPlanFinishTime = -1;
+    private long assignFragmentTime = -1;
+    private long fragmentSerializeTime = -1;
+    private long fragmentSendPhase1Time = -1;
+    private long fragmentSendPhase2Time = -1;
+    private long fragmentCompressedSize = 0;
+    private long fragmentRpcCount = 0;
     // Fragment schedule and send end time
     private long queryScheduleFinishTime = -1;
     // Query result fetch end time
@@ -207,23 +258,47 @@ public class SummaryProfile {
         executionSummaryProfile.addInfoString(NEREIDS_REWRITE_TIME, getPrettyNereidsRewriteTime());
         executionSummaryProfile.addInfoString(NEREIDS_OPTIMIZE_TIME, getPrettyNereidsOptimizeTime());
         executionSummaryProfile.addInfoString(NEREIDS_TRANSLATE_TIME, getPrettyNereidsTranslateTime());
-        executionSummaryProfile.addInfoString(ANALYSIS_TIME, getPrettyQueryAnalysisFinishTime());
-        executionSummaryProfile.addInfoString(PLAN_TIME, getPrettyQueryPlanFinishTime());
-        executionSummaryProfile.addInfoString(JOIN_REORDER_TIME, getPrettyQueryJoinReorderFinishTime());
-        executionSummaryProfile.addInfoString(CREATE_SINGLE_NODE_TIME, getPrettyCreateSingleNodeFinishTime());
-        executionSummaryProfile.addInfoString(QUERY_DISTRIBUTED_TIME, getPrettyQueryDistributedFinishTime());
-        executionSummaryProfile.addInfoString(INIT_SCAN_NODE_TIME, getPrettyInitScanNodeTime());
-        executionSummaryProfile.addInfoString(FINALIZE_SCAN_NODE_TIME, getPrettyFinalizeScanNodeTime());
-        executionSummaryProfile.addInfoString(GET_SPLITS_TIME, getPrettyGetSplitsTime());
-        executionSummaryProfile.addInfoString(GET_PARTITIONS_TIME, getPrettyGetPartitionsTime());
-        executionSummaryProfile.addInfoString(GET_PARTITION_FILES_TIME, getPrettyGetPartitionFilesTime());
-        executionSummaryProfile.addInfoString(CREATE_SCAN_RANGE_TIME, getPrettyCreateScanRangeTime());
-        executionSummaryProfile.addInfoString(SCHEDULE_TIME, getPrettyQueryScheduleFinishTime());
+        executionSummaryProfile.addInfoString(ANALYSIS_TIME,
+                getPrettyTime(queryAnalysisFinishTime, queryBeginTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(PLAN_TIME,
+                getPrettyTime(queryPlanFinishTime, queryAnalysisFinishTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(JOIN_REORDER_TIME,
+                getPrettyTime(queryJoinReorderFinishTime, queryAnalysisFinishTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(CREATE_SINGLE_NODE_TIME,
+                getPrettyTime(queryCreateSingleNodeFinishTime, queryJoinReorderFinishTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(QUERY_DISTRIBUTED_TIME,
+                getPrettyTime(queryDistributedFinishTime, queryCreateSingleNodeFinishTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(INIT_SCAN_NODE_TIME,
+                getPrettyTime(initScanNodeFinishTime, initScanNodeStartTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(FINALIZE_SCAN_NODE_TIME,
+                getPrettyTime(finalizeScanNodeFinishTime, finalizeScanNodeStartTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(GET_SPLITS_TIME,
+                getPrettyTime(getSplitsFinishTime, getSplitsStartTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(GET_PARTITIONS_TIME,
+                getPrettyTime(getPartitionsFinishTime, getSplitsStartTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(GET_PARTITION_FILES_TIME,
+                getPrettyTime(getPartitionFilesFinishTime, getPartitionsFinishTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(CREATE_SCAN_RANGE_TIME,
+                getPrettyTime(createScanRangeFinishTime, getSplitsFinishTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(SCHEDULE_TIME,
+                getPrettyTime(queryScheduleFinishTime, queryPlanFinishTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(ASSIGN_FRAGMENT_TIME,
+                getPrettyTime(assignFragmentTime, queryPlanFinishTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(FRAGMENT_SERIALIZE_TIME,
+                getPrettyTime(fragmentSerializeTime, assignFragmentTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(SEND_FRAGMENT_PHASE1_TIME,
+                getPrettyTime(fragmentSendPhase1Time, fragmentSerializeTime, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(SEND_FRAGMENT_PHASE2_TIME,
+                getPrettyTime(fragmentSendPhase2Time, fragmentSendPhase1Time, TUnit.TIME_MS));
+        executionSummaryProfile.addInfoString(FRAGMENT_COMPRESSED_SIZE,
+                RuntimeProfile.printCounter(fragmentCompressedSize, TUnit.BYTES));
+        executionSummaryProfile.addInfoString(FRAGMENT_RPC_COUNT, "" + fragmentRpcCount);
+        executionSummaryProfile.addInfoString(WAIT_FETCH_RESULT_TIME,
+                getPrettyTime(queryFetchResultFinishTime, queryScheduleFinishTime, TUnit.TIME_MS));
         executionSummaryProfile.addInfoString(FETCH_RESULT_TIME,
                 RuntimeProfile.printCounter(queryFetchResultConsumeTime, TUnit.TIME_MS));
         executionSummaryProfile.addInfoString(WRITE_RESULT_TIME,
                 RuntimeProfile.printCounter(queryWriteResultConsumeTime, TUnit.TIME_MS));
-        executionSummaryProfile.addInfoString(WAIT_FETCH_RESULT_TIME, getPrettyQueryFetchResultFinishTime());
     }
 
     public void setParseSqlStartTime(long parseSqlStartTime) {
@@ -330,8 +405,28 @@ public class SummaryProfile {
         this.queryWriteResultConsumeTime += TimeUtils.getStartTimeMs() - tempStarTime;
     }
 
-    public long getQueryBeginTime() {
-        return queryBeginTime;
+    public void setAssignFragmentTime() {
+        this.assignFragmentTime = TimeUtils.getStartTimeMs();
+    }
+
+    public void setFragmentSerializeTime() {
+        this.fragmentSerializeTime = TimeUtils.getStartTimeMs();
+    }
+
+    public void setFragmentSendPhase1Time() {
+        this.fragmentSendPhase1Time = TimeUtils.getStartTimeMs();
+    }
+
+    public void setFragmentSendPhase2Time() {
+        this.fragmentSendPhase2Time = TimeUtils.getStartTimeMs();
+    }
+
+    public void updateFragmentCompressedSize(long size) {
+        this.fragmentCompressedSize += size;
+    }
+
+    public void updateFragmentRpcCount(long count) {
+        this.fragmentRpcCount += count;
     }
 
     public static class SummaryBuilder {
@@ -433,128 +528,29 @@ public class SummaryProfile {
     }
 
     public String getPrettyParseSqlTime() {
-        if (parseSqlStartTime == -1 || parseSqlFinishTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(parseSqlFinishTime - parseSqlStartTime, TUnit.TIME_MS);
+        return getPrettyTime(parseSqlFinishTime, parseSqlStartTime, TUnit.TIME_MS);
     }
 
     public String getPrettyNereidsAnalysisTime() {
-        if (nereidsAnalysisFinishTime == -1 || queryAnalysisFinishTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(nereidsAnalysisFinishTime - queryBeginTime, TUnit.TIME_MS);
+        return getPrettyTime(nereidsAnalysisFinishTime, queryBeginTime, TUnit.TIME_MS);
     }
 
     public String getPrettyNereidsRewriteTime() {
-        if (nereidsRewriteFinishTime == -1 || nereidsAnalysisFinishTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(nereidsRewriteFinishTime - nereidsAnalysisFinishTime, TUnit.TIME_MS);
+        return getPrettyTime(nereidsRewriteFinishTime, nereidsAnalysisFinishTime, TUnit.TIME_MS);
     }
 
     public String getPrettyNereidsOptimizeTime() {
-        if (nereidsOptimizeFinishTime == -1 || nereidsRewriteFinishTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(nereidsOptimizeFinishTime - nereidsRewriteFinishTime, TUnit.TIME_MS);
+        return getPrettyTime(nereidsOptimizeFinishTime, nereidsRewriteFinishTime, TUnit.TIME_MS);
     }
 
     public String getPrettyNereidsTranslateTime() {
-        if (nereidsTranslateFinishTime == -1 || nereidsOptimizeFinishTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(nereidsTranslateFinishTime - nereidsOptimizeFinishTime, TUnit.TIME_MS);
-    }
-
-    private String getPrettyQueryAnalysisFinishTime() {
-        if (queryBeginTime == -1 || queryAnalysisFinishTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(queryAnalysisFinishTime - queryBeginTime, TUnit.TIME_MS);
-    }
-
-    private String getPrettyQueryJoinReorderFinishTime() {
-        if (queryAnalysisFinishTime == -1 || queryJoinReorderFinishTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(queryJoinReorderFinishTime - queryAnalysisFinishTime, TUnit.TIME_MS);
-    }
-
-    private String getPrettyCreateSingleNodeFinishTime() {
-        if (queryJoinReorderFinishTime == -1 || queryCreateSingleNodeFinishTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(queryCreateSingleNodeFinishTime - queryJoinReorderFinishTime, TUnit.TIME_MS);
-    }
-
-    private String getPrettyQueryDistributedFinishTime() {
-        if (queryCreateSingleNodeFinishTime == -1 || queryDistributedFinishTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(queryDistributedFinishTime - queryCreateSingleNodeFinishTime, TUnit.TIME_MS);
-    }
-
-    private String getPrettyInitScanNodeTime() {
-        if (initScanNodeStartTime == -1 || initScanNodeFinishTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(initScanNodeFinishTime - initScanNodeStartTime, TUnit.TIME_MS);
-    }
-
-    private String getPrettyFinalizeScanNodeTime() {
-        if (finalizeScanNodeFinishTime == -1 || finalizeScanNodeStartTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(finalizeScanNodeFinishTime - finalizeScanNodeStartTime, TUnit.TIME_MS);
-    }
-
-    private String getPrettyGetSplitsTime() {
-        if (getSplitsFinishTime == -1 || getSplitsStartTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(getSplitsFinishTime - getSplitsStartTime, TUnit.TIME_MS);
-    }
-
-    private String getPrettyGetPartitionsTime() {
-        if (getSplitsStartTime == -1 || getPartitionsFinishTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(getPartitionsFinishTime - getSplitsStartTime, TUnit.TIME_MS);
-    }
-
-    private String getPrettyGetPartitionFilesTime() {
-        if (getPartitionsFinishTime == -1 || getPartitionFilesFinishTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(getPartitionFilesFinishTime - getPartitionsFinishTime, TUnit.TIME_MS);
-    }
-
-    private String getPrettyCreateScanRangeTime() {
-        if (getSplitsFinishTime == -1 || createScanRangeFinishTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(createScanRangeFinishTime - getSplitsFinishTime, TUnit.TIME_MS);
-    }
-
-    private String getPrettyQueryPlanFinishTime() {
-        if (queryAnalysisFinishTime == -1 || queryPlanFinishTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(queryPlanFinishTime - queryAnalysisFinishTime, TUnit.TIME_MS);
-    }
-
-    private String getPrettyQueryScheduleFinishTime() {
-        if (queryPlanFinishTime == -1 || queryScheduleFinishTime == -1) {
-            return "N/A";
-        }
-        return RuntimeProfile.printCounter(queryScheduleFinishTime - queryPlanFinishTime, TUnit.TIME_MS);
+        return getPrettyTime(nereidsTranslateFinishTime, nereidsOptimizeFinishTime, TUnit.TIME_MS);
     }
 
-    private String getPrettyQueryFetchResultFinishTime() {
-        if (queryScheduleFinishTime == -1 || queryFetchResultFinishTime == -1) {
+    private String getPrettyTime(long end, long start, TUnit unit) {
+        if (start == -1 || end == -1) {
             return "N/A";
         }
-        return RuntimeProfile.printCounter(queryFetchResultFinishTime - queryScheduleFinishTime, TUnit.TIME_MS);
+        return RuntimeProfile.printCounter(end - start, unit);
     }
 }
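
The SummaryProfile.java hunks above collapse a dozen near-identical getPrettyXxx() methods into one getPrettyTime(end, start) helper; each summary row then just supplies the right (end, start) pair, so new phases such as fragment serialization only need a constant and two timestamps. A minimal standalone sketch of that consolidation follows; PrettyTimeSketch and formatMs are made-up names, with formatMs standing in for RuntimeProfile.printCounter(..., TUnit.TIME_MS):

    // Minimal sketch of the getPrettyTime consolidation; not Doris code.
    public class PrettyTimeSketch {
        // -1 means "this phase never recorded a timestamp".
        private long analysisFinishTime = -1;
        private long planFinishTime = -1;

        // One shared helper replaces the per-phase getPrettyXxx() methods removed above.
        private String getPrettyTime(long end, long start) {
            if (start == -1 || end == -1) {
                return "N/A";
            }
            return formatMs(end - start); // stand-in for RuntimeProfile.printCounter(.., TUnit.TIME_MS)
        }

        private String formatMs(long ms) {
            return ms + "ms";
        }

        public static void main(String[] args) {
            PrettyTimeSketch s = new PrettyTimeSketch();
            System.out.println(s.getPrettyTime(s.planFinishTime, s.analysisFinishTime)); // N/A
            s.analysisFinishTime = 100;
            s.planFinishTime = 250;
            System.out.println(s.getPrettyTime(s.planFinishTime, s.analysisFinishTime)); // 150ms
        }
    }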
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
index fef91f6f022..a4d8186d1df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
@@ -90,8 +90,7 @@ public class BrokerUtil {
                     brokerDesc.getName(), brokerDesc.getStorageType(), brokerDesc.getProperties());
             Status st = fileSystem.list(path, rfiles, false);
             if (!st.ok()) {
-                throw new UserException(brokerDesc.getName() + " list path failed. path=" + path
-                        + ",msg=" + st.getErrMsg());
+                throw new UserException(st.getErrMsg());
             }
         } catch (Exception e) {
             LOG.warn("{} list path exception, path={}", brokerDesc.getName(), 
path, e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index b00854a84ea..bb546d7cb2b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -30,6 +30,7 @@ import org.apache.doris.common.Status;
 import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.profile.ExecutionProfile;
+import org.apache.doris.common.profile.SummaryProfile;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.ListUtil;
 import org.apache.doris.common.util.RuntimeProfile;
@@ -126,11 +127,14 @@ import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.ByteString;
 import org.apache.commons.lang3.tuple.ImmutableTriple;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TCompactProtocol;
 import org.jetbrains.annotations.NotNull;
 
 import java.security.SecureRandom;
@@ -150,6 +154,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
@@ -715,6 +720,7 @@ public class Coordinator implements CoordInterface {
             LOG.info("dispatch load job: {} to {}", 
DebugUtil.printId(queryId), addressToBackendID.keySet());
         }
 
+        updateProfileIfPresent(profile -> profile.setAssignFragmentTime());
         if (enablePipelineEngine) {
             sendPipelineCtx();
         } else {
@@ -992,35 +998,44 @@ public class Coordinator implements CoordInterface {
             }
 
             // 4. send and wait fragments rpc
+            // 4.1 serialize fragment
+            // unsetFields() must be called serially.
+            beToPipelineExecCtxs.values().stream().forEach(ctxs -> ctxs.unsetFields());
+            // serializeFragments() can be called in parallel.
+            final AtomicLong compressedSize = new AtomicLong(0);
+            beToPipelineExecCtxs.values().parallelStream().forEach(ctxs -> {
+                try {
+                    compressedSize.addAndGet(ctxs.serializeFragments());
+                } catch (TException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+            updateProfileIfPresent(profile -> profile.updateFragmentCompressedSize(compressedSize.get()));
+            updateProfileIfPresent(profile -> profile.setFragmentSerializeTime());
+
+            // 4.2 send fragments rpc
             List<Triple<PipelineExecContexts, BackendServiceProxy, Future<InternalService.PExecPlanFragmentResult>>>
                     futures = Lists.newArrayList();
-
+            BackendServiceProxy proxy = BackendServiceProxy.getInstance();
             for (PipelineExecContexts ctxs : beToPipelineExecCtxs.values()) {
                 if (LOG.isDebugEnabled()) {
-                    String infos = "";
-                    for (PipelineExecContext pec : ctxs.ctxs) {
-                        infos += pec.fragmentId + " ";
-                    }
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("query {}, sending pipeline fragments: {} to 
be {} bprc address {}",
-                                DebugUtil.printId(queryId), infos, ctxs.beId, 
ctxs.brpcAddr.toString());
-                    }
+                    LOG.debug(ctxs.debugInfo());
                 }
-
-                ctxs.unsetFields();
-                BackendServiceProxy proxy = BackendServiceProxy.getInstance();
                 futures.add(ImmutableTriple.of(ctxs, proxy, ctxs.execRemoteFragmentsAsync(proxy)));
             }
             waitPipelineRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send fragments");
+            updateProfileIfPresent(profile -> profile.updateFragmentRpcCount(futures.size()));
+            updateProfileIfPresent(profile -> profile.setFragmentSendPhase1Time());
 
             if (twoPhaseExecution) {
                 // 5. send and wait execution start rpc
                 futures.clear();
                 for (PipelineExecContexts ctxs : beToPipelineExecCtxs.values()) {
-                    BackendServiceProxy proxy = BackendServiceProxy.getInstance();
                     futures.add(ImmutableTriple.of(ctxs, proxy, ctxs.execPlanFragmentStartAsync(proxy)));
                 }
                 waitPipelineRpc(futures, this.timeoutDeadline - System.currentTimeMillis(), "send execution start");
+                updateProfileIfPresent(profile -> profile.updateFragmentRpcCount(futures.size()));
+                updateProfileIfPresent(profile -> profile.setFragmentSendPhase2Time());
             }
         } finally {
             unlock();
@@ -3552,6 +3567,7 @@ public class Coordinator implements CoordInterface {
         List<PipelineExecContext> ctxs = Lists.newArrayList();
         boolean twoPhaseExecution = false;
         int instanceNumber;
+        ByteString serializedFragments = null;
 
         public PipelineExecContexts(long beId, TNetworkAddress brpcAddr, boolean twoPhaseExecution,
                 int instanceNumber) {
@@ -3585,15 +3601,10 @@ public class Coordinator implements CoordInterface {
             }
         }
 
-        public Future<InternalService.PExecPlanFragmentResult> execRemoteFragmentsAsync(BackendServiceProxy proxy)
-                throws TException {
+        public Future<InternalService.PExecPlanFragmentResult> execRemoteFragmentsAsync(BackendServiceProxy proxy) {
+            Preconditions.checkNotNull(serializedFragments);
             try {
-                TPipelineFragmentParamsList paramsList = new TPipelineFragmentParamsList();
-                for (PipelineExecContext cts : ctxs) {
-                    cts.initiated = true;
-                    paramsList.addToParamsList(cts.rpcParams);
-                }
-                return proxy.execPlanFragmentsAsync(brpcAddr, paramsList, twoPhaseExecution);
+                return proxy.execPlanFragmentsAsync(brpcAddr, serializedFragments, twoPhaseExecution);
             } catch (RpcException e) {
                 // DO NOT throw exception here, return a complete future with error code,
                 // so that the following logic will cancel the fragment.
@@ -3647,6 +3658,26 @@ public class Coordinator implements CoordInterface {
                 }
             };
         }
+
+        public long serializeFragments() throws TException {
+            TPipelineFragmentParamsList paramsList = new TPipelineFragmentParamsList();
+            for (PipelineExecContext cts : ctxs) {
+                cts.initiated = true;
+                paramsList.addToParamsList(cts.rpcParams);
+            }
+            serializedFragments = ByteString.copyFrom(
+                    new TSerializer(new TCompactProtocol.Factory()).serialize(paramsList));
+            return serializedFragments.size();
+        }
+
+        public String debugInfo() {
+            String infos = "";
+            for (PipelineExecContext pec : ctxs) {
+                infos += pec.fragmentId + " ";
+            }
+            return String.format("query %s, sending pipeline fragments: %s to 
be %s bprc address %s",
+                    DebugUtil.printId(queryId), infos, beId, 
brpcAddr.toString());
+        }
     }
 
     // execution parameters for a single fragment,
@@ -4063,5 +4094,12 @@ public class Coordinator implements CoordInterface {
             this.targetFragmentInstanceAddr = host;
         }
     }
+
+    private void updateProfileIfPresent(Consumer<SummaryProfile> profileAction) {
+        Optional.ofNullable(context)
+                .map(ConnectContext::getExecutor)
+                .map(StmtExecutor::getSummaryProfile)
+                .ifPresent(profileAction);
+    }
 }
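
To make the serialization step in sendPipelineCtx() above easier to follow, here is a minimal sketch of its shape: per-context mutation (ctxs.unsetFields()) stays serial, then the per-backend payloads are serialized in parallel and their sizes are summed atomically for the "Fragment Compressed Size" row. The real code serializes a TPipelineFragmentParamsList with new TSerializer(new TCompactProtocol.Factory()); plain UTF-8 strings and the class name below are placeholders so the sketch compiles on its own.

    // Minimal sketch of the "serialize fragments in parallel" step; not Doris code.
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicLong;

    public class ParallelSerializeSketch {
        public static void main(String[] args) {
            // One payload per backend, standing in for a PipelineExecContexts instance.
            List<String> perBackendPayloads = List.of("fragments-for-be-1", "fragments-for-be-2");

            // Step 1 (serial): per-context mutation, like ctxs.unsetFields() in the diff,
            // must not run concurrently.
            perBackendPayloads.forEach(p -> { /* mutate planner state serially */ });

            // Step 2 (parallel): serialization is independent per backend, so it runs on the
            // common fork-join pool; the summed size feeds the compressed-size profile entry.
            AtomicLong totalSize = new AtomicLong(0);
            perBackendPayloads.parallelStream()
                    .forEach(p -> totalSize.addAndGet(p.getBytes(StandardCharsets.UTF_8).length));

            System.out.println("serialized bytes: " + totalSize.get());
        }
    }

Keeping only the CPU-bound serialization inside parallelStream, as the diff's comments note, is what keeps the parallel step free of shared-state races.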
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index b07447a0e91..0a0adc17f77 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -252,6 +252,7 @@ public class StmtExecutor {
     public StmtExecutor(ConnectContext context, OriginStatement originStmt, boolean isProxy) {
         Preconditions.checkState(context.getConnectType().equals(ConnectType.MYSQL));
         this.context = context;
+        this.context.setExecutor(this);
         this.originStmt = originStmt;
         this.serializer = context.getMysqlChannel().getSerializer();
         this.isProxy = isProxy;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 5a89614bab7..eadca4b3b85 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -198,8 +198,23 @@ public class BackendServiceProxy {
         }
         // VERSION 3 means we send TPipelineFragmentParamsList
         builder.setVersion(InternalService.PFragmentRequestVersion.VERSION_3);
+        return execPlanFragmentsAsync(address, builder.build(), twoPhaseExecution);
+    }
 
-        final InternalService.PExecPlanFragmentRequest pRequest = builder.build();
+    public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentsAsync(TNetworkAddress address,
+            ByteString serializedFragments, boolean twoPhaseExecution) throws RpcException {
+        InternalService.PExecPlanFragmentRequest.Builder builder =
+                InternalService.PExecPlanFragmentRequest.newBuilder();
+        builder.setRequest(serializedFragments);
+        builder.setCompact(true);
+        // VERSION 3 means we send TPipelineFragmentParamsList
+        builder.setVersion(InternalService.PFragmentRequestVersion.VERSION_3);
+        return execPlanFragmentsAsync(address, builder.build(), twoPhaseExecution);
+    }
+
+    public Future<InternalService.PExecPlanFragmentResult> execPlanFragmentsAsync(TNetworkAddress address,
+            InternalService.PExecPlanFragmentRequest pRequest, boolean twoPhaseExecution)
+            throws RpcException {
         MetricRepo.BE_COUNTER_QUERY_RPC_ALL.getOrAdd(address.hostname).increase(1L);
         MetricRepo.BE_COUNTER_QUERY_RPC_SIZE.getOrAdd(address.hostname).increase((long) pRequest.getSerializedSize());
         try {
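
The BackendServiceProxy hunk above splits execPlanFragmentsAsync into overloads that share one sending path: the existing overload still builds the request from a TPipelineFragmentParamsList, a new one accepts fragments the Coordinator already serialized into a ByteString, and both delegate to a common overload that takes the built request. A minimal sketch of that overload-delegation shape, with placeholder types and names rather than Doris/protobuf classes:

    // Minimal sketch of the overload-delegation pattern; types and names are placeholders.
    import java.nio.charset.StandardCharsets;
    import java.util.List;

    public class ProxySketch {
        // Existing path: build the payload from structured params, then delegate.
        public String execAsync(String address, List<String> params, boolean twoPhase) {
            byte[] body = String.join(",", params).getBytes(StandardCharsets.UTF_8);
            return execAsync(address, body, twoPhase);
        }

        // New path: accept a payload the caller serialized once, outside the proxy.
        public String execAsync(String address, byte[] serialized, boolean twoPhase) {
            // Shared tail: metrics, version/compact flags, and the actual RPC would live here.
            return "sent " + serialized.length + " bytes to " + address + ", twoPhase=" + twoPhase;
        }

        public static void main(String[] args) {
            ProxySketch proxy = new ProxySketch();
            System.out.println(proxy.execAsync("be-1:8060", List.of("f1", "f2"), true));
            System.out.println(proxy.execAsync("be-1:8060", "pre-serialized".getBytes(StandardCharsets.UTF_8), true));
        }
    }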
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index 451b37d1311..260f7b2df44 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -168,7 +168,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
         try {
             BrokerUtil.parseFile(path, brokerDesc, fileStatuses);
         } catch (UserException e) {
-            throw new AnalysisException("parse file failed, path = " + path, 
e);
+            throw new AnalysisException("parse file failed, err: " + 
e.getMessage(), e);
         }
     }
 

