paleolimbot commented on code in PR #560:
URL: https://github.com/apache/sedona-db/pull/560#discussion_r2755009906


##########
python/sedonadb/tests/test_context.py:
##########
@@ -14,10 +14,37 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import json
+from pathlib import Path
+from typing import Any, Mapping
+
 import geoarrow.pyarrow as ga  # noqa: F401
 import pyarrow as pa
+import pyarrow.parquet as pq
 import pytest
 import sedonadb
+import shapely
+
+
+def _parse_geo_metadata(geoparquet_path: Path) -> Mapping[str, Any]:
+    """Return the GeoParquet "geo" metadata map, asserting it exists."""
+    metadata = pq.read_metadata(geoparquet_path).metadata
+    assert metadata is not None
+
+    geo = metadata.get(b"geo")
+    assert geo is not None
+
+    return json.loads(geo.decode("utf-8"))

Review Comment:
   Optional nit:
   
   ```suggestion
       return json.loads(geo.decode())
   ```



##########
python/sedonadb/python/sedonadb/context.py:
##########
@@ -151,7 +198,9 @@ def read_parquet(
 
         return DataFrame(
             self._impl,
-            self._impl.read_parquet([str(path) for path in table_paths], 
options),
+            self._impl.read_parquet(
+                [str(path) for path in table_paths], options, geometry_columns
+            ),

Review Comment:
   Optional, but accepting a dictionary here would probably be an easy/nice 
quality of life improvement for the Python user.
   
   ```suggestion
               if geometry_columns is not None and not 
isinstance(geometry_columns, str):
                   geometry_columns = json.dumps(geometry_columns)
   
               self._impl.read_parquet(
                   [str(path) for path in table_paths], options, 
geometry_columns
               ),
   ```



##########
rust/sedona-geoparquet/src/provider.rs:
##########
@@ -213,23 +244,32 @@ impl ReadOptions<'_> for GeoParquetReadOptions<'_> {
 
         let mut options = self.inner.to_listing_options(config, table_options);
         if let Some(parquet_format) = 
options.format.as_any().downcast_ref::<ParquetFormat>() {
-            let geoparquet_options = parquet_format.options().clone().into();
+            let mut geoparquet_options =
+                TableGeoParquetOptions::from(parquet_format.options().clone());
+            if let Some(geometry_columns) = &self.geometry_columns {
+                geoparquet_options.geometry_columns = 
Some(geometry_columns.clone());
+            }
             options.format = 
Arc::new(GeoParquetFormat::new(geoparquet_options));
             return options;
         }
 
         unreachable!("GeoParquetReadOptions with non-ParquetFormat 
ListingOptions");
     }
 
+    /// Infer schema from GeoParquet metadata, then apply the user option
+    /// `geometry_columns` from `read_parquet()` to override if provided. See 
the
+    /// Python DataFrame `read_parquet(..)` documentation for details.

Review Comment:
   ```suggestion
   ```
   
   I'm not sure this comment is the best place to document the rules for 
calculating a GeoParquet schema (which are about to change again with Parquet 
geometry/geography).



##########
rust/sedona-geoparquet/src/format.rs:
##########
@@ -146,6 +150,114 @@ impl GeoParquetFormat {
     }
 }
 
+/// See `merge_geometry_columns(..)` for the override rule
+fn apply_option_override<T: PartialEq + Clone>(
+    column_name: &str,
+    field: &str,
+    existing: &mut Option<T>,
+    override_value: &Option<T>,
+) -> Result<()> {
+    let Some(override_value) = override_value.as_ref() else {
+        return Ok(());
+    };
+
+    match existing.as_ref() {
+        Some(existing_value) => {
+            if existing_value != override_value {
+                return plan_err!(
+                    "Geometry column '{column_name}' override conflicts with 
existing '{field}' value"
+                );
+            }
+        }
+        None => {
+            *existing = Some(override_value.clone());
+        }
+    }
+
+    Ok(())
+}
+
+/// See `merge_geometry_columns(..)` for the override rule
+fn apply_geometry_columns_override(
+    column_name: &str,
+    existing: &mut GeoParquetColumnMetadata,
+    override_meta: &GeoParquetColumnMetadata,
+) -> Result<()> {
+    apply_option_override(
+        column_name,
+        "encoding",
+        &mut existing.encoding,
+        &override_meta.encoding,
+    )?;
+    apply_option_override(column_name, "crs", &mut existing.crs, 
&override_meta.crs)?;
+    apply_option_override(
+        column_name,
+        "edges",
+        &mut existing.edges,
+        &override_meta.edges,
+    )?;
+    apply_option_override(
+        column_name,
+        "orientation",
+        &mut existing.orientation,
+        &override_meta.orientation,
+    )?;
+    apply_option_override(column_name, "bbox", &mut existing.bbox, 
&override_meta.bbox)?;
+    apply_option_override(
+        column_name,
+        "epoch",
+        &mut existing.epoch,
+        &override_meta.epoch,
+    )?;
+    apply_option_override(
+        column_name,
+        "covering",
+        &mut existing.covering,
+        &override_meta.covering,
+    )?;
+
+    if !override_meta.geometry_types.is_empty() {
+        if existing.geometry_types.is_empty() {
+            existing.geometry_types = override_meta.geometry_types.clone();
+        } else if existing.geometry_types != override_meta.geometry_types {
+            return plan_err!(
+                "Geometry column '{column_name}' override conflicts with 
existing 'geometry_types' value"
+            );
+        }
+    }
+
+    Ok(())
+}
+
+/// Merge geometry columns metadata.
+/// `overrides` columns may only provide additional data; for example,
+/// if `crs` is None (missing) in the `base` metadata, the combined
+/// metadata uses the `crs` field from `overrides`.
+///
+/// If conflicting field data is provided, returns a plan error.
+fn merge_geometry_columns(
+    base: &mut HashMap<String, GeoParquetColumnMetadata>,
+    overrides: &HashMap<String, GeoParquetColumnMetadata>,
+) -> Result<()> {

Review Comment:
   I think we can make this quite a bit simpler by just overriding columns 
instead of attempting to merge them, which serves your specific use case and I 
think is closer to what people actually want to do (ensure that a column is 
interpreted in a particular way regardless of what is specified by the file).



##########
rust/sedona-geoparquet/src/format.rs:
##########
@@ -222,38 +339,66 @@ impl FileFormat for GeoParquetFormat {
             }
         }
 
-        if let Some(geo_metadata) = geoparquet_metadata {
-            let new_fields: Result<Vec<_>> = inner_schema_without_metadata
-                .fields()
-                .iter()
-                .map(|field| {
-                    if let Some(geo_column) = 
geo_metadata.columns.get(field.name()) {
-                        match geo_column.encoding {
-                            GeoParquetColumnEncoding::WKB => {
-                                let extension = ExtensionType::new(
-                                    "geoarrow.wkb",
-                                    field.data_type().clone(),
-                                    Some(geo_column.to_geoarrow_metadata()?),
-                                );
-                                Ok(Arc::new(
-                                    extension.to_field(field.name(), 
field.is_nullable()),
-                                ))
-                            }
-                            _ => plan_err!(
-                                "Unsupported GeoParquet encoding: {}",
-                                geo_column.encoding
-                            ),
+        // Geometry columns have been inferred from metadata, next combine 
column
+        // metadata from options with the inferred ones
+        let mut inferred_geo_cols = match geoparquet_metadata {
+            Some(geo_metadata) => geo_metadata.columns,
+            None => HashMap::new(),
+        };
+
+        if let Some(geometry_columns) = &self.options.geometry_columns {
+            merge_geometry_columns(&mut inferred_geo_cols, geometry_columns)?;
+        }
+
+        if inferred_geo_cols.is_empty() {
+            return Ok(inner_schema_without_metadata);
+        }
+
+        let mut remaining: HashSet<String> = 
inferred_geo_cols.keys().cloned().collect();
+        let new_fields: Result<Vec<_>> = inner_schema_without_metadata
+            .fields()
+            .iter()
+            .map(|field| {
+                if let Some(geo_column) = inferred_geo_cols.get(field.name()) {
+                    remaining.remove(field.name());
+                    let encoding = match geo_column.encoding {
+                        Some(encoding) => encoding,
+                        None => {
+                            return plan_err!(
+                                "GeoParquet column '{}' missing required field 
'encoding'",
+                                field.name()
+                            )
                         }
-                    } else {
-                        Ok(field.clone())
+                    };
+                    match encoding {
+                        GeoParquetColumnEncoding::WKB => {
+                            let extension = ExtensionType::new(
+                                "geoarrow.wkb",
+                                field.data_type().clone(),
+                                Some(geo_column.to_geoarrow_metadata()?),
+                            );
+                            Ok(Arc::new(
+                                extension.to_field(field.name(), 
field.is_nullable()),
+                            ))
+                        }
+                        _ => plan_err!("Unsupported GeoParquet encoding: {}", 
encoding),
                     }
-                })
-                .collect();
+                } else {
+                    Ok(field.clone())
+                }
+            })
+            .collect();
 
-            Ok(Arc::new(Schema::new(new_fields?)))
-        } else {
-            Ok(inner_schema_without_metadata)
+        if !remaining.is_empty() {
+            let mut missing: Vec<_> = remaining.into_iter().collect();
+            missing.sort();
+            return plan_err!(
+                "Geometry columns not found in schema: {}",
+                missing.join(", ")
+            );

Review Comment:
   It's great to fix other issues with the Parquet Schema/GeoParquetMetadata 
interactions like this one (but if we do it in this PR we should add a test). I 
can also do these as part of the Parquet geometry/geography PR since that
further complicates the interaction.



##########
rust/sedona-geoparquet/src/lib.rs:
##########
@@ -20,3 +20,5 @@ mod metadata;
 pub mod options;
 pub mod provider;
 mod writer;
+
+pub use metadata::{GeoParquetColumnEncoding, GeoParquetColumnMetadata};

Review Comment:
   I am not sure we want to expose these in this way...these are more like 
internal utilities at the moment (they could be exposed later if there is a 
good reason to do so and we're confident they won't change).



##########
python/sedonadb/python/sedonadb/context.py:
##########
@@ -134,14 +135,60 @@ def read_parquet(
                 files.
             options: Optional dictionary of options to pass to the Parquet 
reader.
                 For S3 access, use {"aws.skip_signature": True, "aws.region": 
"us-west-2"} for anonymous access to public buckets.
+            geometry_columns: Optional JSON string mapping column name to
+                GeoParquet column metadata (e.g.,
+                '{"geom": {"encoding": "WKB"}}'). Use this to mark binary WKB
+                columns as geometry columns or correct metadata such as the
+                column CRS.
+
+                Supported keys (others in the spec are not implemented):
+                - encoding: "WKB" (required if the column is not already 
geometry)
+                - crs: (e.g., "EPSG:4326")
+                - edges: "planar" (default) or "spherical"
+                See spec for details: https://geoparquet.org/releases/v1.1.0/
+
+                Useful for:
+                - Legacy Parquet files with Binary columns containing WKB 
payloads.
+                - Overriding GeoParquet metadata when fields like `crs` are 
missing.
+
+                Precedence:
+                - GeoParquet metadata is used to infer geometry columns first.
+                - geometry_columns then overrides the auto-inferred schema:
+                  - If a column is not geometry in metadata but appears in
+                    geometry_columns, it is treated as a geometry column.
+                  - If a column is geometry in metadata and also appears in
+                    geometry_columns, only the provided keys override; other
+                    fields remain as inferred. If a key already exists in 
metadata
+                    and is provided again with a different value, an error is
+                    returned.
+
+                Example:
+                - For `geo.parquet(geo1: geometry, geo2: geometry, geo3: 
binary)`,
+                  `read_parquet("geo.parquet", geometry_columns='{"geo2": 
{"encoding": "WKB"}, "geo3": {"encoding": "WKB"}}')`
+                  overrides `geo2` metadata and treats `geo3` as a geometry 
column.
+                - If `geo` inferred from metadata has:
+                  - `geo: {"encoding": "wkb", "crs": None, "edges": 
"spherical"...}`
+                  and geometry_columns provides:
+                  - `geo: {"crs": 4326}`
+                  then the result is (only override provided keys):
+                  - `geo: {"encoding": "wkb", "crs": "EPSG:4326", "edges": 
"spherical"...}`
+                - If `geo` inferred from metadata has:
+                  - `geo: {"encoding": "wkb", "crs": "EPSG:4326"}`
+                  and geometry_columns provides:
+                  - `geo: {"crs": "EPSG:3857"}`
+                  an error is returned for a conflicting key. This option is 
only
+                  allowed to provide missing optional fields in geometry 
columns.

Review Comment:
   ```suggestion
   ```
   
   If we constrain this option to only ever override the columns provided by
the file, we can make this interaction considerably simpler.



##########
rust/sedona-geoparquet/src/format.rs:
##########


Review Comment:
   Can we apply the column overrides here and eliminate the somewhat 
complicated logic below?
   
   Note that in https://github.com/apache/sedona-db/pull/561 this is simplified 
to just use `try_from_parquet_metadata()`.



##########
rust/sedona-geoparquet/src/format.rs:
##########
@@ -167,6 +279,9 @@ impl FileFormat for GeoParquetFormat {
         self.inner().compression_type()
     }
 
+    /// Infer schema from GeoParquet metadata, then optionally override it 
using
+    /// user-provided geometry column options (see Python DataFrame API
+    /// `read_parquet(..)` `geometry_columns` for details).

Review Comment:
   ```suggestion
   ```
   
   The rules for this are about to change with Parquet geometry/geography 
support and in that PR I will try to find a good place to document the rules 
for schema inference based on the file schema, GeoParquet metadata, and 
overrides.



##########
rust/sedona-geoparquet/src/provider.rs:
##########
@@ -185,13 +188,41 @@ impl GeoParquetReadOptions<'_> {
         Ok(GeoParquetReadOptions {
             inner: ParquetReadOptions::default(),
             table_options: Some(options),
+            geometry_columns: None,
         })
     }
 
     /// Get the table options
     pub fn table_options(&self) -> Option<&HashMap<String, String>> {
         self.table_options.as_ref()
     }
+
+    /// Add geometry column metadata (JSON string) to apply during schema 
resolution
+    /// See python `read_parquet(..)` comments for details.
+    ///
+    /// Errors if invalid json configuration string is provided.

Review Comment:
   ```suggestion
       ///
       /// Reads Parquet files as if GeoParquet metadata with the 
`"geometry_columns"`
       /// key were present. If GeoParquet metadata is already present, the 
values provided
       /// here will override any definitions provided in the original metadata.
       /// Errors if an invalid JSON configuration string is provided.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to