This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git
The following commit(s) were added to refs/heads/main by this push:
new 82dda378 feat(rust/sedona-geoparquet): Ensure GeoParquet configuration
options for read and write can be passed via SQL (#607)
82dda378 is described below
commit 82dda37812f900e1f3cedefd1e785aa4b7a70e75
Author: Dewey Dunnington <[email protected]>
AuthorDate: Tue Feb 17 15:47:19 2026 -0600
feat(rust/sedona-geoparquet): Ensure GeoParquet configuration options for
read and write can be passed via SQL (#607)
---
python/sedonadb/python/sedonadb/dataframe.py | 37 ++++++-
python/sedonadb/src/dataframe.rs | 36 +++---
python/sedonadb/tests/io/test_parquet.py | 23 ++++
r/sedonadb/src/rust/src/dataframe.rs | 6 +-
rust/sedona-geoparquet/src/file_opener.rs | 2 +-
rust/sedona-geoparquet/src/format.rs | 56 +++++++++-
rust/sedona-geoparquet/src/options.rs | 157 +++++++++++++++++++++++++--
rust/sedona-geoparquet/src/provider.rs | 3 +-
rust/sedona-geoparquet/src/writer.rs | 125 +++++++++++++--------
rust/sedona/src/context.rs | 38 ++++++-
rust/sedona/src/object_storage.rs | 18 ++-
11 files changed, 419 insertions(+), 82 deletions(-)
diff --git a/python/sedonadb/python/sedonadb/dataframe.py
b/python/sedonadb/python/sedonadb/dataframe.py
index 7f6d630e..c32c8600 100644
--- a/python/sedonadb/python/sedonadb/dataframe.py
+++ b/python/sedonadb/python/sedonadb/dataframe.py
@@ -338,11 +338,14 @@ class DataFrame:
self,
path: Union[str, Path],
*,
+ options: Optional[Dict[str, Any]] = None,
partition_by: Optional[Union[str, Iterable[str]]] = None,
sort_by: Optional[Union[str, Iterable[str]]] = None,
single_file_output: Optional[bool] = None,
- geoparquet_version: Literal["1.0", "1.1"] = "1.0",
- overwrite_bbox_columns: bool = False,
+ geoparquet_version: Literal["1.0", "1.1", None] = None,
+ overwrite_bbox_columns: Optional[bool] = None,
+ max_row_group_size: Optional[int] = None,
+ compression: Optional[str] = None,
):
"""Write this DataFrame to one or more (Geo)Parquet files
@@ -353,6 +356,11 @@ class DataFrame:
Args:
path: A filename or directory to which parquet file(s) should be
written.
+ options: Key/value options to be used when constructing a parquet
writer.
+ Common options are exposed as other arguments to
`to_parquet()`; however,
+ this argument allows setting any DataFusion Parquet writer
option. If
+ an option is specified here and by an argument to this
function, the
+ value specified as a keyword argument takes precedence.
partition_by: A vector of column names to partition by. If
non-empty,
applies hive-style partitioning to the output.
sort_by: A vector of column names to sort by. Currently only
ascending
@@ -376,6 +384,11 @@ class DataFrame:
that already exist in the input. This is useful in a read ->
modify
-> write scenario to ensure these columns are up-to-date. If
`False`
(the default), an error will be raised if a bbox column
already exists.
+ max_row_group_size: Target maximum number of rows in each row
group. Defaults
+ to the global configuration value (1M rows).
+ compression: Sets the Parquet compression codec. Valid values are:
uncompressed,
+ snappy, gzip(level), brotli(level), lz4, zstd(level), and
lz4_raw. Defaults
+ to the global configuration value (zstd(3)).
Examples:
@@ -389,6 +402,23 @@ class DataFrame:
path = Path(path)
+ if options is not None:
+ options = {k: str(v) for k, v in options.items()}
+ else:
+ options = {}
+
+ if max_row_group_size is not None:
+ options["max_row_group_size"] = str(max_row_group_size)
+
+ if compression is not None:
+ options["compression"] = str(compression)
+
+ if geoparquet_version is not None:
+ options["geoparquet_version"] = str(geoparquet_version)
+
+ if overwrite_bbox_columns is not None:
+ options["overwrite_bbox_columns"] = str(overwrite_bbox_columns)
+
if single_file_output is None:
single_file_output = partition_by is None and
str(path).endswith(".parquet")
@@ -409,11 +439,10 @@ class DataFrame:
self._impl.to_parquet(
self._ctx,
str(path),
+ options,
partition_by,
sort_by,
single_file_output,
- geoparquet_version,
- overwrite_bbox_columns,
)
def show(
diff --git a/python/sedonadb/src/dataframe.rs b/python/sedonadb/src/dataframe.rs
index e0bf2151..5de8c011 100644
--- a/python/sedonadb/src/dataframe.rs
+++ b/python/sedonadb/src/dataframe.rs
@@ -23,6 +23,7 @@ use arrow_array::ffi_stream::FFI_ArrowArrayStream;
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{Schema, SchemaRef};
use datafusion::catalog::MemTable;
+use datafusion::config::ConfigField;
use datafusion::logical_expr::SortExpr;
use datafusion::prelude::DataFrame;
use datafusion_common::{Column, DataFusionError, ParamValues};
@@ -33,7 +34,7 @@ use pyo3::prelude::*;
use pyo3::types::{PyCapsule, PyDict, PyList};
use sedona::context::{SedonaDataFrame, SedonaWriteOptions};
use sedona::show::{DisplayMode, DisplayTableOptions};
-use sedona_geoparquet::options::{GeoParquetVersion, TableGeoParquetOptions};
+use sedona_geoparquet::options::TableGeoParquetOptions;
use sedona_schema::schema::SedonaSchema;
use tokio::runtime::Runtime;
@@ -181,11 +182,10 @@ impl InternalDataFrame {
py: Python<'py>,
ctx: &InternalContext,
path: String,
+ options: Bound<'py, PyDict>,
partition_by: Vec<String>,
sort_by: Vec<String>,
single_file_output: bool,
- geoparquet_version: Option<String>,
- overwrite_bbox_columns: bool,
) -> Result<(), PySedonaError> {
// sort_by needs to be SortExpr. A Vec<String> can unambiguously be
interpreted as
// field names (ascending), but other types of expressions aren't
supported here yet.
@@ -197,18 +197,18 @@ impl InternalDataFrame {
})
.collect::<Vec<_>>();
- let options = SedonaWriteOptions::new()
+ let write_options = SedonaWriteOptions::new()
.with_partition_by(partition_by)
.with_sort_by(sort_by_expr)
.with_single_file_output(single_file_output);
- let mut writer_options = TableGeoParquetOptions::new();
- writer_options.overwrite_bbox_columns = overwrite_bbox_columns;
- if let Some(geoparquet_version) = geoparquet_version {
- writer_options.geoparquet_version = geoparquet_version.parse()?;
- } else {
- writer_options.geoparquet_version = GeoParquetVersion::Omitted;
- }
+ let options_map = options
+ .iter()
+ .map(|(k, v)| Ok((k.extract::<String>()?, v.extract::<String>()?)))
+ .collect::<Result<HashMap<_, _>, PySedonaError>>()?;
+
+ // Create GeoParquet options
+ let mut writer_options = TableGeoParquetOptions::default();
// Resolve writer options from the context configuration
let global_parquet_options = ctx
@@ -222,12 +222,20 @@ impl InternalDataFrame {
.clone();
writer_options.inner.global = global_parquet_options;
+ // Set values from options dictionary
+ for (k, v) in &options_map {
+ writer_options.set(k, v)?;
+ }
+
wait_for_future(
py,
&self.runtime,
- self.inner
- .clone()
- .write_geoparquet(&ctx.inner, &path, options,
Some(writer_options)),
+ self.inner.clone().write_geoparquet(
+ &ctx.inner,
+ &path,
+ write_options,
+ Some(writer_options),
+ ),
)??;
Ok(())
}
diff --git a/python/sedonadb/tests/io/test_parquet.py
b/python/sedonadb/tests/io/test_parquet.py
index c80d7478..f5980659 100644
--- a/python/sedonadb/tests/io/test_parquet.py
+++ b/python/sedonadb/tests/io/test_parquet.py
@@ -332,6 +332,29 @@ def test_write_geoparquet_options(geoarrow_data):
metadata = parquet.read_metadata(tmp_parquet)
assert metadata.row_group(0).num_rows == 1024
+ # Set via keyword arg and ensure that value is respected
+ con.sql(
+ "SET datafusion.execution.parquet.max_row_group_size = 1000000"
+ ).execute()
+ con.create_data_frame(gdf).to_parquet(
+ tmp_parquet,
+ max_row_group_size=1024,
+ )
+
+ gdf_roundtrip = geopandas.read_parquet(tmp_parquet)
+ geopandas.testing.assert_geodataframe_equal(gdf_roundtrip, gdf)
+
+ metadata = parquet.read_metadata(tmp_parquet)
+ assert metadata.row_group(0).num_rows == 1024
+
+ # Ensure compression is respected
+ size_with_default_compression = tmp_parquet.stat().st_size
+ con.create_data_frame(gdf).to_parquet(
+ tmp_parquet,
+ compression="uncompressed",
+ )
+ assert tmp_parquet.stat().st_size > (size_with_default_compression * 2)
+
def test_write_geoparquet_1_1(con, geoarrow_data):
# Checks GeoParquet 1.1 support specifically
diff --git a/r/sedonadb/src/rust/src/dataframe.rs
b/r/sedonadb/src/rust/src/dataframe.rs
index 275b1f4f..2e24fdfc 100644
--- a/r/sedonadb/src/rust/src/dataframe.rs
+++ b/r/sedonadb/src/rust/src/dataframe.rs
@@ -262,8 +262,10 @@ impl InternalDataFrame {
.with_sort_by(sort_by_expr)
.with_single_file_output(single_file_output);
- let mut writer_options = TableGeoParquetOptions::new();
- writer_options.overwrite_bbox_columns = overwrite_bbox_columns;
+ let mut writer_options = TableGeoParquetOptions {
+ overwrite_bbox_columns,
+ ..Default::default()
+ };
if let Some(geoparquet_version) = geoparquet_version {
writer_options.geoparquet_version = geoparquet_version
.parse()
diff --git a/rust/sedona-geoparquet/src/file_opener.rs
b/rust/sedona-geoparquet/src/file_opener.rs
index 72719c88..6ba04b82 100644
--- a/rust/sedona-geoparquet/src/file_opener.rs
+++ b/rust/sedona-geoparquet/src/file_opener.rs
@@ -128,7 +128,7 @@ impl FileOpener for GeoParquetFileOpener {
let maybe_geoparquet_metadata =
GeoParquetMetadata::try_from_parquet_metadata(
&parquet_metadata,
- self_clone.options.geometry_columns.as_ref(),
+ self_clone.options.geometry_columns.inner(),
)?;
if self_clone.enable_pruning {
diff --git a/rust/sedona-geoparquet/src/format.rs
b/rust/sedona-geoparquet/src/format.rs
index b8d50cd6..08a04c92 100644
--- a/rust/sedona-geoparquet/src/format.rs
+++ b/rust/sedona-geoparquet/src/format.rs
@@ -24,7 +24,7 @@ use std::{
use arrow_schema::{Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::{
- config::ConfigOptions,
+ config::{ConfigField, ConfigOptions},
datasource::{
file_format::{
file_compression_type::FileCompressionType,
@@ -97,8 +97,16 @@ impl FileFormatFactory for GeoParquetFormatFactory {
) -> Result<Arc<dyn FileFormat>> {
let mut options_mut = self.options.clone().unwrap_or_default();
let mut format_options_mut = format_options.clone();
- if let Some(version_string) =
format_options_mut.remove("geoparquet_version") {
- options_mut.geoparquet_version = version_string.parse()?;
+
+ // Remove GeoParquet-specific options that will cause an error if
passed
+ // to inner.create() and ensure they are reflected by the GeoParquet
+ // options. These are prefixed with `format` when passed by
+ // DataFusion SQL. DataFusion takes care of lowercasing these values
before
+ // they are passed here.
+ for key in TableGeoParquetOptions::TABLE_OPTIONS_KEYS {
+ if let Some(value) = format_options_mut.remove(key) {
+ options_mut.set(key.strip_prefix("format.").unwrap(), &value)?;
+ }
}
let inner_format = self.inner.create(state, &format_options_mut)?;
@@ -215,7 +223,7 @@ impl FileFormat for GeoParquetFormat {
for metadata in &metadatas {
let this_geoparquet_metadata =
GeoParquetMetadata::try_from_parquet_metadata(
metadata,
- self.options.geometry_columns.as_ref(),
+ self.options.geometry_columns.inner(),
)?;
match (geoparquet_metadata.as_mut(), this_geoparquet_metadata) {
@@ -605,7 +613,7 @@ mod test {
use datafusion_physical_expr::PhysicalExpr;
use rstest::rstest;
- use sedona_schema::crs::lnglat;
+ use sedona_schema::crs::{deserialize_crs, lnglat};
use sedona_schema::datatypes::{Edges, SedonaType, WKB_GEOMETRY};
use sedona_schema::schema::SedonaSchema;
use sedona_testing::create::create_scalar;
@@ -614,7 +622,7 @@ mod test {
use super::*;
fn setup_context() -> SessionContext {
- let mut state = SessionStateBuilder::new().build();
+ let mut state =
SessionStateBuilder::new_with_default_features().build();
state
.register_file_format(Arc::new(GeoParquetFormatFactory::new()),
true)
.unwrap();
@@ -750,6 +758,42 @@ mod test {
);
}
+ #[tokio::test]
+ async fn format_from_external_table() {
+ let ctx = setup_context();
+ let example = test_geoparquet("example", "geometry").unwrap();
+ ctx.sql(&format!(
+ r#"
+ CREATE EXTERNAL TABLE test
+ STORED AS PARQUET
+ LOCATION '{example}'
+ OPTIONS ('geometry_columns' '{{"geometry": {{"encoding": "WKB",
"crs": "EPSG:3857"}}}}');
+ "#
+ ))
+ .await
+ .unwrap();
+
+ let df = ctx.table("test").await.unwrap();
+
+ // Check that the logical plan resulting from a read has the correct
schema
+ assert_eq!(
+ df.schema().clone().strip_qualifiers().field_names(),
+ ["wkt", "geometry"]
+ );
+
+ let sedona_types = df
+ .schema()
+ .sedona_types()
+ .collect::<Result<Vec<_>>>()
+ .unwrap();
+ assert_eq!(sedona_types.len(), 2);
+ assert_eq!(sedona_types[0], SedonaType::Arrow(DataType::Utf8View));
+ assert_eq!(
+ sedona_types[1],
+ SedonaType::WkbView(Edges::Planar,
deserialize_crs("EPSG:3857").unwrap())
+ );
+ }
+
#[rstest]
#[tokio::test]
async fn pruning_geoparquet_metadata(#[values("st_intersects",
"st_contains")] udf_name: &str) {
diff --git a/rust/sedona-geoparquet/src/options.rs
b/rust/sedona-geoparquet/src/options.rs
index 2cc1fd12..bf136c37 100644
--- a/rust/sedona-geoparquet/src/options.rs
+++ b/rust/sedona-geoparquet/src/options.rs
@@ -17,13 +17,13 @@
use std::{collections::HashMap, str::FromStr};
-use datafusion::config::TableParquetOptions;
-use datafusion_common::{plan_err, DataFusionError};
+use datafusion::config::{ConfigField, TableParquetOptions, Visit};
+use datafusion_common::{plan_err, DataFusionError, Result};
use crate::metadata::GeoParquetColumnMetadata;
/// [TableParquetOptions] wrapper with GeoParquet-specific options
-#[derive(Debug, Default, Clone)]
+#[derive(Debug, Clone, Default, PartialEq)]
pub struct TableGeoParquetOptions {
/// Inner [TableParquetOptions]
pub inner: TableParquetOptions,
@@ -33,14 +33,76 @@ pub struct TableGeoParquetOptions {
/// bounding box columns.
pub overwrite_bbox_columns: bool,
/// Optional geometry column metadata overrides for schema inference.
- pub geometry_columns: Option<HashMap<String, GeoParquetColumnMetadata>>,
+ pub geometry_columns: GeometryColumns,
/// Validate geometry column contents against metadata when reading.
pub validate: bool,
}
impl TableGeoParquetOptions {
- pub fn new() -> Self {
- Self::default()
+ /// Special-cased TableOptions names
+ ///
+ /// When the TableGeoParquet options is being constructed from a place
+ /// where DataFusion internals are interacting with a TableOptions
instance,
+ /// these are the option names that get created (e.g. OPTIONS ('validate'
true))
+ /// becomes a string key of format.validate). There are several places
that we
+ /// need to intercept these before they are used to update the
TableOptions.
+ pub const TABLE_OPTIONS_KEYS: [&str; 4] = [
+ "format.geoparquet_version",
+ "format.geometry_columns",
+ "format.validate",
+ "format.overwrite_bbox_columns",
+ ];
+}
+
+impl ConfigField for TableGeoParquetOptions {
+ fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description:
&'static str) {
+ // Visit inner TableParquetOptions fields
+ self.inner.visit(v, key_prefix, "");
+
+ // Visit GeoParquet-specific fields
+ self.geoparquet_version.visit(
+ v,
+ &format!("{key_prefix}.geoparquet_version"),
+ "GeoParquet version to use when writing",
+ );
+ self.overwrite_bbox_columns.visit(
+ v,
+ &format!("{key_prefix}.overwrite_bbox_columns"),
+ "Overwrite existing bounding box columns when writing GeoParquet
1.1",
+ );
+ self.geometry_columns.visit(
+ v,
+ &format!("{key_prefix}.geometry_columns"),
+ "Optional geometry column metadata overrides for schema inference",
+ );
+ self.validate.visit(
+ v,
+ &format!("{key_prefix}.validate"),
+ "Validate geometry column contents against metadata when reading",
+ );
+ }
+
+ fn set(&mut self, key: &str, value: &str) -> Result<()> {
+ // Try GeoParquet-specific keys first
+ match key {
+ "geoparquet_version" => {
+ self.geoparquet_version.set(key, value)?;
+ }
+ "overwrite_bbox_columns" => {
+ self.overwrite_bbox_columns.set(key, value)?;
+ }
+ "geometry_columns" => {
+ self.geometry_columns.set(key, value)?;
+ }
+ "validate" => {
+ self.validate.set(key, value)?;
+ }
+ // Forward all other keys to inner TableParquetOptions
+ _ => {
+ self.inner.set(key, value)?;
+ }
+ }
+ Ok(())
}
}
@@ -54,7 +116,7 @@ impl From<TableParquetOptions> for TableGeoParquetOptions {
}
/// The GeoParquet Version to write for output with spatial columns
-#[derive(Debug, Clone, Copy, Default)]
+#[derive(Debug, Clone, Copy, Default, PartialEq)]
pub enum GeoParquetVersion {
/// Write GeoParquet 1.0 metadata
///
@@ -102,3 +164,84 @@ impl FromStr for GeoParquetVersion {
}
}
}
+
+impl ConfigField for GeoParquetVersion {
+ fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str)
{
+ let value = match self {
+ GeoParquetVersion::V1_0 => "1.0",
+ GeoParquetVersion::V1_1 => "1.1",
+ GeoParquetVersion::V2_0 => "2.0",
+ GeoParquetVersion::Omitted => "none",
+ };
+ v.some(key, value, description);
+ }
+
+ fn set(&mut self, _key: &str, value: &str) -> Result<()> {
+ *self = value.parse()?;
+ Ok(())
+ }
+}
+
+/// Wrapper for geometry column metadata configuration
+#[derive(Debug, Clone, Default, PartialEq)]
+pub struct GeometryColumns {
+ columns: Option<HashMap<String, GeoParquetColumnMetadata>>,
+}
+
+impl GeometryColumns {
+ /// Create empty geometry columns
+ pub fn new() -> Self {
+ Self { columns: None }
+ }
+
+ /// Create from a HashMap
+ pub fn from_map(columns: HashMap<String, GeoParquetColumnMetadata>) ->
Self {
+ Self {
+ columns: Some(columns),
+ }
+ }
+
+ /// Get the inner HashMap
+ pub fn inner(&self) -> Option<&HashMap<String, GeoParquetColumnMetadata>> {
+ self.columns.as_ref()
+ }
+
+ /// Set from JSON string
+ pub fn from_json(json: &str) -> Result<Self> {
+ let columns: HashMap<String, GeoParquetColumnMetadata> =
serde_json::from_str(json)
+ .map_err(|e| {
+ DataFusionError::Configuration(format!("geometry_columns must
be valid JSON: {e}"))
+ })?;
+ Ok(Self {
+ columns: Some(columns),
+ })
+ }
+
+ /// Convert to JSON string
+ pub fn to_json(&self) -> Result<String> {
+ match &self.columns {
+ Some(cols) => serde_json::to_string(cols).map_err(|e| {
+ DataFusionError::Configuration(format!("Failed to serialize
geometry_columns: {e}"))
+ }),
+ None => Ok(String::new()),
+ }
+ }
+}
+
+impl ConfigField for GeometryColumns {
+ fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str)
{
+ match self.to_json() {
+ Ok(json) if !json.is_empty() => v.some(key, json, description),
+ _ => {}
+ }
+ }
+
+ fn set(&mut self, _key: &str, value: &str) -> Result<()> {
+ if value.is_empty() {
+ self.columns = None;
+ } else {
+ *self = Self::from_json(value)?;
+ }
+ Ok(())
+ }
+}
diff --git a/rust/sedona-geoparquet/src/provider.rs
b/rust/sedona-geoparquet/src/provider.rs
index b9d2ba52..2c0ca7fe 100644
--- a/rust/sedona-geoparquet/src/provider.rs
+++ b/rust/sedona-geoparquet/src/provider.rs
@@ -263,7 +263,8 @@ impl ReadOptions<'_> for GeoParquetReadOptions<'_> {
let mut geoparquet_options =
TableGeoParquetOptions::from(parquet_format.options().clone());
if let Some(geometry_columns) = &self.geometry_columns {
- geoparquet_options.geometry_columns =
Some(geometry_columns.clone());
+ geoparquet_options.geometry_columns =
+
crate::options::GeometryColumns::from_map(geometry_columns.clone());
}
geoparquet_options.validate = self.validate;
options.format =
Arc::new(GeoParquetFormat::new(geoparquet_options));
diff --git a/rust/sedona-geoparquet/src/writer.rs
b/rust/sedona-geoparquet/src/writer.rs
index 42c9de4d..1179dbd3 100644
--- a/rust/sedona-geoparquet/src/writer.rs
+++ b/rust/sedona-geoparquet/src/writer.rs
@@ -516,6 +516,7 @@ fn append_float_bbox(
#[cfg(test)]
mod test {
use std::iter::zip;
+ use std::path::Path;
use arrow_array::{create_array, Array, RecordBatch};
use datafusion::datasource::file_format::format_as_file_type;
@@ -545,7 +546,7 @@ mod test {
SessionContext::new_with_state(state).enable_url_table()
}
- async fn test_dataframe_roundtrip(ctx: SessionContext, src: DataFrame) {
+ async fn test_dataframe_roundtrip(ctx: &SessionContext, src: DataFrame) {
let df_batches = src.clone().collect().await.unwrap();
test_write_dataframe(
ctx,
@@ -558,8 +559,18 @@ mod test {
.unwrap()
}
+ async fn test_write_dataframe_sql(
+ ctx: &SessionContext,
+ sql: &str,
+ tmp_parquet: &Path,
+ expected_batches: Vec<RecordBatch>,
+ ) -> Result<()> {
+ ctx.sql(sql).await?.collect().await?;
+ test_write_dataframe_common(ctx, tmp_parquet, expected_batches).await
+ }
+
async fn test_write_dataframe(
- ctx: SessionContext,
+ ctx: &SessionContext,
src: DataFrame,
expected_batches: Vec<RecordBatch>,
options: TableGeoParquetOptions,
@@ -585,6 +596,14 @@ mod test {
DataFrame::new(ctx.state(), plan).collect().await?;
+ test_write_dataframe_common(ctx, &tmp_parquet, expected_batches).await
+ }
+
+ async fn test_write_dataframe_common(
+ ctx: &SessionContext,
+ tmp_parquet: &Path,
+ expected_batches: Vec<RecordBatch>,
+ ) -> Result<()> {
let df_parquet_batches = ctx
.table(tmp_parquet.to_string_lossy().to_string())
.await
@@ -624,8 +643,17 @@ mod test {
assert_eq!(df_parquet_sedona_types, df_sedona_types);
// Check batches without metadata
- for (df_parquet_batch, df_batch) in zip(df_parquet_batches,
expected_batches) {
- assert_eq!(df_parquet_batch.columns(), df_batch.columns())
+ for (i, (df_parquet_batch, df_batch)) in
+ zip(df_parquet_batches, expected_batches).enumerate()
+ {
+ for (j, (df_parquet_column, df_batch_column)) in
+ zip(df_parquet_batch.columns(), df_batch.columns()).enumerate()
+ {
+ assert_eq!(
+ df_parquet_column, df_batch_column,
+ "Batch {i} column {j} did not match"
+ );
+ }
}
Ok(())
@@ -644,7 +672,7 @@ mod test {
.select(vec![col("wkt")])
.unwrap();
- test_dataframe_roundtrip(ctx, df).await;
+ test_dataframe_roundtrip(&ctx, df).await;
}
#[tokio::test]
@@ -653,7 +681,7 @@ mod test {
let ctx = setup_context();
let df = ctx.table(&example).await.unwrap();
- test_dataframe_roundtrip(ctx, df).await;
+ test_dataframe_roundtrip(&ctx, df).await;
}
#[tokio::test]
@@ -662,24 +690,19 @@ mod test {
let ctx = setup_context();
let df = ctx.table(&example).await.unwrap();
- test_dataframe_roundtrip(ctx, df).await;
+ test_dataframe_roundtrip(&ctx, df).await;
}
#[tokio::test]
async fn geoparquet_1_1_basic() {
let example = test_geoparquet("example", "geometry").unwrap();
let ctx = setup_context();
- let df = ctx
- .table(&example)
- .await
- .unwrap()
- // DataFusion internals lose the nullability we assigned to the
bbox
- // and without this line the test fails.
- .filter(Expr::IsNotNull(col("geometry").into()))
- .unwrap();
+ let df = ctx.table(&example).await.unwrap();
- let mut options = TableGeoParquetOptions::new();
- options.geoparquet_version = GeoParquetVersion::V1_1;
+ let options = TableGeoParquetOptions {
+ geoparquet_version: GeoParquetVersion::V1_1,
+ ..Default::default()
+ };
let bbox_udf: ScalarUDF = geoparquet_bbox_udf().into();
@@ -695,9 +718,29 @@ mod test {
.await
.unwrap();
- test_write_dataframe(ctx, df, df_batches_with_bbox, options, vec![])
+ // Check that we can do this from a DataFrame
+ test_write_dataframe(&ctx, df, df_batches_with_bbox.clone(), options,
vec![])
.await
.unwrap();
+
+ // Check that we can do this from SQL too
+ let tmpdir = tempdir().unwrap();
+ let tmp_parquet = tmpdir.path().join("foofy_spatial.parquet");
+ test_write_dataframe_sql(
+ &ctx,
+ &format!(
+ r#"COPY (
+ SELECT * FROM '{example}'
+ ) TO '{}'
+ OPTIONS (GEOPARQUET_VERSION '1.1')
+ "#,
+ tmp_parquet.display()
+ ),
+ &tmp_parquet,
+ df_batches_with_bbox,
+ )
+ .await
+ .unwrap();
}
#[tokio::test]
@@ -710,10 +753,6 @@ mod test {
.table(&example)
.await
.unwrap()
- // DataFusion internals lose the nullability we assigned to the
bbox
- // and without this line the test fails.
- .filter(Expr::IsNotNull(col("geometry").into()))
- .unwrap()
.select(vec![
col("wkt"),
col("geometry").alias("geom"),
@@ -723,8 +762,10 @@ mod test {
])
.unwrap();
- let mut options = TableGeoParquetOptions::new();
- options.geoparquet_version = GeoParquetVersion::V1_1;
+ let options = TableGeoParquetOptions {
+ geoparquet_version: GeoParquetVersion::V1_1,
+ ..Default::default()
+ };
let bbox_udf: ScalarUDF = geoparquet_bbox_udf().into();
@@ -744,7 +785,7 @@ mod test {
.await
.unwrap();
- test_write_dataframe(ctx, df, df_batches_with_bbox, options, vec![])
+ test_write_dataframe(&ctx, df, df_batches_with_bbox, options, vec![])
.await
.unwrap();
}
@@ -760,18 +801,16 @@ mod test {
.table(&example)
.await
.unwrap()
- // DataFusion internals lose the nullability we assigned to the
bbox
- // and without this line the test fails.
- .filter(Expr::IsNotNull(col("geometry").into()))
- .unwrap()
.select(vec![
lit("this is definitely not a bbox").alias("bbox"),
col("geometry"),
])
.unwrap();
- let mut options = TableGeoParquetOptions::new();
- options.geoparquet_version = GeoParquetVersion::V1_1;
+ let mut options = TableGeoParquetOptions {
+ geoparquet_version: GeoParquetVersion::V1_1,
+ ..Default::default()
+ };
let bbox_udf: ScalarUDF = geoparquet_bbox_udf().into();
@@ -788,7 +827,7 @@ mod test {
// Without setting overwrite_bbox_columns = true, this should error
let err = test_write_dataframe(
- ctx.clone(),
+ &ctx,
df.clone(),
df_batches_with_bbox.clone(),
options.clone(),
@@ -801,7 +840,7 @@ mod test {
.starts_with("Can't overwrite GeoParquet 1.1 bbox column 'bbox'"));
options.overwrite_bbox_columns = true;
- test_write_dataframe(ctx, df, df_batches_with_bbox, options, vec![])
+ test_write_dataframe(&ctx, df, df_batches_with_bbox, options, vec![])
.await
.unwrap();
}
@@ -814,10 +853,6 @@ mod test {
.table(&example)
.await
.unwrap()
- // DataFusion internals lose the nullability we assigned to the
bbox
- // and without this line the test fails.
- .filter(Expr::IsNotNull(col("geometry").into()))
- .unwrap()
.select(vec![
lit("some_partition").alias("part"),
col("wkt"),
@@ -825,8 +860,10 @@ mod test {
])
.unwrap();
- let mut options = TableGeoParquetOptions::new();
- options.geoparquet_version = GeoParquetVersion::V1_1;
+ let options = TableGeoParquetOptions {
+ geoparquet_version: GeoParquetVersion::V1_1,
+ ..Default::default()
+ };
let bbox_udf: ScalarUDF = geoparquet_bbox_udf().into();
@@ -847,7 +884,7 @@ mod test {
.await
.unwrap();
- test_write_dataframe(ctx, df, df_batches_with_bbox, options,
vec!["part".into()])
+ test_write_dataframe(&ctx, df, df_batches_with_bbox, options,
vec!["part".into()])
.await
.unwrap();
}
@@ -886,8 +923,10 @@ mod test {
])
.unwrap();
- let mut options = TableGeoParquetOptions::new();
- options.geoparquet_version = GeoParquetVersion::V1_1;
+ let options = TableGeoParquetOptions {
+ geoparquet_version: GeoParquetVersion::V1_1,
+ ..Default::default()
+ };
let df_batches_with_bbox = df
.clone()
@@ -901,7 +940,7 @@ mod test {
.await
.unwrap();
- test_write_dataframe(ctx, df, df_batches_with_bbox, options, vec![])
+ test_write_dataframe(&ctx, df, df_batches_with_bbox, options, vec![])
.await
.unwrap();
}
diff --git a/rust/sedona/src/context.rs b/rust/sedona/src/context.rs
index ee2c9635..2ea9cf03 100644
--- a/rust/sedona/src/context.rs
+++ b/rust/sedona/src/context.rs
@@ -557,7 +557,7 @@ mod tests {
use datafusion::assert_batches_eq;
use sedona_datasource::spec::{Object, OpenReaderArgs};
use sedona_schema::{
- crs::lnglat,
+ crs::{deserialize_crs, lnglat},
datatypes::{Edges, SedonaType},
schema::SedonaSchema,
};
@@ -784,4 +784,40 @@ mod tests {
.await
.expect("should succeed because aws and gcs options were stripped");
}
+
+ #[tokio::test]
+ async fn format_from_external_table() {
+ let ctx = SedonaContext::new_local_interactive().await.unwrap();
+ let example = test_geoparquet("example", "geometry").unwrap();
+ ctx.sql(&format!(
+ r#"
+ CREATE EXTERNAL TABLE test
+ STORED AS PARQUET
+ LOCATION '{example}'
+ OPTIONS ('geometry_columns' '{{"geometry": {{"encoding": "WKB",
"crs": "EPSG:3857"}}}}');
+ "#
+ ))
+ .await
+ .unwrap();
+
+ let df = ctx.ctx.table("test").await.unwrap();
+
+ // Check that the logical plan resulting from a read has the correct
schema
+ assert_eq!(
+ df.schema().clone().strip_qualifiers().field_names(),
+ ["wkt", "geometry"]
+ );
+
+ let sedona_types = df
+ .schema()
+ .sedona_types()
+ .collect::<Result<Vec<_>>>()
+ .unwrap();
+ assert_eq!(sedona_types.len(), 2);
+ assert_eq!(sedona_types[0], SedonaType::Arrow(DataType::Utf8View));
+ assert_eq!(
+ sedona_types[1],
+ SedonaType::WkbView(Edges::Planar,
deserialize_crs("EPSG:3857").unwrap())
+ );
+ }
}
diff --git a/rust/sedona/src/object_storage.rs
b/rust/sedona/src/object_storage.rs
index 519c4418..f5b366e1 100644
--- a/rust/sedona/src/object_storage.rs
+++ b/rust/sedona/src/object_storage.rs
@@ -34,6 +34,7 @@ use dirs::home_dir;
#[cfg(any(feature = "aws", feature = "azure", feature = "gcp", feature =
"http"))]
use object_store::CredentialProvider;
use object_store::ObjectStore;
+use sedona_geoparquet::options::TableGeoParquetOptions;
use url::Url;
#[cfg(feature = "aws")]
@@ -819,10 +820,21 @@ pub(crate) async fn
register_object_store_and_config_extensions(
// Clone and modify the default table options based on the provided options
let mut table_options = ctx.ctx.state().default_table_options();
- if let Some(format) = format {
- table_options.set_config_format(format);
+ if let Some(ref format) = format {
+ table_options.set_config_format(format.clone());
}
- table_options.alter_with_string_hash_map(options)?;
+
+ let mut options = options.clone();
+
+ // If this is an explicitly Parquet configuration, we need to strip
GeoParquet
+ // options before calling alter_with_string_hash_map.
+ if let Some(&ConfigFileType::PARQUET) = format.as_ref() {
+ for key in TableGeoParquetOptions::TABLE_OPTIONS_KEYS {
+ options.remove(key);
+ }
+ }
+
+ table_options.alter_with_string_hash_map(&options)?;
// Retrieve the appropriate object store based on the scheme, URL, and
modified table options
let store = get_object_store(&ctx.ctx.state(), scheme, url,
&table_options).await?;