paleolimbot commented on code in PR #578:
URL: https://github.com/apache/sedona-db/pull/578#discussion_r2776196192
##########
python/sedonadb/python/sedonadb/context.py:
##########
@@ -176,9 +177,18 @@ def read_parquet(
Safety:
- - Columns specified here are not validated against the
provided options
- (e.g., WKB encoding checks); inconsistent data may cause
undefined
- behavior.
+ - Columns specified here can optionally be validated according
to the
+ `validate` option (e.g., WKB encoding checks). If validation
is not
+ enabled, inconsistent data may cause undefined behavior.
+ validate:
+ When set to `True`, geometry column contents are validated
against
+ their metadata. Metadata can come from the source Parquet file
or
+ the user-provided `geometry_columns` option.
+ Only supported properties are validated; unsupported
properties are
+ ignored. If validation fails, execution stops with an error.
+
+ Supported validation properties:
+ - WKB encoding
Review Comment:
```suggestion
        Currently the only property that is validated is the WKB encoding
        of input geometry columns.
```
##########
rust/sedona-geoparquet/src/file_opener.rs:
##########
@@ -158,12 +170,108 @@ impl FileOpener for GeoParquetFileOpener {
// We could also consider filtering using null_count here in the
future (i.e.,
// skip row groups that are all null)
let file = file.with_extensions(Arc::new(access_plan));
+ let stream = self_clone.inner.open(file)?.await?;
+
+ // Validate geometry columns when enabled from read option.
+ let validation_columns = if self_clone.options.validate {
+ maybe_geoparquet_metadata
+ .as_ref()
+ .map(|metadata|
wkb_validation_columns(&self_clone.file_schema, metadata))
+ .unwrap_or_default()
+ } else {
+ Vec::new()
+ };
- self_clone.inner.open(file)?.await
+ if !self_clone.options.validate || validation_columns.is_empty() {
+ return Ok(stream);
+ }
+
+ let validated_stream = stream.map(move |batch_result| {
+ let batch = batch_result?;
+ validate_wkb_batch(&batch, &validation_columns)?;
+ Ok(batch)
+ });
+
+ Ok(Box::pin(validated_stream))
}))
}
}
+fn wkb_validation_columns(
+ file_schema: &SchemaRef,
+ metadata: &GeoParquetMetadata,
+) -> Vec<(usize, String)> {
+ file_schema
+ .fields()
+ .iter()
+ .enumerate()
+ .filter_map(|(column_index, field)| {
+ metadata
+ .columns
+ .get(field.name())
+ .and_then(|column_metadata| {
+ if matches!(column_metadata.encoding,
GeoParquetColumnEncoding::WKB) {
+ Some((column_index, field.name().clone()))
+ } else {
+ None
+ }
+ })
+ })
+ .collect()
+}
+
+fn validate_wkb_batch(batch: &RecordBatch, validation_columns: &[(usize,
String)]) -> Result<()> {
+ for (column_index, column_name) in validation_columns {
+ let column = batch.column(*column_index);
+ validate_wkb_array(column.as_ref(), column_name)?;
+ }
+ Ok(())
+}
+
+fn validate_wkb_array(array: &dyn Array, column_name: &str) -> Result<()> {
+ match array.data_type() {
+ DataType::Binary => {
+ let array = as_binary_array(array)?;
+ for (row_index, maybe_wkb) in array.iter().enumerate() {
+ if let Some(wkb_bytes) = maybe_wkb {
+ if let Err(e) = wkb::reader::read_wkb(wkb_bytes) {
+ return exec_err!(
+ "WKB validation failed for column '{}' at row {}:
{}",
+ column_name,
+ row_index,
+ e
+ );
+ }
+ }
+ }
+ }
Review Comment:
While we're here, it would be good to handle the `LargeBinary` case as well —
this can occur for reasons outside a GeoParquet producer's control, since
various tools automatically widen `Binary` to `LargeBinary` when storing or
loading schemas.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]