This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new 8b2afd6d [SEDONA-314] Support optimized join on ST_HausdorffDistance
(#878)
8b2afd6d is described below
commit 8b2afd6d018acb6fd2713d3c880acddef9e21918
Author: Nilesh Gajwani <[email protected]>
AuthorDate: Fri Jun 30 01:16:09 2023 -0700
[SEDONA-314] Support optimized join on ST_HausdorffDistance (#878)
---
docs/api/sql/Optimizer.md | 18 +++++++++--
.../sql/sedona_sql/expressions/Functions.scala | 3 +-
.../strategy/join/JoinQueryDetector.scala | 26 +++++++++++++++
.../sedona/sql/BroadcastIndexJoinSuite.scala | 37 ++++++++++++++++++++++
.../org/apache/sedona/sql/TestBaseScala.scala | 22 +++++++++++++
.../apache/sedona/sql/predicateJoinTestScala.scala | 34 ++++++++++++++++++++
6 files changed, 135 insertions(+), 5 deletions(-)
diff --git a/docs/api/sql/Optimizer.md b/docs/api/sql/Optimizer.md
index 3fa0242b..1fe54ef7 100644
--- a/docs/api/sql/Optimizer.md
+++ b/docs/api/sql/Optimizer.md
@@ -46,7 +46,7 @@ RangeJoin polygonshape#20: geometry, pointshape#43: geometry,
false
## Distance join
-Introduction: Find geometries from A and geometries from B such that the
distance of each geometry pair is less or equal than a certain distance. It
supports the planar Euclidean distance calculator `ST_Distance` and the
meter-based geodesic distance calculators `ST_DistanceSpheroid` and
`ST_DistanceSphere`.
+Introduction: Find geometries from A and geometries from B such that the
distance of each geometry pair is less than or equal to a certain distance. It
supports the planar Euclidean distance calculators `ST_Distance` and
`ST_HausdorffDistance` and the meter-based geodesic distance calculators
`ST_DistanceSpheroid` and `ST_DistanceSphere`.
Spark SQL Example for planar Euclidean distance:
@@ -57,6 +57,12 @@ FROM pointdf1, pointdf2
WHERE ST_Distance(pointdf1.pointshape1,pointdf2.pointshape2) < 2
```
+```sql
+SELECT *
+FROM pointDf, polygonDf
+WHERE ST_HausdorffDistance(pointDf.pointshape, polygonDf.polygonshape, 0.3) < 2
+```
+
*Consider ==intersects within a certain distance==*
```sql
SELECT *
@@ -64,6 +70,12 @@ FROM pointdf1, pointdf2
WHERE ST_Distance(pointdf1.pointshape1,pointdf2.pointshape2) <= 2
```
+```sql
+SELECT *
+FROM pointDf, polygonDf
+WHERE ST_HausdorffDistance(pointDf.pointshape, polygonDf.polygonshape) <= 2
+```
+
Spark SQL Physical plan:
```
== Physical Plan ==
@@ -75,7 +87,7 @@ DistanceJoin pointshape1#12: geometry, pointshape2#33:
geometry, 2.0, true
```
!!!warning
- If you use `ST_Distance` as the predicate, Sedona doesn't control the
distance's unit (degree or meter). It is same with the geometry. If your
coordinates are in the longitude and latitude system, the unit of `distance`
should be degree instead of meter or mile. To change the geometry's unit,
please either transform the coordinate reference system to a meter-based
system. See [ST_Transform](Function.md#st_transform). If you don't want to
transform your data, please consider using `ST_Di [...]
+ If you use planar Euclidean distance functions like `ST_Distance` or
`ST_HausdorffDistance` as the predicate, Sedona doesn't control the distance's
unit (degree or meter). It is the same as the unit of the geometry. If your
coordinates are in the longitude and latitude system, the unit of `distance`
should be degree instead of meter or mile. To change the geometry's unit,
please transform the coordinate reference system to a meter-based system. See
[ST_Transform](Function.md#st_transform). If [...]
Spark SQL Example for meter-based geodesic distance `ST_DistanceSpheroid`
(works for `ST_DistanceSphere` too):
@@ -126,7 +138,7 @@ BroadcastIndexJoin pointshape#52: geometry, BuildRight,
BuildRight, false ST_Con
+- FileScan csv
```
-This also works for distance joins with `ST_Distance`, `ST_DistanceSpheroid`
or `ST_DistanceSphere`:
+This also works for distance joins with `ST_Distance`, `ST_DistanceSpheroid`,
`ST_DistanceSphere` or `ST_HausdorffDistance`:
```scala
pointDf1.alias("pointDf1").join(broadcast(pointDf2).alias("pointDf2"),
expr("ST_Distance(pointDf1.pointshape, pointDf2.pointshape) <= 2"))
diff --git
a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
index c661fc92..df2d6155 100644
---
a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
+++
b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
@@ -1036,7 +1036,7 @@ case class ST_BoundingDiagonal(inputExpressions:
Seq[Expression])
}
case class ST_HausdorffDistance(inputExpressions: Seq[Expression])
- extends InferredExpression(inferrableFunction3(Functions.hausdorffDistance))
{
+ extends InferredExpression(inferrableFunction3(Functions.hausdorffDistance),
inferrableFunction2(Functions.hausdorffDistance)) {
protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) =
{
copy(inputExpressions = newChildren)
}
@@ -1062,4 +1062,3 @@ case class ST_Degrees(inputExpressions: Seq[Expression])
copy(inputExpressions = newChildren)
}
}
-
diff --git
a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala
b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala
index 8a8f411b..d6d4a2bc 100644
---
a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala
+++
b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala
@@ -149,6 +149,32 @@ class JoinQueryDetector(sparkSession: SparkSession)
extends Strategy {
Some(JoinQueryDetection(left, right, leftShape, rightShape,
SpatialPredicate.INTERSECTS, true, condition, Some(distance)))
case Some(And(_, LessThan(ST_DistanceSpheroid(Seq(leftShape,
rightShape)), distance))) =>
Some(JoinQueryDetection(left, right, leftShape, rightShape,
SpatialPredicate.INTERSECTS, true, condition, Some(distance)))
+ //ST_HausdorffDistanceDefault
+ case Some(LessThanOrEqual(ST_HausdorffDistance(Seq(leftShape,
rightShape)), distance)) =>
+ Some(JoinQueryDetection(left, right, leftShape, rightShape,
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+ case Some(And(LessThanOrEqual(ST_HausdorffDistance(Seq(leftShape,
rightShape)), distance), _)) =>
+ Some(JoinQueryDetection(left, right, leftShape, rightShape,
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+ case Some(And(_, LessThanOrEqual(ST_HausdorffDistance(Seq(leftShape,
rightShape)), distance))) =>
+ Some(JoinQueryDetection(left, right, leftShape, rightShape,
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+ case Some(LessThan(ST_HausdorffDistance(Seq(leftShape, rightShape)),
distance)) =>
+ Some(JoinQueryDetection(left, right, leftShape, rightShape,
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+ case Some(And(LessThan(ST_HausdorffDistance(Seq(leftShape,
rightShape)), distance), _)) =>
+ Some(JoinQueryDetection(left, right, leftShape, rightShape,
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+ case Some(And(_, LessThan(ST_HausdorffDistance(Seq(leftShape,
rightShape)), distance))) =>
+ Some(JoinQueryDetection(left, right, leftShape, rightShape,
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+ //ST_HausdorffDistanceDensityFrac
+ case Some(LessThanOrEqual(ST_HausdorffDistance(Seq(leftShape,
rightShape, densityFrac)), distance)) =>
+ Some(JoinQueryDetection(left, right, leftShape, rightShape,
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+ case Some(And(LessThanOrEqual(ST_HausdorffDistance(Seq(leftShape,
rightShape, densityFrac)), distance), _)) =>
+ Some(JoinQueryDetection(left, right, leftShape, rightShape,
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+ case Some(And(_, LessThanOrEqual(ST_HausdorffDistance(Seq(leftShape,
rightShape, densityFrac)), distance))) =>
+ Some(JoinQueryDetection(left, right, leftShape, rightShape,
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+ case Some(LessThan(ST_HausdorffDistance(Seq(leftShape, rightShape,
densityFrac)), distance)) =>
+ Some(JoinQueryDetection(left, right, leftShape, rightShape,
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+ case Some(And(LessThan(ST_HausdorffDistance(Seq(leftShape, rightShape,
densityFrac)), distance), _)) =>
+ Some(JoinQueryDetection(left, right, leftShape, rightShape,
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+ case Some(And(_, LessThan(ST_HausdorffDistance(Seq(leftShape,
rightShape, densityFrac)), distance))) =>
+ Some(JoinQueryDetection(left, right, leftShape, rightShape,
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
case _ =>
None
}
diff --git
a/sql/common/src/test/scala/org/apache/sedona/sql/BroadcastIndexJoinSuite.scala
b/sql/common/src/test/scala/org/apache/sedona/sql/BroadcastIndexJoinSuite.scala
index 6f569128..d2232278 100644
---
a/sql/common/src/test/scala/org/apache/sedona/sql/BroadcastIndexJoinSuite.scala
+++
b/sql/common/src/test/scala/org/apache/sedona/sql/BroadcastIndexJoinSuite.scala
@@ -350,6 +350,43 @@ class BroadcastIndexJoinSuite extends TestBaseScala {
assert(distanceJoinDf.count() == expected)
})
}
+
+ it("Passed ST_HausdorffDistance with densityFrac <= distance in a
broadcast join") {
+ val sampleCount = 100
+ val distance = 1.0
+ val densityFrac = 0.5
+ val polygonDf = buildPolygonDf.limit(sampleCount).repartition(3)
+ val pointDf = buildPointDf.limit(sampleCount).repartition(5)
+    val expected = bruteForceDistanceJoinHausdorff(sampleCount, distance,
densityFrac, true)
+
+    var distanceJoinDF = pointDf.alias("pointDf").join(
+      broadcast(polygonDf).alias("polygonDf"),
expr(s"ST_HausdorffDistance(pointDf.pointshape, polygonDf.polygonshape,
$densityFrac) <= $distance"))
+ assert(distanceJoinDF.queryExecution.sparkPlan.collect{case p:
BroadcastIndexJoinExec => p}.size == 1)
+ assert(distanceJoinDF.count() == expected)
+
+ distanceJoinDF =
broadcast(pointDf).alias("pointDf").join(polygonDf.alias("polygonDf"),
expr(s"ST_HausdorffDistance(pointDf.pointshape, polygonDf.polygonshape,
$densityFrac) <= $distance"))
+
+ assert(distanceJoinDF.queryExecution.sparkPlan.collect { case p:
BroadcastIndexJoinExec => p }.size == 1)
+ assert(distanceJoinDF.count() == expected)
+ }
+
+ it("Passed ST_HausdorffDistance <= distance in a broadcast join") {
+ val sampleCount = 200
+ val distance = 2.0
+ val polygonDf = buildPolygonDf.limit(sampleCount).repartition(3)
+ val pointDf = buildPointDf.limit(sampleCount).repartition(5)
+ val expected = bruteForceDistanceJoinHausdorff(sampleCount, distance, 0,
true)
+
+    var distanceJoinDF = pointDf.alias("pointDf").join(
+      broadcast(polygonDf).alias("polygonDf"),
expr(s"ST_HausdorffDistance(pointDf.pointshape, polygonDf.polygonshape) <=
$distance"))
+ assert(distanceJoinDF.queryExecution.sparkPlan.collect { case p:
BroadcastIndexJoinExec => p }.size == 1)
+ assert(distanceJoinDF.count() == expected)
+
+ distanceJoinDF =
broadcast(pointDf).alias("pointDf").join(polygonDf.alias("polygonDf"),
expr(s"ST_HausdorffDistance(pointDf.pointshape, polygonDf.polygonshape) <=
$distance"))
+
+ assert(distanceJoinDF.queryExecution.sparkPlan.collect { case p:
BroadcastIndexJoinExec => p }.size == 1)
+ assert(distanceJoinDF.count() == expected)
+ }
}
describe("Sedona-SQL Broadcast Index Join Test for left semi joins") {
diff --git
a/sql/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
b/sql/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
index 21f31a3b..dc6936dd 100644
--- a/sql/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
+++ b/sql/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
@@ -21,6 +21,7 @@ package org.apache.sedona.sql
import com.google.common.math.DoubleMath
import org.apache.log4j.{Level, Logger}
import org.apache.sedona.common.sphere.{Haversine, Spheroid}
+import org.apache.sedona.common.Functions.hausdorffDistance
import org.apache.sedona.spark.SedonaContext
import org.apache.spark.sql.DataFrame
import org.locationtech.jts.geom.{CoordinateSequence,
CoordinateSequenceComparator}
@@ -117,4 +118,25 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll
{
}).sum
}
+ protected def bruteForceDistanceJoinHausdorff(sampleCount: Int, distance:
Double, densityFrac: Double, intersects: Boolean): Int = {
+ val inputPolygon = buildPolygonDf.limit(sampleCount).collect()
+ val inputPoint = buildPointDf.limit(sampleCount).collect()
+ inputPoint.map(row => {
+ val point = row.getAs[org.locationtech.jts.geom.Point](0)
+ inputPolygon.map(row => {
+ val polygon = row.getAs[org.locationtech.jts.geom.Polygon](0)
+ if (densityFrac == 0) {
+ if (intersects)
+ if (hausdorffDistance(point, polygon) <= distance) 1 else 0
+ else
+ if (hausdorffDistance(point, polygon) < distance) 1 else 0
+ } else {
+ if (intersects)
+ if (hausdorffDistance(point, polygon, densityFrac) <= distance) 1
else 0
+ else
+ if (hausdorffDistance(point, polygon, densityFrac) < distance) 1
else 0
+ }
+ }).sum
+ }).sum
+ }
}
diff --git
a/sql/common/src/test/scala/org/apache/sedona/sql/predicateJoinTestScala.scala
b/sql/common/src/test/scala/org/apache/sedona/sql/predicateJoinTestScala.scala
index 66ee94ca..efa7280b 100644
---
a/sql/common/src/test/scala/org/apache/sedona/sql/predicateJoinTestScala.scala
+++
b/sql/common/src/test/scala/org/apache/sedona/sql/predicateJoinTestScala.scala
@@ -403,7 +403,41 @@ class predicateJoinTestScala extends TestBaseScala {
assert(distanceJoinDf.queryExecution.sparkPlan.collect { case p:
DistanceJoinExec => p }.size === 1)
assert(distanceJoinDf.count() == expected)
})
+ }
+
+ it("Passed ST_HausdorffDistance in a spatial join") {
+ val sampleCount = 100
+ val distanceCandidates = Seq(1, 2, 5, 10)
+ val densityFrac = 0.6
+ val inputPoint = buildPointDf.limit(sampleCount).repartition(5)
+ val inputPolygon = buildPolygonDf.limit(sampleCount).repartition(3)
+ distanceCandidates.foreach(distance => {
+
+ //DensityFrac specified, <= distance
+ val expectedDensityIntersects =
bruteForceDistanceJoinHausdorff(sampleCount, distance, densityFrac, true)
+ val distanceDensityIntersectsDF =
inputPoint.alias("pointDF").join(inputPolygon.alias("polygonDF"),
expr(s"ST_HausdorffDistance(pointDF.pointshape, polygonDF.polygonshape,
$densityFrac) <= $distance"))
+ assert(distanceDensityIntersectsDF.queryExecution.sparkPlan.collect {
case p: DistanceJoinExec => p }.size === 1)
+ assert(distanceDensityIntersectsDF.count() ==
expectedDensityIntersects)
+
+ //DensityFrac specified, < distance
+ val expectedDensityNoIntersect =
bruteForceDistanceJoinHausdorff(sampleCount, distance, densityFrac, false)
+ val distanceDensityNoIntersectDF =
inputPoint.alias("pointDF").join(inputPolygon.alias("polygonDF"),
expr(s"ST_HausdorffDistance(pointDF.pointshape, polygonDF.polygonshape,
$densityFrac) < $distance"))
+ assert(distanceDensityNoIntersectDF.queryExecution.sparkPlan.collect {
case p: DistanceJoinExec => p }.size === 1)
+ assert(distanceDensityNoIntersectDF.count() ==
expectedDensityNoIntersect)
+
+      //DensityFrac not specified, <= distance
+      val expectedDefaultIntersects =
bruteForceDistanceJoinHausdorff(sampleCount, distance, 0.0, true)
+      val distanceDefaultIntersectsDF =
inputPoint.alias("pointDF").join(inputPolygon.alias("polygonDF"),
expr(s"ST_HausdorffDistance(pointDF.pointshape, polygonDF.polygonshape) <=
$distance"))
+      assert(distanceDefaultIntersectsDF.queryExecution.sparkPlan.collect {
case p: DistanceJoinExec => p }.size === 1)
+      assert(distanceDefaultIntersectsDF.count() ==
expectedDefaultIntersects)
+
+      //DensityFrac not specified, < distance
+      val expectedDefaultNoIntersects =
bruteForceDistanceJoinHausdorff(sampleCount, distance, 0.0, false)
+      val distanceDefaultNoIntersectsDF =
inputPoint.alias("pointDF").join(inputPolygon.alias("polygonDF"),
expr(s"ST_HausdorffDistance(pointDF.pointshape, polygonDF.polygonshape) <
$distance"))
+      assert(distanceDefaultNoIntersectsDF.queryExecution.sparkPlan.collect
{ case p: DistanceJoinExec => p }.size === 1)
+      assert(distanceDefaultNoIntersectsDF.count() ==
expectedDefaultNoIntersects)
+ })
}
}
}