This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git


The following commit(s) were added to refs/heads/main by this push:
     new 3a4ca3bf feat(rust/sedona-geoparquet): Add read support for Geometry/Geography Parquet types (#561)
3a4ca3bf is described below

commit 3a4ca3bfe9359e5ab8794a1b7145a7975653bfcc
Author: Dewey Dunnington <[email protected]>
AuthorDate: Wed Feb 4 09:26:28 2026 -0600

    feat(rust/sedona-geoparquet): Add read support for Geometry/Geography Parquet types (#561)
    
    Co-authored-by: Copilot <[email protected]>
---
 python/sedonadb/tests/io/test_parquet.py  |  42 ++-
 rust/sedona-geoparquet/Cargo.toml         |   2 +-
 rust/sedona-geoparquet/src/file_opener.rs | 506 +++++++++++++++++++++++++++---
 rust/sedona-geoparquet/src/format.rs      |  85 +++--
 rust/sedona-geoparquet/src/metadata.rs    | 483 +++++++++++++++++++++++++++-
 rust/sedona-schema/src/schema.rs          |  43 ++-
 6 files changed, 1043 insertions(+), 118 deletions(-)
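
For context, a minimal usage sketch (not part of the commit itself) of the read
path this change enables from Python. It mirrors the test added below and assumes
the SedonaDB helper from sedonadb.testing plus a local copy of the geoarrow-data
"ns-water" file:

    import geopandas
    from pyarrow import parquet
    from sedonadb.testing import SedonaDB

    eng = SedonaDB()
    # Hypothetical local path; the tests resolve it from a geoarrow-data checkout
    path = "ns-water_water-point.parquet"

    # Reference result read through pyarrow/geopandas
    gdf = (
        geopandas.GeoDataFrame.from_arrow(parquet.read_table(path))
        .sort_values(by="OBJECTID")
        .reset_index(drop=True)
    )

    # SedonaDB reads the same file, including columns stored with the native
    # Parquet Geometry/Geography logical types
    eng.create_view_parquet("tab", path)
    result = eng.execute_and_collect('SELECT * FROM tab ORDER BY "OBJECTID"')
    eng.assert_result(result, gdf)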

diff --git a/python/sedonadb/tests/io/test_parquet.py b/python/sedonadb/tests/io/test_parquet.py
index fb653945..7f87c027 100644
--- a/python/sedonadb/tests/io/test_parquet.py
+++ b/python/sedonadb/tests/io/test_parquet.py
@@ -21,10 +21,11 @@ from pathlib import Path
 
 import geopandas
 import geopandas.testing
+import pyarrow as pa
 import pytest
+import sedonadb
 import shapely
 from pyarrow import parquet
-import sedonadb
 from sedonadb._lib import SedonaError
 from sedonadb.testing import DuckDB, SedonaDB, geom_or_null, skip_if_not_exists
 
@@ -43,6 +44,25 @@ def test_read_whole_geoparquet(geoarrow_data, name):
     eng.assert_result(result, gdf)
 
 
+@pytest.mark.parametrize("name", ["water-junc", "water-point"])
+def test_read_whole_parquet_native(geoarrow_data, name):
+    # Checks a read of some non-trivial files and ensures we match a pyarrow read
+    eng = SedonaDB()
+    path = geoarrow_data / "ns-water" / "files" / f"ns-water_{name}.parquet"
+    skip_if_not_exists(path)
+
+    tab = parquet.read_table(path)
+    gdf = (
+        geopandas.GeoDataFrame.from_arrow(tab)
+        .sort_values(by="OBJECTID")
+        .reset_index(drop=True)
+    )
+
+    eng.create_view_parquet("tab", path)
+    result = eng.execute_and_collect("""SELECT * FROM tab ORDER BY "OBJECTID";""")
+    eng.assert_result(result, gdf)
+
+
 @pytest.mark.parametrize("name", ["geoparquet-1.0.0", "geoparquet-1.1.0", 
"plain"])
 def test_read_sedona_testing(sedona_testing, name):
     # Checks a read of trivial files (some GeoParquet and some not) against a 
DuckDB read
@@ -122,6 +142,26 @@ def test_read_geoparquet_prune_points(geoarrow_data, name, predicate):
             write_covering_bbox=True,
             row_group_size=1024,
         )
+        assert "Geometry" not in repr(parquet.ParquetFile(tmp_parquet).schema)
+
+        eng.create_view_parquet("tab", tmp_parquet)
+        result = eng.execute_and_collect(
+            f"""
+            SELECT "OBJECTID", geometry FROM tab
+            WHERE ST_{predicate}(geometry, ST_SetCRS({geom_or_null(wkt_filter)}, '{gdf.crs.to_json()}'))
+            ORDER BY "OBJECTID";
+        """
+        )
+        eng.assert_result(result, gdf)
+
+        # Write a file with Parquet Geometry and ensure a correct result
+        parquet.write_table(
+            pa.table(gdf.to_arrow()),
+            tmp_parquet,
+            row_group_size=1024,
+            store_schema=False,
+        )
+        assert "Geometry" in repr(parquet.ParquetFile(tmp_parquet).schema)
 
         eng.create_view_parquet("tab", tmp_parquet)
         result = eng.execute_and_collect(
diff --git a/rust/sedona-geoparquet/Cargo.toml b/rust/sedona-geoparquet/Cargo.toml
index 3e63c236..ba65d5a2 100644
--- a/rust/sedona-geoparquet/Cargo.toml
+++ b/rust/sedona-geoparquet/Cargo.toml
@@ -46,7 +46,7 @@ arrow-schema = { workspace = true }
 arrow-array = { workspace = true }
 bytes = { workspace = true }
 chrono = { workspace = true }
-datafusion = { workspace = true, features = ["parquet"] }
+datafusion = { workspace = true, features = ["parquet", "sql"] }
 datafusion-catalog = { workspace = true }
 datafusion-common = { workspace = true }
 datafusion-datasource-parquet = { workspace = true }
diff --git a/rust/sedona-geoparquet/src/file_opener.rs b/rust/sedona-geoparquet/src/file_opener.rs
index e0aa5274..e41fd7c4 100644
--- a/rust/sedona-geoparquet/src/file_opener.rs
+++ b/rust/sedona-geoparquet/src/file_opener.rs
@@ -26,21 +26,29 @@ use datafusion_datasource_parquet::metadata::DFParquetMetadata;
 use datafusion_physical_expr::PhysicalExpr;
 use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder};
 use object_store::ObjectStore;
-use parquet::file::{
-    metadata::{ParquetMetaData, RowGroupMetaData},
-    statistics::Statistics,
+use parquet::{
+    basic::LogicalType,
+    file::{
+        metadata::{ParquetMetaData, RowGroupMetaData},
+        statistics::Statistics,
+    },
+    geospatial::statistics::GeospatialStatistics,
 };
 use sedona_expr::{
     spatial_filter::{SpatialFilter, TableGeoStatistics},
     statistics::GeoStatistics,
 };
-use sedona_geometry::bounding_box::BoundingBox;
+use sedona_geometry::{
+    bounding_box::BoundingBox,
+    interval::{Interval, IntervalTrait},
+    types::{GeometryTypeAndDimensions, GeometryTypeAndDimensionsSet},
+};
 use sedona_schema::{datatypes::SedonaType, matchers::ArgMatcher};
 
-use crate::metadata::GeoParquetMetadata;
+use crate::metadata::{GeoParquetColumnMetadata, GeoParquetMetadata};
 
 #[derive(Clone)]
-struct GeoParquetFileOpenerMetrics {
+pub(crate) struct GeoParquetFileOpenerMetrics {
     /// How many file ranges are pruned by [`SpatialFilter`]
     ///
     /// Note on "file range": an opener may read only part of a file rather 
than the
@@ -59,7 +67,7 @@ struct GeoParquetFileOpenerMetrics {
 }
 
 impl GeoParquetFileOpenerMetrics {
-    fn new(execution_plan_global_metrics: &ExecutionPlanMetricsSet) -> Self {
+    pub fn new(execution_plan_global_metrics: &ExecutionPlanMetricsSet) -> Self {
         Self {
             files_ranges_spatial_pruned: MetricBuilder::new(execution_plan_global_metrics)
                 .global_counter("files_ranges_spatial_pruned"),
@@ -78,37 +86,15 @@ impl GeoParquetFileOpenerMetrics {
 /// Pruning happens (for Parquet) in the [FileOpener], so we implement
 /// that here, too.
 #[derive(Clone)]
-pub struct GeoParquetFileOpener {
-    inner: Arc<dyn FileOpener>,
-    object_store: Arc<dyn ObjectStore>,
-    metadata_size_hint: Option<usize>,
-    predicate: Arc<dyn PhysicalExpr>,
-    file_schema: SchemaRef,
-    enable_pruning: bool,
-    metrics: GeoParquetFileOpenerMetrics,
-}
-
-impl GeoParquetFileOpener {
-    /// Create a new file opener
-    pub fn new(
-        inner: Arc<dyn FileOpener>,
-        object_store: Arc<dyn ObjectStore>,
-        metadata_size_hint: Option<usize>,
-        predicate: Arc<dyn PhysicalExpr>,
-        file_schema: SchemaRef,
-        enable_pruning: bool,
-        execution_plan_global_metrics: &ExecutionPlanMetricsSet,
-    ) -> Self {
-        Self {
-            inner,
-            object_store,
-            metadata_size_hint,
-            predicate,
-            file_schema,
-            enable_pruning,
-            metrics: GeoParquetFileOpenerMetrics::new(execution_plan_global_metrics),
-        }
-    }
+pub(crate) struct GeoParquetFileOpener {
+    pub inner: Arc<dyn FileOpener>,
+    pub object_store: Arc<dyn ObjectStore>,
+    pub metadata_size_hint: Option<usize>,
+    pub predicate: Arc<dyn PhysicalExpr>,
+    pub file_schema: SchemaRef,
+    pub enable_pruning: bool,
+    pub metrics: GeoParquetFileOpenerMetrics,
+    pub overrides: Option<HashMap<String, GeoParquetColumnMetadata>>,
 }
 
 impl FileOpener for GeoParquetFileOpener {
@@ -127,9 +113,10 @@ impl FileOpener for GeoParquetFileOpener {
             if self_clone.enable_pruning {
                 let spatial_filter = SpatialFilter::try_from_expr(&self_clone.predicate)?;
 
-                if let Some(geoparquet_metadata) =
-                    GeoParquetMetadata::try_from_parquet_metadata(&parquet_metadata)?
-                {
+                if let Some(geoparquet_metadata) = GeoParquetMetadata::try_from_parquet_metadata(
+                    &parquet_metadata,
+                    self_clone.overrides.as_ref(),
+                )? {
                     filter_access_plan_using_geoparquet_file_metadata(
                         &self_clone.file_schema,
                         &mut access_plan,
@@ -146,6 +133,14 @@ impl FileOpener for GeoParquetFileOpener {
                         &parquet_metadata,
                         &self_clone.metrics,
                     )?;
+
+                    filter_access_plan_using_native_geostats(
+                        &self_clone.file_schema,
+                        &mut access_plan,
+                        &spatial_filter,
+                        &parquet_metadata,
+                        &self_clone.metrics,
+                    )?;
                 }
             }
 
@@ -210,6 +205,13 @@ fn filter_access_plan_using_geoparquet_covering(
     // but we need flattened integer references to retrieve min/max statistics for each of these.
     let covering_specs = parse_column_coverings(file_schema, parquet_metadata, metadata)?;

+    // If there are no covering specs, don't iterate through the row groups
+    // This has the side-effect of ensuring the row_groups_spatial_matched metric is not double
+    // counted except in the rare case where we prune based on both.
+    if covering_specs.iter().all(|spec| spec.is_none()) {
+        return Ok(());
+    }
+
     // Iterate through the row groups
     for i in row_group_indices_to_scan {
         // Generate row group statistics based on the covering statistics
@@ -232,6 +234,65 @@ fn filter_access_plan_using_geoparquet_covering(
     Ok(())
 }
 
+/// Filter an access plan using the Parquet GeoStatistics, if present
+///
+/// Iterates through an existing access plan and skips row groups based on
+/// the Parquet format GeoStatistics (i.e., Geometry/Geography Parquet types).
+fn filter_access_plan_using_native_geostats(
+    file_schema: &SchemaRef,
+    access_plan: &mut ParquetAccessPlan,
+    spatial_filter: &SpatialFilter,
+    parquet_metadata: &ParquetMetaData,
+    metrics: &GeoParquetFileOpenerMetrics,
+) -> Result<()> {
+    let row_group_indices_to_scan = access_plan.row_group_indexes();
+
+    // What we're about to do is a bit of work, so skip it if we can.
+    if row_group_indices_to_scan.is_empty() {
+        return Ok(());
+    }
+
+    // Get the indices we need to index in to the Parquet column()s.
+    // For schemas with no nested columns, this will be a sequential
+    // range of 0..n.
+    let top_level_indices = top_level_column_indices(parquet_metadata);
+
+    // If there are no native geometry or geography logical types at the
+    // top level indices, don't iterate through the row groups. This has the side-effect
+    // of ensuring the row_groups_spatial_matched metric is not double counted except
+    // in the rare case where we prune based on both.
+    let parquet_schema = parquet_metadata.file_metadata().schema_descr();
+    if top_level_indices.iter().all(|i| {
+        !matches!(
+            parquet_schema.column(*i).logical_type_ref(),
+            Some(LogicalType::Geometry { .. }) | Some(LogicalType::Geography { .. })
+        )
+    }) {
+        return Ok(());
+    }
+
+    // Iterate through the row groups
+    for i in row_group_indices_to_scan {
+        // Generate row group statistics based on the covering statistics
+        let row_group_column_geo_stats =
+            row_group_native_geo_stats(parquet_metadata.row_group(i), &top_level_indices);
+        let row_group_geo_stats = TableGeoStatistics::try_from_stats_and_schema(
+            &row_group_column_geo_stats,
+            file_schema,
+        )?;
+
+        // Evaluate predicate!
+        if !spatial_filter.evaluate(&row_group_geo_stats)? {
+            metrics.row_groups_spatial_pruned.add(1);
+            access_plan.skip(i);
+        } else {
+            metrics.row_groups_spatial_matched.add(1);
+        }
+    }
+
+    Ok(())
+}
+
 /// Calculates a Vec of [GeoStatistics] based on GeoParquet file-level metadata
 ///
 /// Each element is either a [GeoStatistics] populated with a [BoundingBox]
@@ -371,6 +432,109 @@ fn parse_column_coverings(
         .collect()
 }
 
+/// Calculates a Vec of [GeoStatistics] based on Parquet-native GeoStatistics
+///
+/// Each element is either a [GeoStatistics] populated with a [BoundingBox]
+/// or [GeoStatistics::unspecified], which is a value that will ensure that
+/// any spatial predicate that references those statistics will evaluate to
+/// true.
+fn row_group_native_geo_stats(
+    row_group_metadata: &RowGroupMetaData,
+    column_indices: &[usize],
+) -> Vec<GeoStatistics> {
+    column_indices
+        .iter()
+        .map(|column_index| {
+            let native_geo_stats_opt = row_group_metadata.column(*column_index).geo_statistics();
+            native_geo_stats_opt
+                .map(parquet_geo_stats_to_sedona_geo_stats)
+                .unwrap_or(GeoStatistics::unspecified())
+        })
+        .collect()
+}
+
+/// Convert Parquet [GeospatialStatistics] into Sedona [GeoStatistics]
+///
+/// This also sanity checks the Parquet statistics for non-finite or non-sensical
+/// ranges, treating the information as unknown if it fails the sanity check.
+fn parquet_geo_stats_to_sedona_geo_stats(
+    parquet_geo_stats: &GeospatialStatistics,
+) -> GeoStatistics {
+    let mut out = GeoStatistics::unspecified();
+
+    if let Some(native_bbox) = parquet_geo_stats.bounding_box() {
+        let x_range = (native_bbox.get_xmin(), native_bbox.get_xmax());
+        let y_range = (native_bbox.get_ymin(), native_bbox.get_ymax());
+        let z_range = match (native_bbox.get_zmin(), native_bbox.get_zmax()) {
+            (Some(lo), Some(hi)) => Some(Interval::new(lo, hi)),
+            _ => None,
+        };
+        let m_range = match (native_bbox.get_mmin(), native_bbox.get_mmax()) {
+            (Some(lo), Some(hi)) => Some(Interval::new(lo, hi)),
+            _ => None,
+        };
+
+        let bbox = BoundingBox::xyzm(x_range, y_range, z_range, m_range);
+
+        // Sanity check the bbox statistics. If the sanity check fails, don't set
+        // a bounding box for pruning. Note that the x width can be < 0 (wraparound).
+        let mut bbox_is_valid =
+            bbox.x().width().is_finite() && bbox.y().width().is_finite() && bbox.y().width() >= 0.0;
+        if let Some(z) = bbox.z() {
+            bbox_is_valid = bbox_is_valid && z.width().is_finite() && z.width() >= 0.0;
+        }
+        if let Some(m) = bbox.m() {
+            bbox_is_valid = bbox_is_valid && m.width().is_finite() && m.width() >= 0.0;
+        }
+
+        if bbox_is_valid {
+            out = out.with_bbox(Some(bbox));
+        }
+    }
+
+    if let Some(native_geometry_types) = parquet_geo_stats.geospatial_types() {
+        let mut geometry_types = GeometryTypeAndDimensionsSet::new();
+        let mut geometry_types_valid = true;
+        for wkb_id in native_geometry_types {
+            if *wkb_id < 0 {
+                geometry_types_valid = false;
+                break;
+            }
+
+            match GeometryTypeAndDimensions::try_from_wkb_id(*wkb_id as u32) {
+                Ok(type_and_dim) => geometry_types.insert_or_ignore(&type_and_dim),
+                Err(_) => {
+                    geometry_types_valid = false;
+                    break;
+                }
+            }
+        }
+
+        if !geometry_types.is_empty() && geometry_types_valid {
+            out = out.with_geometry_types(Some(geometry_types))
+        }
+    }
+
+    out
+}
+
+/// Calculates column indices for top-level columns of file_schema
+///
+/// We need to build a list of top-level indices, where the indices refer to the
+/// flattened list of columns (e.g., `.column(i)` in row group metadata).
+fn top_level_column_indices(parquet_metadata: &ParquetMetaData) -> Vec<usize> {
+    let mut top_level_indices = Vec::new();
+    let schema_descr = parquet_metadata.file_metadata().schema_descr();
+    for (i, col) in schema_descr.columns().iter().enumerate() {
+        let path_vec = col.path().parts();
+        if path_vec.len() == 1 {
+            top_level_indices.push(i);
+        }
+    }
+
+    top_level_indices
+}
+
 /// Returns true if there are any fields with GeoArrow metadata
 ///
 /// This is used to defer to the parent implementation if there is no
@@ -767,6 +931,266 @@ mod test {
         ));
     }
 
+    #[test]
+    fn parquet_geo_stats_empty() {
+        let parquet_stats = GeospatialStatistics::new(None, None);
+        let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+        assert_eq!(result, GeoStatistics::unspecified());
+    }
+
+    #[test]
+    fn parquet_geo_stats_valid_bbox_xy() {
+        let parquet_stats = GeospatialStatistics::new(
+            Some(parquet::geospatial::bounding_box::BoundingBox::new(
+                -180.0, 180.0, -90.0, 90.0,
+            )),
+            None,
+        );
+        let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+        assert_eq!(
+            result,
+            GeoStatistics::unspecified()
+                .with_bbox(Some(BoundingBox::xy((-180.0, 180.0), (-90.0, 90.0))))
+        );
+    }
+
+    #[test]
+    fn parquet_geo_stats_valid_bbox_xyz() {
+        let parquet_stats = GeospatialStatistics::new(
+            Some(
+                parquet::geospatial::bounding_box::BoundingBox::new(-180.0, 180.0, -90.0, 90.0)
+                    .with_zrange(0.0, 1000.0),
+            ),
+            None,
+        );
+        let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+        let expected_bbox = BoundingBox::xyzm(
+            (-180.0, 180.0),
+            (-90.0, 90.0),
+            Some(Interval::new(0.0, 1000.0)),
+            None,
+        );
+        assert_eq!(
+            result,
+            GeoStatistics::unspecified().with_bbox(Some(expected_bbox))
+        );
+    }
+
+    #[test]
+    fn parquet_geo_stats_valid_bbox_xyzm() {
+        let parquet_stats = GeospatialStatistics::new(
+            Some(
+                parquet::geospatial::bounding_box::BoundingBox::new(-180.0, 180.0, -90.0, 90.0)
+                    .with_zrange(0.0, 1000.0)
+                    .with_mrange(0.0, 100.0),
+            ),
+            None,
+        );
+        let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+        let expected_bbox = BoundingBox::xyzm(
+            (-180.0, 180.0),
+            (-90.0, 90.0),
+            Some(Interval::new(0.0, 1000.0)),
+            Some(Interval::new(0.0, 100.0)),
+        );
+        assert_eq!(
+            result,
+            GeoStatistics::unspecified().with_bbox(Some(expected_bbox))
+        );
+    }
+
+    #[test]
+    fn parquet_geo_stats_invalid_bbox_infinite_x() {
+        let parquet_stats = GeospatialStatistics::new(
+            Some(parquet::geospatial::bounding_box::BoundingBox::new(
+                f64::NEG_INFINITY,
+                f64::INFINITY,
+                -90.0,
+                90.0,
+            )),
+            None,
+        );
+        let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+        // Should return unspecified because x width is not finite
+        assert_eq!(result.bbox(), None);
+    }
+
+    #[test]
+    fn parquet_geo_stats_invalid_bbox_nan_x() {
+        let parquet_stats = GeospatialStatistics::new(
+            Some(parquet::geospatial::bounding_box::BoundingBox::new(
+                f64::NAN,
+                f64::NAN,
+                -90.0,
+                90.0,
+            )),
+            None,
+        );
+        let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+        // Should return unspecified because x width is not finite
+        assert_eq!(result.bbox(), None);
+    }
+
+    #[test]
+    fn parquet_geo_stats_invalid_bbox_infinite_y() {
+        let parquet_stats = GeospatialStatistics::new(
+            Some(parquet::geospatial::bounding_box::BoundingBox::new(
+                -180.0,
+                180.0,
+                f64::NEG_INFINITY,
+                f64::INFINITY,
+            )),
+            None,
+        );
+        let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+        // Should return unspecified because y width is not finite
+        assert_eq!(result.bbox(), None);
+    }
+
+    #[test]
+    fn parquet_geo_stats_invalid_bbox_negative_y_width() {
+        let parquet_stats = GeospatialStatistics::new(
+            Some(parquet::geospatial::bounding_box::BoundingBox::new(
+                -180.0, 180.0, 1.0, -1.0,
+            )),
+            None,
+        );
+        let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+        // Should return unspecified because y width is less than zero
+        assert_eq!(result.bbox(), None);
+    }
+
+    #[test]
+    fn parquet_geo_stats_invalid_bbox_infinite_z() {
+        let parquet_stats = GeospatialStatistics::new(
+            Some(
+                parquet::geospatial::bounding_box::BoundingBox::new(-180.0, 180.0, -90.0, 90.0)
+                    .with_zrange(f64::NEG_INFINITY, f64::INFINITY),
+            ),
+            None,
+        );
+        let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+        // Should return unspecified because z width is not finite
+        assert_eq!(result.bbox(), None);
+    }
+
+    #[test]
+    fn parquet_geo_stats_invalid_bbox_negative_z_width() {
+        let parquet_stats = GeospatialStatistics::new(
+            Some(
+                parquet::geospatial::bounding_box::BoundingBox::new(-180.0, 180.0, -90.0, 90.0)
+                    .with_zrange(100.0, -100.0),
+            ),
+            None,
+        );
+        let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+        // Should return unspecified because z width is less than 0
+        assert_eq!(result.bbox(), None);
+    }
+
+    #[test]
+    fn parquet_geo_stats_invalid_bbox_infinite_m() {
+        let parquet_stats = GeospatialStatistics::new(
+            Some(
+                parquet::geospatial::bounding_box::BoundingBox::new(-180.0, 180.0, -90.0, 90.0)
+                    .with_mrange(f64::NEG_INFINITY, f64::INFINITY),
+            ),
+            None,
+        );
+        let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+        // Should return unspecified because m width is not finite
+        assert_eq!(result.bbox(), None);
+    }
+
+    #[test]
+    fn parquet_geo_stats_invalid_bbox_negative_m_width() {
+        let parquet_stats = GeospatialStatistics::new(
+            Some(
+                parquet::geospatial::bounding_box::BoundingBox::new(-180.0, 180.0, -90.0, 90.0)
+                    .with_mrange(50.0, -50.0),
+            ),
+            None,
+        );
+        let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+        // Should return unspecified because m width is less than 0
+        assert_eq!(result.bbox(), None);
+    }
+
+    #[test]
+    fn parquet_geo_stats_valid_geometry_types() {
+        let parquet_stats = GeospatialStatistics::new(None, Some(vec![1, 2, 3]));
+        let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+        assert!(result.geometry_types().is_some());
+    }
+
+    #[test]
+    fn parquet_geo_stats_invalid_geometry_types_negative() {
+        let parquet_stats = GeospatialStatistics::new(None, Some(vec![1, -1, 3]));
+        let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+        // Should return unspecified geometry types because of negative value
+        assert_eq!(result.geometry_types(), None);
+    }
+
+    #[test]
+    fn parquet_geo_stats_invalid_geometry_types_unknown_wkb_id() {
+        let parquet_stats = GeospatialStatistics::new(None, Some(vec![1, 999999]));
+        let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+        // Should return unspecified geometry types because of unknown WKB id
+        assert_eq!(result.geometry_types(), None);
+    }
+
+    #[test]
+    fn parquet_geo_stats_empty_geometry_types() {
+        let parquet_stats = GeospatialStatistics::new(None, Some(vec![]));
+        let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+        // Empty geometry types should result in None
+        assert_eq!(result.geometry_types(), None);
+    }
+
+    #[test]
+    fn parquet_geo_stats_combined_valid() {
+        let parquet_stats = GeospatialStatistics::new(
+            Some(parquet::geospatial::bounding_box::BoundingBox::new(
+                -180.0, 180.0, -90.0, 90.0,
+            )),
+            Some(vec![1, 2]), // Point, LineString
+        );
+        let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+        assert!(result.bbox().is_some());
+        assert!(result.geometry_types().is_some());
+    }
+
+    #[test]
+    fn parquet_geo_stats_valid_bbox_invalid_geometry_types() {
+        let parquet_stats = GeospatialStatistics::new(
+            Some(parquet::geospatial::bounding_box::BoundingBox::new(
+                -180.0, 180.0, -90.0, 90.0,
+            )),
+            Some(vec![-1]), // Invalid negative
+        );
+        let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+        // Bbox should be valid, geometry types should be None
+        assert!(result.bbox().is_some());
+        assert_eq!(result.geometry_types(), None);
+    }
+
+    #[test]
+    fn parquet_geo_stats_invalid_bbox_valid_geometry_types() {
+        let parquet_stats = GeospatialStatistics::new(
+            Some(parquet::geospatial::bounding_box::BoundingBox::new(
+                f64::NAN,
+                180.0,
+                -90.0,
+                90.0,
+            )),
+            Some(vec![1, 2]),
+        );
+        let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+        // Bbox should be None, geometry types should be valid
+        assert_eq!(result.bbox(), None);
+        assert!(result.geometry_types().is_some());
+    }
+
     fn file_schema_with_covering() -> SchemaRef {
         Arc::new(Schema::new(vec![
             Field::new("not_geo", DataType::Binary, true),
diff --git a/rust/sedona-geoparquet/src/format.rs b/rust/sedona-geoparquet/src/format.rs
index c3dc6f6b..352d23a3 100644
--- a/rust/sedona-geoparquet/src/format.rs
+++ b/rust/sedona-geoparquet/src/format.rs
@@ -52,7 +52,7 @@ use sedona_common::sedona_internal_err;
 use sedona_schema::extension_type::ExtensionType;
 
 use crate::{
-    file_opener::{storage_schema_contains_geo, GeoParquetFileOpener},
+    file_opener::{storage_schema_contains_geo, GeoParquetFileOpener, GeoParquetFileOpenerMetrics},
     metadata::{GeoParquetColumnEncoding, GeoParquetColumnMetadata, GeoParquetMetadata},
     options::TableGeoParquetOptions,
     writer::create_geoparquet_writer_physical_plan,
@@ -150,19 +150,6 @@ impl GeoParquetFormat {
     }
 }
 
-/// Merge geometry columns metadata.
-/// `overrides` columns replace any inferred metadata for the same column name.
-fn merge_geometry_columns(
-    base: &mut HashMap<String, GeoParquetColumnMetadata>,
-    overrides: &HashMap<String, GeoParquetColumnMetadata>,
-) -> Result<()> {
-    for (column_name, override_meta) in overrides {
-        base.insert(column_name.clone(), override_meta.clone());
-    }
-
-    Ok(())
-}
-
 #[async_trait]
 impl FileFormat for GeoParquetFormat {
     fn as_any(&self) -> &dyn Any {
@@ -218,40 +205,38 @@ impl FileFormat for GeoParquetFormat {
             .try_collect()
             .await?;
 
-        // Combine multiple partitioned geoparquet files' metadata into a single one
-        // See comments in `try_update(..)` for the specific behaviors.
+        // "Not seen" and "No geometry metadata" are identical here. When merging new
+        // metadata we check definitions for individual fields to ensure consistency
+        // but it is OK to have a file without geometry. Exactly how this gets merged
+        // and adapted changed in DataFusion 52...the method here is prone to inconsistency
+        // when some files contain geometry and some do not. Column overrides can be used
+        // as a workaround in this situation.
         let mut geoparquet_metadata: Option<GeoParquetMetadata> = None;
         for metadata in &metadatas {
-            if let Some(kv) = metadata.file_metadata().key_value_metadata() {
-                for item in kv {
-                    if item.key != "geo" {
-                        continue;
-                    }
-                    if let Some(value) = &item.value {
-                        let this_geoparquet_metadata = GeoParquetMetadata::try_new(value)?;
-
-                        match geoparquet_metadata.as_mut() {
-                            Some(existing) => {
-                                existing.try_update(&this_geoparquet_metadata)?;
-                            }
-                            None => geoparquet_metadata = Some(this_geoparquet_metadata),
-                        }
-                    }
+            let this_geoparquet_metadata = GeoParquetMetadata::try_from_parquet_metadata(
+                metadata,
+                self.options.geometry_columns.as_ref(),
+            )?;
+
+            match (geoparquet_metadata.as_mut(), this_geoparquet_metadata) {
+                (Some(existing_metadata), Some(this_metadata)) => {
+                    existing_metadata.try_update(&this_metadata)?;
                 }
+                (None, Some(this_metadata)) => {
+                    geoparquet_metadata.replace(this_metadata);
+                }
+                (None, None) => {}
+                (Some(_), None) => {}
             }
         }
 
         // Geometry columns have been inferred from metadata, next combine column
         // metadata from options with the inferred ones
-        let mut inferred_geo_cols = match geoparquet_metadata {
+        let inferred_geo_cols = match geoparquet_metadata {
             Some(geo_metadata) => geo_metadata.columns,
             None => HashMap::new(),
         };
 
-        if let Some(geometry_columns) = &self.options.geometry_columns {
-            merge_geometry_columns(&mut inferred_geo_cols, geometry_columns)?;
-        }
-
         if inferred_geo_cols.is_empty() {
             return Ok(inner_schema_without_metadata);
         }
@@ -376,6 +361,7 @@ pub struct GeoParquetFileSource {
     inner: ParquetSource,
     metadata_size_hint: Option<usize>,
     predicate: Option<Arc<dyn PhysicalExpr>>,
+    overrides: Option<HashMap<String, GeoParquetColumnMetadata>>,
 }
 
 impl GeoParquetFileSource {
@@ -385,6 +371,7 @@ impl GeoParquetFileSource {
             inner: ParquetSource::new(options.inner.clone()),
             metadata_size_hint: None,
             predicate: None,
+            overrides: options.geometry_columns.clone(),
         }
     }
 
@@ -432,6 +419,7 @@ impl GeoParquetFileSource {
                 inner: parquet_source.clone(),
                 metadata_size_hint,
                 predicate: new_predicate,
+                overrides: None,
             })
         } else {
             sedona_internal_err!("GeoParquetFileSource constructed from 
non-ParquetSource")
@@ -444,6 +432,7 @@ impl GeoParquetFileSource {
             inner: self.inner.with_predicate(predicate.clone()),
             metadata_size_hint: self.metadata_size_hint,
             predicate: Some(predicate),
+            overrides: self.overrides.clone(),
         }
     }
 
@@ -468,6 +457,7 @@ impl GeoParquetFileSource {
             inner: parquet_source,
             metadata_size_hint: self.metadata_size_hint,
             predicate: self.predicate.clone(),
+            overrides: self.overrides.clone(),
         }
     }
 
@@ -477,6 +467,7 @@ impl GeoParquetFileSource {
             inner: self.inner.clone().with_metadata_size_hint(hint),
             metadata_size_hint: Some(hint),
             predicate: self.predicate.clone(),
+            overrides: self.overrides.clone(),
         }
     }
 }
@@ -497,17 +488,18 @@ impl FileSource for GeoParquetFileSource {
             return inner_opener;
         }
 
-        Arc::new(GeoParquetFileOpener::new(
-            inner_opener,
+        Arc::new(GeoParquetFileOpener {
+            inner: inner_opener,
             object_store,
-            self.metadata_size_hint,
-            self.predicate.clone().unwrap(),
-            base_config.file_schema().clone(),
-            self.inner.table_parquet_options().global.pruning,
+            metadata_size_hint: self.metadata_size_hint,
+            predicate: self.predicate.clone().unwrap(),
+            file_schema: base_config.file_schema().clone(),
+            enable_pruning: self.inner.table_parquet_options().global.pruning,
             // HACK: Since there is no public API to set inner's metrics, so we use
             // inner's metrics as the ExecutionPlan-global metrics
-            self.inner.metrics(),
-        ))
+            metrics: GeoParquetFileOpenerMetrics::new(self.inner.metrics()),
+            overrides: self.overrides.clone(),
+        })
     }
 
     fn try_pushdown_filters(
@@ -518,11 +510,14 @@ impl FileSource for GeoParquetFileSource {
         let inner_result = self.inner.try_pushdown_filters(filters.clone(), config)?;
         match &inner_result.updated_node {
             Some(updated_node) => {
-                let updated_inner = Self::try_from_file_source(
+                let mut updated_inner = Self::try_from_file_source(
                     updated_node.clone(),
                     self.metadata_size_hint,
+                    // TODO should this be None?
                     None,
                 )?;
+                // TODO: part of try_from_file_source()?
+                updated_inner.overrides = self.overrides.clone();
                 Ok(inner_result.with_updated_node(Arc::new(updated_inner)))
             }
             None => Ok(inner_result),
diff --git a/rust/sedona-geoparquet/src/metadata.rs b/rust/sedona-geoparquet/src/metadata.rs
index 39beddf0..94a4b6e6 100644
--- a/rust/sedona-geoparquet/src/metadata.rs
+++ b/rust/sedona-geoparquet/src/metadata.rs
@@ -21,12 +21,14 @@
 /// to remove the dependency on GeoArrow since we mostly don't need that here yet).
 /// This should be synchronized with that crate when possible.
 /// https://github.com/geoarrow/geoarrow-rs/blob/ad2d29ef90050c5cfcfa7dfc0b4a3e5d12e51bbe/rust/geoarrow-geoparquet/src/metadata.rs
-use datafusion_common::Result;
-use parquet::file::metadata::ParquetMetaData;
+use datafusion_common::{plan_err, Result};
+use parquet::basic::{EdgeInterpolationAlgorithm, LogicalType};
+use parquet::file::metadata::{KeyValue, ParquetMetaData};
 use sedona_expr::statistics::GeoStatistics;
 use sedona_geometry::bounding_box::BoundingBox;
 use sedona_geometry::interval::{Interval, IntervalTrait};
 use sedona_geometry::types::GeometryTypeAndDimensionsSet;
+use sedona_schema::schema::primary_geometry_column_from_names;
 use std::collections::HashMap;
 use std::fmt::Display;
 use std::fmt::Write;
@@ -39,7 +41,7 @@ use serde_json::Value;
 ///
 /// In contrast to the _user-specified API_, which is just "WKB" or "Native", here we need to know
 /// the actual written encoding type so that we can save that in the metadata.
-#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Default)]
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
 #[allow(clippy::upper_case_acronyms)]
 pub enum GeoParquetColumnEncoding {
     /// Serialized Well-known Binary encoding
@@ -109,7 +111,7 @@ impl Display for GeoParquetColumnEncoding {
 ///
 /// Note: This technique to use the bounding box to improve spatial queries does not apply to
 /// geometries that cross the antimeridian. Such geometries are unsupported by this method.
-#[derive(Clone, Debug, Serialize, Deserialize)]
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
 pub struct GeoParquetBboxCovering {
     /// The path in the Parquet schema of the column that contains the xmin
     pub xmin: Vec<String>,
@@ -257,7 +259,7 @@ impl GeoParquetBboxCovering {
 /// The covering field specifies optional simplified representations of each geometry. The keys of
 /// the "covering" object MUST be a supported encoding. Currently the only supported encoding is
 /// "bbox" which specifies the names of bounding box columns
-#[derive(Clone, Debug, Serialize, Deserialize)]
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
 pub struct GeoParquetCovering {
     /// Bounding-box covering
     pub bbox: GeoParquetBboxCovering,
@@ -279,7 +281,7 @@ impl GeoParquetCovering {
 }
 
 /// Top-level GeoParquet file metadata
-#[derive(Clone, Debug, Serialize, Deserialize)]
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
 pub struct GeoParquetMetadata {
     /// The version identifier for the GeoParquet specification.
     pub version: String,
@@ -304,7 +306,7 @@ impl Default for GeoParquetMetadata {
 }
 
 /// GeoParquet column metadata
-#[derive(Clone, Debug, Serialize, Deserialize, Default)]
+#[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)]
 pub struct GeoParquetColumnMetadata {
     /// Name of the geometry encoding format. As of GeoParquet 1.1, `"WKB"`, `"point"`,
     /// `"linestring"`, `"polygon"`, `"multipoint"`, `"multilinestring"`, and `"multipolygon"` are
@@ -382,20 +384,83 @@ impl GeoParquetMetadata {
     }
 
     /// Construct a [`GeoParquetMetadata`] from a [`ParquetMetaData`]
-    pub fn try_from_parquet_metadata(metadata: &ParquetMetaData) -> Result<Option<Self>> {
-        if let Some(kv) = metadata.file_metadata().key_value_metadata() {
-            for item in kv {
-                if item.key != "geo" {
-                    continue;
-                }
+    ///
+    /// This constructor considers (1) the GeoParquet metadata in the key/value
+    /// metadata and (2) Geometry/Geography types present in the Parquet schema.
+    /// Specification of a column in the GeoParquet metadata takes precedence.
+    pub fn try_from_parquet_metadata(
+        metadata: &ParquetMetaData,
+        overrides: Option<&HashMap<String, GeoParquetColumnMetadata>>,
+    ) -> Result<Option<Self>> {
+        let schema = metadata.file_metadata().schema_descr().root_schema();
+        let kv_metadata = metadata.file_metadata().key_value_metadata();
+        let mut maybe_metadata = Self::try_from_parquet_metadata_impl(schema, kv_metadata)?;
 
-                if let Some(value) = &item.value {
-                    return Ok(Some(Self::try_new(value)?));
-                }
+        // Apply overrides
+        match (&mut maybe_metadata, overrides) {
+            (None, None) | (Some(_), None) => {}
+            (None, Some(geometry_columns)) => {
+                let mut metadata = GeoParquetMetadata::default();
+                metadata.override_columns(geometry_columns)?;
+                maybe_metadata = Some(metadata);
+            }
+            (Some(this_metadata), Some(geometry_columns)) => {
+                this_metadata.override_columns(geometry_columns)?;
             }
         }
 
-        Ok(None)
+        Ok(maybe_metadata)
+    }
+
+    /// For testing, as it is easier to simulate the schema and key/value metadata
+    /// than the whole ParquetMetaData.
+    fn try_from_parquet_metadata_impl(
+        root_schema: &parquet::schema::types::Type,
+        kv_metadata: Option<&Vec<KeyValue>>,
+    ) -> Result<Option<Self>> {
+        let mut columns_from_schema = columns_from_parquet_schema(root_schema, kv_metadata)?;
+
+        if let Some(value) = get_parquet_key_value("geo", kv_metadata) {
+            // Values in the GeoParquet metadata take precedence over those from the
+            // Parquet schema
+            let mut out = Self::try_new(&value)?;
+            for (k, v) in columns_from_schema.drain() {
+                out.columns.entry(k).or_insert(v);
+            }
+
+            return Ok(Some(out));
+        }
+
+        // No geo metadata key, but we have geo columns from the schema
+        if !columns_from_schema.is_empty() {
+            // To keep metadata valid, ensure we set a primary column deterministically
+            let mut column_names = columns_from_schema.keys().collect::<Vec<_>>();
+            column_names.sort();
+            let primary_index = primary_geometry_column_from_names(column_names.iter())
+                .expect("non-empty input always returns a value");
+            let primary_column = column_names[primary_index].to_string();
+
+            Ok(Some(Self {
+                version: "2.0.0".to_string(),
+                columns: columns_from_schema,
+                primary_column,
+            }))
+        } else {
+            Ok(None)
+        }
+    }
+
+    /// Replace any inferred metadata for the same column name with overrides
+    pub fn override_columns(
+        &mut self,
+        overrides: &HashMap<String, GeoParquetColumnMetadata>,
+    ) -> Result<()> {
+        for (column_name, override_meta) in overrides {
+            self.columns
+                .insert(column_name.clone(), override_meta.clone());
+        }
+
+        Ok(())
     }
 
     /// Update a GeoParquetMetadata from another file's metadata
@@ -551,9 +616,139 @@ impl GeoParquetColumnMetadata {
     }
 }
 
+/// Collect column metadata from (top-level) Parquet Geometry/Geography columns
+///
+/// Converts embedded schema information into GeoParquet column metadata. Because
+/// GeoParquet metadata does not support nested columns, this does not currently
+/// support them either.
+fn columns_from_parquet_schema(
+    root_schema: &parquet::schema::types::Type,
+    kv_metadata: Option<&Vec<KeyValue>>,
+) -> Result<HashMap<String, GeoParquetColumnMetadata>> {
+    let mut columns = HashMap::new();
+
+    for field in root_schema.get_fields() {
+        let column_metadata_opt =
+            column_from_logical_type(field.get_basic_info().logical_type_ref(), kv_metadata)?;
+        if let Some(column_metadata) = column_metadata_opt {
+            let name = field.name().to_string();
+            columns.insert(name, column_metadata);
+        }
+    }
+
+    Ok(columns)
+}
+
+/// Convert a single LogicalType to GeoParquetColumnMetadata, if possible
+///
+/// Returns None for something that is not Geometry or Geography.
+fn column_from_logical_type(
+    logical_type: Option<&LogicalType>,
+    kv_metadata: Option<&Vec<KeyValue>>,
+) -> Result<Option<GeoParquetColumnMetadata>> {
+    if let Some(logical_type) = logical_type {
+        let mut column_metadata = GeoParquetColumnMetadata::default();
+
+        match logical_type {
+            LogicalType::Geometry { crs } => {
+                column_metadata.crs = geoparquet_crs_from_logical_type(crs.as_ref(), kv_metadata);
+                Ok(Some(column_metadata))
+            }
+            LogicalType::Geography { crs, algorithm } => {
+                column_metadata.crs = geoparquet_crs_from_logical_type(crs.as_ref(), kv_metadata);
+
+                let edges = match algorithm {
+                    None | Some(EdgeInterpolationAlgorithm::SPHERICAL) => "spherical",
+                    Some(EdgeInterpolationAlgorithm::VINCENTY) => "vincenty",
+                    Some(EdgeInterpolationAlgorithm::ANDOYER) => "andoyer",
+                    Some(EdgeInterpolationAlgorithm::THOMAS) => "thomas",
+                    Some(EdgeInterpolationAlgorithm::KARNEY) => "karney",
+                    Some(_) => {
+                        return plan_err!(
+                            "Unsupported edge interpolation algorithm in 
Parquet schema"
+                        )
+                    }
+                };
+                column_metadata.edges = Some(edges.to_string());
+                Ok(Some(column_metadata))
+            }
+            _ => Ok(None),
+        }
+    } else {
+        Ok(None)
+    }
+}
+
+/// Parse a CRS from a Parquet logical type into one that will transfer to GeoParquet
+/// and then to GeoArrow
+///
+/// This is identical to what will happen in the forthcoming Arrow release (packaged
+/// with DataFusion 52), except this version also resolves projjson:xxx CRSes from
+/// the key/value metadata (in Arrow this was hard because the conversion code was
+/// not set up in a way that this was easy to do; here it is easy because we have
+/// the whole ParquetMetadata).
+fn geoparquet_crs_from_logical_type(
+    crs: Option<&String>,
+    kv_metadata: Option<&Vec<KeyValue>>,
+) -> Option<Value> {
+    if let Some(crs_str) = crs {
+        // Treat an empty string the same as lon/lat. There is no concept of a "missing"
+        // CRS in a Parquet LogicalType although this can be expressed with srid:0 in a
+        // pinch.
+        if crs_str.is_empty() {
+            return None;
+        }
+
+        // Resolve projjson:some_key if possible. If this is not possible, the value that
+        // will be passed on to the GeoParquet column metadata is the full string
+        // "projjson:some_key".
+        if let Some(crs_kv_key) = crs_str.strip_prefix("projjson:") {
+            if let Some(crs_from_kv) = get_parquet_key_value(crs_kv_key, kv_metadata) {
+                return Some(Value::String(crs_from_kv.to_string()));
+            }
+        }
+
+        // Resolve srid:<int value> to "<int value>", which is accepted by SedonaDB internals
+        // and is interpreted as an EPSG code. There is no guarantee that other implementations
+        // will do this but it probably better than erroring for an unknown CRS.
+        if let Some(srid_string) = crs_str.strip_prefix("srid:") {
+            return Some(Value::String(srid_string.to_string()));
+        }
+
+        // Try to parse the output as JSON such that the resulting column metadata is closer
+        // to what would have been in the GeoParquet metadata (e.g., where PROJJSON is parsed).
+        if let Ok(value) = crs_str.parse::<Value>() {
+            Some(value)
+        } else {
+            Some(Value::String(crs_str.to_string()))
+        }
+    } else {
+        None
+    }
+}
+
+/// Helper to get a key from the key/value metadata
+fn get_parquet_key_value(key: &str, kv_metadata: Option<&Vec<KeyValue>>) -> Option<String> {
+    if let Some(kv_metadata) = kv_metadata {
+        for kv in kv_metadata {
+            if kv.key == key {
+                if let Some(value) = &kv.value {
+                    return Some(value.to_string());
+                }
+            }
+        }
+    }
+
+    None
+}
+
 #[cfg(test)]
 mod test {
+    use std::sync::Arc;
+
     use geo_traits::Dimensions;
+    use parquet::basic::{Repetition, Type as PhysicalType};
+    use parquet::schema::types::Type;
     use sedona_geometry::types::{GeometryTypeAndDimensions, GeometryTypeId};
 
     use super::*;
@@ -574,4 +769,258 @@ mod test {
             GeometryTypeAndDimensions::new(GeometryTypeId::Point, Dimensions::Xy)
         );
     }
+
+    #[test]
+    fn test_from_parquet_metadata_with_parquet_types() {
+        let s = r#"{
+            "version": "2.0.0",
+            "primary_column": "geom_geoparquet",
+            "columns": {
+                "geom_geoparquet": {
+                    "encoding": "WKB",
+                    "crs": "geom_geoparquet_crs"
+                }
+            }
+        }"#;
+
+        let kv_metadata_with_geo_key = make_kv_metadata(&[("geo", s)]);
+
+        let schema_no_parquet_geo = make_parquet_schema(&[("geom_geoparquet", 
None)]);
+        let metadata = GeoParquetMetadata::try_from_parquet_metadata_impl(
+            &schema_no_parquet_geo,
+            kv_metadata_with_geo_key.as_ref(),
+        )
+        .unwrap()
+        .unwrap();
+        assert_eq!(metadata.version, "2.0.0");
+        assert_eq!(metadata.primary_column, "geom_geoparquet");
+        assert_eq!(metadata.columns.len(), 1);
+        assert!(metadata.columns.contains_key("geom_geoparquet"));
+        assert_eq!(
+            metadata.columns.get("geom_geoparquet").unwrap().crs,
+            Some(Value::String("geom_geoparquet_crs".to_string()))
+        );
+
+        let schema_additional_parquet_geo = make_parquet_schema(&[
+            ("geom_geoparquet", None),
+            ("geom_parquet", Some(LogicalType::Geometry { crs: None })),
+        ]);
+        let metadata = GeoParquetMetadata::try_from_parquet_metadata_impl(
+            &schema_additional_parquet_geo,
+            kv_metadata_with_geo_key.as_ref(),
+        )
+        .unwrap()
+        .unwrap();
+        assert_eq!(metadata.columns.len(), 2);
+        assert!(metadata.columns.contains_key("geom_geoparquet"));
+        assert!(metadata.columns.contains_key("geom_parquet"));
+
+        let schema_overlapping_columns =
+            make_parquet_schema(&[("geom_geoparquet", 
Some(LogicalType::Geometry { crs: None }))]);
+        let metadata = GeoParquetMetadata::try_from_parquet_metadata_impl(
+            &schema_overlapping_columns,
+            kv_metadata_with_geo_key.as_ref(),
+        )
+        .unwrap()
+        .unwrap();
+        assert_eq!(metadata.columns.len(), 1);
+        assert!(metadata.columns.contains_key("geom_geoparquet"));
+        // Ensure we use the CRS provided by the GeoParquet metadata instead of the CRS
+        // provided by the type as a test that GeoParquet columns take precedence if both
+        // are present.
+        assert_eq!(
+            metadata.columns.get("geom_geoparquet").unwrap().crs,
+            Some(Value::String("geom_geoparquet_crs".to_string()))
+        );
+
+        let schema_only_parquet_geo =
+            make_parquet_schema(&[("geom_parquet", Some(LogicalType::Geometry 
{ crs: None }))]);
+        let metadata = GeoParquetMetadata::try_from_parquet_metadata_impl(
+            &schema_only_parquet_geo,
+            None, // No key/value metadata
+        )
+        .unwrap()
+        .unwrap();
+        assert_eq!(metadata.columns.len(), 1);
+        assert!(metadata.columns.contains_key("geom_parquet"));
+    }
+
+    #[test]
+    fn test_column_from_logical_type() {
+        let kv_metadata = make_kv_metadata(&[("some_projjson_key", 
"some_projjson_value")]);
+
+        // A missing logical type annotation is never Geometry or Geography
+        assert_eq!(
+            column_from_logical_type(None, kv_metadata.as_ref()).unwrap(),
+            None
+        );
+
+        // Logical type that is present but not Geometry or Geography should return None
+        assert_eq!(
+            column_from_logical_type(Some(&LogicalType::Uuid), kv_metadata.as_ref()).unwrap(),
+            None
+        );
+
+        // Geometry logical type
+        let metadata = column_from_logical_type(
+            Some(&LogicalType::Geometry { crs: None }),
+            kv_metadata.as_ref(),
+        )
+        .unwrap()
+        .unwrap();
+        assert_eq!(metadata, GeoParquetColumnMetadata::default());
+
+        // Ensure CRS is translated
+        let metadata = column_from_logical_type(
+            Some(&LogicalType::Geometry {
+                crs: Some("projjson:some_projjson_key".to_string()),
+            }),
+            kv_metadata.as_ref(),
+        )
+        .unwrap()
+        .unwrap();
+        assert_eq!(
+            metadata.crs,
+            Some(Value::String("some_projjson_value".to_string()))
+        );
+
+        // Geography logical type
+        let metadata = column_from_logical_type(
+            Some(&LogicalType::Geography {
+                crs: None,
+                algorithm: None,
+            }),
+            kv_metadata.as_ref(),
+        )
+        .unwrap()
+        .unwrap();
+        assert_eq!(metadata.edges, Some("spherical".to_string()));
+
+        // Ensure CRS is translated
+        let metadata = column_from_logical_type(
+            Some(&LogicalType::Geography {
+                crs: Some("projjson:some_projjson_key".to_string()),
+                algorithm: None,
+            }),
+            kv_metadata.as_ref(),
+        )
+        .unwrap()
+        .unwrap();
+        assert_eq!(
+            metadata.crs,
+            Some(Value::String("some_projjson_value".to_string()))
+        );
+
+        // Ensure algorithm is translated
+        let metadata = column_from_logical_type(
+            Some(&LogicalType::Geography {
+                crs: None,
+                algorithm: Some(EdgeInterpolationAlgorithm::VINCENTY),
+            }),
+            kv_metadata.as_ref(),
+        )
+        .unwrap()
+        .unwrap();
+        assert_eq!(metadata.edges, Some("vincenty".to_string()));
+    }
+
+    #[test]
+    fn test_crs_from_logical_type() {
+        let kv_metadata = make_kv_metadata(&[("some_projjson_key", 
"some_projjson_value")]);
+
+        // None and "" both map to a GeoParquet default CRS
+        assert_eq!(
+            geoparquet_crs_from_logical_type(None, kv_metadata.as_ref()),
+            None
+        );
+        assert_eq!(
+            geoparquet_crs_from_logical_type(Some(&"".to_string()), 
kv_metadata.as_ref()),
+            None
+        );
+
+        // projjson: string should resolve from the key/value metadata or be passed on
+        // verbatim if it can't be resolved.
+        assert_eq!(
+            geoparquet_crs_from_logical_type(None, kv_metadata.as_ref()),
+            None
+        );
+        assert_eq!(
+            geoparquet_crs_from_logical_type(
+                Some(&"projjson:some_projjson_key".to_string()),
+                kv_metadata.as_ref()
+            ),
+            Some(Value::String("some_projjson_value".to_string()))
+        );
+        assert_eq!(
+            geoparquet_crs_from_logical_type(
+                Some(&"projjson:not_in_kv_metadata".to_string()),
+                kv_metadata.as_ref()
+            ),
+            Some(Value::String("projjson:not_in_kv_metadata".to_string()))
+        );
+
+        // srid: string should have its prefix stripped
+        assert_eq!(
+            geoparquet_crs_from_logical_type(Some(&"srid:1234".to_string()), 
kv_metadata.as_ref()),
+            Some(Value::String("1234".to_string()))
+        );
+
+        // strings should get passed through verbatim
+        assert_eq!(
+            geoparquet_crs_from_logical_type(Some(&"EPSG:1234".to_string()), 
kv_metadata.as_ref()),
+            Some(Value::String("EPSG:1234".to_string()))
+        );
+
+        // Valid JSON should be parsed
+        assert_eq!(
+            geoparquet_crs_from_logical_type(
+                Some(&"\"EPSG:1234\"".to_string()),
+                kv_metadata.as_ref()
+            ),
+            Some(Value::String("EPSG:1234".to_string()))
+        );
+    }
+
+    #[test]
+    fn test_get_kv_metadata() {
+        let kv_metadata = make_kv_metadata(&[("key", "value")]);
+        assert_eq!(
+            get_parquet_key_value("key", kv_metadata.as_ref()),
+            Some("value".to_string())
+        );
+    }
+
+    // Helper to make a Parquet schema. None here means Binary since it's the only primitive
+    // type we need to test
+    fn make_parquet_schema(fields: &[(&str, Option<LogicalType>)]) -> parquet::schema::types::Type {
+        let fields = fields
+            .iter()
+            .map(|(name, logical_type)| {
+                let mut builder = Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
+                    .with_repetition(Repetition::OPTIONAL);
+                if let Some(lt) = logical_type {
+                    builder = builder.with_logical_type(Some(lt.clone()));
+                }
+                Arc::new(builder.build().unwrap())
+            })
+            .collect::<Vec<_>>();
+
+        Type::group_type_builder("schema")
+            .with_fields(fields)
+            .build()
+            .unwrap()
+    }
+
+    // Helper to simulate key/value metadata
+    fn make_kv_metadata(pairs: &[(&str, &str)]) -> Option<Vec<KeyValue>> {
+        Some(
+            pairs
+                .iter()
+                .map(|(key, value)| KeyValue {
+                    key: key.to_string(),
+                    value: Some(value.to_string()),
+                })
+                .collect(),
+        )
+    }
 }
diff --git a/rust/sedona-schema/src/schema.rs b/rust/sedona-schema/src/schema.rs
index 9ce1d3a4..f13dab24 100644
--- a/rust/sedona-schema/src/schema.rs
+++ b/rust/sedona-schema/src/schema.rs
@@ -77,24 +77,41 @@ impl SedonaSchema for Schema {
 
     fn primary_geometry_column_index(&self) -> Result<Option<usize>> {
         let indices = self.geometry_column_indices()?;
-        if indices.is_empty() {
-            return Ok(None);
+        let primary_index_opt =
+            primary_geometry_column_from_names(indices.iter().map(|i| self.field(*i).name()));
+        if let Some(primary_index) = primary_index_opt {
+            Ok(Some(indices[primary_index]))
+        } else {
+            Ok(None)
         }
+    }
+}
 
-        let names_map = indices
-            .iter()
-            .rev()
-            .map(|i| (self.field(*i).name().to_lowercase(), *i))
-            .collect::<HashMap<_, _>>();
+/// Compute the primary geometry column given a list of geometry column names
+///
+/// This implementation powers [SedonaSchema::primary_geometry_column_index] and is
+/// useful for applying a consistent heuristic when only a list of names are
+/// available (e.g., GeoParquet metadata).
+pub fn primary_geometry_column_from_names(
+    column_names: impl DoubleEndedIterator<Item = impl AsRef<str>>,
+) -> Option<usize> {
+    let names_map = column_names
+        .rev()
+        .enumerate()
+        .map(|(i, name)| (name.as_ref().to_lowercase(), i))
+        .collect::<HashMap<_, _>>();
+
+    if names_map.is_empty() {
+        return None;
+    }
 
-        for special_name in ["geometry", "geography", "geom", "geog"] {
-            if let Some(i) = names_map.get(special_name) {
-                return Ok(Some(*i));
-            }
+    for special_name in ["geometry", "geography", "geom", "geog"] {
+        if let Some(i) = names_map.get(special_name) {
+            return Some(names_map.len() - *i - 1);
         }
-
-        Ok(Some(indices[0]))
     }
+
+    Some(0)
 }
 
 #[cfg(test)]
