tustvold commented on code in PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578#discussion_r877010770


##########
datafusion/core/src/datasource/listing/path.rs:
##########
@@ -0,0 +1,217 @@
+use datafusion_common::{DataFusionError, Result};
+use datafusion_data_access::object_store::ObjectStore;
+use datafusion_data_access::FileMeta;
+use futures::stream::BoxStream;
+use futures::{StreamExt, TryStreamExt};
+use glob::Pattern;
+use url::Url;
+
+/// A parsed URL identifying files for a listing table, see 
[`ListingTableUrl::parse`]
+/// for more information on the supported expressions
+#[derive(Debug, Clone)]
+pub struct ListingTableUrl {
+    /// A URL that identifies a file or directory to list files from
+    url: Url,
+    /// An optional glob expression used to filter files
+    glob: Option<Pattern>,
+}
+
+impl ListingTableUrl {
+    /// Parse a provided string as a `ListingTableUrl`
+    ///
+    /// # Paths without a Scheme
+    ///
+    /// If no scheme is provided, the string will be interpreted as a
+    /// path on the local filesystem, using the operating system's
+    /// standard path delimiter - i.e. `\` on Windows, `/` on Unix.
+    ///
+    /// If the path contains any of `'?', '*', '['`, it will be considered
+    /// a glob expression and resolved as described in the section below.
+    ///
+    /// Otherwise, the path will be resolved to an absolute path, returning
+    /// an error if it does not exist, and converted to a [file URI]
+    ///
+    /// If you wish to specify a path that does not exist on the local
+    /// machine you must provide it as a fully-qualified [file URI]
+    /// e.g. `file:///myfile.txt`
+    ///
+    /// ## Glob Paths
+    ///
+    /// If no scheme is provided, and the path contains a glob expression, it 
will
+    /// be resolved as follows.
+    ///
+    /// The string up to the first path segment containing a glob expression 
will be extracted,
+    /// and resolved in the same manner as a normal scheme-less path. That is, 
resolved to
+    /// an absolute path on the local filesystem, returning an error if it 
does not exist,
+    /// and converted to a [file URI]
+    ///
+    /// The remaining string will be interpreted as a [`glob::Pattern`] and 
used as a
+    /// filter when listing files from object storage
+    ///
+    /// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme
+    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
+        let s = s.as_ref();
+        Ok(match Url::parse(s) {
+            Ok(url) => Self { url, glob: None },
+            Err(url::ParseError::RelativeUrlWithoutBase) => {
+                let (prefix, glob) = match split_glob_expression(s) {
+                    Some((prefix, glob)) => {
+                        let glob = Pattern::new(glob)
+                            .map_err(|e| 
DataFusionError::External(Box::new(e)))?;
+                        (prefix, Some(glob))
+                    }
+                    None => (s, None),
+                };
+
+                let path = std::path::Path::new(prefix).canonicalize()?;
+                let url = match path.is_file() {
+                    true => Url::from_file_path(path).unwrap(),
+                    false => Url::from_directory_path(path).unwrap(),
+                };
+
+                Self { url, glob }
+            }
+            Err(e) => return Err(DataFusionError::External(Box::new(e))),
+        })
+    }
+
+    /// Returns the URL scheme
+    pub fn scheme(&self) -> &str {
+        self.url.scheme()
+    }
+
+    /// Returns the path as expected by [`ObjectStore`]
+    ///
+    /// In particular for file scheme URLs, this has a leading `/`
+    /// and describes an absolute path on the local filesystem
+    ///
+    /// For other URLs, this also contains the host component
+    /// and lacks a leading `/`
+    ///
+    /// TODO: Handle paths consistently (#2489)
+    fn prefix(&self) -> &str {

Review Comment:
   As part of #2489 I hope to bring some consistency to the paths that are 
passed to the ObjectStore. What they actually contain is currently a bit confusing.



##########
datafusion/core/src/datasource/listing/path.rs:
##########
@@ -0,0 +1,217 @@
+use datafusion_common::{DataFusionError, Result};
+use datafusion_data_access::object_store::ObjectStore;
+use datafusion_data_access::FileMeta;
+use futures::stream::BoxStream;
+use futures::{StreamExt, TryStreamExt};
+use glob::Pattern;
+use url::Url;
+
+/// A parsed URL identifying files for a listing table, see 
[`ListingTableUrl::parse`]
+/// for more information on the supported expressions
+#[derive(Debug, Clone)]
+pub struct ListingTableUrl {
+    /// A URL that identifies a file or directory to list files from
+    url: Url,
+    /// An optional glob expression used to filter files
+    glob: Option<Pattern>,
+}
+
+impl ListingTableUrl {
+    /// Parse a provided string as a `ListingTableUrl`
+    ///
+    /// # Paths without a Scheme
+    ///
+    /// If no scheme is provided, the string will be interpreted as a
+    /// path on the local filesystem, using the operating system's
+    /// standard path delimiter - i.e. `\` on Windows, `/` on Unix.
+    ///
+    /// If the path contains any of `'?', '*', '['`, it will be considered
+    /// a glob expression and resolved as described in the section below.
+    ///
+    /// Otherwise, the path will be resolved to an absolute path, returning
+    /// an error if it does not exist, and converted to a [file URI]
+    ///
+    /// If you wish to specify a path that does not exist on the local
+    /// machine you must provide it as a fully-qualified [file URI]
+    /// e.g. `file:///myfile.txt`
+    ///
+    /// ## Glob Paths
+    ///
+    /// If no scheme is provided, and the path contains a glob expression, it 
will
+    /// be resolved as follows.
+    ///
+    /// The string up to the first path segment containing a glob expression 
will be extracted,
+    /// and resolved in the same manner as a normal scheme-less path. That is, 
resolved to
+    /// an absolute path on the local filesystem, returning an error if it 
does not exist,
+    /// and converted to a [file URI]
+    ///
+    /// The remaining string will be interpreted as a [`glob::Pattern`] and 
used as a
+    /// filter when listing files from object storage
+    ///
+    /// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme
+    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
+        let s = s.as_ref();
+        Ok(match Url::parse(s) {
+            Ok(url) => Self { url, glob: None },
+            Err(url::ParseError::RelativeUrlWithoutBase) => {
+                let (prefix, glob) = match split_glob_expression(s) {
+                    Some((prefix, glob)) => {
+                        let glob = Pattern::new(glob)
+                            .map_err(|e| 
DataFusionError::External(Box::new(e)))?;
+                        (prefix, Some(glob))
+                    }
+                    None => (s, None),
+                };
+
+                let path = std::path::Path::new(prefix).canonicalize()?;
+                let url = match path.is_file() {
+                    true => Url::from_file_path(path).unwrap(),
+                    false => Url::from_directory_path(path).unwrap(),
+                };
+
+                Self { url, glob }
+            }
+            Err(e) => return Err(DataFusionError::External(Box::new(e))),
+        })
+    }
+
+    /// Returns the URL scheme
+    pub fn scheme(&self) -> &str {
+        self.url.scheme()
+    }
+
+    /// Returns the path as expected by [`ObjectStore`]
+    ///
+    /// In particular for file scheme URLs, this has a leading `/`
+    /// and describes an absolute path on the local filesystem
+    ///
+    /// For other URLs, this also contains the host component
+    /// and lacks a leading `/`
+    ///
+    /// TODO: Handle paths consistently (#2489)
+    fn prefix(&self) -> &str {
+        match self.scheme() {
+            "file" => self.url.path(),
+            _ => 
&self.url[url::Position::BeforeHost..url::Position::AfterPath],
+        }
+    }
+
+    /// Strips the prefix of this [`ListingTableUrl`] from the provided path, 
returning
+    /// an iterator of the remaining path segments
+    pub(crate) fn strip_prefix<'a, 'b: 'a>(
+        &'a self,
+        path: &'b str,
+    ) -> Option<impl Iterator<Item = &'b str> + 'a> {
+        // Ignore empty path segments
+        let diff = itertools::diff_with(
+            path.split('/').filter(|s| !s.is_empty()),
+            self.prefix().split('/').filter(|s| !s.is_empty()),
+            |a, b| a == b,
+        );
+
+        match diff {
+            // Match with remaining
+            Some(itertools::Diff::Shorter(_, subpath)) => Some(subpath),
+            _ => None,
+        }
+    }
+
+    /// List all files identified by this [`ListingTableUrl`] for the provided 
`file_extension`
+    pub(crate) fn list_all_files<'a>(

Review Comment:
   This combines the various ObjectStore listing methods into a single 
method, which is clearer and more consistent.



##########
datafusion/core/src/catalog/schema.rs:
##########
@@ -157,10 +157,7 @@ impl ObjectStoreSchemaProvider {
     }
 
     /// Retrieves a `ObjectStore` instance by scheme
-    pub fn object_store<'a>(
-        &self,
-        uri: &'a str,
-    ) -> Result<(Arc<dyn ObjectStore>, &'a str)> {
+    pub fn object_store(&self, uri: &ListingTableUrl) -> Result<Arc<dyn 
ObjectStore>> {

Review Comment:
   The scheme-stripping logic is now part of ListingTableUrl.



##########
datafusion/core/src/test_util.rs:
##########
@@ -203,7 +203,7 @@ fn get_data_dir(udf_env: &str, submodule_data: &str) -> 
Result<PathBuf, Box<dyn
         if !trimmed.is_empty() {
             let pb = PathBuf::from(trimmed);
             if pb.is_dir() {
-                return Ok(pb);
+                return Ok(pb.canonicalize().unwrap());

Review Comment:
   This is necessary for normalize_for_explain to work correctly



##########
datafusion/core/src/lib.rs:
##########
@@ -52,11 +52,11 @@
 //!    .to_string();
 //!
 //! let expected = vec![
-//!     "+---+--------------------------+",
-//!     "| a | MIN(tests/example.csv.b) |",
-//!     "+---+--------------------------+",
-//!     "| 1 | 2                        |",
-//!     "+---+--------------------------+"
+//!     "+---+----------------+",
+//!     "| a | MIN(?table?.b) |",

Review Comment:
   I'm not entirely sure what change caused this... I wonder if we aren't 
running doctests in CI :thinking: 



##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -504,7 +504,7 @@ mod tests {
                 "bucket/key-prefix/file3",
                 "bucket/key-prefix/file4",
             ],
-            "bucket/key-prefix/",
+            "file:///bucket/key-prefix/",

Review Comment:
   These files don't actually exist, so we need to fully qualify the URLs



##########
datafusion/data-access/src/object_store/mod.rs:
##########
@@ -137,144 +85,3 @@ pub trait ObjectStore: Sync + Send + Debug {
     /// Get object reader for one file
     fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>>;
 }
-
-/// Normalize a path without requiring it to exist on the filesystem 
(path::canonicalize)
-pub fn normalize_path<P: AsRef<Path>>(path: P) -> PathBuf {
-    let ends_with_slash = path
-        .as_ref()
-        .to_str()
-        .map_or(false, |s| s.ends_with(path::MAIN_SEPARATOR));
-    let mut normalized = PathBuf::new();
-    for component in path.as_ref().components() {
-        match &component {
-            Component::ParentDir => {
-                if !normalized.pop() {
-                    normalized.push(component);
-                }
-            }
-            _ => {
-                normalized.push(component);
-            }
-        }
-    }
-    if ends_with_slash {
-        normalized.push("");
-    }
-    normalized
-}
-
-const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];
-
-/// Determine whether the path contains a globbing character
-fn contains_glob_start_char(path: &str) -> bool {
-    path.chars().any(|c| GLOB_START_CHARS.contains(&c))
-}
-
-/// Filters the file_stream to only contain files that end with suffix
-fn filter_suffix(file_stream: FileMetaStream, suffix: &str) -> 
Result<FileMetaStream> {
-    let suffix = suffix.to_owned();
-    Ok(Box::pin(
-        file_stream.try_filter(move |f| ready(f.path().ends_with(&suffix))),
-    ))
-}
-
-fn find_longest_search_path_without_glob_pattern(glob_pattern: &str) -> String 
{
-    // in case the glob_pattern is not actually a glob pattern, take the 
entire thing
-    if !contains_glob_start_char(glob_pattern) {
-        glob_pattern.to_string()
-    } else {
-        // take all the components of the path (left-to-right) which do not 
contain a glob pattern
-        let components_in_glob_pattern = Path::new(glob_pattern).components();
-        let mut path_buf_for_longest_search_path_without_glob_pattern = 
PathBuf::new();
-        for component_in_glob_pattern in components_in_glob_pattern {
-            let component_as_str =
-                component_in_glob_pattern.as_os_str().to_str().unwrap();
-            if contains_glob_start_char(component_as_str) {
-                break;
-            }
-            path_buf_for_longest_search_path_without_glob_pattern
-                .push(component_in_glob_pattern);
-        }
-
-        let mut result = path_buf_for_longest_search_path_without_glob_pattern
-            .to_str()
-            .unwrap()
-            .to_string();
-
-        // when we're not at the root, append a separator
-        if path_buf_for_longest_search_path_without_glob_pattern
-            .components()
-            .count()
-            > 1
-        {
-            result.push(path::MAIN_SEPARATOR);
-        }
-        result
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[tokio::test]
-    async fn test_is_glob_path() -> Result<()> {
-        assert!(!contains_glob_start_char("/"));
-        assert!(!contains_glob_start_char("/test"));
-        assert!(!contains_glob_start_char("/test/"));
-        assert!(contains_glob_start_char("/test*"));
-        Ok(())
-    }
-
-    fn test_longest_base_path(input: &str, expected: &str) {
-        assert_eq!(
-            find_longest_search_path_without_glob_pattern(input),
-            expected,
-            "testing find_longest_search_path_without_glob_pattern with {}",
-            input
-        );
-    }
-
-    #[tokio::test]
-    async fn test_find_longest_search_path_without_glob_pattern() -> 
Result<()> {

Review Comment:
   This test has been reformulated and moved to ListingTableUrl.



##########
datafusion/data-access/src/object_store/mod.rs:
##########
@@ -78,54 +74,6 @@ pub trait ObjectStore: Sync + Send + Debug {
     /// Returns all the files in path `prefix`
     async fn list_file(&self, prefix: &str) -> Result<FileMetaStream>;
 
-    /// Calls `list_file` with a suffix filter

Review Comment:
   This logic is now encapsulated in ListingTableUrl



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to