This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git


The following commit(s) were added to refs/heads/main by this push:
     new 7ceb6b1  perf: Cache geometries in KNN queries to eliminate redundant WKB conversions (#53)
7ceb6b1 is described below

commit 7ceb6b1da95308c592638e3e473f40d4c06d9134
Author: Feng Zhang <[email protected]>
AuthorDate: Thu Sep 11 08:11:20 2025 -0700

    perf: Cache geometries in KNN queries to eliminate redundant WKB conversions (#53)
---
 benchmarks/test_knn.py                | 226 ++++++++++++++++++++++++++++++++++
 rust/sedona-spatial-join/src/index.rs |  74 ++++++++---
 2 files changed, 282 insertions(+), 18 deletions(-)

diff --git a/benchmarks/test_knn.py b/benchmarks/test_knn.py
new file mode 100644
index 0000000..524363a
--- /dev/null
+++ b/benchmarks/test_knn.py
@@ -0,0 +1,226 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+import pytest
+from test_bench_base import TestBenchBase
+from sedonadb.testing import SedonaDB
+
+
+class TestBenchKNN(TestBenchBase):
+    def setup_class(self):
+        """Setup test data for KNN benchmarks"""
+        self.sedonadb = SedonaDB.create_or_skip()
+
+        # Create building-like polygons (index side - fewer, larger geometries)
+        # Note: Dataset sizes are limited to avoid performance issues observed when processing
+        # very large synthetic datasets. Large synthetic datasets have been observed to cause
+        # memory pressure or performance degradation in DataFusion operations.
+        building_options = {
+            "geom_type": "Polygon",
+            "target_rows": 2_000,  # Reasonable size for benchmarking
+            "vertices_per_linestring_range": [4, 8],
+            "size_range": [0.001, 0.01],
+            "seed": 42,
+        }
+
+        building_query = f"""
+            SELECT
+                geometry as geom,
+                round(random() * 1000) as building_id,
+                'Building_' || cast(round(random() * 1000) as varchar) as name
+            FROM sd_random_geometry('{json.dumps(building_options)}')
+        """
+        building_tab = self.sedonadb.execute_and_collect(building_query)
+        self.sedonadb.create_table_arrow("knn_buildings", building_tab)
+
+        # Create trip pickup points (probe side - many small geometries)
+        trip_options = {
+            "geom_type": "Point",
+            "target_rows": 10_000,
+            "seed": 43,
+        }
+
+        trip_query = f"""
+            SELECT
+                geometry as geom,
+                round(random() * 100000) as trip_id
+            FROM sd_random_geometry('{json.dumps(trip_options)}')
+        """
+        trip_tab = self.sedonadb.execute_and_collect(trip_query)
+        self.sedonadb.create_table_arrow("knn_trips", trip_tab)
+
+        # Create a smaller test dataset for quick tests
+        small_building_query = """
+            SELECT * FROM knn_buildings LIMIT 1000
+        """
+        small_building_tab = self.sedonadb.execute_and_collect(small_building_query)
+        self.sedonadb.create_table_arrow("knn_buildings_small", small_building_tab)
+
+        small_trip_query = """
+            SELECT * FROM knn_trips LIMIT 5000
+        """
+        small_trip_tab = self.sedonadb.execute_and_collect(small_trip_query)
+        self.sedonadb.create_table_arrow("knn_trips_small", small_trip_tab)
+
+    @pytest.mark.parametrize("k", [1, 5, 10])
+    @pytest.mark.parametrize("use_spheroid", [False, True])
+    @pytest.mark.parametrize("dataset_size", ["small", "large"])
+    def test_knn_performance(self, benchmark, k, use_spheroid, dataset_size):
+        """Benchmark KNN query performance with different parameters"""
+
+        if dataset_size == "small":
+            trip_table = "knn_trips_small"
+            building_table = "knn_buildings_small"
+            trip_limit = 100  # Test with 100 trips
+        else:
+            trip_table = "knn_trips_small"
+            building_table = "knn_buildings"
+            trip_limit = 500
+
+        spheroid_str = "TRUE" if use_spheroid else "FALSE"
+
+        def run_knn_query():
+            query = f"""
+                WITH trip_sample AS (
+                    SELECT trip_id, geom as trip_geom
+                    FROM {trip_table}
+                    LIMIT {trip_limit}
+                ),
+                building_with_geom AS (
+                    SELECT building_id, name, geom as building_geom
+                    FROM {building_table}
+                )
+                SELECT
+                    t.trip_id,
+                    b.building_id,
+                    b.name,
+                    ST_Distance(t.trip_geom, b.building_geom) as distance
+                FROM trip_sample t
+                JOIN building_with_geom b ON ST_KNN(t.trip_geom, b.building_geom, {k}, {spheroid_str})
+                ORDER BY t.trip_id, distance
+            """
+            result = self.sedonadb.execute_and_collect(query)
+            return len(result)  # Return result count for verification
+
+        # Run the benchmark
+        result_count = benchmark(run_knn_query)
+
+        # Verify we got the expected number of results (trips * k)
+        expected_count = trip_limit * k
+        assert result_count == expected_count, (
+            f"Expected {expected_count} results, got {result_count}"
+        )
+
+    @pytest.mark.parametrize("k", [1, 5, 10, 20])
+    def test_knn_scalability_by_k(self, benchmark, k):
+        """Test how KNN performance scales with increasing k values"""
+
+        def run_knn_query():
+            query = f"""
+                WITH trip_sample AS (
+                    SELECT trip_id, geom as trip_geom
+                    FROM knn_trips_small
+                    LIMIT 50  -- Small sample for k scaling test
+                )
+                SELECT
+                    COUNT(*) as result_count
+                FROM trip_sample t
+                JOIN knn_buildings_small b ON ST_KNN(t.trip_geom, b.geom, {k}, FALSE)
+            """
+            result = self.sedonadb.execute_and_collect(query)
+            return result.to_pandas().iloc[0]["result_count"]
+
+        result_count = benchmark(run_knn_query)
+        expected_count = 50 * k  # 50 trips * k neighbors each
+        assert result_count == expected_count, (
+            f"Expected {expected_count} results, got {result_count}"
+        )
+
+    def test_knn_correctness(self):
+        """Verify KNN returns results in correct distance order"""
+
+        # Test with a known point and verify ordering
+        query = """
+            WITH test_point AS (
+                SELECT ST_Point(0.0, 0.0) as query_geom
+            )
+            SELECT
+                ST_Distance(test_point.query_geom, b.geom) as distance,
+                b.building_id
+            FROM test_point
+            JOIN knn_buildings_small b ON ST_KNN(test_point.query_geom, b.geom, 5, FALSE)
+            ORDER BY distance
+        """
+
+        result = self.sedonadb.execute_and_collect(query).to_pandas()
+
+        # Verify we got 5 results
+        assert len(result) == 5, f"Expected 5 results, got {len(result)}"
+
+        # Verify distances are in ascending order
+        distances = result["distance"].tolist()
+        assert distances == sorted(distances), (
+            f"Results not ordered by distance: {distances}"
+        )
+
+        # Verify all distances are non-negative
+        assert all(d >= 0 for d in distances), f"Found negative distances: {distances}"
+
+    def test_knn_tie_breaking(self):
+        """Test KNN behavior with tie-breaking when geometries have equal distances"""
+
+        # Create test data with known equal distances
+        setup_query = """
+            WITH test_points AS (
+                SELECT 1 as id, ST_Point(1.0, 0.0) as geom
+                UNION ALL
+                SELECT 2 as id, ST_Point(-1.0, 0.0) as geom
+                UNION ALL
+                SELECT 3 as id, ST_Point(0.0, 1.0) as geom
+                UNION ALL
+                SELECT 4 as id, ST_Point(0.0, -1.0) as geom
+                UNION ALL
+                SELECT 5 as id, ST_Point(2.0, 0.0) as geom
+            )
+            SELECT * FROM test_points
+        """
+        tie_test_tab = self.sedonadb.execute_and_collect(setup_query)
+        self.sedonadb.create_table_arrow("knn_tie_test", tie_test_tab)
+
+        # Query for 2 nearest neighbors from origin - should get 2 of the 4 equidistant points
+        query = """
+            WITH query_point AS (
+                SELECT ST_Point(0.0, 0.0) as geom
+            )
+            SELECT
+                t.id,
+                ST_Distance(query_point.geom, t.geom) as distance
+            FROM query_point
+            JOIN knn_tie_test t ON ST_KNN(query_point.geom, t.geom, 2, FALSE)
+            ORDER BY distance, t.id
+        """
+
+        result = self.sedonadb.execute_and_collect(query).to_pandas()
+
+        # Should get exactly 2 results
+        assert len(result) == 2, f"Expected 2 results, got {len(result)}"
+
+        # Both should be at distance 1.0 (the 4 equidistant points)
+        distances = result["distance"].tolist()
+        assert all(abs(d - 1.0) < 1e-6 for d in distances), (
+            f"Expected distances ~1.0, got {distances}"
+        )
diff --git a/rust/sedona-spatial-join/src/index.rs b/rust/sedona-spatial-join/src/index.rs
index c683593..59335f9 100644
--- a/rust/sedona-spatial-join/src/index.rs
+++ b/rust/sedona-spatial-join/src/index.rs
@@ -235,6 +235,32 @@ impl SpatialIndexBuilder {
         geom_idx_vec
     }
 
+    /// Build cached geometries for KNN queries to avoid repeated WKB conversions
+    /// Returns both geometries and total WKB size for memory estimation
+    fn build_cached_geometries(indexed_batches: &[IndexedBatch]) -> (Vec<Geometry<f64>>, usize) {
+        let mut geometries = Vec::new();
+        let mut total_wkb_size = 0;
+
+        for indexed_batch in indexed_batches.iter() {
+            for wkb_opt in indexed_batch.geom_array.wkbs().iter() {
+                if let Some(wkb) = wkb_opt.as_ref() {
+                    if let Ok(geom) = item_to_geometry(wkb) {
+                        geometries.push(geom);
+                        total_wkb_size += wkb.buf().len();
+                    }
+                }
+            }
+        }
+
+        (geometries, total_wkb_size)
+    }
+
+    /// Estimate the memory usage of cached geometries based on WKB size with overhead
+    fn estimate_geometry_memory(wkb_size: usize) -> usize {
+        // Use WKB size as base + overhead for geo::Geometry objects
+        wkb_size * 2
+    }
+
     /// Finish building and return the completed SpatialIndex.
     pub fn finish(mut self, schema: SchemaRef) -> Result<SpatialIndex> {
         if self.indexed_batches.is_empty() {
@@ -271,6 +297,16 @@ impl SpatialIndexBuilder {
             ConcurrentReservation::try_new(REFINER_RESERVATION_PREALLOC_SIZE, refiner_reservation)
                 .unwrap();
 
+        // Pre-compute geometries for KNN queries to avoid repeated WKB-to-geometry conversions
+        let (cached_geometries, total_wkb_size) =
+            Self::build_cached_geometries(&self.indexed_batches);
+
+        // Reserve memory for cached geometries using WKB size with overhead
+        let geometry_memory_estimate = Self::estimate_geometry_memory(total_wkb_size);
+        let geometry_consumer = MemoryConsumer::new("SpatialJoinGeometryCache");
+        let mut geometry_reservation = geometry_consumer.register(&self.memory_pool);
+        geometry_reservation.try_grow(geometry_memory_estimate)?;
+
         Ok(SpatialIndex {
             schema,
             evaluator,
@@ -283,6 +319,8 @@ impl SpatialIndexBuilder {
             visited_left_side,
             probe_threads_counter: AtomicUsize::new(self.probe_threads_count),
             reservation: self.reservation,
+            cached_geometries,
+            cached_geometry_reservation: geometry_reservation,
         })
     }
 }
@@ -331,6 +369,15 @@ pub(crate) struct SpatialIndex {
     /// Cleared on `SpatialIndex` drop
     #[expect(dead_code)]
     reservation: MemoryReservation,
+
+    /// Cached vector of geometries for KNN queries to avoid repeated WKB-to-geometry conversions
+    /// This is computed once during index building for performance optimization
+    cached_geometries: Vec<Geometry<f64>>,
+
+    /// Memory reservation for tracking the memory usage of cached geometries
+    /// Cleared on `SpatialIndex` drop
+    #[expect(dead_code)]
+    cached_geometry_reservation: MemoryReservation,
 }
 
 /// Indexed batch containing the original record batch and the evaluated geometry array.
@@ -385,6 +432,7 @@ impl SpatialIndex {
         );
         let refiner_reservation = reservation.split(0);
         let refiner_reservation = ConcurrentReservation::try_new(0, refiner_reservation).unwrap();
+        let cached_geometry_reservation = reservation.split(0);
         let rtree = RTreeBuilder::<f32>::new(0).finish::<HilbertSort>();
         Self {
             schema,
@@ -398,6 +446,8 @@ impl SpatialIndex {
             visited_left_side: None,
             probe_threads_counter,
             reservation,
+            cached_geometries: Vec::new(),
+            cached_geometry_reservation,
         }
     }
 
@@ -507,20 +557,8 @@ impl SpatialIndex {
             }
         };
 
-        // Create a vector of geometries for all indexed items
-        let mut geometries: Vec<Geometry<f64>> = Vec::new();
-        let mut geometry_to_position: Vec<(i32, i32)> = Vec::new();
-
-        for (batch_idx, indexed_batch) in self.indexed_batches.iter().enumerate() {
-            for (row_idx, wkb_opt) in indexed_batch.geom_array.wkbs().iter().enumerate() {
-                if let Some(wkb) = wkb_opt.as_ref() {
-                    if let Ok(geom) = item_to_geometry(wkb) {
-                        geometries.push(geom);
-                        geometry_to_position.push((batch_idx as i32, row_idx as i32));
-                    }
-                }
-            }
-        }
+        // Use pre-computed cached geometries for performance
+        let geometries = &self.cached_geometries;
 
         if geometries.is_empty() {
             return Ok(JoinResultMetrics {
@@ -544,7 +582,7 @@ impl SpatialIndex {
             Some(k as usize),
             None, // no max_distance filter
             distance_metric.as_ref(),
-            &geometries,
+            geometries,
         );
 
         if initial_results.is_empty() {
@@ -657,10 +695,10 @@ impl SpatialIndex {
             }
         }
 
-        // Convert results to build_batch_positions
+        // Convert results to build_batch_positions using existing data_id_to_batch_pos mapping
         for &result_idx in &final_results {
-            if (result_idx as usize) < geometry_to_position.len() {
-                build_batch_positions.push(geometry_to_position[result_idx as usize]);
+            if (result_idx as usize) < self.data_id_to_batch_pos.len() {
+                build_batch_positions.push(self.data_id_to_batch_pos[result_idx as usize]);
             }
         }
 

Reply via email to