paleolimbot commented on code in PR #398:
URL: https://github.com/apache/sedona-db/pull/398#discussion_r2575628900
##########
rust/sedona-geoparquet/src/writer.rs:
##########
@@ -168,10 +178,78 @@ pub fn create_geoparquet_writer_physical_plan(
);
// Create the sink
- let sink = Arc::new(ParquetSink::new(conf, parquet_options));
+ let sink_input_schema = conf.output_schema;
+ conf.output_schema = parquet_output_schema.clone();
+ let sink = Arc::new(GeoParquetSink {
+ inner: ParquetSink::new(conf, parquet_options),
+ projection: bbox_projection,
+ sink_input_schema,
+ parquet_output_schema,
+ });
Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
}
+/// Implementation of [DataSink] that computes GeoParquet 1.1 bbox columns
+/// if needed. This is used instead of a ProjectionExec because DataFusion's
+/// optimizer rules seem to rearrange the projection in ways that cause
+/// the plan to fail <https://github.com/apache/sedona-db/issues/379>.
+#[derive(Debug)]
+struct GeoParquetSink {
+ inner: ParquetSink,
+ projection: Option<Vec<(Arc<dyn PhysicalExpr>, String)>>,
+ sink_input_schema: SchemaRef,
+ parquet_output_schema: SchemaRef,
+}
+
+impl DisplayAs for GeoParquetSink {
+ fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) ->
fmt::Result {
+ self.inner.fmt_as(t, f)
+ }
+}
+
+#[async_trait]
+impl DataSink for GeoParquetSink {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> &SchemaRef {
+ &self.sink_input_schema
+ }
+
+ async fn write_all(
+ &self,
+ data: SendableRecordBatchStream,
+ context: &Arc<TaskContext>,
+ ) -> Result<u64> {
+ if let Some(projection) = &self.projection {
+ // If we have a projection, apply it here
+ let schema = self.parquet_output_schema.clone();
+ let projection = projection.clone();
Review Comment:
I need this clone to get the code to compile; however, a struct that implements
`Stream` might be able to avoid the clone, if that matters.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]