This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git
The following commit(s) were added to refs/heads/main by this push:
new 1637efe4 feat(rust/sedona-pointcloud): add optional round robin
partitioning and parallel statistics extraction (#648)
1637efe4 is described below
commit 1637efe4d4c3bf296f7e88f3dfc5c2a83eb9d788
Author: Balthasar Teuscher <[email protected]>
AuthorDate: Tue Mar 3 00:52:35 2026 +0100
feat(rust/sedona-pointcloud): add optional round robin partitioning and
parallel statistics extraction (#648)
---
Cargo.lock | 2 +
rust/sedona-pointcloud/Cargo.toml | 2 +
rust/sedona-pointcloud/src/las/builder.rs | 17 ++---
rust/sedona-pointcloud/src/las/format.rs | 20 +++---
rust/sedona-pointcloud/src/las/metadata.rs | 54 +++++++-------
rust/sedona-pointcloud/src/las/opener.rs | 73 ++++++++++++++-----
rust/sedona-pointcloud/src/las/options.rs | 97 +++++++++++++++++++++----
rust/sedona-pointcloud/src/las/reader.rs | 21 +++---
rust/sedona-pointcloud/src/las/schema.rs | 2 +-
rust/sedona-pointcloud/src/las/source.rs | 66 ++++++++++++++---
rust/sedona-pointcloud/src/las/statistics.rs | 99 ++++++++++++++++++-------
rust/sedona-pointcloud/src/lib.rs | 1 -
rust/sedona-pointcloud/src/options.rs | 103 ---------------------------
rust/sedona/src/context.rs | 11 ++-
14 files changed, 331 insertions(+), 237 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 158086b4..7292f3dc 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5434,6 +5434,7 @@ dependencies = [
"arrow-schema",
"async-stream",
"async-trait",
+ "byteorder",
"bytes",
"datafusion",
"datafusion-catalog",
@@ -5450,6 +5451,7 @@ dependencies = [
"las-crs",
"laz",
"object_store",
+ "rayon",
"sedona",
"sedona-expr",
"sedona-geometry",
diff --git a/rust/sedona-pointcloud/Cargo.toml
b/rust/sedona-pointcloud/Cargo.toml
index 478e9ad3..921f30c8 100644
--- a/rust/sedona-pointcloud/Cargo.toml
+++ b/rust/sedona-pointcloud/Cargo.toml
@@ -36,6 +36,7 @@ arrow-ipc = { workspace = true }
arrow-schema = { workspace = true }
async-stream = "0.3.6"
async-trait = { workspace = true }
+byteorder = { workspace = true }
bytes = { workspace = true }
datafusion-catalog = { workspace = true }
datafusion-common = { workspace = true }
@@ -51,6 +52,7 @@ las = { version = "0.9.10", features = ["laz"] }
las-crs = { version = "1.0.0" }
laz = "0.12.0"
object_store = { workspace = true }
+rayon = "1.11.0"
sedona-expr = { workspace = true }
sedona-geometry = { workspace = true }
diff --git a/rust/sedona-pointcloud/src/las/builder.rs
b/rust/sedona-pointcloud/src/las/builder.rs
index 8e2e8a85..0f396085 100644
--- a/rust/sedona-pointcloud/src/las/builder.rs
+++ b/rust/sedona-pointcloud/src/las/builder.rs
@@ -35,9 +35,10 @@ use geoarrow_array::{
use geoarrow_schema::Dimension;
use las::{Header, Point};
-use crate::{
- las::{metadata::ExtraAttribute, options::LasExtraBytes,
schema::try_schema_from_header},
- options::GeometryEncoding,
+use crate::las::{
+ metadata::ExtraAttribute,
+ options::{GeometryEncoding, LasExtraBytes},
+ schema::try_schema_from_header,
};
#[derive(Debug)]
@@ -515,9 +516,9 @@ mod tests {
use las::{point::Format, Builder, Writer};
use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
- use crate::{
- las::{options::LasExtraBytes, reader::LasFileReaderFactory},
- options::PointcloudOptions,
+ use crate::las::{
+ options::{LasExtraBytes, LasOptions},
+ reader::LasFileReaderFactory,
};
#[tokio::test]
@@ -544,7 +545,7 @@ mod tests {
let file_reader = LasFileReaderFactory::new(Arc::new(store), None)
.create_reader(
PartitionedFile::new(location, object.size),
- PointcloudOptions::default(),
+ LasOptions::default(),
)
.unwrap();
let metadata = file_reader.get_metadata().await.unwrap();
@@ -578,7 +579,7 @@ mod tests {
let file_reader = LasFileReaderFactory::new(Arc::new(store), None)
.create_reader(
PartitionedFile::new(location, object.size),
-
PointcloudOptions::default().with_las_extra_bytes(LasExtraBytes::Typed),
+
LasOptions::default().with_las_extra_bytes(LasExtraBytes::Typed),
)
.unwrap();
let metadata = file_reader.get_metadata().await.unwrap();
diff --git a/rust/sedona-pointcloud/src/las/format.rs
b/rust/sedona-pointcloud/src/las/format.rs
index 48e07054..d2f68240 100644
--- a/rust/sedona-pointcloud/src/las/format.rs
+++ b/rust/sedona-pointcloud/src/las/format.rs
@@ -33,9 +33,9 @@ use datafusion_physical_plan::ExecutionPlan;
use futures::{StreamExt, TryStreamExt};
use object_store::{ObjectMeta, ObjectStore};
-use crate::{
- las::{metadata::LasMetadataReader, reader::LasFileReaderFactory,
source::LasSource},
- options::PointcloudOptions,
+use crate::las::{
+ metadata::LasMetadataReader, options::LasOptions,
reader::LasFileReaderFactory,
+ source::LasSource,
};
#[derive(Debug, Clone, Copy)]
@@ -56,7 +56,7 @@ impl Extension {
/// Factory struct used to create [LasFormat]
pub struct LasFormatFactory {
// inner options for LAS/LAZ
- pub options: Option<PointcloudOptions>,
+ pub options: Option<LasOptions>,
extension: Extension,
}
@@ -70,7 +70,7 @@ impl LasFormatFactory {
}
/// Creates an instance of [LasFormatFactory] with customized default
options
- pub fn new_with(options: PointcloudOptions, extension: Extension) -> Self {
+ pub fn new_with(options: LasOptions, extension: Extension) -> Self {
Self {
options: Some(options),
extension,
@@ -87,8 +87,8 @@ impl FileFormatFactory for LasFormatFactory {
let mut options = state
.config_options()
.extensions
- .get::<PointcloudOptions>()
- .or_else(||
state.table_options().extensions.get::<PointcloudOptions>())
+ .get::<LasOptions>()
+ .or_else(|| state.table_options().extensions.get::<LasOptions>())
.cloned()
.or(self.options.clone())
.unwrap_or_default();
@@ -129,7 +129,7 @@ impl fmt::Debug for LasFormatFactory {
/// The LAS/LAZ `FileFormat` implementation
#[derive(Debug)]
pub struct LasFormat {
- pub options: PointcloudOptions,
+ pub options: LasOptions,
extension: Extension,
}
@@ -141,7 +141,7 @@ impl LasFormat {
}
}
- pub fn with_options(mut self, options: PointcloudOptions) -> Self {
+ pub fn with_options(mut self, options: LasOptions) -> Self {
self.options = options;
self
}
@@ -195,7 +195,7 @@ impl FileFormat for LasFormat {
Ok::<_, DataFusionError>((loc_path, schema))
})
.boxed() // Workaround
https://github.com/rust-lang/rust/issues/64552
- // fetch schemas concurrently, if requested
+ // fetch schemas concurrently, if requested (note that this is not
parallel)
.buffered(state.config_options().execution.meta_fetch_concurrency)
.try_collect()
.await?;
diff --git a/rust/sedona-pointcloud/src/las/metadata.rs
b/rust/sedona-pointcloud/src/las/metadata.rs
index ab9b40ac..5bb8052a 100644
--- a/rust/sedona-pointcloud/src/las/metadata.rs
+++ b/rust/sedona-pointcloud/src/las/metadata.rs
@@ -36,12 +36,10 @@ use las::{
use laz::laszip::ChunkTable;
use object_store::{ObjectMeta, ObjectStore};
-use crate::{
- las::{
- schema::try_schema_from_header,
- statistics::{chunk_statistics, LasStatistics},
- },
- options::PointcloudOptions,
+use crate::las::{
+ options::LasOptions,
+ schema::try_schema_from_header,
+ statistics::{chunk_statistics, LasStatistics},
};
/// LAS/LAZ chunk metadata
@@ -92,7 +90,7 @@ pub struct LasMetadataReader<'a> {
store: &'a dyn ObjectStore,
object_meta: &'a ObjectMeta,
file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
- options: PointcloudOptions,
+ options: LasOptions,
}
impl<'a> LasMetadataReader<'a> {
@@ -115,18 +113,11 @@ impl<'a> LasMetadataReader<'a> {
}
/// set table options
- pub fn with_options(mut self, options: PointcloudOptions) -> Self {
+ pub fn with_options(mut self, options: LasOptions) -> Self {
self.options = options;
self
}
- /// Fetch header
- pub async fn fetch_header(&self) -> Result<Header, DataFusionError> {
- fetch_header(self.store, self.object_meta)
- .await
- .map_err(DataFusionError::External)
- }
-
/// Fetch LAS/LAZ metadata from the remote object store
pub async fn fetch_metadata(&self) -> Result<Arc<LasMetadata>,
DataFusionError> {
let Self {
@@ -149,13 +140,9 @@ impl<'a> LasMetadataReader<'a> {
return Ok(las_file_metadata);
}
- let header = self.fetch_header().await?;
+ let header = fetch_header(*store, object_meta).await?;
let extra_attributes = extra_bytes_attributes(&header)?;
- let chunk_table = if header.laz_vlr().is_ok() {
- laz_chunk_table(*store, object_meta, &header).await?
- } else {
- las_chunk_table(&header).await?
- };
+ let chunk_table = fetch_chunk_table(*store, object_meta,
&header).await?;
let statistics = if options.collect_statistics {
Some(
chunk_statistics(
@@ -164,6 +151,7 @@ impl<'a> LasMetadataReader<'a> {
&chunk_table,
&header,
options.persist_statistics,
+ options.parallel_statistics_extraction,
)
.await?,
)
@@ -192,7 +180,7 @@ impl<'a> LasMetadataReader<'a> {
let schema = try_schema_from_header(
&metadata.header,
self.options.geometry_encoding,
- self.options.las.extra_bytes,
+ self.options.extra_bytes,
)?;
Ok(schema)
@@ -241,7 +229,8 @@ impl<'a> LasMetadataReader<'a> {
}
}
-async fn fetch_header(
+/// Fetch the [Header] of a LAS/LAZ file
+pub async fn fetch_header(
store: &(impl ObjectStore + ?Sized),
object_meta: &ObjectMeta,
) -> Result<Header, Box<dyn Error + Send + Sync>> {
@@ -296,6 +285,7 @@ async fn fetch_header(
Ok(builder.into_header()?)
}
+/// Extra attribute information (custom attributes in LAS/LAZ files)
#[derive(Debug, Clone, PartialEq)]
pub struct ExtraAttribute {
pub data_type: DataType,
@@ -368,6 +358,19 @@ fn extra_bytes_attributes(
Ok(attributes)
}
+/// Fetch or generate chunk table metadata.
+pub async fn fetch_chunk_table(
+ store: &(impl ObjectStore + ?Sized),
+ object_meta: &ObjectMeta,
+ header: &Header,
+) -> Result<Vec<ChunkMeta>, Box<dyn Error + Send + Sync>> {
+ if header.laz_vlr().is_ok() {
+ laz_chunk_table(store, object_meta, header).await
+ } else {
+ las_chunk_table(header).await
+ }
+}
+
async fn laz_chunk_table(
store: &(impl ObjectStore + ?Sized),
object_meta: &ObjectMeta,
@@ -482,7 +485,7 @@ mod tests {
use las::{point::Format, Builder, Reader, Writer};
use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
- use crate::las::metadata::LasMetadataReader;
+ use crate::las::metadata::fetch_header;
#[tokio::test]
async fn header_basic_e2e() {
@@ -503,14 +506,13 @@ mod tests {
let store = LocalFileSystem::new();
let location = Path::from_filesystem_path(&tmp_path).unwrap();
let object_meta = store.head(&location).await.unwrap();
- let metadata_reader = LasMetadataReader::new(&store, &object_meta);
// read with las `Reader`
let reader = Reader::from_path(&tmp_path).unwrap();
assert_eq!(
reader.header(),
- &metadata_reader.fetch_header().await.unwrap()
+ &fetch_header(&store, &object_meta).await.unwrap()
);
}
}
diff --git a/rust/sedona-pointcloud/src/las/opener.rs
b/rust/sedona-pointcloud/src/las/opener.rs
index 249e5392..941788ef 100644
--- a/rust/sedona-pointcloud/src/las/opener.rs
+++ b/rust/sedona-pointcloud/src/las/opener.rs
@@ -29,12 +29,10 @@ use futures::StreamExt;
use sedona_expr::spatial_filter::SpatialFilter;
use sedona_geometry::bounding_box::BoundingBox;
-use crate::{
- las::{
- reader::{LasFileReader, LasFileReaderFactory},
- schema::try_schema_from_header,
- },
- options::PointcloudOptions,
+use crate::las::{
+ options::LasOptions,
+ reader::{LasFileReader, LasFileReaderFactory},
+ schema::try_schema_from_header,
};
pub struct LasOpener {
@@ -42,13 +40,18 @@ pub struct LasOpener {
pub projection: Arc<[usize]>,
/// Optional limit on the number of rows to read
pub limit: Option<usize>,
+ /// Filter predicate for pruning
pub predicate: Option<Arc<dyn PhysicalExpr>>,
/// Factory for instantiating LAS/LAZ reader
pub file_reader_factory: Arc<LasFileReaderFactory>,
/// Table options
- pub options: PointcloudOptions,
+ pub options: LasOptions,
/// Target batch size
- pub(crate) batch_size: usize,
+ pub batch_size: usize,
+ /// Target partition count
+ pub partition_count: usize,
+ /// Partition to read
+ pub partition: usize,
}
impl FileOpener for LasOpener {
@@ -56,6 +59,9 @@ impl FileOpener for LasOpener {
let projection = self.projection.clone();
let limit = self.limit;
let batch_size = self.batch_size;
+ let round_robin = self.options.round_robin_partitioning;
+ let partition_count = self.partition_count;
+ let partition = self.partition;
let predicate = self.predicate.clone();
@@ -68,7 +74,7 @@ impl FileOpener for LasOpener {
let schema = Arc::new(try_schema_from_header(
&metadata.header,
file_reader.options.geometry_encoding,
- file_reader.options.las.extra_bytes,
+ file_reader.options.extra_bytes,
)?);
let pruning_predicate = predicate.and_then(|physical_expr| {
@@ -117,6 +123,11 @@ impl FileOpener for LasOpener {
let stream = async_stream::try_stream! {
for (i, chunk_meta) in metadata.chunk_table.iter().enumerate()
{
+ // round robin
+ if round_robin && i % partition_count != partition {
+ continue;
+ }
+
// limit
if let Some(limit) = limit {
if row_count >= limit {
@@ -187,10 +198,10 @@ mod tests {
let ctx = SedonaContext::new_local_interactive().await.unwrap();
// ensure no faulty chunk pruning
- ctx.sql("SET pointcloud.geometry_encoding = 'plain'")
+ ctx.sql("SET las.geometry_encoding = 'plain'")
.await
.unwrap();
- ctx.sql("SET pointcloud.collect_statistics = 'true'")
+ ctx.sql("SET las.collect_statistics = 'true'")
.await
.unwrap();
@@ -212,9 +223,7 @@ mod tests {
.unwrap();
assert_eq!(count, 50000);
- ctx.sql("SET pointcloud.geometry_encoding = 'wkb'")
- .await
- .unwrap();
+ ctx.sql("SET las.geometry_encoding = 'wkb'").await.unwrap();
let count = ctx
.sql(&format!("SELECT * FROM \"{path}\" WHERE
ST_Intersects(geometry, ST_GeomFromText('POLYGON ((0 0, 0.7 0, 0.7 0.7, 0 0.7,
0 0))'))"))
.await
@@ -233,10 +242,10 @@ mod tests {
let ctx = SedonaContext::new_local_interactive().await.unwrap();
// ensure no faulty chunk pruning
- ctx.sql("SET pointcloud.geometry_encoding = 'plain'")
+ ctx.sql("SET las.geometry_encoding = 'plain'")
.await
.unwrap();
- ctx.sql("SET pointcloud.collect_statistics = 'true'")
+ ctx.sql("SET las.collect_statistics = 'true'")
.await
.unwrap();
@@ -258,9 +267,7 @@ mod tests {
.unwrap();
assert_eq!(count, 50000);
- ctx.sql("SET pointcloud.geometry_encoding = 'wkb'")
- .await
- .unwrap();
+ ctx.sql("SET las.geometry_encoding = 'wkb'").await.unwrap();
let count = ctx
.sql(&format!("SELECT * FROM \"{path}\" WHERE
ST_Intersects(geometry, ST_GeomFromText('POLYGON ((0 0, 0.7 0, 0.7 0.7, 0 0.7,
0 0))'))"))
.await
@@ -270,4 +277,32 @@ mod tests {
.unwrap();
assert_eq!(count, 50000);
}
+
+ #[tokio::test]
+ async fn round_robin_partitioning() {
+        // file with two clusters, one at 0.5, one at 1.0
+ let path = "tests/data/large.laz";
+
+ let ctx = SedonaContext::new_local_interactive().await.unwrap();
+
+ let result1 = ctx
+ .sql(&format!("SELECT * FROM \"{path}\""))
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ ctx.sql("SET las.round_robin_partitioning = 'true'")
+ .await
+ .unwrap();
+ let result2 = ctx
+ .sql(&format!("SELECT * FROM \"{path}\""))
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+ assert_eq!(result1, result2);
+ }
}
diff --git a/rust/sedona-pointcloud/src/las/options.rs
b/rust/sedona-pointcloud/src/las/options.rs
index de02628f..a7c656e2 100644
--- a/rust/sedona-pointcloud/src/las/options.rs
+++ b/rust/sedona-pointcloud/src/las/options.rs
@@ -18,11 +18,61 @@
use std::{fmt::Display, str::FromStr};
use datafusion_common::{
- config::{ConfigField, Visit},
- config_namespace,
+ config::{ConfigExtension, ConfigField, Visit},
error::DataFusionError,
+ extensions_options,
};
+/// Geometry representation
+#[derive(Clone, Copy, Default, PartialEq, Eq, Debug)]
+pub enum GeometryEncoding {
+ /// Use plain coordinates as three fields `x`, `y`, `z` with datatype
Float64 encoding.
+ #[default]
+ Plain,
+    /// Resolves the coordinates to a field `geometry` with WKB encoding.
+ Wkb,
+    /// Resolves the coordinates to a field `geometry` with separated
GeoArrow encoding.
+ Native,
+}
+
+impl Display for GeometryEncoding {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ GeometryEncoding::Plain => f.write_str("plain"),
+ GeometryEncoding::Wkb => f.write_str("wkb"),
+ GeometryEncoding::Native => f.write_str("native"),
+ }
+ }
+}
+
+impl FromStr for GeometryEncoding {
+ type Err = String;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s.to_lowercase().as_str() {
+ "plain" => Ok(Self::Plain),
+ "wkb" => Ok(Self::Wkb),
+ "native" => Ok(Self::Native),
+ s => Err(format!("Unable to parse from `{s}`")),
+ }
+ }
+}
+
+impl ConfigField for GeometryEncoding {
+ fn visit<V: Visit>(&self, v: &mut V, key: &str, _description: &'static
str) {
+ v.some(
+ &format!("{key}.geometry_encoding"),
+ self,
+ "Specify point geometry encoding",
+ );
+ }
+
+ fn set(&mut self, _key: &str, value: &str) -> Result<(), DataFusionError> {
+ *self = value.parse().map_err(DataFusionError::Configuration)?;
+ Ok(())
+ }
+}
+
/// LAS extra bytes handling
#[derive(Clone, Copy, Default, PartialEq, Eq, Debug)]
pub enum LasExtraBytes {
@@ -73,16 +123,37 @@ impl ConfigField for LasExtraBytes {
}
}
-config_namespace! {
- /// The LAS config options
+extensions_options! {
+ /// LAS/LAZ configuration options
+ ///
+ /// * `geometry encoding`: plain (x, y, z), wkb or native (geoarrow)
+ /// * `collect statistics`: extract las/laz chunk statistics (requires a
full scan on registration)
+ /// * `parallel statistics extraction`: extract statistics in parallel
+ /// * `persist statistics`: store statistics in a sidecar file for future
reuse (requires write access)
+ /// * `round robin partitioning`: read chunks in parallel with round robin
instead of byte range (default)
+ /// * `extra bytes`: las extra byte attributes handling, ignore, keep as
binary blob, or typed
pub struct LasOptions {
+ pub geometry_encoding: GeometryEncoding, default =
GeometryEncoding::default()
pub extra_bytes: LasExtraBytes, default = LasExtraBytes::default()
+ pub collect_statistics: bool, default = false
+ pub parallel_statistics_extraction: bool, default = false
+ pub persist_statistics: bool, default = false
+ pub round_robin_partitioning: bool, default = false
}
}
+impl ConfigExtension for LasOptions {
+ const PREFIX: &'static str = "las";
+}
+
impl LasOptions {
- pub fn with_extra_bytes(mut self, extra_bytes: LasExtraBytes) -> Self {
+ pub fn with_geometry_encoding(mut self, geometry_encoding:
GeometryEncoding) -> Self {
+ self.geometry_encoding = geometry_encoding;
+ self
+ }
+
+ pub fn with_las_extra_bytes(mut self, extra_bytes: LasExtraBytes) -> Self {
self.extra_bytes = extra_bytes;
self
}
@@ -97,13 +168,13 @@ mod test {
prelude::{SessionConfig, SessionContext},
};
- use crate::{
- las::format::{Extension, LasFormatFactory},
- options::PointcloudOptions,
+ use crate::las::{
+ format::{Extension, LasFormatFactory},
+ options::LasOptions,
};
fn setup_context() -> SessionContext {
- let config =
SessionConfig::new().with_option_extension(PointcloudOptions::default());
+ let config =
SessionConfig::new().with_option_extension(LasOptions::default());
let mut state = SessionStateBuilder::new().with_config(config).build();
let file_format = Arc::new(LasFormatFactory::new(Extension::Las));
@@ -135,12 +206,8 @@ mod test {
assert_eq!(df.schema().fields().len(), 3);
// overwrite options
- ctx.sql("SET pointcloud.geometry_encoding = 'wkb'")
- .await
- .unwrap();
- ctx.sql("SET pointcloud.las.extra_bytes = 'blob'")
- .await
- .unwrap();
+ ctx.sql("SET las.geometry_encoding = 'wkb'").await.unwrap();
+ ctx.sql("SET las.extra_bytes = 'blob'").await.unwrap();
let df = ctx
.sql("SELECT geometry, extra_bytes FROM 'tests/data/extra.las'")
diff --git a/rust/sedona-pointcloud/src/las/reader.rs
b/rust/sedona-pointcloud/src/las/reader.rs
index 64939a19..80411cfb 100644
--- a/rust/sedona-pointcloud/src/las/reader.rs
+++ b/rust/sedona-pointcloud/src/las/reader.rs
@@ -36,12 +36,10 @@ use laz::{
};
use object_store::ObjectStore;
-use crate::{
- las::{
- builder::RowBuilder,
- metadata::{ChunkMeta, LasMetadata, LasMetadataReader},
- },
- options::PointcloudOptions,
+use crate::las::{
+ builder::RowBuilder,
+ metadata::{ChunkMeta, LasMetadata, LasMetadataReader},
+ options::LasOptions,
};
/// LAS/LAZ file reader factory
@@ -66,7 +64,7 @@ impl LasFileReaderFactory {
pub fn create_reader(
&self,
partitioned_file: PartitionedFile,
- options: PointcloudOptions,
+ options: LasOptions,
) -> Result<Box<LasFileReader>, DataFusionError> {
Ok(Box::new(LasFileReader {
partitioned_file,
@@ -82,7 +80,7 @@ pub struct LasFileReader {
partitioned_file: PartitionedFile,
store: Arc<dyn ObjectStore>,
metadata_cache: Option<Arc<dyn FileMetadataCache>>,
- pub options: PointcloudOptions,
+ pub options: LasOptions,
}
impl LasFileReader {
@@ -111,10 +109,7 @@ impl LasFileReader {
let num_points = chunk_meta.num_points as usize;
let mut builder = RowBuilder::new(num_points, header.clone())
.with_geometry_encoding(self.options.geometry_encoding)
- .with_extra_attributes(
- metadata.extra_attributes.clone(),
- self.options.las.extra_bytes,
- );
+ .with_extra_attributes(metadata.extra_attributes.clone(),
self.options.extra_bytes);
// parse points
if header.laz_vlr().is_ok() {
@@ -187,7 +182,7 @@ pub fn record_decompressor(
Ok(decompressor)
}
-pub(crate) fn read_point<R: Read>(buffer: R, header: &Header) -> Result<Point,
DataFusionError> {
+fn read_point<R: Read>(buffer: R, header: &Header) -> Result<Point,
DataFusionError> {
RawPoint::read_from(buffer, header.point_format())
.map(|raw_point| Point::new(raw_point, header.transforms()))
.map_err(|e| DataFusionError::External(Box::new(e)))
diff --git a/rust/sedona-pointcloud/src/las/schema.rs
b/rust/sedona-pointcloud/src/las/schema.rs
index c3e68394..f6e799fc 100644
--- a/rust/sedona-pointcloud/src/las/schema.rs
+++ b/rust/sedona-pointcloud/src/las/schema.rs
@@ -22,7 +22,7 @@ use geoarrow_schema::{CoordType, Crs, Dimension, Metadata,
PointType, WkbType};
use las::Header;
use las_crs::{get_epsg_from_geotiff_crs, get_epsg_from_wkt_crs_bytes};
-use crate::{las::options::LasExtraBytes, options::GeometryEncoding};
+use crate::las::options::{GeometryEncoding, LasExtraBytes};
// Arrow schema for LAS points
pub fn try_schema_from_header(
diff --git a/rust/sedona-pointcloud/src/las/source.rs
b/rust/sedona-pointcloud/src/las/source.rs
index 004d726c..6f66b823 100644
--- a/rust/sedona-pointcloud/src/las/source.rs
+++ b/rust/sedona-pointcloud/src/las/source.rs
@@ -15,22 +15,22 @@
// specific language governing permissions and limitations
// under the License.
-use std::{any::Any, sync::Arc};
+use std::{any::Any, iter, sync::Arc};
use datafusion_common::{config::ConfigOptions, error::DataFusionError,
Statistics};
use datafusion_datasource::{
- file::FileSource, file_scan_config::FileScanConfig,
file_stream::FileOpener, TableSchema,
+ file::FileSource, file_groups::FileGroupPartitioner,
file_scan_config::FileScanConfig,
+ file_stream::FileOpener, source::DataSource, TableSchema,
};
-use datafusion_physical_expr::{conjunction, PhysicalExpr};
+use datafusion_physical_expr::{conjunction, LexOrdering, PhysicalExpr};
use datafusion_physical_plan::{
filter_pushdown::{FilterPushdownPropagation, PushedDown},
metrics::ExecutionPlanMetricsSet,
};
use object_store::ObjectStore;
-use crate::{
- las::{format::Extension, opener::LasOpener, reader::LasFileReaderFactory},
- options::PointcloudOptions,
+use crate::las::{
+ format::Extension, opener::LasOpener, options::LasOptions,
reader::LasFileReaderFactory,
};
#[derive(Clone, Debug)]
@@ -46,7 +46,7 @@ pub struct LasSource {
/// Batch size configuration
pub(crate) batch_size: Option<usize>,
pub(crate) projected_statistics: Option<Statistics>,
- pub(crate) options: PointcloudOptions,
+ pub(crate) options: LasOptions,
pub(crate) extension: Extension,
}
@@ -64,7 +64,7 @@ impl LasSource {
}
}
- pub fn with_options(mut self, options: PointcloudOptions) -> Self {
+ pub fn with_options(mut self, options: LasOptions) -> Self {
self.options = options;
self
}
@@ -80,7 +80,7 @@ impl FileSource for LasSource {
&self,
object_store: Arc<dyn ObjectStore>,
base_config: &FileScanConfig,
- _partition: usize,
+ partition: usize,
) -> Arc<dyn FileOpener> {
let projection = base_config
.file_column_projection_indices()
@@ -98,6 +98,8 @@ impl FileSource for LasSource {
predicate: self.predicate.clone(),
file_reader_factory,
options: self.options.clone(),
+ partition_count:
base_config.output_partitioning().partition_count(),
+ partition,
})
}
@@ -149,6 +151,52 @@ impl FileSource for LasSource {
self.extension.as_str()
}
+ fn repartitioned(
+ &self,
+ target_partitions: usize,
+ repartition_file_min_size: usize,
+ output_ordering: Option<LexOrdering>,
+ config: &FileScanConfig,
+ ) -> Result<Option<FileScanConfig>, DataFusionError> {
+ if output_ordering.is_none() & self.options.round_robin_partitioning {
+ // Custom round robin repartitioning
+ //
+ // The default way to partition a dataset to enable parallel
reading
+ // by DataFusion is through splitting files by byte ranges into the
+ // number of target partitions. For selective queries on
(partially)
+ // ordered datasets that support pruning, this can result in
unequal
+ // resource use, as all the work is done on one partition while the
+ // rest is pruned. Additionally, this breaks the existing locality
+ // in the input when it is converted, as data from all partitions
+ // ends up in each output row group. This approach addresses these
+ // issues by partitioning the dataset using a round-robin scheme
+ // across sequential chunks. This improves selective query
performance
+ // by more than half.
+ let mut config = config.clone();
+ config.file_groups = config
+ .file_groups
+ .into_iter()
+ .flat_map(|fg| iter::repeat_n(fg, target_partitions))
+ .collect();
+ return Ok(Some(config));
+ } else {
+ // Default byte range repartitioning
+ let repartitioned_file_groups_option = FileGroupPartitioner::new()
+ .with_target_partitions(target_partitions)
+ .with_repartition_file_min_size(repartition_file_min_size)
+ .with_preserve_order_within_groups(output_ordering.is_some())
+ .repartition_file_groups(&config.file_groups);
+
+ if let Some(repartitioned_file_groups) =
repartitioned_file_groups_option {
+ let mut source = config.clone();
+ source.file_groups = repartitioned_file_groups;
+ return Ok(Some(source));
+ }
+ }
+
+ Ok(None)
+ }
+
fn try_pushdown_filters(
&self,
filters: Vec<Arc<dyn PhysicalExpr>>,
diff --git a/rust/sedona-pointcloud/src/las/statistics.rs
b/rust/sedona-pointcloud/src/las/statistics.rs
index 87f11abd..36b2d4bf 100644
--- a/rust/sedona-pointcloud/src/las/statistics.rs
+++ b/rust/sedona-pointcloud/src/las/statistics.rs
@@ -15,7 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-use std::{collections::HashSet, io::Cursor, sync::Arc};
+use std::{
+ collections::HashSet,
+ io::{Cursor, Read, Seek},
+ sync::Arc,
+};
use arrow_array::{
builder::PrimitiveBuilder,
@@ -25,22 +29,20 @@ use arrow_array::{
};
use arrow_ipc::{reader::FileReader, writer::FileWriter};
use arrow_schema::{DataType, Field, Schema};
+use byteorder::{LittleEndian, ReadBytesExt};
use datafusion_common::{arrow::compute::concat_batches, Column,
DataFusionError, ScalarValue};
use datafusion_pruning::PruningStatistics;
-use las::{Header, Point};
+use las::Header;
use object_store::{path::Path, ObjectMeta, ObjectStore, PutPayload};
-
+use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use sedona_geometry::bounding_box::BoundingBox;
-use crate::las::{
- metadata::ChunkMeta,
- reader::{read_point, record_decompressor},
-};
+use crate::las::{metadata::ChunkMeta, reader::record_decompressor};
/// Spatial statistics (extent) of LAS/LAZ chunks for pruning.
///
/// It wraps a `RecordBatch` with x, y, z min and max values and row count per
chunk.
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, PartialEq)]
pub struct LasStatistics {
pub values: RecordBatch,
}
@@ -208,6 +210,7 @@ pub async fn chunk_statistics(
chunk_table: &[ChunkMeta],
header: &Header,
persist: bool,
+ parallel: bool,
) -> Result<LasStatistics, DataFusionError> {
let stats_path = Path::parse(format!("{}.stats",
object_meta.location.as_ref()))?;
@@ -234,9 +237,31 @@ pub async fn chunk_statistics(
// extract statistics
let mut builder =
LasStatisticsBuilder::new_with_capacity(chunk_table.len());
- for chunk_meta in chunk_table {
- let stats = extract_chunk_stats(store, object_meta,
chunk_meta, header).await?;
- builder.add_values(&stats, chunk_meta.num_points);
+ if parallel {
+ // While the method to infer the schema, adopted from the
Parquet
+ // reader, uses concurrency (metadata fetch concurrency), it
is not
+ // parallel. Extracting statistics in parallel can
substantially improve
+ // the extraction process by a factor of the number of cores
available.
+ let stats: Vec<[f64; 6]> = chunk_table
+ .par_iter()
+ .map(|chunk_meta| {
+ futures::executor::block_on(extract_chunk_stats(
+ store,
+ object_meta,
+ chunk_meta,
+ header,
+ ))
+ })
+ .collect::<Result<Vec<[f64; 6]>, DataFusionError>>()?;
+
+ for (stat, meta) in stats.iter().zip(chunk_table) {
+ builder.add_values(stat, meta.num_points);
+ }
+ } else {
+ for chunk_meta in chunk_table {
+ let stats = extract_chunk_stats(store, object_meta,
chunk_meta, header).await?;
+ builder.add_values(&stats, chunk_meta.num_points);
+ }
}
let stats = builder.finish();
@@ -274,14 +299,14 @@ async fn extract_chunk_stats(
f64::NEG_INFINITY,
];
- let extend = |stats: &mut [f64; 6], point: Point| {
+ let extend = |stats: &mut [f64; 6], point: [f64; 3]| {
*stats = [
- stats[0].min(point.x),
- stats[1].max(point.x),
- stats[2].min(point.y),
- stats[3].max(point.y),
- stats[4].min(point.z),
- stats[5].max(point.z),
+ stats[0].min(point[0]),
+ stats[1].max(point[0]),
+ stats[2].min(point[1]),
+ stats[3].max(point[1]),
+ stats[4].min(point[2]),
+ stats[5].max(point[2]),
];
};
@@ -301,14 +326,16 @@ async fn extract_chunk_stats(
for _ in 0..chunk_meta.num_points {
buffer.set_position(0);
decompressor.decompress_next(buffer.get_mut())?;
- let point = read_point(&mut buffer, header)?;
+ let point = parse_coords(&mut buffer, header)?;
extend(&mut stats, point);
}
} else {
let mut buffer = Cursor::new(bytes);
-
+ // offset to next point after reading raw coords
+ let offset = header.point_format().len() as i64 - 3 * 4;
for _ in 0..chunk_meta.num_points {
- let point = read_point(&mut buffer, header)?;
+ let point = parse_coords(&mut buffer, header)?;
+ buffer.seek_relative(offset)?;
extend(&mut stats, point);
}
}
@@ -316,6 +343,14 @@ async fn extract_chunk_stats(
Ok(stats)
}
+fn parse_coords<R: Read>(mut buffer: R, header: &Header) -> Result<[f64; 3],
DataFusionError> {
+ let transforms = header.transforms();
+ let x = transforms.x.direct(buffer.read_i32::<LittleEndian>()?);
+ let y = transforms.y.direct(buffer.read_i32::<LittleEndian>()?);
+ let z = transforms.z.direct(buffer.read_i32::<LittleEndian>()?);
+ Ok([x, y, z])
+}
+
#[cfg(test)]
mod tests {
use std::fs::File;
@@ -327,10 +362,12 @@ mod tests {
use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
use sedona_geometry::bounding_box::BoundingBox;
- use crate::{las::metadata::LasMetadataReader, options::PointcloudOptions};
+ use crate::las::{
+ metadata::LasMetadataReader, options::LasOptions,
statistics::chunk_statistics,
+ };
#[tokio::test]
- async fn chunk_statistics() {
+ async fn check_chunk_statistics() {
for path in ["tests/data/large.las", "tests/data/large.laz"] {
// read with `LasMetadataReader`
let store = LocalFileSystem::new();
@@ -341,7 +378,7 @@ mod tests {
let metadata = metadata_reader.fetch_metadata().await.unwrap();
assert!(metadata.statistics.is_none());
- let options = PointcloudOptions {
+ let options = LasOptions {
collect_statistics: true,
..Default::default()
};
@@ -376,6 +413,18 @@ mod tests {
None
))
);
+
+ let par_stats = chunk_statistics(
+ &store,
+ &object_meta,
+ &metadata.chunk_table,
+ &metadata.header,
+ false,
+ true,
+ )
+ .await
+ .unwrap();
+ assert_eq!(statistics, &par_stats);
}
}
@@ -406,7 +455,7 @@ mod tests {
let location = Path::from_filesystem_path(&tmp_path).unwrap();
let object_meta = store.head(&location).await.unwrap();
- let options = PointcloudOptions {
+ let options = LasOptions {
collect_statistics: true,
persist_statistics: true,
..Default::default()
diff --git a/rust/sedona-pointcloud/src/lib.rs
b/rust/sedona-pointcloud/src/lib.rs
index 7a75e041..05df4dff 100644
--- a/rust/sedona-pointcloud/src/lib.rs
+++ b/rust/sedona-pointcloud/src/lib.rs
@@ -16,4 +16,3 @@
// under the License.
pub mod las;
-pub mod options;
diff --git a/rust/sedona-pointcloud/src/options.rs
b/rust/sedona-pointcloud/src/options.rs
deleted file mode 100644
index 51e5067b..00000000
--- a/rust/sedona-pointcloud/src/options.rs
+++ /dev/null
@@ -1,103 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::{fmt::Display, str::FromStr};
-
-use datafusion_common::{
- config::{ConfigExtension, ConfigField, Visit},
- error::DataFusionError,
- extensions_options,
-};
-
-use crate::las::options::{LasExtraBytes, LasOptions};
-
-/// Geometry representation
-#[derive(Clone, Copy, Default, PartialEq, Eq, Debug)]
-pub enum GeometryEncoding {
- /// Use plain coordinates as three fields `x`, `y`, `z` with datatype
Float64 encoding.
- #[default]
- Plain,
- /// Resolves the coordinates to a fields `geometry` with WKB encoding.
- Wkb,
- /// Resolves the coordinates to a fields `geometry` with separated
GeoArrow encoding.
- Native,
-}
-
-impl Display for GeometryEncoding {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- match self {
- GeometryEncoding::Plain => f.write_str("plain"),
- GeometryEncoding::Wkb => f.write_str("wkb"),
- GeometryEncoding::Native => f.write_str("native"),
- }
- }
-}
-
-impl FromStr for GeometryEncoding {
- type Err = String;
-
- fn from_str(s: &str) -> Result<Self, Self::Err> {
- match s.to_lowercase().as_str() {
- "plain" => Ok(Self::Plain),
- "wkb" => Ok(Self::Wkb),
- "native" => Ok(Self::Native),
- s => Err(format!("Unable to parse from `{s}`")),
- }
- }
-}
-
-impl ConfigField for GeometryEncoding {
- fn visit<V: Visit>(&self, v: &mut V, key: &str, _description: &'static
str) {
- v.some(
- &format!("{key}.geometry_encoding"),
- self,
- "Specify point geometry encoding",
- );
- }
-
- fn set(&mut self, _key: &str, value: &str) -> Result<(), DataFusionError> {
- *self = value.parse().map_err(DataFusionError::Configuration)?;
- Ok(())
- }
-}
-
-extensions_options! {
- /// Pointcloud configuration options
- pub struct PointcloudOptions {
- pub geometry_encoding: GeometryEncoding, default =
GeometryEncoding::default()
- pub collect_statistics: bool, default = false
- pub persist_statistics: bool, default = false
- pub las: LasOptions, default = LasOptions::default()
- }
-
-}
-
-impl ConfigExtension for PointcloudOptions {
- const PREFIX: &'static str = "pointcloud";
-}
-
-impl PointcloudOptions {
- pub fn with_geometry_encoding(mut self, geometry_encoding:
GeometryEncoding) -> Self {
- self.geometry_encoding = geometry_encoding;
- self
- }
-
- pub fn with_las_extra_bytes(mut self, extra_bytes: LasExtraBytes) -> Self {
- self.las.extra_bytes = extra_bytes;
- self
- }
-}
diff --git a/rust/sedona/src/context.rs b/rust/sedona/src/context.rs
index 1664173a..43a84393 100644
--- a/rust/sedona/src/context.rs
+++ b/rust/sedona/src/context.rs
@@ -60,12 +60,9 @@ use sedona_geoparquet::{
provider::{geoparquet_listing_table, GeoParquetReadOptions},
};
#[cfg(feature = "pointcloud")]
-use sedona_pointcloud::{
- las::{
- format::{Extension, LasFormatFactory},
- options::LasExtraBytes,
- },
- options::{GeometryEncoding, PointcloudOptions},
+use sedona_pointcloud::las::{
+ format::{Extension, LasFormatFactory},
+ options::{GeometryEncoding, LasExtraBytes, LasOptions},
};
/// Sedona SessionContext wrapper
@@ -120,7 +117,7 @@ impl SedonaContext {
#[cfg(feature = "pointcloud")]
let session_config = session_config.with_option_extension(
- PointcloudOptions::default()
+ LasOptions::default()
.with_geometry_encoding(GeometryEncoding::Wkb)
.with_las_extra_bytes(LasExtraBytes::Typed),
);