This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git
The following commit(s) were added to refs/heads/main by this push:
new 964c4dd feat(rust/sedona-datasource): Implement generic RecordBatchReader-based format (#251)
964c4dd is described below
commit 964c4dd867f72b723417f63e1660462e0015d497
Author: Dewey Dunnington <[email protected]>
AuthorDate: Fri Nov 7 14:46:13 2025 -0600
feat(rust/sedona-datasource): Implement generic RecordBatchReader-based format (#251)
---
Cargo.lock | 23 +
Cargo.toml | 7 +-
python/sedonadb/python/sedonadb/dbapi.py | 2 +-
rust/sedona-datasource/Cargo.toml | 50 +++
rust/sedona-datasource/src/format.rs | 710 +++++++++++++++++++++++++++++++
rust/sedona-datasource/src/lib.rs | 20 +
rust/sedona-datasource/src/provider.rs | 112 +++++
rust/sedona-datasource/src/spec.rs | 197 +++++++++
8 files changed, 1117 insertions(+), 4 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index bf0e67e..1053b24 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4893,6 +4893,29 @@ dependencies = [
"regex",
]
+[[package]]
+name = "sedona-datasource"
+version = "0.2.0"
+dependencies = [
+ "arrow-array",
+ "arrow-schema",
+ "async-trait",
+ "datafusion",
+ "datafusion-catalog",
+ "datafusion-common",
+ "datafusion-execution",
+ "datafusion-physical-expr",
+ "datafusion-physical-plan",
+ "futures",
+ "object_store",
+ "sedona-common",
+ "sedona-expr",
+ "sedona-schema",
+ "tempfile",
+ "tokio",
+ "url",
+]
+
[[package]]
name = "sedona-expr"
version = "0.2.0"
diff --git a/Cargo.toml b/Cargo.toml
index 5d4bc5c..deb8c75 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,12 +21,14 @@ members = [
"c/sedona-proj",
"c/sedona-s2geography",
"c/sedona-tg",
+ "python/sedonadb",
"r/sedonadb/src/rust",
- "rust/sedona-geo-traits-ext",
- "rust/sedona-geo-generic-alg",
"rust/sedona-adbc",
+ "rust/sedona-datasource",
"rust/sedona-expr",
"rust/sedona-functions",
+ "rust/sedona-geo-generic-alg",
+ "rust/sedona-geo-traits-ext",
"rust/sedona-geo",
"rust/sedona-geometry",
"rust/sedona-geoparquet",
@@ -36,7 +38,6 @@ members = [
"rust/sedona-spatial-join",
"rust/sedona-testing",
"rust/sedona",
- "python/sedonadb",
"sedona-cli",
]
resolver = "2"
diff --git a/python/sedonadb/python/sedonadb/dbapi.py b/python/sedonadb/python/sedonadb/dbapi.py
index 968b1b0..22596f3 100644
--- a/python/sedonadb/python/sedonadb/dbapi.py
+++ b/python/sedonadb/python/sedonadb/dbapi.py
@@ -38,7 +38,7 @@ def connect(**kwargs: Mapping[str, Any]) -> "Connection":
>>> con = sedona.dbapi.connect()
>>> with con.cursor() as cur:
- ... cur.execute("SELECT 1 as one")
+ ... _ = cur.execute("SELECT 1 as one")
... cur.fetchall()
[(1,)]
"""
diff --git a/rust/sedona-datasource/Cargo.toml b/rust/sedona-datasource/Cargo.toml
new file mode 100644
index 0000000..de0dfff
--- /dev/null
+++ b/rust/sedona-datasource/Cargo.toml
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "sedona-datasource"
+version.workspace = true
+homepage.workspace = true
+repository.workspace = true
+description.workspace = true
+readme.workspace = true
+edition.workspace = true
+rust-version.workspace = true
+
+[features]
+default = []
+
+[dev-dependencies]
+url = { workspace = true }
+tempfile = { workspace = true }
+tokio = { workspace = true }
+
+[dependencies]
+async-trait = { workspace = true }
+arrow-schema = { workspace = true }
+arrow-array = { workspace = true }
+datafusion = { workspace = true }
+datafusion-catalog = { workspace = true }
+datafusion-common = { workspace = true }
+datafusion-execution = { workspace = true }
+datafusion-physical-expr = { workspace = true }
+datafusion-physical-plan = { workspace = true }
+futures = { workspace = true }
+object_store = { workspace = true }
+sedona-common = { path = "../sedona-common" }
+sedona-expr = { path = "../sedona-expr" }
+sedona-schema = { path = "../sedona-schema" }
diff --git a/rust/sedona-datasource/src/format.rs b/rust/sedona-datasource/src/format.rs
new file mode 100644
index 0000000..5e6d531
--- /dev/null
+++ b/rust/sedona-datasource/src/format.rs
@@ -0,0 +1,710 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, collections::HashMap, fmt::Debug, sync::Arc};
+
+use arrow_schema::{Schema, SchemaRef};
+use async_trait::async_trait;
+use datafusion::{
+ config::ConfigOptions,
+ datasource::{
+ file_format::{file_compression_type::FileCompressionType, FileFormat, FileFormatFactory},
+ listing::PartitionedFile,
+ physical_plan::{
+ FileGroupPartitioner, FileMeta, FileOpenFuture, FileOpener, FileScanConfig,
+ FileSinkConfig, FileSource,
+ },
+ },
+};
+use datafusion_catalog::{memory::DataSourceExec, Session};
+use datafusion_common::{not_impl_err, DataFusionError, GetExt, Result, Statistics};
+use datafusion_physical_expr::{LexOrdering, LexRequirement, PhysicalExpr};
+use datafusion_physical_plan::{
+ filter_pushdown::{FilterPushdownPropagation, PushedDown},
+ metrics::ExecutionPlanMetricsSet,
+ ExecutionPlan,
+};
+use futures::{StreamExt, TryStreamExt};
+use object_store::{ObjectMeta, ObjectStore};
+
+use crate::spec::{ExternalFormatSpec, Object, OpenReaderArgs, SupportsRepartition};
+
+/// Create a [FileFormatFactory] from an [ExternalFormatSpec]
+///
+/// The FileFormatFactory is the object that may be registered with a
+/// SessionStateBuilder to allow SQL queries to access this format.
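+///
+/// A minimal registration sketch (mirroring the tests in this module; `MySpec`
+/// is a hypothetical [ExternalFormatSpec] implementation):
+///
+/// ```ignore
+/// use std::sync::Arc;
+/// use datafusion::{execution::SessionStateBuilder, prelude::SessionContext};
+///
+/// let factory = ExternalFormatFactory::new(Arc::new(MySpec::default()));
+/// let mut state = SessionStateBuilder::new().build();
+/// state.register_file_format(Arc::new(factory), true).unwrap();
+/// let ctx = SessionContext::new_with_state(state).enable_url_table();
+/// ```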
+#[derive(Debug)]
+pub struct ExternalFormatFactory {
+ spec: Arc<dyn ExternalFormatSpec>,
+}
+
+impl ExternalFormatFactory {
+ pub fn new(spec: Arc<dyn ExternalFormatSpec>) -> Self {
+ Self { spec }
+ }
+}
+
+impl FileFormatFactory for ExternalFormatFactory {
+ fn create(
+ &self,
+ _state: &dyn Session,
+ format_options: &HashMap<String, String>,
+ ) -> Result<Arc<dyn FileFormat>> {
+ Ok(Arc::new(ExternalFileFormat {
+ spec: self.spec.with_options(format_options)?,
+ }))
+ }
+
+ fn default(&self) -> Arc<dyn FileFormat> {
+ Arc::new(ExternalFileFormat {
+ spec: self.spec.clone(),
+ })
+ }
+
+ fn as_any(&self) -> &dyn std::any::Any {
+ self
+ }
+}
+
+impl GetExt for ExternalFormatFactory {
+ fn get_ext(&self) -> String {
+ self.spec.extension().to_string()
+ }
+}
+
+#[derive(Debug)]
+pub(crate) struct ExternalFileFormat {
+ spec: Arc<dyn ExternalFormatSpec>,
+}
+
+impl ExternalFileFormat {
+ pub fn new(spec: Arc<dyn ExternalFormatSpec>) -> Self {
+ Self { spec }
+ }
+}
+
+#[async_trait]
+impl FileFormat for ExternalFileFormat {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn get_ext(&self) -> String {
+ self.spec.extension().to_string()
+ }
+
+ fn get_ext_with_compression(
+ &self,
+ _file_compression_type: &FileCompressionType,
+ ) -> Result<String> {
+ not_impl_err!("extension with compression type")
+ }
+
+ fn compression_type(&self) -> Option<FileCompressionType> {
+ None
+ }
+
+ async fn infer_schema(
+ &self,
+ state: &dyn Session,
+ store: &Arc<dyn ObjectStore>,
+ objects: &[ObjectMeta],
+ ) -> Result<SchemaRef> {
+ let mut schemas: Vec<_> = futures::stream::iter(objects)
+ .map(|object| async move {
+ let schema = self
+ .spec
+ .infer_schema(&Object {
+ store: Some(store.clone()),
+ url: None,
+ meta: Some(object.clone()),
+ range: None,
+ })
+ .await?;
+ Ok::<_, DataFusionError>((object.location.clone(), schema))
+ })
+ .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552
+ .buffered(state.config_options().execution.meta_fetch_concurrency)
+ .try_collect()
+ .await?;
+
+ schemas.sort_by(|(location1, _), (location2, _)| location1.cmp(location2));
+
+ let schemas = schemas
+ .into_iter()
+ .map(|(_, schema)| schema)
+ .collect::<Vec<_>>();
+
+ let schema = Schema::try_merge(schemas)?;
+ Ok(Arc::new(schema))
+ }
+
+ async fn infer_stats(
+ &self,
+ _state: &dyn Session,
+ store: &Arc<dyn ObjectStore>,
+ table_schema: SchemaRef,
+ object: &ObjectMeta,
+ ) -> Result<Statistics> {
+ self.spec
+ .infer_stats(
+ &Object {
+ store: Some(store.clone()),
+ url: None,
+ meta: Some(object.clone()),
+ range: None,
+ },
+ &table_schema,
+ )
+ .await
+ }
+
+ async fn create_physical_plan(
+ &self,
+ _state: &dyn Session,
+ config: FileScanConfig,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ Ok(DataSourceExec::from_data_source(config))
+ }
+
+ async fn create_writer_physical_plan(
+ &self,
+ _input: Arc<dyn ExecutionPlan>,
+ _state: &dyn Session,
+ _conf: FileSinkConfig,
+ _order_requirements: Option<LexRequirement>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ not_impl_err!("writing not yet supported for ExternalFileFormat")
+ }
+
+ fn file_source(&self) -> Arc<dyn FileSource> {
+ Arc::new(ExternalFileSource::new(self.spec.clone()))
+ }
+}
+
+#[derive(Debug, Clone)]
+struct ExternalFileSource {
+ spec: Arc<dyn ExternalFormatSpec>,
+ batch_size: Option<usize>,
+ file_schema: Option<SchemaRef>,
+ file_projection: Option<Vec<usize>>,
+ filters: Vec<Arc<dyn PhysicalExpr>>,
+ metrics: ExecutionPlanMetricsSet,
+ projected_statistics: Option<Statistics>,
+}
+
+impl ExternalFileSource {
+ pub fn new(spec: Arc<dyn ExternalFormatSpec>) -> Self {
+ Self {
+ spec,
+ batch_size: None,
+ file_schema: None,
+ file_projection: None,
+ filters: Vec::new(),
+ metrics: ExecutionPlanMetricsSet::default(),
+ projected_statistics: None,
+ }
+ }
+}
+
+impl FileSource for ExternalFileSource {
+ fn create_file_opener(
+ &self,
+ store: Arc<dyn ObjectStore>,
+ base_config: &FileScanConfig,
+ _partition: usize,
+ ) -> Arc<dyn FileOpener> {
+ let args = OpenReaderArgs {
+ src: Object {
+ store: Some(store.clone()),
+ url: Some(base_config.object_store_url.clone()),
+ meta: None,
+ range: None,
+ },
+ batch_size: self.batch_size,
+ file_schema: self.file_schema.clone(),
+ file_projection: self.file_projection.clone(),
+ filters: self.filters.clone(),
+ };
+
+ Arc::new(ExternalFileOpener {
+ spec: self.spec.clone(),
+ args,
+ })
+ }
+
+ fn try_pushdown_filters(
+ &self,
+ filters: Vec<Arc<dyn PhysicalExpr>>,
+ _config: &ConfigOptions,
+ ) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
+ // Record any new filters
+ let num_filters = filters.len();
+ let mut new_filters = self.filters.clone();
+ new_filters.extend(filters);
+ let source = Self {
+ filters: new_filters,
+ ..self.clone()
+ };
+
+ // ...but don't indicate that we handled them so that the filters are
+ // applied by the other node.
+ Ok(FilterPushdownPropagation::with_parent_pushdown_result(vec![
+ PushedDown::No;
+ num_filters
+ ])
+ .with_updated_node(Arc::new(source)))
+ }
+
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
+ Arc::new(Self {
+ batch_size: Some(batch_size),
+ ..self.clone()
+ })
+ }
+
+ fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
+ Arc::new(Self {
+ file_schema: Some(schema),
+ ..self.clone()
+ })
+ }
+
+ fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
+ Arc::new(Self {
+ file_projection: config.file_column_projection_indices(),
+ ..self.clone()
+ })
+ }
+
+ fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
+ Arc::new(Self {
+ projected_statistics: Some(statistics),
+ ..self.clone()
+ })
+ }
+
+ fn metrics(&self) -> &ExecutionPlanMetricsSet {
+ &self.metrics
+ }
+
+ fn statistics(&self) -> Result<Statistics> {
+ let statistics = &self.projected_statistics;
+ Ok(statistics
+ .clone()
+ .expect("projected_statistics must be set"))
+ }
+
+ fn file_type(&self) -> &str {
+ self.spec.extension()
+ }
+
+ fn repartitioned(
+ &self,
+ target_partitions: usize,
+ repartition_file_min_size: usize,
+ output_ordering: Option<LexOrdering>,
+ config: &FileScanConfig,
+ ) -> Result<Option<FileScanConfig>> {
+ match self.spec.supports_repartition() {
+ SupportsRepartition::None => Ok(None),
+ SupportsRepartition::ByRange => {
+ // Default implementation
+ if config.file_compression_type.is_compressed() || config.new_lines_in_values {
+ return Ok(None);
+ }
+
+ let repartitioned_file_groups_option = FileGroupPartitioner::new()
+ .with_target_partitions(target_partitions)
+ .with_repartition_file_min_size(repartition_file_min_size)
+ .with_preserve_order_within_groups(output_ordering.is_some())
+ .repartition_file_groups(&config.file_groups);
+
+ if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
+ let mut source = config.clone();
+ source.file_groups = repartitioned_file_groups;
+ return Ok(Some(source));
+ }
+ Ok(None)
+ }
+ }
+ }
+}
+
+#[derive(Debug, Clone)]
+struct ExternalFileOpener {
+ spec: Arc<dyn ExternalFormatSpec>,
+ args: OpenReaderArgs,
+}
+
+impl FileOpener for ExternalFileOpener {
+ fn open(&self, file_meta: FileMeta, _file: PartitionedFile) -> Result<FileOpenFuture> {
+ let mut self_clone = self.clone();
+ Ok(Box::pin(async move {
+ self_clone.args.src.meta.replace(file_meta.object_meta);
+ self_clone.args.src.range = file_meta.range;
+ let reader = self_clone.spec.open_reader(&self_clone.args).await?;
+ let stream =
+ futures::stream::iter(reader.into_iter().map(|batch| batch.map_err(Into::into)));
+ Ok(stream.boxed())
+ }))
+ }
+}
+
+#[cfg(test)]
+mod test {
+
+ use arrow_array::{
+ Int32Array, Int64Array, RecordBatch, RecordBatchIterator, RecordBatchReader, StringArray,
+ };
+ use arrow_schema::{DataType, Field};
+ use datafusion::{
+ assert_batches_eq,
+ datasource::listing::ListingTableUrl,
+ execution::SessionStateBuilder,
+ prelude::{col, lit, SessionContext},
+ };
+ use datafusion_common::plan_err;
+ use std::{
+ io::{Read, Write},
+ path::PathBuf,
+ };
+ use tempfile::TempDir;
+ use url::Url;
+
+ use crate::provider::external_listing_table;
+
+ use super::*;
+
+ fn create_echo_spec_ctx() -> SessionContext {
+ let spec = Arc::new(EchoSpec::default());
+ let factory = ExternalFormatFactory::new(spec.clone());
+
+ // Register the format
+ let mut state = SessionStateBuilder::new().build();
+ state.register_file_format(Arc::new(factory), true).unwrap();
+ SessionContext::new_with_state(state).enable_url_table()
+ }
+
+ fn create_echo_spec_temp_dir() -> (TempDir, Vec<PathBuf>) {
+ // Create a temporary directory with a few files with the declared extension
+ let temp_dir = TempDir::new().unwrap();
+ let temp_path = temp_dir.path();
+ let file0 = temp_path.join("item0.echospec");
+ std::fs::File::create(&file0)
+ .unwrap()
+ .write_all(b"not empty")
+ .unwrap();
+ let file1 = temp_path.join("item1.echospec");
+ std::fs::File::create(&file1)
+ .unwrap()
+ .write_all(b"not empty")
+ .unwrap();
+ (temp_dir, vec![file0, file1])
+ }
+
+ fn check_object_is_readable_file(location: &Object) {
+ let url = Url::parse(&location.to_url_string().unwrap()).expect("valid uri");
+ assert_eq!(url.scheme(), "file");
+ let path = url.to_file_path().expect("can extract file path");
+
+ let mut content = String::new();
+ std::fs::File::open(path)
+ .expect("url can't be opened")
+ .read_to_string(&mut content)
+ .expect("failed to read");
+ if content.is_empty() {
+ panic!("empty file at url {url}");
+ }
+ }
+
+ #[derive(Debug, Default, Clone)]
+ struct EchoSpec {
+ option_value: Option<String>,
+ }
+
+ #[async_trait]
+ impl ExternalFormatSpec for EchoSpec {
+ fn extension(&self) -> &str {
+ "echospec"
+ }
+
+ fn with_options(
+ &self,
+ options: &HashMap<String, String>,
+ ) -> Result<Arc<dyn ExternalFormatSpec>> {
+ let mut self_clone = self.clone();
+ for (k, v) in options {
+ if k == "option_value" {
+ self_clone.option_value = Some(v.to_string());
+ } else {
+ return plan_err!("Unsupported option for EchoSpec: '{k}'");
+ }
+ }
+
+ Ok(Arc::new(self_clone))
+ }
+
+ async fn infer_schema(&self, location: &Object) -> Result<Schema> {
+ check_object_is_readable_file(location);
+ Ok(Schema::new(vec![
+ Field::new("src", DataType::Utf8, true),
+ Field::new("batch_size", DataType::Int64, true),
+ Field::new("filter_count", DataType::Int32, true),
+ Field::new("option_value", DataType::Utf8, true),
+ ]))
+ }
+
+ async fn infer_stats(
+ &self,
+ location: &Object,
+ table_schema: &Schema,
+ ) -> Result<Statistics> {
+ check_object_is_readable_file(location);
+ Ok(Statistics::new_unknown(table_schema))
+ }
+
+ async fn open_reader(
+ &self,
+ args: &OpenReaderArgs,
+ ) -> Result<Box<dyn RecordBatchReader + Send>> {
+ check_object_is_readable_file(&args.src);
+
+ let src: StringArray = [args.src.clone()]
+ .iter()
+ .map(|item| Some(item.to_url_string().unwrap()))
+ .collect();
+ let batch_size: Int64Array = [args.batch_size]
+ .iter()
+ .map(|item| item.map(|i| i as i64))
+ .collect();
+ let filter_count: Int32Array = [args.filters.len() as i32].into_iter().collect();
+ let option_value: StringArray = [self.option_value.clone()].iter().collect();
+
+ let schema = Arc::new(self.infer_schema(&args.src).await?);
+ let mut batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(src),
+ Arc::new(batch_size),
+ Arc::new(filter_count),
+ Arc::new(option_value),
+ ],
+ )?;
+
+ if let Some(projection) = &args.file_projection {
+ batch = batch.project(projection)?;
+ }
+
+ Ok(Box::new(RecordBatchIterator::new([Ok(batch)], schema)))
+ }
+ }
+
+ #[tokio::test]
+ async fn spec_format() {
+ let ctx = create_echo_spec_ctx();
+ let (temp_dir, files) = create_echo_spec_temp_dir();
+
+ // Select using just the filename and ensure we get a result
+ let batches_item0 = ctx
+ .table(files[0].to_string_lossy().to_string())
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ assert_eq!(batches_item0.len(), 1);
+ assert_eq!(batches_item0[0].num_rows(), 1);
+
+ // With a glob we should get all the files
+ let batches = ctx
+ .table(format!("{}/*.echospec", temp_dir.path().to_string_lossy()))
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+ // We should get one value per partition
+ assert_eq!(batches.len(), 2);
+ assert_eq!(batches[0].num_rows(), 1);
+ assert_eq!(batches[1].num_rows(), 1);
+ }
+
+ #[tokio::test]
+ async fn spec_format_project_filter() {
+ let ctx = create_echo_spec_ctx();
+ let (temp_dir, _files) = create_echo_spec_temp_dir();
+
+ // Ensure that filters and projections are passed through to the reader
+ let batches = ctx
+ .table(format!("{}/*.echospec", temp_dir.path().to_string_lossy()))
+ .await
+ .unwrap()
+ .filter(col("src").like(lit("%item0%")))
+ .unwrap()
+ .select(vec![col("batch_size"), col("filter_count")])
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ assert_batches_eq!(
+ [
+ "+------------+--------------+",
+ "| batch_size | filter_count |",
+ "+------------+--------------+",
+ "| 8192 | 1 |",
+ "+------------+--------------+",
+ ],
+ &batches
+ );
+ }
+
+ #[tokio::test]
+ async fn spec_listing_table() {
+ let spec = Arc::new(EchoSpec::default());
+ let ctx = SessionContext::new();
+ let (_temp_dir, files) = create_echo_spec_temp_dir();
+
+ // Select using a listing table and ensure we get a result
+ let provider = external_listing_table(
+ spec,
+ &ctx,
+ files
+ .iter()
+ .map(|f| ListingTableUrl::parse(f.to_string_lossy()).unwrap())
+ .collect(),
+ true,
+ )
+ .await
+ .unwrap();
+
+ let batches = ctx
+ .read_table(Arc::new(provider))
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ // We should get one value per partition
+ assert_eq!(batches.len(), 2);
+ assert_eq!(batches[0].num_rows(), 1);
+ assert_eq!(batches[1].num_rows(), 1);
+ }
+
+ #[tokio::test]
+ async fn spec_listing_table_options() {
+ let spec = Arc::new(EchoSpec::default())
+ .with_options(&[("option_value".to_string(), "foofy".to_string())].into())
+ .unwrap();
+
+ let ctx = SessionContext::new();
+ let (_temp_dir, files) = create_echo_spec_temp_dir();
+
+ // Select using a listing table and ensure we get a result with the option passed
+ let provider = external_listing_table(
+ spec,
+ &ctx,
+ files
+ .iter()
+ .map(|f| ListingTableUrl::parse(f.to_string_lossy()).unwrap())
+ .collect(),
+ true,
+ )
+ .await
+ .unwrap();
+
+ let batches = ctx
+ .read_table(Arc::new(provider))
+ .unwrap()
+ .select(vec![col("batch_size"), col("option_value")])
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+ assert_batches_eq!(
+ [
+ "+------------+--------------+",
+ "| batch_size | option_value |",
+ "+------------+--------------+",
+ "| 8192 | foofy |",
+ "| 8192 | foofy |",
+ "+------------+--------------+",
+ ],
+ &batches
+ );
+ }
+
+ #[tokio::test]
+ async fn spec_listing_table_errors() {
+ let spec = Arc::new(EchoSpec::default())
+ .with_options(&[("option_value".to_string(), "foofy".to_string())].into())
+ .unwrap();
+
+ let ctx = SessionContext::new();
+ let (temp_dir, mut files) = create_echo_spec_temp_dir();
+
+ // Listing table with no files should error
+ let err = external_listing_table(spec.clone(), &ctx, vec![], true)
+ .await
+ .unwrap_err();
+ assert_eq!(err.message(), "No table paths were provided");
+
+ // Create a file with a different extension
+ let file2 = temp_dir.path().join("item2.echospecNOT");
+ std::fs::File::create(&file2)
+ .unwrap()
+ .write_all(b"not empty")
+ .unwrap();
+ files.push(file2);
+
+ // With check_extension as true we should get an error
+ let err = external_listing_table(
+ spec.clone(),
+ &ctx,
+ files
+ .iter()
+ .map(|f| ListingTableUrl::parse(f.to_string_lossy()).unwrap())
+ .collect(),
+ true,
+ )
+ .await
+ .unwrap_err();
+
+ assert!(err
+ .message()
+ .ends_with("does not match the expected extension 'echospec'"));
+
+ // ...but we should be able to turn off the error
+ external_listing_table(
+ spec,
+ &ctx,
+ files
+ .iter()
+ .map(|f| ListingTableUrl::parse(f.to_string_lossy()).unwrap())
+ .collect(),
+ false,
+ )
+ .await
+ .unwrap();
+ }
+}
diff --git a/rust/sedona-datasource/src/lib.rs b/rust/sedona-datasource/src/lib.rs
new file mode 100644
index 0000000..4bdc596
--- /dev/null
+++ b/rust/sedona-datasource/src/lib.rs
@@ -0,0 +1,20 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub mod format;
+pub mod provider;
+pub mod spec;
diff --git a/rust/sedona-datasource/src/provider.rs b/rust/sedona-datasource/src/provider.rs
new file mode 100644
index 0000000..6b06a70
--- /dev/null
+++ b/rust/sedona-datasource/src/provider.rs
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use datafusion::{
+ config::TableOptions,
+ datasource::listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
+ execution::{options::ReadOptions, SessionState},
+ prelude::{SessionConfig, SessionContext},
+};
+use datafusion_common::{exec_err, Result};
+
+use crate::{format::ExternalFileFormat, spec::ExternalFormatSpec};
+
+/// Create a [ListingTable] from an [ExternalFormatSpec] and one or more URLs
+///
+/// This can be used to resolve a format specification into a TableProvider that
+/// may be registered with a [SessionContext].
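+///
+/// A minimal usage sketch (mirroring the tests in `format.rs`; `MySpec` is a
+/// hypothetical [ExternalFormatSpec] implementation and the path is illustrative):
+///
+/// ```ignore
+/// use std::sync::Arc;
+/// use datafusion::{datasource::listing::ListingTableUrl, prelude::SessionContext};
+///
+/// let ctx = SessionContext::new();
+/// let urls = vec![ListingTableUrl::parse("/path/to/item0.myext").unwrap()];
+/// let provider = external_listing_table(Arc::new(MySpec::default()), &ctx, urls, true)
+///     .await
+///     .unwrap();
+/// let batches = ctx.read_table(Arc::new(provider)).unwrap().collect().await.unwrap();
+/// ```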
+pub async fn external_listing_table(
+ spec: Arc<dyn ExternalFormatSpec>,
+ context: &SessionContext,
+ table_paths: Vec<ListingTableUrl>,
+ check_extension: bool,
+) -> Result<ListingTable> {
+ let session_config = context.copied_config();
+ let options = RecordBatchReaderTableOptions {
+ spec,
+ check_extension,
+ };
+ let listing_options =
+ options.to_listing_options(&session_config, context.copied_table_options());
+
+ let option_extension = listing_options.file_extension.clone();
+
+ if table_paths.is_empty() {
+ return exec_err!("No table paths were provided");
+ }
+
+ // check if the file extension matches the expected extension if one is provided
+ if !option_extension.is_empty() && options.check_extension {
+ for path in &table_paths {
+ let file_path = path.as_str();
+ if !file_path.ends_with(option_extension.clone().as_str()) && !path.is_collection() {
+ return exec_err!(
+ "File path '{file_path}' does not match the expected
extension '{option_extension}'"
+ );
+ }
+ }
+ }
+
+ let resolved_schema = options
+ .get_resolved_schema(&session_config, context.state(), table_paths[0].clone())
+ .await?;
+ let config = ListingTableConfig::new_with_multi_paths(table_paths)
+ .with_listing_options(listing_options)
+ .with_schema(resolved_schema);
+
+ ListingTable::try_new(config)
+}
+
+#[derive(Debug, Clone)]
+struct RecordBatchReaderTableOptions {
+ spec: Arc<dyn ExternalFormatSpec>,
+ check_extension: bool,
+}
+
+#[async_trait]
+impl ReadOptions<'_> for RecordBatchReaderTableOptions {
+ fn to_listing_options(
+ &self,
+ config: &SessionConfig,
+ table_options: TableOptions,
+ ) -> ListingOptions {
+ let format = if let Some(modified) = self.spec.with_table_options(&table_options) {
+ ExternalFileFormat::new(modified)
+ } else {
+ ExternalFileFormat::new(self.spec.clone())
+ };
+
+ ListingOptions::new(Arc::new(format))
+ .with_file_extension(self.spec.extension())
+ .with_session_config_options(config)
+ }
+
+ async fn get_resolved_schema(
+ &self,
+ config: &SessionConfig,
+ state: SessionState,
+ table_path: ListingTableUrl,
+ ) -> Result<SchemaRef> {
+ self.to_listing_options(config, state.default_table_options())
+ .infer_schema(&state, &table_path)
+ .await
+ }
+}
diff --git a/rust/sedona-datasource/src/spec.rs b/rust/sedona-datasource/src/spec.rs
new file mode 100644
index 0000000..a3abf8d
--- /dev/null
+++ b/rust/sedona-datasource/src/spec.rs
@@ -0,0 +1,197 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{collections::HashMap, fmt::Debug, sync::Arc};
+
+use arrow_array::RecordBatchReader;
+use arrow_schema::{Schema, SchemaRef};
+use async_trait::async_trait;
+
+use datafusion::{config::TableOptions, datasource::listing::FileRange};
+use datafusion_common::{Result, Statistics};
+use datafusion_execution::object_store::ObjectStoreUrl;
+use datafusion_physical_expr::PhysicalExpr;
+use object_store::{ObjectMeta, ObjectStore};
+
+/// Simple file format specification
+///
+/// In DataFusion, various parts of the file format are split among the
+/// FileFormatFactory, the FileFormat, the FileSource, the FileOpener,
+/// and a few other traits. This trait is designed to provide a few
+/// important features of a natively implemented FileFormat while consolidating
+/// the components of implementing the format in the same place. This is
+/// intended to provide a less verbose way to implement readers for a wide
+/// variety of spatial formats.
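+///
+/// A minimal implementation sketch (loosely following the `EchoSpec` used in the
+/// tests; imports are omitted and the format details are placeholders):
+///
+/// ```ignore
+/// #[derive(Debug, Default, Clone)]
+/// struct MySpec {}
+///
+/// #[async_trait]
+/// impl ExternalFormatSpec for MySpec {
+///     fn extension(&self) -> &str {
+///         "myext"
+///     }
+///
+///     fn with_options(
+///         &self,
+///         _options: &HashMap<String, String>,
+///     ) -> Result<Arc<dyn ExternalFormatSpec>> {
+///         // Accept no options in this sketch
+///         Ok(Arc::new(self.clone()))
+///     }
+///
+///     async fn infer_schema(&self, location: &Object) -> Result<Schema> {
+///         // Open `location` and return the schema open_reader() would produce
+///         todo!()
+///     }
+///
+///     async fn open_reader(
+///         &self,
+///         args: &OpenReaderArgs,
+///     ) -> Result<Box<dyn RecordBatchReader + Send>> {
+///         // Open args.src, apply args.file_projection, and return a reader
+///         todo!()
+///     }
+/// }
+/// ```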
+#[async_trait]
+pub trait ExternalFormatSpec: Debug + Send + Sync {
+ /// Infer a schema for a given file
+ ///
+ /// Given a single file, infer what schema [ExternalFormatSpec::open_reader]
+ /// would produce in the absence of any other guidance.
+ async fn infer_schema(&self, location: &Object) -> Result<Schema>;
+
+ /// Open a [RecordBatchReader] for a given file
+ ///
+ /// The implementation must handle the `file_projection`; however, it
+ /// need not handle the `filters` (but may use them for pruning).
+ async fn open_reader(&self, args: &OpenReaderArgs)
+ -> Result<Box<dyn RecordBatchReader + Send>>;
+
+ /// A file extension or `""` if this concept does not apply
+ fn extension(&self) -> &str {
+ ""
+ }
+
+ /// Compute a clone of self but with the key/value options specified
+ ///
+ /// Implementations should error for invalid key/value input that does
+ /// not apply to this reader.
+ fn with_options(
+ &self,
+ options: &HashMap<String, String>,
+ ) -> Result<Arc<dyn ExternalFormatSpec>>;
+
+ /// Fill in default options from [TableOptions]
+ ///
+ /// The TableOptions are a DataFusion concept that provide a means by which
+ /// options can be set for various table formats. If the defaults for a built-in
+ /// table format are reasonable to fill in or if Extensions have been set,
+ /// these can be accessed and used to fill default options. Note that any options
+ /// set with [ExternalFormatSpec::with_options] should take precedence.
+ fn with_table_options(
+ &self,
+ _table_options: &TableOptions,
+ ) -> Option<Arc<dyn ExternalFormatSpec>> {
+ None
+ }
+
+ /// Allow repartitioning
+ ///
+ /// This allows an implementation to opt in to DataFusion's built-in file size
+ /// based partitioner, which works well for partitioning files where a simple
+ /// file plus byte range is sufficient. The default opts out of this feature
+ /// (i.e., every file is passed exactly once to [ExternalFormatSpec::open_reader]
+ /// without a `range`).
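+ ///
+ /// An implementation that can read arbitrary byte ranges could opt in with,
+ /// for example:
+ ///
+ /// ```ignore
+ /// fn supports_repartition(&self) -> SupportsRepartition {
+ ///     SupportsRepartition::ByRange
+ /// }
+ /// ```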
+ fn supports_repartition(&self) -> SupportsRepartition {
+ SupportsRepartition::None
+ }
+
+ /// Infer [Statistics] for a given file
+ async fn infer_stats(&self, _location: &Object, table_schema: &Schema) -> Result<Statistics> {
+ Ok(Statistics::new_unknown(table_schema))
+ }
+}
+
+/// Enumerator for repartitioning support
+#[derive(Debug, Clone, Copy)]
+pub enum SupportsRepartition {
+ /// This implementation does not support repartitioning beyond the file level
+ None,
+ /// This implementation supports partitioning by arbitrary ranges within a file
+ ///
+ /// Implementations that return this must check [Object::range] when opening
+ /// a file and ensure that each record is read exactly once from one file
+ /// with potentially multiple ranges.
+ ByRange,
+}
+
+/// Arguments to [ExternalFormatSpec::open_reader]
+#[derive(Debug, Clone)]
+pub struct OpenReaderArgs {
+ /// The input file, or partial file if [SupportsRepartition::ByRange] is used
+ pub src: Object,
+
+ /// The requested batch size
+ ///
+ /// DataFusion will usually fill this in to a default of 8192 or a user-specified
+ /// default in the session configuration.
+ pub batch_size: Option<usize>,
+
+ /// The requested file schema, if specified
+ ///
+ /// DataFusion will usually fill this in to the schema inferred by
+ /// [ExternalFormatSpec::infer_schema].
+ pub file_schema: Option<SchemaRef>,
+
+ /// The requested field indices
+ ///
+ /// Implementations must handle this (e.g., using `RecordBatch::project`
+ /// or by implementing partial reads).
+ pub file_projection: Option<Vec<usize>>,
+
+ /// Filter expressions
+ ///
+ /// Expressions that may be used for pruning. Implementations need not
+ /// apply these filters.
+ pub filters: Vec<Arc<dyn PhysicalExpr>>,
+}
+
+/// The information required to specify a file or partial file
+///
+/// Depending on exactly where in DataFusion we are calling in from, we might
+/// have various information about the file. In general, implementations should
+/// use [ObjectStore] and [ObjectMeta] to access the file so that DataFusion's
+/// registered IO for these protocols is used. When implementing a filename-based
+/// reader (e.g., one that uses some external API to read files), use
+/// [Object::to_url_string].
+#[derive(Debug, Clone)]
+pub struct Object {
+ /// The object store reference
+ pub store: Option<Arc<dyn ObjectStore>>,
+
+ /// A URL that may be used to retrieve an [ObjectStore] from a registry
+ ///
+ /// These URLs typically are populated only with the scheme.
+ pub url: Option<ObjectStoreUrl>,
+
+ /// An individual object in an ObjectStore
+ pub meta: Option<ObjectMeta>,
+
+ /// If this represents a partial file, the byte range within the file
+ ///
+ /// This is only set if partitioning other than `None` is provided
+ pub range: Option<FileRange>,
+}
+
+impl Object {
+ /// Convert this object to a URL string, if possible
+ ///
+ /// Returns `None` if there is not sufficient information in the Object to calculate
+ /// this.
+ pub fn to_url_string(&self) -> Option<String> {
+ match (&self.url, &self.meta) {
+ (None, Some(meta)) => {
+ // There's no great way to map an object_store to a url prefix if we're not
+ // provided the `url`; however, this is what we have access to in the
+ // Schema and Statistics resolution phases of the FileFormat.
+ // This is a heuristic that should work for https and a local filesystem,
+ // which is what we might be able to expect a non-DataFusion system like
+ // GDAL to be able to translate.
+ let object_store_debug = format!("{:?}", self.store).to_lowercase();
+ if object_store_debug.contains("http") {
+ Some(format!("https://{}", meta.location))
+ } else if object_store_debug.contains("local") {
+ Some(format!("file:///{}", meta.location))
+ } else {
+ None
+ }
+ }
+ (Some(url), None) => Some(url.to_string()),
+ (Some(url), Some(meta)) => Some(format!("{url}/{}", meta.location)),
+ (None, None) => None,
+ }
+ }
+}