This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch 2.1-tmp
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/2.1-tmp by this push:
new 16f8afc4083 [refactor](coordinator) split profile logic and instance
report logic (#32010)
16f8afc4083 is described below
commit 16f8afc4083d84f3ce4a4317e2a558b907ff535d
Author: yiguolei <[email protected]>
AuthorDate: Wed Apr 3 18:48:19 2024 +0800
[refactor](coordinator) split profile logic and instance report logic
(#32010)
Co-authored-by: yiguolei <[email protected]>
---
.../main/java/org/apache/doris/common/Config.java | 1 +
.../main/java/org/apache/doris/common/Status.java | 5 +
.../doris/common/profile/ExecutionProfile.java | 309 +++++------
.../org/apache/doris/common/profile/Profile.java | 80 ++-
.../doris/common/profile/SummaryProfile.java | 16 +-
.../apache/doris/common/util/ProfileManager.java | 82 ++-
.../apache/doris/common/util/RuntimeProfile.java | 34 +-
.../apache/doris/load/loadv2/BrokerLoadJob.java | 21 +-
.../apache/doris/load/loadv2/LoadLoadingTask.java | 16 +-
.../nereids/trees/plans/commands/LoadCommand.java | 4 +-
.../java/org/apache/doris/qe/CoordInterface.java | 2 -
.../main/java/org/apache/doris/qe/Coordinator.java | 603 +++++++++++----------
.../java/org/apache/doris/qe/PointQueryExec.java | 6 -
.../main/java/org/apache/doris/qe/QeProcessor.java | 2 -
.../java/org/apache/doris/qe/QeProcessorImpl.java | 79 ++-
.../java/org/apache/doris/qe/SessionVariable.java | 9 +
.../java/org/apache/doris/qe/StmtExecutor.java | 51 +-
.../org/apache/doris/rpc/BackendServiceClient.java | 3 +-
.../org/apache/doris/rpc/BackendServiceProxy.java | 7 +-
.../doris/common/util/RuntimeProfileTest.java | 2 +-
.../java/org/apache/doris/qe/StmtExecutorTest.java | 4 +-
21 files changed, 715 insertions(+), 621 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 5a6bdf1b6d5..a727c23bb74 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2520,6 +2520,7 @@ public class Config extends ConfigBase {
"Whether to enable proxy protocol"
})
public static boolean enable_proxy_protocol = false;
+ public static int profile_async_collect_expire_time_secs = 5;
//==========================================================================
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Status.java
b/fe/fe-core/src/main/java/org/apache/doris/common/Status.java
index 1961f9b8cc5..555a82751ee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Status.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Status.java
@@ -128,4 +128,9 @@ public class Status {
}
}
}
+
+ @Override
+ public String toString() {
+ return "Status [errorCode=" + errorCode + ", errorMsg=" + errorMsg +
"]";
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
index 24bd2355c56..f339da82924 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
@@ -17,33 +17,31 @@
package org.apache.doris.common.profile;
-import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.Pair;
-import org.apache.doris.common.Status;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.PlanFragmentId;
+import org.apache.doris.thrift.TDetailedReportParams;
import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TReportExecStatusParams;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.thrift.TUnit;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
- * ExecutionProfile is used to collect profile of a complete query
plan(including query or load).
+ * root is used to collect the profile of a complete query plan (including query or
load).
* Need to call addToProfileAsChild() to add it to the root profile.
* It has the following structure:
* Execution Profile:
@@ -59,53 +57,62 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ExecutionProfile {
private static final Logger LOG =
LogManager.getLogger(ExecutionProfile.class);
+ private final TUniqueId queryId;
+ private boolean isFinished = false;
+ private long startTime = 0L;
+ private long queryFinishTime = 0L;
// The root profile of this execution task
- private RuntimeProfile executionProfile;
+ private RuntimeProfile root;
// Profiles for each fragment. And the InstanceProfile is the child of
fragment profile.
// Which will be added to fragment profile when calling
Coordinator::sendFragment()
- private List<RuntimeProfile> fragmentProfiles;
+ // Could not use array list because fragment id is not continuous, planner
may cut fragment
+ // during planning.
+ private Map<Integer, RuntimeProfile> fragmentProfiles;
// Profile for load channels. Only for load job.
private RuntimeProfile loadChannelProfile;
- // A countdown latch to mark the completion of each instance.
- // use for old pipeline
- // instance id -> dummy value
- private MarkedCountDownLatch<TUniqueId, Long> profileDoneSignal;
+ // FragmentId -> InstanceId -> RuntimeProfile
+ private Map<PlanFragmentId, Map<TUniqueId, RuntimeProfile>>
fragmentInstancesProfiles;
+ private boolean isPipelineXProfile = false;
- // A countdown latch to mark the completion of each fragment. use for
pipelineX
- // fragmentId -> dummy value
- private MarkedCountDownLatch<Integer, Long> profileFragmentDoneSignal;
-
- // fragmentId -> The number of BE without 'done.
- private Map<Integer, Integer> befragmentDone;
+ // use to merge profile from multi be
+ private Map<Integer, Map<TNetworkAddress, List<RuntimeProfile>>>
multiBeProfile = null;
- // lock befragmentDone
- private ReadWriteLock lock;
+ // Do not serialize this property; it is only used to get the profile id.
+ private SummaryProfile summaryProfile;
- // use to merge profile from multi be
- private List<Map<TNetworkAddress, List<RuntimeProfile>>> multiBeProfile =
null;
+ // BE only has the instance id, not the fragment id, so we should use this
map to find the fragment id.
+ private Map<TUniqueId, PlanFragmentId> instanceIdToFragmentId;
+ private Map<Integer, Integer> fragmentIdBeNum;
+ private Map<Integer, Integer> seqNoToFragmentId;
- public ExecutionProfile(TUniqueId queryId, int fragmentNum) {
- executionProfile = new RuntimeProfile("Execution Profile " +
DebugUtil.printId(queryId));
+ public ExecutionProfile(TUniqueId queryId, List<PlanFragment> fragments) {
+ this.queryId = queryId;
+ root = new RuntimeProfile("Execution Profile " +
DebugUtil.printId(queryId));
RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
- executionProfile.addChild(fragmentsProfile);
- fragmentProfiles = Lists.newArrayList();
- multiBeProfile = Lists.newArrayList();
- for (int i = 0; i < fragmentNum; i++) {
- fragmentProfiles.add(new RuntimeProfile("Fragment " + i));
- fragmentsProfile.addChild(fragmentProfiles.get(i));
- multiBeProfile.add(new ConcurrentHashMap<TNetworkAddress,
List<RuntimeProfile>>());
+ root.addChild(fragmentsProfile);
+ fragmentProfiles = Maps.newHashMap();
+ multiBeProfile = Maps.newHashMap();
+ fragmentIdBeNum = Maps.newHashMap();
+ seqNoToFragmentId = Maps.newHashMap();
+ int i = 0;
+ for (PlanFragment planFragment : fragments) {
+ RuntimeProfile runtimeProfile = new RuntimeProfile("Fragment " +
i);
+ fragmentProfiles.put(planFragment.getFragmentId().asInt(),
runtimeProfile);
+ fragmentsProfile.addChild(runtimeProfile);
+ multiBeProfile.put(planFragment.getFragmentId().asInt(),
+ new ConcurrentHashMap<TNetworkAddress,
List<RuntimeProfile>>());
+ fragmentIdBeNum.put(planFragment.getFragmentId().asInt(), 0);
+ seqNoToFragmentId.put(i, planFragment.getFragmentId().asInt());
+ ++i;
}
loadChannelProfile = new RuntimeProfile("LoadChannels");
- executionProfile.addChild(loadChannelProfile);
+ root.addChild(loadChannelProfile);
+ fragmentInstancesProfiles = Maps.newHashMap();
+ instanceIdToFragmentId = Maps.newHashMap();
}
- public void addMultiBeProfileByPipelineX(int profileFragmentId,
TNetworkAddress address,
- List<RuntimeProfile> taskProfile) {
- multiBeProfile.get(profileFragmentId).put(address, taskProfile);
- }
-
- private List<List<RuntimeProfile>> getMultiBeProfile(int
profileFragmentId) {
- Map<TNetworkAddress, List<RuntimeProfile>> multiPipeline =
multiBeProfile.get(profileFragmentId);
+ private List<List<RuntimeProfile>> getMultiBeProfile(int fragmentId) {
+ Map<TNetworkAddress, List<RuntimeProfile>> multiPipeline =
multiBeProfile.get(fragmentId);
List<List<RuntimeProfile>> allPipelines = Lists.newArrayList();
int pipelineSize = 0;
for (List<RuntimeProfile> profiles : multiPipeline.values()) {
@@ -130,7 +137,7 @@ public class ExecutionProfile {
for (int i = 0; i < fragmentProfiles.size(); ++i) {
RuntimeProfile newFragmentProfile = new RuntimeProfile("Fragment "
+ i);
fragmentsProfile.addChild(newFragmentProfile);
- List<List<RuntimeProfile>> allPipelines = getMultiBeProfile(i);
+ List<List<RuntimeProfile>> allPipelines =
getMultiBeProfile(seqNoToFragmentId.get(i));
int pipelineIdx = 0;
for (List<RuntimeProfile> allPipelineTask : allPipelines) {
RuntimeProfile mergedpipelineProfile = new RuntimeProfile(
@@ -148,7 +155,7 @@ public class ExecutionProfile {
private RuntimeProfile getNonPipelineXAggregatedProfile(Map<Integer,
String> planNodeMap) {
RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
for (int i = 0; i < fragmentProfiles.size(); ++i) {
- RuntimeProfile oldFragmentProfile = fragmentProfiles.get(i);
+ RuntimeProfile oldFragmentProfile =
fragmentProfiles.get(seqNoToFragmentId.get(i));
RuntimeProfile newFragmentProfile = new RuntimeProfile("Fragment "
+ i);
fragmentsProfile.addChild(newFragmentProfile);
List<RuntimeProfile> allInstanceProfiles = new
ArrayList<RuntimeProfile>();
@@ -164,7 +171,7 @@ public class ExecutionProfile {
}
public RuntimeProfile getAggregatedFragmentsProfile(Map<Integer, String>
planNodeMap) {
- if (enablePipelineX()) {
+ if (isPipelineXProfile) {
/*
* Fragment 0
* ---Pipeline 0
@@ -211,143 +218,143 @@ public class ExecutionProfile {
}
}
- public RuntimeProfile getExecutionProfile() {
- return executionProfile;
- }
-
- public RuntimeProfile getLoadChannelProfile() {
- return loadChannelProfile;
- }
-
- public List<RuntimeProfile> getFragmentProfiles() {
- return fragmentProfiles;
+ public RuntimeProfile getRoot() {
+ return root;
}
- public void addToProfileAsChild(RuntimeProfile rootProfile) {
- rootProfile.addChild(executionProfile);
+ public void setPipelineX() {
+ this.isPipelineXProfile = true;
}
- public void markInstances(Set<TUniqueId> instanceIds) {
- profileDoneSignal = new MarkedCountDownLatch<>(instanceIds.size());
- for (TUniqueId instanceId : instanceIds) {
- profileDoneSignal.addMark(instanceId, -1L /* value is meaningless
*/);
+ // The execution profile is maintained in ProfileManager; if it is
finished, then it should
+ // be removed from the manager as soon as possible.
+ public void update(long startTime, boolean isFinished) {
+ if (this.isFinished) {
+ return;
}
- }
-
- private boolean enablePipelineX() {
- return profileFragmentDoneSignal != null;
- }
-
- public void markFragments(int fragments) {
- profileFragmentDoneSignal = new MarkedCountDownLatch<>(fragments);
- lock = new ReentrantReadWriteLock();
- befragmentDone = new HashMap<>();
- for (int fragmentId = 0; fragmentId < fragments; fragmentId++) {
- profileFragmentDoneSignal.addMark(fragmentId, -1L /* value is
meaningless */);
- befragmentDone.put(fragmentId, 0);
+ this.isFinished = isFinished;
+ this.startTime = startTime;
+ if (startTime > 0) {
+ root.getCounterTotalTime().setValue(TUnit.TIME_MS,
TimeUtils.getElapsedTimeMs(startTime));
}
- }
- public void addFragments(int fragmentId) {
- lock.writeLock().lock();
- try {
- befragmentDone.put(fragmentId, befragmentDone.get(fragmentId) + 1);
- } finally {
- lock.writeLock().unlock();
+ for (RuntimeProfile fragmentProfile : fragmentProfiles.values()) {
+ fragmentProfile.sortChildren();
}
}
- public void update(long startTime, boolean isFinished) {
- if (startTime > 0) {
- executionProfile.getCounterTotalTime().setValue(TUnit.TIME_MS,
TimeUtils.getElapsedTimeMs(startTime));
- }
- // Wait for all backends to finish reporting when writing profile last
time.
- if (isFinished && profileDoneSignal != null) {
- try {
- profileDoneSignal.await(2, TimeUnit.SECONDS);
- } catch (InterruptedException e1) {
- LOG.warn("signal await error", e1);
+ public void updateProfile(TReportExecStatusParams params, TNetworkAddress
address) {
+ if (isPipelineXProfile) {
+ int pipelineIdx = 0;
+ List<RuntimeProfile> taskProfile = Lists.newArrayList();
+ for (TDetailedReportParams param : params.detailed_report) {
+ String name = "Pipeline :" + pipelineIdx + " "
+ + " (host=" + address + ")";
+ RuntimeProfile profile = new RuntimeProfile(name);
+ taskProfile.add(profile);
+ if (param.isSetProfile()) {
+ profile.update(param.profile);
+ }
+ if (params.done) {
+ profile.setIsDone(true);
+ }
+ pipelineIdx++;
+ fragmentProfiles.get(params.fragment_id).addChild(profile);
}
- }
-
- if (isFinished && profileFragmentDoneSignal != null) {
- try {
- profileFragmentDoneSignal.await(2, TimeUnit.SECONDS);
- } catch (InterruptedException e1) {
- LOG.warn("signal await error", e1);
+ // TODO ygl: is this right? There may be multiple Backends; what does
+ // updating the load profile do???
+ if (params.isSetLoadChannelProfile()) {
+ loadChannelProfile.update(params.loadChannelProfile);
+ }
+ multiBeProfile.get(params.fragment_id).put(address, taskProfile);
+ } else {
+ PlanFragmentId fragmentId =
instanceIdToFragmentId.get(params.fragment_instance_id);
+ if (fragmentId == null) {
+ LOG.warn("Could not find related fragment for instance {}",
+ DebugUtil.printId(params.fragment_instance_id));
+ return;
+ }
+ // Do not use fragment id in params, because non-pipeline engine
will set it to -1
+ Map<TUniqueId, RuntimeProfile> instanceProfiles =
fragmentInstancesProfiles.get(fragmentId);
+ if (instanceProfiles == null) {
+ LOG.warn("Could not find related instances for fragment {}",
fragmentId);
+ return;
+ }
+ RuntimeProfile instanceProfile =
instanceProfiles.get(params.fragment_instance_id);
+ if (instanceProfile == null) {
+ LOG.warn("Could not find related instance {}",
params.fragment_instance_id);
+ return;
+ }
+ if (params.isSetProfile()) {
+ instanceProfile.update(params.profile);
+ }
+ if (params.isSetDone() && params.isDone()) {
+ instanceProfile.setIsDone(true);
+ }
+ if (params.isSetLoadChannelProfile()) {
+ loadChannelProfile.update(params.loadChannelProfile);
}
- }
-
- for (RuntimeProfile fragmentProfile : fragmentProfiles) {
- fragmentProfile.sortChildren();
}
}
- public void onCancel() {
- if (profileDoneSignal != null) {
- // count down to zero to notify all objects waiting for this
- profileDoneSignal.countDownToZero(new Status());
+ // MultiInstances may update the profile concurrently
+ public synchronized void addInstanceProfile(PlanFragmentId fragmentId,
TUniqueId instanceId,
+ RuntimeProfile instanceProfile) {
+ Map<TUniqueId, RuntimeProfile> instanceProfiles =
fragmentInstancesProfiles.get(fragmentId);
+ if (instanceProfiles == null) {
+ instanceProfiles = Maps.newHashMap();
+ fragmentInstancesProfiles.put(fragmentId, instanceProfiles);
}
- if (profileFragmentDoneSignal != null) {
- profileFragmentDoneSignal.countDownToZero(new Status());
+ RuntimeProfile existingInstanceProfile =
instanceProfiles.get(instanceId);
+ if (existingInstanceProfile == null) {
+ instanceProfiles.put(instanceId, instanceProfile);
+ instanceIdToFragmentId.put(instanceId, fragmentId);
+ fragmentProfiles.get(fragmentId.asInt()).addChild(instanceProfile);
+ return;
}
}
- public void markOneInstanceDone(TUniqueId fragmentInstanceId) {
- if (profileDoneSignal != null) {
- if (!profileDoneSignal.markedCountDown(fragmentInstanceId, -1L)) {
- LOG.warn("Mark instance {} done failed",
DebugUtil.printId(fragmentInstanceId));
- }
- }
+ public synchronized void addFragmentBackend(PlanFragmentId fragmentId,
Long backendId) {
+ fragmentIdBeNum.put(fragmentId.asInt(),
fragmentIdBeNum.get(fragmentId.asInt()) + 1);
}
- public void markOneFragmentDone(int fragmentId) {
- if (profileFragmentDoneSignal != null) {
- lock.writeLock().lock();
- try {
- befragmentDone.put(fragmentId, befragmentDone.get(fragmentId)
- 1);
- if (befragmentDone.get(fragmentId) == 0) {
- if (!profileFragmentDoneSignal.markedCountDown(fragmentId,
-1L)) {
- LOG.warn("Mark fragment {} done failed", fragmentId);
- }
- }
- } finally {
- lock.writeLock().unlock();
- }
- }
+ public TUniqueId getQueryId() {
+ return queryId;
}
- public boolean awaitAllInstancesDone(long waitTimeS) throws
InterruptedException {
- if (profileDoneSignal == null) {
- return true;
+ // Check each fragment's children; if all are finished, then this execution
profile is finished
+ public boolean isCompleted() {
+ for (Entry<Integer, RuntimeProfile> element :
fragmentProfiles.entrySet()) {
+ RuntimeProfile fragmentProfile = element.getValue();
+ // If any fragment is empty, it means BE does not report the
profile, then the total
+ // execution profile is not completed.
+ if (fragmentProfile.isEmpty()
+ || fragmentProfile.getChildList().size() <
fragmentIdBeNum.get(element.getKey())) {
+ return false;
+ }
+ for (Pair<RuntimeProfile, Boolean> runtimeProfile :
fragmentProfile.getChildList()) {
+ // If any child instance profile is not ready, then return
false.
+ if (!(runtimeProfile.first.getIsDone() ||
runtimeProfile.first.getIsCancel())) {
+ return false;
+ }
+ }
}
- return profileDoneSignal.await(waitTimeS, TimeUnit.SECONDS);
+ return true;
}
- public boolean awaitAllFragmentsDone(long waitTimeS) throws
InterruptedException {
- if (profileFragmentDoneSignal == null) {
- return true;
- }
- return profileFragmentDoneSignal.await(waitTimeS, TimeUnit.SECONDS);
+ public long getQueryFinishTime() {
+ return queryFinishTime;
}
- public boolean isAllInstancesDone() {
- if (profileDoneSignal == null) {
- return true;
- }
- return profileDoneSignal.getCount() == 0;
+ public void setQueryFinishTime(long queryFinishTime) {
+ this.queryFinishTime = queryFinishTime;
}
- public boolean isAllFragmentsDone() {
- if (profileFragmentDoneSignal == null) {
- return true;
- }
- return profileFragmentDoneSignal.getCount() == 0;
+ public SummaryProfile getSummaryProfile() {
+ return summaryProfile;
}
- public void addInstanceProfile(int fragmentId, RuntimeProfile
instanceProfile) {
- Preconditions.checkArgument(fragmentId < fragmentProfiles.size(),
- fragmentId + " vs. " + fragmentProfiles.size());
- fragmentProfiles.get(fragmentId).addChild(instanceProfile);
+ public void setSummaryProfile(SummaryProfile summaryProfile) {
+ this.summaryProfile = summaryProfile;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
index 5f3ed601630..12ba687bfd1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
@@ -45,11 +45,16 @@ import java.util.Map;
* ExecutionProfile1: Fragment 0: Fragment 1: ...
* ExecutionProfile2: Fragment 0: Fragment 1: ...
*
+ * ExecutionProfile: Fragment 0: Fragment 1: ...
+ * And also summary profile contains plan information, but execution profile
is for
+ * be execution time.
+ * StmtExecutor(Profile) ---> Coordinator(ExecutionProfile)
*/
public class Profile {
private static final Logger LOG = LogManager.getLogger(Profile.class);
private static final int MergedProfileLevel = 1;
- private RuntimeProfile rootProfile;
+ private final String name;
+ private final boolean isPipelineX;
private SummaryProfile summaryProfile;
private List<ExecutionProfile> executionProfiles = Lists.newArrayList();
private boolean isFinished;
@@ -57,51 +62,57 @@ public class Profile {
private int profileLevel = 3;
- public Profile(String name, boolean isEnable) {
- this.rootProfile = new RuntimeProfile(name);
- this.summaryProfile = new SummaryProfile(rootProfile);
+ public Profile(String name, boolean isEnable, int profileLevel, boolean
isPipelineX) {
+ this.name = name;
+ this.isPipelineX = isPipelineX;
+ this.summaryProfile = new SummaryProfile();
// if disabled, just set isFinished to true, so that update() will do
nothing
this.isFinished = !isEnable;
+ this.profileLevel = profileLevel;
}
+ // For load task, the profile contains many execution profiles
public void addExecutionProfile(ExecutionProfile executionProfile) {
if (executionProfile == null) {
LOG.warn("try to set a null excecution profile, it is abnormal",
new Exception());
return;
}
+ if (this.isPipelineX) {
+ executionProfile.setPipelineX();
+ }
+ executionProfile.setSummaryProfile(summaryProfile);
this.executionProfiles.add(executionProfile);
- executionProfile.addToProfileAsChild(rootProfile);
}
- public synchronized void update(long startTime, Map<String, String>
summaryInfo, boolean isFinished,
- int profileLevel, Planner planner, boolean isPipelineX) {
+ public List<ExecutionProfile> getExecutionProfiles() {
+ return this.executionProfiles;
+ }
+
+ // This API will also add the profile to ProfileManager, so that we could
get the profile from ProfileManager.
+ // isFinished ONLY means the coordinator or stmtexecutor is finished.
+ public synchronized void updateSummary(long startTime, Map<String, String>
summaryInfo, boolean isFinished,
+ Planner planner) {
try {
if (this.isFinished) {
return;
}
summaryProfile.update(summaryInfo);
for (ExecutionProfile executionProfile : executionProfiles) {
+ // Tell execution profile the start time
executionProfile.update(startTime, isFinished);
}
- rootProfile.computeTimeInProfile();
// Nerids native insert not set planner, so it is null
if (planner != null) {
this.planNodeMap = planner.getExplainStringMap();
}
- rootProfile.setIsPipelineX(isPipelineX);
ProfileManager.getInstance().pushProfile(this);
this.isFinished = isFinished;
- this.profileLevel = profileLevel;
} catch (Throwable t) {
LOG.warn("update profile failed", t);
throw t;
}
}
- public RuntimeProfile getRootProfile() {
- return this.rootProfile;
- }
-
public SummaryProfile getSummaryProfile() {
return summaryProfile;
}
@@ -110,7 +121,7 @@ public class Profile {
StringBuilder builder = new StringBuilder();
// add summary to builder
summaryProfile.prettyPrint(builder);
- LOG.info(builder.toString());
+ waitProfileCompleteIfNeeded();
// Only generate merged profile for select, insert into select.
// Not support broker load now.
if (this.profileLevel == MergedProfileLevel &&
this.executionProfiles.size() == 1) {
@@ -125,7 +136,7 @@ public class Profile {
try {
for (ExecutionProfile executionProfile : executionProfiles) {
builder.append("\n");
- executionProfile.getExecutionProfile().prettyPrint(builder,
"");
+ executionProfile.getRoot().prettyPrint(builder, "");
}
} catch (Throwable aggProfileException) {
LOG.warn("build profile failed", aggProfileException);
@@ -134,7 +145,44 @@ public class Profile {
return builder.toString();
}
+ // If the query is already finished, and user wants to get the profile, we
should check
+ // if BE has reported all profiles, if not, sleep 2s.
+ private void waitProfileCompleteIfNeeded() {
+ if (!this.isFinished) {
+ return;
+ }
+ boolean allCompleted = true;
+ for (ExecutionProfile executionProfile : executionProfiles) {
+ if (!executionProfile.isCompleted()) {
+ allCompleted = false;
+ break;
+ }
+ }
+ if (!allCompleted) {
+ try {
+ Thread.currentThread().sleep(2000);
+ } catch (InterruptedException e) {
+ // Do nothing
+ }
+ }
+ }
+
+ private RuntimeProfile composeRootProfile() {
+
+ RuntimeProfile rootProfile = new RuntimeProfile(name);
+ rootProfile.setIsPipelineX(isPipelineX);
+ rootProfile.addChild(summaryProfile.getSummary());
+ rootProfile.addChild(summaryProfile.getExecutionSummary());
+ for (ExecutionProfile executionProfile : executionProfiles) {
+ rootProfile.addChild(executionProfile.getRoot());
+ }
+ rootProfile.computeTimeInProfile();
+ return rootProfile;
+ }
+
public String getProfileBrief() {
+ waitProfileCompleteIfNeeded();
+ RuntimeProfile rootProfile = composeRootProfile();
Gson gson = new GsonBuilder().setPrettyPrinting().create();
return gson.toJson(rootProfile.toBrief());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index e9389b48b99..0d07e865d02 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -144,12 +144,22 @@ public class SummaryProfile {
private long queryFetchResultConsumeTime = 0;
private long queryWriteResultConsumeTime = 0;
- public SummaryProfile(RuntimeProfile rootProfile) {
+ public SummaryProfile() {
summaryProfile = new RuntimeProfile(SUMMARY_PROFILE_NAME);
executionSummaryProfile = new
RuntimeProfile(EXECUTION_SUMMARY_PROFILE_NAME);
init();
- rootProfile.addChild(summaryProfile);
- rootProfile.addChild(executionSummaryProfile);
+ }
+
+ public String getProfileId() {
+ return this.summaryProfile.getInfoString(PROFILE_ID);
+ }
+
+ public RuntimeProfile getSummary() {
+ return summaryProfile;
+ }
+
+ public RuntimeProfile getExecutionSummary() {
+ return executionSummaryProfile;
}
private void init() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
index 8644fea6221..70bb21a27e9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
@@ -21,12 +21,14 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.AuthenticationException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
+import org.apache.doris.common.profile.ExecutionProfile;
import org.apache.doris.common.profile.MultiProfileTreeBuilder;
import org.apache.doris.common.profile.Profile;
import org.apache.doris.common.profile.ProfileTreeBuilder;
import org.apache.doris.common.profile.ProfileTreeNode;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
+import org.apache.doris.thrift.TUniqueId;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
@@ -104,6 +106,9 @@ public class ProfileManager {
// record the order of profiles by queryId
private Deque<String> queryIdDeque;
private Map<String, ProfileElement> queryIdToProfileMap; // from QueryId
to RuntimeProfile
+ // Sometimes one Profile is related with multiple execution
profiles(Brokerload), so that
+ // execution profile's query id is not related with Profile's query id.
+ private Map<TUniqueId, ExecutionProfile> queryIdToExecutionProfiles;
public static ProfileManager getInstance() {
if (INSTANCE == null) {
@@ -122,23 +127,56 @@ public class ProfileManager {
writeLock = lock.writeLock();
queryIdDeque = new LinkedList<>();
queryIdToProfileMap = new ConcurrentHashMap<>();
+ queryIdToExecutionProfiles = Maps.newHashMap();
}
- public ProfileElement createElement(Profile profile) {
+ private ProfileElement createElement(Profile profile) {
ProfileElement element = new ProfileElement(profile);
element.infoStrings.putAll(profile.getSummaryProfile().getAsInfoStings());
- MultiProfileTreeBuilder builder = new
MultiProfileTreeBuilder(profile.getRootProfile());
+ // Do not init the builder any more; we will not maintain it since 2.1.0,
because the structure
+ // assumes that the execution profile structure is already known
before execution. But in
+ // the PipelineX engine, it will change during execution.
+ return element;
+ }
+
+ public void addExecutionProfile(ExecutionProfile executionProfile) {
+ if (executionProfile == null) {
+ return;
+ }
+ writeLock.lock();
try {
- builder.build();
- } catch (Exception e) {
- element.errMsg = e.getMessage();
+ if
(queryIdToExecutionProfiles.containsKey(executionProfile.getQueryId())) {
+ return;
+ }
+ queryIdToExecutionProfiles.put(executionProfile.getQueryId(),
executionProfile);
if (LOG.isDebugEnabled()) {
- LOG.debug("failed to build profile tree", e);
+ LOG.debug("Add execution profile {} to profile manager",
+ DebugUtil.printId(executionProfile.getQueryId()));
}
- return element;
+ // Check if there are some query profiles that not finish
collecting, should
+ // remove them to release memory.
+ if (queryIdToExecutionProfiles.size() > 2 *
Config.max_query_profile_num) {
+ List<ExecutionProfile> finishOrExpireExecutionProfiles =
Lists.newArrayList();
+ for (ExecutionProfile tmpProfile :
queryIdToExecutionProfiles.values()) {
+ if (System.currentTimeMillis() -
tmpProfile.getQueryFinishTime()
+ > Config.profile_async_collect_expire_time_secs *
1000) {
+ finishOrExpireExecutionProfiles.add(tmpProfile);
+ }
+ }
+ for (ExecutionProfile tmp : finishOrExpireExecutionProfiles) {
+ queryIdToExecutionProfiles.remove(tmp.getQueryId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Remove expired execution profile {}",
DebugUtil.printId(tmp.getQueryId()));
+ }
+ }
+ }
+ } finally {
+ writeLock.unlock();
}
- element.builder = builder;
- return element;
+ }
+
+ public ExecutionProfile getExecutionProfile(TUniqueId queryId) {
+ return this.queryIdToExecutionProfiles.get(queryId);
}
public void pushProfile(Profile profile) {
@@ -148,14 +186,13 @@ public class ProfileManager {
ProfileElement element = createElement(profile);
// 'insert into' does have job_id, put all profiles key with query_id
- String key = element.infoStrings.get(SummaryProfile.PROFILE_ID);
+ String key = element.profile.getSummaryProfile().getProfileId();
// check when push in, which can ensure every element in the list has
QUERY_ID column,
// so there is no need to check when remove element from list.
if (Strings.isNullOrEmpty(key)) {
LOG.warn("the key or value of Map is null, "
+ "may be forget to insert 'QUERY_ID' or 'JOB_ID' column
into infoStrings");
}
-
writeLock.lock();
// a profile may be updated multiple times in queryIdToProfileMap,
// and only needs to be inserted into the queryIdDeque for the first
time.
@@ -163,7 +200,13 @@ public class ProfileManager {
try {
if (!queryIdDeque.contains(key)) {
if (queryIdDeque.size() >= Config.max_query_profile_num) {
- queryIdToProfileMap.remove(queryIdDeque.getFirst());
+ ProfileElement profileElementRemoved =
queryIdToProfileMap.remove(queryIdDeque.getFirst());
+ // If the Profile object is removed from manager, then
related execution profile is also useless.
+ if (profileElementRemoved != null) {
+ for (ExecutionProfile executionProfile :
profileElementRemoved.profile.getExecutionProfiles()) {
+
this.queryIdToExecutionProfiles.remove(executionProfile.getQueryId());
+ }
+ }
queryIdDeque.removeFirst();
}
queryIdDeque.addLast(key);
@@ -173,6 +216,21 @@ public class ProfileManager {
}
}
+ public void removeProfile(String profileId) {
+ writeLock.lock();
+ try {
+ ProfileElement profileElementRemoved =
queryIdToProfileMap.remove(profileId);
+ // If the Profile object is removed from manager, then related
execution profile is also useless.
+ if (profileElementRemoved != null) {
+ for (ExecutionProfile executionProfile :
profileElementRemoved.profile.getExecutionProfiles()) {
+
this.queryIdToExecutionProfiles.remove(executionProfile.getQueryId());
+ }
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
public List<List<String>> getAllQueries() {
return getQueryWithType(null);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
index eab9b3b6734..372b84fa3f7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
@@ -26,6 +26,7 @@ import org.apache.doris.thrift.TRuntimeProfileTree;
import org.apache.doris.thrift.TUnit;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
@@ -69,7 +70,8 @@ public class RuntimeProfile {
private ReentrantReadWriteLock childLock = new ReentrantReadWriteLock();
private List<String> planNodeInfos = Lists.newArrayList();
- private String name;
+ // name should not changed.
+ private final String name;
private Long timestamp = -1L;
@@ -85,22 +87,24 @@ public class RuntimeProfile {
private int nodeid = -1;
public RuntimeProfile(String name) {
- this();
+ this.localTimePercent = 0;
+ if (Strings.isNullOrEmpty(name)) {
+ throw new RuntimeException("Profile name must not be null");
+ }
this.name = name;
this.counterTotalTime = new Counter(TUnit.TIME_NS, 0, 1);
+ this.counterMap.put("TotalTime", counterTotalTime);
}
public RuntimeProfile(String name, int nodeId) {
- this();
+ this.localTimePercent = 0;
+ if (Strings.isNullOrEmpty(name)) {
+ throw new RuntimeException("Profile name must not be null");
+ }
this.name = name;
this.counterTotalTime = new Counter(TUnit.TIME_NS, 0, 3);
- this.nodeid = nodeId;
- }
-
- public RuntimeProfile() {
- this.counterTotalTime = new Counter(TUnit.TIME_NS, 0, 1);
- this.localTimePercent = 0;
this.counterMap.put("TotalTime", counterTotalTime);
+ this.nodeid = nodeId;
}
public void setIsCancel(Boolean isCancel) {
@@ -143,10 +147,6 @@ public class RuntimeProfile {
this.isPipelineX = isPipelineX;
}
- public boolean getIsPipelineX() {
- return this.isPipelineX;
- }
-
public Map<String, Counter> getCounterMap() {
return counterMap;
}
@@ -155,6 +155,10 @@ public class RuntimeProfile {
return childList;
}
+ public boolean isEmpty() {
+ return childList.isEmpty();
+ }
+
public Map<String, RuntimeProfile> getChildMap() {
return childMap;
}
@@ -750,10 +754,6 @@ public class RuntimeProfile {
}
}
- public void setName(String name) {
- this.name = name;
- }
-
// Returns the value to which the specified key is mapped;
// or null if this map contains no mapping for the key.
public String getInfoString(String key) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 27503d4cc78..02f9bb0a3e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -199,7 +199,9 @@ public class BrokerLoadJob extends BulkLoadJob {
// divide job into broker loading task by table
List<LoadLoadingTask> newLoadingTasks = Lists.newArrayList();
if (enableProfile) {
- this.jobProfile = new Profile("BrokerLoadJob " + id + ". " +
label, true);
+ this.jobProfile = new Profile("BrokerLoadJob " + id + ". " +
label, true,
+
Integer.valueOf(sessionVariables.getOrDefault(SessionVariable.PROFILE_LEVEL,
"3")),
+ false);
}
ProgressManager progressManager = Env.getCurrentProgressManager();
progressManager.registerProgressSimple(String.valueOf(id));
@@ -329,16 +331,6 @@ public class BrokerLoadJob extends BulkLoadJob {
}
}
- private void writeProfile() {
- if (!enableProfile) {
- return;
- }
- jobProfile.update(createTimestamp, getSummaryInfo(true), true,
-
Integer.valueOf(sessionVariables.getOrDefault(SessionVariable.PROFILE_LEVEL,
"3")), null, false);
- // jobProfile has been pushed into ProfileManager, remove reference in
brokerLoadJob
- jobProfile = null;
- }
-
private Map<String, String> getSummaryInfo(boolean isFinished) {
long currentTimestamp = System.currentTimeMillis();
SummaryBuilder builder = new SummaryBuilder();
@@ -410,7 +402,12 @@ public class BrokerLoadJob extends BulkLoadJob {
@Override
public void afterVisible(TransactionState txnState, boolean txnOperated) {
super.afterVisible(txnState, txnOperated);
- writeProfile();
+ if (!enableProfile) {
+ return;
+ }
+ jobProfile.updateSummary(createTimestamp, getSummaryInfo(true), true,
null);
+ // jobProfile has been pushed into ProfileManager, remove reference in
brokerLoadJob
+ jobProfile = null;
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index 94fb49d6c85..ef7a07cb9a4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -172,8 +172,11 @@ public class LoadLoadingTask extends LoadTask {
}
try {
- QeProcessorImpl.INSTANCE.registerQuery(loadId, curCoordinator);
+ QeProcessorImpl.INSTANCE.registerQuery(loadId, new
QeProcessorImpl.QueryInfo(curCoordinator));
actualExecute(curCoordinator, timeoutS);
+ if (this.jobProfile != null) {
+ curCoordinator.getExecutionProfile().update(beginTime, true);
+ }
} finally {
QeProcessorImpl.INSTANCE.unregisterQuery(loadId);
}
@@ -198,8 +201,6 @@ public class LoadLoadingTask extends LoadTask {
ErrorTabletInfo.fromThrift(curCoordinator.getErrorTabletInfos()
.stream().limit(Config.max_error_tablet_of_broker_load).collect(Collectors.toList())));
curCoordinator.getErrorTabletInfos().clear();
- // Create profile of this task and add to the job profile.
- createProfile(curCoordinator);
} else {
throw new LoadException(status.getErrorMsg());
}
@@ -212,15 +213,6 @@ public class LoadLoadingTask extends LoadTask {
return jobDeadlineMs - System.currentTimeMillis();
}
- private void createProfile(Coordinator coord) {
- if (jobProfile == null) {
- // No need to gather profile
- return;
- }
- // Summary profile
- coord.getExecutionProfile().update(beginTime, true);
- }
-
@Override
public void updateRetryInfo() {
super.updateRetryInfo();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
index cea0efc6fe7..45c96bd742a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
@@ -127,7 +127,9 @@ public class LoadCommand extends Command implements
ForwardWithSync {
if (!Config.enable_nereids_load) {
throw new AnalysisException("Fallback to legacy planner
temporary.");
}
- this.profile = new Profile("Query",
ctx.getSessionVariable().enableProfile);
+ this.profile = new Profile("Query",
ctx.getSessionVariable().enableProfile,
+ ctx.getSessionVariable().profileLevel,
+ ctx.getSessionVariable().getEnablePipelineXEngine());
profile.getSummaryProfile().setQueryBeginTime();
if (sourceInfos.size() == 1) {
plans = ImmutableList.of(new
InsertIntoTableCommand(completeQueryPlan(ctx, sourceInfos.get(0)),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java
index 50a6bd6495b..5718e68c6b0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java
@@ -25,8 +25,6 @@ public interface CoordInterface {
public RowBatch getNext() throws Exception;
- public int getInstanceTotalNum();
-
public void cancel(Types.PPlanFragmentCancelReason cancelReason);
// When call exec or get next data finished, should call this method to
release
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 1b99da152f9..64b07e518a0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -23,9 +23,11 @@ import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.common.Config;
+import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.Pair;
import org.apache.doris.common.Reference;
import org.apache.doris.common.Status;
+import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.ExecutionProfile;
import org.apache.doris.common.util.DebugUtil;
@@ -80,7 +82,6 @@ import org.apache.doris.thrift.PaloInternalServiceVersion;
import org.apache.doris.thrift.TBrokerScanRange;
import org.apache.doris.thrift.TDataSinkType;
import org.apache.doris.thrift.TDescriptorTable;
-import org.apache.doris.thrift.TDetailedReportParams;
import org.apache.doris.thrift.TErrorTabletInfo;
import org.apache.doris.thrift.TEsScanRange;
import org.apache.doris.thrift.TExecPlanFragmentParams;
@@ -122,6 +123,9 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multiset;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.logging.log4j.LogManager;
@@ -142,13 +146,13 @@ import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
public class Coordinator implements CoordInterface {
private static final Logger LOG = LogManager.getLogger(Coordinator.class);
@@ -158,6 +162,9 @@ public class Coordinator implements CoordInterface {
// Random is used to shuffle instances of partitioned
private static final Random instanceRandom = new SecureRandom();
+ private static ExecutorService backendRpcCallbackExecutor =
ThreadPoolManager.newDaemonProfileThreadPool(32, 100,
+ "backend-rpc-callback", true);
+
// Overall status of the entire query; set to the first reported fragment
error
// status or to CANCELLED, if Cancel() is called.
Status queryStatus = new Status();
@@ -188,15 +195,12 @@ public class Coordinator implements CoordInterface {
// coordinator still needs to wait for cleanup on remote fragments (e.g.
queries
// with limit)
// Once this is set to true, errors from remote fragments are ignored.
- private boolean returnedAllResults;
-
- private List<RuntimeProfile> fragmentProfile;
+ private boolean returnedAllResults = false;
// populated in computeFragmentExecParams()
private final Map<PlanFragmentId, FragmentExecParams>
fragmentExecParamsMap = Maps.newHashMap();
private final List<PlanFragment> fragments;
- private int instanceTotalNum;
private Map<Long, BackendExecStates> beToExecStates = Maps.newHashMap();
private Map<Long, PipelineExecContexts> beToPipelineExecCtxs =
Maps.newHashMap();
@@ -267,6 +271,15 @@ public class Coordinator implements CoordInterface {
private StatsErrorEstimator statsErrorEstimator;
+ // A countdown latch to mark the completion of each instance.
+ // use for old pipeline
+ // instance id -> dummy value
+ private MarkedCountDownLatch<TUniqueId, Long> instancesDoneLatch = null;
+
+ // A countdown latch to mark the completion of each fragment. use for
pipelineX
+ // fragmentid -> backendid
+ private MarkedCountDownLatch<Integer, Long> fragmentsDoneLatch = null;
+
public void setTWorkloadGroups(List<TPipelineWorkloadGroup>
tWorkloadGroups) {
this.tWorkloadGroups = tWorkloadGroups;
}
@@ -335,7 +348,7 @@ public class Coordinator implements CoordInterface {
nextInstanceId.setHi(queryId.hi);
nextInstanceId.setLo(queryId.lo + 1);
this.assignedRuntimeFilters = planner.getRuntimeFilters();
- this.executionProfile = new ExecutionProfile(queryId,
fragments.size());
+ this.executionProfile = new ExecutionProfile(queryId, fragments);
}
@@ -357,7 +370,7 @@ public class Coordinator implements CoordInterface {
this.nextInstanceId = new TUniqueId();
nextInstanceId.setHi(queryId.hi);
nextInstanceId.setLo(queryId.lo + 1);
- this.executionProfile = new ExecutionProfile(queryId,
fragments.size());
+ this.executionProfile = new ExecutionProfile(queryId, fragments);
}
private void setFromUserProperty(ConnectContext connectContext) {
@@ -510,11 +523,6 @@ public class Coordinator implements CoordInterface {
return result;
}
- @Override
- public int getInstanceTotalNum() {
- return instanceTotalNum;
- }
-
// Initialize
private void prepare() {
for (PlanFragment fragment : fragments) {
@@ -600,7 +608,6 @@ public class Coordinator implements CoordInterface {
Env.getCurrentEnv().getProgressManager().addTotalScanNums(String.valueOf(jobId),
scanRangeNum);
LOG.info("dispatch load job: {} to {}", DebugUtil.printId(queryId),
addressToBackendID.keySet());
- executionProfile.markInstances(instanceIds);
List<TExecPlanFragmentParams> tExecPlanFragmentParams
= ((FragmentExecParams)
this.fragmentExecParamsMap.values().toArray()[0]).toThrift(0);
TExecPlanFragmentParams fragmentParams =
tExecPlanFragmentParams.get(0);
@@ -708,11 +715,6 @@ public class Coordinator implements CoordInterface {
Env.getCurrentEnv().getProgressManager().addTotalScanNums(String.valueOf(jobId),
scanRangeNum);
LOG.info("dispatch load job: {} to {}",
DebugUtil.printId(queryId), addressToBackendID.keySet());
}
- if (enablePipelineXEngine) {
- executionProfile.markFragments(fragments.size());
- } else {
- executionProfile.markInstances(instanceIds);
- }
if (enablePipelineEngine) {
sendPipelineCtx();
@@ -775,7 +777,6 @@ public class Coordinator implements CoordInterface {
// 1. set up exec states
int instanceNum = params.instanceExecParams.size();
Preconditions.checkState(instanceNum > 0);
- instanceTotalNum += instanceNum;
List<TExecPlanFragmentParams> tParams =
params.toThrift(backendIdx);
// 2. update memory limit for colocate join
@@ -802,8 +803,7 @@ public class Coordinator implements CoordInterface {
for (TExecPlanFragmentParams tParam : tParams) {
BackendExecState execState =
new BackendExecState(fragment.getFragmentId(),
instanceId++,
- profileFragmentId, tParam,
this.addressToBackendID,
- executionProfile.getLoadChannelProfile());
+ tParam, this.addressToBackendID,
executionProfile);
// Each tParam will set the total number of Fragments that
need to be executed on the same BE,
// and the BE will determine whether all Fragments have
been executed based on this information.
// Notice. load fragment has a small probability that
FragmentNumOnHost is 0, for unknown reasons.
@@ -870,9 +870,6 @@ public class Coordinator implements CoordInterface {
}
waitRpc(futures, this.timeoutDeadline -
System.currentTimeMillis(), "send execution start");
}
- if (context != null &&
context.getSessionVariable().enableProfile()) {
- attachInstanceProfileToFragmentProfile();
- }
} finally {
unlock();
}
@@ -891,8 +888,9 @@ public class Coordinator implements CoordInterface {
int backendIdx = 0;
int profileFragmentId = 0;
beToPipelineExecCtxs.clear();
- // If #fragments > 1 and BE amount is bigger than 1, use
twoPhaseExecution with
- // exec_plan_fragments_prepare and exec_plan_fragments_start,
+ // fragment:backend
+ List<Pair<PlanFragmentId, Long>> backendFragments =
Lists.newArrayList();
+ // If #fragments >=2, use twoPhaseExecution with
exec_plan_fragments_prepare and exec_plan_fragments_start,
// else use exec_plan_fragments directly.
// we choose #fragments > 1 because in some cases
// we need ensure that A fragment is already prepared to receive
data before B fragment sends data.
@@ -918,12 +916,10 @@ public class Coordinator implements CoordInterface {
needCheckBackendState = true;
}
- Map<TUniqueId, RuntimeProfile> fragmentInstancesMap = new
HashMap<TUniqueId, RuntimeProfile>();
+ Map<TUniqueId, Boolean> fragmentInstancesMap = new
HashMap<TUniqueId, Boolean>();
for (Map.Entry<TNetworkAddress, TPipelineFragmentParams> entry
: tParams.entrySet()) {
for (TPipelineInstanceParams instanceParam :
entry.getValue().local_params) {
- String name = "Instance " +
DebugUtil.printId(instanceParam.fragment_instance_id)
- + " (host=" + entry.getKey() + ")";
-
fragmentInstancesMap.put(instanceParam.fragment_instance_id, new
RuntimeProfile(name));
+
fragmentInstancesMap.put(instanceParam.fragment_instance_id, false);
}
}
@@ -932,10 +928,10 @@ public class Coordinator implements CoordInterface {
// So that we can use one RPC to send all fragment instances
of a BE.
for (Map.Entry<TNetworkAddress, TPipelineFragmentParams> entry
: tParams.entrySet()) {
Long backendId =
this.addressToBackendID.get(entry.getKey());
+ backendFragments.add(Pair.of(fragment.getFragmentId(),
backendId));
PipelineExecContext pipelineExecContext = new
PipelineExecContext(fragment.getFragmentId(),
- profileFragmentId, entry.getValue(), backendId,
fragmentInstancesMap,
- executionProfile.getLoadChannelProfile(),
this.enablePipelineXEngine,
- this.executionProfile);
+ entry.getValue(), backendId, fragmentInstancesMap,
+ this.enablePipelineXEngine, executionProfile);
// Each tParam will set the total number of Fragments that
need to be executed on the same BE,
// and the BE will determine whether all Fragments have
been executed based on this information.
// Notice. load fragment has a small probability that
FragmentNumOnHost is 0, for unknown reasons.
@@ -988,6 +984,14 @@ public class Coordinator implements CoordInterface {
profileFragmentId += 1;
} // end for fragments
+ // Init the mark done in order to track the finished state of the
query
+ if (this.enablePipelineXEngine) {
+ fragmentsDoneLatch = new
MarkedCountDownLatch<>(backendFragments.size());
+ for (Pair<PlanFragmentId, Long> pair : backendFragments) {
+ fragmentsDoneLatch.addMark(pair.first.asInt(),
pair.second);
+ }
+ }
+
// 4. send and wait fragments rpc
List<Triple<PipelineExecContexts, BackendServiceProxy,
Future<InternalService.PExecPlanFragmentResult>>>
futures = Lists.newArrayList();
@@ -1019,9 +1023,6 @@ public class Coordinator implements CoordInterface {
}
waitPipelineRpc(futures, this.timeoutDeadline -
System.currentTimeMillis(), "send execution start");
}
- if (context != null &&
context.getSessionVariable().enableProfile()) {
- attachInstanceProfileToFragmentProfile();
- }
} finally {
unlock();
}
@@ -1446,8 +1447,10 @@ public class Coordinator implements CoordInterface {
lock();
try {
if (!queryStatus.ok()) {
- // we can't cancel twice
- return;
+ // Print an error stack here to know why send cancel again.
+ LOG.warn("Query {} already in abnormal status {}, but received
cancel again,"
+ + "so that send cancel to BE again",
+ DebugUtil.printId(queryId), queryStatus.toString(),
new Exception());
} else {
queryStatus.setStatus(Status.CANCELLED);
}
@@ -1459,6 +1462,15 @@ public class Coordinator implements CoordInterface {
}
}
+ private void cancelLatch() {
+ if (instancesDoneLatch != null) {
+ instancesDoneLatch.countDownToZero(new Status());
+ }
+ if (fragmentsDoneLatch != null) {
+ fragmentsDoneLatch.countDownToZero(new Status());
+ }
+ }
+
private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason) {
if (null != receiver) {
receiver.cancel(cancelReason.toString());
@@ -1468,7 +1480,7 @@ public class Coordinator implements CoordInterface {
return;
}
cancelRemoteFragmentsAsync(cancelReason);
- executionProfile.onCancel();
+ cancelLatch();
}
private void cancelRemoteFragmentsAsync(Types.PPlanFragmentCancelReason
cancelReason) {
@@ -1507,6 +1519,12 @@ public class Coordinator implements CoordInterface {
}
}
+ // Init instancesDoneLatch, it will be used to track if the instances
has finished for insert stmt
+ instancesDoneLatch = new MarkedCountDownLatch<>(instanceIds.size());
+ for (TUniqueId instanceId : instanceIds) {
+ instancesDoneLatch.addMark(instanceId, -1L /* value is meaningless
*/);
+ }
+
// compute multi cast fragment params
computeMultiCastFragmentParams();
@@ -2437,31 +2455,30 @@ public class Coordinator implements CoordInterface {
public void updateFragmentExecStatus(TReportExecStatusParams params) {
if (enablePipelineXEngine) {
PipelineExecContext ctx =
pipelineExecContexts.get(Pair.of(params.getFragmentId(),
params.getBackendId()));
- if (!ctx.updateProfile(params)) {
+ if (ctx == null || !ctx.updatePipelineStatus(params)) {
return;
}
- // print fragment instance profile
- if (LOG.isDebugEnabled()) {
- StringBuilder builder = new StringBuilder();
- ctx.printProfile(builder);
- if (LOG.isDebugEnabled()) {
- LOG.debug("profile for query_id={} fragment_id={}\n{}",
- DebugUtil.printId(queryId),
- params.getFragmentId(),
- builder.toString());
- }
- }
-
Status status = new Status(params.status);
// for now, abort the query if we see any error except if the
error is cancelled
// and returned_all_results_ is true.
// (UpdateStatus() initiates cancellation, if it hasn't already
been initiated)
- if (!(returnedAllResults && status.isCancelled()) && !status.ok())
{
- LOG.warn("one instance report fail, query_id={}
instance_id={}, error message: {}",
- DebugUtil.printId(queryId),
DebugUtil.printId(params.getFragmentInstanceId()),
- status.getErrorMsg());
- updateStatus(status);
+ if (!status.ok()) {
+ if (returnedAllResults && status.isCancelled()) {
+ LOG.warn("Query {} has returned all results,
fragment_id={} instance_id={}, be={}"
+ + " is reporting failed status {}",
+ DebugUtil.printId(queryId), params.getFragmentId(),
+ DebugUtil.printId(params.getFragmentInstanceId()),
+ params.getBackendId(),
+ status.toString());
+ } else {
+ LOG.warn("one instance report fail, query_id={}
fragment_id={} instance_id={}, be={},"
+ + " error message: {}",
+ DebugUtil.printId(queryId), params.getFragmentId(),
+ DebugUtil.printId(params.getFragmentInstanceId()),
+ params.getBackendId(), status.toString());
+ updateStatus(status);
+ }
}
if (params.isSetDeltaUrls()) {
updateDeltas(params.getDeltaUrls());
@@ -2489,39 +2506,36 @@ public class Coordinator implements CoordInterface {
if (ctx.done) {
if (LOG.isDebugEnabled()) {
LOG.debug("Query {} fragment {} is marked done",
- DebugUtil.printId(queryId), ctx.profileFragmentId);
+ DebugUtil.printId(queryId), ctx.fragmentId);
}
- executionProfile.markOneFragmentDone(ctx.profileFragmentId);
+ fragmentsDoneLatch.markedCountDown(params.getFragmentId(),
params.getBackendId());
}
} else if (enablePipelineEngine) {
PipelineExecContext ctx =
pipelineExecContexts.get(Pair.of(params.getFragmentId(),
params.getBackendId()));
- if (!ctx.updateProfile(params)) {
+ if (ctx == null || !ctx.updatePipelineStatus(params)) {
return;
}
- // print fragment instance profile
- if (LOG.isDebugEnabled()) {
- StringBuilder builder = new StringBuilder();
- ctx.printProfile(builder);
- if (LOG.isDebugEnabled()) {
- LOG.debug("profile for query_id={} instance_id={}\n{}",
- DebugUtil.printId(queryId),
- DebugUtil.printId(params.getFragmentInstanceId()),
- builder.toString());
- }
- }
-
Status status = new Status(params.status);
// for now, abort the query if we see any error except if the
error is cancelled
// and returned_all_results_ is true.
// (UpdateStatus() initiates cancellation, if it hasn't already
been initiated)
- if (!(returnedAllResults && status.isCancelled()) && !status.ok())
{
- LOG.warn("one instance report fail, query_id={} fragment_id={}
instance_id={}, be={},"
- + " error message: {}",
- DebugUtil.printId(queryId), params.getFragmentId(),
- DebugUtil.printId(params.getFragmentInstanceId()),
- params.getBackendId(), status.getErrorMsg());
- updateStatus(status);
+ if (!status.ok()) {
+ if (returnedAllResults && status.isCancelled()) {
+ LOG.warn("Query {} has returned all results,
fragment_id={} instance_id={}, be={}"
+ + " is reporting failed status {}",
+ DebugUtil.printId(queryId), params.getFragmentId(),
+ DebugUtil.printId(params.getFragmentInstanceId()),
+ params.getBackendId(),
+ status.toString());
+ } else {
+ LOG.warn("one instance report fail, query_id={}
fragment_id={} instance_id={}, be={},"
+ + " error message: {}",
+ DebugUtil.printId(queryId), params.getFragmentId(),
+ DebugUtil.printId(params.getFragmentInstanceId()),
+ params.getBackendId(), status.toString());
+ updateStatus(status);
+ }
}
// params.isDone() should be promised.
@@ -2530,7 +2544,7 @@ public class Coordinator implements CoordInterface {
// The last report causes the counter to decrease to zero,
// but it is possible that the report without commit-info
triggered the commit operation,
// resulting in the data not being published.
- if
(ctx.fragmentInstancesMap.get(params.fragment_instance_id).getIsDone() &&
params.isDone()) {
+ if (ctx.fragmentInstancesMap.get(params.fragment_instance_id) &&
params.isDone()) {
if (params.isSetDeltaUrls()) {
updateDeltas(params.getDeltaUrls());
}
@@ -2556,7 +2570,7 @@ public class Coordinator implements CoordInterface {
LOG.debug("Query {} instance {} is marked done",
DebugUtil.printId(queryId),
DebugUtil.printId(params.getFragmentInstanceId()));
}
-
executionProfile.markOneInstanceDone(params.getFragmentInstanceId());
+
instancesDoneLatch.markedCountDown(params.getFragmentInstanceId(), -1L);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Query {} instance {} is not marked done",
@@ -2571,22 +2585,14 @@ public class Coordinator implements CoordInterface {
return;
}
BackendExecState execState =
backendExecStates.get(params.backend_num);
- if (!execState.updateProfile(params)) {
+ if (!execState.updateInstanceStatus(params)) {
+ // Has to return here, to avoid out of order report messages.
For example,
+ // the first message is done, then we update commit messages,
but the new
+ // message is running, then we will also update commit
messages. It will
+ // lead to data corrupt.
return;
}
- // print fragment instance profile
- if (LOG.isDebugEnabled()) {
- StringBuilder builder = new StringBuilder();
- execState.printProfile(builder);
- if (LOG.isDebugEnabled()) {
- LOG.debug("profile for query_id={} instance_id={}\n{}",
- DebugUtil.printId(queryId),
- DebugUtil.printId(params.getFragmentInstanceId()),
- builder.toString());
- }
- }
-
Status status = new Status(params.status);
// for now, abort the query if we see any error except if the
error is cancelled
// and returned_all_results_ is true.
@@ -2595,11 +2601,11 @@ public class Coordinator implements CoordInterface {
if (status.isCancelled() && returnedAllResults) {
LOG.warn("Query {} has returned all results, its instance
{} is reporting failed status {}",
DebugUtil.printId(queryId),
DebugUtil.printId(params.getFragmentInstanceId()),
- status.getErrorMsg());
+ status.toString());
} else {
LOG.warn("Instance {} of query {} report failed status,
error msg: {}",
DebugUtil.printId(queryId),
DebugUtil.printId(params.getFragmentInstanceId()),
- status.getErrorMsg());
+ status.toString());
updateStatus(status);
}
}
@@ -2632,7 +2638,7 @@ public class Coordinator implements CoordInterface {
if (params.isSetHivePartitionUpdates()) {
updateHivePartitionUpdates(params.getHivePartitionUpdates());
}
-
executionProfile.markOneInstanceDone(params.getFragmentInstanceId());
+
instancesDoneLatch.markedCountDown(params.getFragmentInstanceId(), -1L);
}
}
@@ -2667,10 +2673,10 @@ public class Coordinator implements CoordInterface {
long waitTime = Math.min(leftTimeoutS, fixedMaxWaitTime);
boolean awaitRes = false;
try {
- if (enablePipelineXEngine) {
- awaitRes =
executionProfile.awaitAllFragmentsDone(waitTime);
+ if (fragmentsDoneLatch != null) {
+ awaitRes = fragmentsDoneLatch.await(waitTime,
TimeUnit.SECONDS);
} else {
- awaitRes =
executionProfile.awaitAllInstancesDone(waitTime);
+ awaitRes = instancesDoneLatch.await(waitTime,
TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
// Do nothing
@@ -2714,10 +2720,10 @@ public class Coordinator implements CoordInterface {
}
public boolean isDone() {
- if (enablePipelineXEngine) {
- return executionProfile.isAllFragmentsDone();
+ if (fragmentsDoneLatch != null) {
+ return fragmentsDoneLatch.getCount() == 0;
} else {
- return executionProfile.isAllInstancesDone();
+ return instancesDoneLatch.getCount() == 0;
}
}
@@ -3024,20 +3030,17 @@ public class Coordinator implements CoordInterface {
PlanFragmentId fragmentId;
boolean initiated;
volatile boolean done;
- boolean hasCanceled;
- int profileFragmentId;
- RuntimeProfile instanceProfile;
- RuntimeProfile loadChannelProfile;
TNetworkAddress brpcAddress;
TNetworkAddress address;
Backend backend;
long lastMissingHeartbeatTime = -1;
TUniqueId instanceId;
+ private boolean hasCancelled = false;
+ private boolean cancelInProcess = false;
- public BackendExecState(PlanFragmentId fragmentId, int instanceId, int
profileFragmentId,
+ public BackendExecState(PlanFragmentId fragmentId, int instanceId,
TExecPlanFragmentParams rpcParams,
Map<TNetworkAddress, Long> addressToBackendID,
- RuntimeProfile loadChannelProfile) {
- this.profileFragmentId = profileFragmentId;
+ ExecutionProfile executionProfile) {
this.fragmentId = fragmentId;
this.rpcParams = rpcParams;
this.initiated = false;
@@ -3047,12 +3050,10 @@ public class Coordinator implements CoordInterface {
this.address = fi.host;
this.backend = idToBackend.get(addressToBackendID.get(address));
this.brpcAddress = new TNetworkAddress(backend.getHost(),
backend.getBrpcPort());
-
- String name = "Instance " + DebugUtil.printId(fi.instanceId) + "
(host=" + address + ")";
- this.loadChannelProfile = loadChannelProfile;
- this.instanceProfile = new RuntimeProfile(name);
- this.hasCanceled = false;
this.lastMissingHeartbeatTime =
backend.getLastMissingHeartbeatTime();
+ String profileName = "Instance " +
DebugUtil.printId(fi.instanceId) + " (host=" + address + ")";
+ RuntimeProfile instanceProfile = new RuntimeProfile(profileName);
+ executionProfile.addInstanceProfile(fragmentId, fi.instanceId,
instanceProfile);
}
/**
@@ -3070,19 +3071,12 @@ public class Coordinator implements CoordInterface {
this.rpcParams.setIsSimplifiedParam(true);
}
- // update profile.
- // return true if profile is updated. Otherwise, return false.
- public synchronized boolean updateProfile(TReportExecStatusParams
params) {
+ // update the instance status, if it is already finished, then not
update any more.
+ public synchronized boolean
updateInstanceStatus(TReportExecStatusParams params) {
if (this.done) {
// duplicate packet
return false;
}
- if (params.isSetProfile()) {
- instanceProfile.update(params.profile);
- }
- if (params.isSetLoadChannelProfile()) {
- loadChannelProfile.update(params.loadChannelProfile);
- }
this.done = params.done;
if (statsErrorEstimator != null) {
statsErrorEstimator.updateExactReturnedRows(params);
@@ -3090,54 +3084,75 @@ public class Coordinator implements CoordInterface {
return true;
}
- public synchronized void printProfile(StringBuilder builder) {
- this.instanceProfile.computeTimeInProfile();
- this.instanceProfile.prettyPrint(builder, "");
- }
-
// cancel the fragment instance.
// return true if cancel success. Otherwise, return false
- public synchronized boolean
cancelFragmentInstance(Types.PPlanFragmentCancelReason cancelReason) {
- LOG.warn("cancelRemoteFragments initiated={} done={}
hasCanceled={} backend: {},"
+ public synchronized void
cancelFragmentInstance(Types.PPlanFragmentCancelReason cancelReason) {
+ LOG.warn("cancelRemoteFragments initiated={} done={} backend: {},"
+ " fragment instance id={}, reason: {}",
- this.initiated, this.done, this.hasCanceled,
backend.getId(),
+ this.initiated, this.done, backend.getId(),
DebugUtil.printId(fragmentInstanceId()),
cancelReason.name());
try {
if (!this.initiated) {
- return false;
+ return;
}
// don't cancel if it is already finished
if (this.done) {
- return false;
+ return;
}
- if (this.hasCanceled) {
- return false;
+ if (this.hasCancelled || this.cancelInProcess) {
+ LOG.info("Fragment instance has already been cancelled {}
or under cancel {}."
+ + " initiated={} done={} backend: {},"
+ + "fragment instance id={}, reason: {}",
+ this.hasCancelled, this.cancelInProcess,
+ this.initiated, this.done, backend.getId(),
+ DebugUtil.printId(fragmentInstanceId()),
cancelReason.name());
+ return;
}
-
try {
-
BackendServiceProxy.getInstance().cancelPlanFragmentAsync(brpcAddress,
- fragmentInstanceId(), cancelReason);
+
ListenableFuture<InternalService.PCancelPlanFragmentResult> cancelResult =
+
BackendServiceProxy.getInstance().cancelPlanFragmentAsync(brpcAddress,
+ fragmentInstanceId(), cancelReason);
+ Futures.addCallback(cancelResult, new
FutureCallback<InternalService.PCancelPlanFragmentResult>() {
+ public void
onSuccess(InternalService.PCancelPlanFragmentResult result) {
+ cancelInProcess = false;
+ if (result.hasStatus()) {
+ Status status = new Status();
+ status.setPstatus(result.getStatus());
+ if (status.getErrorCode() == TStatusCode.OK) {
+ hasCancelled = true;
+ } else {
+ LOG.warn("Failed to cancel query {}
instance initiated={} done={} backend: {},"
+ + "fragment instance id={},
reason: {}",
+ DebugUtil.printId(queryId),
initiated, done, backend.getId(),
+
DebugUtil.printId(fragmentInstanceId()), status.toString());
+ }
+ }
+ LOG.warn("Failed to cancel query {} instance
initiated={} done={} backend: {},"
+ + "fragment instance id={}, reason: {}",
+ DebugUtil.printId(queryId), initiated,
done, backend.getId(),
+ DebugUtil.printId(fragmentInstanceId()),
"without status");
+ }
+
+ public void onFailure(Throwable t) {
+ cancelInProcess = false;
+ LOG.warn("Failed to cancel query {} instance
initiated={} done={} backend: {},"
+ + "fragment instance id={}, reason: {}",
+ DebugUtil.printId(queryId), initiated,
done, backend.getId(),
+ DebugUtil.printId(fragmentInstanceId()),
cancelReason.name(), t);
+ }
+ }, backendRpcCallbackExecutor);
+ cancelInProcess = true;
} catch (RpcException e) {
LOG.warn("cancel plan fragment get a exception,
address={}:{}", brpcAddress.getHostname(),
brpcAddress.getPort());
SimpleScheduler.addToBlacklist(addressToBackendID.get(brpcAddress),
e.getMessage());
}
- this.hasCanceled = true;
} catch (Exception e) {
LOG.warn("catch a exception", e);
- return false;
- }
- return true;
- }
-
- public synchronized boolean computeTimeInProfile(int maxFragmentId) {
- if (this.profileFragmentId < 0 || this.profileFragmentId >
maxFragmentId) {
- LOG.warn("profileFragmentId {} should be in [0, {})",
profileFragmentId, maxFragmentId);
- return false;
+ return;
}
- instanceProfile.computeTimeInProfile();
- return true;
+ return;
}
public boolean isBackendStateHealthy() {
@@ -3163,16 +3178,12 @@ public class Coordinator implements CoordInterface {
TPipelineFragmentParams rpcParams;
PlanFragmentId fragmentId;
boolean initiated;
- volatile boolean done;
- boolean hasCanceled;
+ boolean done;
// use for pipeline
- Map<TUniqueId, RuntimeProfile> fragmentInstancesMap;
+ Map<TUniqueId, Boolean> fragmentInstancesMap;
// use for pipelineX
- List<RuntimeProfile> taskProfile;
boolean enablePipelineX;
- RuntimeProfile loadChannelProfile;
- int profileFragmentId;
TNetworkAddress brpcAddress;
TNetworkAddress address;
Backend backend;
@@ -3180,19 +3191,17 @@ public class Coordinator implements CoordInterface {
long profileReportProgress = 0;
long beProcessEpoch = 0;
private final int numInstances;
- final ExecutionProfile executionProfile;
+ private boolean hasCancelled = false;
+ private boolean cancelInProcess = false;
- public PipelineExecContext(PlanFragmentId fragmentId, int
profileFragmentId,
+ public PipelineExecContext(PlanFragmentId fragmentId,
TPipelineFragmentParams rpcParams, Long backendId,
- Map<TUniqueId, RuntimeProfile> fragmentInstancesMap,
- RuntimeProfile loadChannelProfile, boolean enablePipelineX,
final ExecutionProfile executionProfile) {
- this.profileFragmentId = profileFragmentId;
+ Map<TUniqueId, Boolean> fragmentInstancesMap,
+ boolean enablePipelineX, ExecutionProfile executionProfile) {
this.fragmentId = fragmentId;
this.rpcParams = rpcParams;
this.numInstances = rpcParams.local_params.size();
this.fragmentInstancesMap = fragmentInstancesMap;
- this.taskProfile = new ArrayList<RuntimeProfile>();
- this.loadChannelProfile = loadChannelProfile;
this.initiated = false;
this.done = false;
@@ -3202,27 +3211,18 @@ public class Coordinator implements CoordInterface {
this.brpcAddress = new TNetworkAddress(backend.getHost(),
backend.getBrpcPort());
this.beProcessEpoch = backend.getProcessEpoch();
- this.hasCanceled = false;
this.lastMissingHeartbeatTime =
backend.getLastMissingHeartbeatTime();
this.enablePipelineX = enablePipelineX;
- this.executionProfile = executionProfile;
- if (enablePipelineX) {
- executionProfile.addFragments(profileFragmentId);
- }
- }
-
- public Stream<RuntimeProfile> profileStream() {
- if (enablePipelineX) {
- return taskProfile.stream();
+ if (this.enablePipelineX) {
+ executionProfile.addFragmentBackend(fragmentId, backendId);
+ } else {
+ for (TPipelineInstanceParams instanceParam :
rpcParams.local_params) {
+ String profileName = "Instance " +
DebugUtil.printId(instanceParam.fragment_instance_id)
+ + " (host=" + address + ")";
+ executionProfile.addInstanceProfile(fragmentId,
instanceParam.fragment_instance_id,
+ new RuntimeProfile(profileName));
+ }
}
- return fragmentInstancesMap.values().stream();
- }
-
- public void attachPipelineProfileToFragmentProfile() {
- profileStream()
- .forEach(p ->
executionProfile.addInstanceProfile(this.profileFragmentId, p));
- executionProfile.addMultiBeProfileByPipelineX(profileFragmentId,
address,
- taskProfile);
}
/**
@@ -3243,46 +3243,33 @@ public class Coordinator implements CoordInterface {
// update profile.
// return true if profile is updated. Otherwise, return false.
- public synchronized boolean updateProfile(TReportExecStatusParams
params) {
+ // Has to use synchronized to ensure there are not concurrent update
threads. Or the done
+ // state maybe update wrong and will lose data. see
https://github.com/apache/doris/pull/29802/files.
+ public synchronized boolean
updatePipelineStatus(TReportExecStatusParams params) {
+ // The fragment or instance is not finished, not need update
+ if (!params.done) {
+ return false;
+ }
if (enablePipelineX) {
- taskProfile.clear();
- int pipelineIdx = 0;
- for (TDetailedReportParams param : params.detailed_report) {
- String name = "Pipeline :" + pipelineIdx + " "
- + " (host=" + address + ")";
- RuntimeProfile profile = new RuntimeProfile(name);
- taskProfile.add(profile);
- if (param.isSetProfile()) {
- profile.update(param.profile);
- }
- if (params.done) {
- profile.setIsDone(true);
- }
- pipelineIdx++;
- }
- if (params.isSetLoadChannelProfile()) {
- loadChannelProfile.update(params.loadChannelProfile);
- }
- this.done = params.done;
- attachPipelineProfileToFragmentProfile();
- return this.done;
- } else {
- RuntimeProfile profile =
fragmentInstancesMap.get(params.fragment_instance_id);
- if (params.done && profile.getIsDone()) {
+ if (this.done) {
// duplicate packet
return false;
}
-
- if (params.isSetProfile()) {
- profile.update(params.profile);
- }
- if (params.isSetLoadChannelProfile()) {
- loadChannelProfile.update(params.loadChannelProfile);
+ this.done = true;
+ return true;
+ } else {
+ // could not find the related instances, not update and return
false, to indicate
+ // that the caller should not update any more.
+ if
(!fragmentInstancesMap.containsKey(params.fragment_instance_id)) {
+ return false;
}
- if (params.done) {
- profile.setIsDone(true);
- profileReportProgress++;
+ Boolean instanceDone =
fragmentInstancesMap.get(params.fragment_instance_id);
+ if (instanceDone) {
+ // duplicate packet
+ return false;
}
+ fragmentInstancesMap.put(params.fragment_instance_id, true);
+ profileReportProgress++;
if (profileReportProgress == numInstances) {
this.done = true;
}
@@ -3290,32 +3277,57 @@ public class Coordinator implements CoordInterface {
}
}
- public synchronized void printProfile(StringBuilder builder) {
- this.profileStream().forEach(p -> {
- p.computeTimeInProfile();
- p.prettyPrint(builder, "");
- });
- }
-
- // cancel all fragment instances.
- // return true if cancel success. Otherwise, return false
-
- private synchronized boolean
cancelFragment(Types.PPlanFragmentCancelReason cancelReason) {
- for (RuntimeProfile profile : taskProfile) {
- profile.setIsCancel(true);
- }
+ // Just send the cancel message to BE, not care about the result,
because there is no retry
+ // logic in upper logic.
+ private synchronized void
cancelFragment(Types.PPlanFragmentCancelReason cancelReason) {
if (LOG.isDebugEnabled()) {
- LOG.debug("cancelRemoteFragments initiated={} done={}
hasCanceled={} backend: {},"
+ LOG.debug("cancelRemoteFragments initiated={} done={} backend:
{},"
+ " fragment id={} query={}, reason: {}",
- this.initiated, this.done, this.hasCanceled,
backend.getId(),
- this.profileFragmentId,
+ this.initiated, this.done, backend.getId(),
+ this.fragmentId,
DebugUtil.printId(queryId), cancelReason.name());
}
+
+ if (this.hasCancelled || this.cancelInProcess) {
+ LOG.info("Frangment has already been cancelled. Query {}
backend: {}, fragment id={}",
+ DebugUtil.printId(queryId), backend.getId(),
this.fragmentId);
+ return;
+ }
try {
try {
-
BackendServiceProxy.getInstance().cancelPipelineXPlanFragmentAsync(brpcAddress,
+
ListenableFuture<InternalService.PCancelPlanFragmentResult> cancelResult =
+
BackendServiceProxy.getInstance().cancelPipelineXPlanFragmentAsync(brpcAddress,
this.fragmentId, queryId, cancelReason);
- this.hasCanceled = true;
+ Futures.addCallback(cancelResult, new
FutureCallback<InternalService.PCancelPlanFragmentResult>() {
+ public void
onSuccess(InternalService.PCancelPlanFragmentResult result) {
+ cancelInProcess = false;
+ if (result.hasStatus()) {
+ Status status = new Status();
+ status.setPstatus(result.getStatus());
+ if (status.getErrorCode() == TStatusCode.OK) {
+ hasCancelled = true;
+ } else {
+ LOG.warn("Failed to cancel query {}
instance initiated={} done={} backend: {},"
+ + "fragment id={}, reason: {}",
+ DebugUtil.printId(queryId),
initiated, done, backend.getId(),
+ fragmentId, status.toString());
+ }
+ }
+ LOG.warn("Failed to cancel query {} instance
initiated={} done={} backend: {},"
+ + "fragment id={}, reason: {}",
+ DebugUtil.printId(queryId), initiated,
done, backend.getId(),
+ fragmentId, "without status");
+ }
+
+ public void onFailure(Throwable t) {
+ cancelInProcess = false;
+ LOG.warn("Failed to cancel query {} instance
initiated={} done={} backend: {},"
+ + "fragment id={}, reason: {}",
+ DebugUtil.printId(queryId), initiated,
done, backend.getId(),
+ fragmentId, cancelReason.name(), t);
+ }
+ }, backendRpcCallbackExecutor);
+ cancelInProcess = true;
} catch (RpcException e) {
LOG.warn("cancel plan fragment get a exception,
address={}:{}", brpcAddress.getHostname(),
brpcAddress.getPort());
@@ -3323,78 +3335,91 @@ public class Coordinator implements CoordInterface {
}
} catch (Exception e) {
LOG.warn("catch a exception", e);
- return false;
+ return;
}
- return true;
+ return;
}
- private synchronized boolean
cancelInstance(Types.PPlanFragmentCancelReason cancelReason) {
+ // Just send the cancel logic to BE, not care about the result, and
there is no retry logic
+ // in upper logic.
+ private synchronized void
cancelInstance(Types.PPlanFragmentCancelReason cancelReason) {
for (TPipelineInstanceParams localParam : rpcParams.local_params) {
- LOG.warn("cancelRemoteFragments initiated={} done={}
hasCanceled={} backend:{},"
+ LOG.warn("cancelRemoteFragments initiated={} done={}
backend:{},"
+ " fragment instance id={} query={}, reason: {}",
- this.initiated, this.done, this.hasCanceled,
backend.getId(),
+ this.initiated, this.done, backend.getId(),
DebugUtil.printId(localParam.fragment_instance_id),
DebugUtil.printId(queryId), cancelReason.name());
-
- RuntimeProfile profile =
fragmentInstancesMap.get(localParam.fragment_instance_id);
- if (profile.getIsDone() || profile.getIsCancel()) {
- continue;
+ if (this.hasCancelled || this.cancelInProcess) {
+ LOG.info("fragment instance has already been cancelled {}
or in process {}. "
+ + "initiated={} done={} backend:{},"
+ + " fragment instance id={} query={}, reason: {}",
+ this.hasCancelled, this.cancelInProcess,
+ this.initiated, this.done, backend.getId(),
+ DebugUtil.printId(localParam.fragment_instance_id),
+ DebugUtil.printId(queryId), cancelReason.name());
+ return;
}
-
- this.hasCanceled = true;
try {
- try {
-
BackendServiceProxy.getInstance().cancelPlanFragmentAsync(brpcAddress,
- localParam.fragment_instance_id, cancelReason);
- } catch (RpcException e) {
- LOG.warn("cancel plan fragment get a exception,
address={}:{}", brpcAddress.getHostname(),
- brpcAddress.getPort());
-
SimpleScheduler.addToBlacklist(addressToBackendID.get(brpcAddress),
e.getMessage());
- }
+
ListenableFuture<InternalService.PCancelPlanFragmentResult> cancelResult =
+
BackendServiceProxy.getInstance().cancelPlanFragmentAsync(brpcAddress,
+ localParam.fragment_instance_id,
cancelReason);
+ Futures.addCallback(cancelResult, new
FutureCallback<InternalService.PCancelPlanFragmentResult>() {
+ public void
onSuccess(InternalService.PCancelPlanFragmentResult result) {
+ cancelInProcess = false;
+ if (result.hasStatus()) {
+ Status status = new Status();
+ status.setPstatus(result.getStatus());
+ if (status.getErrorCode() == TStatusCode.OK) {
+ hasCancelled = true;
+ } else {
+ LOG.warn("Failed to cancel query {}
instance initiated={} done={} backend: {},"
+ + "fragment instance id={},
reason: {}",
+ DebugUtil.printId(queryId),
initiated, done, backend.getId(),
+
DebugUtil.printId(localParam.fragment_instance_id), status.toString());
+ }
+ }
+ LOG.warn("Failed to cancel query {} instance
initiated={} done={} backend: {},"
+ + "fragment instance id={}, reason: {}",
+ DebugUtil.printId(queryId), initiated,
done, backend.getId(),
+
DebugUtil.printId(localParam.fragment_instance_id), "without status");
+ }
+
+ public void onFailure(Throwable t) {
+ cancelInProcess = false;
+ LOG.warn("Failed to cancel query {} instance
initiated={} done={} backend: {},"
+ + "fragment instance id={}, reason: {}",
+ DebugUtil.printId(queryId), initiated,
done, backend.getId(),
+
DebugUtil.printId(localParam.fragment_instance_id), cancelReason.name(), t);
+ }
+ }, backendRpcCallbackExecutor);
+ cancelInProcess = true;
} catch (Exception e) {
LOG.warn("catch a exception", e);
- return false;
+ return;
}
}
- if (!this.hasCanceled) {
- return false;
- }
- for (int i = 0; i < this.numInstances; i++) {
-
fragmentInstancesMap.get(rpcParams.local_params.get(i).fragment_instance_id).setIsCancel(true);
- }
- return true;
+ return;
}
/// TODO: refactor rpcParams
- public synchronized boolean
cancelFragmentInstance(Types.PPlanFragmentCancelReason cancelReason) {
+ public synchronized void
cancelFragmentInstance(Types.PPlanFragmentCancelReason cancelReason) {
if (!this.initiated) {
LOG.warn("Query {}, ccancel before initiated",
DebugUtil.printId(queryId));
- return false;
+ return;
}
// don't cancel if it is already finished
if (this.done) {
LOG.warn("Query {}, cancel after finished",
DebugUtil.printId(queryId));
- return false;
- }
- if (this.hasCanceled) {
- LOG.warn("Query {}, cancel after cancelled",
DebugUtil.printId(queryId));
- return false;
+ return;
}
if (this.enablePipelineX) {
- return cancelFragment(cancelReason);
+ cancelFragment(cancelReason);
+ return;
} else {
- return cancelInstance(cancelReason);
- }
- }
-
- public synchronized boolean computeTimeInProfile(int maxFragmentId) {
- if (this.profileFragmentId < 0 || this.profileFragmentId >
maxFragmentId) {
- LOG.warn("profileFragmentId {} should be in [0, {})",
profileFragmentId, maxFragmentId);
- return false;
+ cancelInstance(cancelReason);
+ return;
}
- // profile.computeTimeInProfile();
- return true;
}
public boolean isBackendStateHealthy() {
@@ -4020,22 +4045,6 @@ public class Coordinator implements CoordInterface {
return result;
}
- private void attachInstanceProfileToFragmentProfile() {
- if (enablePipelineEngine) {
- for (PipelineExecContext ctx : pipelineExecContexts.values()) {
- if (!enablePipelineXEngine) {
- ctx.profileStream()
- .forEach(p ->
executionProfile.addInstanceProfile(ctx.profileFragmentId, p));
- }
- }
- } else {
- for (BackendExecState backendExecState : backendExecStates) {
-
executionProfile.addInstanceProfile(backendExecState.profileFragmentId,
- backendExecState.instanceProfile);
- }
- }
- }
-
// Runtime filter target fragment instance param
static class FRuntimeFilterTargetParam {
public TUniqueId targetFragmentInstanceId;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
index 4b0302cc403..f41e9a4d896 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
@@ -168,12 +168,6 @@ public class PointQueryExec implements CoordInterface {
requestBuilder.addKeyTuples(kBuilder);
}
- @Override
- public int getInstanceTotalNum() {
- // TODO
- return 1;
- }
-
@Override
public void cancel(Types.PPlanFragmentCancelReason cancelReason) {
// Do nothing
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java
index 44999ecef64..c5aff2c9d5c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java
@@ -31,8 +31,6 @@ public interface QeProcessor {
TReportExecStatusResult reportExecStatus(TReportExecStatusParams params,
TNetworkAddress beAddr);
- void registerQuery(TUniqueId queryId, Coordinator coord) throws
UserException;
-
void registerQuery(TUniqueId queryId, QeProcessorImpl.QueryInfo info)
throws UserException;
void registerInstances(TUniqueId queryId, Integer instancesNum) throws
UserException;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index 03144fc797c..a62f1b66f08 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.ExecutionProfile;
import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.common.util.ProfileManager;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.resource.workloadgroup.QueueToken.TokenState;
import org.apache.doris.thrift.TNetworkAddress;
@@ -53,6 +54,7 @@ public final class QeProcessorImpl implements QeProcessor {
private Map<TUniqueId, Integer> queryToInstancesNum;
private Map<String, AtomicInteger> userToInstancesCount;
+ private ExecutorService writeProfileExecutor;
public static final QeProcessor INSTANCE;
@@ -60,15 +62,13 @@ public final class QeProcessorImpl implements QeProcessor {
INSTANCE = new QeProcessorImpl();
}
- private ExecutorService writeProfileExecutor;
-
private QeProcessorImpl() {
coordinatorMap = new ConcurrentHashMap<>();
- // write profile to ProfileManager when query is running.
- writeProfileExecutor = ThreadPoolManager.newDaemonProfileThreadPool(1,
100,
- "profile-write-pool", true);
queryToInstancesNum = new ConcurrentHashMap<>();
userToInstancesCount = new ConcurrentHashMap<>();
+ // write profile to ProfileManager when query is running.
+ writeProfileExecutor = ThreadPoolManager.newDaemonProfileThreadPool(3,
100,
+ "profile-write-pool", true);
}
@Override
@@ -90,11 +90,6 @@ public final class QeProcessorImpl implements QeProcessor {
return res;
}
- @Override
- public void registerQuery(TUniqueId queryId, Coordinator coord) throws
UserException {
- registerQuery(queryId, new QueryInfo(coord));
- }
-
@Override
public void registerQuery(TUniqueId queryId, QueryInfo info) throws
UserException {
if (LOG.isDebugEnabled()) {
@@ -104,6 +99,10 @@ public final class QeProcessorImpl implements QeProcessor {
if (result != null) {
throw new UserException("queryId " + queryId + " already exists");
}
+
+ // Should add the execution profile to profile manager, BE will report
the profile to FE and FE
+ // will update it in ProfileManager
+
ProfileManager.getInstance().addExecutionProfile(info.getCoord().getExecutionProfile());
}
@Override
@@ -145,7 +144,18 @@ public final class QeProcessorImpl implements QeProcessor {
if (LOG.isDebugEnabled()) {
LOG.debug("Deregister query id {}",
DebugUtil.printId(queryId));
}
-
+ ExecutionProfile executionProfile =
ProfileManager.getInstance().getExecutionProfile(queryId);
+ if (executionProfile != null) {
+
executionProfile.setQueryFinishTime(System.currentTimeMillis());
+ if (queryInfo.connectContext != null) {
+ long autoProfileThresholdMs = queryInfo.connectContext
+ .getSessionVariable().getAutoProfileThresholdMs();
+ if (autoProfileThresholdMs > 0 &&
System.currentTimeMillis() - queryInfo.getStartExecTime()
+ < autoProfileThresholdMs) {
+
ProfileManager.getInstance().removeProfile(executionProfile.getSummaryProfile().getProfileId());
+ }
+ }
+ }
if (queryInfo.getConnectContext() != null
&&
!Strings.isNullOrEmpty(queryInfo.getConnectContext().getQualifiedUser())
) {
@@ -187,7 +197,7 @@ public final class QeProcessorImpl implements QeProcessor {
.connId(String.valueOf(context.getConnectionId())).db(context.getDatabase())
.catalog(context.getDefaultCatalog())
.fragmentInstanceInfos(info.getCoord().getFragmentInstanceInfos())
-
.profile(info.getCoord().getExecutionProfile().getExecutionProfile())
+ .profile(info.getCoord().getExecutionProfile().getRoot())
.isReportSucc(context.getSessionVariable().enableProfile()).build();
querySet.put(queryIdStr, item);
}
@@ -196,13 +206,25 @@ public final class QeProcessorImpl implements QeProcessor
{
@Override
public TReportExecStatusResult reportExecStatus(TReportExecStatusParams
params, TNetworkAddress beAddr) {
- if (params.isSetProfile()) {
+ if (params.isSetProfile() || params.isSetLoadChannelProfile()) {
LOG.info("ReportExecStatus(): fragment_instance_id={}, query
id={}, backend num: {}, ip: {}",
DebugUtil.printId(params.fragment_instance_id),
DebugUtil.printId(params.query_id),
params.backend_num, beAddr);
if (LOG.isDebugEnabled()) {
LOG.debug("params: {}", params);
}
+ ExecutionProfile executionProfile =
ProfileManager.getInstance().getExecutionProfile(params.query_id);
+ if (executionProfile != null) {
+ // Update profile may cost a lot of time, use a seperate pool
to deal with it.
+ writeProfileExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ executionProfile.updateProfile(params, beAddr);
+ }
+ });
+ } else {
+ LOG.info("Could not find execution profile with query id {}",
DebugUtil.printId(params.query_id));
+ }
}
final TReportExecStatusResult result = new TReportExecStatusResult();
@@ -229,12 +251,9 @@ public final class QeProcessorImpl implements QeProcessor {
}
try {
info.getCoord().updateFragmentExecStatus(params);
- if (params.isSetProfile()) {
- writeProfileExecutor.submit(new WriteProfileTask(params,
info));
- }
} catch (Exception e) {
LOG.warn("Exception during handle report, response: {}, query: {},
instance: {}", result.toString(),
- DebugUtil.printId(params.query_id),
DebugUtil.printId(params.fragment_instance_id));
+ DebugUtil.printId(params.query_id),
DebugUtil.printId(params.fragment_instance_id), e);
return result;
}
result.setStatus(new TStatus(TStatusCode.OK));
@@ -266,6 +285,7 @@ public final class QeProcessorImpl implements QeProcessor {
private final ConnectContext connectContext;
private final Coordinator coord;
private final String sql;
+ private long registerTimeMs = 0L;
// from Export, Pull load, Insert
public QueryInfo(Coordinator coord) {
@@ -277,6 +297,7 @@ public final class QeProcessorImpl implements QeProcessor {
this.connectContext = connectContext;
this.coord = coord;
this.sql = sql;
+ this.registerTimeMs = System.currentTimeMillis();
}
public ConnectContext getConnectContext() {
@@ -295,7 +316,7 @@ public final class QeProcessorImpl implements QeProcessor {
if (coord.getQueueToken() != null) {
return coord.getQueueToken().getQueueEndTime();
}
- return -1;
+ return registerTimeMs;
}
public long getQueueStartTime() {
@@ -319,26 +340,4 @@ public final class QeProcessorImpl implements QeProcessor {
return null;
}
}
-
- private class WriteProfileTask implements Runnable {
- private TReportExecStatusParams params;
-
- private QueryInfo queryInfo;
-
- WriteProfileTask(TReportExecStatusParams params, QueryInfo queryInfo) {
- this.params = params;
- this.queryInfo = queryInfo;
- }
-
- @Override
- public void run() {
- QueryInfo info = coordinatorMap.get(params.query_id);
- if (info == null) {
- return;
- }
-
- ExecutionProfile executionProfile =
info.getCoord().getExecutionProfile();
- executionProfile.update(-1, false);
- }
- }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 7ae0b9c6301..20a9ad42bd1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -84,6 +84,7 @@ public class SessionVariable implements Serializable,
Writable {
public static final String MAX_EXECUTION_TIME = "max_execution_time";
public static final String INSERT_TIMEOUT = "insert_timeout";
public static final String ENABLE_PROFILE = "enable_profile";
+ public static final String AUTO_PROFILE_THRESHOLD_MS =
"auto_profile_threshold_ms";
public static final String SQL_MODE = "sql_mode";
public static final String WORKLOAD_VARIABLE = "workload_group";
public static final String RESOURCE_VARIABLE = "resource_group";
@@ -629,6 +630,10 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = ENABLE_PROFILE, needForward = true)
public boolean enableProfile = false;
+ // if true, need report to coordinator when plan fragment execute
successfully.
+ @VariableMgr.VarAttr(name = AUTO_PROFILE_THRESHOLD_MS, needForward = true)
+ public int autoProfileThresholdMs = -1;
+
@VariableMgr.VarAttr(name = "runtime_filter_prune_for_external")
public boolean runtimeFilterPruneForExternal = true;
@@ -1969,6 +1974,10 @@ public class SessionVariable implements Serializable,
Writable {
return enableProfile;
}
+ public int getAutoProfileThresholdMs() {
+ return this.autoProfileThresholdMs;
+ }
+
public boolean enableSingleDistinctColumnOpt() {
return enableSingleDistinctColumnOpt;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index f31fe76bca4..6236009a7ad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -220,7 +220,6 @@ public class StmtExecutor {
private static final AtomicLong STMT_ID_GENERATOR = new AtomicLong(0);
public static final int MAX_DATA_TO_SEND_FOR_TXN = 100;
public static final String NULL_VALUE_FOR_LOAD = "\\N";
- private final Object writeProfileLock = new Object();
private ConnectContext context;
private final StatementContext statementContext;
private MysqlSerializer serializer;
@@ -260,7 +259,9 @@ public class StmtExecutor {
this.isProxy = isProxy;
this.statementContext = new StatementContext(context, originStmt);
this.context.setStatementContext(statementContext);
- this.profile = new Profile("Query",
this.context.getSessionVariable().enableProfile);
+ this.profile = new Profile("Query",
this.context.getSessionVariable().enableProfile,
+ this.context.getSessionVariable().profileLevel,
+ this.context.getSessionVariable().getEnablePipelineXEngine());
}
// for test
@@ -290,7 +291,8 @@ public class StmtExecutor {
this.statementContext.setParsedStatement(parsedStmt);
}
this.context.setStatementContext(statementContext);
- this.profile = new Profile("Query",
context.getSessionVariable().enableProfile());
+ this.profile = new Profile("Query",
context.getSessionVariable().enableProfile(),
+ context.getSessionVariable().profileLevel,
context.getSessionVariable().getEnablePipelineXEngine());
}
public static InternalService.PDataRow getRowStringValue(List<Expr> cols)
throws UserException {
@@ -993,9 +995,7 @@ public class StmtExecutor {
// and ensure the sql is finished normally. For example, if update
profile
// failed, the insert stmt should be success
try {
- profile.update(context.startTime, getSummaryInfo(isFinished),
isFinished,
- context.getSessionVariable().profileLevel, this.planner,
- context.getSessionVariable().getEnablePipelineXEngine());
+ profile.updateSummary(context.startTime,
getSummaryInfo(isFinished), isFinished, this.planner);
} catch (Throwable t) {
LOG.warn("failed to update profile, ingore this error", t);
}
@@ -1600,9 +1600,9 @@ public class StmtExecutor {
context.getSessionVariable().getMaxMsgSizeOfResultReceiver());
} else {
coord = new Coordinator(context, analyzer, planner,
context.getStatsErrorEstimator());
+ profile.addExecutionProfile(coord.getExecutionProfile());
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
new QeProcessorImpl.QueryInfo(context,
originStmt.originStmt, coord));
- profile.addExecutionProfile(coord.getExecutionProfile());
coordBase = coord;
}
@@ -1610,35 +1610,10 @@ public class StmtExecutor {
coordBase.exec();
profile.getSummaryProfile().setQueryScheduleFinishTime();
updateProfile(false);
- if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) {
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Start to execute fragment. user: {}, db:
{}, sql: {}, fragment instance num: {}",
- context.getQualifiedUser(),
context.getDatabase(),
-
parsedStmt.getOrigStmt().originStmt.replace("\n", " "),
- coordBase.getInstanceTotalNum());
- }
- } catch (Exception e) {
- LOG.warn("Fail to print fragment concurrency for Query.",
e);
- }
- }
if (context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL))
{
Preconditions.checkState(!context.isReturnResultFromLocal());
profile.getSummaryProfile().setTempStartTime();
- if (coordBase.getInstanceTotalNum() > 1 &&
LOG.isDebugEnabled()) {
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Finish to execute fragment. user: {},
db: {}, sql: {}, "
- + "fragment instance num: {}",
- context.getQualifiedUser(),
context.getDatabase(),
-
parsedStmt.getOrigStmt().originStmt.replace("\n", " "),
- coordBase.getInstanceTotalNum());
- }
- } catch (Exception e) {
- LOG.warn("Fail to print fragment concurrency for
Query.", e);
- }
- }
return;
}
@@ -1723,18 +1698,6 @@ public class StmtExecutor {
throw e;
} finally {
coordBase.close();
- if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) {
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Finish to execute fragment. user: {}, db:
{}, sql: {}, fragment instance num: {}",
- context.getQualifiedUser(),
context.getDatabase(),
-
parsedStmt.getOrigStmt().originStmt.replace("\n", " "),
- coordBase.getInstanceTotalNum());
- }
- } catch (Exception e) {
- LOG.warn("Fail to print fragment concurrency for Query.",
e);
- }
- }
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
index 50c24a42330..3f4bb846767 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
@@ -22,6 +22,7 @@ import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.PBackendServiceGrpc;
import org.apache.doris.thrift.TNetworkAddress;
+import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import io.grpc.netty.NettyChannelBuilder;
@@ -82,7 +83,7 @@ public class BackendServiceClient {
.execPlanFragmentStart(request);
}
- public Future<InternalService.PCancelPlanFragmentResult>
cancelPlanFragmentAsync(
+ public ListenableFuture<InternalService.PCancelPlanFragmentResult>
cancelPlanFragmentAsync(
InternalService.PCancelPlanFragmentRequest request) {
return stub.cancelPlanFragment(request);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index af21194263f..5a89614bab7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -36,6 +36,7 @@ import org.apache.doris.thrift.TPipelineFragmentParamsList;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -225,7 +226,7 @@ public class BackendServiceProxy {
}
}
- public Future<InternalService.PCancelPlanFragmentResult>
cancelPlanFragmentAsync(TNetworkAddress address,
+ public ListenableFuture<InternalService.PCancelPlanFragmentResult>
cancelPlanFragmentAsync(TNetworkAddress address,
TUniqueId finstId, Types.PPlanFragmentCancelReason cancelReason)
throws RpcException {
final InternalService.PCancelPlanFragmentRequest pRequest =
InternalService.PCancelPlanFragmentRequest.newBuilder()
@@ -241,8 +242,8 @@ public class BackendServiceProxy {
}
}
- public Future<InternalService.PCancelPlanFragmentResult>
cancelPipelineXPlanFragmentAsync(TNetworkAddress address,
- PlanFragmentId fragmentId, TUniqueId queryId,
+ public ListenableFuture<InternalService.PCancelPlanFragmentResult>
cancelPipelineXPlanFragmentAsync(
+ TNetworkAddress address, PlanFragmentId fragmentId, TUniqueId
queryId,
Types.PPlanFragmentCancelReason cancelReason) throws RpcException {
final InternalService.PCancelPlanFragmentRequest pRequest =
InternalService.PCancelPlanFragmentRequest
.newBuilder()
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/common/util/RuntimeProfileTest.java
b/fe/fe-core/src/test/java/org/apache/doris/common/util/RuntimeProfileTest.java
index 15b4175759c..56ed66c0504 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/common/util/RuntimeProfileTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/common/util/RuntimeProfileTest.java
@@ -98,7 +98,7 @@ public class RuntimeProfileTest {
@Test
public void testCounter() {
- RuntimeProfile profile = new RuntimeProfile();
+ RuntimeProfile profile = new RuntimeProfile("test counter");
profile.addCounter("key", TUnit.UNIT, "");
Assert.assertNotNull(profile.getCounterMap().get("key"));
Assert.assertNull(profile.getCounterMap().get("key2"));
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
index 29c0adae124..cd9e0cb048a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
@@ -33,6 +33,7 @@ import org.apache.doris.analysis.UseStmt;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.common.profile.Profile;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.MysqlChannel;
@@ -172,7 +173,8 @@ public class StmtExecutorTest {
public void testSelect(@Mocked QueryStmt queryStmt,
@Mocked SqlParser parser,
@Mocked OriginalPlanner planner,
- @Mocked Coordinator coordinator) throws Exception {
+ @Mocked Coordinator coordinator,
+ @Mocked Profile profile) throws Exception {
Env env = Env.getCurrentEnv();
Deencapsulation.setField(env, "canRead", new AtomicBoolean(true));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]