Kontinuation commented on code in PR #465:
URL: https://github.com/apache/sedona-db/pull/465#discussion_r2697720594


##########
rust/sedona-spatial-join-gpu/src/index/spatial_index_builder.rs:
##########
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::index::ensure_binary_array;
+use crate::utils::join_utils::need_produce_result_in_final;
+use crate::{
+    evaluated_batch::EvaluatedBatch,
+    index::{spatial_index::SpatialIndex, BuildPartition},
+    operand_evaluator::create_operand_evaluator,
+    spatial_predicate::SpatialPredicate,
+};
+use arrow::array::BooleanBufferBuilder;
+use arrow::compute::concat;
+use arrow_array::RecordBatch;
+use datafusion_common::Result;
+use datafusion_common::{DataFusionError, JoinType};
+use datafusion_physical_plan::metrics;
+use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, 
MetricBuilder};
+use futures::StreamExt;
+use parking_lot::Mutex;
+use sedona_common::SpatialJoinOptions;
+use sedona_libgpuspatial::GpuSpatial;
+use std::sync::atomic::AtomicUsize;
+use std::sync::Arc;
+
+pub struct SpatialIndexBuilder {
+    spatial_predicate: SpatialPredicate,
+    options: SpatialJoinOptions,
+    join_type: JoinType,
+    probe_threads_count: usize,
+    metrics: SpatialJoinBuildMetrics,
+    build_batch: EvaluatedBatch,
+}
+
+#[derive(Clone, Debug, Default)]
+pub struct SpatialJoinBuildMetrics {
+    // Total time for concatenating build-side batches
+    pub(crate) concat_time: metrics::Time,
+    /// Total time for loading build-side geometries to GPU
+    pub(crate) load_time: metrics::Time,
+    /// Total time for collecting build-side of join
+    pub(crate) build_time: metrics::Time,
+}
+
+impl SpatialJoinBuildMetrics {
+    pub fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
+        Self {
+            concat_time: 
MetricBuilder::new(metrics).subset_time("concat_time", partition),
+            load_time: MetricBuilder::new(metrics).subset_time("load_time", 
partition),
+            build_time: MetricBuilder::new(metrics).subset_time("build_time", 
partition),
+        }
+    }
+}
+
+impl SpatialIndexBuilder {
+    pub fn new(
+        spatial_predicate: SpatialPredicate,
+        options: SpatialJoinOptions,
+        join_type: JoinType,
+        probe_threads_count: usize,
+        metrics: SpatialJoinBuildMetrics,
+    ) -> Self {
+        Self {
+            spatial_predicate,
+            options,
+            join_type,
+            probe_threads_count,
+            metrics,
+            build_batch: EvaluatedBatch::default(),
+        }
+    }
+    /// Build visited bitmaps for tracking left-side indices in outer joins.
+    fn build_visited_bitmap(&mut self) -> 
Result<Option<Mutex<BooleanBufferBuilder>>> {
+        if !need_produce_result_in_final(self.join_type) {
+            return Ok(None);
+        }
+
+        let total_rows = self.build_batch.batch.num_rows();
+
+        let mut bitmap = BooleanBufferBuilder::new(total_rows);
+        bitmap.append_n(total_rows, false);
+
+        Ok(Some(Mutex::new(bitmap)))
+    }
+
+    pub fn finish(mut self) -> Result<SpatialIndex> {
+        if self.build_batch.batch.num_rows() == 0 {
+            return SpatialIndex::new_empty(
+                EvaluatedBatch::default(),
+                self.spatial_predicate,
+                self.options,
+                AtomicUsize::new(self.probe_threads_count),
+            );
+        }
+
+        let mut gs = GpuSpatial::new()
+            .and_then(|mut gs| {
+                gs.init(
+                    self.probe_threads_count as u32,
+                    self.options.gpu.device_id as i32,
+                )?;
+                gs.clear()?;
+                Ok(gs)
+            })
+            .map_err(|e| {
+                DataFusionError::Execution(format!("Failed to initialize GPU 
context {e:?}"))
+            })?;
+
+        let build_timer = self.metrics.build_time.timer();
+        // Ensure the spatial index is clear before building
+        gs.clear().map_err(|e| {
+            DataFusionError::Execution(format!("Failed to clear GPU spatial 
index {e:?}"))
+        })?;
+        // Add rectangles from build side to the spatial index
+        gs.push_build(&self.build_batch.geom_array.rects)
+            .map_err(|e| {
+                DataFusionError::Execution(format!(
+                    "Failed to add geometries to GPU spatial index {e:?}"
+                ))
+            })?;
+        gs.finish_building().map_err(|e| {
+            DataFusionError::Execution(format!("Failed to build spatial index 
on GPU {e:?}"))
+        })?;
+        build_timer.done();
+
+        let num_rows = self.build_batch.batch.num_rows();
+
+        log::info!("Total build side rows: {}", num_rows);
+
+        let geom_array = self.build_batch.geom_array.geometry_array.clone();
+
+        let load_timer = self.metrics.load_time.timer();
+        gs.load_build_array(&geom_array).map_err(|e| {
+            DataFusionError::Execution(format!("GPU spatial query failed: 
{:?}", e))
+        })?;
+        load_timer.done();
+
+        let visited_left_side = self.build_visited_bitmap()?;
+        // Build index for rectangle queries
+        Ok(SpatialIndex::new(
+            create_operand_evaluator(&self.spatial_predicate, 
self.options.clone()),
+            self.build_batch,
+            visited_left_side,
+            Arc::new(gs),
+            AtomicUsize::new(self.probe_threads_count),
+        ))
+    }
+
+    pub async fn add_partitions(&mut self, partitions: Vec<BuildPartition>) -> 
Result<()> {
+        let mut indexed_batches: Vec<EvaluatedBatch> = Vec::new();
+        for partition in partitions {
+            let mut stream = partition.build_side_batch_stream;
+            while let Some(batch) = stream.next().await {
+                indexed_batches.push(batch?)
+            }
+        }
+
+        let concat_timer = self.metrics.concat_time.timer();
+        let all_record_batches: Vec<&RecordBatch> =
+            indexed_batches.iter().map(|batch| &batch.batch).collect();
+
+        if all_record_batches.is_empty() {
+            return Err(DataFusionError::Internal(
+                "Build side has no batches".into(),
+            ));
+        }
+
+        // 2. Extract the schema from the first batch
+        let schema = all_record_batches[0].schema();
+
+        // 3. Pass the slice of references (&[&RecordBatch])
+        self.build_batch.batch = arrow::compute::concat_batches(&schema, 
all_record_batches)
+            .map_err(|e| {
+                DataFusionError::Execution(format!("Failed to concatenate left 
batches: {}", e))
+            })?;

Review Comment:
   I wonder if it is necessary to concatenate all the batches and call 
`gs.push_build` only once. It would be easier for us to unify the CPU- and 
GPU-based spatial joins if we supported calling `push_build` multiple times.
   
   There might be some performance implications in switching to multiple 
`gs.push_build` calls. If that is not feasible, we need to be aware that 
concatenating batches doubles the memory requirement, so we need to reserve 
more memory for building GPU indexes.



##########
c/sedona-libgpuspatial/src/lib.rs:
##########
@@ -77,197 +140,411 @@ impl GpuSpatialContext {
         #[cfg(gpu_available)]
         {
             Ok(Self {
-                joiner: None,
-                context: None,
+                rt_engine: None,
+                index: None,
+                refiner: None,
                 initialized: false,
             })
         }
     }
 
-    pub fn init(&mut self) -> Result<()> {
+    pub fn init(&mut self, concurrency: u32, device_id: i32) -> Result<()> {
         #[cfg(not(gpu_available))]
         {
+            let _ = (concurrency, device_id);
             Err(GpuSpatialError::GpuNotAvailable)
         }
 
         #[cfg(gpu_available)]
         {
-            let mut joiner = GpuSpatialJoinerWrapper::new();
-
             // Get PTX path from OUT_DIR
             let out_path = std::path::PathBuf::from(env!("OUT_DIR"));
             let ptx_root = out_path.join("share/gpuspatial/shaders");
             let ptx_root_str = ptx_root
                 .to_str()
                 .ok_or_else(|| GpuSpatialError::Init("Invalid PTX 
path".to_string()))?;
 
-            // Initialize with concurrency of 1 for now
-            joiner.init(1, ptx_root_str)?;
+            let rt_engine = GpuSpatialRTEngineWrapper::try_new(device_id, 
ptx_root_str)?;
 
-            // Create context
-            let mut ctx = GpuSpatialJoinerContext {
-                last_error: std::ptr::null(),
-                private_data: std::ptr::null_mut(),
-                build_indices: std::ptr::null_mut(),
-                stream_indices: std::ptr::null_mut(),
-            };
-            joiner.create_context(&mut ctx);
+            self.rt_engine = Some(Arc::new(Mutex::new(rt_engine)));
+
+            let index = GpuSpatialIndexFloat2DWrapper::try_new(
+                self.rt_engine.as_ref().unwrap(),
+                concurrency,
+            )?;
+
+            self.index = Some(index);
+
+            let refiner =
+                
GpuSpatialRefinerWrapper::try_new(self.rt_engine.as_ref().unwrap(), 
concurrency)?;
+            self.refiner = Some(refiner);
 
-            self.joiner = Some(joiner);
-            self.context = Some(ctx);
             self.initialized = true;
             Ok(())
         }
     }
 
-    #[cfg(gpu_available)]
-    pub fn get_joiner_mut(&mut self) -> Option<&mut GpuSpatialJoinerWrapper> {
-        self.joiner.as_mut()
-    }
+    pub fn is_gpu_available() -> bool {
+        #[cfg(not(gpu_available))]
+        {
+            false
+        }
+        #[cfg(gpu_available)]
+        {
+            let nvml = match Nvml::init() {
+                Ok(instance) => instance,
+                Err(_) => return false,
+            };
 
-    #[cfg(gpu_available)]
-    pub fn get_context_mut(&mut self) -> Option<&mut GpuSpatialJoinerContext> {
-        self.context.as_mut()
+            // Check if the device count is greater than zero
+            match nvml.device_count() {
+                Ok(count) => count > 0,
+                Err(_) => false,
+            }
+        }
     }
 
     pub fn is_initialized(&self) -> bool {
         self.initialized
     }
 
-    /// Perform spatial join between two geometry arrays
-    pub fn spatial_join(
-        &mut self,
-        left_geom: arrow_array::ArrayRef,
-        right_geom: arrow_array::ArrayRef,
-        predicate: SpatialPredicate,
-    ) -> Result<(Vec<u32>, Vec<u32>)> {
+    /// Clear previous build data
+    pub fn clear(&mut self) -> Result<()> {
         #[cfg(not(gpu_available))]
         {
-            let _ = (left_geom, right_geom, predicate);
             Err(GpuSpatialError::GpuNotAvailable)
         }
-
         #[cfg(gpu_available)]
         {
             if !self.initialized {
-                return Err(GpuSpatialError::Init("Context not 
initialized".into()));
+                return Err(GpuSpatialError::Init("GpuSpatial not 
initialized".into()));
             }
 
-            let joiner = self
-                .joiner
+            let index = self
+                .index
                 .as_mut()
-                .ok_or_else(|| GpuSpatialError::Init("GPU joiner not 
available".into()))?;
+                .ok_or_else(|| GpuSpatialError::Init("GPU index is not 
available".into()))?;
 
             // Clear previous build data
-            joiner.clear();
-
-            // Push build data (left side)
-            log::info!(
-                "DEBUG: Pushing {} geometries to GPU (build side)",
-                left_geom.len()
-            );
-            log::info!("DEBUG: Left array data type: {:?}", 
left_geom.data_type());
-            if let Some(binary_arr) = left_geom
-                .as_any()
-                .downcast_ref::<arrow_array::BinaryArray>()
-            {
-                log::info!("DEBUG: Left binary array has {} values", 
binary_arr.len());
-                if binary_arr.len() > 0 {
-                    let first_wkb = binary_arr.value(0);
-                    log::info!(
-                        "DEBUG: First left WKB length: {}, first bytes: {:?}",
-                        first_wkb.len(),
-                        &first_wkb[..8.min(first_wkb.len())]
-                    );
-                }
-            }
+            index.clear();
+            Ok(())
+        }
+    }
+
+    pub fn push_build(&mut self, rects: &[Rect<f32>]) -> Result<()> {
+        #[cfg(not(gpu_available))]
+        {
+            let _ = rects;
+            Err(GpuSpatialError::GpuNotAvailable)
+        }
+        #[cfg(gpu_available)]
+        {
+            let index = self
+                .index
+                .as_mut()
+                .ok_or_else(|| GpuSpatialError::Init("GPU index not 
available".into()))?;
 
-            joiner.push_build(&left_geom, 0, left_geom.len() as i64)?;
-            joiner.finish_building()?;
+            unsafe { index.push_build(rects.as_ptr() as *const f32, 
rects.len() as u32) }
+        }
+    }
+
+    pub fn finish_building(&mut self) -> Result<()> {
+        #[cfg(not(gpu_available))]
+        return Err(GpuSpatialError::GpuNotAvailable);
 
-            // Recreate context after building (required by libgpuspatial)
-            let mut new_context = 
libgpuspatial_glue_bindgen::GpuSpatialJoinerContext {
+        #[cfg(gpu_available)]
+        self.index
+            .as_mut()
+            .ok_or_else(|| GpuSpatialError::Init("GPU index not 
available".into()))?
+            .finish_building()
+    }
+
+    pub fn probe(&self, rects: &[Rect<f32>]) -> Result<(Vec<u32>, Vec<u32>)> {
+        #[cfg(not(gpu_available))]
+        {
+            let _ = rects;
+            Err(GpuSpatialError::GpuNotAvailable)
+        }
+
+        #[cfg(gpu_available)]
+        {
+            let index = self
+                .index
+                .as_ref()
+                .ok_or_else(|| GpuSpatialError::Init("GPU index not 
available".into()))?;
+            // Create context
+            let mut ctx = GpuSpatialIndexContext {
                 last_error: std::ptr::null(),
-                private_data: std::ptr::null_mut(),
                 build_indices: std::ptr::null_mut(),
-                stream_indices: std::ptr::null_mut(),
+                probe_indices: std::ptr::null_mut(),
             };
-            joiner.create_context(&mut new_context);
-            self.context = Some(new_context);
-            let context = self.context.as_mut().unwrap();
-            // Push stream data (right side) and perform join
-            let gpu_predicate = predicate.into();
-            joiner.push_stream(
-                context,
-                &right_geom,
-                0,
-                right_geom.len() as i64,
-                gpu_predicate,
-                0, // array_index_offset
-            )?;
+            index.create_context(&mut ctx);
 
-            // Get results
-            let build_indices = 
joiner.get_build_indices_buffer(context).to_vec();
-            let stream_indices = 
joiner.get_stream_indices_buffer(context).to_vec();
+            // Push stream data (probe side) and perform join
+            unsafe {
+                index.probe(&mut ctx, rects.as_ptr() as *const f32, 
rects.len() as u32)?;
+            }
 
-            Ok((build_indices, stream_indices))
+            // Get results
+            let build_indices = index.get_build_indices_buffer(&mut 
ctx).to_vec();
+            let probe_indices = index.get_probe_indices_buffer(&mut 
ctx).to_vec();
+            index.destroy_context(&mut ctx);
+            Ok((build_indices, probe_indices))

Review Comment:
   `ctx` will leak when `index.probe` fails: the `?` returns early, so 
`index.destroy_context(&mut ctx)` is never reached on the error path.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to