This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new 9fc39b39 [SEDONA-315] Add optimized join support for
ST_FrechetDistance (#879)
9fc39b39 is described below
commit 9fc39b39fee65bf715250ff9f1944c9d7b2f150f
Author: Nilesh Gajwani <[email protected]>
AuthorDate: Fri Jun 30 20:42:18 2023 -0700
[SEDONA-315] Add optimized join support for ST_FrechetDistance (#879)
Co-authored-by: Jia Yu <[email protected]>
---
docs/api/sql/Optimizer.md | 18 +++++++++++++---
.../scala/org/apache/sedona/sql/UDF/Catalog.scala | 1 +
.../sql/sedona_sql/expressions/st_functions.scala | 2 ++
.../strategy/join/JoinQueryDetector.scala | 13 +++++++++++
.../sedona/sql/BroadcastIndexJoinSuite.scala | 18 ++++++++++++++++
.../org/apache/sedona/sql/TestBaseScala.scala | 17 ++++++++++++++-
.../apache/sedona/sql/predicateJoinTestScala.scala | 25 ++++++++++++++++++++--
7 files changed, 88 insertions(+), 6 deletions(-)
diff --git a/docs/api/sql/Optimizer.md b/docs/api/sql/Optimizer.md
index 1fe54ef7..03a4bffe 100644
--- a/docs/api/sql/Optimizer.md
+++ b/docs/api/sql/Optimizer.md
@@ -46,7 +46,7 @@ RangeJoin polygonshape#20: geometry, pointshape#43: geometry,
false
## Distance join
-Introduction: Find geometries from A and geometries from B such that the
distance of each geometry pair is less or equal than a certain distance. It
supports the planar Euclidean distance calculators `ST_Distance` and
`ST_HausdorffDistance` and the meter-based geodesic distance calculators
`ST_DistanceSpheroid` and `ST_DistanceSphere`.
+Introduction: Find geometries from A and geometries from B such that the distance of each geometry pair is less than or equal to a certain distance. It supports the planar Euclidean distance calculators `ST_Distance`, `ST_HausdorffDistance`, `ST_FrechetDistance` and the meter-based geodesic distance calculators `ST_DistanceSpheroid` and `ST_DistanceSphere`.
Spark SQL Example for planar Euclidean distance:
@@ -63,6 +63,12 @@ FROM pointDf, polygonDF
WHERE ST_HausdorffDistance(pointDf.pointshape, polygonDf.polygonshape, 0.3) < 2
```
+```sql
+SELECT *
+FROM pointDf, polygonDF
+WHERE ST_FrechetDistance(pointDf.pointshape, polygonDf.polygonshape) < 2
+```
+
*Consider ==intersects within a certain distance==*
```sql
SELECT *
@@ -76,6 +82,12 @@ FROM pointDf, polygonDF
WHERE ST_HausdorffDistance(pointDf.pointshape, polygonDf.polygonshape) <= 2
```
+```sql
+SELECT *
+FROM pointDf, polygonDF
+WHERE ST_FrechetDistance(pointDf.pointshape, polygonDf.polygonshape) <= 2
+```
+
Spark SQL Physical plan:
```
== Physical Plan ==
@@ -87,7 +99,7 @@ DistanceJoin pointshape1#12: geometry, pointshape2#33:
geometry, 2.0, true
```
!!!warning
- If you use planar euclidean distance functions like `ST_Distance` or
`ST_HausdorffDistance` as the predicate, Sedona doesn't control the distance's
unit (degree or meter). It is same with the geometry. If your coordinates are
in the longitude and latitude system, the unit of `distance` should be degree
instead of meter or mile. To change the geometry's unit, please either
transform the coordinate reference system to a meter-based system. See
[ST_Transform](Function.md#st_transform). If [...]
+ If you use planar Euclidean distance functions like `ST_Distance`, `ST_HausdorffDistance` or `ST_FrechetDistance` as the predicate, Sedona doesn't control the distance's unit (degree or meter); it is the same as the unit of the geometry coordinates. If your coordinates are in the longitude and latitude system, the unit of `distance` should be degree instead of meter or mile. To change the geometry's unit, please transform the coordinate reference system to a meter-based system. See [ST_Transform](Function [...]
Spark SQL Example for meter-based geodesic distance `ST_DistanceSpheroid`
(works for `ST_DistanceSphere` too):
@@ -138,7 +150,7 @@ BroadcastIndexJoin pointshape#52: geometry, BuildRight,
BuildRight, false ST_Con
+- FileScan csv
```
-This also works for distance joins with `ST_Distance`, `ST_DistanceSpheroid`,
`ST_DistanceSphere` or `ST_HausdorffDistance`:
+This also works for distance joins with `ST_Distance`, `ST_DistanceSpheroid`,
`ST_DistanceSphere`, `ST_HausdorffDistance` or `ST_FrechetDistance`:
```scala
pointDf1.alias("pointDf1").join(broadcast(pointDf2).alias("pointDf2"),
expr("ST_Distance(pointDf1.pointshape, pointDf2.pointshape) <= 2"))
diff --git a/sql/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
b/sql/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
index df6134b3..31b11a53 100644
--- a/sql/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
+++ b/sql/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
@@ -154,6 +154,7 @@ object Catalog {
function[ST_NRings](),
function[ST_Translate](0.0),
function[ST_FrechetDistance](),
+ function[ST_Affine](null, null, null, null, null, null),
function[ST_Affine](),
function[ST_BoundingDiagonal](),
function[ST_Angle](),
diff --git
a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/st_functions.scala
b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/st_functions.scala
index 210529a9..881d4fd4 100644
---
a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/st_functions.scala
+++
b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/st_functions.scala
@@ -335,6 +335,7 @@ object st_functions extends DataFrameAPI {
def ST_FrechetDistance(g1: Column, g2: Column): Column =
wrapExpression[ST_FrechetDistance](g1, g2)
def ST_FrechetDistance(g1: String, g2: String): Column =
wrapExpression[ST_FrechetDistance](g1, g2)
+
def ST_Affine(geometry: Column, a: Column, b: Column, c: Column, d: Column,
e: Column, f: Column, g: Column, h: Column, i: Column, xOff: Column, yOff:
Column, zOff: Column): Column =
wrapExpression[ST_Affine](geometry, a, b, c, d, e, f, g, h, i, xOff, yOff,
zOff)
@@ -377,3 +378,4 @@ object st_functions extends DataFrameAPI {
def ST_HausdorffDistance(g1: String, g2: String, densityFrac: Double) =
wrapExpression[ST_HausdorffDistance](g1, g2, densityFrac);
}
+
\ No newline at end of file
diff --git
a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala
b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala
index d6d4a2bc..2715e950 100644
---
a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala
+++
b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala
@@ -175,6 +175,19 @@ class JoinQueryDetector(sparkSession: SparkSession)
extends Strategy {
Some(JoinQueryDetection(left, right, leftShape, rightShape,
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
case Some(And(_, LessThan(ST_HausdorffDistance(Seq(leftShape,
rightShape, densityFrac)), distance))) =>
Some(JoinQueryDetection(left, right, leftShape, rightShape,
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+ //ST_FrechetDistance
+ case Some(LessThanOrEqual(ST_FrechetDistance(Seq(leftShape,
rightShape)), distance)) =>
+ Some(JoinQueryDetection(left, right, leftShape, rightShape,
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+ case Some(And(LessThanOrEqual(ST_FrechetDistance(Seq(leftShape,
rightShape)), distance), _)) =>
+ Some(JoinQueryDetection(left, right, leftShape, rightShape,
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+ case Some(And(_, LessThanOrEqual(ST_FrechetDistance(Seq(leftShape,
rightShape)), distance))) =>
+ Some(JoinQueryDetection(left, right, leftShape, rightShape,
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+ case Some(LessThan(ST_FrechetDistance(Seq(leftShape, rightShape)),
distance)) =>
+ Some(JoinQueryDetection(left, right, leftShape, rightShape,
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+ case Some(And(LessThan(ST_FrechetDistance(Seq(leftShape, rightShape)),
distance), _)) =>
+ Some(JoinQueryDetection(left, right, leftShape, rightShape,
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+ case Some(And(_, LessThan(ST_FrechetDistance(Seq(leftShape,
rightShape)), distance))) =>
+ Some(JoinQueryDetection(left, right, leftShape, rightShape,
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
case _ =>
None
}
diff --git
a/sql/common/src/test/scala/org/apache/sedona/sql/BroadcastIndexJoinSuite.scala
b/sql/common/src/test/scala/org/apache/sedona/sql/BroadcastIndexJoinSuite.scala
index d2232278..c22ba601 100644
---
a/sql/common/src/test/scala/org/apache/sedona/sql/BroadcastIndexJoinSuite.scala
+++
b/sql/common/src/test/scala/org/apache/sedona/sql/BroadcastIndexJoinSuite.scala
@@ -387,6 +387,24 @@ class BroadcastIndexJoinSuite extends TestBaseScala {
assert(distanceJoinDF.queryExecution.sparkPlan.collect { case p:
BroadcastIndexJoinExec => p }.size == 1)
assert(distanceJoinDF.count() == expected)
}
+
+    it("Passed ST_FrechetDistance <= distance") {
+      val sampleCount = 200
+      val distance = 2.0
+      val polygonDf = buildPolygonDf.limit(sampleCount).repartition(3)
+      val pointDf = buildPointDf.limit(sampleCount).repartition(5)
+      val expected = bruteForceDistanceJoinFrechet(sampleCount, distance, true)
+      val joinCondition = expr(s"ST_FrechetDistance(pointDf.pointshape, polygonDf.polygonshape) <= $distance")
+
+      // Broadcast the right (polygon) side, then the left (point) side; aliases match the predicate exactly.
+      val broadcastRightDF = pointDf.alias("pointDf").join(broadcast(polygonDf).alias("polygonDf"), joinCondition)
+      val broadcastLeftDF = broadcast(pointDf).alias("pointDf").join(polygonDf.alias("polygonDf"), joinCondition)
+
+      Seq(broadcastRightDF, broadcastLeftDF).foreach { distanceJoinDF =>
+        assert(distanceJoinDF.queryExecution.sparkPlan.collect { case p: BroadcastIndexJoinExec => p }.size == 1)
+        assert(distanceJoinDF.count() == expected)
+      }
+    }
}
describe("Sedona-SQL Broadcast Index Join Test for left semi joins") {
diff --git
a/sql/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
b/sql/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
index dc6936dd..46978a21 100644
--- a/sql/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
+++ b/sql/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
@@ -21,7 +21,7 @@ package org.apache.sedona.sql
import com.google.common.math.DoubleMath
import org.apache.log4j.{Level, Logger}
import org.apache.sedona.common.sphere.{Haversine, Spheroid}
-import org.apache.sedona.common.Functions.hausdorffDistance
+import org.apache.sedona.common.Functions.{hausdorffDistance, frechetDistance}
import org.apache.sedona.spark.SedonaContext
import org.apache.spark.sql.DataFrame
import org.locationtech.jts.geom.{CoordinateSequence,
CoordinateSequenceComparator}
@@ -139,4 +139,19 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll
{
}).sum
}).sum
}
+
+  /**
+   * Brute-force count of (point, polygon) pairs, taken from the first `sampleCount` rows of the
+   * point and polygon test DataFrames, whose Frechet distance is within `distance`: an inclusive
+   * bound (<=) when `intersects` is true, a strict bound (<) otherwise.
+   */
+  protected def bruteForceDistanceJoinFrechet(sampleCount: Int, distance: Double, intersects: Boolean): Int = {
+    val polygons = buildPolygonDf.limit(sampleCount).collect().map(_.getAs[org.locationtech.jts.geom.Polygon](0))
+    val points = buildPointDf.limit(sampleCount).collect().map(_.getAs[org.locationtech.jts.geom.Point](0))
+    // `intersects` only selects whether the boundary value itself is counted.
+    val withinDistance: Double => Boolean = d => if (intersects) d <= distance else d < distance
+    points.map { point =>
+      polygons.count(polygon => withinDistance(frechetDistance(point, polygon)))
+    }.sum
+  }
}
diff --git
a/sql/common/src/test/scala/org/apache/sedona/sql/predicateJoinTestScala.scala
b/sql/common/src/test/scala/org/apache/sedona/sql/predicateJoinTestScala.scala
index efa7280b..749399ad 100644
---
a/sql/common/src/test/scala/org/apache/sedona/sql/predicateJoinTestScala.scala
+++
b/sql/common/src/test/scala/org/apache/sedona/sql/predicateJoinTestScala.scala
@@ -422,7 +422,7 @@ class predicateJoinTestScala extends TestBaseScala {
//DensityFrac specified, < distance
val expectedDensityNoIntersect =
bruteForceDistanceJoinHausdorff(sampleCount, distance, densityFrac, false)
- val distanceDensityNoIntersectDF =
inputPoint.alias("pointDF").join(inputPolygon.alias("polygonDF"),
expr(s"ST_HausdorffDistance(pointDF.pointshape, polygonDF.polygonshape,
$densityFrac) < $distance"))
+ val distanceDensityNoIntersectDF =
inputPoint.alias("pointDF").join(inputPolygon.alias("polygonDF"),
expr(s"ST_HausdorffDistance(pointDF.pointshape, polygonDF.polygonshape,
$densityFrac) <= $distance"))
assert(distanceDensityNoIntersectDF.queryExecution.sparkPlan.collect {
case p: DistanceJoinExec => p }.size === 1)
assert(distanceDensityNoIntersectDF.count() ==
expectedDensityNoIntersect)
@@ -434,10 +434,31 @@ class predicateJoinTestScala extends TestBaseScala {
//DensityFrac not specified, < distance
val expectedDefaultNoIntersects =
bruteForceDistanceJoinHausdorff(sampleCount, distance, 0.0, false)
- val distanceDefaultNoIntersectsDF =
inputPoint.alias("pointDF").join(inputPolygon.alias("polygonDF"),
expr(s"ST_HausdorffDistance(pointDF.pointshape, polygonDF.polygonshape,
$densityFrac) < $distance"))
+ val distanceDefaultNoIntersectsDF =
inputPoint.alias("pointDF").join(inputPolygon.alias("polygonDF"),
expr(s"ST_HausdorffDistance(pointDF.pointshape, polygonDF.polygonshape,
$densityFrac) <= $distance"))
assert(distanceDefaultNoIntersectsDF.queryExecution.sparkPlan.collect
{ case p: DistanceJoinExec => p }.size === 1)
assert(distanceDefaultIntersectsDF.count() ==
expectedDefaultNoIntersects)
})
}
+
+    it("Passed ST_FrechetDistance in a spatial join") {
+      val sampleCount = 200
+      val distanceCandidates = Seq(1, 2, 5, 10)
+      val inputPoint = buildPointDf.limit(sampleCount).repartition(5)
+      val inputPolygon = buildPolygonDf.limit(sampleCount).repartition(3)
+
+      distanceCandidates.foreach(distance => {
+        // Inclusive bound (<=): must plan a DistanceJoinExec and match the inclusive brute-force count.
+        val expectedIntersects = bruteForceDistanceJoinFrechet(sampleCount, distance, true)
+        val distanceDefaultIntersectsDF = inputPoint.alias("pointDF").join(inputPolygon.alias("polygonDF"), expr(s"ST_FrechetDistance(pointDF.pointshape, polygonDF.polygonshape) <= $distance"))
+        assert(distanceDefaultIntersectsDF.queryExecution.sparkPlan.collect { case p: DistanceJoinExec => p }.size === 1)
+        assert(distanceDefaultIntersectsDF.count() == expectedIntersects)
+
+        // Strict bound (<): exercises the LessThan join predicate and must match the strict brute-force count.
+        val expectedNoIntersects = bruteForceDistanceJoinFrechet(sampleCount, distance, false)
+        val distanceDefaultNoIntersectsDF = inputPoint.alias("pointDF").join(inputPolygon.alias("polygonDF"), expr(s"ST_FrechetDistance(pointDF.pointshape, polygonDF.polygonshape) < $distance"))
+        assert(distanceDefaultNoIntersectsDF.queryExecution.sparkPlan.collect { case p: DistanceJoinExec => p }.size === 1)
+        assert(distanceDefaultNoIntersectsDF.count() == expectedNoIntersects)
+      })
+    }
}
}