This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git
The following commit(s) were added to refs/heads/main by this push:
new 7ceb6b1 perf: Cache geometries in KNN queries to eliminate redundant
WKB conversions (#53)
7ceb6b1 is described below
commit 7ceb6b1da95308c592638e3e473f40d4c06d9134
Author: Feng Zhang <[email protected]>
AuthorDate: Thu Sep 11 08:11:20 2025 -0700
perf: Cache geometries in KNN queries to eliminate redundant WKB
conversions (#53)
---
benchmarks/test_knn.py | 226 ++++++++++++++++++++++++++++++++++
rust/sedona-spatial-join/src/index.rs | 74 ++++++++---
2 files changed, 282 insertions(+), 18 deletions(-)
diff --git a/benchmarks/test_knn.py b/benchmarks/test_knn.py
new file mode 100644
index 0000000..524363a
--- /dev/null
+++ b/benchmarks/test_knn.py
@@ -0,0 +1,226 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+import pytest
+from test_bench_base import TestBenchBase
+from sedonadb.testing import SedonaDB
+
+
+class TestBenchKNN(TestBenchBase):
+    def setup_class(self):
+        """Create the shared building and trip tables used by the KNN benchmarks.
+
+        Opens a SedonaDB connection (stored as ``sedonadb``) and registers four
+        tables on it: ``knn_buildings`` (random polygons, target of 2,000 rows),
+        ``knn_trips`` (random points, target of 10,000 rows), and the
+        ``knn_buildings_small`` / ``knn_trips_small`` subsets limited to
+        1,000 and 5,000 rows respectively.
+        """
+        db = SedonaDB.create_or_skip()
+        self.sedonadb = db
+
+        # Index side: fewer, larger, building-like polygons.
+        # Row counts are kept modest on purpose: very large synthetic datasets
+        # have been observed to cause memory pressure or performance
+        # degradation in DataFusion operations.
+        polygon_spec = json.dumps(
+            {
+                "geom_type": "Polygon",
+                "target_rows": 2_000,  # Reasonable size for benchmarking
+                "vertices_per_linestring_range": [4, 8],
+                "size_range": [0.001, 0.01],
+                "seed": 42,
+            }
+        )
+        buildings_sql = f"""
+            SELECT
+                geometry as geom,
+                round(random() * 1000) as building_id,
+                'Building_' || cast(round(random() * 1000) as varchar) as name
+            FROM sd_random_geometry('{polygon_spec}')
+        """
+        buildings = db.execute_and_collect(buildings_sql)
+        db.create_table_arrow("knn_buildings", buildings)
+
+        # Probe side: many small geometries (trip pickup points).
+        point_spec = json.dumps(
+            {
+                "geom_type": "Point",
+                "target_rows": 10_000,
+                "seed": 43,
+            }
+        )
+        trips_sql = f"""
+            SELECT
+                geometry as geom,
+                round(random() * 100000) as trip_id
+            FROM sd_random_geometry('{point_spec}')
+        """
+        trips = db.execute_and_collect(trips_sql)
+        db.create_table_arrow("knn_trips", trips)
+
+        # Smaller subsets of both tables for the quick tests. They are derived
+        # from the tables registered above, so they must be created last.
+        buildings_subset_sql = """
+            SELECT * FROM knn_buildings LIMIT 1000
+        """
+        buildings_subset = db.execute_and_collect(buildings_subset_sql)
+        db.create_table_arrow("knn_buildings_small", buildings_subset)
+
+        trips_subset_sql = """
+            SELECT * FROM knn_trips LIMIT 5000
+        """
+        trips_subset = db.execute_and_collect(trips_subset_sql)
+        db.create_table_arrow("knn_trips_small", trips_subset)
+
+    @pytest.mark.parametrize("k", [1, 5, 10])
+    @pytest.mark.parametrize("use_spheroid", [False, True])
+    @pytest.mark.parametrize("dataset_size", ["small", "large"])
+    def test_knn_performance(self, benchmark, k, use_spheroid, dataset_size):
+        """Benchmark the ST_KNN join for each k / spheroid / dataset-size combination.
+
+        Args:
+            benchmark: Callable that is handed the query function; its return
+                value is used here as the number of rows the join produced.
+            k: Number of nearest buildings requested per trip.
+            use_spheroid: Rendered as TRUE/FALSE and passed to ST_KNN as its
+                fourth argument.
+            dataset_size: "small" joins the ``*_small`` tables; any other
+                value joins the full ``knn_trips`` and ``knn_buildings`` tables.
+
+        The join is expected to return exactly ``trip_limit * k`` rows.
+        """
+        if dataset_size == "small":
+            trip_table = "knn_trips_small"
+            building_table = "knn_buildings_small"
+            trip_limit = 100  # Test with 100 trips
+        else:
+            # The large variant reads the full tables on both sides of the join.
+            trip_table = "knn_trips"
+            building_table = "knn_buildings"
+            trip_limit = 500  # Test with 500 trips
+
+        spheroid_str = "TRUE" if use_spheroid else "FALSE"
+
+        # Build the SQL once, outside the timed callable, so that only query
+        # execution is measured on every benchmark round.
+        query = f"""
+            WITH trip_sample AS (
+                SELECT trip_id, geom as trip_geom
+                FROM {trip_table}
+                LIMIT {trip_limit}
+            ),
+            building_with_geom AS (
+                SELECT building_id, name, geom as building_geom
+                FROM {building_table}
+            )
+            SELECT
+                t.trip_id,
+                b.building_id,
+                b.name,
+                ST_Distance(t.trip_geom, b.building_geom) as distance
+            FROM trip_sample t
+            JOIN building_with_geom b ON ST_KNN(t.trip_geom, b.building_geom, {k}, {spheroid_str})
+            ORDER BY t.trip_id, distance
+        """
+
+        def run_knn_query():
+            result = self.sedonadb.execute_and_collect(query)
+            return len(result)  # Return result count for verification
+
+        # Run the benchmark
+        result_count = benchmark(run_knn_query)
+
+        # Verify we got the expected number of results (trips * k)
+        expected_count = trip_limit * k
+        assert result_count == expected_count, (
+            f"Expected {expected_count} results, got {result_count}"
+        )
+
+    @pytest.mark.parametrize("k", [1, 5, 10, 20])
+    def test_knn_scalability_by_k(self, benchmark, k):
+        """Measure how the KNN join behaves as k grows.
+
+        Joins a 50-trip sample of ``knn_trips_small`` against
+        ``knn_buildings_small`` with the ST_KNN spheroid flag set to FALSE,
+        and checks that the join yields ``50 * k`` rows.
+        """
+
+        def count_knn_matches():
+            sql = f"""
+                WITH trip_sample AS (
+                    SELECT trip_id, geom as trip_geom
+                    FROM knn_trips_small
+                    LIMIT 50 -- Small sample for k scaling test
+                )
+                SELECT
+                    COUNT(*) as result_count
+                FROM trip_sample t
+                JOIN knn_buildings_small b ON ST_KNN(t.trip_geom, b.geom, {k}, FALSE)
+            """
+            frame = self.sedonadb.execute_and_collect(sql).to_pandas()
+            # The query returns a single row holding the aggregate count.
+            return frame.iloc[0]["result_count"]
+
+        actual = benchmark(count_knn_matches)
+        expected = 50 * k  # 50 trips * k neighbors each
+        assert actual == expected, (
+            f"Expected {expected} results, got {actual}"
+        )
+
+    def test_knn_correctness(self):
+        """Check that a 5-nearest-neighbour query returns five sanely ordered rows.
+
+        Asks for the five buildings in ``knn_buildings_small`` nearest to the
+        origin, then asserts the row count, that the reported distances are in
+        ascending order, and that none of them is negative.
+        """
+        # Fixed, known query point so the run is repeatable.
+        origin_knn_sql = """
+            WITH test_point AS (
+                SELECT ST_Point(0.0, 0.0) as query_geom
+            )
+            SELECT
+                ST_Distance(test_point.query_geom, b.geom) as distance,
+                b.building_id
+            FROM test_point
+            JOIN knn_buildings_small b ON ST_KNN(test_point.query_geom, b.geom, 5, FALSE)
+            ORDER BY distance
+        """
+        frame = self.sedonadb.execute_and_collect(origin_knn_sql).to_pandas()
+
+        # Exactly k = 5 neighbours must come back.
+        row_count = len(frame)
+        assert row_count == 5, f"Expected 5 results, got {row_count}"
+
+        distances = frame["distance"].tolist()
+
+        # Distances must already be in ascending order.
+        ascending = sorted(distances)
+        assert distances == ascending, (
+            f"Results not ordered by distance: {distances}"
+        )
+
+        # A distance can never be negative.
+        assert all(d >= 0 for d in distances), f"Found negative distances: {distances}"
+
+    def test_knn_tie_breaking(self):
+        """Check ST_KNN when several candidates are equally close to the query point.
+
+        Registers ``knn_tie_test`` with four points at distance 1 from the
+        origin and one point at distance 2, asks for the 2 nearest neighbours
+        of the origin, and asserts that two rows come back, both at distance
+        ~1.0. Which two of the four tied points are returned is not checked.
+        """
+        # Four points on the unit circle (ids 1-4) plus one farther away (id 5).
+        tie_points_sql = """
+            WITH test_points AS (
+                SELECT 1 as id, ST_Point(1.0, 0.0) as geom
+                UNION ALL
+                SELECT 2 as id, ST_Point(-1.0, 0.0) as geom
+                UNION ALL
+                SELECT 3 as id, ST_Point(0.0, 1.0) as geom
+                UNION ALL
+                SELECT 4 as id, ST_Point(0.0, -1.0) as geom
+                UNION ALL
+                SELECT 5 as id, ST_Point(2.0, 0.0) as geom
+            )
+            SELECT * FROM test_points
+        """
+        tie_points = self.sedonadb.execute_and_collect(tie_points_sql)
+        self.sedonadb.create_table_arrow("knn_tie_test", tie_points)
+
+        # 2 nearest neighbours of the origin: should be 2 of the 4 equidistant points.
+        nearest_two_sql = """
+            WITH query_point AS (
+                SELECT ST_Point(0.0, 0.0) as geom
+            )
+            SELECT
+                t.id,
+                ST_Distance(query_point.geom, t.geom) as distance
+            FROM query_point
+            JOIN knn_tie_test t ON ST_KNN(query_point.geom, t.geom, 2, FALSE)
+            ORDER BY distance, t.id
+        """
+        frame = self.sedonadb.execute_and_collect(nearest_two_sql).to_pandas()
+
+        # Exactly k = 2 rows, regardless of how the tie is broken.
+        row_count = len(frame)
+        assert row_count == 2, f"Expected 2 results, got {row_count}"
+
+        # Both rows must come from the equidistant group (distance 1.0).
+        distances = frame["distance"].tolist()
+        assert all(abs(d - 1.0) < 1e-6 for d in distances), (
+            f"Expected distances ~1.0, got {distances}"
+        )
diff --git a/rust/sedona-spatial-join/src/index.rs
b/rust/sedona-spatial-join/src/index.rs
index c683593..59335f9 100644
--- a/rust/sedona-spatial-join/src/index.rs
+++ b/rust/sedona-spatial-join/src/index.rs
@@ -235,6 +235,32 @@ impl SpatialIndexBuilder {
geom_idx_vec
}
+    /// Decode the indexed WKB values into geometries once, so KNN queries can
+    /// reuse them instead of repeating the WKB-to-geometry conversion.
+    ///
+    /// Returns the decoded geometries together with the summed byte length of
+    /// the WKB buffers they were decoded from (used for memory estimation).
+    ///
+    /// Missing (`None`) entries and entries that fail to convert are skipped,
+    /// so element `i` of the returned vector is the `i`-th successfully
+    /// converted geometry, which is not necessarily the `i`-th input row.
+    // NOTE(review): any per-row mapping indexed with positions from this
+    // vector must skip exactly the same rows -- confirm against the callers.
+    fn build_cached_geometries(indexed_batches: &[IndexedBatch]) -> (Vec<Geometry<f64>>, usize) {
+        let mut decoded = Vec::new();
+        let mut wkb_bytes = 0;
+
+        for batch in indexed_batches {
+            for slot in batch.geom_array.wkbs().iter() {
+                // Nothing to cache for a missing value.
+                let Some(wkb) = slot.as_ref() else {
+                    continue;
+                };
+                // A WKB that cannot be converted is skipped, not reported.
+                let Ok(geom) = item_to_geometry(wkb) else {
+                    continue;
+                };
+                // Only count bytes for geometries that are actually cached.
+                wkb_bytes += wkb.buf().len();
+                decoded.push(geom);
+            }
+        }
+
+        (decoded, wkb_bytes)
+    }
+
+    /// Estimate the memory usage of cached geometries from their total WKB size.
+    ///
+    /// The estimate is the WKB byte count multiplied by a fixed factor of two:
+    /// the WKB size is the base, and the extra factor is an allowance for the
+    /// overhead of the decoded `geo::Geometry` objects. The multiplication
+    /// saturates at `usize::MAX` rather than overflowing, so an oversized
+    /// input can never wrap around into a small estimate.
+    fn estimate_geometry_memory(wkb_size: usize) -> usize {
+        // Heuristic multiplier: WKB bytes as base plus an equal allowance for
+        // in-memory geometry overhead.
+        const OVERHEAD_FACTOR: usize = 2;
+        wkb_size.saturating_mul(OVERHEAD_FACTOR)
+    }
+
/// Finish building and return the completed SpatialIndex.
pub fn finish(mut self, schema: SchemaRef) -> Result<SpatialIndex> {
if self.indexed_batches.is_empty() {
@@ -271,6 +297,16 @@ impl SpatialIndexBuilder {
ConcurrentReservation::try_new(REFINER_RESERVATION_PREALLOC_SIZE,
refiner_reservation)
.unwrap();
+ // Pre-compute geometries for KNN queries to avoid repeated
WKB-to-geometry conversions
+ let (cached_geometries, total_wkb_size) =
+ Self::build_cached_geometries(&self.indexed_batches);
+
+ // Reserve memory for cached geometries using WKB size with overhead
+ let geometry_memory_estimate =
Self::estimate_geometry_memory(total_wkb_size);
+ let geometry_consumer =
MemoryConsumer::new("SpatialJoinGeometryCache");
+ let mut geometry_reservation =
geometry_consumer.register(&self.memory_pool);
+ geometry_reservation.try_grow(geometry_memory_estimate)?;
+
Ok(SpatialIndex {
schema,
evaluator,
@@ -283,6 +319,8 @@ impl SpatialIndexBuilder {
visited_left_side,
probe_threads_counter: AtomicUsize::new(self.probe_threads_count),
reservation: self.reservation,
+ cached_geometries,
+ cached_geometry_reservation: geometry_reservation,
})
}
}
@@ -331,6 +369,15 @@ pub(crate) struct SpatialIndex {
/// Cleared on `SpatialIndex` drop
#[expect(dead_code)]
reservation: MemoryReservation,
+
+ /// Cached vector of geometries for KNN queries to avoid repeated
WKB-to-geometry conversions
+ /// This is computed once during index building for performance
optimization
+ cached_geometries: Vec<Geometry<f64>>,
+
+ /// Memory reservation for tracking the memory usage of cached geometries
+ /// Cleared on `SpatialIndex` drop
+ #[expect(dead_code)]
+ cached_geometry_reservation: MemoryReservation,
}
/// Indexed batch containing the original record batch and the evaluated
geometry array.
@@ -385,6 +432,7 @@ impl SpatialIndex {
);
let refiner_reservation = reservation.split(0);
let refiner_reservation = ConcurrentReservation::try_new(0,
refiner_reservation).unwrap();
+ let cached_geometry_reservation = reservation.split(0);
let rtree = RTreeBuilder::<f32>::new(0).finish::<HilbertSort>();
Self {
schema,
@@ -398,6 +446,8 @@ impl SpatialIndex {
visited_left_side: None,
probe_threads_counter,
reservation,
+ cached_geometries: Vec::new(),
+ cached_geometry_reservation,
}
}
@@ -507,20 +557,8 @@ impl SpatialIndex {
}
};
- // Create a vector of geometries for all indexed items
- let mut geometries: Vec<Geometry<f64>> = Vec::new();
- let mut geometry_to_position: Vec<(i32, i32)> = Vec::new();
-
- for (batch_idx, indexed_batch) in
self.indexed_batches.iter().enumerate() {
- for (row_idx, wkb_opt) in
indexed_batch.geom_array.wkbs().iter().enumerate() {
- if let Some(wkb) = wkb_opt.as_ref() {
- if let Ok(geom) = item_to_geometry(wkb) {
- geometries.push(geom);
- geometry_to_position.push((batch_idx as i32, row_idx
as i32));
- }
- }
- }
- }
+ // Use pre-computed cached geometries for performance
+ let geometries = &self.cached_geometries;
if geometries.is_empty() {
return Ok(JoinResultMetrics {
@@ -544,7 +582,7 @@ impl SpatialIndex {
Some(k as usize),
None, // no max_distance filter
distance_metric.as_ref(),
- &geometries,
+ geometries,
);
if initial_results.is_empty() {
@@ -657,10 +695,10 @@ impl SpatialIndex {
}
}
- // Convert results to build_batch_positions
+ // Convert results to build_batch_positions using existing
data_id_to_batch_pos mapping
for &result_idx in &final_results {
- if (result_idx as usize) < geometry_to_position.len() {
- build_batch_positions.push(geometry_to_position[result_idx as
usize]);
+ if (result_idx as usize) < self.data_id_to_batch_pos.len() {
+
build_batch_positions.push(self.data_id_to_batch_pos[result_idx as usize]);
}
}