This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 528fdfb23f Partitioning fixes (#9207)
528fdfb23f is described below
commit 528fdfb23f975b0fa6ca911160db2d512e405c56
Author: Eric Sheppard <[email protected]>
AuthorDate: Thu Feb 15 03:28:57 2024 +1100
Partitioning fixes (#9207)
* remove hive partition segments from segments
* add test
* avoid allocating vec when there is no glob, remove unused variable
* a few more small fixes
* tests for standard cases and small refactor
---
datafusion/core/src/datasource/listing/mod.rs | 56 ++++++++++++++++++++++++++-
datafusion/core/src/datasource/listing/url.rs | 51 ++++++++++++------------
2 files changed, 81 insertions(+), 26 deletions(-)
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index e7583501f9..b8c279c8a7 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -139,8 +139,8 @@ mod tests {
use datafusion_execution::object_store::{
DefaultObjectStoreRegistry, ObjectStoreRegistry,
};
- use object_store::local::LocalFileSystem;
- use std::sync::Arc;
+ use object_store::{local::LocalFileSystem, path::Path};
+ use std::{ops::Not, sync::Arc};
use url::Url;
#[test]
@@ -185,4 +185,56 @@ mod tests {
let url = ListingTableUrl::parse("../").unwrap();
sut.get_store(url.as_ref()).unwrap();
}
+
+ #[test]
+ fn test_url_contains() {
+ let url = ListingTableUrl::parse("file:///var/data/mytable/").unwrap();
+
+ // standard case with default config
+ assert!(url.contains(
+ &Path::parse("/var/data/mytable/data.parquet").unwrap(),
+ true
+ ));
+
+ // standard case with `ignore_subdirectory` set to false
+ assert!(url.contains(
+ &Path::parse("/var/data/mytable/data.parquet").unwrap(),
+ false
+ ));
+
+ // as per documentation, when `ignore_subdirectory` is true, we should ignore files that aren't
+ // a direct child of the `url`
+ assert!(url
+ .contains(
+ &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
+ true
+ )
+ .not());
+
+ // when we set `ignore_subdirectory` to false, we should not ignore the file
+ assert!(url.contains(
+ &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
+ false
+ ));
+
+ // as above, `ignore_subdirectory` is false, so we include the file
+ assert!(url.contains(
+ &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
+ false
+ ));
+
+ // in this case, we include the file even when `ignore_subdirectory` is true because the
+ // path segment is a hive partition which doesn't count as a subdirectory for the purposes
+ // of `Url::contains`
+ assert!(url.contains(
+ &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
+ true
+ ));
+
+ // testing an empty path with default config
+ assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), true));
+
+ // testing an empty path with `ignore_subdirectory` set to false
+ assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), false));
+ }
}
diff --git a/datafusion/core/src/datasource/listing/url.rs b/datafusion/core/src/datasource/listing/url.rs
index 6421edf779..d9149bcc20 100644
--- a/datafusion/core/src/datasource/listing/url.rs
+++ b/datafusion/core/src/datasource/listing/url.rs
@@ -27,6 +27,7 @@ use glob::Pattern;
use itertools::Itertools;
use log::debug;
use object_store::path::Path;
+use object_store::path::DELIMITER;
use object_store::{ObjectMeta, ObjectStore};
use std::sync::Arc;
use url::Url;
@@ -189,34 +190,37 @@ impl ListingTableUrl {
/// Returns `true` if `path` matches this [`ListingTableUrl`]
pub fn contains(&self, path: &Path, ignore_subdirectory: bool) -> bool {
- match self.strip_prefix(path) {
- Some(mut segments) => match &self.glob {
- Some(glob) => {
- if ignore_subdirectory {
- segments
- .next()
- .map_or(false, |file_name| glob.matches(file_name))
- } else {
- let stripped = segments.join("/");
- glob.matches(&stripped)
- }
- }
- None => {
- if ignore_subdirectory {
- let has_subdirectory = segments.collect::<Vec<_>>().len() > 1;
- !has_subdirectory
- } else {
- true
- }
+ let Some(all_segments) = self.strip_prefix(path) else {
+ return false;
+ };
+
+ // remove any segments that contain `=` as they are allowed even
+ // when ignore subdirectories is `true`.
+ let mut segments = all_segments.filter(|s| !s.contains('='));
+
+ match &self.glob {
+ Some(glob) => {
+ if ignore_subdirectory {
+ segments
+ .next()
+ .map_or(false, |file_name| glob.matches(file_name))
+ } else {
+ let stripped = segments.join(DELIMITER);
+ glob.matches(&stripped)
}
- },
- None => false,
+ }
+ // where we are ignoring subdirectories, we require
+ // the path to be either empty, or contain just the
+ // final file name segment.
+ None if ignore_subdirectory => segments.count() <= 1,
+ // in this case, any valid path at or below the url is allowed
+ None => true,
}
}
/// Returns `true` if `path` refers to a collection of objects
pub fn is_collection(&self) -> bool {
- self.url.as_str().ends_with('/')
+ self.url.as_str().ends_with(DELIMITER)
}
/// Strips the prefix of this [`ListingTableUrl`] from the provided path, returning
@@ -225,7 +229,6 @@ impl ListingTableUrl {
&'a self,
path: &'b Path,
) -> Option<impl Iterator<Item = &'b str> + 'a> {
- use object_store::path::DELIMITER;
let mut stripped = path.as_ref().strip_prefix(self.prefix.as_ref())?;
if !stripped.is_empty() && !self.prefix.as_ref().is_empty() {
stripped = stripped.strip_prefix(DELIMITER)?;
@@ -234,7 +237,7 @@ impl ListingTableUrl {
}
/// List all files identified by this [`ListingTableUrl`] for the provided
`file_extension`
- pub(crate) async fn list_all_files<'a>(
+ pub async fn list_all_files<'a>(
&'a self,
ctx: &'a SessionState,
store: &'a dyn ObjectStore,