This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git


The following commit(s) were added to refs/heads/main by this push:
     new 82dda378 feat(rust/sedona-geoparquet): Ensure GeoParquet configuration 
options for read and write can be passed via SQL (#607)
82dda378 is described below

commit 82dda37812f900e1f3cedefd1e785aa4b7a70e75
Author: Dewey Dunnington <[email protected]>
AuthorDate: Tue Feb 17 15:47:19 2026 -0600

    feat(rust/sedona-geoparquet): Ensure GeoParquet configuration options for 
read and write can be passed via SQL (#607)
---
 python/sedonadb/python/sedonadb/dataframe.py |  37 ++++++-
 python/sedonadb/src/dataframe.rs             |  36 +++---
 python/sedonadb/tests/io/test_parquet.py     |  23 ++++
 r/sedonadb/src/rust/src/dataframe.rs         |   6 +-
 rust/sedona-geoparquet/src/file_opener.rs    |   2 +-
 rust/sedona-geoparquet/src/format.rs         |  56 +++++++++-
 rust/sedona-geoparquet/src/options.rs        | 157 +++++++++++++++++++++++++--
 rust/sedona-geoparquet/src/provider.rs       |   3 +-
 rust/sedona-geoparquet/src/writer.rs         | 125 +++++++++++++--------
 rust/sedona/src/context.rs                   |  38 ++++++-
 rust/sedona/src/object_storage.rs            |  18 ++-
 11 files changed, 419 insertions(+), 82 deletions(-)

diff --git a/python/sedonadb/python/sedonadb/dataframe.py 
b/python/sedonadb/python/sedonadb/dataframe.py
index 7f6d630e..c32c8600 100644
--- a/python/sedonadb/python/sedonadb/dataframe.py
+++ b/python/sedonadb/python/sedonadb/dataframe.py
@@ -338,11 +338,14 @@ class DataFrame:
         self,
         path: Union[str, Path],
         *,
+        options: Optional[Dict[str, Any]] = None,
         partition_by: Optional[Union[str, Iterable[str]]] = None,
         sort_by: Optional[Union[str, Iterable[str]]] = None,
         single_file_output: Optional[bool] = None,
-        geoparquet_version: Literal["1.0", "1.1"] = "1.0",
-        overwrite_bbox_columns: bool = False,
+        geoparquet_version: Literal["1.0", "1.1", None] = None,
+        overwrite_bbox_columns: Optional[bool] = None,
+        max_row_group_size: Optional[int] = None,
+        compression: Optional[str] = None,
     ):
         """Write this DataFrame to one or more (Geo)Parquet files
 
@@ -353,6 +356,11 @@ class DataFrame:
 
         Args:
             path: A filename or directory to which parquet file(s) should be 
written.
+            options: Key/value options to be used when constructing a parquet 
writer.
+                Common options are exposed as other arguments to 
`to_parquet()`; however,
+                this argument allows setting any DataFusion Parquet writer 
option. If
+                an option is specified here and by an argument to this 
function, the
+                value specified as a keyword argument takes precedence.
             partition_by: A vector of column names to partition by. If 
non-empty,
                 applies hive-style partitioning to the output.
             sort_by: A vector of column names to sort by. Currently only 
ascending
@@ -376,6 +384,11 @@ class DataFrame:
                 that already exist in the input. This is useful in a read -> 
modify
                 -> write scenario to ensure these columns are up-to-date. If 
`False`
                 (the default), an error will be raised if a bbox column 
already exists.
+            max_row_group_size: Target maximum number of rows in each row 
group. Defaults
+                to the global configuration value (1M rows).
+            compression: Sets the Parquet compression codec. Valid values are: 
uncompressed,
+                snappy, gzip(level), brotli(level), lz4, zstd(level), and 
lz4_raw. Defaults
+                to the global configuration value (zstd(3)).
 
         Examples:
 
@@ -389,6 +402,23 @@ class DataFrame:
 
         path = Path(path)
 
+        if options is not None:
+            options = {k: str(v) for k, v in options.items()}
+        else:
+            options = {}
+
+        if max_row_group_size is not None:
+            options["max_row_group_size"] = str(max_row_group_size)
+
+        if compression is not None:
+            options["compression"] = str(compression)
+
+        if geoparquet_version is not None:
+            options["geoparquet_version"] = str(geoparquet_version)
+
+        if overwrite_bbox_columns is not None:
+            options["overwrite_bbox_columns"] = str(overwrite_bbox_columns)
+
         if single_file_output is None:
             single_file_output = partition_by is None and 
str(path).endswith(".parquet")
 
@@ -409,11 +439,10 @@ class DataFrame:
         self._impl.to_parquet(
             self._ctx,
             str(path),
+            options,
             partition_by,
             sort_by,
             single_file_output,
-            geoparquet_version,
-            overwrite_bbox_columns,
         )
 
     def show(
diff --git a/python/sedonadb/src/dataframe.rs b/python/sedonadb/src/dataframe.rs
index e0bf2151..5de8c011 100644
--- a/python/sedonadb/src/dataframe.rs
+++ b/python/sedonadb/src/dataframe.rs
@@ -23,6 +23,7 @@ use arrow_array::ffi_stream::FFI_ArrowArrayStream;
 use arrow_array::{RecordBatch, RecordBatchReader};
 use arrow_schema::{Schema, SchemaRef};
 use datafusion::catalog::MemTable;
+use datafusion::config::ConfigField;
 use datafusion::logical_expr::SortExpr;
 use datafusion::prelude::DataFrame;
 use datafusion_common::{Column, DataFusionError, ParamValues};
@@ -33,7 +34,7 @@ use pyo3::prelude::*;
 use pyo3::types::{PyCapsule, PyDict, PyList};
 use sedona::context::{SedonaDataFrame, SedonaWriteOptions};
 use sedona::show::{DisplayMode, DisplayTableOptions};
-use sedona_geoparquet::options::{GeoParquetVersion, TableGeoParquetOptions};
+use sedona_geoparquet::options::TableGeoParquetOptions;
 use sedona_schema::schema::SedonaSchema;
 use tokio::runtime::Runtime;
 
@@ -181,11 +182,10 @@ impl InternalDataFrame {
         py: Python<'py>,
         ctx: &InternalContext,
         path: String,
+        options: Bound<'py, PyDict>,
         partition_by: Vec<String>,
         sort_by: Vec<String>,
         single_file_output: bool,
-        geoparquet_version: Option<String>,
-        overwrite_bbox_columns: bool,
     ) -> Result<(), PySedonaError> {
         // sort_by needs to be SortExpr. A Vec<String> can unambiguously be 
interpreted as
         // field names (ascending), but other types of expressions aren't 
supported here yet.
@@ -197,18 +197,18 @@ impl InternalDataFrame {
             })
             .collect::<Vec<_>>();
 
-        let options = SedonaWriteOptions::new()
+        let write_options = SedonaWriteOptions::new()
             .with_partition_by(partition_by)
             .with_sort_by(sort_by_expr)
             .with_single_file_output(single_file_output);
 
-        let mut writer_options = TableGeoParquetOptions::new();
-        writer_options.overwrite_bbox_columns = overwrite_bbox_columns;
-        if let Some(geoparquet_version) = geoparquet_version {
-            writer_options.geoparquet_version = geoparquet_version.parse()?;
-        } else {
-            writer_options.geoparquet_version = GeoParquetVersion::Omitted;
-        }
+        let options_map = options
+            .iter()
+            .map(|(k, v)| Ok((k.extract::<String>()?, v.extract::<String>()?)))
+            .collect::<Result<HashMap<_, _>, PySedonaError>>()?;
+
+        // Create GeoParquet options
+        let mut writer_options = TableGeoParquetOptions::default();
 
         // Resolve writer options from the context configuration
         let global_parquet_options = ctx
@@ -222,12 +222,20 @@ impl InternalDataFrame {
             .clone();
         writer_options.inner.global = global_parquet_options;
 
+        // Set values from options dictionary
+        for (k, v) in &options_map {
+            writer_options.set(k, v)?;
+        }
+
         wait_for_future(
             py,
             &self.runtime,
-            self.inner
-                .clone()
-                .write_geoparquet(&ctx.inner, &path, options, 
Some(writer_options)),
+            self.inner.clone().write_geoparquet(
+                &ctx.inner,
+                &path,
+                write_options,
+                Some(writer_options),
+            ),
         )??;
         Ok(())
     }
diff --git a/python/sedonadb/tests/io/test_parquet.py 
b/python/sedonadb/tests/io/test_parquet.py
index c80d7478..f5980659 100644
--- a/python/sedonadb/tests/io/test_parquet.py
+++ b/python/sedonadb/tests/io/test_parquet.py
@@ -332,6 +332,29 @@ def test_write_geoparquet_options(geoarrow_data):
         metadata = parquet.read_metadata(tmp_parquet)
         assert metadata.row_group(0).num_rows == 1024
 
+        # Set via keyword arg and ensure that value is respected
+        con.sql(
+            "SET datafusion.execution.parquet.max_row_group_size = 1000000"
+        ).execute()
+        con.create_data_frame(gdf).to_parquet(
+            tmp_parquet,
+            max_row_group_size=1024,
+        )
+
+        gdf_roundtrip = geopandas.read_parquet(tmp_parquet)
+        geopandas.testing.assert_geodataframe_equal(gdf_roundtrip, gdf)
+
+        metadata = parquet.read_metadata(tmp_parquet)
+        assert metadata.row_group(0).num_rows == 1024
+
+        # Ensure compression is respected
+        size_with_default_compression = tmp_parquet.stat().st_size
+        con.create_data_frame(gdf).to_parquet(
+            tmp_parquet,
+            compression="uncompressed",
+        )
+        assert tmp_parquet.stat().st_size > (size_with_default_compression * 2)
+
 
 def test_write_geoparquet_1_1(con, geoarrow_data):
     # Checks GeoParquet 1.1 support specifically
diff --git a/r/sedonadb/src/rust/src/dataframe.rs 
b/r/sedonadb/src/rust/src/dataframe.rs
index 275b1f4f..2e24fdfc 100644
--- a/r/sedonadb/src/rust/src/dataframe.rs
+++ b/r/sedonadb/src/rust/src/dataframe.rs
@@ -262,8 +262,10 @@ impl InternalDataFrame {
             .with_sort_by(sort_by_expr)
             .with_single_file_output(single_file_output);
 
-        let mut writer_options = TableGeoParquetOptions::new();
-        writer_options.overwrite_bbox_columns = overwrite_bbox_columns;
+        let mut writer_options = TableGeoParquetOptions {
+            overwrite_bbox_columns,
+            ..Default::default()
+        };
         if let Some(geoparquet_version) = geoparquet_version {
             writer_options.geoparquet_version = geoparquet_version
                 .parse()
diff --git a/rust/sedona-geoparquet/src/file_opener.rs 
b/rust/sedona-geoparquet/src/file_opener.rs
index 72719c88..6ba04b82 100644
--- a/rust/sedona-geoparquet/src/file_opener.rs
+++ b/rust/sedona-geoparquet/src/file_opener.rs
@@ -128,7 +128,7 @@ impl FileOpener for GeoParquetFileOpener {
 
             let maybe_geoparquet_metadata = 
GeoParquetMetadata::try_from_parquet_metadata(
                 &parquet_metadata,
-                self_clone.options.geometry_columns.as_ref(),
+                self_clone.options.geometry_columns.inner(),
             )?;
 
             if self_clone.enable_pruning {
diff --git a/rust/sedona-geoparquet/src/format.rs 
b/rust/sedona-geoparquet/src/format.rs
index b8d50cd6..08a04c92 100644
--- a/rust/sedona-geoparquet/src/format.rs
+++ b/rust/sedona-geoparquet/src/format.rs
@@ -24,7 +24,7 @@ use std::{
 use arrow_schema::{Schema, SchemaRef};
 use async_trait::async_trait;
 use datafusion::{
-    config::ConfigOptions,
+    config::{ConfigField, ConfigOptions},
     datasource::{
         file_format::{
             file_compression_type::FileCompressionType,
@@ -97,8 +97,16 @@ impl FileFormatFactory for GeoParquetFormatFactory {
     ) -> Result<Arc<dyn FileFormat>> {
         let mut options_mut = self.options.clone().unwrap_or_default();
         let mut format_options_mut = format_options.clone();
-        if let Some(version_string) = 
format_options_mut.remove("geoparquet_version") {
-            options_mut.geoparquet_version = version_string.parse()?;
+
+        // Remove GeoParquet-specific options that will cause an error if 
passed
+        // to inner.create() and ensure they are reflected by the GeoParquet
+        // options. These are prefixed with `format` when passed by
+        // DataFusion SQL. DataFusion takes care of lowercasing these values 
before
+        // they are passed here.
+        for key in TableGeoParquetOptions::TABLE_OPTIONS_KEYS {
+            if let Some(value) = format_options_mut.remove(key) {
+                options_mut.set(key.strip_prefix("format.").unwrap(), &value)?;
+            }
         }
 
         let inner_format = self.inner.create(state, &format_options_mut)?;
@@ -215,7 +223,7 @@ impl FileFormat for GeoParquetFormat {
         for metadata in &metadatas {
             let this_geoparquet_metadata = 
GeoParquetMetadata::try_from_parquet_metadata(
                 metadata,
-                self.options.geometry_columns.as_ref(),
+                self.options.geometry_columns.inner(),
             )?;
 
             match (geoparquet_metadata.as_mut(), this_geoparquet_metadata) {
@@ -605,7 +613,7 @@ mod test {
     use datafusion_physical_expr::PhysicalExpr;
 
     use rstest::rstest;
-    use sedona_schema::crs::lnglat;
+    use sedona_schema::crs::{deserialize_crs, lnglat};
     use sedona_schema::datatypes::{Edges, SedonaType, WKB_GEOMETRY};
     use sedona_schema::schema::SedonaSchema;
     use sedona_testing::create::create_scalar;
@@ -614,7 +622,7 @@ mod test {
     use super::*;
 
     fn setup_context() -> SessionContext {
-        let mut state = SessionStateBuilder::new().build();
+        let mut state = 
SessionStateBuilder::new_with_default_features().build();
         state
             .register_file_format(Arc::new(GeoParquetFormatFactory::new()), 
true)
             .unwrap();
@@ -750,6 +758,42 @@ mod test {
         );
     }
 
+    #[tokio::test]
+    async fn format_from_external_table() {
+        let ctx = setup_context();
+        let example = test_geoparquet("example", "geometry").unwrap();
+        ctx.sql(&format!(
+            r#"
+            CREATE EXTERNAL TABLE test
+            STORED AS PARQUET
+            LOCATION '{example}'
+            OPTIONS ('geometry_columns' '{{"geometry": {{"encoding": "WKB", 
"crs": "EPSG:3857"}}}}');
+            "#
+        ))
+        .await
+        .unwrap();
+
+        let df = ctx.table("test").await.unwrap();
+
+        // Check that the logical plan resulting from a read has the correct 
schema
+        assert_eq!(
+            df.schema().clone().strip_qualifiers().field_names(),
+            ["wkt", "geometry"]
+        );
+
+        let sedona_types = df
+            .schema()
+            .sedona_types()
+            .collect::<Result<Vec<_>>>()
+            .unwrap();
+        assert_eq!(sedona_types.len(), 2);
+        assert_eq!(sedona_types[0], SedonaType::Arrow(DataType::Utf8View));
+        assert_eq!(
+            sedona_types[1],
+            SedonaType::WkbView(Edges::Planar, 
deserialize_crs("EPSG:3857").unwrap())
+        );
+    }
+
     #[rstest]
     #[tokio::test]
     async fn pruning_geoparquet_metadata(#[values("st_intersects", 
"st_contains")] udf_name: &str) {
diff --git a/rust/sedona-geoparquet/src/options.rs 
b/rust/sedona-geoparquet/src/options.rs
index 2cc1fd12..bf136c37 100644
--- a/rust/sedona-geoparquet/src/options.rs
+++ b/rust/sedona-geoparquet/src/options.rs
@@ -17,13 +17,13 @@
 
 use std::{collections::HashMap, str::FromStr};
 
-use datafusion::config::TableParquetOptions;
-use datafusion_common::{plan_err, DataFusionError};
+use datafusion::config::{ConfigField, TableParquetOptions, Visit};
+use datafusion_common::{plan_err, DataFusionError, Result};
 
 use crate::metadata::GeoParquetColumnMetadata;
 
 /// [TableParquetOptions] wrapper with GeoParquet-specific options
-#[derive(Debug, Default, Clone)]
+#[derive(Debug, Clone, Default, PartialEq)]
 pub struct TableGeoParquetOptions {
     /// Inner [TableParquetOptions]
     pub inner: TableParquetOptions,
@@ -33,14 +33,76 @@ pub struct TableGeoParquetOptions {
     /// bounding box columns.
     pub overwrite_bbox_columns: bool,
     /// Optional geometry column metadata overrides for schema inference.
-    pub geometry_columns: Option<HashMap<String, GeoParquetColumnMetadata>>,
+    pub geometry_columns: GeometryColumns,
     /// Validate geometry column contents against metadata when reading.
     pub validate: bool,
 }
 
 impl TableGeoParquetOptions {
-    pub fn new() -> Self {
-        Self::default()
+    /// Special-cased TableOptions names
+    ///
+    /// When the TableGeoParquetOptions is being constructed from a place
+    /// where DataFusion internals are interacting with a TableOptions instance,
+    /// these are the option names that get created (e.g., OPTIONS ('validate'
+    /// 'true') becomes the string key format.validate). There are several
+    /// places where we need to intercept these before they are used to update
+    /// the TableOptions.
+    pub const TABLE_OPTIONS_KEYS: [&str; 4] = [
+        "format.geoparquet_version",
+        "format.geometry_columns",
+        "format.validate",
+        "format.overwrite_bbox_columns",
+    ];
+}
+
+impl ConfigField for TableGeoParquetOptions {
+    fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: 
&'static str) {
+        // Visit inner TableParquetOptions fields
+        self.inner.visit(v, key_prefix, "");
+
+        // Visit GeoParquet-specific fields
+        self.geoparquet_version.visit(
+            v,
+            &format!("{key_prefix}.geoparquet_version"),
+            "GeoParquet version to use when writing",
+        );
+        self.overwrite_bbox_columns.visit(
+            v,
+            &format!("{key_prefix}.overwrite_bbox_columns"),
+            "Overwrite existing bounding box columns when writing GeoParquet 
1.1",
+        );
+        self.geometry_columns.visit(
+            v,
+            &format!("{key_prefix}.geometry_columns"),
+            "Optional geometry column metadata overrides for schema inference",
+        );
+        self.validate.visit(
+            v,
+            &format!("{key_prefix}.validate"),
+            "Validate geometry column contents against metadata when reading",
+        );
+    }
+
+    fn set(&mut self, key: &str, value: &str) -> Result<()> {
+        // Try GeoParquet-specific keys first
+        match key {
+            "geoparquet_version" => {
+                self.geoparquet_version.set(key, value)?;
+            }
+            "overwrite_bbox_columns" => {
+                self.overwrite_bbox_columns.set(key, value)?;
+            }
+            "geometry_columns" => {
+                self.geometry_columns.set(key, value)?;
+            }
+            "validate" => {
+                self.validate.set(key, value)?;
+            }
+            // Forward all other keys to inner TableParquetOptions
+            _ => {
+                self.inner.set(key, value)?;
+            }
+        }
+        Ok(())
     }
 }
 
@@ -54,7 +116,7 @@ impl From<TableParquetOptions> for TableGeoParquetOptions {
 }
 
 /// The GeoParquet Version to write for output with spatial columns
-#[derive(Debug, Clone, Copy, Default)]
+#[derive(Debug, Clone, Copy, Default, PartialEq)]
 pub enum GeoParquetVersion {
     /// Write GeoParquet 1.0 metadata
     ///
@@ -102,3 +164,84 @@ impl FromStr for GeoParquetVersion {
         }
     }
 }
+
+impl ConfigField for GeoParquetVersion {
+    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) 
{
+        let value = match self {
+            GeoParquetVersion::V1_0 => "1.0",
+            GeoParquetVersion::V1_1 => "1.1",
+            GeoParquetVersion::V2_0 => "2.0",
+            GeoParquetVersion::Omitted => "none",
+        };
+        v.some(key, value, description);
+    }
+
+    fn set(&mut self, _key: &str, value: &str) -> Result<()> {
+        *self = value.parse()?;
+        Ok(())
+    }
+}
+
+/// Wrapper for geometry column metadata configuration
+#[derive(Debug, Clone, Default, PartialEq)]
+pub struct GeometryColumns {
+    columns: Option<HashMap<String, GeoParquetColumnMetadata>>,
+}
+
+impl GeometryColumns {
+    /// Create empty geometry columns
+    pub fn new() -> Self {
+        Self { columns: None }
+    }
+
+    /// Create from a HashMap
+    pub fn from_map(columns: HashMap<String, GeoParquetColumnMetadata>) -> 
Self {
+        Self {
+            columns: Some(columns),
+        }
+    }
+
+    /// Get the inner HashMap
+    pub fn inner(&self) -> Option<&HashMap<String, GeoParquetColumnMetadata>> {
+        self.columns.as_ref()
+    }
+
+    /// Set from JSON string
+    pub fn from_json(json: &str) -> Result<Self> {
+        let columns: HashMap<String, GeoParquetColumnMetadata> = 
serde_json::from_str(json)
+            .map_err(|e| {
+                DataFusionError::Configuration(format!("geometry_columns must 
be valid JSON: {e}"))
+            })?;
+        Ok(Self {
+            columns: Some(columns),
+        })
+    }
+
+    /// Convert to JSON string
+    pub fn to_json(&self) -> Result<String> {
+        match &self.columns {
+            Some(cols) => serde_json::to_string(cols).map_err(|e| {
+                DataFusionError::Configuration(format!("Failed to serialize 
geometry_columns: {e}"))
+            }),
+            None => Ok(String::new()),
+        }
+    }
+}
+
+impl ConfigField for GeometryColumns {
+    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) 
{
+        match self.to_json() {
+            Ok(json) if !json.is_empty() => v.some(key, json, description),
+            _ => {}
+        }
+    }
+
+    fn set(&mut self, _key: &str, value: &str) -> Result<()> {
+        if value.is_empty() {
+            self.columns = None;
+        } else {
+            *self = Self::from_json(value)?;
+        }
+        Ok(())
+    }
+}
diff --git a/rust/sedona-geoparquet/src/provider.rs 
b/rust/sedona-geoparquet/src/provider.rs
index b9d2ba52..2c0ca7fe 100644
--- a/rust/sedona-geoparquet/src/provider.rs
+++ b/rust/sedona-geoparquet/src/provider.rs
@@ -263,7 +263,8 @@ impl ReadOptions<'_> for GeoParquetReadOptions<'_> {
             let mut geoparquet_options =
                 TableGeoParquetOptions::from(parquet_format.options().clone());
             if let Some(geometry_columns) = &self.geometry_columns {
-                geoparquet_options.geometry_columns = 
Some(geometry_columns.clone());
+                geoparquet_options.geometry_columns =
+                    
crate::options::GeometryColumns::from_map(geometry_columns.clone());
             }
             geoparquet_options.validate = self.validate;
             options.format = 
Arc::new(GeoParquetFormat::new(geoparquet_options));
diff --git a/rust/sedona-geoparquet/src/writer.rs 
b/rust/sedona-geoparquet/src/writer.rs
index 42c9de4d..1179dbd3 100644
--- a/rust/sedona-geoparquet/src/writer.rs
+++ b/rust/sedona-geoparquet/src/writer.rs
@@ -516,6 +516,7 @@ fn append_float_bbox(
 #[cfg(test)]
 mod test {
     use std::iter::zip;
+    use std::path::Path;
 
     use arrow_array::{create_array, Array, RecordBatch};
     use datafusion::datasource::file_format::format_as_file_type;
@@ -545,7 +546,7 @@ mod test {
         SessionContext::new_with_state(state).enable_url_table()
     }
 
-    async fn test_dataframe_roundtrip(ctx: SessionContext, src: DataFrame) {
+    async fn test_dataframe_roundtrip(ctx: &SessionContext, src: DataFrame) {
         let df_batches = src.clone().collect().await.unwrap();
         test_write_dataframe(
             ctx,
@@ -558,8 +559,18 @@ mod test {
         .unwrap()
     }
 
+    async fn test_write_dataframe_sql(
+        ctx: &SessionContext,
+        sql: &str,
+        tmp_parquet: &Path,
+        expected_batches: Vec<RecordBatch>,
+    ) -> Result<()> {
+        ctx.sql(sql).await?.collect().await?;
+        test_write_dataframe_common(ctx, tmp_parquet, expected_batches).await
+    }
+
     async fn test_write_dataframe(
-        ctx: SessionContext,
+        ctx: &SessionContext,
         src: DataFrame,
         expected_batches: Vec<RecordBatch>,
         options: TableGeoParquetOptions,
@@ -585,6 +596,14 @@ mod test {
 
         DataFrame::new(ctx.state(), plan).collect().await?;
 
+        test_write_dataframe_common(ctx, &tmp_parquet, expected_batches).await
+    }
+
+    async fn test_write_dataframe_common(
+        ctx: &SessionContext,
+        tmp_parquet: &Path,
+        expected_batches: Vec<RecordBatch>,
+    ) -> Result<()> {
         let df_parquet_batches = ctx
             .table(tmp_parquet.to_string_lossy().to_string())
             .await
@@ -624,8 +643,17 @@ mod test {
         assert_eq!(df_parquet_sedona_types, df_sedona_types);
 
         // Check batches without metadata
-        for (df_parquet_batch, df_batch) in zip(df_parquet_batches, 
expected_batches) {
-            assert_eq!(df_parquet_batch.columns(), df_batch.columns())
+        for (i, (df_parquet_batch, df_batch)) in
+            zip(df_parquet_batches, expected_batches).enumerate()
+        {
+            for (j, (df_parquet_column, df_batch_column)) in
+                zip(df_parquet_batch.columns(), df_batch.columns()).enumerate()
+            {
+                assert_eq!(
+                    df_parquet_column, df_batch_column,
+                    "Batch {i} column {j} did not match"
+                );
+            }
         }
 
         Ok(())
@@ -644,7 +672,7 @@ mod test {
             .select(vec![col("wkt")])
             .unwrap();
 
-        test_dataframe_roundtrip(ctx, df).await;
+        test_dataframe_roundtrip(&ctx, df).await;
     }
 
     #[tokio::test]
@@ -653,7 +681,7 @@ mod test {
         let ctx = setup_context();
         let df = ctx.table(&example).await.unwrap();
 
-        test_dataframe_roundtrip(ctx, df).await;
+        test_dataframe_roundtrip(&ctx, df).await;
     }
 
     #[tokio::test]
@@ -662,24 +690,19 @@ mod test {
         let ctx = setup_context();
         let df = ctx.table(&example).await.unwrap();
 
-        test_dataframe_roundtrip(ctx, df).await;
+        test_dataframe_roundtrip(&ctx, df).await;
     }
 
     #[tokio::test]
     async fn geoparquet_1_1_basic() {
         let example = test_geoparquet("example", "geometry").unwrap();
         let ctx = setup_context();
-        let df = ctx
-            .table(&example)
-            .await
-            .unwrap()
-            // DataFusion internals lose the nullability we assigned to the 
bbox
-            // and without this line the test fails.
-            .filter(Expr::IsNotNull(col("geometry").into()))
-            .unwrap();
+        let df = ctx.table(&example).await.unwrap();
 
-        let mut options = TableGeoParquetOptions::new();
-        options.geoparquet_version = GeoParquetVersion::V1_1;
+        let options = TableGeoParquetOptions {
+            geoparquet_version: GeoParquetVersion::V1_1,
+            ..Default::default()
+        };
 
         let bbox_udf: ScalarUDF = geoparquet_bbox_udf().into();
 
@@ -695,9 +718,29 @@ mod test {
             .await
             .unwrap();
 
-        test_write_dataframe(ctx, df, df_batches_with_bbox, options, vec![])
+        // Check that we can do this from a DataFrame
+        test_write_dataframe(&ctx, df, df_batches_with_bbox.clone(), options, 
vec![])
             .await
             .unwrap();
+
+        // Check that we can do this from SQL too
+        let tmpdir = tempdir().unwrap();
+        let tmp_parquet = tmpdir.path().join("foofy_spatial.parquet");
+        test_write_dataframe_sql(
+            &ctx,
+            &format!(
+                r#"COPY (
+                    SELECT * FROM '{example}'
+                   ) TO '{}'
+                   OPTIONS (GEOPARQUET_VERSION '1.1')
+                "#,
+                tmp_parquet.display()
+            ),
+            &tmp_parquet,
+            df_batches_with_bbox,
+        )
+        .await
+        .unwrap();
     }
 
     #[tokio::test]
@@ -710,10 +753,6 @@ mod test {
             .table(&example)
             .await
             .unwrap()
-            // DataFusion internals lose the nullability we assigned to the 
bbox
-            // and without this line the test fails.
-            .filter(Expr::IsNotNull(col("geometry").into()))
-            .unwrap()
             .select(vec![
                 col("wkt"),
                 col("geometry").alias("geom"),
@@ -723,8 +762,10 @@ mod test {
             ])
             .unwrap();
 
-        let mut options = TableGeoParquetOptions::new();
-        options.geoparquet_version = GeoParquetVersion::V1_1;
+        let options = TableGeoParquetOptions {
+            geoparquet_version: GeoParquetVersion::V1_1,
+            ..Default::default()
+        };
 
         let bbox_udf: ScalarUDF = geoparquet_bbox_udf().into();
 
@@ -744,7 +785,7 @@ mod test {
             .await
             .unwrap();
 
-        test_write_dataframe(ctx, df, df_batches_with_bbox, options, vec![])
+        test_write_dataframe(&ctx, df, df_batches_with_bbox, options, vec![])
             .await
             .unwrap();
     }
@@ -760,18 +801,16 @@ mod test {
             .table(&example)
             .await
             .unwrap()
-            // DataFusion internals lose the nullability we assigned to the 
bbox
-            // and without this line the test fails.
-            .filter(Expr::IsNotNull(col("geometry").into()))
-            .unwrap()
             .select(vec![
                 lit("this is definitely not a bbox").alias("bbox"),
                 col("geometry"),
             ])
             .unwrap();
 
-        let mut options = TableGeoParquetOptions::new();
-        options.geoparquet_version = GeoParquetVersion::V1_1;
+        let mut options = TableGeoParquetOptions {
+            geoparquet_version: GeoParquetVersion::V1_1,
+            ..Default::default()
+        };
 
         let bbox_udf: ScalarUDF = geoparquet_bbox_udf().into();
 
@@ -788,7 +827,7 @@ mod test {
 
         // Without setting overwrite_bbox_columns = true, this should error
         let err = test_write_dataframe(
-            ctx.clone(),
+            &ctx,
             df.clone(),
             df_batches_with_bbox.clone(),
             options.clone(),
@@ -801,7 +840,7 @@ mod test {
             .starts_with("Can't overwrite GeoParquet 1.1 bbox column 'bbox'"));
 
         options.overwrite_bbox_columns = true;
-        test_write_dataframe(ctx, df, df_batches_with_bbox, options, vec![])
+        test_write_dataframe(&ctx, df, df_batches_with_bbox, options, vec![])
             .await
             .unwrap();
     }
@@ -814,10 +853,6 @@ mod test {
             .table(&example)
             .await
             .unwrap()
-            // DataFusion internals loose the nullability we assigned to the 
bbox
-            // and without this line the test fails.
-            .filter(Expr::IsNotNull(col("geometry").into()))
-            .unwrap()
             .select(vec![
                 lit("some_partition").alias("part"),
                 col("wkt"),
@@ -825,8 +860,10 @@ mod test {
             ])
             .unwrap();
 
-        let mut options = TableGeoParquetOptions::new();
-        options.geoparquet_version = GeoParquetVersion::V1_1;
+        let options = TableGeoParquetOptions {
+            geoparquet_version: GeoParquetVersion::V1_1,
+            ..Default::default()
+        };
 
         let bbox_udf: ScalarUDF = geoparquet_bbox_udf().into();
 
@@ -847,7 +884,7 @@ mod test {
             .await
             .unwrap();
 
-        test_write_dataframe(ctx, df, df_batches_with_bbox, options, 
vec!["part".into()])
+        test_write_dataframe(&ctx, df, df_batches_with_bbox, options, 
vec!["part".into()])
             .await
             .unwrap();
     }
@@ -886,8 +923,10 @@ mod test {
             ])
             .unwrap();
 
-        let mut options = TableGeoParquetOptions::new();
-        options.geoparquet_version = GeoParquetVersion::V1_1;
+        let options = TableGeoParquetOptions {
+            geoparquet_version: GeoParquetVersion::V1_1,
+            ..Default::default()
+        };
 
         let df_batches_with_bbox = df
             .clone()
@@ -901,7 +940,7 @@ mod test {
             .await
             .unwrap();
 
-        test_write_dataframe(ctx, df, df_batches_with_bbox, options, vec![])
+        test_write_dataframe(&ctx, df, df_batches_with_bbox, options, vec![])
             .await
             .unwrap();
     }
diff --git a/rust/sedona/src/context.rs b/rust/sedona/src/context.rs
index ee2c9635..2ea9cf03 100644
--- a/rust/sedona/src/context.rs
+++ b/rust/sedona/src/context.rs
@@ -557,7 +557,7 @@ mod tests {
     use datafusion::assert_batches_eq;
     use sedona_datasource::spec::{Object, OpenReaderArgs};
     use sedona_schema::{
-        crs::lnglat,
+        crs::{deserialize_crs, lnglat},
         datatypes::{Edges, SedonaType},
         schema::SedonaSchema,
     };
@@ -784,4 +784,40 @@ mod tests {
         .await
         .expect("should succeed because aws and gcs options were stripped");
     }
+
+    #[tokio::test]
+    async fn format_from_external_table() {
+        let ctx = SedonaContext::new_local_interactive().await.unwrap();
+        let example = test_geoparquet("example", "geometry").unwrap();
+        ctx.sql(&format!(
+            r#"
+            CREATE EXTERNAL TABLE test
+            STORED AS PARQUET
+            LOCATION '{example}'
+            OPTIONS ('geometry_columns' '{{"geometry": {{"encoding": "WKB", 
"crs": "EPSG:3857"}}}}');
+            "#
+        ))
+        .await
+        .unwrap();
+
+        let df = ctx.ctx.table("test").await.unwrap();
+
+        // Check that the logical plan resulting from a read has the correct 
schema
+        assert_eq!(
+            df.schema().clone().strip_qualifiers().field_names(),
+            ["wkt", "geometry"]
+        );
+
+        let sedona_types = df
+            .schema()
+            .sedona_types()
+            .collect::<Result<Vec<_>>>()
+            .unwrap();
+        assert_eq!(sedona_types.len(), 2);
+        assert_eq!(sedona_types[0], SedonaType::Arrow(DataType::Utf8View));
+        assert_eq!(
+            sedona_types[1],
+            SedonaType::WkbView(Edges::Planar, 
deserialize_crs("EPSG:3857").unwrap())
+        );
+    }
 }
diff --git a/rust/sedona/src/object_storage.rs 
b/rust/sedona/src/object_storage.rs
index 519c4418..f5b366e1 100644
--- a/rust/sedona/src/object_storage.rs
+++ b/rust/sedona/src/object_storage.rs
@@ -34,6 +34,7 @@ use dirs::home_dir;
 #[cfg(any(feature = "aws", feature = "azure", feature = "gcp", feature = 
"http"))]
 use object_store::CredentialProvider;
 use object_store::ObjectStore;
+use sedona_geoparquet::options::TableGeoParquetOptions;
 use url::Url;
 
 #[cfg(feature = "aws")]
@@ -819,10 +820,21 @@ pub(crate) async fn 
register_object_store_and_config_extensions(
 
     // Clone and modify the default table options based on the provided options
     let mut table_options = ctx.ctx.state().default_table_options();
-    if let Some(format) = format {
-        table_options.set_config_format(format);
+    if let Some(ref format) = format {
+        table_options.set_config_format(format.clone());
     }
-    table_options.alter_with_string_hash_map(options)?;
+
+    let mut options = options.clone();
+
+    // If this is explicitly a Parquet configuration, we need to strip 
GeoParquet
+    // options before calling alter_with_string_hash_map().
+    if let Some(&ConfigFileType::PARQUET) = format.as_ref() {
+        for key in TableGeoParquetOptions::TABLE_OPTIONS_KEYS {
+            options.remove(key);
+        }
+    }
+
+    table_options.alter_with_string_hash_map(&options)?;
 
     // Retrieve the appropriate object store based on the scheme, URL, and 
modified table options
     let store = get_object_store(&ctx.ctx.state(), scheme, url, 
&table_options).await?;


Reply via email to