Ted-Jiang commented on code in PR #7620:
URL: https://github.com/apache/arrow-datafusion/pull/7620#discussion_r1334179142
##########
datafusion/core/tests/parquet/file_statistics.rs:
##########
@@ -19,98 +19,190 @@ use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
+use datafusion::datasource::physical_plan::ParquetExec;
use datafusion::datasource::TableProvider;
use datafusion::execution::context::SessionState;
use datafusion::prelude::SessionContext;
use datafusion_execution::cache::cache_manager::CacheManagerConfig;
use datafusion_execution::cache::cache_unit;
-use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
+use datafusion_execution::cache::cache_unit::{
+ DefaultFileStatisticsCache, DefaultListFilesCache,
+};
use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use std::fs;
use std::sync::Arc;
+use tempfile::tempdir;
#[tokio::test]
async fn load_table_stats_with_session_level_cache() {
let testdata = datafusion::test_util::parquet_test_data();
let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
let table_path = ListingTableUrl::parse(filename).unwrap();
- let (cache1, state1) = get_cache_runtime_state();
+ let (cache1, _, state1) = get_cache_runtime_state();
// Create a separate DefaultFileStatisticsCache
- let (cache2, state2) = get_cache_runtime_state();
+ let (cache2, _, state2) = get_cache_runtime_state();
let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
- let table1 = get_listing_with_cache(&table_path, cache1, &state1,
&opt).await;
- let table2 = get_listing_with_cache(&table_path, cache2, &state2,
&opt).await;
+ let table1 = get_listing_table(&table_path, Some(cache1), &opt).await;
+ let table2 = get_listing_table(&table_path, Some(cache2), &opt).await;
//Session 1 first time list files
- assert_eq!(get_cache_size(&state1), 0);
+ assert_eq!(get_static_cache_size(&state1), 0);
let exec1 = table1.scan(&state1, None, &[], None).await.unwrap();
assert_eq!(exec1.statistics().num_rows, Some(8));
assert_eq!(exec1.statistics().total_byte_size, Some(671));
- assert_eq!(get_cache_size(&state1), 1);
+ assert_eq!(get_static_cache_size(&state1), 1);
//Session 2 first time list files
//check session 1 cache result not show in session 2
- assert_eq!(
- state2
- .runtime_env()
- .cache_manager
- .get_file_statistic_cache()
- .unwrap()
- .len(),
- 0
- );
+ assert_eq!(get_static_cache_size(&state2), 0);
let exec2 = table2.scan(&state2, None, &[], None).await.unwrap();
assert_eq!(exec2.statistics().num_rows, Some(8));
assert_eq!(exec2.statistics().total_byte_size, Some(671));
- assert_eq!(get_cache_size(&state2), 1);
+ assert_eq!(get_static_cache_size(&state2), 1);
//Session 1 second time list files
//check session 1 cache result not show in session 2
- assert_eq!(get_cache_size(&state1), 1);
+ assert_eq!(get_static_cache_size(&state1), 1);
let exec3 = table1.scan(&state1, None, &[], None).await.unwrap();
assert_eq!(exec3.statistics().num_rows, Some(8));
assert_eq!(exec3.statistics().total_byte_size, Some(671));
// List same file no increase
- assert_eq!(get_cache_size(&state1), 1);
+ assert_eq!(get_static_cache_size(&state1), 1);
}
-async fn get_listing_with_cache(
+#[tokio::test]
+async fn list_files_with_session_level_cache() {
+ let p_name = "alltypes_plain.parquet";
+ let testdata = datafusion::test_util::parquet_test_data();
+ let filename = format!("{}/{}", testdata, p_name);
+
+ let temp_path1 = tempdir()
+ .unwrap()
+ .into_path()
+ .into_os_string()
+ .into_string()
+ .unwrap();
+ let temp_filename1 = format!("{}/{}", temp_path1, p_name);
+
+ let temp_path2 = tempdir()
+ .unwrap()
+ .into_path()
+ .into_os_string()
+ .into_string()
+ .unwrap();
+ let temp_filename2 = format!("{}/{}", temp_path2, p_name);
+
+ fs::copy(filename.clone(), temp_filename1).expect("panic");
+ fs::copy(filename, temp_filename2).expect("panic");
+
+ let table_path = ListingTableUrl::parse(temp_path1).unwrap();
+
+ let (_, _, state1) = get_cache_runtime_state();
+
+ // Create a separate DefaultFileStatisticsCache
+ let (_, _, state2) = get_cache_runtime_state();
+
+ let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
+
+ let table1 = get_listing_table(&table_path, None, &opt).await;
+ let table2 = get_listing_table(&table_path, None, &opt).await;
+
+ //Session 1 first time list files
+ assert_eq!(get_list_file_cache_size(&state1), 0);
+ let exec1 = table1.scan(&state1, None, &[], None).await.unwrap();
+ let parquet1 = exec1.as_any().downcast_ref::<ParquetExec>().unwrap();
+
+ assert_eq!(get_list_file_cache_size(&state1), 1);
+ let fg = &parquet1.base_config().file_groups;
+ assert_eq!(fg.len(), 1);
+ assert_eq!(fg.get(0).unwrap().len(), 1);
+
+ //Session 2 first time list files
+ //check session 1 cache result not show in session 2
+ assert_eq!(get_list_file_cache_size(&state2), 0);
+ let exec2 = table2.scan(&state2, None, &[], None).await.unwrap();
+ let parquet2 = exec2.as_any().downcast_ref::<ParquetExec>().unwrap();
+
+ assert_eq!(get_list_file_cache_size(&state2), 1);
+ let fg2 = &parquet2.base_config().file_groups;
+ assert_eq!(fg2.len(), 1);
+ assert_eq!(fg2.get(0).unwrap().len(), 1);
+
+ //Session 1 second time list files
+ //check session 1 cache result not show in session 2
+ assert_eq!(get_list_file_cache_size(&state1), 1);
+ let exec3 = table1.scan(&state1, None, &[], None).await.unwrap();
+ let parquet3 = exec3.as_any().downcast_ref::<ParquetExec>().unwrap();
+
+ assert_eq!(get_list_file_cache_size(&state1), 1);
+ let fg = &parquet3.base_config().file_groups;
+ assert_eq!(fg.len(), 1);
+ assert_eq!(fg.get(0).unwrap().len(), 1);
+ // List same file no increase
+ assert_eq!(get_list_file_cache_size(&state1), 1);
+}
+
+async fn get_listing_table(
table_path: &ListingTableUrl,
- cache1: Arc<DefaultFileStatisticsCache>,
- state1: &SessionState,
+ static_cache: Option<Arc<DefaultFileStatisticsCache>>,
opt: &ListingOptions,
) -> ListingTable {
- let schema = opt.infer_schema(state1, table_path).await.unwrap();
+ let schema = opt
+ .infer_schema(&SessionState::default(), table_path)
Review Comment:
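`infer_schema` also calls `list_files`, so a fresh `SessionState` is created here.
Otherwise, schema inference would populate the list-files cache of the session under test before the test's first cache-size assertion.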
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]