This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git


The following commit(s) were added to refs/heads/main by this push:
     new 62776739 feat(python/sedonadb): Implement GDAL/OGR formats via pyogrio 
(#283)
62776739 is described below

commit 627767393de7b1c041c04a0b59a0895442610a6d
Author: Dewey Dunnington <[email protected]>
AuthorDate: Sun Nov 23 21:08:03 2025 -0600

    feat(python/sedonadb): Implement GDAL/OGR formats via pyogrio (#283)
    
    Co-authored-by: Copilot <[email protected]>
---
 Cargo.lock                                    |   4 +
 python/sedonadb/Cargo.toml                    |   2 +
 python/sedonadb/python/sedonadb/context.py    |  68 +++++
 python/sedonadb/python/sedonadb/datasource.py | 194 +++++++++++++
 python/sedonadb/src/context.rs                |  21 ++
 python/sedonadb/src/datasource.rs             | 388 ++++++++++++++++++++++++++
 python/sedonadb/src/lib.rs                    |   3 +
 python/sedonadb/src/schema.rs                 |  15 +
 python/sedonadb/tests/test_datasource.py      | 135 +++++++++
 rust/sedona-datasource/Cargo.toml             |   2 +
 rust/sedona-datasource/src/format.rs          |   6 +-
 rust/sedona-datasource/src/lib.rs             |   1 +
 rust/sedona-datasource/src/spec.rs            | 156 ++++++++++-
 rust/sedona-datasource/src/utility.rs         | 145 ++++++++++
 rust/sedona-expr/src/spatial_filter.rs        |  96 ++++++-
 rust/sedona-geometry/src/bounding_box.rs      |  71 ++++-
 rust/sedona-geometry/src/interval.rs          | 270 +++++++++++++++++-
 rust/sedona/Cargo.toml                        |   1 +
 rust/sedona/src/context.rs                    | 167 ++++++++++-
 19 files changed, 1719 insertions(+), 26 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 92dc407f..10f5f3b4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4834,6 +4834,7 @@ dependencies = [
  "parking_lot",
  "rstest",
  "sedona-common",
+ "sedona-datasource",
  "sedona-expr",
  "sedona-functions",
  "sedona-geo",
@@ -4911,6 +4912,7 @@ dependencies = [
  "datafusion-physical-plan",
  "futures",
  "object_store",
+ "regex",
  "sedona-common",
  "sedona-expr",
  "sedona-schema",
@@ -5315,7 +5317,9 @@ dependencies = [
  "pyo3",
  "sedona",
  "sedona-adbc",
+ "sedona-datasource",
  "sedona-expr",
+ "sedona-geometry",
  "sedona-geoparquet",
  "sedona-proj",
  "sedona-schema",
diff --git a/python/sedonadb/Cargo.toml b/python/sedonadb/Cargo.toml
index 16c75a3d..67a21e07 100644
--- a/python/sedonadb/Cargo.toml
+++ b/python/sedonadb/Cargo.toml
@@ -42,6 +42,8 @@ futures = { workspace = true }
 pyo3 = { version = "0.25.1" }
 sedona = { workspace = true }
 sedona-adbc = { workspace = true }
+sedona-datasource = { workspace = true }
+sedona-geometry = { workspace = true }
 sedona-expr = { workspace = true }
 sedona-geoparquet = { workspace = true }
 sedona-schema = { workspace = true }
diff --git a/python/sedonadb/python/sedonadb/context.py 
b/python/sedonadb/python/sedonadb/context.py
index f1c48273..714d7333 100644
--- a/python/sedonadb/python/sedonadb/context.py
+++ b/python/sedonadb/python/sedonadb/context.py
@@ -152,6 +152,74 @@ class SedonaContext:
             self.options,
         )
 
+    def read_pyogrio(
+        self,
+        table_paths: Union[str, Path, Iterable[str]],
+        options: Optional[Dict[str, Any]] = None,
+        extension: str = "",
+    ) -> DataFrame:
+        """Read spatial file formats using GDAL/OGR via pyogrio
+
+        Creates a DataFrame from one or more paths or URLs to a file supported 
by
+        [pyogrio](https://pyogrio.readthedocs.io/en/latest/), which is the 
same package
+        that powers `geopandas.read_file()` by default. Some common formats 
that can be
+        opened using GDAL/OGR are FlatGeoBuf, GeoPackage, Shapefile, GeoJSON, 
and many,
+        many more. See <https://gdal.org/en/stable/drivers/vector/index.html> 
for a list
+        of available vector drivers.
+
+        Like `read_parquet()`, globs and directories can be specified in 
addition to
+        individual file paths. Paths ending in `.zip` are automatically 
prepended with
+        `/vsizip/` (i.e., are automatically unzipped by GDAL). HTTP(s) URLs are
+        supported via `/vsicurl/`.
+
+        Args:
+            table_paths: A str, Path, or iterable of paths containing URLs or
+                paths. Globs (i.e., `path/*.gpkg`), directories, and zipped
+                versions of otherwise readable files are supported.
+            options: An optional mapping of key/value pairs (open options)
+                passed to GDAL/OGR.
+            extension: An optional file extension (e.g., `"fgb"`) used when
+                `table_paths` specifies one or more directories or a glob
+                that does not enforce a file extension.
+
+        Examples:
+
+            >>> import geopandas
+            >>> import tempfile
+            >>> sd = sedona.db.connect()
+            >>> df = geopandas.GeoDataFrame({
+            ...     "geometry": geopandas.GeoSeries.from_wkt(["POINT (0 1)"], 
crs=3857)
+            ... })
+            >>>
+            >>> with tempfile.TemporaryDirectory() as td:
+            ...     df.to_file(f"{td}/df.fgb")
+            ...     sd.read_pyogrio(f"{td}/df.fgb").show()
+            ...
+            ┌──────────────┐
+            │ wkb_geometry │
+            │   geometry   │
+            ╞══════════════╡
+            │ POINT(0 1)   │
+            └──────────────┘
+
+        """
+        from sedonadb.datasource import PyogrioFormatSpec
+
+        if isinstance(table_paths, (str, Path)):
+            table_paths = [table_paths]
+
+        spec = PyogrioFormatSpec(extension)
+        if options is not None:
+            spec = spec.with_options(options)
+
+        return DataFrame(
+            self._impl,
+            self._impl.read_external_format(
+                spec, [str(path) for path in table_paths], False
+            ),
+            self.options,
+        )
+
     def sql(self, sql: str) -> DataFrame:
         """Create a [DataFrame][sedonadb.dataframe.DataFrame] by executing SQL
 
diff --git a/python/sedonadb/python/sedonadb/datasource.py 
b/python/sedonadb/python/sedonadb/datasource.py
new file mode 100644
index 00000000..ea9bdded
--- /dev/null
+++ b/python/sedonadb/python/sedonadb/datasource.py
@@ -0,0 +1,194 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Mapping
+
+from sedonadb._lib import PyExternalFormat, PyProjectedRecordBatchReader
+
+
+class ExternalFormatSpec:
+    """Python file format specification
+
+    This class defines an abstract "file format", which maps to the DataFusion
+    concept of a `FileFormat`. This is a layer on top of the `TableProvider` 
that
+    provides standard support for querying collections of files using globs
+    or directories of files with compatible schemas. This abstraction allows 
for
+    basic support for pruning and partial filter pushdown (e.g., a bounding box
+    is available if one was provided in the underlying query); however, data
+    providers with more advanced features may wish to implement a 
`TableProvider`
+    in Rust to take advantage of a wider range of DataFusion features.
+
+    Implementations are only required to implement `open_reader()`; however, if
+    opening a reader is expensive and there is a more efficient way to infer a
+    schema from a given source, implementers may wish to also implement
+    `infer_schema()`.
+
+    This extension point is experimental and may evolve to serve the needs of
+    various file formats.
+    """
+
+    @property
+    def extension(self):
+        """A file extension for files that match this format
+
+        If this concept is not important for this format, returns an empty 
string.
+        """
+        return ""
+
+    def with_options(self, options: Mapping[str, Any]):
+        """Clone this instance and return a new instance with options applied
+
+        Apply an arbitrary set of format-defined key/value options. It is 
useful
+        to raise an error in this method if an option or value will later 
result
+        in an error; however, implementations may defer the error until later if
+        required by the underlying producer.
+
+        The default implementation of this method errors for any attempt to
+        pass options.
+        """
+        raise NotImplementedError(
+            f"key/value options not supported by {type(self).__name__}"
+        )
+
+    def open_reader(self, args: Any):
+        """Open an ArrowArrayStream/RecordBatchReader of batches given input 
information
+
+        Note that the output stream must take into account 
`args.file_projection`, if one
+        exists (`PyProjectedRecordBatchReader` may be used to ensure a set of 
output
+        columns or apply an output projection on an input stream).
+
+        The internals will keep a strong (Python) reference to the returned 
object
+        for as long as batches are being produced.
+
+        Args:
+            args: An object with attributes
+                - `src`: An object/file abstraction. Currently, `.to_url()` is 
the best way
+                  to extract the underlying URL from the source.
+                - `filter`: An object representing the filter expression that 
was pushed
+                  down, if one exists. Currently, 
`.bounding_box(column_index)` is the only
+                  way to interact with this object.
+                - `file_schema`: An optional schema. If `None`, the 
implementation must
+                  infer the schema.
+                - `file_projection`: An optional list of integers of the 
columns of
+                  `file_schema` that must be produced by this implementation 
(in the
+                  exact order specified).
+                - `batch_size`: An optional integer specifying the number of 
rows requested
+                  for each output batch.
+
+        """
+        raise NotImplementedError()
+
+    def infer_schema(self, src):
+        """Infer the output schema
+
+        Implementations can leave this unimplemented, in which case the 
internals will call
+        `open_reader()` and query the provided schema without pulling any 
batches.
+
+        Args:
+            src: An object/file abstraction. Currently, `.to_url()` is the 
best way
+                to extract the underlying URL from the source.
+        """
+        raise NotImplementedError()
+
+    def __sedona_external_format__(self):
+        return PyExternalFormat(self)
+
+
+class PyogrioFormatSpec(ExternalFormatSpec):
+    """An `ExternalFormatSpec` implementation wrapping GDAL/OGR via pyogrio"""
+
+    def __init__(self, extension=""):
+        self._extension = extension
+        self._options = {}
+
+    def with_options(self, options):
+        cloned = type(self)(self.extension)
+        cloned._options.update(options)
+        return cloned
+
+    @property
+    def extension(self) -> str:
+        return self._extension
+
+    def open_reader(self, args):
+        import pyogrio.raw
+
+        url = args.src.to_url()
+        if url is None:
+            raise ValueError(f"Can't convert {args.src} to OGR-openable 
object")
+
+        if url.startswith("http://") or url.startswith("https://"):
+            ogr_src = f"/vsicurl/{url}"
+        elif url.startswith("file://"):
+            ogr_src = url.removeprefix("file://")
+        else:
+            raise ValueError(f"Can't open {url} with OGR")
+
+        if ogr_src.endswith(".zip"):
+            ogr_src = f"/vsizip/{ogr_src}"
+
+        if args.is_projected():
+            file_columns = args.file_schema.names
+            columns = [file_columns[i] for i in args.file_projection]
+        else:
+            columns = None
+
+        batch_size = args.batch_size if args.batch_size is not None else 0
+
+        if args.filter and args.file_schema is not None:
+            geometry_column_indices = args.file_schema.geometry_column_indices
+            if len(geometry_column_indices) == 1:
+                bbox = args.filter.bounding_box(geometry_column_indices[0])
+            else:
+                bbox = None
+        else:
+            bbox = None
+
+        return PyogrioReaderShelter(
+            pyogrio.raw.ogr_open_arrow(
+                ogr_src, {}, columns=columns, batch_size=batch_size, bbox=bbox
+            ),
+            columns,
+        )
+
+
+class PyogrioReaderShelter:
+    """Python object wrapper around the context manager returned by pyogrio
+
+    The pyogrio object returned by `pyogrio.raw.ogr_open_arrow()` is a context
+    manager; however, the internals can only manage Rust object references.
+    This object ensures that the context manager is closed when the object
+    is deleted (which occurs as soon as possible when the returned reader
+    is no longer required).
+    """
+
+    def __init__(self, inner, output_names=None):
+        self._inner = inner
+        self._output_names = output_names
+        self._meta, self._reader = self._inner.__enter__()
+
+    def __del__(self):
+        self._inner.__exit__(None, None, None)
+
+    def __arrow_c_stream__(self, requested_schema=None):
+        if self._output_names is None:
+            return self._reader.__arrow_c_stream__()
+        else:
+            projected = PyProjectedRecordBatchReader(
+                self._reader, None, self._output_names
+            )
+            return projected.__arrow_c_stream__()
diff --git a/python/sedonadb/src/context.rs b/python/sedonadb/src/context.rs
index 4c480484..67ad8dcc 100644
--- a/python/sedonadb/src/context.rs
+++ b/python/sedonadb/src/context.rs
@@ -23,6 +23,7 @@ use tokio::runtime::Runtime;
 
 use crate::{
     dataframe::InternalDataFrame,
+    datasource::PyExternalFormat,
     error::PySedonaError,
     import_from::{import_ffi_scalar_udf, import_table_provider_from_any},
     runtime::wait_for_future,
@@ -107,6 +108,26 @@ impl InternalContext {
         Ok(InternalDataFrame::new(df, self.runtime.clone()))
     }
 
+    pub fn read_external_format<'py>(
+        &self,
+        py: Python<'py>,
+        format_spec: Bound<PyAny>,
+        table_paths: Vec<String>,
+        check_extension: bool,
+    ) -> Result<InternalDataFrame, PySedonaError> {
+        let spec = format_spec
+            .call_method0("__sedona_external_format__")?
+            .extract::<PyExternalFormat>()?;
+        let df = wait_for_future(
+            py,
+            &self.runtime,
+            self.inner
+                .read_external_format(Arc::new(spec), table_paths, None, 
check_extension),
+        )??;
+
+        Ok(InternalDataFrame::new(df, self.runtime.clone()))
+    }
+
     pub fn sql<'py>(
         &self,
         py: Python<'py>,
diff --git a/python/sedonadb/src/datasource.rs 
b/python/sedonadb/src/datasource.rs
new file mode 100644
index 00000000..496a3000
--- /dev/null
+++ b/python/sedonadb/src/datasource.rs
@@ -0,0 +1,388 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{collections::HashMap, ffi::CString, sync::Arc};
+
+use arrow_array::{ffi_stream::FFI_ArrowArrayStream, RecordBatch, 
RecordBatchReader};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
+use async_trait::async_trait;
+use datafusion::{physical_expr::conjunction, physical_plan::PhysicalExpr};
+use datafusion_common::{DataFusionError, Result};
+use pyo3::{
+    exceptions::PyNotImplementedError, pyclass, pymethods, types::PyCapsule, 
Bound, PyObject,
+    Python,
+};
+use sedona_datasource::{
+    spec::{ExternalFormatSpec, Object, OpenReaderArgs},
+    utility::ProjectedRecordBatchReader,
+};
+use sedona_expr::spatial_filter::SpatialFilter;
+use sedona_geometry::interval::IntervalTrait;
+
+use crate::{
+    error::PySedonaError,
+    import_from::{import_arrow_array_stream, import_arrow_schema},
+    schema::PySedonaSchema,
+};
+
+/// Python object that calls the methods of Python-level ExternalFormatSpec
+///
+/// The main purpose of this object is to implement [ExternalFormatSpec] such
+/// that it can be used by SedonaDB/DataFusion internals.
+#[pyclass]
+#[derive(Debug)]
+pub struct PyExternalFormat {
+    extension: String,
+    py_spec: PyObject,
+}
+
+impl Clone for PyExternalFormat {
+    fn clone(&self) -> Self {
+        Python::with_gil(|py| Self {
+            extension: self.extension.clone(),
+            py_spec: self.py_spec.clone_ref(py),
+        })
+    }
+}
+
+impl PyExternalFormat {
+    fn with_options_impl<'py>(
+        &self,
+        py: Python<'py>,
+        options: &HashMap<String, String>,
+    ) -> Result<Self, PySedonaError> {
+        let new_py_spec = self
+            .py_spec
+            .call_method(py, "with_options", (options.clone(),), None)?;
+        let new_extension = new_py_spec
+            .getattr(py, "extension")?
+            .extract::<String>(py)?;
+        Ok(Self {
+            extension: new_extension,
+            py_spec: new_py_spec,
+        })
+    }
+
+    fn infer_schema_impl<'py>(
+        &self,
+        py: Python<'py>,
+        object: &Object,
+    ) -> Result<Schema, PySedonaError> {
+        let maybe_schema = self.py_spec.call_method(
+            py,
+            "infer_schema",
+            (PyDataSourceObject {
+                inner: object.clone(),
+            },),
+            None,
+        );
+
+        match maybe_schema {
+            Ok(py_schema) => import_arrow_schema(py_schema.bind(py)),
+            Err(e) => {
+                if e.is_instance_of::<PyNotImplementedError>(py) {
+                    // Fall back on the open_reader implementation, as for some
+                    // external formats there is no other mechanism to infer a 
schema
+                    // other than to open a reader and query the schema at 
that point.
+                    let reader_args = OpenReaderArgs {
+                        src: object.clone(),
+                        batch_size: None,
+                        file_schema: None,
+                        file_projection: None,
+                        filters: vec![],
+                    };
+
+                    let reader = self.open_reader_impl(py, &reader_args)?;
+                    Ok(reader.schema().as_ref().clone())
+                } else {
+                    Err(PySedonaError::from(e))
+                }
+            }
+        }
+    }
+
+    fn open_reader_impl<'py>(
+        &self,
+        py: Python<'py>,
+        args: &OpenReaderArgs,
+    ) -> Result<Box<dyn RecordBatchReader + Send>, PySedonaError> {
+        let reader_obj = self.py_spec.call_method(
+            py,
+            "open_reader",
+            (PyOpenReaderArgs {
+                inner: args.clone(),
+            },),
+            None,
+        )?;
+
+        let reader = import_arrow_array_stream(py, reader_obj.bind(py), None)?;
+        let wrapped_reader = WrappedRecordBatchReader {
+            inner: reader,
+            shelter: Some(reader_obj),
+        };
+        Ok(Box::new(wrapped_reader))
+    }
+}
+
+#[pymethods]
+impl PyExternalFormat {
+    #[new]
+    fn new<'py>(py: Python<'py>, py_spec: PyObject) -> Result<Self, 
PySedonaError> {
+        let extension = py_spec.getattr(py, 
"extension")?.extract::<String>(py)?;
+        Ok(Self { extension, py_spec })
+    }
+}
+
+#[async_trait]
+impl ExternalFormatSpec for PyExternalFormat {
+    fn extension(&self) -> &str {
+        &self.extension
+    }
+
+    fn with_options(
+        &self,
+        options: &HashMap<String, String>,
+    ) -> Result<Arc<dyn ExternalFormatSpec>> {
+        let new_external_format = Python::with_gil(|py| 
self.with_options_impl(py, options))
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+        Ok(Arc::new(new_external_format))
+    }
+
+    async fn infer_schema(&self, location: &Object) -> Result<Schema> {
+        let schema = Python::with_gil(|py| self.infer_schema_impl(py, 
location))
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+        Ok(schema)
+    }
+
+    async fn open_reader(
+        &self,
+        args: &OpenReaderArgs,
+    ) -> Result<Box<dyn RecordBatchReader + Send>> {
+        let reader = Python::with_gil(|py| self.open_reader_impl(py, args))
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+        Ok(reader)
+    }
+}
+
+/// Wrapper around the [Object] such that the [PyExternalFormat] can pass
+/// required information into Python method calls
+///
+/// Currently this only exposes `to_url()`; however, we can and should expose
+/// the ability to read portions of files using the underlying object_store.
+#[pyclass]
+#[derive(Clone, Debug)]
+pub struct PyDataSourceObject {
+    pub inner: Object,
+}
+
+#[pymethods]
+impl PyDataSourceObject {
+    fn to_url(&self) -> Option<String> {
+        self.inner.to_url_string()
+    }
+}
+
+/// Wrapper around the [OpenReaderArgs] such that the [PyExternalFormat] 
can pass
+/// required information into Python method calls
+#[pyclass]
+#[derive(Clone, Debug)]
+pub struct PyOpenReaderArgs {
+    pub inner: OpenReaderArgs,
+}
+
+#[pymethods]
+impl PyOpenReaderArgs {
+    #[getter]
+    fn src(&self) -> PyDataSourceObject {
+        PyDataSourceObject {
+            inner: self.inner.src.clone(),
+        }
+    }
+
+    #[getter]
+    fn batch_size(&self) -> Option<usize> {
+        self.inner.batch_size
+    }
+
+    #[getter]
+    fn file_schema(&self) -> Option<PySedonaSchema> {
+        self.inner
+            .file_schema
+            .as_ref()
+            .map(|schema| PySedonaSchema::new(schema.as_ref().clone()))
+    }
+
+    #[getter]
+    fn file_projection(&self) -> Option<Vec<usize>> {
+        self.inner.file_projection.clone()
+    }
+
+    #[getter]
+    fn filters(&self) -> Vec<PyFilter> {
+        self.inner
+            .filters
+            .iter()
+            .map(|f| PyFilter { inner: f.clone() })
+            .collect()
+    }
+
+    #[getter]
+    fn filter(&self) -> Option<PyFilter> {
+        if self.inner.filters.is_empty() {
+            None
+        } else {
+            Some(PyFilter {
+                inner: conjunction(self.inner.filters.iter().cloned()),
+            })
+        }
+    }
+
+    fn is_projected(&self) -> Result<bool, PySedonaError> {
+        match (&self.inner.file_projection, &self.inner.file_schema) {
+            (None, None) | (None, Some(_)) => Ok(false),
+            (Some(projection), Some(schema)) => {
+                let seq_along_schema = 
(0..schema.fields().len()).collect::<Vec<_>>();
+                Ok(&seq_along_schema != projection)
+            }
+            (Some(_), None) => Err(PySedonaError::SedonaPython(
+                "Can't check projection for OpenReaderArgs with no 
schema".to_string(),
+            )),
+        }
+    }
+}
+
+/// Wrapper around a PhysicalExpr such that the [PyExternalFormat] can pass
+/// required information into Python method calls
+///
+/// This currently only exposes `bounding_box()`, but in the future could 
expose
+/// various ways to serialize the expression (SQL, DataFusion ProtoBuf, 
Substrait).
+#[pyclass]
+#[derive(Debug)]
+pub struct PyFilter {
+    inner: Arc<dyn PhysicalExpr>,
+}
+
+#[pymethods]
+impl PyFilter {
+    fn bounding_box(
+        &self,
+        column_index: usize,
+    ) -> Result<Option<(f64, f64, f64, f64)>, PySedonaError> {
+        let filter = SpatialFilter::try_from_expr(&self.inner)?;
+        let filter_bbox = filter.filter_bbox(column_index);
+        if filter_bbox.x().is_full() || filter_bbox.y().is_full() {
+            Ok(None)
+        } else {
+            Ok(Some((
+                filter_bbox.x().lo(),
+                filter_bbox.y().lo(),
+                filter_bbox.x().hi(),
+                filter_bbox.y().hi(),
+            )))
+        }
+    }
+
+    fn __repr__(&self) -> String {
+        format!("{self:?}")
+    }
+}
+
+/// RecordBatchReader utility that helps ensure projected output
+///
+/// Because the output of `open_reader()` is required to take into account
+/// the projection, we need to provide a utility to ensure this is taken into 
account.
+/// This wrapper is a thin wrapper around the [ProjectedRecordBatchReader] 
that allows
+/// it to be constructed from Python using either a set of indices or a set of 
names.
+#[pyclass]
+pub struct PyProjectedRecordBatchReader {
+    inner_object: PyObject,
+    projection_indices: Option<Vec<usize>>,
+    projection_names: Option<Vec<String>>,
+}
+
+#[pymethods]
+impl PyProjectedRecordBatchReader {
+    #[new]
+    fn new(
+        inner_object: PyObject,
+        projection_indices: Option<Vec<usize>>,
+        projection_names: Option<Vec<String>>,
+    ) -> Self {
+        Self {
+            inner_object,
+            projection_indices,
+            projection_names,
+        }
+    }
+
+    #[pyo3(signature = (requested_schema=None))]
+    fn __arrow_c_stream__<'py>(
+        &self,
+        py: Python<'py>,
+        #[allow(unused_variables)] requested_schema: Option<Bound<'py, 
PyCapsule>>,
+    ) -> Result<Bound<'py, PyCapsule>, PySedonaError> {
+        let inner = import_arrow_array_stream(py, self.inner_object.bind(py), 
None)?;
+
+        let reader = match (&self.projection_indices, &self.projection_names) {
+            (None, None) | (Some(_), Some(_)) => {
+                return 
Err(PySedonaError::SedonaPython("PyProjectedRecordBatchReader must be specified 
by one of projection_indices or projection_names".to_string()))
+            }
+            (Some(indices), None) => {
+                ProjectedRecordBatchReader::from_projection(inner, 
indices.clone())?
+            }
+            (None, Some(names)) => {
+                ProjectedRecordBatchReader::from_output_names(inner, 
&names.iter().map(|s| s.as_str()).collect::<Vec<&str>>())?
+            }
+        };
+
+        let ffi_stream = FFI_ArrowArrayStream::new(Box::new(reader));
+        let stream_capsule_name = CString::new("arrow_array_stream").unwrap();
+        Ok(PyCapsule::new(py, ffi_stream, Some(stream_capsule_name))?)
+    }
+}
+
+/// Helper to ensure a Python object stays in scope for the duration of a
+/// [RecordBatchReader]'s output.
+///
+/// Some Python frameworks require that some parent object outlive a returned
+/// ArrowArrayStream/RecordBatchReader (e.g., the pyogrio context manager, or
+/// an ADBC statement/cursor).
+struct WrappedRecordBatchReader {
+    pub inner: Box<dyn RecordBatchReader + Send>,
+    pub shelter: Option<PyObject>,
+}
+
+impl RecordBatchReader for WrappedRecordBatchReader {
+    fn schema(&self) -> SchemaRef {
+        self.inner.schema()
+    }
+}
+
+impl Iterator for WrappedRecordBatchReader {
+    type Item = Result<RecordBatch, ArrowError>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if let Some(item) = self.inner.next() {
+            Some(item)
+        } else {
+            self.shelter = None;
+            None
+        }
+    }
+}
diff --git a/python/sedonadb/src/lib.rs b/python/sedonadb/src/lib.rs
index 62a0caba..6110144a 100644
--- a/python/sedonadb/src/lib.rs
+++ b/python/sedonadb/src/lib.rs
@@ -22,6 +22,7 @@ use std::ffi::c_void;
 
 mod context;
 mod dataframe;
+mod datasource;
 mod error;
 mod import_from;
 mod reader;
@@ -94,6 +95,8 @@ fn _lib(py: Python<'_>, m: &Bound<'_, PyModule>) -> 
PyResult<()> {
 
     m.add_class::<context::InternalContext>()?;
     m.add_class::<dataframe::InternalDataFrame>()?;
+    m.add_class::<datasource::PyExternalFormat>()?;
+    m.add_class::<datasource::PyProjectedRecordBatchReader>()?;
     m.add("SedonaError", py.get_type::<error::SedonaError>())?;
     m.add_class::<schema::PySedonaSchema>()?;
     m.add_class::<schema::PySedonaField>()?;
diff --git a/python/sedonadb/src/schema.rs b/python/sedonadb/src/schema.rs
index d261043c..d9ead708 100644
--- a/python/sedonadb/src/schema.rs
+++ b/python/sedonadb/src/schema.rs
@@ -22,6 +22,7 @@ use pyo3::exceptions::{PyIndexError, PyKeyError, PyTypeError};
 use pyo3::prelude::*;
 use pyo3::types::PyCapsule;
 use sedona_schema::datatypes::SedonaType;
+use sedona_schema::schema::SedonaSchema;
 
 use crate::error::PySedonaError;
 
@@ -59,6 +60,20 @@ impl PySedonaSchema {
 
 #[pymethods]
 impl PySedonaSchema {
+    #[getter]
+    fn names(&self) -> Vec<String> {
+        self.inner
+            .fields()
+            .iter()
+            .map(|f| f.name().to_string())
+            .collect()
+    }
+
+    #[getter]
+    fn geometry_column_indices(&self) -> Result<Vec<usize>, PySedonaError> {
+        Ok(self.inner.geometry_column_indices()?)
+    }
+
     fn field<'py>(
         &self,
         py: Python<'py>,
diff --git a/python/sedonadb/tests/test_datasource.py 
b/python/sedonadb/tests/test_datasource.py
new file mode 100644
index 00000000..64e5b0b8
--- /dev/null
+++ b/python/sedonadb/tests/test_datasource.py
@@ -0,0 +1,135 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pathlib import Path
+import tempfile
+
+import geopandas
+import geopandas.testing
+import pandas as pd
+import pytest
+import shapely
+import sedonadb
+
+
+def test_read_ogr_projection(con):
+    n = 1024
+    series = geopandas.GeoSeries.from_xy(
+        list(range(n)), list(range(1, n + 1)), crs="EPSG:3857"
+    )
+    gdf = geopandas.GeoDataFrame({"idx": list(range(n)), "wkb_geometry": 
series})
+    gdf = gdf.set_geometry(gdf["wkb_geometry"])
+
+    with tempfile.TemporaryDirectory() as td:
+        temp_fgb_path = f"{td}/temp.fgb"
+        gdf.to_file(temp_fgb_path)
+        con.read_pyogrio(temp_fgb_path).to_view("test_fgb", overwrite=True)
+
+        # With no projection
+        geopandas.testing.assert_geodataframe_equal(
+            con.sql("SELECT * FROM test_fgb ORDER BY idx").to_pandas(), gdf
+        )
+
+        # With only not geometry selected
+        pd.testing.assert_frame_equal(
+            con.sql("SELECT idx FROM test_fgb ORDER BY idx").to_pandas(),
+            gdf.filter(["idx"]),
+        )
+
+        # With reversed columns
+        pd.testing.assert_frame_equal(
+            con.sql("SELECT wkb_geometry, idx FROM test_fgb ORDER BY 
idx").to_pandas(),
+            gdf.filter(["wkb_geometry", "idx"]),
+        )
+
+
+def test_read_ogr_multi_file(con):
+    n = 1024 * 16
+    partitions = ["part_{c}" for c in "abcdefghijklmnop"]
+    series = geopandas.GeoSeries.from_xy(
+        list(range(n)), list(range(1, n + 1)), crs="EPSG:3857"
+    )
+    gdf = geopandas.GeoDataFrame(
+        {
+            "idx": list(range(n)),
+            "partition": [partitions[i % len(partitions)] for i in range(n)],
+            "wkb_geometry": series,
+        }
+    )
+    gdf = gdf.set_geometry(gdf["wkb_geometry"])
+
+    with tempfile.TemporaryDirectory() as td:
+        # Create partitioned files by writing Parquet first and translating
+        # one file at a time
+        con.create_data_frame(gdf).to_parquet(td, partition_by="partition")
+        for parquet_path in Path(td).rglob("*.parquet"):
+            fgb_path = str(parquet_path).replace(".parquet", ".fgb")
+            con.read_parquet(parquet_path).to_pandas().to_file(fgb_path)
+
+        # Reading a directory while specifying the extension should work
+        con.read_pyogrio(f"{td}", extension="fgb").to_view(
+            "gdf_from_dir", overwrite=True
+        )
+        geopandas.testing.assert_geodataframe_equal(
+            con.sql("SELECT * FROM gdf_from_dir ORDER BY idx").to_pandas(),
+            gdf.filter(["idx", "wkb_geometry"]),
+        )
+
+        # Reading using a glob without specifying the extension should work
+        con.read_pyogrio(f"{td}/**/*.fgb").to_view("gdf_from_glob", 
overwrite=True)
+        geopandas.testing.assert_geodataframe_equal(
+            con.sql("SELECT * FROM gdf_from_glob ORDER BY idx").to_pandas(),
+            gdf.filter(["idx", "wkb_geometry"]),
+        )
+
+
+def test_read_ogr_filter(con):
+    n = 1024
+    series = geopandas.GeoSeries.from_xy(
+        list(range(n)), list(range(1, n + 1)), crs="EPSG:3857"
+    )
+    gdf = geopandas.GeoDataFrame({"idx": list(range(n)), "wkb_geometry": 
series})
+    gdf = gdf.set_geometry(gdf["wkb_geometry"])
+
+    with tempfile.TemporaryDirectory() as td:
+        temp_fgb_path = f"{td}/temp.fgb"
+        gdf.to_file(temp_fgb_path)
+        con.read_pyogrio(temp_fgb_path).to_view("test_fgb", overwrite=True)
+
+        # With something that should trigger a bounding box filter
+        geopandas.testing.assert_geodataframe_equal(
+            con.sql(
+                """
+                SELECT * FROM test_fgb
+                WHERE ST_Equals(wkb_geometry, ST_SetSRID(ST_Point(1, 2), 3857))
+                """
+            ).to_pandas(),
+            gdf[gdf.geometry.geom_equals(shapely.Point(1, 
2))].reset_index(drop=True),
+        )
+
+
+def test_read_ogr_file_not_found(con):
+    with pytest.raises(
+        sedonadb._lib.SedonaError, match="Can't infer schema for zero objects"
+    ):
+        con.read_pyogrio("this/is/not/a/directory")
+
+    with tempfile.TemporaryDirectory() as td:
+        with pytest.raises(
+            sedonadb._lib.SedonaError, match="Can't infer schema for zero 
objects"
+        ):
+            con.read_pyogrio(Path(td) / "file_does_not_exist")
diff --git a/rust/sedona-datasource/Cargo.toml 
b/rust/sedona-datasource/Cargo.toml
index b939ff41..c7273de8 100644
--- a/rust/sedona-datasource/Cargo.toml
+++ b/rust/sedona-datasource/Cargo.toml
@@ -29,6 +29,7 @@ rust-version.workspace = true
 default = []
 
 [dev-dependencies]
+object_store = { workspace = true, features = ["http"] }
 url = { workspace = true }
 tempfile = { workspace = true }
 tokio = { workspace = true }
@@ -45,6 +46,7 @@ datafusion-physical-expr = { workspace = true }
 datafusion-physical-plan = { workspace = true }
 futures = { workspace = true }
 object_store = { workspace = true }
+regex = { workspace = true }
 sedona-common = { workspace = true }
 sedona-expr = { workspace = true }
 sedona-schema = { workspace = true }
diff --git a/rust/sedona-datasource/src/format.rs 
b/rust/sedona-datasource/src/format.rs
index 5e6d5315..69a2bc9f 100644
--- a/rust/sedona-datasource/src/format.rs
+++ b/rust/sedona-datasource/src/format.rs
@@ -31,7 +31,7 @@ use datafusion::{
     },
 };
 use datafusion_catalog::{memory::DataSourceExec, Session};
-use datafusion_common::{not_impl_err, DataFusionError, GetExt, Result, 
Statistics};
+use datafusion_common::{not_impl_err, plan_err, DataFusionError, GetExt, 
Result, Statistics};
 use datafusion_physical_expr::{LexOrdering, LexRequirement, PhysicalExpr};
 use datafusion_physical_plan::{
     filter_pushdown::{FilterPushdownPropagation, PushedDown},
@@ -124,6 +124,10 @@ impl FileFormat for ExternalFileFormat {
         store: &Arc<dyn ObjectStore>,
         objects: &[ObjectMeta],
     ) -> Result<SchemaRef> {
+        if objects.is_empty() {
+            return plan_err!("Can't infer schema for zero objects. Does the 
input path exist?");
+        }
+
         let mut schemas: Vec<_> = futures::stream::iter(objects)
             .map(|object| async move {
                 let schema = self
diff --git a/rust/sedona-datasource/src/lib.rs 
b/rust/sedona-datasource/src/lib.rs
index 4bdc5969..88ccd95e 100644
--- a/rust/sedona-datasource/src/lib.rs
+++ b/rust/sedona-datasource/src/lib.rs
@@ -18,3 +18,4 @@
 pub mod format;
 pub mod provider;
 pub mod spec;
+pub mod utility;
diff --git a/rust/sedona-datasource/src/spec.rs 
b/rust/sedona-datasource/src/spec.rs
index a3abf8d6..d9b8f1af 100644
--- a/rust/sedona-datasource/src/spec.rs
+++ b/rust/sedona-datasource/src/spec.rs
@@ -15,7 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{collections::HashMap, fmt::Debug, sync::Arc};
+use std::{
+    collections::HashMap,
+    fmt::{Debug, Display},
+    sync::Arc,
+};
 
 use arrow_array::RecordBatchReader;
 use arrow_schema::{Schema, SchemaRef};
@@ -26,6 +30,7 @@ use datafusion_common::{Result, Statistics};
 use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_physical_expr::PhysicalExpr;
 use object_store::{ObjectMeta, ObjectStore};
+use regex::Regex;
 
 /// Simple file format specification
 ///
@@ -51,11 +56,6 @@ pub trait ExternalFormatSpec: Debug + Send + Sync {
     async fn open_reader(&self, args: &OpenReaderArgs)
         -> Result<Box<dyn RecordBatchReader + Send>>;
 
-    /// A file extension or `""` if this concept does not apply
-    fn extension(&self) -> &str {
-        ""
-    }
-
     /// Compute a clone of self but with the key/value options specified
     ///
     /// Implementations should error for invalid key/value input that does
@@ -65,6 +65,11 @@ pub trait ExternalFormatSpec: Debug + Send + Sync {
         options: &HashMap<String, String>,
     ) -> Result<Arc<dyn ExternalFormatSpec>>;
 
+    /// A file extension or `""` if this concept does not apply
+    fn extension(&self) -> &str {
+        ""
+    }
+
     /// Fill in default options from [TableOptions]
     ///
     /// The TableOptions are a DataFusion concept that provide a means by which
@@ -182,16 +187,145 @@ impl Object {
                 // GDAL to be able to translate.
                 let object_store_debug = format!("{:?}", 
self.store).to_lowercase();
                 if object_store_debug.contains("http") {
-                    Some(format!("https://{}", meta.location))
-                } else if object_store_debug.contains("local") {
+                    let pattern = r#"host: 
some\(domain\("([A-Za-z0-9.-]+)"\)\)"#;
+                    let re = Regex::new(pattern).ok()?;
+                    if let Some(caps) = re.captures(&object_store_debug) {
+                        caps.get(1)
+                            .map(|host| format!("https://{}/{}", host.as_str(), meta.location))
+                    } else {
+                        None
+                    }
+                } else if object_store_debug.contains("localfilesystem") {
                     Some(format!("file:///{}", meta.location))
                 } else {
                     None
                 }
             }
-            (Some(url), None) => Some(url.to_string()),
-            (Some(url), Some(meta)) => Some(format!("{url}/{}", 
meta.location)),
-            (None, None) => None,
+            (Some(url), Some(meta)) => Some(format!("{url}{}", meta.location)),
+            (Some(_), None) | (None, None) => None,
+        }
+    }
+}
+
+impl Display for Object {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        if let Some(url) = self.to_url_string() {
+            write!(f, "{url}")
+        } else if let Some(meta) = &self.meta {
+            write!(f, "<object store> {}", meta.location)
+        } else {
+            write!(f, "<object store> <unknown location>")
         }
     }
 }
+
+#[cfg(test)]
+mod test {
+    use object_store::{http::HttpBuilder, local::LocalFileSystem};
+
+    use super::*;
+
+    #[test]
+    fn http_object() {
+        let url_string = "https://foofy.foof/path/to/file.ext";
+
+        let store = 
Arc::new(HttpBuilder::new().with_url(url_string).build().unwrap());
+
+        let url = ObjectStoreUrl::parse("https://foofy.foof").unwrap();
+
+        let meta = ObjectMeta {
+            location: "/path/to/file.ext".into(),
+            last_modified: Default::default(),
+            size: 0,
+            e_tag: None,
+            version: None,
+        };
+
+        // Should be able to reconstruct the url with ObjectStoreUrl + meta
+        let obj = Object {
+            store: None,
+            url: Some(url.clone()),
+            meta: Some(meta.clone()),
+            range: None,
+        };
+        assert_eq!(obj.to_url_string().unwrap(), url_string);
+
+        // Should be able to reconstruct the url with the ObjectStore + meta
+        let obj = Object {
+            store: Some(store),
+            url: None,
+            meta: Some(meta.clone()),
+            range: None,
+        };
+        assert_eq!(obj.to_url_string().unwrap(), url_string);
+
+        // With only Meta, this should fail to compute a url
+        let obj = Object {
+            store: None,
+            url: None,
+            meta: Some(meta.clone()),
+            range: None,
+        };
+        assert!(obj.to_url_string().is_none());
+
+        // With only ObjectStoreUrl, this should fail to compute a url
+        let obj = Object {
+            store: None,
+            url: Some(url),
+            meta: None,
+            range: None,
+        };
+        assert!(obj.to_url_string().is_none());
+    }
+
+    #[test]
+    fn filesystem_object() {
+        let store = Arc::new(LocalFileSystem::new());
+
+        let url = ObjectStoreUrl::parse("file://").unwrap();
+
+        let meta = ObjectMeta {
+            location: "/path/to/file.ext".into(),
+            last_modified: Default::default(),
+            size: 0,
+            e_tag: None,
+            version: None,
+        };
+
+        // Should be able to reconstruct the url with ObjectStoreUrl + meta
+        let obj = Object {
+            store: None,
+            url: Some(url.clone()),
+            meta: Some(meta.clone()),
+            range: None,
+        };
+        assert_eq!(obj.to_url_string().unwrap(), "file:///path/to/file.ext");
+
+        // Should be able to reconstruct the url with the ObjectStore + meta
+        let obj = Object {
+            store: Some(store),
+            url: None,
+            meta: Some(meta.clone()),
+            range: None,
+        };
+        assert_eq!(obj.to_url_string().unwrap(), "file:///path/to/file.ext");
+
+        // With only Meta, this should fail to compute a url
+        let obj = Object {
+            store: None,
+            url: None,
+            meta: Some(meta.clone()),
+            range: None,
+        };
+        assert!(obj.to_url_string().is_none());
+
+        // With only ObjectStoreUrl, this should fail to compute a url
+        let obj = Object {
+            store: None,
+            url: Some(url),
+            meta: None,
+            range: None,
+        };
+        assert!(obj.to_url_string().is_none());
+    }
+}
diff --git a/rust/sedona-datasource/src/utility.rs 
b/rust/sedona-datasource/src/utility.rs
new file mode 100644
index 00000000..ad36cf0c
--- /dev/null
+++ b/rust/sedona-datasource/src/utility.rs
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow_array::{RecordBatch, RecordBatchReader};
+use arrow_schema::{ArrowError, SchemaRef};
+use datafusion_common::Result;
+
+/// [RecordBatchReader] wrapper that applies a projection
+///
+/// This utility can be used to implement a reader that conforms to the
+/// DataFusion requirement that datasources apply the specified projection
+/// when producing output.
+pub struct ProjectedRecordBatchReader {
+    inner: Box<dyn RecordBatchReader + Send>,
+    projection: Vec<usize>,
+    schema: SchemaRef,
+}
+
+impl ProjectedRecordBatchReader {
+    /// Create a new wrapper from the indices into the input desired in the 
output
+    pub fn from_projection(
+        inner: Box<dyn RecordBatchReader + Send>,
+        projection: Vec<usize>,
+    ) -> Result<Self> {
+        let schema = inner.schema().project(&projection)?;
+        Ok(Self {
+            inner,
+            projection,
+            schema: Arc::new(schema),
+        })
+    }
+
+    /// Create a new wrapper from the column names from the input desired in 
the output
+    pub fn from_output_names(
+        inner: Box<dyn RecordBatchReader + Send>,
+        projection: &[&str],
+    ) -> Result<Self> {
+        let input_indices = projection
+            .iter()
+            .map(|col| inner.schema().index_of(col))
+            .collect::<Result<Vec<usize>, ArrowError>>()?;
+        Self::from_projection(inner, input_indices)
+    }
+}
+
+impl RecordBatchReader for ProjectedRecordBatchReader {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+impl Iterator for ProjectedRecordBatchReader {
+    type Item = Result<RecordBatch, ArrowError>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if let Some(next) = self.inner.next() {
+            match next {
+                Ok(batch) => Some(batch.project(&self.projection)),
+                Err(err) => Some(Err(err)),
+            }
+        } else {
+            None
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+
+    use arrow_array::{create_array, ArrayRef, RecordBatchIterator};
+    use datafusion::assert_batches_eq;
+
+    use super::*;
+
+    #[test]
+    fn projected_record_batch_reader() {
+        let batch = RecordBatch::try_from_iter([
+            (
+                "x",
+                create_array!(Utf8, ["one", "two", "three", "four"]) as 
ArrayRef,
+            ),
+            (
+                "y",
+                create_array!(Utf8, ["five", "six", "seven", "eight"]) as 
ArrayRef,
+            ),
+        ])
+        .unwrap();
+
+        let schema = batch.schema();
+
+        // From indices
+        let reader = RecordBatchIterator::new([Ok(batch.clone())], 
schema.clone());
+        let projected =
+            ProjectedRecordBatchReader::from_projection(Box::new(reader), 
vec![1, 0]).unwrap();
+        let projected_batches = projected.collect::<Result<Vec<_>, 
ArrowError>>().unwrap();
+        assert_batches_eq!(
+            [
+                "+-------+-------+",
+                "| y     | x     |",
+                "+-------+-------+",
+                "| five  | one   |",
+                "| six   | two   |",
+                "| seven | three |",
+                "| eight | four  |",
+                "+-------+-------+",
+            ],
+            &projected_batches
+        );
+
+        // From output names
+        let reader = RecordBatchIterator::new([Ok(batch.clone())], 
schema.clone());
+        let projected =
+            ProjectedRecordBatchReader::from_output_names(Box::new(reader), 
&["y", "x"]).unwrap();
+        let projected_batches = projected.collect::<Result<Vec<_>, 
ArrowError>>().unwrap();
+        assert_batches_eq!(
+            [
+                "+-------+-------+",
+                "| y     | x     |",
+                "+-------+-------+",
+                "| five  | one   |",
+                "| six   | two   |",
+                "| seven | three |",
+                "| eight | four  |",
+                "+-------+-------+",
+            ],
+            &projected_batches
+        );
+    }
+}
diff --git a/rust/sedona-expr/src/spatial_filter.rs 
b/rust/sedona-expr/src/spatial_filter.rs
index 83e314eb..1160cf2c 100644
--- a/rust/sedona-expr/src/spatial_filter.rs
+++ b/rust/sedona-expr/src/spatial_filter.rs
@@ -25,7 +25,11 @@ use datafusion_physical_expr::{
 };
 use geo_traits::Dimensions;
 use sedona_common::sedona_internal_err;
-use sedona_geometry::{bounding_box::BoundingBox, bounds::wkb_bounds_xy, 
interval::IntervalTrait};
+use sedona_geometry::{
+    bounding_box::BoundingBox,
+    bounds::wkb_bounds_xy,
+    interval::{Interval, IntervalTrait},
+};
 use sedona_schema::{datatypes::SedonaType, matchers::ArgMatcher};
 
 use crate::{
@@ -41,7 +45,7 @@ use crate::{
 /// to attempt pruning unnecessary files or parts of files specifically with 
respect
 /// to a spatial filter (i.e., non-spatial filters we leave to an underlying
 /// implementation).
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub enum SpatialFilter {
     /// ST_Intersects(\<column\>, \<literal\>) or ST_Intersects(\<literal\>, 
\<column\>)
     Intersects(Column, BoundingBox),
@@ -60,6 +64,44 @@ pub enum SpatialFilter {
 }
 
 impl SpatialFilter {
+    /// Compute the maximum extent of a filter for a specific column index
+    ///
+    /// Some spatial file formats have the ability to push down a bounding box
+    /// into an index. This function allows deriving that bounding box based
+    /// on what DataFusion provides, which is a physical expression.
+    ///
+    /// Note that this always succeeds; however, for a non-spatial expression 
or
+    /// a non-spatial expression that is unsupported, the full bounding box is
+    /// returned.
+    pub fn filter_bbox(&self, column_index: usize) -> BoundingBox {
+        match self {
+            SpatialFilter::Intersects(column, bounding_box)
+            | SpatialFilter::Covers(column, bounding_box) => {
+                if column.index() == column_index {
+                    return bounding_box.clone();
+                }
+            }
+            SpatialFilter::And(lhs, rhs) => {
+                let lhs_box = lhs.filter_bbox(column_index);
+                let rhs_box = rhs.filter_bbox(column_index);
+                if let Ok(bounds) = lhs_box.intersection(&rhs_box) {
+                    return bounds;
+                }
+            }
+            SpatialFilter::Or(lhs, rhs) => {
+                let mut bounds = lhs.filter_bbox(column_index);
+                bounds.update_box(&rhs.filter_bbox(column_index));
+                return bounds;
+            }
+            SpatialFilter::LiteralFalse => {
+                return BoundingBox::xy(Interval::empty(), Interval::empty())
+            }
+            SpatialFilter::HasZ(_) | SpatialFilter::Unknown => {}
+        }
+
+        BoundingBox::xy(Interval::full(), Interval::full())
+    }
+
     /// Returns true if there is any chance the expression might be true
     ///
     /// In other words, returns false if and only if the expression is 
guaranteed
@@ -1103,4 +1145,54 @@ mod test {
             panic!("Parse incorrect!")
         }
     }
+
+    #[test]
+    fn bounding_box() {
+        let col_zero = Column::new("foofy", 0);
+        let bbox_02 = BoundingBox::xy((0, 2), (0, 2));
+        let bbox_13 = BoundingBox::xy((1, 3), (1, 3));
+
+        assert_eq!(
+            SpatialFilter::Intersects(col_zero.clone(), 
bbox_02.clone()).filter_bbox(0),
+            bbox_02
+        );
+
+        assert_eq!(
+            SpatialFilter::Covers(col_zero.clone(), 
bbox_02.clone()).filter_bbox(0),
+            bbox_02
+        );
+
+        assert_eq!(
+            SpatialFilter::LiteralFalse.filter_bbox(0),
+            BoundingBox::xy(Interval::empty(), Interval::empty())
+        );
+        assert_eq!(
+            SpatialFilter::HasZ(col_zero.clone()).filter_bbox(0),
+            BoundingBox::xy(Interval::full(), Interval::full())
+        );
+        assert_eq!(
+            SpatialFilter::Unknown.filter_bbox(0),
+            BoundingBox::xy(Interval::full(), Interval::full())
+        );
+
+        let intersects_02 = SpatialFilter::Intersects(col_zero.clone(), 
bbox_02.clone());
+        let intersects_13 = SpatialFilter::Intersects(col_zero.clone(), 
bbox_13.clone());
+        assert_eq!(
+            SpatialFilter::And(
+                Box::new(intersects_02.clone()),
+                Box::new(intersects_13.clone())
+            )
+            .filter_bbox(0),
+            BoundingBox::xy((1, 2), (1, 2))
+        );
+
+        assert_eq!(
+            SpatialFilter::Or(
+                Box::new(intersects_02.clone()),
+                Box::new(intersects_13.clone())
+            )
+            .filter_bbox(0),
+            BoundingBox::xy((0, 3), (0, 3))
+        );
+    }
 }
diff --git a/rust/sedona-geometry/src/bounding_box.rs 
b/rust/sedona-geometry/src/bounding_box.rs
index fb5b9777..7a018fb5 100644
--- a/rust/sedona-geometry/src/bounding_box.rs
+++ b/rust/sedona-geometry/src/bounding_box.rs
@@ -16,7 +16,10 @@
 // under the License.
 use serde::{Deserialize, Serialize};
 
-use crate::interval::{Interval, IntervalTrait, WraparoundInterval};
+use crate::{
+    error::SedonaGeometryError,
+    interval::{Interval, IntervalTrait, WraparoundInterval},
+};
 
 /// Bounding Box implementation with wraparound support
 ///
@@ -162,6 +165,25 @@ impl BoundingBox {
             _ => None,
         };
     }
+
+    /// Compute the intersection of this bounding box with another
+    ///
+    /// This method will propagate missingness of Z or M dimensions from the 
two boxes
+    /// (e.g., Z will be `None` if Z if `self.z().is_none()` OR 
`other.z().is_none()`).
+    pub fn intersection(&self, other: &Self) -> Result<Self, 
SedonaGeometryError> {
+        Ok(Self {
+            x: self.x.intersection(&other.x)?,
+            y: self.y.intersection(&other.y)?,
+            z: match (self.z, other.z) {
+                (Some(z), Some(other_z)) => Some(z.intersection(&other_z)?),
+                _ => None,
+            },
+            m: match (self.m, other.m) {
+                (Some(m), Some(other_m)) => Some(m.intersection(&other_m)?),
+                _ => None,
+            },
+        })
+    }
 }
 
 #[cfg(test)]
@@ -358,6 +380,53 @@ mod test {
         assert!(bounding_box.m().is_none());
     }
 
+    #[test]
+    fn bounding_box_intersection() {
+        assert_eq!(
+            BoundingBox::xy((1, 2), (3, 4))
+                .intersection(&BoundingBox::xy((1.5, 2.5), (3.5, 4.5)))
+                .unwrap(),
+            BoundingBox::xy((1.5, 2.0), (3.5, 4.0))
+        );
+
+        // If z and m are present in one input but not the other, we propagate 
the unknownness
+        // to the intersection
+        assert_eq!(
+            BoundingBox::xyzm(
+                (1, 2),
+                (3, 4),
+                Some(Interval::empty()),
+                Some(Interval::empty())
+            )
+            .intersection(&BoundingBox::xy((1.5, 2.5), (3.5, 4.5)))
+            .unwrap(),
+            BoundingBox::xy((1.5, 2.0), (3.5, 4.0))
+        );
+
+        // If z and m are specified in both, we include the intersection in 
the output
+        assert_eq!(
+            BoundingBox::xyzm(
+                (1, 2),
+                (3, 4),
+                Some(Interval::empty()),
+                Some(Interval::empty())
+            )
+            .intersection(&BoundingBox::xyzm(
+                (1.5, 2.5),
+                (3.5, 4.5),
+                Some(Interval::empty()),
+                Some(Interval::empty())
+            ))
+            .unwrap(),
+            BoundingBox::xyzm(
+                (1.5, 2.0),
+                (3.5, 4.0),
+                Some(Interval::empty()),
+                Some(Interval::empty())
+            )
+        );
+    }
+
     fn check_serialize_deserialize_roundtrip(bounding_box: BoundingBox) {
         let json_bytes = serde_json::to_vec(&bounding_box).unwrap();
         let bounding_box_roundtrip: BoundingBox = 
serde_json::from_slice(&json_bytes).unwrap();
diff --git a/rust/sedona-geometry/src/interval.rs 
b/rust/sedona-geometry/src/interval.rs
index 1037bf7e..d02608d7 100644
--- a/rust/sedona-geometry/src/interval.rs
+++ b/rust/sedona-geometry/src/interval.rs
@@ -26,7 +26,7 @@ use crate::error::SedonaGeometryError;
 /// incurs overhead (particularly in a loop). This trait is mostly used to
 /// simplify testing and unify documentation for the two concrete
 /// implementations.
-pub trait IntervalTrait: std::fmt::Debug + PartialEq {
+pub trait IntervalTrait: std::fmt::Debug + PartialEq + Sized {
     /// Create an interval from lo and hi values
     fn new(lo: f64, hi: f64) -> Self;
 
@@ -98,6 +98,9 @@ pub trait IntervalTrait: std::fmt::Debug + PartialEq {
     /// True if this interval is empty (i.e. intersects no values)
     fn is_empty(&self) -> bool;
 
+    /// True if this interval is full (i.e. intersects all values)
+    fn is_full(&self) -> bool;
+
     /// Compute a new interval that is the union of both
     ///
     /// When accumulating intervals in a loop, use [Interval::update_interval].
@@ -114,6 +117,9 @@ pub trait IntervalTrait: std::fmt::Debug + PartialEq {
     /// For regular intervals, this expands both lo and hi by the distance.
     /// For wraparound intervals, this may result in the full interval if 
expansion is large enough.
     fn expand_by(&self, distance: f64) -> Self;
+
+    /// Compute the interval contained by both self and other
+    fn intersection(&self, other: &Self) -> Result<Self, SedonaGeometryError>;
 }
 
 /// 1D Interval that never wraps around
@@ -236,6 +242,10 @@ impl IntervalTrait for Interval {
         self.width() == -f64::INFINITY
     }
 
+    fn is_full(&self) -> bool {
+        self == &Self::full()
+    }
+
     fn merge_interval(&self, other: &Self) -> Self {
         let mut out = *self;
         out.update_interval(other);
@@ -255,6 +265,16 @@ impl IntervalTrait for Interval {
 
         Self::new(self.lo - distance, self.hi + distance)
     }
+
+    fn intersection(&self, other: &Self) -> Result<Self, SedonaGeometryError> {
+        let new_lo = self.lo.max(other.lo);
+        let new_hi = self.hi.min(other.hi);
+        if new_lo > new_hi {
+            Ok(Self::empty())
+        } else {
+            Ok(Self::new(new_lo, new_hi))
+        }
+    }
 }
 
 #[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
@@ -370,6 +390,10 @@ impl IntervalTrait for WraparoundInterval {
         self.inner.is_empty()
     }
 
+    fn is_full(&self) -> bool {
+        self == &Self::full()
+    }
+
     fn merge_interval(&self, other: &Self) -> Self {
         if self.is_empty() {
             return *other;
@@ -486,6 +510,53 @@ impl IntervalTrait for WraparoundInterval {
         // So the interval itself is (excluded_hi, excluded_lo)
         Self::new(excluded_hi, excluded_lo)
     }
+
+    fn intersection(&self, other: &Self) -> Result<Self, SedonaGeometryError> {
+        match (self.is_wraparound(), other.is_wraparound()) {
+            // Neither is wraparound
+            (false, false) => 
Ok(self.inner.intersection(&other.inner)?.into()),
+            // One is wraparound
+            (true, false) => other.intersection(self),
+            (false, true) => {
+                let inner = self.inner;
+                let (left, right) = other.split();
+                match (inner.intersects_interval(&left), 
inner.intersects_interval(&right)) {
+                    // Intersects both the left and right intervals
+                    (true, true) => {
+                        Err(SedonaGeometryError::Invalid(format!("Can't 
represent the intersection of {self:?} and {other:?} as a single 
WraparoundInterval")))
+                    },
+                    // Intersects only the left interval
+                    (true, false) => Ok(inner.intersection(&left)?.into()),
+                    // Intersects only the right interval
+                    (false, true) => Ok(inner.intersection(&right)?.into()),
+                    (false, false) => Ok(WraparoundInterval::empty()),
+                }
+            }
+            // Both are wraparound
+            (true, true) => {
+                // Both wraparound intervals represent complements of excluded 
regions
+                // Intersection of complements = complement of union of 
excluded regions
+                // self excludes (hi, lo), other excludes (other.hi, other.lo)
+                // We need to find the union of these excluded regions
+                let excluded_self = Interval::new(self.inner.hi, 
self.inner.lo);
+                let excluded_other = Interval::new(other.inner.hi, 
other.inner.lo);
+
+                // We can't use the excluded union if the excluded region of 
self and other
+                // are disjoint
+                if excluded_self.intersects_interval(&excluded_other) {
+                    let excluded_union = 
excluded_self.merge_interval(&excluded_other);
+
+                    // The intersection is the complement of the union of 
excluded regions
+                    Ok(WraparoundInterval::new(
+                        excluded_union.hi(),
+                        excluded_union.lo(),
+                    ))
+                } else {
+                    Err(SedonaGeometryError::Invalid(format!("Can't represent 
the intersection of {self:?} and {other:?} as a single WraparoundInterval")))
+                }
+            }
+        }
+    }
 }
 
 #[cfg(test)]
@@ -495,6 +566,8 @@ mod test {
     use super::*;
 
     fn test_empty<T: IntervalTrait>(empty: T) {
+        assert!(empty.is_empty());
+
         // Equals itself
         #[allow(clippy::eq_op)]
         {
@@ -536,6 +609,12 @@ mod test {
             T::new(10.0, 20.0)
         );
 
+        // Intersecting an empty interval results in an empty interval
+        assert_eq!(empty.intersection(&empty).unwrap(), empty);
+
+        // Intersecting the empty interval with a full interval results in an empty interval
+        assert_eq!(empty.intersection(&T::full()).unwrap(), empty);
+
         // Expanding empty interval keeps it empty
         assert_eq!(empty.expand_by(5.0), empty);
         assert_eq!(empty.expand_by(0.0), empty);
@@ -567,6 +646,21 @@ mod test {
         );
     }
 
+    fn test_full<T: IntervalTrait>(full: T) {
+        assert!(full.is_full());
+        assert_eq!(full.intersection(&full).unwrap(), full);
+    }
+
+    #[test]
+    fn interval_full() {
+        test_full(Interval::full());
+    }
+
+    #[test]
+    fn wraparound_interval_full() {
+        test_full(WraparoundInterval::full());
+    }
+
     fn test_finite<T: IntervalTrait>(finite: T) {
         // Check accessors
         assert_eq!(finite.lo(), 10.0);
@@ -671,6 +765,28 @@ mod test {
             T::new(10.0, 30.0)
         );
 
+        // Intersecting an interval with the empty interval
+        assert_eq!(finite.intersection(&T::empty()).unwrap(), T::empty());
+
+        // Intersecting an interval with the full interval
+        assert_eq!(finite.intersection(&T::full()).unwrap(), finite);
+
+        // Intersecting finite intervals with a non-empty result
+        assert_eq!(
+            finite
+                .intersection(&T::new(finite.mid(), finite.hi() + 1.0))
+                .unwrap(),
+            T::new(finite.mid(), finite.hi())
+        );
+
+        // Intersecting finite intervals with an empty result
+        assert_eq!(
+            finite
+                .intersection(&T::new(finite.hi() + 1.0, finite.hi() + 2.0))
+                .unwrap(),
+            T::empty()
+        );
+
         // Expanding by positive distance
         assert_eq!(finite.expand_by(2.0), T::new(8.0, 22.0));
         assert_eq!(finite.expand_by(5.0), T::new(5.0, 25.0));
@@ -979,6 +1095,158 @@ mod test {
         );
     }
 
+    #[test]
+    fn wraparound_interval_actually_wraparound_intersection() {
+        // Everything *except* the interval (10, 20)
+        let wraparound = WraparoundInterval::new(20.0, 10.0);
+
+        // Intersecting an empty interval
+        assert_eq!(
+            wraparound
+                .intersection(&WraparoundInterval::empty())
+                .unwrap(),
+            WraparoundInterval::empty()
+        );
+
+        // Intersecting an interval with itself
+        assert_eq!(wraparound.intersection(&wraparound).unwrap(), wraparound);
+
+        // Intersecting a wraparound interval with a "larger" wraparound 
interval
+        //           10         20
+        // <==========|          |============>
+        // <==============|  |================>
+        //               14  16
+        assert_eq!(
+            wraparound
+                .intersection(&WraparoundInterval::new(16.0, 14.0))
+                .unwrap(),
+            wraparound
+        );
+
+        // Intersecting a wraparound interval with a "smaller" wraparound 
interval
+        //           10         20
+        // <==========|          |============>
+        // <=====|                    |=======>
+        //       5                    25
+        // <=====|                    |=======>
+        assert_eq!(
+            wraparound
+                .intersection(&WraparoundInterval::new(25.0, 5.0))
+                .unwrap(),
+            WraparoundInterval::new(25.0, 5.0)
+        );
+
+        // Intersecting with partially intersecting wraparounds
+        //           10         20
+        // <==========|          |============>
+        // <=====|          |=================>
+        //       5          15
+        // <=====|               |============>
+        assert_eq!(
+            wraparound
+                .intersection(&WraparoundInterval::new(15.0, 5.0))
+                .unwrap(),
+            WraparoundInterval::new(20.0, 5.0)
+        );
+
+        //           10         20
+        // <==========|          |============>
+        // <================|          |======>
+        //                  15         25
+        // <==========|                |======>
+        assert_eq!(
+            wraparound
+                .intersection(&WraparoundInterval::new(25.0, 15.0))
+                .unwrap(),
+            WraparoundInterval::new(25.0, 10.0)
+        );
+
+        // Intersecting with a wraparound that would require >1 interval to represent
+        //           10         20
+        // <==========|          |=========================>
+        // <=============================|          |======>
+        //                               25         30
+        // <==========|          |=======|          |======>
+        wraparound
+            .intersection(&WraparoundInterval::new(30.0, 25.0))
+            .unwrap_err();
+
+        //                    10         20
+        // <===================|          |================>
+        // <==|          |=================================>
+        //    0          5
+        // <==|          |=====|          |================>
+        wraparound
+            .intersection(&WraparoundInterval::new(5.0, 0.0))
+            .unwrap_err();
+
+        // Intersecting wraparound with a regular interval completely 
contained by the original
+        //                  10         20
+        // <=================|          |==================>
+        //                                   |=========|
+        //                                  25         30
+        //                                   |=========|
+        assert_eq!(
+            wraparound
+                .intersection(&WraparoundInterval::new(25.0, 30.0))
+                .unwrap(),
+            WraparoundInterval::new(25.0, 30.0)
+        );
+
+        //                  10         20
+        // <=================|          |==================>
+        //  |=========|
+        //  0         5
+        //  |=========|
+        assert_eq!(
+            wraparound
+                .intersection(&WraparoundInterval::new(0.0, 5.0))
+                .unwrap(),
+            WraparoundInterval::new(0.0, 5.0)
+        );
+
+        // Intersecting wraparound with a partially intersecting regular 
interval that
+        // intersects the left side
+        //                  10         20
+        // <=================|          |==================>
+        //              |=========|
+        //              5         15
+        //              |====|
+        assert_eq!(
+            wraparound
+                .intersection(&WraparoundInterval::new(5.0, 15.0))
+                .unwrap(),
+            WraparoundInterval::new(5.0, 10.0)
+        );
+
+        // Intersecting wraparound with a partially intersecting regular 
interval that
+        // intersects the right side
+        //                  10         20
+        // <=================|          |==================>
+        //                         |=========|
+        //                         15        25
+        //                              |====|
+        assert_eq!(
+            wraparound
+                .intersection(&WraparoundInterval::new(15.0, 25.0))
+                .unwrap(),
+            WraparoundInterval::new(20.0, 25.0)
+        );
+
+        // Intersecting wraparound with a disjoint regular interval
+        //                  10         20
+        // <=================|          |==================>
+        //                     |==|
+        //                    12  15
+        //
+        assert_eq!(
+            wraparound
+                .intersection(&WraparoundInterval::new(12.0, 15.0))
+                .unwrap(),
+            WraparoundInterval::empty()
+        );
+    }
+
     #[test]
     fn wraparound_interval_actually_wraparound_expand_by() {
         // Everything *except* the interval (10, 20)
diff --git a/rust/sedona/Cargo.toml b/rust/sedona/Cargo.toml
index ece17adf..4119280d 100644
--- a/rust/sedona/Cargo.toml
+++ b/rust/sedona/Cargo.toml
@@ -63,6 +63,7 @@ geo-types = { workspace = true }
 object_store = { workspace = true }
 parking_lot = { workspace = true }
 sedona-common = { workspace = true }
+sedona-datasource = { workspace = true }
 sedona-expr = { workspace = true }
 sedona-functions = { workspace = true }
 sedona-geo = { workspace = true, optional = true }
diff --git a/rust/sedona/src/context.rs b/rust/sedona/src/context.rs
index 3903cfa9..7723c20f 100644
--- a/rust/sedona/src/context.rs
+++ b/rust/sedona/src/context.rs
@@ -14,7 +14,10 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-use std::{collections::VecDeque, sync::Arc};
+use std::{
+    collections::{HashMap, VecDeque},
+    sync::Arc,
+};
 
 use crate::exec::create_plan_from_sql;
 use crate::object_storage::ensure_object_store_registered_with_options;
@@ -40,6 +43,8 @@ use datafusion_expr::sqlparser::dialect::{dialect_from_str, 
Dialect};
 use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, SortExpr};
 use parking_lot::Mutex;
 use sedona_common::option::add_sedona_option_extension;
+use sedona_datasource::provider::external_listing_table;
+use sedona_datasource::spec::ExternalFormatSpec;
 use sedona_expr::aggregate_udf::SedonaAccumulatorRef;
 use sedona_expr::{function_set::FunctionSet, scalar_udf::ScalarKernelRef};
 use sedona_geoparquet::options::TableGeoParquetOptions;
@@ -258,6 +263,43 @@ impl SedonaContext {
 
         self.ctx.read_table(Arc::new(provider))
     }
+
+    /// Creates a [`DataFrame`] for reading a [ExternalFormatSpec]
+    pub async fn read_external_format<P: DataFilePaths>(
+        &self,
+        spec: Arc<dyn ExternalFormatSpec>,
+        table_paths: P,
+        options: Option<&HashMap<String, String>>,
+        check_extension: bool,
+    ) -> Result<DataFrame> {
+        let urls = table_paths.to_urls()?;
+
+        // Pre-register the object store with our custom options before creating the provider
+        if !urls.is_empty() {
+            // Extract the filesystem options for object store registration
+            ensure_object_store_registered_with_options(
+                &mut self.ctx.state(),
+                urls[0].as_str(),
+                options,
+            )
+            .await?;
+        }
+
+        let provider = if let Some(options) = options {
+            // Strip the filesystem-based options
+            let options_without_filesystems = options
+                .iter()
+                .filter(|(k, _)| !k.starts_with("gcs.") && 
!k.starts_with("aws."))
+                .map(|(k, v)| (k.clone(), v.clone()))
+                .collect::<HashMap<String, String>>();
+            let spec = spec.with_options(&options_without_filesystems)?;
+            external_listing_table(spec, &self.ctx, urls, 
check_extension).await?
+        } else {
+            external_listing_table(spec, &self.ctx, urls, 
check_extension).await?
+        };
+
+        self.ctx.read_table(Arc::new(provider))
+    }
 }
 
 impl Default for SedonaContext {
@@ -468,11 +510,14 @@ impl ThreadSafeDialect {
 #[cfg(test)]
 mod tests {
 
-    use arrow_schema::DataType;
+    use arrow_array::{create_array, ArrayRef, RecordBatchIterator, 
RecordBatchReader};
+    use arrow_schema::{DataType, Field, Schema};
     use datafusion::assert_batches_eq;
+    use sedona_datasource::spec::{Object, OpenReaderArgs};
     use sedona_schema::{
         crs::lnglat,
         datatypes::{Edges, SedonaType},
+        schema::SedonaSchema,
     };
     use sedona_testing::data::test_geoparquet;
     use tempfile::tempdir;
@@ -579,20 +624,122 @@ mod tests {
         // GeoParquet files
         let ctx = SedonaContext::new_local_interactive().await.unwrap();
         let example = test_geoparquet("example", "geometry").unwrap();
-        let df = ctx.ctx.table(example).await.unwrap();
-        let sedona_types: Result<Vec<_>> = df
+        let df = ctx.ctx.table(example.clone()).await.unwrap();
+        let sedona_types = df
             .schema()
-            .as_arrow()
-            .fields()
-            .iter()
-            .map(|f| SedonaType::from_storage_field(f))
-            .collect();
-        let sedona_types = sedona_types.unwrap();
+            .sedona_types()
+            .collect::<Result<Vec<_>>>()
+            .unwrap();
         assert_eq!(sedona_types.len(), 2);
         assert_eq!(sedona_types[0], SedonaType::Arrow(DataType::Utf8View));
         assert_eq!(
             sedona_types[1],
             SedonaType::WkbView(Edges::Planar, lnglat())
         );
+
+        // Ensure read_parquet() works
+        let df = ctx
+            .read_parquet(example.clone(), GeoParquetReadOptions::default())
+            .await
+            .unwrap();
+        let sedona_types = df
+            .schema()
+            .sedona_types()
+            .collect::<Result<Vec<_>>>()
+            .unwrap();
+        assert_eq!(sedona_types.len(), 2);
+        assert_eq!(sedona_types[0], SedonaType::Arrow(DataType::Utf8View));
+        assert_eq!(
+            sedona_types[1],
+            SedonaType::WkbView(Edges::Planar, lnglat())
+        );
+    }
+
+    #[derive(Debug)]
+    struct ExampleSpec {}
+
+    #[async_trait]
+    impl ExternalFormatSpec for ExampleSpec {
+        async fn infer_schema(&self, _location: &Object) -> Result<Schema> {
+            Ok(Schema::new(vec![Field::new("x", DataType::Utf8, true)]))
+        }
+
+        async fn open_reader(
+            &self,
+            _args: &OpenReaderArgs,
+        ) -> Result<Box<dyn RecordBatchReader + Send>> {
+            let batch = RecordBatch::try_from_iter([(
+                "x",
+                create_array!(Utf8, ["one", "two", "three", "four"]) as 
ArrayRef,
+            )])
+            .unwrap();
+            let schema = batch.schema();
+            Ok(Box::new(RecordBatchIterator::new([Ok(batch)], schema)))
+        }
+
+        fn with_options(
+            &self,
+            options: &HashMap<String, String>,
+        ) -> Result<Arc<dyn ExternalFormatSpec>> {
+            // Ensure we fail if we see any key/value options to ensure 
aws/gcs options
+            // are stripped.
+            if !options.is_empty() {
+                return not_impl_err!("key/value options not implemented");
+            }
+
+            Ok(Arc::new(Self {}))
+        }
+    }
+
+    #[tokio::test]
+    async fn external_format() {
+        let ctx = SedonaContext::new_local_interactive().await.unwrap();
+        let spec = Arc::new(ExampleSpec {});
+        let file_that_exists = test_geoparquet("example", "geometry").unwrap();
+
+        // Ensure read_external_format() works
+        let df = ctx
+            .read_external_format(spec.clone(), file_that_exists.clone(), 
None, false)
+            .await
+            .unwrap();
+        let batches = df.collect().await.unwrap();
+
+        assert_batches_eq!(
+            [
+                "+-------+",
+                "| x     |",
+                "+-------+",
+                "| one   |",
+                "| two   |",
+                "| three |",
+                "| four  |",
+                "+-------+",
+            ],
+            &batches
+        );
+
+        // Ensure that key/value options used by aws/gcs are stripped
+        let kv_options = HashMap::from([("key".to_string(), 
"value".to_string())]);
+        ctx.read_external_format(
+            spec.clone(),
+            file_that_exists.clone(),
+            Some(&kv_options),
+            false,
+        )
+        .await
+        .expect_err("should error for unsupported key/value options");
+
+        let kv_options = HashMap::from([
+            ("gcs.something".to_string(), "value".to_string()),
+            ("aws.something".to_string(), "value".to_string()),
+        ]);
+        ctx.read_external_format(
+            spec.clone(),
+            file_that_exists.clone(),
+            Some(&kv_options),
+            false,
+        )
+        .await
+        .expect("should succeed because aws and gcs options were stripped");
     }
 }

Reply via email to