This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git


The following commit(s) were added to refs/heads/main by this push:
     new 964c4dd  feat(rust/sedona-datasource): Implement generic RecordBatchReader-based format (#251)
964c4dd is described below

commit 964c4dd867f72b723417f63e1660462e0015d497
Author: Dewey Dunnington <[email protected]>
AuthorDate: Fri Nov 7 14:46:13 2025 -0600

    feat(rust/sedona-datasource): Implement generic RecordBatchReader-based format (#251)
---
 Cargo.lock                               |  23 +
 Cargo.toml                               |   7 +-
 python/sedonadb/python/sedonadb/dbapi.py |   2 +-
 rust/sedona-datasource/Cargo.toml        |  50 +++
 rust/sedona-datasource/src/format.rs     | 710 +++++++++++++++++++++++++++++++
 rust/sedona-datasource/src/lib.rs        |  20 +
 rust/sedona-datasource/src/provider.rs   | 112 +++++
 rust/sedona-datasource/src/spec.rs       | 197 +++++++++
 8 files changed, 1117 insertions(+), 4 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index bf0e67e..1053b24 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4893,6 +4893,29 @@ dependencies = [
  "regex",
 ]
 
+[[package]]
+name = "sedona-datasource"
+version = "0.2.0"
+dependencies = [
+ "arrow-array",
+ "arrow-schema",
+ "async-trait",
+ "datafusion",
+ "datafusion-catalog",
+ "datafusion-common",
+ "datafusion-execution",
+ "datafusion-physical-expr",
+ "datafusion-physical-plan",
+ "futures",
+ "object_store",
+ "sedona-common",
+ "sedona-expr",
+ "sedona-schema",
+ "tempfile",
+ "tokio",
+ "url",
+]
+
 [[package]]
 name = "sedona-expr"
 version = "0.2.0"
diff --git a/Cargo.toml b/Cargo.toml
index 5d4bc5c..deb8c75 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,12 +21,14 @@ members = [
     "c/sedona-proj",
     "c/sedona-s2geography",
     "c/sedona-tg",
+    "python/sedonadb",
     "r/sedonadb/src/rust",
-    "rust/sedona-geo-traits-ext",
-    "rust/sedona-geo-generic-alg",
     "rust/sedona-adbc",
+    "rust/sedona-datasource",
     "rust/sedona-expr",
     "rust/sedona-functions",
+    "rust/sedona-geo-generic-alg",
+    "rust/sedona-geo-traits-ext",
     "rust/sedona-geo",
     "rust/sedona-geometry",
     "rust/sedona-geoparquet",
@@ -36,7 +38,6 @@ members = [
     "rust/sedona-spatial-join",
     "rust/sedona-testing",
     "rust/sedona",
-    "python/sedonadb",
     "sedona-cli",
 ]
 resolver = "2"
diff --git a/python/sedonadb/python/sedonadb/dbapi.py b/python/sedonadb/python/sedonadb/dbapi.py
index 968b1b0..22596f3 100644
--- a/python/sedonadb/python/sedonadb/dbapi.py
+++ b/python/sedonadb/python/sedonadb/dbapi.py
@@ -38,7 +38,7 @@ def connect(**kwargs: Mapping[str, Any]) -> "Connection":
 
         >>> con = sedona.dbapi.connect()
         >>> with con.cursor() as cur:
-        ...     cur.execute("SELECT 1 as one")
+        ...     _ = cur.execute("SELECT 1 as one")
         ...     cur.fetchall()
         [(1,)]
     """
diff --git a/rust/sedona-datasource/Cargo.toml b/rust/sedona-datasource/Cargo.toml
new file mode 100644
index 0000000..de0dfff
--- /dev/null
+++ b/rust/sedona-datasource/Cargo.toml
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "sedona-datasource"
+version.workspace = true
+homepage.workspace = true
+repository.workspace = true
+description.workspace = true
+readme.workspace = true
+edition.workspace = true
+rust-version.workspace = true
+
+[features]
+default = []
+
+[dev-dependencies]
+url = { workspace = true }
+tempfile = { workspace = true }
+tokio = { workspace = true }
+
+[dependencies]
+async-trait = { workspace = true }
+arrow-schema = { workspace = true }
+arrow-array = { workspace = true }
+datafusion = { workspace = true }
+datafusion-catalog = { workspace = true }
+datafusion-common = { workspace = true }
+datafusion-execution = { workspace = true }
+datafusion-physical-expr = { workspace = true }
+datafusion-physical-plan = { workspace = true }
+futures = { workspace = true }
+object_store = { workspace = true }
+sedona-common = { path = "../sedona-common" }
+sedona-expr = { path = "../sedona-expr" }
+sedona-schema = { path = "../sedona-schema" }
diff --git a/rust/sedona-datasource/src/format.rs b/rust/sedona-datasource/src/format.rs
new file mode 100644
index 0000000..5e6d531
--- /dev/null
+++ b/rust/sedona-datasource/src/format.rs
@@ -0,0 +1,710 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, collections::HashMap, fmt::Debug, sync::Arc};
+
+use arrow_schema::{Schema, SchemaRef};
+use async_trait::async_trait;
+use datafusion::{
+    config::ConfigOptions,
+    datasource::{
+        file_format::{file_compression_type::FileCompressionType, FileFormat, FileFormatFactory},
+        listing::PartitionedFile,
+        physical_plan::{
+            FileGroupPartitioner, FileMeta, FileOpenFuture, FileOpener, FileScanConfig,
+            FileSinkConfig, FileSource,
+        },
+    },
+};
+use datafusion_catalog::{memory::DataSourceExec, Session};
+use datafusion_common::{not_impl_err, DataFusionError, GetExt, Result, Statistics};
+use datafusion_physical_expr::{LexOrdering, LexRequirement, PhysicalExpr};
+use datafusion_physical_plan::{
+    filter_pushdown::{FilterPushdownPropagation, PushedDown},
+    metrics::ExecutionPlanMetricsSet,
+    ExecutionPlan,
+};
+use futures::{StreamExt, TryStreamExt};
+use object_store::{ObjectMeta, ObjectStore};
+
+use crate::spec::{ExternalFormatSpec, Object, OpenReaderArgs, SupportsRepartition};
+
+/// Create a [FileFormatFactory] from an [ExternalFormatSpec]
+///
+/// The FileFormatFactory is the object that may be registered with a
+/// SessionStateBuilder to allow SQL queries to access this format.
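+///
+/// A hypothetical usage sketch, where `MyFormatSpec` is an assumed
+/// implementation of [ExternalFormatSpec]:
+///
+/// ```ignore
+/// let factory = ExternalFormatFactory::new(Arc::new(MyFormatSpec::default()));
+/// let mut state = SessionStateBuilder::new().build();
+/// state.register_file_format(Arc::new(factory), true)?;
+/// let ctx = SessionContext::new_with_state(state).enable_url_table();
+/// ```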
+#[derive(Debug)]
+pub struct ExternalFormatFactory {
+    spec: Arc<dyn ExternalFormatSpec>,
+}
+
+impl ExternalFormatFactory {
+    pub fn new(spec: Arc<dyn ExternalFormatSpec>) -> Self {
+        Self { spec }
+    }
+}
+
+impl FileFormatFactory for ExternalFormatFactory {
+    fn create(
+        &self,
+        _state: &dyn Session,
+        format_options: &HashMap<String, String>,
+    ) -> Result<Arc<dyn FileFormat>> {
+        Ok(Arc::new(ExternalFileFormat {
+            spec: self.spec.with_options(format_options)?,
+        }))
+    }
+
+    fn default(&self) -> Arc<dyn FileFormat> {
+        Arc::new(ExternalFileFormat {
+            spec: self.spec.clone(),
+        })
+    }
+
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+}
+
+impl GetExt for ExternalFormatFactory {
+    fn get_ext(&self) -> String {
+        self.spec.extension().to_string()
+    }
+}
+
+#[derive(Debug)]
+pub(crate) struct ExternalFileFormat {
+    spec: Arc<dyn ExternalFormatSpec>,
+}
+
+impl ExternalFileFormat {
+    pub fn new(spec: Arc<dyn ExternalFormatSpec>) -> Self {
+        Self { spec }
+    }
+}
+
+#[async_trait]
+impl FileFormat for ExternalFileFormat {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn get_ext(&self) -> String {
+        self.spec.extension().to_string()
+    }
+
+    fn get_ext_with_compression(
+        &self,
+        _file_compression_type: &FileCompressionType,
+    ) -> Result<String> {
+        not_impl_err!("extension with compression type")
+    }
+
+    fn compression_type(&self) -> Option<FileCompressionType> {
+        None
+    }
+
+    async fn infer_schema(
+        &self,
+        state: &dyn Session,
+        store: &Arc<dyn ObjectStore>,
+        objects: &[ObjectMeta],
+    ) -> Result<SchemaRef> {
+        let mut schemas: Vec<_> = futures::stream::iter(objects)
+            .map(|object| async move {
+                let schema = self
+                    .spec
+                    .infer_schema(&Object {
+                        store: Some(store.clone()),
+                        url: None,
+                        meta: Some(object.clone()),
+                        range: None,
+                    })
+                    .await?;
+                Ok::<_, DataFusionError>((object.location.clone(), schema))
+            })
+            .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552
+            .buffered(state.config_options().execution.meta_fetch_concurrency)
+            .try_collect()
+            .await?;
+
+        schemas.sort_by(|(location1, _), (location2, _)| location1.cmp(location2));
+
+        let schemas = schemas
+            .into_iter()
+            .map(|(_, schema)| schema)
+            .collect::<Vec<_>>();
+
+        let schema = Schema::try_merge(schemas)?;
+        Ok(Arc::new(schema))
+    }
+
+    async fn infer_stats(
+        &self,
+        _state: &dyn Session,
+        store: &Arc<dyn ObjectStore>,
+        table_schema: SchemaRef,
+        object: &ObjectMeta,
+    ) -> Result<Statistics> {
+        self.spec
+            .infer_stats(
+                &Object {
+                    store: Some(store.clone()),
+                    url: None,
+                    meta: Some(object.clone()),
+                    range: None,
+                },
+                &table_schema,
+            )
+            .await
+    }
+
+    async fn create_physical_plan(
+        &self,
+        _state: &dyn Session,
+        config: FileScanConfig,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(DataSourceExec::from_data_source(config))
+    }
+
+    async fn create_writer_physical_plan(
+        &self,
+        _input: Arc<dyn ExecutionPlan>,
+        _state: &dyn Session,
+        _conf: FileSinkConfig,
+        _order_requirements: Option<LexRequirement>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        not_impl_err!("writing not yet supported for ExternalFileFormat")
+    }
+
+    fn file_source(&self) -> Arc<dyn FileSource> {
+        Arc::new(ExternalFileSource::new(self.spec.clone()))
+    }
+}
+
+#[derive(Debug, Clone)]
+struct ExternalFileSource {
+    spec: Arc<dyn ExternalFormatSpec>,
+    batch_size: Option<usize>,
+    file_schema: Option<SchemaRef>,
+    file_projection: Option<Vec<usize>>,
+    filters: Vec<Arc<dyn PhysicalExpr>>,
+    metrics: ExecutionPlanMetricsSet,
+    projected_statistics: Option<Statistics>,
+}
+
+impl ExternalFileSource {
+    pub fn new(spec: Arc<dyn ExternalFormatSpec>) -> Self {
+        Self {
+            spec,
+            batch_size: None,
+            file_schema: None,
+            file_projection: None,
+            filters: Vec::new(),
+            metrics: ExecutionPlanMetricsSet::default(),
+            projected_statistics: None,
+        }
+    }
+}
+
+impl FileSource for ExternalFileSource {
+    fn create_file_opener(
+        &self,
+        store: Arc<dyn ObjectStore>,
+        base_config: &FileScanConfig,
+        _partition: usize,
+    ) -> Arc<dyn FileOpener> {
+        let args = OpenReaderArgs {
+            src: Object {
+                store: Some(store.clone()),
+                url: Some(base_config.object_store_url.clone()),
+                meta: None,
+                range: None,
+            },
+            batch_size: self.batch_size,
+            file_schema: self.file_schema.clone(),
+            file_projection: self.file_projection.clone(),
+            filters: self.filters.clone(),
+        };
+
+        Arc::new(ExternalFileOpener {
+            spec: self.spec.clone(),
+            args,
+        })
+    }
+
+    fn try_pushdown_filters(
+        &self,
+        filters: Vec<Arc<dyn PhysicalExpr>>,
+        _config: &ConfigOptions,
+    ) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
+        // Record any new filters
+        let num_filters = filters.len();
+        let mut new_filters = self.filters.clone();
+        new_filters.extend(filters);
+        let source = Self {
+            filters: new_filters,
+            ..self.clone()
+        };
+
+        // ...but don't indicate that we handled them so that the filters are
+        // applied by the other node.
+        Ok(FilterPushdownPropagation::with_parent_pushdown_result(vec![
+            PushedDown::No;
+            num_filters
+        ])
+        .with_updated_node(Arc::new(source)))
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
+        Arc::new(Self {
+            batch_size: Some(batch_size),
+            ..self.clone()
+        })
+    }
+
+    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
+        Arc::new(Self {
+            file_schema: Some(schema),
+            ..self.clone()
+        })
+    }
+
+    fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
+        Arc::new(Self {
+            file_projection: config.file_column_projection_indices(),
+            ..self.clone()
+        })
+    }
+
+    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
+        Arc::new(Self {
+            projected_statistics: Some(statistics),
+            ..self.clone()
+        })
+    }
+
+    fn metrics(&self) -> &ExecutionPlanMetricsSet {
+        &self.metrics
+    }
+
+    fn statistics(&self) -> Result<Statistics> {
+        let statistics = &self.projected_statistics;
+        Ok(statistics
+            .clone()
+            .expect("projected_statistics must be set"))
+    }
+
+    fn file_type(&self) -> &str {
+        self.spec.extension()
+    }
+
+    fn repartitioned(
+        &self,
+        target_partitions: usize,
+        repartition_file_min_size: usize,
+        output_ordering: Option<LexOrdering>,
+        config: &FileScanConfig,
+    ) -> Result<Option<FileScanConfig>> {
+        match self.spec.supports_repartition() {
+            SupportsRepartition::None => Ok(None),
+            SupportsRepartition::ByRange => {
+                // Default implementation
+                if config.file_compression_type.is_compressed() || config.new_lines_in_values {
+                    return Ok(None);
+                }
+
+                let repartitioned_file_groups_option = FileGroupPartitioner::new()
+                    .with_target_partitions(target_partitions)
+                    .with_repartition_file_min_size(repartition_file_min_size)
+                    .with_preserve_order_within_groups(output_ordering.is_some())
+                    .repartition_file_groups(&config.file_groups);
+
+                if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
+                    let mut source = config.clone();
+                    source.file_groups = repartitioned_file_groups;
+                    return Ok(Some(source));
+                }
+                Ok(None)
+            }
+        }
+    }
+}
+
+#[derive(Debug, Clone)]
+struct ExternalFileOpener {
+    spec: Arc<dyn ExternalFormatSpec>,
+    args: OpenReaderArgs,
+}
+
+impl FileOpener for ExternalFileOpener {
+    fn open(&self, file_meta: FileMeta, _file: PartitionedFile) -> Result<FileOpenFuture> {
+        let mut self_clone = self.clone();
+        Ok(Box::pin(async move {
+            self_clone.args.src.meta.replace(file_meta.object_meta);
+            self_clone.args.src.range = file_meta.range;
+            let reader = self_clone.spec.open_reader(&self_clone.args).await?;
+            let stream =
+                futures::stream::iter(reader.into_iter().map(|batch| batch.map_err(Into::into)));
+            Ok(stream.boxed())
+        }))
+    }
+}
+
+#[cfg(test)]
+mod test {
+
+    use arrow_array::{
+        Int32Array, Int64Array, RecordBatch, RecordBatchIterator, RecordBatchReader, StringArray,
+    };
+    use arrow_schema::{DataType, Field};
+    use datafusion::{
+        assert_batches_eq,
+        datasource::listing::ListingTableUrl,
+        execution::SessionStateBuilder,
+        prelude::{col, lit, SessionContext},
+    };
+    use datafusion_common::plan_err;
+    use std::{
+        io::{Read, Write},
+        path::PathBuf,
+    };
+    use tempfile::TempDir;
+    use url::Url;
+
+    use crate::provider::external_listing_table;
+
+    use super::*;
+
+    fn create_echo_spec_ctx() -> SessionContext {
+        let spec = Arc::new(EchoSpec::default());
+        let factory = ExternalFormatFactory::new(spec.clone());
+
+        // Register the format
+        let mut state = SessionStateBuilder::new().build();
+        state.register_file_format(Arc::new(factory), true).unwrap();
+        SessionContext::new_with_state(state).enable_url_table()
+    }
+
+    fn create_echo_spec_temp_dir() -> (TempDir, Vec<PathBuf>) {
+        // Create a temporary directory with a few files with the declared extension
+        let temp_dir = TempDir::new().unwrap();
+        let temp_path = temp_dir.path();
+        let file0 = temp_path.join("item0.echospec");
+        std::fs::File::create(&file0)
+            .unwrap()
+            .write_all(b"not empty")
+            .unwrap();
+        let file1 = temp_path.join("item1.echospec");
+        std::fs::File::create(&file1)
+            .unwrap()
+            .write_all(b"not empty")
+            .unwrap();
+        (temp_dir, vec![file0, file1])
+    }
+
+    fn check_object_is_readable_file(location: &Object) {
+        let url = Url::parse(&location.to_url_string().unwrap()).expect("valid uri");
+        assert_eq!(url.scheme(), "file");
+        let path = url.to_file_path().expect("can extract file path");
+
+        let mut content = String::new();
+        std::fs::File::open(path)
+            .expect("url can't be opened")
+            .read_to_string(&mut content)
+            .expect("failed to read");
+        if content.is_empty() {
+            panic!("empty file at url {url}");
+        }
+    }
+
+    #[derive(Debug, Default, Clone)]
+    struct EchoSpec {
+        option_value: Option<String>,
+    }
+
+    #[async_trait]
+    impl ExternalFormatSpec for EchoSpec {
+        fn extension(&self) -> &str {
+            "echospec"
+        }
+
+        fn with_options(
+            &self,
+            options: &HashMap<String, String>,
+        ) -> Result<Arc<dyn ExternalFormatSpec>> {
+            let mut self_clone = self.clone();
+            for (k, v) in options {
+                if k == "option_value" {
+                    self_clone.option_value = Some(v.to_string());
+                } else {
+                    return plan_err!("Unsupported option for EchoSpec: '{k}'");
+                }
+            }
+
+            Ok(Arc::new(self_clone))
+        }
+
+        async fn infer_schema(&self, location: &Object) -> Result<Schema> {
+            check_object_is_readable_file(location);
+            Ok(Schema::new(vec![
+                Field::new("src", DataType::Utf8, true),
+                Field::new("batch_size", DataType::Int64, true),
+                Field::new("filter_count", DataType::Int32, true),
+                Field::new("option_value", DataType::Utf8, true),
+            ]))
+        }
+
+        async fn infer_stats(
+            &self,
+            location: &Object,
+            table_schema: &Schema,
+        ) -> Result<Statistics> {
+            check_object_is_readable_file(location);
+            Ok(Statistics::new_unknown(table_schema))
+        }
+
+        async fn open_reader(
+            &self,
+            args: &OpenReaderArgs,
+        ) -> Result<Box<dyn RecordBatchReader + Send>> {
+            check_object_is_readable_file(&args.src);
+
+            let src: StringArray = [args.src.clone()]
+                .iter()
+                .map(|item| Some(item.to_url_string().unwrap()))
+                .collect();
+            let batch_size: Int64Array = [args.batch_size]
+                .iter()
+                .map(|item| item.map(|i| i as i64))
+                .collect();
+            let filter_count: Int32Array = [args.filters.len() as i32].into_iter().collect();
+            let option_value: StringArray = [self.option_value.clone()].iter().collect();
+
+            let schema = Arc::new(self.infer_schema(&args.src).await?);
+            let mut batch = RecordBatch::try_new(
+                schema.clone(),
+                vec![
+                    Arc::new(src),
+                    Arc::new(batch_size),
+                    Arc::new(filter_count),
+                    Arc::new(option_value),
+                ],
+            )?;
+
+            if let Some(projection) = &args.file_projection {
+                batch = batch.project(projection)?;
+            }
+
+            Ok(Box::new(RecordBatchIterator::new([Ok(batch)], schema)))
+        }
+    }
+
+    #[tokio::test]
+    async fn spec_format() {
+        let ctx = create_echo_spec_ctx();
+        let (temp_dir, files) = create_echo_spec_temp_dir();
+
+        // Select using just the filename and ensure we get a result
+        let batches_item0 = ctx
+            .table(files[0].to_string_lossy().to_string())
+            .await
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+
+        assert_eq!(batches_item0.len(), 1);
+        assert_eq!(batches_item0[0].num_rows(), 1);
+
+        // With a glob we should get all the files
+        let batches = ctx
+            .table(format!("{}/*.echospec", temp_dir.path().to_string_lossy()))
+            .await
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+        // We should get one value per partition
+        assert_eq!(batches.len(), 2);
+        assert_eq!(batches[0].num_rows(), 1);
+        assert_eq!(batches[1].num_rows(), 1);
+    }
+
+    #[tokio::test]
+    async fn spec_format_project_filter() {
+        let ctx = create_echo_spec_ctx();
+        let (temp_dir, _files) = create_echo_spec_temp_dir();
+
+        // Ensure that projections and filters are passed through to the reader
+        let batches = ctx
+            .table(format!("{}/*.echospec", temp_dir.path().to_string_lossy()))
+            .await
+            .unwrap()
+            .filter(col("src").like(lit("%item0%")))
+            .unwrap()
+            .select(vec![col("batch_size"), col("filter_count")])
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+
+        assert_batches_eq!(
+            [
+                "+------------+--------------+",
+                "| batch_size | filter_count |",
+                "+------------+--------------+",
+                "| 8192       | 1            |",
+                "+------------+--------------+",
+            ],
+            &batches
+        );
+    }
+
+    #[tokio::test]
+    async fn spec_listing_table() {
+        let spec = Arc::new(EchoSpec::default());
+        let ctx = SessionContext::new();
+        let (_temp_dir, files) = create_echo_spec_temp_dir();
+
+        // Select using a listing table and ensure we get a result
+        let provider = external_listing_table(
+            spec,
+            &ctx,
+            files
+                .iter()
+                .map(|f| ListingTableUrl::parse(f.to_string_lossy()).unwrap())
+                .collect(),
+            true,
+        )
+        .await
+        .unwrap();
+
+        let batches = ctx
+            .read_table(Arc::new(provider))
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+
+        // We should get one value per partition
+        assert_eq!(batches.len(), 2);
+        assert_eq!(batches[0].num_rows(), 1);
+        assert_eq!(batches[1].num_rows(), 1);
+    }
+
+    #[tokio::test]
+    async fn spec_listing_table_options() {
+        let spec = Arc::new(EchoSpec::default())
+            .with_options(&[("option_value".to_string(), "foofy".to_string())].into())
+            .unwrap();
+
+        let ctx = SessionContext::new();
+        let (_temp_dir, files) = create_echo_spec_temp_dir();
+
+        // Select using a listing table and ensure we get a result with the option passed
+        let provider = external_listing_table(
+            spec,
+            &ctx,
+            files
+                .iter()
+                .map(|f| ListingTableUrl::parse(f.to_string_lossy()).unwrap())
+                .collect(),
+            true,
+        )
+        .await
+        .unwrap();
+
+        let batches = ctx
+            .read_table(Arc::new(provider))
+            .unwrap()
+            .select(vec![col("batch_size"), col("option_value")])
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+        assert_batches_eq!(
+            [
+                "+------------+--------------+",
+                "| batch_size | option_value |",
+                "+------------+--------------+",
+                "| 8192       | foofy        |",
+                "| 8192       | foofy        |",
+                "+------------+--------------+",
+            ],
+            &batches
+        );
+    }
+
+    #[tokio::test]
+    async fn spec_listing_table_errors() {
+        let spec = Arc::new(EchoSpec::default())
+            .with_options(&[("option_value".to_string(), "foofy".to_string())].into())
+            .unwrap();
+
+        let ctx = SessionContext::new();
+        let (temp_dir, mut files) = create_echo_spec_temp_dir();
+
+        // Listing table with no files should error
+        let err = external_listing_table(spec.clone(), &ctx, vec![], true)
+            .await
+            .unwrap_err();
+        assert_eq!(err.message(), "No table paths were provided");
+
+        // Create a file with a different extension
+        let file2 = temp_dir.path().join("item2.echospecNOT");
+        std::fs::File::create(&file2)
+            .unwrap()
+            .write_all(b"not empty")
+            .unwrap();
+        files.push(file2);
+
+        // With check_extension as true we should get an error
+        let err = external_listing_table(
+            spec.clone(),
+            &ctx,
+            files
+                .iter()
+                .map(|f| ListingTableUrl::parse(f.to_string_lossy()).unwrap())
+                .collect(),
+            true,
+        )
+        .await
+        .unwrap_err();
+
+        assert!(err
+            .message()
+            .ends_with("does not match the expected extension 'echospec'"));
+
+        // ...but we should be able to turn off the error
+        external_listing_table(
+            spec,
+            &ctx,
+            files
+                .iter()
+                .map(|f| ListingTableUrl::parse(f.to_string_lossy()).unwrap())
+                .collect(),
+            false,
+        )
+        .await
+        .unwrap();
+    }
+}
diff --git a/rust/sedona-datasource/src/lib.rs b/rust/sedona-datasource/src/lib.rs
new file mode 100644
index 0000000..4bdc596
--- /dev/null
+++ b/rust/sedona-datasource/src/lib.rs
@@ -0,0 +1,20 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub mod format;
+pub mod provider;
+pub mod spec;
diff --git a/rust/sedona-datasource/src/provider.rs b/rust/sedona-datasource/src/provider.rs
new file mode 100644
index 0000000..6b06a70
--- /dev/null
+++ b/rust/sedona-datasource/src/provider.rs
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use datafusion::{
+    config::TableOptions,
+    datasource::listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
+    execution::{options::ReadOptions, SessionState},
+    prelude::{SessionConfig, SessionContext},
+};
+use datafusion_common::{exec_err, Result};
+
+use crate::{format::ExternalFileFormat, spec::ExternalFormatSpec};
+
+/// Create a [ListingTable] from an [ExternalFormatSpec] and one or more URLs
+///
+/// This can be used to resolve a format specification into a TableProvider that
+/// may be registered with a [SessionContext].
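+///
+/// A hypothetical usage sketch (`spec`, `ctx`, and the path are assumed):
+///
+/// ```ignore
+/// let provider = external_listing_table(
+///     spec,
+///     &ctx,
+///     vec![ListingTableUrl::parse("/path/to/data.myext")?],
+///     true,
+/// )
+/// .await?;
+/// let df = ctx.read_table(Arc::new(provider))?;
+/// ```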
+pub async fn external_listing_table(
+    spec: Arc<dyn ExternalFormatSpec>,
+    context: &SessionContext,
+    table_paths: Vec<ListingTableUrl>,
+    check_extension: bool,
+) -> Result<ListingTable> {
+    let session_config = context.copied_config();
+    let options = RecordBatchReaderTableOptions {
+        spec,
+        check_extension,
+    };
+    let listing_options =
+        options.to_listing_options(&session_config, context.copied_table_options());
+
+    let option_extension = listing_options.file_extension.clone();
+
+    if table_paths.is_empty() {
+        return exec_err!("No table paths were provided");
+    }
+
+    // check if the file extension matches the expected extension if one is provided
+    if !option_extension.is_empty() && options.check_extension {
+        for path in &table_paths {
+            let file_path = path.as_str();
+            if !file_path.ends_with(option_extension.clone().as_str()) && !path.is_collection() {
+                return exec_err!(
+                        "File path '{file_path}' does not match the expected 
extension '{option_extension}'"
+                    );
+            }
+        }
+    }
+
+    let resolved_schema = options
+        .get_resolved_schema(&session_config, context.state(), table_paths[0].clone())
+        .await?;
+    let config = ListingTableConfig::new_with_multi_paths(table_paths)
+        .with_listing_options(listing_options)
+        .with_schema(resolved_schema);
+
+    ListingTable::try_new(config)
+}
+
+#[derive(Debug, Clone)]
+struct RecordBatchReaderTableOptions {
+    spec: Arc<dyn ExternalFormatSpec>,
+    check_extension: bool,
+}
+
+#[async_trait]
+impl ReadOptions<'_> for RecordBatchReaderTableOptions {
+    fn to_listing_options(
+        &self,
+        config: &SessionConfig,
+        table_options: TableOptions,
+    ) -> ListingOptions {
+        let format = if let Some(modified) = self.spec.with_table_options(&table_options) {
+            ExternalFileFormat::new(modified)
+        } else {
+            ExternalFileFormat::new(self.spec.clone())
+        };
+
+        ListingOptions::new(Arc::new(format))
+            .with_file_extension(self.spec.extension())
+            .with_session_config_options(config)
+    }
+
+    async fn get_resolved_schema(
+        &self,
+        config: &SessionConfig,
+        state: SessionState,
+        table_path: ListingTableUrl,
+    ) -> Result<SchemaRef> {
+        self.to_listing_options(config, state.default_table_options())
+            .infer_schema(&state, &table_path)
+            .await
+    }
+}
diff --git a/rust/sedona-datasource/src/spec.rs b/rust/sedona-datasource/src/spec.rs
new file mode 100644
index 0000000..a3abf8d
--- /dev/null
+++ b/rust/sedona-datasource/src/spec.rs
@@ -0,0 +1,197 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{collections::HashMap, fmt::Debug, sync::Arc};
+
+use arrow_array::RecordBatchReader;
+use arrow_schema::{Schema, SchemaRef};
+use async_trait::async_trait;
+
+use datafusion::{config::TableOptions, datasource::listing::FileRange};
+use datafusion_common::{Result, Statistics};
+use datafusion_execution::object_store::ObjectStoreUrl;
+use datafusion_physical_expr::PhysicalExpr;
+use object_store::{ObjectMeta, ObjectStore};
+
+/// Simple file format specification
+///
+/// In DataFusion, various parts of the file format are split among the
+/// FileFormatFactory, the FileFormat, the FileSource, the FileOpener,
+/// and a few other traits. This trait is designed to provide a few
+/// important features of a natively implemented FileFormat while consolidating
+/// the components of implementing the format in the same place. This is
+/// intended to provide a less verbose way to implement readers for a wide
+/// variety of spatial formats.
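+///
+/// A minimal sketch of an implementation (hypothetical; the format details are
+/// assumed):
+///
+/// ```ignore
+/// #[derive(Debug, Clone)]
+/// struct MyFormatSpec;
+///
+/// #[async_trait]
+/// impl ExternalFormatSpec for MyFormatSpec {
+///     fn extension(&self) -> &str {
+///         "myext"
+///     }
+///
+///     async fn infer_schema(&self, location: &Object) -> Result<Schema> {
+///         // Inspect the file (e.g., via location.store/location.meta) and
+///         // return the schema that open_reader() will produce
+///         todo!()
+///     }
+///
+///     async fn open_reader(
+///         &self,
+///         args: &OpenReaderArgs,
+///     ) -> Result<Box<dyn RecordBatchReader + Send>> {
+///         // Open args.src, honour args.file_projection, and return a reader
+///         todo!()
+///     }
+///
+///     fn with_options(
+///         &self,
+///         _options: &HashMap<String, String>,
+///     ) -> Result<Arc<dyn ExternalFormatSpec>> {
+///         Ok(Arc::new(self.clone()))
+///     }
+/// }
+/// ```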
+#[async_trait]
+pub trait ExternalFormatSpec: Debug + Send + Sync {
+    /// Infer a schema for a given file
+    ///
+    /// Given a single file, infer what schema [ExternalFormatSpec::open_reader]
+    /// would produce in the absence of any other guidance.
+    async fn infer_schema(&self, location: &Object) -> Result<Schema>;
+
+    /// Open a [RecordBatchReader] for a given file
+    ///
+    /// The implementation must handle the `file_projection`; however, it
+    /// need not handle the `filters` (but may use them for pruning).
+    async fn open_reader(&self, args: &OpenReaderArgs)
+        -> Result<Box<dyn RecordBatchReader + Send>>;
+
+    /// A file extension or `""` if this concept does not apply
+    fn extension(&self) -> &str {
+        ""
+    }
+
+    /// Compute a clone of self but with the key/value options specified
+    ///
+    /// Implementations should error for invalid key/value input that does
+    /// not apply to this reader.
+    fn with_options(
+        &self,
+        options: &HashMap<String, String>,
+    ) -> Result<Arc<dyn ExternalFormatSpec>>;
+
+    /// Fill in default options from [TableOptions]
+    ///
+    /// The TableOptions are a DataFusion concept that provides a means by which
+    /// options can be set for various table formats. If the defaults for a built-in
+    /// table format are reasonable to fill in or if Extensions have been set,
+    /// these can be accessed and used to fill default options. Note that any options
+    /// set with [ExternalFormatSpec::with_options] should take precedence.
+    fn with_table_options(
+        &self,
+        _table_options: &TableOptions,
+    ) -> Option<Arc<dyn ExternalFormatSpec>> {
+        None
+    }
+
+    /// Allow repartitioning
+    ///
+    /// This allows an implementation to opt in to DataFusion's built-in file-size
+    /// based partitioner, which works well for partitioning files where a simple
+    /// file plus byte range is sufficient. The default opts out of this feature
+    /// (i.e., every file is passed exactly once to [ExternalFormatSpec::open_reader]
+    /// without a `range`).
+    fn supports_repartition(&self) -> SupportsRepartition {
+        SupportsRepartition::None
+    }
+
+    /// Infer [Statistics] for a given file
+    async fn infer_stats(&self, _location: &Object, table_schema: &Schema) -> Result<Statistics> {
+        Ok(Statistics::new_unknown(table_schema))
+    }
+}
+
+/// Enumerator for repartitioning support
+#[derive(Debug, Clone, Copy)]
+pub enum SupportsRepartition {
+    /// This implementation does not support repartitioning beyond the file level
+    None,
+    /// This implementation supports partitioning by arbitrary ranges within a file
+    ///
+    /// Implementations that return this must check [Object::range] when opening
+    /// a file and ensure that each record is read exactly once from one file
+    /// with potentially multiple ranges.
+    ByRange,
+}
+
+/// Arguments to [ExternalFormatSpec::open_reader]
+#[derive(Debug, Clone)]
+pub struct OpenReaderArgs {
+    /// The input file, or partial file if [SupportsRepartition::ByRange] is used
+    pub src: Object,
+
+    /// The requested batch size
+    ///
+    /// DataFusion will usually fill this in to a default of 8192 or a user-specified
+    /// default in the session configuration.
+    pub batch_size: Option<usize>,
+
+    /// The requested file schema, if specified
+    ///
+    /// DataFusion will usually fill this in to the schema inferred by
+    /// [ExternalFormatSpec::infer_schema].
+    pub file_schema: Option<SchemaRef>,
+
+    /// The requested field indices
+    ///
+    /// Implementations must handle this (e.g., using `RecordBatch::project`
+    /// or by implementing partial reads); a short sketch follows this struct.
+    pub file_projection: Option<Vec<usize>>,
+
+    /// Filter expressions
+    ///
+    /// Expressions that may be used for pruning. Implementations need not
+    /// apply these filters.
+    pub filters: Vec<Arc<dyn PhysicalExpr>>,
+}
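+
+// A sketch of honouring `file_projection` inside an open_reader() implementation
+// (hypothetical; `read_full_batch` is an assumed helper returning a RecordBatch):
+//
+//     let mut batch = read_full_batch(args)?;
+//     if let Some(projection) = &args.file_projection {
+//         batch = batch.project(projection)?;
+//     }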
+
+/// The information required to specify a file or partial file
+///
+/// Depending on exactly where in DataFusion we are calling in from, we might
+/// have various information about the file. In general, implementations should
+/// use [ObjectStore] and [ObjectMeta] to access the file so that DataFusion's
+/// registered IO for these protocols is used. When implementing a filename-based reader
+/// (e.g., one that uses some external API to read files), use [Object::to_url_string].
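+///
+/// For example, a reader backed by the object store might fetch the whole file
+/// like this (a sketch; it assumes both `store` and `meta` are populated):
+///
+/// ```ignore
+/// if let (Some(store), Some(meta)) = (&object.store, &object.meta) {
+///     // Uses DataFusion's registered IO rather than a separate code path
+///     let bytes = store.get(&meta.location).await?.bytes().await?;
+///     // ...parse `bytes` and construct a RecordBatchReader...
+/// }
+/// ```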
+#[derive(Debug, Clone)]
+pub struct Object {
+    /// The object store reference
+    pub store: Option<Arc<dyn ObjectStore>>,
+
+    /// A URL that may be used to retrieve an [ObjectStore] from a registry
+    ///
+    /// These URLs typically are populated only with the scheme.
+    pub url: Option<ObjectStoreUrl>,
+
+    /// An individual object in an ObjectStore
+    pub meta: Option<ObjectMeta>,
+
+    /// If this represents a partial file, the byte range within the file
+    ///
+    /// This is only set if a repartitioning mode other than [SupportsRepartition::None] is used
+    pub range: Option<FileRange>,
+}
+
+impl Object {
+    /// Convert this object to a URL string, if possible
+    ///
+    /// Returns `None` if there is not sufficient information in the Object to calculate
+    /// this.
+    pub fn to_url_string(&self) -> Option<String> {
+        match (&self.url, &self.meta) {
+            (None, Some(meta)) => {
+                // There's no great way to map an object_store to a url prefix if we're not
+                // provided the `url`; however, this is what we have access to in the
+                // Schema and Statistics resolution phases of the FileFormat.
+                // This is a heuristic that should work for https and a local filesystem,
+                // which is what we might be able to expect a non-DataFusion system like
+                // GDAL to be able to translate.
+                let object_store_debug = format!("{:?}", self.store).to_lowercase();
+                if object_store_debug.contains("http") {
+                    Some(format!("https://{}", meta.location))
+                } else if object_store_debug.contains("local") {
+                    Some(format!("file:///{}", meta.location))
+                } else {
+                    None
+                }
+            }
+            (Some(url), None) => Some(url.to_string()),
+            (Some(url), Some(meta)) => Some(format!("{url}/{}", meta.location)),
+            (None, None) => None,
+        }
+    }
+}

