This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
     new 528ed60bcd Improved experience when remote object store URL does not end in / (#17364)
528ed60bcd is described below

commit 528ed60bcdf536f6b6b5c2bbbf0d085977d499ec
Author: Jensen <czjour...@163.com>
AuthorDate: Sat Sep 6 18:37:33 2025 +0800

    Improved experience when remote object store URL does not end in / (#17364)

    * Improved experience when remote object store URL does not end in /

    * refactor

    * fix code style

    * fix

    * fix

    * Simplify implementation, add more tests

    * tweak

    * Add comments

    * fix and test non existing

    ---------

    Co-authored-by: Andrew Lamb <and...@nerdnetworks.org>
---
 datafusion-cli/tests/cli_integration.rs            |  37 ++++
 .../snapshots/s3_url_fallback@s3_url_fallback.snap |  34 +++
 datafusion/datasource/src/url.rs                   | 232 +++++++++++++++++++--
 3 files changed, 284 insertions(+), 19 deletions(-)

diff --git a/datafusion-cli/tests/cli_integration.rs b/datafusion-cli/tests/cli_integration.rs
index 125771acb3..b92b0790ba 100644
--- a/datafusion-cli/tests/cli_integration.rs
+++ b/datafusion-cli/tests/cli_integration.rs
@@ -360,3 +360,40 @@ fn test_backtrace_output(#[case] query: &str) {
         stderr
     );
 }
+
+#[tokio::test]
+async fn test_s3_url_fallback() {
+    if env::var("TEST_STORAGE_INTEGRATION").is_err() {
+        eprintln!("Skipping external storages integration tests");
+        return;
+    }
+
+    let container = setup_minio_container().await;
+
+    let mut settings = make_settings();
+    settings.set_snapshot_suffix("s3_url_fallback");
+    let _bound = settings.bind_to_scope();
+
+    let port = container.get_host_port_ipv4(9000).await.unwrap();
+
+    // Create a table using a prefix path (without trailing slash)
+    // This should trigger the fallback logic where head() fails on the prefix
+    // and list() is used to discover the actual files
+    let input = r#"CREATE EXTERNAL TABLE partitioned_data
+STORED AS CSV
+LOCATION 's3://data/partitioned_csv'
+OPTIONS (
+    'format.has_header' 'false'
+);
+
+SELECT * FROM partitioned_data ORDER BY column_1, column_2 LIMIT 5;
+"#;
+
+    assert_cmd_snapshot!(cli()
+        .env_clear()
+        .env("AWS_ACCESS_KEY_ID", "TEST-DataFusionLogin")
+        .env("AWS_SECRET_ACCESS_KEY", "TEST-DataFusionPassword")
+        .env("AWS_ENDPOINT", format!("http://localhost:{port}"))
+        .env("AWS_ALLOW_HTTP", "true")
+        .pass_stdin(input));
+}
diff --git a/datafusion-cli/tests/snapshots/s3_url_fallback@s3_url_fallback.snap b/datafusion-cli/tests/snapshots/s3_url_fallback@s3_url_fallback.snap
new file mode 100644
index 0000000000..07036d041b
--- /dev/null
+++ b/datafusion-cli/tests/snapshots/s3_url_fallback@s3_url_fallback.snap
@@ -0,0 +1,34 @@
+---
+source: datafusion-cli/tests/cli_integration.rs
+info:
+  program: datafusion-cli
+  args: []
+  env:
+    AWS_ACCESS_KEY_ID: TEST-DataFusionLogin
+    AWS_ALLOW_HTTP: "true"
+    AWS_ENDPOINT: "http://localhost:32771"
+    AWS_SECRET_ACCESS_KEY: TEST-DataFusionPassword
+  stdin: "CREATE EXTERNAL TABLE partitioned_data\nSTORED AS CSV\nLOCATION 's3://data/partitioned_csv'\nOPTIONS (\n    'format.has_header' 'false'\n);\n\nSELECT * FROM partitioned_data ORDER BY column_1, column_2 LIMIT 5;\n"
+---
+success: true
+exit_code: 0
+----- stdout -----
+[CLI_VERSION]
+0 row(s) fetched.
+[ELAPSED]
+
++----------+----------+----------+
+| column_1 | column_2 | column_3 |
++----------+----------+----------+
+| 0        | 0        | true     |
+| 0        | 1        | false    |
+| 0        | 2        | true     |
+| 0        | 3        | false    |
+| 0        | 4        | true     |
++----------+----------+----------+
+5 row(s) fetched.
+[ELAPSED]
+
+\q
+
+----- stderr -----
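For context, the fallback this test exercises can be reproduced without MinIO. The
following sketch is not part of the commit: the mem:// scheme, bucket name, and file
contents are illustrative stand-ins, while SessionContext::register_object_store and
the object_store crate's InMemory store are existing APIs.

    use std::sync::Arc;

    use datafusion::prelude::*;
    use object_store::{memory::InMemory, path::Path, ObjectStore, PutPayload};
    use url::Url;

    #[tokio::main]
    async fn main() -> datafusion::error::Result<()> {
        // Stand-in for the MinIO bucket: one CSV object under the "data" prefix
        let store = Arc::new(InMemory::new());
        store
            .put(&Path::from("data/1.csv"), PutPayload::from_static(b"0,0,true\n"))
            .await?;

        let ctx = SessionContext::new();
        ctx.register_object_store(&Url::parse("mem://bucket").unwrap(), store);

        // No trailing slash after "data": head() fails on the bare prefix, and
        // the fallback added by this commit list()s the objects underneath it
        ctx.sql(
            "CREATE EXTERNAL TABLE t STORED AS CSV LOCATION 'mem://bucket/data' \
             OPTIONS ('format.has_header' 'false')",
        )
        .await?;
        ctx.sql("SELECT * FROM t").await?.show().await?;
        Ok(())
    }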
diff --git a/datafusion/datasource/src/url.rs b/datafusion/datasource/src/url.rs
index 1dc12f7d1d..c87b307c5f 100644
--- a/datafusion/datasource/src/url.rs
+++ b/datafusion/datasource/src/url.rs
@@ -242,25 +242,20 @@ impl ListingTableUrl {
     ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
         let exec_options = &ctx.config_options().execution;
         let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory;
-        // If the prefix is a file, use a head request, otherwise list
-        let list = match self.is_collection() {
-            true => match ctx.runtime_env().cache_manager.get_list_files_cache() {
-                None => store.list(Some(&self.prefix)),
-                Some(cache) => {
-                    if let Some(res) = cache.get(&self.prefix) {
-                        debug!("Hit list all files cache");
-                        futures::stream::iter(res.as_ref().clone().into_iter().map(Ok))
-                            .boxed()
-                    } else {
-                        let list_res = store.list(Some(&self.prefix));
-                        let vec = list_res.try_collect::<Vec<ObjectMeta>>().await?;
-                        cache.put(&self.prefix, Arc::new(vec.clone()));
-                        futures::stream::iter(vec.into_iter().map(Ok)).boxed()
-                    }
-                }
-            },
-            false => futures::stream::once(store.head(&self.prefix)).boxed(),
+
+        let list: BoxStream<'a, Result<ObjectMeta>> = if self.is_collection() {
+            list_with_cache(ctx, store, &self.prefix).await?
+        } else {
+            match store.head(&self.prefix).await {
+                Ok(meta) => futures::stream::once(async { Ok(meta) })
+                    .map_err(|e| DataFusionError::ObjectStore(Box::new(e)))
+                    .boxed(),
+                // If the head command fails, it is likely that the object doesn't exist.
+                // Retry as though it were a prefix (aka a collection)
+                Err(_) => list_with_cache(ctx, store, &self.prefix).await?,
+            }
         };
+
         Ok(list
             .try_filter(move |meta| {
                 let path = &meta.location;
@@ -268,7 +263,6 @@ impl ListingTableUrl {
                 let glob_match = self.contains(path, ignore_subdirectory);
                 futures::future::ready(extension_match && glob_match)
             })
-            .map_err(|e| DataFusionError::ObjectStore(Box::new(e)))
             .boxed())
     }
@@ -306,6 +300,33 @@ impl ListingTableUrl {
     }
 }
 
+async fn list_with_cache<'b>(
+    ctx: &'b dyn Session,
+    store: &'b dyn ObjectStore,
+    prefix: &'b Path,
+) -> Result<BoxStream<'b, Result<ObjectMeta>>> {
+    match ctx.runtime_env().cache_manager.get_list_files_cache() {
+        None => Ok(store
+            .list(Some(prefix))
+            .map(|res| res.map_err(|e| DataFusionError::ObjectStore(Box::new(e))))
+            .boxed()),
+        Some(cache) => {
+            let vec = if let Some(res) = cache.get(prefix) {
+                debug!("Hit list all files cache");
+                res.as_ref().clone()
+            } else {
+                let vec = store
+                    .list(Some(prefix))
+                    .try_collect::<Vec<ObjectMeta>>()
+                    .await?;
+                cache.put(prefix, Arc::new(vec.clone()));
+                vec
+            };
+            Ok(futures::stream::iter(vec.into_iter().map(Ok)).boxed())
+        }
+    }
+}
+
 /// Creates a file URL from a potentially relative filesystem path
 #[cfg(not(target_arch = "wasm32"))]
 fn url_from_filesystem_path(s: &str) -> Option<Url> {
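Stripped of the caching layer and DataFusion's stream types, the new control flow in
list_all_files reduces to the pattern below. This is a sketch against the object_store
crate only; the helper name resolve is hypothetical, and the real code routes both
branches through list_with_cache above.

    use futures::TryStreamExt;
    use object_store::{path::Path, ObjectMeta, ObjectStore};

    // Head-then-list fallback: try the URL as a single object first, and on
    // failure reinterpret it as a prefix (collection) and list underneath it.
    async fn resolve(
        store: &dyn ObjectStore,
        prefix: &Path,
    ) -> object_store::Result<Vec<ObjectMeta>> {
        match store.head(prefix).await {
            // The URL named one object: return just its metadata
            Ok(meta) => Ok(vec![meta]),
            // head() failed: list the prefix instead; a genuinely non-existent
            // path simply produces an empty listing rather than an error
            Err(_) => store.list(Some(prefix)).try_collect().await,
        }
    }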
@@ -384,6 +405,18 @@ fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use datafusion_common::config::TableOptions;
+    use datafusion_common::DFSchema;
+    use datafusion_execution::config::SessionConfig;
+    use datafusion_execution::runtime_env::RuntimeEnv;
+    use datafusion_execution::TaskContext;
+    use datafusion_expr::execution_props::ExecutionProps;
+    use datafusion_expr::{AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF};
+    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+    use datafusion_physical_plan::ExecutionPlan;
+    use object_store::PutPayload;
+    use std::any::Any;
+    use std::collections::HashMap;
     use tempfile::tempdir;
 
     #[test]
@@ -597,4 +630,165 @@ mod tests {
             "file path ends with .ext - extension is ext",
         );
     }
+
+    #[tokio::test]
+    async fn test_list_files() {
+        let store = object_store::memory::InMemory::new();
+        // Create some files:
+        create_file(&store, "a.parquet").await;
+        create_file(&store, "/t/b.parquet").await;
+        create_file(&store, "/t/c.csv").await;
+        create_file(&store, "/t/d.csv").await;
+
+        assert_eq!(
+            list_all_files("/", &store, "parquet").await,
+            vec!["a.parquet"],
+        );
+
+        // test with and without trailing slash
+        assert_eq!(
+            list_all_files("/t/", &store, "parquet").await,
+            vec!["t/b.parquet"],
+        );
+        assert_eq!(
+            list_all_files("/t", &store, "parquet").await,
+            vec!["t/b.parquet"],
+        );
+
+        // test with and without trailing slash
+        assert_eq!(
+            list_all_files("/t", &store, "csv").await,
+            vec!["t/c.csv", "t/d.csv"],
+        );
+        assert_eq!(
+            list_all_files("/t/", &store, "csv").await,
+            vec!["t/c.csv", "t/d.csv"],
+        );
+
+        // Test a non existing prefix
+        assert_eq!(
+            list_all_files("/NonExisting", &store, "csv").await,
+            vec![] as Vec<String>
+        );
+        assert_eq!(
+            list_all_files("/NonExisting/", &store, "csv").await,
+            vec![] as Vec<String>
+        );
+    }
+
+    /// Creates a file with "hello world" content at the specified path
+    async fn create_file(object_store: &dyn ObjectStore, path: &str) {
+        object_store
+            .put(&Path::from(path), PutPayload::from_static(b"hello world"))
+            .await
+            .expect("failed to create test file");
+    }
+
+    /// Runs "list_all_files" and returns their paths
+    ///
+    /// Panics on error
+    async fn list_all_files(
+        url: &str,
+        store: &dyn ObjectStore,
+        file_extension: &str,
+    ) -> Vec<String> {
+        try_list_all_files(url, store, file_extension)
+            .await
+            .unwrap()
+    }
+
+    /// Runs "list_all_files" and returns their paths
+    async fn try_list_all_files(
+        url: &str,
+        store: &dyn ObjectStore,
+        file_extension: &str,
+    ) -> Result<Vec<String>> {
+        let session = MockSession::new();
+        let url = ListingTableUrl::parse(url)?;
+        let files = url
+            .list_all_files(&session, store, file_extension)
+            .await?
+            .try_collect::<Vec<_>>()
+            .await?
+            .into_iter()
+            .map(|meta| meta.location.as_ref().to_string())
+            .collect();
+        Ok(files)
+    }
+
+    struct MockSession {
+        config: SessionConfig,
+        runtime_env: Arc<RuntimeEnv>,
+    }
+
+    impl MockSession {
+        fn new() -> Self {
+            Self {
+                config: SessionConfig::new(),
+                runtime_env: Arc::new(RuntimeEnv::default()),
+            }
+        }
+    }
+
+    #[async_trait::async_trait]
+    impl Session for MockSession {
+        fn session_id(&self) -> &str {
+            unimplemented!()
+        }
+
+        fn config(&self) -> &SessionConfig {
+            &self.config
+        }
+
+        async fn create_physical_plan(
+            &self,
+            _logical_plan: &LogicalPlan,
+        ) -> Result<Arc<dyn ExecutionPlan>> {
+            unimplemented!()
+        }
+
+        fn create_physical_expr(
+            &self,
+            _expr: Expr,
+            _df_schema: &DFSchema,
+        ) -> Result<Arc<dyn PhysicalExpr>> {
+            unimplemented!()
+        }
+
+        fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
+            unimplemented!()
+        }
+
+        fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
+            unimplemented!()
+        }
+
+        fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
+            unimplemented!()
+        }
+
+        fn runtime_env(&self) -> &Arc<RuntimeEnv> {
+            &self.runtime_env
+        }
+
+        fn execution_props(&self) -> &ExecutionProps {
+            unimplemented!()
+        }
+
+        fn as_any(&self) -> &dyn Any {
+            unimplemented!()
+        }
+
+        fn table_options(&self) -> &TableOptions {
+            unimplemented!()
+        }
+
+        fn table_options_mut(&mut self) -> &mut TableOptions {
+            unimplemented!()
+        }
+
+        fn task_ctx(&self) -> Arc<TaskContext> {
+            unimplemented!()
+        }
+    }
 }
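The unit tests above pin down the user-visible rule: a trailing slash always means
"list this prefix", while a missing slash now means "try head(), then fall back to
list()". A minimal illustration of the classification side, assuming the re-export
path from the main datafusion crate (is_collection() is the method the new branch
keys on):

    use datafusion::datasource::listing::ListingTableUrl;

    fn main() -> datafusion::error::Result<()> {
        // Trailing slash: always treated as a directory-like prefix
        let dir = ListingTableUrl::parse("s3://data/partitioned_csv/")?;
        assert!(dir.is_collection());

        // No trailing slash: head() is tried first; if it fails, the URL is
        // re-interpreted as a prefix and listed (the fallback in this commit)
        let ambiguous = ListingTableUrl::parse("s3://data/partitioned_csv")?;
        assert!(!ambiguous.is_collection());
        Ok(())
    }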