Ted-Jiang commented on code in PR #7570:
URL: https://github.com/apache/arrow-datafusion/pull/7570#discussion_r1327088091


##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::cache::CacheAccessor;
+use datafusion_common::{Result, Statistics};
+use object_store::path::Path;
+use object_store::ObjectMeta;
+use std::sync::Arc;
+
+pub type FileStaticCache = Arc<dyn CacheAccessor<Path, Statistics, Extra = 
ObjectMeta>>;
+
+#[derive(Default)]
+pub struct CacheManager {
+    file_statistic_cache: Option<FileStaticCache>,
+}
+
+impl CacheManager {
+    pub fn try_new(config: &CacheManagerConfig) -> Result<Arc<Self>> {
+        let mut manager = CacheManager::default();
+        if let Some(cc) = &config.table_files_statistics_cache {
+            manager.file_statistic_cache = Some(cc.clone())
+        }
+        Ok(Arc::new(manager))
+    }
+
+    pub fn get_file_statistic_cache(&self) -> Option<FileStaticCache> {
+        self.file_statistic_cache.clone()
+    }
+}
+
+#[derive(Clone, Default)]
+pub struct CacheManagerConfig {
+    /// Enable cache of files statistics when listing files.
+    /// Avoid get same file statistics repeatedly in same datafusion session.
+    /// Default is disable. Fow now only supports Parquet files.
+    pub table_files_statistics_cache: Option<FileStaticCache>,
+}
+
+impl CacheManagerConfig {
+    pub fn enable_table_files_statistics_cache(mut self, cache: 
FileStaticCache) -> Self {

Review Comment:
   I think crate users can construct their own `FileStaticCache` implementation here
to apply whatever caching policy they want 😄 
   
   



##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -510,39 +509,6 @@ impl ListingOptions {
     }
 }
 
-/// Collected statistics for files
-/// Cache is invalided when file size or last modification has changed
-#[derive(Default)]
-struct StatisticsCache {
-    statistics: DashMap<Path, (ObjectMeta, Statistics)>,
-}
-
-impl StatisticsCache {

Review Comment:
   Moved to the new cache module (`datafusion/execution/src/cache/cache_unit.rs`).



##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::cache::CacheAccessor;
+use datafusion_common::{Result, Statistics};
+use object_store::path::Path;
+use object_store::ObjectMeta;
+use std::sync::Arc;
+
+pub type FileStaticCache = Arc<dyn CacheAccessor<Path, Statistics, Extra = 
ObjectMeta>>;
+
+#[derive(Default)]
+pub struct CacheManager {
+    file_statistic_cache: Option<FileStaticCache>,
+}
+
+impl CacheManager {
+    pub fn try_new(config: &CacheManagerConfig) -> Result<Arc<Self>> {
+        let mut manager = CacheManager::default();
+        if let Some(cc) = &config.table_files_statistics_cache {
+            manager.file_statistic_cache = Some(cc.clone())
+        }
+        Ok(Arc::new(manager))
+    }
+
+    pub fn get_file_statistic_cache(&self) -> Option<FileStaticCache> {
+        self.file_statistic_cache.clone()
+    }
+}
+
+#[derive(Clone, Default)]
+pub struct CacheManagerConfig {
+    /// Enable cache of files statistics when listing files.
+    /// Avoid get same file statistics repeatedly in same datafusion session.
+    /// Default is disable. Fow now only supports Parquet files.
+    pub table_files_statistics_cache: Option<FileStaticCache>,
+}
+
+impl CacheManagerConfig {
+    pub fn enable_table_files_statistics_cache(mut self, cache: 
FileStaticCache) -> Self {

Review Comment:
   I think crate users can construct their own `FileStaticCache` implementation here
to apply whatever caching policy they want 😄 
   
   



##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -1092,6 +1092,95 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn load_table_stats_with_session_level_cache() -> Result<()> {
+        let testdata = crate::test_util::parquet_test_data();

Review Comment:
   Adds a test checking that the statistics cache is shared at the session level.



##########
datafusion/execution/src/cache/cache_unit.rs:
##########
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::cache::CacheAccessor;
+use dashmap::DashMap;
+use datafusion_common::Statistics;
+use object_store::path::Path;
+use object_store::ObjectMeta;
+
+/// Collected statistics for files
+/// Cache is invalided when file size or last modification has changed
+#[derive(Default)]
+pub struct FileStatisticsCache {
+    statistics: DashMap<Path, (ObjectMeta, Statistics)>,
+}
+
+impl CacheAccessor<Path, Statistics> for FileStatisticsCache {
+    type Extra = ObjectMeta;
+
+    /// Get `Statistics` for file location.
+    fn get(&self, k: &Path) -> Option<Statistics> {
+        self.statistics
+            .get(k)
+            .map(|s| Some(s.value().1.clone()))
+            .unwrap_or(None)
+    }
+
+    /// Get `Statistics` for file location. Returns None if file has changed 
or not found.
+    fn get_with_extra(&self, k: &Path, e: &Self::Extra) -> Option<Statistics> {
+        self.statistics
+            .get(k)
+            .map(|s| {
+                let (saved_meta, statistics) = s.value();
+                if saved_meta.size != e.size
+                    || saved_meta.last_modified != e.last_modified
+                {
+                    // file has changed
+                    None
+                } else {
+                    Some(statistics.clone())
+                }
+            })
+            .unwrap_or(None)
+    }
+
+    /// Save collected file statistics
+    fn put(&self, _key: &Path, _value: Statistics) -> Option<Statistics> {
+        panic!("Put cache in FileStatisticsCache without Extra not supported.")
+    }
+
+    fn put_with_extra(
+        &self,
+        key: &Path,
+        value: Statistics,
+        e: &Self::Extra,
+    ) -> Option<Statistics> {
+        self.statistics
+            .insert(key.clone(), (e.clone(), value))
+            .map(|x| x.1)
+    }
+
+    fn evict(&self, k: &Path) -> bool {
+        self.statistics.remove(k).is_some()
+    }
+
+    fn contains_key(&self, k: &Path) -> bool {
+        self.statistics.contains_key(k)
+    }
+
+    fn len(&self) -> usize {
+        self.statistics.len()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::cache::cache_unit::FileStatisticsCache;
+    use crate::cache::CacheAccessor;
+    use chrono::DateTime;
+    use datafusion_common::Statistics;
+    use object_store::path::Path;
+    use object_store::ObjectMeta;

Review Comment:
   The old tests were moved here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to