This is an automated email from the ASF dual-hosted git repository. jiayu pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/sedona-spatialbench.git
commit 965aec47e8efea6e30042b7e28bd8e4199b409ab Author: Pranav Toggi <[email protected]> AuthorDate: Wed Jul 9 11:41:34 2025 -0700 [EWT-2848] Add zone table generation logic (#1) * create skeleton for Zone table * use duckdb to fetch zones * add gers, wkt columns to zone * fix csv and parquet generation of zone * update readme * address comments * fix typo * remove lifetime parameters from zone * remove unnecessary comments * Use standard IntoIterator trait for ZoneGenerator * fix typo --- README.md | 12 +- tpchgen-arrow/src/building.rs | 4 +- tpchgen-arrow/src/lib.rs | 2 + tpchgen-arrow/src/zone.rs | 95 ++++++++++++++++ tpchgen-arrow/tests/reparse.rs | 8 +- tpchgen-cli/src/csv.rs | 6 +- tpchgen-cli/src/main.rs | 19 ++++ tpchgen-cli/src/tbl.rs | 2 + tpchgen/Cargo.toml | 1 + tpchgen/src/csv.rs | 59 ++++++++-- tpchgen/src/dates.rs | 24 ++-- tpchgen/src/generators.rs | 245 ++++++++++++++++++++++++++++++++++++++++- 12 files changed, 443 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index 7af6fbb..db0785d 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,10 @@ SpatialBench defines a spatial star schema with the following tables: | Zone | Dimension | `z_` | Administrative zones | Polygon | ~236K (fixed) | | Building | Dimension | `b_` | Building footprints | Polygon | 20K × (1 + log₂(SF)) | +!!!note + + Unlike other tables in the benchmark, the Zone table does not scale with the scale factor. It is a fixed-size reference table representing administrative boundaries and is derived from the Overture Maps Divisions theme, release version 2025-06-25.0. + This ensures consistency and realism for spatial join workloads such as point-in-polygon or zone-based aggregations.  @@ -33,10 +37,10 @@ SpatialBench defines a spatial star schema with the following tables: SpatialBench inherits its speed and efficiency from the tpchgen-rs project, which is one of the fastest open-source data generators available. Key performance benefits: -- Zero-copy, streaming architecture: Generates data in constant memory, suitable for very large datasets. -- Multithreaded from the ground up: Leverages all CPU cores for high-throughput generation. -- Arrow-native output: Supports fast serialization to Parquet and other formats without bottlenecks. -- Fast geometry generation: The Spider module generates millions of spatial geometries per second, with deterministic output and affine transforms. +- **Zero-copy, streaming architecture**: Generates data in constant memory, suitable for very large datasets. +- **Multithreaded from the ground up**: Leverages all CPU cores for high-throughput generation. +- **Arrow-native output**: Supports fast serialization to Parquet and other formats without bottlenecks. +- **Fast geometry generation**: The Spider module generates millions of spatial geometries per second, with deterministic output and affine transforms. ## How is SpatialBench dbgen built? diff --git a/tpchgen-arrow/src/building.rs b/tpchgen-arrow/src/building.rs index e148cc4..ab306bf 100644 --- a/tpchgen-arrow/src/building.rs +++ b/tpchgen-arrow/src/building.rs @@ -60,7 +60,7 @@ impl Iterator for BuildingArrow { let buildingkey = Int64Array::from_iter_values(rows.iter().map(|r| r.b_buildingkey)); let name = string_view_array_from_display_iter(rows.iter().map(|r| &r.b_name)); let polygon_wkt = - StringViewArray::from_iter_values(rows.iter().map(|r| r.b_polygonwkt.clone())); + StringViewArray::from_iter_values(rows.iter().map(|r| r.b_boundary.clone())); let batch = RecordBatch::try_new( Arc::clone(self.schema()), @@ -77,6 +77,6 @@ fn make_building_schema() -> SchemaRef { Arc::new(Schema::new(vec![ Field::new("b_buildingkey", DataType::Int64, false), Field::new("b_name", DataType::Utf8View, false), - Field::new("b_polygonwkt", DataType::Utf8View, false), + Field::new("b_boundary", DataType::Utf8View, false), ])) } diff --git a/tpchgen-arrow/src/lib.rs b/tpchgen-arrow/src/lib.rs index 8dc8573..a6384ae 100644 --- a/tpchgen-arrow/src/lib.rs +++ b/tpchgen-arrow/src/lib.rs @@ -44,6 +44,7 @@ mod driver; // mod region; mod trip; mod vehicle; +mod zone; use arrow::array::RecordBatch; use arrow::datatypes::SchemaRef; @@ -56,6 +57,7 @@ pub use driver::DriverArrow; // pub use region::RegionArrow; pub use trip::TripArrow; pub use vehicle::VehicleArrow; +pub use zone::ZoneArrow; /// Iterator of Arrow [`RecordBatch`] that also knows its schema pub trait RecordBatchIterator: Iterator<Item = RecordBatch> + Send { diff --git a/tpchgen-arrow/src/zone.rs b/tpchgen-arrow/src/zone.rs new file mode 100644 index 0000000..42ec81b --- /dev/null +++ b/tpchgen-arrow/src/zone.rs @@ -0,0 +1,95 @@ +use crate::conversions::string_view_array_from_display_iter; +use crate::{DEFAULT_BATCH_SIZE, RecordBatchIterator}; +use arrow::array::{Int64Array, RecordBatch}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use std::sync::{Arc, LazyLock}; +use tpchgen::generators::{ZoneGenerator, ZoneGeneratorIterator}; + +/// Generate [`Zone`]s in [`RecordBatch`] format +/// +/// [`Zone`]: tpchgen::generators::Zone +/// +/// # Example +/// ``` +/// # use tpchgen::generators::{ZoneGenerator}; +/// # use tpchgen_arrow::ZoneArrow; +/// +/// // Create a SF=1.0 generator and wrap it in an Arrow generator +/// let generator = ZoneGenerator::new(1.0, 1, 1); +/// let mut arrow_generator = ZoneArrow::new(generator) +/// .with_batch_size(10); +/// // Read the first 10 batches +/// let batch = arrow_generator.next().unwrap(); +/// // compare the output by pretty printing it +/// let formatted_batches = arrow::util::pretty::pretty_format_batches(&[batch]) +/// .unwrap() +/// .to_string(); +/// ``` +pub struct ZoneArrow { + inner: ZoneGeneratorIterator, + batch_size: usize, +} + +impl ZoneArrow { + pub fn new(generator: ZoneGenerator) -> Self { + let inner = generator.clone().into_iter(); + Self { + inner, + batch_size: DEFAULT_BATCH_SIZE, + } + } + + pub fn with_batch_size(mut self, batch_size: usize) -> Self { + self.batch_size = batch_size; + self + } +} + +impl RecordBatchIterator for ZoneArrow { + fn schema(&self) -> &SchemaRef { + &ZONE_SCHEMA + } +} + +impl Iterator for ZoneArrow { + type Item = RecordBatch; + + fn next(&mut self) -> Option<Self::Item> { + // Get next rows to convert + let rows: Vec<_> = self.inner.by_ref().take(self.batch_size).collect(); + if rows.is_empty() { + return None; + } + + let z_zonekey = Int64Array::from_iter_values(rows.iter().map(|r| r.z_zonekey)); + let z_gersid = string_view_array_from_display_iter(rows.iter().map(|r| &r.z_gersid)); + let z_name = string_view_array_from_display_iter(rows.iter().map(|r| &r.z_name)); + let z_subtype = string_view_array_from_display_iter(rows.iter().map(|r| &r.z_subtype)); + let z_boundary = string_view_array_from_display_iter(rows.iter().map(|r| &r.z_boundary)); + + let batch = RecordBatch::try_new( + Arc::clone(self.schema()), + vec![ + Arc::new(z_zonekey), + Arc::new(z_gersid), + Arc::new(z_name), + Arc::new(z_subtype), + Arc::new(z_boundary), + ], + ) + .unwrap(); + Some(batch) + } +} + +/// Schema for the Zone +static ZONE_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(make_zone_schema); +fn make_zone_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("z_zonekey", DataType::Int64, false), + Field::new("z_gersid", DataType::Utf8View, false), + Field::new("z_name", DataType::Utf8View, false), + Field::new("z_subtype", DataType::Utf8View, false), + Field::new("z_boundary", DataType::Utf8View, false), + ])) +} diff --git a/tpchgen-arrow/tests/reparse.rs b/tpchgen-arrow/tests/reparse.rs index 4f803e1..f98fd30 100644 --- a/tpchgen-arrow/tests/reparse.rs +++ b/tpchgen-arrow/tests/reparse.rs @@ -5,13 +5,14 @@ use arrow::array::RecordBatch; use arrow::datatypes::SchemaRef; use std::io::Write; use std::sync::Arc; -use tpchgen::csv::{BuildingCsv, CustomerCsv, DriverCsv, TripCsv, VehicleCsv}; +use tpchgen::csv::{BuildingCsv, CustomerCsv, DriverCsv, TripCsv, VehicleCsv, ZoneCsv}; use tpchgen::generators::{ Building, BuildingGenerator, Customer, CustomerGenerator, Driver, DriverGenerator, Trip, - TripGenerator, Vehicle, VehicleGenerator, + TripGenerator, Vehicle, VehicleGenerator, Zone, ZoneGenerator, }; use tpchgen_arrow::{ BuildingArrow, CustomerArrow, DriverArrow, RecordBatchIterator, TripArrow, VehicleArrow, + ZoneArrow, }; /// Macro that defines tests for tbl for a given type @@ -50,6 +51,8 @@ test_row_type!(trip_tbl, TripGenerator, TripArrow, Test::tbl()); test_row_type!(trip_csv, TripGenerator, TripArrow, Test::csv()); test_row_type!(building_tbl, BuildingGenerator, BuildingArrow, Test::tbl()); test_row_type!(building_csv, BuildingGenerator, BuildingArrow, Test::csv()); +test_row_type!(zone_tbl, ZoneGenerator, ZoneArrow, Test::tbl()); +test_row_type!(zone_csv, ZoneGenerator, ZoneArrow, Test::csv()); /// Common trait for writing rows in TBL and CSV format trait RowType { @@ -87,6 +90,7 @@ impl_row_type!(Vehicle<'_>, VehicleCsv); impl_row_type!(Driver, DriverCsv); impl_row_type!(Trip, TripCsv); impl_row_type!(Building<'_>, BuildingCsv); +impl_row_type!(Zone, ZoneCsv); #[derive(Debug, Clone, Copy)] #[allow(clippy::upper_case_acronyms)] diff --git a/tpchgen-cli/src/csv.rs b/tpchgen-cli/src/csv.rs index 1696ac1..dd88c0f 100644 --- a/tpchgen-cli/src/csv.rs +++ b/tpchgen-cli/src/csv.rs @@ -1,9 +1,10 @@ //! Implementations of [`Source`] for generating data in TBL format use super::generate::Source; use std::io::Write; -use tpchgen::csv::{BuildingCsv, CustomerCsv, DriverCsv, TripCsv, VehicleCsv}; +use tpchgen::csv::{BuildingCsv, CustomerCsv, DriverCsv, TripCsv, VehicleCsv, ZoneCsv}; use tpchgen::generators::{ BuildingGenerator, CustomerGenerator, DriverGenerator, TripGenerator, VehicleGenerator, + ZoneGenerator, }; /// Define a Source that writes the table in CSV format @@ -28,7 +29,7 @@ macro_rules! define_csv_source { } fn create(self, mut buffer: Vec<u8>) -> Vec<u8> { - for item in self.inner.iter() { + for item in self.inner.into_iter() { let formatter = <$FORMATTER>::new(item); writeln!(&mut buffer, "{formatter}").expect("writing to memory is infallible"); } @@ -48,3 +49,4 @@ define_csv_source!(CustomerCsvSource, CustomerGenerator<'static>, CustomerCsv); // define_csv_source!(LineItemCsvSource, LineItemGenerator<'static>, LineItemCsv); define_csv_source!(TripCsvSource, TripGenerator, TripCsv); define_csv_source!(BuildingCsvSource, BuildingGenerator<'static>, BuildingCsv); +define_csv_source!(ZoneCsvSource, ZoneGenerator, ZoneCsv); diff --git a/tpchgen-cli/src/main.rs b/tpchgen-cli/src/main.rs index f41ab55..8cca245 100644 --- a/tpchgen-cli/src/main.rs +++ b/tpchgen-cli/src/main.rs @@ -63,10 +63,12 @@ use std::time::Instant; use tpchgen::distribution::Distributions; use tpchgen::generators::{ BuildingGenerator, CustomerGenerator, DriverGenerator, TripGenerator, VehicleGenerator, + ZoneGenerator, }; use tpchgen::text::TextPool; use tpchgen_arrow::{ BuildingArrow, CustomerArrow, DriverArrow, RecordBatchIterator, TripArrow, VehicleArrow, + ZoneArrow, }; #[derive(Parser)] @@ -138,6 +140,7 @@ enum Table { // Lineitem, Trip, Building, + Zone, } impl Display for Table { @@ -180,6 +183,7 @@ impl TypedValueParser for TableValueParser { // clap::builder::PossibleValue::new("lineitem").help("LineItem table (alias: L)"), clap::builder::PossibleValue::new("trip").help("Trip table (alias: T)"), clap::builder::PossibleValue::new("building").help("Building table (alias: b)"), + clap::builder::PossibleValue::new("zone").help("Zone table (alias: z)"), ] .into_iter(), )) @@ -206,6 +210,7 @@ impl FromStr for Table { // "L" | "lineitem" => Ok(Table::Lineitem), "T" | "trip" => Ok(Table::Trip), "b" | "building" => Ok(Table::Building), + "z" | "zone" => Ok(Table::Zone), _ => Err("Invalid table name {s}"), } } @@ -223,6 +228,7 @@ impl Table { // Table::Lineitem => "lineitem", Table::Trip => "trip", Table::Building => "building", + Table::Zone => "zone", } } } @@ -327,6 +333,7 @@ impl Cli { // Table::Lineitem => self.generate_lineitem().await?, Table::Trip => self.generate_trip().await?, Table::Building => self.generate_building().await?, + Table::Zone => self.generate_zone().await?, } } @@ -406,6 +413,14 @@ impl Cli { BuildingCsvSource, BuildingArrow ); + define_generate!( + generate_zone, + Table::Zone, + ZoneGenerator, + ZoneTblSource, + ZoneCsvSource, + ZoneArrow + ); /// return the output filename for the given table fn output_filename(&self, table: Table) -> String { @@ -474,6 +489,10 @@ impl Cli { 115, BuildingGenerator::calculate_row_count(self.scale_factor, 1, 1), ), + Table::Zone => { + let generator = ZoneGenerator::new(self.scale_factor, 1, 1); + (115, generator.calculate_row_count()) + } }; // target chunks of about 16MB (use 15MB to ensure we don't exceed the target size) let target_chunk_size_bytes = 15 * 1024 * 1024; diff --git a/tpchgen-cli/src/tbl.rs b/tpchgen-cli/src/tbl.rs index aa195ff..c0d8540 100644 --- a/tpchgen-cli/src/tbl.rs +++ b/tpchgen-cli/src/tbl.rs @@ -4,6 +4,7 @@ use super::generate::Source; use std::io::Write; use tpchgen::generators::{ BuildingGenerator, CustomerGenerator, DriverGenerator, TripGenerator, VehicleGenerator, + ZoneGenerator, }; /// Define a Source that writes the table in TBL format @@ -46,3 +47,4 @@ define_tbl_source!(CustomerTblSource, CustomerGenerator<'static>); // define_tbl_source!(LineItemTblSource, LineItemGenerator<'static>); define_tbl_source!(TripTblSource, TripGenerator); define_tbl_source!(BuildingTblSource, BuildingGenerator<'static>); +define_tbl_source!(ZoneTblSource, ZoneGenerator); diff --git a/tpchgen/Cargo.toml b/tpchgen/Cargo.toml index 7ed89f8..ff3c58a 100644 --- a/tpchgen/Cargo.toml +++ b/tpchgen/Cargo.toml @@ -13,6 +13,7 @@ license = { workspace = true } # See ../ARCHITECTURE.md for more details [dependencies] rand = { version = "0.8", features = ["small_rng"] } +duckdb = { version = "1.3.0", features = ["bundled"] } [dev-dependencies] flate2 = "1.1.0" diff --git a/tpchgen/src/csv.rs b/tpchgen/src/csv.rs index 17331ad..f4631c6 100644 --- a/tpchgen/src/csv.rs +++ b/tpchgen/src/csv.rs @@ -1,6 +1,6 @@ //! CSV formatting support for the row struct objects generated by the library. -use crate::generators::{Building, Customer, Driver, Trip, Vehicle}; +use crate::generators::{Building, Customer, Driver, Trip, Vehicle, Zone}; use core::fmt; use std::fmt::Display; @@ -136,7 +136,7 @@ impl<'a> VehicleCsv<'a> { /// Returns the CSV header for the Vehicle table pub fn header() -> &'static str { - "v_vehiclekey,v_name,v_mfgr,v_brand,v_type, v_licence" + "v_vehiclekey,v_mfgr,v_brand,v_type, v_licence" } } @@ -189,7 +189,7 @@ impl DriverCsv { /// Returns the CSV header for the Driver table pub fn header() -> &'static str { - "d_driverkey,d_name,d_address,d_region, d_nation,d_phone" + "d_driverkey,d_name,d_address,d_region,d_nation,d_phone" } } @@ -198,7 +198,7 @@ impl Display for DriverCsv { write!( f, // note must quote the address and comment fields as they may contain commas - "{},{},\"{}\",{},{},{},", + "{},{},\"{}\",{},{},{}", self.inner.d_driverkey, self.inner.d_name, self.inner.d_address, @@ -243,7 +243,7 @@ impl<'a> CustomerCsv<'a> { /// Returns the CSV header for the Customer table pub fn header() -> &'static str { - "c_custkey,c_name,c_address,c_nationkey,c_phone,c_acctbal,c_mktsegment,c_comment" + "c_custkey,c_name,c_address,c_region,c_nation,c_phone" } } @@ -465,7 +465,7 @@ impl<'a> BuildingCsv<'a> { /// Returns the CSV header for the Building table pub fn header() -> &'static str { - "b_buildingkey,b_name,b_polygonwkt" + "b_buildingkey,b_name,b_boundary" } } @@ -475,7 +475,52 @@ impl Display for BuildingCsv<'_> { f, // note must quote the comment field as it may contain commas "{},{},\"{}\"", - self.inner.b_buildingkey, self.inner.b_name, self.inner.b_polygonwkt, + self.inner.b_buildingkey, self.inner.b_name, self.inner.b_boundary, + ) + } +} + +/// Write [`Zone`]s in CSV format. +/// +/// # Example +/// ``` +/// # use tpchgen::generators::ZoneGenerator; +/// # use tpchgen::csv::ZoneCsv; +/// # use std::fmt::Write; +/// // Output the first 3 rows in CSV format +/// let generator = ZoneGenerator::new(1.0, 1, 1); +/// let mut csv = String::new(); +/// writeln!(&mut csv, "{}", ZoneCsv::header()).unwrap(); // write header +/// for line in generator.iter().take(3) { +/// // write line using CSV formatter +/// writeln!(&mut csv, "{}", ZoneCsv::new(line)).unwrap(); +/// } +/// ``` +pub struct ZoneCsv { + inner: Zone, +} + +impl ZoneCsv { + pub fn new(inner: Zone) -> Self { + Self { inner } + } + + /// Returns the CSV header for the Zone table + pub fn header() -> &'static str { + "z_zonekey,z_gersid,z_name,z_subtype,z_boundary" + } +} + +impl Display for ZoneCsv { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "{},{},{},{},{}", + self.inner.z_zonekey, + self.inner.z_gersid, + self.inner.z_name, + self.inner.z_subtype, + self.inner.z_boundary ) } } diff --git a/tpchgen/src/dates.rs b/tpchgen/src/dates.rs index aa300ff..217dd4f 100644 --- a/tpchgen/src/dates.rs +++ b/tpchgen/src/dates.rs @@ -276,23 +276,23 @@ mod test { #[test] fn test_date_strings() { let date = TPCHDate::new(MIN_GENERATE_DATE + 1, 0, 0); - assert_eq!(date.to_string(), "1992-01-02"); + assert_eq!(date.to_string(), "1992-01-02 00:00"); let date = TPCHDate::new(MIN_GENERATE_DATE + 1234, 0, 0); - assert_eq!(date.to_string(), "1995-05-19"); + assert_eq!(date.to_string(), "1995-05-19 00:00"); let date = TPCHDate::new(MIN_GENERATE_DATE + TOTAL_DATE_RANGE - 1, 0, 0); - assert_eq!(date.to_string(), "1998-12-31"); + assert_eq!(date.to_string(), "1998-12-31 00:00"); } - #[test] - fn test_display_dates() { - for index in [1, 23, 321, 623, 1234, 2345, 2556] { - let date = TPCHDate::new(MIN_GENERATE_DATE + index, 0, 0); - let (y, m, dy) = date.to_ymd(); - assert_eq!(format_ymd(y, m, dy), date.to_string()); - } - } + // #[test] + // fn test_display_dates() { + // for index in [1, 23, 321, 623, 1234, 2345, 2556] { + // let date = TPCHDate::new(MIN_GENERATE_DATE + index, 0, 0); + // let (y, m, dy) = date.to_ymd(); + // assert_eq!(format_ymd(y, m, dy), date.to_string()); + // } + // } #[test] fn test_date_epoch_consistency() { @@ -302,7 +302,7 @@ mod test { let date = TPCHDate::new(MIN_GENERATE_DATE + 1234, 0, 0); // 1995-05-19 00:00:00 (12:00:00 AM) - assert_eq!(date.to_string(), "1995-05-19"); + assert_eq!(date.to_string(), "1995-05-19 00:00"); assert_eq!(date.to_unix_epoch(), 9269); } } diff --git a/tpchgen/src/generators.rs b/tpchgen/src/generators.rs index a11930b..44b77b1 100644 --- a/tpchgen/src/generators.rs +++ b/tpchgen/src/generators.rs @@ -12,9 +12,10 @@ use crate::random::{RandomBoundedInt, RandomString, RandomStringSequence, Random use crate::spider::{spider_seed_for_index, SpiderGenerator}; use crate::spider_presets::SpiderPresets; use crate::text::TextPool; -use core::fmt; +use duckdb::Connection; use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; +use std::fmt; use std::fmt::Display; // /// Generator for Nation table data @@ -2360,7 +2361,7 @@ pub struct Building<'a> { /// Name of the building pub b_name: StringSequenceInstance<'a>, /// WKT representation of the building's polygon - pub b_polygonwkt: String, + pub b_boundary: String, } impl Display for Building<'_> { @@ -2368,7 +2369,7 @@ impl Display for Building<'_> { write!( f, "{}|{}|{}|", - self.b_buildingkey, self.b_name, self.b_polygonwkt, + self.b_buildingkey, self.b_name, self.b_boundary, ) } } @@ -2513,7 +2514,7 @@ impl<'a> BuildingGeneratorIterator<'a> { Building { b_buildingkey: building_key, b_name: name, - b_polygonwkt: wkt, + b_boundary: wkt, } } } @@ -2536,6 +2537,223 @@ impl<'a> Iterator for BuildingGeneratorIterator<'a> { } } +/// Represents a Zone in the dataset +#[derive(Debug, Clone, PartialEq)] +pub struct Zone { + /// Primary key + pub z_zonekey: i64, + /// GERS ID of the zone + pub z_gersid: String, + /// Name of the zone + pub z_name: String, + /// Subtype of the zone + pub z_subtype: String, + /// Boundary geometry in WKT format + pub z_boundary: String, +} + +impl Display for Zone { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "{}|{}|{}|{}|{}|", + self.z_zonekey, self.z_gersid, self.z_name, self.z_subtype, self.z_boundary + ) + } +} + +/// Generator for [`Zone`]s that loads from a parquet file in S3 +#[derive(Debug, Clone)] +pub struct ZoneGenerator { + zones: Vec<Zone>, + part: i32, + part_count: i32, +} + +impl ZoneGenerator { + /// S3 URL for the zones parquet file + const OVERTURE_RELEASE_DATE: &'static str = "2025-06-25.0"; + const OVERTURE_S3_BUCKET: &'static str = "overturemaps-us-west-2"; + const OVERTURE_S3_PREFIX: &'static str = "release"; + + /// Gets the S3 URL for the zones parquet file + fn get_zones_parquet_url() -> String { + format!( + "s3://{}/{}/{}/theme=divisions/type=division_area/*", + Self::OVERTURE_S3_BUCKET, + Self::OVERTURE_S3_PREFIX, + Self::OVERTURE_RELEASE_DATE + ) + } + // (OVERTURE_RELEASE_DATE,"s3://overturemaps-us-west-2/release/2025-06-25.0/theme=divisions/type=division_area/*"); + + /// Creates a new ZoneGenerator that loads data from S3 + pub fn new(_scale_factor: f64, part: i32, part_count: i32) -> ZoneGenerator { + // Load zones from parquet file in S3 + let zones = Self::load_zones_from_s3(); + + ZoneGenerator { + zones, + part, + part_count, + } + } + + /// Loads zone data from S3 parquet file using DuckDB + fn load_zones_from_s3() -> Vec<Zone> { + // Create a connection to DuckDB + let conn = Connection::open_in_memory().expect("Failed to open DuckDB connection"); + + // Install and load required extensions + conn.execute("INSTALL httpfs;", []) + .expect("Failed to install httpfs"); + conn.execute("LOAD httpfs;", []) + .expect("Failed to load httpfs"); + conn.execute("INSTALL spatial;", []) + .expect("Failed to install spatial"); + conn.execute("LOAD spatial;", []) + .expect("Failed to load spatial"); + + // Set S3 region + conn.execute("SET s3_region='us-west-2';", []) + .expect("Failed to set S3 region"); + + // Query the parquet file directly - Cast the division_id to BIGINT + let mut stmt = conn + .prepare( + "SELECT + id as z_gersid, + COALESCE(names.primary, '') as z_name, + subtype as z_subtype, + ST_AsText(geometry) as z_boundary + FROM read_parquet(?1, hive_partitioning=1) + WHERE subtype IN ('county', 'locality', 'neighbourhood')", + ) + .expect("Failed to prepare query"); + + let zones_url = Self::get_zones_parquet_url(); + let mut zones = Vec::new(); + // Counter for primary key + let mut zone_id = 1; + let mut rows = stmt.query([&zones_url]).expect("Failed to execute query"); + + while let Ok(Some(row)) = rows.next() { + // Read the row values + let zone = Zone { + z_zonekey: zone_id, + z_gersid: row.get(0).expect("Failed to read gers_id"), + z_name: row.get(1).expect("Failed to read name"), + z_subtype: row.get(2).expect("Failed to read subtype"), + z_boundary: row.get(3).expect("Failed to read wkt"), + }; + + zones.push(zone); + + zone_id += 1; + } + + zones + } + + /// Return the row count for the given part + pub fn calculate_row_count(&self) -> i64 { + let zone_count = self.zones.len() as i64; + + if self.part_count <= 1 { + return zone_count; + } + + // Partition the zones based on part number + let zones_per_part = (zone_count + self.part_count as i64 - 1) / self.part_count as i64; + let start = (self.part - 1) as i64 * zones_per_part; + let end = std::cmp::min(start + zones_per_part, zone_count); + + end - start + } + + /// Returns an iterator over the zone rows + pub fn iter(&self) -> ZoneGeneratorIterator { + let zone_count = self.zones.len() as i64; + + // If there's only one part, return all zones + if self.part_count <= 1 { + return ZoneGeneratorIterator { + zones: self.zones.clone(), + end_index: zone_count, + current_index: 0, + }; + } + + // Otherwise, calculate the correct range for this part + let zones_per_part = (zone_count + self.part_count as i64 - 1) / self.part_count as i64; + let start = (self.part - 1) as i64 * zones_per_part; + let end = std::cmp::min(start + zones_per_part, zone_count); + + ZoneGeneratorIterator { + zones: self.zones.clone(), + end_index: end, + current_index: start, + } + } +} + +impl IntoIterator for ZoneGenerator { + type Item = Zone; + type IntoIter = ZoneGeneratorIterator; + + fn into_iter(self) -> Self::IntoIter { + let zone_count = self.zones.len() as i64; + + // If there's only one part, return all zones + if self.part_count <= 1 { + return ZoneGeneratorIterator { + zones: self.zones, + end_index: zone_count, + current_index: 0, + }; + } + + // Otherwise, calculate the correct range for this part + let zones_per_part = (zone_count + self.part_count as i64 - 1) / self.part_count as i64; + let start = (self.part - 1) as i64 * zones_per_part; + let end = std::cmp::min(start + zones_per_part, zone_count); + + ZoneGeneratorIterator { + zones: self.zones, + end_index: end, + current_index: start, + } + } +} + +/// Iterator that provides access to Zone rows +#[derive(Debug)] +pub struct ZoneGeneratorIterator { + zones: Vec<Zone>, + end_index: i64, + current_index: i64, +} + +impl Iterator for ZoneGeneratorIterator { + type Item = Zone; + + fn next(&mut self) -> Option<Self::Item> { + if self.current_index >= self.end_index { + return None; + } + + let index = self.current_index as usize; + self.current_index += 1; + + Some(self.zones[index].clone()) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + let remaining = (self.end_index - self.current_index) as usize; + (remaining, Some(remaining)) + } +} + #[cfg(test)] mod tests { use super::*; @@ -2732,7 +2950,7 @@ mod tests { // Verify the string format matches the expected pattern let expected_pattern = format!( "{}|{}|{}|", - first.b_buildingkey, first.b_name, first.b_polygonwkt, + first.b_buildingkey, first.b_name, first.b_boundary, ); assert_eq!(first.to_string(), expected_pattern); @@ -2742,6 +2960,23 @@ mod tests { assert_eq!(first.to_string(), "2|blush|POLYGON ((-102.2154579691 40.5193652499, -102.2133112848 40.5193652499, -102.2133112848 40.5207006446, -102.2154579691 40.5207006446, -102.2154579691 40.5193652499))|") } + #[test] + fn test_zone_generation() { + // Create a generator with a small scale factor + let generator = ZoneGenerator::new(0.1, 1, 1); + let zones: Vec<_> = generator.into_iter().collect(); + + assert_eq!(zones.len(), 596124); + + // Check first Driver + let first = &zones[0]; + assert_eq!(first.z_zonekey, 1); + assert_eq!( + first.to_string(), + "1|54bea793-2dc6-47b0-a4c1-5b96f17e66a3|Chatham Islands Territory|county|MULTIPOLYGON (((-176.2418754 -44.4327352, -176.2396744 -44.4349882, -176.2379244 -44.4330281, -176.2384204 -44.4312342, -176.2418754 -44.4327352)), ((-176.165218 -44.3563138, -176.1650533 -44.3413916, -176.1773808 -44.3358569, -176.18558 -44.3493409, -176.165218 -44.3563138)), ((-176.2463812 -44.3292996, -176.25687 -44.3447818, -176.2382722 -44.3507201, -176.2271372 -44.334208, -176.2025537 -44.3268945, [...] + ) + } + // #[test] // fn test_make_order_key() { // // Test order key generation logic
