This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 620d498da8 [GLUTEN-1632][CH]Daily Update Clickhouse Version (20250220)
(#8783)
620d498da8 is described below
commit 620d498da8fa6d84cefcd15b3660e962fb46b205
Author: Kyligence Git <[email protected]>
AuthorDate: Thu Feb 20 03:15:21 2025 -0600
[GLUTEN-1632][CH]Daily Update Clickhouse Version (20250220) (#8783)
* [GLUTEN-1632][CH]Daily Update Clickhouse Version (20250220)
* Fix build due to https://github.com/ClickHouse/ClickHouse/pull/75474
* Fix build due to https://github.com/ClickHouse/ClickHouse/pull/75452
* Rework "gluten_cache.local.enabled" config due to
https://github.com/ClickHouse/ClickHouse/pull/75452
Now, we still use "gluten_cache.local.enabled" in spark, and we will
convert it to "enable.gluten_cache.local" when initializing the native backend.
* Fix memory access error exposed by
https://github.com/ClickHouse/ClickHouse/pull/75452
* Fix "<Error>... Possibly incorrect column size subtraction" due to
https://github.com/ClickHouse/ClickHouse/pull/75938
* Unified temporary directory
---------
Co-authored-by: kyligence-git <[email protected]>
Co-authored-by: Chang Chen <[email protected]>
---
.../gluten/backendsapi/clickhouse/CHConfig.scala | 9 +++++++
.../backendsapi/clickhouse/CHTransformerApi.scala | 9 +++++++
.../commands/GlutenCacheFilesCommand.scala | 9 +++----
...lutenClickHouseWholeStageTransformerSuite.scala | 12 ++++++---
.../GlutenClickHouseMergeTreeCacheDataSuite.scala | 2 +-
.../execution/tpch/GlutenClickHouseHDFSSuite.scala | 12 ++++++---
cpp-ch/clickhouse.version | 4 +--
cpp-ch/local-engine/Common/CHUtil.cpp | 22 ++++++++++++++-
cpp-ch/local-engine/Common/CHUtil.h | 2 --
cpp-ch/local-engine/Common/GlutenConfig.h | 8 ++++++
.../Disks/ObjectStorages/GlutenDiskHDFS.h | 2 +-
.../Parser/RelParsers/MergeTreeRelParser.cpp | 7 ++---
.../local-engine/Storages/Cache/CacheManager.cpp | 2 +-
.../Storages/MergeTree/MergeSparkMergeTreeTask.cpp | 2 +-
.../Storages/MergeTree/SparkMergeTreeSink.cpp | 4 +--
.../Storages/MergeTree/SparkStorageMergeTree.cpp | 1 +
.../Storages/SubstraitSource/ReadBufferBuilder.cpp | 31 +++++++++++++---------
17 files changed, 98 insertions(+), 40 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHConfig.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHConfig.scala
index b682f67e93..4720db3ad7 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHConfig.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHConfig.scala
@@ -97,6 +97,13 @@ object CHConfig {
+ " will be rewritten to `to_date(stringType)`")
.booleanConf
.createWithDefault(true)
+
+ val ENABLE_GLUTEN_LOCAL_FILE_CACHE =
+ buildConf(runtimeConfig("gluten_cache.local.enabled"))
+ .internal()
+ .doc("Enable local cache for CH backend.")
+ .booleanConf
+ .createWithDefault(false)
}
class CHConfig(conf: SQLConf) extends GlutenConfig(conf) {
@@ -121,4 +128,6 @@ class CHConfig(conf: SQLConf) extends GlutenConfig(conf) {
def enableCHRewriteDateConversion: Boolean =
getConf(ENABLE_CH_REWRITE_DATE_CONVERSION)
+
+ def enableGlutenLocalFileCache: Boolean =
getConf(ENABLE_GLUTEN_LOCAL_FILE_CACHE)
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
index 311f446a1a..bec88d13d6 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
@@ -162,6 +162,15 @@ class CHTransformerApi extends TransformerApi with Logging
{
nativeConfMap.put(orcCompressionKey, "snappy")
}
}
+
+ if
(nativeConfMap.containsKey(CHConfig.ENABLE_GLUTEN_LOCAL_FILE_CACHE.key)) {
+ // We can't use gluten_cache.local.enabled
+ // because FileCacheSettings doesn't contain this field.
+ nativeConfMap.put(
+ CHConfig.runtimeConfig("enable.gluten_cache.local"),
+ nativeConfMap.get(CHConfig.ENABLE_GLUTEN_LOCAL_FILE_CACHE.key))
+ nativeConfMap.remove(CHConfig.ENABLE_GLUTEN_LOCAL_FILE_CACHE.key)
+ }
}
override def getSupportExpressionClassName: util.Set[String] = {
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala
index 77c536d7ec..c39c68524d 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala
@@ -51,12 +51,9 @@ case class GlutenCacheFilesCommand(
AttributeReference("reason", StringType, nullable = false)())
override def run(session: SparkSession): Seq[Row] = {
- if (
- !session.sparkContext.getConf.getBoolean(
- CHConfig.runtimeConfig("gluten_cache.local.enabled"),
- defaultValue = false)
- ) {
- return Seq(Row(false, "Config `gluten_cache.local.enabled` is
disabled."))
+ if (!CHConfig.get.enableGlutenLocalFileCache) {
+ return Seq(
+ Row(false, s"Config `${CHConfig.ENABLE_GLUTEN_LOCAL_FILE_CACHE.key}`
is disabled."))
}
val targetFile = new Path(filePath)
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
index ca6c06ba71..0e557bd26b 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
@@ -41,15 +41,17 @@ class GlutenClickHouseWholeStageTransformerSuite extends
WholeStageTransformerSu
}
val SPARK_DIR_NAME: String = sparkVersion.replace(".", "-")
- val S3_METADATA_PATH = s"/tmp/metadata/s3/$SPARK_DIR_NAME"
- val S3_CACHE_PATH = s"/tmp/s3_cache/$SPARK_DIR_NAME/"
+ private val TMP_PREFIX = s"/tmp/gluten/$SPARK_DIR_NAME"
+
+ val S3_METADATA_PATH = s"$TMP_PREFIX/s3/metadata"
+ val S3_CACHE_PATH = s"$TMP_PREFIX/s3/cache"
val S3_ENDPOINT = "s3://127.0.0.1:9000/"
val MINIO_ENDPOINT: String = S3_ENDPOINT.replace("s3", "http")
val BUCKET_NAME: String = SPARK_DIR_NAME
val WHOLE_PATH: String = MINIO_ENDPOINT + BUCKET_NAME + "/"
- val HDFS_METADATA_PATH = s"/tmp/metadata/hdfs/$SPARK_DIR_NAME"
- val HDFS_CACHE_PATH = s"/tmp/hdfs_cache/$SPARK_DIR_NAME/"
+ val HDFS_METADATA_PATH = s"$TMP_PREFIX/hdfs/metadata"
+ val HDFS_CACHE_PATH = s"$TMP_PREFIX/hdfs/cache"
val HDFS_URL_ENDPOINT = "hdfs://127.0.0.1:8020"
val HDFS_URL = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME"
@@ -58,6 +60,8 @@ class GlutenClickHouseWholeStageTransformerSuite extends
WholeStageTransformerSu
val CH_DEFAULT_STORAGE_DIR = "/data"
+ val LOCAL_CACHE_PATH = s"$TMP_PREFIX/local/cache"
+
protected def spark32: Boolean = sparkVersion.equals("3.2")
protected def spark33: Boolean = sparkVersion.equals("3.3")
protected def spark35: Boolean = sparkVersion.equals("3.5")
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
index 364bd32e08..7db265de80 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala
@@ -595,7 +595,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
}
test("test disable cache files return") {
- withSQLConf(CHConfig.runtimeConfig("gluten_cache.local.enabled") ->
"false") {
+ withSQLConf(CHConfig.ENABLE_GLUTEN_LOCAL_FILE_CACHE.key -> "false") {
runSql(
s"CACHE FILES select * from '$HDFS_URL_ENDPOINT/tpch-data/lineitem'",
noFallBack = false) {
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
index 0ed9e1525c..b16ec5fe57 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
@@ -16,6 +16,7 @@
*/
package org.apache.gluten.execution.tpch
+import org.apache.gluten.backendsapi.clickhouse.CHConfig
import org.apache.gluten.backendsapi.clickhouse.CHConfig._
import org.apache.gluten.execution.{CHNativeCacheManager,
FileSourceScanExecTransformer, GlutenClickHouseTPCHAbstractSuite}
@@ -36,7 +37,6 @@ class GlutenClickHouseHDFSSuite
rootPath +
"../../../../tools/gluten-it/common/src/main/resources/tpch-queries"
override protected val queriesResults: String = rootPath + "queries-output"
- private val hdfsCachePath = "/tmp/gluten_hdfs_cache/"
private val cache_name = "gluten_cache"
/** Run Gluten + ClickHouse Backend with SortShuffleManager */
@@ -49,11 +49,15 @@ class GlutenClickHouseHDFSSuite
.set("spark.sql.adaptive.enabled", "true")
.setCHConfig("use_local_format", true)
.set(prefixOf("shuffle.hash.algorithm"), "sparkMurmurHash3_32")
- .setCHConfig("gluten_cache.local.enabled", "true")
+ .set(CHConfig.ENABLE_GLUTEN_LOCAL_FILE_CACHE.key, "true")
.setCHConfig("gluten_cache.local.name", cache_name)
- .setCHConfig("gluten_cache.local.path", hdfsCachePath)
+ .setCHConfig("gluten_cache.local.path", LOCAL_CACHE_PATH)
.setCHConfig("gluten_cache.local.max_size", "10Gi")
- .setCHConfig("reuse_disk_cache", "false")
+ // If reuse_disk_cache is set to false,the cache will be deleted in
JNI_OnUnload
+ // but CacheManager and JobScheduler of backend are static global
variables
+ // and is destroyed at the end of the program which causes backend
reporting logical errors.
+ // TODO: fix reuse_disk_cache
+ .setCHConfig("reuse_disk_cache", "true")
.set("spark.sql.adaptive.enabled", "false")
// TODO: spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm =>
diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version
index aae4e16a40..0940123e63 100644
--- a/cpp-ch/clickhouse.version
+++ b/cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
-CH_BRANCH=rebase_ch/20250215
-CH_COMMIT=fa944483093
+CH_BRANCH=rebase_ch/20250220
+CH_COMMIT=c4a8e4a6d2f
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp
b/cpp-ch/local-engine/Common/CHUtil.cpp
index 3741bb3604..35366dcce0 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -512,7 +512,7 @@ std::vector<String>
BackendInitializerUtil::wrapDiskPathConfig(
});
change_func("path");
- change_func("gluten_cache.local.path");
+ change_func(GlutenCacheConfig::PREFIX + ".path");
return changed_paths;
}
@@ -567,6 +567,26 @@ DB::Context::ConfigurationPtr
BackendInitializerUtil::initConfig(const SparkConf
BackendFinalizerUtil::paths_need_to_clean.insert(
BackendFinalizerUtil::paths_need_to_clean.end(),
path_need_clean.begin(), path_need_clean.end());
}
+
+ // FIXMEX: workaround for
https://github.com/ClickHouse/ClickHouse/pull/75452#pullrequestreview-2625467710
+ // entry in DiskSelector::initialize
+ // Bug in FileCacheSettings::loadFromConfig
+ auto updateCacheDiskType = [](Poco::Util::AbstractConfiguration & config) {
+ const std::string config_prefix = "storage_configuration.disks";
+ Poco::Util::AbstractConfiguration::Keys keys;
+ config.keys(config_prefix, keys);
+ for (const auto & disk_name : keys)
+ {
+ const auto disk_config_prefix = config_prefix + "." + disk_name;
+ const auto disk_type = config.getString(disk_config_prefix +
".type", "local");
+ if (disk_type == "cache")
+ config.setString(disk_config_prefix, "workaround");
+ }
+ config.setString(GlutenCacheConfig::PREFIX, "workaround");
+ };
+
+ updateCacheDiskType(*config);
+
return config;
}
diff --git a/cpp-ch/local-engine/Common/CHUtil.h
b/cpp-ch/local-engine/Common/CHUtil.h
index ec3f0174ae..cc03bc0ce6 100644
--- a/cpp-ch/local-engine/Common/CHUtil.h
+++ b/cpp-ch/local-engine/Common/CHUtil.h
@@ -183,8 +183,6 @@ public:
inline static const String GLUTEN_TASK_OFFHEAP =
"spark.gluten.memory.task.offHeap.size.in.bytes";
- inline static const String GLUTEN_LOCAL_CACHE_PREFIX =
"gluten_cache.local.";
-
/// On yarn mode, native writing on hdfs cluster takes yarn container user
as the user passed to libhdfs3, which
/// will cause permission issue because yarn container user is not the
owner of the hdfs dir to be written.
/// So we need to get the spark user from env and pass it to libhdfs3.
diff --git a/cpp-ch/local-engine/Common/GlutenConfig.h
b/cpp-ch/local-engine/Common/GlutenConfig.h
index 1aefca1bc4..74ccc441fc 100644
--- a/cpp-ch/local-engine/Common/GlutenConfig.h
+++ b/cpp-ch/local-engine/Common/GlutenConfig.h
@@ -186,4 +186,12 @@ struct SparkSQLConfig
static SparkSQLConfig loadFromContext(const DB::ContextPtr & context);
};
+struct GlutenCacheConfig
+{
+ inline static const String PREFIX = "gluten_cache.local";
+
+ /// We can't use gluten_cache.local.enabled because FileCacheSettings
doesn't contain this field.
+ inline static const String ENABLED = "enable.gluten_cache.local";
+};
+
}
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
index 68942c6a47..33c85f9c39 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
@@ -81,7 +81,7 @@ public:
bool is_cache = object_storage->supportsCache();
if (is_cache)
{
- auto cache_os =
reinterpret_cast<DB::CachedObjectStorage*>(object_storage.get());
+ std::shared_ptr<DB::CachedObjectStorage> cache_os =
typeid_cast<std::shared_ptr<DB::CachedObjectStorage>>(object_storage);
object_storage = hdfs_object_storage;
auto cache =
DB::FileCacheFactory::instance().getOrCreate(cache_os->getCacheName(),
cache_os->getCacheSettings(), "storage_configuration.disks.hdfs_cache");
wrapWithCache(cache, cache_os->getCacheSettings(),
cache_os->getCacheConfigName());
diff --git a/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.cpp
b/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.cpp
index c355c5d813..5d1b93c719 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.cpp
@@ -126,8 +126,12 @@ DB::QueryPlanPtr MergeTreeRelParser::parseReadRel(
LOG_DEBUG(getLogger("SerializedPlanParser"), "Try to read ({}) instead
of empty header", one_column_name_type.front().dump());
}
+ std::vector<DataPartPtr> selected_parts =
StorageMergeTreeFactory::getDataPartsByNames(
+ storage->getStorageID(), merge_tree_table.snapshot_id,
merge_tree_table.getPartNames());
+
for (const auto & [name, sizes] : storage->getColumnSizes())
column_sizes[name] = sizes.data_compressed;
+
auto storage_snapshot = std::make_shared<StorageSnapshot>(*storage,
storage->getInMemoryMetadataPtr());
auto names_and_types_list = input.getNamesAndTypesList();
@@ -141,9 +145,6 @@ DB::QueryPlanPtr MergeTreeRelParser::parseReadRel(
query_info->prewhere_info = parsePreWhereInfo(rel.filter(), input);
}
- std::vector<DataPartPtr> selected_parts =
StorageMergeTreeFactory::getDataPartsByNames(
- storage->getStorageID(), merge_tree_table.snapshot_id,
merge_tree_table.getPartNames());
-
auto read_step = storage->reader.readFromParts(
selected_parts,
storage->getMutationsSnapshot({}),
diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
index f6421d1b2d..121207aa7b 100644
--- a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
+++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
@@ -234,7 +234,7 @@ JobId
CacheManager::cacheFiles(substrait::ReadRel::LocalFiles file_infos)
const Poco::URI file_uri(file_infos.items().Get(0).uri_file());
const auto read_buffer_builder =
ReadBufferBuilderFactory::instance().createBuilder(file_uri.getScheme(),
context);
- if (context->getConfigRef().getBool("gluten_cache.local.enabled",
false))
+ if (context->getConfigRef().getBool(GlutenCacheConfig::ENABLED, false))
for (const auto & file : file_infos.items())
job.addTask(cacheFile(file, read_buffer_builder));
else
diff --git a/cpp-ch/local-engine/Storages/MergeTree/MergeSparkMergeTreeTask.cpp
b/cpp-ch/local-engine/Storages/MergeTree/MergeSparkMergeTreeTask.cpp
index cb0b3f2054..58e43dc12e 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/MergeSparkMergeTreeTask.cpp
+++ b/cpp-ch/local-engine/Storages/MergeTree/MergeSparkMergeTreeTask.cpp
@@ -57,7 +57,7 @@ bool MergeSparkMergeTreeTask::executeStep()
std::optional<ThreadGroupSwitcher> switcher;
if (merge_list_entry)
{
- switcher.emplace((*merge_list_entry)->thread_group);
+ switcher.emplace((*merge_list_entry)->thread_group, "",
/*allow_existing_group*/ true);
}
switch (state)
diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.cpp
b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.cpp
index d1066b4143..2b50f62c74 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.cpp
+++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeSink.cpp
@@ -175,9 +175,9 @@ void SinkHelper::doMergePartsAsync(const
std::vector<PartWithStats> & merge_part
thread_pool.scheduleOrThrow(
[this, merge_parts_with_stats, thread_group =
CurrentThread::getGroup()]() -> void
{
+ ThreadGroupSwitcher switcher(thread_group, "AsyncMerge");
+
Stopwatch watch;
- CurrentThread::detachFromGroupIfNotDetached();
- CurrentThread::attachToGroup(thread_group);
size_t before_size = 0;
std::vector<DB::DataPartPtr> prepare_merge_parts_;
diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
index 3e5601e860..2169926cdd 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
+++ b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
@@ -289,6 +289,7 @@ MergeTreeData::LoadPartResult
SparkStorageMergeTree::loadDataPart(
has_lightweight_delete_parts.store(true);
// without it "test mergetree optimize partitioned by one low card column"
will log ERROR
+ resetColumnSizes();
calculateColumnAndSecondaryIndexSizesIfNeeded();
LOG_TRACE(log, "Finished loading {} part {} on disk {}",
magic_enum::enum_name(to_state), part_name, part_disk_ptr->getName());
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
index b7beeb52ca..09ba291419 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
@@ -25,6 +25,7 @@
#include <Disks/IO/ReadBufferFromAzureBlobStorage.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <IO/BoundedReadBuffer.h>
+#include <IO/ParallelReadBuffer.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/ReadSettings.h>
@@ -33,7 +34,6 @@
#include <IO/SeekableReadBuffer.h>
#include <IO/SharedThreadPools.h>
#include <IO/SplittableBzip2ReadBuffer.h>
-#include <IO/ParallelReadBuffer.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Interpreters/Cache/FileCacheSettings.h>
@@ -87,6 +87,12 @@ namespace ErrorCodes
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
}
+
+namespace FileCacheSetting
+{
+extern const FileCacheSettingsUInt64 max_size;
+extern const FileCacheSettingsString path;
+}
}
namespace local_engine
@@ -371,13 +377,13 @@ public:
if (!file_cache && config.s3_local_cache_enabled)
{
DB::FileCacheSettings file_cache_settings;
- file_cache_settings.max_size = config.s3_local_cache_max_size;
+ file_cache_settings[FileCacheSetting::max_size] =
config.s3_local_cache_max_size;
auto cache_base_path = config.s3_local_cache_cache_path;
if (!std::filesystem::exists(cache_base_path))
std::filesystem::create_directories(cache_base_path);
- file_cache_settings.base_path = cache_base_path;
+ file_cache_settings[FileCacheSetting::path] = cache_base_path;
file_cache =
DB::FileCacheFactory::instance().getOrCreate("s3_local_cache",
file_cache_settings, "");
file_cache->initialize();
}
@@ -684,7 +690,7 @@ DB::ReadSettings ReadBufferBuilder::getReadSettings() const
const auto & config = context->getConfigRef();
/// Override enable_filesystem_cache with gluten config
- read_settings.enable_filesystem_cache =
config.getBool("gluten_cache.local.enabled", false);
+ read_settings.enable_filesystem_cache =
config.getBool(GlutenCacheConfig::ENABLED, false);
/// Override remote_fs_prefetch with gluten config
read_settings.remote_fs_prefetch = config.getBool("hdfs.enable_async_io",
false);
@@ -798,23 +804,24 @@ ReadBufferBuilder::ReadBufferCreator
ReadBufferBuilder::wrapWithCache(
size_t file_size)
{
const auto & config = context->getConfigRef();
- if (!config.getBool("gluten_cache.local.enabled", false))
+ if (!config.getBool(GlutenCacheConfig::ENABLED, false))
return read_buffer_creator;
read_settings.enable_filesystem_cache = true;
if (!file_cache)
{
DB::FileCacheSettings file_cache_settings;
- file_cache_settings.loadFromConfig(config, "gluten_cache.local");
+ file_cache_settings.loadFromConfig(config, GlutenCacheConfig::PREFIX);
- if (std::filesystem::path(file_cache_settings.base_path).is_relative())
- file_cache_settings.base_path =
std::filesystem::path(context->getPath()) / "caches" /
file_cache_settings.base_path;
+ auto & base_path = file_cache_settings[FileCacheSetting::path].value;
+ if (std::filesystem::path(base_path).is_relative())
+ base_path = std::filesystem::path(context->getPath()) / "caches" /
base_path;
- if (!std::filesystem::exists(file_cache_settings.base_path))
- std::filesystem::create_directories(file_cache_settings.base_path);
+ if (!std::filesystem::exists(base_path))
+ std::filesystem::create_directories(base_path);
- const auto name = config.getString("gluten_cache.local.name");
- const auto * config_prefix = "";
+ const auto name = config.getString(GlutenCacheConfig::PREFIX +
".name");
+ std::string config_prefix;
file_cache = DB::FileCacheFactory::instance().getOrCreate(name,
file_cache_settings, config_prefix);
file_cache->initialize();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]