This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7056f99d46a [profile](Brokerload) Support get broker load profile
(#35110)
7056f99d46a is described below
commit 7056f99d46abe9865ec5a614c30892d5f1c3737a
Author: zhiqiang <[email protected]>
AuthorDate: Wed May 22 15:15:57 2024 +0800
[profile](Brokerload) Support get broker load profile (#35110)
---
.../doris/common/profile/ExecutionProfile.java | 11 +++
.../org/apache/doris/common/profile/Profile.java | 1 +
.../apache/doris/common/util/ProfileManager.java | 83 +++++++++++++++++-----
.../apache/doris/load/loadv2/BrokerLoadJob.java | 4 +-
.../java/org/apache/doris/load/loadv2/LoadJob.java | 16 +++++
.../main/java/org/apache/doris/qe/Coordinator.java | 4 ++
.../java/org/apache/doris/qe/QeProcessorImpl.java | 2 +-
7 files changed, 100 insertions(+), 21 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
index 76420045be6..e8b450b530c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
@@ -250,8 +250,14 @@ public class ExecutionProfile {
}
public Status updateProfile(TQueryProfile profile, TNetworkAddress
backendHBAddress, boolean isDone) {
+ if (!profile.isSetQueryId()) {
+ LOG.warn("QueryId is not set");
+ return new Status(TStatusCode.INVALID_ARGUMENT, "QueryId is not
set");
+ }
+
if (isPipelineXProfile) {
if (!profile.isSetFragmentIdToProfile()) {
+ LOG.warn("{} FragmentIdToProfile is not set",
DebugUtil.printId(profile.getQueryId()));
return new Status(TStatusCode.INVALID_ARGUMENT,
"FragmentIdToProfile is not set");
}
@@ -266,6 +272,7 @@ public class ExecutionProfile {
RuntimeProfile profileNode = new RuntimeProfile(name);
taskProfile.add(profileNode);
if (!pipelineProfile.isSetProfile()) {
+ LOG.warn("Profile is not set, {}",
DebugUtil.printId(profile.getQueryId()));
return new Status(TStatusCode.INVALID_ARGUMENT,
"Profile is not set");
}
@@ -278,10 +285,13 @@ public class ExecutionProfile {
}
} else {
if (!profile.isSetInstanceProfiles() ||
!profile.isSetFragmentInstanceIds()) {
+ LOG.warn("InstanceIdToProfile is not set, {}",
DebugUtil.printId(profile.getQueryId()));
return new Status(TStatusCode.INVALID_ARGUMENT,
"InstanceIdToProfile is not set");
}
if (profile.fragment_instance_ids.size() !=
profile.instance_profiles.size()) {
+ LOG.warn("InstanceIdToProfile size is not equal, {}",
+ DebugUtil.printId(profile.getQueryId()));
return new Status(TStatusCode.INVALID_ARGUMENT,
"InstanceIdToProfile size is not equal");
}
@@ -289,6 +299,7 @@ public class ExecutionProfile {
TUniqueId instanceId =
profile.getFragmentInstanceIds().get(idx);
TRuntimeProfileTree instanceProfile =
profile.getInstanceProfiles().get(idx);
if (instanceProfile == null) {
+ LOG.warn("Profile is not set {}",
DebugUtil.printId(profile.getQueryId()));
return new Status(TStatusCode.INVALID_ARGUMENT, "Profile
is not set");
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
index da0790ff57c..f94e0d235ef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
@@ -140,6 +140,7 @@ public class Profile {
}
}
try {
+ // For load task, they will have multiple execution_profiles.
for (ExecutionProfile executionProfile : executionProfiles) {
builder.append("\n");
executionProfile.getRoot().prettyPrint(builder, "");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
index 5b36d81dd68..eaeb3635c58 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
@@ -17,6 +17,7 @@
package org.apache.doris.common.util;
+import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.AuthenticationException;
import org.apache.doris.common.ClientPool;
@@ -29,6 +30,7 @@ import org.apache.doris.common.profile.Profile;
import org.apache.doris.common.profile.ProfileTreeBuilder;
import org.apache.doris.common.profile.ProfileTreeNode;
import org.apache.doris.common.profile.SummaryProfile;
+import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
import org.apache.doris.qe.CoordInterface;
import org.apache.doris.qe.QeProcessorImpl;
@@ -326,28 +328,72 @@ public class ProfileManager {
return resp;
}
- public String getProfile(String queryID) {
- TUniqueId thriftQueryId = Util.parseTUniqueIdFromString(queryID);
- List<TNetworkAddress> involvedBackends = null;
- if (thriftQueryId != null) {
- CoordInterface coor =
QeProcessorImpl.INSTANCE.getCoordinator(thriftQueryId);
- if (coor != null) {
- involvedBackends = coor.getInvolvedBackends();
- }
+ private List<Future<TGetRealtimeExecStatusResponse>>
createFetchRealTimeProfileTasks(String id) {
+ // For query, id is queryId, for load, id is LoadLoadingTaskId
+ class QueryIdAndAddress {
+ public TUniqueId id;
+ public TNetworkAddress beAddress;
}
List<Future<TGetRealtimeExecStatusResponse>> futures =
Lists.newArrayList();
+ TUniqueId queryId = Util.parseTUniqueIdFromString(id);
+ List<QueryIdAndAddress> involvedBackends = Lists.newArrayList();
+
+ if (queryId != null) {
+ CoordInterface coor =
QeProcessorImpl.INSTANCE.getCoordinator(queryId);
- if (involvedBackends != null) {
- for (TNetworkAddress beAddress : involvedBackends) {
- Callable<TGetRealtimeExecStatusResponse> task = () -> {
- return getRealtimeQueryProfile(thriftQueryId, beAddress);
- };
- Future<TGetRealtimeExecStatusResponse> future =
fetchRealTimeProfileExecutor.submit(task);
- futures.add(future);
+ if (coor != null) {
+ for (TNetworkAddress addr : coor.getInvolvedBackends()) {
+ QueryIdAndAddress tmp = new QueryIdAndAddress();
+ tmp.id = queryId;
+ tmp.beAddress = addr;
+ involvedBackends.add(tmp);
+ }
}
+ } else {
+ Long loadJobId = (long) -1;
+ try {
+ loadJobId = Long.parseLong(id);
+ } catch (Exception e) {
+ return futures;
+ }
+
+ LoadJob loadJob =
Env.getCurrentEnv().getLoadManager().getLoadJob(loadJobId);
+ if (loadJob.getLoadTaskIds() == null) {
+ return futures;
+ }
+
+ for (TUniqueId taskId : loadJob.getLoadTaskIds()) {
+ CoordInterface coor =
QeProcessorImpl.INSTANCE.getCoordinator(taskId);
+ if (coor != null) {
+ if (coor.getInvolvedBackends() != null) {
+ for (TNetworkAddress beAddress :
coor.getInvolvedBackends()) {
+ QueryIdAndAddress tmp = new QueryIdAndAddress();
+ tmp.id = taskId;
+ tmp.beAddress = beAddress;
+ involvedBackends.add(tmp);
+ }
+ } else {
+ LOG.warn("Involved backends is null, load job {}, task
{}", id, DebugUtil.printId(taskId));
+ }
+ }
+ }
+ }
+
+ for (QueryIdAndAddress idAndAddress : involvedBackends) {
+ Callable<TGetRealtimeExecStatusResponse> task = () -> {
+ return getRealtimeQueryProfile(idAndAddress.id,
idAndAddress.beAddress);
+ };
+ Future<TGetRealtimeExecStatusResponse> future =
fetchRealTimeProfileExecutor.submit(task);
+ futures.add(future);
}
+ return futures;
+ }
+
+ public String getProfile(String id) {
+ List<Future<TGetRealtimeExecStatusResponse>> futures =
createFetchRealTimeProfileTasks(id);
+
// beAddr of reportExecStatus of QeProcessorImpl is meaningless, so
assign a dummy address
// to avoid compile failing.
TNetworkAddress dummyAddr = new TNetworkAddress();
@@ -358,18 +404,17 @@ public class ProfileManager {
QeProcessorImpl.INSTANCE.reportExecStatus(resp.getReportExecStatusParams(),
dummyAddr);
}
} catch (Exception e) {
- LOG.warn("Failed to get real-time profile, query {}, error:
{}",
- DebugUtil.printId(thriftQueryId), e.getMessage(), e);
+ LOG.warn("Failed to get real-time profile, id {}, error: {}",
id, e.getMessage(), e);
}
}
if (!futures.isEmpty()) {
- LOG.info("Get real-time exec status finished, query {}", queryID);
+ LOG.info("Get real-time exec status finished, id {}", id);
}
readLock.lock();
try {
- ProfileElement element = queryIdToProfileMap.get(queryID);
+ ProfileElement element = queryIdToProfileMap.get(id);
if (element == null) {
return null;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index fd70e03caa7..275cd371cdd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -238,6 +238,8 @@ public class BrokerLoadJob extends BulkLoadJob {
this.jobProfile = new Profile("BrokerLoadJob " + id + ". " +
label, true,
Integer.valueOf(sessionVariables.getOrDefault(SessionVariable.PROFILE_LEVEL,
"3")),
false);
+ // profile is registered in ProfileManager, so that we can get
realtime profile
+ jobProfile.updateSummary(loadStartTimestamp,
getSummaryInfo(false), false, null);
}
ProgressManager progressManager = Env.getCurrentProgressManager();
progressManager.registerProgressSimple(String.valueOf(id));
@@ -391,7 +393,7 @@ public class BrokerLoadJob extends BulkLoadJob {
builder.endTime(TimeUtils.longToTimeString(currentTimestamp));
builder.totalTime(DebugUtil.getPrettyStringMs(currentTimestamp -
createTimestamp));
}
- builder.taskState("FINISHED");
+ builder.taskState(isFinished ? "FINISHED" : "RUNNING");
builder.user(getUserInfo() != null ? getUserInfo().getQualifiedUser()
: "N/A");
builder.defaultDb(getDefaultDb());
builder.sqlStatement(getOriginStmt().originStmt);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 65befadb13a..f28394957bd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -700,6 +700,22 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
failMsg = loadJobFinalOperation.getFailMsg();
}
+ public List<TUniqueId> getLoadTaskIds() {
+ readLock();
+ try {
+ List<TUniqueId> res = Lists.newArrayList();
+ for (LoadTask task : idToTasks.values()) {
+ if (task instanceof LoadLoadingTask) {
+ LoadLoadingTask loadLoadingTask = (LoadLoadingTask) task;
+ res.add(loadLoadingTask.getLoadId());
+ }
+ }
+ return res;
+ } finally {
+ readUnlock();
+ }
+ }
+
public List<Comparable> getShowInfo() throws DdlException {
readLock();
try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 07b5daefaa1..4467a7150ad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -389,6 +389,10 @@ public class Coordinator implements CoordInterface {
//
https://github.com/apache/doris/blob/bd6f5b6a0e5f1b12744607336123d7f97eb76af9/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java#L155
this.enablePipelineEngine = Config.enable_pipeline_load;
this.enablePipelineXEngine = Config.enable_pipeline_load;
+ // make sure Coordinator can update profile correctlly
+ if (this.enablePipelineXEngine) {
+ this.executionProfile.setPipelineX();
+ }
}
private void setFromUserProperty(ConnectContext connectContext) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index f6b4671b20a..b762fdda87c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -247,7 +247,7 @@ public final class QeProcessorImpl implements QeProcessor {
}
} else {
LOG.warn("Invalid report profile req, this is a logical error,
BE must set backendId and isDone"
- + " at same time, query id: {}" +
DebugUtil.printId(params.query_id));
+ + " at same time, query id: {}",
DebugUtil.printId(params.query_id));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]