paleolimbot commented on code in PR #8524:
URL: https://github.com/apache/arrow-rs/pull/8524#discussion_r2399984597
##########
parquet/src/column/writer/encoder.rs:
##########
@@ -121,6 +125,8 @@ pub trait ColumnValueEncoder {
/// will *not* be tracked by the bloom filter as it is empty since. This
should be called once
/// near the end of encoding.
fn flush_bloom_filter(&mut self) -> Option<Sbbf>;
+
+ fn flush_geospatial_statistics(&mut self) ->
Option<Box<GeospatialStatistics>>;
Review Comment:
The `ColumnValueEncoder` was not my first choice for where to put this;
however, putting it at a higher level (e.g., the ColumnMetrics) is more
disruptive because then the a reference to the bounder has to be passed through
all the write/encode methods. I'm open to suggestions 🙂
##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -491,6 +499,14 @@ impl ArrowWriterOptions {
..self
}
}
+
+ /// Explicitly specify the Parquet schema to be used
+ pub fn with_parquet_schema(self, schema_descr: SchemaDescriptor) -> Self {
+ Self {
+ schema_descr: Some(schema_descr),
+ ..self
+ }
+ }
Review Comment:
This is just me ensuring I can test the Arrow-specific byte array
implementation (I couldn't figure out how to get a byte array column with a
Geometry logical type otherwise)
##########
parquet/tests/geospatial.rs:
##########
@@ -0,0 +1,184 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[cfg(all(feature = "arrow", feature = "geospatial"))]
+mod test {
+ use std::sync::Arc;
+
+ use arrow_array::{ArrayRef, BinaryArray, RecordBatch};
+ use arrow_schema::{DataType, Field, Schema};
+ use bytes::Bytes;
+ use parquet::{
+ arrow::{arrow_writer::ArrowWriterOptions, ArrowWriter},
+ basic::LogicalType,
+ data_type::{ByteArray, ByteArrayType},
+ file::{
+ properties::{EnabledStatistics, WriterProperties},
+ reader::{FileReader, SerializedFileReader},
+ writer::SerializedFileWriter,
+ },
+ geospatial::{bounding_box::BoundingBox,
statistics::GeospatialStatistics},
+ schema::types::{SchemaDescriptor, Type},
+ };
+
+ fn read_geo_statistics(buf: Vec<u8>) -> Vec<Option<GeospatialStatistics>> {
+ let b = Bytes::from(buf);
+ let reader = SerializedFileReader::new(b).unwrap();
+ reader
+ .metadata()
+ .row_groups()
+ .iter()
+ .map(|row_group| row_group.column(0).geo_statistics().cloned())
+ .collect()
+ }
+
+ #[test]
+ fn test_write_statistics_arrow() {
+ let arrow_schema = Arc::new(Schema::new(vec![Field::new(
+ "geom",
+ DataType::Binary,
+ true,
+ )]));
+ let batch = RecordBatch::try_new(
+ arrow_schema.clone(),
+ vec![wkb_array_xy([(1.0, 2.0), (11.0, 12.0)])],
+ )
+ .unwrap();
+ let expected_geometry_types = vec![1];
+ let expected_bounding_box = BoundingBox::new(1.0, 11.0, 2.0, 12.0);
Review Comment:
The whole point of this PR (ensuring statistics are accumulated and written
by the Arrow-specific byte array encoder)
##########
parquet/tests/geospatial.rs:
##########
@@ -0,0 +1,184 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[cfg(all(feature = "arrow", feature = "geospatial"))]
+mod test {
+ use std::sync::Arc;
+
+ use arrow_array::{ArrayRef, BinaryArray, RecordBatch};
+ use arrow_schema::{DataType, Field, Schema};
+ use bytes::Bytes;
+ use parquet::{
+ arrow::{arrow_writer::ArrowWriterOptions, ArrowWriter},
+ basic::LogicalType,
+ data_type::{ByteArray, ByteArrayType},
+ file::{
+ properties::{EnabledStatistics, WriterProperties},
+ reader::{FileReader, SerializedFileReader},
+ writer::SerializedFileWriter,
+ },
+ geospatial::{bounding_box::BoundingBox,
statistics::GeospatialStatistics},
+ schema::types::{SchemaDescriptor, Type},
+ };
+
+ fn read_geo_statistics(buf: Vec<u8>) -> Vec<Option<GeospatialStatistics>> {
+ let b = Bytes::from(buf);
+ let reader = SerializedFileReader::new(b).unwrap();
+ reader
+ .metadata()
+ .row_groups()
+ .iter()
+ .map(|row_group| row_group.column(0).geo_statistics().cloned())
+ .collect()
+ }
+
+ #[test]
+ fn test_write_statistics_arrow() {
+ let arrow_schema = Arc::new(Schema::new(vec![Field::new(
+ "geom",
+ DataType::Binary,
+ true,
+ )]));
+ let batch = RecordBatch::try_new(
+ arrow_schema.clone(),
+ vec![wkb_array_xy([(1.0, 2.0), (11.0, 12.0)])],
+ )
+ .unwrap();
+ let expected_geometry_types = vec![1];
+ let expected_bounding_box = BoundingBox::new(1.0, 11.0, 2.0, 12.0);
+
+ let root = Type::group_type_builder("root")
+ .with_fields(vec![Type::primitive_type_builder(
+ "geo",
+ parquet::basic::Type::BYTE_ARRAY,
+ )
+ .with_logical_type(Some(LogicalType::Geometry))
+ .build()
+ .unwrap()
+ .into()])
+ .build()
+ .unwrap();
+ let schema = SchemaDescriptor::new(root.into());
+
+ let props = WriterProperties::builder()
+ .set_statistics_enabled(EnabledStatistics::Chunk)
+ .build();
+ let options = ArrowWriterOptions::new()
+ .with_parquet_schema(schema)
+ .with_properties(props);
+
+ let mut buf = Vec::with_capacity(1024);
+ let mut file_writer =
+ ArrowWriter::try_new_with_options(&mut buf, arrow_schema.clone(),
options).unwrap();
+ file_writer.write(&batch).unwrap();
+
+ let thrift_metadata = file_writer.finish().unwrap();
+ drop(file_writer);
+
+ // Check that statistics exist in thrift output
+ thrift_metadata.row_groups[0].columns[0]
+ .meta_data
+ .as_ref()
+ .unwrap()
+ .geospatial_statistics
+ .as_ref()
+ .expect("geospatial_statistics in thrift column metadata");
+
+ // Check statistics on file read
+ let all_geo_stats = read_geo_statistics(buf);
+ assert_eq!(all_geo_stats.len(), 1);
+ let geo_stats = all_geo_stats[0].as_ref().unwrap();
+
+ assert_eq!(
+ geo_stats.geospatial_types.as_ref().unwrap(),
+ &expected_geometry_types
+ );
+ assert_eq!(geo_stats.bbox.as_ref().unwrap(), &expected_bounding_box);
+ }
+
+ #[test]
+ fn test_write_statistics_not_arrow() {
+ let column_values = [wkb_item_xy(1.0, 2.0), wkb_item_xy(11.0,
12.0)].map(ByteArray::from);
+ let expected_geometry_types = vec![1];
+ let expected_bounding_box = BoundingBox::new(1.0, 11.0, 2.0, 12.0);
Review Comment:
...and by the generic encoder.
##########
parquet/src/geospatial/statistics.rs:
##########
@@ -45,9 +45,9 @@ use crate::geospatial::bounding_box::BoundingBox;
#[derive(Clone, Debug, PartialEq, Default)]
pub struct GeospatialStatistics {
/// Optional bounding defining the spatial extent, where None represents a
lack of information.
- bbox: Option<BoundingBox>,
+ pub bbox: Option<BoundingBox>,
/// Optional list of geometry type identifiers, where None represents lack
of information
- geospatial_types: Option<Vec<i32>>,
+ pub geospatial_types: Option<Vec<i32>>,
Review Comment:
I can remove these once the PR with the geospatial/thrift update merges
(which exposes getters to inspect these).
##########
parquet/src/geospatial/accumulator.rs:
##########
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module provides implementations and traits for building
[GeospatialStatistics]
+
+use crate::{geospatial::statistics::GeospatialStatistics,
schema::types::ColumnDescPtr};
+
+/// Factory for [GeospatialStatistics] accumulators
+pub trait GeoStatsAccumulatorFactory {
+ /// Create a new accumulator
+ fn new_accumulator(&self, descr: &ColumnDescPtr) -> Box<dyn
GeoStatsAccumulator>;
+}
+
+/// Dynamic geospatial accumulator
+pub trait GeoStatsAccumulator: Send {
+ /// Returns true if this accumulator has any plans to actually return
statistics
+ fn is_valid(&self) -> bool;
+
+ /// Update with a single slice of possibly wkb-encoded values
+ fn update_wkb(&mut self, wkb: &[u8]);
+
+ /// Compute the final statistics from internal state
+ fn finish(&mut self) -> Option<Box<GeospatialStatistics>>;
+}
+
+/// Default accumulator for [GeospatialStatistics] reflecting the build-time
features of this build
+#[derive(Debug, Default)]
+pub struct DefaultGeoStatsAccumulatorFactory {}
+
+impl GeoStatsAccumulatorFactory for DefaultGeoStatsAccumulatorFactory {
+ fn new_accumulator(&self, _descr: &ColumnDescPtr) -> Box<dyn
GeoStatsAccumulator> {
+ #[cfg(feature = "geospatial")]
+ if let Some(crate::basic::LogicalType::Geometry) =
_descr.logical_type() {
+ Box::new(ParquetGeoStatsAccumulator::default())
+ } else {
+ Box::new(VoidGeospatialStatisticsAccumulator::default())
+ }
+
+ #[cfg(not(feature = "geospatial"))]
+ return Box::new(VoidGeospatialStatisticsAccumulator::default());
+ }
+}
Review Comment:
This was my strategy for avoiding `#[cfg(...)]` all over the encoders/column
writer.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]