paleolimbot commented on code in PR #805:
URL: https://github.com/apache/sedona-db/pull/805#discussion_r3192449202
##########
python/sedonadb/tests/io/test_parquet.py:
##########
@@ -483,6 +483,104 @@ def test_write_geoparquet_geography(con, geoarrow_data):
assert table_roundtrip == table
+def test_write_geoparquet_2_0(con, geoarrow_data):
+ # Checks a roundtrip of geometry written as GeoParquet 2.0
+ path = (
+ geoarrow_data
+ / "natural-earth"
+ / "files"
+ / "natural-earth_countries_geo.parquet"
+ )
+ skip_if_not_exists(path)
+
+ table = con.read_parquet(path).to_arrow_table()
+
+ with tempfile.TemporaryDirectory() as td:
+ tmp_parquet = Path(td) / "tmp.parquet"
+ con.create_data_frame(table).to_parquet(tmp_parquet,
geoparquet_version="2.0")
+
+ table_roundtrip = con.read_parquet(tmp_parquet).to_arrow_table()
+ assert table_roundtrip == table
+
+ # Check for metadata and logical type
+ file = parquet.ParquetFile(tmp_parquet)
+ file_kv_metadata = file.metadata.metadata
+ assert b"geo" in file_kv_metadata
+ geo_metadata = json.loads(file_kv_metadata[b"geo"])
+ assert geo_metadata["version"] == "2.0.0"
+
+ file.metadata.schema.column(2).logical_type.to_json() == '{"Type":
"Geometry"}'
Review Comment:
GeoParquet 2.0!
##########
rust/sedona-geoparquet/src/statistics_accumulator.rs:
##########
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion_common::Result;
+use parquet::{
+ basic::LogicalType,
+ geospatial::accumulator::{
+ init_geo_stats_accumulator_factory, GeoStatsAccumulator,
GeoStatsAccumulatorFactory,
+ ParquetGeoStatsAccumulator, VoidGeoStatsAccumulator,
+ },
+ schema::types::ColumnDescPtr,
+};
+
+pub struct SedonaGeoStatsAccumulatorFactory;
+
+impl SedonaGeoStatsAccumulatorFactory {
+ pub fn try_init() -> Result<()> {
+ init_geo_stats_accumulator_factory(Arc::new(Self))?;
+ Ok(())
+ }
+}
+
+impl GeoStatsAccumulatorFactory for SedonaGeoStatsAccumulatorFactory {
+ fn new_accumulator(&self, descr: &ColumnDescPtr) -> Box<dyn
GeoStatsAccumulator> {
+ if let Some(LogicalType::Geometry { .. }) = descr.logical_type_ref() {
+ return Box::new(ParquetGeoStatsAccumulator::default());
+ }
Review Comment:
To get Geography statistics written, we have to do some plumbing so that
s2geography's rectangle bounder is called from deep within the Parquet crate.
This uses the mechanism I added in the PR that enabled Geometry statistics to
be written: you can initialize an "accumulator factory" on startup that hands
out dynamic statistics accumulators. It's a little unfortunate that we need an
s2geography dependency here, but the fact that we can write stats at all is
very cool.
##########
rust/sedona-geoparquet/src/writer.rs:
##########
@@ -77,40 +79,67 @@ pub fn create_geoparquet_writer_physical_plan(
}
// If there is no geometry, just use the inner implementation
- let mut output_geometry_column_indices =
conf.output_schema().geometry_column_indices()?;
- if output_geometry_column_indices.is_empty() {
+ let input_geometry_column_indices =
conf.output_schema().geometry_column_indices()?;
+ if input_geometry_column_indices.is_empty() {
Review Comment:
There is a lot of churn in this function, which prepares a plan before
writing. Basically, we now *always* do a "projection" (one that only modifies
field metadata) because we have to do one or more of the following: strip the
geoarrow.wkb type for GeoParquet 1.0 and 1.1 (Parquet with the geospatial
feature would write it with the Geometry logical type, but we want a plain byte
array with no logical type); canonicalize the CRS so that PROJJSON is always
written to the logical type (GeoParquet 2.0 or no GeoParquet); and/or create
some intentionally invalid GeoArrow metadata to work around a Parquet Rust bug
(see `serialize_edges_and_crs_with_parquet_bug()`). Finally, in some cases we
don't write the key/value metadata at all (no GeoParquet).
##########
python/sedonadb/tests/io/test_parquet.py:
##########
@@ -483,6 +483,104 @@ def test_write_geoparquet_geography(con, geoarrow_data):
assert table_roundtrip == table
+def test_write_geoparquet_2_0(con, geoarrow_data):
+ # Checks a roundtrip of geometry written as GeoParquet 2.0
+ path = (
+ geoarrow_data
+ / "natural-earth"
+ / "files"
+ / "natural-earth_countries_geo.parquet"
+ )
+ skip_if_not_exists(path)
+
+ table = con.read_parquet(path).to_arrow_table()
+
+ with tempfile.TemporaryDirectory() as td:
+ tmp_parquet = Path(td) / "tmp.parquet"
+ con.create_data_frame(table).to_parquet(tmp_parquet,
geoparquet_version="2.0")
+
+ table_roundtrip = con.read_parquet(tmp_parquet).to_arrow_table()
+ assert table_roundtrip == table
+
+ # Check for metadata and logical type
+ file = parquet.ParquetFile(tmp_parquet)
+ file_kv_metadata = file.metadata.metadata
+ assert b"geo" in file_kv_metadata
+ geo_metadata = json.loads(file_kv_metadata[b"geo"])
+ assert geo_metadata["version"] == "2.0.0"
+
+ file.metadata.schema.column(2).logical_type.to_json() == '{"Type":
"Geometry"}'
+
+
+def test_write_geoparquet_no_metadata(con, geoarrow_data):
+ # Checks a roundtrip of geometry written without GeoParquet metadata
+ path = (
+ geoarrow_data
+ / "natural-earth"
+ / "files"
+ / "natural-earth_countries_geo.parquet"
+ )
+ skip_if_not_exists(path)
+
+ table = con.read_parquet(path).to_arrow_table()
+
+ with tempfile.TemporaryDirectory() as td:
+ tmp_parquet = Path(td) / "tmp.parquet"
+ con.create_data_frame(table).to_parquet(tmp_parquet,
geoparquet_version="none")
+
+ table_roundtrip = con.read_parquet(tmp_parquet).to_arrow_table()
+ assert table_roundtrip == table
+
+ # Check for absent metadata but correct logical type
+ file = parquet.ParquetFile(tmp_parquet)
+ file_kv_metadata = file.metadata.metadata
+ assert file_kv_metadata is None or b"geo" not in file_kv_metadata
+
+ file.metadata.schema.column(2).logical_type.to_json() == '{"Type":
"Geometry"}'
+ geo_stats = file.metadata.row_group(0).column(2).geo_statistics
+ assert geo_stats is not None
+ assert geo_stats.geospatial_types == [3, 6]
+ assert geo_stats.xmin <= -180
+ assert geo_stats.xmax >= 180
+
+
+def test_write_geoparquet_geography_no_metadata(con, geoarrow_data):
+ # Checks a read and write of geography (roundtrip, since nobody else can
read/write)
+ path = (
+ geoarrow_data
+ / "natural-earth"
+ / "files"
+ / "natural-earth_countries-geography_geo.parquet"
+ )
+ skip_if_not_exists(path)
+
+ table = con.read_parquet(path).to_arrow_table()
+
+ with tempfile.TemporaryDirectory() as td:
+ tmp_parquet = Path(td) / "tmp.parquet"
+ con.create_data_frame(table).to_parquet(tmp_parquet,
geoparquet_version="none")
+
+ table_roundtrip = con.read_parquet(tmp_parquet).to_arrow_table()
+ assert table_roundtrip == table
+
+ # Check for absent metadata but correct logical type
+ file = parquet.ParquetFile(tmp_parquet)
+ file_kv_metadata = file.metadata.metadata
+ assert file_kv_metadata is None or b"geo" not in file_kv_metadata
+
+ file.metadata.schema.column(2).logical_type.to_json() == '{"Type":
"Geography"}'
+
+ # We should only have stats if s2geography is enabled
+ geo_stats = file.metadata.row_group(0).column(2).geo_statistics
+ if "s2geography" not in sedonadb.__features__:
+ assert geo_stats is None
+ else:
+ assert geo_stats is not None
Review Comment:
Geography statistics + Geography logical type!
##########
rust/sedona-geoparquet/src/writer.rs:
##########
@@ -532,8 +574,183 @@ fn append_float_bbox(
Ok(())
}
+#[derive(Debug, PartialEq)]
+struct NormalizeForGeoParquet {
+ crs_provider: CrsProviderOption,
+ version: GeoParquetVersion,
+ signature: Signature,
+}
+
+impl NormalizeForGeoParquet {
+ fn new(crs_provider: CrsProviderOption, version: GeoParquetVersion) ->
Self {
+ Self {
+ crs_provider,
+ version,
+ signature: Signature::any(1, Volatility::Stable),
+ }
+ }
+}
+
+impl std::hash::Hash for NormalizeForGeoParquet {
+ fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+ self.version.hash(state);
+ self.signature.hash(state);
+ }
+}
+
+impl Eq for NormalizeForGeoParquet {}
+
+impl ScalarUDFImpl for NormalizeForGeoParquet {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ "normalize_for_geoparquet"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+ sedona_internal_err!("return_type() should not be called")
+ }
+
+ fn return_field_from_args(&self, args: datafusion_expr::ReturnFieldArgs)
-> Result<FieldRef> {
+ normalize_field_for_geoparquet(&args.arg_fields[0], self.version,
&self.crs_provider)
+ }
+
+ fn invoke_with_args(&self, args: datafusion_expr::ScalarFunctionArgs) ->
Result<ColumnarValue> {
+ Ok(args.args[0].clone())
+ }
+}
+
+fn normalize_field_for_geoparquet(
+ field: &FieldRef,
+ version: GeoParquetVersion,
+ crs_provider: &CrsProviderOption,
+) -> Result<FieldRef> {
+ if field.metadata().is_empty() {
+ return Ok(field.clone());
+ }
+
+ let sedona_type = SedonaType::from_storage_field(field)?;
+ match sedona_type {
+ SedonaType::Arrow(DataType::Struct(children)) => {
+ let new_type = DataType::Struct(
+ children
+ .iter()
+ .map(|f| normalize_field_for_geoparquet(f, version,
crs_provider))
+ .collect::<Result<_>>()?,
+ );
+ Ok(Arc::new(field.as_ref().clone().with_data_type(new_type)))
+ }
+ SedonaType::Arrow(DataType::List(child)) => {
+ let new_type = DataType::List(normalize_field_for_geoparquet(
+ &child,
+ version,
+ crs_provider,
+ )?);
+ Ok(Arc::new(field.as_ref().clone().with_data_type(new_type)))
+ }
+ SedonaType::Arrow(_) => Ok(field.clone()),
+ SedonaType::Wkb(edges, crs) | SedonaType::WkbView(edges, crs) => match
version {
+ // For GeoParquet 1.0 and 1.1, strip the metadata (we write Binary
storage)
+ GeoParquetVersion::V1_0 | GeoParquetVersion::V1_1 => Ok(Arc::new(
+ field.as_ref().clone().with_metadata(HashMap::new()),
+ )),
+ // For GeoParquet 2.0 and None, ensure we have projjson CRS output
+ GeoParquetVersion::V2_0 | GeoParquetVersion::Omitted => {
+ let normalized_crs_value =
+ normalize_crs_for_geoparquet(field.name(), &crs,
crs_provider)?;
+ let normalized_crs =
+
deserialize_crs_from_obj(&normalized_crs_value.unwrap_or(Value::Null))?;
+ Ok(serialize_edges_and_crs_with_parquet_bug(
+ field,
+ &normalized_crs,
+ edges,
+ ))
+ }
+ },
+ _ => exec_err!("Unsupported geometry output to Parquet:
{sedona_type}"),
+ }
+}
+
+// Due to a bug in the parquet type conversion, we need to serialize invalid metadata for geography
+// fields. The conversion logic expects "algorithm" but the valid GeoArrow metadata we serialize
+// by default is "edges".
+//
https://github.com/apache/arrow-rs/blob/f725bc9b955f23772a6a6d8a38c99a8b3f359116/parquet-geospatial/src/types.rs#L64-L66
+fn serialize_edges_and_crs_with_parquet_bug(
+ original_field: &FieldRef,
+ crs: &Crs,
+ edges: Edges,
+) -> FieldRef {
+ let crs_component = crs
+ .as_ref()
+ .map(|crs| format!(r#""crs":{}"#, crs.to_json()));
+
+ let edges_component = match edges {
+ Edges::Planar => None,
+ // This is where we apply the workaround relative to our usual
+ // serialize_edges_and_crs().
+ Edges::Spherical => Some(r#""algorithm":"spherical""#),
+ };
Review Comment:
I filed https://github.com/apache/arrow-rs/issues/9929 for this one
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]