This is an automated email from the ASF dual-hosted git repository.
loneylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 54f5787a21 [GLUTEN-7542][CH] Fix cache not refresh (#7547)
54f5787a21 is described below
commit 54f5787a213e0e7388d1481da5742d1504c27979
Author: Shuai li <[email protected]>
AuthorDate: Wed Oct 16 16:04:50 2024 +0800
[GLUTEN-7542][CH] Fix cache not refresh (#7547)
---
.../execution/tpch/GlutenClickHouseHDFSSuite.scala | 50 ++++++++-
.../local-engine/Common/FileCacheConcurrentMap.h | 80 +++++++-------
.../local-engine/Storages/Cache/CacheManager.cpp | 3 +-
.../Storages/SubstraitSource/ReadBufferBuilder.cpp | 122 ++++++++++++---------
.../Storages/SubstraitSource/ReadBufferBuilder.h | 15 ++-
5 files changed, 169 insertions(+), 101 deletions(-)
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
index 614780dbbd..e18242dde3 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
@@ -22,8 +22,11 @@ import org.apache.gluten.execution.{CHNativeCacheManager,
FileSourceScanExecTran
import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs.Path
+import java.nio.charset.Charset
+
class GlutenClickHouseHDFSSuite
extends GlutenClickHouseTPCHAbstractSuite
with AdaptiveSparkPlanHelper {
@@ -70,6 +73,11 @@ class GlutenClickHouseHDFSSuite
deleteCache()
}
+ override protected def afterAll(): Unit = {
+ deleteCache()
+ super.afterAll()
+ }
+
private def deleteCache(): Unit = {
val targetFile = new Path(tablesPath)
val fs = targetFile.getFileSystem(spark.sessionState.newHadoopConf())
@@ -87,7 +95,6 @@ class GlutenClickHouseHDFSSuite
})
}
})
- clearDataPath(hdfsCachePath)
}
val runWithoutCache: () => Unit = () => {
@@ -113,19 +120,19 @@ class GlutenClickHouseHDFSSuite
}
}
- ignore("test hdfs cache") {
+ test("test hdfs cache") {
runWithoutCache()
runWithCache()
}
- ignore("test cache file command") {
+ test("test cache file command") {
runSql(
s"CACHE FILES select * from '$HDFS_URL_ENDPOINT/tpch-data/lineitem'",
noFallBack = false) { _ => }
runWithCache()
}
- ignore("test no cache by query") {
+ test("test no cache by query") {
withSQLConf(
runtimeSettings("read_from_filesystem_cache_if_exists_otherwise_bypass_cache")
-> "true") {
runWithoutCache()
@@ -134,4 +141,39 @@ class GlutenClickHouseHDFSSuite
runWithoutCache()
runWithCache()
}
+
+ test("GLUTEN-7542: Fix cache refresh") {
+ withSQLConf("spark.sql.hive.manageFilesourcePartitions" -> "false") {
+ val file_path = s"$tablesPath/issue_7542/"
+ val targetDirs = new Path(file_path)
+ val fs = targetDirs.getFileSystem(spark.sessionState.newHadoopConf())
+ fs.mkdirs(targetDirs)
+ val out = fs.create(new Path(s"$file_path/00000_0"))
+ IOUtils.write("1\n2\n3\n4\n5", out, Charset.defaultCharset())
+ out.close()
+ sql(s"""
+ |CREATE external TABLE `issue_7542`(
+ | `c_custkey` int )
+ |using CSV
+ |LOCATION
+ | '$file_path/'
+ |""".stripMargin)
+
+ sql(s"""select * from issue_7542""").collect()
+ fs.delete(new Path(s"$file_path/00000_0"), false)
+ val out2 = fs.create(new Path(s"$file_path/00000_0"))
+ IOUtils.write("1\n2\n3\n4\n3\n3\n3", out2, Charset.defaultCharset())
+ out2.close()
+ val df = sql(s"""select count(*) from issue_7542 where c_custkey=3""")
+ // refresh list file
+ collect(df.queryExecution.executedPlan) {
+ case scanExec: FileSourceScanExecTransformer =>
scanExec.relation.location.refresh()
+ }
+ val result = df.collect()
+ assert(result.length == 1)
+ assert(result.head.getLong(0) == 4)
+
+ sql("drop table issue_7542")
+ }
+ }
}
diff --git a/cpp-ch/local-engine/Common/FileCacheConcurrentMap.h
b/cpp-ch/local-engine/Common/FileCacheConcurrentMap.h
index cb692d3c6f..2e6c4434eb 100644
--- a/cpp-ch/local-engine/Common/FileCacheConcurrentMap.h
+++ b/cpp-ch/local-engine/Common/FileCacheConcurrentMap.h
@@ -32,78 +32,78 @@ using ReadLock = std::shared_lock<Lock>;
class FileCacheConcurrentMap
{
public:
- void insert(const DB::FileCacheKey & key, const Int64 value)
+ void insert(const DB::FileCacheKey & key, const size_t last_modified_time,
const size_t file_size)
{
WriteLock wLock(rw_locker);
- map.emplace(key, value);
+ if (const auto it = map.find(key); it != map.end())
+ return;
+ map.emplace(key, std::tuple(last_modified_time, file_size));
}
- void
- update_cache_time(const DB::FileCacheKey & key, const String & path, const
Int64 accept_cache_time, const DB::FileCachePtr & file_cache)
+ void update_cache_time(
+ const DB::FileCacheKey & key, const Int64 new_modified_time, const
size_t new_file_size, const DB::FileCachePtr & file_cache)
{
WriteLock wLock(rw_locker);
- // double check
auto it = map.find(key);
if (it != map.end())
{
- if (it->second < accept_cache_time)
+ auto & [last_modified_time, file_size] = it->second;
+ if (last_modified_time < new_modified_time || file_size !=
new_file_size)
{
// will delete cache file immediately
- file_cache->removePathIfExists(path,
DB::FileCache::getCommonUser().user_id);
+ file_cache->removeKeyIfExists(key,
DB::FileCache::getCommonUser().user_id);
// update cache time
- map[key] = accept_cache_time;
+ map[key] = std::tuple(new_modified_time, new_file_size);
}
}
else
{
// will delete cache file immediately
- file_cache->removePathIfExists(path,
DB::FileCache::getCommonUser().user_id);
+ file_cache->removeKeyIfExists(key,
DB::FileCache::getCommonUser().user_id);
// set cache time
- map.emplace(key, accept_cache_time);
+ map.emplace(key, std::tuple(new_modified_time, new_file_size));
}
}
- std::optional<Int64> get(const DB::FileCacheKey & key)
+ std::optional<std::tuple<size_t, size_t>> get(const DB::FileCacheKey & key)
{
ReadLock rLock(rw_locker);
auto it = map.find(key);
if (it == map.end())
- {
return std::nullopt;
- }
return it->second;
}
- bool contain(const DB::FileCacheKey & key)
- {
- ReadLock rLock(rw_locker);
- return map.contains(key);
- }
-
- void erase(const DB::FileCacheKey & key)
- {
- WriteLock wLock(rw_locker);
- if (map.find(key) == map.end())
- {
- return;
- }
- map.erase(key);
- }
-
- void clear()
- {
- WriteLock wLock(rw_locker);
- map.clear();
- }
+ // bool contain(const DB::FileCacheKey & key)
+ // {
+ // ReadLock rLock(rw_locker);
+ // return map.contains(key);
+ // }
- size_t size() const
- {
- ReadLock rLock(rw_locker);
- return map.size();
- }
+ // void erase(const DB::FileCacheKey & key)
+ // {
+ // WriteLock wLock(rw_locker);
+ // if (map.find(key) == map.end())
+ // {
+ // return;
+ // }
+ // map.erase(key);
+ // }
+ //
+ // void clear()
+ // {
+ // WriteLock wLock(rw_locker);
+ // map.clear();
+ // }
+ //
+ // size_t size() const
+ // {
+ // ReadLock rLock(rw_locker);
+ // return map.size();
+ // }
private:
- std::unordered_map<DB::FileCacheKey, Int64> map;
+ std::unordered_map<DB::FileCacheKey, std::tuple<size_t, size_t>> map;
mutable Lock rw_locker;
};
}
diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
index 25726dc24f..bb688724c3 100644
--- a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
+++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
@@ -208,13 +208,14 @@ JobId
CacheManager::cacheFiles(substrait::ReadRel::LocalFiles file_infos)
{
JobId id = toString(UUIDHelpers::generateV4());
Job job(id);
+ DB::ReadSettings read_settings = context->getReadSettings();
if (file_infos.items_size())
{
const Poco::URI file_uri(file_infos.items().Get(0).uri_file());
const auto read_buffer_builder =
ReadBufferBuilderFactory::instance().createBuilder(file_uri.getScheme(),
context);
- if (read_buffer_builder->file_cache)
+ if (context->getConfigRef().getBool("gluten_cache.local.enabled",
false))
for (const auto & file : file_infos.items())
job.addTask(cacheFile(file, read_buffer_builder));
else
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
index 3fa9863607..0e2d8f41be 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
@@ -48,7 +48,6 @@
#include <Poco/Logger.h>
#include <Poco/URI.h>
#include <Common/CHUtil.h>
-#include <Common/FileCacheConcurrentMap.h>
#include <Common/GlutenConfig.h>
#include <Common/GlutenSettings.h>
#include <Common/logger_useful.h>
@@ -86,6 +85,9 @@ namespace ErrorCodes
namespace local_engine
{
+
+FileCacheConcurrentMap ReadBufferBuilder::files_cache_time_map;
+
template <class key_type, class value_type>
class ConcurrentLRU
{
@@ -246,8 +248,12 @@ public:
}
size_t file_size = 0;
+ size_t modified_time = 0;
if (file_info.has_properties())
+ {
file_size = file_info.properties().filesize();
+ modified_time = file_info.properties().modificationtime();
+ }
std::unique_ptr<DB::ReadBuffer> read_buffer;
@@ -281,8 +287,9 @@ public:
hdfs_uri, hdfs_file_path, config, read_settings,
read_util_position, true, object.bytes_size);
};
- DB::StoredObjects
stored_objects{DB::StoredObject{file_uri.getPath().substr(1), "", file_size}};
- auto cache_creator = wrapWithCache(hdfs_read_buffer_creator,
read_settings);
+ auto remote_path = file_uri.getPath().substr(1);
+ DB::StoredObjects stored_objects{DB::StoredObject{remote_path, "",
file_size}};
+ auto cache_creator = wrapWithCache(hdfs_read_buffer_creator,
read_settings, remote_path, modified_time, file_size);
auto cache_hdfs_read =
std::make_unique<DB::ReadBufferFromRemoteFSGather>(
std::move(cache_creator), stored_objects, read_settings,
nullptr, /* use_external_buffer */ false);
cache_hdfs_read->setReadUntilPosition(read_util_position);
@@ -447,24 +454,6 @@ public:
size_t object_size = object_info.size;
Int64 object_modified_time = object_info.last_modification_time;
- if (read_settings.enable_filesystem_cache)
- {
- auto file_cache_key = DB::FileCacheKey::fromPath(pathKey);
- auto last_cache_time = files_cache_time_map.get(file_cache_key);
- // quick check
- if (last_cache_time != std::nullopt && last_cache_time.has_value())
- {
- if (last_cache_time.value() < object_modified_time*1000l)
//second to milli second
- {
- files_cache_time_map.update_cache_time(file_cache_key,
pathKey, object_modified_time*1000l, file_cache);
- }
- }
- else
- {
- files_cache_time_map.update_cache_time(file_cache_key,
pathKey, object_modified_time*1000l, file_cache);
- }
- }
-
auto read_buffer_creator
= [bucket, client, read_settings, this](bool restricted_seek,
const DB::StoredObject & object) -> std::unique_ptr<DB::ReadBufferFromFileBase>
{
@@ -481,7 +470,7 @@ public:
restricted_seek);
};
- auto cache_creator = wrapWithCache(read_buffer_creator, read_settings);
+ auto cache_creator = wrapWithCache(read_buffer_creator, read_settings,
pathKey, object_modified_time, object_size);
DB::StoredObjects stored_objects{DB::StoredObject{pathKey, "",
object_size}};
auto s3_impl = std::make_unique<DB::ReadBufferFromRemoteFSGather>(
@@ -519,8 +508,6 @@ public:
private:
static const std::string SHARED_CLIENT_KEY;
static ConcurrentLRU<std::string, std::shared_ptr<DB::S3::Client>>
per_bucket_clients;
- static FileCacheConcurrentMap files_cache_time_map;
- DB::FileCachePtr file_cache;
static std::string toBucketNameSetting(const std::string & bucket_name,
const std::string & config_name)
{
@@ -690,7 +677,6 @@ private:
};
const std::string S3FileReadBufferBuilder::SHARED_CLIENT_KEY =
"___shared-client___";
ConcurrentLRU<std::string, std::shared_ptr<DB::S3::Client>>
S3FileReadBufferBuilder::per_bucket_clients(100);
-FileCacheConcurrentMap S3FileReadBufferBuilder::files_cache_time_map;
#endif
@@ -762,11 +748,14 @@ ReadBufferBuilder::ReadBufferBuilder(DB::ContextPtr
context_) : context(context_
DB::ReadSettings ReadBufferBuilder::getReadSettings() const
{
DB::ReadSettings read_settings = context->getReadSettings();
- read_settings.enable_filesystem_cache = false;
+ if (context->getConfigRef().getBool("gluten_cache.local.enabled", false))
+ read_settings.enable_filesystem_cache = true;
+ else
+ read_settings.enable_filesystem_cache = false;
+
return read_settings;
}
-
std::unique_ptr<DB::ReadBuffer>
ReadBufferBuilder::buildWithCompressionWrapper(const
substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool
set_read_util_position)
{
@@ -778,8 +767,12 @@ ReadBufferBuilder::buildWithCompressionWrapper(const
substrait::ReadRel::LocalFi
return compression != DB::CompressionMethod::None ?
DB::wrapReadBufferWithCompressionMethod(std::move(in), compression) :
std::move(in);
}
-ReadBufferBuilder::ReadBufferCreator
-ReadBufferBuilder::wrapWithCache(ReadBufferCreator read_buffer_creator,
DB::ReadSettings & read_settings)
+ReadBufferBuilder::ReadBufferCreator ReadBufferBuilder::wrapWithCache(
+ ReadBufferCreator read_buffer_creator,
+ DB::ReadSettings & read_settings,
+ const String & key,
+ const size_t & last_modified_time,
+ const size_t & file_size)
{
const auto & config = context->getConfigRef();
if (!config.getBool("gluten_cache.local.enabled", false))
@@ -802,34 +795,57 @@ ReadBufferBuilder::wrapWithCache(ReadBufferCreator
read_buffer_creator, DB::Read
file_cache->initialize();
}
- if (file_cache->isInitialized())
+ if (!file_cache->isInitialized())
{
- return [read_buffer_creator, read_settings, this](
+ file_cache->throwInitExceptionIfNeeded();
+ return read_buffer_creator;
+ }
+
+ updateCaches(key, last_modified_time, file_size);
+
+ return [read_buffer_creator, read_settings, this](
bool restricted_seek, const DB::StoredObject & object) ->
std::unique_ptr<DB::ReadBufferFromFileBase>
- {
- auto cache_key = DB::FileCacheKey::fromPath(object.remote_path);
- auto modified_read_settings = read_settings.withNestedBuffer();
- auto rbc = [=, this]() { return
read_buffer_creator(restricted_seek, object); };
-
- return std::make_unique<DB::CachedOnDiskReadBufferFromFile>(
- object.remote_path,
- cache_key,
- file_cache,
- DB::FileCache::getCommonUser(),
- rbc,
- modified_read_settings,
- std::string(DB::CurrentThread::getQueryId()),
- object.bytes_size,
- /* allow_seeks */
!read_settings.remote_read_buffer_restrict_seek,
- /* use_external_buffer */ true,
- /* read_until_position */ std::nullopt,
- context->getFilesystemCacheLog());
- };
+ {
+ auto cache_key = DB::FileCacheKey::fromPath(object.remote_path);
+ auto modified_read_settings = read_settings.withNestedBuffer();
+ auto rbc = [=, this]() { return read_buffer_creator(restricted_seek,
object); };
+
+ return std::make_unique<DB::CachedOnDiskReadBufferFromFile>(
+ object.remote_path,
+ cache_key,
+ file_cache,
+ DB::FileCache::getCommonUser(),
+ rbc,
+ modified_read_settings,
+ std::string(DB::CurrentThread::getQueryId()),
+ object.bytes_size,
+ /* allow_seeks */ !read_settings.remote_read_buffer_restrict_seek,
+ /* use_external_buffer */ true,
+ /* read_until_position */ std::nullopt,
+ context->getFilesystemCacheLog());
+ };
+}
+
+void ReadBufferBuilder::updateCaches(const String & key, const size_t &
last_modified_time, const size_t & file_size) const
+{
+ if (!file_cache)
+ return;
+
+ auto file_cache_key = DB::FileCacheKey::fromPath(key);
+ auto last_cache_time = files_cache_time_map.get(file_cache_key);
+ // quick check
+ if (last_cache_time != std::nullopt && last_cache_time.has_value())
+ {
+ auto & [cached_modified_time, cached_file_size] =
last_cache_time.value();
+ if (cached_modified_time < last_modified_time || cached_file_size !=
file_size)
+ files_cache_time_map.update_cache_time(file_cache_key,
last_modified_time, file_size, file_cache);
}
else
- file_cache->throwInitExceptionIfNeeded();
-
- return read_buffer_creator;
+ {
+ // if process restart, cache map will be empty,
+ // we recommend continuing to use caching instead of renew it
+ files_cache_time_map.insert(file_cache_key, last_modified_time,
file_size);
+ }
}
ReadBufferBuilderFactory & ReadBufferBuilderFactory::instance()
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h
b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h
index e3463e1b8c..5e63a93e3d 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h
@@ -18,10 +18,11 @@
#include <functional>
#include <memory>
+#include <Disks/ObjectStorages/StoredObject.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromFileBase.h>
-#include <Disks/ObjectStorages/StoredObject.h>
#include <substrait/plan.pb.h>
+#include <Common/FileCacheConcurrentMap.h>
namespace local_engine
@@ -44,14 +45,22 @@ public:
protected:
using ReadBufferCreator =
std::function<std::unique_ptr<DB::ReadBufferFromFileBase>(bool restricted_seek,
const DB::StoredObject & object)>;
- ReadBufferCreator
- wrapWithCache(ReadBufferCreator read_buffer_creator, DB::ReadSettings &
read_settings);
+ ReadBufferCreator wrapWithCache(
+ ReadBufferCreator read_buffer_creator,
+ DB::ReadSettings & read_settings,
+ const String & key,
+ const size_t & last_modified_time,
+ const size_t & file_size);
DB::ReadSettings getReadSettings() const;
DB::ContextPtr context;
+private:
+ void updateCaches(const String & key, const size_t & last_modified_time,
const size_t & file_size) const;
+
public:
DB::FileCachePtr file_cache = nullptr;
+ static FileCacheConcurrentMap files_cache_time_map;
};
using ReadBufferBuilderPtr = std::shared_ptr<ReadBufferBuilder>;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]