This is an automated email from the ASF dual-hosted git repository. jiayu pushed a commit to branch iterator-model in repository https://gitbox.apache.org/repos/asf/sedona.git
commit 1bb25a779e302b1b0a1bd9d6dfd4c00b904d8362 Author: Jia Yu <[email protected]> AuthorDate: Tue Jun 6 16:57:26 2023 -0700 Add the iterator method to all join types --- .../joinJudgement/DynamicIndexLookupJudgement.java | 104 +---------- .../sedona/core/joinJudgement/JudgementBase.java | 197 ++++++++++++++++++++- .../joinJudgement/LeftIndexLookupJudgement.java | 61 ++++--- .../core/joinJudgement/NestedLoopJudgement.java | 51 +++--- .../joinJudgement/RightIndexLookupJudgement.java | 62 ++++--- .../sedona/core/spatialOperator/JoinQuery.java | 9 +- 6 files changed, 312 insertions(+), 172 deletions(-) diff --git a/core/src/main/java/org/apache/sedona/core/joinJudgement/DynamicIndexLookupJudgement.java b/core/src/main/java/org/apache/sedona/core/joinJudgement/DynamicIndexLookupJudgement.java index 0a651381..e75dbf47 100644 --- a/core/src/main/java/org/apache/sedona/core/joinJudgement/DynamicIndexLookupJudgement.java +++ b/core/src/main/java/org/apache/sedona/core/joinJudgement/DynamicIndexLookupJudgement.java @@ -20,15 +20,11 @@ package org.apache.sedona.core.joinJudgement; import org.apache.commons.lang3.tuple.Pair; -import org.apache.log4j.Level; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; import org.apache.sedona.core.enums.IndexType; import org.apache.sedona.core.enums.JoinBuildSide; import org.apache.sedona.core.monitoring.Metric; import org.apache.sedona.core.spatialOperator.SpatialPredicate; import org.apache.sedona.core.utils.TimeUtils; -import org.apache.spark.TaskContext; import org.apache.spark.api.java.function.FlatMapFunction2; import org.locationtech.jts.geom.Envelope; import org.locationtech.jts.geom.Geometry; @@ -36,22 +32,14 @@ import org.locationtech.jts.index.SpatialIndex; import org.locationtech.jts.index.quadtree.Quadtree; import org.locationtech.jts.index.strtree.STRtree; -import javax.annotation.Nullable; - import java.io.Serializable; -import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; public class DynamicIndexLookupJudgement<T extends Geometry, U extends Geometry> - extends JudgementBase + extends JudgementBase<T, U> implements FlatMapFunction2<Iterator<U>, Iterator<T>, Pair<U, T>>, Serializable { - - private static final Logger log = LogManager.getLogger(DynamicIndexLookupJudgement.class); - private final IndexType indexType; private final JoinBuildSide joinBuildSide; private final Metric buildCount; @@ -71,7 +59,7 @@ public class DynamicIndexLookupJudgement<T extends Geometry, U extends Geometry> Metric resultCount, Metric candidateCount) { - super(spatialPredicate); + super(spatialPredicate, buildCount, streamCount, resultCount, candidateCount); this.indexType = indexType; this.joinBuildSide = joinBuildSide; this.buildCount = buildCount; @@ -112,84 +100,16 @@ public class DynamicIndexLookupJudgement<T extends Geometry, U extends Geometry> return new Iterator<Pair<U, T>>() { - // A batch of pre-computed matches - private List<Pair<U, T>> batch = null; - // An index of the element from 'batch' to return next - private int nextIndex = 0; - - private int shapeCnt = 0; - @Override public boolean hasNext() { - if (batch != null) { - return true; - } - else { - return populateNextBatch(); - } + return hasNextBase(spatialIndex, streamShapes, buildLeft); } @Override public Pair<U, T> next() { - if (batch == null) { - populateNextBatch(); - } - - if (batch != null) { - final Pair<U, T> result = batch.get(nextIndex); - nextIndex++; - if (nextIndex >= batch.size()) { - populateNextBatch(); - nextIndex = 0; - } - return result; - } - - throw new NoSuchElementException(); - } - - private boolean populateNextBatch() - { - if (!streamShapes.hasNext()) { - if (batch != null) { - batch = null; - } - return false; - } - - batch = new ArrayList<>(); - - while (streamShapes.hasNext()) { - shapeCnt++; - streamCount.add(1); - final Geometry streamShape = streamShapes.next(); - final List candidates = spatialIndex.query(streamShape.getEnvelopeInternal()); - for (Object candidate : candidates) { - candidateCount.add(1); - final Geometry buildShape = (Geometry) candidate; - if (buildLeft) { - if (match(buildShape, streamShape)) { - batch.add(Pair.of((U) buildShape, (T) streamShape)); - resultCount.add(1); - } - } - else { - if (match(streamShape, buildShape)) { - batch.add(Pair.of((U) streamShape, (T) buildShape)); - resultCount.add(1); - } - } - } - logMilestone(shapeCnt, 100 * 1000, "Streaming shapes"); - if (!batch.isEmpty()) { - return true; - } - } - - batch = null; - return false; + return nextBase(spatialIndex, streamShapes, buildLeft); } @Override @@ -227,20 +147,4 @@ public class DynamicIndexLookupJudgement<T extends Geometry, U extends Geometry> throw new IllegalArgumentException("Unsupported index type: " + indexType); } } - - private void log(String message, Object... params) - { - if (Level.INFO.isGreaterOrEqual(log.getEffectiveLevel())) { - final int partitionId = TaskContext.getPartitionId(); - final long threadId = Thread.currentThread().getId(); - log.info("[" + threadId + ", PID=" + partitionId + "] " + String.format(message, params)); - } - } - - private void logMilestone(long cnt, long threshold, String name) - { - if (cnt > 1 && cnt % threshold == 1) { - log("[%s] Reached a milestone: %d", name, cnt); - } - } } diff --git a/core/src/main/java/org/apache/sedona/core/joinJudgement/JudgementBase.java b/core/src/main/java/org/apache/sedona/core/joinJudgement/JudgementBase.java index 54ff5d16..1100a68d 100644 --- a/core/src/main/java/org/apache/sedona/core/joinJudgement/JudgementBase.java +++ b/core/src/main/java/org/apache/sedona/core/joinJudgement/JudgementBase.java @@ -19,11 +19,22 @@ package org.apache.sedona.core.joinJudgement; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.sedona.core.monitoring.Metric; import org.apache.sedona.core.spatialOperator.SpatialPredicate; import org.apache.sedona.core.spatialOperator.SpatialPredicateEvaluators; +import org.apache.spark.TaskContext; import org.locationtech.jts.geom.Geometry; +import org.locationtech.jts.index.SpatialIndex; import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; /** * Base class for partition level join implementations. @@ -31,19 +42,41 @@ import java.io.Serializable; * Provides `match` method to test whether a given pair of geometries satisfies join condition. * <p> */ -abstract class JudgementBase +abstract class JudgementBase<T extends Geometry, U extends Geometry> implements Serializable { + private static final Logger log = LogManager.getLogger(JudgementBase.class); private final SpatialPredicate spatialPredicate; private transient SpatialPredicateEvaluators.SpatialPredicateEvaluator evaluator; + protected final Metric buildCount; + protected final Metric streamCount; + protected final Metric resultCount; + protected final Metric candidateCount; + + private int shapeCnt; + + // A batch of pre-computed matches + private List<Pair<U, T>> batch = null; + // An index of the element from 'batch' to return next + private int nextIndex = 0; /** + * * @param spatialPredicate spatial predicate as join condition + * @param buildCount num of geometries in build side + * @param streamCount num of geometries in stream side + * @param resultCount num of join results + * @param candidateCount num of candidate pairs to be refined by their real geometries */ - protected JudgementBase(SpatialPredicate spatialPredicate) + protected JudgementBase(SpatialPredicate spatialPredicate, Metric buildCount, Metric streamCount, Metric resultCount, Metric candidateCount) { this.spatialPredicate = spatialPredicate; + this.buildCount = buildCount; + this.streamCount = streamCount; + this.resultCount = resultCount; + this.candidateCount = candidateCount; + this.shapeCnt = 0; } /** @@ -58,8 +91,166 @@ abstract class JudgementBase evaluator = SpatialPredicateEvaluators.create(spatialPredicate); } - public boolean match(Geometry left, Geometry right) + private boolean match(Geometry left, Geometry right) { return evaluator.eval(left, right); } + + protected boolean hasNextBase(SpatialIndex spatialIndex, Iterator<? extends Geometry> streamShapes, + boolean buildLeft) + { + if (batch != null) { + return true; + } + else { + return populateNextBatch(spatialIndex, streamShapes, buildLeft); + } + } + + protected boolean hasNextBase(List<? extends Geometry> buildShapes, Iterator<? extends Geometry> streamShapes) + { + if (batch != null) { + return true; + } + else { + return populateNextBatch(buildShapes, streamShapes); + } + } + + protected Pair<U, T> nextBase(SpatialIndex spatialIndex, Iterator<? extends Geometry> streamShapes, + boolean buildLeft) { + if (batch == null) { + populateNextBatch(spatialIndex, streamShapes, buildLeft); + } + + if (batch != null) { + final Pair<U, T> result = batch.get(nextIndex); + nextIndex++; + if (nextIndex >= batch.size()) { + populateNextBatch(spatialIndex, streamShapes, buildLeft); + nextIndex = 0; + } + return result; + } + + throw new NoSuchElementException(); + } + + protected Pair<U, T> nextBase(List<? extends Geometry> buildShapes, Iterator<? extends Geometry> streamShapes) { + if (batch == null) { + populateNextBatch(buildShapes, streamShapes); + } + + if (batch != null) { + final Pair<U, T> result = batch.get(nextIndex); + nextIndex++; + if (nextIndex >= batch.size()) { + populateNextBatch(buildShapes, streamShapes); + nextIndex = 0; + } + return result; + } + + throw new NoSuchElementException(); + } + + /** + * Populates the next batch of matches. + * It works as fo + * @param spatialIndex + * @param streamShapes + * @param buildLeft + * @return + */ + private boolean populateNextBatch(SpatialIndex spatialIndex, Iterator<? extends Geometry> streamShapes, + boolean buildLeft) + { + if (!streamShapes.hasNext()) { + if (batch != null) { + batch = null; + } + return false; + } + + batch = new ArrayList<>(); + + while (streamShapes.hasNext()) { + shapeCnt++; + streamCount.add(1); + final Geometry streamShape = streamShapes.next(); + final List candidates = spatialIndex.query(streamShape.getEnvelopeInternal()); + for (Object candidate : candidates) { + candidateCount.add(1); + final Geometry buildShape = (Geometry) candidate; + if (buildLeft) { + if (match(buildShape, streamShape)) { + batch.add(Pair.of((U) buildShape, (T) streamShape)); + resultCount.add(1); + } + } + else { + if (match(streamShape, buildShape)) { + batch.add(Pair.of((U) streamShape, (T) buildShape)); + resultCount.add(1); + } + } + } + logMilestone(shapeCnt, 100 * 1000, "Streaming shapes"); + if (!batch.isEmpty()) { + return true; + } + } + + batch = null; + return false; + } + + private boolean populateNextBatch(List<? extends Geometry> buildShapes, Iterator<? extends Geometry> streamShapes) + { + if (!streamShapes.hasNext()) { + if (batch != null) { + batch = null; + } + return false; + } + + batch = new ArrayList<>(); + + while (streamShapes.hasNext()) { + shapeCnt++; + streamCount.add(1); + final Geometry streamShape = streamShapes.next(); + for (Object candidate : buildShapes) { + candidateCount.add(1); + final Geometry buildShape = (Geometry) candidate; + if (match(streamShape, buildShape)) { + batch.add(Pair.of((U) streamShape, (T) buildShape)); + resultCount.add(1); + } + } + logMilestone(shapeCnt, 100 * 1000, "Streaming shapes"); + if (!batch.isEmpty()) { + return true; + } + } + + batch = null; + return false; + } + + private void log(String message, Object... params) + { + if (Level.INFO.isGreaterOrEqual(log.getEffectiveLevel())) { + final int partitionId = TaskContext.getPartitionId(); + final long threadId = Thread.currentThread().getId(); + log.info("[" + threadId + ", PID=" + partitionId + "] " + String.format(message, params)); + } + } + + private void logMilestone(long cnt, long threshold, String name) + { + if (cnt > 1 && cnt % threshold == 1) { + log("[%s] Reached a milestone: %d", name, cnt); + } + } } diff --git a/core/src/main/java/org/apache/sedona/core/joinJudgement/LeftIndexLookupJudgement.java b/core/src/main/java/org/apache/sedona/core/joinJudgement/LeftIndexLookupJudgement.java index e424a878..6ab874f8 100644 --- a/core/src/main/java/org/apache/sedona/core/joinJudgement/LeftIndexLookupJudgement.java +++ b/core/src/main/java/org/apache/sedona/core/joinJudgement/LeftIndexLookupJudgement.java @@ -20,54 +20,69 @@ package org.apache.sedona.core.joinJudgement; import org.apache.commons.lang3.tuple.Pair; +import org.apache.sedona.core.monitoring.Metric; import org.apache.sedona.core.spatialOperator.SpatialPredicate; import org.apache.spark.api.java.function.FlatMapFunction2; import org.locationtech.jts.geom.Geometry; import org.locationtech.jts.index.SpatialIndex; -import javax.annotation.Nullable; - import java.io.Serializable; -import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; -import java.util.List; public class LeftIndexLookupJudgement<T extends Geometry, U extends Geometry> - extends JudgementBase - implements FlatMapFunction2<Iterator<SpatialIndex>, Iterator<U>, Pair<T, U>>, Serializable + extends JudgementBase<T, U> + implements FlatMapFunction2<Iterator<SpatialIndex>, Iterator<U>, Pair<U, T>>, Serializable { /** * @see JudgementBase */ - public LeftIndexLookupJudgement(SpatialPredicate spatialPredicate) + public LeftIndexLookupJudgement(SpatialPredicate spatialPredicate, Metric buildCount, + Metric streamCount, + Metric resultCount, + Metric candidateCount) { - super(spatialPredicate); + super(spatialPredicate, buildCount, streamCount, resultCount, candidateCount); } @Override - public Iterator<Pair<T, U>> call(Iterator<SpatialIndex> indexIterator, Iterator<U> streamShapes) + public Iterator<Pair<U, T>> call(Iterator<SpatialIndex> indexIterator, Iterator<U> streamShapes) throws Exception { - List<Pair<T, U>> result = new ArrayList<>(); - if (!indexIterator.hasNext() || !streamShapes.hasNext()) { - return result.iterator(); + buildCount.add(0); + streamCount.add(0); + resultCount.add(0); + candidateCount.add(0); + return Collections.emptyIterator(); } + final boolean buildLeft = true; + initPartition(); - SpatialIndex treeIndex = indexIterator.next(); - while (streamShapes.hasNext()) { - U streamShape = streamShapes.next(); - List<Geometry> candidates = treeIndex.query(streamShape.getEnvelopeInternal()); - for (Geometry candidate : candidates) { - // Refine phase. Use the real polygon (instead of its MBR) to recheck the spatial relation. - if (match(candidate, streamShape)) { - result.add(Pair.of((T) candidate, streamShape)); - } + SpatialIndex spatialIndex = indexIterator.next(); + + return new Iterator<Pair<U, T>>() + { + @Override + public boolean hasNext() + { + return hasNextBase(spatialIndex, streamShapes, buildLeft); } - } - return result.iterator(); + + @Override + public Pair<U, T> next() + { + return nextBase(spatialIndex, streamShapes, buildLeft); + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + }; } } diff --git a/core/src/main/java/org/apache/sedona/core/joinJudgement/NestedLoopJudgement.java b/core/src/main/java/org/apache/sedona/core/joinJudgement/NestedLoopJudgement.java index ce29bef4..66138895 100644 --- a/core/src/main/java/org/apache/sedona/core/joinJudgement/NestedLoopJudgement.java +++ b/core/src/main/java/org/apache/sedona/core/joinJudgement/NestedLoopJudgement.java @@ -20,14 +20,11 @@ package org.apache.sedona.core.joinJudgement; import org.apache.commons.lang3.tuple.Pair; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; +import org.apache.sedona.core.monitoring.Metric; import org.apache.sedona.core.spatialOperator.SpatialPredicate; import org.apache.spark.api.java.function.FlatMapFunction2; import org.locationtech.jts.geom.Geometry; -import javax.annotation.Nullable; - import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; @@ -35,17 +32,19 @@ import java.util.Iterator; import java.util.List; public class NestedLoopJudgement<T extends Geometry, U extends Geometry> - extends JudgementBase + extends JudgementBase<T, U> implements FlatMapFunction2<Iterator<T>, Iterator<U>, Pair<U, T>>, Serializable { - private static final Logger log = LogManager.getLogger(NestedLoopJudgement.class); - /** * @see JudgementBase */ - public NestedLoopJudgement(SpatialPredicate spatialPredicate) + public NestedLoopJudgement(SpatialPredicate spatialPredicate, + Metric buildCount, + Metric streamCount, + Metric resultCount, + Metric candidateCount) { - super(spatialPredicate); + super(spatialPredicate, buildCount, streamCount, resultCount, candidateCount); } @Override @@ -53,26 +52,38 @@ public class NestedLoopJudgement<T extends Geometry, U extends Geometry> throws Exception { if (!iteratorObject.hasNext() || !iteratorWindow.hasNext()) { + buildCount.add(0); + streamCount.add(0); + resultCount.add(0); + candidateCount.add(0); return Collections.emptyIterator(); } initPartition(); - List<Pair<U, T>> result = new ArrayList<>(); List<T> queryObjects = new ArrayList<>(); while (iteratorObject.hasNext()) { queryObjects.add(iteratorObject.next()); } - while (iteratorWindow.hasNext()) { - U window = iteratorWindow.next(); - for (int i = 0; i < queryObjects.size(); i++) { - T object = queryObjects.get(i); - //log.warn("Check "+window.toText()+" with "+object.toText()); - if (match(window, object)) { - result.add(Pair.of(window, object)); - } + return new Iterator<Pair<U, T>>() + { + @Override + public boolean hasNext() + { + return hasNextBase(queryObjects, iteratorWindow); } - } - return result.iterator(); + + @Override + public Pair<U, T> next() + { + return nextBase(queryObjects, iteratorWindow); + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + }; } } diff --git a/core/src/main/java/org/apache/sedona/core/joinJudgement/RightIndexLookupJudgement.java b/core/src/main/java/org/apache/sedona/core/joinJudgement/RightIndexLookupJudgement.java index f0215fdb..9cce4dc6 100644 --- a/core/src/main/java/org/apache/sedona/core/joinJudgement/RightIndexLookupJudgement.java +++ b/core/src/main/java/org/apache/sedona/core/joinJudgement/RightIndexLookupJudgement.java @@ -20,54 +20,70 @@ package org.apache.sedona.core.joinJudgement; import org.apache.commons.lang3.tuple.Pair; +import org.apache.sedona.core.monitoring.Metric; import org.apache.sedona.core.spatialOperator.SpatialPredicate; import org.apache.spark.api.java.function.FlatMapFunction2; import org.locationtech.jts.geom.Geometry; import org.locationtech.jts.index.SpatialIndex; -import javax.annotation.Nullable; - import java.io.Serializable; -import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; -import java.util.List; public class RightIndexLookupJudgement<T extends Geometry, U extends Geometry> - extends JudgementBase - implements FlatMapFunction2<Iterator<T>, Iterator<SpatialIndex>, Pair<T, U>>, Serializable + extends JudgementBase<T, U> + implements FlatMapFunction2<Iterator<T>, Iterator<SpatialIndex>, Pair<U, T>>, Serializable { /** * @see JudgementBase */ - public RightIndexLookupJudgement(SpatialPredicate spatialPredicate) + public RightIndexLookupJudgement(SpatialPredicate spatialPredicate, + Metric buildCount, + Metric streamCount, + Metric resultCount, + Metric candidateCount) { - super(spatialPredicate); + super(spatialPredicate, buildCount, streamCount, resultCount, candidateCount); } @Override - public Iterator<Pair<T, U>> call(Iterator<T> streamShapes, Iterator<SpatialIndex> indexIterator) + public Iterator<Pair<U, T>> call(Iterator<T> streamShapes, Iterator<SpatialIndex> indexIterator) throws Exception { - List<Pair<T, U>> result = new ArrayList<>(); - if (!indexIterator.hasNext() || !streamShapes.hasNext()) { - return result.iterator(); + buildCount.add(0); + streamCount.add(0); + resultCount.add(0); + candidateCount.add(0); + return Collections.emptyIterator(); } + final boolean buildLeft = false; + initPartition(); - SpatialIndex treeIndex = indexIterator.next(); - while (streamShapes.hasNext()) { - T streamShape = streamShapes.next(); - List<Geometry> candidates = treeIndex.query(streamShape.getEnvelopeInternal()); - for (Geometry candidate : candidates) { - // Refine phase. Use the real polygon (instead of its MBR) to recheck the spatial relation. - if (match(streamShape, candidate)) { - result.add(Pair.of(streamShape, (U) candidate)); - } + SpatialIndex spatialIndex = indexIterator.next(); + + return new Iterator<Pair<U, T>>() + { + @Override + public boolean hasNext() + { + return hasNextBase(spatialIndex, streamShapes, buildLeft); } - } - return result.iterator(); + + @Override + public Pair<U, T> next() + { + return nextBase(spatialIndex, streamShapes, buildLeft); + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + }; } } diff --git a/core/src/main/java/org/apache/sedona/core/spatialOperator/JoinQuery.java b/core/src/main/java/org/apache/sedona/core/spatialOperator/JoinQuery.java index 7bda2b5c..bec77f9d 100644 --- a/core/src/main/java/org/apache/sedona/core/spatialOperator/JoinQuery.java +++ b/core/src/main/java/org/apache/sedona/core/spatialOperator/JoinQuery.java @@ -549,12 +549,14 @@ public class JoinQuery if (joinParams.useIndex) { if (rightRDD.indexedRDD != null) { final RightIndexLookupJudgement judgement = - new RightIndexLookupJudgement(joinParams.spatialPredicate); + new RightIndexLookupJudgement(joinParams.spatialPredicate, + buildCount, streamCount, resultCount, candidateCount); joinResult = leftRDD.spatialPartitionedRDD.zipPartitions(rightRDD.indexedRDD, judgement); } else if (leftRDD.indexedRDD != null) { final LeftIndexLookupJudgement judgement = - new LeftIndexLookupJudgement(joinParams.spatialPredicate); + new LeftIndexLookupJudgement(joinParams.spatialPredicate, + buildCount, streamCount, resultCount, candidateCount); joinResult = leftRDD.indexedRDD.zipPartitions(rightRDD.spatialPartitionedRDD, judgement); } else { @@ -569,7 +571,8 @@ public class JoinQuery } } else { - NestedLoopJudgement judgement = new NestedLoopJudgement(joinParams.spatialPredicate); + NestedLoopJudgement judgement = new NestedLoopJudgement(joinParams.spatialPredicate, + buildCount, streamCount, resultCount, candidateCount); joinResult = rightRDD.spatialPartitionedRDD.zipPartitions(leftRDD.spatialPartitionedRDD, judgement); }
