This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch iterator-model
in repository https://gitbox.apache.org/repos/asf/sedona.git

commit 1bb25a779e302b1b0a1bd9d6dfd4c00b904d8362
Author: Jia Yu <[email protected]>
AuthorDate: Tue Jun 6 16:57:26 2023 -0700

    Add the iterator method to all join types
---
 .../joinJudgement/DynamicIndexLookupJudgement.java | 104 +----------
 .../sedona/core/joinJudgement/JudgementBase.java   | 197 ++++++++++++++++++++-
 .../joinJudgement/LeftIndexLookupJudgement.java    |  61 ++++---
 .../core/joinJudgement/NestedLoopJudgement.java    |  51 +++---
 .../joinJudgement/RightIndexLookupJudgement.java   |  62 ++++---
 .../sedona/core/spatialOperator/JoinQuery.java     |   9 +-
 6 files changed, 312 insertions(+), 172 deletions(-)

diff --git 
a/core/src/main/java/org/apache/sedona/core/joinJudgement/DynamicIndexLookupJudgement.java
 
b/core/src/main/java/org/apache/sedona/core/joinJudgement/DynamicIndexLookupJudgement.java
index 0a651381..e75dbf47 100644
--- 
a/core/src/main/java/org/apache/sedona/core/joinJudgement/DynamicIndexLookupJudgement.java
+++ 
b/core/src/main/java/org/apache/sedona/core/joinJudgement/DynamicIndexLookupJudgement.java
@@ -20,15 +20,11 @@
 package org.apache.sedona.core.joinJudgement;
 
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.sedona.core.enums.IndexType;
 import org.apache.sedona.core.enums.JoinBuildSide;
 import org.apache.sedona.core.monitoring.Metric;
 import org.apache.sedona.core.spatialOperator.SpatialPredicate;
 import org.apache.sedona.core.utils.TimeUtils;
-import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.function.FlatMapFunction2;
 import org.locationtech.jts.geom.Envelope;
 import org.locationtech.jts.geom.Geometry;
@@ -36,22 +32,14 @@ import org.locationtech.jts.index.SpatialIndex;
 import org.locationtech.jts.index.quadtree.Quadtree;
 import org.locationtech.jts.index.strtree.STRtree;
 
-import javax.annotation.Nullable;
-
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
 
 public class DynamicIndexLookupJudgement<T extends Geometry, U extends 
Geometry>
-        extends JudgementBase
+        extends JudgementBase<T, U>
         implements FlatMapFunction2<Iterator<U>, Iterator<T>, Pair<U, T>>, 
Serializable
 {
-
-    private static final Logger log = 
LogManager.getLogger(DynamicIndexLookupJudgement.class);
-
     private final IndexType indexType;
     private final JoinBuildSide joinBuildSide;
     private final Metric buildCount;
@@ -71,7 +59,7 @@ public class DynamicIndexLookupJudgement<T extends Geometry, 
U extends Geometry>
             Metric resultCount,
             Metric candidateCount)
     {
-        super(spatialPredicate);
+        super(spatialPredicate, buildCount, streamCount, resultCount, 
candidateCount);
         this.indexType = indexType;
         this.joinBuildSide = joinBuildSide;
         this.buildCount = buildCount;
@@ -112,84 +100,16 @@ public class DynamicIndexLookupJudgement<T extends 
Geometry, U extends Geometry>
 
         return new Iterator<Pair<U, T>>()
         {
-            // A batch of pre-computed matches
-            private List<Pair<U, T>> batch = null;
-            // An index of the element from 'batch' to return next
-            private int nextIndex = 0;
-
-            private int shapeCnt = 0;
-
             @Override
             public boolean hasNext()
             {
-                if (batch != null) {
-                    return true;
-                }
-                else {
-                    return populateNextBatch();
-                }
+                return hasNextBase(spatialIndex, streamShapes, buildLeft);
             }
 
             @Override
             public Pair<U, T> next()
             {
-                if (batch == null) {
-                    populateNextBatch();
-                }
-
-                if (batch != null) {
-                    final Pair<U, T> result = batch.get(nextIndex);
-                    nextIndex++;
-                    if (nextIndex >= batch.size()) {
-                        populateNextBatch();
-                        nextIndex = 0;
-                    }
-                    return result;
-                }
-
-                throw new NoSuchElementException();
-            }
-
-            private boolean populateNextBatch()
-            {
-                if (!streamShapes.hasNext()) {
-                    if (batch != null) {
-                        batch = null;
-                    }
-                    return false;
-                }
-
-                batch = new ArrayList<>();
-
-                while (streamShapes.hasNext()) {
-                    shapeCnt++;
-                    streamCount.add(1);
-                    final Geometry streamShape = streamShapes.next();
-                    final List candidates = 
spatialIndex.query(streamShape.getEnvelopeInternal());
-                    for (Object candidate : candidates) {
-                        candidateCount.add(1);
-                        final Geometry buildShape = (Geometry) candidate;
-                        if (buildLeft) {
-                            if (match(buildShape, streamShape)) {
-                                batch.add(Pair.of((U) buildShape, (T) 
streamShape));
-                                resultCount.add(1);
-                            }
-                        }
-                        else {
-                            if (match(streamShape, buildShape)) {
-                                batch.add(Pair.of((U) streamShape, (T) 
buildShape));
-                                resultCount.add(1);
-                            }
-                        }
-                    }
-                    logMilestone(shapeCnt, 100 * 1000, "Streaming shapes");
-                    if (!batch.isEmpty()) {
-                        return true;
-                    }
-                }
-
-                batch = null;
-                return false;
+                return nextBase(spatialIndex, streamShapes, buildLeft);
             }
 
             @Override
@@ -227,20 +147,4 @@ public class DynamicIndexLookupJudgement<T extends 
Geometry, U extends Geometry>
                 throw new IllegalArgumentException("Unsupported index type: " 
+ indexType);
         }
     }
-
-    private void log(String message, Object... params)
-    {
-        if (Level.INFO.isGreaterOrEqual(log.getEffectiveLevel())) {
-            final int partitionId = TaskContext.getPartitionId();
-            final long threadId = Thread.currentThread().getId();
-            log.info("[" + threadId + ", PID=" + partitionId + "] " + 
String.format(message, params));
-        }
-    }
-
-    private void logMilestone(long cnt, long threshold, String name)
-    {
-        if (cnt > 1 && cnt % threshold == 1) {
-            log("[%s] Reached a milestone: %d", name, cnt);
-        }
-    }
 }
diff --git 
a/core/src/main/java/org/apache/sedona/core/joinJudgement/JudgementBase.java 
b/core/src/main/java/org/apache/sedona/core/joinJudgement/JudgementBase.java
index 54ff5d16..1100a68d 100644
--- a/core/src/main/java/org/apache/sedona/core/joinJudgement/JudgementBase.java
+++ b/core/src/main/java/org/apache/sedona/core/joinJudgement/JudgementBase.java
@@ -19,11 +19,22 @@
 
 package org.apache.sedona.core.joinJudgement;
 
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.sedona.core.monitoring.Metric;
 import org.apache.sedona.core.spatialOperator.SpatialPredicate;
 import org.apache.sedona.core.spatialOperator.SpatialPredicateEvaluators;
+import org.apache.spark.TaskContext;
 import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.index.SpatialIndex;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
 
 /**
  * Base class for partition level join implementations.
@@ -31,19 +42,41 @@ import java.io.Serializable;
  * Provides `match` method to test whether a given pair of geometries 
satisfies join condition.
  * <p>
  */
-abstract class JudgementBase
+abstract class JudgementBase<T extends Geometry, U extends Geometry>
         implements Serializable
 {
+    private static final Logger log = 
LogManager.getLogger(JudgementBase.class);
 
     private final SpatialPredicate spatialPredicate;
     private transient SpatialPredicateEvaluators.SpatialPredicateEvaluator 
evaluator;
+    protected final Metric buildCount;
+    protected final Metric streamCount;
+    protected final Metric resultCount;
+    protected final Metric candidateCount;
+
+    private int shapeCnt;
+
+    // A batch of pre-computed matches
+    private List<Pair<U, T>> batch = null;
+    // An index of the element from 'batch' to return next
+    private int nextIndex = 0;
 
     /**
+     *
      * @param spatialPredicate spatial predicate as join condition
+     * @param buildCount num of geometries in build side
+     * @param streamCount num of geometries in stream side
+     * @param resultCount num of join results
+     * @param candidateCount num of candidate pairs to be refined by their 
real geometries
      */
-    protected JudgementBase(SpatialPredicate spatialPredicate)
+    protected JudgementBase(SpatialPredicate spatialPredicate, Metric 
buildCount, Metric streamCount, Metric resultCount, Metric candidateCount)
     {
         this.spatialPredicate = spatialPredicate;
+        this.buildCount = buildCount;
+        this.streamCount = streamCount;
+        this.resultCount = resultCount;
+        this.candidateCount = candidateCount;
+        this.shapeCnt = 0;
     }
 
     /**
@@ -58,8 +91,166 @@ abstract class JudgementBase
         evaluator = SpatialPredicateEvaluators.create(spatialPredicate);
     }
 
-    public boolean match(Geometry left, Geometry right)
+    private boolean match(Geometry left, Geometry right)
     {
         return evaluator.eval(left, right);
     }
+
+    protected boolean hasNextBase(SpatialIndex spatialIndex, Iterator<? 
extends Geometry> streamShapes,
+            boolean buildLeft)
+    {
+        if (batch != null) {
+            return true;
+        }
+        else {
+            return populateNextBatch(spatialIndex, streamShapes, buildLeft);
+        }
+    }
+
+    protected boolean hasNextBase(List<? extends Geometry> buildShapes, 
Iterator<? extends Geometry> streamShapes)
+    {
+        if (batch != null) {
+            return true;
+        }
+        else {
+            return populateNextBatch(buildShapes, streamShapes);
+        }
+    }
+
+    protected Pair<U, T> nextBase(SpatialIndex spatialIndex, Iterator<? 
extends Geometry> streamShapes,
+            boolean buildLeft) {
+        if (batch == null) {
+            populateNextBatch(spatialIndex, streamShapes, buildLeft);
+        }
+
+        if (batch != null) {
+            final Pair<U, T> result = batch.get(nextIndex);
+            nextIndex++;
+            if (nextIndex >= batch.size()) {
+                populateNextBatch(spatialIndex, streamShapes, buildLeft);
+                nextIndex = 0;
+            }
+            return result;
+        }
+
+        throw new NoSuchElementException();
+    }
+
+    protected Pair<U, T> nextBase(List<? extends Geometry> buildShapes, 
Iterator<? extends Geometry> streamShapes) {
+        if (batch == null) {
+            populateNextBatch(buildShapes, streamShapes);
+        }
+
+        if (batch != null) {
+            final Pair<U, T> result = batch.get(nextIndex);
+            nextIndex++;
+            if (nextIndex >= batch.size()) {
+                populateNextBatch(buildShapes, streamShapes);
+                nextIndex = 0;
+            }
+            return result;
+        }
+
+        throw new NoSuchElementException();
+    }
+
+    /**
+     * Populates the next batch of matches.
+     * It works as fo
+     * @param spatialIndex
+     * @param streamShapes
+     * @param buildLeft
+     * @return
+     */
+    private boolean populateNextBatch(SpatialIndex spatialIndex, Iterator<? 
extends Geometry> streamShapes,
+            boolean buildLeft)
+    {
+        if (!streamShapes.hasNext()) {
+            if (batch != null) {
+                batch = null;
+            }
+            return false;
+        }
+
+        batch = new ArrayList<>();
+
+        while (streamShapes.hasNext()) {
+            shapeCnt++;
+            streamCount.add(1);
+            final Geometry streamShape = streamShapes.next();
+            final List candidates = 
spatialIndex.query(streamShape.getEnvelopeInternal());
+            for (Object candidate : candidates) {
+                candidateCount.add(1);
+                final Geometry buildShape = (Geometry) candidate;
+                if (buildLeft) {
+                    if (match(buildShape, streamShape)) {
+                        batch.add(Pair.of((U) buildShape, (T) streamShape));
+                        resultCount.add(1);
+                    }
+                }
+                else {
+                    if (match(streamShape, buildShape)) {
+                        batch.add(Pair.of((U) streamShape, (T) buildShape));
+                        resultCount.add(1);
+                    }
+                }
+            }
+            logMilestone(shapeCnt, 100 * 1000, "Streaming shapes");
+            if (!batch.isEmpty()) {
+                return true;
+            }
+        }
+
+        batch = null;
+        return false;
+    }
+
+    private boolean populateNextBatch(List<? extends Geometry> buildShapes, 
Iterator<? extends Geometry> streamShapes)
+    {
+        if (!streamShapes.hasNext()) {
+            if (batch != null) {
+                batch = null;
+            }
+            return false;
+        }
+
+        batch = new ArrayList<>();
+
+        while (streamShapes.hasNext()) {
+            shapeCnt++;
+            streamCount.add(1);
+            final Geometry streamShape = streamShapes.next();
+            for (Object candidate : buildShapes) {
+                candidateCount.add(1);
+                final Geometry buildShape = (Geometry) candidate;
+                if (match(streamShape, buildShape)) {
+                    batch.add(Pair.of((U) streamShape, (T) buildShape));
+                    resultCount.add(1);
+                }
+            }
+            logMilestone(shapeCnt, 100 * 1000, "Streaming shapes");
+            if (!batch.isEmpty()) {
+                return true;
+            }
+        }
+
+        batch = null;
+        return false;
+    }
+
+    private void log(String message, Object... params)
+    {
+        if (Level.INFO.isGreaterOrEqual(log.getEffectiveLevel())) {
+            final int partitionId = TaskContext.getPartitionId();
+            final long threadId = Thread.currentThread().getId();
+            log.info("[" + threadId + ", PID=" + partitionId + "] " + 
String.format(message, params));
+        }
+    }
+
+    private void logMilestone(long cnt, long threshold, String name)
+    {
+        if (cnt > 1 && cnt % threshold == 1) {
+            log("[%s] Reached a milestone: %d", name, cnt);
+        }
+    }
 }
diff --git 
a/core/src/main/java/org/apache/sedona/core/joinJudgement/LeftIndexLookupJudgement.java
 
b/core/src/main/java/org/apache/sedona/core/joinJudgement/LeftIndexLookupJudgement.java
index e424a878..6ab874f8 100644
--- 
a/core/src/main/java/org/apache/sedona/core/joinJudgement/LeftIndexLookupJudgement.java
+++ 
b/core/src/main/java/org/apache/sedona/core/joinJudgement/LeftIndexLookupJudgement.java
@@ -20,54 +20,69 @@
 package org.apache.sedona.core.joinJudgement;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sedona.core.monitoring.Metric;
 import org.apache.sedona.core.spatialOperator.SpatialPredicate;
 import org.apache.spark.api.java.function.FlatMapFunction2;
 import org.locationtech.jts.geom.Geometry;
 import org.locationtech.jts.index.SpatialIndex;
 
-import javax.annotation.Nullable;
-
 import java.io.Serializable;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
-import java.util.List;
 
 public class LeftIndexLookupJudgement<T extends Geometry, U extends Geometry>
-        extends JudgementBase
-        implements FlatMapFunction2<Iterator<SpatialIndex>, Iterator<U>, 
Pair<T, U>>, Serializable
+        extends JudgementBase<T, U>
+        implements FlatMapFunction2<Iterator<SpatialIndex>, Iterator<U>, 
Pair<U, T>>, Serializable
 {
 
     /**
      * @see JudgementBase
      */
-    public LeftIndexLookupJudgement(SpatialPredicate spatialPredicate)
+    public LeftIndexLookupJudgement(SpatialPredicate spatialPredicate, Metric 
buildCount,
+            Metric streamCount,
+            Metric resultCount,
+            Metric candidateCount)
     {
-        super(spatialPredicate);
+        super(spatialPredicate, buildCount, streamCount, resultCount, 
candidateCount);
     }
 
     @Override
-    public Iterator<Pair<T, U>> call(Iterator<SpatialIndex> indexIterator, 
Iterator<U> streamShapes)
+    public Iterator<Pair<U, T>> call(Iterator<SpatialIndex> indexIterator, 
Iterator<U> streamShapes)
             throws Exception
     {
-        List<Pair<T, U>> result = new ArrayList<>();
-
         if (!indexIterator.hasNext() || !streamShapes.hasNext()) {
-            return result.iterator();
+            buildCount.add(0);
+            streamCount.add(0);
+            resultCount.add(0);
+            candidateCount.add(0);
+            return Collections.emptyIterator();
         }
 
+        final boolean buildLeft = true;
+
         initPartition();
 
-        SpatialIndex treeIndex = indexIterator.next();
-        while (streamShapes.hasNext()) {
-            U streamShape = streamShapes.next();
-            List<Geometry> candidates = 
treeIndex.query(streamShape.getEnvelopeInternal());
-            for (Geometry candidate : candidates) {
-                // Refine phase. Use the real polygon (instead of its MBR) to 
recheck the spatial relation.
-                if (match(candidate, streamShape)) {
-                    result.add(Pair.of((T) candidate, streamShape));
-                }
+        SpatialIndex spatialIndex = indexIterator.next();
+
+        return new Iterator<Pair<U, T>>()
+        {
+            @Override
+            public boolean hasNext()
+            {
+                return hasNextBase(spatialIndex, streamShapes, buildLeft);
             }
-        }
-        return result.iterator();
+
+            @Override
+            public Pair<U, T> next()
+            {
+                return nextBase(spatialIndex, streamShapes, buildLeft);
+            }
+
+            @Override
+            public void remove()
+            {
+                throw new UnsupportedOperationException();
+            }
+        };
     }
 }
diff --git 
a/core/src/main/java/org/apache/sedona/core/joinJudgement/NestedLoopJudgement.java
 
b/core/src/main/java/org/apache/sedona/core/joinJudgement/NestedLoopJudgement.java
index ce29bef4..66138895 100644
--- 
a/core/src/main/java/org/apache/sedona/core/joinJudgement/NestedLoopJudgement.java
+++ 
b/core/src/main/java/org/apache/sedona/core/joinJudgement/NestedLoopJudgement.java
@@ -20,14 +20,11 @@
 package org.apache.sedona.core.joinJudgement;
 
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.apache.sedona.core.monitoring.Metric;
 import org.apache.sedona.core.spatialOperator.SpatialPredicate;
 import org.apache.spark.api.java.function.FlatMapFunction2;
 import org.locationtech.jts.geom.Geometry;
 
-import javax.annotation.Nullable;
-
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -35,17 +32,19 @@ import java.util.Iterator;
 import java.util.List;
 
 public class NestedLoopJudgement<T extends Geometry, U extends Geometry>
-        extends JudgementBase
+        extends JudgementBase<T, U>
         implements FlatMapFunction2<Iterator<T>, Iterator<U>, Pair<U, T>>, 
Serializable
 {
-    private static final Logger log = 
LogManager.getLogger(NestedLoopJudgement.class);
-
     /**
      * @see JudgementBase
      */
-    public NestedLoopJudgement(SpatialPredicate spatialPredicate)
+    public NestedLoopJudgement(SpatialPredicate spatialPredicate,
+            Metric buildCount,
+            Metric streamCount,
+            Metric resultCount,
+            Metric candidateCount)
     {
-        super(spatialPredicate);
+        super(spatialPredicate, buildCount, streamCount, resultCount, 
candidateCount);
     }
 
     @Override
@@ -53,26 +52,38 @@ public class NestedLoopJudgement<T extends Geometry, U 
extends Geometry>
             throws Exception
     {
         if (!iteratorObject.hasNext() || !iteratorWindow.hasNext()) {
+            buildCount.add(0);
+            streamCount.add(0);
+            resultCount.add(0);
+            candidateCount.add(0);
             return Collections.emptyIterator();
         }
 
         initPartition();
 
-        List<Pair<U, T>> result = new ArrayList<>();
         List<T> queryObjects = new ArrayList<>();
         while (iteratorObject.hasNext()) {
             queryObjects.add(iteratorObject.next());
         }
-        while (iteratorWindow.hasNext()) {
-            U window = iteratorWindow.next();
-            for (int i = 0; i < queryObjects.size(); i++) {
-                T object = queryObjects.get(i);
-                //log.warn("Check "+window.toText()+" with "+object.toText());
-                if (match(window, object)) {
-                    result.add(Pair.of(window, object));
-                }
+        return new Iterator<Pair<U, T>>()
+        {
+            @Override
+            public boolean hasNext()
+            {
+                return hasNextBase(queryObjects, iteratorWindow);
             }
-        }
-        return result.iterator();
+
+            @Override
+            public Pair<U, T> next()
+            {
+                return nextBase(queryObjects, iteratorWindow);
+            }
+
+            @Override
+            public void remove()
+            {
+                throw new UnsupportedOperationException();
+            }
+        };
     }
 }
diff --git 
a/core/src/main/java/org/apache/sedona/core/joinJudgement/RightIndexLookupJudgement.java
 
b/core/src/main/java/org/apache/sedona/core/joinJudgement/RightIndexLookupJudgement.java
index f0215fdb..9cce4dc6 100644
--- 
a/core/src/main/java/org/apache/sedona/core/joinJudgement/RightIndexLookupJudgement.java
+++ 
b/core/src/main/java/org/apache/sedona/core/joinJudgement/RightIndexLookupJudgement.java
@@ -20,54 +20,70 @@
 package org.apache.sedona.core.joinJudgement;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sedona.core.monitoring.Metric;
 import org.apache.sedona.core.spatialOperator.SpatialPredicate;
 import org.apache.spark.api.java.function.FlatMapFunction2;
 import org.locationtech.jts.geom.Geometry;
 import org.locationtech.jts.index.SpatialIndex;
 
-import javax.annotation.Nullable;
-
 import java.io.Serializable;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
-import java.util.List;
 
 public class RightIndexLookupJudgement<T extends Geometry, U extends Geometry>
-        extends JudgementBase
-        implements FlatMapFunction2<Iterator<T>, Iterator<SpatialIndex>, 
Pair<T, U>>, Serializable
+        extends JudgementBase<T, U>
+        implements FlatMapFunction2<Iterator<T>, Iterator<SpatialIndex>, 
Pair<U, T>>, Serializable
 {
 
     /**
      * @see JudgementBase
      */
-    public RightIndexLookupJudgement(SpatialPredicate spatialPredicate)
+    public RightIndexLookupJudgement(SpatialPredicate spatialPredicate,
+            Metric buildCount,
+            Metric streamCount,
+            Metric resultCount,
+            Metric candidateCount)
     {
-        super(spatialPredicate);
+        super(spatialPredicate, buildCount, streamCount, resultCount, 
candidateCount);
     }
 
     @Override
-    public Iterator<Pair<T, U>> call(Iterator<T> streamShapes, 
Iterator<SpatialIndex> indexIterator)
+    public Iterator<Pair<U, T>> call(Iterator<T> streamShapes, 
Iterator<SpatialIndex> indexIterator)
             throws Exception
     {
-        List<Pair<T, U>> result = new ArrayList<>();
-
         if (!indexIterator.hasNext() || !streamShapes.hasNext()) {
-            return result.iterator();
+            buildCount.add(0);
+            streamCount.add(0);
+            resultCount.add(0);
+            candidateCount.add(0);
+            return Collections.emptyIterator();
         }
 
+        final boolean buildLeft = false;
+
         initPartition();
 
-        SpatialIndex treeIndex = indexIterator.next();
-        while (streamShapes.hasNext()) {
-            T streamShape = streamShapes.next();
-            List<Geometry> candidates = 
treeIndex.query(streamShape.getEnvelopeInternal());
-            for (Geometry candidate : candidates) {
-                // Refine phase. Use the real polygon (instead of its MBR) to 
recheck the spatial relation.
-                if (match(streamShape, candidate)) {
-                    result.add(Pair.of(streamShape, (U) candidate));
-                }
+        SpatialIndex spatialIndex = indexIterator.next();
+
+        return new Iterator<Pair<U, T>>()
+        {
+            @Override
+            public boolean hasNext()
+            {
+                return hasNextBase(spatialIndex, streamShapes, buildLeft);
             }
-        }
-        return result.iterator();
+
+            @Override
+            public Pair<U, T> next()
+            {
+                return nextBase(spatialIndex, streamShapes, buildLeft);
+            }
+
+            @Override
+            public void remove()
+            {
+                throw new UnsupportedOperationException();
+            }
+        };
     }
 }
diff --git 
a/core/src/main/java/org/apache/sedona/core/spatialOperator/JoinQuery.java 
b/core/src/main/java/org/apache/sedona/core/spatialOperator/JoinQuery.java
index 7bda2b5c..bec77f9d 100644
--- a/core/src/main/java/org/apache/sedona/core/spatialOperator/JoinQuery.java
+++ b/core/src/main/java/org/apache/sedona/core/spatialOperator/JoinQuery.java
@@ -549,12 +549,14 @@ public class JoinQuery
         if (joinParams.useIndex) {
             if (rightRDD.indexedRDD != null) {
                 final RightIndexLookupJudgement judgement =
-                        new 
RightIndexLookupJudgement(joinParams.spatialPredicate);
+                        new 
RightIndexLookupJudgement(joinParams.spatialPredicate,
+                                buildCount, streamCount, resultCount, 
candidateCount);
                 joinResult = 
leftRDD.spatialPartitionedRDD.zipPartitions(rightRDD.indexedRDD, judgement);
             }
             else if (leftRDD.indexedRDD != null) {
                 final LeftIndexLookupJudgement judgement =
-                        new 
LeftIndexLookupJudgement(joinParams.spatialPredicate);
+                        new 
LeftIndexLookupJudgement(joinParams.spatialPredicate,
+                                buildCount, streamCount, resultCount, 
candidateCount);
                 joinResult = 
leftRDD.indexedRDD.zipPartitions(rightRDD.spatialPartitionedRDD, judgement);
             }
             else {
@@ -569,7 +571,8 @@ public class JoinQuery
             }
         }
         else {
-            NestedLoopJudgement judgement = new 
NestedLoopJudgement(joinParams.spatialPredicate);
+            NestedLoopJudgement judgement = new 
NestedLoopJudgement(joinParams.spatialPredicate,
+                    buildCount, streamCount, resultCount, candidateCount);
             joinResult = 
rightRDD.spatialPartitionedRDD.zipPartitions(leftRDD.spatialPartitionedRDD, 
judgement);
         }
 

Reply via email to