This is an automated email from the ASF dual-hosted git repository.

prantogg pushed a commit to branch support-multipart
in repository https://gitbox.apache.org/repos/asf/sedona-spatialbench.git

commit f8953a74fe51de4d77410d20e81707737cb2f299
Author: Pranav Toggi <[email protected]>
AuthorDate: Sat Oct 25 14:30:51 2025 -0700

    allow generating multi-part zone
---
 tpchgen-cli/src/main.rs              | 14 ++++-----
 tpchgen-cli/src/zone/config.rs       |  8 +++--
 tpchgen-cli/src/zone/datasource.rs   | 22 +++++++++-----
 tpchgen-cli/src/zone/main.rs         | 44 ++++++++++++++++++++--------
 tpchgen-cli/src/zone/mod.rs          | 57 +++++++++++++++++++++++++++++-------
 tpchgen-cli/src/zone/partition.rs    | 42 ++++++++++++++++++++------
 tpchgen-cli/src/zone/transform.rs    |  6 +---
 tpchgen-cli/src/zone/writer.rs       | 16 ++++------
 tpchgen-cli/tests/cli_integration.rs | 10 +++----
 9 files changed, 151 insertions(+), 68 deletions(-)

diff --git a/tpchgen-cli/src/main.rs b/tpchgen-cli/src/main.rs
index a671404..c225e6a 100644
--- a/tpchgen-cli/src/main.rs
+++ b/tpchgen-cli/src/main.rs
@@ -60,6 +60,12 @@ use ::parquet::basic::Compression;
 use clap::builder::TypedValueParser;
 use clap::{Parser, ValueEnum};
 use log::{debug, info, LevelFilter};
+use std::fmt::Display;
+use std::fs::{self, File};
+use std::io::{self, BufWriter, Stdout, Write};
+use std::path::PathBuf;
+use std::str::FromStr;
+use std::time::Instant;
 use tpchgen::distribution::Distributions;
 use tpchgen::generators::{
     BuildingGenerator, CustomerGenerator, DriverGenerator, TripGenerator, VehicleGenerator,
@@ -69,12 +75,6 @@ use tpchgen::text::TextPool;
 use tpchgen_arrow::{
     BuildingArrow, CustomerArrow, DriverArrow, RecordBatchIterator, TripArrow, VehicleArrow,
 };
-use std::fmt::Display;
-use std::fs::{self, File};
-use std::io::{self, BufWriter, Stdout, Write};
-use std::path::PathBuf;
-use std::str::FromStr;
-use std::time::Instant;
 
 #[derive(Parser)]
 #[command(name = "tpchgen")]
@@ -423,7 +423,7 @@ impl Cli {
             self.parquet_row_group_bytes,
             self.parquet_compression,
         )
-            .await
+        .await
     }
 
     define_generate!(
diff --git a/tpchgen-cli/src/zone/config.rs b/tpchgen-cli/src/zone/config.rs
index c760dc9..2da6d5e 100644
--- a/tpchgen-cli/src/zone/config.rs
+++ b/tpchgen-cli/src/zone/config.rs
@@ -1,6 +1,6 @@
-use std::path::PathBuf;
 use anyhow::{anyhow, Result};
 use parquet::basic::Compression as ParquetCompression;
+use std::path::PathBuf;
 
 #[derive(Clone)]
 pub struct ZoneDfArgs {
@@ -43,6 +43,10 @@ impl ZoneDfArgs {
     }
 
     pub fn output_filename(&self) -> PathBuf {
-        self.output_dir.join("zone.parquet")
+        if self.parts > 1 {
+            self.output_dir.join(format!("zone.{}.parquet", self.part))
+        } else {
+            self.output_dir.join("zone.parquet")
+        }
     }
 }
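
With this change, output_filename() keys the file name off the part count: a
multi-part run writes one numbered file per part, while the single-part
default keeps the old name. A minimal illustration (the "out/" directory name
is hypothetical):

    out/zone.parquet       <- parts == 1
    out/zone.1.parquet     <- parts == 3, part == 1
    out/zone.2.parquet     <- parts == 3, part == 2
    out/zone.3.parquet     <- parts == 3, part == 3
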
diff --git a/tpchgen-cli/src/zone/datasource.rs b/tpchgen-cli/src/zone/datasource.rs
index 2be0ebf..2d61b87 100644
--- a/tpchgen-cli/src/zone/datasource.rs
+++ b/tpchgen-cli/src/zone/datasource.rs
@@ -1,4 +1,3 @@
-use std::sync::Arc;
 use anyhow::Result;
 use datafusion::{
     common::config::ConfigOptions,
@@ -7,6 +6,7 @@ use datafusion::{
 };
 use log::{debug, info};
 use object_store::http::HttpBuilder;
+use std::sync::Arc;
 use url::Url;
 
 use super::stats::ZoneTableStats;
@@ -35,12 +35,13 @@ impl ZoneDataSource {
     }
 
     pub fn create_context(&self) -> Result<SessionContext> {
-        let cfg = ConfigOptions::new();
+        let mut cfg = ConfigOptions::new();
 
-        let ctx = SessionContext::new_with_config_rt(
-            SessionConfig::from(cfg),
-            Arc::clone(&self.runtime),
-        );
+        // Avoid parallelism to ensure ordering of source data
+        cfg.execution.target_partitions = 1;
+
+        let ctx =
+            SessionContext::new_with_config_rt(SessionConfig::from(cfg), Arc::clone(&self.runtime));
 
         debug!("Created DataFusion session context");
         Ok(ctx)
@@ -52,7 +53,10 @@ impl ZoneDataSource {
         scale_factor: f64,
     ) -> Result<DataFrame> {
         let parquet_urls = self.generate_parquet_urls();
-        info!("Reading {} Parquet parts from Hugging Face...", 
parquet_urls.len());
+        info!(
+            "Reading {} Parquet parts from Hugging Face...",
+            parquet_urls.len()
+        );
 
         let df = ctx
             .read_parquet(parquet_urls, ParquetReadOptions::default())
@@ -71,6 +75,10 @@ impl ZoneDataSource {
         let df = df.filter(pred.and(col("is_land").eq(lit(true))))?;
         info!("Applied subtype and is_land filters");
 
+        // Sort by 'id' to ensure deterministic ordering regardless of parallelism
+        // let df = df.sort(vec![col("id").sort(true, false)])?;
+        // info!("Sorted by id for deterministic ordering");
+
         Ok(df)
     }
 
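
The target_partitions = 1 setting above is what keeps the collected row order
reproducible: with a single partition, DataFusion does not split the scan, so
rows arrive in source order. A self-contained sketch of the same session
setup, using only APIs that already appear in this patch plus the standard
SessionContext::new_with_config constructor:

    use datafusion::common::config::ConfigOptions;
    use datafusion::prelude::{SessionConfig, SessionContext};

    fn single_partition_ctx() -> SessionContext {
        let mut cfg = ConfigOptions::new();
        // One target partition: the scan is not split, so row order is stable.
        cfg.execution.target_partitions = 1;
        SessionContext::new_with_config(SessionConfig::from(cfg))
    }
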
diff --git a/tpchgen-cli/src/zone/main.rs b/tpchgen-cli/src/zone/main.rs
index e5cc176..b37d97c 100644
--- a/tpchgen-cli/src/zone/main.rs
+++ b/tpchgen-cli/src/zone/main.rs
@@ -1,6 +1,7 @@
+use log::info;
+use parquet::basic::Compression as ParquetCompression;
 use std::io;
 use std::path::PathBuf;
-use parquet::basic::Compression as ParquetCompression;
 
 use super::config::ZoneDfArgs;
 
@@ -16,17 +17,36 @@ pub async fn generate_zone(
 ) -> io::Result<()> {
     match format {
         OutputFormat::Parquet => {
-            let args = ZoneDfArgs::new(
-                1.0f64.max(scale_factor),
-                output_dir,
-                parts.unwrap_or(1),
-                part.unwrap_or(1),
-                parquet_row_group_bytes,
-                parquet_compression,
-            );
-            super::generate_zone_parquet(args)
-                .await
-                .map_err(io::Error::other)
+            let parts = parts.unwrap_or(1);
+
+            if part.is_some() {
+                // Single part mode - use LIMIT/OFFSET
+                let args = ZoneDfArgs::new(
+                    1.0f64.max(scale_factor),
+                    output_dir,
+                    parts,
+                    part.unwrap(),
+                    parquet_row_group_bytes,
+                    parquet_compression,
+                );
+                super::generate_zone_parquet_single(args)
+                    .await
+                    .map_err(io::Error::other)
+            } else {
+                // Multi-part mode - collect once and partition in memory
+                info!("Generating all {} part(s) for zone table", parts);
+                let args = ZoneDfArgs::new(
+                    1.0f64.max(scale_factor),
+                    output_dir,
+                    parts,
+                    1, // dummy value, not used in multi mode
+                    parquet_row_group_bytes,
+                    parquet_compression,
+                );
+                super::generate_zone_parquet_multi(args)
+                    .await
+                    .map_err(io::Error::other)
+            }
         }
         _ => Err(io::Error::new(
             io::ErrorKind::InvalidInput,
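
In short, the presence of `part` now selects the strategy: an explicit part
number keeps the old per-part LIMIT/OFFSET path, while omitting it collects
the table once and fans out every file. A condensed restatement of the match
above (args_for is a hypothetical helper standing in for the ZoneDfArgs
construction, not part of this patch):

    match part {
        // Explicit part: push LIMIT/OFFSET into the query, write one file.
        Some(n) => generate_zone_parquet_single(args_for(n)).await,
        // No part given: collect once, slice into all `parts` files.
        None => generate_zone_parquet_multi(args_for(1)).await,
    }
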
diff --git a/tpchgen-cli/src/zone/mod.rs b/tpchgen-cli/src/zone/mod.rs
index fc0f89f..f1b2f3c 100644
--- a/tpchgen-cli/src/zone/mod.rs
+++ b/tpchgen-cli/src/zone/mod.rs
@@ -9,8 +9,8 @@ mod writer;
 
 pub mod main;
 
-use std::sync::Arc;
 use anyhow::Result;
+use std::sync::Arc;
 
 pub use config::ZoneDfArgs;
 use datasource::ZoneDataSource;
@@ -19,22 +19,18 @@ use stats::ZoneTableStats;
 use transform::ZoneTransformer;
 use writer::ParquetWriter;
 
-pub async fn generate_zone_parquet(args: ZoneDfArgs) -> Result<()> {
+/// Generate a single part using LIMIT/OFFSET on the dataframe
+pub async fn generate_zone_parquet_single(args: ZoneDfArgs) -> Result<()> {
     args.validate()?;
 
     let stats = ZoneTableStats::new(args.scale_factor, args.parts);
     let datasource = ZoneDataSource::new().await?;
     let ctx = datasource.create_context()?;
 
-    let df = datasource
-        .load_zone_data(&ctx, args.scale_factor)
-        .await?;
+    let df = datasource.load_zone_data(&ctx, args.scale_factor).await?;
 
-    let partition = PartitionStrategy::calculate(
-        stats.estimated_total_rows(),
-        args.parts,
-        args.part,
-    );
+    let partition =
+        PartitionStrategy::calculate(stats.estimated_total_rows(), args.parts, args.part);
 
     let df = partition.apply_to_dataframe(df)?;
 
@@ -46,8 +42,47 @@ pub async fn generate_zone_parquet(args: ZoneDfArgs) -> Result<()> {
     let batches = df.collect().await?;
 
     let writer = ParquetWriter::new(&args, &stats, schema);
-
     writer.write(&batches)?;
 
     Ok(())
 }
+
+/// Generate all parts by collecting once and partitioning in memory
+pub async fn generate_zone_parquet_multi(args: ZoneDfArgs) -> Result<()> {
+    let stats = ZoneTableStats::new(args.scale_factor, args.parts);
+    let datasource = ZoneDataSource::new().await?;
+    let ctx = datasource.create_context()?;
+
+    let df = datasource.load_zone_data(&ctx, args.scale_factor).await?;
+
+    // Transform without offset (we'll adjust per-part later)
+    let transformer = ZoneTransformer::new(0);
+    let df = transformer.transform(&ctx, df).await?;
+
+    // Collect once
+    let schema = Arc::new(transformer.arrow_schema(&df)?);
+    let batches = df.collect().await?;
+
+    // Calculate total rows
+    let total_rows: i64 = batches.iter().map(|b| b.num_rows() as i64).sum();
+
+    // Write each part
+    for part in 1..=args.parts {
+        let partition = PartitionStrategy::calculate(total_rows, args.parts, part);
+        let partitioned_batches = partition.apply_to_batches(&batches)?;
+
+        let part_args = ZoneDfArgs::new(
+            args.scale_factor,
+            args.output_dir.clone(),
+            args.parts,
+            part,
+            args.parquet_row_group_bytes,
+            args.parquet_compression,
+        );
+
+        let writer = ParquetWriter::new(&part_args, &stats, schema.clone());
+        writer.write(&partitioned_batches)?;
+    }
+
+    Ok(())
+}
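
Note that generate_zone_parquet_multi reuses PartitionStrategy::calculate but
feeds it the actual collected row count rather than the estimate used on the
single-part path, so part boundaries always sum to the real total. The body
of calculate() is unchanged and not shown in this diff; a plausible even-split
sketch (an assumption for illustration, not the actual implementation):

    fn split(total_rows: i64, parts: i32, part: i32) -> (i64, i64) {
        let base = total_rows / parts as i64;
        let offset = base * (part as i64 - 1);
        // Let the last part absorb the remainder so every row lands in
        // exactly one file.
        let limit = if part == parts { total_rows - offset } else { base };
        (offset, limit) // half-open row window [offset, offset + limit)
    }
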
diff --git a/tpchgen-cli/src/zone/partition.rs b/tpchgen-cli/src/zone/partition.rs
index 8ea54ea..a27f656 100644
--- a/tpchgen-cli/src/zone/partition.rs
+++ b/tpchgen-cli/src/zone/partition.rs
@@ -1,3 +1,4 @@
+use arrow_array::RecordBatch;
 use datafusion::prelude::*;
 use log::info;
 
@@ -19,17 +20,10 @@ impl PartitionStrategy {
 
         info!(
             "Partition: total={}, parts={}, part={}, offset={}, limit={}",
-            total_rows,
-            parts,
-            part,
-            offset,
-            limit
+            total_rows, parts, part, offset, limit
         );
 
-        Self {
-            offset,
-            limit,
-        }
+        Self { offset, limit }
     }
 
     pub fn offset(&self) -> i64 {
@@ -39,6 +33,36 @@ impl PartitionStrategy {
     pub fn apply_to_dataframe(&self, df: DataFrame) -> datafusion::common::Result<DataFrame> {
         df.limit(self.offset as usize, Some(self.limit as usize))
     }
+
+    /// Apply partition to already-collected batches
+    pub fn apply_to_batches(&self, batches: &[RecordBatch]) -> anyhow::Result<Vec<RecordBatch>> {
+        let mut result = Vec::new();
+        let mut current_offset = 0i64;
+        let end_offset = self.offset + self.limit;
+
+        for batch in batches {
+            let batch_rows = batch.num_rows() as i64;
+            let batch_end = current_offset + batch_rows;
+
+            if batch_end <= self.offset || current_offset >= end_offset {
+                current_offset = batch_end;
+                continue;
+            }
+
+            let start_in_batch = (self.offset.saturating_sub(current_offset)).max(0) as usize;
+            let end_in_batch = ((end_offset - current_offset).min(batch_rows)) as usize;
+            let length = end_in_batch - start_in_batch;
+
+            if length > 0 {
+                let sliced = batch.slice(start_in_batch, length);
+                result.push(sliced);
+            }
+
+            current_offset = batch_end;
+        }
+
+        Ok(result)
+    }
 }
 
 #[cfg(test)]
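
apply_to_batches() walks the collected batches with a running row offset and
keeps only the rows inside the half-open window [offset, offset + limit).
Because RecordBatch::slice just adjusts array offsets, each part is a
zero-copy view over the batches collected once. A runnable standalone sketch
of the same windowing (function and variable names are illustrative):

    use arrow_array::{ArrayRef, Int64Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    fn slice_window(batches: &[RecordBatch], offset: i64, limit: i64) -> Vec<RecordBatch> {
        let (mut pos, end, mut out) = (0i64, offset + limit, Vec::new());
        for batch in batches {
            let rows = batch.num_rows() as i64;
            // Keep the batch only if it overlaps the window.
            if pos + rows > offset && pos < end {
                let start = (offset - pos).max(0) as usize;
                let stop = (end - pos).min(rows) as usize;
                out.push(batch.slice(start, stop - start)); // zero-copy view
            }
            pos += rows;
        }
        out
    }

    fn main() {
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
        // Three batches of 4 rows each: ids 0..12.
        let batches: Vec<RecordBatch> = (0..3i64)
            .map(|i| {
                let ids = Int64Array::from_iter_values(i * 4..(i + 1) * 4);
                RecordBatch::try_new(schema.clone(), vec![Arc::new(ids) as ArrayRef]).unwrap()
            })
            .collect();
        // Request the second half of the 12 rows; it spans a batch boundary.
        let part2 = slice_window(&batches, 6, 6);
        assert_eq!(part2.iter().map(|b| b.num_rows()).sum::<usize>(), 6);
    }
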
diff --git a/tpchgen-cli/src/zone/transform.rs b/tpchgen-cli/src/zone/transform.rs
index e2f423b..74d9365 100644
--- a/tpchgen-cli/src/zone/transform.rs
+++ b/tpchgen-cli/src/zone/transform.rs
@@ -12,11 +12,7 @@ impl ZoneTransformer {
         Self { offset }
     }
 
-    pub async fn transform(
-        &self,
-        ctx: &SessionContext,
-        df: DataFrame,
-    ) -> Result<DataFrame> {
+    pub async fn transform(&self, ctx: &SessionContext, df: DataFrame) -> Result<DataFrame> {
         ctx.register_table(TableReference::bare("zone_filtered"), df.into_view())?;
         debug!("Registered filtered data as 'zone_filtered' table");
 
diff --git a/tpchgen-cli/src/zone/writer.rs b/tpchgen-cli/src/zone/writer.rs
index fe273a9..627566b 100644
--- a/tpchgen-cli/src/zone/writer.rs
+++ b/tpchgen-cli/src/zone/writer.rs
@@ -1,12 +1,9 @@
-use std::{path::PathBuf, sync::Arc, time::Instant};
 use anyhow::Result;
 use arrow_array::RecordBatch;
 use arrow_schema::SchemaRef;
 use log::{debug, info};
-use parquet::{
-    arrow::ArrowWriter,
-    file::properties::WriterProperties,
-};
+use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
+use std::{path::PathBuf, sync::Arc, time::Instant};
 
 use super::config::ZoneDfArgs;
 use super::stats::ZoneTableStats;
@@ -20,10 +17,8 @@ pub struct ParquetWriter {
 
 impl ParquetWriter {
     pub fn new(args: &ZoneDfArgs, stats: &ZoneTableStats, schema: SchemaRef) -> Self {
-        let rows_per_group = stats.compute_rows_per_group(
-            args.parquet_row_group_bytes,
-            128 * 1024 * 1024,
-        );
+        let rows_per_group =
+            stats.compute_rows_per_group(args.parquet_row_group_bytes, 128 * 1024 * 1024);
 
         let props = WriterProperties::builder()
             .set_compression(args.parquet_compression)
@@ -46,7 +41,8 @@ impl ParquetWriter {
 
         let t0 = Instant::now();
         let file = std::fs::File::create(&self.output_path)?;
-        let mut writer = ArrowWriter::try_new(file, Arc::clone(&self.schema), Some(self.props.clone()))?;
+        let mut writer =
+            ArrowWriter::try_new(file, Arc::clone(&self.schema), Some(self.props.clone()))?;
 
         for batch in batches {
             writer.write(batch)?;
diff --git a/tpchgen-cli/tests/cli_integration.rs b/tpchgen-cli/tests/cli_integration.rs
index bdedc3b..6916b8e 100644
--- a/tpchgen-cli/tests/cli_integration.rs
+++ b/tpchgen-cli/tests/cli_integration.rs
@@ -2,14 +2,14 @@ use arrow_array::RecordBatch;
 use assert_cmd::Command;
 use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
 use parquet::file::metadata::ParquetMetaDataReader;
-use tpchgen::generators::TripGenerator;
-use tpchgen_arrow::{RecordBatchIterator, TripArrow};
 use std::fs;
 use std::fs::File;
 use std::io::Read;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
 use tempfile::tempdir;
+use tpchgen::generators::TripGenerator;
+use tpchgen_arrow::{RecordBatchIterator, TripArrow};
 
 /// Test TBL output for scale factor 0.51 and 0.001 using tpchgen-cli
 /// A scale factor of 0.51 is used because a sf of 0.5 and below will yield 0 results in the Building table
@@ -106,7 +106,7 @@ async fn test_zone_deterministic_parts_generation() {
         .assert()
         .success();
 
-    let zone_file1 = temp_dir1.path().join("zone.parquet");
+    let zone_file1 = temp_dir1.path().join("zone.1.parquet");
 
     // Reference file is a sf=0.01 zone table with z_boundary column removed
     let reference_file = PathBuf::from("../tpchgen/data/sf-v1/zone.parquet");
@@ -362,7 +362,7 @@ async fn test_zone_write_parquet_row_group_size_default() {
     expect_row_group_sizes(
         output_dir.path(),
         vec![RowGroups {
-            table: "zone",
+            table: "zone.1",
             row_group_bytes: vec![86288517],
         }],
     );
@@ -442,7 +442,7 @@ async fn test_zone_write_parquet_row_group_size_20mb() {
     expect_row_group_sizes(
         output_dir.path(),
         vec![RowGroups {
-            table: "zone",
+            table: "zone.1",
             row_group_bytes: vec![15428592, 17250042, 19338201, 17046885, 17251978],
         }],
     );
