This is an automated email from the ASF dual-hosted git repository.
loneylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 52dd83c541 [GLUTEN-7795][CH] Add backend task id log (#7801)
52dd83c541 is described below
commit 52dd83c541caa51c603ca439a72f0865bdb53ec7
Author: Shuai li <[email protected]>
AuthorDate: Wed Nov 6 16:17:20 2024 +0800
[GLUTEN-7795][CH] Add backend task id log (#7801)
* fix2
* Add task id
* add more task id log
* fix ci error
* fix ci build error
* fix review
* fix rebase
---
.../org/apache/gluten/memory/CHThreadGroup.java | 14 +++++++++----
cpp-ch/local-engine/Common/QueryContext.cpp | 22 ++++++++++++++++-----
cpp-ch/local-engine/Common/QueryContext.h | 5 +++--
.../local-engine/Parser/SerializedPlanParser.cpp | 23 +++++++++++-----------
cpp-ch/local-engine/Parser/SerializedPlanParser.h | 2 ++
cpp-ch/local-engine/local_engine_jni.cpp | 5 +++--
6 files changed, 47 insertions(+), 24 deletions(-)
diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/memory/CHThreadGroup.java b/backends-clickhouse/src/main/java/org/apache/gluten/memory/CHThreadGroup.java
index 119dc61893..36f7c388ba 100644
--- a/backends-clickhouse/src/main/java/org/apache/gluten/memory/CHThreadGroup.java
+++ b/backends-clickhouse/src/main/java/org/apache/gluten/memory/CHThreadGroup.java
@@ -28,7 +28,13 @@ public class CHThreadGroup implements TaskResource {
*/
public static void registerNewThreadGroup() {
if (TaskResources.isResourceRegistered(CHThreadGroup.class.getName()))
return;
- CHThreadGroup group = new CHThreadGroup();
+
+ String taskId = "";
+ if (TaskResources.getLocalTaskContext() != null) {
+ taskId = String.valueOf(TaskResources.getLocalTaskContext().taskAttemptId());
+ }
+
+ CHThreadGroup group = new CHThreadGroup(taskId);
TaskResources.addResource(CHThreadGroup.class.getName(), group);
TaskContext.get()
.addTaskCompletionListener(
@@ -40,8 +46,8 @@ public class CHThreadGroup implements TaskResource {
private long thread_group_id = 0;
private long peak_memory = -1;
- private CHThreadGroup() {
- thread_group_id = createThreadGroup();
+ private CHThreadGroup(String taskId) {
+ thread_group_id = createThreadGroup(taskId);
}
public long getPeakMemory() {
@@ -69,7 +75,7 @@ public class CHThreadGroup implements TaskResource {
return "CHThreadGroup";
}
- private static native long createThreadGroup();
+ private static native long createThreadGroup(String taskId);
private static native long threadGroupPeakMemory(long id);
diff --git a/cpp-ch/local-engine/Common/QueryContext.cpp b/cpp-ch/local-engine/Common/QueryContext.cpp
index 4992ba0a45..eca9ad5b18 100644
--- a/cpp-ch/local-engine/Common/QueryContext.cpp
+++ b/cpp-ch/local-engine/Common/QueryContext.cpp
@@ -44,6 +44,7 @@ struct QueryContext::Data
std::shared_ptr<ThreadStatus> thread_status;
std::shared_ptr<ThreadGroup> thread_group;
ContextMutablePtr query_context;
+ String task_id;
static DB::ContextMutablePtr global_context;
static SharedContextHolder shared_context;
@@ -83,11 +84,12 @@ DB::ContextPtr QueryContext::globalContext()
return Data::global_context;
}
-int64_t QueryContext::initializeQuery()
+int64_t QueryContext::initializeQuery(const String & task_id)
{
std::shared_ptr<Data> query_context = std::make_shared<Data>();
query_context->query_context = Context::createCopy(globalContext());
query_context->query_context->makeQueryContext();
+ query_context->task_id = task_id;
// empty input will trigger random query id to be set
// FileCache will check if query id is set to decide whether to skip cache or not
@@ -95,7 +97,7 @@ int64_t QueryContext::initializeQuery()
//
// Notice:
// this generated random query id a qualified global queryid for the spark query
- query_context->query_context->setCurrentQueryId(toString(UUIDHelpers::generateV4()));
+ query_context->query_context->setCurrentQueryId(toString(UUIDHelpers::generateV4()) + "_" + task_id);
auto config = MemoryConfig::loadFromContext(query_context->query_context);
query_context->thread_status = std::make_shared<ThreadStatus>(false);
query_context->thread_group = std::make_shared<ThreadGroup>(query_context->query_context);
@@ -124,14 +126,24 @@ std::shared_ptr<DB::ThreadGroup> QueryContext::currentThreadGroup()
throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread group not found.");
}
-void QueryContext::logCurrentPerformanceCounters(ProfileEvents::Counters & counters) const
+String QueryContext::currentTaskIdOrEmpty()
+{
+ if (auto thread_group = CurrentThread::getGroup())
+ {
+ const int64_t id = reinterpret_cast<int64_t>(thread_group.get());
+ return query_map_.get(id)->task_id;
+ }
+ return "";
+}
+
+void QueryContext::logCurrentPerformanceCounters(ProfileEvents::Counters & counters, const String & task_id) const
{
if (!CurrentThread::getGroup())
return;
if (logger_->information())
{
std::ostringstream msg;
- msg << "\n---------------------Task Performance Counters-----------------------------\n";
+ msg << "\n---------------------Task Performance Counters(" << task_id << ")-----------------------------\n";
for (ProfileEvents::Event event = ProfileEvents::Event(0); event < counters.num_counters; event++)
{
const auto * name = ProfileEvents::getName(event);
@@ -167,7 +179,7 @@ void QueryContext::finalizeQuery(int64_t id)
if (currentThreadGroupMemoryUsage() > 2_MiB)
LOG_WARNING(logger_, "{} bytes memory didn't release, There may be a memory leak!", currentThreadGroupMemoryUsage());
- logCurrentPerformanceCounters(context->thread_group->performance_counters);
+ logCurrentPerformanceCounters(context->thread_group->performance_counters, context->task_id);
context->thread_status->detachFromGroup();
context->thread_group.reset();
context->thread_status.reset();
diff --git a/cpp-ch/local-engine/Common/QueryContext.h b/cpp-ch/local-engine/Common/QueryContext.h
index 821144f5fc..6ced1f6320 100644
--- a/cpp-ch/local-engine/Common/QueryContext.h
+++ b/cpp-ch/local-engine/Common/QueryContext.h
@@ -40,10 +40,11 @@ public:
static QueryContext instance;
return instance;
}
- int64_t initializeQuery();
+ int64_t initializeQuery(const String & task_id);
DB::ContextMutablePtr currentQueryContext();
+ String currentTaskIdOrEmpty();
static std::shared_ptr<DB::ThreadGroup> currentThreadGroup();
- void logCurrentPerformanceCounters(ProfileEvents::Counters & counters) const;
+ void logCurrentPerformanceCounters(ProfileEvents::Counters & counters, const String & task_id) const;
size_t currentPeakMemory(int64_t id);
void finalizeQuery(int64_t id);
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index 74c1d35001..4c1d5b902d 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -62,6 +62,7 @@
#include <Common/Exception.h>
#include <Common/GlutenConfig.h>
#include <Common/JNIUtils.h>
+#include <Common/QueryContext.h>
#include <Common/logger_useful.h>
#include <Common/typeid_cast.h>
@@ -101,7 +102,7 @@ std::string join(const ActionsDAG::NodeRawConstPtrs & v, char c)
return res;
}
-void adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::PlanRel & root_rel)
+void SerializedPlanParser::adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::PlanRel & root_rel) const
{
if (root_rel.root().names_size())
{
@@ -110,8 +111,8 @@ void adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::PlanRel
const auto cols = query_plan->getCurrentHeader().getNamesAndTypesList();
if (cols.getNames().size() != static_cast<size_t>(root_rel.root().names_size()))
{
- debug::dumpPlan(*query_plan, true);
- debug::dumpMessage(root_rel, "substrait::PlanRel", true);
+ debug::dumpPlan(*query_plan, true, log);
+ debug::dumpMessage(root_rel, "substrait::PlanRel", true, log);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Missmatch result columns size. plan column size {}, subtrait plan name size {}.",
@@ -134,8 +135,8 @@ void adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::PlanRel
const auto & original_cols = original_header.getColumnsWithTypeAndName();
if (static_cast<size_t>(output_schema.types_size()) != original_cols.size())
{
- debug::dumpPlan(*query_plan, true);
- debug::dumpMessage(root_rel, "substrait::PlanRel", true);
+ debug::dumpPlan(*query_plan, true, log);
+ debug::dumpMessage(root_rel, "substrait::PlanRel", true, log);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Missmatch result columns size. plan column size {}, subtrait plan output schema size {}, subtrait plan name size {}.",
@@ -183,7 +184,7 @@ void adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::PlanRel
QueryPlanPtr SerializedPlanParser::parse(const substrait::Plan & plan)
{
- debug::dumpMessage(plan, "substrait::Plan");
+ debug::dumpMessage(plan, "substrait::Plan", false, log);
//parseExtensions(plan.extensions());
if (plan.relations_size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "too many relations found");
@@ -204,7 +205,7 @@ QueryPlanPtr SerializedPlanParser::parse(const substrait::Plan & plan)
PlanUtil::checkOuputType(*query_plan);
#endif
- debug::dumpPlan(*query_plan);
+ debug::dumpPlan(*query_plan, false, log);
return query_plan;
}
@@ -323,7 +324,7 @@ std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(DB::QueryPla
}
catch (...)
{
- LOG_ERROR(getLogger("SerializedPlanParser"), "Invalid plan:\n{}", PlanUtil::explainPlan(*query_plan));
+ LOG_ERROR(log, "Invalid plan:\n{}", PlanUtil::explainPlan(*query_plan));
throw;
}
@@ -332,10 +333,9 @@ std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(DB::QueryPla
assert(root_rel.has_root());
if (root_rel.root().input().has_write())
addSinkTransform(parser_context->queryContext(), root_rel.root().input().write(), builder);
- auto * logger = &Poco::Logger::get("SerializedPlanParser");
- LOG_INFO(logger, "build pipeline {} ms", stopwatch.elapsedMicroseconds() / 1000.0);
+ LOG_INFO(log, "build pipeline {} ms", stopwatch.elapsedMicroseconds() / 1000.0);
LOG_DEBUG(
- logger,
+ log,
"clickhouse plan [optimization={}]:\n{}",
settings[Setting::query_plan_enable_optimizations],
PlanUtil::explainPlan(*query_plan));
@@ -347,6 +347,7 @@ std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(DB::QueryPla
SerializedPlanParser::SerializedPlanParser(ParserContextPtr parser_context_) : parser_context(parser_context_)
{
context = parser_context->queryContext();
+ log = getLogger("SerializedPlanParser(" + QueryContext::instance().currentTaskIdOrEmpty() + ")");
}
NonNullableColumnsResolver::NonNullableColumnsResolver(
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.h b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
index 201fc46b2e..5fd8443313 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.h
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
@@ -118,7 +118,9 @@ public:
private:
DB::QueryPlanPtr parseOp(const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack);
+ void adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::PlanRel & root_rel) const;
+ LoggerPtr log;
std::vector<jobject> input_iters;
std::vector<std::string> split_infos;
int split_info_index = 0;
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp
index 8ff8a866b7..952f6e50e9 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -1310,10 +1310,11 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_SimpleExpressionEval_nativeNex
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
-JNIEXPORT jlong Java_org_apache_gluten_memory_CHThreadGroup_createThreadGroup(JNIEnv * env, jclass)
+JNIEXPORT jlong Java_org_apache_gluten_memory_CHThreadGroup_createThreadGroup(JNIEnv * env, jclass, jstring task_id_)
{
LOCAL_ENGINE_JNI_METHOD_START
- return local_engine::QueryContext::instance().initializeQuery();
+ auto task_id = jstring2string(env, task_id_);
+ return local_engine::QueryContext::instance().initializeQuery(task_id);
LOCAL_ENGINE_JNI_METHOD_END(env, 0l)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]