This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a97af1a08c1 [bugfix](profile) support multi execution profile for
brokerload (#32280)
a97af1a08c1 is described below
commit a97af1a08c11cff32b4fc0bba51a7ca1375ce386
Author: yiguolei <[email protected]>
AuthorDate: Fri Mar 15 17:41:47 2024 +0800
[bugfix](profile) support multi execution profile for brokerload (#32280)
The bug is introduced by #27184
Profile Format is :
Summary
MergedProfile
ExecutionProfile1
ExecutionProfile2
...
There maybe multiple execution profiles for broker load.
---
.../doris/common/profile/AggregatedProfile.java | 41 ----------------------
.../org/apache/doris/common/profile/Profile.java | 40 ++++++++++++---------
.../apache/doris/load/loadv2/LoadLoadingTask.java | 2 +-
.../commands/insert/AbstractInsertExecutor.java | 2 +-
.../java/org/apache/doris/qe/StmtExecutor.java | 6 ++--
5 files changed, 28 insertions(+), 63 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/AggregatedProfile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/AggregatedProfile.java
deleted file mode 100644
index f8481b21e65..00000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/AggregatedProfile.java
+++ /dev/null
@@ -1,41 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.common.profile;
-
-import org.apache.doris.common.util.RuntimeProfile;
-
-import java.util.Map;
-
-/**
-* AggregatedProfile is part of a query profile.
-* It contains the aggregated information of a query.
-*/
-public class AggregatedProfile {
-
- public static final String PROFILE_NAME = "MergedProfile";
- private ExecutionProfile executionProfile;
-
- public AggregatedProfile(RuntimeProfile rootProfile, ExecutionProfile
executionProfile) {
- this.executionProfile = executionProfile;
- }
-
- public RuntimeProfile getAggregatedFragmentsProfile(Map<Integer, String>
planNodeMap) {
- return executionProfile.getAggregatedFragmentsProfile(planNodeMap);
- }
-
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
index 19a51a1aaa6..5f3ed601630 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
@@ -21,11 +21,13 @@ import org.apache.doris.common.util.ProfileManager;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.planner.Planner;
+import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.List;
import java.util.Map;
/**
@@ -33,20 +35,23 @@ import java.util.Map;
* following structure: root profile: // summary of this profile, such as start
* time, end time, query id, etc. [SummaryProfile] // each execution profile is
* a complete execution of a query, a job may contain multiple queries.
- * [List<ExecutionProfile>]
+ * [List<ExecutionProfile>].
+ * There maybe multi execution profiles for one job, for example broker load
job.
+ * It will create one execution profile for every single load task.
*
* SummaryProfile: Summary: Execution Summary:
*
*
- * ExecutionProfile: Fragment 0: Fragment 1: ...
+ * ExecutionProfile1: Fragment 0: Fragment 1: ...
+ * ExecutionProfile2: Fragment 0: Fragment 1: ...
+ *
*/
public class Profile {
private static final Logger LOG = LogManager.getLogger(Profile.class);
private static final int MergedProfileLevel = 1;
private RuntimeProfile rootProfile;
private SummaryProfile summaryProfile;
- private AggregatedProfile aggregatedProfile;
- private ExecutionProfile executionProfile;
+ private List<ExecutionProfile> executionProfiles = Lists.newArrayList();
private boolean isFinished;
private Map<Integer, String> planNodeMap;
@@ -59,14 +64,13 @@ public class Profile {
this.isFinished = !isEnable;
}
- public void setExecutionProfile(ExecutionProfile executionProfile) {
+ public void addExecutionProfile(ExecutionProfile executionProfile) {
if (executionProfile == null) {
LOG.warn("try to set a null excecution profile, it is abnormal",
new Exception());
return;
}
- this.executionProfile = executionProfile;
- this.executionProfile.addToProfileAsChild(rootProfile);
- this.aggregatedProfile = new AggregatedProfile(rootProfile,
executionProfile);
+ this.executionProfiles.add(executionProfile);
+ executionProfile.addToProfileAsChild(rootProfile);
}
public synchronized void update(long startTime, Map<String, String>
summaryInfo, boolean isFinished,
@@ -75,12 +79,10 @@ public class Profile {
if (this.isFinished) {
return;
}
- if (executionProfile == null) {
- // Sometimes execution profile is not set
- return;
- }
summaryProfile.update(summaryInfo);
- executionProfile.update(startTime, isFinished);
+ for (ExecutionProfile executionProfile : executionProfiles) {
+ executionProfile.update(startTime, isFinished);
+ }
rootProfile.computeTimeInProfile();
// Nerids native insert not set planner, so it is null
if (planner != null) {
@@ -109,18 +111,22 @@ public class Profile {
// add summary to builder
summaryProfile.prettyPrint(builder);
LOG.info(builder.toString());
- if (this.profileLevel == MergedProfileLevel) {
+ // Only generate merged profile for select, insert into select.
+ // Not support broker load now.
+ if (this.profileLevel == MergedProfileLevel &&
this.executionProfiles.size() == 1) {
try {
builder.append("\n MergedProfile \n");
-
aggregatedProfile.getAggregatedFragmentsProfile(planNodeMap).prettyPrint(builder,
" ");
+
this.executionProfiles.get(0).getAggregatedFragmentsProfile(planNodeMap).prettyPrint(builder,
" ");
} catch (Throwable aggProfileException) {
LOG.warn("build merged simple profile failed",
aggProfileException);
builder.append("build merged simple profile failed");
}
}
try {
- builder.append("\n");
- executionProfile.getExecutionProfile().prettyPrint(builder, "");
+ for (ExecutionProfile executionProfile : executionProfiles) {
+ builder.append("\n");
+ executionProfile.getExecutionProfile().prettyPrint(builder,
"");
+ }
} catch (Throwable aggProfileException) {
LOG.warn("build profile failed", aggProfileException);
builder.append("build profile failed");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index 802c52fd144..eef73542ed4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -145,7 +145,7 @@ public class LoadLoadingTask extends LoadTask {
loadId, planner.getDescTable(),
planner.getFragments(), planner.getScanNodes(),
planner.getTimezone(), loadZeroTolerance);
if (this.jobProfile != null) {
-
this.jobProfile.setExecutionProfile(curCoordinator.getExecutionProfile());
+
this.jobProfile.addExecutionProfile(curCoordinator.getExecutionProfile());
}
curCoordinator.setQueryType(TQueryType.LOAD);
curCoordinator.setExecMemoryLimit(execMemLimit);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
index 699aa084269..77eeef22ebf 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
@@ -123,7 +123,7 @@ public abstract class AbstractInsertExecutor {
String queryId = DebugUtil.printId(ctx.queryId());
coordinator.setLoadZeroTolerance(ctx.getSessionVariable().getEnableInsertStrict());
coordinator.setQueryType(TQueryType.LOAD);
-
executor.getProfile().setExecutionProfile(coordinator.getExecutionProfile());
+
executor.getProfile().addExecutionProfile(coordinator.getExecutionProfile());
QueryInfo queryInfo = new QueryInfo(ConnectContext.get(),
executor.getOriginStmtInString(), coordinator);
QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), queryInfo);
coordinator.exec();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 05b962b89ad..d88663ad6fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1596,7 +1596,7 @@ public class StmtExecutor {
planner, context.getStatsErrorEstimator());
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
new QeProcessorImpl.QueryInfo(context,
originStmt.originStmt, coord));
- profile.setExecutionProfile(coord.getExecutionProfile());
+ profile.addExecutionProfile(coord.getExecutionProfile());
coordBase = coord;
}
@@ -2030,7 +2030,7 @@ public class StmtExecutor {
planner, context.getStatsErrorEstimator());
coord.setLoadZeroTolerance(context.getSessionVariable().getEnableInsertStrict());
coord.setQueryType(TQueryType.LOAD);
- profile.setExecutionProfile(coord.getExecutionProfile());
+ profile.addExecutionProfile(coord.getExecutionProfile());
QueryInfo queryInfo = new QueryInfo(ConnectContext.get(),
this.getOriginStmtInString(), coord);
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
queryInfo);
@@ -2869,7 +2869,7 @@ public class StmtExecutor {
RowBatch batch;
coord = EnvFactory.getInstance().createCoordinator(context,
analyzer,
planner, context.getStatsErrorEstimator());
- profile.setExecutionProfile(coord.getExecutionProfile());
+ profile.addExecutionProfile(coord.getExecutionProfile());
try {
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
new QeProcessorImpl.QueryInfo(context,
originStmt.originStmt, coord));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]