This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new 603559ab [SEDONA-290] RDD Spatial Joins should follow the iterator model (#851)
603559ab is described below

commit 603559ab4e51cc7a7b58d382ab020804a5bdd196
Author: Jia Yu <[email protected]>
AuthorDate: Tue Jun 6 23:52:49 2023 -0700

    [SEDONA-290] RDD Spatial Joins should follow the iterator model (#851)
---
 .../joinJudgement/DynamicIndexLookupJudgement.java | 112 +--------
 .../sedona/core/joinJudgement/JudgementBase.java   | 250 ++++++++++++++++++++-
 .../joinJudgement/LeftIndexLookupJudgement.java    |  62 +++--
 .../core/joinJudgement/NestedLoopJudgement.java    |  51 +++--
 .../joinJudgement/RightIndexLookupJudgement.java   |  62 +++--
 .../sedona/core/spatialOperator/JoinQuery.java     |   9 +-
 6 files changed, 366 insertions(+), 180 deletions(-)

diff --git a/core/src/main/java/org/apache/sedona/core/joinJudgement/DynamicIndexLookupJudgement.java b/core/src/main/java/org/apache/sedona/core/joinJudgement/DynamicIndexLookupJudgement.java
index 0a651381..7767c969 100644
--- a/core/src/main/java/org/apache/sedona/core/joinJudgement/DynamicIndexLookupJudgement.java
+++ b/core/src/main/java/org/apache/sedona/core/joinJudgement/DynamicIndexLookupJudgement.java
@@ -20,15 +20,11 @@
 package org.apache.sedona.core.joinJudgement;
 
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.sedona.core.enums.IndexType;
 import org.apache.sedona.core.enums.JoinBuildSide;
 import org.apache.sedona.core.monitoring.Metric;
 import org.apache.sedona.core.spatialOperator.SpatialPredicate;
 import org.apache.sedona.core.utils.TimeUtils;
-import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.function.FlatMapFunction2;
 import org.locationtech.jts.geom.Envelope;
 import org.locationtech.jts.geom.Geometry;
@@ -36,28 +32,16 @@ import org.locationtech.jts.index.SpatialIndex;
 import org.locationtech.jts.index.quadtree.Quadtree;
 import org.locationtech.jts.index.strtree.STRtree;
 
-import javax.annotation.Nullable;
-
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
 
 public class DynamicIndexLookupJudgement<T extends Geometry, U extends 
Geometry>
-        extends JudgementBase
+        extends JudgementBase<T, U>
         implements FlatMapFunction2<Iterator<U>, Iterator<T>, Pair<U, T>>, 
Serializable
 {
-
-    private static final Logger log = 
LogManager.getLogger(DynamicIndexLookupJudgement.class);
-
     private final IndexType indexType;
     private final JoinBuildSide joinBuildSide;
-    private final Metric buildCount;
-    private final Metric streamCount;
-    private final Metric resultCount;
-    private final Metric candidateCount;
 
     /**
      * @see JudgementBase
@@ -71,13 +55,9 @@ public class DynamicIndexLookupJudgement<T extends Geometry, U extends Geometry>
             Metric resultCount,
             Metric candidateCount)
     {
-        super(spatialPredicate);
+        super(spatialPredicate, buildCount, streamCount, resultCount, 
candidateCount);
         this.indexType = indexType;
         this.joinBuildSide = joinBuildSide;
-        this.buildCount = buildCount;
-        this.streamCount = streamCount;
-        this.resultCount = resultCount;
-        this.candidateCount = candidateCount;
     }
 
     @Override
@@ -112,84 +92,16 @@ public class DynamicIndexLookupJudgement<T extends Geometry, U extends Geometry>
 
         return new Iterator<Pair<U, T>>()
         {
-            // A batch of pre-computed matches
-            private List<Pair<U, T>> batch = null;
-            // An index of the element from 'batch' to return next
-            private int nextIndex = 0;
-
-            private int shapeCnt = 0;
-
             @Override
             public boolean hasNext()
             {
-                if (batch != null) {
-                    return true;
-                }
-                else {
-                    return populateNextBatch();
-                }
+                return hasNextBase(spatialIndex, streamShapes, buildLeft);
             }
 
             @Override
             public Pair<U, T> next()
             {
-                if (batch == null) {
-                    populateNextBatch();
-                }
-
-                if (batch != null) {
-                    final Pair<U, T> result = batch.get(nextIndex);
-                    nextIndex++;
-                    if (nextIndex >= batch.size()) {
-                        populateNextBatch();
-                        nextIndex = 0;
-                    }
-                    return result;
-                }
-
-                throw new NoSuchElementException();
-            }
-
-            private boolean populateNextBatch()
-            {
-                if (!streamShapes.hasNext()) {
-                    if (batch != null) {
-                        batch = null;
-                    }
-                    return false;
-                }
-
-                batch = new ArrayList<>();
-
-                while (streamShapes.hasNext()) {
-                    shapeCnt++;
-                    streamCount.add(1);
-                    final Geometry streamShape = streamShapes.next();
-                    final List candidates = 
spatialIndex.query(streamShape.getEnvelopeInternal());
-                    for (Object candidate : candidates) {
-                        candidateCount.add(1);
-                        final Geometry buildShape = (Geometry) candidate;
-                        if (buildLeft) {
-                            if (match(buildShape, streamShape)) {
-                                batch.add(Pair.of((U) buildShape, (T) 
streamShape));
-                                resultCount.add(1);
-                            }
-                        }
-                        else {
-                            if (match(streamShape, buildShape)) {
-                                batch.add(Pair.of((U) streamShape, (T) 
buildShape));
-                                resultCount.add(1);
-                            }
-                        }
-                    }
-                    logMilestone(shapeCnt, 100 * 1000, "Streaming shapes");
-                    if (!batch.isEmpty()) {
-                        return true;
-                    }
-                }
-
-                batch = null;
-                return false;
+                return nextBase(spatialIndex, streamShapes, buildLeft);
             }
 
             @Override
@@ -227,20 +139,4 @@ public class DynamicIndexLookupJudgement<T extends Geometry, U extends Geometry>
                 throw new IllegalArgumentException("Unsupported index type: " 
+ indexType);
         }
     }
-
-    private void log(String message, Object... params)
-    {
-        if (Level.INFO.isGreaterOrEqual(log.getEffectiveLevel())) {
-            final int partitionId = TaskContext.getPartitionId();
-            final long threadId = Thread.currentThread().getId();
-            log.info("[" + threadId + ", PID=" + partitionId + "] " + 
String.format(message, params));
-        }
-    }
-
-    private void logMilestone(long cnt, long threshold, String name)
-    {
-        if (cnt > 1 && cnt % threshold == 1) {
-            log("[%s] Reached a milestone: %d", name, cnt);
-        }
-    }
 }
diff --git a/core/src/main/java/org/apache/sedona/core/joinJudgement/JudgementBase.java b/core/src/main/java/org/apache/sedona/core/joinJudgement/JudgementBase.java
index 54ff5d16..8b2d869f 100644
--- a/core/src/main/java/org/apache/sedona/core/joinJudgement/JudgementBase.java
+++ b/core/src/main/java/org/apache/sedona/core/joinJudgement/JudgementBase.java
@@ -19,11 +19,22 @@
 
 package org.apache.sedona.core.joinJudgement;
 
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.sedona.core.monitoring.Metric;
 import org.apache.sedona.core.spatialOperator.SpatialPredicate;
 import org.apache.sedona.core.spatialOperator.SpatialPredicateEvaluators;
+import org.apache.spark.TaskContext;
 import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.index.SpatialIndex;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
 
 /**
  * Base class for partition level join implementations.
@@ -31,19 +42,41 @@ import java.io.Serializable;
  * Provides `match` method to test whether a given pair of geometries 
satisfies join condition.
  * <p>
  */
-abstract class JudgementBase
+abstract class JudgementBase<T extends Geometry, U extends Geometry>
         implements Serializable
 {
+    private static final Logger log = 
LogManager.getLogger(JudgementBase.class);
 
     private final SpatialPredicate spatialPredicate;
     private transient SpatialPredicateEvaluators.SpatialPredicateEvaluator 
evaluator;
+    protected final Metric buildCount;
+    protected final Metric streamCount;
+    protected final Metric resultCount;
+    protected final Metric candidateCount;
+
+    private int shapeCnt;
+
+    // A batch of pre-computed matches
+    private List<Pair<U, T>> batch = null;
+    // An index of the element from 'batch' to return next
+    private int nextIndex = 0;
 
     /**
+     *
      * @param spatialPredicate spatial predicate as join condition
+     * @param buildCount num of geometries in build side
+     * @param streamCount num of geometries in stream side
+     * @param resultCount num of join results
+     * @param candidateCount num of candidate pairs to be refined by their 
real geometries
      */
-    protected JudgementBase(SpatialPredicate spatialPredicate)
+    protected JudgementBase(SpatialPredicate spatialPredicate, Metric 
buildCount, Metric streamCount, Metric resultCount, Metric candidateCount)
     {
         this.spatialPredicate = spatialPredicate;
+        this.buildCount = buildCount;
+        this.streamCount = streamCount;
+        this.resultCount = resultCount;
+        this.candidateCount = candidateCount;
+        this.shapeCnt = 0;
     }
 
     /**
@@ -58,8 +91,219 @@ abstract class JudgementBase
         evaluator = SpatialPredicateEvaluators.create(spatialPredicate);
     }
 
-    public boolean match(Geometry left, Geometry right)
+    private boolean match(Geometry left, Geometry right)
     {
         return evaluator.eval(left, right);
     }
+
+    /**
+     * Iterator model for the index-based join.
+     * It checks if there is a next match and populates it to the result.
+     * @param spatialIndex
+     * @param streamShapes
+     * @param buildLeft
+     * @return
+     */
+    protected boolean hasNextBase(SpatialIndex spatialIndex, Iterator<? 
extends Geometry> streamShapes,
+            boolean buildLeft)
+    {
+        if (batch != null) {
+            return true;
+        }
+        else {
+            return populateNextBatch(spatialIndex, streamShapes, buildLeft);
+        }
+    }
+
+    /**
+     * Iterator model for the nested loop join.
+     * It checks if there is a next match and populates it to the result.
+     * @param buildShapes
+     * @param streamShapes
+     * @return
+     */
+    protected boolean hasNextBase(List<? extends Geometry> buildShapes, 
Iterator<? extends Geometry> streamShapes)
+    {
+        if (batch != null) {
+            return true;
+        }
+        else {
+            return populateNextBatch(buildShapes, streamShapes);
+        }
+    }
+
+    /**
+     * Iterator model for the index-based join.
+     * It returns 1 pair in the current batch.
+     * Each batch contains a list of pairs of geometries that satisfy the join 
condition.
+     * The current batch is the result of the current stream shape against all 
the build shapes.
+     * @param spatialIndex
+     * @param streamShapes
+     * @param buildLeft
+     * @return
+     */
+    protected Pair<U, T> nextBase(SpatialIndex spatialIndex, Iterator<? 
extends Geometry> streamShapes,
+            boolean buildLeft) {
+        if (batch == null) {
+            populateNextBatch(spatialIndex, streamShapes, buildLeft);
+        }
+
+        if (batch != null) {
+            final Pair<U, T> result = batch.get(nextIndex);
+            nextIndex++;
+            if (nextIndex >= batch.size()) {
+                populateNextBatch(spatialIndex, streamShapes, buildLeft);
+                nextIndex = 0;
+            }
+            return result;
+        }
+
+        throw new NoSuchElementException();
+    }
+
+    /**
+     * Iterator model for the nested loop join.
+     * It returns 1 pair in the current batch.
+     * Each batch contains a list of pairs of geometries that satisfy the join 
condition.
+     * The current batch is the result of the current stream shape against all 
the build shapes.
+     * @param buildShapes
+     * @param streamShapes
+     * @return
+     */
+    protected Pair<U, T> nextBase(List<? extends Geometry> buildShapes, 
Iterator<? extends Geometry> streamShapes) {
+        if (batch == null) {
+            populateNextBatch(buildShapes, streamShapes);
+        }
+
+        if (batch != null) {
+            final Pair<U, T> result = batch.get(nextIndex);
+            nextIndex++;
+            if (nextIndex >= batch.size()) {
+                populateNextBatch(buildShapes, streamShapes);
+                nextIndex = 0;
+            }
+            return result;
+        }
+
+        throw new NoSuchElementException();
+    }
+
+    /**
+     * Populates the next batch of matches given the current shape in the 
stream side.
+     * It works as follows:
+     * 1. If there is no shape left in the stream side, it returns false.
+     * 2. If there are shapes left in the stream side, it uses the current 
shape in the stream side to query the spatial index.
+     * The query result is a list of geometries in the build side that overlap 
with the current shape in the stream side.
+     * The query result is flattened to a list of pairs of geometries
+     * 3. If there are no results, it returns false.
+     *
+     * @param spatialIndex spatial index of the build side
+     * @param streamShapes stream side geometries
+     * @param buildLeft whether the build side is left
+     * @return whether there is a next batch
+     */
+    private boolean populateNextBatch(SpatialIndex spatialIndex, Iterator<? 
extends Geometry> streamShapes,
+            boolean buildLeft)
+    {
+        if (!streamShapes.hasNext()) {
+            if (batch != null) {
+                batch = null;
+            }
+            return false;
+        }
+
+        batch = new ArrayList<>();
+
+        while (streamShapes.hasNext()) {
+            shapeCnt++;
+            streamCount.add(1);
+            final Geometry streamShape = streamShapes.next();
+            final List candidates = 
spatialIndex.query(streamShape.getEnvelopeInternal());
+            for (Object candidate : candidates) {
+                candidateCount.add(1);
+                final Geometry buildShape = (Geometry) candidate;
+                if (buildLeft) {
+                    if (match(buildShape, streamShape)) {
+                        batch.add(Pair.of((U) buildShape, (T) streamShape));
+                        resultCount.add(1);
+                    }
+                }
+                else {
+                    if (match(streamShape, buildShape)) {
+                        batch.add(Pair.of((U) streamShape, (T) buildShape));
+                        resultCount.add(1);
+                    }
+                }
+            }
+            logMilestone(shapeCnt, 100 * 1000, "Streaming shapes");
+            if (!batch.isEmpty()) {
+                return true;
+            }
+        }
+
+        batch = null;
+        return false;
+    }
+
+    /**
+     * Populates the next batch of matches given the current shape in the 
stream side.
+     * This is solely used for nested loop join.
+     * It works as follows:
+     * 1. If there is no shape left in the stream side, it returns false.
+     * 2. If there are shapes left in the stream side, it uses the current 
shape in the stream side to query buildShapes
+     * The query result is a list of geometries in the build side that overlap 
with the current shape in the stream side.
+     * The query result is flattened to a list of pairs of geometries
+     * 3. If there are no results, it returns false.
+     * @param buildShapes
+     * @param streamShapes
+     * @return
+     */
+    private boolean populateNextBatch(List<? extends Geometry> buildShapes, 
Iterator<? extends Geometry> streamShapes)
+    {
+        if (!streamShapes.hasNext()) {
+            if (batch != null) {
+                batch = null;
+            }
+            return false;
+        }
+
+        batch = new ArrayList<>();
+
+        while (streamShapes.hasNext()) {
+            shapeCnt++;
+            streamCount.add(1);
+            final Geometry streamShape = streamShapes.next();
+            for (Object candidate : buildShapes) {
+                candidateCount.add(1);
+                final Geometry buildShape = (Geometry) candidate;
+                if (match(streamShape, buildShape)) {
+                    batch.add(Pair.of((U) streamShape, (T) buildShape));
+                    resultCount.add(1);
+                }
+            }
+            logMilestone(shapeCnt, 100 * 1000, "Streaming shapes");
+            if (!batch.isEmpty()) {
+                return true;
+            }
+        }
+
+        batch = null;
+        return false;
+    }
+
+    protected void log(String message, Object... params)
+    {
+        if (Level.INFO.isGreaterOrEqual(log.getEffectiveLevel())) {
+            final int partitionId = TaskContext.getPartitionId();
+            final long threadId = Thread.currentThread().getId();
+            log.info("[" + threadId + ", PID=" + partitionId + "] " + 
String.format(message, params));
+        }
+    }
+
+    private void logMilestone(long cnt, long threshold, String name)
+    {
+        if (cnt > 1 && cnt % threshold == 1) {
+            log("[%s] Reached a milestone: %d", name, cnt);
+        }
+    }
 }
diff --git a/core/src/main/java/org/apache/sedona/core/joinJudgement/LeftIndexLookupJudgement.java b/core/src/main/java/org/apache/sedona/core/joinJudgement/LeftIndexLookupJudgement.java
index e424a878..1514571a 100644
--- a/core/src/main/java/org/apache/sedona/core/joinJudgement/LeftIndexLookupJudgement.java
+++ b/core/src/main/java/org/apache/sedona/core/joinJudgement/LeftIndexLookupJudgement.java
@@ -20,54 +20,70 @@
 package org.apache.sedona.core.joinJudgement;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sedona.core.monitoring.Metric;
 import org.apache.sedona.core.spatialOperator.SpatialPredicate;
 import org.apache.spark.api.java.function.FlatMapFunction2;
 import org.locationtech.jts.geom.Geometry;
 import org.locationtech.jts.index.SpatialIndex;
 
-import javax.annotation.Nullable;
-
 import java.io.Serializable;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
-import java.util.List;
 
 public class LeftIndexLookupJudgement<T extends Geometry, U extends Geometry>
-        extends JudgementBase
-        implements FlatMapFunction2<Iterator<SpatialIndex>, Iterator<U>, 
Pair<T, U>>, Serializable
+        extends JudgementBase<T, U>
+        implements FlatMapFunction2<Iterator<SpatialIndex>, Iterator<U>, 
Pair<U, T>>, Serializable
 {
 
     /**
      * @see JudgementBase
      */
-    public LeftIndexLookupJudgement(SpatialPredicate spatialPredicate)
+    public LeftIndexLookupJudgement(SpatialPredicate spatialPredicate,
+            Metric buildCount,
+            Metric streamCount,
+            Metric resultCount,
+            Metric candidateCount)
     {
-        super(spatialPredicate);
+        super(spatialPredicate, buildCount, streamCount, resultCount, 
candidateCount);
     }
 
     @Override
-    public Iterator<Pair<T, U>> call(Iterator<SpatialIndex> indexIterator, 
Iterator<U> streamShapes)
+    public Iterator<Pair<U, T>> call(Iterator<SpatialIndex> indexIterator, 
Iterator<U> streamShapes)
             throws Exception
     {
-        List<Pair<T, U>> result = new ArrayList<>();
-
         if (!indexIterator.hasNext() || !streamShapes.hasNext()) {
-            return result.iterator();
+            buildCount.add(0);
+            streamCount.add(0);
+            resultCount.add(0);
+            candidateCount.add(0);
+            return Collections.emptyIterator();
         }
 
+        final boolean buildLeft = true;
+
         initPartition();
 
-        SpatialIndex treeIndex = indexIterator.next();
-        while (streamShapes.hasNext()) {
-            U streamShape = streamShapes.next();
-            List<Geometry> candidates = 
treeIndex.query(streamShape.getEnvelopeInternal());
-            for (Geometry candidate : candidates) {
-                // Refine phase. Use the real polygon (instead of its MBR) to 
recheck the spatial relation.
-                if (match(candidate, streamShape)) {
-                    result.add(Pair.of((T) candidate, streamShape));
-                }
+        SpatialIndex spatialIndex = indexIterator.next();
+
+        return new Iterator<Pair<U, T>>()
+        {
+            @Override
+            public boolean hasNext()
+            {
+                return hasNextBase(spatialIndex, streamShapes, buildLeft);
             }
-        }
-        return result.iterator();
+
+            @Override
+            public Pair<U, T> next()
+            {
+                return nextBase(spatialIndex, streamShapes, buildLeft);
+            }
+
+            @Override
+            public void remove()
+            {
+                throw new UnsupportedOperationException();
+            }
+        };
     }
 }
diff --git a/core/src/main/java/org/apache/sedona/core/joinJudgement/NestedLoopJudgement.java b/core/src/main/java/org/apache/sedona/core/joinJudgement/NestedLoopJudgement.java
index ce29bef4..66138895 100644
--- a/core/src/main/java/org/apache/sedona/core/joinJudgement/NestedLoopJudgement.java
+++ b/core/src/main/java/org/apache/sedona/core/joinJudgement/NestedLoopJudgement.java
@@ -20,14 +20,11 @@
 package org.apache.sedona.core.joinJudgement;
 
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.apache.sedona.core.monitoring.Metric;
 import org.apache.sedona.core.spatialOperator.SpatialPredicate;
 import org.apache.spark.api.java.function.FlatMapFunction2;
 import org.locationtech.jts.geom.Geometry;
 
-import javax.annotation.Nullable;
-
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -35,17 +32,19 @@ import java.util.Iterator;
 import java.util.List;
 
 public class NestedLoopJudgement<T extends Geometry, U extends Geometry>
-        extends JudgementBase
+        extends JudgementBase<T, U>
         implements FlatMapFunction2<Iterator<T>, Iterator<U>, Pair<U, T>>, 
Serializable
 {
-    private static final Logger log = 
LogManager.getLogger(NestedLoopJudgement.class);
-
     /**
      * @see JudgementBase
      */
-    public NestedLoopJudgement(SpatialPredicate spatialPredicate)
+    public NestedLoopJudgement(SpatialPredicate spatialPredicate,
+            Metric buildCount,
+            Metric streamCount,
+            Metric resultCount,
+            Metric candidateCount)
     {
-        super(spatialPredicate);
+        super(spatialPredicate, buildCount, streamCount, resultCount, 
candidateCount);
     }
 
     @Override
@@ -53,26 +52,38 @@ public class NestedLoopJudgement<T extends Geometry, U extends Geometry>
             throws Exception
     {
         if (!iteratorObject.hasNext() || !iteratorWindow.hasNext()) {
+            buildCount.add(0);
+            streamCount.add(0);
+            resultCount.add(0);
+            candidateCount.add(0);
             return Collections.emptyIterator();
         }
 
         initPartition();
 
-        List<Pair<U, T>> result = new ArrayList<>();
         List<T> queryObjects = new ArrayList<>();
         while (iteratorObject.hasNext()) {
             queryObjects.add(iteratorObject.next());
         }
-        while (iteratorWindow.hasNext()) {
-            U window = iteratorWindow.next();
-            for (int i = 0; i < queryObjects.size(); i++) {
-                T object = queryObjects.get(i);
-                //log.warn("Check "+window.toText()+" with "+object.toText());
-                if (match(window, object)) {
-                    result.add(Pair.of(window, object));
-                }
+        return new Iterator<Pair<U, T>>()
+        {
+            @Override
+            public boolean hasNext()
+            {
+                return hasNextBase(queryObjects, iteratorWindow);
             }
-        }
-        return result.iterator();
+
+            @Override
+            public Pair<U, T> next()
+            {
+                return nextBase(queryObjects, iteratorWindow);
+            }
+
+            @Override
+            public void remove()
+            {
+                throw new UnsupportedOperationException();
+            }
+        };
     }
 }
diff --git a/core/src/main/java/org/apache/sedona/core/joinJudgement/RightIndexLookupJudgement.java b/core/src/main/java/org/apache/sedona/core/joinJudgement/RightIndexLookupJudgement.java
index f0215fdb..9cce4dc6 100644
--- a/core/src/main/java/org/apache/sedona/core/joinJudgement/RightIndexLookupJudgement.java
+++ b/core/src/main/java/org/apache/sedona/core/joinJudgement/RightIndexLookupJudgement.java
@@ -20,54 +20,70 @@
 package org.apache.sedona.core.joinJudgement;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sedona.core.monitoring.Metric;
 import org.apache.sedona.core.spatialOperator.SpatialPredicate;
 import org.apache.spark.api.java.function.FlatMapFunction2;
 import org.locationtech.jts.geom.Geometry;
 import org.locationtech.jts.index.SpatialIndex;
 
-import javax.annotation.Nullable;
-
 import java.io.Serializable;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
-import java.util.List;
 
 public class RightIndexLookupJudgement<T extends Geometry, U extends Geometry>
-        extends JudgementBase
-        implements FlatMapFunction2<Iterator<T>, Iterator<SpatialIndex>, 
Pair<T, U>>, Serializable
+        extends JudgementBase<T, U>
+        implements FlatMapFunction2<Iterator<T>, Iterator<SpatialIndex>, 
Pair<U, T>>, Serializable
 {
 
     /**
      * @see JudgementBase
      */
-    public RightIndexLookupJudgement(SpatialPredicate spatialPredicate)
+    public RightIndexLookupJudgement(SpatialPredicate spatialPredicate,
+            Metric buildCount,
+            Metric streamCount,
+            Metric resultCount,
+            Metric candidateCount)
     {
-        super(spatialPredicate);
+        super(spatialPredicate, buildCount, streamCount, resultCount, 
candidateCount);
     }
 
     @Override
-    public Iterator<Pair<T, U>> call(Iterator<T> streamShapes, 
Iterator<SpatialIndex> indexIterator)
+    public Iterator<Pair<U, T>> call(Iterator<T> streamShapes, 
Iterator<SpatialIndex> indexIterator)
             throws Exception
     {
-        List<Pair<T, U>> result = new ArrayList<>();
-
         if (!indexIterator.hasNext() || !streamShapes.hasNext()) {
-            return result.iterator();
+            buildCount.add(0);
+            streamCount.add(0);
+            resultCount.add(0);
+            candidateCount.add(0);
+            return Collections.emptyIterator();
         }
 
+        final boolean buildLeft = false;
+
         initPartition();
 
-        SpatialIndex treeIndex = indexIterator.next();
-        while (streamShapes.hasNext()) {
-            T streamShape = streamShapes.next();
-            List<Geometry> candidates = 
treeIndex.query(streamShape.getEnvelopeInternal());
-            for (Geometry candidate : candidates) {
-                // Refine phase. Use the real polygon (instead of its MBR) to 
recheck the spatial relation.
-                if (match(streamShape, candidate)) {
-                    result.add(Pair.of(streamShape, (U) candidate));
-                }
+        SpatialIndex spatialIndex = indexIterator.next();
+
+        return new Iterator<Pair<U, T>>()
+        {
+            @Override
+            public boolean hasNext()
+            {
+                return hasNextBase(spatialIndex, streamShapes, buildLeft);
             }
-        }
-        return result.iterator();
+
+            @Override
+            public Pair<U, T> next()
+            {
+                return nextBase(spatialIndex, streamShapes, buildLeft);
+            }
+
+            @Override
+            public void remove()
+            {
+                throw new UnsupportedOperationException();
+            }
+        };
     }
 }
diff --git a/core/src/main/java/org/apache/sedona/core/spatialOperator/JoinQuery.java b/core/src/main/java/org/apache/sedona/core/spatialOperator/JoinQuery.java
index 7bda2b5c..bec77f9d 100644
--- a/core/src/main/java/org/apache/sedona/core/spatialOperator/JoinQuery.java
+++ b/core/src/main/java/org/apache/sedona/core/spatialOperator/JoinQuery.java
@@ -549,12 +549,14 @@ public class JoinQuery
         if (joinParams.useIndex) {
             if (rightRDD.indexedRDD != null) {
                 final RightIndexLookupJudgement judgement =
-                        new 
RightIndexLookupJudgement(joinParams.spatialPredicate);
+                        new 
RightIndexLookupJudgement(joinParams.spatialPredicate,
+                                buildCount, streamCount, resultCount, 
candidateCount);
                 joinResult = 
leftRDD.spatialPartitionedRDD.zipPartitions(rightRDD.indexedRDD, judgement);
             }
             else if (leftRDD.indexedRDD != null) {
                 final LeftIndexLookupJudgement judgement =
-                        new 
LeftIndexLookupJudgement(joinParams.spatialPredicate);
+                        new 
LeftIndexLookupJudgement(joinParams.spatialPredicate,
+                                buildCount, streamCount, resultCount, 
candidateCount);
                 joinResult = 
leftRDD.indexedRDD.zipPartitions(rightRDD.spatialPartitionedRDD, judgement);
             }
             else {
@@ -569,7 +571,8 @@ public class JoinQuery
             }
         }
         else {
-            NestedLoopJudgement judgement = new 
NestedLoopJudgement(joinParams.spatialPredicate);
+            NestedLoopJudgement judgement = new 
NestedLoopJudgement(joinParams.spatialPredicate,
+                    buildCount, streamCount, resultCount, candidateCount);
             joinResult = 
rightRDD.spatialPartitionedRDD.zipPartitions(leftRDD.spatialPartitionedRDD, 
judgement);
         }
 

Reply via email to