lgbo-ustc commented on code in PR #6558:
URL: https://github.com/apache/incubator-gluten/pull/6558#discussion_r1696412373


##########
cpp-ch/local-engine/Common/QueryContext.cpp:
##########
@@ -15,90 +15,163 @@
  * limitations under the License.
  */
 #include "QueryContext.h"
+
+#include <format>
+
 #include <Interpreters/Context.h>
 #include <Parser/SerializedPlanParser.h>
-#include <Common/ConcurrentMap.h>
 #include <Common/CurrentThread.h>
 #include <Common/ThreadStatus.h>
+#include <Common/CHUtil.h>
+#include <Common/GlutenConfig.h>
+#include <base/unit.h>
+#include <sstream>
+#include <iomanip>
 
 
 namespace DB
 {
 namespace ErrorCodes
 {
-    extern const int LOGICAL_ERROR;
+extern const int LOGICAL_ERROR;
 }
 }
 
 namespace local_engine
 {
 using namespace DB;
-thread_local std::shared_ptr<CurrentThread::QueryScope> query_scope;
-thread_local std::shared_ptr<ThreadStatus> thread_status;
-ConcurrentMap<int64_t, NativeAllocatorContextPtr> allocator_map;
 
-int64_t initializeQuery(ReservationListenerWrapperPtr listener)
+struct QueryContext
+{
+    std::shared_ptr<ThreadStatus> thread_status;
+    std::shared_ptr<ThreadGroup> thread_group;
+    ContextMutablePtr query_context;
+};
+
+std::unordered_map<int64_t, std::shared_ptr<QueryContext>> query_map;
+std::mutex query_map_mutex;

Review Comment:
   这个需要锁? (Does this need a lock?)



##########
cpp-ch/local-engine/Common/QueryContext.cpp:
##########
@@ -15,90 +15,163 @@
  * limitations under the License.
  */
 #include "QueryContext.h"
+
+#include <format>
+
 #include <Interpreters/Context.h>
 #include <Parser/SerializedPlanParser.h>
-#include <Common/ConcurrentMap.h>
 #include <Common/CurrentThread.h>
 #include <Common/ThreadStatus.h>
+#include <Common/CHUtil.h>
+#include <Common/GlutenConfig.h>
+#include <base/unit.h>
+#include <sstream>
+#include <iomanip>
 
 
 namespace DB
 {
 namespace ErrorCodes
 {
-    extern const int LOGICAL_ERROR;
+extern const int LOGICAL_ERROR;
 }
 }
 
 namespace local_engine
 {
 using namespace DB;
-thread_local std::shared_ptr<CurrentThread::QueryScope> query_scope;
-thread_local std::shared_ptr<ThreadStatus> thread_status;
-ConcurrentMap<int64_t, NativeAllocatorContextPtr> allocator_map;
 
-int64_t initializeQuery(ReservationListenerWrapperPtr listener)
+struct QueryContext
+{
+    std::shared_ptr<ThreadStatus> thread_status;
+    std::shared_ptr<ThreadGroup> thread_group;
+    ContextMutablePtr query_context;
+};
+
+std::unordered_map<int64_t, std::shared_ptr<QueryContext>> query_map;
+std::mutex query_map_mutex;
+
+int64_t QueryContextManager::initializeQuery()
 {
-    if (thread_status) return -1;
-    auto query_context = 
Context::createCopy(SerializedPlanParser::global_context);
-    query_context->makeQueryContext();
+    std::shared_ptr<QueryContext> query_context = 
std::make_shared<QueryContext>();
+    query_context->query_context = 
Context::createCopy(SerializedPlanParser::global_context);
+    query_context->query_context->makeQueryContext();
 
     // empty input will trigger random query id to be set
     // FileCache will check if query id is set to decide whether to skip cache 
or not
     // check FileCache.isQueryInitialized()
     //
     // Notice:
     // this generated random query id a qualified global queryid for the spark 
query
-    query_context->setCurrentQueryId("");
-
-    auto allocator_context = std::make_shared<NativeAllocatorContext>();
-    allocator_context->thread_status = std::make_shared<ThreadStatus>(true);
-    allocator_context->query_scope = 
std::make_shared<CurrentThread::QueryScope>(query_context);
-    allocator_context->group = std::make_shared<ThreadGroup>(query_context);
-    allocator_context->query_context = query_context;
-    allocator_context->listener = listener;
-    thread_status = allocator_context->thread_status;
-    query_scope = allocator_context->query_scope;
-    auto allocator_id = reinterpret_cast<int64_t>(allocator_context.get());
-    CurrentMemoryTracker::before_alloc = [listener](Int64 size, bool 
throw_if_memory_exceed) -> void
+    
query_context->query_context->setCurrentQueryId(toString(UUIDHelpers::generateV4()));
+    auto config = MemoryConfig::loadFromContext(query_context->query_context);
+    query_context->thread_status = std::make_shared<ThreadStatus>(false);
+    query_context->thread_group = 
std::make_shared<ThreadGroup>(query_context->query_context);
+    CurrentThread::attachToGroup(query_context->thread_group);
+    auto memory_limit = config.off_heap_per_task;
+
+    query_context->thread_group->memory_tracker.setSoftLimit(memory_limit);
+    query_context->thread_group->memory_tracker.setHardLimit(memory_limit + 
config.extra_memory_hard_limit);
+    std::lock_guard<std::mutex> lock_guard(query_map_mutex);
+    int64_t id = reinterpret_cast<int64_t>(query_context->thread_group.get());
+    query_map.emplace(id, query_context);
+    return id;
+}
+
+DB::ContextMutablePtr QueryContextManager::currentQueryContext()
+{
+    if (!CurrentThread::getGroup())
     {
-        if (throw_if_memory_exceed)
-            listener->reserveOrThrow(size);
-        else
-            listener->reserve(size);
-    };
-    CurrentMemoryTracker::before_free = [listener](Int64 size) -> void { 
listener->tryFree(size); };
-    CurrentMemoryTracker::current_memory = [listener]() -> Int64 { return 
listener->currentMemory(); };
-    allocator_map.insert(allocator_id, allocator_context);
-    return allocator_id;
+        throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Thread group not 
found.");
+    }
+    std::lock_guard lock_guard(query_map_mutex);
+    int64_t id = reinterpret_cast<int64_t>(CurrentThread::getGroup().get());
+    return query_map[id]->query_context;
 }
 
-void releaseAllocator(int64_t allocator_id)
+void 
QueryContextManager::logCurrentPerformanceCounters(ProfileEvents::Counters & 
counters)
 {
-    if (!allocator_map.get(allocator_id))
+    if (!CurrentThread::getGroup())
+    {
+        return;
+    }
+    if (logger->information())
     {
-        throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "allocator {} not 
found", allocator_id);
+        std::ostringstream msg;

Review Comment:
   `std::ostringstream` is not recommended in ClickHouse (CH).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to