alamb commented on code in PR #2578:
URL: https://github.com/apache/arrow-datafusion/pull/2578#discussion_r884746154


##########
benchmarks/src/bin/tpch.rs:
##########
@@ -50,6 +50,7 @@ use datafusion::{
 
 use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
 use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
+use datafusion::datasource::listing::ListingTableUrl;

Review Comment:
   I wonder if it is time to move `datasource` to its own crate, if possible 🤔 



##########
datafusion/core/src/datasource/listing/path.rs:
##########
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   minor comment: maybe this file should be called "url.rs" for consistency



##########
datafusion/core/src/datasource/listing/path.rs:
##########
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::object_store::ObjectStoreUrl;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_data_access::object_store::ObjectStore;
+use datafusion_data_access::FileMeta;
+use futures::stream::BoxStream;
+use futures::{StreamExt, TryStreamExt};
+use glob::Pattern;
+use itertools::Itertools;
+use std::path::is_separator;
+use url::Url;
+
+/// A parsed URL identifying files for a listing table, see 
[`ListingTableUrl::parse`]
+/// for more information on the supported expressions
+#[derive(Debug, Clone)]
+pub struct ListingTableUrl {
+    /// A URL that identifies a file or directory to list files from
+    url: Url,
+    /// An optional glob expression used to filter files
+    glob: Option<Pattern>,
+}
+
+impl ListingTableUrl {
+    /// Parse a provided string as a `ListingTableUrl`
+    ///
+    /// # Paths without a Scheme
+    ///
+    /// If no scheme is provided, or the string is an absolute filesystem path
+    /// as determined [`std::path::Path::is_absolute`], the string will be
+    /// interpreted as a path on the local filesystem using the operating
+    /// system's standard path delimiter, i.e. `\` on Windows, `/` on Unix.
+    ///
+    /// If the path contains any of `'?', '*', '['`, it will be considered
+    /// a glob expression and resolved as described in the section below.
+    ///
+    /// Otherwise, the path will be resolved to an absolute path, returning
+    /// an error if it does not exist, and converted to a [file URI]
+    ///
+    /// If you wish to specify a path that does not exist on the local
+    /// machine you must provide it as a fully-qualified [file URI]
+    /// e.g. `file:///myfile.txt`
+    ///
+    /// ## Glob Paths
+    ///
+    /// If no scheme is provided, and the path contains a glob expression, it 
will
+    /// be resolved as follows.
+    ///
+    /// The string up to the first path segment containing a glob expression 
will be extracted,
+    /// and resolved in the same manner as a normal scheme-less path. That is, 
resolved to
+    /// an absolute path on the local filesystem, returning an error if it 
does not exist,
+    /// and converted to a [file URI]
+    ///
+    /// The remaining string will be interpreted as a [`glob::Pattern`] and 
used as a
+    /// filter when listing files from object storage
+    ///
+    /// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme
+    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
+        let s = s.as_ref();
+
+        // This is necessary to handle the case of a path starting with a 
drive letter
+        if std::path::Path::new(s).is_absolute() {
+            return Self::parse_path(s);
+        }
+
+        match Url::parse(s) {
+            Ok(url) => Ok(Self { url, glob: None }),
+            Err(url::ParseError::RelativeUrlWithoutBase) => 
Self::parse_path(s),
+            Err(e) => Err(DataFusionError::External(Box::new(e))),
+        }
+    }
+
+    /// Creates a new [`ListingTableUrl`] interpreting `s` as a filesystem path
+    fn parse_path(s: &str) -> Result<Self> {
+        let (prefix, glob) = match split_glob_expression(s) {
+            Some((prefix, glob)) => {
+                let glob = Pattern::new(glob)
+                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
+                (prefix, Some(glob))
+            }
+            None => (s, None),
+        };
+
+        let path = std::path::Path::new(prefix).canonicalize()?;
+        let url = match path.is_file() {
+            true => Url::from_file_path(path).unwrap(),
+            false => Url::from_directory_path(path).unwrap(),
+        };
+
+        Ok(Self { url, glob })
+    }
+
+    /// Returns the URL scheme
+    pub fn scheme(&self) -> &str {
+        self.url.scheme()
+    }
+
+    /// Returns the path as expected by [`ObjectStore`]
+    ///
+    /// In particular for file scheme URLs, this is an absolute
+    /// on the local filesystem in the OS-specific path representation
+    ///
+    /// For other URLs, this is a the host and path of the URL,
+    /// delimited by `/`, and with no leading `/`
+    ///
+    /// TODO: Handle paths consistently (#2489)
+    fn prefix(&self) -> &str {
+        match self.scheme() {
+            "file" => match cfg!(target_family = "windows") {
+                true => self.url.path().strip_prefix('/').unwrap(),
+                false => self.url.path(),
+            },
+            _ => 
&self.url[url::Position::BeforeHost..url::Position::AfterPath],
+        }
+    }
+
+    /// Strips the prefix of this [`ListingTableUrl`] from the provided path, 
returning
+    /// an iterator of the remaining path segments
+    ///
+    /// TODO: Handle paths consistently (#2489)
+    pub(crate) fn strip_prefix<'a, 'b: 'a>(
+        &'a self,
+        path: &'b str,
+    ) -> Option<impl Iterator<Item = &'b str> + 'a> {
+        let prefix = self.prefix();
+        // Ignore empty path segments
+        let diff = itertools::diff_with(
+            path.split(is_separator).filter(|s| !s.is_empty()),
+            prefix.split(is_separator).filter(|s| !s.is_empty()),
+            |a, b| a == b,
+        );
+
+        match diff {
+            // Match with remaining
+            Some(itertools::Diff::Shorter(_, subpath)) => Some(subpath),
+            _ => None,
+        }
+    }
+
+    /// List all files identified by this [`ListingTableUrl`] for the provided 
`file_extension`
+    pub(crate) fn list_all_files<'a>(
+        &'a self,
+        store: &'a dyn ObjectStore,
+        file_extension: &'a str,
+    ) -> BoxStream<'a, Result<FileMeta>> {
+        futures::stream::once(async move {
+            let prefix = self.prefix();
+            store.list_file(prefix.as_ref()).await
+        })
+        .try_flatten()
+        .map_err(DataFusionError::IoError)
+        .try_filter(move |meta| {
+            let path = meta.path();
+
+            let extension_match = path.ends_with(file_extension);
+            let glob_match = match &self.glob {
+                Some(glob) => match self.strip_prefix(path) {
+                    Some(mut segments) => {
+                        let stripped = segments.join("/");
+                        glob.matches(&stripped)
+                    }
+                    None => false,
+                },
+                None => true,
+            };
+
+            futures::future::ready(extension_match && glob_match)
+        })
+        .boxed()
+    }
+
+    /// Returns this [`ListingTableUrl`] as a string
+    pub fn as_str(&self) -> &str {
+        self.as_ref()
+    }
+
+    /// Return the [`ObjectStoreUrl`] for this [`ListingTableUrl`]
+    pub fn object_store(&self) -> ObjectStoreUrl {
+        let url = 
&self.url[url::Position::BeforeScheme..url::Position::BeforePath];
+        ObjectStoreUrl::parse(url).unwrap()
+    }
+}
+
+impl AsRef<str> for ListingTableUrl {
+    fn as_ref(&self) -> &str {
+        self.url.as_ref()
+    }
+}
+
+impl AsRef<Url> for ListingTableUrl {
+    fn as_ref(&self) -> &Url {
+        &self.url
+    }
+}
+
+impl std::fmt::Display for ListingTableUrl {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        self.as_str().fmt(f)
+    }
+}
+
+const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];
+
+/// Splits `path` at the first path segment containing a glob expression, 
returning
+/// `None` if no glob expression found.
+///
+/// Path delimiters are determined using [`std::path::is_separator`] which
+/// permits `/` as a path delimiter even on Windows platforms.
+///
+fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
+    let mut last_separator = 0;
+
+    for (byte_idx, char) in path.char_indices() {
+        if GLOB_START_CHARS.contains(&char) {
+            if last_separator == 0 {
+                return Some((".", path));
+            }
+            return Some(path.split_at(last_separator));
+        }
+
+        if std::path::is_separator(char) {
+            last_separator = byte_idx + char.len_utf8();
+        }
+    }
+    None
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_prefix_path() {
+        let root = std::env::current_dir().unwrap();
+        let root = root.to_string_lossy();
+
+        let url = ListingTableUrl::parse(&root).unwrap();
+        let child = format!("{}/partition/file", root);
+
+        let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
+        assert_eq!(prefix, vec!["partition", "file"]);
+    }
+
+    #[test]
+    fn test_prefix_s3() {
+        let url = ListingTableUrl::parse("s3://bucket/foo/bar").unwrap();
+        assert_eq!(url.prefix(), "bucket/foo/bar");
+
+        let path = "bucket/foo/bar/partition/foo.parquet";
+        let prefix: Vec<_> = url.strip_prefix(path).unwrap().collect();
+        assert_eq!(prefix, vec!["partition", "foo.parquet"]);
+    }
+
+    #[test]
+    fn test_split_glob() {
+        fn test(input: &str, expected: Option<(&str, &str)>) {
+            assert_eq!(
+                split_glob_expression(input),
+                expected,
+                "testing split_glob_expression with {}",
+                input
+            );
+        }
+
+        // no glob patterns
+        test("/", None);
+        test("/a.txt", None);
+        test("/a", None);
+        test("/a/", None);
+        test("/a/b", None);
+        test("/a/b/", None);
+        test("/a/b.txt", None);
+        test("/a/b/c.txt", None);
+        // glob patterns, thus we build the longest path (os-specific)
+        test("*.txt", Some((".", "*.txt")));
+        test("/*.txt", Some(("/", "*.txt")));
+        test("/a/*b.txt", Some(("/a/", "*b.txt")));
+        test("/a/*/b.txt", Some(("/a/", "*/b.txt")));
+        test("/a/b/[123]/file*.txt", Some(("/a/b/", "[123]/file*.txt")));
+        test("/a/b*.txt", Some(("/a/", "b*.txt")));
+        test("/a/b/**/c*.txt", Some(("/a/b/", "**/c*.txt")));
+
+        // https://github.com/apache/arrow-datafusion/issues/2465

Review Comment:
   https://github.com/apache/arrow-datafusion/issues/2465 is closed -- is this 
meant to refer to another ticket? Or is it saying this isa test for that issue 
🤔 



##########
datafusion/core/src/datasource/listing/helpers.rs:
##########
@@ -601,50 +557,29 @@ mod tests {
         assert_eq!(
             None,
             parse_partitions_for_path(
-                "bucket/mytable",
+                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                 "bucket/mytable/v1/file.csv",
                 &[String::from("mypartition")]
             )
         );
         assert_eq!(
             Some(vec!["v1", "v2"]),
             parse_partitions_for_path(
-                "bucket/mytable",
+                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                 "bucket/mytable/mypartition=v1/otherpartition=v2/file.csv",
                 &[String::from("mypartition"), String::from("otherpartition")]
             )
         );
         assert_eq!(
             Some(vec!["v1"]),
             parse_partitions_for_path(
-                "bucket/mytable",
+                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                 "bucket/mytable/mypartition=v1/otherpartition=v2/file.csv",
                 &[String::from("mypartition")]
             )
         );
     }
 
-    #[cfg(target_os = "windows")]

Review Comment:
   Why were these tests removed? I think ensuring we have proper windows 
coverage is important



##########
datafusion/core/src/lib.rs:
##########
@@ -52,11 +52,11 @@
 //!    .to_string();
 //!
 //! let expected = vec![
-//!     "+---+--------------------------+",
-//!     "| a | MIN(tests/example.csv.b) |",
-//!     "+---+--------------------------+",
-//!     "| 1 | 2                        |",
-//!     "+---+--------------------------+"
+//!     "+---+----------------+",
+//!     "| a | MIN(?table?.b) |",

Review Comment:
   it seems ok to me. I am not sure if the `use the filename as table name` 
function of csv was intentional (like maybe it is some sort of dataframe 
compatibility)?
   
   cc @andygrove 



##########
datafusion/core/src/physical_plan/file_format/mod.rs:
##########
@@ -68,6 +70,8 @@ lazy_static! {
 pub struct FileScanConfig {
     /// Store from which the `files` should be fetched
     pub object_store: Arc<dyn ObjectStore>,
+    /// Object store URL
+    pub object_store_url: ObjectStoreUrl,

Review Comment:
   worth a tracking ticket?



##########
datafusion/core/src/datasource/listing/path.rs:
##########
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::object_store::ObjectStoreUrl;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_data_access::object_store::ObjectStore;
+use datafusion_data_access::FileMeta;
+use futures::stream::BoxStream;
+use futures::{StreamExt, TryStreamExt};
+use glob::Pattern;
+use itertools::Itertools;
+use std::path::is_separator;
+use url::Url;
+
+/// A parsed URL identifying files for a listing table, see 
[`ListingTableUrl::parse`]
+/// for more information on the supported expressions
+#[derive(Debug, Clone)]
+pub struct ListingTableUrl {
+    /// A URL that identifies a file or directory to list files from
+    url: Url,
+    /// An optional glob expression used to filter files
+    glob: Option<Pattern>,
+}
+
+impl ListingTableUrl {
+    /// Parse a provided string as a `ListingTableUrl`
+    ///
+    /// # Paths without a Scheme
+    ///
+    /// If no scheme is provided, or the string is an absolute filesystem path
+    /// as determined [`std::path::Path::is_absolute`], the string will be
+    /// interpreted as a path on the local filesystem using the operating
+    /// system's standard path delimiter, i.e. `\` on Windows, `/` on Unix.
+    ///
+    /// If the path contains any of `'?', '*', '['`, it will be considered

Review Comment:
   I think we should also explicitly say here that globs are only supported 
with local files, not when the path is a url. 



##########
datafusion/core/src/datasource/listing/helpers.rs:
##########
@@ -476,33 +426,34 @@ mod tests {
         let filter = Expr::eq(col("mypartition"), lit("val1"));
         let pruned = pruned_partition_list(
             store.as_ref(),
-            "tablepath/",
+            &ListingTableUrl::parse("file:///tablepath/").unwrap(),

Review Comment:
   these tests certainly look much nicer



##########
datafusion/core/src/datasource/listing/path.rs:
##########
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::object_store::ObjectStoreUrl;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_data_access::object_store::ObjectStore;
+use datafusion_data_access::FileMeta;
+use futures::stream::BoxStream;
+use futures::{StreamExt, TryStreamExt};
+use glob::Pattern;
+use itertools::Itertools;
+use std::path::is_separator;
+use url::Url;
+
+/// A parsed URL identifying files for a listing table, see 
[`ListingTableUrl::parse`]
+/// for more information on the supported expressions
+#[derive(Debug, Clone)]
+pub struct ListingTableUrl {
+    /// A URL that identifies a file or directory to list files from
+    url: Url,
+    /// An optional glob expression used to filter files
+    glob: Option<Pattern>,
+}
+
+impl ListingTableUrl {
+    /// Parse a provided string as a `ListingTableUrl`
+    ///
+    /// # Paths without a Scheme
+    ///
+    /// If no scheme is provided, or the string is an absolute filesystem path
+    /// as determined [`std::path::Path::is_absolute`], the string will be
+    /// interpreted as a path on the local filesystem using the operating
+    /// system's standard path delimiter, i.e. `\` on Windows, `/` on Unix.
+    ///
+    /// If the path contains any of `'?', '*', '['`, it will be considered
+    /// a glob expression and resolved as described in the section below.
+    ///
+    /// Otherwise, the path will be resolved to an absolute path, returning
+    /// an error if it does not exist, and converted to a [file URI]
+    ///
+    /// If you wish to specify a path that does not exist on the local
+    /// machine you must provide it as a fully-qualified [file URI]
+    /// e.g. `file:///myfile.txt`
+    ///
+    /// ## Glob Paths
+    ///
+    /// If no scheme is provided, and the path contains a glob expression, it 
will
+    /// be resolved as follows.
+    ///
+    /// The string up to the first path segment containing a glob expression 
will be extracted,
+    /// and resolved in the same manner as a normal scheme-less path. That is, 
resolved to
+    /// an absolute path on the local filesystem, returning an error if it 
does not exist,
+    /// and converted to a [file URI]
+    ///
+    /// The remaining string will be interpreted as a [`glob::Pattern`] and 
used as a
+    /// filter when listing files from object storage
+    ///
+    /// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme
+    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
+        let s = s.as_ref();
+
+        // This is necessary to handle the case of a path starting with a 
drive letter
+        if std::path::Path::new(s).is_absolute() {
+            return Self::parse_path(s);
+        }
+
+        match Url::parse(s) {
+            Ok(url) => Ok(Self { url, glob: None }),
+            Err(url::ParseError::RelativeUrlWithoutBase) => 
Self::parse_path(s),
+            Err(e) => Err(DataFusionError::External(Box::new(e))),
+        }
+    }
+
+    /// Creates a new [`ListingTableUrl`] interpreting `s` as a filesystem path
+    fn parse_path(s: &str) -> Result<Self> {

Review Comment:
   Is there any use case for this being `pub`? Like to allow users to force the 
parsing to treat it like a path



##########
datafusion/core/src/datasource/listing/path.rs:
##########
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::object_store::ObjectStoreUrl;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_data_access::object_store::ObjectStore;
+use datafusion_data_access::FileMeta;
+use futures::stream::BoxStream;
+use futures::{StreamExt, TryStreamExt};
+use glob::Pattern;
+use itertools::Itertools;
+use std::path::is_separator;
+use url::Url;
+
+/// A parsed URL identifying files for a listing table, see 
[`ListingTableUrl::parse`]
+/// for more information on the supported expressions
+#[derive(Debug, Clone)]
+pub struct ListingTableUrl {
+    /// A URL that identifies a file or directory to list files from
+    url: Url,
+    /// An optional glob expression used to filter files
+    glob: Option<Pattern>,
+}
+
+impl ListingTableUrl {
+    /// Parse a provided string as a `ListingTableUrl`
+    ///
+    /// # Paths without a Scheme
+    ///
+    /// If no scheme is provided, or the string is an absolute filesystem path

Review Comment:
   👍  this is great. Thank you for this writeup. I think it makes sense to me
   
   @yjshen  is this behavior ok with you?



##########
datafusion/core/src/datasource/object_store.rs:
##########
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! ObjectStoreRegistry holds all the object stores at Runtime with a scheme 
for each store.
+//! This allows the user to extend DataFusion with different storage systems 
such as S3 or HDFS
+//! and query data inside these systems.
+
+use datafusion_common::{DataFusionError, Result};
+use datafusion_data_access::object_store::local::{LocalFileSystem, 
LOCAL_SCHEME};
+use datafusion_data_access::object_store::ObjectStore;
+use parking_lot::RwLock;
+use std::collections::HashMap;
+use std::sync::Arc;
+use url::Url;
+
+/// A parsed URL identifying a particular [`ObjectStore`]
+#[derive(Debug, Clone)]
+pub struct ObjectStoreUrl {
+    url: Url,
+}
+
+impl ObjectStoreUrl {
+    /// Parse an [`ObjectStoreUrl`] from a string
+    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
+        let mut parsed =
+            Url::parse(s.as_ref()).map_err(|e| 
DataFusionError::External(Box::new(e)))?;
+
+        let remaining = &parsed[url::Position::BeforePath..];
+        if !remaining.is_empty() && remaining != "/" {
+            return Err(DataFusionError::Execution(format!(
+                "ObjectStoreUrl must only contain scheme and authority, got: 
{}",
+                remaining
+            )));
+        }
+
+        // Always set path for consistency
+        parsed.set_path("/");
+        Ok(Self { url: parsed })
+    }
+
+    /// An [`ObjectStoreUrl`] for the local filesystem
+    pub fn local_filesystem() -> Self {
+        Self::parse("file://").unwrap()
+    }
+
+    /// Returns this [`ObjectStoreUrl`] as a string
+    pub fn as_str(&self) -> &str {
+        self.as_ref()
+    }
+}
+
+impl AsRef<str> for ObjectStoreUrl {
+    fn as_ref(&self) -> &str {
+        self.url.as_ref()
+    }
+}
+
+impl AsRef<Url> for ObjectStoreUrl {
+    fn as_ref(&self) -> &Url {
+        &self.url
+    }
+}
+
+impl std::fmt::Display for ObjectStoreUrl {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        self.as_str().fmt(f)
+    }
+}
+
+/// Object store registry
+pub struct ObjectStoreRegistry {
+    /// A map from scheme to object store that serve list / read operations 
for the store
+    pub object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
+}
+
+impl std::fmt::Debug for ObjectStoreRegistry {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        f.debug_struct("ObjectStoreRegistry")
+            .field(
+                "schemes",
+                &self.object_stores.read().keys().collect::<Vec<_>>(),
+            )
+            .finish()
+    }
+}
+
+impl Default for ObjectStoreRegistry {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl ObjectStoreRegistry {
+    /// Create the registry that object stores can registered into.
+    /// ['LocalFileSystem'] store is registered in by default to support read 
local files natively.
+    pub fn new() -> Self {
+        let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new();
+        map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem));
+
+        Self {
+            object_stores: RwLock::new(map),
+        }
+    }
+
+    /// Adds a new store to this registry.
+    /// If a store of the same prefix existed before, it is replaced in the 
registry and returned.
+    pub fn register_store(
+        &self,
+        scheme: String,
+        store: Arc<dyn ObjectStore>,
+    ) -> Option<Arc<dyn ObjectStore>> {
+        let mut stores = self.object_stores.write();
+        stores.insert(scheme, store)
+    }
+
+    /// Get the store registered for scheme
+    pub fn get(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> {
+        let stores = self.object_stores.read();
+        stores.get(scheme).cloned()
+    }
+
+    /// Get a suitable store for the provided URL. For example:
+    ///
+    /// - URL with scheme `file://` or no schema will return the default 
LocalFS store
+    /// - URL with scheme `s3://` will return the S3 store if it's registered
+    ///
+    pub fn get_by_url(&self, url: impl AsRef<Url>) -> Result<Arc<dyn 
ObjectStore>> {
+        let url = url.as_ref();
+        let stores = self.object_stores.read();
+        let store = stores.get(url.scheme()).map(Clone::clone).ok_or_else(|| {

Review Comment:
   Is the `Clone::clone` needed here? I thought `get` already returned an owned 
`Arc`?



##########
datafusion/core/src/catalog/schema.rs:
##########
@@ -156,31 +156,33 @@ impl ObjectStoreSchemaProvider {
             .register_store(scheme.into(), object_store)
     }
 
-    /// Retrieves a `ObjectStore` instance by scheme
-    pub fn object_store<'a>(
+    /// Retrieves a `ObjectStore` instance for a given Url
+    pub fn object_store(
         &self,
-        uri: &'a str,
-    ) -> Result<(Arc<dyn ObjectStore>, &'a str)> {
+        url: impl AsRef<url::Url>,

Review Comment:
   this seems like a reasonable change so that object store paths are always 
identified by url 👍 
   



##########
datafusion/core/src/datasource/object_store.rs:
##########
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! ObjectStoreRegistry holds all the object stores at Runtime with a scheme 
for each store.
+//! This allows the user to extend DataFusion with different storage systems 
such as S3 or HDFS
+//! and query data inside these systems.
+
+use datafusion_common::{DataFusionError, Result};
+use datafusion_data_access::object_store::local::{LocalFileSystem, 
LOCAL_SCHEME};
+use datafusion_data_access::object_store::ObjectStore;
+use parking_lot::RwLock;
+use std::collections::HashMap;
+use std::sync::Arc;
+use url::Url;
+
+/// A parsed URL identifying a particular [`ObjectStore`]
+#[derive(Debug, Clone)]
+pub struct ObjectStoreUrl {
+    url: Url,
+}
+
+impl ObjectStoreUrl {
+    /// Parse an [`ObjectStoreUrl`] from a string
+    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
+        let mut parsed =
+            Url::parse(s.as_ref()).map_err(|e| 
DataFusionError::External(Box::new(e)))?;
+
+        let remaining = &parsed[url::Position::BeforePath..];
+        if !remaining.is_empty() && remaining != "/" {
+            return Err(DataFusionError::Execution(format!(
+                "ObjectStoreUrl must only contain scheme and authority, got: 
{}",

Review Comment:
   does it technically require an `authority`? I thought authority refers to 
`hostname:port` whereas I think the object store only requires a hostname



##########
datafusion/core/src/datasource/listing/path.rs:
##########
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::object_store::ObjectStoreUrl;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_data_access::object_store::ObjectStore;
+use datafusion_data_access::FileMeta;
+use futures::stream::BoxStream;
+use futures::{StreamExt, TryStreamExt};
+use glob::Pattern;
+use itertools::Itertools;
+use std::path::is_separator;
+use url::Url;
+
+/// A parsed URL identifying files for a listing table, see 
[`ListingTableUrl::parse`]
+/// for more information on the supported expressions
+#[derive(Debug, Clone)]
+pub struct ListingTableUrl {
+    /// A URL that identifies a file or directory to list files from
+    url: Url,
+    /// An optional glob expression used to filter files
+    glob: Option<Pattern>,
+}
+
+impl ListingTableUrl {
+    /// Parse a provided string as a `ListingTableUrl`
+    ///
+    /// # Paths without a Scheme
+    ///
+    /// If no scheme is provided, or the string is an absolute filesystem path
+    /// as determined [`std::path::Path::is_absolute`], the string will be
+    /// interpreted as a path on the local filesystem using the operating
+    /// system's standard path delimiter, i.e. `\` on Windows, `/` on Unix.
+    ///
+    /// If the path contains any of `'?', '*', '['`, it will be considered
+    /// a glob expression and resolved as described in the section below.
+    ///
+    /// Otherwise, the path will be resolved to an absolute path, returning
+    /// an error if it does not exist, and converted to a [file URI]
+    ///
+    /// If you wish to specify a path that does not exist on the local
+    /// machine you must provide it as a fully-qualified [file URI]
+    /// e.g. `file:///myfile.txt`
+    ///
+    /// ## Glob Paths
+    ///
+    /// If no scheme is provided, and the path contains a glob expression, it 
will
+    /// be resolved as follows.
+    ///
+    /// The string up to the first path segment containing a glob expression 
will be extracted,
+    /// and resolved in the same manner as a normal scheme-less path. That is, 
resolved to
+    /// an absolute path on the local filesystem, returning an error if it 
does not exist,
+    /// and converted to a [file URI]
+    ///
+    /// The remaining string will be interpreted as a [`glob::Pattern`] and 
used as a
+    /// filter when listing files from object storage
+    ///
+    /// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme
+    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
+        let s = s.as_ref();
+
+        // This is necessary to handle the case of a path starting with a 
drive letter
+        if std::path::Path::new(s).is_absolute() {
+            return Self::parse_path(s);
+        }
+
+        match Url::parse(s) {
+            Ok(url) => Ok(Self { url, glob: None }),
+            Err(url::ParseError::RelativeUrlWithoutBase) => 
Self::parse_path(s),
+            Err(e) => Err(DataFusionError::External(Box::new(e))),
+        }
+    }
+
+    /// Creates a new [`ListingTableUrl`] interpreting `s` as a filesystem path
+    fn parse_path(s: &str) -> Result<Self> {
+        let (prefix, glob) = match split_glob_expression(s) {
+            Some((prefix, glob)) => {
+                let glob = Pattern::new(glob)
+                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
+                (prefix, Some(glob))
+            }
+            None => (s, None),
+        };
+
+        let path = std::path::Path::new(prefix).canonicalize()?;
+        let url = match path.is_file() {
+            true => Url::from_file_path(path).unwrap(),
+            false => Url::from_directory_path(path).unwrap(),
+        };
+
+        Ok(Self { url, glob })
+    }
+
+    /// Returns the URL scheme
+    pub fn scheme(&self) -> &str {
+        self.url.scheme()
+    }
+
+    /// Returns the path as expected by [`ObjectStore`]
+    ///
+    /// In particular for file scheme URLs, this is an absolute
+    /// on the local filesystem in the OS-specific path representation
+    ///
+    /// For other URLs, this is a the host and path of the URL,
+    /// delimited by `/`, and with no leading `/`
+    ///
+    /// TODO: Handle paths consistently (#2489)
+    fn prefix(&self) -> &str {
+        match self.scheme() {
+            "file" => match cfg!(target_family = "windows") {
+                true => self.url.path().strip_prefix('/').unwrap(),
+                false => self.url.path(),
+            },
+            _ => 
&self.url[url::Position::BeforeHost..url::Position::AfterPath],
+        }
+    }
+
+    /// Strips the prefix of this [`ListingTableUrl`] from the provided path, 
returning
+    /// an iterator of the remaining path segments
+    ///
+    /// TODO: Handle paths consistently (#2489)
+    pub(crate) fn strip_prefix<'a, 'b: 'a>(
+        &'a self,
+        path: &'b str,
+    ) -> Option<impl Iterator<Item = &'b str> + 'a> {
+        let prefix = self.prefix();
+        // Ignore empty path segments
+        let diff = itertools::diff_with(
+            path.split(is_separator).filter(|s| !s.is_empty()),
+            prefix.split(is_separator).filter(|s| !s.is_empty()),
+            |a, b| a == b,
+        );
+
+        match diff {
+            // Match with remaining
+            Some(itertools::Diff::Shorter(_, subpath)) => Some(subpath),
+            _ => None,
+        }
+    }
+
+    /// List all files identified by this [`ListingTableUrl`] for the provided 
`file_extension`
+    pub(crate) fn list_all_files<'a>(
+        &'a self,
+        store: &'a dyn ObjectStore,
+        file_extension: &'a str,
+    ) -> BoxStream<'a, Result<FileMeta>> {
+        futures::stream::once(async move {
+            let prefix = self.prefix();
+            store.list_file(prefix.as_ref()).await
+        })
+        .try_flatten()
+        .map_err(DataFusionError::IoError)
+        .try_filter(move |meta| {
+            let path = meta.path();
+
+            let extension_match = path.ends_with(file_extension);
+            let glob_match = match &self.glob {
+                Some(glob) => match self.strip_prefix(path) {
+                    Some(mut segments) => {
+                        let stripped = segments.join("/");
+                        glob.matches(&stripped)
+                    }
+                    None => false,
+                },
+                None => true,
+            };
+
+            futures::future::ready(extension_match && glob_match)
+        })
+        .boxed()
+    }
+
+    /// Returns this [`ListingTableUrl`] as a string
+    pub fn as_str(&self) -> &str {
+        self.as_ref()
+    }
+
+    /// Return the [`ObjectStoreUrl`] for this [`ListingTableUrl`]
+    pub fn object_store(&self) -> ObjectStoreUrl {
+        let url = 
&self.url[url::Position::BeforeScheme..url::Position::BeforePath];
+        ObjectStoreUrl::parse(url).unwrap()
+    }
+}
+
+impl AsRef<str> for ListingTableUrl {
+    fn as_ref(&self) -> &str {
+        self.url.as_ref()
+    }
+}
+
+impl AsRef<Url> for ListingTableUrl {
+    fn as_ref(&self) -> &Url {
+        &self.url
+    }
+}
+
+impl std::fmt::Display for ListingTableUrl {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        self.as_str().fmt(f)
+    }
+}
+
+const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];
+
+/// Splits `path` at the first path segment containing a glob expression, 
returning
+/// `None` if no glob expression found.
+///
+/// Path delimiters are determined using [`std::path::is_separator`] which
+/// permits `/` as a path delimiter even on Windows platforms.
+///
+fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
+    let mut last_separator = 0;
+
+    for (byte_idx, char) in path.char_indices() {
+        if GLOB_START_CHARS.contains(&char) {
+            if last_separator == 0 {
+                return Some((".", path));
+            }
+            return Some(path.split_at(last_separator));
+        }
+
+        if std::path::is_separator(char) {
+            last_separator = byte_idx + char.len_utf8();
+        }
+    }
+    None
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]

Review Comment:
   I am a big fan of the consolidated logic and ability to test here 💯 



##########
datafusion/core/tests/sql/mod.rs:
##########
@@ -811,25 +813,57 @@ pub fn table_with_sequence(
     Ok(Arc::new(MemTable::try_new(schema, partitions)?))
 }
 
-// Normalizes parts of an explain plan that vary from run to run (such as path)
-fn normalize_for_explain(s: &str) -> String {
-    // Convert things like 
/Users/alamb/Software/arrow/testing/data/csv/aggregate_test_100.csv
-    // to ARROW_TEST_DATA/csv/aggregate_test_100.csv
-    let data_path = datafusion::test_util::arrow_test_data();
-    let s = s.replace(&data_path, "ARROW_TEST_DATA");
+pub struct ExplainNormalizer {

Review Comment:
   👍 



##########
datafusion/core/src/datasource/listing/path.rs:
##########
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::datasource::object_store::ObjectStoreUrl;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_data_access::object_store::ObjectStore;
+use datafusion_data_access::FileMeta;
+use futures::stream::BoxStream;
+use futures::{StreamExt, TryStreamExt};
+use glob::Pattern;
+use itertools::Itertools;
+use std::path::is_separator;
+use url::Url;
+
+/// A parsed URL identifying files for a listing table, see 
[`ListingTableUrl::parse`]
+/// for more information on the supported expressions
+#[derive(Debug, Clone)]
+pub struct ListingTableUrl {
+    /// A URL that identifies a file or directory to list files from
+    url: Url,
+    /// An optional glob expression used to filter files
+    glob: Option<Pattern>,
+}
+
+impl ListingTableUrl {
+    /// Parse a provided string as a `ListingTableUrl`
+    ///
+    /// # Paths without a Scheme
+    ///
+    /// If no scheme is provided, or the string is an absolute filesystem path
+    /// as determined [`std::path::Path::is_absolute`], the string will be
+    /// interpreted as a path on the local filesystem using the operating
+    /// system's standard path delimiter, i.e. `\` on Windows, `/` on Unix.
+    ///
+    /// If the path contains any of `'?', '*', '['`, it will be considered
+    /// a glob expression and resolved as described in the section below.
+    ///
+    /// Otherwise, the path will be resolved to an absolute path, returning
+    /// an error if it does not exist, and converted to a [file URI]
+    ///
+    /// If you wish to specify a path that does not exist on the local
+    /// machine you must provide it as a fully-qualified [file URI]
+    /// e.g. `file:///myfile.txt`
+    ///
+    /// ## Glob Paths
+    ///
+    /// If no scheme is provided, and the path contains a glob expression, it 
will
+    /// be resolved as follows.
+    ///
+    /// The string up to the first path segment containing a glob expression 
will be extracted,
+    /// and resolved in the same manner as a normal scheme-less path. That is, 
resolved to
+    /// an absolute path on the local filesystem, returning an error if it 
does not exist,
+    /// and converted to a [file URI]
+    ///
+    /// The remaining string will be interpreted as a [`glob::Pattern`] and 
used as a
+    /// filter when listing files from object storage
+    ///
+    /// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme
+    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
+        let s = s.as_ref();
+
+        // This is necessary to handle the case of a path starting with a 
drive letter
+        if std::path::Path::new(s).is_absolute() {
+            return Self::parse_path(s);
+        }
+
+        match Url::parse(s) {
+            Ok(url) => Ok(Self { url, glob: None }),
+            Err(url::ParseError::RelativeUrlWithoutBase) => 
Self::parse_path(s),
+            Err(e) => Err(DataFusionError::External(Box::new(e))),
+        }
+    }
+
+    /// Creates a new [`ListingTableUrl`] interpreting `s` as a filesystem path
+    fn parse_path(s: &str) -> Result<Self> {
+        let (prefix, glob) = match split_glob_expression(s) {
+            Some((prefix, glob)) => {
+                let glob = Pattern::new(glob)
+                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
+                (prefix, Some(glob))
+            }
+            None => (s, None),
+        };
+
+        let path = std::path::Path::new(prefix).canonicalize()?;
+        let url = match path.is_file() {
+            true => Url::from_file_path(path).unwrap(),
+            false => Url::from_directory_path(path).unwrap(),
+        };
+
+        Ok(Self { url, glob })
+    }
+
+    /// Returns the URL scheme
+    pub fn scheme(&self) -> &str {
+        self.url.scheme()
+    }
+
+    /// Returns the path as expected by [`ObjectStore`]
+    ///
+    /// In particular for file scheme URLs, this is an absolute
+    /// on the local filesystem in the OS-specific path representation
+    ///
+    /// For other URLs, this is a the host and path of the URL,
+    /// delimited by `/`, and with no leading `/`
+    ///
+    /// TODO: Handle paths consistently (#2489)
+    fn prefix(&self) -> &str {
+        match self.scheme() {
+            "file" => match cfg!(target_family = "windows") {
+                true => self.url.path().strip_prefix('/').unwrap(),
+                false => self.url.path(),
+            },
+            _ => 
&self.url[url::Position::BeforeHost..url::Position::AfterPath],
+        }
+    }
+
+    /// Strips the prefix of this [`ListingTableUrl`] from the provided path, 
returning
+    /// an iterator of the remaining path segments
+    ///
+    /// TODO: Handle paths consistently (#2489)
+    pub(crate) fn strip_prefix<'a, 'b: 'a>(
+        &'a self,
+        path: &'b str,
+    ) -> Option<impl Iterator<Item = &'b str> + 'a> {
+        let prefix = self.prefix();
+        // Ignore empty path segments
+        let diff = itertools::diff_with(
+            path.split(is_separator).filter(|s| !s.is_empty()),
+            prefix.split(is_separator).filter(|s| !s.is_empty()),
+            |a, b| a == b,
+        );
+
+        match diff {
+            // Match with remaining
+            Some(itertools::Diff::Shorter(_, subpath)) => Some(subpath),
+            _ => None,
+        }
+    }
+
+    /// List all files identified by this [`ListingTableUrl`] for the provided 
`file_extension`
+    pub(crate) fn list_all_files<'a>(
+        &'a self,
+        store: &'a dyn ObjectStore,
+        file_extension: &'a str,
+    ) -> BoxStream<'a, Result<FileMeta>> {
+        futures::stream::once(async move {
+            let prefix = self.prefix();
+            store.list_file(prefix.as_ref()).await
+        })
+        .try_flatten()
+        .map_err(DataFusionError::IoError)
+        .try_filter(move |meta| {
+            let path = meta.path();
+
+            let extension_match = path.ends_with(file_extension);
+            let glob_match = match &self.glob {
+                Some(glob) => match self.strip_prefix(path) {
+                    Some(mut segments) => {
+                        let stripped = segments.join("/");
+                        glob.matches(&stripped)
+                    }
+                    None => false,
+                },
+                None => true,
+            };
+
+            futures::future::ready(extension_match && glob_match)
+        })
+        .boxed()
+    }
+
+    /// Returns this [`ListingTableUrl`] as a string
+    pub fn as_str(&self) -> &str {
+        self.as_ref()
+    }
+
+    /// Return the [`ObjectStoreUrl`] for this [`ListingTableUrl`]
+    pub fn object_store(&self) -> ObjectStoreUrl {
+        let url = 
&self.url[url::Position::BeforeScheme..url::Position::BeforePath];
+        ObjectStoreUrl::parse(url).unwrap()
+    }
+}
+
+impl AsRef<str> for ListingTableUrl {
+    fn as_ref(&self) -> &str {
+        self.url.as_ref()
+    }
+}
+
+impl AsRef<Url> for ListingTableUrl {
+    fn as_ref(&self) -> &Url {
+        &self.url
+    }
+}
+
+impl std::fmt::Display for ListingTableUrl {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        self.as_str().fmt(f)
+    }
+}
+
+const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];
+
+/// Splits `path` at the first path segment containing a glob expression, 
returning
+/// `None` if no glob expression found.
+///
+/// Path delimiters are determined using [`std::path::is_separator`] which
+/// permits `/` as a path delimiter even on Windows platforms.
+///
+fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
+    let mut last_separator = 0;
+
+    for (byte_idx, char) in path.char_indices() {
+        if GLOB_START_CHARS.contains(&char) {
+            if last_separator == 0 {
+                return Some((".", path));
+            }
+            return Some(path.split_at(last_separator));
+        }
+
+        if std::path::is_separator(char) {
+            last_separator = byte_idx + char.len_utf8();
+        }
+    }
+    None
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_prefix_path() {
+        let root = std::env::current_dir().unwrap();
+        let root = root.to_string_lossy();
+
+        let url = ListingTableUrl::parse(&root).unwrap();
+        let child = format!("{}/partition/file", root);
+
+        let prefix: Vec<_> = url.strip_prefix(&child).unwrap().collect();
+        assert_eq!(prefix, vec!["partition", "file"]);
+    }
+
+    #[test]
+    fn test_prefix_s3() {
+        let url = ListingTableUrl::parse("s3://bucket/foo/bar").unwrap();
+        assert_eq!(url.prefix(), "bucket/foo/bar");
+
+        let path = "bucket/foo/bar/partition/foo.parquet";
+        let prefix: Vec<_> = url.strip_prefix(path).unwrap().collect();
+        assert_eq!(prefix, vec!["partition", "foo.parquet"]);

Review Comment:
   maybe also a negative test (like what happens if `path = 
"other-bucket/foo/bar/partition/foo.parquet"`?



##########
datafusion/core/Cargo.toml:
##########
@@ -67,7 +67,9 @@ datafusion-physical-expr = { path = "../physical-expr", 
version = "8.0.0" }
 datafusion-row = { path = "../row", version = "8.0.0" }
 datafusion-sql = { path = "../sql", version = "8.0.0" }
 futures = "0.3"
+glob = "0.3.0"
 hashbrown = { version = "0.12", features = ["raw"] }
+itertools = "0.10"

Review Comment:
   But datafusion doesn't depend on prost, right? If we could avoid another 
dependency that would be goo in my opinion



##########
datafusion/core/src/datasource/object_store.rs:
##########
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! ObjectStoreRegistry holds all the object stores at Runtime with a scheme 
for each store.
+//! This allows the user to extend DataFusion with different storage systems 
such as S3 or HDFS
+//! and query data inside these systems.
+
+use datafusion_common::{DataFusionError, Result};
+use datafusion_data_access::object_store::local::{LocalFileSystem, 
LOCAL_SCHEME};
+use datafusion_data_access::object_store::ObjectStore;
+use parking_lot::RwLock;
+use std::collections::HashMap;
+use std::sync::Arc;
+use url::Url;
+
+/// A parsed URL identifying a particular [`ObjectStore`]
+#[derive(Debug, Clone)]
+pub struct ObjectStoreUrl {
+    url: Url,
+}
+
+impl ObjectStoreUrl {
+    /// Parse an [`ObjectStoreUrl`] from a string
+    pub fn parse(s: impl AsRef<str>) -> Result<Self> {
+        let mut parsed =
+            Url::parse(s.as_ref()).map_err(|e| 
DataFusionError::External(Box::new(e)))?;
+
+        let remaining = &parsed[url::Position::BeforePath..];
+        if !remaining.is_empty() && remaining != "/" {
+            return Err(DataFusionError::Execution(format!(
+                "ObjectStoreUrl must only contain scheme and authority, got: 
{}",
+                remaining
+            )));
+        }
+
+        // Always set path for consistency
+        parsed.set_path("/");
+        Ok(Self { url: parsed })
+    }
+
+    /// An [`ObjectStoreUrl`] for the local filesystem
+    pub fn local_filesystem() -> Self {
+        Self::parse("file://").unwrap()
+    }
+
+    /// Returns this [`ObjectStoreUrl`] as a string
+    pub fn as_str(&self) -> &str {
+        self.as_ref()
+    }
+}
+
+impl AsRef<str> for ObjectStoreUrl {
+    fn as_ref(&self) -> &str {
+        self.url.as_ref()
+    }
+}
+
+impl AsRef<Url> for ObjectStoreUrl {
+    fn as_ref(&self) -> &Url {
+        &self.url
+    }
+}
+
+impl std::fmt::Display for ObjectStoreUrl {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        self.as_str().fmt(f)
+    }
+}
+
+/// Object store registry
+pub struct ObjectStoreRegistry {
+    /// A map from scheme to object store that serve list / read operations 
for the store
+    pub object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
+}
+
+impl std::fmt::Debug for ObjectStoreRegistry {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        f.debug_struct("ObjectStoreRegistry")
+            .field(
+                "schemes",
+                &self.object_stores.read().keys().collect::<Vec<_>>(),
+            )
+            .finish()
+    }
+}
+
+impl Default for ObjectStoreRegistry {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl ObjectStoreRegistry {
+    /// Create the registry that object stores can registered into.
+    /// ['LocalFileSystem'] store is registered in by default to support read 
local files natively.
+    pub fn new() -> Self {
+        let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new();
+        map.insert(LOCAL_SCHEME.to_string(), Arc::new(LocalFileSystem));
+
+        Self {
+            object_stores: RwLock::new(map),
+        }
+    }
+
+    /// Adds a new store to this registry.
+    /// If a store of the same prefix existed before, it is replaced in the 
registry and returned.
+    pub fn register_store(
+        &self,
+        scheme: String,
+        store: Arc<dyn ObjectStore>,
+    ) -> Option<Arc<dyn ObjectStore>> {
+        let mut stores = self.object_stores.write();
+        stores.insert(scheme, store)
+    }
+
+    /// Get the store registered for scheme
+    pub fn get(&self, scheme: &str) -> Option<Arc<dyn ObjectStore>> {
+        let stores = self.object_stores.read();
+        stores.get(scheme).cloned()
+    }
+
+    /// Get a suitable store for the provided URL. For example:
+    ///
+    /// - URL with scheme `file://` or no schema will return the default 
LocalFS store
+    /// - URL with scheme `s3://` will return the S3 store if it's registered
+    ///
+    pub fn get_by_url(&self, url: impl AsRef<Url>) -> Result<Arc<dyn 
ObjectStore>> {
+        let url = url.as_ref();
+        let stores = self.object_stores.read();
+        let store = stores.get(url.scheme()).map(Clone::clone).ok_or_else(|| {
+            DataFusionError::Internal(format!(
+                "No suitable object store found for {}",
+                url
+            ))
+        })?;
+
+        Ok(store)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::datasource::listing::ListingTableUrl;
+    use datafusion_data_access::object_store::local::LocalFileSystem;
+    use std::sync::Arc;
+
+    #[test]
+    fn test_object_store_url() {
+        let listing = ListingTableUrl::parse("file:///").unwrap();
+        let store = listing.object_store();
+        assert_eq!(store.as_str(), "file:///");
+
+        let file = ObjectStoreUrl::parse("file://").unwrap();
+        assert_eq!(file.as_str(), "file:///");
+
+        let listing = ListingTableUrl::parse("s3://bucket/").unwrap();
+        let store = listing.object_store();
+        assert_eq!(store.as_str(), "s3://bucket/");
+
+        let url = ObjectStoreUrl::parse("s3://bucket").unwrap();
+        assert_eq!(url.as_str(), "s3://bucket/");
+
+        let err = ObjectStoreUrl::parse("s3://bucket?").unwrap_err();
+        assert_eq!(err.to_string(), "Execution error: ObjectStoreUrl must only 
contain scheme and authority, got: ?");
+
+        let err = ObjectStoreUrl::parse("s3://bucket?foo=bar").unwrap_err();

Review Comment:
   I think it is also worth checking with username and passwords in the URL:
   
   s3://host:port/foo
   s3://username@host:port/foo
   s3://username:password@host:port/foo
   



##########
datafusion/core/src/datasource/object_store.rs:
##########
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! ObjectStoreRegistry holds all the object stores at Runtime with a scheme 
for each store.

Review Comment:
   👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to