This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-spatialbench.git
The following commit(s) were added to refs/heads/main by this push:
new 6084a14 Use DataFusion for Zone table generation (#46)
6084a14 is described below
commit 6084a14e019449511741000c0272c050e4654a64
Author: Pranav Toggi <[email protected]>
AuthorDate: Mon Oct 13 22:33:04 2025 -0700
Use DataFusion for Zone table generation (#46)
* init: use datafusion for Zone
* add row group size configurability
* Add debug statements; use 128MB as default row group size
* fix stats
* Clamp sf to minimum 1
* fix row group size estimation and partitioning
* fix zone table naming for partitioned outputs
* add unit tests
* add integration tests
* fmt fix
* Update spatialbench-cli/tests/cli_integration.rs
Co-authored-by: Copilot <[email protected]>
---------
Co-authored-by: Copilot <[email protected]>
---
spatialbench-arrow/src/lib.rs | 2 -
spatialbench-arrow/src/zone.rs | 108 -------
spatialbench-arrow/tests/reparse.rs | 8 +-
spatialbench-cli/Cargo.toml | 5 +
spatialbench-cli/src/csv.rs | 4 +-
spatialbench-cli/src/main.rs | 33 +-
spatialbench-cli/src/plan.rs | 6 +-
spatialbench-cli/src/tbl.rs | 2 -
spatialbench-cli/src/zone_df.rs | 501 ++++++++++++++++++++++++++++++
spatialbench-cli/tests/cli_integration.rs | 192 +++++++++++-
spatialbench/data/sf-v1/zone.parquet | Bin 0 -> 88467 bytes
spatialbench/src/csv.rs | 49 +--
spatialbench/src/generators.rs | 412 +-----------------------
spatialbench/tests/integration_tests.rs | 17 -
14 files changed, 726 insertions(+), 613 deletions(-)
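The row-group sizing the commit message describes ("use 128MB as default row group size", "fix row group size estimation and partitioning") is implemented by compute_rows_per_group_from_stats in spatialbench-cli/src/zone_df.rs below. A minimal standalone sketch of that estimation, assuming a 128 MB default target (the shipped constant is DEFAULT_PARQUET_ROW_GROUP_BYTES in plan.rs) and using the SF<10 zone stats hardcoded in the patch:

    // Sketch only: estimate rows per Parquet row group from table-level stats,
    // mirroring compute_rows_per_group_from_stats in zone_df.rs. The constant
    // below assumes the 128 MB default named in the commit message.
    const DEFAULT_PARQUET_ROW_GROUP_BYTES: i64 = 128 * 1024 * 1024;

    fn rows_per_group(size_gb: f64, total_rows: i64, target_bytes: i64) -> usize {
        let total_bytes = size_gb * 1024.0 * 1024.0 * 1024.0; // GB -> bytes
        let bytes_per_row = total_bytes / total_rows as f64;
        // Fall back to the default when the caller passes zero or a negative target
        let target = if target_bytes <= 0 {
            DEFAULT_PARQUET_ROW_GROUP_BYTES
        } else {
            target_bytes
        };
        // Clamp rows-per-group to the [1000, 32767] range used by the patch
        (target as f64 / bytes_per_row).floor().clamp(1000.0, 32767.0) as usize
    }

    fn main() {
        // SF < 10 zone stats from get_zone_table_stats: ~1.42 GB over 156,095 rows
        let rows = rows_per_group(1.42, 156_095, DEFAULT_PARQUET_ROW_GROUP_BYTES);
        println!("rows per row group: {rows}");
    }

With those stats this works out to roughly 13,700 rows per 128 MB row group.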
diff --git a/spatialbench-arrow/src/lib.rs b/spatialbench-arrow/src/lib.rs
index eb8f66d..ac6fb78 100644
--- a/spatialbench-arrow/src/lib.rs
+++ b/spatialbench-arrow/src/lib.rs
@@ -41,7 +41,6 @@ mod customer;
mod driver;
mod trip;
mod vehicle;
-mod zone;
use arrow::array::RecordBatch;
use arrow::datatypes::SchemaRef;
@@ -50,7 +49,6 @@ pub use customer::CustomerArrow;
pub use driver::DriverArrow;
pub use trip::TripArrow;
pub use vehicle::VehicleArrow;
-pub use zone::ZoneArrow;
/// Iterator of Arrow [`RecordBatch`] that also knows its schema
pub trait RecordBatchIterator: Iterator<Item = RecordBatch> + Send {
diff --git a/spatialbench-arrow/src/zone.rs b/spatialbench-arrow/src/zone.rs
deleted file mode 100644
index 06794b3..0000000
--- a/spatialbench-arrow/src/zone.rs
+++ /dev/null
@@ -1,108 +0,0 @@
-use crate::conversions::string_view_array_from_display_iter;
-use crate::{DEFAULT_BATCH_SIZE, RecordBatchIterator};
-use arrow::array::{BinaryArray, Int64Array, RecordBatch};
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use geozero::{CoordDimensions, ToWkb};
-use spatialbench::generators::{ZoneGenerator, ZoneGeneratorIterator};
-use std::sync::{Arc, LazyLock};
-
-/// Generate [`Zone`]s in [`RecordBatch`] format
-///
-/// [`Zone`]: spatialbench::generators::Zone
-///
-/// # Example
-/// ```
-/// # use spatialbench::generators::{ZoneGenerator};
-/// # use spatialbench_arrow::ZoneArrow;
-///
-/// // Create a SF=1.0 generator and wrap it in an Arrow generator
-/// let generator = ZoneGenerator::new(0.001, 1, 1);
-/// let mut arrow_generator = ZoneArrow::new(generator)
-/// .with_batch_size(10);
-/// // Read the first 10 batches
-/// let batch = arrow_generator.next().unwrap();
-/// // compare the output by pretty printing it
-/// let formatted_batches = arrow::util::pretty::pretty_format_batches(&[batch])
-/// .unwrap()
-/// .to_string();
-/// ```
-pub struct ZoneArrow {
- inner: ZoneGeneratorIterator,
- batch_size: usize,
-}
-
-impl ZoneArrow {
- pub fn new(generator: ZoneGenerator) -> Self {
- let inner = generator.clone().into_iter();
- Self {
- inner,
- batch_size: DEFAULT_BATCH_SIZE,
- }
- }
-
- pub fn with_batch_size(mut self, batch_size: usize) -> Self {
- self.batch_size = batch_size;
- self
- }
-}
-
-impl RecordBatchIterator for ZoneArrow {
- fn schema(&self) -> &SchemaRef {
- &ZONE_SCHEMA
- }
-}
-
-impl Iterator for ZoneArrow {
- type Item = RecordBatch;
-
- fn next(&mut self) -> Option<Self::Item> {
- // Get next rows to convert
- let rows: Vec<_> = self.inner.by_ref().take(self.batch_size).collect();
- if rows.is_empty() {
- return None;
- }
-
- let z_zonekey = Int64Array::from_iter_values(rows.iter().map(|r| r.z_zonekey));
- let z_gersid = string_view_array_from_display_iter(rows.iter().map(|r| &r.z_gersid));
- let z_country = string_view_array_from_display_iter(rows.iter().map(|r| &r.z_country));
- let z_region = string_view_array_from_display_iter(rows.iter().map(|r| &r.z_region));
- let z_name = string_view_array_from_display_iter(rows.iter().map(|r| &r.z_name));
- let z_subtype = string_view_array_from_display_iter(rows.iter().map(|r| &r.z_subtype));
-
- // Convert geo::Polygon to WKB binary format
- let z_boundary = BinaryArray::from_iter_values(rows.iter().map(|r| {
- r.z_boundary
- .to_wkb(CoordDimensions::xy())
- .expect("Failed to encode WKB")
- }));
-
- let batch = RecordBatch::try_new(
- Arc::clone(self.schema()),
- vec![
- Arc::new(z_zonekey),
- Arc::new(z_gersid),
- Arc::new(z_country),
- Arc::new(z_region),
- Arc::new(z_name),
- Arc::new(z_subtype),
- Arc::new(z_boundary),
- ],
- )
- .unwrap();
- Some(batch)
- }
-}
-
-/// Schema for the Zone
-static ZONE_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(make_zone_schema);
-fn make_zone_schema() -> SchemaRef {
- Arc::new(Schema::new(vec![
- Field::new("z_zonekey", DataType::Int64, false),
- Field::new("z_gersid", DataType::Utf8View, false),
- Field::new("z_country", DataType::Utf8View, false),
- Field::new("z_region", DataType::Utf8View, false),
- Field::new("z_name", DataType::Utf8View, false),
- Field::new("z_subtype", DataType::Utf8View, false),
- Field::new("z_boundary", DataType::Binary, false),
- ]))
-}
diff --git a/spatialbench-arrow/tests/reparse.rs b/spatialbench-arrow/tests/reparse.rs
index 49c87e3..889933a 100644
--- a/spatialbench-arrow/tests/reparse.rs
+++ b/spatialbench-arrow/tests/reparse.rs
@@ -3,14 +3,13 @@
use arrow::array::RecordBatch;
use arrow::datatypes::SchemaRef;
-use spatialbench::csv::{BuildingCsv, CustomerCsv, DriverCsv, TripCsv, VehicleCsv, ZoneCsv};
+use spatialbench::csv::{BuildingCsv, CustomerCsv, DriverCsv, TripCsv, VehicleCsv};
use spatialbench::generators::{
Building, BuildingGenerator, Customer, CustomerGenerator, Driver, DriverGenerator, Trip,
- TripGenerator, Vehicle, VehicleGenerator, Zone, ZoneGenerator,
+ TripGenerator, Vehicle, VehicleGenerator,
};
use spatialbench_arrow::{
BuildingArrow, CustomerArrow, DriverArrow, RecordBatchIterator, TripArrow, VehicleArrow,
- ZoneArrow,
};
use std::io::Write;
use std::sync::Arc;
@@ -49,8 +48,6 @@ test_row_type!(trip_tbl, TripGenerator, TripArrow, Test::tbl());
test_row_type!(trip_csv, TripGenerator, TripArrow, Test::csv());
test_row_type!(building_tbl, BuildingGenerator, BuildingArrow, Test::tbl());
test_row_type!(building_csv, BuildingGenerator, BuildingArrow, Test::csv());
-test_row_type!(zone_tbl, ZoneGenerator, ZoneArrow, Test::tbl());
-test_row_type!(zone_csv, ZoneGenerator, ZoneArrow, Test::csv());
/// Common trait for writing rows in TBL and CSV format
trait RowType {
@@ -84,7 +81,6 @@ impl_row_type!(Vehicle<'_>, VehicleCsv);
impl_row_type!(Driver, DriverCsv);
impl_row_type!(Trip, TripCsv);
impl_row_type!(Building<'_>, BuildingCsv);
-impl_row_type!(Zone, ZoneCsv);
#[derive(Debug, Clone, Copy)]
#[allow(clippy::upper_case_acronyms)]
diff --git a/spatialbench-cli/Cargo.toml b/spatialbench-cli/Cargo.toml
index a77c23d..1180f39 100644
--- a/spatialbench-cli/Cargo.toml
+++ b/spatialbench-cli/Cargo.toml
@@ -23,6 +23,11 @@ env_logger = "0.11.7"
serde = { version = "1.0.219", features = ["derive"] }
anyhow = "1.0.99"
serde_yaml = "0.9.33"
+datafusion = "47.0.0"
+object_store = { version = "0.12.4", features = ["aws"] }
+arrow-array = "55.2.0"
+arrow-schema = "55.2.0"
+url = "2.5.7"
[dev-dependencies]
assert_cmd = "2.0"
diff --git a/spatialbench-cli/src/csv.rs b/spatialbench-cli/src/csv.rs
index 7f9ca3b..78f93e5 100644
--- a/spatialbench-cli/src/csv.rs
+++ b/spatialbench-cli/src/csv.rs
@@ -1,9 +1,8 @@
//! Implementations of [`Source`] for generating data in TBL format
use super::generate::Source;
-use spatialbench::csv::{BuildingCsv, CustomerCsv, DriverCsv, TripCsv, VehicleCsv, ZoneCsv};
+use spatialbench::csv::{BuildingCsv, CustomerCsv, DriverCsv, TripCsv, VehicleCsv};
use spatialbench::generators::{
BuildingGenerator, CustomerGenerator, DriverGenerator, TripGenerator, VehicleGenerator,
- ZoneGenerator,
};
use std::io::Write;
@@ -45,4 +44,3 @@ define_csv_source!(DriverCsvSource, DriverGenerator<'static>, DriverCsv);
define_csv_source!(CustomerCsvSource, CustomerGenerator<'static>, CustomerCsv);
define_csv_source!(TripCsvSource, TripGenerator, TripCsv);
define_csv_source!(BuildingCsvSource, BuildingGenerator<'static>, BuildingCsv);
-define_csv_source!(ZoneCsvSource, ZoneGenerator, ZoneCsv);
diff --git a/spatialbench-cli/src/main.rs b/spatialbench-cli/src/main.rs
index 8067a45..8050974 100644
--- a/spatialbench-cli/src/main.rs
+++ b/spatialbench-cli/src/main.rs
@@ -47,6 +47,7 @@ mod plan;
mod spatial_config_file;
mod statistics;
mod tbl;
+mod zone_df;
use crate::csv::*;
use crate::generate::{generate_in_chunks, Sink, Source};
@@ -62,13 +63,11 @@ use log::{debug, info, LevelFilter};
use spatialbench::distribution::Distributions;
use spatialbench::generators::{
BuildingGenerator, CustomerGenerator, DriverGenerator, TripGenerator, VehicleGenerator,
- ZoneGenerator,
};
use spatialbench::spatial::overrides::{set_overrides, SpatialOverrides};
use spatialbench::text::TextPool;
use spatialbench_arrow::{
BuildingArrow, CustomerArrow, DriverArrow, RecordBatchIterator, TripArrow, VehicleArrow,
- ZoneArrow,
};
use std::fmt::Display;
use std::fs::{self, File};
@@ -408,6 +407,28 @@ impl Cli {
Ok(())
}
+ async fn generate_zone(&self) -> io::Result<()> {
+ match self.format {
+ OutputFormat::Parquet => {
+ let args = zone_df::ZoneDfArgs {
+ scale_factor: 1.0f64.max(self.scale_factor),
+ output_dir: self.output_dir.clone(),
+ parts: self.parts.unwrap_or(1),
+ part: self.part.unwrap_or(1),
+ parquet_row_group_bytes: self.parquet_row_group_bytes,
+ parquet_compression: self.parquet_compression,
+ };
+ zone_df::generate_zone_parquet(args)
+ .await
+ .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
+ }
+ _ => Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ "Zone table is only supported in --format=parquet (via
DataFusion/S3).",
+ )),
+ }
+ }
+
define_generate!(
generate_vehicle,
Table::Vehicle,
@@ -448,14 +469,6 @@ impl Cli {
BuildingCsvSource,
BuildingArrow
);
- define_generate!(
- generate_zone,
- Table::Zone,
- ZoneGenerator,
- ZoneTblSource,
- ZoneCsvSource,
- ZoneArrow
- );
/// return the output filename for the given table
fn output_filename(&self, table: Table) -> String {
diff --git a/spatialbench-cli/src/plan.rs b/spatialbench-cli/src/plan.rs
index 926f379..809814f 100644
--- a/spatialbench-cli/src/plan.rs
+++ b/spatialbench-cli/src/plan.rs
@@ -4,7 +4,6 @@ use crate::{OutputFormat, Table};
use log::debug;
use spatialbench::generators::{
BuildingGenerator, CustomerGenerator, DriverGenerator, TripGenerator, VehicleGenerator,
- ZoneGenerator,
};
use std::fmt::Display;
use std::ops::RangeInclusive;
@@ -329,10 +328,7 @@ impl OutputSize {
Table::Customer => CustomerGenerator::calculate_row_count(scale_factor, 1, 1),
Table::Trip => TripGenerator::calculate_row_count(scale_factor, 1, 1),
Table::Building => BuildingGenerator::calculate_row_count(scale_factor, 1, 1),
- Table::Zone => {
- let generator = ZoneGenerator::new(scale_factor, 1, 1);
- generator.calculate_row_count()
- }
+ Table::Zone => todo!(),
}
}
}
diff --git a/spatialbench-cli/src/tbl.rs b/spatialbench-cli/src/tbl.rs
index b7019c7..8eeb448 100644
--- a/spatialbench-cli/src/tbl.rs
+++ b/spatialbench-cli/src/tbl.rs
@@ -3,7 +3,6 @@
use super::generate::Source;
use spatialbench::generators::{
BuildingGenerator, CustomerGenerator, DriverGenerator, TripGenerator, VehicleGenerator,
- ZoneGenerator,
};
use std::io::Write;
@@ -43,4 +42,3 @@ define_tbl_source!(DriverTblSource, DriverGenerator<'static>);
define_tbl_source!(CustomerTblSource, CustomerGenerator<'static>);
define_tbl_source!(TripTblSource, TripGenerator);
define_tbl_source!(BuildingTblSource, BuildingGenerator<'static>);
-define_tbl_source!(ZoneTblSource, ZoneGenerator);
diff --git a/spatialbench-cli/src/zone_df.rs b/spatialbench-cli/src/zone_df.rs
new file mode 100644
index 0000000..464b61d
--- /dev/null
+++ b/spatialbench-cli/src/zone_df.rs
@@ -0,0 +1,501 @@
+use std::{path::PathBuf, sync::Arc, time::Instant};
+
+use anyhow::{anyhow, Result};
+use arrow_array::RecordBatch;
+use arrow_schema::{Schema, SchemaRef};
+use datafusion::{
+ common::config::ConfigOptions, execution::runtime_env::RuntimeEnvBuilder, prelude::*,
+ sql::TableReference,
+};
+
+use crate::plan::DEFAULT_PARQUET_ROW_GROUP_BYTES;
+use datafusion::execution::runtime_env::RuntimeEnv;
+use log::{debug, info};
+use object_store::aws::AmazonS3Builder;
+use object_store::ObjectStore;
+use parquet::{
+ arrow::ArrowWriter, basic::Compression as ParquetCompression,
+ file::properties::WriterProperties,
+};
+use url::Url;
+
+const OVERTURE_RELEASE_DATE: &str = "2025-08-20.1";
+const OVERTURE_S3_BUCKET: &str = "overturemaps-us-west-2";
+const OVERTURE_S3_PREFIX: &str = "release";
+
+fn zones_parquet_url() -> String {
+ format!(
+ "s3://{}/{}/{}/theme=divisions/type=division_area/",
+ OVERTURE_S3_BUCKET, OVERTURE_S3_PREFIX, OVERTURE_RELEASE_DATE
+ )
+}
+
+fn subtypes_for_scale_factor(sf: f64) -> Vec<&'static str> {
+ let mut v = vec!["microhood", "macrohood", "county"];
+ if sf >= 10.0 {
+ v.push("neighborhood");
+ }
+ if sf >= 100.0 {
+ v.extend_from_slice(&["localadmin", "locality", "region", "dependency"]);
+ }
+ if sf >= 1000.0 {
+ v.push("country");
+ }
+ v
+}
+
+fn estimated_total_rows_for_sf(sf: f64) -> i64 {
+ let mut total = 0i64;
+ for s in subtypes_for_scale_factor(sf) {
+ total += match s {
+ "microhood" => 74797,
+ "macrohood" => 42619,
+ "neighborhood" => 298615,
+ "county" => 38679,
+ "localadmin" => 19007,
+ "locality" => 555834,
+ "region" => 3905,
+ "dependency" => 53,
+ "country" => 219,
+ _ => 0,
+ };
+ }
+ if sf < 1.0 {
+ (total as f64 * sf).ceil() as i64
+ } else {
+ total
+ }
+}
+
+fn get_zone_table_stats(sf: f64) -> (f64, i64) {
+ // Returns (size_in_gb, total_rows) for the given scale factor
+ if sf < 1.0 {
+ (0.92 * sf, (156_095.0 * sf).ceil() as i64)
+ } else if sf < 10.0 {
+ (1.42, 156_095)
+ } else if sf < 100.0 {
+ (2.09, 454_710)
+ } else if sf < 1000.0 {
+ (5.68, 1_033_456)
+ } else {
+ (6.13, 1_033_675)
+ }
+}
+
+fn compute_rows_per_group_from_stats(size_gb: f64, total_rows: i64, target_bytes: i64) -> usize {
+ let total_bytes = size_gb * 1024.0 * 1024.0 * 1024.0; // Convert GB to bytes
+ let bytes_per_row = total_bytes / total_rows as f64;
+
+ // Use default if target_bytes is not specified or invalid
+ let effective_target = if target_bytes <= 0 {
+ DEFAULT_PARQUET_ROW_GROUP_BYTES
+ } else {
+ target_bytes
+ };
+
+ debug!(
+ "Using hardcoded stats: {:.2} GB, {} rows, {:.2} bytes/row, target: {}
bytes",
+ size_gb, total_rows, bytes_per_row, effective_target
+ );
+
+ let est = (effective_target as f64 / bytes_per_row).floor();
+ // Keep RG count <= 32k, but avoid too-tiny RGs
+ est.clamp(1000.0, 32767.0) as usize
+}
+
+fn writer_props_with_rowgroup(comp: ParquetCompression, rows_per_group: usize) -> WriterProperties {
+ WriterProperties::builder()
+ .set_compression(comp)
+ .set_max_row_group_size(rows_per_group)
+ .build()
+}
+
+fn write_parquet_with_rowgroup_bytes(
+ out_path: &PathBuf,
+ schema: SchemaRef,
+ all_batches: Vec<RecordBatch>,
+ target_rowgroup_bytes: i64,
+ comp: ParquetCompression,
+ scale_factor: f64,
+ parts: i32,
+) -> Result<()> {
+ let (mut size_gb, mut total_rows) = get_zone_table_stats(scale_factor);
+
+ // Use linear scaling stats for SF <= 1.0 with parts > 1
+ if scale_factor <= 1.0 && parts > 1 {
+ (size_gb, total_rows) = get_zone_table_stats(scale_factor / parts as f64);
+ }
+
+ debug!(
+ "size_gb={}, total_rows={} for scale_factor={}",
+ size_gb, total_rows, scale_factor
+ );
+ let rows_per_group =
+ compute_rows_per_group_from_stats(size_gb, total_rows, target_rowgroup_bytes);
+ let props = writer_props_with_rowgroup(comp, rows_per_group);
+
+ debug!(
+ "Using row group size: {} rows (based on hardcoded stats)",
+ rows_per_group
+ );
+
+ let mut writer = ArrowWriter::try_new(std::fs::File::create(out_path)?, schema, Some(props))?;
+
+ for batch in all_batches {
+ writer.write(&batch)?;
+ }
+ writer.close()?;
+ Ok(())
+}
+
+#[derive(Clone)]
+pub struct ZoneDfArgs {
+ pub scale_factor: f64,
+ pub output_dir: PathBuf,
+ pub parts: i32,
+ pub part: i32,
+ pub parquet_row_group_bytes: i64,
+ pub parquet_compression: ParquetCompression,
+}
+
+impl ZoneDfArgs {
+ fn output_filename(&self) -> PathBuf {
+ let filename = "zone.parquet".to_string();
+ self.output_dir.join(filename)
+ }
+}
+
+pub async fn generate_zone_parquet(args: ZoneDfArgs) -> Result<()> {
+ if args.part < 1 || args.part > args.parts {
+ return Err(anyhow!(
+ "Invalid --part={} for --parts={}",
+ args.part,
+ args.parts
+ ));
+ }
+
+ info!(
+ "Starting zone parquet generation with scale factor {}",
+ args.scale_factor
+ );
+ debug!("Zone generation args: parts={}, part={}, output_dir={:?},
row_group_bytes={}, compression={:?}",
+ args.parts, args.part, args.output_dir,
args.parquet_row_group_bytes, args.parquet_compression);
+
+ let subtypes = subtypes_for_scale_factor(args.scale_factor);
+ info!(
+ "Selected subtypes for SF {}: {:?}",
+ args.scale_factor, subtypes
+ );
+
+ let estimated_rows = estimated_total_rows_for_sf(args.scale_factor);
+ info!(
+ "Estimated total rows for SF {}: {}",
+ args.scale_factor, estimated_rows
+ );
+
+ let mut cfg = ConfigOptions::new();
+ cfg.execution.target_partitions = 1;
+ debug!("Created DataFusion config with target_partitions=1");
+
+ let rt: Arc<RuntimeEnv> = Arc::new(RuntimeEnvBuilder::new().build()?);
+ debug!("Built DataFusion runtime environment");
+
+ // Register S3 store for Overture bucket
+ let bucket = OVERTURE_S3_BUCKET;
+ info!("Registering S3 store for bucket: {}", bucket);
+ let s3 = AmazonS3Builder::new()
+ .with_bucket_name(bucket)
+ .with_skip_signature(true)
+ .with_region("us-west-2")
+ .build()?;
+
+ let s3_url = Url::parse(&format!("s3://{bucket}"))?;
+ let s3_store: Arc<dyn ObjectStore> = Arc::new(s3);
+ rt.register_object_store(&s3_url, s3_store);
+ debug!("Successfully registered S3 object store");
+
+ let ctx = SessionContext::new_with_config_rt(SessionConfig::from(cfg), rt);
+ debug!("Created DataFusion session context");
+
+ let url = zones_parquet_url();
+ info!("Reading parquet data from: {}", url);
+ let t_read_start = Instant::now();
+ let mut df = ctx.read_parquet(url, ParquetReadOptions::default()).await?;
+ let read_dur = t_read_start.elapsed();
+ info!("Successfully read parquet data in {:?}", read_dur);
+
+ // Build filter predicate
+ debug!("Building filter predicate for subtypes: {:?}", subtypes);
+ let mut pred = col("subtype").eq(lit("__never__"));
+ for s in subtypes_for_scale_factor(args.scale_factor) {
+ pred = pred.or(col("subtype").eq(lit(s)));
+ }
+ df = df.filter(pred.and(col("is_land").eq(lit(true))))?;
+ info!("Applied subtype and is_land filters");
+
+ // df = df.sort(vec![col("id").sort(true, true)])?;
+ // debug!("Applied sorting by id");
+
+ let total = estimated_total_rows_for_sf(args.scale_factor);
+ let i = (args.part as i64) - 1; // 0-based part index
+ let parts = args.parts as i64;
+
+ let base = total / parts;
+ let rem = total % parts;
+
+ // first `rem` parts get one extra row
+ let rows_this = base + if i < rem { 1 } else { 0 };
+ let offset = i * base + std::cmp::min(i, rem);
+
+ info!(
+ "Partitioning data: total_rows={}, parts={}, base={}, rem={},
this_part_rows={}, offset={}",
+ total, parts, base, rem, rows_this, offset
+ );
+
+ df = df.limit(offset as usize, Some(rows_this as usize))?;
+ debug!("Applied limit with offset={}, rows={}", offset, rows_this);
+
+ ctx.register_table(TableReference::bare("zone_filtered"), df.into_view())?;
+ debug!("Registered filtered data as 'zone_filtered' table");
+
+ let sql = format!(
+ r#"
+ SELECT
+ CAST(ROW_NUMBER() OVER (ORDER BY id) + {offset} AS BIGINT) AS z_zonekey,
+ COALESCE(id, '') AS z_gersid,
+ COALESCE(country, '') AS z_country,
+ COALESCE(region, '') AS z_region,
+ COALESCE(names.primary, '') AS z_name,
+ COALESCE(subtype, '') AS z_subtype,
+ geometry AS z_boundary
+ FROM zone_filtered
+ "#
+ );
+ debug!("Executing SQL transformation with offset: {}", offset);
+ let df2 = ctx.sql(&sql).await?;
+ info!("SQL transformation completed successfully");
+
+ let t0 = Instant::now();
+ info!("Starting data collection...");
+ let batches = df2.clone().collect().await?;
+ let collect_dur = t0.elapsed();
+
+ let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+ info!(
+ "Collected {} record batches with {} total rows in {:?}",
+ batches.len(),
+ total_rows,
+ collect_dur
+ );
+
+ std::fs::create_dir_all(&args.output_dir)?;
+ debug!("Created output directory: {:?}", args.output_dir);
+
+ let out = args.output_filename();
+ info!("Writing output to: {}", out.display());
+
+ debug!(
+ "Created parquet writer properties with compression: {:?}",
+ args.parquet_compression
+ );
+
+ // Convert DFSchema to Arrow Schema
+ let schema = Arc::new(Schema::new(
+ df2.schema()
+ .fields()
+ .iter()
+ .map(|f| f.as_ref().clone())
+ .collect::<Vec<_>>(),
+ ));
+ debug!(
+ "Converted DataFusion schema to Arrow schema with {} fields",
+ schema.fields().len()
+ );
+
+ let t1 = Instant::now();
+ info!(
+ "Starting parquet file write with row group size: {} bytes",
+ args.parquet_row_group_bytes
+ );
+ write_parquet_with_rowgroup_bytes(
+ &out,
+ schema,
+ batches,
+ args.parquet_row_group_bytes,
+ args.parquet_compression,
+ args.scale_factor,
+ args.parts,
+ )?;
+ let write_dur = t1.elapsed();
+
+ info!(
+ "Zone -> {} (part {}/{}). collect={:?}, write={:?}, total_rows={}",
+ out.display(),
+ args.part,
+ args.parts,
+ collect_dur,
+ write_dur,
+ total_rows
+ );
+
+ Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use parquet::basic::Compression;
+ use tempfile::TempDir;
+
+ fn create_test_args(scale_factor: f64, temp_dir: &TempDir) -> ZoneDfArgs {
+ ZoneDfArgs {
+ scale_factor,
+ output_dir: temp_dir.path().to_path_buf(),
+ parts: 1,
+ part: 1,
+ parquet_row_group_bytes: DEFAULT_PARQUET_ROW_GROUP_BYTES,
+ parquet_compression: Compression::SNAPPY,
+ }
+ }
+
+ #[tokio::test]
+ async fn test_zone_generation_invalid_part() {
+ let temp_dir = TempDir::new().unwrap();
+ let mut args = create_test_args(1.0, &temp_dir);
+ args.parts = 2;
+ args.part = 3; // Invalid part number
+
+ let result = generate_zone_parquet(args).await;
+ assert!(result.is_err(), "Should fail with invalid part number");
+ }
+
+ #[tokio::test]
+ async fn test_subtypes_for_different_scale_factors() {
+ // Test scale factor categorization
+ let sf_01_subtypes = subtypes_for_scale_factor(0.1);
+ assert_eq!(sf_01_subtypes, vec!["microhood", "macrohood", "county"]);
+
+ let sf_10_subtypes = subtypes_for_scale_factor(10.0);
+ assert_eq!(
+ sf_10_subtypes,
+ vec!["microhood", "macrohood", "county", "neighborhood"]
+ );
+
+ let sf_100_subtypes = subtypes_for_scale_factor(100.0);
+ assert!(sf_100_subtypes.contains(&"localadmin"));
+ assert!(sf_100_subtypes.contains(&"locality"));
+
+ let sf_1000_subtypes = subtypes_for_scale_factor(1000.0);
+ assert!(sf_1000_subtypes.contains(&"country"));
+ }
+
+ #[test]
+ fn test_partition_distribution_logic() {
+ // Test the mathematical logic for distributing rows across partitions
+ let total_rows = 100i64;
+ let parts = 3i64;
+
+ let mut collected_rows = Vec::new();
+ let mut collected_offsets = Vec::new();
+
+ // Simulate the partition calculation for each part
+ for part_idx in 0..parts {
+ let i = part_idx;
+ let base = total_rows / parts;
+ let rem = total_rows % parts;
+ let rows_this = base + if i < rem { 1 } else { 0 };
+ let offset = i * base + std::cmp::min(i, rem);
+
+ collected_rows.push(rows_this);
+ collected_offsets.push(offset);
+ }
+
+ // Verify partitioning logic
+ assert_eq!(collected_rows.iter().sum::<i64>(), total_rows); // All rows accounted for
+ assert_eq!(collected_offsets[0], 0); // First partition starts at 0
+
+ // Verify no gaps or overlaps between partitions
+ for i in 1..parts as usize {
+ let expected_offset = collected_offsets[i - 1] + collected_rows[i - 1];
+ assert_eq!(collected_offsets[i], expected_offset);
+ }
+
+ // Verify remainder distribution (first partitions get extra rows)
+ let remainder = (total_rows % parts) as usize;
+ for i in 0..remainder {
+ assert_eq!(collected_rows[i], collected_rows[remainder] + 1);
+ }
+ }
+
+ #[test]
+ fn test_rows_per_group_bounds() {
+ // Test that compute_rows_per_group_from_stats respects bounds
+
+ // Test minimum bound (should be at least 1000)
+ let rows_per_group_tiny = compute_rows_per_group_from_stats(0.001, 1000, 1_000_000);
+ assert!(rows_per_group_tiny >= 1000);
+
+ // Test maximum bound (should not exceed 32767)
+ let rows_per_group_huge = compute_rows_per_group_from_stats(1000.0, 1000, 1);
+ assert!(rows_per_group_huge <= 32767);
+
+ // Test negative target bytes falls back to default
+ let rows_per_group_negative = compute_rows_per_group_from_stats(1.0, 100000, -1);
+ let rows_per_group_default =
+ compute_rows_per_group_from_stats(1.0, 100000, DEFAULT_PARQUET_ROW_GROUP_BYTES);
+ assert_eq!(rows_per_group_negative, rows_per_group_default);
+ }
+
+ #[test]
+ fn test_subtype_selection_logic() {
+ // Test the cumulative nature of subtype selection
+ let base_subtypes = subtypes_for_scale_factor(1.0);
+ let sf10_subtypes = subtypes_for_scale_factor(10.0);
+ let sf100_subtypes = subtypes_for_scale_factor(100.0);
+ let sf1000_subtypes = subtypes_for_scale_factor(1000.0);
+
+ // Each higher scale factor should include all previous subtypes
+ for subtype in &base_subtypes {
+ assert!(sf10_subtypes.contains(subtype));
+ assert!(sf100_subtypes.contains(subtype));
+ assert!(sf1000_subtypes.contains(subtype));
+ }
+
+ for subtype in &sf10_subtypes {
+ assert!(sf100_subtypes.contains(subtype));
+ assert!(sf1000_subtypes.contains(subtype));
+ }
+
+ for subtype in &sf100_subtypes {
+ assert!(sf1000_subtypes.contains(subtype));
+ }
+
+ // Verify progressive addition
+ assert!(sf10_subtypes.len() > base_subtypes.len());
+ assert!(sf100_subtypes.len() > sf10_subtypes.len());
+ assert!(sf1000_subtypes.len() > sf100_subtypes.len());
+ }
+
+ #[test]
+ fn test_estimated_rows_scaling_consistency() {
+ // Test that estimated rows scale proportionally for SF < 1.0
+ let base_rows = estimated_total_rows_for_sf(1.0);
+ let half_rows = estimated_total_rows_for_sf(0.5);
+ let quarter_rows = estimated_total_rows_for_sf(0.25);
+
+ // Should scale proportionally (within rounding)
+ assert!((half_rows as f64 - (base_rows as f64 * 0.5)).abs() < 1.0);
+ assert!((quarter_rows as f64 - (base_rows as f64 * 0.25)).abs() < 1.0);
+
+ // Test that SF >= 1.0 gives discrete jumps (not proportional scaling)
+ let sf1_rows = estimated_total_rows_for_sf(1.0);
+ let sf5_rows = estimated_total_rows_for_sf(5.0);
+ let sf10_rows = estimated_total_rows_for_sf(10.0);
+
+ // These should be equal (same category)
+ assert_eq!(sf1_rows, sf5_rows);
+
+ // This should be different (different category)
+ assert_ne!(sf5_rows, sf10_rows);
+ }
+}
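The offset/limit logic in generate_zone_parquet above splits the estimated total row count evenly across --parts, with the first rem parts taking one extra row so the partitions stay contiguous with no gaps or overlaps (the property test_partition_distribution_logic asserts). A standalone sketch of that arithmetic; split_part is a hypothetical helper name, not part of the crate:

    // Sketch of the part-splitting arithmetic used above: returns (offset, rows)
    // for a 1-based `part` out of `parts` over `total` rows.
    fn split_part(total: i64, parts: i64, part: i64) -> (i64, i64) {
        let i = part - 1; // 0-based part index
        let base = total / parts;
        let rem = total % parts;
        let rows_this = base + if i < rem { 1 } else { 0 }; // first `rem` parts get one extra row
        let offset = i * base + std::cmp::min(i, rem);
        (offset, rows_this)
    }

    fn main() {
        // 10 rows over 3 parts -> (0, 4), (4, 3), (7, 3)
        for part in 1..=3 {
            println!("part {part}: {:?}", split_part(10, 3, part));
        }
    }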
diff --git a/spatialbench-cli/tests/cli_integration.rs b/spatialbench-cli/tests/cli_integration.rs
index 5cfaf91..fa42806 100644
--- a/spatialbench-cli/tests/cli_integration.rs
+++ b/spatialbench-cli/tests/cli_integration.rs
@@ -1,3 +1,4 @@
+use arrow_array::RecordBatch;
use assert_cmd::Command;
use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
use parquet::file::metadata::ParquetMetaDataReader;
@@ -6,7 +7,7 @@ use spatialbench_arrow::{RecordBatchIterator, TripArrow};
use std::fs;
use std::fs::File;
use std::io::Read;
-use std::path::Path;
+use std::path::{Path, PathBuf};
use std::sync::Arc;
use tempfile::tempdir;
@@ -83,6 +84,112 @@ fn test_spatialbench_cli_tbl_scale_factor_v1() {
}
}
+/// Test zone parquet output determinism - same data should be generated every time
+#[tokio::test]
+async fn test_zone_deterministic_parts_generation() {
+ let temp_dir1 = tempdir().expect("Failed to create temporary directory 1");
+
+ Command::cargo_bin("spatialbench-cli")
+ .expect("Binary not found")
+ .arg("--format")
+ .arg("parquet")
+ .arg("--scale-factor")
+ .arg("1.0")
+ .arg("--output-dir")
+ .arg(temp_dir1.path())
+ .arg("--tables")
+ .arg("zone")
+ .arg("--parts")
+ .arg("100")
+ .arg("--part")
+ .arg("1")
+ .assert()
+ .success();
+
+ let zone_file1 = temp_dir1.path().join("zone.parquet");
+
+ // Reference file is a sf=0.01 zone table with z_boundary column removed
+ let reference_file = PathBuf::from("../spatialbench/data/sf-v1/zone.parquet");
+
+ assert!(
+ zone_file1.exists(),
+ "First zone.parquet file was not created"
+ );
+ assert!(
+ reference_file.exists(),
+ "Reference zone.parquet file does not exist"
+ );
+
+ let file1 = File::open(&zone_file1).expect("Failed to open generated zone.parquet file");
+ let file2 = File::open(&reference_file).expect("Failed to open reference zone.parquet file");
+
+ let reader1 = ParquetRecordBatchReaderBuilder::try_new(file1)
+ .expect("Failed to create reader for generated file")
+ .build()
+ .expect("Failed to build reader for generated file");
+
+ let reader2 = ParquetRecordBatchReaderBuilder::try_new(file2)
+ .expect("Failed to create reader for reference file")
+ .build()
+ .expect("Failed to build reader for reference file");
+
+ let batches1: Result<Vec<RecordBatch>, _> = reader1.collect();
+ let batches2: Result<Vec<RecordBatch>, _> = reader2.collect();
+
+ let batches1 = batches1.expect("Failed to read batches from generated file");
+ let batches2 = batches2.expect("Failed to read batches from reference file");
+
+ // Check that files are non-empty
+ assert!(
+ !batches1.is_empty(),
+ "Generated zone parquet file has no data"
+ );
+ assert!(
+ !batches2.is_empty(),
+ "Reference zone parquet file has no data"
+ );
+
+ // Check that both files have the same number of batches
+ assert_eq!(
+ batches1.len(),
+ batches2.len(),
+ "Different number of record batches"
+ );
+
+ // Compare each batch, excluding z_boundary column
+ for (i, (batch1, batch2)) in batches1.iter().zip(batches2.iter()).enumerate() {
+ assert_eq!(
+ batch1.num_rows(),
+ batch2.num_rows(),
+ "Batch {} has different number of rows",
+ i
+ );
+
+ let schema1 = batch1.schema();
+
+ // Compare all columns except z_boundary
+ for field in schema1.fields() {
+ let column_name = field.name();
+ if column_name == "z_boundary" {
+ continue;
+ }
+
+ let col1 = batch1
+ .column_by_name(column_name)
+ .unwrap_or_else(|| panic!("Column {} not found in generated file", column_name));
+ let col2 = batch2
+ .column_by_name(column_name)
+ .unwrap_or_else(|| panic!("Column {} not found in reference file", column_name));
+
+ assert_eq!(
+ col1, col2,
+ "Column {} differs between generated and reference files in
batch {}",
+ column_name, i
+ );
+ }
+ }
+}
+
/// Test generating the trip table using --parts and --part options
#[test]
fn test_spatialbench_cli_parts() {
@@ -231,6 +338,36 @@ async fn test_write_parquet_row_group_size_default() {
);
}
+#[tokio::test]
+async fn test_zone_write_parquet_row_group_size_default() {
+ // Run the CLI command to generate parquet data with default settings
+ let output_dir = tempdir().unwrap();
+ Command::cargo_bin("spatialbench-cli")
+ .expect("Binary not found")
+ .arg("--format")
+ .arg("parquet")
+ .arg("--scale-factor")
+ .arg("1")
+ .arg("--tables")
+ .arg("zone")
+ .arg("--output-dir")
+ .arg(output_dir.path())
+ .arg("--parts")
+ .arg("10")
+ .arg("--part")
+ .arg("1")
+ .assert()
+ .success();
+
+ expect_row_group_sizes(
+ output_dir.path(),
+ vec![RowGroups {
+ table: "zone",
+ row_group_bytes: vec![91351103],
+ }],
+ );
+}
+
#[tokio::test]
async fn test_write_parquet_row_group_size_20mb() {
// Run the CLI command to generate parquet data with larger row group size
@@ -279,6 +416,38 @@ async fn test_write_parquet_row_group_size_20mb() {
);
}
+#[tokio::test]
+async fn test_zone_write_parquet_row_group_size_20mb() {
+ // Run the CLI command to generate parquet data with larger row group size
+ let output_dir = tempdir().unwrap();
+ Command::cargo_bin("spatialbench-cli")
+ .expect("Binary not found")
+ .arg("--format")
+ .arg("parquet")
+ .arg("--scale-factor")
+ .arg("1")
+ .arg("--tables")
+ .arg("zone")
+ .arg("--output-dir")
+ .arg(output_dir.path())
+ .arg("--parquet-row-group-bytes")
+ .arg("20000000") // 20 MB
+ .arg("--parts")
+ .arg("10")
+ .arg("--part")
+ .arg("1")
+ .assert()
+ .success();
+
+ expect_row_group_sizes(
+ output_dir.path(),
+ vec![RowGroups {
+ table: "zone",
+ row_group_bytes: vec![16284828, 19041211, 20977976, 17291992, 18079175],
+ }],
+ );
+}
+
#[test]
fn test_spatialbench_cli_part_no_parts() {
let temp_dir = tempdir().expect("Failed to create temporary directory");
@@ -402,6 +571,27 @@ async fn test_incompatible_options_warnings() {
));
}
+#[test]
+fn test_zone_generation_tbl_fails() {
+ let temp_dir = tempdir().expect("Failed to create temporary directory");
+
+ Command::cargo_bin("spatialbench-cli")
+ .expect("Binary not found")
+ .arg("--format")
+ .arg("tbl")
+ .arg("--scale-factor")
+ .arg("1")
+ .arg("--tables")
+ .arg("zone")
+ .arg("--output-dir")
+ .arg(temp_dir.path())
+ .assert()
+ .failure()
+ .stderr(predicates::str::contains(
+ "Zone table is only supported in --format=parquet",
+ ));
+}
+
fn read_gzipped_file_to_string<P: AsRef<Path>>(path: P) -> Result<String, std::io::Error> {
let file = File::open(path)?;
let mut decoder = flate2::read::GzDecoder::new(file);
diff --git a/spatialbench/data/sf-v1/zone.parquet b/spatialbench/data/sf-v1/zone.parquet
new file mode 100644
index 0000000..0b1740d
Binary files /dev/null and b/spatialbench/data/sf-v1/zone.parquet differ
diff --git a/spatialbench/src/csv.rs b/spatialbench/src/csv.rs
index b9b5a9d..37eb9ab 100644
--- a/spatialbench/src/csv.rs
+++ b/spatialbench/src/csv.rs
@@ -1,6 +1,6 @@
//! CSV formatting support for the row struct objects generated by the library.
-use crate::generators::{Building, Customer, Driver, Trip, Vehicle, Zone};
+use crate::generators::{Building, Customer, Driver, Trip, Vehicle};
use core::fmt;
use std::fmt::Display;
@@ -259,50 +259,3 @@ impl Display for BuildingCsv<'_> {
)
}
}
-
-/// Write [`Zone`]s in CSV format.
-///
-/// # Example
-/// ```
-/// # use spatialbench::generators::ZoneGenerator;
-/// # use spatialbench::csv::ZoneCsv;
-/// # use std::fmt::Write;
-/// // Output the first 3 rows in CSV format
-/// let generator = ZoneGenerator::new(0.001, 1, 1);
-/// let mut csv = String::new();
-/// writeln!(&mut csv, "{}", ZoneCsv::header()).unwrap(); // write header
-/// for line in generator.iter().take(3) {
-/// // write line using CSV formatter
-/// writeln!(&mut csv, "{}", ZoneCsv::new(line)).unwrap();
-/// }
-/// ```
-pub struct ZoneCsv {
- inner: Zone,
-}
-
-impl ZoneCsv {
- pub fn new(inner: Zone) -> Self {
- Self { inner }
- }
-
- /// Returns the CSV header for the Zone table
- pub fn header() -> &'static str {
- "z_zonekey,z_gersid,z_country,z_region,z_name,z_subtype,z_boundary"
- }
-}
-
-impl Display for ZoneCsv {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- write!(
- f,
- "{},{},{},{},{},{},\"{:?}\"",
- self.inner.z_zonekey,
- self.inner.z_gersid,
- self.inner.z_country,
- self.inner.z_region,
- self.inner.z_name,
- self.inner.z_subtype,
- self.inner.z_boundary,
- )
- }
-}
diff --git a/spatialbench/src/generators.rs b/spatialbench/src/generators.rs
index 109c0ce..476cbd6 100644
--- a/spatialbench/src/generators.rs
+++ b/spatialbench/src/generators.rs
@@ -1,4 +1,4 @@
-//! Generators for each TPC-H Tables
+//! Generators for each Spatial Bench table
use crate::dates;
use crate::dates::{GenerateUtils, TPCHDate};
use crate::decimal::TPCHDecimal;
@@ -14,17 +14,12 @@ use crate::spatial::utils::continent::{build_continent_cdf, WeightedTarget};
use crate::spatial::utils::{hash_to_unit_u64, spider_seed_for_index};
use crate::spatial::{ContinentAffines, SpatialDefaults, SpatialGenerator};
use crate::text::TextPool;
-use duckdb::Connection;
-use geo::Geometry;
use geo::Point;
-use geozero::{wkb::Wkb, ToGeo};
-use log::{debug, error, info};
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use std::convert::TryInto;
use std::fmt;
use std::fmt::Display;
-use std::time::Instant;
/// A Vehicle Manufacturer, formatted as `"Manufacturer#<n>"`
#[derive(Debug, Clone, Copy, PartialEq)]
@@ -1439,337 +1434,6 @@ impl<'a> Iterator for BuildingGeneratorIterator<'a> {
}
}
-/// Represents a Zone in the dataset
-#[derive(Debug, Clone, PartialEq)]
-pub struct Zone {
- /// Primary key
- pub z_zonekey: i64,
- /// GERS ID of the zone
- pub z_gersid: String,
- /// Country of the zone
- pub z_country: String,
- /// Region of the zone
- pub z_region: String,
- /// Name of the zone
- pub z_name: String,
- /// Subtype of the zone
- pub z_subtype: String,
- /// Boundary geometry in WKT format
- pub z_boundary: Geometry,
-}
-
-impl Display for Zone {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- write!(
- f,
- "{}|{}|{}|{}|{}|{}|{:?}|",
- self.z_zonekey,
- self.z_gersid,
- self.z_country,
- self.z_region,
- self.z_name,
- self.z_subtype,
- self.z_boundary
- )
- }
-}
-
-/// Generator for [`Zone`]s that loads from a parquet file in S3
-#[derive(Debug, Clone)]
-pub struct ZoneGenerator {
- scale_factor: f64,
- part: i32,
- part_count: i32,
-}
-
-impl ZoneGenerator {
- /// S3 URL for the zones parquet file
- const OVERTURE_RELEASE_DATE: &'static str = "2025-08-20.1";
- const OVERTURE_S3_BUCKET: &'static str = "overturemaps-us-west-2";
- const OVERTURE_S3_PREFIX: &'static str = "release";
-
- /// Gets the S3 URL for the zones parquet file
- fn get_zones_parquet_url() -> String {
- format!(
- "s3://{}/{}/{}/theme=divisions/type=division_area/*",
- Self::OVERTURE_S3_BUCKET,
- Self::OVERTURE_S3_PREFIX,
- Self::OVERTURE_RELEASE_DATE
- )
- }
-
- /// Get zone subtypes based on scale factor
- fn get_zone_subtypes_for_scale_factor(scale_factor: f64) -> Vec<&'static str> {
- let mut subtypes = vec!["microhood", "macrohood", "county"];
-
- if scale_factor >= 10.0 {
- subtypes.extend_from_slice(&["neighborhood"]);
- }
-
- if scale_factor >= 100.0 {
- subtypes.extend_from_slice(&["localadmin", "locality", "region", "dependency"]);
- }
-
- if scale_factor >= 1000.0 {
- subtypes.push("country");
- }
-
- subtypes
- }
-
- /// Calculate total zones for a given scale factor based on subtype counts
- fn calculate_total_zones_for_scale_factor(scale_factor: f64) -> i64 {
- let subtypes = Self::get_zone_subtypes_for_scale_factor(scale_factor);
- let mut total = 0i64;
-
- for subtype in subtypes {
- let count = match subtype {
- "microhood" => 74797,
- "macrohood" => 42619,
- "neighborhood" => 298615,
- "county" => 39680,
- "localadmin" => 19007,
- "locality" => 555834,
- "region" => 4714,
- "dependency" => 105,
- "country" => 378,
- _ => 0,
- };
- total += count;
- }
-
- // Scale down for testing purposes
- if scale_factor < 1.0 {
- total = (total as f64 * scale_factor).ceil() as i64;
- }
-
- total
- }
-
- /// Create a new zone generator with streaming approach
- pub fn new(scale_factor: f64, part: i32, part_count: i32) -> Self {
- let start = Instant::now();
- info!(
- "Creating ZoneGenerator with scale_factor={}, part={},
part_count={}",
- scale_factor, part, part_count
- );
- let elapsed = start.elapsed();
- info!("ZoneGenerator created in {:?}", elapsed);
-
- Self {
- scale_factor,
- part,
- part_count,
- }
- }
-
- /// Calculate zones per partition
- fn calculate_zones_per_part(&self) -> i64 {
- let total_zones = Self::calculate_total_zones_for_scale_factor(self.scale_factor);
- (total_zones as f64 / self.part_count as f64).ceil() as i64
- }
-
- /// Calculate offset for this partition
- fn calculate_offset(&self) -> i64 {
- let zones_per_part = self.calculate_zones_per_part();
- (self.part - 1) as i64 * zones_per_part
- }
-
- /// Load zones for this specific partition using LIMIT and OFFSET
- fn load_partition_zones(&self) -> Result<Vec<Zone>, Box<dyn std::error::Error>> {
- info!(
- "Loading zones for partition {} of {}",
- self.part, self.part_count
- );
- let start_total = Instant::now();
-
- // Create a connection to DuckDB
- let t0 = Instant::now();
- let conn = Connection::open_in_memory()?;
- debug!("Opened DuckDB connection in {:?}", t0.elapsed());
-
- // Install and load required extensions
- let t1 = Instant::now();
- conn.execute_batch(
- r#"
- INSTALL httpfs;
- LOAD httpfs;
- INSTALL spatial;
- LOAD spatial;
-
- -- Public bucket: force unsigned requests
- SET s3_access_key_id = '';
- SET s3_secret_access_key = '';
- SET s3_session_token = '';
-
- -- Region + endpoint for the Overture bucket
- SET s3_region = 'us-west-2';
- SET s3_endpoint = 's3.us-west-2.amazonaws.com';
- "#,
- )?;
- debug!(
- "Installed and loaded DuckDB extensions in {:?}",
- t1.elapsed()
- );
-
- // Calculate partition parameters
- let zones_per_part = self.calculate_zones_per_part();
- let offset = self.calculate_offset();
- let zones_url = Self::get_zones_parquet_url();
- let subtypes = Self::get_zone_subtypes_for_scale_factor(self.scale_factor);
-
- info!(
- "Partition {}: LIMIT {} OFFSET {} from {} with subtypes: {:?}",
- self.part, zones_per_part, offset, zones_url, subtypes
- );
-
- // Build the subtype filter
- let subtype_filter = if subtypes.is_empty() {
- return Err(format!(
- "No subtypes found for scale factor {} in partition {}. This
indicates a logic error.",
- self.scale_factor,
- self.part
- ).into());
- } else {
- format!(
- "subtype IN ({})",
- subtypes
- .iter()
- .map(|s| format!("'{}'", s))
- .collect::<Vec<_>>()
- .join(", ")
- )
- };
-
- // Combine subtype filter with is_land filter
- let combined_filter = format!("{} AND is_land = true", subtype_filter);
-
- let query = format!(
- "SELECT
- COALESCE(id, '') as z_gersid,
- COALESCE(country, '') as z_country,
- COALESCE(region, '') as z_region,
- COALESCE(names.primary, '') as z_name,
- COALESCE(subtype, '') as z_subtype,
- ST_AsWKB(geometry) as z_boundary
- FROM read_parquet('{}', hive_partitioning=1)
- WHERE {}
- LIMIT {} OFFSET {};",
- zones_url, combined_filter, zones_per_part, offset
- );
- debug!("Generated partition query: {}", query);
-
- // Prepare + execute query
- let t2 = Instant::now();
- let mut stmt = conn.prepare(&query)?;
- debug!("Prepared statement in {:?}", t2.elapsed());
-
- let t3 = Instant::now();
- let mut rows = stmt.query([])?;
- debug!("Executed query and got row iterator in {:?}", t3.elapsed());
-
- // Iterate rows and parse geometries
- let mut zones = Vec::new();
- let mut zone_id = offset + 1;
-
- let t4 = Instant::now();
- while let Ok(Some(row)) = rows.next() {
- let z_gersid: String = row.get(0)?;
- let z_country: String = row.get(1)?;
- let z_region: String = row.get(2)?;
- let z_name: String = row.get(3)?;
- let z_subtype: String = row.get(4)?;
- let wkb_bytes: Vec<u8> = row.get(5)?;
- let geometry: Geometry = Wkb(&wkb_bytes).to_geo()?;
-
- zones.push(Zone {
- z_zonekey: zone_id,
- z_gersid,
- z_country,
- z_region,
- z_name,
- z_subtype,
- z_boundary: geometry,
- });
-
- if zones.len() % 1000 == 0 {
- debug!("Loaded {} zones for partition {}", zones.len(),
self.part);
- }
- zone_id += 1;
- }
-
- info!(
- "Partition {} loaded: {} zones in {:?}",
- self.part,
- zones.len(),
- t4.elapsed()
- );
-
- info!("Total partition load took {:?}", start_total.elapsed());
- Ok(zones)
- }
-
- /// Return the row count for the given part
- pub fn calculate_row_count(&self) -> i64 {
- let total_zones = Self::calculate_total_zones_for_scale_factor(self.scale_factor);
- let zones_per_part = self.calculate_zones_per_part();
- let offset = self.calculate_offset();
-
- // Don't exceed total available zones
- std::cmp::min(zones_per_part, total_zones - offset).max(0)
- }
-
- /// Returns an iterator over the zone rows
- pub fn iter(&self) -> ZoneGeneratorIterator {
- ZoneGeneratorIterator::new(self.clone())
- }
-}
-
-impl IntoIterator for ZoneGenerator {
- type Item = Zone;
- type IntoIter = ZoneGeneratorIterator;
-
- fn into_iter(self) -> Self::IntoIter {
- self.iter()
- }
-}
-
-/// Iterator that generates Zone rows by loading partition data on-demand
-#[derive(Debug)]
-pub struct ZoneGeneratorIterator {
- zones: Vec<Zone>,
- index: usize,
-}
-
-impl ZoneGeneratorIterator {
- fn new(generator: ZoneGenerator) -> Self {
- // Load zones for this partition only
- let zones = generator.load_partition_zones().unwrap_or_else(|e| {
- error!(
- "Failed to load zones for partition {}: {}",
- generator.part, e
- );
- Vec::new()
- });
-
- ZoneGeneratorIterator { zones, index: 0 }
- }
-}
-
-impl Iterator for ZoneGeneratorIterator {
- type Item = Zone;
-
- fn next(&mut self) -> Option<Self::Item> {
- if self.index >= self.zones.len() {
- return None;
- }
-
- let zone = self.zones[self.index].clone();
- self.index += 1;
- Some(zone)
- }
-}
-
#[cfg(test)]
mod tests {
use super::*;
@@ -1908,78 +1572,4 @@ mod tests {
assert_eq!(first.b_buildingkey, 2);
assert_eq!(first.to_string(), "2|blush|POLYGON((124.218033476 10.538071565,124.215762091 10.536069114,124.214352934 10.536014944,124.212486371 10.539913704,124.217919324 10.539075339,124.218033476 10.538071565))|")
}
-
- #[test]
- fn test_zone_generation() {
- // Create a generator with a small scale factor
- let generator = ZoneGenerator::new(0.001, 1, 1);
- let zones: Vec<_> = generator.into_iter().collect();
-
- assert_eq!(zones.len(), 158);
-
- // Check first zone
- let first = &zones[0];
- assert_eq!(first.z_zonekey, 1);
- // The first zone is now a county due to the is_land filter and county being in base subtypes
- assert_eq!(first.z_subtype, "county");
- // Verify the string format matches the expected pattern (but don't check exact content since it's dynamic)
- let expected_pattern = format!(
- "{}|{}|{}|{}|{}|{}|{:?}|",
- first.z_zonekey,
- first.z_gersid,
- first.z_country,
- first.z_region,
- first.z_name,
- first.z_subtype,
- first.z_boundary
- );
- assert_eq!(first.to_string(), expected_pattern);
- }
-
- #[test]
- fn test_zone_subtype_filters() {
- // Test scale factor 0-10: should include microhood, macrohood, and county
- let subtypes_0_10 = ZoneGenerator::get_zone_subtypes_for_scale_factor(5.0);
- assert_eq!(subtypes_0_10, vec!["microhood", "macrohood", "county"]);
-
- // Test scale factor 10-100: should include microhood, macrohood, county, and neighborhood
- let subtypes_10_100 = ZoneGenerator::get_zone_subtypes_for_scale_factor(50.0);
- assert_eq!(
- subtypes_10_100,
- vec!["microhood", "macrohood", "county", "neighborhood"]
- );
-
- // Test scale factor 100-1000: should include all except country
- let subtypes_100_1000 = ZoneGenerator::get_zone_subtypes_for_scale_factor(500.0);
- assert_eq!(
- subtypes_100_1000,
- vec![
- "microhood",
- "macrohood",
- "county",
- "neighborhood",
- "localadmin",
- "locality",
- "region",
- "dependency"
- ]
- );
-
- // Test scale factor 1000+: should include all subtypes
- let subtypes_1000_plus = ZoneGenerator::get_zone_subtypes_for_scale_factor(2000.0);
- assert_eq!(
- subtypes_1000_plus,
- vec![
- "microhood",
- "macrohood",
- "county",
- "neighborhood",
- "localadmin",
- "locality",
- "region",
- "dependency",
- "country"
- ]
- );
- }
}
diff --git a/spatialbench/tests/integration_tests.rs b/spatialbench/tests/integration_tests.rs
index 6108279..1880d79 100644
--- a/spatialbench/tests/integration_tests.rs
+++ b/spatialbench/tests/integration_tests.rs
@@ -3,7 +3,6 @@
use spatialbench::generators::{
BuildingGenerator, CustomerGenerator, DriverGenerator, TripGenerator, VehicleGenerator,
- ZoneGenerator,
};
struct TestIntoIterator<G>
@@ -103,22 +102,6 @@ fn test_vehicle_into_iter() {
}
}
-#[test]
-fn test_zone_into_iter() {
- {
- assert_eq!(
- TestIntoIterator::new(ZoneGenerator::new(0.001, 1, 1))
- .to_string_vec(5)
- .len(),
- 5
- );
- }
- {
- let zone = ZoneGenerator::new(0.001, 1, 1);
- assert_eq!(TestIntoIterator::new(zone).to_string_vec(5).len(), 5);
- }
-}
-
#[test]
fn test_building_into_iter() {
{