This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git


The following commit(s) were added to refs/heads/main by this push:
     new 1637efe4 feat(rust/sedona-pointcloud): add optional round robin 
partitioning and parallel statistics extraction (#648)
1637efe4 is described below

commit 1637efe4d4c3bf296f7e88f3dfc5c2a83eb9d788
Author: Balthasar Teuscher <[email protected]>
AuthorDate: Tue Mar 3 00:52:35 2026 +0100

    feat(rust/sedona-pointcloud): add optional round robin partitioning and 
parallel statistics extraction (#648)
---
 Cargo.lock                                   |   2 +
 rust/sedona-pointcloud/Cargo.toml            |   2 +
 rust/sedona-pointcloud/src/las/builder.rs    |  17 ++---
 rust/sedona-pointcloud/src/las/format.rs     |  20 +++---
 rust/sedona-pointcloud/src/las/metadata.rs   |  54 +++++++-------
 rust/sedona-pointcloud/src/las/opener.rs     |  73 ++++++++++++++-----
 rust/sedona-pointcloud/src/las/options.rs    |  97 +++++++++++++++++++++----
 rust/sedona-pointcloud/src/las/reader.rs     |  21 +++---
 rust/sedona-pointcloud/src/las/schema.rs     |   2 +-
 rust/sedona-pointcloud/src/las/source.rs     |  66 ++++++++++++++---
 rust/sedona-pointcloud/src/las/statistics.rs |  99 ++++++++++++++++++-------
 rust/sedona-pointcloud/src/lib.rs            |   1 -
 rust/sedona-pointcloud/src/options.rs        | 103 ---------------------------
 rust/sedona/src/context.rs                   |  11 ++-
 14 files changed, 331 insertions(+), 237 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 158086b4..7292f3dc 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5434,6 +5434,7 @@ dependencies = [
  "arrow-schema",
  "async-stream",
  "async-trait",
+ "byteorder",
  "bytes",
  "datafusion",
  "datafusion-catalog",
@@ -5450,6 +5451,7 @@ dependencies = [
  "las-crs",
  "laz",
  "object_store",
+ "rayon",
  "sedona",
  "sedona-expr",
  "sedona-geometry",
diff --git a/rust/sedona-pointcloud/Cargo.toml 
b/rust/sedona-pointcloud/Cargo.toml
index 478e9ad3..921f30c8 100644
--- a/rust/sedona-pointcloud/Cargo.toml
+++ b/rust/sedona-pointcloud/Cargo.toml
@@ -36,6 +36,7 @@ arrow-ipc = { workspace = true }
 arrow-schema = { workspace = true }
 async-stream = "0.3.6"
 async-trait = { workspace = true }
+byteorder = { workspace = true }
 bytes = { workspace = true }
 datafusion-catalog = { workspace = true }
 datafusion-common = { workspace = true }
@@ -51,6 +52,7 @@ las = { version = "0.9.10", features = ["laz"] }
 las-crs = { version = "1.0.0" }
 laz = "0.12.0"
 object_store = { workspace = true }
+rayon = "1.11.0"
 sedona-expr = { workspace = true }
 sedona-geometry = { workspace = true }
 
diff --git a/rust/sedona-pointcloud/src/las/builder.rs 
b/rust/sedona-pointcloud/src/las/builder.rs
index 8e2e8a85..0f396085 100644
--- a/rust/sedona-pointcloud/src/las/builder.rs
+++ b/rust/sedona-pointcloud/src/las/builder.rs
@@ -35,9 +35,10 @@ use geoarrow_array::{
 use geoarrow_schema::Dimension;
 use las::{Header, Point};
 
-use crate::{
-    las::{metadata::ExtraAttribute, options::LasExtraBytes, 
schema::try_schema_from_header},
-    options::GeometryEncoding,
+use crate::las::{
+    metadata::ExtraAttribute,
+    options::{GeometryEncoding, LasExtraBytes},
+    schema::try_schema_from_header,
 };
 
 #[derive(Debug)]
@@ -515,9 +516,9 @@ mod tests {
     use las::{point::Format, Builder, Writer};
     use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
 
-    use crate::{
-        las::{options::LasExtraBytes, reader::LasFileReaderFactory},
-        options::PointcloudOptions,
+    use crate::las::{
+        options::{LasExtraBytes, LasOptions},
+        reader::LasFileReaderFactory,
     };
 
     #[tokio::test]
@@ -544,7 +545,7 @@ mod tests {
             let file_reader = LasFileReaderFactory::new(Arc::new(store), None)
                 .create_reader(
                     PartitionedFile::new(location, object.size),
-                    PointcloudOptions::default(),
+                    LasOptions::default(),
                 )
                 .unwrap();
             let metadata = file_reader.get_metadata().await.unwrap();
@@ -578,7 +579,7 @@ mod tests {
         let file_reader = LasFileReaderFactory::new(Arc::new(store), None)
             .create_reader(
                 PartitionedFile::new(location, object.size),
-                
PointcloudOptions::default().with_las_extra_bytes(LasExtraBytes::Typed),
+                
LasOptions::default().with_las_extra_bytes(LasExtraBytes::Typed),
             )
             .unwrap();
         let metadata = file_reader.get_metadata().await.unwrap();
diff --git a/rust/sedona-pointcloud/src/las/format.rs 
b/rust/sedona-pointcloud/src/las/format.rs
index 48e07054..d2f68240 100644
--- a/rust/sedona-pointcloud/src/las/format.rs
+++ b/rust/sedona-pointcloud/src/las/format.rs
@@ -33,9 +33,9 @@ use datafusion_physical_plan::ExecutionPlan;
 use futures::{StreamExt, TryStreamExt};
 use object_store::{ObjectMeta, ObjectStore};
 
-use crate::{
-    las::{metadata::LasMetadataReader, reader::LasFileReaderFactory, 
source::LasSource},
-    options::PointcloudOptions,
+use crate::las::{
+    metadata::LasMetadataReader, options::LasOptions, 
reader::LasFileReaderFactory,
+    source::LasSource,
 };
 
 #[derive(Debug, Clone, Copy)]
@@ -56,7 +56,7 @@ impl Extension {
 /// Factory struct used to create [LasFormat]
 pub struct LasFormatFactory {
     // inner options for LAS/LAZ
-    pub options: Option<PointcloudOptions>,
+    pub options: Option<LasOptions>,
     extension: Extension,
 }
 
@@ -70,7 +70,7 @@ impl LasFormatFactory {
     }
 
     /// Creates an instance of [LasFormatFactory] with customized default 
options
-    pub fn new_with(options: PointcloudOptions, extension: Extension) -> Self {
+    pub fn new_with(options: LasOptions, extension: Extension) -> Self {
         Self {
             options: Some(options),
             extension,
@@ -87,8 +87,8 @@ impl FileFormatFactory for LasFormatFactory {
         let mut options = state
             .config_options()
             .extensions
-            .get::<PointcloudOptions>()
-            .or_else(|| 
state.table_options().extensions.get::<PointcloudOptions>())
+            .get::<LasOptions>()
+            .or_else(|| state.table_options().extensions.get::<LasOptions>())
             .cloned()
             .or(self.options.clone())
             .unwrap_or_default();
@@ -129,7 +129,7 @@ impl fmt::Debug for LasFormatFactory {
 /// The LAS/LAZ `FileFormat` implementation
 #[derive(Debug)]
 pub struct LasFormat {
-    pub options: PointcloudOptions,
+    pub options: LasOptions,
     extension: Extension,
 }
 
@@ -141,7 +141,7 @@ impl LasFormat {
         }
     }
 
-    pub fn with_options(mut self, options: PointcloudOptions) -> Self {
+    pub fn with_options(mut self, options: LasOptions) -> Self {
         self.options = options;
         self
     }
@@ -195,7 +195,7 @@ impl FileFormat for LasFormat {
                 Ok::<_, DataFusionError>((loc_path, schema))
             })
             .boxed() // Workaround 
https://github.com/rust-lang/rust/issues/64552
-            // fetch schemas concurrently, if requested
+            // fetch schemas concurrently, if requested (note that this is not 
parallel)
             .buffered(state.config_options().execution.meta_fetch_concurrency)
             .try_collect()
             .await?;
diff --git a/rust/sedona-pointcloud/src/las/metadata.rs 
b/rust/sedona-pointcloud/src/las/metadata.rs
index ab9b40ac..5bb8052a 100644
--- a/rust/sedona-pointcloud/src/las/metadata.rs
+++ b/rust/sedona-pointcloud/src/las/metadata.rs
@@ -36,12 +36,10 @@ use las::{
 use laz::laszip::ChunkTable;
 use object_store::{ObjectMeta, ObjectStore};
 
-use crate::{
-    las::{
-        schema::try_schema_from_header,
-        statistics::{chunk_statistics, LasStatistics},
-    },
-    options::PointcloudOptions,
+use crate::las::{
+    options::LasOptions,
+    schema::try_schema_from_header,
+    statistics::{chunk_statistics, LasStatistics},
 };
 
 /// LAS/LAZ chunk metadata
@@ -92,7 +90,7 @@ pub struct LasMetadataReader<'a> {
     store: &'a dyn ObjectStore,
     object_meta: &'a ObjectMeta,
     file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
-    options: PointcloudOptions,
+    options: LasOptions,
 }
 
 impl<'a> LasMetadataReader<'a> {
@@ -115,18 +113,11 @@ impl<'a> LasMetadataReader<'a> {
     }
 
     /// set table options
-    pub fn with_options(mut self, options: PointcloudOptions) -> Self {
+    pub fn with_options(mut self, options: LasOptions) -> Self {
         self.options = options;
         self
     }
 
-    /// Fetch header
-    pub async fn fetch_header(&self) -> Result<Header, DataFusionError> {
-        fetch_header(self.store, self.object_meta)
-            .await
-            .map_err(DataFusionError::External)
-    }
-
     /// Fetch LAS/LAZ metadata from the remote object store
     pub async fn fetch_metadata(&self) -> Result<Arc<LasMetadata>, 
DataFusionError> {
         let Self {
@@ -149,13 +140,9 @@ impl<'a> LasMetadataReader<'a> {
             return Ok(las_file_metadata);
         }
 
-        let header = self.fetch_header().await?;
+        let header = fetch_header(*store, object_meta).await?;
         let extra_attributes = extra_bytes_attributes(&header)?;
-        let chunk_table = if header.laz_vlr().is_ok() {
-            laz_chunk_table(*store, object_meta, &header).await?
-        } else {
-            las_chunk_table(&header).await?
-        };
+        let chunk_table = fetch_chunk_table(*store, object_meta, 
&header).await?;
         let statistics = if options.collect_statistics {
             Some(
                 chunk_statistics(
@@ -164,6 +151,7 @@ impl<'a> LasMetadataReader<'a> {
                     &chunk_table,
                     &header,
                     options.persist_statistics,
+                    options.parallel_statistics_extraction,
                 )
                 .await?,
             )
@@ -192,7 +180,7 @@ impl<'a> LasMetadataReader<'a> {
         let schema = try_schema_from_header(
             &metadata.header,
             self.options.geometry_encoding,
-            self.options.las.extra_bytes,
+            self.options.extra_bytes,
         )?;
 
         Ok(schema)
@@ -241,7 +229,8 @@ impl<'a> LasMetadataReader<'a> {
     }
 }
 
-async fn fetch_header(
+/// Fetch the [Header] of a LAS/LAZ file
+pub async fn fetch_header(
     store: &(impl ObjectStore + ?Sized),
     object_meta: &ObjectMeta,
 ) -> Result<Header, Box<dyn Error + Send + Sync>> {
@@ -296,6 +285,7 @@ async fn fetch_header(
     Ok(builder.into_header()?)
 }
 
+/// Extra attribute information (custom attributes in LAS/LAZ files)
 #[derive(Debug, Clone, PartialEq)]
 pub struct ExtraAttribute {
     pub data_type: DataType,
@@ -368,6 +358,19 @@ fn extra_bytes_attributes(
     Ok(attributes)
 }
 
+/// Fetch or generate chunk table metadata.
+pub async fn fetch_chunk_table(
+    store: &(impl ObjectStore + ?Sized),
+    object_meta: &ObjectMeta,
+    header: &Header,
+) -> Result<Vec<ChunkMeta>, Box<dyn Error + Send + Sync>> {
+    if header.laz_vlr().is_ok() {
+        laz_chunk_table(store, object_meta, header).await
+    } else {
+        las_chunk_table(header).await
+    }
+}
+
 async fn laz_chunk_table(
     store: &(impl ObjectStore + ?Sized),
     object_meta: &ObjectMeta,
@@ -482,7 +485,7 @@ mod tests {
     use las::{point::Format, Builder, Reader, Writer};
     use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
 
-    use crate::las::metadata::LasMetadataReader;
+    use crate::las::metadata::fetch_header;
 
     #[tokio::test]
     async fn header_basic_e2e() {
@@ -503,14 +506,13 @@ mod tests {
         let store = LocalFileSystem::new();
         let location = Path::from_filesystem_path(&tmp_path).unwrap();
         let object_meta = store.head(&location).await.unwrap();
-        let metadata_reader = LasMetadataReader::new(&store, &object_meta);
 
         // read with las `Reader`
         let reader = Reader::from_path(&tmp_path).unwrap();
 
         assert_eq!(
             reader.header(),
-            &metadata_reader.fetch_header().await.unwrap()
+            &fetch_header(&store, &object_meta).await.unwrap()
         );
     }
 }
diff --git a/rust/sedona-pointcloud/src/las/opener.rs 
b/rust/sedona-pointcloud/src/las/opener.rs
index 249e5392..941788ef 100644
--- a/rust/sedona-pointcloud/src/las/opener.rs
+++ b/rust/sedona-pointcloud/src/las/opener.rs
@@ -29,12 +29,10 @@ use futures::StreamExt;
 use sedona_expr::spatial_filter::SpatialFilter;
 use sedona_geometry::bounding_box::BoundingBox;
 
-use crate::{
-    las::{
-        reader::{LasFileReader, LasFileReaderFactory},
-        schema::try_schema_from_header,
-    },
-    options::PointcloudOptions,
+use crate::las::{
+    options::LasOptions,
+    reader::{LasFileReader, LasFileReaderFactory},
+    schema::try_schema_from_header,
 };
 
 pub struct LasOpener {
@@ -42,13 +40,18 @@ pub struct LasOpener {
     pub projection: Arc<[usize]>,
     /// Optional limit on the number of rows to read
     pub limit: Option<usize>,
+    /// Filter predicate for pruning
     pub predicate: Option<Arc<dyn PhysicalExpr>>,
     /// Factory for instantiating LAS/LAZ reader
     pub file_reader_factory: Arc<LasFileReaderFactory>,
     /// Table options
-    pub options: PointcloudOptions,
+    pub options: LasOptions,
     /// Target batch size
-    pub(crate) batch_size: usize,
+    pub batch_size: usize,
+    /// Target partition count
+    pub partition_count: usize,
+    /// Partition to read
+    pub partition: usize,
 }
 
 impl FileOpener for LasOpener {
@@ -56,6 +59,9 @@ impl FileOpener for LasOpener {
         let projection = self.projection.clone();
         let limit = self.limit;
         let batch_size = self.batch_size;
+        let round_robin = self.options.round_robin_partitioning;
+        let partition_count = self.partition_count;
+        let partition = self.partition;
 
         let predicate = self.predicate.clone();
 
@@ -68,7 +74,7 @@ impl FileOpener for LasOpener {
             let schema = Arc::new(try_schema_from_header(
                 &metadata.header,
                 file_reader.options.geometry_encoding,
-                file_reader.options.las.extra_bytes,
+                file_reader.options.extra_bytes,
             )?);
 
             let pruning_predicate = predicate.and_then(|physical_expr| {
@@ -117,6 +123,11 @@ impl FileOpener for LasOpener {
 
             let stream = async_stream::try_stream! {
                 for (i, chunk_meta) in metadata.chunk_table.iter().enumerate() 
{
+                    // round robin
+                    if round_robin && i % partition_count != partition {
+                        continue;
+                    }
+
                     // limit
                     if let Some(limit) = limit {
                         if row_count >= limit {
@@ -187,10 +198,10 @@ mod tests {
         let ctx = SedonaContext::new_local_interactive().await.unwrap();
 
         // ensure no faulty chunk pruning
-        ctx.sql("SET pointcloud.geometry_encoding = 'plain'")
+        ctx.sql("SET las.geometry_encoding = 'plain'")
             .await
             .unwrap();
-        ctx.sql("SET pointcloud.collect_statistics = 'true'")
+        ctx.sql("SET las.collect_statistics = 'true'")
             .await
             .unwrap();
 
@@ -212,9 +223,7 @@ mod tests {
             .unwrap();
         assert_eq!(count, 50000);
 
-        ctx.sql("SET pointcloud.geometry_encoding = 'wkb'")
-            .await
-            .unwrap();
+        ctx.sql("SET las.geometry_encoding = 'wkb'").await.unwrap();
         let count = ctx
             .sql(&format!("SELECT * FROM \"{path}\" WHERE 
ST_Intersects(geometry, ST_GeomFromText('POLYGON ((0 0, 0.7 0, 0.7 0.7, 0 0.7, 
0 0))'))"))
             .await
@@ -233,10 +242,10 @@ mod tests {
         let ctx = SedonaContext::new_local_interactive().await.unwrap();
 
         // ensure no faulty chunk pruning
-        ctx.sql("SET pointcloud.geometry_encoding = 'plain'")
+        ctx.sql("SET las.geometry_encoding = 'plain'")
             .await
             .unwrap();
-        ctx.sql("SET pointcloud.collect_statistics = 'true'")
+        ctx.sql("SET las.collect_statistics = 'true'")
             .await
             .unwrap();
 
@@ -258,9 +267,7 @@ mod tests {
             .unwrap();
         assert_eq!(count, 50000);
 
-        ctx.sql("SET pointcloud.geometry_encoding = 'wkb'")
-            .await
-            .unwrap();
+        ctx.sql("SET las.geometry_encoding = 'wkb'").await.unwrap();
         let count = ctx
             .sql(&format!("SELECT * FROM \"{path}\" WHERE 
ST_Intersects(geometry, ST_GeomFromText('POLYGON ((0 0, 0.7 0, 0.7 0.7, 0 0.7, 
0 0))'))"))
             .await
@@ -270,4 +277,32 @@ mod tests {
             .unwrap();
         assert_eq!(count, 50000);
     }
+
+    #[tokio::test]
+    async fn round_robin_partitioning() {
+        // file with two clusters, one at 0.5 one at 1.0
+        let path = "tests/data/large.laz";
+
+        let ctx = SedonaContext::new_local_interactive().await.unwrap();
+
+        let result1 = ctx
+            .sql(&format!("SELECT * FROM \"{path}\""))
+            .await
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+
+        ctx.sql("SET las.round_robin_partitioning = 'true'")
+            .await
+            .unwrap();
+        let result2 = ctx
+            .sql(&format!("SELECT * FROM \"{path}\""))
+            .await
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+        assert_eq!(result1, result2);
+    }
 }
diff --git a/rust/sedona-pointcloud/src/las/options.rs 
b/rust/sedona-pointcloud/src/las/options.rs
index de02628f..a7c656e2 100644
--- a/rust/sedona-pointcloud/src/las/options.rs
+++ b/rust/sedona-pointcloud/src/las/options.rs
@@ -18,11 +18,61 @@
 use std::{fmt::Display, str::FromStr};
 
 use datafusion_common::{
-    config::{ConfigField, Visit},
-    config_namespace,
+    config::{ConfigExtension, ConfigField, Visit},
     error::DataFusionError,
+    extensions_options,
 };
 
+/// Geometry representation
+#[derive(Clone, Copy, Default, PartialEq, Eq, Debug)]
+pub enum GeometryEncoding {
+    /// Use plain coordinates as three fields `x`, `y`, `z` with datatype 
Float64 encoding.
+    #[default]
+    Plain,
+    /// Resolves the coordinates to a fields `geometry` with WKB encoding.
+    Wkb,
+    /// Resolves the coordinates to a fields `geometry` with separated 
GeoArrow encoding.
+    Native,
+}
+
+impl Display for GeometryEncoding {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            GeometryEncoding::Plain => f.write_str("plain"),
+            GeometryEncoding::Wkb => f.write_str("wkb"),
+            GeometryEncoding::Native => f.write_str("native"),
+        }
+    }
+}
+
+impl FromStr for GeometryEncoding {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_lowercase().as_str() {
+            "plain" => Ok(Self::Plain),
+            "wkb" => Ok(Self::Wkb),
+            "native" => Ok(Self::Native),
+            s => Err(format!("Unable to parse from `{s}`")),
+        }
+    }
+}
+
+impl ConfigField for GeometryEncoding {
+    fn visit<V: Visit>(&self, v: &mut V, key: &str, _description: &'static 
str) {
+        v.some(
+            &format!("{key}.geometry_encoding"),
+            self,
+            "Specify point geometry encoding",
+        );
+    }
+
+    fn set(&mut self, _key: &str, value: &str) -> Result<(), DataFusionError> {
+        *self = value.parse().map_err(DataFusionError::Configuration)?;
+        Ok(())
+    }
+}
+
 /// LAS extra bytes handling
 #[derive(Clone, Copy, Default, PartialEq, Eq, Debug)]
 pub enum LasExtraBytes {
@@ -73,16 +123,37 @@ impl ConfigField for LasExtraBytes {
     }
 }
 
-config_namespace! {
-    /// The LAS config options
+extensions_options! {
+    /// LAS/LAZ configuration options
+    ///
+    /// * `geometry encoding`: plain (x, y, z), wkb or native (geoarrow)
+    /// * `collect statistics`: extract las/laz chunk statistics (requires a 
full scan on registration)
+    /// * `parallel statistics extraction`: extract statistics in parallel
+    /// * `persist statistics`: store statistics in a sidecar file for future 
reuse (requires write access)
+    /// * `round robin partitioning`: read chunks in parallel with round robin 
instead of byte range (default)
+    /// * `extra bytes`: las extra byte attributes handling, ignore, keep as 
binary blob, or typed
     pub struct LasOptions {
+        pub geometry_encoding: GeometryEncoding, default = 
GeometryEncoding::default()
         pub extra_bytes: LasExtraBytes, default = LasExtraBytes::default()
+        pub collect_statistics: bool, default = false
+        pub parallel_statistics_extraction: bool, default = false
+        pub persist_statistics: bool, default = false
+        pub round_robin_partitioning: bool, default = false
     }
 
 }
 
+impl ConfigExtension for LasOptions {
+    const PREFIX: &'static str = "las";
+}
+
 impl LasOptions {
-    pub fn with_extra_bytes(mut self, extra_bytes: LasExtraBytes) -> Self {
+    pub fn with_geometry_encoding(mut self, geometry_encoding: 
GeometryEncoding) -> Self {
+        self.geometry_encoding = geometry_encoding;
+        self
+    }
+
+    pub fn with_las_extra_bytes(mut self, extra_bytes: LasExtraBytes) -> Self {
         self.extra_bytes = extra_bytes;
         self
     }
@@ -97,13 +168,13 @@ mod test {
         prelude::{SessionConfig, SessionContext},
     };
 
-    use crate::{
-        las::format::{Extension, LasFormatFactory},
-        options::PointcloudOptions,
+    use crate::las::{
+        format::{Extension, LasFormatFactory},
+        options::LasOptions,
     };
 
     fn setup_context() -> SessionContext {
-        let config = 
SessionConfig::new().with_option_extension(PointcloudOptions::default());
+        let config = 
SessionConfig::new().with_option_extension(LasOptions::default());
         let mut state = SessionStateBuilder::new().with_config(config).build();
 
         let file_format = Arc::new(LasFormatFactory::new(Extension::Las));
@@ -135,12 +206,8 @@ mod test {
         assert_eq!(df.schema().fields().len(), 3);
 
         // overwrite options
-        ctx.sql("SET pointcloud.geometry_encoding = 'wkb'")
-            .await
-            .unwrap();
-        ctx.sql("SET pointcloud.las.extra_bytes = 'blob'")
-            .await
-            .unwrap();
+        ctx.sql("SET las.geometry_encoding = 'wkb'").await.unwrap();
+        ctx.sql("SET las.extra_bytes = 'blob'").await.unwrap();
 
         let df = ctx
             .sql("SELECT geometry, extra_bytes FROM 'tests/data/extra.las'")
diff --git a/rust/sedona-pointcloud/src/las/reader.rs 
b/rust/sedona-pointcloud/src/las/reader.rs
index 64939a19..80411cfb 100644
--- a/rust/sedona-pointcloud/src/las/reader.rs
+++ b/rust/sedona-pointcloud/src/las/reader.rs
@@ -36,12 +36,10 @@ use laz::{
 };
 use object_store::ObjectStore;
 
-use crate::{
-    las::{
-        builder::RowBuilder,
-        metadata::{ChunkMeta, LasMetadata, LasMetadataReader},
-    },
-    options::PointcloudOptions,
+use crate::las::{
+    builder::RowBuilder,
+    metadata::{ChunkMeta, LasMetadata, LasMetadataReader},
+    options::LasOptions,
 };
 
 /// LAS/LAZ file reader factory
@@ -66,7 +64,7 @@ impl LasFileReaderFactory {
     pub fn create_reader(
         &self,
         partitioned_file: PartitionedFile,
-        options: PointcloudOptions,
+        options: LasOptions,
     ) -> Result<Box<LasFileReader>, DataFusionError> {
         Ok(Box::new(LasFileReader {
             partitioned_file,
@@ -82,7 +80,7 @@ pub struct LasFileReader {
     partitioned_file: PartitionedFile,
     store: Arc<dyn ObjectStore>,
     metadata_cache: Option<Arc<dyn FileMetadataCache>>,
-    pub options: PointcloudOptions,
+    pub options: LasOptions,
 }
 
 impl LasFileReader {
@@ -111,10 +109,7 @@ impl LasFileReader {
         let num_points = chunk_meta.num_points as usize;
         let mut builder = RowBuilder::new(num_points, header.clone())
             .with_geometry_encoding(self.options.geometry_encoding)
-            .with_extra_attributes(
-                metadata.extra_attributes.clone(),
-                self.options.las.extra_bytes,
-            );
+            .with_extra_attributes(metadata.extra_attributes.clone(), 
self.options.extra_bytes);
 
         // parse points
         if header.laz_vlr().is_ok() {
@@ -187,7 +182,7 @@ pub fn record_decompressor(
     Ok(decompressor)
 }
 
-pub(crate) fn read_point<R: Read>(buffer: R, header: &Header) -> Result<Point, 
DataFusionError> {
+fn read_point<R: Read>(buffer: R, header: &Header) -> Result<Point, 
DataFusionError> {
     RawPoint::read_from(buffer, header.point_format())
         .map(|raw_point| Point::new(raw_point, header.transforms()))
         .map_err(|e| DataFusionError::External(Box::new(e)))
diff --git a/rust/sedona-pointcloud/src/las/schema.rs 
b/rust/sedona-pointcloud/src/las/schema.rs
index c3e68394..f6e799fc 100644
--- a/rust/sedona-pointcloud/src/las/schema.rs
+++ b/rust/sedona-pointcloud/src/las/schema.rs
@@ -22,7 +22,7 @@ use geoarrow_schema::{CoordType, Crs, Dimension, Metadata, 
PointType, WkbType};
 use las::Header;
 use las_crs::{get_epsg_from_geotiff_crs, get_epsg_from_wkt_crs_bytes};
 
-use crate::{las::options::LasExtraBytes, options::GeometryEncoding};
+use crate::las::options::{GeometryEncoding, LasExtraBytes};
 
 // Arrow schema for LAS points
 pub fn try_schema_from_header(
diff --git a/rust/sedona-pointcloud/src/las/source.rs 
b/rust/sedona-pointcloud/src/las/source.rs
index 004d726c..6f66b823 100644
--- a/rust/sedona-pointcloud/src/las/source.rs
+++ b/rust/sedona-pointcloud/src/las/source.rs
@@ -15,22 +15,22 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{any::Any, sync::Arc};
+use std::{any::Any, iter, sync::Arc};
 
 use datafusion_common::{config::ConfigOptions, error::DataFusionError, 
Statistics};
 use datafusion_datasource::{
-    file::FileSource, file_scan_config::FileScanConfig, 
file_stream::FileOpener, TableSchema,
+    file::FileSource, file_groups::FileGroupPartitioner, 
file_scan_config::FileScanConfig,
+    file_stream::FileOpener, source::DataSource, TableSchema,
 };
-use datafusion_physical_expr::{conjunction, PhysicalExpr};
+use datafusion_physical_expr::{conjunction, LexOrdering, PhysicalExpr};
 use datafusion_physical_plan::{
     filter_pushdown::{FilterPushdownPropagation, PushedDown},
     metrics::ExecutionPlanMetricsSet,
 };
 use object_store::ObjectStore;
 
-use crate::{
-    las::{format::Extension, opener::LasOpener, reader::LasFileReaderFactory},
-    options::PointcloudOptions,
+use crate::las::{
+    format::Extension, opener::LasOpener, options::LasOptions, 
reader::LasFileReaderFactory,
 };
 
 #[derive(Clone, Debug)]
@@ -46,7 +46,7 @@ pub struct LasSource {
     /// Batch size configuration
     pub(crate) batch_size: Option<usize>,
     pub(crate) projected_statistics: Option<Statistics>,
-    pub(crate) options: PointcloudOptions,
+    pub(crate) options: LasOptions,
     pub(crate) extension: Extension,
 }
 
@@ -64,7 +64,7 @@ impl LasSource {
         }
     }
 
-    pub fn with_options(mut self, options: PointcloudOptions) -> Self {
+    pub fn with_options(mut self, options: LasOptions) -> Self {
         self.options = options;
         self
     }
@@ -80,7 +80,7 @@ impl FileSource for LasSource {
         &self,
         object_store: Arc<dyn ObjectStore>,
         base_config: &FileScanConfig,
-        _partition: usize,
+        partition: usize,
     ) -> Arc<dyn FileOpener> {
         let projection = base_config
             .file_column_projection_indices()
@@ -98,6 +98,8 @@ impl FileSource for LasSource {
             predicate: self.predicate.clone(),
             file_reader_factory,
             options: self.options.clone(),
+            partition_count: 
base_config.output_partitioning().partition_count(),
+            partition,
         })
     }
 
@@ -149,6 +151,52 @@ impl FileSource for LasSource {
         self.extension.as_str()
     }
 
+    fn repartitioned(
+        &self,
+        target_partitions: usize,
+        repartition_file_min_size: usize,
+        output_ordering: Option<LexOrdering>,
+        config: &FileScanConfig,
+    ) -> Result<Option<FileScanConfig>, DataFusionError> {
+        if output_ordering.is_none() & self.options.round_robin_partitioning {
+            // Custom round robin repartitioning
+            //
+            // The default way to partition a dataset to enable parallel 
reading
+            // by DataFusion is through splitting files by byte ranges into the
+            // number of target partitions. For selective queries on 
(partially)
+            // ordered datasets that support pruning, this can result in 
unequal
+            // resource use, as all the work is done on one partition while the
+            // rest is pruned. Additionally, this breaks the existing locality
+            // in the input when it is converted, as data from all partitions
+            // ends up in each output row group. This approach addresses these
+            // issues by partitioning the dataset using a round-robin scheme
+            // across sequential chunks. This improves selective query 
performance
+            // by more than half.
+            let mut config = config.clone();
+            config.file_groups = config
+                .file_groups
+                .into_iter()
+                .flat_map(|fg| iter::repeat_n(fg, target_partitions))
+                .collect();
+            return Ok(Some(config));
+        } else {
+            // Default byte range repartitioning
+            let repartitioned_file_groups_option = FileGroupPartitioner::new()
+                .with_target_partitions(target_partitions)
+                .with_repartition_file_min_size(repartition_file_min_size)
+                .with_preserve_order_within_groups(output_ordering.is_some())
+                .repartition_file_groups(&config.file_groups);
+
+            if let Some(repartitioned_file_groups) = 
repartitioned_file_groups_option {
+                let mut source = config.clone();
+                source.file_groups = repartitioned_file_groups;
+                return Ok(Some(source));
+            }
+        }
+
+        Ok(None)
+    }
+
     fn try_pushdown_filters(
         &self,
         filters: Vec<Arc<dyn PhysicalExpr>>,
diff --git a/rust/sedona-pointcloud/src/las/statistics.rs 
b/rust/sedona-pointcloud/src/las/statistics.rs
index 87f11abd..36b2d4bf 100644
--- a/rust/sedona-pointcloud/src/las/statistics.rs
+++ b/rust/sedona-pointcloud/src/las/statistics.rs
@@ -15,7 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{collections::HashSet, io::Cursor, sync::Arc};
+use std::{
+    collections::HashSet,
+    io::{Cursor, Read, Seek},
+    sync::Arc,
+};
 
 use arrow_array::{
     builder::PrimitiveBuilder,
@@ -25,22 +29,20 @@ use arrow_array::{
 };
 use arrow_ipc::{reader::FileReader, writer::FileWriter};
 use arrow_schema::{DataType, Field, Schema};
+use byteorder::{LittleEndian, ReadBytesExt};
 use datafusion_common::{arrow::compute::concat_batches, Column, 
DataFusionError, ScalarValue};
 use datafusion_pruning::PruningStatistics;
-use las::{Header, Point};
+use las::Header;
 use object_store::{path::Path, ObjectMeta, ObjectStore, PutPayload};
-
+use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
 use sedona_geometry::bounding_box::BoundingBox;
 
-use crate::las::{
-    metadata::ChunkMeta,
-    reader::{read_point, record_decompressor},
-};
+use crate::las::{metadata::ChunkMeta, reader::record_decompressor};
 
 /// Spatial statistics (extent) of LAS/LAZ chunks for pruning.
 ///
 /// It wraps a `RecordBatch` with x, y, z min and max values and row count per 
chunk.
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, PartialEq)]
 pub struct LasStatistics {
     pub values: RecordBatch,
 }
@@ -208,6 +210,7 @@ pub async fn chunk_statistics(
     chunk_table: &[ChunkMeta],
     header: &Header,
     persist: bool,
+    parallel: bool,
 ) -> Result<LasStatistics, DataFusionError> {
     let stats_path = Path::parse(format!("{}.stats", 
object_meta.location.as_ref()))?;
 
@@ -234,9 +237,31 @@ pub async fn chunk_statistics(
             // extract statistics
             let mut builder = 
LasStatisticsBuilder::new_with_capacity(chunk_table.len());
 
-            for chunk_meta in chunk_table {
-                let stats = extract_chunk_stats(store, object_meta, 
chunk_meta, header).await?;
-                builder.add_values(&stats, chunk_meta.num_points);
+            if parallel {
+                // While the schema-inference method, adopted from the 
Parquet
+                // reader, uses concurrency (concurrent metadata fetches), it 
is not
+                // parallel. Extracting statistics in parallel can speed up 
the
+                // extraction by up to a factor of the number of available 
cores.
+                let stats: Vec<[f64; 6]> = chunk_table
+                    .par_iter()
+                    .map(|chunk_meta| {
+                        futures::executor::block_on(extract_chunk_stats(
+                            store,
+                            object_meta,
+                            chunk_meta,
+                            header,
+                        ))
+                    })
+                    .collect::<Result<Vec<[f64; 6]>, DataFusionError>>()?;
+
+                for (stat, meta) in stats.iter().zip(chunk_table) {
+                    builder.add_values(stat, meta.num_points);
+                }
+            } else {
+                for chunk_meta in chunk_table {
+                    let stats = extract_chunk_stats(store, object_meta, 
chunk_meta, header).await?;
+                    builder.add_values(&stats, chunk_meta.num_points);
+                }
             }
 
             let stats = builder.finish();
@@ -274,14 +299,14 @@ async fn extract_chunk_stats(
         f64::NEG_INFINITY,
     ];
 
-    let extend = |stats: &mut [f64; 6], point: Point| {
+    let extend = |stats: &mut [f64; 6], point: [f64; 3]| {
         *stats = [
-            stats[0].min(point.x),
-            stats[1].max(point.x),
-            stats[2].min(point.y),
-            stats[3].max(point.y),
-            stats[4].min(point.z),
-            stats[5].max(point.z),
+            stats[0].min(point[0]),
+            stats[1].max(point[0]),
+            stats[2].min(point[1]),
+            stats[3].max(point[1]),
+            stats[4].min(point[2]),
+            stats[5].max(point[2]),
         ];
     };
 
@@ -301,14 +326,16 @@ async fn extract_chunk_stats(
         for _ in 0..chunk_meta.num_points {
             buffer.set_position(0);
             decompressor.decompress_next(buffer.get_mut())?;
-            let point = read_point(&mut buffer, header)?;
+            let point = parse_coords(&mut buffer, header)?;
             extend(&mut stats, point);
         }
     } else {
         let mut buffer = Cursor::new(bytes);
-
+        // offset to next point after reading raw coords
+        let offset = header.point_format().len() as i64 - 3 * 4;
         for _ in 0..chunk_meta.num_points {
-            let point = read_point(&mut buffer, header)?;
+            let point = parse_coords(&mut buffer, header)?;
+            buffer.seek_relative(offset)?;
             extend(&mut stats, point);
         }
     }
@@ -316,6 +343,14 @@ async fn extract_chunk_stats(
     Ok(stats)
 }
 
+fn parse_coords<R: Read>(mut buffer: R, header: &Header) -> Result<[f64; 3], 
DataFusionError> {
+    let transforms = header.transforms();
+    let x = transforms.x.direct(buffer.read_i32::<LittleEndian>()?);
+    let y = transforms.y.direct(buffer.read_i32::<LittleEndian>()?);
+    let z = transforms.z.direct(buffer.read_i32::<LittleEndian>()?);
+    Ok([x, y, z])
+}
+
 #[cfg(test)]
 mod tests {
     use std::fs::File;
@@ -327,10 +362,12 @@ mod tests {
     use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
     use sedona_geometry::bounding_box::BoundingBox;
 
-    use crate::{las::metadata::LasMetadataReader, options::PointcloudOptions};
+    use crate::las::{
+        metadata::LasMetadataReader, options::LasOptions, 
statistics::chunk_statistics,
+    };
 
     #[tokio::test]
-    async fn chunk_statistics() {
+    async fn check_chunk_statistics() {
         for path in ["tests/data/large.las", "tests/data/large.laz"] {
             // read with `LasMetadataReader`
             let store = LocalFileSystem::new();
@@ -341,7 +378,7 @@ mod tests {
             let metadata = metadata_reader.fetch_metadata().await.unwrap();
             assert!(metadata.statistics.is_none());
 
-            let options = PointcloudOptions {
+            let options = LasOptions {
                 collect_statistics: true,
                 ..Default::default()
             };
@@ -376,6 +413,18 @@ mod tests {
                     None
                 ))
             );
+
+            let par_stats = chunk_statistics(
+                &store,
+                &object_meta,
+                &metadata.chunk_table,
+                &metadata.header,
+                false,
+                true,
+            )
+            .await
+            .unwrap();
+            assert_eq!(statistics, &par_stats);
         }
     }
 
@@ -406,7 +455,7 @@ mod tests {
         let location = Path::from_filesystem_path(&tmp_path).unwrap();
         let object_meta = store.head(&location).await.unwrap();
 
-        let options = PointcloudOptions {
+        let options = LasOptions {
             collect_statistics: true,
             persist_statistics: true,
             ..Default::default()
diff --git a/rust/sedona-pointcloud/src/lib.rs 
b/rust/sedona-pointcloud/src/lib.rs
index 7a75e041..05df4dff 100644
--- a/rust/sedona-pointcloud/src/lib.rs
+++ b/rust/sedona-pointcloud/src/lib.rs
@@ -16,4 +16,3 @@
 // under the License.
 
 pub mod las;
-pub mod options;
diff --git a/rust/sedona-pointcloud/src/options.rs 
b/rust/sedona-pointcloud/src/options.rs
deleted file mode 100644
index 51e5067b..00000000
--- a/rust/sedona-pointcloud/src/options.rs
+++ /dev/null
@@ -1,103 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::{fmt::Display, str::FromStr};
-
-use datafusion_common::{
-    config::{ConfigExtension, ConfigField, Visit},
-    error::DataFusionError,
-    extensions_options,
-};
-
-use crate::las::options::{LasExtraBytes, LasOptions};
-
-/// Geometry representation
-#[derive(Clone, Copy, Default, PartialEq, Eq, Debug)]
-pub enum GeometryEncoding {
-    /// Use plain coordinates as three fields `x`, `y`, `z` with datatype 
Float64 encoding.
-    #[default]
-    Plain,
-    /// Resolves the coordinates to a fields `geometry` with WKB encoding.
-    Wkb,
-    /// Resolves the coordinates to a fields `geometry` with separated 
GeoArrow encoding.
-    Native,
-}
-
-impl Display for GeometryEncoding {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        match self {
-            GeometryEncoding::Plain => f.write_str("plain"),
-            GeometryEncoding::Wkb => f.write_str("wkb"),
-            GeometryEncoding::Native => f.write_str("native"),
-        }
-    }
-}
-
-impl FromStr for GeometryEncoding {
-    type Err = String;
-
-    fn from_str(s: &str) -> Result<Self, Self::Err> {
-        match s.to_lowercase().as_str() {
-            "plain" => Ok(Self::Plain),
-            "wkb" => Ok(Self::Wkb),
-            "native" => Ok(Self::Native),
-            s => Err(format!("Unable to parse from `{s}`")),
-        }
-    }
-}
-
-impl ConfigField for GeometryEncoding {
-    fn visit<V: Visit>(&self, v: &mut V, key: &str, _description: &'static 
str) {
-        v.some(
-            &format!("{key}.geometry_encoding"),
-            self,
-            "Specify point geometry encoding",
-        );
-    }
-
-    fn set(&mut self, _key: &str, value: &str) -> Result<(), DataFusionError> {
-        *self = value.parse().map_err(DataFusionError::Configuration)?;
-        Ok(())
-    }
-}
-
-extensions_options! {
-    /// Pointcloud configuration options
-    pub struct PointcloudOptions {
-        pub geometry_encoding: GeometryEncoding, default = 
GeometryEncoding::default()
-        pub collect_statistics: bool, default = false
-        pub persist_statistics: bool, default = false
-        pub las: LasOptions, default = LasOptions::default()
-    }
-
-}
-
-impl ConfigExtension for PointcloudOptions {
-    const PREFIX: &'static str = "pointcloud";
-}
-
-impl PointcloudOptions {
-    pub fn with_geometry_encoding(mut self, geometry_encoding: 
GeometryEncoding) -> Self {
-        self.geometry_encoding = geometry_encoding;
-        self
-    }
-
-    pub fn with_las_extra_bytes(mut self, extra_bytes: LasExtraBytes) -> Self {
-        self.las.extra_bytes = extra_bytes;
-        self
-    }
-}
diff --git a/rust/sedona/src/context.rs b/rust/sedona/src/context.rs
index 1664173a..43a84393 100644
--- a/rust/sedona/src/context.rs
+++ b/rust/sedona/src/context.rs
@@ -60,12 +60,9 @@ use sedona_geoparquet::{
     provider::{geoparquet_listing_table, GeoParquetReadOptions},
 };
 #[cfg(feature = "pointcloud")]
-use sedona_pointcloud::{
-    las::{
-        format::{Extension, LasFormatFactory},
-        options::LasExtraBytes,
-    },
-    options::{GeometryEncoding, PointcloudOptions},
+use sedona_pointcloud::las::{
+    format::{Extension, LasFormatFactory},
+    options::{GeometryEncoding, LasExtraBytes, LasOptions},
 };
 
 /// Sedona SessionContext wrapper
@@ -120,7 +117,7 @@ impl SedonaContext {
 
         #[cfg(feature = "pointcloud")]
         let session_config = session_config.with_option_extension(
-            PointcloudOptions::default()
+            LasOptions::default()
                 .with_geometry_encoding(GeometryEncoding::Wkb)
                 .with_las_extra_bytes(LasExtraBytes::Typed),
         );


Reply via email to