This is an automated email from the ASF dual-hosted git repository.
zhangstar333 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 9a40cba5f0b [feature](profile)Enable merging of incomplete profiles.
(#39560) (#42953)
9a40cba5f0b is described below
commit 9a40cba5f0b28b7f0334a9f1bb1ce2ac3be88cf2
Author: Mryange <[email protected]>
AuthorDate: Wed Nov 13 16:39:45 2024 +0800
[feature](profile)Enable merging of incomplete profiles. (#39560) (#42953)
## Proposed changes
https://github.com/apache/doris/pull/39560
```
java.lang.IndexOutOfBoundsException: Index 0 out of bounds for length 0
at jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64) ~[?:?]
at jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70) ~[?:?]
at jdk.internal.util.Preconditions.checkIndex(Preconditions.java:266) ~[?:?]
at java.util.Objects.checkIndex(Objects.java:359) ~[?:?]
at java.util.ArrayList.get(ArrayList.java:427) ~[?:?]
at org.apache.doris.common.profile.ExecutionProfile.getPipelineAggregatedProfile(ExecutionProfile.java:142) ~[doris-fe.jar:1.2-SNAPSHOT]
```
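Previously, profiles had to be complete before they could be merged; merging
an incomplete profile failed with the IndexOutOfBoundsException shown above.
This change allows incomplete profiles to be merged. A pipeline for which no
task profiles were collected is marked "(miss profile)" in the merged profile,
for example: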
```
- ProjectionTime: avg 0ns, max 0ns, min 0ns
- RowsProduced: sum 0, avg 0, max 0, min 0
- WaitForDependency[SORT_OPERATOR_DEPENDENCY]Time: avg 15min2sec, max 15min2sec, min 15min2sec
Pipeline : 3(miss profile):
Pipeline : 4(instance_num=48):
LOCAL_EXCHANGE_SINK_OPERATOR (PASSTHROUGH) (id=-14):
- CloseTime: avg 0ns, max 0ns, min 0ns
- ExecTime: avg 29.410us, max 43.336us, min
```
---
.../doris/common/profile/ExecutionProfile.java | 102 +++++++++++++++------
1 file changed, 74 insertions(+), 28 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
index a7a05ee12fd..7828a38e6eb 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
@@ -41,7 +41,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* root is used to collect profile of a complete query plan(including query or
load).
@@ -74,6 +74,7 @@ public class ExecutionProfile {
// use to merge profile from multi be
private Map<Integer, Map<TNetworkAddress, List<RuntimeProfile>>>
multiBeProfile = null;
+ private ReentrantReadWriteLock multiBeProfileLock = new
ReentrantReadWriteLock();
// Not serialize this property, it is only used to get profile id.
private SummaryProfile summaryProfile;
@@ -97,8 +98,7 @@ public class ExecutionProfile {
RuntimeProfile runtimeProfile = new RuntimeProfile("Fragment " +
i);
fragmentProfiles.put(fragmentId, runtimeProfile);
fragmentsProfile.addChild(runtimeProfile);
- multiBeProfile.put(fragmentId,
- new ConcurrentHashMap<TNetworkAddress,
List<RuntimeProfile>>());
+ multiBeProfile.put(fragmentId, Maps.newHashMap());
fragmentIdBeNum.put(fragmentId, 0);
seqNoToFragmentId.put(i, fragmentId);
++i;
@@ -108,24 +108,54 @@ public class ExecutionProfile {
}
private List<List<RuntimeProfile>> getMultiBeProfile(int fragmentId) {
- Map<TNetworkAddress, List<RuntimeProfile>> multiPipeline =
multiBeProfile.get(fragmentId);
- List<List<RuntimeProfile>> allPipelines = Lists.newArrayList();
- int pipelineSize = 0;
- for (List<RuntimeProfile> profiles : multiPipeline.values()) {
- pipelineSize = profiles.size();
- break;
- }
- for (int pipelineIdx = 0; pipelineIdx < pipelineSize; pipelineIdx++) {
- List<RuntimeProfile> allPipelineTask = new
ArrayList<RuntimeProfile>();
- for (List<RuntimeProfile> pipelines : multiPipeline.values()) {
- RuntimeProfile pipeline = pipelines.get(pipelineIdx);
- for (Pair<RuntimeProfile, Boolean> runtimeProfile :
pipeline.getChildList()) {
- allPipelineTask.add(runtimeProfile.first);
+ multiBeProfileLock.readLock().lock();
+ try {
+ // A fragment in the BE contains multiple pipelines, and each
pipeline contains
+ // multiple pipeline tasks.
+ Map<TNetworkAddress, List<RuntimeProfile>> multiPipeline =
multiBeProfile.get(fragmentId);
+ List<List<RuntimeProfile>> allPipelines = Lists.newArrayList();
+ int pipelineSize = -1;
+ for (TNetworkAddress beAddress : multiPipeline.keySet()) {
+ List<RuntimeProfile> profileSingleBE =
multiPipeline.get(beAddress);
+ // Check that within the same fragment across all BEs, there
should be the same
+ // number of pipelines.
+ if (pipelineSize == -1) {
+ pipelineSize = profileSingleBE.size();
+ } else {
+ if (pipelineSize != profileSingleBE.size()) {
+ LOG.warn("The profile sizes of the two BE are
different, {} vs {}", pipelineSize,
+ profileSingleBE.size());
+ pipelineSize = Math.max(pipelineSize,
profileSingleBE.size());
+ }
+ }
+ }
+ for (int pipelineIdx = 0; pipelineIdx < pipelineSize;
pipelineIdx++) {
+ List<RuntimeProfile> allPipelineTask = new
ArrayList<RuntimeProfile>();
+ for (List<RuntimeProfile> profileSingleBE :
multiPipeline.values()) {
+ RuntimeProfile pipeline = profileSingleBE.get(pipelineIdx);
+ for (Pair<RuntimeProfile, Boolean> pipelineTaskProfile :
pipeline.getChildList()) {
+ allPipelineTask.add(pipelineTaskProfile.first);
+ }
}
+ if (allPipelineTask.isEmpty()) {
+ LOG.warn("None of the BEs have pipeline task profiles in
fragmentId:{} , pipelineIdx:{}",
+ fragmentId, pipelineIdx);
+ }
+ allPipelines.add(allPipelineTask);
}
- allPipelines.add(allPipelineTask);
+ return allPipelines;
+ } finally {
+ multiBeProfileLock.readLock().unlock();
+ }
+ }
+
+ void setMultiBeProfile(int fragmentId, TNetworkAddress backendHBAddress,
List<RuntimeProfile> taskProfile) {
+ multiBeProfileLock.writeLock().lock();
+ try {
+ multiBeProfile.get(fragmentId).put(backendHBAddress, taskProfile);
+ } finally {
+ multiBeProfileLock.writeLock().unlock();
}
- return allPipelines;
}
private RuntimeProfile getPipelineAggregatedProfile(Map<Integer, String>
planNodeMap) {
@@ -136,11 +166,20 @@ public class ExecutionProfile {
List<List<RuntimeProfile>> allPipelines =
getMultiBeProfile(seqNoToFragmentId.get(i));
int pipelineIdx = 0;
for (List<RuntimeProfile> allPipelineTask : allPipelines) {
- RuntimeProfile mergedpipelineProfile = new RuntimeProfile(
- "Pipeline : " + pipelineIdx + "(instance_num="
- + allPipelineTask.size() + ")",
- allPipelineTask.get(0).nodeId());
- RuntimeProfile.mergeProfiles(allPipelineTask,
mergedpipelineProfile, planNodeMap);
+ RuntimeProfile mergedpipelineProfile = null;
+ if (allPipelineTask.isEmpty()) {
+ // It is possible that the profile collection may be
incomplete, so only part of
+ // the profile will be merged here.
+ mergedpipelineProfile = new RuntimeProfile(
+ "Pipeline : " + pipelineIdx + "(miss profile)",
+ -pipelineIdx);
+ } else {
+ mergedpipelineProfile = new RuntimeProfile(
+ "Pipeline : " + pipelineIdx + "(instance_num="
+ + allPipelineTask.size() + ")",
+ allPipelineTask.get(0).nodeId());
+ RuntimeProfile.mergeProfiles(allPipelineTask,
mergedpipelineProfile, planNodeMap);
+ }
newFragmentProfile.addChild(mergedpipelineProfile);
pipelineIdx++;
fragmentsProfile.rowsProducedMap.putAll(mergedpipelineProfile.rowsProducedMap);
@@ -208,7 +247,11 @@ public class ExecutionProfile {
pipelineIdx++;
}
RuntimeProfile profileNode = new RuntimeProfile(name);
- taskProfile.add(profileNode);
+ // The taskprofile is used to save the profile of the
pipeline, without
+ // considering the FragmentLevel.
+ if (!(pipelineProfile.isSetIsFragmentLevel() &&
pipelineProfile.is_fragment_level)) {
+ taskProfile.add(profileNode);
+ }
if (!pipelineProfile.isSetProfile()) {
LOG.warn("Profile is not set, {}",
DebugUtil.printId(profile.getQueryId()));
return new Status(TStatusCode.INVALID_ARGUMENT, "Profile
is not set");
@@ -218,7 +261,7 @@ public class ExecutionProfile {
profileNode.setIsDone(isDone);
fragmentProfiles.get(fragmentId).addChild(profileNode);
}
- multiBeProfile.get(fragmentId).put(backendHBAddress, taskProfile);
+ setMultiBeProfile(fragmentId, backendHBAddress, taskProfile);
}
if (profile.isSetLoadChannelProfiles()) {
@@ -256,7 +299,11 @@ public class ExecutionProfile {
pipelineIdx++;
}
RuntimeProfile profile = new RuntimeProfile(name);
- taskProfile.add(profile);
+ // The taskprofile is used to save the profile of the pipeline,
without
+ // considering the FragmentLevel.
+ if (!(param.isSetIsFragmentLevel() && param.is_fragment_level)) {
+ taskProfile.add(profile);
+ }
if (param.isSetProfile()) {
profile.update(param.profile);
}
@@ -271,8 +318,7 @@ public class ExecutionProfile {
if (params.isSetLoadChannelProfile()) {
loadChannelProfile.update(params.loadChannelProfile);
}
-
-
multiBeProfile.get(params.fragment_id).put(backend.getHeartbeatAddress(),
taskProfile);
+ setMultiBeProfile(params.fragment_id, backend.getHeartbeatAddress(),
taskProfile);
}
public synchronized void addFragmentBackend(PlanFragmentId fragmentId,
Long backendId) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]