This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 528fdfb23f Partitioning fixes (#9207)
528fdfb23f is described below

commit 528fdfb23f975b0fa6ca911160db2d512e405c56
Author: Eric Sheppard <[email protected]>
AuthorDate: Thu Feb 15 03:28:57 2024 +1100

    Partitioning fixes (#9207)
    
    * remove hive partition segments from segments
    
    * add test
    
    * avoid allocating vec when there is no glob, remove unused variable
    
    * a few more small fixes
    
    * tests for standard cases and small refactor
---
 datafusion/core/src/datasource/listing/mod.rs | 56 ++++++++++++++++++++++++++-
 datafusion/core/src/datasource/listing/url.rs | 51 ++++++++++++------------
 2 files changed, 81 insertions(+), 26 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index e7583501f9..b8c279c8a7 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -139,8 +139,8 @@ mod tests {
     use datafusion_execution::object_store::{
         DefaultObjectStoreRegistry, ObjectStoreRegistry,
     };
-    use object_store::local::LocalFileSystem;
-    use std::sync::Arc;
+    use object_store::{local::LocalFileSystem, path::Path};
+    use std::{ops::Not, sync::Arc};
     use url::Url;
 
     #[test]
@@ -185,4 +185,56 @@ mod tests {
         let url = ListingTableUrl::parse("../").unwrap();
         sut.get_store(url.as_ref()).unwrap();
     }
+
+    #[test]
+    fn test_url_contains() {
+        let url = ListingTableUrl::parse("file:///var/data/mytable/").unwrap();
+
+        // standard case with default config
+        assert!(url.contains(
+            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
+            true
+        ));
+
+        // standard case with `ignore_subdirectory` set to false
+        assert!(url.contains(
+            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
+            false
+        ));
+
+        // as per documentation, when `ignore_subdirectory` is true, we should ignore files that aren't
+        // a direct child of the `url`
+        assert!(url
+            .contains(
+                &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
+                true
+            )
+            .not());
+
+        // when we set `ignore_subdirectory` to false, we should not ignore the file
+        assert!(url.contains(
+            &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
+            false
+        ));
+
+        // as above, `ignore_subdirectory` is false, so we include the file
+        assert!(url.contains(
+            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
+            false
+        ));
+
+        // in this case, we include the file even when `ignore_subdirectory` is true because the
+        // path segment is a hive partition which doesn't count as a subdirectory for the purposes
+        // of `Url::contains`
+        assert!(url.contains(
+            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
+            true
+        ));
+
+        // testing an empty path with default config
+        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), true));
+
+        // testing an empty path with `ignore_subdirectory` set to false
+        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), false));
+    }
 }
diff --git a/datafusion/core/src/datasource/listing/url.rs b/datafusion/core/src/datasource/listing/url.rs
index 6421edf779..d9149bcc20 100644
--- a/datafusion/core/src/datasource/listing/url.rs
+++ b/datafusion/core/src/datasource/listing/url.rs
@@ -27,6 +27,7 @@ use glob::Pattern;
 use itertools::Itertools;
 use log::debug;
 use object_store::path::Path;
+use object_store::path::DELIMITER;
 use object_store::{ObjectMeta, ObjectStore};
 use std::sync::Arc;
 use url::Url;
@@ -189,34 +190,37 @@ impl ListingTableUrl {
 
     /// Returns `true` if `path` matches this [`ListingTableUrl`]
     pub fn contains(&self, path: &Path, ignore_subdirectory: bool) -> bool {
-        match self.strip_prefix(path) {
-            Some(mut segments) => match &self.glob {
-                Some(glob) => {
-                    if ignore_subdirectory {
-                        segments
-                            .next()
-                            .map_or(false, |file_name| glob.matches(file_name))
-                    } else {
-                        let stripped = segments.join("/");
-                        glob.matches(&stripped)
-                    }
-                }
-                None => {
-                    if ignore_subdirectory {
-                        let has_subdirectory = segments.collect::<Vec<_>>().len() > 1;
-                        !has_subdirectory
-                    } else {
-                        true
-                    }
+        let Some(all_segments) = self.strip_prefix(path) else {
+            return false;
+        };
+
+        // remove any segments that contain `=` as they are allowed even
+        // when ignore subdirectories is `true`.
+        let mut segments = all_segments.filter(|s| !s.contains('='));
+
+        match &self.glob {
+            Some(glob) => {
+                if ignore_subdirectory {
+                    segments
+                        .next()
+                        .map_or(false, |file_name| glob.matches(file_name))
+                } else {
+                    let stripped = segments.join(DELIMITER);
+                    glob.matches(&stripped)
                 }
-            },
-            None => false,
+            }
+            // where we are ignoring subdirectories, we require
+            // the path to be either empty, or contain just the
+            // final file name segment.
+            None if ignore_subdirectory => segments.count() <= 1,
+            // in this case, any valid path at or below the url is allowed
+            None => true,
         }
     }
 
     /// Returns `true` if `path` refers to a collection of objects
     pub fn is_collection(&self) -> bool {
-        self.url.as_str().ends_with('/')
+        self.url.as_str().ends_with(DELIMITER)
     }
 
     /// Strips the prefix of this [`ListingTableUrl`] from the provided path, returning
@@ -225,7 +229,6 @@ impl ListingTableUrl {
         &'a self,
         path: &'b Path,
     ) -> Option<impl Iterator<Item = &'b str> + 'a> {
-        use object_store::path::DELIMITER;
         let mut stripped = path.as_ref().strip_prefix(self.prefix.as_ref())?;
         if !stripped.is_empty() && !self.prefix.as_ref().is_empty() {
             stripped = stripped.strip_prefix(DELIMITER)?;
@@ -234,7 +237,7 @@ impl ListingTableUrl {
     }
 
     /// List all files identified by this [`ListingTableUrl`] for the provided `file_extension`
-    pub(crate) async fn list_all_files<'a>(
+    pub async fn list_all_files<'a>(
         &'a self,
         ctx: &'a SessionState,
         store: &'a dyn ObjectStore,

Reply via email to