This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-spatialbench.git

commit f02617d6f4ee614676325811f34a8ca256ce2a7d
Author: Pranav Toggi <[email protected]>
AuthorDate: Thu Aug 21 10:30:50 2025 -0700

    [EWT-3199] Temporary partitioning of Zone generation via LIMIT and OFFSET in DuckDB (#8)
    
    * Use LIMIT/OFFSET approach
    
    * fmt fix
---
 spatialbench/Cargo.toml        |   1 +
 spatialbench/src/generators.rs | 225 ++++++++++++++++++++++-------------------
 2 files changed, 123 insertions(+), 103 deletions(-)

diff --git a/spatialbench/Cargo.toml b/spatialbench/Cargo.toml
index e679fec..8eac353 100644
--- a/spatialbench/Cargo.toml
+++ b/spatialbench/Cargo.toml
@@ -16,6 +16,7 @@ rand = { version = "0.8", features = ["small_rng"] }
 duckdb = { version = "1.3.0", features = ["bundled"] }
 geo = { workspace = true }
 geozero = { workspace = true }
+log = "0.4.27"
 
 [dev-dependencies]
 flate2 = "1.1.0"
diff --git a/spatialbench/src/generators.rs b/spatialbench/src/generators.rs
index 7bfc047..8ab801c 100644
--- a/spatialbench/src/generators.rs
+++ b/spatialbench/src/generators.rs
@@ -16,11 +16,13 @@ use duckdb::Connection;
 use geo::Geometry;
 use geo::Point;
 use geozero::{wkb::Wkb, ToGeo};
+use log::{debug, error, info};
 use rand::rngs::StdRng;
 use rand::{Rng, SeedableRng};
 use std::convert::TryInto;
 use std::fmt;
 use std::fmt::Display;
+use std::time::Instant;
 
 /// A Vehicle Manufacturer, formatted as `"Manufacturer#<n>"`
 #[derive(Debug, Clone, Copy, PartialEq)]
@@ -1426,7 +1428,6 @@ impl Display for Zone {
 #[derive(Debug, Clone)]
 pub struct ZoneGenerator {
     scale_factor: f64,
-    zones: Vec<Zone>,
     part: i32,
     part_count: i32,
 }
@@ -1448,41 +1449,68 @@ impl ZoneGenerator {
         )
     }
 
-    /// Creates a new ZoneGenerator that loads data from S3
-    pub fn new(scale_factor: f64, part: i32, part_count: i32) -> ZoneGenerator {
-        // construct temporary ZoneGenerator with empty zones
-        let mut generator = ZoneGenerator {
+    /// Create a new zone generator with streaming approach
+    pub fn new(scale_factor: f64, part: i32, part_count: i32) -> Self {
+        let start = Instant::now();
+        info!(
+            "Creating ZoneGenerator with scale_factor={}, part={}, part_count={}",
+            scale_factor, part, part_count
+        );
+        let elapsed = start.elapsed();
+        info!("ZoneGenerator created in {:?}", elapsed);
+
+        Self {
             scale_factor,
             part,
             part_count,
-            zones: Vec::new(),
-        };
+        }
+    }
 
-        let zones = generator.load_zones_from_s3();
-        generator.zones = zones;
+    /// Calculate zones per partition
+    fn calculate_zones_per_part(&self) -> i64 {
+        let total_zones = (self.scale_factor * Self::SCALE_BASE as f64).ceil() as i64;
+        (total_zones as f64 / self.part_count as f64).ceil() as i64
+    }
 
-        generator
+    /// Calculate offset for this partition
+    fn calculate_offset(&self) -> i64 {
+        let zones_per_part = self.calculate_zones_per_part();
+        (self.part - 1) as i64 * zones_per_part
     }
 
-    /// Loads zone data from S3 parquet file using DuckDB
-    fn load_zones_from_s3(&self) -> Vec<Zone> {
+    /// Load zones for this specific partition using LIMIT and OFFSET
+    fn load_partition_zones(&self) -> Result<Vec<Zone>, Box<dyn std::error::Error>> {
+        info!(
+            "Loading zones for partition {} of {}",
+            self.part, self.part_count
+        );
+        let start_total = Instant::now();
+
         // Create a connection to DuckDB
-        let conn = Connection::open_in_memory().expect("Failed to open DuckDB connection");
+        let t0 = Instant::now();
+        let conn = Connection::open_in_memory()?;
+        debug!("Opened DuckDB connection in {:?}", t0.elapsed());
 
         // Install and load required extensions
-        conn.execute("INSTALL httpfs;", [])
-            .expect("Failed to install httpfs");
-        conn.execute("LOAD httpfs;", [])
-            .expect("Failed to load httpfs");
-        conn.execute("INSTALL spatial;", [])
-            .expect("Failed to install spatial");
-        conn.execute("LOAD spatial;", [])
-            .expect("Failed to load spatial");
+        let t1 = Instant::now();
+        conn.execute("INSTALL httpfs;", [])?;
+        conn.execute("LOAD httpfs;", [])?;
+        conn.execute("INSTALL spatial;", [])?;
+        conn.execute("LOAD spatial;", [])?;
+        debug!(
+            "Installed and loaded DuckDB extensions in {:?}",
+            t1.elapsed()
+        );
 
+        // Calculate partition parameters
+        let zones_per_part = self.calculate_zones_per_part();
+        let offset = self.calculate_offset();
         let zones_url = Self::get_zones_parquet_url();
 
-        // Compute the limit based on scale factor
-        let limit = (self.scale_factor * Self::SCALE_BASE as f64).ceil() as i64;
+        info!(
+            "Partition {}: LIMIT {} OFFSET {} from {}",
+            self.part, zones_per_part, offset, zones_url
+        );
 
         let query = format!(
             "SELECT
@@ -1494,75 +1522,78 @@ impl ZoneGenerator {
                 ST_AsWKB(geometry) as z_boundary
              FROM read_parquet('{}', hive_partitioning=1)
              WHERE subtype IN ('localadmin', 'locality', 'neighborhood')
-             LIMIT {};",
-            zones_url, limit
+             LIMIT {} OFFSET {};",
+            zones_url, zones_per_part, offset
         );
+        debug!("Generated partition query: {}", query);
+
+        // Prepare + execute query
+        let t2 = Instant::now();
+        let mut stmt = conn.prepare(&query)?;
+        debug!("Prepared statement in {:?}", t2.elapsed());
 
-        let mut stmt = conn.prepare(&query).unwrap();
-        let mut rows = stmt.query([]).unwrap();
+        let t3 = Instant::now();
+        let mut rows = stmt.query([])?;
+        debug!("Executed query and got row iterator in {:?}", t3.elapsed());
 
+        // Iterate rows and parse geometries
         let mut zones = Vec::new();
-        // Counter for primary key
-        let mut zone_id = 1;
+        let mut zone_id = offset + 1;
 
+        let t4 = Instant::now();
         while let Ok(Some(row)) = rows.next() {
-            let wkb_bytes: Vec<u8> = row.get(5).unwrap();
-            let geometry: Geometry = Wkb(&wkb_bytes).to_geo().unwrap();
+            let z_gersid: String = row.get(0)?;
+            let z_country: String = row.get(1)?;
+            let z_region: String = row.get(2)?;
+            let z_name: String = row.get(3)?;
+            let z_subtype: String = row.get(4)?;
+            let wkb_bytes: Vec<u8> = row.get(5)?;
+            let geometry: Geometry = Wkb(&wkb_bytes).to_geo()?;
 
             zones.push(Zone {
                 z_zonekey: zone_id,
-                z_gersid: row.get(0).unwrap(),
-                z_country: row.get(1).unwrap(),
-                z_region: row.get(2).unwrap(),
-                z_name: row.get(3).unwrap(),
-                z_subtype: row.get(4).unwrap(),
+                z_gersid,
+                z_country,
+                z_region,
+                z_name,
+                z_subtype,
                 z_boundary: geometry,
             });
+
+            if zones.len() % 1000 == 0 {
+                debug!(
+                    "Processed {} rows for partition {}...",
+                    zones.len(),
+                    self.part
+                );
+            }
             zone_id += 1;
         }
 
-        zones
+        info!(
+            "Partition {} loaded: {} zones in {:?}",
+            self.part,
+            zones.len(),
+            t4.elapsed()
+        );
+
+        info!("Total partition load took {:?}", start_total.elapsed());
+        Ok(zones)
     }
 
     /// Return the row count for the given part
     pub fn calculate_row_count(&self) -> i64 {
-        let zone_count = self.zones.len() as i64;
-
-        if self.part_count <= 1 {
-            return zone_count;
-        }
-
-        // Partition the zones based on part number
-        let zones_per_part = (zone_count + self.part_count as i64 - 1) / self.part_count as i64;
-        let start = (self.part - 1) as i64 * zones_per_part;
-        let end = std::cmp::min(start + zones_per_part, zone_count);
+        let total_zones = (self.scale_factor * Self::SCALE_BASE as f64).ceil() as i64;
+        let zones_per_part = self.calculate_zones_per_part();
+        let offset = self.calculate_offset();
 
-        end - start
+        // Don't exceed total available zones
+        std::cmp::min(zones_per_part, total_zones - offset).max(0)
     }
 
     /// Returns an iterator over the zone rows
     pub fn iter(&self) -> ZoneGeneratorIterator {
-        let zone_count = self.zones.len() as i64;
-
-        // If there's only one part, return all zones
-        if self.part_count <= 1 {
-            return ZoneGeneratorIterator {
-                zones: self.zones.clone(),
-                end_index: zone_count,
-                current_index: 0,
-            };
-        }
-
-        // Otherwise, calculate the correct range for this part
-        let zones_per_part = (zone_count + self.part_count as i64 - 1) / self.part_count as i64;
-        let start = (self.part - 1) as i64 * zones_per_part;
-        let end = std::cmp::min(start + zones_per_part, zone_count);
-
-        ZoneGeneratorIterator {
-            zones: self.zones.clone(),
-            end_index: end,
-            current_index: start,
-        }
+        ZoneGeneratorIterator::new(self.clone())
     }
 }
 
@@ -1571,55 +1602,43 @@ impl IntoIterator for ZoneGenerator {
     type IntoIter = ZoneGeneratorIterator;
 
     fn into_iter(self) -> Self::IntoIter {
-        let zone_count = self.zones.len() as i64;
-
-        // If there's only one part, return all zones
-        if self.part_count <= 1 {
-            return ZoneGeneratorIterator {
-                zones: self.zones,
-                end_index: zone_count,
-                current_index: 0,
-            };
-        }
-
-        // Otherwise, calculate the correct range for this part
-        let zones_per_part = (zone_count + self.part_count as i64 - 1) / self.part_count as i64;
-        let start = (self.part - 1) as i64 * zones_per_part;
-        let end = std::cmp::min(start + zones_per_part, zone_count);
-
-        ZoneGeneratorIterator {
-            zones: self.zones,
-            end_index: end,
-            current_index: start,
-        }
+        self.iter()
     }
 }
 
-/// Iterator that provides access to Zone rows
+/// Iterator that generates Zone rows by loading partition data on-demand
 #[derive(Debug)]
 pub struct ZoneGeneratorIterator {
     zones: Vec<Zone>,
-    end_index: i64,
-    current_index: i64,
+    index: usize,
+}
+
+impl ZoneGeneratorIterator {
+    fn new(generator: ZoneGenerator) -> Self {
+        // Load zones for this partition only
+        let zones = generator.load_partition_zones().unwrap_or_else(|e| {
+            error!(
+                "Failed to load zones for partition {}: {}",
+                generator.part, e
+            );
+            Vec::new()
+        });
+
+        ZoneGeneratorIterator { zones, index: 0 }
+    }
 }
 
 impl Iterator for ZoneGeneratorIterator {
     type Item = Zone;
 
     fn next(&mut self) -> Option<Self::Item> {
-        if self.current_index >= self.end_index {
+        if self.index >= self.zones.len() {
             return None;
         }
 
-        let index = self.current_index as usize;
-        self.current_index += 1;
-
-        Some(self.zones[index].clone())
-    }
-
-    fn size_hint(&self) -> (usize, Option<usize>) {
-        let remaining = (self.end_index - self.current_index) as usize;
-        (remaining, Some(remaining))
+        let zone = self.zones[self.index].clone();
+        self.index += 1;
+        Some(zone)
     }
 }
 

Reply via email to