This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git


The following commit(s) were added to refs/heads/main by this push:
     new 11fefbd  feat(rust/sedona-geoparquet): GeoParquet 1.1 write support 
(#175)
11fefbd is described below

commit 11fefbddec15f59e29b5531b6a3ce1187d8b4468
Author: Dewey Dunnington <[email protected]>
AuthorDate: Tue Oct 7 23:21:14 2025 -0500

    feat(rust/sedona-geoparquet): GeoParquet 1.1 write support (#175)
    
    Co-authored-by: Copilot <[email protected]>
---
 Cargo.lock                                   |   2 +
 python/sedonadb/python/sedonadb/dataframe.py |  31 +-
 python/sedonadb/src/dataframe.rs             |  14 +-
 python/sedonadb/tests/io/test_parquet.py     |  73 +++-
 rust/sedona-geoparquet/Cargo.toml            |   2 +
 rust/sedona-geoparquet/src/format.rs         |  16 +-
 rust/sedona-geoparquet/src/metadata.rs       |  15 +
 rust/sedona-geoparquet/src/options.rs        |  30 +-
 rust/sedona-geoparquet/src/writer.rs         | 594 ++++++++++++++++++++++++++-
 9 files changed, 732 insertions(+), 45 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 72f67a2..93db6d4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4954,6 +4954,7 @@ dependencies = [
  "datafusion-expr",
  "datafusion-physical-expr",
  "datafusion-physical-plan",
+ "float_next_after",
  "futures",
  "geo-traits 0.2.0",
  "object_store",
@@ -4961,6 +4962,7 @@ dependencies = [
  "rstest",
  "sedona-common",
  "sedona-expr",
+ "sedona-functions",
  "sedona-geometry",
  "sedona-schema",
  "sedona-testing",
diff --git a/python/sedonadb/python/sedonadb/dataframe.py 
b/python/sedonadb/python/sedonadb/dataframe.py
index 673fbfb..9759dd1 100644
--- a/python/sedonadb/python/sedonadb/dataframe.py
+++ b/python/sedonadb/python/sedonadb/dataframe.py
@@ -16,7 +16,7 @@
 # under the License.
 
 from pathlib import Path
-from typing import TYPE_CHECKING, Union, Optional, Any, Iterable
+from typing import TYPE_CHECKING, Union, Optional, Any, Iterable, Literal
 
 from sedonadb.utility import sedona  # noqa: F401
 
@@ -295,13 +295,15 @@ class DataFrame:
         partition_by: Optional[Union[str, Iterable[str]]] = None,
         sort_by: Optional[Union[str, Iterable[str]]] = None,
         single_file_output: Optional[bool] = None,
+        geoparquet_version: Literal["1.0", "1.1"] = "1.0",
+        overwrite_bbox_columns: bool = False,
     ):
         """Write this DataFrame to one or more (Geo)Parquet files
 
         For input that contains geometry columns, GeoParquet metadata is 
written
         such that suitable readers can recreate Geometry/Geography types when
-        reading the output.
-
+        reading the output and potentially read fewer row groups when only a
+        subset of the file is needed for a given query.
 
         Args:
             path: A filename or directory to which parquet file(s) should be 
written.
@@ -313,6 +315,21 @@ class DataFrame:
                 file vs. writing one file per partition to a directory. By 
default,
                 a single file is written if `partition_by` is unspecified and
                 `path` ends with `.parquet`.
+            geoparquet_version: GeoParquet metadata version to write if output 
contains
+                one or more geometry columns. The default (1.0) is the most 
widely
+                supported and will result in geometry columns being recognized 
in many
+                readers; however, only includes statistics at the file level.
+
+                Use GeoParquet 1.1 to compute an additional bounding box column
+                for every geometry column in the output: some readers can use 
these columns
+                to prune row groups when files contain an effective spatial 
ordering.
+                The extra columns will appear just before their geometry 
column and
+                will be named "[geom_col_name]_bbox" for all geometry columns 
except
+                "geometry", whose bounding box column name is just "bbox".
+            overwrite_bbox_columns: Use `True` to overwrite any bounding box 
columns
+                that already exist in the input. This is useful in a read -> 
modify
+                -> write scenario to ensure these columns are up-to-date. If 
`False`
+                (the default), an error will be raised if a bbox column 
already exists.
 
         Examples:
 
@@ -344,7 +361,13 @@ class DataFrame:
             sort_by = []
 
         self._impl.to_parquet(
-            self._ctx, str(path), partition_by, sort_by, single_file_output
+            self._ctx,
+            str(path),
+            partition_by,
+            sort_by,
+            single_file_output,
+            geoparquet_version,
+            overwrite_bbox_columns,
         )
 
     def show(
diff --git a/python/sedonadb/src/dataframe.rs b/python/sedonadb/src/dataframe.rs
index 940c332..aa1dc60 100644
--- a/python/sedonadb/src/dataframe.rs
+++ b/python/sedonadb/src/dataframe.rs
@@ -32,7 +32,7 @@ use pyo3::prelude::*;
 use pyo3::types::PyCapsule;
 use sedona::context::{SedonaDataFrame, SedonaWriteOptions};
 use sedona::show::{DisplayMode, DisplayTableOptions};
-use sedona_geoparquet::options::TableGeoParquetOptions;
+use sedona_geoparquet::options::{GeoParquetVersion, TableGeoParquetOptions};
 use sedona_schema::schema::SedonaSchema;
 use tokio::runtime::Runtime;
 
@@ -139,6 +139,7 @@ impl InternalDataFrame {
         ))
     }
 
+    #[allow(clippy::too_many_arguments)]
     fn to_parquet<'py>(
         &self,
         py: Python<'py>,
@@ -147,6 +148,8 @@ impl InternalDataFrame {
         partition_by: Vec<String>,
         sort_by: Vec<String>,
         single_file_output: bool,
+        geoparquet_version: Option<String>,
+        overwrite_bbox_columns: bool,
     ) -> Result<(), PySedonaError> {
         // sort_by needs to be SortExpr. A Vec<String> can unambiguously be 
interpreted as
         // field names (ascending), but other types of expressions aren't 
supported here yet.
@@ -162,7 +165,14 @@ impl InternalDataFrame {
             .with_partition_by(partition_by)
             .with_sort_by(sort_by_expr)
             .with_single_file_output(single_file_output);
-        let writer_options = TableGeoParquetOptions::default();
+
+        let mut writer_options = TableGeoParquetOptions::new();
+        writer_options.overwrite_bbox_columns = overwrite_bbox_columns;
+        if let Some(geoparquet_version) = geoparquet_version {
+            writer_options.geoparquet_version = geoparquet_version.parse()?;
+        } else {
+            writer_options.geoparquet_version = GeoParquetVersion::Omitted;
+        }
 
         wait_for_future(
             py,
diff --git a/python/sedonadb/tests/io/test_parquet.py 
b/python/sedonadb/tests/io/test_parquet.py
index a90e254..73afbfa 100644
--- a/python/sedonadb/tests/io/test_parquet.py
+++ b/python/sedonadb/tests/io/test_parquet.py
@@ -15,14 +15,17 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import pytest
+import json
 import tempfile
-import shapely
+from pathlib import Path
+
 import geopandas
 import geopandas.testing
+import pytest
+import shapely
 from pyarrow import parquet
-from pathlib import Path
-from sedonadb.testing import geom_or_null, SedonaDB, DuckDB, skip_if_not_exists
+from sedonadb._lib import SedonaError
+from sedonadb.testing import DuckDB, SedonaDB, geom_or_null, skip_if_not_exists
 
 
 @pytest.mark.parametrize("name", ["water-junc", "water-point"])
@@ -257,6 +260,68 @@ def test_write_geoparquet_geometry(con, geoarrow_data, 
name):
         geopandas.testing.assert_geodataframe_equal(gdf_roundtrip, gdf)
 
 
+def test_write_geoparquet_1_1(con, geoarrow_data):
+    # Checks GeoParquet 1.1 support specifically
+    path = geoarrow_data / "ns-water" / "files" / 
"ns-water_water-junc_geo.parquet"
+    skip_if_not_exists(path)
+
+    gdf = 
geopandas.read_parquet(path).sort_values(by="OBJECTID").reset_index(drop=True)
+
+    with tempfile.TemporaryDirectory() as td:
+        tmp_parquet = Path(td) / "tmp.parquet"
+        con.create_data_frame(gdf).to_parquet(
+            tmp_parquet, sort_by="OBJECTID", geoparquet_version="1.1"
+        )
+
+        file_kv_metadata = parquet.ParquetFile(tmp_parquet).metadata.metadata
+        assert b"geo" in file_kv_metadata
+        geo_metadata = json.loads(file_kv_metadata[b"geo"])
+        assert geo_metadata["version"] == "1.1.0"
+        geo_column = geo_metadata["columns"]["geometry"]
+        assert geo_column["covering"] == {
+            "bbox": {
+                "xmin": ["bbox", "xmin"],
+                "ymin": ["bbox", "ymin"],
+                "xmax": ["bbox", "xmax"],
+                "ymax": ["bbox", "ymax"],
+            }
+        }
+
+        # This should still roundtrip through GeoPandas because GeoPandas 
removes
+        # the bbox column on read
+        gdf_roundtrip = geopandas.read_parquet(tmp_parquet)
+        assert all(gdf.columns == gdf_roundtrip.columns)
+        geopandas.testing.assert_geodataframe_equal(gdf_roundtrip, gdf)
+
+        # ...but the bbox column should still be there
+        df_roundtrip = con.read_parquet(tmp_parquet).to_pandas()
+        assert "bbox" in df_roundtrip.columns
+
+        # An attempt to rewrite this should fail because it would have to 
overwrite
+        # the bbox column
+        tmp_parquet2 = Path(td) / "tmp2.parquet"
+        with pytest.raises(
+            SedonaError, match="Can't overwrite GeoParquet 1.1 bbox column 
'bbox'"
+        ):
+            con.read_parquet(tmp_parquet).to_parquet(
+                tmp_parquet2, geoparquet_version="1.1"
+            )
+
+        # ...unless we pass the appropriate option
+        con.read_parquet(tmp_parquet).to_parquet(
+            tmp_parquet2, geoparquet_version="1.1", overwrite_bbox_columns=True
+        )
+        df_roundtrip = con.read_parquet(tmp_parquet2).to_pandas()
+        assert "bbox" in df_roundtrip.columns
+
+
+def test_write_geoparquet_unknown(con):
+    with pytest.raises(SedonaError, match="Unexpected GeoParquet version 
string"):
+        con.sql("SELECT 1 as one").to_parquet(
+            "unused", geoparquet_version="not supported"
+        )
+
+
 def test_write_geoparquet_geography(con, geoarrow_data):
     # Checks a read and write of geography (rounctrip, since nobody else can 
read/write)
     path = (
diff --git a/rust/sedona-geoparquet/Cargo.toml 
b/rust/sedona-geoparquet/Cargo.toml
index d11acd0..0a5110f 100644
--- a/rust/sedona-geoparquet/Cargo.toml
+++ b/rust/sedona-geoparquet/Cargo.toml
@@ -50,12 +50,14 @@ datafusion-execution = { workspace = true }
 datafusion-expr = { workspace = true }
 datafusion-physical-expr = { workspace = true }
 datafusion-physical-plan = { workspace = true }
+float_next_after = { workspace = true }
 geo-traits = { workspace = true }
 futures = { workspace = true }
 object_store = { workspace = true }
 parquet = { workspace = true }
 sedona-common = { path = "../sedona-common" }
 sedona-expr = { path = "../sedona-expr" }
+sedona-functions = { path = "../sedona-functions" }
 sedona-geometry = { path = "../sedona-geometry" }
 sedona-schema = { path = "../sedona-schema" }
 serde = { workspace = true }
diff --git a/rust/sedona-geoparquet/src/format.rs 
b/rust/sedona-geoparquet/src/format.rs
index bed3dd8..8ff8717 100644
--- a/rust/sedona-geoparquet/src/format.rs
+++ b/rust/sedona-geoparquet/src/format.rs
@@ -48,7 +48,7 @@ use sedona_schema::extension_type::ExtensionType;
 use crate::{
     file_opener::{storage_schema_contains_geo, GeoParquetFileOpener},
     metadata::{GeoParquetColumnEncoding, GeoParquetMetadata},
-    options::{GeoParquetVersion, TableGeoParquetOptions},
+    options::TableGeoParquetOptions,
     writer::create_geoparquet_writer_physical_plan,
 };
 use datafusion::datasource::physical_plan::ParquetSource;
@@ -91,17 +91,9 @@ impl FileFormatFactory for GeoParquetFormatFactory {
     ) -> Result<Arc<dyn FileFormat>> {
         let mut options_mut = self.options.clone().unwrap_or_default();
         let mut format_options_mut = format_options.clone();
-        options_mut.geoparquet_version =
-            if let Some(version_string) = 
format_options_mut.remove("geoparquet_version") {
-                match version_string.as_str() {
-                    "1.0" => GeoParquetVersion::V1_0,
-                    "1.1" => GeoParquetVersion::V1_1,
-                    "2.0" => GeoParquetVersion::V2_0,
-                    _ => GeoParquetVersion::default(),
-                }
-            } else {
-                GeoParquetVersion::default()
-            };
+        if let Some(version_string) = 
format_options_mut.remove("geoparquet_version") {
+            options_mut.geoparquet_version = version_string.parse()?;
+        }
 
         let inner_format = self.inner.create(state, &format_options_mut)?;
         if let Some(parquet_format) = 
inner_format.as_any().downcast_ref::<ParquetFormat>() {
diff --git a/rust/sedona-geoparquet/src/metadata.rs 
b/rust/sedona-geoparquet/src/metadata.rs
index b4be970..98ca0ff 100644
--- a/rust/sedona-geoparquet/src/metadata.rs
+++ b/rust/sedona-geoparquet/src/metadata.rs
@@ -268,6 +268,21 @@ pub struct GeoParquetCovering {
     pub bbox: GeoParquetBboxCovering,
 }
 
+impl GeoParquetCovering {
+    pub fn bbox_struct_xy(struct_column_name: &str) -> Self {
+        GeoParquetCovering {
+            bbox: GeoParquetBboxCovering {
+                xmin: vec![struct_column_name.to_string(), "xmin".to_string()],
+                ymin: vec![struct_column_name.to_string(), "ymin".to_string()],
+                zmin: None,
+                xmax: vec![struct_column_name.to_string(), "xmax".to_string()],
+                ymax: vec![struct_column_name.to_string(), "ymax".to_string()],
+                zmax: None,
+            },
+        }
+    }
+}
+
 /// Top-level GeoParquet file metadata
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct GeoParquetMetadata {
diff --git a/rust/sedona-geoparquet/src/options.rs 
b/rust/sedona-geoparquet/src/options.rs
index 20fe4bd..43eafad 100644
--- a/rust/sedona-geoparquet/src/options.rs
+++ b/rust/sedona-geoparquet/src/options.rs
@@ -15,7 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::str::FromStr;
+
 use datafusion::config::TableParquetOptions;
+use datafusion_common::{plan_err, DataFusionError};
 
 /// [TableParquetOptions] wrapper with GeoParquet-specific options
 #[derive(Debug, Default, Clone)]
@@ -24,13 +27,22 @@ pub struct TableGeoParquetOptions {
     pub inner: TableParquetOptions,
     /// [GeoParquetVersion] to use when writing GeoParquet files
     pub geoparquet_version: GeoParquetVersion,
+    /// When writing [GeoParquetVersion::V1_1], use `true` to overwrite 
existing
+    /// bounding box columns.
+    pub overwrite_bbox_columns: bool,
+}
+
+impl TableGeoParquetOptions {
+    pub fn new() -> Self {
+        Self::default()
+    }
 }
 
 impl From<TableParquetOptions> for TableGeoParquetOptions {
     fn from(value: TableParquetOptions) -> Self {
         Self {
             inner: value,
-            geoparquet_version: GeoParquetVersion::default(),
+            ..Default::default()
         }
     }
 }
@@ -73,3 +85,19 @@ impl Default for GeoParquetVersion {
         Self::V1_0
     }
 }
+
+impl FromStr for GeoParquetVersion {
+    type Err = DataFusionError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_lowercase().as_str() {
+            "1.0" => Ok(GeoParquetVersion::V1_0),
+            "1.1" => Ok(GeoParquetVersion::V1_1),
+            "2.0" => Ok(GeoParquetVersion::V2_0),
+            "none" => Ok(GeoParquetVersion::Omitted),
+            _ => plan_err!(
+                "Unexpected GeoParquet version string (expected '1.0', '1.1', 
'2.0', or 'none')"
+            ),
+        }
+    }
+}
diff --git a/rust/sedona-geoparquet/src/writer.rs 
b/rust/sedona-geoparquet/src/writer.rs
index 0071233..ef741e1 100644
--- a/rust/sedona-geoparquet/src/writer.rs
+++ b/rust/sedona-geoparquet/src/writer.rs
@@ -15,33 +15,49 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::sync::Arc;
+use std::{collections::HashMap, sync::Arc};
 
+use arrow_array::{
+    builder::{Float32Builder, NullBufferBuilder},
+    ArrayRef, StructArray,
+};
+use arrow_schema::{DataType, Field, Fields};
 use datafusion::{
     config::TableParquetOptions,
     datasource::{
         file_format::parquet::ParquetSink, physical_plan::FileSinkConfig, 
sink::DataSinkExec,
     },
 };
-use datafusion_common::{exec_datafusion_err, exec_err, not_impl_err, Result};
-use datafusion_expr::dml::InsertOp;
-use datafusion_physical_expr::LexRequirement;
-use datafusion_physical_plan::ExecutionPlan;
+use datafusion_common::{exec_datafusion_err, exec_err, not_impl_err, 
DataFusionError, Result};
+use datafusion_expr::{dml::InsertOp, ColumnarValue, ScalarUDF, Volatility};
+use datafusion_physical_expr::{
+    expressions::Column, LexRequirement, PhysicalExpr, ScalarFunctionExpr,
+};
+use datafusion_physical_plan::{projection::ProjectionExec, ExecutionPlan};
+use float_next_after::NextAfter;
+use geo_traits::GeometryTrait;
 use sedona_common::sedona_internal_err;
+use sedona_expr::scalar_udf::{SedonaScalarKernel, SedonaScalarUDF};
+use sedona_functions::executor::WkbExecutor;
+use sedona_geometry::{
+    bounds::geo_traits_update_xy_bounds,
+    interval::{Interval, IntervalTrait},
+};
 use sedona_schema::{
     crs::lnglat,
     datatypes::{Edges, SedonaType},
+    matchers::ArgMatcher,
     schema::SedonaSchema,
 };
 
 use crate::{
-    metadata::{GeoParquetColumnMetadata, GeoParquetMetadata},
+    metadata::{GeoParquetColumnMetadata, GeoParquetCovering, 
GeoParquetMetadata},
     options::{GeoParquetVersion, TableGeoParquetOptions},
 };
 
 pub fn create_geoparquet_writer_physical_plan(
-    input: Arc<dyn ExecutionPlan>,
-    conf: FileSinkConfig,
+    mut input: Arc<dyn ExecutionPlan>,
+    mut conf: FileSinkConfig,
     order_requirements: Option<LexRequirement>,
     options: &TableGeoParquetOptions,
 ) -> Result<Arc<dyn ExecutionPlan>> {
@@ -50,26 +66,33 @@ pub fn create_geoparquet_writer_physical_plan(
     }
 
     // If there is no geometry, just use the inner implementation
-    let output_geometry_column_indices = 
conf.output_schema().geometry_column_indices()?;
+    let mut output_geometry_column_indices = 
conf.output_schema().geometry_column_indices()?;
     if output_geometry_column_indices.is_empty() {
         return create_inner_writer(input, conf, order_requirements, 
options.inner.clone());
     }
 
     // We have geometry and/or geography! Collect the GeoParquetMetadata we'll 
need to write
     let mut metadata = GeoParquetMetadata::default();
+    let mut bbox_columns = HashMap::new();
 
     // Check the version
     match options.geoparquet_version {
         GeoParquetVersion::V1_0 => {
             metadata.version = "1.0.0".to_string();
         }
+        GeoParquetVersion::V1_1 => {
+            metadata.version = "1.1.0".to_string();
+            (input, bbox_columns) = project_bboxes(input, 
options.overwrite_bbox_columns)?;
+            conf.output_schema = input.schema();
+            output_geometry_column_indices = 
input.schema().geometry_column_indices()?;
+        }
         _ => {
             return not_impl_err!(
                 "GeoParquetVersion {:?} is not yet supported",
                 options.geoparquet_version
             );
         }
-    };
+    }
 
     let field_names = conf
         .output_schema()
@@ -118,6 +141,13 @@ pub fn create_geoparquet_writer_physical_plan(
             );
         }
 
+        // Add bbox column info, if we added one in project_bboxes()
+        if let Some(bbox_column_name) = bbox_columns.get(f.name()) {
+            column_metadata
+                .covering
+                .replace(GeoParquetCovering::bbox_struct_xy(bbox_column_name));
+        }
+
         // Add to metadata
         metadata
             .columns
@@ -140,6 +170,7 @@ pub fn create_geoparquet_writer_physical_plan(
     Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
 }
 
+/// Create a regular Parquet writer like DataFusion would otherwise do.
 fn create_inner_writer(
     input: Arc<dyn ExecutionPlan>,
     conf: FileSinkConfig,
@@ -151,18 +182,243 @@ fn create_inner_writer(
     Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
 }
 
+/// Create a projection that inserts a bbox column for every geometry column
+///
+/// This implements creating the GeoParquet 1.1 bounding box columns,
+/// returning a map from the name of the geometry column to the name of the
+/// bounding box column it created. This does not currently create such
+/// a column for any geography input.
+///
+/// The inserted bounding box columns always directly precede their
+/// corresponding geometry column and are named a follows:
+///
+/// - For a column named "geometry", the bbox column is named "bbox". This
+///   reflects what pretty much everybody is already naming their columns
+///   today.
+/// - For any other column, the bbox column is named "{col_name}_bbox".
+///
+/// If a bbox column name already exists in the schema, we replace it.
+/// In the context of writing a file and all that goes with it, the time it
+/// takes to recompute the bounding box is not important; because writing
+/// GeoParquet 1.1 is opt-in, if somebody *did* have a column with "bbox" or
+/// "some_col_bbox", it is unlikely that replacing it would have unintended
+/// consequences.
+fn project_bboxes(
+    input: Arc<dyn ExecutionPlan>,
+    overwrite_bbox_columns: bool,
+) -> Result<(Arc<dyn ExecutionPlan>, HashMap<String, String>)> {
+    let input_schema = input.schema();
+    let matcher = ArgMatcher::is_geometry();
+    let bbox_udf: Arc<ScalarUDF> = Arc::new(geoparquet_bbox_udf().into());
+    let bbox_udf_name = bbox_udf.name();
+
+    // Calculate and keep track of the expression, name pairs for the bounding 
box
+    // columns we are about to (potentially) create.
+    let mut bbox_exprs = HashMap::<usize, (Arc<dyn PhysicalExpr>, 
String)>::new();
+    let mut bbox_column_names = HashMap::new();
+    for (i, f) in input.schema().fields().iter().enumerate() {
+        let column = Arc::new(Column::new(f.name(), i));
+
+        // If this is a geometry column (not geography), compute the
+        // expression that is a function call to our bbox column creator
+        if matcher.match_type(&SedonaType::from_storage_field(
+            column.return_field(&input_schema)?.as_ref(),
+        )?) {
+            let bbox_field_name = bbox_column_name(f.name());
+            let expr = Arc::new(ScalarFunctionExpr::new(
+                bbox_udf_name,
+                bbox_udf.clone(),
+                vec![column],
+                Arc::new(Field::new("", bbox_type(), true)),
+            ));
+
+            bbox_exprs.insert(i, (expr, bbox_field_name.clone()));
+            bbox_column_names.insert(bbox_field_name, f.name().clone());
+        }
+    }
+
+    // If we don't need to create any bbox columns, don't add an additional
+    // projection at the end of the input plan
+    if bbox_exprs.is_empty() {
+        return Ok((input, HashMap::new()));
+    }
+
+    // Create the projection expressions
+    let mut exprs = Vec::new();
+    for (i, f) in input.schema().fields().iter().enumerate() {
+        // Skip any column with the same name as a bbox column, since we are
+        // about to replace it with the recomputed bbox.
+        if bbox_column_names.contains_key(f.name()) {
+            if overwrite_bbox_columns {
+                continue;
+            } else {
+                return exec_err!(
+                    "Can't overwrite GeoParquet 1.1 bbox column '{}'.
+Use overwrite_bbox_columns = True if this is what was intended.",
+                    f.name()
+                );
+            }
+        }
+
+        // If this is a column with a bbox, insert the bbox expression now
+        if let Some((expr, expr_name)) = bbox_exprs.remove(&i) {
+            exprs.push((expr, expr_name));
+        }
+
+        // Insert the column (whether it does or does not have geometry)
+        let column = Arc::new(Column::new(f.name(), i));
+        exprs.push((column, f.name().clone()));
+    }
+
+    // Create the projection
+    let exec = ProjectionExec::try_new(exprs, input)?;
+
+    // Flip the bbox_column_names into the form our caller needs it
+    let bbox_column_names_by_field = bbox_column_names.drain().map(|(k, v)| 
(v, k)).collect();
+
+    Ok((Arc::new(exec), bbox_column_names_by_field))
+}
+
+fn geoparquet_bbox_udf() -> SedonaScalarUDF {
+    SedonaScalarUDF::new(
+        "geoparquet_bbox",
+        vec![Arc::new(GeoParquetBbox {})],
+        Volatility::Immutable,
+        None,
+    )
+}
+
+fn bbox_column_name(geometry_column_name: &str) -> String {
+    if geometry_column_name == "geometry" {
+        "bbox".to_string()
+    } else {
+        format!("{geometry_column_name}_bbox")
+    }
+}
+
+#[derive(Debug)]
+struct GeoParquetBbox {}
+
+impl SedonaScalarKernel for GeoParquetBbox {
+    fn return_type(&self, args: &[SedonaType]) -> Result<Option<SedonaType>> {
+        let matcher = ArgMatcher::new(
+            vec![ArgMatcher::is_geometry()],
+            SedonaType::Arrow(bbox_type()),
+        );
+        matcher.match_args(args)
+    }
+
+    fn invoke_batch(
+        &self,
+        arg_types: &[SedonaType],
+        args: &[ColumnarValue],
+    ) -> Result<ColumnarValue> {
+        let executor = WkbExecutor::new(arg_types, args);
+
+        // Initialize the builders. We use Float32 to minimize the impact
+        // on the file size.
+        let mut nulls = NullBufferBuilder::new(executor.num_iterations());
+        let mut builders = [
+            Float32Builder::with_capacity(executor.num_iterations()),
+            Float32Builder::with_capacity(executor.num_iterations()),
+            Float32Builder::with_capacity(executor.num_iterations()),
+            Float32Builder::with_capacity(executor.num_iterations()),
+        ];
+
+        executor.execute_wkb_void(|maybe_item| {
+            match maybe_item {
+                Some(item) => {
+                    nulls.append(true);
+                    append_float_bbox(&item, &mut builders)?;
+                }
+                None => {
+                    // If we have a null, we set the outer validity bitmap to 
null
+                    // (i.e., "the bounding box is null") but also the inner 
bitmap
+                    // to null to ensure the value is not counted for the 
purposes
+                    // of computing statistics for the nested column.
+                    nulls.append(false);
+                    for builder in &mut builders {
+                        builder.append_null();
+                    }
+                }
+            }
+            Ok(())
+        })?;
+
+        let out_array = StructArray::try_new(
+            bbox_fields(),
+            builders
+                .iter_mut()
+                .map(|builder| -> ArrayRef { Arc::new(builder.finish()) })
+                .collect(),
+            nulls.finish(),
+        )?;
+
+        executor.finish(Arc::new(out_array))
+    }
+}
+
+fn bbox_type() -> DataType {
+    DataType::Struct(bbox_fields())
+}
+
+fn bbox_fields() -> Fields {
+    vec![
+        Field::new("xmin", DataType::Float32, true),
+        Field::new("ymin", DataType::Float32, true),
+        Field::new("xmax", DataType::Float32, true),
+        Field::new("ymax", DataType::Float32, true),
+    ]
+    .into()
+}
+
+// Calculates a bounding box and appends the float32-rounded version to
+// a set of builders, ensuring the float bounds always include the double
+// bounds.
+fn append_float_bbox(
+    wkb: impl GeometryTrait<T = f64>,
+    builders: &mut [Float32Builder],
+) -> Result<()> {
+    let mut x = Interval::empty();
+    let mut y = Interval::empty();
+    geo_traits_update_xy_bounds(wkb, &mut x, &mut y)
+        .map_err(|e| DataFusionError::External(e.into()))?;
+
+    // If we have an empty, append null values to the individual min/max
+    // columns to ensure their values aren't considered in the Parquet
+    // statistics.
+    if x.is_empty() || y.is_empty() {
+        for builder in builders {
+            builder.append_null();
+        }
+    } else {
+        builders[0].append_value((x.lo() as f32).next_after(-f32::INFINITY));
+        builders[1].append_value((y.lo() as f32).next_after(-f32::INFINITY));
+        builders[2].append_value((x.hi() as f32).next_after(f32::INFINITY));
+        builders[3].append_value((y.hi() as f32).next_after(f32::INFINITY));
+    }
+
+    Ok(())
+}
+
 #[cfg(test)]
 mod test {
     use std::iter::zip;
 
+    use arrow_array::{create_array, Array, RecordBatch};
     use datafusion::datasource::file_format::format_as_file_type;
     use datafusion::prelude::DataFrame;
     use datafusion::{
         execution::SessionStateBuilder,
-        prelude::{col, SessionContext},
+        prelude::{col, lit, SessionContext},
     };
-    use datafusion_expr::LogicalPlanBuilder;
+    use datafusion_common::cast::{as_float32_array, as_struct_array};
+    use datafusion_common::ScalarValue;
+    use datafusion_expr::{Expr, LogicalPlanBuilder};
+    use sedona_schema::datatypes::WKB_GEOMETRY;
+    use sedona_testing::create::create_array;
     use sedona_testing::data::test_geoparquet;
+    use sedona_testing::testers::ScalarUdfTester;
     use tempfile::tempdir;
 
     use crate::format::GeoParquetFormatFactory;
@@ -177,28 +433,45 @@ mod test {
         SessionContext::new_with_state(state).enable_url_table()
     }
 
-    async fn test_dataframe_roundtrip(ctx: SessionContext, df: DataFrame) {
+    async fn test_dataframe_roundtrip(ctx: SessionContext, src: DataFrame) {
+        let df_batches = src.clone().collect().await.unwrap();
+        test_write_dataframe(
+            ctx,
+            src,
+            df_batches,
+            TableGeoParquetOptions::default(),
+            vec![],
+        )
+        .await
+        .unwrap()
+    }
+
+    async fn test_write_dataframe(
+        ctx: SessionContext,
+        src: DataFrame,
+        expected_batches: Vec<RecordBatch>,
+        options: TableGeoParquetOptions,
+        partition_by: Vec<String>,
+    ) -> Result<()> {
         // It's a bit verbose to trigger this without helpers
-        let format = GeoParquetFormatFactory::new();
+        let format = GeoParquetFormatFactory::new_with_options(options);
         let file_type = format_as_file_type(Arc::new(format));
         let tmpdir = tempdir().unwrap();
 
-        let df_batches = df.clone().collect().await.unwrap();
-
         let tmp_parquet = tmpdir.path().join("foofy_spatial.parquet");
 
         let plan = LogicalPlanBuilder::copy_to(
-            df.into_unoptimized_plan(),
+            src.into_unoptimized_plan(),
             tmp_parquet.to_string_lossy().into(),
             file_type,
             Default::default(),
-            vec![],
+            partition_by,
         )
         .unwrap()
         .build()
         .unwrap();
 
-        DataFrame::new(ctx.state(), plan).collect().await.unwrap();
+        DataFrame::new(ctx.state(), plan).collect().await?;
 
         let df_parquet_batches = ctx
             .table(tmp_parquet.to_string_lossy().to_string())
@@ -208,7 +481,22 @@ mod test {
             .await
             .unwrap();
 
-        assert_eq!(df_parquet_batches.len(), df_batches.len());
+        assert_eq!(df_parquet_batches.len(), expected_batches.len());
+
+        // Check column names
+        let df_parquet_names = df_parquet_batches[0]
+            .schema()
+            .fields()
+            .iter()
+            .map(|f| f.name().clone())
+            .collect::<Vec<_>>();
+        let expected_names = expected_batches[0]
+            .schema()
+            .fields()
+            .iter()
+            .map(|f| f.name().clone())
+            .collect::<Vec<_>>();
+        assert_eq!(df_parquet_names, expected_names);
 
         // Check types, since the schema may not compare byte-for-byte equal 
(CRSes)
         let df_parquet_sedona_types = df_parquet_batches[0]
@@ -216,7 +504,7 @@ mod test {
             .sedona_types()
             .collect::<Result<Vec<_>>>()
             .unwrap();
-        let df_sedona_types = df_batches[0]
+        let df_sedona_types = expected_batches[0]
             .schema()
             .sedona_types()
             .collect::<Result<Vec<_>>>()
@@ -224,9 +512,11 @@ mod test {
         assert_eq!(df_parquet_sedona_types, df_sedona_types);
 
         // Check batches without metadata
-        for (df_parquet_batch, df_batch) in zip(df_parquet_batches, 
df_batches) {
+        for (df_parquet_batch, df_batch) in zip(df_parquet_batches, 
expected_batches) {
             assert_eq!(df_parquet_batch.columns(), df_batch.columns())
         }
+
+        Ok(())
     }
 
     #[tokio::test]
@@ -262,4 +552,264 @@ mod test {
 
         test_dataframe_roundtrip(ctx, df).await;
     }
+
+    #[tokio::test]
+    async fn geoparquet_1_1_basic() {
+        let example = test_geoparquet("example", "geometry").unwrap();
+        let ctx = setup_context();
+        let df = ctx
+            .table(&example)
+            .await
+            .unwrap()
+            // DataFusion internals lose the nullability we assigned to the 
bbox
+            // and without this line the test fails.
+            .filter(Expr::IsNotNull(col("geometry").into()))
+            .unwrap();
+
+        let mut options = TableGeoParquetOptions::new();
+        options.geoparquet_version = GeoParquetVersion::V1_1;
+
+        let bbox_udf: ScalarUDF = geoparquet_bbox_udf().into();
+
+        let df_batches_with_bbox = df
+            .clone()
+            .select(vec![
+                col("wkt"),
+                bbox_udf.call(vec![col("geometry")]).alias("bbox"),
+                col("geometry"),
+            ])
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+
+        test_write_dataframe(ctx, df, df_batches_with_bbox, options, vec![])
+            .await
+            .unwrap();
+    }
+
+    #[tokio::test]
+    async fn geoparquet_1_1_multiple_columns() {
+        let example = test_geoparquet("example", "geometry").unwrap();
+        let ctx = setup_context();
+
+        // Include >1 geometry and sprinkle in some non-geometry columns
+        let df = ctx
+            .table(&example)
+            .await
+            .unwrap()
+            // DataFusion internals lose the nullability we assigned to the 
bbox
+            // and without this line the test fails.
+            .filter(Expr::IsNotNull(col("geometry").into()))
+            .unwrap()
+            .select(vec![
+                col("wkt"),
+                col("geometry").alias("geom"),
+                col("wkt").alias("wkt2"),
+                col("geometry"),
+                col("wkt").alias("wkt3"),
+            ])
+            .unwrap();
+
+        let mut options = TableGeoParquetOptions::new();
+        options.geoparquet_version = GeoParquetVersion::V1_1;
+
+        let bbox_udf: ScalarUDF = geoparquet_bbox_udf().into();
+
+        let df_batches_with_bbox = df
+            .clone()
+            .select(vec![
+                col("wkt"),
+                bbox_udf.call(vec![col("geom")]).alias("geom_bbox"),
+                col("geom"),
+                col("wkt2"),
+                bbox_udf.call(vec![col("geometry")]).alias("bbox"),
+                col("geometry"),
+                col("wkt3"),
+            ])
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+
+        test_write_dataframe(ctx, df, df_batches_with_bbox, options, vec![])
+            .await
+            .unwrap();
+    }
+
+    #[tokio::test]
+    async fn geoparquet_1_1_overwrite_existing_bbox() {
+        let example = test_geoparquet("example", "geometry").unwrap();
+        let ctx = setup_context();
+
+        // Test writing a DataFrame that already has a column named "bbox".
+        // Writing this using GeoParquet 1.1 will overwrite the column.
+        let df = ctx
+            .table(&example)
+            .await
+            .unwrap()
+            // DataFusion internals lose the nullability we assigned to the 
bbox
+            // and without this line the test fails.
+            .filter(Expr::IsNotNull(col("geometry").into()))
+            .unwrap()
+            .select(vec![
+                lit("this is definitely not a bbox").alias("bbox"),
+                col("geometry"),
+            ])
+            .unwrap();
+
+        let mut options = TableGeoParquetOptions::new();
+        options.geoparquet_version = GeoParquetVersion::V1_1;
+
+        let bbox_udf: ScalarUDF = geoparquet_bbox_udf().into();
+
+        let df_batches_with_bbox = df
+            .clone()
+            .select(vec![
+                bbox_udf.call(vec![col("geometry")]).alias("bbox"),
+                col("geometry"),
+            ])
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+
+        // Without setting overwrite_bbox_columns = true, this should error
+        let err = test_write_dataframe(
+            ctx.clone(),
+            df.clone(),
+            df_batches_with_bbox.clone(),
+            options.clone(),
+            vec!["part".into()],
+        )
+        .await
+        .unwrap_err();
+        assert!(err
+            .message()
+            .starts_with("Can't overwrite GeoParquet 1.1 bbox column 'bbox'"));
+
+        options.overwrite_bbox_columns = true;
+        test_write_dataframe(ctx, df, df_batches_with_bbox, options, vec![])
+            .await
+            .unwrap();
+    }
+
+    #[tokio::test]
+    async fn geoparquet_1_1_with_partition() {
+        let example = test_geoparquet("example", "geometry").unwrap();
+        let ctx = setup_context();
+        let df = ctx
+            .table(&example)
+            .await
+            .unwrap()
+            // DataFusion internals loose the nullability we assigned to the 
bbox
+            // and without this line the test fails.
+            .filter(Expr::IsNotNull(col("geometry").into()))
+            .unwrap()
+            .select(vec![
+                lit("some_partition").alias("part"),
+                col("wkt"),
+                col("geometry"),
+            ])
+            .unwrap();
+
+        let mut options = TableGeoParquetOptions::new();
+        options.geoparquet_version = GeoParquetVersion::V1_1;
+
+        let bbox_udf: ScalarUDF = geoparquet_bbox_udf().into();
+
+        let df_batches_with_bbox = df
+            .clone()
+            .select(vec![
+                col("wkt"),
+                bbox_udf.call(vec![col("geometry")]).alias("bbox"),
+                col("geometry"),
+                lit(ScalarValue::Dictionary(
+                    DataType::UInt16.into(),
+                    
ScalarValue::Utf8(Some("some_partition".to_string())).into(),
+                ))
+                .alias("part"),
+            ])
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+
+        test_write_dataframe(ctx, df, df_batches_with_bbox, options, 
vec!["part".into()])
+            .await
+            .unwrap();
+    }
+
+    #[test]
+    fn float_bbox() {
+        let tester = ScalarUdfTester::new(geoparquet_bbox_udf().into(), 
vec![WKB_GEOMETRY]);
+        assert_eq!(
+            tester.return_type().unwrap(),
+            SedonaType::Arrow(bbox_type())
+        );
+
+        let array = create_array(
+            &[
+                Some("POINT (0 1)"),
+                Some("POINT (2 3)"),
+                Some("LINESTRING (4 5, 6 7)"),
+            ],
+            &WKB_GEOMETRY,
+        );
+
+        let result = tester.invoke_array(array).unwrap();
+        assert_eq!(result.len(), 3);
+
+        let expected_cols_f64 = [
+            create_array!(Float64, [Some(0.0), Some(2.0), Some(4.0)]),
+            create_array!(Float64, [Some(1.0), Some(3.0), Some(5.0)]),
+            create_array!(Float64, [Some(0.0), Some(2.0), Some(6.0)]),
+            create_array!(Float64, [Some(1.0), Some(3.0), Some(7.0)]),
+        ];
+
+        let result_struct = as_struct_array(&result).unwrap();
+        let actual_cols = result_struct
+            .columns()
+            .iter()
+            .map(|col| as_float32_array(col).unwrap())
+            .collect::<Vec<_>>();
+        for i in 0..result.len() {
+            let actual = actual_cols
+                .iter()
+                .map(|a| a.value(i) as f64)
+                .collect::<Vec<_>>();
+            let expected = expected_cols_f64
+                .iter()
+                .map(|e| e.value(i))
+                .collect::<Vec<_>>();
+
+            // These values aren't equal (the actual values were float32 
values that
+            // had been rounded down); however, they should "contain" the 
expected box)
+            assert!(actual[0] <= expected[0]);
+            assert!(actual[1] <= expected[1]);
+            assert!(actual[2] >= expected[2]);
+            assert!(actual[3] >= expected[3]);
+        }
+    }
+
+    #[test]
+    fn float_bbox_null() {
+        let tester = ScalarUdfTester::new(geoparquet_bbox_udf().into(), 
vec![WKB_GEOMETRY]);
+
+        let null_result = tester.invoke_scalar(ScalarValue::Null).unwrap();
+        assert!(null_result.is_null());
+        if let ScalarValue::Struct(s) = null_result {
+            let actual_cols = s
+                .columns()
+                .iter()
+                .map(|col| as_float32_array(col).unwrap())
+                .collect::<Vec<_>>();
+            assert!(actual_cols[0].is_null(0));
+            assert!(actual_cols[1].is_null(0));
+            assert!(actual_cols[2].is_null(0));
+            assert!(actual_cols[3].is_null(0));
+        } else {
+            panic!("Expected struct")
+        }
+    }
 }


Reply via email to