Kontinuation commented on code in PR #593:
URL: https://github.com/apache/sedona-db/pull/593#discussion_r2788850485
##########
rust/sedona-spatial-join/src/exec.rs:
##########
@@ -443,121 +443,43 @@ impl ExecutionPlan for SpatialJoinExec {
&self,
partition: usize,
context: Arc<TaskContext>,
- ) -> Result<SendableRecordBatchStream> {
- match &self.on {
- SpatialPredicate::KNearestNeighbors(_) => self.execute_knn(partition, context),
- _ => {
- // Regular spatial join logic - standard left=build, right=probe semantics
- let session_config = context.session_config();
-
- // Regular join semantics: left is build, right is probe
- let (build_plan, probe_plan) = (&self.left, &self.right);
-
- // A OnceFut for preparing the spatial join components once.
- let once_fut_spatial_join_components = {
- let mut once_async = self.once_async_spatial_join_components.lock();
- once_async
- .get_or_insert(OnceAsync::default())
- .try_once(|| {
- let build_side = build_plan;
-
- let num_partitions = build_side.output_partitioning().partition_count();
- let mut build_streams = Vec::with_capacity(num_partitions);
- for k in 0..num_partitions {
- let stream = build_side.execute(k, Arc::clone(&context))?;
- build_streams.push(stream);
- }
-
- let probe_thread_count =
- self.right.output_partitioning().partition_count();
- let spatial_join_components_builder = SpatialJoinComponentsBuilder::new(
- Arc::clone(&context),
- build_side.schema(),
- self.on.clone(),
- self.join_type,
- probe_thread_count,
- self.metrics.clone(),
- self.seed,
- );
- Ok(spatial_join_components_builder.build(build_streams))
- })?
- };
-
- let column_indices_after_projection = match &self.projection {
- Some(projection) => projection
- .iter()
- .map(|i| self.column_indices[*i].clone())
- .collect(),
- None => self.column_indices.clone(),
- };
-
- let probe_stream = probe_plan.execute(partition, Arc::clone(&context))?;
-
- // For regular joins: probe is right side (index 1)
- let probe_side_ordered =
- self.maintains_input_order()[1] && self.right.output_ordering().is_some();
-
- Ok(Box::pin(SpatialJoinStream::new(
- partition,
- self.schema(),
- &self.on,
- self.filter.clone(),
- self.join_type,
- probe_stream,
- column_indices_after_projection,
- probe_side_ordered,
- session_config,
- context.runtime_env(),
- &self.metrics,
- once_fut_spatial_join_components,
- Arc::clone(&self.once_async_spatial_join_components),
- )))
- }
- }
- }
-}
-
-impl SpatialJoinExec {
- /// Execute KNN (K-Nearest Neighbors) spatial join with specialized logic for asymmetric KNN semantics
- fn execute_knn(
- &self,
- partition: usize,
- context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let session_config = context.session_config();
- // Extract KNN predicate for type safety
- let knn_pred = match &self.on {
- SpatialPredicate::KNearestNeighbors(knn_pred) => knn_pred,
- _ => unreachable!("execute_knn called with non-KNN predicate"),
+ // Determine build/probe plans based on predicate type.
+ // For KNN joins, the probe/build assignment is dynamic based on the KNN predicate's
+ // probe_side. For regular spatial joins, left is always build and right is always probe.
+ let (build_plan, probe_plan) = match &self.on {
+ SpatialPredicate::KNearestNeighbors(knn_pred) => determine_knn_build_probe_plans(
+ knn_pred,
+ &self.left,
+ &self.right,
+ &self.join_schema,
+ )?,
+ _ => (&self.left, &self.right),
};
- // Determine which execution plan should be build vs probe using join schema analysis
- let (build_plan, probe_plan) =
- determine_knn_build_probe_plans(knn_pred, &self.left, &self.right, &self.join_schema)?;
-
- // Determine if probe plan is the left execution plan (for column index swapping logic)
- let actual_probe_plan_is_left = std::ptr::eq(probe_plan.as_ref(), self.left.as_ref());
+ // Determine which input index corresponds to the probe side for ordering checks
+ let probe_is_left = std::ptr::eq(probe_plan.as_ref(), self.left.as_ref());
+ let probe_input_index = if probe_is_left { 0 } else { 1 };
Review Comment:
That's terrible. I'll fix it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]