This is an automated email from the ASF dual-hosted git repository.

loneylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 54f5787a21 [GLUTEN-7542][CH] Fix cache not refresh (#7547)
54f5787a21 is described below

commit 54f5787a213e0e7388d1481da5742d1504c27979
Author: Shuai li <[email protected]>
AuthorDate: Wed Oct 16 16:04:50 2024 +0800

    [GLUTEN-7542][CH] Fix cache not refresh (#7547)
---
 .../execution/tpch/GlutenClickHouseHDFSSuite.scala |  50 ++++++++-
 .../local-engine/Common/FileCacheConcurrentMap.h   |  80 +++++++-------
 .../local-engine/Storages/Cache/CacheManager.cpp   |   3 +-
 .../Storages/SubstraitSource/ReadBufferBuilder.cpp | 122 ++++++++++++---------
 .../Storages/SubstraitSource/ReadBufferBuilder.h   |  15 ++-
 5 files changed, 169 insertions(+), 101 deletions(-)

diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
index 614780dbbd..e18242dde3 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
@@ -22,8 +22,11 @@ import org.apache.gluten.execution.{CHNativeCacheManager, 
FileSourceScanExecTran
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 
+import org.apache.commons.io.IOUtils
 import org.apache.hadoop.fs.Path
 
+import java.nio.charset.Charset
+
 class GlutenClickHouseHDFSSuite
   extends GlutenClickHouseTPCHAbstractSuite
   with AdaptiveSparkPlanHelper {
@@ -70,6 +73,11 @@ class GlutenClickHouseHDFSSuite
     deleteCache()
   }
 
+  override protected def afterAll(): Unit = {
+    deleteCache()
+    super.afterAll() // suite-level teardown must chain to the parent's afterAll, not afterEach
+  }
+
   private def deleteCache(): Unit = {
     val targetFile = new Path(tablesPath)
     val fs = targetFile.getFileSystem(spark.sessionState.newHadoopConf())
@@ -87,7 +95,6 @@ class GlutenClickHouseHDFSSuite
                 })
           }
         })
-    clearDataPath(hdfsCachePath)
   }
 
   val runWithoutCache: () => Unit = () => {
@@ -113,19 +120,19 @@ class GlutenClickHouseHDFSSuite
     }
   }
 
-  ignore("test hdfs cache") {
+  test("test hdfs cache") {
     runWithoutCache()
     runWithCache()
   }
 
-  ignore("test cache file command") {
+  test("test cache file command") {
     runSql(
       s"CACHE FILES select * from '$HDFS_URL_ENDPOINT/tpch-data/lineitem'",
       noFallBack = false) { _ => }
     runWithCache()
   }
 
-  ignore("test no cache by query") {
+  test("test no cache by query") {
     withSQLConf(
       
runtimeSettings("read_from_filesystem_cache_if_exists_otherwise_bypass_cache") 
-> "true") {
       runWithoutCache()
@@ -134,4 +141,39 @@ class GlutenClickHouseHDFSSuite
     runWithoutCache()
     runWithCache()
   }
+
+  test("GLUTEN-7542: Fix cache refresh") {
+    withSQLConf("spark.sql.hive.manageFilesourcePartitions" -> "false") {
+      val file_path = s"$tablesPath/issue_7542/"
+      val targetDirs = new Path(file_path)
+      val fs = targetDirs.getFileSystem(spark.sessionState.newHadoopConf())
+      fs.mkdirs(targetDirs)
+      val out = fs.create(new Path(s"$file_path/00000_0"))
+      IOUtils.write("1\n2\n3\n4\n5", out, Charset.defaultCharset())
+      out.close()
+      sql(s"""
+             |CREATE external TABLE `issue_7542`(
+             |  `c_custkey` int )
+             |using CSV
+             |LOCATION
+             |  '$file_path/'
+             |""".stripMargin)
+
+      sql(s"""select * from issue_7542""").collect()
+      fs.delete(new Path(s"$file_path/00000_0"), false)
+      val out2 = fs.create(new Path(s"$file_path/00000_0"))
+      IOUtils.write("1\n2\n3\n4\n3\n3\n3", out2, Charset.defaultCharset())
+      out2.close()
+      val df = sql(s"""select count(*) from issue_7542 where c_custkey=3""")
+      // refresh list file
+      collect(df.queryExecution.executedPlan) {
+        case scanExec: FileSourceScanExecTransformer => 
scanExec.relation.location.refresh()
+      }
+      val result = df.collect()
+      assert(result.length == 1)
+      assert(result.head.getLong(0) == 4)
+
+      sql("drop table issue_7542")
+    }
+  }
 }
diff --git a/cpp-ch/local-engine/Common/FileCacheConcurrentMap.h 
b/cpp-ch/local-engine/Common/FileCacheConcurrentMap.h
index cb692d3c6f..2e6c4434eb 100644
--- a/cpp-ch/local-engine/Common/FileCacheConcurrentMap.h
+++ b/cpp-ch/local-engine/Common/FileCacheConcurrentMap.h
@@ -32,78 +32,78 @@ using ReadLock = std::shared_lock<Lock>;
 class FileCacheConcurrentMap
 {
 public:
-    void insert(const DB::FileCacheKey & key, const Int64 value)
+    void insert(const DB::FileCacheKey & key, const size_t last_modified_time, const size_t file_size)
     {
         WriteLock wLock(rw_locker);
-        map.emplace(key, value);
+        // Record the first-seen (modified_time, size) pair; an existing entry is left untouched.
+        // try_emplace is a no-op when the key is already present, so no separate find() is needed.
+        map.try_emplace(key, last_modified_time, file_size);
     }
 
-    void
-    update_cache_time(const DB::FileCacheKey & key, const String & path, const Int64 accept_cache_time, const DB::FileCachePtr & file_cache)
+    void update_cache_time(
+        const DB::FileCacheKey & key, const size_t new_modified_time, const size_t new_file_size, const DB::FileCachePtr & file_cache)
     {
         WriteLock wLock(rw_locker);
-        // double check
         auto it = map.find(key);
         if (it != map.end())
         {
-            if (it->second < accept_cache_time)
+            const auto & [last_modified_time, file_size] = it->second;
+            if (last_modified_time < new_modified_time || file_size != new_file_size) // newer mtime or different size => stale
             {
                 // will delete cache file immediately
-                file_cache->removePathIfExists(path, DB::FileCache::getCommonUser().user_id);
+                file_cache->removeKeyIfExists(key, DB::FileCache::getCommonUser().user_id);
                 // update cache time
-                map[key] = accept_cache_time;
+                it->second = std::tuple(new_modified_time, new_file_size); // reuse the iterator; no second lookup
             }
         }
         else
         {
             // will delete cache file immediately
-            file_cache->removePathIfExists(path, DB::FileCache::getCommonUser().user_id);
+            file_cache->removeKeyIfExists(key, DB::FileCache::getCommonUser().user_id);
             // set cache time
-            map.emplace(key, accept_cache_time);
+            map.emplace(key, std::tuple(new_modified_time, new_file_size));
         }
     }
 
-    std::optional<Int64> get(const DB::FileCacheKey & key)
+    std::optional<std::tuple<size_t, size_t>> get(const DB::FileCacheKey & key)
     {
         ReadLock rLock(rw_locker);
         auto it = map.find(key);
         if (it == map.end())
-        {
             return std::nullopt;
-        }
         return it->second;
     }
 
-    bool contain(const DB::FileCacheKey & key)
-    {
-        ReadLock rLock(rw_locker);
-        return map.contains(key);
-    }
-
-    void erase(const DB::FileCacheKey & key)
-    {
-        WriteLock wLock(rw_locker);
-        if (map.find(key) == map.end())
-        {
-            return;
-        }
-        map.erase(key);
-    }
-
-    void clear()
-    {
-        WriteLock wLock(rw_locker);
-        map.clear();
-    }
+    // bool contain(const DB::FileCacheKey & key)
+    // {
+    //     ReadLock rLock(rw_locker);
+    //     return map.contains(key);
+    // }
 
-    size_t size() const
-    {
-        ReadLock rLock(rw_locker);
-        return map.size();
-    }
+    // void erase(const DB::FileCacheKey & key)
+    // {
+    //     WriteLock wLock(rw_locker);
+    //     if (map.find(key) == map.end())
+    //     {
+    //         return;
+    //     }
+    //     map.erase(key);
+    // }
+    //
+    // void clear()
+    // {
+    //     WriteLock wLock(rw_locker);
+    //     map.clear();
+    // }
+    //
+    // size_t size() const
+    // {
+    //     ReadLock rLock(rw_locker);
+    //     return map.size();
+    // }
 
 private:
-    std::unordered_map<DB::FileCacheKey, Int64> map;
+    std::unordered_map<DB::FileCacheKey, std::tuple<size_t, size_t>> map;
     mutable Lock rw_locker;
 };
 }
diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp 
b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
index 25726dc24f..bb688724c3 100644
--- a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
+++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
@@ -208,13 +208,14 @@ JobId 
CacheManager::cacheFiles(substrait::ReadRel::LocalFiles file_infos)
 {
     JobId id = toString(UUIDHelpers::generateV4());
     Job job(id);
+    DB::ReadSettings read_settings = context->getReadSettings();
 
     if (file_infos.items_size())
     {
         const Poco::URI file_uri(file_infos.items().Get(0).uri_file());
         const auto read_buffer_builder = 
ReadBufferBuilderFactory::instance().createBuilder(file_uri.getScheme(), 
context);
 
-        if (read_buffer_builder->file_cache)
+        if (context->getConfigRef().getBool("gluten_cache.local.enabled", 
false))
             for (const auto & file : file_infos.items())
                 job.addTask(cacheFile(file, read_buffer_builder));
         else
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp 
b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
index 3fa9863607..0e2d8f41be 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
@@ -48,7 +48,6 @@
 #include <Poco/Logger.h>
 #include <Poco/URI.h>
 #include <Common/CHUtil.h>
-#include <Common/FileCacheConcurrentMap.h>
 #include <Common/GlutenConfig.h>
 #include <Common/GlutenSettings.h>
 #include <Common/logger_useful.h>
@@ -86,6 +85,9 @@ namespace ErrorCodes
 
 namespace local_engine
 {
+
+FileCacheConcurrentMap ReadBufferBuilder::files_cache_time_map;
+
 template <class key_type, class value_type>
 class ConcurrentLRU
 {
@@ -246,8 +248,12 @@ public:
         }
 
         size_t file_size = 0;
+        size_t modified_time = 0;
         if (file_info.has_properties())
+        {
             file_size = file_info.properties().filesize();
+            modified_time = file_info.properties().modificationtime();
+        }
 
         std::unique_ptr<DB::ReadBuffer> read_buffer;
 
@@ -281,8 +287,9 @@ public:
                     hdfs_uri, hdfs_file_path, config, read_settings, 
read_util_position, true, object.bytes_size);
             };
 
-            DB::StoredObjects 
stored_objects{DB::StoredObject{file_uri.getPath().substr(1), "", file_size}};
-            auto cache_creator = wrapWithCache(hdfs_read_buffer_creator, 
read_settings);
+            auto remote_path = file_uri.getPath().substr(1);
+            DB::StoredObjects stored_objects{DB::StoredObject{remote_path, "", 
file_size}};
+            auto cache_creator = wrapWithCache(hdfs_read_buffer_creator, 
read_settings, remote_path, modified_time, file_size);
             auto cache_hdfs_read = 
std::make_unique<DB::ReadBufferFromRemoteFSGather>(
                 std::move(cache_creator), stored_objects, read_settings, 
nullptr, /* use_external_buffer */ false);
             cache_hdfs_read->setReadUntilPosition(read_util_position);
@@ -447,24 +454,6 @@ public:
         size_t object_size = object_info.size;
         Int64 object_modified_time = object_info.last_modification_time;
 
-        if (read_settings.enable_filesystem_cache)
-        {
-            auto file_cache_key = DB::FileCacheKey::fromPath(pathKey);
-            auto last_cache_time = files_cache_time_map.get(file_cache_key);
-            // quick check
-            if (last_cache_time != std::nullopt && last_cache_time.has_value())
-            {
-                if (last_cache_time.value() < object_modified_time*1000l) 
//second to milli second
-                {
-                    files_cache_time_map.update_cache_time(file_cache_key, 
pathKey, object_modified_time*1000l, file_cache);
-                }
-            }
-            else
-            {
-                files_cache_time_map.update_cache_time(file_cache_key, 
pathKey, object_modified_time*1000l, file_cache);
-            }
-        }
-
         auto read_buffer_creator
             = [bucket, client, read_settings, this](bool restricted_seek, 
const DB::StoredObject & object) -> std::unique_ptr<DB::ReadBufferFromFileBase>
         {
@@ -481,7 +470,7 @@ public:
                     restricted_seek);
         };
 
-        auto cache_creator = wrapWithCache(read_buffer_creator, read_settings);
+        auto cache_creator = wrapWithCache(read_buffer_creator, read_settings, 
pathKey, object_modified_time, object_size);
 
         DB::StoredObjects stored_objects{DB::StoredObject{pathKey, "", 
object_size}};
         auto s3_impl = std::make_unique<DB::ReadBufferFromRemoteFSGather>(
@@ -519,8 +508,6 @@ public:
 private:
     static const std::string SHARED_CLIENT_KEY;
     static ConcurrentLRU<std::string, std::shared_ptr<DB::S3::Client>> 
per_bucket_clients;
-    static FileCacheConcurrentMap files_cache_time_map;
-    DB::FileCachePtr file_cache;
 
     static std::string toBucketNameSetting(const std::string & bucket_name, 
const std::string & config_name)
     {
@@ -690,7 +677,6 @@ private:
 };
 const std::string S3FileReadBufferBuilder::SHARED_CLIENT_KEY = 
"___shared-client___";
 ConcurrentLRU<std::string, std::shared_ptr<DB::S3::Client>> 
S3FileReadBufferBuilder::per_bucket_clients(100);
-FileCacheConcurrentMap S3FileReadBufferBuilder::files_cache_time_map;
 
 #endif
 
@@ -762,11 +748,14 @@ ReadBufferBuilder::ReadBufferBuilder(DB::ContextPtr 
context_) : context(context_
 DB::ReadSettings ReadBufferBuilder::getReadSettings() const
 {
     DB::ReadSettings read_settings = context->getReadSettings();
-    read_settings.enable_filesystem_cache = false;
+    // Enable the filesystem cache only when the Gluten local cache is switched on;
+    // otherwise it is explicitly disabled for read settings produced here.
+    read_settings.enable_filesystem_cache
+        = context->getConfigRef().getBool("gluten_cache.local.enabled", false);
+
     return read_settings;
 }
 
-
 std::unique_ptr<DB::ReadBuffer>
 ReadBufferBuilder::buildWithCompressionWrapper(const 
substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool 
set_read_util_position)
 {
@@ -778,8 +767,12 @@ ReadBufferBuilder::buildWithCompressionWrapper(const 
substrait::ReadRel::LocalFi
     return compression != DB::CompressionMethod::None ? 
DB::wrapReadBufferWithCompressionMethod(std::move(in), compression) : 
std::move(in);
 }
 
-ReadBufferBuilder::ReadBufferCreator
-ReadBufferBuilder::wrapWithCache(ReadBufferCreator read_buffer_creator, 
DB::ReadSettings & read_settings)
+ReadBufferBuilder::ReadBufferCreator ReadBufferBuilder::wrapWithCache(
+    ReadBufferCreator read_buffer_creator,
+    DB::ReadSettings & read_settings,
+    const String & key,
+    const size_t & last_modified_time,
+    const size_t & file_size)
 {
     const auto & config = context->getConfigRef();
     if (!config.getBool("gluten_cache.local.enabled", false))
@@ -802,34 +795,57 @@ ReadBufferBuilder::wrapWithCache(ReadBufferCreator 
read_buffer_creator, DB::Read
         file_cache->initialize();
     }
 
-    if (file_cache->isInitialized())
+    if (!file_cache->isInitialized())
     {
-        return [read_buffer_creator, read_settings, this](
+        file_cache->throwInitExceptionIfNeeded();
+        return read_buffer_creator;
+    }
+
+    updateCaches(key, last_modified_time, file_size);
+
+    return [read_buffer_creator, read_settings, this](
                    bool restricted_seek, const DB::StoredObject & object) -> 
std::unique_ptr<DB::ReadBufferFromFileBase>
-        {
-            auto cache_key = DB::FileCacheKey::fromPath(object.remote_path);
-            auto modified_read_settings = read_settings.withNestedBuffer();
-            auto rbc = [=, this]() { return 
read_buffer_creator(restricted_seek, object); };
-
-            return std::make_unique<DB::CachedOnDiskReadBufferFromFile>(
-                object.remote_path,
-                cache_key,
-                file_cache,
-                DB::FileCache::getCommonUser(),
-                rbc,
-                modified_read_settings,
-                std::string(DB::CurrentThread::getQueryId()),
-                object.bytes_size,
-                /* allow_seeks */ 
!read_settings.remote_read_buffer_restrict_seek,
-                /* use_external_buffer */ true,
-                /* read_until_position */ std::nullopt,
-                context->getFilesystemCacheLog());
-        };
+    {
+        auto cache_key = DB::FileCacheKey::fromPath(object.remote_path);
+        auto modified_read_settings = read_settings.withNestedBuffer();
+        auto rbc = [=, this]() { return read_buffer_creator(restricted_seek, 
object); };
+
+        return std::make_unique<DB::CachedOnDiskReadBufferFromFile>(
+            object.remote_path,
+            cache_key,
+            file_cache,
+            DB::FileCache::getCommonUser(),
+            rbc,
+            modified_read_settings,
+            std::string(DB::CurrentThread::getQueryId()),
+            object.bytes_size,
+            /* allow_seeks */ !read_settings.remote_read_buffer_restrict_seek,
+            /* use_external_buffer */ true,
+            /* read_until_position */ std::nullopt,
+            context->getFilesystemCacheLog());
+    };
+}
+
+void ReadBufferBuilder::updateCaches(const String & key, const size_t & last_modified_time, const size_t & file_size) const
+{
+    if (!file_cache)
+        return;
+
+    auto file_cache_key = DB::FileCacheKey::fromPath(key);
+    auto last_cache_time = files_cache_time_map.get(file_cache_key);
+    // quick check under the read lock; update_cache_time re-checks under its write lock
+    if (last_cache_time.has_value())
+    {
+        const auto & [cached_modified_time, cached_file_size] = last_cache_time.value();
+        if (cached_modified_time < last_modified_time || cached_file_size != file_size)
+            files_cache_time_map.update_cache_time(file_cache_key, last_modified_time, file_size, file_cache);
     }
     else
-        file_cache->throwInitExceptionIfNeeded();
-
-    return read_buffer_creator;
+    {
+        // After a process restart the map is empty: keep using whatever is already cached
+        //   instead of invalidating it, and just record the current file metadata.
+        files_cache_time_map.insert(file_cache_key, last_modified_time, file_size);
+    }
 }
 
 ReadBufferBuilderFactory & ReadBufferBuilderFactory::instance()
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h 
b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h
index e3463e1b8c..5e63a93e3d 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h
@@ -18,10 +18,11 @@
 
 #include <functional>
 #include <memory>
+#include <Disks/ObjectStorages/StoredObject.h>
 #include <IO/ReadBuffer.h>
 #include <IO/ReadBufferFromFileBase.h>
-#include <Disks/ObjectStorages/StoredObject.h>
 #include <substrait/plan.pb.h>
+#include <Common/FileCacheConcurrentMap.h>
 
 
 namespace local_engine
@@ -44,14 +45,22 @@ public:
 protected:
     using ReadBufferCreator = 
std::function<std::unique_ptr<DB::ReadBufferFromFileBase>(bool restricted_seek, 
const DB::StoredObject & object)>;
 
-    ReadBufferCreator
-    wrapWithCache(ReadBufferCreator read_buffer_creator, DB::ReadSettings & 
read_settings);
+    ReadBufferCreator wrapWithCache(
+        ReadBufferCreator read_buffer_creator,
+        DB::ReadSettings & read_settings,
+        const String & key,
+        const size_t & last_modified_time,
+        const size_t & file_size);
 
     DB::ReadSettings getReadSettings() const;
     DB::ContextPtr context;
 
+private:
+    void updateCaches(const String & key, const size_t & last_modified_time, 
const size_t & file_size) const;
+
 public:
     DB::FileCachePtr file_cache = nullptr;
+    static FileCacheConcurrentMap files_cache_time_map;
 };
 
 using ReadBufferBuilderPtr = std::shared_ptr<ReadBufferBuilder>;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to