This is an automated email from the ASF dual-hosted git repository.

prantogg pushed a commit to branch update-zone-generation
in repository https://gitbox.apache.org/repos/asf/sedona-spatialbench.git
commit f55a16d513780a442c65b02d303f7fd371af1afd
Author: Pranav Toggi <[email protected]>
AuthorDate: Mon Sep 29 09:45:01 2025 -0700

    add row group size configurability
---
 spatialbench-cli/src/zone_df.rs | 168 ++++++++++++++++++++++++++++++----------
 spatialbench/src/generators.rs  |   3 +-
 2 files changed, 126 insertions(+), 45 deletions(-)

diff --git a/spatialbench-cli/src/zone_df.rs b/spatialbench-cli/src/zone_df.rs
index 2a7a4cb..9ca27be 100644
--- a/spatialbench-cli/src/zone_df.rs
+++ b/spatialbench-cli/src/zone_df.rs
@@ -1,4 +1,3 @@
-// spatialbench-cli/src/zone_df.rs
 use std::{path::PathBuf, sync::Arc, time::Instant};
 
 use anyhow::{anyhow, Result};
@@ -10,7 +9,7 @@ use datafusion::{
 };
 use datafusion::execution::runtime_env::RuntimeEnv;
 
-use log::info;
+use log::{debug, info};
 use object_store::aws::AmazonS3Builder;
 use object_store::ObjectStore;
 use parquet::{
@@ -67,24 +66,41 @@ fn estimated_total_rows_for_sf(sf: f64) -> i64 {
     }
 }
 
-fn parquet_writer_props(comp: ParquetCompression) -> WriterProperties {
-    WriterProperties::builder().set_compression(comp).build()
+fn get_zone_table_stats(sf: f64) -> (f64, i64) {
+    // Returns (size_in_gb, total_rows) for the given scale factor
+    if sf < 10.0 {
+        (1.42, 156_095)
+    } else if sf < 100.0 {
+        (5.68, 455_711)
+    } else {
+        (6.13, 1_035_371)
+    }
 }
 
-fn approx_bytes_per_row(batches: &[RecordBatch]) -> f64 {
-    let mut rows = 0usize;
-    let mut bytes = 0usize;
-    for b in batches {
-        rows += b.num_rows();
-        for col in b.columns() {
-            bytes += col.get_array_memory_size();
-        }
-    }
-    if rows == 0 {
-        0.0
-    } else {
-        bytes as f64 / rows as f64
+fn compute_rows_per_group_from_stats(size_gb: f64, total_rows: i64, target_bytes: i64) -> usize {
+    let total_bytes = size_gb * 1024.0 * 1024.0 * 1024.0; // Convert GB to bytes
+    let bytes_per_row = total_bytes / total_rows as f64;
+
+    debug!("Using hardcoded stats: {:.2} GB, {} rows, {:.2} bytes/row",
+        size_gb, total_rows, bytes_per_row);
+
+    if bytes_per_row <= 0.0 {
+        return 128_000; // fallback
     }
+
+    let est = (target_bytes as f64 / bytes_per_row).floor();
+    // Keep RG count <= 32k, but avoid too-tiny RGs
+    est.max(10_000.0).min(10_000_000.0) as usize
+}
+
+fn writer_props_with_rowgroup(
+    comp: ParquetCompression,
+    rows_per_group: usize,
+) -> WriterProperties {
+    WriterProperties::builder()
+        .set_compression(comp)
+        .set_max_row_group_size(rows_per_group) // <-- the key line
+        .build()
 }
 
 fn write_parquet_with_rowgroup_bytes(
@@ -92,32 +108,19 @@
     schema: SchemaRef,
     all_batches: Vec<RecordBatch>,
     target_rowgroup_bytes: i64,
-    props: WriterProperties,
+    comp: ParquetCompression,
+    scale_factor: f64,
 ) -> Result<()> {
-    let mut writer = ArrowWriter::try_new(std::fs::File::create(out_path)?, schema, Some(props))?;
+    let (size_gb, total_rows) = get_zone_table_stats(scale_factor);
+    let rows_per_group = compute_rows_per_group_from_stats(size_gb, total_rows, target_rowgroup_bytes);
+    let props = writer_props_with_rowgroup(comp, rows_per_group);
 
-    if all_batches.is_empty() {
-        writer.close()?;
-        return Ok(());
-    }
+    debug!("Using row group size: {} rows (based on hardcoded stats)", rows_per_group);
 
-    let bpr = approx_bytes_per_row(&all_batches);
-    let rows_per_group: usize = if bpr > 0.0 {
-        (target_rowgroup_bytes as f64 / bpr)
-            .floor()
-            .max(10_000.0)
-            .min(1_000_000.0) as usize
-    } else {
-        128_000
-    };
+    let mut writer = ArrowWriter::try_new(std::fs::File::create(out_path)?, schema, Some(props))?;
 
     for batch in all_batches {
-        let mut start = 0usize;
-        while start < batch.num_rows() {
-            let end = (start + rows_per_group).min(batch.num_rows());
-            writer.write(&batch.slice(start, end - start))?;
-            start = end;
-        }
+        writer.write(&batch)?;
     }
     writer.close()?;
     Ok(())
@@ -152,13 +155,35 @@ pub async fn generate_zone_parquet(args: ZoneDfArgs) -> Result<()> {
         ));
     }
 
+    info!(
+        "Starting zone parquet generation with scale factor {}",
+        args.scale_factor
+    );
+    debug!("Zone generation args: parts={}, part={}, output_dir={:?}, row_group_bytes={}, compression={:?}",
+        args.parts, args.part, args.output_dir, args.parquet_row_group_bytes, args.parquet_compression);
+
+    let subtypes = subtypes_for_scale_factor(args.scale_factor);
+    info!(
+        "Selected subtypes for SF {}: {:?}",
+        args.scale_factor, subtypes
+    );
+
+    let estimated_rows = estimated_total_rows_for_sf(args.scale_factor);
+    info!(
+        "Estimated total rows for SF {}: {}",
+        args.scale_factor, estimated_rows
+    );
+
     let mut cfg = ConfigOptions::new();
     cfg.execution.target_partitions = 1;
+    debug!("Created DataFusion config with target_partitions=1");
 
     let rt: Arc<RuntimeEnv> = Arc::new(RuntimeEnvBuilder::new().build()?);
+    debug!("Built DataFusion runtime environment");
 
     // Register S3 store for Overture bucket (object_store 0.11)
     let bucket = OVERTURE_S3_BUCKET; // "overturemaps-us-west-2"
+    info!("Registering S3 store for bucket: {}", bucket);
     let s3 = AmazonS3Builder::new()
         .with_bucket_name(bucket)
         .with_skip_signature(true)
@@ -168,27 +193,50 @@ pub async fn generate_zone_parquet(args: ZoneDfArgs) -> Result<()> {
     let s3_url = Url::parse(&format!("s3://{bucket}"))?;
     let s3_store: Arc<dyn ObjectStore> = Arc::new(s3);
     rt.register_object_store(&s3_url, s3_store);
+    debug!("Successfully registered S3 object store");
 
     let ctx = SessionContext::new_with_config_rt(SessionConfig::from(cfg), rt);
+    debug!("Created DataFusion session context");
 
     let url = zones_parquet_url();
+    info!("Reading parquet data from: {}", url);
+    let t_read_start = Instant::now();
     let mut df = ctx.read_parquet(url, ParquetReadOptions::default()).await?;
+    let read_dur = t_read_start.elapsed();
+    info!("Successfully read parquet data in {:?}", read_dur);
 
+    // Build filter predicate
+    debug!("Building filter predicate for subtypes: {:?}", subtypes);
     let mut pred = col("subtype").eq(lit("__never__"));
     for s in subtypes_for_scale_factor(args.scale_factor) {
         pred = pred.or(col("subtype").eq(lit(s)));
    }
     df = df.filter(pred.and(col("is_land").eq(lit(true))))?;
+    info!("Applied subtype and is_land filters");
+
+    // df = df.sort(vec![col("id").sort(true, true)])?;
+    // debug!("Applied sorting by id");
 
-    df = df.sort(vec![col("id").sort(true, true)])?;
     let total = estimated_total_rows_for_sf(args.scale_factor);
     let parts = args.parts as i64;
     let this = (args.part as i64) - 1;
     let rows_per_part = (total + parts - 1) / parts;
     let offset = this * rows_per_part;
+
+    info!(
+        "Partitioning data: total_rows={}, parts={}, rows_per_part={}, offset={}",
+        total, parts, rows_per_part, offset
+    );
+
     df = df.limit(offset as usize, Some(rows_per_part as usize))?;
+    debug!(
+        "Applied limit with offset={}, rows={}",
+        offset, rows_per_part
+    );
 
     ctx.register_table(TableReference::bare("zone_filtered"), df.into_view())?;
+    debug!("Registered filtered data as 'zone_filtered' table");
+
     let sql = format!(
         r#"
         SELECT
@@ -202,15 +250,33 @@ pub async fn generate_zone_parquet(args: ZoneDfArgs) -> Result<()> {
         FROM zone_filtered
         "#
     );
+    debug!("Executing SQL transformation with offset: {}", offset);
     let df2 = ctx.sql(&sql).await?;
+    info!("SQL transformation completed successfully");
 
     let t0 = Instant::now();
+    info!("Starting data collection...");
     let batches = df2.clone().collect().await?;
     let collect_dur = t0.elapsed();
+    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+    info!(
+        "Collected {} record batches with {} total rows in {:?}",
+        batches.len(),
+        total_rows,
+        collect_dur
+    );
+
     std::fs::create_dir_all(&args.output_dir)?;
+    debug!("Created output directory: {:?}", args.output_dir);
+
     let out = args.output_filename();
-    let props = parquet_writer_props(args.parquet_compression);
+    info!("Writing output to: {}", out.display());
+
+    debug!(
+        "Created parquet writer properties with compression: {:?}",
+        args.parquet_compression
+    );
 
     // Convert DFSchema to Arrow Schema
     let schema = Arc::new(Schema::new(
@@ -220,18 +286,34 @@ pub async fn generate_zone_parquet(args: ZoneDfArgs) -> Result<()> {
             .map(|f| f.as_ref().clone())
             .collect::<Vec<_>>(),
     ));
+    debug!(
+        "Converted DataFusion schema to Arrow schema with {} fields",
+        schema.fields().len()
+    );
 
     let t1 = Instant::now();
-    write_parquet_with_rowgroup_bytes(&out, schema, batches, args.parquet_row_group_bytes, props)?;
+    info!(
+        "Starting parquet file write with row group size: {} bytes",
+        args.parquet_row_group_bytes
+    );
+    write_parquet_with_rowgroup_bytes(
+        &out,
+        schema,
+        batches,
+        args.parquet_row_group_bytes,
+        args.parquet_compression,
+        args.scale_factor,
+    )?;
     let write_dur = t1.elapsed();
 
     info!(
-        "Zone -> {} (part {}/{}). collect={:?}, write={:?}",
+        "Zone -> {} (part {}/{}). collect={:?}, write={:?}, total_rows={}",
         out.display(),
         args.part,
         args.parts,
         collect_dur,
-        write_dur
+        write_dur,
+        total_rows
     );
 
     Ok(())
diff --git a/spatialbench/src/generators.rs b/spatialbench/src/generators.rs
index 5144217..476cbd6 100644
--- a/spatialbench/src/generators.rs
+++ b/spatialbench/src/generators.rs
@@ -1,4 +1,4 @@
-//! Generators for each TPC-H Tables
+//! Generators for each Spatial Bench Tables
 use crate::dates;
 use crate::dates::{GenerateUtils, TPCHDate};
 use crate::decimal::TPCHDecimal;
@@ -15,7 +15,6 @@ use crate::spatial::utils::{hash_to_unit_u64, spider_seed_for_index};
 use crate::spatial::{ContinentAffines, SpatialDefaults, SpatialGenerator};
 use crate::text::TextPool;
 use geo::Point;
-use geozero::ToGeo;
 use rand::rngs::StdRng;
 use rand::{Rng, SeedableRng};
 use std::convert::TryInto;
