This is an automated email from the ASF dual-hosted git repository.

loneylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 52dd83c541 [GLUTEN-7795][CH] Add backend task id log (#7801)
52dd83c541 is described below

commit 52dd83c541caa51c603ca439a72f0865bdb53ec7
Author: Shuai li <[email protected]>
AuthorDate: Wed Nov 6 16:17:20 2024 +0800

    [GLUTEN-7795][CH] Add backend task id log (#7801)
    
    * fix2
    
    * Add task id
    
    * add more task id log
    
    * fix ci error
    
    * fix ci build error
    
    * fix review
    
    * fix rebase
---
 .../org/apache/gluten/memory/CHThreadGroup.java    | 14 +++++++++----
 cpp-ch/local-engine/Common/QueryContext.cpp        | 22 ++++++++++++++++-----
 cpp-ch/local-engine/Common/QueryContext.h          |  5 +++--
 .../local-engine/Parser/SerializedPlanParser.cpp   | 23 +++++++++++-----------
 cpp-ch/local-engine/Parser/SerializedPlanParser.h  |  2 ++
 cpp-ch/local-engine/local_engine_jni.cpp           |  5 +++--
 6 files changed, 47 insertions(+), 24 deletions(-)

diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/memory/CHThreadGroup.java b/backends-clickhouse/src/main/java/org/apache/gluten/memory/CHThreadGroup.java
index 119dc61893..36f7c388ba 100644
--- a/backends-clickhouse/src/main/java/org/apache/gluten/memory/CHThreadGroup.java
+++ b/backends-clickhouse/src/main/java/org/apache/gluten/memory/CHThreadGroup.java
@@ -28,7 +28,13 @@ public class CHThreadGroup implements TaskResource {
    */
   public static void registerNewThreadGroup() {
     if (TaskResources.isResourceRegistered(CHThreadGroup.class.getName())) return;
-    CHThreadGroup group = new CHThreadGroup();
+
+    String taskId = "";
+    if (TaskResources.getLocalTaskContext() != null) {
+      taskId = String.valueOf(TaskResources.getLocalTaskContext().taskAttemptId());
+    }
+
+    CHThreadGroup group = new CHThreadGroup(taskId);
     TaskResources.addResource(CHThreadGroup.class.getName(), group);
     TaskContext.get()
         .addTaskCompletionListener(
@@ -40,8 +46,8 @@ public class CHThreadGroup implements TaskResource {
   private long thread_group_id = 0;
   private long peak_memory = -1;
 
-  private CHThreadGroup() {
-    thread_group_id = createThreadGroup();
+  private CHThreadGroup(String taskId) {
+    thread_group_id = createThreadGroup(taskId);
   }
 
   public long getPeakMemory() {
@@ -69,7 +75,7 @@ public class CHThreadGroup implements TaskResource {
     return "CHThreadGroup";
   }
 
-  private static native long createThreadGroup();
+  private static native long createThreadGroup(String taskId);
 
   private static native long threadGroupPeakMemory(long id);
 
diff --git a/cpp-ch/local-engine/Common/QueryContext.cpp b/cpp-ch/local-engine/Common/QueryContext.cpp
index 4992ba0a45..eca9ad5b18 100644
--- a/cpp-ch/local-engine/Common/QueryContext.cpp
+++ b/cpp-ch/local-engine/Common/QueryContext.cpp
@@ -44,6 +44,7 @@ struct QueryContext::Data
     std::shared_ptr<ThreadStatus> thread_status;
     std::shared_ptr<ThreadGroup> thread_group;
     ContextMutablePtr query_context;
+    String task_id;
 
     static DB::ContextMutablePtr global_context;
     static SharedContextHolder shared_context;
@@ -83,11 +84,12 @@ DB::ContextPtr QueryContext::globalContext()
     return Data::global_context;
 }
 
-int64_t QueryContext::initializeQuery()
+int64_t QueryContext::initializeQuery(const String & task_id)
 {
     std::shared_ptr<Data> query_context = std::make_shared<Data>();
     query_context->query_context = Context::createCopy(globalContext());
     query_context->query_context->makeQueryContext();
+    query_context->task_id = task_id;
 
     // empty input will trigger random query id to be set
     // FileCache will check if query id is set to decide whether to skip cache or not
@@ -95,7 +97,7 @@ int64_t QueryContext::initializeQuery()
     //
     // Notice:
     // this generated random query id a qualified global queryid for the spark query
-    query_context->query_context->setCurrentQueryId(toString(UUIDHelpers::generateV4()));
+    query_context->query_context->setCurrentQueryId(toString(UUIDHelpers::generateV4()) + "_" + task_id);
     auto config = MemoryConfig::loadFromContext(query_context->query_context);
     query_context->thread_status = std::make_shared<ThreadStatus>(false);
     query_context->thread_group = std::make_shared<ThreadGroup>(query_context->query_context);
@@ -124,14 +126,24 @@ std::shared_ptr<DB::ThreadGroup> QueryContext::currentThreadGroup()
     throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread group not found.");
 }
 
-void QueryContext::logCurrentPerformanceCounters(ProfileEvents::Counters & counters) const
+String QueryContext::currentTaskIdOrEmpty()
+{
+    if (auto thread_group = CurrentThread::getGroup())
+    {
+        const int64_t id = reinterpret_cast<int64_t>(thread_group.get());
+        return query_map_.get(id)->task_id;
+    }
+   return "";
+}
+
+void QueryContext::logCurrentPerformanceCounters(ProfileEvents::Counters & counters, const String & task_id) const
 {
     if (!CurrentThread::getGroup())
         return;
     if (logger_->information())
     {
         std::ostringstream msg;
-        msg << "\n---------------------Task Performance Counters-----------------------------\n";
+        msg << "\n---------------------Task Performance Counters(" << task_id << ")-----------------------------\n";
         for (ProfileEvents::Event event = ProfileEvents::Event(0); event < counters.num_counters; event++)
         {
             const auto * name = ProfileEvents::getName(event);
@@ -167,7 +179,7 @@ void QueryContext::finalizeQuery(int64_t id)
 
     if (currentThreadGroupMemoryUsage() > 2_MiB)
         LOG_WARNING(logger_, "{} bytes memory didn't release, There may be a memory leak!", currentThreadGroupMemoryUsage());
-    logCurrentPerformanceCounters(context->thread_group->performance_counters);
+    logCurrentPerformanceCounters(context->thread_group->performance_counters, context->task_id);
     context->thread_status->detachFromGroup();
     context->thread_group.reset();
     context->thread_status.reset();
diff --git a/cpp-ch/local-engine/Common/QueryContext.h b/cpp-ch/local-engine/Common/QueryContext.h
index 821144f5fc..6ced1f6320 100644
--- a/cpp-ch/local-engine/Common/QueryContext.h
+++ b/cpp-ch/local-engine/Common/QueryContext.h
@@ -40,10 +40,11 @@ public:
         static QueryContext instance;
         return instance;
     }
-    int64_t initializeQuery();
+    int64_t initializeQuery(const String & task_id);
     DB::ContextMutablePtr currentQueryContext();
+    String currentTaskIdOrEmpty();
     static std::shared_ptr<DB::ThreadGroup> currentThreadGroup();
-    void logCurrentPerformanceCounters(ProfileEvents::Counters & counters) const;
+    void logCurrentPerformanceCounters(ProfileEvents::Counters & counters, const String & task_id) const;
     size_t currentPeakMemory(int64_t id);
     void finalizeQuery(int64_t id);
 
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index 74c1d35001..4c1d5b902d 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -62,6 +62,7 @@
 #include <Common/Exception.h>
 #include <Common/GlutenConfig.h>
 #include <Common/JNIUtils.h>
+#include <Common/QueryContext.h>
 #include <Common/logger_useful.h>
 #include <Common/typeid_cast.h>
 
@@ -101,7 +102,7 @@ std::string join(const ActionsDAG::NodeRawConstPtrs & v, char c)
     return res;
 }
 
-void adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::PlanRel & root_rel)
+void SerializedPlanParser::adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::PlanRel & root_rel) const
 {
     if (root_rel.root().names_size())
     {
@@ -110,8 +111,8 @@ void adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::PlanRel
         const auto cols = query_plan->getCurrentHeader().getNamesAndTypesList();
         if (cols.getNames().size() != static_cast<size_t>(root_rel.root().names_size()))
         {
-            debug::dumpPlan(*query_plan, true);
-            debug::dumpMessage(root_rel, "substrait::PlanRel", true);
+            debug::dumpPlan(*query_plan, true, log);
+            debug::dumpMessage(root_rel, "substrait::PlanRel", true, log);
             throw Exception(
                 ErrorCodes::LOGICAL_ERROR,
                 "Missmatch result columns size. plan column size {}, subtrait 
plan name size {}.",
@@ -134,8 +135,8 @@ void adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::PlanRel
         const auto & original_cols = original_header.getColumnsWithTypeAndName();
         if (static_cast<size_t>(output_schema.types_size()) != original_cols.size())
         {
-            debug::dumpPlan(*query_plan, true);
-            debug::dumpMessage(root_rel, "substrait::PlanRel", true);
+            debug::dumpPlan(*query_plan, true, log);
+            debug::dumpMessage(root_rel, "substrait::PlanRel", true, log);
             throw Exception(
                 ErrorCodes::LOGICAL_ERROR,
                 "Missmatch result columns size. plan column size {}, subtrait 
plan output schema size {}, subtrait plan name size {}.",
@@ -183,7 +184,7 @@ void adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::PlanRel
 
 QueryPlanPtr SerializedPlanParser::parse(const substrait::Plan & plan)
 {
-    debug::dumpMessage(plan, "substrait::Plan");
+    debug::dumpMessage(plan, "substrait::Plan", false, log);
     //parseExtensions(plan.extensions());
     if (plan.relations_size() != 1)
         throw Exception(ErrorCodes::BAD_ARGUMENTS, "too many relations found");
@@ -204,7 +205,7 @@ QueryPlanPtr SerializedPlanParser::parse(const substrait::Plan & plan)
     PlanUtil::checkOuputType(*query_plan);
 #endif
 
-    debug::dumpPlan(*query_plan);
+    debug::dumpPlan(*query_plan, false, log);
     return query_plan;
 }
 
@@ -323,7 +324,7 @@ std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(DB::QueryPla
     }
     catch (...)
     {
-        LOG_ERROR(getLogger("SerializedPlanParser"), "Invalid plan:\n{}", PlanUtil::explainPlan(*query_plan));
+        LOG_ERROR(log, "Invalid plan:\n{}", PlanUtil::explainPlan(*query_plan));
         throw;
     }
 
@@ -332,10 +333,9 @@ std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(DB::QueryPla
     assert(root_rel.has_root());
     if (root_rel.root().input().has_write())
         addSinkTransform(parser_context->queryContext(), root_rel.root().input().write(), builder);
-    auto * logger = &Poco::Logger::get("SerializedPlanParser");
-    LOG_INFO(logger, "build pipeline {} ms", stopwatch.elapsedMicroseconds() / 1000.0);
+    LOG_INFO(log, "build pipeline {} ms", stopwatch.elapsedMicroseconds() / 1000.0);
     LOG_DEBUG(
-        logger,
+        log,
         "clickhouse plan [optimization={}]:\n{}",
         settings[Setting::query_plan_enable_optimizations],
         PlanUtil::explainPlan(*query_plan));
@@ -347,6 +347,7 @@ std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(DB::QueryPla
 SerializedPlanParser::SerializedPlanParser(ParserContextPtr parser_context_) : parser_context(parser_context_)
 {
     context = parser_context->queryContext();
+    log = getLogger("SerializedPlanParser(" + QueryContext::instance().currentTaskIdOrEmpty() + ")");
 }
 
 NonNullableColumnsResolver::NonNullableColumnsResolver(
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.h b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
index 201fc46b2e..5fd8443313 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.h
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
@@ -118,7 +118,9 @@ public:
 
 private:
     DB::QueryPlanPtr parseOp(const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack);
+    void adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::PlanRel & root_rel) const;
 
+    LoggerPtr log;
     std::vector<jobject> input_iters;
     std::vector<std::string> split_infos;
     int split_info_index = 0;
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp
index 8ff8a866b7..952f6e50e9 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -1310,10 +1310,11 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_SimpleExpressionEval_nativeNex
     LOCAL_ENGINE_JNI_METHOD_END(env, -1)
 }
 
-JNIEXPORT jlong Java_org_apache_gluten_memory_CHThreadGroup_createThreadGroup(JNIEnv * env, jclass)
+JNIEXPORT jlong Java_org_apache_gluten_memory_CHThreadGroup_createThreadGroup(JNIEnv * env, jclass, jstring task_id_)
 {
     LOCAL_ENGINE_JNI_METHOD_START
-    return local_engine::QueryContext::instance().initializeQuery();
+    auto task_id = jstring2string(env, task_id_);
+    return local_engine::QueryContext::instance().initializeQuery(task_id);
     LOCAL_ENGINE_JNI_METHOD_END(env, 0l)
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to