This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new f65087ece3 [CH] Fix issues due to 
https://github.com/ClickHouse/ClickHouse/pull/71539. (#7952)
f65087ece3 is described below

commit f65087ece35fc13a56189c450c56075107aad446
Author: Chang chen <[email protected]>
AuthorDate: Thu Nov 14 23:51:01 2024 +0800

    [CH] Fix issues due to https://github.com/ClickHouse/ClickHouse/pull/71539. 
(#7952)
    
    * Fix issues due to https://github.com/ClickHouse/ClickHouse/pull/71539.
    
    Issue 1
    BuildQueryPipelineSettings is created manually instead of by calling
BuildQueryPipelineSettings::fromContext(); so even though
https://github.com/ClickHouse/ClickHouse/pull/71890 disables
'query_plan_merge_filters', the UTs still fail.
    
    To fix this issue, we need to set the correct default parameters in CHUtil.cpp.
    
    Issue 2
    If we set query_plan_merge_filters to true, then
https://github.com/ClickHouse/ClickHouse/pull/71539 will try to split the
leftmost AND atom into a separate DAG and hence create a FilterTransformer for
each AND atom, which causes metrics collection to fail.
    
    I am not sure of the benefits of setting it to true, so let's keep it false.
    
    * Calling `QueryPlan::explainPlan` after building the pipeline is not correct,
because `action_dag` is
[moved](https://github.com/ClickHouse/ClickHouse/blob/22d2c856a70dfb8b6e4c506fcb22ac03d59df9be/src/Processors/QueryPlan/FilterStep.cpp#L161).
---
 cpp-ch/local-engine/Common/CHUtil.cpp              | 11 ++++++++
 cpp-ch/local-engine/Common/DebugUtils.cpp          | 16 +++++++----
 cpp-ch/local-engine/Common/DebugUtils.h            |  2 +-
 .../local-engine/Parser/SerializedPlanParser.cpp   | 33 +++++++++-------------
 cpp-ch/local-engine/Parser/SerializedPlanParser.h  |  4 +--
 5 files changed, 37 insertions(+), 29 deletions(-)

diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp 
b/cpp-ch/local-engine/Common/CHUtil.cpp
index 778c4f257b..2413fae9e3 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -80,6 +80,9 @@ namespace Setting
 {
 extern const SettingsUInt64 prefer_external_sort_block_bytes;
 extern const SettingsUInt64 max_bytes_before_external_sort;
+extern const SettingsBool query_plan_merge_filters;
+extern const SettingsBool compile_expressions;
+extern const SettingsShortCircuitFunctionEvaluation 
short_circuit_function_evaluation;
 }
 namespace ErrorCodes
 {
@@ -722,6 +725,14 @@ void BackendInitializerUtil::initSettings(const 
SparkConfigs::ConfigMap & spark_
     settings.set("max_download_threads", 1);
     settings.set("input_format_parquet_enable_row_group_prefetch", false);
 
+    /// update per https://github.com/ClickHouse/ClickHouse/pull/71539
+    /// if true, we can't get correct metrics for the query
+    settings[Setting::query_plan_merge_filters] = false;
+    /// We now set BuildQueryPipelineSettings according to config.
+    settings[Setting::compile_expressions] = true;
+    settings[Setting::short_circuit_function_evaluation] = 
ShortCircuitFunctionEvaluation::DISABLE;
+    ///
+
     for (const auto & [key, value] : spark_conf_map)
     {
         // Firstly apply 
spark.gluten.sql.columnar.backend.ch.runtime_config.local_engine.settings.* to 
settings
diff --git a/cpp-ch/local-engine/Common/DebugUtils.cpp 
b/cpp-ch/local-engine/Common/DebugUtils.cpp
index 2fcab59bf8..8a4323cb1c 100644
--- a/cpp-ch/local-engine/Common/DebugUtils.cpp
+++ b/cpp-ch/local-engine/Common/DebugUtils.cpp
@@ -38,7 +38,7 @@ namespace pb_util = google::protobuf::util;
 namespace debug
 {
 
-void dumpPlan(DB::QueryPlan & plan, bool force, LoggerPtr logger)
+void dumpPlan(DB::QueryPlan & plan, const char * type, bool force, LoggerPtr 
logger)
 {
     if (!logger)
     {
@@ -51,10 +51,12 @@ void dumpPlan(DB::QueryPlan & plan, bool force, LoggerPtr 
logger)
         return;
 
     auto out = local_engine::PlanUtil::explainPlan(plan);
+    auto task_id = 
local_engine::QueryContext::instance().currentTaskIdOrEmpty();
+    task_id = task_id.empty() ? "" : "(" + task_id + ")";
     if (force) // force
-        LOG_ERROR(logger, "clickhouse plan({}):\n{}", 
local_engine::QueryContext::instance().currentTaskIdOrEmpty(), out);
+        LOG_ERROR(logger, "{}{} =>\n{}", type, task_id, out);
     else
-        LOG_DEBUG(logger, "clickhouse plan({}):\n{}", 
local_engine::QueryContext::instance().currentTaskIdOrEmpty(), out);
+        LOG_DEBUG(logger, "{}{} =>\n{}", type, task_id, out);
 }
 
 void dumpMessage(const google::protobuf::Message & message, const char * type, 
bool force, LoggerPtr logger)
@@ -70,13 +72,15 @@ void dumpMessage(const google::protobuf::Message & message, 
const char * type, b
         return;
     pb_util::JsonOptions options;
     std::string json;
-    if (auto s = google::protobuf::json::MessageToJsonString(message, &json, 
options); !s.ok())
+    if (auto s = MessageToJsonString(message, &json, options); !s.ok())
         throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Can not convert {} 
to Json", type);
 
+    auto task_id = 
local_engine::QueryContext::instance().currentTaskIdOrEmpty();
+    task_id = task_id.empty() ? "" : "(" + task_id + ")";
     if (force) // force
-        LOG_ERROR(logger, "{}({}):\n{}", type, 
local_engine::QueryContext::instance().currentTaskIdOrEmpty(), json);
+        LOG_ERROR(logger, "{}{} =>\n{}", type, task_id, json);
     else
-        LOG_DEBUG(logger, "{}({}):\n{}", type, 
local_engine::QueryContext::instance().currentTaskIdOrEmpty(), json);
+        LOG_DEBUG(logger, "{}{} =>\n{}", type, task_id, json);
 }
 
 void headBlock(const DB::Block & block, size_t count)
diff --git a/cpp-ch/local-engine/Common/DebugUtils.h 
b/cpp-ch/local-engine/Common/DebugUtils.h
index 55a0be5140..338326b05e 100644
--- a/cpp-ch/local-engine/Common/DebugUtils.h
+++ b/cpp-ch/local-engine/Common/DebugUtils.h
@@ -29,7 +29,7 @@ class QueryPlan;
 namespace debug
 {
 
-void dumpPlan(DB::QueryPlan & plan, bool force = false, LoggerPtr = nullptr);
+void dumpPlan(DB::QueryPlan & plan, const char * type = "clickhouse plan", 
bool force = false, LoggerPtr = nullptr);
 void dumpMessage(const google::protobuf::Message & message, const char * type, 
bool force = false, LoggerPtr = nullptr);
 
 void headBlock(const DB::Block & block, size_t count = 10);
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp 
b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index 748ff88acb..4e461a5c49 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -101,8 +101,9 @@ std::string join(const ActionsDAG::NodeRawConstPtrs & v, 
char c)
     return res;
 }
 
-void SerializedPlanParser::adjustOutput(const DB::QueryPlanPtr & query_plan, 
const substrait::PlanRel & root_rel) const
+void SerializedPlanParser::adjustOutput(const DB::QueryPlanPtr & query_plan, 
const substrait::Plan & plan)
 {
+    const substrait::PlanRel & root_rel = plan.relations().at(0);
     if (root_rel.root().names_size())
     {
         ActionsDAG 
actions_dag{blockToNameAndTypeList(query_plan->getCurrentHeader())};
@@ -110,8 +111,8 @@ void SerializedPlanParser::adjustOutput(const 
DB::QueryPlanPtr & query_plan, con
         const auto cols = 
query_plan->getCurrentHeader().getNamesAndTypesList();
         if (cols.getNames().size() != 
static_cast<size_t>(root_rel.root().names_size()))
         {
-            debug::dumpPlan(*query_plan, true);
-            debug::dumpMessage(root_rel, "substrait::PlanRel", true);
+            debug::dumpPlan(*query_plan, "clickhouse plan", true);
+            debug::dumpMessage(plan, "substrait::Plan", true);
             throw Exception(
                 ErrorCodes::LOGICAL_ERROR,
                 "Missmatch result columns size. plan column size {}, subtrait 
plan name size {}.",
@@ -134,8 +135,8 @@ void SerializedPlanParser::adjustOutput(const 
DB::QueryPlanPtr & query_plan, con
         const auto & original_cols = 
original_header.getColumnsWithTypeAndName();
         if (static_cast<size_t>(output_schema.types_size()) != 
original_cols.size())
         {
-            debug::dumpPlan(*query_plan, true);
-            debug::dumpMessage(root_rel, "substrait::PlanRel", true);
+            debug::dumpPlan(*query_plan, "clickhouse plan", true);
+            debug::dumpMessage(plan, "substrait::Plan", true);
             throw Exception(
                 ErrorCodes::LOGICAL_ERROR,
                 "Missmatch result columns size. plan column size {}, subtrait 
plan output schema size {}, subtrait plan name size {}.",
@@ -198,7 +199,7 @@ QueryPlanPtr SerializedPlanParser::parse(const 
substrait::Plan & plan)
     std::list<const substrait::Rel *> rel_stack;
     auto query_plan = parseOp(first_read_rel, rel_stack);
     if (!writePipeline)
-        adjustOutput(query_plan, root_rel);
+        adjustOutput(query_plan, plan);
 
 #ifndef NDEBUG
     PlanUtil::checkOuputType(*query_plan);
@@ -297,12 +298,10 @@ DB::QueryPipelineBuilderPtr 
SerializedPlanParser::buildQueryPipeline(DB::QueryPl
         settings,
         0);
     const QueryPlanOptimizationSettings optimization_settings{.optimize_plan = 
settings[Setting::query_plan_enable_optimizations]};
-    return query_plan.buildQueryPipeline(
-        optimization_settings,
-        BuildQueryPipelineSettings{
-            .actions_settings
-            = ExpressionActionsSettings{.can_compile_expressions = true, 
.min_count_to_compile_expression = 3, .compile_expressions = 
CompileExpressions::yes},
-            .process_list_element = query_status});
+    BuildQueryPipelineSettings build_settings = 
BuildQueryPipelineSettings::fromContext(context);
+    build_settings.process_list_element = query_status;
+    build_settings.progress_callback = nullptr;
+    return query_plan.buildQueryPipeline(optimization_settings,build_settings);
 }
 
 std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(const 
std::string_view plan)
@@ -311,11 +310,10 @@ std::unique_ptr<LocalExecutor> 
SerializedPlanParser::createExecutor(const std::s
     return createExecutor(parse(s_plan), s_plan);
 }
 
-std::unique_ptr<LocalExecutor> 
SerializedPlanParser::createExecutor(DB::QueryPlanPtr query_plan, const 
substrait::Plan & s_plan)
+std::unique_ptr<LocalExecutor> 
SerializedPlanParser::createExecutor(DB::QueryPlanPtr query_plan, const 
substrait::Plan & s_plan) const
 {
     Stopwatch stopwatch;
 
-    const Settings & settings = 
parser_context->queryContext()->getSettingsRef();
     DB::QueryPipelineBuilderPtr builder = nullptr;
     try
     {
@@ -323,7 +321,7 @@ std::unique_ptr<LocalExecutor> 
SerializedPlanParser::createExecutor(DB::QueryPla
     }
     catch (...)
     {
-        LOG_ERROR(getLogger("SerializedPlanParser"), "Invalid plan:\n{}", 
PlanUtil::explainPlan(*query_plan));
+        debug::dumpPlan(*query_plan, "Invalid clickhouse plan", true);
         throw;
     }
 
@@ -333,11 +331,6 @@ std::unique_ptr<LocalExecutor> 
SerializedPlanParser::createExecutor(DB::QueryPla
     if (root_rel.root().input().has_write())
         addSinkTransform(parser_context->queryContext(), 
root_rel.root().input().write(), builder);
     LOG_INFO(getLogger("SerializedPlanParser"), "build pipeline {} ms", 
stopwatch.elapsedMicroseconds() / 1000.0);
-    LOG_DEBUG(
-        getLogger("SerializedPlanParser"),
-        "clickhouse plan [optimization={}]:\n{}",
-        settings[Setting::query_plan_enable_optimizations],
-        PlanUtil::explainPlan(*query_plan));
 
     auto config = 
ExecutorConfig::loadFromContext(parser_context->queryContext());
     return std::make_unique<LocalExecutor>(std::move(query_plan), 
std::move(builder), config.dump_pipeline);
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.h 
b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
index f0ec608a33..eadc7112c2 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.h
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
@@ -68,7 +68,7 @@ private:
 class SerializedPlanParser
 {
 private:
-    std::unique_ptr<LocalExecutor> createExecutor(DB::QueryPlanPtr query_plan, 
const substrait::Plan & s_plan);
+    std::unique_ptr<LocalExecutor> createExecutor(DB::QueryPlanPtr query_plan, 
const substrait::Plan & s_plan) const;
 
 public:
     explicit SerializedPlanParser(std::shared_ptr<const ParserContext> 
parser_context_);
@@ -118,7 +118,7 @@ public:
 
 private:
     DB::QueryPlanPtr parseOp(const substrait::Rel & rel, std::list<const 
substrait::Rel *> & rel_stack);
-    void adjustOutput(const DB::QueryPlanPtr & query_plan, const 
substrait::PlanRel & root_rel) const;
+    static void adjustOutput(const DB::QueryPlanPtr & query_plan, const 
substrait::Plan & plan);
 
     std::vector<jobject> input_iters;
     std::vector<std::string> split_infos;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to