This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new 603559ab [SEDONA-290] RDD Spatial Joins should follow the iterator model (#851)
603559ab is described below
commit 603559ab4e51cc7a7b58d382ab020804a5bdd196
Author: Jia Yu <[email protected]>
AuthorDate: Tue Jun 6 23:52:49 2023 -0700
[SEDONA-290] RDD Spatial Joins should follow the iterator model (#851)
---
.../joinJudgement/DynamicIndexLookupJudgement.java | 112 +--------
.../sedona/core/joinJudgement/JudgementBase.java | 250 ++++++++++++++++++++-
.../joinJudgement/LeftIndexLookupJudgement.java | 62 +++--
.../core/joinJudgement/NestedLoopJudgement.java | 51 +++--
.../joinJudgement/RightIndexLookupJudgement.java | 62 +++--
.../sedona/core/spatialOperator/JoinQuery.java | 9 +-
6 files changed, 366 insertions(+), 180 deletions(-)
diff --git
a/core/src/main/java/org/apache/sedona/core/joinJudgement/DynamicIndexLookupJudgement.java
b/core/src/main/java/org/apache/sedona/core/joinJudgement/DynamicIndexLookupJudgement.java
index 0a651381..7767c969 100644
---
a/core/src/main/java/org/apache/sedona/core/joinJudgement/DynamicIndexLookupJudgement.java
+++
b/core/src/main/java/org/apache/sedona/core/joinJudgement/DynamicIndexLookupJudgement.java
@@ -20,15 +20,11 @@
package org.apache.sedona.core.joinJudgement;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.sedona.core.enums.IndexType;
import org.apache.sedona.core.enums.JoinBuildSide;
import org.apache.sedona.core.monitoring.Metric;
import org.apache.sedona.core.spatialOperator.SpatialPredicate;
import org.apache.sedona.core.utils.TimeUtils;
-import org.apache.spark.TaskContext;
import org.apache.spark.api.java.function.FlatMapFunction2;
import org.locationtech.jts.geom.Envelope;
import org.locationtech.jts.geom.Geometry;
@@ -36,28 +32,16 @@ import org.locationtech.jts.index.SpatialIndex;
import org.locationtech.jts.index.quadtree.Quadtree;
import org.locationtech.jts.index.strtree.STRtree;
-import javax.annotation.Nullable;
-
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
public class DynamicIndexLookupJudgement<T extends Geometry, U extends
Geometry>
- extends JudgementBase
+ extends JudgementBase<T, U>
implements FlatMapFunction2<Iterator<U>, Iterator<T>, Pair<U, T>>,
Serializable
{
-
- private static final Logger log =
LogManager.getLogger(DynamicIndexLookupJudgement.class);
-
private final IndexType indexType;
private final JoinBuildSide joinBuildSide;
- private final Metric buildCount;
- private final Metric streamCount;
- private final Metric resultCount;
- private final Metric candidateCount;
/**
* @see JudgementBase
@@ -71,13 +55,9 @@ public class DynamicIndexLookupJudgement<T extends Geometry,
U extends Geometry>
Metric resultCount,
Metric candidateCount)
{
- super(spatialPredicate);
+ super(spatialPredicate, buildCount, streamCount, resultCount,
candidateCount);
this.indexType = indexType;
this.joinBuildSide = joinBuildSide;
- this.buildCount = buildCount;
- this.streamCount = streamCount;
- this.resultCount = resultCount;
- this.candidateCount = candidateCount;
}
@Override
@@ -112,84 +92,16 @@ public class DynamicIndexLookupJudgement<T extends
Geometry, U extends Geometry>
return new Iterator<Pair<U, T>>()
{
- // A batch of pre-computed matches
- private List<Pair<U, T>> batch = null;
- // An index of the element from 'batch' to return next
- private int nextIndex = 0;
-
- private int shapeCnt = 0;
-
@Override
public boolean hasNext()
{
- if (batch != null) {
- return true;
- }
- else {
- return populateNextBatch();
- }
+ return hasNextBase(spatialIndex, streamShapes, buildLeft);
}
@Override
public Pair<U, T> next()
{
- if (batch == null) {
- populateNextBatch();
- }
-
- if (batch != null) {
- final Pair<U, T> result = batch.get(nextIndex);
- nextIndex++;
- if (nextIndex >= batch.size()) {
- populateNextBatch();
- nextIndex = 0;
- }
- return result;
- }
-
- throw new NoSuchElementException();
- }
-
- private boolean populateNextBatch()
- {
- if (!streamShapes.hasNext()) {
- if (batch != null) {
- batch = null;
- }
- return false;
- }
-
- batch = new ArrayList<>();
-
- while (streamShapes.hasNext()) {
- shapeCnt++;
- streamCount.add(1);
- final Geometry streamShape = streamShapes.next();
- final List candidates =
spatialIndex.query(streamShape.getEnvelopeInternal());
- for (Object candidate : candidates) {
- candidateCount.add(1);
- final Geometry buildShape = (Geometry) candidate;
- if (buildLeft) {
- if (match(buildShape, streamShape)) {
- batch.add(Pair.of((U) buildShape, (T)
streamShape));
- resultCount.add(1);
- }
- }
- else {
- if (match(streamShape, buildShape)) {
- batch.add(Pair.of((U) streamShape, (T)
buildShape));
- resultCount.add(1);
- }
- }
- }
- logMilestone(shapeCnt, 100 * 1000, "Streaming shapes");
- if (!batch.isEmpty()) {
- return true;
- }
- }
-
- batch = null;
- return false;
+ return nextBase(spatialIndex, streamShapes, buildLeft);
}
@Override
@@ -227,20 +139,4 @@ public class DynamicIndexLookupJudgement<T extends
Geometry, U extends Geometry>
throw new IllegalArgumentException("Unsupported index type: "
+ indexType);
}
}
-
- private void log(String message, Object... params)
- {
- if (Level.INFO.isGreaterOrEqual(log.getEffectiveLevel())) {
- final int partitionId = TaskContext.getPartitionId();
- final long threadId = Thread.currentThread().getId();
- log.info("[" + threadId + ", PID=" + partitionId + "] " +
String.format(message, params));
- }
- }
-
- private void logMilestone(long cnt, long threshold, String name)
- {
- if (cnt > 1 && cnt % threshold == 1) {
- log("[%s] Reached a milestone: %d", name, cnt);
- }
- }
}
diff --git
a/core/src/main/java/org/apache/sedona/core/joinJudgement/JudgementBase.java
b/core/src/main/java/org/apache/sedona/core/joinJudgement/JudgementBase.java
index 54ff5d16..8b2d869f 100644
--- a/core/src/main/java/org/apache/sedona/core/joinJudgement/JudgementBase.java
+++ b/core/src/main/java/org/apache/sedona/core/joinJudgement/JudgementBase.java
@@ -19,11 +19,22 @@
package org.apache.sedona.core.joinJudgement;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.sedona.core.monitoring.Metric;
import org.apache.sedona.core.spatialOperator.SpatialPredicate;
import org.apache.sedona.core.spatialOperator.SpatialPredicateEvaluators;
+import org.apache.spark.TaskContext;
import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.index.SpatialIndex;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
/**
* Base class for partition level join implementations.
@@ -31,19 +42,41 @@ import java.io.Serializable;
* Provides `match` method to test whether a given pair of geometries
satisfies join condition.
* <p>
*/
-abstract class JudgementBase
+abstract class JudgementBase<T extends Geometry, U extends Geometry>
implements Serializable
{
+ private static final Logger log =
LogManager.getLogger(JudgementBase.class);
private final SpatialPredicate spatialPredicate;
private transient SpatialPredicateEvaluators.SpatialPredicateEvaluator
evaluator;
+ protected final Metric buildCount;
+ protected final Metric streamCount;
+ protected final Metric resultCount;
+ protected final Metric candidateCount;
+
+ private int shapeCnt;
+
+ // A batch of pre-computed matches
+ private List<Pair<U, T>> batch = null;
+ // An index of the element from 'batch' to return next
+ private int nextIndex = 0;
/**
+ *
* @param spatialPredicate spatial predicate as join condition
+ * @param buildCount metric counting the geometries on the build side
+ * @param streamCount metric counting the geometries on the stream side
+ * @param resultCount metric counting the join results
+ * @param candidateCount metric counting the candidate pairs that are refined against their real geometries
*/
- protected JudgementBase(SpatialPredicate spatialPredicate)
+ protected JudgementBase(SpatialPredicate spatialPredicate, Metric
buildCount, Metric streamCount, Metric resultCount, Metric candidateCount)
{
this.spatialPredicate = spatialPredicate;
+ this.buildCount = buildCount;
+ this.streamCount = streamCount;
+ this.resultCount = resultCount;
+ this.candidateCount = candidateCount;
+ this.shapeCnt = 0;
}
/**
@@ -58,8 +91,219 @@ abstract class JudgementBase
evaluator = SpatialPredicateEvaluators.create(spatialPredicate);
}
- public boolean match(Geometry left, Geometry right)
+ private boolean match(Geometry left, Geometry right)
{
return evaluator.eval(left, right);
}
+
+ /**
+ * Iterator model for the index-based join: reports whether another matching pair is available.
+ * While the current batch still holds pairs it answers immediately; otherwise it computes the
+ * next batch by probing the spatial index with the remaining stream shapes.
+ * @param spatialIndex index over the build-side geometries
+ * @param streamShapes stream-side geometries, consumed as batches are computed
+ * @param buildLeft true if the build side is the left side of the join (its geometry goes first in each pair)
+ * @return true if at least one more matching pair is available
+ */
+ protected boolean hasNextBase(SpatialIndex spatialIndex, Iterator<?
extends Geometry> streamShapes,
+ boolean buildLeft)
+ {
+ // A non-null batch always still has unconsumed pairs (nextBase replaces or nulls it when used up).
+ if (batch != null) {
+ return true;
+ }
+ else {
+ return populateNextBatch(spatialIndex, streamShapes, buildLeft);
+ }
+ }
+
+ /**
+ * Iterator model for the nested loop join: reports whether another matching pair is available.
+ * While the current batch still holds pairs it answers immediately; otherwise it computes the
+ * next batch by comparing the remaining stream shapes against every build shape.
+ * @param buildShapes all build-side geometries, compared with each stream shape
+ * @param streamShapes stream-side geometries, consumed as batches are computed
+ * @return true if at least one more matching pair is available
+ */
+ protected boolean hasNextBase(List<? extends Geometry> buildShapes,
Iterator<? extends Geometry> streamShapes)
+ {
+ // A non-null batch always still has unconsumed pairs (nextBase replaces or nulls it when used up).
+ if (batch != null) {
+ return true;
+ }
+ else {
+ return populateNextBatch(buildShapes, streamShapes);
+ }
+ }
+
+ /**
+ * Iterator model for the index-based join: returns the next matching pair.
+ * Pairs are served from the current batch; a batch holds every pair that satisfies the join
+ * condition for one stream shape (checked against the index candidates for that shape).
+ * Once the current batch is used up, the following batch is computed eagerly so that
+ * {@code hasNextBase} can answer from the batch state alone.
+ * NOTE(review): the batch and its cursor are fields of this judgement instance, so one instance
+ * can drive only one iterator at a time; presumably safe because Spark deserializes a fresh
+ * copy of the function per task - confirm.
+ * @param spatialIndex index over the build-side geometries
+ * @param streamShapes stream-side geometries, consumed as batches are computed
+ * @param buildLeft true if the build side is the left side of the join
+ * @return the next matching pair (left-side geometry first)
+ * @throws NoSuchElementException if no matching pair remains
+ */
+ protected Pair<U, T> nextBase(SpatialIndex spatialIndex, Iterator<?
extends Geometry> streamShapes,
+ boolean buildLeft) {
+ if (batch == null) {
+ populateNextBatch(spatialIndex, streamShapes, buildLeft);
+ }
+
+ if (batch != null) {
+ final Pair<U, T> result = batch.get(nextIndex);
+ nextIndex++;
+ if (nextIndex >= batch.size()) {
+ // Current batch consumed: pre-compute the next one (or null it once the stream is exhausted).
+ populateNextBatch(spatialIndex, streamShapes, buildLeft);
+ nextIndex = 0;
+ }
+ return result;
+ }
+
+ throw new NoSuchElementException();
+ }
+
+ /**
+ * Iterator model for the nested loop join: returns the next matching pair.
+ * Pairs are served from the current batch; a batch holds every pair that satisfies the join
+ * condition for one stream shape (checked against all the build shapes).
+ * Once the current batch is used up, the following batch is computed eagerly so that
+ * {@code hasNextBase} can answer from the batch state alone.
+ * @param buildShapes all build-side geometries
+ * @param streamShapes stream-side geometries, consumed as batches are computed
+ * @return the next matching pair (stream-side geometry first)
+ * @throws NoSuchElementException if no matching pair remains
+ */
+ protected Pair<U, T> nextBase(List<? extends Geometry> buildShapes,
Iterator<? extends Geometry> streamShapes) {
+ if (batch == null) {
+ populateNextBatch(buildShapes, streamShapes);
+ }
+
+ if (batch != null) {
+ final Pair<U, T> result = batch.get(nextIndex);
+ nextIndex++;
+ if (nextIndex >= batch.size()) {
+ // Current batch consumed: pre-compute the next one (or null it once the stream is exhausted).
+ populateNextBatch(buildShapes, streamShapes);
+ nextIndex = 0;
+ }
+ return result;
+ }
+
+ throw new NoSuchElementException();
+ }
+
+ /**
+ * Computes the next batch of matches for the index-based join.
+ * It works as follows:
+ * 1. If there is no shape left in the stream side, it clears the batch and returns false.
+ * 2. Otherwise it takes the next stream shape and queries the spatial index with its envelope;
+ * each candidate returned is refined with the join predicate ({@code match}), and every
+ * satisfying pair is added to the batch (left-side geometry first).
+ * 3. If that stream shape produced no match, it moves on to the next stream shape; a batch
+ * therefore holds the matches of exactly one stream shape.
+ * 4. If the stream side is exhausted without any further match, it clears the batch and returns false.
+ * The stream, candidate and result metrics are incremented along the way.
+ *
+ * @param spatialIndex spatial index of the build side
+ * @param streamShapes stream side geometries
+ * @param buildLeft whether the build side is left
+ * @return true if a non-empty batch was produced, false once the stream side is exhausted
+ */
+ private boolean populateNextBatch(SpatialIndex spatialIndex, Iterator<?
extends Geometry> streamShapes,
+ boolean buildLeft)
+ {
+ if (!streamShapes.hasNext()) {
+ if (batch != null) {
+ batch = null;
+ }
+ return false;
+ }
+
+ batch = new ArrayList<>();
+
+ while (streamShapes.hasNext()) {
+ shapeCnt++;
+ streamCount.add(1);
+ final Geometry streamShape = streamShapes.next();
+ // Filter step: the index selects candidates by envelope only, so each one is refined below.
+ final List candidates =
spatialIndex.query(streamShape.getEnvelopeInternal());
+ for (Object candidate : candidates) {
+ candidateCount.add(1);
+ final Geometry buildShape = (Geometry) candidate;
+ // NOTE(review): the (U)/(T) casts are unchecked (erased at runtime); the pair's static
+ // types are only accurate if subclasses bind T/U consistently with buildLeft - confirm.
+ if (buildLeft) {
+ if (match(buildShape, streamShape)) {
+ batch.add(Pair.of((U) buildShape, (T) streamShape));
+ resultCount.add(1);
+ }
+ }
+ else {
+ if (match(streamShape, buildShape)) {
+ batch.add(Pair.of((U) streamShape, (T) buildShape));
+ resultCount.add(1);
+ }
+ }
+ }
+ logMilestone(shapeCnt, 100 * 1000, "Streaming shapes");
+ // Stop at the first stream shape that yields matches so results are produced lazily.
+ if (!batch.isEmpty()) {
+ return true;
+ }
+ }
+
+ batch = null;
+ return false;
+ }
+
+ /**
+ * Computes the next batch of matches; this overload is used solely by the nested loop join.
+ * It works as follows:
+ * 1. If there is no shape left in the stream side, it clears the batch and returns false.
+ * 2. Otherwise it takes the next stream shape and tests it against every build shape with the
+ * join predicate ({@code match(streamShape, buildShape)}); every satisfying pair is added to
+ * the batch (stream-side geometry first).
+ * 3. If that stream shape produced no match, it moves on to the next stream shape; a batch
+ * therefore holds the matches of exactly one stream shape.
+ * 4. If the stream side is exhausted without any further match, it clears the batch and returns false.
+ * The stream, candidate and result metrics are incremented along the way.
+ * @param buildShapes all build-side geometries
+ * @param streamShapes stream-side geometries
+ * @return true if a non-empty batch was produced, false once the stream side is exhausted
+ */
+ private boolean populateNextBatch(List<? extends Geometry> buildShapes,
Iterator<? extends Geometry> streamShapes)
+ {
+ if (!streamShapes.hasNext()) {
+ if (batch != null) {
+ batch = null;
+ }
+ return false;
+ }
+
+ batch = new ArrayList<>();
+
+ while (streamShapes.hasNext()) {
+ shapeCnt++;
+ streamCount.add(1);
+ final Geometry streamShape = streamShapes.next();
+ // Nested loop: every build shape is a candidate for every stream shape.
+ for (Object candidate : buildShapes) {
+ candidateCount.add(1);
+ final Geometry buildShape = (Geometry) candidate;
+ // NOTE(review): the (U)/(T) casts are unchecked (erased at runtime) - confirm the
+ // subclass binds U to the stream side and T to the build side.
+ if (match(streamShape, buildShape)) {
+ batch.add(Pair.of((U) streamShape, (T) buildShape));
+ resultCount.add(1);
+ }
+ }
+ logMilestone(shapeCnt, 100 * 1000, "Streaming shapes");
+ // Stop at the first stream shape that yields matches so results are produced lazily.
+ if (!batch.isEmpty()) {
+ return true;
+ }
+ }
+
+ batch = null;
+ return false;
+ }
+
+ /**
+ * Logs an INFO message prefixed with the current thread id and Spark partition id.
+ * The guard skips formatting the message when the logger's effective level is above INFO.
+ * @param message {@link String#format} pattern
+ * @param params arguments for the pattern
+ */
+ protected void log(String message, Object... params)
+ {
+ if (Level.INFO.isGreaterOrEqual(log.getEffectiveLevel())) {
+ final int partitionId = TaskContext.getPartitionId();
+ final long threadId = Thread.currentThread().getId();
+ log.info("[" + threadId + ", PID=" + partitionId + "] " +
String.format(message, params));
+ }
+ }
+
+ /**
+ * Logs a progress line each time {@code cnt} passes a multiple of {@code threshold}.
+ * It fires when cnt == k * threshold + 1 for k >= 1 (never for cnt == 1).
+ * NOTE(review): with threshold == 1, cnt % 1 is always 0, so it never fires.
+ * @param cnt running count of processed items
+ * @param threshold milestone interval
+ * @param name label included in the log line
+ */
+ private void logMilestone(long cnt, long threshold, String name)
+ {
+ if (cnt > 1 && cnt % threshold == 1) {
+ log("[%s] Reached a milestone: %d", name, cnt);
+ }
+ }
}
diff --git
a/core/src/main/java/org/apache/sedona/core/joinJudgement/LeftIndexLookupJudgement.java
b/core/src/main/java/org/apache/sedona/core/joinJudgement/LeftIndexLookupJudgement.java
index e424a878..1514571a 100644
---
a/core/src/main/java/org/apache/sedona/core/joinJudgement/LeftIndexLookupJudgement.java
+++
b/core/src/main/java/org/apache/sedona/core/joinJudgement/LeftIndexLookupJudgement.java
@@ -20,54 +20,70 @@
package org.apache.sedona.core.joinJudgement;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sedona.core.monitoring.Metric;
import org.apache.sedona.core.spatialOperator.SpatialPredicate;
import org.apache.spark.api.java.function.FlatMapFunction2;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.index.SpatialIndex;
-import javax.annotation.Nullable;
-
import java.io.Serializable;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
-import java.util.List;
public class LeftIndexLookupJudgement<T extends Geometry, U extends Geometry>
- extends JudgementBase
- implements FlatMapFunction2<Iterator<SpatialIndex>, Iterator<U>,
Pair<T, U>>, Serializable
+ extends JudgementBase<T, U>
+ implements FlatMapFunction2<Iterator<SpatialIndex>, Iterator<U>,
Pair<U, T>>, Serializable
{
/**
* @see JudgementBase
*/
- public LeftIndexLookupJudgement(SpatialPredicate spatialPredicate)
+ public LeftIndexLookupJudgement(SpatialPredicate spatialPredicate,
+ Metric buildCount,
+ Metric streamCount,
+ Metric resultCount,
+ Metric candidateCount)
{
- super(spatialPredicate);
+ super(spatialPredicate, buildCount, streamCount, resultCount,
candidateCount);
}
@Override
- public Iterator<Pair<T, U>> call(Iterator<SpatialIndex> indexIterator,
Iterator<U> streamShapes)
+ public Iterator<Pair<U, T>> call(Iterator<SpatialIndex> indexIterator,
Iterator<U> streamShapes)
throws Exception
{
- List<Pair<T, U>> result = new ArrayList<>();
-
if (!indexIterator.hasNext() || !streamShapes.hasNext()) {
- return result.iterator();
+ buildCount.add(0);
+ streamCount.add(0);
+ resultCount.add(0);
+ candidateCount.add(0);
+ return Collections.emptyIterator();
}
+ final boolean buildLeft = true;
+
initPartition();
- SpatialIndex treeIndex = indexIterator.next();
- while (streamShapes.hasNext()) {
- U streamShape = streamShapes.next();
- List<Geometry> candidates =
treeIndex.query(streamShape.getEnvelopeInternal());
- for (Geometry candidate : candidates) {
- // Refine phase. Use the real polygon (instead of its MBR) to
recheck the spatial relation.
- if (match(candidate, streamShape)) {
- result.add(Pair.of((T) candidate, streamShape));
- }
+ SpatialIndex spatialIndex = indexIterator.next();
+
+ return new Iterator<Pair<U, T>>()
+ {
+ @Override
+ public boolean hasNext()
+ {
+ return hasNextBase(spatialIndex, streamShapes, buildLeft);
}
- }
- return result.iterator();
+
+ @Override
+ public Pair<U, T> next()
+ {
+ return nextBase(spatialIndex, streamShapes, buildLeft);
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
}
}
diff --git
a/core/src/main/java/org/apache/sedona/core/joinJudgement/NestedLoopJudgement.java
b/core/src/main/java/org/apache/sedona/core/joinJudgement/NestedLoopJudgement.java
index ce29bef4..66138895 100644
---
a/core/src/main/java/org/apache/sedona/core/joinJudgement/NestedLoopJudgement.java
+++
b/core/src/main/java/org/apache/sedona/core/joinJudgement/NestedLoopJudgement.java
@@ -20,14 +20,11 @@
package org.apache.sedona.core.joinJudgement;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.apache.sedona.core.monitoring.Metric;
import org.apache.sedona.core.spatialOperator.SpatialPredicate;
import org.apache.spark.api.java.function.FlatMapFunction2;
import org.locationtech.jts.geom.Geometry;
-import javax.annotation.Nullable;
-
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
@@ -35,17 +32,19 @@ import java.util.Iterator;
import java.util.List;
public class NestedLoopJudgement<T extends Geometry, U extends Geometry>
- extends JudgementBase
+ extends JudgementBase<T, U>
implements FlatMapFunction2<Iterator<T>, Iterator<U>, Pair<U, T>>,
Serializable
{
- private static final Logger log =
LogManager.getLogger(NestedLoopJudgement.class);
-
/**
* @see JudgementBase
*/
- public NestedLoopJudgement(SpatialPredicate spatialPredicate)
+ public NestedLoopJudgement(SpatialPredicate spatialPredicate,
+ Metric buildCount,
+ Metric streamCount,
+ Metric resultCount,
+ Metric candidateCount)
{
- super(spatialPredicate);
+ super(spatialPredicate, buildCount, streamCount, resultCount,
candidateCount);
}
@Override
@@ -53,26 +52,38 @@ public class NestedLoopJudgement<T extends Geometry, U
extends Geometry>
throws Exception
{
if (!iteratorObject.hasNext() || !iteratorWindow.hasNext()) {
+ buildCount.add(0);
+ streamCount.add(0);
+ resultCount.add(0);
+ candidateCount.add(0);
return Collections.emptyIterator();
}
initPartition();
- List<Pair<U, T>> result = new ArrayList<>();
List<T> queryObjects = new ArrayList<>();
while (iteratorObject.hasNext()) {
queryObjects.add(iteratorObject.next());
}
- while (iteratorWindow.hasNext()) {
- U window = iteratorWindow.next();
- for (int i = 0; i < queryObjects.size(); i++) {
- T object = queryObjects.get(i);
- //log.warn("Check "+window.toText()+" with "+object.toText());
- if (match(window, object)) {
- result.add(Pair.of(window, object));
- }
+ return new Iterator<Pair<U, T>>()
+ {
+ @Override
+ public boolean hasNext()
+ {
+ return hasNextBase(queryObjects, iteratorWindow);
}
- }
- return result.iterator();
+
+ @Override
+ public Pair<U, T> next()
+ {
+ return nextBase(queryObjects, iteratorWindow);
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
}
}
diff --git
a/core/src/main/java/org/apache/sedona/core/joinJudgement/RightIndexLookupJudgement.java
b/core/src/main/java/org/apache/sedona/core/joinJudgement/RightIndexLookupJudgement.java
index f0215fdb..9cce4dc6 100644
---
a/core/src/main/java/org/apache/sedona/core/joinJudgement/RightIndexLookupJudgement.java
+++
b/core/src/main/java/org/apache/sedona/core/joinJudgement/RightIndexLookupJudgement.java
@@ -20,54 +20,70 @@
package org.apache.sedona.core.joinJudgement;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.sedona.core.monitoring.Metric;
import org.apache.sedona.core.spatialOperator.SpatialPredicate;
import org.apache.spark.api.java.function.FlatMapFunction2;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.index.SpatialIndex;
-import javax.annotation.Nullable;
-
import java.io.Serializable;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
-import java.util.List;
public class RightIndexLookupJudgement<T extends Geometry, U extends Geometry>
- extends JudgementBase
- implements FlatMapFunction2<Iterator<T>, Iterator<SpatialIndex>,
Pair<T, U>>, Serializable
+ extends JudgementBase<T, U>
+ implements FlatMapFunction2<Iterator<T>, Iterator<SpatialIndex>,
Pair<U, T>>, Serializable
{
/**
* @see JudgementBase
*/
- public RightIndexLookupJudgement(SpatialPredicate spatialPredicate)
+ public RightIndexLookupJudgement(SpatialPredicate spatialPredicate,
+ Metric buildCount,
+ Metric streamCount,
+ Metric resultCount,
+ Metric candidateCount)
{
- super(spatialPredicate);
+ super(spatialPredicate, buildCount, streamCount, resultCount,
candidateCount);
}
@Override
- public Iterator<Pair<T, U>> call(Iterator<T> streamShapes,
Iterator<SpatialIndex> indexIterator)
+ public Iterator<Pair<U, T>> call(Iterator<T> streamShapes,
Iterator<SpatialIndex> indexIterator)
throws Exception
{
- List<Pair<T, U>> result = new ArrayList<>();
-
if (!indexIterator.hasNext() || !streamShapes.hasNext()) {
- return result.iterator();
+ buildCount.add(0);
+ streamCount.add(0);
+ resultCount.add(0);
+ candidateCount.add(0);
+ return Collections.emptyIterator();
}
+ final boolean buildLeft = false;
+
initPartition();
- SpatialIndex treeIndex = indexIterator.next();
- while (streamShapes.hasNext()) {
- T streamShape = streamShapes.next();
- List<Geometry> candidates =
treeIndex.query(streamShape.getEnvelopeInternal());
- for (Geometry candidate : candidates) {
- // Refine phase. Use the real polygon (instead of its MBR) to
recheck the spatial relation.
- if (match(streamShape, candidate)) {
- result.add(Pair.of(streamShape, (U) candidate));
- }
+ SpatialIndex spatialIndex = indexIterator.next();
+
+ return new Iterator<Pair<U, T>>()
+ {
+ @Override
+ public boolean hasNext()
+ {
+ return hasNextBase(spatialIndex, streamShapes, buildLeft);
}
- }
- return result.iterator();
+
+ @Override
+ public Pair<U, T> next()
+ {
+ return nextBase(spatialIndex, streamShapes, buildLeft);
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
}
}
diff --git
a/core/src/main/java/org/apache/sedona/core/spatialOperator/JoinQuery.java
b/core/src/main/java/org/apache/sedona/core/spatialOperator/JoinQuery.java
index 7bda2b5c..bec77f9d 100644
--- a/core/src/main/java/org/apache/sedona/core/spatialOperator/JoinQuery.java
+++ b/core/src/main/java/org/apache/sedona/core/spatialOperator/JoinQuery.java
@@ -549,12 +549,14 @@ public class JoinQuery
if (joinParams.useIndex) {
if (rightRDD.indexedRDD != null) {
final RightIndexLookupJudgement judgement =
- new
RightIndexLookupJudgement(joinParams.spatialPredicate);
+ new
RightIndexLookupJudgement(joinParams.spatialPredicate,
+ buildCount, streamCount, resultCount,
candidateCount);
joinResult =
leftRDD.spatialPartitionedRDD.zipPartitions(rightRDD.indexedRDD, judgement);
}
else if (leftRDD.indexedRDD != null) {
final LeftIndexLookupJudgement judgement =
- new
LeftIndexLookupJudgement(joinParams.spatialPredicate);
+ new
LeftIndexLookupJudgement(joinParams.spatialPredicate,
+ buildCount, streamCount, resultCount,
candidateCount);
joinResult =
leftRDD.indexedRDD.zipPartitions(rightRDD.spatialPartitionedRDD, judgement);
}
else {
@@ -569,7 +571,8 @@ public class JoinQuery
}
}
else {
- NestedLoopJudgement judgement = new
NestedLoopJudgement(joinParams.spatialPredicate);
+ NestedLoopJudgement judgement = new
NestedLoopJudgement(joinParams.spatialPredicate,
+ buildCount, streamCount, resultCount, candidateCount);
joinResult =
rightRDD.spatialPartitionedRDD.zipPartitions(leftRDD.spatialPartitionedRDD,
judgement);
}