This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git
The following commit(s) were added to refs/heads/main by this push:
new 3a4ca3bf feat(rust/sedona-geoparquet): Add read support for Geometry/Geography Parquet types (#561)
3a4ca3bf is described below
commit 3a4ca3bfe9359e5ab8794a1b7145a7975653bfcc
Author: Dewey Dunnington <[email protected]>
AuthorDate: Wed Feb 4 09:26:28 2026 -0600
feat(rust/sedona-geoparquet): Add read support for Geometry/Geography Parquet types (#561)
Co-authored-by: Copilot <[email protected]>
---
python/sedonadb/tests/io/test_parquet.py | 42 ++-
rust/sedona-geoparquet/Cargo.toml | 2 +-
rust/sedona-geoparquet/src/file_opener.rs | 506 +++++++++++++++++++++++++++---
rust/sedona-geoparquet/src/format.rs | 85 +++--
rust/sedona-geoparquet/src/metadata.rs | 483 +++++++++++++++++++++++++++-
rust/sedona-schema/src/schema.rs | 43 ++-
6 files changed, 1043 insertions(+), 118 deletions(-)
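
Some context for the change: GeoParquet has historically tagged geometry columns through the "geo" key/value metadata, while newer Parquet files can annotate a column directly with a Geometry or Geography logical type. A minimal sketch of such a schema, built with the same parquet crate builder APIs the new tests below use (the field name and main() wrapper are illustrative):

    use std::sync::Arc;
    use parquet::basic::{LogicalType, Repetition, Type as PhysicalType};
    use parquet::schema::types::Type;

    fn main() {
        // A BYTE_ARRAY column annotated with the Parquet Geometry logical type;
        // crs: None leaves the CRS at the lon/lat default on read.
        let geom = Type::primitive_type_builder("geom", PhysicalType::BYTE_ARRAY)
            .with_repetition(Repetition::OPTIONAL)
            .with_logical_type(Some(LogicalType::Geometry { crs: None }))
            .build()
            .unwrap();
        let root = Type::group_type_builder("schema")
            .with_fields(vec![Arc::new(geom)])
            .build()
            .unwrap();
        println!("{root:?}");
    }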
diff --git a/python/sedonadb/tests/io/test_parquet.py b/python/sedonadb/tests/io/test_parquet.py
index fb653945..7f87c027 100644
--- a/python/sedonadb/tests/io/test_parquet.py
+++ b/python/sedonadb/tests/io/test_parquet.py
@@ -21,10 +21,11 @@ from pathlib import Path
import geopandas
import geopandas.testing
+import pyarrow as pa
import pytest
+import sedonadb
import shapely
from pyarrow import parquet
-import sedonadb
from sedonadb._lib import SedonaError
from sedonadb.testing import DuckDB, SedonaDB, geom_or_null, skip_if_not_exists
@@ -43,6 +44,25 @@ def test_read_whole_geoparquet(geoarrow_data, name):
eng.assert_result(result, gdf)
+@pytest.mark.parametrize("name", ["water-junc", "water-point"])
+def test_read_whole_parquet_native(geoarrow_data, name):
+ # Checks a read of some non-trivial files and ensures we match a pyarrow read
+ eng = SedonaDB()
+ path = geoarrow_data / "ns-water" / "files" / f"ns-water_{name}.parquet"
+ skip_if_not_exists(path)
+
+ tab = parquet.read_table(path)
+ gdf = (
+ geopandas.GeoDataFrame.from_arrow(tab)
+ .sort_values(by="OBJECTID")
+ .reset_index(drop=True)
+ )
+
+ eng.create_view_parquet("tab", path)
+ result = eng.execute_and_collect("""SELECT * FROM tab ORDER BY "OBJECTID";""")
+ eng.assert_result(result, gdf)
+
+
@pytest.mark.parametrize("name", ["geoparquet-1.0.0", "geoparquet-1.1.0", "plain"])
def test_read_sedona_testing(sedona_testing, name):
# Checks a read of trivial files (some GeoParquet and some not) against a DuckDB read
@@ -122,6 +142,26 @@ def test_read_geoparquet_prune_points(geoarrow_data, name, predicate):
write_covering_bbox=True,
row_group_size=1024,
)
+ assert "Geometry" not in repr(parquet.ParquetFile(tmp_parquet).schema)
+
+ eng.create_view_parquet("tab", tmp_parquet)
+ result = eng.execute_and_collect(
+ f"""
+ SELECT "OBJECTID", geometry FROM tab
+ WHERE ST_{predicate}(geometry, ST_SetCRS({geom_or_null(wkt_filter)}, '{gdf.crs.to_json()}'))
+ ORDER BY "OBJECTID";
+ """
+ )
+ eng.assert_result(result, gdf)
+
+ # Write a file with Parquet Geometry and ensure a correct result
+ parquet.write_table(
+ pa.table(gdf.to_arrow()),
+ tmp_parquet,
+ row_group_size=1024,
+ store_schema=False,
+ )
+ assert "Geometry" in repr(parquet.ParquetFile(tmp_parquet).schema)
eng.create_view_parquet("tab", tmp_parquet)
result = eng.execute_and_collect(
diff --git a/rust/sedona-geoparquet/Cargo.toml b/rust/sedona-geoparquet/Cargo.toml
index 3e63c236..ba65d5a2 100644
--- a/rust/sedona-geoparquet/Cargo.toml
+++ b/rust/sedona-geoparquet/Cargo.toml
@@ -46,7 +46,7 @@ arrow-schema = { workspace = true }
arrow-array = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
-datafusion = { workspace = true, features = ["parquet"] }
+datafusion = { workspace = true, features = ["parquet", "sql"] }
datafusion-catalog = { workspace = true }
datafusion-common = { workspace = true }
datafusion-datasource-parquet = { workspace = true }
diff --git a/rust/sedona-geoparquet/src/file_opener.rs b/rust/sedona-geoparquet/src/file_opener.rs
index e0aa5274..e41fd7c4 100644
--- a/rust/sedona-geoparquet/src/file_opener.rs
+++ b/rust/sedona-geoparquet/src/file_opener.rs
@@ -26,21 +26,29 @@ use datafusion_datasource_parquet::metadata::DFParquetMetadata;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder};
use object_store::ObjectStore;
-use parquet::file::{
- metadata::{ParquetMetaData, RowGroupMetaData},
- statistics::Statistics,
+use parquet::{
+ basic::LogicalType,
+ file::{
+ metadata::{ParquetMetaData, RowGroupMetaData},
+ statistics::Statistics,
+ },
+ geospatial::statistics::GeospatialStatistics,
};
use sedona_expr::{
spatial_filter::{SpatialFilter, TableGeoStatistics},
statistics::GeoStatistics,
};
-use sedona_geometry::bounding_box::BoundingBox;
+use sedona_geometry::{
+ bounding_box::BoundingBox,
+ interval::{Interval, IntervalTrait},
+ types::{GeometryTypeAndDimensions, GeometryTypeAndDimensionsSet},
+};
use sedona_schema::{datatypes::SedonaType, matchers::ArgMatcher};
-use crate::metadata::GeoParquetMetadata;
+use crate::metadata::{GeoParquetColumnMetadata, GeoParquetMetadata};
#[derive(Clone)]
-struct GeoParquetFileOpenerMetrics {
+pub(crate) struct GeoParquetFileOpenerMetrics {
/// How many file ranges are pruned by [`SpatialFilter`]
///
/// Note on "file range": an opener may read only part of a file rather than the
@@ -59,7 +67,7 @@ struct GeoParquetFileOpenerMetrics {
}
impl GeoParquetFileOpenerMetrics {
- fn new(execution_plan_global_metrics: &ExecutionPlanMetricsSet) -> Self {
+ pub fn new(execution_plan_global_metrics: &ExecutionPlanMetricsSet) -> Self {
Self {
files_ranges_spatial_pruned: MetricBuilder::new(execution_plan_global_metrics)
.global_counter("files_ranges_spatial_pruned"),
@@ -78,37 +86,15 @@ impl GeoParquetFileOpenerMetrics {
/// Pruning happens (for Parquet) in the [FileOpener], so we implement
/// that here, too.
#[derive(Clone)]
-pub struct GeoParquetFileOpener {
- inner: Arc<dyn FileOpener>,
- object_store: Arc<dyn ObjectStore>,
- metadata_size_hint: Option<usize>,
- predicate: Arc<dyn PhysicalExpr>,
- file_schema: SchemaRef,
- enable_pruning: bool,
- metrics: GeoParquetFileOpenerMetrics,
-}
-
-impl GeoParquetFileOpener {
- /// Create a new file opener
- pub fn new(
- inner: Arc<dyn FileOpener>,
- object_store: Arc<dyn ObjectStore>,
- metadata_size_hint: Option<usize>,
- predicate: Arc<dyn PhysicalExpr>,
- file_schema: SchemaRef,
- enable_pruning: bool,
- execution_plan_global_metrics: &ExecutionPlanMetricsSet,
- ) -> Self {
- Self {
- inner,
- object_store,
- metadata_size_hint,
- predicate,
- file_schema,
- enable_pruning,
- metrics: GeoParquetFileOpenerMetrics::new(execution_plan_global_metrics),
- }
- }
+pub(crate) struct GeoParquetFileOpener {
+ pub inner: Arc<dyn FileOpener>,
+ pub object_store: Arc<dyn ObjectStore>,
+ pub metadata_size_hint: Option<usize>,
+ pub predicate: Arc<dyn PhysicalExpr>,
+ pub file_schema: SchemaRef,
+ pub enable_pruning: bool,
+ pub metrics: GeoParquetFileOpenerMetrics,
+ pub overrides: Option<HashMap<String, GeoParquetColumnMetadata>>,
}
impl FileOpener for GeoParquetFileOpener {
@@ -127,9 +113,10 @@ impl FileOpener for GeoParquetFileOpener {
if self_clone.enable_pruning {
let spatial_filter = SpatialFilter::try_from_expr(&self_clone.predicate)?;
- if let Some(geoparquet_metadata) =
- GeoParquetMetadata::try_from_parquet_metadata(&parquet_metadata)?
- {
+ if let Some(geoparquet_metadata) = GeoParquetMetadata::try_from_parquet_metadata(
+ &parquet_metadata,
+ self_clone.overrides.as_ref(),
+ )? {
filter_access_plan_using_geoparquet_file_metadata(
&self_clone.file_schema,
&mut access_plan,
@@ -146,6 +133,14 @@ impl FileOpener for GeoParquetFileOpener {
&parquet_metadata,
&self_clone.metrics,
)?;
+
+ filter_access_plan_using_native_geostats(
+ &self_clone.file_schema,
+ &mut access_plan,
+ &spatial_filter,
+ &parquet_metadata,
+ &self_clone.metrics,
+ )?;
}
}
@@ -210,6 +205,13 @@ fn filter_access_plan_using_geoparquet_covering(
// but we need flattened integer references to retrieve min/max statistics for each of these.
let covering_specs = parse_column_coverings(file_schema, parquet_metadata, metadata)?;
+ // If there are no covering specs, don't iterate through the row groups.
+ // This has the side effect of ensuring the row_groups_spatial_matched metric is not
+ // double counted except in the rare case where we prune based on both.
+ if covering_specs.iter().all(|spec| spec.is_none()) {
+ return Ok(());
+ }
+
// Iterate through the row groups
for i in row_group_indices_to_scan {
// Generate row group statistics based on the covering statistics
@@ -232,6 +234,65 @@ fn filter_access_plan_using_geoparquet_covering(
Ok(())
}
+/// Filter an access plan using the Parquet GeoStatistics, if present
+///
+/// Iterates through an existing access plan and skips row groups based on
+/// the Parquet format GeoStatistics (i.e., Geometry/Geography Parquet types).
+fn filter_access_plan_using_native_geostats(
+ file_schema: &SchemaRef,
+ access_plan: &mut ParquetAccessPlan,
+ spatial_filter: &SpatialFilter,
+ parquet_metadata: &ParquetMetaData,
+ metrics: &GeoParquetFileOpenerMetrics,
+) -> Result<()> {
+ let row_group_indices_to_scan = access_plan.row_group_indexes();
+
+ // What we're about to do is a bit of work, so skip it if we can.
+ if row_group_indices_to_scan.is_empty() {
+ return Ok(());
+ }
+
+ // Get the indices we need to index into the Parquet column()s.
+ // For schemas with no nested columns, this will be a sequential
+ // range of 0..n.
+ let top_level_indices = top_level_column_indices(parquet_metadata);
+
+ // If there are no native geometry or geography logical types at the
+ // top level indices, don't iterate through the row groups. This has the side
+ // effect of ensuring the row_groups_spatial_matched metric is not double counted
+ // except in the rare case where we prune based on both.
+ let parquet_schema = parquet_metadata.file_metadata().schema_descr();
+ if top_level_indices.iter().all(|i| {
+ !matches!(
+ parquet_schema.column(*i).logical_type_ref(),
+ Some(LogicalType::Geometry { .. }) | Some(LogicalType::Geography { .. })
+ )
+ }) {
+ return Ok(());
+ }
+
+ // Iterate through the row groups
+ for i in row_group_indices_to_scan {
+ // Generate row group statistics based on the native geospatial statistics
+ let row_group_column_geo_stats =
+ row_group_native_geo_stats(parquet_metadata.row_group(i), &top_level_indices);
+ let row_group_geo_stats = TableGeoStatistics::try_from_stats_and_schema(
+ &row_group_column_geo_stats,
+ file_schema,
+ )?;
+
+ // Evaluate predicate!
+ if !spatial_filter.evaluate(&row_group_geo_stats)? {
+ metrics.row_groups_spatial_pruned.add(1);
+ access_plan.skip(i);
+ } else {
+ metrics.row_groups_spatial_matched.add(1);
+ }
+ }
+
+ Ok(())
+}
+
/// Calculates a Vec of [GeoStatistics] based on GeoParquet file-level metadata
///
/// Each element is either a [GeoStatistics] populated with a [BoundingBox]
@@ -371,6 +432,109 @@ fn parse_column_coverings(
.collect()
}
+/// Calculates a Vec of [GeoStatistics] based on Parquet-native GeoStatistics
+///
+/// Each element is either a [GeoStatistics] populated with a [BoundingBox]
+/// or [GeoStatistics::unspecified], which is a value that will ensure that
+/// any spatial predicate that references those statistics will evaluate to
+/// true.
+fn row_group_native_geo_stats(
+ row_group_metadata: &RowGroupMetaData,
+ column_indices: &[usize],
+) -> Vec<GeoStatistics> {
+ column_indices
+ .iter()
+ .map(|column_index| {
+ let native_geo_stats_opt = row_group_metadata.column(*column_index).geo_statistics();
+ native_geo_stats_opt
+ .map(parquet_geo_stats_to_sedona_geo_stats)
+ .unwrap_or(GeoStatistics::unspecified())
+ })
+ .collect()
+}
+
+/// Convert Parquet [GeospatialStatistics] into Sedona [GeoStatistics]
+///
+/// This also sanity checks the Parquet statistics for non-finite or nonsensical
+/// ranges, treating the information as unknown if it fails the sanity check.
+fn parquet_geo_stats_to_sedona_geo_stats(
+ parquet_geo_stats: &GeospatialStatistics,
+) -> GeoStatistics {
+ let mut out = GeoStatistics::unspecified();
+
+ if let Some(native_bbox) = parquet_geo_stats.bounding_box() {
+ let x_range = (native_bbox.get_xmin(), native_bbox.get_xmax());
+ let y_range = (native_bbox.get_ymin(), native_bbox.get_ymax());
+ let z_range = match (native_bbox.get_zmin(), native_bbox.get_zmax()) {
+ (Some(lo), Some(hi)) => Some(Interval::new(lo, hi)),
+ _ => None,
+ };
+ let m_range = match (native_bbox.get_mmin(), native_bbox.get_mmax()) {
+ (Some(lo), Some(hi)) => Some(Interval::new(lo, hi)),
+ _ => None,
+ };
+
+ let bbox = BoundingBox::xyzm(x_range, y_range, z_range, m_range);
+
+ // Sanity check the bbox statistics. If the sanity check fails, don't set
+ // a bounding box for pruning. Note that the x width can be < 0 (wraparound).
+ let mut bbox_is_valid =
+ bbox.x().width().is_finite() && bbox.y().width().is_finite() && bbox.y().width() >= 0.0;
+ if let Some(z) = bbox.z() {
+ bbox_is_valid = bbox_is_valid && z.width().is_finite() && z.width() >= 0.0;
+ }
+ if let Some(m) = bbox.m() {
+ bbox_is_valid = bbox_is_valid && m.width().is_finite() && m.width() >= 0.0;
+ }
+
+ if bbox_is_valid {
+ out = out.with_bbox(Some(bbox));
+ }
+ }
+
+ if let Some(native_geometry_types) = parquet_geo_stats.geospatial_types() {
+ let mut geometry_types = GeometryTypeAndDimensionsSet::new();
+ let mut geometry_types_valid = true;
+ for wkb_id in native_geometry_types {
+ if *wkb_id < 0 {
+ geometry_types_valid = false;
+ break;
+ }
+
+ match GeometryTypeAndDimensions::try_from_wkb_id(*wkb_id as u32) {
+ Ok(type_and_dim) => geometry_types.insert_or_ignore(&type_and_dim),
+ Err(_) => {
+ geometry_types_valid = false;
+ break;
+ }
+ }
+ }
+
+ if !geometry_types.is_empty() && geometry_types_valid {
+ out = out.with_geometry_types(Some(geometry_types))
+ }
+ }
+
+ out
+}
+
+/// Calculates column indices for top-level columns of file_schema
+///
+/// We need to build a list of top-level indices, where the indices refer to the
+/// flattened list of columns (e.g., `.column(i)` in row group metadata).
+fn top_level_column_indices(parquet_metadata: &ParquetMetaData) -> Vec<usize> {
+ let mut top_level_indices = Vec::new();
+ let schema_descr = parquet_metadata.file_metadata().schema_descr();
+ for (i, col) in schema_descr.columns().iter().enumerate() {
+ let path_vec = col.path().parts();
+ if path_vec.len() == 1 {
+ top_level_indices.push(i);
+ }
+ }
+
+ top_level_indices
+}
+
/// Returns true if there are any fields with GeoArrow metadata
///
/// This is used to defer to the parent implementation if there is no
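
To make the "top-level" distinction above concrete: Parquet metadata flattens leaf columns, so a leaf inside a nested group gets its own flat column index with a longer path. A small sketch using the parquet crate's schema descriptor (field names are illustrative); only paths of length 1 survive the filter in top_level_column_indices():

    use std::sync::Arc;
    use parquet::basic::Type as PhysicalType;
    use parquet::schema::types::{SchemaDescriptor, Type};

    fn main() {
        let name = Type::primitive_type_builder("name", PhysicalType::BYTE_ARRAY)
            .build()
            .unwrap();
        let props = Type::group_type_builder("props")
            .with_fields(vec![Arc::new(name)])
            .build()
            .unwrap();
        let geom = Type::primitive_type_builder("geom", PhysicalType::BYTE_ARRAY)
            .build()
            .unwrap();
        let root = Type::group_type_builder("schema")
            .with_fields(vec![Arc::new(geom), Arc::new(props)])
            .build()
            .unwrap();
        let descr = SchemaDescriptor::new(Arc::new(root));
        // Flat column 0 has path ["geom"] (top level); flat column 1 has
        // path ["props", "name"] and is skipped by the length-1 check.
        assert_eq!(descr.column(0).path().parts().len(), 1);
        assert_eq!(descr.column(1).path().parts().len(), 2);
    }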
@@ -767,6 +931,266 @@ mod test {
));
}
+ #[test]
+ fn parquet_geo_stats_empty() {
+ let parquet_stats = GeospatialStatistics::new(None, None);
+ let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+ assert_eq!(result, GeoStatistics::unspecified());
+ }
+
+ #[test]
+ fn parquet_geo_stats_valid_bbox_xy() {
+ let parquet_stats = GeospatialStatistics::new(
+ Some(parquet::geospatial::bounding_box::BoundingBox::new(
+ -180.0, 180.0, -90.0, 90.0,
+ )),
+ None,
+ );
+ let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+ assert_eq!(
+ result,
+ GeoStatistics::unspecified()
+ .with_bbox(Some(BoundingBox::xy((-180.0, 180.0), (-90.0, 90.0))))
+ );
+ }
+
+ #[test]
+ fn parquet_geo_stats_valid_bbox_xyz() {
+ let parquet_stats = GeospatialStatistics::new(
+ Some(
+ parquet::geospatial::bounding_box::BoundingBox::new(-180.0, 180.0, -90.0, 90.0)
+ .with_zrange(0.0, 1000.0),
+ ),
+ None,
+ );
+ let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+ let expected_bbox = BoundingBox::xyzm(
+ (-180.0, 180.0),
+ (-90.0, 90.0),
+ Some(Interval::new(0.0, 1000.0)),
+ None,
+ );
+ assert_eq!(
+ result,
+ GeoStatistics::unspecified().with_bbox(Some(expected_bbox))
+ );
+ }
+
+ #[test]
+ fn parquet_geo_stats_valid_bbox_xyzm() {
+ let parquet_stats = GeospatialStatistics::new(
+ Some(
+ parquet::geospatial::bounding_box::BoundingBox::new(-180.0, 180.0, -90.0, 90.0)
+ .with_zrange(0.0, 1000.0)
+ .with_mrange(0.0, 100.0),
+ ),
+ None,
+ );
+ let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+ let expected_bbox = BoundingBox::xyzm(
+ (-180.0, 180.0),
+ (-90.0, 90.0),
+ Some(Interval::new(0.0, 1000.0)),
+ Some(Interval::new(0.0, 100.0)),
+ );
+ assert_eq!(
+ result,
+ GeoStatistics::unspecified().with_bbox(Some(expected_bbox))
+ );
+ }
+
+ #[test]
+ fn parquet_geo_stats_invalid_bbox_infinite_x() {
+ let parquet_stats = GeospatialStatistics::new(
+ Some(parquet::geospatial::bounding_box::BoundingBox::new(
+ f64::NEG_INFINITY,
+ f64::INFINITY,
+ -90.0,
+ 90.0,
+ )),
+ None,
+ );
+ let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+ // Should return unspecified because x width is not finite
+ assert_eq!(result.bbox(), None);
+ }
+
+ #[test]
+ fn parquet_geo_stats_invalid_bbox_nan_x() {
+ let parquet_stats = GeospatialStatistics::new(
+ Some(parquet::geospatial::bounding_box::BoundingBox::new(
+ f64::NAN,
+ f64::NAN,
+ -90.0,
+ 90.0,
+ )),
+ None,
+ );
+ let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+ // Should return unspecified because x width is not finite
+ assert_eq!(result.bbox(), None);
+ }
+
+ #[test]
+ fn parquet_geo_stats_invalid_bbox_infinite_y() {
+ let parquet_stats = GeospatialStatistics::new(
+ Some(parquet::geospatial::bounding_box::BoundingBox::new(
+ -180.0,
+ 180.0,
+ f64::NEG_INFINITY,
+ f64::INFINITY,
+ )),
+ None,
+ );
+ let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+ // Should return unspecified because y width is not finite
+ assert_eq!(result.bbox(), None);
+ }
+
+ #[test]
+ fn parquet_geo_stats_invalid_bbox_negative_y_width() {
+ let parquet_stats = GeospatialStatistics::new(
+ Some(parquet::geospatial::bounding_box::BoundingBox::new(
+ -180.0, 180.0, 1.0, -1.0,
+ )),
+ None,
+ );
+ let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+ // Should return unspecified because y width is less than zero
+ assert_eq!(result.bbox(), None);
+ }
+
+ #[test]
+ fn parquet_geo_stats_invalid_bbox_infinite_z() {
+ let parquet_stats = GeospatialStatistics::new(
+ Some(
+ parquet::geospatial::bounding_box::BoundingBox::new(-180.0, 180.0, -90.0, 90.0)
+ .with_zrange(f64::NEG_INFINITY, f64::INFINITY),
+ ),
+ None,
+ );
+ let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+ // Should return unspecified because z width is not finite
+ assert_eq!(result.bbox(), None);
+ }
+
+ #[test]
+ fn parquet_geo_stats_invalid_bbox_negative_z_width() {
+ let parquet_stats = GeospatialStatistics::new(
+ Some(
+ parquet::geospatial::bounding_box::BoundingBox::new(-180.0, 180.0, -90.0, 90.0)
+ .with_zrange(100.0, -100.0),
+ ),
+ None,
+ );
+ let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+ // Should return unspecified because z width is less than 0
+ assert_eq!(result.bbox(), None);
+ }
+
+ #[test]
+ fn parquet_geo_stats_invalid_bbox_infinite_m() {
+ let parquet_stats = GeospatialStatistics::new(
+ Some(
+ parquet::geospatial::bounding_box::BoundingBox::new(-180.0, 180.0, -90.0, 90.0)
+ .with_mrange(f64::NEG_INFINITY, f64::INFINITY),
+ ),
+ None,
+ );
+ let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+ // Should return unspecified because m width is not finite
+ assert_eq!(result.bbox(), None);
+ }
+
+ #[test]
+ fn parquet_geo_stats_invalid_bbox_negative_m_width() {
+ let parquet_stats = GeospatialStatistics::new(
+ Some(
+ parquet::geospatial::bounding_box::BoundingBox::new(-180.0, 180.0, -90.0, 90.0)
+ .with_mrange(50.0, -50.0),
+ ),
+ None,
+ );
+ let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+ // Should return unspecified because m width is less than 0
+ assert_eq!(result.bbox(), None);
+ }
+
+ #[test]
+ fn parquet_geo_stats_valid_geometry_types() {
+ let parquet_stats = GeospatialStatistics::new(None, Some(vec![1, 2, 3]));
+ let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+ assert!(result.geometry_types().is_some());
+ }
+
+ #[test]
+ fn parquet_geo_stats_invalid_geometry_types_negative() {
+ let parquet_stats = GeospatialStatistics::new(None, Some(vec![1, -1, 3]));
+ let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+ // Should return unspecified geometry types because of negative value
+ assert_eq!(result.geometry_types(), None);
+ }
+
+ #[test]
+ fn parquet_geo_stats_invalid_geometry_types_unknown_wkb_id() {
+ let parquet_stats = GeospatialStatistics::new(None, Some(vec![1, 999999]));
+ let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+ // Should return unspecified geometry types because of unknown WKB id
+ assert_eq!(result.geometry_types(), None);
+ }
+
+ #[test]
+ fn parquet_geo_stats_empty_geometry_types() {
+ let parquet_stats = GeospatialStatistics::new(None, Some(vec![]));
+ let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+ // Empty geometry types should result in None
+ assert_eq!(result.geometry_types(), None);
+ }
+
+ #[test]
+ fn parquet_geo_stats_combined_valid() {
+ let parquet_stats = GeospatialStatistics::new(
+ Some(parquet::geospatial::bounding_box::BoundingBox::new(
+ -180.0, 180.0, -90.0, 90.0,
+ )),
+ Some(vec![1, 2]), // Point, LineString
+ );
+ let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+ assert!(result.bbox().is_some());
+ assert!(result.geometry_types().is_some());
+ }
+
+ #[test]
+ fn parquet_geo_stats_valid_bbox_invalid_geometry_types() {
+ let parquet_stats = GeospatialStatistics::new(
+ Some(parquet::geospatial::bounding_box::BoundingBox::new(
+ -180.0, 180.0, -90.0, 90.0,
+ )),
+ Some(vec![-1]), // Invalid negative
+ );
+ let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+ // Bbox should be valid, geometry types should be None
+ assert!(result.bbox().is_some());
+ assert_eq!(result.geometry_types(), None);
+ }
+
+ #[test]
+ fn parquet_geo_stats_invalid_bbox_valid_geometry_types() {
+ let parquet_stats = GeospatialStatistics::new(
+ Some(parquet::geospatial::bounding_box::BoundingBox::new(
+ f64::NAN,
+ 180.0,
+ -90.0,
+ 90.0,
+ )),
+ Some(vec![1, 2]),
+ );
+ let result = parquet_geo_stats_to_sedona_geo_stats(&parquet_stats);
+ // Bbox should be None, geometry types should be valid
+ assert_eq!(result.bbox(), None);
+ assert!(result.geometry_types().is_some());
+ }
+
fn file_schema_with_covering() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("not_geo", DataType::Binary, true),
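
One subtlety in parquet_geo_stats_to_sedona_geo_stats above deserves a note: the x dimension may legitimately report a negative width because a bounding box that crosses the antimeridian stores xmin > xmax, so only the y/z/m widths are required to be non-negative. A minimal sketch, assuming Interval::width() is simply max minus min as the validity check implies:

    use sedona_geometry::bounding_box::BoundingBox;
    use sedona_geometry::interval::IntervalTrait;

    fn main() {
        // xmin > xmax encodes a wraparound box from 170 to -170 degrees longitude.
        let wraparound = BoundingBox::xy((170.0, -170.0), (-10.0, 10.0));
        println!("x width = {}", wraparound.x().width()); // negative, still valid
    }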
diff --git a/rust/sedona-geoparquet/src/format.rs b/rust/sedona-geoparquet/src/format.rs
index c3dc6f6b..352d23a3 100644
--- a/rust/sedona-geoparquet/src/format.rs
+++ b/rust/sedona-geoparquet/src/format.rs
@@ -52,7 +52,7 @@ use sedona_common::sedona_internal_err;
use sedona_schema::extension_type::ExtensionType;
use crate::{
- file_opener::{storage_schema_contains_geo, GeoParquetFileOpener},
+ file_opener::{storage_schema_contains_geo, GeoParquetFileOpener, GeoParquetFileOpenerMetrics},
metadata::{GeoParquetColumnEncoding, GeoParquetColumnMetadata, GeoParquetMetadata},
options::TableGeoParquetOptions,
writer::create_geoparquet_writer_physical_plan,
@@ -150,19 +150,6 @@ impl GeoParquetFormat {
}
}
-/// Merge geometry columns metadata.
-/// `overrides` columns replace any inferred metadata for the same column name.
-fn merge_geometry_columns(
- base: &mut HashMap<String, GeoParquetColumnMetadata>,
- overrides: &HashMap<String, GeoParquetColumnMetadata>,
-) -> Result<()> {
- for (column_name, override_meta) in overrides {
- base.insert(column_name.clone(), override_meta.clone());
- }
-
- Ok(())
-}
-
#[async_trait]
impl FileFormat for GeoParquetFormat {
fn as_any(&self) -> &dyn Any {
@@ -218,40 +205,38 @@ impl FileFormat for GeoParquetFormat {
.try_collect()
.await?;
- // Combine multiple partitioned geoparquet files' metadata into a single one
- // See comments in `try_update(..)` for the specific behaviors.
+ // "Not seen" and "No geometry metadata" are identical here. When merging new
+ // metadata we check definitions for individual fields to ensure consistency,
+ // but it is OK to have a file without geometry. Exactly how this gets merged
+ // and adapted changed in DataFusion 52; the method here is prone to inconsistency
+ // when some files contain geometry and some do not. Column overrides can be used
+ // as a workaround in this situation.
let mut geoparquet_metadata: Option<GeoParquetMetadata> = None;
for metadata in &metadatas {
- if let Some(kv) = metadata.file_metadata().key_value_metadata() {
- for item in kv {
- if item.key != "geo" {
- continue;
- }
- if let Some(value) = &item.value {
- let this_geoparquet_metadata = GeoParquetMetadata::try_new(value)?;
-
- match geoparquet_metadata.as_mut() {
- Some(existing) => {
- existing.try_update(&this_geoparquet_metadata)?;
- }
- None => geoparquet_metadata = Some(this_geoparquet_metadata),
- }
- }
+ let this_geoparquet_metadata = GeoParquetMetadata::try_from_parquet_metadata(
+ metadata,
+ self.options.geometry_columns.as_ref(),
+ )?;
+
+ match (geoparquet_metadata.as_mut(), this_geoparquet_metadata) {
+ (Some(existing_metadata), Some(this_metadata)) => {
+ existing_metadata.try_update(&this_metadata)?;
}
+ (None, Some(this_metadata)) => {
+ geoparquet_metadata.replace(this_metadata);
+ }
+ (None, None) => {}
+ (Some(_), None) => {}
}
}
// Geometry columns have been inferred from metadata, next combine column
// metadata from options with the inferred ones
- let mut inferred_geo_cols = match geoparquet_metadata {
+ let inferred_geo_cols = match geoparquet_metadata {
Some(geo_metadata) => geo_metadata.columns,
None => HashMap::new(),
};
- if let Some(geometry_columns) = &self.options.geometry_columns {
- merge_geometry_columns(&mut inferred_geo_cols, geometry_columns)?;
- }
-
if inferred_geo_cols.is_empty() {
return Ok(inner_schema_without_metadata);
}
@@ -376,6 +361,7 @@ pub struct GeoParquetFileSource {
inner: ParquetSource,
metadata_size_hint: Option<usize>,
predicate: Option<Arc<dyn PhysicalExpr>>,
+ overrides: Option<HashMap<String, GeoParquetColumnMetadata>>,
}
impl GeoParquetFileSource {
@@ -385,6 +371,7 @@ impl GeoParquetFileSource {
inner: ParquetSource::new(options.inner.clone()),
metadata_size_hint: None,
predicate: None,
+ overrides: options.geometry_columns.clone(),
}
}
@@ -432,6 +419,7 @@ impl GeoParquetFileSource {
inner: parquet_source.clone(),
metadata_size_hint,
predicate: new_predicate,
+ overrides: None,
})
} else {
sedona_internal_err!("GeoParquetFileSource constructed from non-ParquetSource")
@@ -444,6 +432,7 @@ impl GeoParquetFileSource {
inner: self.inner.with_predicate(predicate.clone()),
metadata_size_hint: self.metadata_size_hint,
predicate: Some(predicate),
+ overrides: self.overrides.clone(),
}
}
@@ -468,6 +457,7 @@ impl GeoParquetFileSource {
inner: parquet_source,
metadata_size_hint: self.metadata_size_hint,
predicate: self.predicate.clone(),
+ overrides: self.overrides.clone(),
}
}
@@ -477,6 +467,7 @@ impl GeoParquetFileSource {
inner: self.inner.clone().with_metadata_size_hint(hint),
metadata_size_hint: Some(hint),
predicate: self.predicate.clone(),
+ overrides: self.overrides.clone(),
}
}
}
@@ -497,17 +488,18 @@ impl FileSource for GeoParquetFileSource {
return inner_opener;
}
- Arc::new(GeoParquetFileOpener::new(
- inner_opener,
+ Arc::new(GeoParquetFileOpener {
+ inner: inner_opener,
object_store,
- self.metadata_size_hint,
- self.predicate.clone().unwrap(),
- base_config.file_schema().clone(),
- self.inner.table_parquet_options().global.pruning,
+ metadata_size_hint: self.metadata_size_hint,
+ predicate: self.predicate.clone().unwrap(),
+ file_schema: base_config.file_schema().clone(),
+ enable_pruning: self.inner.table_parquet_options().global.pruning,
+ // HACK: since there is no public API to set inner's metrics, we use
+ // inner's metrics as the ExecutionPlan-global metrics
- self.inner.metrics(),
- ))
+ metrics: GeoParquetFileOpenerMetrics::new(self.inner.metrics()),
+ overrides: self.overrides.clone(),
+ })
}
fn try_pushdown_filters(
@@ -518,11 +510,14 @@ impl FileSource for GeoParquetFileSource {
let inner_result = self.inner.try_pushdown_filters(filters.clone(), config)?;
match &inner_result.updated_node {
Some(updated_node) => {
- let updated_inner = Self::try_from_file_source(
+ let mut updated_inner = Self::try_from_file_source(
updated_node.clone(),
self.metadata_size_hint,
+ // TODO should this be None?
None,
)?;
+ // TODO: part of try_from_file_source()?
+ updated_inner.overrides = self.overrides.clone();
Ok(inner_result.with_updated_node(Arc::new(updated_inner)))
}
None => Ok(inner_result),
diff --git a/rust/sedona-geoparquet/src/metadata.rs b/rust/sedona-geoparquet/src/metadata.rs
index 39beddf0..94a4b6e6 100644
--- a/rust/sedona-geoparquet/src/metadata.rs
+++ b/rust/sedona-geoparquet/src/metadata.rs
@@ -21,12 +21,14 @@
/// to remove the dependency on GeoArrow since we mostly don't need that here yet).
/// This should be synchronized with that crate when possible.
///
https://github.com/geoarrow/geoarrow-rs/blob/ad2d29ef90050c5cfcfa7dfc0b4a3e5d12e51bbe/rust/geoarrow-geoparquet/src/metadata.rs
-use datafusion_common::Result;
-use parquet::file::metadata::ParquetMetaData;
+use datafusion_common::{plan_err, Result};
+use parquet::basic::{EdgeInterpolationAlgorithm, LogicalType};
+use parquet::file::metadata::{KeyValue, ParquetMetaData};
use sedona_expr::statistics::GeoStatistics;
use sedona_geometry::bounding_box::BoundingBox;
use sedona_geometry::interval::{Interval, IntervalTrait};
use sedona_geometry::types::GeometryTypeAndDimensionsSet;
+use sedona_schema::schema::primary_geometry_column_from_names;
use std::collections::HashMap;
use std::fmt::Display;
use std::fmt::Write;
@@ -39,7 +41,7 @@ use serde_json::Value;
///
/// In contrast to the _user-specified API_, which is just "WKB" or "Native", here we need to know
/// the actual written encoding type so that we can save that in the metadata.
-#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Default)]
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
#[allow(clippy::upper_case_acronyms)]
pub enum GeoParquetColumnEncoding {
/// Serialized Well-known Binary encoding
@@ -109,7 +111,7 @@ impl Display for GeoParquetColumnEncoding {
///
/// Note: This technique to use the bounding box to improve spatial queries does not apply to
/// geometries that cross the antimeridian. Such geometries are unsupported by this method.
-#[derive(Clone, Debug, Serialize, Deserialize)]
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct GeoParquetBboxCovering {
/// The path in the Parquet schema of the column that contains the xmin
pub xmin: Vec<String>,
@@ -257,7 +259,7 @@ impl GeoParquetBboxCovering {
/// The covering field specifies optional simplified representations of each geometry. The keys of
/// the "covering" object MUST be a supported encoding. Currently the only supported encoding is
/// "bbox" which specifies the names of bounding box columns
-#[derive(Clone, Debug, Serialize, Deserialize)]
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct GeoParquetCovering {
/// Bounding-box covering
pub bbox: GeoParquetBboxCovering,
@@ -279,7 +281,7 @@ impl GeoParquetCovering {
}
/// Top-level GeoParquet file metadata
-#[derive(Clone, Debug, Serialize, Deserialize)]
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct GeoParquetMetadata {
/// The version identifier for the GeoParquet specification.
pub version: String,
@@ -304,7 +306,7 @@ impl Default for GeoParquetMetadata {
}
/// GeoParquet column metadata
-#[derive(Clone, Debug, Serialize, Deserialize, Default)]
+#[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)]
pub struct GeoParquetColumnMetadata {
/// Name of the geometry encoding format. As of GeoParquet 1.1, `"WKB"`, `"point"`,
/// `"linestring"`, `"polygon"`, `"multipoint"`, `"multilinestring"`, and `"multipolygon"` are
@@ -382,20 +384,83 @@ impl GeoParquetMetadata {
}
/// Construct a [`GeoParquetMetadata`] from a [`ParquetMetaData`]
- pub fn try_from_parquet_metadata(metadata: &ParquetMetaData) -> Result<Option<Self>> {
- if let Some(kv) = metadata.file_metadata().key_value_metadata() {
- for item in kv {
- if item.key != "geo" {
- continue;
- }
+ ///
+ /// This constructor considers (1) the GeoParquet metadata in the key/value
+ /// metadata and (2) Geometry/Geography types present in the Parquet schema.
+ /// Specification of a column in the GeoParquet metadata takes precedence.
+ pub fn try_from_parquet_metadata(
+ metadata: &ParquetMetaData,
+ overrides: Option<&HashMap<String, GeoParquetColumnMetadata>>,
+ ) -> Result<Option<Self>> {
+ let schema = metadata.file_metadata().schema_descr().root_schema();
+ let kv_metadata = metadata.file_metadata().key_value_metadata();
+ let mut maybe_metadata = Self::try_from_parquet_metadata_impl(schema, kv_metadata)?;
- if let Some(value) = &item.value {
- return Ok(Some(Self::try_new(value)?));
- }
+ // Apply overrides
+ match (&mut maybe_metadata, overrides) {
+ (None, None) | (Some(_), None) => {}
+ (None, Some(geometry_columns)) => {
+ let mut metadata = GeoParquetMetadata::default();
+ metadata.override_columns(geometry_columns)?;
+ maybe_metadata = Some(metadata);
+ }
+ (Some(this_metadata), Some(geometry_columns)) => {
+ this_metadata.override_columns(geometry_columns)?;
}
}
- Ok(None)
+ Ok(maybe_metadata)
+ }
+
+ /// For testing, as it is easier to simulate the schema and key/value metadata
+ /// than the whole ParquetMetaData.
+ fn try_from_parquet_metadata_impl(
+ root_schema: &parquet::schema::types::Type,
+ kv_metadata: Option<&Vec<KeyValue>>,
+ ) -> Result<Option<Self>> {
+ let mut columns_from_schema = columns_from_parquet_schema(root_schema, kv_metadata)?;
+
+ if let Some(value) = get_parquet_key_value("geo", kv_metadata) {
+ // Values in the GeoParquet metadata take precedence over those from the
+ // Parquet schema
+ let mut out = Self::try_new(&value)?;
+ for (k, v) in columns_from_schema.drain() {
+ out.columns.entry(k).or_insert(v);
+ }
+
+ return Ok(Some(out));
+ }
+
+ // No geo metadata key, but we have geo columns from the schema
+ if !columns_from_schema.is_empty() {
+ // To keep metadata valid, ensure we set a primary column deterministically
+ let mut column_names = columns_from_schema.keys().collect::<Vec<_>>();
+ column_names.sort();
+ let primary_index = primary_geometry_column_from_names(column_names.iter())
+ .expect("non-empty input always returns a value");
+ let primary_column = column_names[primary_index].to_string();
+
+ Ok(Some(Self {
+ version: "2.0.0".to_string(),
+ columns: columns_from_schema,
+ primary_column,
+ }))
+ } else {
+ Ok(None)
+ }
+ }
+
+ /// Replace any inferred metadata for the same column name with overrides
+ pub fn override_columns(
+ &mut self,
+ overrides: &HashMap<String, GeoParquetColumnMetadata>,
+ ) -> Result<()> {
+ for (column_name, override_meta) in overrides {
+ self.columns
+ .insert(column_name.clone(), override_meta.clone());
+ }
+
+ Ok(())
}
/// Update a GeoParquetMetadata from another file's metadata
@@ -551,9 +616,139 @@ impl GeoParquetColumnMetadata {
}
}
+/// Collect column metadata from (top-level) Parquet Geometry/Geography columns
+///
+/// Converts embedded schema information into GeoParquet column metadata. Because
+/// GeoParquet metadata does not support nested columns, this does not currently
+/// support them either.
+fn columns_from_parquet_schema(
+ root_schema: &parquet::schema::types::Type,
+ kv_metadata: Option<&Vec<KeyValue>>,
+) -> Result<HashMap<String, GeoParquetColumnMetadata>> {
+ let mut columns = HashMap::new();
+
+ for field in root_schema.get_fields() {
+ let column_metadata_opt =
+ column_from_logical_type(field.get_basic_info().logical_type_ref(), kv_metadata)?;
+ if let Some(column_metadata) = column_metadata_opt {
+ let name = field.name().to_string();
+ columns.insert(name, column_metadata);
+ }
+ }
+
+ Ok(columns)
+}
+
+/// Convert a single LogicalType to GeoParquetColumnMetadata, if possible
+///
+/// Returns None for something that is not Geometry or Geography.
+fn column_from_logical_type(
+ logical_type: Option<&LogicalType>,
+ kv_metadata: Option<&Vec<KeyValue>>,
+) -> Result<Option<GeoParquetColumnMetadata>> {
+ if let Some(logical_type) = logical_type {
+ let mut column_metadata = GeoParquetColumnMetadata::default();
+
+ match logical_type {
+ LogicalType::Geometry { crs } => {
+ column_metadata.crs = geoparquet_crs_from_logical_type(crs.as_ref(), kv_metadata);
+ Ok(Some(column_metadata))
+ }
+ LogicalType::Geography { crs, algorithm } => {
+ column_metadata.crs = geoparquet_crs_from_logical_type(crs.as_ref(), kv_metadata);
+
+ let edges = match algorithm {
+ None | Some(EdgeInterpolationAlgorithm::SPHERICAL) => "spherical",
+ Some(EdgeInterpolationAlgorithm::VINCENTY) => "vincenty",
+ Some(EdgeInterpolationAlgorithm::ANDOYER) => "andoyer",
+ Some(EdgeInterpolationAlgorithm::THOMAS) => "thomas",
+ Some(EdgeInterpolationAlgorithm::KARNEY) => "karney",
+ Some(_) => {
+ return plan_err!(
+ "Unsupported edge interpolation algorithm in Parquet schema"
+ )
+ }
+ };
+ column_metadata.edges = Some(edges.to_string());
+ Ok(Some(column_metadata))
+ }
+ _ => Ok(None),
+ }
+ } else {
+ Ok(None)
+ }
+}
+
+/// Parse a CRS from a Parquet logical type into one that will transfer to GeoParquet
+/// and then to GeoArrow
+///
+/// This is identical to what will happen in the forthcoming Arrow release (packaged
+/// with DataFusion 52), except this version also resolves projjson:xxx CRSes from
+/// the key/value metadata (in Arrow this was hard because the conversion code was
+/// not set up in a way that this was easy to do; here it is easy because we have
+/// the whole ParquetMetadata).
+fn geoparquet_crs_from_logical_type(
+ crs: Option<&String>,
+ kv_metadata: Option<&Vec<KeyValue>>,
+) -> Option<Value> {
+ if let Some(crs_str) = crs {
+ // Treat an empty string the same as lon/lat. There is no concept of a "missing"
+ // CRS in a Parquet LogicalType although this can be expressed with srid:0 in a pinch.
+ if crs_str.is_empty() {
+ return None;
+ }
+
+ // Resolve projjson:some_key if possible. If this is not possible, the value that
+ // will be passed on to the GeoParquet column metadata is the full string
+ // "projjson:some_key".
+ if let Some(crs_kv_key) = crs_str.strip_prefix("projjson:") {
+ if let Some(crs_from_kv) = get_parquet_key_value(crs_kv_key, kv_metadata) {
+ return Some(Value::String(crs_from_kv.to_string()));
+ }
+ }
+
+ // Resolve srid:<int value> to "<int value>", which is accepted by SedonaDB internals
+ // and is interpreted as an EPSG code. There is no guarantee that other implementations
+ // will do this but it is probably better than erroring for an unknown CRS.
+ if let Some(srid_string) = crs_str.strip_prefix("srid:") {
+ return Some(Value::String(srid_string.to_string()));
+ }
+
+ // Try to parse the output as JSON such that the resulting column metadata is closer
+ // to what would have been in the GeoParquet metadata (e.g., where PROJJSON is parsed).
+ if let Ok(value) = crs_str.parse::<Value>() {
+ Some(value)
+ } else {
+ Some(Value::String(crs_str.to_string()))
+ }
+ } else {
+ None
+ }
+}
+
+/// Helper to get a key from the key/value metadata
+fn get_parquet_key_value(key: &str, kv_metadata: Option<&Vec<KeyValue>>) -> Option<String> {
+ if let Some(kv_metadata) = kv_metadata {
+ for kv in kv_metadata {
+ if kv.key == key {
+ if let Some(value) = &kv.value {
+ return Some(value.to_string());
+ }
+ }
+ }
+ }
+
+ None
+}
+
#[cfg(test)]
mod test {
+ use std::sync::Arc;
+
use geo_traits::Dimensions;
+ use parquet::basic::{Repetition, Type as PhysicalType};
+ use parquet::schema::types::Type;
use sedona_geometry::types::{GeometryTypeAndDimensions, GeometryTypeId};
use super::*;
@@ -574,4 +769,258 @@ mod test {
GeometryTypeAndDimensions::new(GeometryTypeId::Point, Dimensions::Xy)
);
}
+
+ #[test]
+ fn test_from_parquet_metadata_with_parquet_types() {
+ let s = r#"{
+ "version": "2.0.0",
+ "primary_column": "geom_geoparquet",
+ "columns": {
+ "geom_geoparquet": {
+ "encoding": "WKB",
+ "crs": "geom_geoparquet_crs"
+ }
+ }
+ }"#;
+
+ let kv_metadata_with_geo_key = make_kv_metadata(&[("geo", s)]);
+
+ let schema_no_parquet_geo = make_parquet_schema(&[("geom_geoparquet", None)]);
+ let metadata = GeoParquetMetadata::try_from_parquet_metadata_impl(
+ &schema_no_parquet_geo,
+ kv_metadata_with_geo_key.as_ref(),
+ )
+ .unwrap()
+ .unwrap();
+ assert_eq!(metadata.version, "2.0.0");
+ assert_eq!(metadata.primary_column, "geom_geoparquet");
+ assert_eq!(metadata.columns.len(), 1);
+ assert!(metadata.columns.contains_key("geom_geoparquet"));
+ assert_eq!(
+ metadata.columns.get("geom_geoparquet").unwrap().crs,
+ Some(Value::String("geom_geoparquet_crs".to_string()))
+ );
+
+ let schema_additional_parquet_geo = make_parquet_schema(&[
+ ("geom_geoparquet", None),
+ ("geom_parquet", Some(LogicalType::Geometry { crs: None })),
+ ]);
+ let metadata = GeoParquetMetadata::try_from_parquet_metadata_impl(
+ &schema_additional_parquet_geo,
+ kv_metadata_with_geo_key.as_ref(),
+ )
+ .unwrap()
+ .unwrap();
+ assert_eq!(metadata.columns.len(), 2);
+ assert!(metadata.columns.contains_key("geom_geoparquet"));
+ assert!(metadata.columns.contains_key("geom_parquet"));
+
+ let schema_overlapping_columns =
+ make_parquet_schema(&[("geom_geoparquet", Some(LogicalType::Geometry { crs: None }))]);
+ let metadata = GeoParquetMetadata::try_from_parquet_metadata_impl(
+ &schema_overlapping_columns,
+ kv_metadata_with_geo_key.as_ref(),
+ )
+ .unwrap()
+ .unwrap();
+ assert_eq!(metadata.columns.len(), 1);
+ assert!(metadata.columns.contains_key("geom_geoparquet"));
+ // Ensure we use the CRS provided by the GeoParquet metadata instead of the CRS
+ // provided by the type as a test that GeoParquet columns take precedence if both
+ // are present.
+ assert_eq!(
+ metadata.columns.get("geom_geoparquet").unwrap().crs,
+ Some(Value::String("geom_geoparquet_crs".to_string()))
+ );
+
+ let schema_only_parquet_geo =
+ make_parquet_schema(&[("geom_parquet", Some(LogicalType::Geometry { crs: None }))]);
+ let metadata = GeoParquetMetadata::try_from_parquet_metadata_impl(
+ &schema_only_parquet_geo,
+ None, // No key/value metadata
+ )
+ .unwrap()
+ .unwrap();
+ assert_eq!(metadata.columns.len(), 1);
+ assert!(metadata.columns.contains_key("geom_parquet"));
+ }
+
+ #[test]
+ fn test_column_from_logical_type() {
+ let kv_metadata = make_kv_metadata(&[("some_projjson_key", "some_projjson_value")]);
+
+ // A missing logical type annotation is never Geometry or Geography
+ assert_eq!(
+ column_from_logical_type(None, kv_metadata.as_ref()).unwrap(),
+ None
+ );
+
+ // Logical type that is present but not Geometry or Geography should return None
+ assert_eq!(
+ column_from_logical_type(Some(&LogicalType::Uuid), kv_metadata.as_ref()).unwrap(),
+ None
+ );
+
+ // Geometry logical type
+ let metadata = column_from_logical_type(
+ Some(&LogicalType::Geometry { crs: None }),
+ kv_metadata.as_ref(),
+ )
+ .unwrap()
+ .unwrap();
+ assert_eq!(metadata, GeoParquetColumnMetadata::default());
+
+ // Ensure CRS is translated
+ let metadata = column_from_logical_type(
+ Some(&LogicalType::Geometry {
+ crs: Some("projjson:some_projjson_key".to_string()),
+ }),
+ kv_metadata.as_ref(),
+ )
+ .unwrap()
+ .unwrap();
+ assert_eq!(
+ metadata.crs,
+ Some(Value::String("some_projjson_value".to_string()))
+ );
+
+ // Geography logical type
+ let metadata = column_from_logical_type(
+ Some(&LogicalType::Geography {
+ crs: None,
+ algorithm: None,
+ }),
+ kv_metadata.as_ref(),
+ )
+ .unwrap()
+ .unwrap();
+ assert_eq!(metadata.edges, Some("spherical".to_string()));
+
+ // Ensure CRS is translated
+ let metadata = column_from_logical_type(
+ Some(&LogicalType::Geography {
+ crs: Some("projjson:some_projjson_key".to_string()),
+ algorithm: None,
+ }),
+ kv_metadata.as_ref(),
+ )
+ .unwrap()
+ .unwrap();
+ assert_eq!(
+ metadata.crs,
+ Some(Value::String("some_projjson_value".to_string()))
+ );
+
+ // Ensure algorithm is translated
+ let metadata = column_from_logical_type(
+ Some(&LogicalType::Geography {
+ crs: None,
+ algorithm: Some(EdgeInterpolationAlgorithm::VINCENTY),
+ }),
+ kv_metadata.as_ref(),
+ )
+ .unwrap()
+ .unwrap();
+ assert_eq!(metadata.edges, Some("vincenty".to_string()));
+ }
+
+ #[test]
+ fn test_crs_from_logical_type() {
+ let kv_metadata = make_kv_metadata(&[("some_projjson_key", "some_projjson_value")]);
+
+ // None and "" both map to a GeoParquet default CRS
+ assert_eq!(
+ geoparquet_crs_from_logical_type(None, kv_metadata.as_ref()),
+ None
+ );
+ assert_eq!(
+ geoparquet_crs_from_logical_type(Some(&"".to_string()), kv_metadata.as_ref()),
+ None
+ );
+
+ // projjson: string should resolve from the key/value metadata or be passed on
+ // verbatim if it can't be resolved.
+ assert_eq!(
+ geoparquet_crs_from_logical_type(None, kv_metadata.as_ref()),
+ None
+ );
+ assert_eq!(
+ geoparquet_crs_from_logical_type(
+ Some(&"projjson:some_projjson_key".to_string()),
+ kv_metadata.as_ref()
+ ),
+ Some(Value::String("some_projjson_value".to_string()))
+ );
+ assert_eq!(
+ geoparquet_crs_from_logical_type(
+ Some(&"projjson:not_in_kv_metadata".to_string()),
+ kv_metadata.as_ref()
+ ),
+ Some(Value::String("projjson:not_in_kv_metadata".to_string()))
+ );
+
+ // srid: string should have its prefix stripped
+ assert_eq!(
+ geoparquet_crs_from_logical_type(Some(&"srid:1234".to_string()), kv_metadata.as_ref()),
+ Some(Value::String("1234".to_string()))
+ );
+
+ // strings should get passed through verbatim
+ assert_eq!(
+ geoparquet_crs_from_logical_type(Some(&"EPSG:1234".to_string()), kv_metadata.as_ref()),
+ Some(Value::String("EPSG:1234".to_string()))
+ );
+
+ // Valid JSON should be parsed
+ assert_eq!(
+ geoparquet_crs_from_logical_type(
+ Some(&"\"EPSG:1234\"".to_string()),
+ kv_metadata.as_ref()
+ ),
+ Some(Value::String("EPSG:1234".to_string()))
+ );
+ }
+
+ #[test]
+ fn test_get_kv_metadata() {
+ let kv_metadata = make_kv_metadata(&[("key", "value")]);
+ assert_eq!(
+ get_parquet_key_value("key", kv_metadata.as_ref()),
+ Some("value".to_string())
+ );
+ }
+
+ // Helper to make a Parquet schema. None here means Binary since it's the only
+ // primitive type we need to test
+ fn make_parquet_schema(fields: &[(&str, Option<LogicalType>)]) -> parquet::schema::types::Type {
+ let fields = fields
+ .iter()
+ .map(|(name, logical_type)| {
+ let mut builder = Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
+ .with_repetition(Repetition::OPTIONAL);
+ if let Some(lt) = logical_type {
+ builder = builder.with_logical_type(Some(lt.clone()));
+ }
+ Arc::new(builder.build().unwrap())
+ })
+ .collect::<Vec<_>>();
+
+ Type::group_type_builder("schema")
+ .with_fields(fields)
+ .build()
+ .unwrap()
+ }
+
+ // Helper to simulate key/value metadata
+ fn make_kv_metadata(pairs: &[(&str, &str)]) -> Option<Vec<KeyValue>> {
+ Some(
+ pairs
+ .iter()
+ .map(|(key, value)| KeyValue {
+ key: key.to_string(),
+ value: Some(value.to_string()),
+ })
+ .collect(),
+ )
+ }
}
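
The precedence rule in try_from_parquet_metadata_impl hinges on entry().or_insert(): a column already declared by the "geo" key is kept, schema-derived columns only fill in the gaps, and user overrides are applied last via override_columns(). A standalone sketch of the or_insert mechanism with illustrative values:

    use std::collections::HashMap;

    fn main() {
        let mut columns = HashMap::from([("geom", "crs from geo metadata")]);
        // Schema-derived metadata never replaces an existing "geo" entry...
        columns.entry("geom").or_insert("crs from logical type");
        // ...but it does contribute columns the "geo" key did not mention.
        columns.entry("geog").or_insert("edges from logical type");
        assert_eq!(columns["geom"], "crs from geo metadata");
        assert_eq!(columns["geog"], "edges from logical type");
    }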
diff --git a/rust/sedona-schema/src/schema.rs b/rust/sedona-schema/src/schema.rs
index 9ce1d3a4..f13dab24 100644
--- a/rust/sedona-schema/src/schema.rs
+++ b/rust/sedona-schema/src/schema.rs
@@ -77,24 +77,41 @@ impl SedonaSchema for Schema {
fn primary_geometry_column_index(&self) -> Result<Option<usize>> {
let indices = self.geometry_column_indices()?;
- if indices.is_empty() {
- return Ok(None);
+ let primary_index_opt =
+ primary_geometry_column_from_names(indices.iter().map(|i| self.field(*i).name()));
+ if let Some(primary_index) = primary_index_opt {
+ Ok(Some(indices[primary_index]))
+ } else {
+ Ok(None)
}
+ }
+}
- let names_map = indices
- .iter()
- .rev()
- .map(|i| (self.field(*i).name().to_lowercase(), *i))
- .collect::<HashMap<_, _>>();
+/// Compute the primary geometry column given a list of geometry column names
+///
+/// This implementation powers [SedonaSchema::primary_geometry_column_index] and is
+/// useful for applying a consistent heuristic when only a list of names is
+/// available (e.g., GeoParquet metadata).
+pub fn primary_geometry_column_from_names(
+ column_names: impl DoubleEndedIterator<Item = impl AsRef<str>>,
+) -> Option<usize> {
+ let names_map = column_names
+ .rev()
+ .enumerate()
+ .map(|(i, name)| (name.as_ref().to_lowercase(), i))
+ .collect::<HashMap<_, _>>();
+
+ if names_map.is_empty() {
+ return None;
+ }
- for special_name in ["geometry", "geography", "geom", "geog"] {
- if let Some(i) = names_map.get(special_name) {
- return Ok(Some(*i));
- }
+ for special_name in ["geometry", "geography", "geom", "geog"] {
+ if let Some(i) = names_map.get(special_name) {
+ return Some(names_map.len() - *i - 1);
}
-
- Ok(Some(indices[0]))
}
+
+ Some(0)
}
#[cfg(test)]
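
A quick sketch of the heuristic implemented above (column names are illustrative): a special name wins wherever it appears in the list, otherwise the first geometry column is primary.

    use sedona_schema::schema::primary_geometry_column_from_names;

    fn main() {
        // "geometry" is a special name, so it wins even though it is not first.
        let names = ["geom_a", "geometry", "geom_b"];
        assert_eq!(primary_geometry_column_from_names(names.iter()), Some(1));

        // With no special names, the first column in the list is primary.
        let names = ["building", "parcel"];
        assert_eq!(primary_geometry_column_from_names(names.iter()), Some(0));
    }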