This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git
The following commit(s) were added to refs/heads/main by this push:
new 42a16a1 feat(rust/sedona-geoparquet): Add GeoParquet writer for non-spatial output (#52)
42a16a1 is described below
commit 42a16a131d6d9fb8d59903b4fd4d616c00f25d98
Author: Dewey Dunnington <[email protected]>
AuthorDate: Sat Sep 13 16:35:45 2025 -0500
feat(rust/sedona-geoparquet): Add GeoParquet writer for non-spatial output (#52)
---
Cargo.lock | 3 +
Cargo.toml | 1 +
python/sedonadb/Cargo.toml | 1 +
python/sedonadb/python/sedonadb/dataframe.py | 64 ++++++-
python/sedonadb/src/dataframe.rs | 41 ++++-
python/sedonadb/tests/io/test_parquet.py | 37 ++++
python/sedonadb/tests/test_dataframe.py | 51 ++++++
rust/sedona-geoparquet/Cargo.toml | 3 +-
rust/sedona-geoparquet/src/format.rs | 132 +++++++-------
rust/sedona-geoparquet/src/lib.rs | 2 +
rust/sedona-geoparquet/src/metadata.rs | 18 +-
rust/sedona-geoparquet/src/options.rs | 58 +++++++
rust/sedona-geoparquet/src/provider.rs | 3 +-
rust/sedona-geoparquet/src/writer.rs | 248 +++++++++++++++++++++++++++
rust/sedona/Cargo.toml | 1 +
rust/sedona/src/context.rs | 147 ++++++++++++++++
16 files changed, 741 insertions(+), 69 deletions(-)
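
As a usage sketch of the new Rust-level API added in rust/sedona/src/context.rs (the output path and the async wrapper below are illustrative, not part of the commit):

use sedona::context::{SedonaContext, SedonaDataFrame, SedonaWriteOptions};

async fn write_example() -> datafusion::error::Result<()> {
    let ctx = SedonaContext::new();
    let df = ctx.sql("SELECT 1 AS one").await?;
    // With no geometry columns this falls through to the plain Parquet sink;
    // for geometry/geography columns the writer attaches GeoParquet "geo" metadata.
    df.write_geoparquet(
        &ctx,
        "/tmp/example.parquet", // hypothetical output path
        SedonaWriteOptions::new().with_single_file_output(true),
        None, // default TableGeoParquetOptions
    )
    .await?;
    Ok(())
}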
diff --git a/Cargo.lock b/Cargo.lock
index fa69a36..5466467 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4810,6 +4810,7 @@ dependencies = [
"sedona-tg",
"serde",
"serde_json",
+ "tempfile",
"tokio",
"url",
]
@@ -4987,6 +4988,7 @@ dependencies = [
"serde",
"serde_json",
"serde_with",
+ "tempfile",
"tokio",
"url",
]
@@ -5167,6 +5169,7 @@ dependencies = [
"async-trait",
"datafusion",
"datafusion-common",
+ "datafusion-expr",
"datafusion-ffi",
"futures",
"libmimalloc-sys",
diff --git a/Cargo.toml b/Cargo.toml
index 0a8e803..a5cf286 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -110,6 +110,7 @@ rstest = "0.24.0"
serde = { version = "1" }
serde_json = { version = "1" }
serde_with = { version = "1" }
+tempfile = { version = "3"}
thiserror = { version = "2" }
tokio = { version = "1.44" }
url = "2.5.4"
diff --git a/python/sedonadb/Cargo.toml b/python/sedonadb/Cargo.toml
index 1ddb4c0..98379bd 100644
--- a/python/sedonadb/Cargo.toml
+++ b/python/sedonadb/Cargo.toml
@@ -36,6 +36,7 @@ arrow-array = { workspace = true }
async-trait = { workspace = true }
datafusion = { workspace = true }
datafusion-common = { workspace = true }
+datafusion-expr = { workspace = true }
datafusion-ffi = { workspace = true }
futures = { workspace = true }
pyo3 = { version = "0.25.1" }
diff --git a/python/sedonadb/python/sedonadb/dataframe.py b/python/sedonadb/python/sedonadb/dataframe.py
index 182a008..b390e77 100644
--- a/python/sedonadb/python/sedonadb/dataframe.py
+++ b/python/sedonadb/python/sedonadb/dataframe.py
@@ -14,7 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-from typing import TYPE_CHECKING, Union, Optional, Any
+
+from pathlib import Path
+from typing import TYPE_CHECKING, Union, Optional, Any, Iterable
from sedonadb._options import global_options
@@ -263,6 +265,66 @@ class DataFrame:
else:
return table.to_pandas()
+ def to_parquet(
+ self,
+ path: Union[str, Path],
+ *,
+ partition_by: Optional[Union[str, Iterable[str]]] = None,
+ sort_by: Optional[Union[str, Iterable[str]]] = None,
+ single_file_output: Optional[bool] = None,
+ ):
+ """Write this DataFrame to one or more (Geo)Parquet files
+
+ For input that contains geometry columns, GeoParquet metadata is written
+ such that suitable readers can recreate Geometry/Geography types when
+ reading the output.
+
+
+ Args:
+ path: A filename or directory to which parquet file(s) should be written.
+ partition_by: A vector of column names to partition by. If non-empty,
+ applies hive-style partitioning to the output.
+ sort_by: A vector of column names to sort by. Currently only ascending
+ sort is supported.
+ single_file_output: Use True or False to force writing a single Parquet
+ file vs. writing one file per partition to a directory. By default,
+ a single file is written if `partition_by` is unspecified and
+ `path` ends with `.parquet`.
+
+ Examples:
+
+ >>> import sedonadb
+ >>> import tempfile
+ >>> con = sedonadb.connect()
+ >>> td = tempfile.TemporaryDirectory()
+ >>> url = "https://github.com/apache/sedona-testing/raw/refs/heads/main/data/parquet/geoparquet-1.1.0.parquet"
+ >>> con.read_parquet(url).to_parquet(f"{td.name}/tmp.parquet")
+
+ """
+
+ path = Path(path)
+
+ if single_file_output is None:
+ single_file_output = partition_by is None and str(path).endswith(".parquet")
+
+ if isinstance(partition_by, str):
+ partition_by = [partition_by]
+ elif partition_by is not None:
+ partition_by = list(partition_by)
+ else:
+ partition_by = []
+
+ if isinstance(sort_by, str):
+ sort_by = [sort_by]
+ elif sort_by is not None:
+ sort_by = list(sort_by)
+ else:
+ sort_by = []
+
+ self._impl.to_parquet(
+ self._ctx, str(path), partition_by, sort_by, single_file_output
+ )
+
def show(
self,
limit: Optional[int] = 10,
diff --git a/python/sedonadb/src/dataframe.rs b/python/sedonadb/src/dataframe.rs
index 51f7a4b..aae953b 100644
--- a/python/sedonadb/src/dataframe.rs
+++ b/python/sedonadb/src/dataframe.rs
@@ -22,12 +22,16 @@ use arrow_array::ffi_stream::FFI_ArrowArrayStream;
use arrow_array::RecordBatchReader;
use arrow_schema::Schema;
use datafusion::catalog::MemTable;
+use datafusion::logical_expr::SortExpr;
use datafusion::prelude::DataFrame;
+use datafusion_common::Column;
+use datafusion_expr::Expr;
use datafusion_ffi::table_provider::FFI_TableProvider;
use pyo3::prelude::*;
use pyo3::types::PyCapsule;
-use sedona::context::SedonaDataFrame;
+use sedona::context::{SedonaDataFrame, SedonaWriteOptions};
use sedona::show::{DisplayMode, DisplayTableOptions};
+use sedona_geoparquet::options::TableGeoParquetOptions;
use sedona_schema::schema::SedonaSchema;
use tokio::runtime::Runtime;
@@ -119,6 +123,41 @@ impl InternalDataFrame {
))
}
+ fn to_parquet<'py>(
+ &self,
+ py: Python<'py>,
+ ctx: &InternalContext,
+ path: String,
+ partition_by: Vec<String>,
+ sort_by: Vec<String>,
+ single_file_output: bool,
+ ) -> Result<(), PySedonaError> {
+ // sort_by needs to be SortExpr. A Vec<String> can unambiguously be interpreted as
+ // field names (ascending), but other types of expressions aren't supported here yet.
+ let sort_by_expr = sort_by
+ .into_iter()
+ .map(|name| {
+ let column = Expr::Column(Column::new_unqualified(name));
+ SortExpr::new(column, true, false)
+ })
+ .collect::<Vec<_>>();
+
+ let options = SedonaWriteOptions::new()
+ .with_partition_by(partition_by)
+ .with_sort_by(sort_by_expr)
+ .with_single_file_output(single_file_output);
+ let writer_options = TableGeoParquetOptions::default();
+
+ wait_for_future(
+ py,
+ &self.runtime,
+ self.inner
+ .clone()
+ .write_geoparquet(&ctx.inner, &path, options, Some(writer_options)),
+ )??;
+ Ok(())
+ }
+
fn show<'py>(
&self,
py: Python<'py>,
diff --git a/python/sedonadb/tests/io/test_parquet.py b/python/sedonadb/tests/io/test_parquet.py
index 42fcaf5..a90e254 100644
--- a/python/sedonadb/tests/io/test_parquet.py
+++ b/python/sedonadb/tests/io/test_parquet.py
@@ -19,6 +19,7 @@ import pytest
import tempfile
import shapely
import geopandas
+import geopandas.testing
from pyarrow import parquet
from pathlib import Path
from sedonadb.testing import geom_or_null, SedonaDB, DuckDB, skip_if_not_exists
@@ -238,3 +239,39 @@ def test_read_geoparquet_prune_polygons(sedona_testing, predicate):
"""
)
eng.assert_result(result, gdf)
+
+
+@pytest.mark.parametrize("name", ["water-junc", "water-point"])
+def test_write_geoparquet_geometry(con, geoarrow_data, name):
+ # Checks a read and write of some non-trivial files and ensures we match GeoPandas
+ path = geoarrow_data / "ns-water" / "files" / f"ns-water_{name}_geo.parquet"
+ skip_if_not_exists(path)
+
+ gdf = geopandas.read_parquet(path).sort_values(by="OBJECTID").reset_index(drop=True)
+
+ with tempfile.TemporaryDirectory() as td:
+ tmp_parquet = Path(td) / "tmp.parquet"
+ con.create_data_frame(gdf).to_parquet(tmp_parquet)
+
+ gdf_roundtrip = geopandas.read_parquet(tmp_parquet)
+ geopandas.testing.assert_geodataframe_equal(gdf_roundtrip, gdf)
+
+
+def test_write_geoparquet_geography(con, geoarrow_data):
+ # Checks a read and write of geography (roundtrip, since nobody else can read/write)
+ path = (
+ geoarrow_data
+ / "natural-earth"
+ / "files"
+ / "natural-earth_countries-geography_geo.parquet"
+ )
+ skip_if_not_exists(path)
+
+ table = con.read_parquet(path).to_arrow_table()
+
+ with tempfile.TemporaryDirectory() as td:
+ tmp_parquet = Path(td) / "tmp.parquet"
+ con.create_data_frame(table).to_parquet(tmp_parquet)
+
+ table_roundtrip = con.read_parquet(tmp_parquet).to_arrow_table()
+ assert table_roundtrip == table
diff --git a/python/sedonadb/tests/test_dataframe.py b/python/sedonadb/tests/test_dataframe.py
index 5386d24..bd79b2a 100644
--- a/python/sedonadb/tests/test_dataframe.py
+++ b/python/sedonadb/tests/test_dataframe.py
@@ -18,9 +18,11 @@ import geoarrow.pyarrow as ga
import geoarrow.types as gat
import geopandas.testing
import pandas as pd
+from pathlib import Path
import pyarrow as pa
import pytest
import sedonadb
+import tempfile
def test_dataframe_from_dataframe(con):
@@ -281,6 +283,55 @@ def test_dataframe_to_pandas(con):
)
+def test_dataframe_to_parquet(con):
+ df = con.sql(
+ "SELECT * FROM (VALUES ('one', 1), ('two', 2), ('three', 3)) AS t(a,
b)"
+ )
+
+ with tempfile.TemporaryDirectory() as td:
+ # Defaults with a path that ends with .parquet (single file)
+ tmp_parquet_file = Path(td) / "tmp.parquet"
+ df.to_parquet(tmp_parquet_file)
+
+ assert tmp_parquet_file.exists()
+ assert tmp_parquet_file.is_file()
+ pd.testing.assert_frame_equal(
+ pd.read_parquet(tmp_parquet_file),
+ pd.DataFrame({"a": ["one", "two", "three"], "b": [1, 2, 3]}),
+ )
+
+ # Defaults with a path that doesn't end in .parquet (directory)
+ tmp_parquet_dir = Path(td) / "tmp"
+ df.to_parquet(tmp_parquet_dir)
+
+ assert tmp_parquet_dir.exists()
+ assert tmp_parquet_dir.is_dir()
+ pd.testing.assert_frame_equal(
+ pd.read_parquet(tmp_parquet_dir),
+ pd.DataFrame({"a": ["one", "two", "three"], "b": [1, 2, 3]}),
+ )
+
+ # With partition_by
+ tmp_parquet_dir = Path(td) / "tmp_partitioned"
+ df.to_parquet(tmp_parquet_dir, partition_by=["a"])
+ assert tmp_parquet_dir.exists()
+ assert tmp_parquet_dir.is_dir()
+ pd.testing.assert_frame_equal(
+ pd.read_parquet(tmp_parquet_dir).sort_values("b").reset_index(drop=True),
+ pd.DataFrame(
+ {"b": [1, 2, 3], "a": pd.Categorical(["one", "two", "three"])}
+ ),
+ )
+
+ # With sort_by
+ tmp_parquet = Path(td) / "tmp_ordered.parquet"
+ df.to_parquet(tmp_parquet, sort_by=["a"])
+ pd.testing.assert_frame_equal(
+ pd.read_parquet(tmp_parquet),
+ pd.DataFrame({"a": ["one", "three", "two"], "b": [1, 3, 2]}),
+ )
+
+
def test_show(con, capsys):
con.sql("SELECT 1 as one").show()
expected = """
diff --git a/rust/sedona-geoparquet/Cargo.toml b/rust/sedona-geoparquet/Cargo.toml
index 6c1ffa1..d11acd0 100644
--- a/rust/sedona-geoparquet/Cargo.toml
+++ b/rust/sedona-geoparquet/Cargo.toml
@@ -34,6 +34,8 @@ default = []
sedona-testing = { path = "../sedona-testing" }
url = { workspace = true }
rstest = { workspace = true }
+tempfile = { workspace = true }
+tokio = { workspace = true }
[dependencies]
async-trait = { workspace = true }
@@ -59,4 +61,3 @@ sedona-schema = { path = "../sedona-schema" }
serde = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true }
-tokio = { workspace = true }
diff --git a/rust/sedona-geoparquet/src/format.rs b/rust/sedona-geoparquet/src/format.rs
index 02904e5..94576ab 100644
--- a/rust/sedona-geoparquet/src/format.rs
+++ b/rust/sedona-geoparquet/src/format.rs
@@ -14,12 +14,13 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
+
use std::{any::Any, collections::HashMap, sync::Arc};
use arrow_schema::{Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::{
- config::{ConfigOptions, TableParquetOptions},
+ config::ConfigOptions,
datasource::{
file_format::{
file_compression_type::FileCompressionType,
@@ -32,7 +33,7 @@ use datafusion::{
},
};
use datafusion_catalog::{memory::DataSourceExec, Session};
-use datafusion_common::{not_impl_err, plan_err, GetExt, Result, Statistics};
+use datafusion_common::{plan_err, GetExt, Result, Statistics};
use datafusion_physical_expr::{LexRequirement, PhysicalExpr};
use datafusion_physical_plan::{
filter_pushdown::FilterPushdownPropagation,
metrics::ExecutionPlanMetricsSet, ExecutionPlan,
@@ -47,6 +48,8 @@ use sedona_schema::extension_type::ExtensionType;
use crate::{
file_opener::{storage_schema_contains_geo, GeoParquetFileOpener},
metadata::{GeoParquetColumnEncoding, GeoParquetMetadata},
+ options::{GeoParquetVersion, TableGeoParquetOptions},
+ writer::create_geoparquet_writer_physical_plan,
};
use datafusion::datasource::physical_plan::ParquetSource;
use datafusion::datasource::schema_adapter::SchemaAdapterFactory;
@@ -55,9 +58,10 @@ use datafusion::datasource::schema_adapter::SchemaAdapterFactory;
///
/// A DataFusion FormatFactory provides a means to allow creating a table
/// or referencing one from a SQL context like COPY TO.
-#[derive(Debug)]
+#[derive(Debug, Default)]
pub struct GeoParquetFormatFactory {
inner: ParquetFormatFactory,
+ options: Option<TableGeoParquetOptions>,
}
impl GeoParquetFormatFactory {
@@ -66,13 +70,15 @@ impl GeoParquetFormatFactory {
pub fn new() -> Self {
Self {
inner: ParquetFormatFactory::new(),
+ options: None,
}
}
/// Creates an instance of [GeoParquetFormatFactory] with customized default options
- pub fn new_with_options(options: TableParquetOptions) -> Self {
+ pub fn new_with_options(options: TableGeoParquetOptions) -> Self {
Self {
- inner: ParquetFormatFactory::new_with_options(options),
+ inner: ParquetFormatFactory::new_with_options(options.inner.clone()),
+ options: Some(options),
}
}
}
@@ -83,9 +89,24 @@ impl FileFormatFactory for GeoParquetFormatFactory {
state: &dyn Session,
format_options: &HashMap<String, String>,
) -> Result<Arc<dyn FileFormat>> {
- let inner_format = self.inner.create(state, format_options)?;
+ let mut options_mut = self.options.clone().unwrap_or_default();
+ let mut format_options_mut = format_options.clone();
+ options_mut.geoparquet_version =
+ if let Some(version_string) = format_options_mut.remove("geoparquet_version") {
+ match version_string.as_str() {
+ "1.0" => GeoParquetVersion::V1_0,
+ "1.1" => GeoParquetVersion::V1_1,
+ "2.0" => GeoParquetVersion::V2_0,
+ _ => GeoParquetVersion::default(),
+ }
+ } else {
+ GeoParquetVersion::default()
+ };
+
+ let inner_format = self.inner.create(state, &format_options_mut)?;
if let Some(parquet_format) = inner_format.as_any().downcast_ref::<ParquetFormat>() {
- Ok(Arc::new(GeoParquetFormat::new(parquet_format)))
+ options_mut.inner = parquet_format.options().clone();
+ Ok(Arc::new(GeoParquetFormat::new(options_mut)))
} else {
sedona_internal_err!(
"Unexpected format from ParquetFormatFactory: {:?}",
@@ -115,27 +136,19 @@ impl GetExt for GeoParquetFormatFactory {
/// FileFormat is to be able to be used in a ListingTable (i.e., multi file table).
/// Here we also use it to implement a basic `TableProvider` that gives us most if
/// not all of the features of the underlying Parquet reader.
-#[derive(Debug)]
+#[derive(Debug, Default)]
pub struct GeoParquetFormat {
- inner: ParquetFormat,
+ options: TableGeoParquetOptions,
}
impl GeoParquetFormat {
/// Create a new instance of the file format
- pub fn new(inner: &ParquetFormat) -> Self {
- // For GeoParquet we currently inspect metadata at the Arrow level,
- // so we need this to be exposed by the underlying reader. Depending on
- // what exactly we're doing, we might need the underlying metadata or might
- // need it to be omitted.
- Self {
- inner: ParquetFormat::new().with_options(inner.options().clone()),
- }
+ pub fn new(options: TableGeoParquetOptions) -> Self {
+ Self { options }
}
-}
-impl Default for GeoParquetFormat {
- fn default() -> Self {
- Self::new(&ParquetFormat::default())
+ fn inner(&self) -> ParquetFormat {
+ ParquetFormat::new().with_options(self.options.inner.clone())
}
}
@@ -146,18 +159,18 @@ impl FileFormat for GeoParquetFormat {
}
fn get_ext(&self) -> String {
- self.inner.get_ext()
+ ParquetFormatFactory::new().get_ext()
}
fn get_ext_with_compression(
&self,
file_compression_type: &FileCompressionType,
) -> Result<String> {
- self.inner.get_ext_with_compression(file_compression_type)
+ self.inner().get_ext_with_compression(file_compression_type)
}
fn compression_type(&self) -> Option<FileCompressionType> {
- self.inner.compression_type()
+ self.inner().compression_type()
}
async fn infer_schema(
@@ -169,7 +182,8 @@ impl FileFormat for GeoParquetFormat {
// First, try the underlying format without schema metadata. This should work
// for regular Parquet reads and will at least ensure that the underlying schemas
// are compatible.
- let inner_schema_without_metadata = self.inner.infer_schema(state, store, objects).await?;
+ let inner_schema_without_metadata =
+ self.inner().infer_schema(state, store, objects).await?;
// Collect metadata separately. We can in theory do our own schema
// inference too to save an extra server request, but then we have to
@@ -180,7 +194,7 @@ impl FileFormat for GeoParquetFormat {
fetch_parquet_metadata(
store.as_ref(),
object,
- self.inner.metadata_size_hint(),
+ self.inner().metadata_size_hint(),
None,
)
})
@@ -252,7 +266,7 @@ impl FileFormat for GeoParquetFormat {
// We don't do anything special here to insert GeoStatistics because pruning
// happens elsewhere. These might be useful for a future optimizer or analyzer
// pass that can insert optimizations based on geometry type.
- self.inner
+ self.inner()
.infer_stats(state, store, table_schema, object)
.await
}
@@ -266,11 +280,11 @@ impl FileFormat for GeoParquetFormat {
// DataSourceExec is backed by a GeoParquetFileSource instead of a ParquetFileSource
let mut metadata_size_hint = None;
- if let Some(metadata) = self.inner.metadata_size_hint() {
+ if let Some(metadata) = self.inner().metadata_size_hint() {
metadata_size_hint = Some(metadata);
}
- let mut source = GeoParquetFileSource::new(self.inner.options().clone());
+ let mut source = GeoParquetFileSource::new(self.options.clone());
if let Some(metadata_size_hint) = metadata_size_hint {
source = source.with_metadata_size_hint(metadata_size_hint)
@@ -287,17 +301,17 @@ impl FileFormat for GeoParquetFormat {
async fn create_writer_physical_plan(
&self,
- _input: Arc<dyn ExecutionPlan>,
+ input: Arc<dyn ExecutionPlan>,
_state: &dyn Session,
- _conf: FileSinkConfig,
- _order_requirements: Option<LexRequirement>,
+ conf: FileSinkConfig,
+ order_requirements: Option<LexRequirement>,
) -> Result<Arc<dyn ExecutionPlan>> {
- not_impl_err!("GeoParquet writer not implemented")
+ create_geoparquet_writer_physical_plan(input, conf, order_requirements, &self.options)
}
fn file_source(&self) -> Arc<dyn FileSource> {
Arc::new(
- GeoParquetFileSource::try_from_file_source(self.inner.file_source(), None, None)
+ GeoParquetFileSource::try_from_file_source(self.inner().file_source(), None, None)
.unwrap(),
)
}
@@ -320,9 +334,9 @@ pub struct GeoParquetFileSource {
impl GeoParquetFileSource {
/// Create a new file source based on [TableParquetOptions]
- pub fn new(options: TableParquetOptions) -> Self {
+ pub fn new(options: TableGeoParquetOptions) -> Self {
Self {
- inner: ParquetSource::new(options),
+ inner: ParquetSource::new(options.inner.clone()),
metadata_size_hint: None,
predicate: None,
}
@@ -351,7 +365,6 @@ impl GeoParquetFileSource {
) -> Result<Self> {
if let Some(parquet_source) = inner.as_any().downcast_ref::<ParquetSource>() {
let mut parquet_source = parquet_source.clone();
-
// Extract the predicate from the existing source if it exists so we can keep a copy of it
let new_predicate = match (parquet_source.predicate().cloned(), predicate) {
(None, None) => None,
@@ -520,6 +533,7 @@ mod test {
use arrow_array::RecordBatch;
use arrow_schema::DataType;
+ use datafusion::config::TableParquetOptions;
use datafusion::datasource::physical_plan::ParquetSource;
use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory};
use datafusion::{
@@ -534,6 +548,7 @@ mod test {
use rstest::rstest;
use sedona_schema::crs::lnglat;
use sedona_schema::datatypes::{Edges, SedonaType, WKB_GEOMETRY};
+ use sedona_schema::schema::SedonaSchema;
use sedona_testing::create::create_scalar;
use sedona_testing::data::{geoarrow_data_dir, test_geoparquet};
@@ -559,14 +574,11 @@ mod test {
["wkt", "geometry"]
);
- let sedona_types: Result<Vec<_>> = df
+ let sedona_types = df
.schema()
- .as_arrow()
- .fields()
- .iter()
- .map(|f| SedonaType::from_storage_field(f))
- .collect();
- let sedona_types = sedona_types.unwrap();
+ .sedona_types()
+ .collect::<Result<Vec<_>>>()
+ .unwrap();
assert_eq!(sedona_types.len(), 2);
assert_eq!(sedona_types[0], SedonaType::Arrow(DataType::Utf8View));
assert_eq!(
@@ -578,13 +590,11 @@ mod test {
// the correct schema
let batches = df.collect().await.unwrap();
assert_eq!(batches.len(), 1);
- let sedona_types: Result<Vec<_>> = batches[0]
+ let sedona_types = batches[0]
.schema()
- .fields()
- .iter()
- .map(|f| SedonaType::from_storage_field(f))
- .collect();
- let sedona_types = sedona_types.unwrap();
+ .sedona_types()
+ .collect::<Result<Vec<_>>>()
+ .unwrap();
assert_eq!(sedona_types.len(), 2);
assert_eq!(sedona_types[0], SedonaType::Arrow(DataType::Utf8View));
assert_eq!(
@@ -632,14 +642,11 @@ mod test {
.select(vec![col("wkt")])
.unwrap();
- let sedona_types: Result<Vec<_>> = df
+ let sedona_types = df
.schema()
- .as_arrow()
- .fields()
- .iter()
- .map(|f| SedonaType::from_storage_field(f))
- .collect();
- let sedona_types = sedona_types.unwrap();
+ .sedona_types()
+ .collect::<Result<Vec<_>>>()
+ .unwrap();
assert_eq!(sedona_types.len(), 1);
assert_eq!(sedona_types[0], SedonaType::Arrow(DataType::Utf8View));
}
@@ -653,14 +660,11 @@ mod test {
.await
.unwrap();
- let sedona_types: Result<Vec<_>> = df
+ let sedona_types = df
.schema()
- .as_arrow()
- .fields()
- .iter()
- .map(|f| SedonaType::from_storage_field(f))
- .collect();
- let sedona_types = sedona_types.unwrap();
+ .sedona_types()
+ .collect::<Result<Vec<_>>>()
+ .unwrap();
assert_eq!(sedona_types.len(), 2);
assert_eq!(sedona_types[0], SedonaType::Arrow(DataType::Utf8View));
assert_eq!(
diff --git a/rust/sedona-geoparquet/src/lib.rs b/rust/sedona-geoparquet/src/lib.rs
index c8b16a8..732d868 100644
--- a/rust/sedona-geoparquet/src/lib.rs
+++ b/rust/sedona-geoparquet/src/lib.rs
@@ -17,4 +17,6 @@
mod file_opener;
pub mod format;
mod metadata;
+pub mod options;
pub mod provider;
+mod writer;
diff --git a/rust/sedona-geoparquet/src/metadata.rs b/rust/sedona-geoparquet/src/metadata.rs
index b09b62b..406d195 100644
--- a/rust/sedona-geoparquet/src/metadata.rs
+++ b/rust/sedona-geoparquet/src/metadata.rs
@@ -63,6 +63,12 @@ pub enum GeoParquetColumnEncoding {
MultiPolygon,
}
+impl Default for GeoParquetColumnEncoding {
+ fn default() -> Self {
+ Self::WKB
+ }
+}
+
impl Display for GeoParquetColumnEncoding {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
use GeoParquetColumnEncoding::*;
@@ -276,8 +282,18 @@ pub struct GeoParquetMetadata {
pub columns: HashMap<String, GeoParquetColumnMetadata>,
}
+impl Default for GeoParquetMetadata {
+ fn default() -> Self {
+ Self {
+ version: "1.0.0".to_string(),
+ primary_column: Default::default(),
+ columns: Default::default(),
+ }
+ }
+}
+
/// GeoParquet column metadata
-#[derive(Clone, Debug, Serialize, Deserialize)]
+#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct GeoParquetColumnMetadata {
/// Name of the geometry encoding format. As of GeoParquet 1.1, `"WKB"`, `"point"`,
/// `"linestring"`, `"polygon"`, `"multipoint"`, `"multilinestring"`, and `"multipolygon"` are
diff --git a/rust/sedona-geoparquet/src/options.rs b/rust/sedona-geoparquet/src/options.rs
new file mode 100644
index 0000000..93965b1
--- /dev/null
+++ b/rust/sedona-geoparquet/src/options.rs
@@ -0,0 +1,58 @@
+use datafusion::config::TableParquetOptions;
+
+/// [TableParquetOptions] wrapper with GeoParquet-specific options
+#[derive(Debug, Default, Clone)]
+pub struct TableGeoParquetOptions {
+ /// Inner [TableParquetOptions]
+ pub inner: TableParquetOptions,
+ /// [GeoParquetVersion] to use when writing GeoParquet files
+ pub geoparquet_version: GeoParquetVersion,
+}
+
+impl From<TableParquetOptions> for TableGeoParquetOptions {
+ fn from(value: TableParquetOptions) -> Self {
+ Self {
+ inner: value,
+ geoparquet_version: GeoParquetVersion::default(),
+ }
+ }
+}
+
+/// The GeoParquet Version to write for output with spatial columns
+#[derive(Debug, Clone, Copy)]
+pub enum GeoParquetVersion {
+ /// Write GeoParquet 1.0 metadata
+ ///
+ /// GeoParquet 1.0 has the widest support among readers and writers; however
+ /// it does not include row-group level statistics.
+ V1_0,
+
+ /// Write GeoParquet 1.1 metadata and optional bounding box column
+ ///
+ /// A bbox column will be included for any column where the Parquet options would
+ /// have otherwise written statistics (which it will by default).
+ /// This option may be more computationally expensive; however, it will result in
+ /// row-group level statistics that some readers (e.g., SedonaDB) can use to prune
+ /// row groups on read.
+ V1_1,
+
+ /// Write GeoParquet 2.0
+ ///
+ /// The GeoParquet 2.0 option is identical to GeoParquet 1.0 except that the underlying storage
+ /// of spatial columns is Parquet native geometry, where the Parquet writer will include
+ /// native statistics according to the underlying Parquet options. Some readers
+ /// (e.g., SedonaDB) can use these statistics to prune row groups on read.
+ V2_0,
+
+ /// Do not write GeoParquet metadata
+ ///
+ /// This option suppresses GeoParquet metadata; however, spatial types will be written as
+ /// Parquet native Geometry/Geography when this is supported by the underlying writer.
+ Omitted,
+}
+
+impl Default for GeoParquetVersion {
+ fn default() -> Self {
+ Self::V1_0
+ }
+}
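
A usage sketch for the options above (illustrative; only GeoParquetVersion::V1_0 is accepted by the writer in this commit, the other variants currently return not_impl_err):

use sedona_geoparquet::format::GeoParquetFormatFactory;
use sedona_geoparquet::options::{GeoParquetVersion, TableGeoParquetOptions};

fn geoparquet_v1_factory() -> GeoParquetFormatFactory {
    // Both fields are public: `inner` carries the usual TableParquetOptions and
    // `geoparquet_version` selects the metadata flavor written for spatial columns.
    let mut options = TableGeoParquetOptions::default();
    options.geoparquet_version = GeoParquetVersion::V1_0;
    GeoParquetFormatFactory::new_with_options(options)
}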
diff --git a/rust/sedona-geoparquet/src/provider.rs b/rust/sedona-geoparquet/src/provider.rs
index 5b5e059..c76d7f5 100644
--- a/rust/sedona-geoparquet/src/provider.rs
+++ b/rust/sedona-geoparquet/src/provider.rs
@@ -173,7 +173,8 @@ impl ReadOptions<'_> for GeoParquetReadOptions<'_> {
let mut options = self.inner.to_listing_options(config, table_options);
if let Some(parquet_format) = options.format.as_any().downcast_ref::<ParquetFormat>() {
- options.format = Arc::new(GeoParquetFormat::new(parquet_format));
+ let geoparquet_options = parquet_format.options().clone().into();
+ options.format = Arc::new(GeoParquetFormat::new(geoparquet_options));
return options;
}
diff --git a/rust/sedona-geoparquet/src/writer.rs b/rust/sedona-geoparquet/src/writer.rs
new file mode 100644
index 0000000..eb5e140
--- /dev/null
+++ b/rust/sedona-geoparquet/src/writer.rs
@@ -0,0 +1,248 @@
+use std::sync::Arc;
+
+use datafusion::{
+ config::TableParquetOptions,
+ datasource::{
+ file_format::parquet::ParquetSink, physical_plan::FileSinkConfig, sink::DataSinkExec,
+ },
+};
+use datafusion_common::{exec_datafusion_err, exec_err, not_impl_err, Result};
+use datafusion_expr::dml::InsertOp;
+use datafusion_physical_expr::LexRequirement;
+use datafusion_physical_plan::ExecutionPlan;
+use sedona_common::sedona_internal_err;
+use sedona_schema::{
+ crs::lnglat,
+ datatypes::{Edges, SedonaType},
+ schema::SedonaSchema,
+};
+
+use crate::{
+ metadata::{GeoParquetColumnMetadata, GeoParquetMetadata},
+ options::{GeoParquetVersion, TableGeoParquetOptions},
+};
+
+pub fn create_geoparquet_writer_physical_plan(
+ input: Arc<dyn ExecutionPlan>,
+ conf: FileSinkConfig,
+ order_requirements: Option<LexRequirement>,
+ options: &TableGeoParquetOptions,
+) -> Result<Arc<dyn ExecutionPlan>> {
+ if conf.insert_op != InsertOp::Append {
+ return not_impl_err!("Overwrites are not implemented yet for Parquet");
+ }
+
+ // If there is no geometry, just use the inner implementation
+ let output_geometry_column_indices = conf.output_schema().geometry_column_indices()?;
+ if output_geometry_column_indices.is_empty() {
+ return create_inner_writer(input, conf, order_requirements, options.inner.clone());
+ }
+
+ // We have geometry and/or geography! Collect the GeoParquetMetadata we'll need to write
+ let mut metadata = GeoParquetMetadata::default();
+
+ // Check the version
+ match options.geoparquet_version {
+ GeoParquetVersion::V1_0 => {
+ metadata.version = "1.0.0".to_string();
+ }
+ _ => {
+ return not_impl_err!(
+ "GeoParquetVersion {:?} is not yet supported",
+ options.geoparquet_version
+ );
+ }
+ };
+
+ let field_names = conf
+ .output_schema()
+ .fields()
+ .iter()
+ .map(|f| f.name())
+ .collect::<Vec<_>>();
+
+ // Apply primary column
+ if let Some(output_geometry_primary) = conf.output_schema().primary_geometry_column_index()? {
+ metadata.primary_column = field_names[output_geometry_primary].clone();
+ }
+
+ // Apply all columns
+ for i in output_geometry_column_indices {
+ let f = conf.output_schema().field(i);
+ let sedona_type = SedonaType::from_storage_field(f)?;
+ let mut column_metadata = GeoParquetColumnMetadata::default();
+
+ let (edge_type, crs) = match sedona_type {
+ SedonaType::Wkb(edge_type, crs) | SedonaType::WkbView(edge_type, crs) => {
+ (edge_type, crs)
+ }
+ _ => return sedona_internal_err!("Unexpected type: {sedona_type}"),
+ };
+
+ // Assign edge type if needed
+ match edge_type {
+ Edges::Planar => {}
+ Edges::Spherical => {
+ column_metadata.edges = Some("spherical".to_string());
+ }
+ }
+
+ // Assign crs
+ if crs == lnglat() {
+ // Do nothing, lnglat is the meaning of an omitted CRS
+ } else if let Some(crs) = crs {
+ column_metadata.crs = Some(crs.to_json().parse().map_err(|e| {
+ exec_datafusion_err!("Failed to parse CRS for column '{}'
{e}", f.name())
+ })?);
+ } else {
+ return exec_err!(
+ "Can't write GeoParquet from null CRS\nUse ST_SetSRID({}, ...)
to assign it one",
+ f.name()
+ );
+ }
+
+ // Add to metadata
+ metadata
+ .columns
+ .insert(f.name().to_string(), column_metadata);
+ }
+
+ // Apply to the Parquet options
+ let mut parquet_options = options.inner.clone();
+ parquet_options.key_value_metadata.insert(
+ "geo".to_string(),
+ Some(
+ serde_json::to_string(&metadata).map_err(|e| {
+ exec_datafusion_err!("Failed to serialize GeoParquet metadata:
{e}")
+ })?,
+ ),
+ );
+
+ // Create the sink
+ let sink = Arc::new(ParquetSink::new(conf, parquet_options));
+ Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
+}
+
+fn create_inner_writer(
+ input: Arc<dyn ExecutionPlan>,
+ conf: FileSinkConfig,
+ order_requirements: Option<LexRequirement>,
+ options: TableParquetOptions,
+) -> Result<Arc<dyn ExecutionPlan>> {
+ // Create the sink
+ let sink = Arc::new(ParquetSink::new(conf, options));
+ Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
+}
+
+#[cfg(test)]
+mod test {
+ use std::iter::zip;
+
+ use datafusion::datasource::file_format::format_as_file_type;
+ use datafusion::prelude::DataFrame;
+ use datafusion::{
+ execution::SessionStateBuilder,
+ prelude::{col, SessionContext},
+ };
+ use datafusion_expr::LogicalPlanBuilder;
+ use sedona_testing::data::test_geoparquet;
+ use tempfile::tempdir;
+
+ use crate::format::GeoParquetFormatFactory;
+
+ use super::*;
+
+ fn setup_context() -> SessionContext {
+ let mut state = SessionStateBuilder::new().build();
+ state
+ .register_file_format(Arc::new(GeoParquetFormatFactory::new()), true)
+ .unwrap();
+ SessionContext::new_with_state(state).enable_url_table()
+ }
+
+ async fn test_dataframe_roundtrip(ctx: SessionContext, df: DataFrame) {
+ // It's a bit verbose to trigger this without helpers
+ let format = GeoParquetFormatFactory::new();
+ let file_type = format_as_file_type(Arc::new(format));
+ let tmpdir = tempdir().unwrap();
+
+ let df_batches = df.clone().collect().await.unwrap();
+
+ let tmp_parquet = tmpdir.path().join("foofy_spatial.parquet");
+
+ let plan = LogicalPlanBuilder::copy_to(
+ df.into_unoptimized_plan(),
+ tmp_parquet.to_string_lossy().into(),
+ file_type,
+ Default::default(),
+ vec![],
+ )
+ .unwrap()
+ .build()
+ .unwrap();
+
+ DataFrame::new(ctx.state(), plan).collect().await.unwrap();
+
+ let df_parquet_batches = ctx
+ .table(tmp_parquet.to_string_lossy().to_string())
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ assert_eq!(df_parquet_batches.len(), df_batches.len());
+
+ // Check types, since the schema may not compare byte-for-byte equal (CRSes)
+ let df_parquet_sedona_types = df_parquet_batches[0]
+ .schema()
+ .sedona_types()
+ .collect::<Result<Vec<_>>>()
+ .unwrap();
+ let df_sedona_types = df_batches[0]
+ .schema()
+ .sedona_types()
+ .collect::<Result<Vec<_>>>()
+ .unwrap();
+ assert_eq!(df_parquet_sedona_types, df_sedona_types);
+
+ // Check batches without metadata
+ for (df_parquet_batch, df_batch) in zip(df_parquet_batches, df_batches) {
+ assert_eq!(df_parquet_batch.columns(), df_batch.columns())
+ }
+ }
+
+ #[tokio::test]
+ async fn writer_without_spatial() {
+ let example = test_geoparquet("example", "geometry").unwrap();
+ let ctx = setup_context();
+
+ // Deselect all geometry columns
+ let df = ctx
+ .table(&example)
+ .await
+ .unwrap()
+ .select(vec![col("wkt")])
+ .unwrap();
+
+ test_dataframe_roundtrip(ctx, df).await;
+ }
+
+ #[tokio::test]
+ async fn writer_with_geometry() {
+ let example = test_geoparquet("example", "geometry").unwrap();
+ let ctx = setup_context();
+ let df = ctx.table(&example).await.unwrap();
+
+ test_dataframe_roundtrip(ctx, df).await;
+ }
+
+ #[tokio::test]
+ async fn writer_with_geography() {
+ let example = test_geoparquet("natural-earth",
"countries-geography").unwrap();
+ let ctx = setup_context();
+ let df = ctx.table(&example).await.unwrap();
+
+ test_dataframe_roundtrip(ctx, df).await;
+ }
+}
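
To confirm what the writer above attaches, one can inspect the Parquet footer for the "geo" key/value entry. A hedged sketch using the parquet crate (a transitive dependency via DataFusion; the path argument is hypothetical):

use std::error::Error;
use std::fs::File;
use parquet::file::reader::{FileReader, SerializedFileReader};

fn print_geo_metadata(path: &str) -> Result<(), Box<dyn Error>> {
    let reader = SerializedFileReader::new(File::open(path)?)?;
    // create_geoparquet_writer_physical_plan serializes GeoParquetMetadata
    // ("version", "primary_column", "columns") under the "geo" footer key.
    if let Some(kv) = reader.metadata().file_metadata().key_value_metadata() {
        for entry in kv.iter().filter(|e| e.key == "geo") {
            println!("{}", entry.value.clone().unwrap_or_default());
        }
    }
    Ok(())
}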
diff --git a/rust/sedona/Cargo.toml b/rust/sedona/Cargo.toml
index 2be1be8..c7cb95a 100644
--- a/rust/sedona/Cargo.toml
+++ b/rust/sedona/Cargo.toml
@@ -40,6 +40,7 @@ spatial-join = ["dep:sedona-spatial-join"]
s2geography = ["dep:sedona-s2geography"]
[dev-dependencies]
+tempfile = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread"] }
rstest = { workspace = true }
diff --git a/rust/sedona/src/context.rs b/rust/sedona/src/context.rs
index 3ca8fc6..07fabf8 100644
--- a/rust/sedona/src/context.rs
+++ b/rust/sedona/src/context.rs
@@ -22,7 +22,10 @@ use crate::{
random_geometry_provider::RandomGeometryFunction,
show::{show_batches, DisplayTableOptions},
};
+use arrow_array::RecordBatch;
use async_trait::async_trait;
+use datafusion::dataframe::DataFrameWriteOptions;
+use datafusion::datasource::file_format::format_as_file_type;
use datafusion::{
common::{plan_datafusion_err, plan_err},
error::{DataFusionError, Result},
@@ -30,11 +33,15 @@ use datafusion::{
prelude::{DataFrame, SessionConfig, SessionContext},
sql::parser::{DFParser, Statement},
};
+use datafusion_common::not_impl_err;
+use datafusion_expr::dml::InsertOp;
use datafusion_expr::sqlparser::dialect::{dialect_from_str, Dialect};
+use datafusion_expr::{LogicalPlanBuilder, SortExpr};
use parking_lot::Mutex;
use sedona_common::option::add_sedona_option_extension;
use sedona_expr::aggregate_udf::SedonaAccumulatorRef;
use sedona_expr::{function_set::FunctionSet, scalar_udf::ScalarKernelRef};
+use sedona_geoparquet::options::TableGeoParquetOptions;
use sedona_geoparquet::{
format::GeoParquetFormatFactory,
provider::{geoparquet_listing_table, GeoParquetReadOptions},
@@ -269,6 +276,14 @@ pub trait SedonaDataFrame {
limit: Option<usize>,
options: DisplayTableOptions<'a>,
) -> Result<String>;
+
+ async fn write_geoparquet(
+ self,
+ ctx: &SedonaContext,
+ path: &str,
+ options: SedonaWriteOptions,
+ writer_options: Option<TableGeoParquetOptions>,
+ ) -> Result<Vec<RecordBatch>>;
}
#[async_trait]
@@ -287,6 +302,120 @@ impl SedonaDataFrame for DataFrame {
show_batches(ctx, &mut out, schema, batches, options)?;
String::from_utf8(out).map_err(|e| DataFusionError::External(Box::new(e)))
}
+
+ async fn write_geoparquet(
+ self,
+ ctx: &SedonaContext,
+ path: &str,
+ options: SedonaWriteOptions,
+ writer_options: Option<TableGeoParquetOptions>,
+ ) -> Result<Vec<RecordBatch>, DataFusionError> {
+ if options.insert_op != InsertOp::Append {
+ return not_impl_err!(
+ "{} is not implemented for DataFrame::write_geoparquet.",
+ options.insert_op
+ );
+ }
+
+ let format = if let Some(parquet_opts) = writer_options {
+ Arc::new(GeoParquetFormatFactory::new_with_options(parquet_opts))
+ } else {
+ Arc::new(GeoParquetFormatFactory::new())
+ };
+
+ let file_type = format_as_file_type(format);
+
+ let plan = if options.sort_by.is_empty() {
+ self.into_unoptimized_plan()
+ } else {
+ LogicalPlanBuilder::from(self.into_unoptimized_plan())
+ .sort(options.sort_by)?
+ .build()?
+ };
+
+ let plan = LogicalPlanBuilder::copy_to(
+ plan,
+ path.into(),
+ file_type,
+ Default::default(),
+ options.partition_by,
+ )?
+ .build()?;
+
+ DataFrame::new(ctx.ctx.state(), plan).collect().await
+ }
+}
+
+/// A Sedona-specific copy of [DataFrameWriteOptions]
+///
+/// This is needed because [DataFrameWriteOptions] has private fields, so we
+/// can't use it in our interfaces. This object can be converted to a
+/// [DataFrameWriteOptions] using `.into()`.
+pub struct SedonaWriteOptions {
+ /// Controls how new data should be written to the table, determining whether
+ /// to append, overwrite, or replace existing data.
+ pub insert_op: InsertOp,
+ /// Controls if all partitions should be coalesced into a single output file
+ /// Generally will have slower performance when set to true.
+ pub single_file_output: bool,
+ /// Sets which columns should be used for hive-style partitioned writes by name.
+ /// Can be set to empty vec![] for non-partitioned writes.
+ pub partition_by: Vec<String>,
+ /// Sets which columns should be used for sorting the output by name.
+ /// Can be set to empty vec![] for non-sorted writes.
+ pub sort_by: Vec<SortExpr>,
+}
+
+impl From<SedonaWriteOptions> for DataFrameWriteOptions {
+ fn from(value: SedonaWriteOptions) -> Self {
+ DataFrameWriteOptions::new()
+ .with_insert_operation(value.insert_op)
+ .with_single_file_output(value.single_file_output)
+ .with_partition_by(value.partition_by)
+ .with_sort_by(value.sort_by)
+ }
+}
+
+impl SedonaWriteOptions {
+ /// Create a new SedonaWriteOptions with default values
+ pub fn new() -> Self {
+ SedonaWriteOptions {
+ insert_op: InsertOp::Append,
+ single_file_output: false,
+ partition_by: vec![],
+ sort_by: vec![],
+ }
+ }
+
+ /// Set the insert operation
+ pub fn with_insert_operation(mut self, insert_op: InsertOp) -> Self {
+ self.insert_op = insert_op;
+ self
+ }
+
+ /// Set the single_file_output value to true or false
+ pub fn with_single_file_output(mut self, single_file_output: bool) -> Self {
+ self.single_file_output = single_file_output;
+ self
+ }
+
+ /// Sets the partition_by columns for output partitioning
+ pub fn with_partition_by(mut self, partition_by: Vec<String>) -> Self {
+ self.partition_by = partition_by;
+ self
+ }
+
+ /// Sets the sort_by columns for output sorting
+ pub fn with_sort_by(mut self, sort_by: Vec<SortExpr>) -> Self {
+ self.sort_by = sort_by;
+ self
+ }
+}
+
+impl Default for SedonaWriteOptions {
+ fn default() -> Self {
+ Self::new()
+ }
}
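
A brief sketch of the conversion described above (the partition column name is illustrative):

use datafusion::dataframe::DataFrameWriteOptions;
use sedona::context::SedonaWriteOptions;

fn as_datafusion_options() -> DataFrameWriteOptions {
    let options = SedonaWriteOptions::new()
        .with_partition_by(vec!["a".to_string()])
        .with_single_file_output(true);
    // The From impl above makes this usable anywhere DataFusion expects its own type
    options.into()
}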
// Because Dialect/dialect_from_str is not marked as Send, using the async
@@ -325,6 +454,7 @@ mod tests {
datatypes::{Edges, SedonaType},
};
use sedona_testing::data::test_geoparquet;
+ use tempfile::tempdir;
use super::*;
@@ -375,6 +505,23 @@ mod tests {
);
}
+ #[tokio::test]
+ async fn write_geoparquet() {
+ let tmpdir = tempdir().unwrap();
+ let tmp_parquet = tmpdir.path().join("tmp.parquet");
+ let ctx = SedonaContext::new();
+ ctx.sql("SELECT 1 as one")
+ .await
+ .unwrap()
+ .write_parquet(
+ &tmp_parquet.to_string_lossy(),
+ DataFrameWriteOptions::default(),
+ None,
+ )
+ .await
+ .unwrap();
+ }
+
#[tokio::test]
async fn geoparquet_format() {
// Make sure that our context can be set up to identify and read