This is an automated email from the ASF dual-hosted git repository.
wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 23cddd0e24a [refactor][profile] log enhancement and minor change of
profile (#46564)
23cddd0e24a is described below
commit 23cddd0e24ada257cb58b52907146fef728dc723
Author: zhiqiang <[email protected]>
AuthorDate: Mon Jan 13 09:48:26 2025 +0800
[refactor][profile] log enhancement and minor change of profile (#46564)
---
be/src/runtime/query_context.cpp | 5 +-
be/src/runtime/runtime_query_statistics_mgr.cpp | 6 +-
.../apache/doris/common/proc/BackendsProcDir.java | 2 +-
.../common/proc/CurrentQueryInfoProvider.java | 4 +-
.../doris/common/{util => profile}/AggCounter.java | 2 +-
.../doris/common/{util => profile}/Counter.java | 2 +-
.../doris/common/profile/ExecutionProfile.java | 12 +-
.../org/apache/doris/common/profile/Profile.java | 64 +++--
.../doris/common/profile/ProfileManager.java | 119 ++++++---
.../common/{util => profile}/RuntimeProfile.java | 4 +-
.../doris/common/profile/SummaryProfile.java | 5 +-
.../apache/doris/load/loadv2/BrokerLoadJob.java | 2 +-
.../nereids/trees/plans/commands/LoadCommand.java | 3 +-
.../java/org/apache/doris/qe/QeProcessorImpl.java | 34 ++-
.../org/apache/doris/qe/QueryStatisticsItem.java | 2 +-
.../java/org/apache/doris/qe/StmtExecutor.java | 4 +-
.../doris/common/profile/ProfileManagerTest.java | 277 +++++++++++++++++++++
.../{util => profile}/ProfilePersistentTest.java | 8 +-
.../{util => profile}/RuntimeProfileTest.java | 2 +-
.../pipeline/cloud_p0/conf/be_custom.conf | 2 +
.../pipeline/cloud_p0/conf/fe_custom.conf | 5 +-
regression-test/pipeline/p0/conf/be.conf | 2 +-
regression-test/pipeline/p0/conf/fe.conf | 5 +
23 files changed, 468 insertions(+), 103 deletions(-)
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index c777c8100ef..1ff08f7d9bb 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -351,8 +351,9 @@ void QueryContext::add_fragment_profile(
#endif
std::lock_guard<std::mutex> l(_profile_mutex);
- LOG_INFO("Query X add fragment profile, query {}, fragment {}, pipeline
profile count {} ",
- print_id(this->_query_id), fragment_id, pipeline_profiles.size());
+ VLOG_ROW << fmt::format(
+ "Query add fragment profile, query {}, fragment {}, pipeline
profile count {} ",
+ print_id(this->_query_id), fragment_id, pipeline_profiles.size());
_profile_map.insert(std::make_pair(fragment_id, pipeline_profiles));
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp
b/be/src/runtime/runtime_query_statistics_mgr.cpp
index 93d5256cad7..ebcaf30eab1 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -65,6 +65,8 @@ static Status _do_report_exec_stats_rpc(const
TNetworkAddress& coor_addr,
return Status::RpcError("Client rpc client failed");
}
+ VLOG_DEBUG << "Sending profile";
+
try {
try {
rpc_client->reportExecStatus(res, req);
@@ -272,13 +274,13 @@ void RuntimeQueryStatisticsMgr::register_fragment_profile(
void RuntimeQueryStatisticsMgr::_report_query_profiles_function() {
decltype(_profile_map) profile_copy;
decltype(_load_channel_profile_map) load_channel_profile_copy;
-
+ VLOG_DEBUG << "Beging reporting profile";
{
std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock);
_profile_map.swap(profile_copy);
_load_channel_profile_map.swap(load_channel_profile_copy);
}
-
+ VLOG_DEBUG << "After swap profile map";
// query_id -> {coordinator_addr, {fragment_id ->
std::vector<pipeline_profile>}}
for (auto& entry : profile_copy) {
const auto& query_id = entry.first;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java
index c5273304137..e3db2a3a716 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java
@@ -20,9 +20,9 @@ package org.apache.doris.common.proc;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.profile.RuntimeProfile;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.ListComparator;
-import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java
index 2b6d8f6702e..de7247ab3ab 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java
@@ -18,9 +18,9 @@
package org.apache.doris.common.proc;
import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.util.Counter;
+import org.apache.doris.common.profile.Counter;
+import org.apache.doris.common.profile.RuntimeProfile;
import org.apache.doris.common.util.DebugUtil;
-import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.qe.QueryStatisticsItem;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TUniqueId;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/AggCounter.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/AggCounter.java
similarity index 98%
rename from
fe/fe-core/src/main/java/org/apache/doris/common/util/AggCounter.java
rename to
fe/fe-core/src/main/java/org/apache/doris/common/profile/AggCounter.java
index 3f080b3752f..ab3b43bd3e6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/AggCounter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/AggCounter.java
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.common.util;
+package org.apache.doris.common.profile;
import java.util.LinkedList;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Counter.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Counter.java
similarity index 98%
rename from fe/fe-core/src/main/java/org/apache/doris/common/util/Counter.java
rename to fe/fe-core/src/main/java/org/apache/doris/common/profile/Counter.java
index f6c06890047..f306d7c73fb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Counter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Counter.java
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.common.util;
+package org.apache.doris.common.profile;
import org.apache.doris.common.io.Text;
import org.apache.doris.persist.gson.GsonUtils;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
index d2300cd667d..fffcca49bf6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
@@ -20,7 +20,6 @@ package org.apache.doris.common.profile;
import org.apache.doris.common.Pair;
import org.apache.doris.common.Status;
import org.apache.doris.common.util.DebugUtil;
-import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.thrift.TDetailedReportParams;
import org.apache.doris.thrift.TNetworkAddress;
@@ -235,7 +234,9 @@ public class ExecutionProfile {
String suffix = " (host=" + backendHBAddress + ")";
for (TDetailedReportParams pipelineProfile : fragmentProfile) {
String name = "";
- if (pipelineProfile.isSetIsFragmentLevel() &&
pipelineProfile.is_fragment_level) {
+ boolean isFragmentLevel =
(pipelineProfile.isSetIsFragmentLevel() && pipelineProfile.is_fragment_level);
+ if (isFragmentLevel) {
+ // Fragment Level profile is also represented by
TDetailedReportParams.
name = "Fragment Level Profile: " + suffix;
} else {
name = "Pipeline :" + pipelineIdx + " " + suffix;
@@ -243,9 +244,9 @@ public class ExecutionProfile {
}
RuntimeProfile profileNode = new RuntimeProfile(name);
- // The taskprofile is used to save the profile of the
pipeline, without
+ // The taskProfile is used to save the profile of the
pipeline, without
// considering the FragmentLevel.
- if (!(pipelineProfile.isSetIsFragmentLevel() &&
pipelineProfile.is_fragment_level)) {
+ if (!isFragmentLevel) {
taskProfile.add(profileNode);
}
if (!pipelineProfile.isSetProfile()) {
@@ -260,6 +261,9 @@ public class ExecutionProfile {
setMultiBeProfile(fragmentId, backendHBAddress, taskProfile);
}
+ LOG.info("Profile update finished query: {} fragments: {} isDone: {}",
+ DebugUtil.printId(getQueryId()),
profile.getFragmentIdToProfile().size(), isDone);
+
if (profile.isSetLoadChannelProfiles()) {
for (TRuntimeProfileTree loadChannelProfile :
profile.getLoadChannelProfiles()) {
this.loadChannelProfile.update(loadChannelProfile);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
index a5173247403..80a79ebffbf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
@@ -20,7 +20,6 @@ package org.apache.doris.common.profile;
import org.apache.doris.common.Config;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.DebugUtil;
-import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.Plan;
@@ -81,10 +80,6 @@ public class Profile {
// profile file name format: time_id
private static final String SEPERATOR = "_";
- // id will be assigned to id of SummaryProfile.
- // For broker load, its SummaryProfile id is a string representation of a
long integer,
- // for others, it is queryID
- private String id = "";
// summaryProfile will be serialized to storage as JSON, and we can
recover it from storage
// recover of SummaryProfile is important, because it contains the meta
information of the profile
// we need it to construct memory index for profile retrieving.
@@ -103,7 +98,7 @@ public class Profile {
// when coordinator finishes, it will mark finish time.
// we will wait for about 5 seconds to see if all profiles have been
reported.
// if not, we will store the profile to storage, and release the memory,
- // futher report will be ignored.
+ // further report will be ignored.
// why MAX_VALUE? So that we can use PriorityQueue to sort profile by
finish time decreasing order.
private long queryFinishTimestamp = Long.MAX_VALUE;
private Map<Integer, String> planNodeMap = Maps.newHashMap();
@@ -193,7 +188,6 @@ public class Profile {
DataInput dataInput = new DataInputStream(profileFileInputStream);
Profile res = new Profile();
res.summaryProfile = SummaryProfile.read(dataInput);
- res.setId(res.summaryProfile.getProfileId());
res.profileStoragePath = path;
res.isQueryFinished = true;
res.profileSize = fileSize;
@@ -262,7 +256,7 @@ public class Profile {
// For load task, the profile contains many execution profiles
public void addExecutionProfile(ExecutionProfile executionProfile) {
if (executionProfile == null) {
- LOG.warn("try to set a null excecution profile, it is abnormal",
new Exception());
+ LOG.warn("try to set a null execution profile, it is abnormal",
new Exception());
return;
}
executionProfile.setSummaryProfile(summaryProfile);
@@ -298,18 +292,17 @@ public class Profile {
}
summaryProfile.update(summaryInfo);
- this.setId(summaryProfile.getProfileId());
if (isFinished) {
this.markQueryFinished(System.currentTimeMillis());
}
- // Nerids native insert not set planner, so it is null
+ // Nereids native insert not set planner, so it is null
if (planner != null) {
this.planNodeMap = planner.getExplainStringMap();
}
ProfileManager.getInstance().pushProfile(this);
} catch (Throwable t) {
- LOG.warn("update profile {} failed", id, t);
+ LOG.warn("update profile {} failed", getId(), t);
throw t;
}
}
@@ -352,7 +345,7 @@ public class Profile {
}
private RuntimeProfile composeRootProfile() {
- RuntimeProfile rootProfile = new RuntimeProfile(id);
+ RuntimeProfile rootProfile = new RuntimeProfile(getId());
rootProfile.addChild(summaryProfile.getSummary());
rootProfile.addChild(summaryProfile.getExecutionSummary());
for (ExecutionProfile executionProfile : executionProfiles) {
@@ -378,8 +371,7 @@ public class Profile {
return;
}
- // Only generate merged profile for select, insert into select.
- // Not support broker load now.
+ // For broker load, if it has more than one execution profile, we will
not generate merged profile.
RuntimeProfile mergedProfile = null;
if (this.profileLevel == MergedProfileLevel &&
this.executionProfiles.size() == 1) {
try {
@@ -389,7 +381,7 @@ public class Profile {
updateActualRowCountOnPhysicalPlan(physicalPlan);
}
} catch (Throwable aggProfileException) {
- LOG.warn("build merged simple profile {} failed", this.id,
aggProfileException);
+ LOG.warn("build merged simple profile {} failed", getId(),
aggProfileException);
}
}
@@ -433,10 +425,6 @@ public class Profile {
return this.queryFinishTimestamp;
}
- public void setId(String id) {
- this.id = id;
- }
-
// For UT
public void setSummaryProfile(SummaryProfile summaryProfile) {
this.summaryProfile = summaryProfile;
@@ -483,7 +471,7 @@ public class Profile {
> (this.executionProfiles.size() * autoProfileDurationMs))
{
if (LOG.isDebugEnabled()) {
LOG.debug("Query/LoadJob {} costs {} ms, begin {} finish
{}, need store its profile",
- id, durationMs,
summaryProfile.getQueryBeginTime(), this.queryFinishTimestamp);
+ getId(), durationMs,
summaryProfile.getQueryBeginTime(), this.queryFinishTimestamp);
}
return true;
}
@@ -491,7 +479,7 @@ public class Profile {
}
if (this.queryFinishTimestamp == Long.MAX_VALUE) {
- LOG.warn("Logical error, query {} has finished, but
queryFinishTimestamp is not set,", id);
+ LOG.warn("Logical error, query {} has finished, but
queryFinishTimestamp is not set,", getId());
return false;
}
@@ -501,7 +489,8 @@ public class Profile {
> Config.profile_waiting_time_for_spill_seconds *
1000) {
LOG.warn("Profile {} should be stored to storage without waiting
for incoming profile,"
+ " since it has been waiting for {} ms, current time {}
query finished time: {}",
- id, currentTimeMillis - this.queryFinishTimestamp,
currentTimeMillis, this.queryFinishTimestamp);
+ getId(), currentTimeMillis - this.queryFinishTimestamp,
currentTimeMillis,
+ this.queryFinishTimestamp);
this.summaryProfile.setSystemMessage(
"This profile is not complete, since its
collection does not finish in time."
@@ -526,7 +515,7 @@ public class Profile {
public void markQueryFinished(long queryFinishTime) {
try {
if (this.profileHasBeenStored()) {
- LOG.error("Logical error, profile {} has already been stored
to storage", this.id);
+ LOG.error("Logical error, profile {} has already been stored
to storage", getId());
return;
}
@@ -539,13 +528,13 @@ public class Profile {
}
public void writeToStorage(String systemProfileStorageDir) {
- if (Strings.isNullOrEmpty(id)) {
+ if (Strings.isNullOrEmpty(getId())) {
LOG.warn("store profile failed, name is empty");
return;
}
if (!Strings.isNullOrEmpty(profileStoragePath)) {
- LOG.error("Logical error, profile {} has already been stored to
storage", id);
+ LOG.error("Logical error, profile {} has already been stored to
storage", getId());
return;
}
@@ -586,7 +575,7 @@ public class Profile {
dataOutputStream.flush();
this.profileSize = profileFile.length();
} catch (Exception e) {
- LOG.error("write {} summary profile failed", id, e);
+ LOG.error("write {} summary profile failed", getId(), e);
return;
} finally {
try {
@@ -690,7 +679,7 @@ public class Profile {
return;
}
- LOG.info("Profile {} has been stored to storage, reading it from
storage", id);
+ LOG.info("Profile {} has been stored to storage, reading it from
storage", getId());
FileInputStream fileInputStream = null;
@@ -728,4 +717,25 @@ public class Profile {
return;
}
+
+ public String debugInfo() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("ProfileId:").append(getId()).append("|");
+ builder.append("StoragePath:").append(profileStoragePath).append("|");
+
builder.append("StartTimeStamp:").append(summaryProfile.getQueryBeginTime()).append("|");
+ builder.append("IsFinished:").append(isQueryFinished).append("|");
+
builder.append("FinishTimestamp:").append(queryFinishTimestamp).append("|");
+ builder.append("AutoProfileDuration:
").append(autoProfileDurationMs).append("|");
+ builder.append("ExecutionProfileCnt:
").append(executionProfiles.size()).append("|");
+ builder.append("ProfileOnStorageSize:").append(profileSize);
+ return builder.toString();
+ }
+
+ public void setQueryFinishTimestamp(long l) {
+ this.queryFinishTimestamp = l;
+ }
+
+ public String getId() {
+ return summaryProfile.getProfileId();
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java
index ef9f2f7bbaf..f174b6b7dcc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java
@@ -65,7 +65,7 @@ import
java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
* All attributes can be seen from the above.
*
* why the element in the finished profile array is not RuntimeProfile,
- * the purpose is let coordinator can destruct earlier(the fragment profile is
in Coordinator)
+ * the purpose is let coordinator can destruct earlier (the fragment profile
is in Coordinator)
*
*/
public class ProfileManager extends MasterDaemon {
@@ -83,7 +83,7 @@ public class ProfileManager extends MasterDaemon {
this.profile = profile;
}
- private final Profile profile;
+ final Profile profile;
public Map<String, String> infoStrings = Maps.newHashMap();
public String errMsg = "";
@@ -655,11 +655,6 @@ public class ProfileManager extends MasterDaemon {
}
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} profiles size on storage: {}", profileDeque.size(),
- DebugUtil.printByteWithUnit(totalProfileSize));
- }
-
final int maxSpilledProfileNum = Config.max_spilled_profile_num;
final long spilledProfileLimitBytes =
Config.spilled_profile_storage_limit_bytes;
List<ProfileElement> queryIdToBeRemoved = Lists.newArrayList();
@@ -803,29 +798,57 @@ public class ProfileManager extends MasterDaemon {
}
}
+ // The init value of query finish time of profile is MAX_VALUE,
+ // So a more recent query will be on the top of the heap.
+ PriorityQueue<ProfileElement> getProfileOrderByQueryFinishTimeDesc() {
+ readLock.lock();
+ try {
+ PriorityQueue<ProfileElement> queryIdDeque = new
PriorityQueue<>(Comparator.comparingLong(
+ (ProfileElement profileElement) ->
profileElement.profile.getQueryFinishTimestamp()).reversed());
+
+ queryIdToProfileMap.forEach((queryId, profileElement) -> {
+ queryIdDeque.add(profileElement);
+ });
+
+ return queryIdDeque;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
// The init value of query finish time of profile is MAX_VALUE
- // So more recent query will be on the top of heap.
- private PriorityQueue<ProfileElement>
getProfileOrderByQueryFinishTimeDesc() {
- PriorityQueue<ProfileElement> queryIdDeque = new
PriorityQueue<>(Comparator.comparingLong(
- (ProfileElement profileElement) ->
profileElement.profile.getQueryFinishTimestamp()).reversed());
+ // So query finished earlier will be on the top of heap
+ PriorityQueue<ProfileElement> getProfileOrderByQueryFinishTime() {
+ readLock.lock();
+ try {
+ PriorityQueue<ProfileElement> queryIdDeque = new
PriorityQueue<>(Comparator.comparingLong(
+ (ProfileElement profileElement) ->
profileElement.profile.getQueryFinishTimestamp()));
- queryIdToProfileMap.forEach((queryId, profileElement) -> {
- queryIdDeque.add(profileElement);
- });
+ queryIdToProfileMap.forEach((queryId, profileElement) -> {
+ queryIdDeque.add(profileElement);
+ });
- return queryIdDeque;
+ return queryIdDeque;
+ } finally {
+ readLock.unlock();
+ }
}
// Older query will be on the top of heap
- private PriorityQueue<ProfileElement> getProfileOrderByQueryStartTime() {
- PriorityQueue<ProfileElement> queryIdDeque = new
PriorityQueue<>(Comparator.comparingLong(
- (ProfileElement profileElement) ->
profileElement.profile.getSummaryProfile().getQueryBeginTime()));
+ PriorityQueue<ProfileElement> getProfileOrderByQueryStartTime() {
+ readLock.lock();
+ try {
+ PriorityQueue<ProfileElement> queryIdDeque = new
PriorityQueue<>(Comparator.comparingLong(
+ (ProfileElement profileElement) ->
profileElement.profile.getSummaryProfile().getQueryBeginTime()));
- queryIdToProfileMap.forEach((queryId, profileElement) -> {
- queryIdDeque.add(profileElement);
- });
+ queryIdToProfileMap.forEach((queryId, profileElement) -> {
+ queryIdDeque.add(profileElement);
+ });
- return queryIdDeque;
+ return queryIdDeque;
+ } finally {
+ readLock.unlock();
+ }
}
// When the query is finished, the execution profile should be marked as
finished
@@ -885,7 +908,7 @@ public class ProfileManager extends MasterDaemon {
writeLock.unlock();
if (stringBuilder.length() != 0) {
LOG.warn("Remove expired execution profiles {}, current
execution profile map size {},"
- + "Config.max_query_profile_num{},
Config.profile_async_collect_expire_time_secs {}",
+ + "Config.max_query_profile_num {},
Config.profile_async_collect_expire_time_secs {}",
stringBuilder.toString(), executionProfileNum,
Config.max_query_profile_num,
Config.profile_async_collect_expire_time_secs);
}
@@ -894,7 +917,7 @@ public class ProfileManager extends MasterDaemon {
private void deleteOutdatedProfilesFromMemory() {
StringBuilder stringBuilder = new StringBuilder();
- int profileNum = 0;
+ StringBuilder stringBuilderTTL = new StringBuilder();
writeLock.lock();
try {
@@ -903,7 +926,13 @@ public class ProfileManager extends MasterDaemon {
for (ProfileElement profileElement :
this.queryIdToProfileMap.values()) {
if (profileElement.profile.shouldBeRemoveFromMemory()) {
-
profilesToRemove.add(profileElement.profile.getSummaryProfile().getProfileId());
+ String profileId =
profileElement.profile.getSummaryProfile().getProfileId();
+ profilesToRemove.add(profileId);
+ stringBuilder.append(profileId).append(",");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Profile {} should be filtered from memory,
information {}", profileId,
+ profileElement.profile.debugInfo());
+ }
}
}
@@ -913,34 +942,54 @@ public class ProfileManager extends MasterDaemon {
for (ExecutionProfile executionProfile :
profileElement.profile.getExecutionProfiles()) {
queryIdToExecutionProfiles.remove(executionProfile.getQueryId());
}
-
stringBuilder.append(profileElement.profile.getSummaryProfile().getProfileId()).append(",");
}
if (this.queryIdToProfileMap.size() <=
Config.max_query_profile_num) {
return;
}
- PriorityQueue<ProfileElement> queueIdDeque =
getProfileOrderByQueryStartTime();
+ // profile is ordered by query finish time
+ // query finished earlier will be on the top of heap
+ // query finished time of unfinished query is Long.MAX_VALUE, so they
will be on the bottom of the heap.
+ PriorityQueue<ProfileElement> queueIdDeque =
getProfileOrderByQueryFinishTime();
- while (queueIdDeque.size() > Config.max_query_profile_num) {
+ while (queueIdDeque.size() > Config.max_query_profile_num &&
!queueIdDeque.isEmpty()) {
ProfileElement profileElement = queueIdDeque.poll();
-
-
queryIdToProfileMap.remove(profileElement.profile.getSummaryProfile().getProfileId());
+ String profileId =
profileElement.profile.getSummaryProfile().getProfileId();
+ stringBuilderTTL.append(profileId).append(",");
+ queryIdToProfileMap.remove(profileId);
for (ExecutionProfile executionProfile :
profileElement.profile.getExecutionProfiles()) {
queryIdToExecutionProfiles.remove(executionProfile.getQueryId());
}
-
stringBuilder.append(profileElement.profile.getSummaryProfile().getProfileId()).append(",");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Remove profile {} since ttl from memory, info
{}", profileId,
+ profileElement.profile.debugInfo());
+ }
}
} finally {
- profileNum = queryIdToProfileMap.size();
+ int profileNum = queryIdToProfileMap.size();
writeLock.unlock();
- if (stringBuilder.length() != 0) {
- LOG.info("Remove outdated profiles {} from memoy, current
profile map size {}",
- stringBuilder.toString(), profileNum);
+ if (stringBuilder.length() != 0 || stringBuilderTTL.length() != 0)
{
+ LOG.info("Filtered profiles {}, outdated profiles {}, they are
removed from memory,"
+ + " current profile map size {}",
+ stringBuilder.toString(), stringBuilderTTL.toString(),
profileNum);
+ }
+ }
+ }
+
+ String getDebugInfo() {
+ StringBuilder stringBuilder = new StringBuilder();
+ readLock.lock();
+ try {
+ for (ProfileElement profileElement : queryIdToProfileMap.values())
{
+
stringBuilder.append(profileElement.profile.debugInfo()).append("\n");
}
+ } finally {
+ readLock.unlock();
}
+ return stringBuilder.toString();
}
public List<List<String>> getProfileMetaWithType(ProfileType profileType,
long limit) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java
similarity index 99%
rename from
fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
rename to
fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java
index 19082959034..78ff0f58ed2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java
@@ -15,12 +15,12 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.common.util;
+package org.apache.doris.common.profile;
import org.apache.doris.common.Pair;
import org.apache.doris.common.Reference;
import org.apache.doris.common.io.Text;
-import org.apache.doris.common.profile.SummaryProfile;
+import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TCounter;
import org.apache.doris.thrift.TRuntimeProfileNode;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index 5b0d5ba3533..7215b8a9c65 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -19,7 +19,6 @@ package org.apache.doris.common.profile;
import org.apache.doris.common.Config;
import org.apache.doris.common.io.Text;
-import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TNetworkAddress;
@@ -543,8 +542,8 @@ public class SummaryProfile {
this.nereidsDistributeFinishTime = TimeUtils.getStartTimeMs();
}
- public void setQueryBeginTime() {
- this.queryBeginTime = TimeUtils.getStartTimeMs();
+ public void setQueryBeginTime(long queryBeginTime) {
+ this.queryBeginTime = queryBeginTime;
}
public void setQueryAnalysisFinishTime() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index b2fd6746f2f..efb2fcc06d4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -247,7 +247,7 @@ public class BrokerLoadJob extends BulkLoadJob {
true,
Integer.valueOf(sessionVariables.getOrDefault(SessionVariable.PROFILE_LEVEL,
"3")),
Integer.valueOf(sessionVariables.getOrDefault(SessionVariable.AUTO_PROFILE_THRESHOLD_MS,
"-1")));
- this.jobProfile.getSummaryProfile().setQueryBeginTime();
+
this.jobProfile.getSummaryProfile().setQueryBeginTime(TimeUtils.getStartTimeMs());
// TODO: how should the profile record time be set for these load jobs?
// this.jobProfile.setId("BrokerLoadJob " + id + ". " + label);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
index aed3cb4c1f2..69cbf762c2a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
@@ -29,6 +29,7 @@ import org.apache.doris.common.NereidsException;
import org.apache.doris.common.profile.Profile;
import org.apache.doris.common.util.FileFormatConstants;
import org.apache.doris.common.util.FileFormatUtils;
+import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.job.base.JobExecuteType;
import org.apache.doris.job.base.JobExecutionConfiguration;
@@ -133,7 +134,7 @@ public class LoadCommand extends Command implements
ForwardWithSync {
ctx.getSessionVariable().enableProfile,
ctx.getSessionVariable().profileLevel,
ctx.getSessionVariable().getAutoProfileThresholdMs());
- profile.getSummaryProfile().setQueryBeginTime();
+
profile.getSummaryProfile().setQueryBeginTime(TimeUtils.getStartTimeMs());
if (sourceInfos.size() == 1) {
plans = ImmutableList.of(new
InsertIntoTableCommand(completeQueryPlan(ctx, sourceInfos.get(0)),
Optional.of(labelName), Optional.empty(),
Optional.empty()));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index d9b4583d71f..378d37d082f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -75,18 +75,25 @@ public final class QeProcessorImpl implements QeProcessor {
private Status processQueryProfile(TQueryProfile profile, TNetworkAddress
address, boolean isDone) {
ExecutionProfile executionProfile =
ProfileManager.getInstance().getExecutionProfile(profile.query_id);
if (executionProfile == null) {
- LOG.warn("Could not find execution profile with query id {}",
DebugUtil.printId(profile.query_id));
+ LOG.warn("Could not find execution profile, query {} be {}",
+ DebugUtil.printId(profile.query_id),
address.toString());
return new Status(TStatusCode.NOT_FOUND, "Could not find execution
profile with query id "
+ DebugUtil.printId(profile.query_id));
}
- // Update profile may cost a lot of time, use a seperate pool to deal
with it.
- writeProfileExecutor.submit(new Runnable() {
- @Override
- public void run() {
- executionProfile.updateProfile(profile, address, isDone);
- }
- });
+ // Update profile may cost a lot of time, use a separate pool to deal
with it.
+ try {
+ writeProfileExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ executionProfile.updateProfile(profile, address, isDone);
+ }
+ });
+ } catch (Exception e) {
+ LOG.warn("Failed to submit profile write task, query {} be {}",
+ DebugUtil.printId(profile.query_id),
address.toString());
+ return new Status(TStatusCode.INTERNAL_ERROR, "Failed to submit
profile write task");
+ }
return Status.OK;
}
@@ -231,9 +238,16 @@ public final class QeProcessorImpl implements QeProcessor {
// with profile in a single rpc, this will make FE ignore the exec
status and may lead to bug in query
// like insert into select.
if (params.isSetBackendId() && params.isSetDone()) {
+ LOG.info("Receive profile {} report from {}, isDone {},
fragments {}",
+
DebugUtil.printId(params.getQueryProfile().getQueryId()), beAddr.toString(),
+ params.isDone(),
params.getQueryProfile().fragment_id_to_profile.size());
+
Backend backend =
Env.getCurrentSystemInfo().getBackend(params.getBackendId());
- boolean isDone = params.isDone();
- if (backend != null) {
+ if (backend == null) {
+ LOG.warn("Invalid report profile req, backend {} not
found, query id: {}",
+ params.getBackendId(),
DebugUtil.printId(params.getQueryProfile().getQueryId()));
+ } else {
+ boolean isDone = params.isDone();
// the process status is ignored by design.
// actually be does not care the process status of profile
on fe.
processQueryProfile(params.getQueryProfile(),
backend.getHeartbeatAddress(), isDone);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java
index 76b528464d6..c51ff24ca14 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java
@@ -17,7 +17,7 @@
package org.apache.doris.qe;
-import org.apache.doris.common.util.RuntimeProfile;
+import org.apache.doris.common.profile.RuntimeProfile;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TUniqueId;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index f693f5b82aa..8d712dda76a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -691,7 +691,7 @@ public class StmtExecutor {
}
context.setQueryId(queryId);
context.setStartTime();
- profile.getSummaryProfile().setQueryBeginTime();
+
profile.getSummaryProfile().setQueryBeginTime(TimeUtils.getStartTimeMs());
List<List<String>> changedSessionVar =
VariableMgr.dumpChangedVars(context.getSessionVariable());
profile.setChangedSessionVar(DebugUtil.prettyPrintChangedSessionVar(changedSessionVar));
context.setStmtId(STMT_ID_GENERATOR.incrementAndGet());
@@ -995,7 +995,7 @@ public class StmtExecutor {
public void executeByLegacy(TUniqueId queryId) throws Exception {
context.setStartTime();
- profile.getSummaryProfile().setQueryBeginTime();
+
profile.getSummaryProfile().setQueryBeginTime(TimeUtils.getStartTimeMs());
context.setStmtId(STMT_ID_GENERATOR.incrementAndGet());
context.setQueryId(queryId);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/common/profile/ProfileManagerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/common/profile/ProfileManagerTest.java
new file mode 100644
index 00000000000..d0d17505861
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/common/profile/ProfileManagerTest.java
@@ -0,0 +1,277 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.profile;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+class ProfileManagerTest {
+ // We need a logger.
+ private static final Logger LOG =
LogManager.getLogger(ProfilePersistentTest.class);
+
+ private static ProfileManager profileManager;
+
+ @BeforeAll
+ static void setUp() {
+ profileManager = new ProfileManager();
+ }
+
+ @BeforeEach
+ void cleanProfile() {
+ profileManager.cleanProfile();
+ }
+
+ @Test
+ void returnsEmptyQueueWhenNoProfiles() {
+ PriorityQueue<ProfileManager.ProfileElement> result =
profileManager.getProfileOrderByQueryFinishTimeDesc();
+ Assertions.assertTrue(result.isEmpty());
+ result = profileManager.getProfileOrderByQueryFinishTime();
+ Assertions.assertTrue(result.isEmpty());
+ result = profileManager.getProfileOrderByQueryStartTime();
+ Assertions.assertTrue(result.isEmpty());
+ }
+
+ static Profile constructProfile(String id) {
+ Profile profile = new Profile();
+ SummaryProfile summaryProfile = new SummaryProfile();
+
summaryProfile.getSummary().getInfoStrings().put(SummaryProfile.PROFILE_ID, id);
+ profile.setSummaryProfile(summaryProfile);
+ return profile;
+ }
+
+ @Test
+ void getProfileByOrder() {
+ final int normalProfiles = 100;
+ for (int i = 0; i < normalProfiles; i++) {
+ Profile profile = constructProfile(String.valueOf(i));
+ Random random = new Random();
+ profile.setQueryFinishTimestamp(random.nextInt(200 - 101) + 101);
+ // set query start time in range of [0, 100)
+ profile.getSummaryProfile().setQueryBeginTime(random.nextInt(100));
+ profileManager.pushProfile(profile);
+
+ if (i == 10) {
+ LOG.info("Profile manager debug info: {}",
profileManager.getDebugInfo());
+ }
+ }
+
+ // Insert two profiles with default value.
+ Profile profile1 = constructProfile("Default 1");
+ profileManager.pushProfile(profile1);
+ Profile profile2 = constructProfile("Default 2");
+ profileManager.pushProfile(profile2);
+
+ profile1 = constructProfile("Default 3");
+ profile1.setQueryFinishTimestamp(1000L);
+ profileManager.pushProfile(profile1);
+ profile1 = constructProfile("Default 4");
+ profile1.setQueryFinishTimestamp(1000L);
+ profileManager.pushProfile(profile1);
+
+ profile1 = constructProfile("Default 5");
+ profile1.getSummaryProfile().setQueryBeginTime(1000L);
+ profileManager.pushProfile(profile1);
+ profile1 = constructProfile("Default 6");
+ profile1.getSummaryProfile().setQueryBeginTime(1000L);
+ profileManager.pushProfile(profile1);
+
+
+ Set<String> profileThatHasQueryFinishTime1000 = new HashSet<>();
+ profileThatHasQueryFinishTime1000.add("Default 3");
+ profileThatHasQueryFinishTime1000.add("Default 4");
+ Set<String> profileThatHasQueryStartTime1000 = new HashSet<>();
+ profileThatHasQueryStartTime1000.add("Default 5");
+ profileThatHasQueryStartTime1000.add("Default 6");
+ Set<String> profileThatHasDefaultQueryFinishTime = new HashSet<>();
+ profileThatHasDefaultQueryFinishTime.add("Default 1");
+ profileThatHasDefaultQueryFinishTime.add("Default 2");
+ profileThatHasDefaultQueryFinishTime.add("Default 5");
+ profileThatHasDefaultQueryFinishTime.add("Default 6");
+ Set<String> profileThatHasDefaultQueryStartTime = new HashSet<>();
+ profileThatHasDefaultQueryStartTime.add("Default 1");
+ profileThatHasDefaultQueryStartTime.add("Default 2");
+ profileThatHasDefaultQueryStartTime.add("Default 3");
+ profileThatHasDefaultQueryStartTime.add("Default 4");
+
+
+ // Profile should be ordered by query finish time in descending order.
+ // Means that the profile with the latest query finish time should be
at the top of the queue.
+ PriorityQueue<ProfileManager.ProfileElement> orderedResults =
profileManager.getProfileOrderByQueryFinishTimeDesc();
+ assert orderedResults != null;
+ assert !orderedResults.isEmpty();
+ Assertions.assertEquals(106, orderedResults.size(),
profileManager.getDebugInfo());
+
+ for (int i = 0; i < profileThatHasDefaultQueryFinishTime.size(); i++) {
+ ProfileManager.ProfileElement result = orderedResults.poll();
+ Assertions.assertNotEquals(result, null);
+
Assertions.assertTrue(profileThatHasDefaultQueryFinishTime.contains(result.profile.getId()));
+ }
+ for (int i = 0; i < profileThatHasQueryFinishTime1000.size(); i++) {
+ ProfileManager.ProfileElement result = orderedResults.poll();
+ Assertions.assertNotEquals(result, null);
+
Assertions.assertTrue(profileThatHasQueryFinishTime1000.contains(result.profile.getId()));
+ }
+
+ long prevQueryFinishTime = 1000L;
+ for (int i = 0; i < normalProfiles; i++) {
+ ProfileManager.ProfileElement result = orderedResults.poll();
+ Assertions.assertNotEquals(result, null);
+ Assertions.assertTrue(result.profile.getQueryFinishTimestamp() <=
prevQueryFinishTime);
+ prevQueryFinishTime = result.profile.getQueryFinishTimestamp();
+ }
+
+ orderedResults = profileManager.getProfileOrderByQueryFinishTime();
+ Assertions.assertEquals(orderedResults.size(), 106);
+ // Profile should be ordered by query finish time in ascending order.
+ prevQueryFinishTime = Long.MIN_VALUE;
+ for (int i = 0; i < normalProfiles; i++) {
+ ProfileManager.ProfileElement result = orderedResults.poll();
+ Assertions.assertNotEquals(result, null);
+ Assertions.assertTrue(result.profile.getQueryFinishTimestamp() >=
prevQueryFinishTime);
+ prevQueryFinishTime = result.profile.getQueryFinishTimestamp();
+ }
+ for (int i = 0; i < profileThatHasQueryFinishTime1000.size(); i++) {
+ ProfileManager.ProfileElement result = orderedResults.poll();
+ Assertions.assertNotEquals(result, null);
+
Assertions.assertTrue(profileThatHasQueryFinishTime1000.contains(result.profile.getId()));
+ }
+ for (int i = 0; i < profileThatHasDefaultQueryFinishTime.size(); i++) {
+ ProfileManager.ProfileElement result = orderedResults.poll();
+ Assertions.assertNotEquals(result, null);
+
Assertions.assertTrue(profileThatHasDefaultQueryFinishTime.contains(result.profile.getId()));
+ }
+
+ orderedResults = profileManager.getProfileOrderByQueryStartTime();
+ Assertions.assertEquals(orderedResults.size(), 106);
+ // Profile should be ordered by query start time in ascending order.
+ long prevQueryStartTime = -1;
+ for (int i = 0; i < profileThatHasDefaultQueryStartTime.size(); i++) {
+ ProfileManager.ProfileElement result = orderedResults.poll();
+ Assertions.assertNotEquals(result, null);
+
Assertions.assertTrue(profileThatHasDefaultQueryStartTime.contains(result.profile.getId()),
+ result.profile.getId() + " " +
result.profile.getSummaryProfile().getQueryBeginTime());
+ }
+
+ for (int i = 0; i < normalProfiles; i++) {
+ ProfileManager.ProfileElement result = orderedResults.poll();
+ Assertions.assertNotEquals(result, null);
+
Assertions.assertTrue(result.profile.getSummaryProfile().getQueryBeginTime() >=
prevQueryStartTime);
+ prevQueryStartTime =
result.profile.getSummaryProfile().getQueryBeginTime();
+ }
+
+ for (int i = 0; i < profileThatHasQueryStartTime1000.size(); i++) {
+ ProfileManager.ProfileElement result = orderedResults.poll();
+ Assertions.assertNotEquals(result, null);
+
Assertions.assertTrue(profileThatHasQueryStartTime1000.contains(result.profile.getId()),
+ result.profile.getId() + " " +
result.profile.getSummaryProfile().getQueryBeginTime());
+ }
+ }
+
+ @Test
+ void getProfileByOrderParallel() throws InterruptedException {
+ // Test the parallel case.
+ // Start 3 plain threads per role below; no thread pool is used.
+ final int threadNum = 3;
+ List<Thread> threads = new ArrayList<>();
+ AtomicBoolean stopFlag = new AtomicBoolean(false);
+
+ // These threads keep adding profiles to the profile manager.
+ // The profile they create has random name, random query finish time
and random query start time.
+ for (int i = 0; i < threadNum; i++) {
+ threads.add(new Thread(() -> {
+ Random random = new Random();
+ for (int j = 0; j < 100; j++) {
+ Profile profile =
constructProfile(String.valueOf(random.nextInt(1000)));
+
profile.getSummaryProfile().setQueryBeginTime(random.nextInt(1000));
+ profile.setQueryFinishTimestamp(random.nextInt(2000) +
1000);
+ profileManager.pushProfile(profile);
+ }
+ }));
+ }
+ // Create reader threads that fetch the profiles in different orders.
+ for (int i = 0; i < threadNum; i++) {
+ threads.add(new Thread(() -> {
+ while (!stopFlag.get()) {
+ PriorityQueue<ProfileManager.ProfileElement>
orderedResults = profileManager.getProfileOrderByQueryFinishTimeDesc();
+ long prevQueryFinishTime = Long.MAX_VALUE;
+ while (!orderedResults.isEmpty()) {
+ ProfileManager.ProfileElement result =
orderedResults.poll();
+
Assertions.assertTrue(result.profile.getQueryFinishTimestamp() <=
prevQueryFinishTime);
+ prevQueryFinishTime =
result.profile.getQueryFinishTimestamp();
+ }
+ }
+ }));
+ }
+
+ for (int i = 0; i < threadNum; i++) {
+ threads.add(new Thread(() -> {
+ while (!stopFlag.get()) {
+ PriorityQueue<ProfileManager.ProfileElement>
orderedResults = profileManager.getProfileOrderByQueryStartTime();
+ long prevQueryStartTime = -1;
+ while (!orderedResults.isEmpty()) {
+ ProfileManager.ProfileElement result =
orderedResults.poll();
+
Assertions.assertTrue(result.profile.getSummaryProfile().getQueryBeginTime() >=
prevQueryStartTime);
+ prevQueryStartTime =
result.profile.getSummaryProfile().getQueryBeginTime();
+ }
+ }
+ }));
+ }
+
+ for (int i = 0; i < threadNum; i++) {
+ threads.add(new Thread(() -> {
+ while (!stopFlag.get()) {
+ PriorityQueue<ProfileManager.ProfileElement>
orderedResults = profileManager.getProfileOrderByQueryFinishTime();
+ long prevQueryFinishTime = Long.MIN_VALUE;
+ while (!orderedResults.isEmpty()) {
+ ProfileManager.ProfileElement result =
orderedResults.poll();
+
Assertions.assertTrue(result.profile.getQueryFinishTimestamp() >=
prevQueryFinishTime);
+ prevQueryFinishTime =
result.profile.getQueryFinishTimestamp();
+ }
+ }
+ }));
+ }
+
+ for (Thread thread : threads) {
+ thread.start();
+ }
+
+ Thread.sleep(5000);
+
+ stopFlag.set(true);
+
+ for (Thread thread : threads) {
+ try {
+ thread.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/common/util/ProfilePersistentTest.java
b/fe/fe-core/src/test/java/org/apache/doris/common/profile/ProfilePersistentTest.java
similarity index 98%
rename from
fe/fe-core/src/test/java/org/apache/doris/common/util/ProfilePersistentTest.java
rename to
fe/fe-core/src/test/java/org/apache/doris/common/profile/ProfilePersistentTest.java
index c21c57ba1d4..cb1804e2d50 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/common/util/ProfilePersistentTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/common/profile/ProfilePersistentTest.java
@@ -15,12 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.common.util;
+package org.apache.doris.common.profile;
-import org.apache.doris.common.profile.ExecutionProfile;
-import org.apache.doris.common.profile.Profile;
-import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.common.profile.SummaryProfile.SummaryBuilder;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.thrift.QueryState;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.thrift.TUnit;
@@ -77,7 +76,6 @@ public class ProfilePersistentTest {
SummaryProfile summaryProfile = constructRandomSummaryProfile();
String stringUniqueId = summaryProfile.getProfileId();
TUniqueId thriftUniqueId =
DebugUtil.parseTUniqueIdFromString(stringUniqueId);
- profile.setId(stringUniqueId);
profile.setSummaryProfile(summaryProfile);
for (int i = 0; i < executionProfileNum; i++) {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/common/util/RuntimeProfileTest.java
b/fe/fe-core/src/test/java/org/apache/doris/common/profile/RuntimeProfileTest.java
similarity index 99%
rename from
fe/fe-core/src/test/java/org/apache/doris/common/util/RuntimeProfileTest.java
rename to
fe/fe-core/src/test/java/org/apache/doris/common/profile/RuntimeProfileTest.java
index 56ed66c0504..93cb5e35794 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/common/util/RuntimeProfileTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/common/profile/RuntimeProfileTest.java
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.common.util;
+package org.apache.doris.common.profile;
import org.apache.doris.thrift.TCounter;
import org.apache.doris.thrift.TRuntimeProfileNode;
diff --git a/regression-test/pipeline/cloud_p0/conf/be_custom.conf
b/regression-test/pipeline/cloud_p0/conf/be_custom.conf
index d201cad3fac..c35859cd6b4 100644
--- a/regression-test/pipeline/cloud_p0/conf/be_custom.conf
+++ b/regression-test/pipeline/cloud_p0/conf/be_custom.conf
@@ -39,3 +39,5 @@ pipeline_task_leakage_detect_period_sec=1
crash_in_memory_tracker_inaccurate = true
enable_table_size_correctness_check=true
enable_brpc_connection_check=true
+
+sys_log_verbose_modules=query_context,runtime_query_statistics_mgr
\ No newline at end of file
diff --git a/regression-test/pipeline/cloud_p0/conf/fe_custom.conf
b/regression-test/pipeline/cloud_p0/conf/fe_custom.conf
index 955409d9bb2..40d88d0f207 100644
--- a/regression-test/pipeline/cloud_p0/conf/fe_custom.conf
+++ b/regression-test/pipeline/cloud_p0/conf/fe_custom.conf
@@ -24,7 +24,10 @@ enable_debug_points = true
disable_datev1=false
disable_decimalv2=false
-max_query_profile_num=1000
+sys_log_verbose_modules =
org.apache.doris.common.profile,org.apache.doris.qe.QeProcessorImpl
+# profile related
+max_query_profile_num = 2000
+max_spilled_profile_num = 2000
statistics_sql_mem_limit_in_bytes=21474836480
cpu_resource_limit_per_analyze_task=-1
diff --git a/regression-test/pipeline/p0/conf/be.conf
b/regression-test/pipeline/p0/conf/be.conf
index 0b73375b3fb..be798738e68 100644
--- a/regression-test/pipeline/p0/conf/be.conf
+++ b/regression-test/pipeline/p0/conf/be.conf
@@ -35,7 +35,7 @@ JEMALLOC_PROF_PRFIX="jemalloc_heap_profile_"
# INFO, WARNING, ERROR, FATAL
sys_log_level = INFO
-
+sys_log_verbose_modules=query_context,runtime_query_statistics_mgr
be_port = 9161
webserver_port = 8141
heartbeat_service_port = 9151
diff --git a/regression-test/pipeline/p0/conf/fe.conf
b/regression-test/pipeline/p0/conf/fe.conf
index 44688ff4adc..2f493ff1098 100644
--- a/regression-test/pipeline/p0/conf/fe.conf
+++ b/regression-test/pipeline/p0/conf/fe.conf
@@ -34,6 +34,7 @@
JAVA_OPTS_FOR_JDK_17="-Djavax.security.auth.useSubjectCredsOnly=false -Xmx8192m
sys_log_level = INFO
sys_log_mode = NORMAL
+sys_log_verbose_modules =
org.apache.doris.common.profile,org.apache.doris.qe.QeProcessorImpl
arrow_flight_sql_port = 8081
catalog_trash_expire_second=1
#enable ssl for test
@@ -85,3 +86,7 @@ enable_deadlock_detection = true
max_lock_hold_threshold_seconds = 10
force_olap_table_replication_allocation=tag.location.default:1
+
+# profile related
+max_query_profile_num = 2000
+max_spilled_profile_num = 2000
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]