This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 5915c9b6b Rename QueryContextManager => QueryContext (#7147)
5915c9b6b is described below
commit 5915c9b6bca2e3ca2412f3ed29a5f5b418a21602
Author: Chang chen <[email protected]>
AuthorDate: Mon Sep 9 10:20:21 2024 +0800
Rename QueryContextManager => QueryContext (#7147)
Move SerializedPlanParser::global_context to QueryContext::Data
Move SerializedPlanParser::shared_context to QueryContext::Data
Remove the unused static member SerializedPlanParser::config
Clean up unnecessary #include <Parser/SerializedPlanParser.h> directives
---
cpp-ch/local-engine/Common/CHUtil.cpp | 28 +++-----
cpp-ch/local-engine/Common/CHUtil.h | 2 +-
cpp-ch/local-engine/Common/QueryContext.cpp | 78 ++++++++++++++--------
cpp-ch/local-engine/Common/QueryContext.h | 21 ++++--
.../Disks/ObjectStorages/GlutenDiskHDFS.cpp | 9 ++-
.../Disks/ObjectStorages/GlutenDiskS3.cpp | 14 ++--
.../Disks/ObjectStorages/GlutenDiskS3.h | 13 ++--
cpp-ch/local-engine/Parser/CrossRelParser.cpp | 17 ++---
cpp-ch/local-engine/Parser/ExpandRelParser.cpp | 2 -
cpp-ch/local-engine/Parser/ExpandRelParser.h | 3 +-
cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp | 4 +-
cpp-ch/local-engine/Parser/MergeTreeRelParser.h | 7 +-
cpp-ch/local-engine/Parser/RelMetric.cpp | 6 +-
.../local-engine/Parser/SerializedPlanParser.cpp | 6 --
cpp-ch/local-engine/Parser/SerializedPlanParser.h | 22 +++---
cpp-ch/local-engine/Parser/TypeParser.cpp | 5 +-
cpp-ch/local-engine/Parser/TypeParser.h | 2 +-
cpp-ch/local-engine/Parser/WriteRelParser.cpp | 1 +
.../Parser/example_udf/tests/gtest_my_add.cpp | 5 +-
.../Parser/example_udf/tests/gtest_my_md5.cpp | 5 +-
cpp-ch/local-engine/Rewriter/RelRewriter.h | 21 +++---
cpp-ch/local-engine/Shuffle/PartitionWriter.cpp | 10 ++-
cpp-ch/local-engine/Shuffle/PartitionWriter.h | 17 +++--
cpp-ch/local-engine/Shuffle/SelectorBuilder.cpp | 6 +-
.../local-engine/Storages/Cache/CacheManager.cpp | 3 +-
cpp-ch/local-engine/Storages/Cache/CacheManager.h | 2 +-
.../local-engine/Storages/Cache/JobScheduler.cpp | 4 +-
cpp-ch/local-engine/Storages/Cache/JobScheduler.h | 6 +-
.../Storages/MergeTree/MetaDataHelper.cpp | 4 +-
.../Storages/MergeTree/StorageMergeTreeFactory.cpp | 2 +-
.../Storages/MergeTree/StorageMergeTreeFactory.h | 5 +-
.../Storages/Output/FileWriterWrappers.cpp | 1 +
.../Storages/Output/FileWriterWrappers.h | 51 +++++++-------
cpp-ch/local-engine/local_engine_jni.cpp | 22 +++---
.../tests/benchmark_cast_float_function.cpp | 19 +++---
.../local-engine/tests/benchmark_local_engine.cpp | 24 +++----
.../local-engine/tests/benchmark_parquet_read.cpp | 17 +++--
.../tests/benchmark_spark_divide_function.cpp | 5 +-
.../tests/benchmark_spark_floor_function.cpp | 16 ++---
cpp-ch/local-engine/tests/benchmark_spark_row.cpp | 5 +-
.../tests/benchmark_to_datetime_function.cpp | 18 +++--
.../tests/benchmark_unix_timestamp_function.cpp | 32 ++++-----
cpp-ch/local-engine/tests/gluten_test_util.cpp | 6 +-
cpp-ch/local-engine/tests/gluten_test_util.h | 5 +-
cpp-ch/local-engine/tests/gtest_ch_functions.cpp | 20 +++---
cpp-ch/local-engine/tests/gtest_ch_join.cpp | 20 +++---
cpp-ch/local-engine/tests/gtest_ch_storages.cpp | 16 ++---
.../tests/gtest_clickhouse_pr_verify.cpp | 7 +-
.../tests/gtest_parquet_columnindex.cpp | 18 ++---
.../tests/gtest_parquet_columnindex_bug.cpp | 5 +-
cpp-ch/local-engine/tests/gtest_parser.cpp | 5 +-
cpp-ch/local-engine/tests/gtest_write_pipeline.cpp | 25 +++----
52 files changed, 322 insertions(+), 345 deletions(-)
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp
b/cpp-ch/local-engine/Common/CHUtil.cpp
index b787888f5..68ed66c45 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -46,7 +46,6 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/registerFunctions.h>
-#include <IO/ReadBufferFromFile.h>
#include <IO/SharedThreadPools.h>
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Parser/RelParser.h>
@@ -73,6 +72,7 @@
#include <Common/CurrentThread.h>
#include <Common/GlutenSignalHandler.h>
#include <Common/LoggerExtend.h>
+#include <Common/QueryContext.h>
#include <Common/logger_useful.h>
#include <Common/typeid_cast.h>
@@ -815,14 +815,9 @@ void
BackendInitializerUtil::initSettings(std::map<std::string, std::string> & b
void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config)
{
/// Make sure global_context and shared_context are constructed only once.
- auto & shared_context = SerializedPlanParser::shared_context;
- if (!shared_context.get())
- shared_context = SharedContextHolder(Context::createShared());
-
- auto & global_context = SerializedPlanParser::global_context;
- if (!global_context)
+ if (auto global_context = QueryContext::globalMutableContext();
!global_context)
{
- global_context = Context::createGlobal(shared_context.get());
+ global_context = QueryContext::createGlobal();
global_context->makeGlobalContext();
global_context->setConfig(config);
@@ -878,9 +873,9 @@ void
BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config)
}
}
-void
BackendInitializerUtil::applyGlobalConfigAndSettings(DB::Context::ConfigurationPtr
config, DB::Settings & settings)
+void BackendInitializerUtil::applyGlobalConfigAndSettings(const
DB::Context::ConfigurationPtr & config, const DB::Settings & settings)
{
- auto & global_context = SerializedPlanParser::global_context;
+ const auto global_context = QueryContext::globalMutableContext();
global_context->setConfig(config);
global_context->setSettings(settings);
}
@@ -974,8 +969,8 @@ void BackendInitializerUtil::init(const std::string_view
plan)
// Init the table metadata cache map
StorageMergeTreeFactory::init_cache_map();
- JobScheduler::initialize(SerializedPlanParser::global_context);
- CacheManager::initialize(SerializedPlanParser::global_context);
+ JobScheduler::initialize(QueryContext::globalContext());
+ CacheManager::initialize(QueryContext::globalMutableContext());
std::call_once(
init_flag,
@@ -1025,14 +1020,7 @@ void BackendFinalizerUtil::finalizeGlobally()
// Make sure client caches release before ClientCacheRegistry
ReadBufferBuilderFactory::instance().clean();
StorageMergeTreeFactory::clear();
- auto & global_context = SerializedPlanParser::global_context;
- auto & shared_context = SerializedPlanParser::shared_context;
- if (global_context)
- {
- global_context->shutdown();
- global_context.reset();
- shared_context.reset();
- }
+ QueryContext::resetGlobal();
std::lock_guard lock(paths_mutex);
std::ranges::for_each(paths_need_to_clean, [](const auto & path)
{
diff --git a/cpp-ch/local-engine/Common/CHUtil.h
b/cpp-ch/local-engine/Common/CHUtil.h
index 3c741c7ff..2e0b7266c 100644
--- a/cpp-ch/local-engine/Common/CHUtil.h
+++ b/cpp-ch/local-engine/Common/CHUtil.h
@@ -213,7 +213,7 @@ private:
static void initContexts(DB::Context::ConfigurationPtr config);
static void initCompiledExpressionCache(DB::Context::ConfigurationPtr
config);
static void registerAllFactories();
- static void applyGlobalConfigAndSettings(DB::Context::ConfigurationPtr,
DB::Settings &);
+ static void applyGlobalConfigAndSettings(const
DB::Context::ConfigurationPtr & config, const DB::Settings & settings);
static void updateNewSettings(const DB::ContextMutablePtr &, const
DB::Settings &);
static std::vector<String>
wrapDiskPathConfig(const String & path_prefix, const String & path_suffix,
Poco::Util::AbstractConfiguration & config);
diff --git a/cpp-ch/local-engine/Common/QueryContext.cpp
b/cpp-ch/local-engine/Common/QueryContext.cpp
index 7cd96f4b1..142738aa3 100644
--- a/cpp-ch/local-engine/Common/QueryContext.cpp
+++ b/cpp-ch/local-engine/Common/QueryContext.cpp
@@ -19,7 +19,6 @@
#include <iomanip>
#include <sstream>
#include <Interpreters/Context.h>
-#include <Parser/SerializedPlanParser.h>
#include <base/unit.h>
#include <Common/CHUtil.h>
#include <Common/ConcurrentMap.h>
@@ -35,21 +34,59 @@ extern const int LOGICAL_ERROR;
}
}
+using namespace DB;
+
namespace local_engine
{
-using namespace DB;
-struct QueryContext
+struct QueryContext::Data
{
std::shared_ptr<ThreadStatus> thread_status;
std::shared_ptr<ThreadGroup> thread_group;
ContextMutablePtr query_context;
+
+ static DB::ContextMutablePtr global_context;
+ static SharedContextHolder shared_context;
};
-int64_t QueryContextManager::initializeQuery()
+ContextMutablePtr QueryContext::Data::global_context{};
+SharedContextHolder QueryContext::Data::shared_context{};
+
+DB::ContextMutablePtr QueryContext::globalMutableContext()
+{
+ return Data::global_context;
+}
+void QueryContext::resetGlobal()
+{
+ if (Data::global_context)
+ {
+ Data::global_context->shutdown();
+ Data::global_context.reset();
+ }
+ Data::shared_context.reset();
+}
+
+DB::ContextMutablePtr QueryContext::createGlobal()
+{
+ assert(Data::shared_context.get() == nullptr);
+
+ if (!Data::shared_context.get())
+ Data::shared_context = SharedContextHolder(Context::createShared());
+
+ assert(Data::global_context == nullptr);
+ Data::global_context = Context::createGlobal(Data::shared_context.get());
+ return globalMutableContext();
+}
+
+DB::ContextPtr QueryContext::globalContext()
+{
+ return Data::global_context;
+}
+
+int64_t QueryContext::initializeQuery()
{
- std::shared_ptr<QueryContext> query_context =
std::make_shared<QueryContext>();
- query_context->query_context =
Context::createCopy(SerializedPlanParser::global_context);
+ std::shared_ptr<Data> query_context = std::make_shared<Data>();
+ query_context->query_context = Context::createCopy(globalContext());
query_context->query_context->makeQueryContext();
// empty input will trigger random query id to be set
@@ -72,14 +109,14 @@ int64_t QueryContextManager::initializeQuery()
return id;
}
-DB::ContextMutablePtr QueryContextManager::currentQueryContext()
+DB::ContextMutablePtr QueryContext::currentQueryContext()
{
auto thread_group = currentThreadGroup();
const int64_t id =
reinterpret_cast<int64_t>(CurrentThread::getGroup().get());
return query_map_.get(id)->query_context;
}
-std::shared_ptr<DB::ThreadGroup> QueryContextManager::currentThreadGroup()
+std::shared_ptr<DB::ThreadGroup> QueryContext::currentThreadGroup()
{
if (auto thread_group = CurrentThread::getGroup())
return thread_group;
@@ -87,12 +124,10 @@ std::shared_ptr<DB::ThreadGroup>
QueryContextManager::currentThreadGroup()
throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread group not found.");
}
-void
QueryContextManager::logCurrentPerformanceCounters(ProfileEvents::Counters &
counters) const
+void QueryContext::logCurrentPerformanceCounters(ProfileEvents::Counters &
counters) const
{
if (!CurrentThread::getGroup())
- {
return;
- }
if (logger_->information())
{
std::ostringstream msg;
@@ -104,44 +139,37 @@ void
QueryContextManager::logCurrentPerformanceCounters(ProfileEvents::Counters
auto & count = counters[event];
if (count == 0)
continue;
- msg << std::setw(50) << std::setfill(' ') << std::left << name <<
"|"
- << std::setw(20) << std::setfill(' ') << std::left <<
count.load()
- << " | (" << doc << ")\n";
+ msg << std::setw(50) << std::setfill(' ') << std::left << name <<
"|" << std::setw(20) << std::setfill(' ') << std::left
+ << count.load() << " | (" << doc << ")\n";
}
LOG_INFO(logger_, "{}", msg.str());
}
}
-size_t QueryContextManager::currentPeakMemory(int64_t id)
+size_t QueryContext::currentPeakMemory(int64_t id)
{
if (!query_map_.contains(id))
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "context released {}",
id);
return query_map_.get(id)->thread_group->memory_tracker.getPeak();
}
-void QueryContextManager::finalizeQuery(int64_t id)
+void QueryContext::finalizeQuery(int64_t id)
{
if (!CurrentThread::getGroup())
- {
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Thread group not
found.");
- }
- std::shared_ptr<QueryContext> context;
+ std::shared_ptr<Data> context;
{
context = query_map_.get(id);
}
auto query_context = context->thread_status->getQueryContext();
if (!query_context)
- {
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "query context not
found");
- }
context->thread_status->flushUntrackedMemory();
context->thread_status->finalizePerformanceCounters();
LOG_INFO(logger_, "Task finished, peak memory usage: {} bytes",
currentPeakMemory(id));
if (currentThreadGroupMemoryUsage() > 1_MiB)
- {
LOG_WARNING(logger_, "{} bytes memory didn't release, There may be a
memory leak!", currentThreadGroupMemoryUsage());
- }
logCurrentPerformanceCounters(context->thread_group->performance_counters);
context->thread_status->detachFromGroup();
context->thread_group.reset();
@@ -155,18 +183,14 @@ void QueryContextManager::finalizeQuery(int64_t id)
size_t currentThreadGroupMemoryUsage()
{
if (!CurrentThread::getGroup())
- {
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Thread group not
found, please call initializeQuery first.");
- }
return CurrentThread::getGroup()->memory_tracker.get();
}
double currentThreadGroupMemoryUsageRatio()
{
if (!CurrentThread::getGroup())
- {
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Thread group not
found, please call initializeQuery first.");
- }
return
static_cast<double>(CurrentThread::getGroup()->memory_tracker.get()) /
CurrentThread::getGroup()->memory_tracker.getSoftLimit();
}
}
diff --git a/cpp-ch/local-engine/Common/QueryContext.h
b/cpp-ch/local-engine/Common/QueryContext.h
index 5079589f4..821144f5f 100644
--- a/cpp-ch/local-engine/Common/QueryContext.h
+++ b/cpp-ch/local-engine/Common/QueryContext.h
@@ -19,16 +19,25 @@
#include <Common/ConcurrentMap.h>
#include <Common/ThreadStatus.h>
+namespace DB
+{
+struct ContextSharedPart;
+}
namespace local_engine
{
-struct QueryContext;
-class QueryContextManager
+class QueryContext
{
+ struct Data;
+
public:
- static QueryContextManager & instance()
+ static DB::ContextMutablePtr createGlobal();
+ static void resetGlobal();
+ static DB::ContextMutablePtr globalMutableContext();
+ static DB::ContextPtr globalContext();
+ static QueryContext & instance()
{
- static QueryContextManager instance;
+ static QueryContext instance;
return instance;
}
int64_t initializeQuery();
@@ -39,9 +48,9 @@ public:
void finalizeQuery(int64_t id);
private:
- QueryContextManager() = default;
+ QueryContext() = default;
LoggerPtr logger_ = getLogger("QueryContextManager");
- ConcurrentMap<int64_t, std::shared_ptr<QueryContext>> query_map_{};
+ ConcurrentMap<int64_t, std::shared_ptr<Data>> query_map_{};
};
size_t currentThreadGroupMemoryUsage();
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
index 9c4b390ea..bd005132b 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
@@ -17,11 +17,10 @@
#include "GlutenDiskHDFS.h"
#include <ranges>
-
+#include <Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h>
+#include <Common/QueryContext.h>
#include <Common/Throttler.h>
-#include <Parser/SerializedPlanParser.h>
-#include "CompactObjectStorageDiskTransaction.h"
#if USE_HDFS
namespace local_engine
@@ -30,7 +29,7 @@ using namespace DB;
DiskTransactionPtr GlutenDiskHDFS::createTransaction()
{
- return std::make_shared<CompactObjectStorageDiskTransaction>(*this,
SerializedPlanParser::global_context->getTempDataOnDisk()->getVolume()->getDisk());
+ return std::make_shared<CompactObjectStorageDiskTransaction>(*this,
QueryContext::globalContext()->getTempDataOnDisk()->getVolume()->getDisk());
}
void GlutenDiskHDFS::createDirectory(const String & path)
@@ -78,7 +77,7 @@ DiskObjectStoragePtr GlutenDiskHDFS::createDiskObjectStorage()
object_key_prefix,
getMetadataStorage(),
getObjectStorage(),
- SerializedPlanParser::global_context->getConfigRef(),
+ QueryContext::globalContext()->getConfigRef(),
config_prefix,
object_storage_creator);
}
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.cpp
b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.cpp
index 4a73c5a49..b2a6bb523 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.cpp
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.cpp
@@ -17,11 +17,13 @@
#pragma once
-
#include "GlutenDiskS3.h"
+#include <Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h>
#include <Disks/ObjectStorages/DiskObjectStorage.h>
-#include <Parser/SerializedPlanParser.h>
-#include "CompactObjectStorageDiskTransaction.h"
+#include <Interpreters/Context.h>
+#include <Common/QueryContext.h>
+
+using namespace DB;
#if USE_AWS_S3
namespace local_engine
@@ -29,10 +31,10 @@ namespace local_engine
DB::DiskTransactionPtr GlutenDiskS3::createTransaction()
{
- return std::make_shared<CompactObjectStorageDiskTransaction>(*this,
SerializedPlanParser::global_context->getTempDataOnDisk()->getVolume()->getDisk());
+ return std::make_shared<CompactObjectStorageDiskTransaction>(*this,
QueryContext::globalContext()->getTempDataOnDisk()->getVolume()->getDisk());
}
- std::unique_ptr<ReadBufferFromFileBase> GlutenDiskS3::readFile(
+ std::unique_ptr<DB::ReadBufferFromFileBase> GlutenDiskS3::readFile(
const String & path,
const ReadSettings & settings,
std::optional<size_t> read_hint,
@@ -52,7 +54,7 @@ namespace local_engine
object_key_prefix,
getMetadataStorage(),
getObjectStorage(),
- SerializedPlanParser::global_context->getConfigRef(),
+ QueryContext::globalContext()->getConfigRef(),
config_prefix,
object_storage_creator);
}
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.h
b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.h
index 4f0d7a029..00a463b6d 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.h
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.h
@@ -19,8 +19,7 @@
#include <Disks/ObjectStorages/DiskObjectStorage.h>
-#include <Parser/SerializedPlanParser.h>
-#include "CompactObjectStorageDiskTransaction.h"
+
#if USE_AWS_S3
namespace local_engine
@@ -41,13 +40,11 @@ public:
DB::DiskTransactionPtr createTransaction() override;
- std::unique_ptr<ReadBufferFromFileBase> readFile(
- const String & path,
- const ReadSettings & settings,
- std::optional<size_t> read_hint,
- std::optional<size_t> file_size) const override;
+ std::unique_ptr<DB::ReadBufferFromFileBase>
+ readFile(const String & path, const DB::ReadSettings & settings,
std::optional<size_t> read_hint, std::optional<size_t> file_size)
+ const override;
- DiskObjectStoragePtr createDiskObjectStorage() override;
+ DB::DiskObjectStoragePtr createDiskObjectStorage() override;
private:
std::function<DB::ObjectStoragePtr(const Poco::Util::AbstractConfiguration
& conf, DB::ContextPtr context)> object_storage_creator;
diff --git a/cpp-ch/local-engine/Parser/CrossRelParser.cpp
b/cpp-ch/local-engine/Parser/CrossRelParser.cpp
index 454f0387f..3cb6ff7ed 100644
--- a/cpp-ch/local-engine/Parser/CrossRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/CrossRelParser.cpp
@@ -16,45 +16,40 @@
*/
#include "CrossRelParser.h"
-#include <IO/ReadBufferFromString.h>
-#include <IO/ReadHelpers.h>
#include <Interpreters/CollectJoinOnKeysVisitor.h>
#include <Interpreters/GraceHashJoin.h>
#include <Interpreters/HashJoin/HashJoin.h>
#include <Interpreters/TableJoin.h>
#include <Join/BroadCastJoinBuilder.h>
#include <Join/StorageJoinFromReadBuffer.h>
-#include <Parser/SerializedPlanParser.h>
#include <Parser/AdvancedParametersParseUtil.h>
+#include <Parser/SerializedPlanParser.h>
#include <Parsers/ASTIdentifier.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/JoinStep.h>
#include <google/protobuf/wrappers.pb.h>
#include <Common/CHUtil.h>
+#include <Common/QueryContext.h>
#include <Common/logger_useful.h>
-
namespace DB
{
namespace ErrorCodes
{
- extern const int LOGICAL_ERROR;
- extern const int UNKNOWN_TYPE;
- extern const int BAD_ARGUMENTS;
+extern const int LOGICAL_ERROR;
+extern const int UNKNOWN_TYPE;
+extern const int BAD_ARGUMENTS;
}
}
using namespace DB;
-
-
-
namespace local_engine
{
std::shared_ptr<DB::TableJoin>
createCrossTableJoin(substrait::CrossRel_JoinType join_type)
{
- auto & global_context = SerializedPlanParser::global_context;
+ auto global_context = QueryContext::globalContext();
auto table_join = std::make_shared<TableJoin>(
global_context->getSettingsRef(),
global_context->getGlobalTemporaryVolume(),
global_context->getTempDataOnDisk());
diff --git a/cpp-ch/local-engine/Parser/ExpandRelParser.cpp
b/cpp-ch/local-engine/Parser/ExpandRelParser.cpp
index 960c9eba1..c621332db 100644
--- a/cpp-ch/local-engine/Parser/ExpandRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/ExpandRelParser.cpp
@@ -22,9 +22,7 @@
#include <Parser/ExpandField.h>
#include <Parser/RelParser.h>
#include <Parser/SerializedPlanParser.h>
-#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
-#include <Poco/Logger.h>
#include <Common/logger_useful.h>
namespace DB
diff --git a/cpp-ch/local-engine/Parser/ExpandRelParser.h
b/cpp-ch/local-engine/Parser/ExpandRelParser.h
index 449515001..1ca7cc814 100644
--- a/cpp-ch/local-engine/Parser/ExpandRelParser.h
+++ b/cpp-ch/local-engine/Parser/ExpandRelParser.h
@@ -16,10 +16,11 @@
*/
#pragma once
#include <Parser/RelParser.h>
-#include <Parser/SerializedPlanParser.h>
+
namespace local_engine
{
+class SerializedPlanParser;
class ExpandRelParser : public RelParser
{
public:
diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
index 20340a099..730a013dc 100644
--- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
@@ -60,7 +60,7 @@ DB::QueryPlanPtr MergeTreeRelParser::parseReadRel(
MergeTreeTableInstance merge_tree_table(extension_table);
// ignore snapshot id for query
merge_tree_table.snapshot_id = "";
- auto storage = merge_tree_table.restoreStorage(global_context);
+ auto storage =
merge_tree_table.restoreStorage(QueryContext::globalMutableContext());
DB::Block input;
if (rel.has_base_schema() && rel.base_schema().names_size())
@@ -318,7 +318,7 @@ String MergeTreeRelParser::filterRangesOnDriver(const
substrait::ReadRel & read_
MergeTreeTableInstance
merge_tree_table(read_rel.advanced_extension().enhancement());
// ignore snapshot id for query
merge_tree_table.snapshot_id = "";
- auto storage = merge_tree_table.restoreStorage(global_context);
+ auto storage =
merge_tree_table.restoreStorage(QueryContext::globalMutableContext());
auto input = TypeParser::buildBlockFromNamedStruct(read_rel.base_schema());
auto names_and_types_list = input.getNamesAndTypesList();
diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.h
b/cpp-ch/local-engine/Parser/MergeTreeRelParser.h
index 91c335a68..94b4809d3 100644
--- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.h
+++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.h
@@ -20,7 +20,6 @@
#include <substrait/algebra.pb.h>
#include <Parser/RelParser.h>
-#include <Parser/SerializedPlanParser.h>
namespace DB
{
@@ -37,8 +36,7 @@ using namespace DB;
class MergeTreeRelParser : public RelParser
{
public:
- explicit MergeTreeRelParser(SerializedPlanParser * plan_paser_, const
ContextPtr & context_)
- : RelParser(plan_paser_), context(context_),
global_context(plan_paser_->global_context)
+ explicit MergeTreeRelParser(SerializedPlanParser * plan_paser_, const
ContextPtr & context_) : RelParser(plan_paser_), context(context_)
{
}
@@ -88,8 +86,7 @@ private:
static void collectColumns(const substrait::Expression & rel, NameSet &
columns, Block & block);
UInt64 getColumnsSize(const NameSet & columns);
- const ContextPtr & context;
- ContextMutablePtr & global_context;
+ ContextPtr context;
};
}
diff --git a/cpp-ch/local-engine/Parser/RelMetric.cpp
b/cpp-ch/local-engine/Parser/RelMetric.cpp
index e13864260..98a9b284e 100644
--- a/cpp-ch/local-engine/Parser/RelMetric.cpp
+++ b/cpp-ch/local-engine/Parser/RelMetric.cpp
@@ -49,7 +49,7 @@ namespace local_engine
static void writeCacheHits(Writer<StringBuffer> & writer)
{
- const auto thread_group = QueryContextManager::currentThreadGroup();
+ const auto thread_group = QueryContext::currentThreadGroup();
auto & counters = thread_group->performance_counters;
auto read_cache_hits =
counters[ProfileEvents::CachedReadBufferReadFromCacheHits].load();
auto miss_cache_hits =
counters[ProfileEvents::CachedReadBufferReadFromCacheMisses].load();
@@ -109,7 +109,7 @@ RelMetricTimes RelMetric::getTotalTime() const
{
for (const auto & processor : step->getProcessors())
{
- timeMetrics.time += processor->getElapsedNs() / 1000U ;
+ timeMetrics.time += processor->getElapsedNs() / 1000U;
timeMetrics.input_wait_elapsed_us +=
processor->getInputWaitElapsedNs() / 1000U;
timeMetrics.output_wait_elapsed_us +=
processor->getInputWaitElapsedNs() / 1000U;
}
@@ -209,9 +209,7 @@ std::string RelMetricSerializer::serializeRelMetric(const
RelMetricPtr & rel_met
auto metric = metrics.top();
metrics.pop();
for (const auto & item : metric->getInputs())
- {
metrics.push(item);
- }
metric->serialize(writer);
}
writer.EndArray();
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index 718656ac8..8efbd97d2 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -1324,10 +1324,6 @@ SerializedPlanParser::SerializedPlanParser(const
ContextPtr & context_) : contex
{
}
-ContextMutablePtr SerializedPlanParser::global_context = nullptr;
-
-Context::ConfigurationPtr SerializedPlanParser::config = nullptr;
-
void SerializedPlanParser::collectJoinKeys(
const substrait::Expression & condition, std::vector<std::pair<int32_t,
int32_t>> & join_keys, int32_t right_key_start)
{
@@ -1565,8 +1561,6 @@ void SerializedPlanParser::wrapNullable(
}
}
-SharedContextHolder SerializedPlanParser::shared_context;
-
LocalExecutor::~LocalExecutor()
{
if (dump_pipeline)
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.h
b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
index 85150c099..112e82a87 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.h
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
@@ -131,9 +131,6 @@ public:
static std::pair<DataTypePtr, Field> parseLiteral(const
substrait::Expression_Literal & literal);
- static ContextMutablePtr global_context;
- static Context::ConfigurationPtr config;
- static SharedContextHolder shared_context;
std::vector<QueryPlanPtr> extra_plan_holder;
private:
@@ -142,22 +139,19 @@ private:
collectJoinKeys(const substrait::Expression & condition,
std::vector<std::pair<int32_t, int32_t>> & join_keys, int32_t right_key_start);
void parseFunctionOrExpression(
- const substrait::Expression & rel,
- std::string & result_name,
- DB::ActionsDAG& actions_dag,
- bool keep_result = false);
+ const substrait::Expression & rel, std::string & result_name,
DB::ActionsDAG & actions_dag, bool keep_result = false);
void parseJsonTuple(
const substrait::Expression & rel,
std::vector<String> & result_names,
- DB::ActionsDAG& actions_dag,
+ DB::ActionsDAG & actions_dag,
bool keep_result = false,
bool position = false);
const ActionsDAG::Node * parseFunctionWithDAG(
- const substrait::Expression & rel, std::string & result_name,
DB::ActionsDAG& actions_dag, bool keep_result = false);
+ const substrait::Expression & rel, std::string & result_name,
DB::ActionsDAG & actions_dag, bool keep_result = false);
ActionsDAG::NodeRawConstPtrs parseArrayJoinWithDAG(
const substrait::Expression & rel,
std::vector<String> & result_name,
- DB::ActionsDAG& actions_dag,
+ DB::ActionsDAG & actions_dag,
bool keep_result = false,
bool position = false);
void parseFunctionArguments(
@@ -174,14 +168,14 @@ private:
bool & is_map);
- const DB::ActionsDAG::Node * parseExpression(DB::ActionsDAG& actions_dag,
const substrait::Expression & rel);
+ const DB::ActionsDAG::Node * parseExpression(DB::ActionsDAG & actions_dag,
const substrait::Expression & rel);
const ActionsDAG::Node *
- toFunctionNode(ActionsDAG& actions_dag, const String & function, const
DB::ActionsDAG::NodeRawConstPtrs & args);
+ toFunctionNode(ActionsDAG & actions_dag, const String & function, const
DB::ActionsDAG::NodeRawConstPtrs & args);
// remove nullable after isNotNull
void removeNullableForRequiredColumns(const std::set<String> &
require_columns, ActionsDAG & actions_dag) const;
std::string getUniqueName(const std::string & name) { return name + "_" +
std::to_string(name_no++); }
void wrapNullable(
- const std::vector<String> & columns, ActionsDAG& actions_dag,
std::map<std::string, std::string> & nullable_measure_names);
+ const std::vector<String> & columns, ActionsDAG & actions_dag,
std::map<std::string, std::string> & nullable_measure_names);
static std::pair<DB::DataTypePtr, DB::Field> convertStructFieldType(const
DB::DataTypePtr & type, const DB::Field & field);
bool isFunction(substrait::Expression_ScalarFunction rel, String
function_name);
@@ -198,7 +192,7 @@ private:
std::vector<RelMetricPtr> metrics;
public:
- const ActionsDAG::Node * addColumn(DB::ActionsDAG& actions_dag, const
DataTypePtr & type, const Field & field);
+ const ActionsDAG::Node * addColumn(DB::ActionsDAG & actions_dag, const
DataTypePtr & type, const Field & field);
};
struct SparkBuffer
diff --git a/cpp-ch/local-engine/Parser/TypeParser.cpp
b/cpp-ch/local-engine/Parser/TypeParser.cpp
index 269f35747..84c936226 100644
--- a/cpp-ch/local-engine/Parser/TypeParser.cpp
+++ b/cpp-ch/local-engine/Parser/TypeParser.cpp
@@ -19,14 +19,12 @@
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate32.h>
-#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeNullable.h>
-#include <DataTypes/DataTypeSet.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypesDecimal.h>
@@ -38,6 +36,7 @@
#include <Parser/TypeParser.h>
#include <Poco/StringTokenizer.h>
#include <Common/Exception.h>
+#include <Common/QueryContext.h>
namespace DB
{
@@ -274,7 +273,7 @@ DB::Block TypeParser::buildBlockFromNamedStruct(const
substrait::NamedStruct & s
auto args_types = tuple_type->getElements();
AggregateFunctionProperties properties;
- auto tmp_ctx =
DB::Context::createCopy(SerializedPlanParser::global_context);
+ auto tmp_ctx =
DB::Context::createCopy(QueryContext::globalContext());
SerializedPlanParser tmp_plan_parser(tmp_ctx);
auto function_parser =
AggregateFunctionParserFactory::instance().get(name_parts[3], &tmp_plan_parser);
/// This may remove elements from args_types, because some of them
are used to determine CH function name, but not needed for the following
diff --git a/cpp-ch/local-engine/Parser/TypeParser.h
b/cpp-ch/local-engine/Parser/TypeParser.h
index 57a12de04..2a498989e 100644
--- a/cpp-ch/local-engine/Parser/TypeParser.h
+++ b/cpp-ch/local-engine/Parser/TypeParser.h
@@ -16,7 +16,7 @@
*/
#pragma once
#include <list>
-#include <optional>
+
#include <unordered_map>
#include <Core/Block.h>
#include <DataTypes/IDataType.h>
diff --git a/cpp-ch/local-engine/Parser/WriteRelParser.cpp
b/cpp-ch/local-engine/Parser/WriteRelParser.cpp
index ecae3f16d..c1d2ee250 100644
--- a/cpp-ch/local-engine/Parser/WriteRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/WriteRelParser.cpp
@@ -30,6 +30,7 @@
#include <Common/GlutenSettings.h>
using namespace local_engine;
+using namespace DB;
DB::ProcessorPtr make_sink(
const DB::ContextPtr & context,
diff --git a/cpp-ch/local-engine/Parser/example_udf/tests/gtest_my_add.cpp
b/cpp-ch/local-engine/Parser/example_udf/tests/gtest_my_add.cpp
index 12284ea00..35e863e12 100644
--- a/cpp-ch/local-engine/Parser/example_udf/tests/gtest_my_add.cpp
+++ b/cpp-ch/local-engine/Parser/example_udf/tests/gtest_my_add.cpp
@@ -15,17 +15,18 @@
* limitations under the License.
*/
#include <iostream>
+#include <DataTypes/DataTypeNullable.h>
#include <Parser/FunctionExecutor.h>
#include <Parser/FunctionParser.h>
#include <gtest/gtest.h>
-#include <DataTypes/DataTypeNullable.h>
+#include <Common/QueryContext.h>
using namespace DB;
using namespace local_engine;
TEST(MyAdd, Common)
{
- auto context = local_engine::SerializedPlanParser::global_context;
+ auto context = local_engine::QueryContext::globalContext();
auto type =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>());
FunctionExecutor executor("my_add", {type, type}, type, context);
diff --git a/cpp-ch/local-engine/Parser/example_udf/tests/gtest_my_md5.cpp
b/cpp-ch/local-engine/Parser/example_udf/tests/gtest_my_md5.cpp
index c58262d98..3ff03e0b9 100644
--- a/cpp-ch/local-engine/Parser/example_udf/tests/gtest_my_md5.cpp
+++ b/cpp-ch/local-engine/Parser/example_udf/tests/gtest_my_md5.cpp
@@ -15,17 +15,18 @@
* limitations under the License.
*/
#include <iostream>
+#include <DataTypes/DataTypeNullable.h>
#include <Parser/FunctionExecutor.h>
#include <Parser/FunctionParser.h>
#include <gtest/gtest.h>
-#include <DataTypes/DataTypeNullable.h>
+#include <Common/QueryContext.h>
using namespace DB;
using namespace local_engine;
TEST(MyMd5, Common)
{
- auto context = local_engine::SerializedPlanParser::global_context;
+ auto context = local_engine::QueryContext::globalContext();
auto type =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
FunctionExecutor executor("my_md5", {type}, type, context);
diff --git a/cpp-ch/local-engine/Rewriter/RelRewriter.h
b/cpp-ch/local-engine/Rewriter/RelRewriter.h
index 719ec4aff..2830a179b 100644
--- a/cpp-ch/local-engine/Rewriter/RelRewriter.h
+++ b/cpp-ch/local-engine/Rewriter/RelRewriter.h
@@ -16,32 +16,27 @@
*/
#pragma once
-#include <Parser/SerializedPlanParser.h>
-#include <substrait/algebra.pb.h>
-#include <substrait/type.pb.h>
-#include <Interpreters/Context_fwd.h>
-#include <Interpreters/Context.h>
#include <unordered_map>
-#include <Common/Exception.h>
-#include <set>
#include <Functions/SparkFunctionGetJsonObject.h>
+#include <Interpreters/Context_fwd.h>
+#include <Parser/SerializedPlanParser.h>
+#include <substrait/algebra.pb.h>
-#include <Poco/Logger.h>
-#include <Common/logger_useful.h>
namespace local_engine
{
class RelRewriter
{
public:
- RelRewriter(SerializedPlanParser *parser_) : parser(parser_) {}
+ RelRewriter(SerializedPlanParser * parser_) : parser(parser_) { }
virtual ~RelRewriter() = default;
virtual void rewrite(substrait::Rel & rel) = 0;
+
protected:
- SerializedPlanParser *parser;
+ SerializedPlanParser * parser;
- inline DB::ContextPtr getContext() { return parser->context; }
- inline std::unordered_map<std::string, std::string> & getFunctionMapping()
{ return parser->function_mapping; }
+ inline DB::ContextPtr getContext() const { return parser->context; }
+ inline std::unordered_map<std::string, std::string> & getFunctionMapping()
const { return parser->function_mapping; }
};
}
diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
index 79d640d3b..e4a8f86b0 100644
--- a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
+++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
@@ -22,18 +22,16 @@
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromString.h>
+#include <Interpreters/sortBlock.h>
#include <Processors/Merges/Algorithms/MergingSortedAlgorithm.h>
+#include <Processors/Transforms/SortingTransform.h>
#include <Shuffle/CachedShuffleWriter.h>
-#include <Storages/IO/AggregateSerializationUtils.h>
#include <Storages/IO/CompressedWriteBuffer.h>
+#include <Storages/IO/NativeWriter.h>
#include <boost/algorithm/string/case_conv.hpp>
#include <Common/CHUtil.h>
#include <Common/Stopwatch.h>
#include <Common/ThreadPool.h>
-#include <Common/QueryContext.h>
-
-#include <Processors/Transforms/SortingTransform.h>
-#include <Storages/IO/NativeWriter.h>
namespace DB
@@ -324,7 +322,7 @@ PartitionWriter::PartitionWriter(CachedShuffleWriter *
shuffle_writer_, LoggerPt
partition_block_buffer[partition_id] =
std::make_shared<ColumnsBuffer>(options->split_size);
partition_buffer[partition_id] = std::make_shared<Partition>();
}
- settings =
MemoryConfig::loadFromContext(SerializedPlanParser::global_context);
+ settings = MemoryConfig::loadFromContext(QueryContext::globalContext());
}
size_t PartitionWriter::bytes() const
diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.h
b/cpp-ch/local-engine/Shuffle/PartitionWriter.h
index 78eb845e1..15f8b5086 100644
--- a/cpp-ch/local-engine/Shuffle/PartitionWriter.h
+++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.h
@@ -18,15 +18,14 @@
#include <cstddef>
#include <memory>
#include <vector>
-#include <Common/GlutenConfig.h>
#include <Core/Block.h>
#include <Core/Settings.h>
#include <Interpreters/TemporaryDataOnDisk.h>
-#include <Parser/SerializedPlanParser.h>
#include <Shuffle/CachedShuffleWriter.h>
#include <Shuffle/ShuffleCommon.h>
#include <jni/CelebornClient.h>
-
+#include <Common/GlutenConfig.h>
+#include <Common/QueryContext.h>
namespace DB
{
@@ -113,7 +112,7 @@ public:
protected:
String getNextSpillFile();
- std::vector<UInt64> mergeSpills(CachedShuffleWriter * shuffle_writer,
WriteBuffer & data_file, ExtraData extra_data = {});
+ std::vector<UInt64> mergeSpills(CachedShuffleWriter * shuffle_writer,
DB::WriteBuffer & data_file, ExtraData extra_data = {});
std::vector<SpillInfo> spill_infos;
private:
@@ -140,7 +139,7 @@ protected:
{
max_merge_block_size = options->split_size;
max_sort_buffer_size = options->max_sort_buffer_size;
- max_merge_block_bytes =
SerializedPlanParser::global_context->getSettingsRef().prefer_external_sort_block_bytes;
+ max_merge_block_bytes =
QueryContext::globalContext()->getSettingsRef().prefer_external_sort_block_bytes;
}
public:
String getName() const override { return "SortBasedPartitionWriter"; }
@@ -161,10 +160,10 @@ protected:
size_t max_merge_block_bytes = 0;
size_t current_accumulated_bytes = 0;
size_t current_accumulated_rows = 0;
- Chunks accumulated_blocks;
- Block output_header;
- Block sort_header;
- SortDescription sort_description;
+ DB::Chunks accumulated_blocks;
+ DB::Block output_header;
+ DB::Block sort_header;
+ DB::SortDescription sort_description;
};
class MemorySortLocalPartitionWriter : public SortBasedPartitionWriter, public
Spillable
diff --git a/cpp-ch/local-engine/Shuffle/SelectorBuilder.cpp
b/cpp-ch/local-engine/Shuffle/SelectorBuilder.cpp
index 272a6f2f6..755de6402 100644
--- a/cpp-ch/local-engine/Shuffle/SelectorBuilder.cpp
+++ b/cpp-ch/local-engine/Shuffle/SelectorBuilder.cpp
@@ -17,7 +17,6 @@
#include "SelectorBuilder.h"
#include <limits>
#include <memory>
-#include <Columns/ColumnArray.h>
#include <Columns/ColumnMap.h>
#include <Columns/ColumnNullable.h>
#include <DataTypes/DataTypeArray.h>
@@ -30,6 +29,7 @@
#include <Poco/MemoryStream.h>
#include <Common/CHUtil.h>
#include <Common/Exception.h>
+#include <Common/QueryContext.h>
namespace DB
{
@@ -101,7 +101,7 @@ PartitionInfo HashSelectorBuilder::build(DB::Block & block)
if (!hash_function) [[unlikely]]
{
auto & factory = DB::FunctionFactory::instance();
- auto function = factory.get(hash_function_name,
local_engine::SerializedPlanParser::global_context);
+ auto function = factory.get(hash_function_name,
QueryContext::globalContext());
hash_function = function->build(args);
}
@@ -328,7 +328,7 @@ void RangeSelectorBuilder::initActionsDAG(const DB::Block &
block)
std::lock_guard lock(actions_dag_mutex);
if (has_init_actions_dag)
return;
- SerializedPlanParser
plan_parser(local_engine::SerializedPlanParser::global_context);
+ SerializedPlanParser plan_parser(QueryContext::globalContext());
plan_parser.parseExtensions(projection_plan_pb->extensions());
const auto & expressions =
projection_plan_pb->relations().at(0).root().input().project().expressions();
diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
index a2943f0b7..daa3c0e30 100644
--- a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
+++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
@@ -25,6 +25,7 @@
#include <Interpreters/Context.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
+#include <Processors/QueryPlan/QueryPlan.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/MergeTree/MetaDataHelper.h>
#include <jni/jni_common.h>
@@ -65,7 +66,7 @@ CacheManager & CacheManager::instance()
return cache_manager;
}
-void CacheManager::initialize(DB::ContextMutablePtr context_)
+void CacheManager::initialize(const DB::ContextMutablePtr & context_)
{
auto & manager = instance();
manager.context = context_;
diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.h
b/cpp-ch/local-engine/Storages/Cache/CacheManager.h
index 2c1c01043..b59963ec4 100644
--- a/cpp-ch/local-engine/Storages/Cache/CacheManager.h
+++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.h
@@ -39,7 +39,7 @@ public:
static void initJNI(JNIEnv * env);
static CacheManager & instance();
- static void initialize(DB::ContextMutablePtr context);
+ static void initialize(const DB::ContextMutablePtr & context);
JobId cacheParts(const MergeTreeTableInstance & table, const
std::unordered_set<String> & columns);
static jobject getCacheStatus(JNIEnv * env, const String & jobId);
diff --git a/cpp-ch/local-engine/Storages/Cache/JobScheduler.cpp
b/cpp-ch/local-engine/Storages/Cache/JobScheduler.cpp
index 6a43ad644..add3648f9 100644
--- a/cpp-ch/local-engine/Storages/Cache/JobScheduler.cpp
+++ b/cpp-ch/local-engine/Storages/Cache/JobScheduler.cpp
@@ -18,9 +18,9 @@
#include "JobScheduler.h"
+#include <Interpreters/Context.h>
#include <Common/GlutenConfig.h>
#include <Common/ThreadPool.h>
-#include <Interpreters/Context.h>
#include <Common/logger_useful.h>
namespace DB
@@ -42,7 +42,7 @@ namespace local_engine
{
std::shared_ptr<JobScheduler> global_job_scheduler = nullptr;
-void JobScheduler::initialize(DB::ContextPtr context)
+void JobScheduler::initialize(const DB::ContextPtr & context)
{
auto config = GlutenJobSchedulerConfig::loadFromContext(context);
instance().thread_pool = std::make_unique<ThreadPool>(
diff --git a/cpp-ch/local-engine/Storages/Cache/JobScheduler.h
b/cpp-ch/local-engine/Storages/Cache/JobScheduler.h
index b5c2f601a..379d5cf5f 100644
--- a/cpp-ch/local-engine/Storages/Cache/JobScheduler.h
+++ b/cpp-ch/local-engine/Storages/Cache/JobScheduler.h
@@ -15,10 +15,10 @@
* limitations under the License.
*/
#pragma once
-#include <base/types.h>
-#include <Common/ThreadPool_fwd.h>
#include <Interpreters/Context_fwd.h>
+#include <base/types.h>
#include <Common/Stopwatch.h>
+#include <Common/ThreadPool_fwd.h>
namespace local_engine
{
@@ -108,7 +108,7 @@ public:
return global_job_scheduler;
}
- static void initialize(DB::ContextPtr context);
+ static void initialize(const DB::ContextPtr & context);
JobId scheduleJob(Job&& job);
diff --git a/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.cpp
b/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.cpp
index 5cdeaf7a0..c9b56734a 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.cpp
+++ b/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.cpp
@@ -15,12 +15,12 @@
* limitations under the License.
*/
#include "MetaDataHelper.h"
-
#include <filesystem>
#include <Core/Settings.h>
#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
#include <Storages/MergeTree/MergeSparkMergeTreeTask.h>
#include <Poco/StringTokenizer.h>
+#include <Common/QueryContext.h>
namespace CurrentMetrics
{
@@ -78,7 +78,7 @@ void restoreMetaData(const SparkStorageMergeTreePtr &
storage, const MergeTreeTa
return;
// Increase the speed of metadata recovery
- auto max_concurrency = std::max(10UL,
SerializedPlanParser::global_context->getSettingsRef().max_threads.value);
+ auto max_concurrency = std::max(10UL,
QueryContext::globalContext()->getSettingsRef().max_threads.value);
auto max_threads = std::min(max_concurrency, not_exists_part.size());
FreeThreadPool thread_pool(
CurrentMetrics::LocalThread,
diff --git a/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.cpp
b/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.cpp
index ed41633d3..55cffe04a 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.cpp
+++ b/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.cpp
@@ -83,7 +83,7 @@ StorageMergeTreeFactory::getDataPartsByNames(const StorageID
& id, const String
{
DataPartsVector res;
auto table_name = getTableName(id, snapshot_id);
- auto config =
MergeTreeConfig::loadFromContext(SerializedPlanParser::global_context);
+ auto config =
MergeTreeConfig::loadFromContext(QueryContext::globalContext());
std::lock_guard lock(datapart_mutex);
std::unordered_set<String> missing_names;
if (!datapart_map->has(table_name)) [[unlikely]]
diff --git a/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.h
b/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.h
index 941819858..2c76fe51e 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.h
+++ b/cpp-ch/local-engine/Storages/MergeTree/StorageMergeTreeFactory.h
@@ -16,10 +16,11 @@
*/
#pragma once
#include <Interpreters/MergeTreeTransaction.h>
-#include <Parser/SerializedPlanParser.h>
#include <Storages/MergeTree/SparkMergeTreeMeta.h>
+#include <Storages/MergeTree/SparkStorageMergeTree.h>
#include <Poco/LRUCache.h>
#include <Common/GlutenConfig.h>
+#include <Common/QueryContext.h>
namespace local_engine
{
@@ -70,7 +71,7 @@ public:
static DataPartsVector getDataPartsByNames(const StorageID & id, const
String & snapshot_id, std::unordered_set<String> part_name);
static void init_cache_map()
{
- auto config =
MergeTreeConfig::loadFromContext(SerializedPlanParser::global_context);
+ auto config =
MergeTreeConfig::loadFromContext(QueryContext::globalContext());
auto & storage_map_v = storage_map;
if (!storage_map_v)
{
diff --git a/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.cpp
b/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.cpp
index 46edb7f30..632fb0a45 100644
--- a/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.cpp
+++ b/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.cpp
@@ -15,6 +15,7 @@
* limitations under the License.
*/
#include "FileWriterWrappers.h"
+#include <QueryPipeline/QueryPipeline.h>
namespace local_engine
{
diff --git a/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.h
b/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.h
index 736f5a95f..49383f8de 100644
--- a/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.h
+++ b/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.h
@@ -19,9 +19,9 @@
#include <Core/Block.h>
#include <Core/Field.h>
#include <Interpreters/Context.h>
-#include <Parser/SerializedPlanParser.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
+#include <Parsers/ASTLiteral.h>
#include <Processors/Chunk.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/ISimpleTransform.h>
@@ -69,31 +69,25 @@ private:
};
std::unique_ptr<FileWriterWrapper> createFileWriterWrapper(
- const DB::ContextPtr & context,
- const std::string & file_uri,
- const DB::Block & preferred_schema,
- const std::string & format_hint);
+ const DB::ContextPtr & context, const std::string & file_uri, const
DB::Block & preferred_schema, const std::string & format_hint);
OutputFormatFilePtr createOutputFormatFile(
- const DB::ContextPtr & context,
- const std::string & file_uri,
- const DB::Block & preferred_schema,
- const std::string & format_hint);
+ const DB::ContextPtr & context, const std::string & file_uri, const
DB::Block & preferred_schema, const std::string & format_hint);
class WriteStats : public DB::ISimpleTransform
{
bool all_chunks_processed_ = false; /// flag to determine if we have
already processed all chunks
- Arena partition_keys_arena_;
+ DB::Arena partition_keys_arena_;
std::string filename_;
absl::flat_hash_map<StringRef, size_t> fiel_to_count_;
- static Block statsHeader()
+ static DB::Block statsHeader()
{
return makeBlockHeader({{STRING(), "filename"}, {STRING(),
"partition_id"}, {BIGINT(), "record_count"}});
}
- Chunk final_result() const
+ DB::Chunk final_result() const
{
///TODO: improve performance
auto file_col = STRING()->createColumn();
@@ -115,7 +109,7 @@ class WriteStats : public DB::ISimpleTransform
}
public:
- explicit WriteStats(const Block & input_header_) :
ISimpleTransform(input_header_, statsHeader(), true) { }
+ explicit WriteStats(const DB::Block & input_header_) :
ISimpleTransform(input_header_, statsHeader(), true) { }
Status prepare() override
{
@@ -130,7 +124,7 @@ public:
}
String getName() const override { return "WriteStats"; }
- void transform(Chunk & chunk) override
+ void transform(DB::Chunk & chunk) override
{
if (all_chunks_processed_)
chunk = final_result();
@@ -159,11 +153,11 @@ public:
it->second += rows;
return;
}
- throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "File path {} not found
in the stats map", file_path);
+ throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "File path {} not
found in the stats map", file_path);
}
};
-class SubstraitFileSink final : public SinkToStorage
+class SubstraitFileSink final : public DB::SinkToStorage
{
const std::string partition_id_;
const std::string relative_path_;
@@ -187,7 +181,7 @@ public:
const std::string & partition_id,
const std::string & relative,
const std::string & format_hint,
- const Block & header)
+ const DB::Block & header)
: SinkToStorage(header)
, partition_id_(partition_id.empty() ? NO_PARTITION_ID : partition_id)
, relative_path_(relative)
@@ -205,7 +199,7 @@ public:
}
protected:
- void consume(Chunk & chunk) override
+ void consume(DB::Chunk & chunk) override
{
const size_t row_count = chunk.getNumRows();
output_format_->output->write(materializeBlock(getHeader().cloneWithColumns(chunk.detachColumns())));
@@ -227,12 +221,12 @@ class SubstraitPartitionedFileSink final : public
DB::PartitionedSink
public:
/// visible for UTs
- static ASTPtr make_partition_expression(const DB::Names &
partition_columns)
+ static DB::ASTPtr make_partition_expression(const DB::Names &
partition_columns)
{
/// Parse the following expression into ASTs
/// cancat('/col_name=', 'toString(col_name)')
bool add_slash = false;
- ASTs arguments;
+ DB::ASTs arguments;
for (const auto & column : partition_columns)
{
// partition_column=
@@ -243,7 +237,8 @@ public:
// ifNull(toString(partition_column), DEFAULT_PARTITION_NAME)
// FIXME if toString(partition_column) is empty
auto column_ast = std::make_shared<DB::ASTIdentifier>(column);
- ASTs if_null_args{makeASTFunction("toString", ASTs{column_ast}),
std::make_shared<DB::ASTLiteral>(DEFAULT_PARTITION_NAME)};
+ DB::ASTs if_null_args{
+ makeASTFunction("toString", DB::ASTs{column_ast}),
std::make_shared<DB::ASTLiteral>(DEFAULT_PARTITION_NAME)};
arguments.emplace_back(makeASTFunction("ifNull",
std::move(if_null_args)));
}
return DB::makeASTFunction("concat", std::move(arguments));
@@ -252,17 +247,17 @@ public:
private:
const std::string base_path_;
const std::string filenmame_;
- ContextPtr context_;
- const Block sample_block_;
+ DB::ContextPtr context_;
+ const DB::Block sample_block_;
const std::string format_hint_;
std::shared_ptr<WriteStats> stats_{nullptr};
public:
SubstraitPartitionedFileSink(
- const ContextPtr & context,
- const Names & partition_by,
- const Block & input_header,
- const Block & sample_block,
+ const DB::ContextPtr & context,
+ const DB::Names & partition_by,
+ const DB::Block & input_header,
+ const DB::Block & sample_block,
const std::string & base_path,
const std::string & filename,
const std::string & format_hint)
@@ -274,7 +269,7 @@ public:
, format_hint_(format_hint)
{
}
- SinkPtr createSinkForPartition(const String & partition_id) override
+ DB::SinkPtr createSinkForPartition(const String & partition_id) override
{
assert(stats_);
const auto partition_path = fmt::format("{}/{}", partition_id,
filenmame_);
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp
b/cpp-ch/local-engine/local_engine_jni.cpp
index 5e862040c..15c721be3 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -211,7 +211,7 @@ JNIEXPORT void
Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_i
JNIEnv * env, jclass, jbyteArray temp_path, jbyteArray filename)
{
LOCAL_ENGINE_JNI_METHOD_START
- const auto query_context =
local_engine::QueryContextManager::instance().currentQueryContext();
+ const auto query_context =
local_engine::QueryContext::instance().currentQueryContext();
const auto path_array = local_engine::getByteArrayElementsSafe(env,
temp_path);
const auto filename_array = local_engine::getByteArrayElementsSafe(env,
filename);
@@ -235,7 +235,7 @@ JNIEXPORT jlong
Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_
jboolean materialize_input)
{
LOCAL_ENGINE_JNI_METHOD_START
- auto query_context =
local_engine::QueryContextManager::instance().currentQueryContext();
+ auto query_context =
local_engine::QueryContext::instance().currentQueryContext();
// by task update new configs ( in case of dynamic config update )
const auto conf_plan_a = local_engine::getByteArrayElementsSafe(env,
conf_plan);
@@ -864,7 +864,7 @@ JNIEXPORT jlong
Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniW
const auto file_uri = jstring2string(env, file_uri_);
// for HiveFileFormat, the file url may not end with .parquet, so we pass
in the format as a hint
const auto format_hint = jstring2string(env, format_hint_);
- const auto context =
local_engine::QueryContextManager::instance().currentQueryContext();
+ const auto context =
local_engine::QueryContext::instance().currentQueryContext();
auto * writer = local_engine::createFileWriterWrapper(context, file_uri,
preferred_schema, format_hint).release();
return reinterpret_cast<jlong>(writer);
LOCAL_ENGINE_JNI_METHOD_END(env, 0)
@@ -882,7 +882,7 @@ JNIEXPORT jlong
Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniW
jbyteArray conf_plan)
{
LOCAL_ENGINE_JNI_METHOD_START
- auto query_context =
local_engine::QueryContextManager::instance().currentQueryContext();
+ auto query_context =
local_engine::QueryContext::instance().currentQueryContext();
// by task update new configs ( in case of dynamic config update )
const auto conf_plan_a = local_engine::getByteArrayElementsSafe(env,
conf_plan);
const std::string::size_type conf_plan_size = conf_plan_a.length();
@@ -921,9 +921,9 @@ JNIEXPORT jstring
Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn
auto read_ptr = local_engine::BinaryToMessage<substrait::Rel>(
{reinterpret_cast<const char *>(read_a.elems()),
static_cast<size_t>(read_a.length())});
- local_engine::SerializedPlanParser
parser(local_engine::SerializedPlanParser::global_context);
+ local_engine::SerializedPlanParser
parser(local_engine::QueryContext::globalContext());
parser.parseExtensions(plan_ptr.extensions());
- local_engine::MergeTreeRelParser mergeTreeParser(&parser,
local_engine::SerializedPlanParser::global_context);
+ local_engine::MergeTreeRelParser mergeTreeParser(&parser,
local_engine::QueryContext::globalContext());
auto res = mergeTreeParser.filterRangesOnDriver(read_ptr.read());
return local_engine::charTojstring(env, res.c_str());
@@ -996,7 +996,7 @@ JNIEXPORT jstring
Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn
{reinterpret_cast<const char *>(split_info_a.elems()),
static_cast<size_t>(split_info_a.length())});
local_engine::MergeTreeTableInstance merge_tree_table(extension_table);
- auto context =
local_engine::QueryContextManager::instance().currentQueryContext();
+ auto context =
local_engine::QueryContext::instance().currentQueryContext();
// each task using its own CustomStorageMergeTree, don't reuse
auto temp_storage = merge_tree_table.copyToVirtualStorage(context);
// prefetch all needed parts metadata before merge
@@ -1218,7 +1218,7 @@ JNIEXPORT jlong
Java_org_apache_gluten_vectorized_SimpleExpressionEval_createNativeInstance(JNIEnv
* env, jclass, jobject input, jbyteArray plan)
{
LOCAL_ENGINE_JNI_METHOD_START
- const auto context =
DB::Context::createCopy(local_engine::SerializedPlanParser::global_context);
+ const auto context =
DB::Context::createCopy(local_engine::QueryContext::globalContext());
local_engine::SerializedPlanParser parser(context);
const jobject iter = env->NewGlobalRef(input);
parser.addInputIter(iter, false);
@@ -1256,21 +1256,21 @@ JNIEXPORT jlong
Java_org_apache_gluten_vectorized_SimpleExpressionEval_nativeNex
JNIEXPORT jlong
Java_org_apache_gluten_memory_CHThreadGroup_createThreadGroup(JNIEnv * env,
jclass)
{
LOCAL_ENGINE_JNI_METHOD_START
- return local_engine::QueryContextManager::instance().initializeQuery();
+ return local_engine::QueryContext::instance().initializeQuery();
LOCAL_ENGINE_JNI_METHOD_END(env, 0l)
}
JNIEXPORT jlong
Java_org_apache_gluten_memory_CHThreadGroup_threadGroupPeakMemory(JNIEnv * env,
jclass, jlong id)
{
LOCAL_ENGINE_JNI_METHOD_START
- return local_engine::QueryContextManager::instance().currentPeakMemory(id);
+ return local_engine::QueryContext::instance().currentPeakMemory(id);
LOCAL_ENGINE_JNI_METHOD_END(env, 0l)
}
JNIEXPORT void
Java_org_apache_gluten_memory_CHThreadGroup_releaseThreadGroup(JNIEnv * env,
jclass, jlong id)
{
LOCAL_ENGINE_JNI_METHOD_START
- local_engine::QueryContextManager::instance().finalizeQuery(id);
+ local_engine::QueryContext::instance().finalizeQuery(id);
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
diff --git a/cpp-ch/local-engine/tests/benchmark_cast_float_function.cpp
b/cpp-ch/local-engine/tests/benchmark_cast_float_function.cpp
index 53155b749..4ef9b5771 100644
--- a/cpp-ch/local-engine/tests/benchmark_cast_float_function.cpp
+++ b/cpp-ch/local-engine/tests/benchmark_cast_float_function.cpp
@@ -15,13 +15,14 @@
* limitations under the License.
*/
-#include <Core/Block.h>
#include <Columns/IColumn.h>
-#include <DataTypes/IDataType.h>
+#include <Core/Block.h>
#include <DataTypes/DataTypeFactory.h>
+#include <DataTypes/DataTypeString.h>
+#include <DataTypes/IDataType.h>
#include <Functions/FunctionFactory.h>
-#include <Parser/SerializedPlanParser.h>
#include <benchmark/benchmark.h>
+#include <Common/QueryContext.h>
using namespace DB;
@@ -30,9 +31,7 @@ static Block createDataBlock(size_t rows)
auto type = DataTypeFactory::instance().get("Float64");
auto column = type->createColumn();
for (size_t i = 0; i < rows; ++i)
- {
column->insert(i * 1.0f);
- }
Block block;
block.insert(ColumnWithTypeAndName(std::move(column), type, "d"));
return std::move(block);
@@ -42,7 +41,7 @@ static void BM_CHCastFloatToInt(benchmark::State & state)
{
using namespace DB;
auto & factory = FunctionFactory::instance();
- auto function = factory.get("CAST",
local_engine::SerializedPlanParser::global_context);
+ auto function = factory.get("CAST",
local_engine::QueryContext::globalContext());
Block block = createDataBlock(30000000);
DB::ColumnsWithTypeAndName args;
args.emplace_back(block.getColumnsWithTypeAndName()[0]);
@@ -52,7 +51,7 @@ static void BM_CHCastFloatToInt(benchmark::State & state)
type_name_col.type = std::make_shared<DB::DataTypeString>();
args.emplace_back(type_name_col);
auto executable = function->build(args);
- for (auto _ : state)[[maybe_unused]]
+ for (auto _ : state) [[maybe_unused]]
auto result = executable->execute(block.getColumnsWithTypeAndName(),
executable->getResultType(), block.rows());
}
@@ -60,11 +59,11 @@ static void BM_SparkCastFloatToInt(benchmark::State & state)
{
using namespace DB;
auto & factory = FunctionFactory::instance();
- auto function = factory.get("sparkCastFloatToInt64",
local_engine::SerializedPlanParser::global_context);
+ auto function = factory.get("sparkCastFloatToInt64",
local_engine::QueryContext::globalContext());
Block block = createDataBlock(30000000);
auto executable = function->build(block.getColumnsWithTypeAndName());
- for (auto _ : state)[[maybe_unused]]
- auto result = executable->execute(block.getColumnsWithTypeAndName(),
executable->getResultType(), block.rows());
+ for (auto _ : state) [[maybe_unused]]
+ auto result = executable->execute(block.getColumnsWithTypeAndName(),
executable->getResultType(), block.rows());
}
BENCHMARK(BM_CHCastFloatToInt)->Unit(benchmark::kMillisecond)->Iterations(100);
diff --git a/cpp-ch/local-engine/tests/benchmark_local_engine.cpp
b/cpp-ch/local-engine/tests/benchmark_local_engine.cpp
index bc3e95c2d..a6e77e72f 100644
--- a/cpp-ch/local-engine/tests/benchmark_local_engine.cpp
+++ b/cpp-ch/local-engine/tests/benchmark_local_engine.cpp
@@ -23,33 +23,25 @@
#include <Interpreters/Context.h>
#include <Interpreters/HashJoin/HashJoin.h>
#include <Interpreters/TableJoin.h>
-#include <Interpreters/TreeRewriter.h>
#include <Parser/CHColumnToSparkRow.h>
#include <Parser/SerializedPlanParser.h>
#include <Parser/SparkRowToCHColumn.h>
#include <Parsers/ASTIdentifier.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
-#include <Processors/Formats/IOutputFormat.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/JoinStep.h>
-#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Shuffle/ShuffleReader.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/SparkMergeTreeMeta.h>
#include <Storages/MergeTree/SparkStorageMergeTree.h>
-#include <Storages/SelectQueryInfo.h>
-#include <Storages/SubstraitSource/ReadBufferBuilder.h>
#include <Storages/SubstraitSource/SubstraitFileSource.h>
#include <benchmark/benchmark.h>
#include <substrait/plan.pb.h>
-#include <Poco/Util/MapConfiguration.h>
#include <Common/CHUtil.h>
#include <Common/DebugUtils.h>
#include <Common/PODArray_fwd.h>
-#include <Common/Stopwatch.h>
-#include <Common/logger_useful.h>
-#include "testConfig.h"
+#include <Common/QueryContext.h>
#if defined(__SSE2__)
#include <emmintrin.h>
@@ -90,7 +82,7 @@ DB::ContextMutablePtr global_context;
substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions
parquet_format;
file->mutable_parquet()->CopyFrom(parquet_format);
auto builder = std::make_unique<QueryPipelineBuilder>();
-
builder->init(Pipe(std::make_shared<SubstraitFileSource>(SerializedPlanParser::global_context,
header, files)));
+
builder->init(Pipe(std::make_shared<SubstraitFileSource>(QueryContext::globalContext(),
header, files)));
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
auto executor = PullingPipelineExecutor(pipeline);
@@ -206,7 +198,7 @@ DB::ContextMutablePtr global_context;
"/home/kyligence/Documents/test-dataset/intel-gazelle-test-" +
std::to_string(state.range(0)) + ".snappy.parquet",
std::move(schema))
.build();
- local_engine::SerializedPlanParser
parser(SerializedPlanParser::global_context);
+ local_engine::SerializedPlanParser
parser(QueryContext::globalContext());
auto local_executor = parser.createExecutor(*plan);
state.ResumeTiming();
@@ -220,8 +212,8 @@ DB::ContextMutablePtr global_context;
[[maybe_unused]] static void BM_MERGE_TREE_TPCH_Q6_FROM_TEXT(benchmark::State
& state)
{
- SerializedPlanParser::global_context = global_context;
- local_engine::SerializedPlanParser
parser(SerializedPlanParser::global_context);
+ QueryContext::globalContext() = global_context;
+ local_engine::SerializedPlanParser parser(QueryContext::globalContext());
for (auto _ : state)
{
state.PauseTiming();
@@ -270,7 +262,7 @@ DB::ContextMutablePtr global_context;
"/home/kyligence/Documents/test-dataset/intel-gazelle-test-" +
std::to_string(state.range(0)) + ".snappy.parquet",
std::move(schema))
.build();
- local_engine::SerializedPlanParser
parser(SerializedPlanParser::global_context);
+ local_engine::SerializedPlanParser
parser(QueryContext::globalContext());
auto local_executor = parser.createExecutor(*plan);
state.ResumeTiming();
@@ -306,7 +298,7 @@ DB::ContextMutablePtr global_context;
std::move(schema))
.build();
- local_engine::SerializedPlanParser
parser(SerializedPlanParser::global_context);
+ local_engine::SerializedPlanParser
parser(QueryContext::globalContext());
auto local_executor = parser.createExecutor(*plan);
local_engine::SparkRowToCHColumn converter;
while (local_executor->hasNext())
@@ -351,7 +343,7 @@ DB::ContextMutablePtr global_context;
"/home/kyligence/Documents/test-dataset/intel-gazelle-test-" +
std::to_string(state.range(0)) + ".snappy.parquet",
std::move(schema))
.build();
- local_engine::SerializedPlanParser
parser(SerializedPlanParser::global_context);
+ local_engine::SerializedPlanParser
parser(QueryContext::globalContext());
auto local_executor = parser.createExecutor(*plan);
local_engine::SparkRowToCHColumn converter;
while (local_executor->hasNext())
diff --git a/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp
b/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp
index 5cfe51389..8ed7c1084 100644
--- a/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp
+++ b/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp
@@ -18,7 +18,6 @@
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeString.h>
#include <IO/ReadBufferFromFile.h>
-#include <Parser/SerializedPlanParser.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Formats/Impl/ArrowColumnToCHColumn.h>
#include <Processors/Formats/Impl/ParquetBlockInputFormat.h>
@@ -31,6 +30,7 @@
#include <tests/gluten_test_util.h>
#include <Poco/Util/MapConfiguration.h>
#include <Common/DebugUtils.h>
+#include <Common/QueryContext.h>
namespace
{
@@ -124,7 +124,7 @@ void BM_OptimizedParquetReadString(benchmark::State & state)
auto builder = std::make_unique<QueryPipelineBuilder>();
builder->init(
-
Pipe(std::make_shared<local_engine::SubstraitFileSource>(local_engine::SerializedPlanParser::global_context,
header, files)));
+
Pipe(std::make_shared<local_engine::SubstraitFileSource>(local_engine::QueryContext::globalContext(),
header, files)));
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
auto reader = PullingPipelineExecutor(pipeline);
while (reader.pull(res))
@@ -156,7 +156,7 @@ void BM_OptimizedParquetReadDate32(benchmark::State & state)
auto builder = std::make_unique<QueryPipelineBuilder>();
builder->init(
-
Pipe(std::make_shared<local_engine::SubstraitFileSource>(local_engine::SerializedPlanParser::global_context,
header, files)));
+
Pipe(std::make_shared<local_engine::SubstraitFileSource>(local_engine::QueryContext::globalContext(),
header, files)));
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
auto reader = PullingPipelineExecutor(pipeline);
while (reader.pull(res))
@@ -178,7 +178,7 @@ substrait::ReadRel::LocalFiles createLocalFiles(const
std::string & filename, co
auto config = Poco::AutoPtr(new Poco::Util::MapConfiguration());
config->setBool("use_local_format", use_local_format);
- local_engine::SerializedPlanParser::global_context->setConfig(config);
+ local_engine::QueryContext::globalMutableContext()->setConfig(config);
return files;
}
@@ -186,9 +186,8 @@ substrait::ReadRel::LocalFiles createLocalFiles(const
std::string & filename, co
void doRead(const substrait::ReadRel::LocalFiles & files, const
std::optional<DB::ActionsDAG> & pushDown, const DB::Block & header)
{
const auto builder = std::make_unique<DB::QueryPipelineBuilder>();
- const auto source
- =
std::make_shared<local_engine::SubstraitFileSource>(local_engine::SerializedPlanParser::global_context,
header, files);
- source->setKeyCondition(pushDown,
local_engine::SerializedPlanParser::global_context);
+ const auto source =
std::make_shared<local_engine::SubstraitFileSource>(local_engine::QueryContext::globalContext(),
header, files);
+ source->setKeyCondition(pushDown,
local_engine::QueryContext::globalContext());
builder->init(DB::Pipe(source));
auto pipeline = DB::QueryPipelineBuilder::getPipeline(std::move(*builder));
auto reader = DB::PullingPipelineExecutor(pipeline);
@@ -220,7 +219,7 @@ void
BM_ColumnIndexRead_Filter_ReturnAllResult(benchmark::State & state)
for (auto _ : state)
doRead(files, pushDown, header);
-
local_engine::SerializedPlanParser::global_context->setConfig(Poco::AutoPtr(new
Poco::Util::MapConfiguration()));
+
local_engine::QueryContext::globalMutableContext()->setConfig(Poco::AutoPtr(new
Poco::Util::MapConfiguration()));
}
void BM_ColumnIndexRead_Filter_ReturnHalfResult(benchmark::State & state)
@@ -237,7 +236,7 @@ void
BM_ColumnIndexRead_Filter_ReturnHalfResult(benchmark::State & state)
for (auto _ : state)
doRead(files, pushDown, header);
-
local_engine::SerializedPlanParser::global_context->setConfig(Poco::AutoPtr(new
Poco::Util::MapConfiguration()));
+
local_engine::QueryContext::globalMutableContext()->setConfig(Poco::AutoPtr(new
Poco::Util::MapConfiguration()));
}
}
diff --git a/cpp-ch/local-engine/tests/benchmark_spark_divide_function.cpp
b/cpp-ch/local-engine/tests/benchmark_spark_divide_function.cpp
index 2a824174b..b6aac2445 100644
--- a/cpp-ch/local-engine/tests/benchmark_spark_divide_function.cpp
+++ b/cpp-ch/local-engine/tests/benchmark_spark_divide_function.cpp
@@ -21,10 +21,11 @@
#include <Functions/FunctionFactory.h>
#include <Functions/SparkFunctionDivide.h>
#include <Interpreters/ActionsDAG.h>
-#include <Parser/SerializedPlanParser.h>
+#include <Interpreters/ExpressionActions.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Sources/BlocksSource.h>
#include <benchmark/benchmark.h>
+#include <Common/QueryContext.h>
using namespace DB;
@@ -63,7 +64,7 @@ static std::string join(const ActionsDAG::NodeRawConstPtrs &
v, char c)
static const ActionsDAG::Node *
addFunction(ActionsDAG & actions_dag, const String & function, const
DB::ActionsDAG::NodeRawConstPtrs & args)
{
- auto function_builder = FunctionFactory::instance().get(function,
local_engine::SerializedPlanParser::global_context);
+ auto function_builder = FunctionFactory::instance().get(function,
local_engine::QueryContext::globalContext());
std::string args_name = join(args, ',');
auto result_name = function + "(" + args_name + ")";
return &actions_dag.addFunction(function_builder, args, result_name);
diff --git a/cpp-ch/local-engine/tests/benchmark_spark_floor_function.cpp
b/cpp-ch/local-engine/tests/benchmark_spark_floor_function.cpp
index 601355241..ef961f21c 100644
--- a/cpp-ch/local-engine/tests/benchmark_spark_floor_function.cpp
+++ b/cpp-ch/local-engine/tests/benchmark_spark_floor_function.cpp
@@ -26,8 +26,8 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionsRound.h>
#include <Functions/SparkFunctionFloor.h>
-#include <Parser/SerializedPlanParser.h>
#include <benchmark/benchmark.h>
+#include <Common/QueryContext.h>
#include <Common/TargetSpecific.h>
using namespace DB;
@@ -61,7 +61,7 @@ static void BM_CHFloorFunction_For_Int64(benchmark::State &
state)
{
using namespace DB;
auto & factory = FunctionFactory::instance();
- auto function = factory.get("floor",
local_engine::SerializedPlanParser::global_context);
+ auto function = factory.get("floor",
local_engine::QueryContext::globalContext());
Block int64_block = createDataBlock("Nullable(Int64)", 65536);
auto executable = function->build(int64_block.getColumnsWithTypeAndName());
for (auto _ : state)
@@ -75,7 +75,7 @@ static void BM_CHFloorFunction_For_Float64(benchmark::State &
state)
{
using namespace DB;
auto & factory = FunctionFactory::instance();
- auto function = factory.get("floor",
local_engine::SerializedPlanParser::global_context);
+ auto function = factory.get("floor",
local_engine::QueryContext::globalContext());
Block float64_block = createDataBlock("Nullable(Float64)", 65536);
auto executable =
function->build(float64_block.getColumnsWithTypeAndName());
for (auto _ : state)
@@ -89,7 +89,7 @@ static void BM_SparkFloorFunction_For_Int64(benchmark::State
& state)
{
using namespace DB;
auto & factory = FunctionFactory::instance();
- auto function = factory.get("sparkFloor",
local_engine::SerializedPlanParser::global_context);
+ auto function = factory.get("sparkFloor",
local_engine::QueryContext::globalContext());
Block int64_block = createDataBlock("Nullable(Int64)", 65536);
auto executable = function->build(int64_block.getColumnsWithTypeAndName());
for (auto _ : state)
@@ -103,7 +103,7 @@ static void
BM_SparkFloorFunction_For_Float64(benchmark::State & state)
{
using namespace DB;
auto & factory = FunctionFactory::instance();
- auto function = factory.get("sparkFloor",
local_engine::SerializedPlanParser::global_context);
+ auto function = factory.get("sparkFloor",
local_engine::QueryContext::globalContext());
Block float64_block = createDataBlock("Nullable(Float64)", 65536);
auto executable =
function->build(float64_block.getColumnsWithTypeAndName());
for (auto _ : state)
@@ -118,7 +118,8 @@ static void nanInfToNullAutoOpt(float * data, uint8_t *
null_map, size_t size)
for (size_t i = 0; i < size; ++i)
{
uint8_t is_nan = (data[i] != data[i]);
- uint8_t is_inf = ((*reinterpret_cast<const uint32_t *>(&data[i]) &
0b01111111111111111111111111111111) == 0b01111111100000000000000000000000);
+ uint8_t is_inf
+ = ((*reinterpret_cast<const uint32_t *>(&data[i]) &
0b01111111111111111111111111111111) == 0b01111111100000000000000000000000);
uint8_t null_flag = is_nan | is_inf;
null_map[i] = null_flag;
@@ -171,8 +172,7 @@ DECLARE_AVX2_SPECIFIC_CODE(
mask >>= 1;
}
}
- }
-)
+ })
static void BMNanInfToNullAVX2(benchmark::State & state)
{
diff --git a/cpp-ch/local-engine/tests/benchmark_spark_row.cpp
b/cpp-ch/local-engine/tests/benchmark_spark_row.cpp
index 3169df2e3..28e11a7ba 100644
--- a/cpp-ch/local-engine/tests/benchmark_spark_row.cpp
+++ b/cpp-ch/local-engine/tests/benchmark_spark_row.cpp
@@ -14,6 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+#include <string>
+#include <vector>
#include <Core/Block.h>
#include <DataTypes/DataTypeFactory.h>
#include <IO/ReadBufferFromFile.h>
@@ -27,9 +29,6 @@
#include <benchmark/benchmark.h>
#include <parquet/arrow/reader.h>
-#include <string>
-#include <vector>
-
using namespace DB;
using namespace local_engine;
diff --git a/cpp-ch/local-engine/tests/benchmark_to_datetime_function.cpp
b/cpp-ch/local-engine/tests/benchmark_to_datetime_function.cpp
index 9c04b7e31..c72125163 100644
--- a/cpp-ch/local-engine/tests/benchmark_to_datetime_function.cpp
+++ b/cpp-ch/local-engine/tests/benchmark_to_datetime_function.cpp
@@ -15,14 +15,14 @@
* limitations under the License.
*/
-#include <Core/Block.h>
#include <Columns/IColumn.h>
-#include <DataTypes/IDataType.h>
+#include <Core/Block.h>
#include <DataTypes/DataTypeFactory.h>
+#include <DataTypes/IDataType.h>
#include <Functions/FunctionFactory.h>
-#include <Parser/SerializedPlanParser.h>
#include <Parser/FunctionParser.h>
#include <benchmark/benchmark.h>
+#include <Common/QueryContext.h>
using namespace DB;
@@ -31,9 +31,7 @@ static Block createDataBlock(size_t rows)
auto type = DataTypeFactory::instance().get("String");
auto column = type->createColumn();
for (size_t i = 0; i < rows; ++i)
- {
column->insert("2024-01-05 12:12:12");
- }
Block block;
block.insert(ColumnWithTypeAndName(std::move(column), type, "d"));
return std::move(block);
@@ -43,10 +41,10 @@ static void BM_CHParseDateTime64(benchmark::State & state)
{
using namespace DB;
auto & factory = FunctionFactory::instance();
- auto function = factory.get("toDateTime64OrNull",
local_engine::SerializedPlanParser::global_context);
+ auto function = factory.get("toDateTime64OrNull",
local_engine::QueryContext::globalContext());
Block block = createDataBlock(30000000);
auto executable = function->build(block.getColumnsWithTypeAndName());
- for (auto _ : state)[[maybe_unused]]
+ for (auto _ : state) [[maybe_unused]]
auto result = executable->execute(block.getColumnsWithTypeAndName(),
executable->getResultType(), block.rows());
}
@@ -55,11 +53,11 @@ static void BM_SparkParseDateTime64(benchmark::State &
state)
{
using namespace DB;
auto & factory = FunctionFactory::instance();
- auto function = factory.get("sparkToDateTime",
local_engine::SerializedPlanParser::global_context);
+ auto function = factory.get("sparkToDateTime",
local_engine::QueryContext::globalContext());
Block block = createDataBlock(30000000);
auto executable = function->build(block.getColumnsWithTypeAndName());
- for (auto _ : state)[[maybe_unused]]
- auto result = executable->execute(block.getColumnsWithTypeAndName(),
executable->getResultType(), block.rows());
+ for (auto _ : state) [[maybe_unused]]
+ auto result = executable->execute(block.getColumnsWithTypeAndName(),
executable->getResultType(), block.rows());
}
BENCHMARK(BM_CHParseDateTime64)->Unit(benchmark::kMillisecond)->Iterations(50);
diff --git a/cpp-ch/local-engine/tests/benchmark_unix_timestamp_function.cpp
b/cpp-ch/local-engine/tests/benchmark_unix_timestamp_function.cpp
index 3f053dcee..e7abfda7a 100644
--- a/cpp-ch/local-engine/tests/benchmark_unix_timestamp_function.cpp
+++ b/cpp-ch/local-engine/tests/benchmark_unix_timestamp_function.cpp
@@ -15,15 +15,15 @@
* limitations under the License.
*/
-#include <Core/Block.h>
#include <Columns/IColumn.h>
-#include <DataTypes/IDataType.h>
+#include <Core/Block.h>
#include <DataTypes/DataTypeFactory.h>
+#include <DataTypes/IDataType.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionsRound.h>
-#include <Parser/SerializedPlanParser.h>
#include <Parser/FunctionParser.h>
#include <benchmark/benchmark.h>
+#include <Common/QueryContext.h>
using namespace DB;
@@ -32,16 +32,10 @@ static Block createDataBlock(String type_str, size_t rows)
auto type = DataTypeFactory::instance().get(type_str);
auto column = type->createColumn();
for (size_t i = 0; i < rows; ++i)
- {
if (type_str == "Date32")
- {
column->insert(i);
- }
else if (type_str == "Date")
- {
column->insert(i);
- }
- }
Block block;
block.insert(ColumnWithTypeAndName(std::move(column), type, "d"));
return std::move(block);
@@ -51,10 +45,10 @@ static void BM_CHUnixTimestamp_For_Date32(benchmark::State
& state)
{
using namespace DB;
auto & factory = FunctionFactory::instance();
- auto function = factory.get("toUnixTimestamp",
local_engine::SerializedPlanParser::global_context);
+ auto function = factory.get("toUnixTimestamp",
local_engine::QueryContext::globalContext());
Block block = createDataBlock("Date32", 30000000);
auto executable = function->build(block.getColumnsWithTypeAndName());
- for (auto _ : state)[[maybe_unused]]
+ for (auto _ : state) [[maybe_unused]]
auto result = executable->execute(block.getColumnsWithTypeAndName(),
executable->getResultType(), block.rows());
}
@@ -62,10 +56,10 @@ static void BM_CHUnixTimestamp_For_Date(benchmark::State &
state)
{
using namespace DB;
auto & factory = FunctionFactory::instance();
- auto function = factory.get("toUnixTimestamp",
local_engine::SerializedPlanParser::global_context);
+ auto function = factory.get("toUnixTimestamp",
local_engine::QueryContext::globalContext());
Block block = createDataBlock("Date", 30000000);
auto executable = function->build(block.getColumnsWithTypeAndName());
- for (auto _ : state)[[maybe_unused]]
+ for (auto _ : state) [[maybe_unused]]
auto result = executable->execute(block.getColumnsWithTypeAndName(),
executable->getResultType(), block.rows());
}
@@ -73,22 +67,22 @@ static void
BM_SparkUnixTimestamp_For_Date32(benchmark::State & state)
{
using namespace DB;
auto & factory = FunctionFactory::instance();
- auto function = factory.get("sparkDateToUnixTimestamp",
local_engine::SerializedPlanParser::global_context);
+ auto function = factory.get("sparkDateToUnixTimestamp",
local_engine::QueryContext::globalContext());
Block block = createDataBlock("Date32", 30000000);
auto executable = function->build(block.getColumnsWithTypeAndName());
- for (auto _ : state)[[maybe_unused]]
- auto result = executable->execute(block.getColumnsWithTypeAndName(),
executable->getResultType(), block.rows());
+ for (auto _ : state) [[maybe_unused]]
+ auto result = executable->execute(block.getColumnsWithTypeAndName(),
executable->getResultType(), block.rows());
}
static void BM_SparkUnixTimestamp_For_Date(benchmark::State & state)
{
using namespace DB;
auto & factory = FunctionFactory::instance();
- auto function = factory.get("sparkDateToUnixTimestamp",
local_engine::SerializedPlanParser::global_context);
+ auto function = factory.get("sparkDateToUnixTimestamp",
local_engine::QueryContext::globalContext());
Block block = createDataBlock("Date", 30000000);
auto executable = function->build(block.getColumnsWithTypeAndName());
- for (auto _ : state)[[maybe_unused]]
- auto result = executable->execute(block.getColumnsWithTypeAndName(),
executable->getResultType(), block.rows());
+ for (auto _ : state) [[maybe_unused]]
+ auto result = executable->execute(block.getColumnsWithTypeAndName(),
executable->getResultType(), block.rows());
}
BENCHMARK(BM_CHUnixTimestamp_For_Date32)->Unit(benchmark::kMillisecond)->Iterations(100);
diff --git a/cpp-ch/local-engine/tests/gluten_test_util.cpp
b/cpp-ch/local-engine/tests/gluten_test_util.cpp
index 7dbc7206d..66ce81a50 100644
--- a/cpp-ch/local-engine/tests/gluten_test_util.cpp
+++ b/cpp-ch/local-engine/tests/gluten_test_util.cpp
@@ -17,7 +17,6 @@
#include "gluten_test_util.h"
#include <filesystem>
#include <sstream>
-
#include <Formats/FormatSettings.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromFile.h>
@@ -31,6 +30,7 @@
#include <substrait/plan.pb.h>
#include <Common/BlockTypeUtils.h>
#include <Common/Exception.h>
+#include <Common/QueryContext.h>
namespace fs = std::filesystem;
@@ -60,7 +60,7 @@ std::optional<ActionsDAG> parseFilter(const std::string &
filter, const AnotherR
const ASTPtr ast_exp = parseQuery(parser2, filter.data(), filter.data() +
filter.size(), "", 0, 0, 0);
const auto prepared_sets = std::make_shared<PreparedSets>();
ActionsMatcher::Data visitor_data(
- SerializedPlanParser::global_context,
+ QueryContext::globalContext(),
size_limits_for_set,
static_cast<size_t>(0),
name_and_types,
@@ -78,7 +78,7 @@ std::pair<substrait::Plan, std::unique_ptr<LocalExecutor>>
create_plan_and_execu
std::string_view json_plan, std::string_view split_template,
std::string_view file, const std::optional<DB::ContextPtr> & context)
{
const std::string split = replaceLocalFilesWildcards(split_template, file);
- SerializedPlanParser
parser(context.value_or(SerializedPlanParser::global_context));
+ SerializedPlanParser
parser(context.value_or(QueryContext::globalContext()));
parser.addSplitInfo(local_engine::JsonStringToBinary<substrait::ReadRel::LocalFiles>(split));
const auto plan =
local_engine::JsonStringToMessage<substrait::Plan>(json_plan);
return {plan, parser.createExecutor(plan)};
diff --git a/cpp-ch/local-engine/tests/gluten_test_util.h
b/cpp-ch/local-engine/tests/gluten_test_util.h
index 9f7380cf5..a9d8af37b 100644
--- a/cpp-ch/local-engine/tests/gluten_test_util.h
+++ b/cpp-ch/local-engine/tests/gluten_test_util.h
@@ -23,10 +23,13 @@
#include <Core/NamesAndTypes.h>
#include <Interpreters/ActionsDAG.h>
-#include <Parser/SerializedPlanParser.h>
#include <boost/algorithm/string/replace.hpp>
#include <parquet/schema.h>
+namespace substrait
+{
+class Plan;
+}
namespace local_engine
{
class LocalExecutor;
diff --git a/cpp-ch/local-engine/tests/gtest_ch_functions.cpp
b/cpp-ch/local-engine/tests/gtest_ch_functions.cpp
index 613beb9b8..e905bc178 100644
--- a/cpp-ch/local-engine/tests/gtest_ch_functions.cpp
+++ b/cpp-ch/local-engine/tests/gtest_ch_functions.cpp
@@ -15,18 +15,19 @@
* limitations under the License.
*/
#include <Columns/ColumnSet.h>
+#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeSet.h>
#include <Functions/FunctionFactory.h>
#include <Interpreters/Set.h>
-#include <Parser/SerializedPlanParser.h>
#include <gtest/gtest.h>
#include <Common/DebugUtils.h>
+#include <Common/QueryContext.h>
TEST(TestFuntion, Hash)
{
using namespace DB;
auto & factory = FunctionFactory::instance();
- auto function = factory.get("murmurHash2_64",
local_engine::SerializedPlanParser::global_context);
+ auto function = factory.get("murmurHash2_64",
local_engine::QueryContext::globalContext());
auto type0 = DataTypeFactory::instance().get("String");
auto column0 = type0->createColumn();
column0->insert("A");
@@ -56,7 +57,7 @@ TEST(TestFunction, In)
{
using namespace DB;
auto & factory = FunctionFactory::instance();
- auto function = factory.get("in",
local_engine::SerializedPlanParser::global_context);
+ auto function = factory.get("in",
local_engine::QueryContext::globalContext());
auto type0 = DataTypeFactory::instance().get("String");
auto type_set = std::make_shared<DataTypeSet>();
@@ -83,8 +84,7 @@ TEST(TestFunction, In)
auto arg = ColumnSet::create(4, future_set);
ColumnsWithTypeAndName columns
- = {ColumnWithTypeAndName(std::move(column1), type0, "string0"),
- ColumnWithTypeAndName(std::move(arg), type_set, "__set")};
+ = {ColumnWithTypeAndName(std::move(column1), type0, "string0"),
ColumnWithTypeAndName(std::move(arg), type_set, "__set")};
Block block(columns);
std::cerr << "input:\n";
debug::headBlock(block);
@@ -100,7 +100,7 @@ TEST(TestFunction, NotIn1)
{
using namespace DB;
auto & factory = FunctionFactory::instance();
- auto function = factory.get("notIn",
local_engine::SerializedPlanParser::global_context);
+ auto function = factory.get("notIn",
local_engine::QueryContext::globalContext());
auto type0 = DataTypeFactory::instance().get("String");
auto type_set = std::make_shared<DataTypeSet>();
@@ -125,7 +125,7 @@ TEST(TestFunction, NotIn1)
auto future_set = std::make_shared<FutureSetFromStorage>(std::move(set));
//TODO: WHY? after https://github.com/ClickHouse/ClickHouse/pull/63723 we
need pass 4 instead of 1
- auto arg = ColumnSet::create(4,future_set);
+ auto arg = ColumnSet::create(4, future_set);
ColumnsWithTypeAndName columns
= {ColumnWithTypeAndName(std::move(column1), type0, "string0"),
ColumnWithTypeAndName(std::move(arg), type_set, "__set")};
@@ -143,7 +143,7 @@ TEST(TestFunction, NotIn2)
{
using namespace DB;
auto & factory = FunctionFactory::instance();
- auto function = factory.get("in",
local_engine::SerializedPlanParser::global_context);
+ auto function = factory.get("in",
local_engine::QueryContext::globalContext());
auto type0 = DataTypeFactory::instance().get("String");
auto type_set = std::make_shared<DataTypeSet>();
@@ -168,7 +168,7 @@ TEST(TestFunction, NotIn2)
auto future_set = std::make_shared<FutureSetFromStorage>(std::move(set));
//TODO: WHY? after https://github.com/ClickHouse/ClickHouse/pull/63723 we
need pass 4 instead of 1
- auto arg = ColumnSet::create(4,future_set);
+ auto arg = ColumnSet::create(4, future_set);
ColumnsWithTypeAndName columns
= {ColumnWithTypeAndName(std::move(column1), type0, "string0"),
ColumnWithTypeAndName(std::move(arg), type_set, "__set")};
@@ -178,7 +178,7 @@ TEST(TestFunction, NotIn2)
auto executable = function->build(block.getColumnsWithTypeAndName());
auto result = executable->execute(block.getColumnsWithTypeAndName(),
executable->getResultType(), block.rows());
- auto function_not = factory.get("not",
local_engine::SerializedPlanParser::global_context);
+ auto function_not = factory.get("not",
local_engine::QueryContext::globalContext());
auto type_bool = DataTypeFactory::instance().get("UInt8");
ColumnsWithTypeAndName columns2 = {ColumnWithTypeAndName(result,
type_bool, "string0")};
Block block2(columns2);
diff --git a/cpp-ch/local-engine/tests/gtest_ch_join.cpp
b/cpp-ch/local-engine/tests/gtest_ch_join.cpp
index 67775dd1a..af661c297 100644
--- a/cpp-ch/local-engine/tests/gtest_ch_join.cpp
+++ b/cpp-ch/local-engine/tests/gtest_ch_join.cpp
@@ -14,37 +14,37 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+#include <Core/Settings.h>
+#include <DataTypes/DataTypeFactory.h>
#include <Functions/FunctionFactory.h>
+#include <Interpreters/HashJoin/HashJoin.h>
+#include <Interpreters/TableJoin.h>
#include <Join/StorageJoinFromReadBuffer.h>
-#include <Parser/SerializedPlanParser.h>
#include <Parsers/ASTIdentifier.h>
#include <Processors/Executors/PipelineExecutor.h>
+#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/JoinStep.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
+#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
-
#include <Storages/MergeTree/SparkMergeTreeMeta.h>
#include <Storages/SubstraitSource/SubstraitFileSource.h>
#include <gtest/gtest.h>
#include <Common/DebugUtils.h>
-
-#include <Core/Settings.h>
-#include <Interpreters/HashJoin/HashJoin.h>
-#include <Interpreters/TableJoin.h>
-#include <substrait/plan.pb.h>
+#include <Common/QueryContext.h>
using namespace DB;
using namespace local_engine;
TEST(TestJoin, simple)
{
- auto global_context = SerializedPlanParser::global_context;
-
local_engine::SerializedPlanParser::global_context->setSetting("join_use_nulls",
true);
+ auto global_context = local_engine::QueryContext::globalContext();
+
local_engine::QueryContext::globalMutableContext()->setSetting("join_use_nulls",
true);
auto & factory = DB::FunctionFactory::instance();
- auto function = factory.get("murmurHash2_64",
local_engine::SerializedPlanParser::global_context);
+ auto function = factory.get("murmurHash2_64", global_context);
auto int_type = DataTypeFactory::instance().get("Int32");
auto column0 = int_type->createColumn();
column0->insert(1);
diff --git a/cpp-ch/local-engine/tests/gtest_ch_storages.cpp
b/cpp-ch/local-engine/tests/gtest_ch_storages.cpp
index a32e7c476..a3454f9b0 100644
--- a/cpp-ch/local-engine/tests/gtest_ch_storages.cpp
+++ b/cpp-ch/local-engine/tests/gtest_ch_storages.cpp
@@ -16,7 +16,6 @@
*/
#include <Functions/FunctionFactory.h>
#include <Parser/MergeTreeRelParser.h>
-#include <Parser/SerializedPlanParser.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/MergeTree/SparkMergeTreeMeta.h>
@@ -26,6 +25,7 @@
#include <gtest/gtest.h>
#include <substrait/plan.pb.h>
#include <Common/DebugUtils.h>
+#include <Common/QueryContext.h>
using namespace DB;
using namespace local_engine;
@@ -33,7 +33,7 @@ using namespace local_engine;
TEST(TestBatchParquetFileSource, blob)
{
GTEST_SKIP();
- auto config = local_engine::SerializedPlanParser::config;
+ Context::ConfigurationPtr config;
config->setString("blob.storage_account_url",
"http://127.0.0.1:10000/devstoreaccount1");
config->setString("blob.container_name", "libch");
config->setString("blob.container_already_exists", "true");
@@ -79,7 +79,7 @@ TEST(TestBatchParquetFileSource, blob)
columns.emplace_back(std::move(col));
}
auto header = Block(std::move(columns));
-
builder->init(Pipe(std::make_shared<local_engine::SubstraitFileSource>(SerializedPlanParser::global_context,
header, files)));
+
builder->init(Pipe(std::make_shared<local_engine::SubstraitFileSource>(QueryContext::globalContext(),
header, files)));
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
auto executor = PullingPipelineExecutor(pipeline);
@@ -101,7 +101,7 @@ TEST(TestBatchParquetFileSource, blob)
TEST(TestBatchParquetFileSource, s3)
{
GTEST_SKIP();
- auto config = local_engine::SerializedPlanParser::config;
+ Context::ConfigurationPtr config;
config->setString("s3.endpoint", "http://localhost:9000/tpch/");
config->setString("s3.region", "us-east-1");
config->setString("s3.access_key_id", "admin");
@@ -143,7 +143,7 @@ TEST(TestBatchParquetFileSource, s3)
columns.emplace_back(std::move(col));
}
auto header = Block(std::move(columns));
-
builder->init(Pipe(std::make_shared<SubstraitFileSource>(SerializedPlanParser::global_context,
header, files)));
+
builder->init(Pipe(std::make_shared<SubstraitFileSource>(QueryContext::globalContext(),
header, files)));
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
auto executor = PullingPipelineExecutor(pipeline);
@@ -210,7 +210,7 @@ TEST(TestBatchParquetFileSource, local_file)
columns.emplace_back(std::move(col));
}
auto header = Block(std::move(columns));
-
builder->init(Pipe(std::make_shared<SubstraitFileSource>(SerializedPlanParser::global_context,
header, files)));
+
builder->init(Pipe(std::make_shared<SubstraitFileSource>(QueryContext::globalContext(),
header, files)));
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
auto executor = PullingPipelineExecutor(pipeline);
@@ -260,11 +260,11 @@ TEST(TestPrewhere, OptimizePrewhereCondition)
}
Block block(std::move(columns));
- ContextPtr context = SerializedPlanParser::global_context;
+ ContextPtr context = QueryContext::globalContext();
SerializedPlanParser * parser = new SerializedPlanParser(context);
parser->parseExtensions(plan_ptr->extensions());
- MergeTreeRelParser mergeTreeParser(parser,
SerializedPlanParser::global_context);
+ MergeTreeRelParser mergeTreeParser(parser, QueryContext::globalContext());
mergeTreeParser.column_sizes["l_discount"] = 0;
mergeTreeParser.column_sizes["l_quantity"] = 1;
diff --git a/cpp-ch/local-engine/tests/gtest_clickhouse_pr_verify.cpp
b/cpp-ch/local-engine/tests/gtest_clickhouse_pr_verify.cpp
index 1e2525b33..012b4ebdd 100644
--- a/cpp-ch/local-engine/tests/gtest_clickhouse_pr_verify.cpp
+++ b/cpp-ch/local-engine/tests/gtest_clickhouse_pr_verify.cpp
@@ -17,11 +17,12 @@
#include <gluten_test_util.h>
#include <incbin.h>
#include <Core/Settings.h>
+#include <Interpreters/Context.h>
#include <Parser/SerializedPlanParser.h>
#include <Parser/SubstraitParserUtils.h>
#include <gtest/gtest.h>
#include <Common/DebugUtils.h>
-
+#include <Common/QueryContext.h>
using namespace local_engine;
@@ -31,7 +32,7 @@ using namespace DB;
INCBIN(_pr_54881_, SOURCE_DIR
"/utils/extern-local-engine/tests/json/clickhouse_pr_54881.json");
TEST(Clickhouse, PR54881)
{
- const auto context1 =
DB::Context::createCopy(SerializedPlanParser::global_context);
+ const auto context1 =
DB::Context::createCopy(QueryContext::globalContext());
// context1->setSetting("enable_named_columns_in_function_tuple",
DB::Field(true));
auto settings = context1->getSettingsRef();
EXPECT_FALSE(settings.enable_named_columns_in_function_tuple) << "GLUTEN
NEED set enable_named_columns_in_function_tuple to false";
@@ -81,7 +82,7 @@ INCBIN(_pr_65234_, SOURCE_DIR
"/utils/extern-local-engine/tests/json/clickhouse_
TEST(Clickhouse, PR65234)
{
const std::string split =
R"({"items":[{"uriFile":"file:///foo","length":"84633","parquet":{},"schema":{},"metadataColumns":[{}]}]})";
- SerializedPlanParser parser(SerializedPlanParser::global_context);
+ SerializedPlanParser parser(QueryContext::globalContext());
parser.addSplitInfo(local_engine::JsonStringToBinary<substrait::ReadRel::LocalFiles>(split));
const auto plan =
local_engine::JsonStringToMessage<substrait::Plan>(EMBEDDED_PLAN(_pr_65234_));
auto query_plan = parser.parse(plan);
diff --git a/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp
b/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp
index 45aaf3db6..e42a2a89a 100644
--- a/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp
+++ b/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp
@@ -18,17 +18,16 @@
#include "config.h"
#if USE_PARQUET
+#include <charconv>
#include <ranges>
#include <string>
-#include <charconv>
+#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeString.h>
#include <Interpreters/ActionsVisitor.h>
+#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Parser/SerializedPlanParser.h>
#include <Parsers/ExpressionListParsers.h>
-
-#include <Core/Settings.h>
-#include <Common/BlockTypeUtils.h>
#include <Processors/Formats/Impl/ArrowBufferedStreams.h>
#include <Storages/Parquet/ArrowUtils.h>
#include <Storages/Parquet/ColumnIndexFilter.h>
@@ -40,6 +39,8 @@
#include <parquet/schema.h>
#include <parquet/statistics.h>
#include <tests/gluten_test_util.h>
+#include <Common/BlockTypeUtils.h>
+#include <Common/QueryContext.h>
namespace DB::ErrorCodes
{
@@ -359,7 +360,7 @@ void testCondition(const std::string & exp, const
std::vector<size_t> & expected
static const AnotherRowType name_and_types = buildTestRowType();
static const local_engine::ColumnIndexStore column_index_store =
buildTestColumnIndexStore();
const local_engine::ColumnIndexFilter filter(
- local_engine::test::parseFilter(exp, name_and_types).value(),
local_engine::SerializedPlanParser::global_context);
+ local_engine::test::parseFilter(exp, name_and_types).value(),
local_engine::QueryContext::globalContext());
assertRows(filter.calculateRowRanges(column_index_store, TOTALSIZE),
expectedRows);
}
@@ -470,7 +471,6 @@ TEST(ColumnIndex, FilteringWithAllNullPages)
}
TEST(ColumnIndex, FilteringWithNotFoundColumnName)
{
-
using namespace test_utils;
using namespace local_engine;
const local_engine::ColumnIndexStore column_index_store =
buildTestColumnIndexStore();
@@ -480,7 +480,7 @@ TEST(ColumnIndex, FilteringWithNotFoundColumnName)
const AnotherRowType upper_name_and_types{{"COLUMN5", BIGINT()}};
const local_engine::ColumnIndexFilter filter_upper(
local_engine::test::parseFilter("COLUMN5 in (7, 20)",
upper_name_and_types).value(),
- local_engine::SerializedPlanParser::global_context);
+ local_engine::QueryContext::globalContext());
assertRows(
filter_upper.calculateRowRanges(column_index_store, TOTALSIZE),
std::vector(boost::counting_iterator<size_t>(0),
boost::counting_iterator<size_t>(TOTALSIZE)));
@@ -490,7 +490,7 @@ TEST(ColumnIndex, FilteringWithNotFoundColumnName)
const AnotherRowType lower_name_and_types{{"column5", BIGINT()}};
const local_engine::ColumnIndexFilter filter_lower(
local_engine::test::parseFilter("column5 in (7, 20)",
lower_name_and_types).value(),
- local_engine::SerializedPlanParser::global_context);
+ local_engine::QueryContext::globalContext());
assertRows(filter_lower.calculateRowRanges(column_index_store,
TOTALSIZE), {});
}
}
@@ -1053,7 +1053,7 @@ TEST(ColumnIndex, VectorizedParquetRecordReader)
static const AnotherRowType name_and_types{{"11", BIGINT()}};
const auto filterAction = local_engine::test::parseFilter("`11` = 10 or
`11` = 50", name_and_types);
auto column_index_filter
- =
std::make_shared<local_engine::ColumnIndexFilter>(filterAction.value(),
local_engine::SerializedPlanParser::global_context);
+ =
std::make_shared<local_engine::ColumnIndexFilter>(filterAction.value(),
local_engine::QueryContext::globalContext());
Block blockHeader({{BIGINT(), "11"}, {STRING(), "18"}});
diff --git a/cpp-ch/local-engine/tests/gtest_parquet_columnindex_bug.cpp
b/cpp-ch/local-engine/tests/gtest_parquet_columnindex_bug.cpp
index 4436bf0cd..5e2be6552 100644
--- a/cpp-ch/local-engine/tests/gtest_parquet_columnindex_bug.cpp
+++ b/cpp-ch/local-engine/tests/gtest_parquet_columnindex_bug.cpp
@@ -17,12 +17,13 @@
#include <gluten_test_util.h>
#include <incbin.h>
#include <Core/Settings.h>
+#include <Interpreters/Context.h>
#include <Parser/SerializedPlanParser.h>
#include <Parser/SubstraitParserUtils.h>
#include <gtest/gtest.h>
#include <Common/DebugUtils.h>
#include <Common/GlutenConfig.h>
-
+#include <Common/QueryContext.h>
using namespace local_engine;
@@ -32,7 +33,7 @@ INCBIN(_pr_18_2, SOURCE_DIR
"/utils/extern-local-engine/tests/decimal_filter_pus
TEST(ColumnIndex, Decimal182)
{
// [precision,scale] = [18,2]
- const auto context1 =
DB::Context::createCopy(SerializedPlanParser::global_context);
+ const auto context1 =
DB::Context::createCopy(QueryContext::globalMutableContext());
const auto config = ExecutorConfig::loadFromContext(context1);
EXPECT_TRUE(config.use_local_format) << "gtest need set use_local_format
to true";
diff --git a/cpp-ch/local-engine/tests/gtest_parser.cpp
b/cpp-ch/local-engine/tests/gtest_parser.cpp
index bf52bd54c..5f8d482d5 100644
--- a/cpp-ch/local-engine/tests/gtest_parser.cpp
+++ b/cpp-ch/local-engine/tests/gtest_parser.cpp
@@ -18,6 +18,7 @@
#include <incbin.h>
#include <testConfig.h>
#include <Core/Settings.h>
+#include <Interpreters/Context.h>
#include <Parser/SerializedPlanParser.h>
#include <Parser/SubstraitParserUtils.h>
#include <Parser/TypeParser.h>
@@ -26,7 +27,7 @@
#include <gtest/gtest.h>
#include <Common/CHUtil.h>
#include <Common/DebugUtils.h>
-
+#include <Common/QueryContext.h>
using namespace local_engine;
using namespace DB;
@@ -39,7 +40,7 @@ TEST(LocalExecutor, ReadCSV)
=
R"({"items":[{"uriFile":"{replace_local_files}","length":"56","text":{"fieldDelimiter":",","maxBlockSize":"8192","header":"1"},"schema":{"names":["id","name","language"],"struct":{"types":[{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}}]}},"metadataColumns":[{}]}]})";
const std::string split = replaceLocalFilesWildcards(
split_template,
GLUTEN_SOURCE_DIR("/backends-velox/src/test/resources/datasource/csv/student_option_schema.csv"));
- SerializedPlanParser parser(SerializedPlanParser::global_context);
+ SerializedPlanParser parser(QueryContext::globalContext());
parser.addSplitInfo(local_engine::JsonStringToBinary<substrait::ReadRel::LocalFiles>(split));
auto plan =
local_engine::JsonStringToMessage<substrait::Plan>(EMBEDDED_PLAN(_readcsv_plan));
diff --git a/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp
b/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp
index 69b6fa0f6..3db6f2fd4 100644
--- a/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp
+++ b/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp
@@ -17,10 +17,10 @@
#include <gluten_test_util.h>
#include <incbin.h>
-
#include <testConfig.h>
#include <Core/Settings.h>
#include <Disks/ObjectStorages/HDFS/HDFSObjectStorage.h>
+#include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/Squashing.h>
#include <Parser/SerializedPlanParser.h>
@@ -42,6 +42,7 @@
#include <gtest/gtest.h>
#include <Common/CHUtil.h>
#include <Common/DebugUtils.h>
+#include <Common/QueryContext.h>
using namespace local_engine;
using namespace DB;
@@ -66,7 +67,7 @@ Chunk testChunk()
TEST(LocalExecutor, StorageObjectStorageSink)
{
/// 0. Create ObjectStorage for HDFS
- auto settings = SerializedPlanParser::global_context->getSettingsRef();
+ auto settings = QueryContext::globalContext()->getSettingsRef();
const std::string query
= R"(CREATE TABLE hdfs_engine_xxxx (name String, value UInt32)
ENGINE=HDFS('hdfs://localhost:8020/clickhouse/test2', 'Parquet'))";
DB::ParserCreateQuery parser;
@@ -90,10 +91,10 @@ TEST(LocalExecutor, StorageObjectStorageSink)
EXPECT_TRUE(func && func->name == "HDFS");
DB::StorageHDFSConfiguration config;
- StorageObjectStorage::Configuration::initialize(config,
arg->children[0]->children, SerializedPlanParser::global_context, false);
+ StorageObjectStorage::Configuration::initialize(config,
arg->children[0]->children, QueryContext::globalContext(), false);
const std::shared_ptr<DB::HDFSObjectStorage> object_storage
- =
std::dynamic_pointer_cast<DB::HDFSObjectStorage>(config.createObjectStorage(SerializedPlanParser::global_context,
false));
+ =
std::dynamic_pointer_cast<DB::HDFSObjectStorage>(config.createObjectStorage(QueryContext::globalContext(),
false));
EXPECT_TRUE(object_storage != nullptr);
RelativePathsWithMetadata files_with_metadata;
@@ -101,7 +102,7 @@ TEST(LocalExecutor, StorageObjectStorageSink)
/// 1. Create ObjectStorageSink
DB::StorageObjectStorageSink sink{
- object_storage, config.clone(), {}, {{STRING(), "name"}, {UINT(),
"value"}}, SerializedPlanParser::global_context, ""};
+ object_storage, config.clone(), {}, {{STRING(), "name"}, {UINT(),
"value"}}, QueryContext::globalContext(), ""};
/// 2. Create Chunk
auto chunk = testChunk();
@@ -114,7 +115,7 @@ TEST(LocalExecutor, StorageObjectStorageSink)
INCBIN(native_write, SOURCE_DIR
"/utils/extern-local-engine/tests/json/native_write_plan.json");
TEST(WritePipeline, SubstraitFileSink)
{
- const auto context =
DB::Context::createCopy(SerializedPlanParser::global_context);
+ const auto context =
DB::Context::createCopy(QueryContext::globalContext());
GlutenWriteSettings settings{
.task_write_tmp_dir = "file:///tmp/test_table/test",
.task_write_filename = "data.parquet",
@@ -170,7 +171,7 @@ INCBIN(native_write_one_partition, SOURCE_DIR
"/utils/extern-local-engine/tests/
TEST(WritePipeline, SubstraitPartitionedFileSink)
{
- const auto context =
DB::Context::createCopy(SerializedPlanParser::global_context);
+ const auto context =
DB::Context::createCopy(QueryContext::globalContext());
GlutenWriteSettings settings{
.task_write_tmp_dir = "file:///tmp/test_table/test_partition",
.task_write_filename = "data.parquet",
@@ -221,7 +222,7 @@ TEST(WritePipeline, SubstraitPartitionedFileSink)
TEST(WritePipeline, ComputePartitionedExpression)
{
- const auto context =
DB::Context::createCopy(SerializedPlanParser::global_context);
+ const auto context =
DB::Context::createCopy(QueryContext::globalContext());
auto partition_by =
SubstraitPartitionedFileSink::make_partition_expression({"s_nationkey",
"name"});
@@ -299,7 +300,7 @@ TEST(WritePipeline, MergeTree)
{
ThreadStatus thread_status;
- const auto context =
DB::Context::createCopy(SerializedPlanParser::global_context);
+ const auto context =
DB::Context::createCopy(QueryContext::globalContext());
context->setPath("./");
const Settings & settings = context->getSettingsRef();
@@ -382,7 +383,7 @@ TEST(WritePipeline, SparkMergeTree)
{
ThreadStatus thread_status;
- const auto context =
DB::Context::createCopy(SerializedPlanParser::global_context);
+ const auto context =
DB::Context::createCopy(QueryContext::globalContext());
context->setPath("./");
const Settings & settings = context->getSettingsRef();
@@ -396,7 +397,7 @@ TEST(WritePipeline, SparkMergeTree)
do_remove(merge_tree_table.relative_path);
- const auto dest_storage =
merge_tree_table.getStorage(SerializedPlanParser::global_context);
+ const auto dest_storage =
merge_tree_table.getStorage(QueryContext::globalMutableContext());
EXPECT_TRUE(dest_storage);
EXPECT_FALSE(dest_storage->getStoragePolicy()->getAnyDisk()->isRemote());
DB::StorageMetadataPtr metadata_snapshot =
dest_storage->getInMemoryMetadataPtr();
@@ -437,7 +438,7 @@ TEST(WritePipeline, SparkMergeTree)
EXPECT_EQ(merge_tree_table_hdfs.relative_path,
"3.5/test/lineitem_mergetree_hdfs");
EXPECT_EQ(merge_tree_table_hdfs.table_configs.storage_policy,
"__hdfs_main");
- const auto dest_storage_hdfs =
merge_tree_table_hdfs.getStorage(SerializedPlanParser::global_context);
+ const auto dest_storage_hdfs =
merge_tree_table_hdfs.getStorage(QueryContext::globalMutableContext());
EXPECT_TRUE(dest_storage_hdfs);
EXPECT_TRUE(dest_storage_hdfs->getStoragePolicy()->getAnyDisk()->isRemote());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]