This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new f65087ece3 [CH] Fix issues due to
https://github.com/ClickHouse/ClickHouse/pull/71539. (#7952)
f65087ece3 is described below
commit f65087ece35fc13a56189c450c56075107aad446
Author: Chang chen <[email protected]>
AuthorDate: Thu Nov 14 23:51:01 2024 +0800
[CH] Fix issues due to https://github.com/ClickHouse/ClickHouse/pull/71539.
(#7952)
* Fix issues due to https://github.com/ClickHouse/ClickHouse/pull/71539.
Issue 1
BuildQueryPipelineSettings is created manually instead of by calling
BuildQueryPipelineSettings::fromContext(), so even though
https://github.com/ClickHouse/ClickHouse/pull/71890 disables
'query_plan_merge_filters', the UTs still fail.
To fix this issue, we need to set the correct default parameters in CHUtil.cpp.
Issue 2
If we set query_plan_merge_filters to true, then
https://github.com/ClickHouse/ClickHouse/pull/71539 will try to split the leftmost
AND atom into a separate DAG and hence create a FilterTransformer for each AND
atom, which causes metrics collection to fail.
I am not sure of the benefits of setting it to true, so let's keep it false.
* Calling `QueryPlan::explainPlan` after building the pipeline is not correct,
because `action_dag` has already been
[moved](https://github.com/ClickHouse/ClickHouse/blob/22d2c856a70dfb8b6e4c506fcb22ac03d59df9be/src/Processors/QueryPlan/FilterStep.cpp#L161).
---
cpp-ch/local-engine/Common/CHUtil.cpp | 11 ++++++++
cpp-ch/local-engine/Common/DebugUtils.cpp | 16 +++++++----
cpp-ch/local-engine/Common/DebugUtils.h | 2 +-
.../local-engine/Parser/SerializedPlanParser.cpp | 33 +++++++++-------------
cpp-ch/local-engine/Parser/SerializedPlanParser.h | 4 +--
5 files changed, 37 insertions(+), 29 deletions(-)
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp
b/cpp-ch/local-engine/Common/CHUtil.cpp
index 778c4f257b..2413fae9e3 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -80,6 +80,9 @@ namespace Setting
{
extern const SettingsUInt64 prefer_external_sort_block_bytes;
extern const SettingsUInt64 max_bytes_before_external_sort;
+extern const SettingsBool query_plan_merge_filters;
+extern const SettingsBool compile_expressions;
+extern const SettingsShortCircuitFunctionEvaluation
short_circuit_function_evaluation;
}
namespace ErrorCodes
{
@@ -722,6 +725,14 @@ void BackendInitializerUtil::initSettings(const
SparkConfigs::ConfigMap & spark_
settings.set("max_download_threads", 1);
settings.set("input_format_parquet_enable_row_group_prefetch", false);
+ /// update per https://github.com/ClickHouse/ClickHouse/pull/71539
+ /// if true, we can't get correct metrics for the query
+ settings[Setting::query_plan_merge_filters] = false;
+ /// We now set BuildQueryPipelineSettings according to config.
+ settings[Setting::compile_expressions] = true;
+ settings[Setting::short_circuit_function_evaluation] =
ShortCircuitFunctionEvaluation::DISABLE;
+ ///
+
for (const auto & [key, value] : spark_conf_map)
{
// Firstly apply
spark.gluten.sql.columnar.backend.ch.runtime_config.local_engine.settings.* to
settings
diff --git a/cpp-ch/local-engine/Common/DebugUtils.cpp
b/cpp-ch/local-engine/Common/DebugUtils.cpp
index 2fcab59bf8..8a4323cb1c 100644
--- a/cpp-ch/local-engine/Common/DebugUtils.cpp
+++ b/cpp-ch/local-engine/Common/DebugUtils.cpp
@@ -38,7 +38,7 @@ namespace pb_util = google::protobuf::util;
namespace debug
{
-void dumpPlan(DB::QueryPlan & plan, bool force, LoggerPtr logger)
+void dumpPlan(DB::QueryPlan & plan, const char * type, bool force, LoggerPtr
logger)
{
if (!logger)
{
@@ -51,10 +51,12 @@ void dumpPlan(DB::QueryPlan & plan, bool force, LoggerPtr
logger)
return;
auto out = local_engine::PlanUtil::explainPlan(plan);
+ auto task_id =
local_engine::QueryContext::instance().currentTaskIdOrEmpty();
+ task_id = task_id.empty() ? "" : "(" + task_id + ")";
if (force) // force
- LOG_ERROR(logger, "clickhouse plan({}):\n{}",
local_engine::QueryContext::instance().currentTaskIdOrEmpty(), out);
+ LOG_ERROR(logger, "{}{} =>\n{}", type, task_id, out);
else
- LOG_DEBUG(logger, "clickhouse plan({}):\n{}",
local_engine::QueryContext::instance().currentTaskIdOrEmpty(), out);
+ LOG_DEBUG(logger, "{}{} =>\n{}", type, task_id, out);
}
void dumpMessage(const google::protobuf::Message & message, const char * type,
bool force, LoggerPtr logger)
@@ -70,13 +72,15 @@ void dumpMessage(const google::protobuf::Message & message,
const char * type, b
return;
pb_util::JsonOptions options;
std::string json;
- if (auto s = google::protobuf::json::MessageToJsonString(message, &json,
options); !s.ok())
+ if (auto s = MessageToJsonString(message, &json, options); !s.ok())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Can not convert {}
to Json", type);
+ auto task_id =
local_engine::QueryContext::instance().currentTaskIdOrEmpty();
+ task_id = task_id.empty() ? "" : "(" + task_id + ")";
if (force) // force
- LOG_ERROR(logger, "{}({}):\n{}", type,
local_engine::QueryContext::instance().currentTaskIdOrEmpty(), json);
+ LOG_ERROR(logger, "{}{} =>\n{}", type, task_id, json);
else
- LOG_DEBUG(logger, "{}({}):\n{}", type,
local_engine::QueryContext::instance().currentTaskIdOrEmpty(), json);
+ LOG_DEBUG(logger, "{}{} =>\n{}", type, task_id, json);
}
void headBlock(const DB::Block & block, size_t count)
diff --git a/cpp-ch/local-engine/Common/DebugUtils.h
b/cpp-ch/local-engine/Common/DebugUtils.h
index 55a0be5140..338326b05e 100644
--- a/cpp-ch/local-engine/Common/DebugUtils.h
+++ b/cpp-ch/local-engine/Common/DebugUtils.h
@@ -29,7 +29,7 @@ class QueryPlan;
namespace debug
{
-void dumpPlan(DB::QueryPlan & plan, bool force = false, LoggerPtr = nullptr);
+void dumpPlan(DB::QueryPlan & plan, const char * type = "clickhouse plan",
bool force = false, LoggerPtr = nullptr);
void dumpMessage(const google::protobuf::Message & message, const char * type,
bool force = false, LoggerPtr = nullptr);
void headBlock(const DB::Block & block, size_t count = 10);
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index 748ff88acb..4e461a5c49 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -101,8 +101,9 @@ std::string join(const ActionsDAG::NodeRawConstPtrs & v,
char c)
return res;
}
-void SerializedPlanParser::adjustOutput(const DB::QueryPlanPtr & query_plan,
const substrait::PlanRel & root_rel) const
+void SerializedPlanParser::adjustOutput(const DB::QueryPlanPtr & query_plan,
const substrait::Plan & plan)
{
+ const substrait::PlanRel & root_rel = plan.relations().at(0);
if (root_rel.root().names_size())
{
ActionsDAG
actions_dag{blockToNameAndTypeList(query_plan->getCurrentHeader())};
@@ -110,8 +111,8 @@ void SerializedPlanParser::adjustOutput(const
DB::QueryPlanPtr & query_plan, con
const auto cols =
query_plan->getCurrentHeader().getNamesAndTypesList();
if (cols.getNames().size() !=
static_cast<size_t>(root_rel.root().names_size()))
{
- debug::dumpPlan(*query_plan, true);
- debug::dumpMessage(root_rel, "substrait::PlanRel", true);
+ debug::dumpPlan(*query_plan, "clickhouse plan", true);
+ debug::dumpMessage(plan, "substrait::Plan", true);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Missmatch result columns size. plan column size {}, subtrait
plan name size {}.",
@@ -134,8 +135,8 @@ void SerializedPlanParser::adjustOutput(const
DB::QueryPlanPtr & query_plan, con
const auto & original_cols =
original_header.getColumnsWithTypeAndName();
if (static_cast<size_t>(output_schema.types_size()) !=
original_cols.size())
{
- debug::dumpPlan(*query_plan, true);
- debug::dumpMessage(root_rel, "substrait::PlanRel", true);
+ debug::dumpPlan(*query_plan, "clickhouse plan", true);
+ debug::dumpMessage(plan, "substrait::Plan", true);
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Missmatch result columns size. plan column size {}, subtrait
plan output schema size {}, subtrait plan name size {}.",
@@ -198,7 +199,7 @@ QueryPlanPtr SerializedPlanParser::parse(const
substrait::Plan & plan)
std::list<const substrait::Rel *> rel_stack;
auto query_plan = parseOp(first_read_rel, rel_stack);
if (!writePipeline)
- adjustOutput(query_plan, root_rel);
+ adjustOutput(query_plan, plan);
#ifndef NDEBUG
PlanUtil::checkOuputType(*query_plan);
@@ -297,12 +298,10 @@ DB::QueryPipelineBuilderPtr
SerializedPlanParser::buildQueryPipeline(DB::QueryPl
settings,
0);
const QueryPlanOptimizationSettings optimization_settings{.optimize_plan =
settings[Setting::query_plan_enable_optimizations]};
- return query_plan.buildQueryPipeline(
- optimization_settings,
- BuildQueryPipelineSettings{
- .actions_settings
- = ExpressionActionsSettings{.can_compile_expressions = true,
.min_count_to_compile_expression = 3, .compile_expressions =
CompileExpressions::yes},
- .process_list_element = query_status});
+ BuildQueryPipelineSettings build_settings =
BuildQueryPipelineSettings::fromContext(context);
+ build_settings.process_list_element = query_status;
+ build_settings.progress_callback = nullptr;
+ return query_plan.buildQueryPipeline(optimization_settings,build_settings);
}
std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(const
std::string_view plan)
@@ -311,11 +310,10 @@ std::unique_ptr<LocalExecutor>
SerializedPlanParser::createExecutor(const std::s
return createExecutor(parse(s_plan), s_plan);
}
-std::unique_ptr<LocalExecutor>
SerializedPlanParser::createExecutor(DB::QueryPlanPtr query_plan, const
substrait::Plan & s_plan)
+std::unique_ptr<LocalExecutor>
SerializedPlanParser::createExecutor(DB::QueryPlanPtr query_plan, const
substrait::Plan & s_plan) const
{
Stopwatch stopwatch;
- const Settings & settings =
parser_context->queryContext()->getSettingsRef();
DB::QueryPipelineBuilderPtr builder = nullptr;
try
{
@@ -323,7 +321,7 @@ std::unique_ptr<LocalExecutor>
SerializedPlanParser::createExecutor(DB::QueryPla
}
catch (...)
{
- LOG_ERROR(getLogger("SerializedPlanParser"), "Invalid plan:\n{}",
PlanUtil::explainPlan(*query_plan));
+ debug::dumpPlan(*query_plan, "Invalid clickhouse plan", true);
throw;
}
@@ -333,11 +331,6 @@ std::unique_ptr<LocalExecutor>
SerializedPlanParser::createExecutor(DB::QueryPla
if (root_rel.root().input().has_write())
addSinkTransform(parser_context->queryContext(),
root_rel.root().input().write(), builder);
LOG_INFO(getLogger("SerializedPlanParser"), "build pipeline {} ms",
stopwatch.elapsedMicroseconds() / 1000.0);
- LOG_DEBUG(
- getLogger("SerializedPlanParser"),
- "clickhouse plan [optimization={}]:\n{}",
- settings[Setting::query_plan_enable_optimizations],
- PlanUtil::explainPlan(*query_plan));
auto config =
ExecutorConfig::loadFromContext(parser_context->queryContext());
return std::make_unique<LocalExecutor>(std::move(query_plan),
std::move(builder), config.dump_pipeline);
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.h
b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
index f0ec608a33..eadc7112c2 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.h
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
@@ -68,7 +68,7 @@ private:
class SerializedPlanParser
{
private:
- std::unique_ptr<LocalExecutor> createExecutor(DB::QueryPlanPtr query_plan,
const substrait::Plan & s_plan);
+ std::unique_ptr<LocalExecutor> createExecutor(DB::QueryPlanPtr query_plan,
const substrait::Plan & s_plan) const;
public:
explicit SerializedPlanParser(std::shared_ptr<const ParserContext>
parser_context_);
@@ -118,7 +118,7 @@ public:
private:
DB::QueryPlanPtr parseOp(const substrait::Rel & rel, std::list<const
substrait::Rel *> & rel_stack);
- void adjustOutput(const DB::QueryPlanPtr & query_plan, const
substrait::PlanRel & root_rel) const;
+ static void adjustOutput(const DB::QueryPlanPtr & query_plan, const
substrait::Plan & plan);
std::vector<jobject> input_iters;
std::vector<std::string> split_infos;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]