This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 528ed60bcd Improved experience when remote object store URL does not end in /  (#17364)
528ed60bcd is described below

commit 528ed60bcdf536f6b6b5c2bbbf0d085977d499ec
Author: Jensen <czjour...@163.com>
AuthorDate: Sat Sep 6 18:37:33 2025 +0800

    Improved experience when remote object store URL does not end in /  (#17364)
    
    * Improved experience when remote object store URL does not end in /
    
    * refactor
    
    * fix code style
    
    * fix
    
    * fix
    
    * Simplify implementation, add more tests
    
    * tweak
    
    * Add comments
    
    * fix and test non existing
    
    ---------
    
    Co-authored-by: Andrew Lamb <and...@nerdnetworks.org>
---
 datafusion-cli/tests/cli_integration.rs            |  37 ++++
 .../snapshots/s3_url_fallback@s3_url_fallback.snap |  34 +++
 datafusion/datasource/src/url.rs                   | 232 +++++++++++++++++++--
 3 files changed, 284 insertions(+), 19 deletions(-)

diff --git a/datafusion-cli/tests/cli_integration.rs b/datafusion-cli/tests/cli_integration.rs
index 125771acb3..b92b0790ba 100644
--- a/datafusion-cli/tests/cli_integration.rs
+++ b/datafusion-cli/tests/cli_integration.rs
@@ -360,3 +360,40 @@ fn test_backtrace_output(#[case] query: &str) {
         stderr
     );
 }
+
+#[tokio::test]
+async fn test_s3_url_fallback() {
+    if env::var("TEST_STORAGE_INTEGRATION").is_err() {
+        eprintln!("Skipping external storages integration tests");
+        return;
+    }
+
+    let container = setup_minio_container().await;
+
+    let mut settings = make_settings();
+    settings.set_snapshot_suffix("s3_url_fallback");
+    let _bound = settings.bind_to_scope();
+
+    let port = container.get_host_port_ipv4(9000).await.unwrap();
+
+    // Create a table using a prefix path (without trailing slash)
+    // This should trigger the fallback logic where head() fails on the prefix
+    // and list() is used to discover the actual files
+    let input = r#"CREATE EXTERNAL TABLE partitioned_data
+STORED AS CSV
+LOCATION 's3://data/partitioned_csv'
+OPTIONS (
+    'format.has_header' 'false'
+);
+
+SELECT * FROM partitioned_data ORDER BY column_1, column_2 LIMIT 5;
+"#;
+
+    assert_cmd_snapshot!(cli()
+        .env_clear()
+        .env("AWS_ACCESS_KEY_ID", "TEST-DataFusionLogin")
+        .env("AWS_SECRET_ACCESS_KEY", "TEST-DataFusionPassword")
+        .env("AWS_ENDPOINT", format!("http://localhost:{port}"))
+        .env("AWS_ALLOW_HTTP", "true")
+        .pass_stdin(input));
+}
diff --git a/datafusion-cli/tests/snapshots/s3_url_fallback@s3_url_fallback.snap b/datafusion-cli/tests/snapshots/s3_url_fallback@s3_url_fallback.snap
new file mode 100644
index 0000000000..07036d041b
--- /dev/null
+++ b/datafusion-cli/tests/snapshots/s3_url_fallback@s3_url_fallback.snap
@@ -0,0 +1,34 @@
+---
+source: datafusion-cli/tests/cli_integration.rs
+info:
+  program: datafusion-cli
+  args: []
+  env:
+    AWS_ACCESS_KEY_ID: TEST-DataFusionLogin
+    AWS_ALLOW_HTTP: "true"
+    AWS_ENDPOINT: "http://localhost:32771"
+    AWS_SECRET_ACCESS_KEY: TEST-DataFusionPassword
+  stdin: "CREATE EXTERNAL TABLE partitioned_data\nSTORED AS CSV\nLOCATION 's3://data/partitioned_csv'\nOPTIONS (\n    'format.has_header' 'false'\n);\n\nSELECT * FROM partitioned_data ORDER BY column_1, column_2 LIMIT 5;\n"
+---
+success: true
+exit_code: 0
+----- stdout -----
+[CLI_VERSION]
+0 row(s) fetched. 
+[ELAPSED]
+
++----------+----------+----------+
+| column_1 | column_2 | column_3 |
++----------+----------+----------+
+| 0        | 0        | true     |
+| 0        | 1        | false    |
+| 0        | 2        | true     |
+| 0        | 3        | false    |
+| 0        | 4        | true     |
++----------+----------+----------+
+5 row(s) fetched. 
+[ELAPSED]
+
+\q
+
+----- stderr -----
diff --git a/datafusion/datasource/src/url.rs b/datafusion/datasource/src/url.rs
index 1dc12f7d1d..c87b307c5f 100644
--- a/datafusion/datasource/src/url.rs
+++ b/datafusion/datasource/src/url.rs
@@ -242,25 +242,20 @@ impl ListingTableUrl {
     ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
         let exec_options = &ctx.config_options().execution;
         let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory;
-        // If the prefix is a file, use a head request, otherwise list
-        let list = match self.is_collection() {
-            true => match ctx.runtime_env().cache_manager.get_list_files_cache() {
-                None => store.list(Some(&self.prefix)),
-                Some(cache) => {
-                    if let Some(res) = cache.get(&self.prefix) {
-                        debug!("Hit list all files cache");
-                        futures::stream::iter(res.as_ref().clone().into_iter().map(Ok))
-                            .boxed()
-                    } else {
-                        let list_res = store.list(Some(&self.prefix));
-                        let vec = list_res.try_collect::<Vec<ObjectMeta>>().await?;
-                        cache.put(&self.prefix, Arc::new(vec.clone()));
-                        futures::stream::iter(vec.into_iter().map(Ok)).boxed()
-                    }
-                }
-            },
-            false => futures::stream::once(store.head(&self.prefix)).boxed(),
+
+        let list: BoxStream<'a, Result<ObjectMeta>> = if self.is_collection() {
+            list_with_cache(ctx, store, &self.prefix).await?
+        } else {
+            match store.head(&self.prefix).await {
+                Ok(meta) => futures::stream::once(async { Ok(meta) })
+                    .map_err(|e| DataFusionError::ObjectStore(Box::new(e)))
+                    .boxed(),
+                // If the head command fails, it is likely that object doesn't exist.
+                // Retry as though it were a prefix (aka a collection)
+                Err(_) => list_with_cache(ctx, store, &self.prefix).await?,
+            }
         };
+
         Ok(list
             .try_filter(move |meta| {
                 let path = &meta.location;
@@ -268,7 +263,6 @@ impl ListingTableUrl {
                 let glob_match = self.contains(path, ignore_subdirectory);
                 futures::future::ready(extension_match && glob_match)
             })
-            .map_err(|e| DataFusionError::ObjectStore(Box::new(e)))
             .boxed())
     }
 
@@ -306,6 +300,33 @@ impl ListingTableUrl {
     }
 }
 
+async fn list_with_cache<'b>(
+    ctx: &'b dyn Session,
+    store: &'b dyn ObjectStore,
+    prefix: &'b Path,
+) -> Result<BoxStream<'b, Result<ObjectMeta>>> {
+    match ctx.runtime_env().cache_manager.get_list_files_cache() {
+        None => Ok(store
+            .list(Some(prefix))
+            .map(|res| res.map_err(|e| DataFusionError::ObjectStore(Box::new(e))))
+            .boxed()),
+        Some(cache) => {
+            let vec = if let Some(res) = cache.get(prefix) {
+                debug!("Hit list all files cache");
+                res.as_ref().clone()
+            } else {
+                let vec = store
+                    .list(Some(prefix))
+                    .try_collect::<Vec<ObjectMeta>>()
+                    .await?;
+                cache.put(prefix, Arc::new(vec.clone()));
+                vec
+            };
+            Ok(futures::stream::iter(vec.into_iter().map(Ok)).boxed())
+        }
+    }
+}
+
 /// Creates a file URL from a potentially relative filesystem path
 #[cfg(not(target_arch = "wasm32"))]
 fn url_from_filesystem_path(s: &str) -> Option<Url> {
@@ -384,6 +405,18 @@ fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use datafusion_common::config::TableOptions;
+    use datafusion_common::DFSchema;
+    use datafusion_execution::config::SessionConfig;
+    use datafusion_execution::runtime_env::RuntimeEnv;
+    use datafusion_execution::TaskContext;
+    use datafusion_expr::execution_props::ExecutionProps;
+    use datafusion_expr::{AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF};
+    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+    use datafusion_physical_plan::ExecutionPlan;
+    use object_store::PutPayload;
+    use std::any::Any;
+    use std::collections::HashMap;
     use tempfile::tempdir;
 
     #[test]
@@ -597,4 +630,165 @@ mod tests {
             "file path ends with .ext - extension is ext",
         );
     }
+
+    #[tokio::test]
+    async fn test_list_files() {
+        let store = object_store::memory::InMemory::new();
+        // Create some files:
+        create_file(&store, "a.parquet").await;
+        create_file(&store, "/t/b.parquet").await;
+        create_file(&store, "/t/c.csv").await;
+        create_file(&store, "/t/d.csv").await;
+
+        assert_eq!(
+            list_all_files("/", &store, "parquet").await,
+            vec!["a.parquet"],
+        );
+
+        // test with and without trailing slash
+        assert_eq!(
+            list_all_files("/t/", &store, "parquet").await,
+            vec!["t/b.parquet"],
+        );
+        assert_eq!(
+            list_all_files("/t", &store, "parquet").await,
+            vec!["t/b.parquet"],
+        );
+
+        // test with and without trailing slash
+        assert_eq!(
+            list_all_files("/t", &store, "csv").await,
+            vec!["t/c.csv", "t/d.csv"],
+        );
+        assert_eq!(
+            list_all_files("/t/", &store, "csv").await,
+            vec!["t/c.csv", "t/d.csv"],
+        );
+
+        // Test a non existing prefix
+        assert_eq!(
+            list_all_files("/NonExisting", &store, "csv").await,
+            vec![] as Vec<String>
+        );
+        assert_eq!(
+            list_all_files("/NonExisting/", &store, "csv").await,
+            vec![] as Vec<String>
+        );
+    }
+
+    /// Creates a file with "hello world" content at the specified path
+    async fn create_file(object_store: &dyn ObjectStore, path: &str) {
+        object_store
+            .put(&Path::from(path), PutPayload::from_static(b"hello world"))
+            .await
+            .expect("failed to create test file");
+    }
+
+    /// Runs "list_all_files" and returns their paths
+    ///
+    /// Panic's on error
+    async fn list_all_files(
+        url: &str,
+        store: &dyn ObjectStore,
+        file_extension: &str,
+    ) -> Vec<String> {
+        try_list_all_files(url, store, file_extension)
+            .await
+            .unwrap()
+    }
+
+    /// Runs "list_all_files" and returns their paths
+    async fn try_list_all_files(
+        url: &str,
+        store: &dyn ObjectStore,
+        file_extension: &str,
+    ) -> Result<Vec<String>> {
+        let session = MockSession::new();
+        let url = ListingTableUrl::parse(url)?;
+        let files = url
+            .list_all_files(&session, store, file_extension)
+            .await?
+            .try_collect::<Vec<_>>()
+            .await?
+            .into_iter()
+            .map(|meta| meta.location.as_ref().to_string())
+            .collect();
+        Ok(files)
+    }
+
+    struct MockSession {
+        config: SessionConfig,
+        runtime_env: Arc<RuntimeEnv>,
+    }
+
+    impl MockSession {
+        fn new() -> Self {
+            Self {
+                config: SessionConfig::new(),
+                runtime_env: Arc::new(RuntimeEnv::default()),
+            }
+        }
+    }
+
+    #[async_trait::async_trait]
+    impl Session for MockSession {
+        fn session_id(&self) -> &str {
+            unimplemented!()
+        }
+
+        fn config(&self) -> &SessionConfig {
+            &self.config
+        }
+
+        async fn create_physical_plan(
+            &self,
+            _logical_plan: &LogicalPlan,
+        ) -> Result<Arc<dyn ExecutionPlan>> {
+            unimplemented!()
+        }
+
+        fn create_physical_expr(
+            &self,
+            _expr: Expr,
+            _df_schema: &DFSchema,
+        ) -> Result<Arc<dyn PhysicalExpr>> {
+            unimplemented!()
+        }
+
+        fn scalar_functions(&self) -> &HashMap<String, Arc<ScalarUDF>> {
+            unimplemented!()
+        }
+
+        fn aggregate_functions(&self) -> &HashMap<String, Arc<AggregateUDF>> {
+            unimplemented!()
+        }
+
+        fn window_functions(&self) -> &HashMap<String, Arc<WindowUDF>> {
+            unimplemented!()
+        }
+
+        fn runtime_env(&self) -> &Arc<RuntimeEnv> {
+            &self.runtime_env
+        }
+
+        fn execution_props(&self) -> &ExecutionProps {
+            unimplemented!()
+        }
+
+        fn as_any(&self) -> &dyn Any {
+            unimplemented!()
+        }
+
+        fn table_options(&self) -> &TableOptions {
+            unimplemented!()
+        }
+
+        fn table_options_mut(&mut self) -> &mut TableOptions {
+            unimplemented!()
+        }
+
+        fn task_ctx(&self) -> Arc<TaskContext> {
+            unimplemented!()
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to