This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fc39b39 [SEDONA-315] Add optimized join support for 
ST_FrechetDistance (#879)
9fc39b39 is described below

commit 9fc39b39fee65bf715250ff9f1944c9d7b2f150f
Author: Nilesh Gajwani <[email protected]>
AuthorDate: Fri Jun 30 20:42:18 2023 -0700

    [SEDONA-315] Add optimized join support for ST_FrechetDistance (#879)
    
    Co-authored-by: Jia Yu <[email protected]>
---
 docs/api/sql/Optimizer.md                          | 18 +++++++++++++---
 .../scala/org/apache/sedona/sql/UDF/Catalog.scala  |  1 +
 .../sql/sedona_sql/expressions/st_functions.scala  |  2 ++
 .../strategy/join/JoinQueryDetector.scala          | 13 +++++++++++
 .../sedona/sql/BroadcastIndexJoinSuite.scala       | 18 ++++++++++++++++
 .../org/apache/sedona/sql/TestBaseScala.scala      | 17 ++++++++++++++-
 .../apache/sedona/sql/predicateJoinTestScala.scala | 25 ++++++++++++++++++++--
 7 files changed, 88 insertions(+), 6 deletions(-)

diff --git a/docs/api/sql/Optimizer.md b/docs/api/sql/Optimizer.md
index 1fe54ef7..03a4bffe 100644
--- a/docs/api/sql/Optimizer.md
+++ b/docs/api/sql/Optimizer.md
@@ -46,7 +46,7 @@ RangeJoin polygonshape#20: geometry, pointshape#43: geometry, 
false
 
 ## Distance join
 
-Introduction: Find geometries from A and geometries from B such that the 
distance of each geometry pair is less or equal than a certain distance. It 
supports the planar Euclidean distance calculators `ST_Distance` and 
`ST_HausdorffDistance` and the meter-based geodesic distance calculators 
`ST_DistanceSpheroid` and `ST_DistanceSphere`.
+Introduction: Find geometries from A and geometries from B such that the 
distance of each geometry pair is less than or equal to a certain distance. It 
supports the planar Euclidean distance calculators `ST_Distance`, 
`ST_HausdorffDistance`, `ST_FrechetDistance` and the meter-based geodesic 
distance calculators `ST_DistanceSpheroid` and `ST_DistanceSphere`.
 
 Spark SQL Example for planar Euclidean distance:
 
@@ -63,6 +63,12 @@ FROM pointDf, polygonDF
 WHERE ST_HausdorffDistance(pointDf.pointshape, polygonDf.polygonshape, 0.3) < 2
 ```
 
+```sql
+SELECT *
+FROM pointDf, polygonDF
+WHERE ST_FrechetDistance(pointDf.pointshape, polygonDf.polygonshape) < 2
+```
+
 *Consider ==intersects within a certain distance==*
 ```sql
 SELECT *
@@ -76,6 +82,12 @@ FROM pointDf, polygonDF
 WHERE ST_HausdorffDistance(pointDf.pointshape, polygonDf.polygonshape) <= 2
 ```
 
+```sql
+SELECT *
+FROM pointDf, polygonDF
+WHERE ST_FrechetDistance(pointDf.pointshape, polygonDf.polygonshape) <= 2
+```
+
 Spark SQL Physical plan:
 ```
 == Physical Plan ==
@@ -87,7 +99,7 @@ DistanceJoin pointshape1#12: geometry, pointshape2#33: 
geometry, 2.0, true
 ```
 
 !!!warning
-       If you use planar euclidean distance functions like `ST_Distance` or 
`ST_HausdorffDistance` as the predicate, Sedona doesn't control the distance's 
unit (degree or meter). It is same with the geometry. If your coordinates are 
in the longitude and latitude system, the unit of `distance` should be degree 
instead of meter or mile. To change the geometry's unit, please either 
transform the coordinate reference system to a meter-based system. See 
[ST_Transform](Function.md#st_transform). If  [...]
+       If you use planar Euclidean distance functions like `ST_Distance`, 
`ST_HausdorffDistance` or `ST_FrechetDistance` as the predicate, Sedona doesn't 
control the distance's unit (degree or meter). It is the same as the geometry's 
unit. If your coordinates are in the longitude and latitude system, the unit of 
`distance` should be degree instead of meter or mile. To change the geometry's 
unit, please either transform the coordinate reference system to a meter-based 
system. See [ST_Transform](Function [...]
 
 Spark SQL Example for meter-based geodesic distance `ST_DistanceSpheroid` 
(works for `ST_DistanceSphere` too):
 
@@ -138,7 +150,7 @@ BroadcastIndexJoin pointshape#52: geometry, BuildRight, 
BuildRight, false ST_Con
       +- FileScan csv
 ```
 
-This also works for distance joins with `ST_Distance`, `ST_DistanceSpheroid`, 
`ST_DistanceSphere` or `ST_HausdorffDistance`:
+This also works for distance joins with `ST_Distance`, `ST_DistanceSpheroid`, 
`ST_DistanceSphere`, `ST_HausdorffDistance` or `ST_FrechetDistance`:
 
 ```scala
 pointDf1.alias("pointDf1").join(broadcast(pointDf2).alias("pointDf2"), 
expr("ST_Distance(pointDf1.pointshape, pointDf2.pointshape) <= 2"))
diff --git a/sql/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala 
b/sql/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
index df6134b3..31b11a53 100644
--- a/sql/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
+++ b/sql/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
@@ -154,6 +154,7 @@ object Catalog {
     function[ST_NRings](),
     function[ST_Translate](0.0),
     function[ST_FrechetDistance](),
+    function[ST_Affine](null, null, null, null, null, null),
     function[ST_Affine](),
     function[ST_BoundingDiagonal](),
     function[ST_Angle](),
diff --git 
a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/st_functions.scala
 
b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/st_functions.scala
index 210529a9..881d4fd4 100644
--- 
a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/st_functions.scala
+++ 
b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/st_functions.scala
@@ -335,6 +335,7 @@ object st_functions extends DataFrameAPI {
   def ST_FrechetDistance(g1: Column, g2: Column): Column = 
wrapExpression[ST_FrechetDistance](g1, g2)
 
   def ST_FrechetDistance(g1: String, g2: String): Column = 
wrapExpression[ST_FrechetDistance](g1, g2)
+
   def ST_Affine(geometry: Column, a: Column, b: Column, c: Column, d: Column, 
e: Column, f: Column, g: Column, h: Column, i: Column, xOff: Column, yOff: 
Column, zOff: Column): Column =
     wrapExpression[ST_Affine](geometry, a, b, c, d, e, f, g, h, i, xOff, yOff, 
zOff)
 
@@ -377,3 +378,4 @@ object st_functions extends DataFrameAPI {
   def ST_HausdorffDistance(g1: String, g2: String, densityFrac: Double) = 
wrapExpression[ST_HausdorffDistance](g1, g2, densityFrac);
 
 }
+ 
\ No newline at end of file
diff --git 
a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala
 
b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala
index d6d4a2bc..2715e950 100644
--- 
a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala
+++ 
b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala
@@ -175,6 +175,19 @@ class JoinQueryDetector(sparkSession: SparkSession) 
extends Strategy {
           Some(JoinQueryDetection(left, right, leftShape, rightShape, 
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
         case Some(And(_, LessThan(ST_HausdorffDistance(Seq(leftShape, 
rightShape, densityFrac)), distance))) =>
           Some(JoinQueryDetection(left, right, leftShape, rightShape, 
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+        //ST_FrechetDistance
+        case Some(LessThanOrEqual(ST_FrechetDistance(Seq(leftShape, 
rightShape)), distance)) =>
+          Some(JoinQueryDetection(left, right, leftShape, rightShape, 
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+        case Some(And(LessThanOrEqual(ST_FrechetDistance(Seq(leftShape, 
rightShape)), distance), _)) =>
+          Some(JoinQueryDetection(left, right, leftShape, rightShape, 
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+        case Some(And(_, LessThanOrEqual(ST_FrechetDistance(Seq(leftShape, 
rightShape)), distance))) =>
+          Some(JoinQueryDetection(left, right, leftShape, rightShape, 
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+        case Some(LessThan(ST_FrechetDistance(Seq(leftShape, rightShape)), 
distance)) =>
+          Some(JoinQueryDetection(left, right, leftShape, rightShape, 
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+        case Some(And(LessThan(ST_FrechetDistance(Seq(leftShape, rightShape)), 
distance), _)) =>
+          Some(JoinQueryDetection(left, right, leftShape, rightShape, 
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+        case Some(And(_, LessThan(ST_FrechetDistance(Seq(leftShape, 
rightShape)), distance))) =>
+          Some(JoinQueryDetection(left, right, leftShape, rightShape, 
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
         case _ =>
           None
       }
diff --git 
a/sql/common/src/test/scala/org/apache/sedona/sql/BroadcastIndexJoinSuite.scala 
b/sql/common/src/test/scala/org/apache/sedona/sql/BroadcastIndexJoinSuite.scala
index d2232278..c22ba601 100644
--- 
a/sql/common/src/test/scala/org/apache/sedona/sql/BroadcastIndexJoinSuite.scala
+++ 
b/sql/common/src/test/scala/org/apache/sedona/sql/BroadcastIndexJoinSuite.scala
@@ -387,6 +387,24 @@ class BroadcastIndexJoinSuite extends TestBaseScala {
       assert(distanceJoinDF.queryExecution.sparkPlan.collect { case p: 
BroadcastIndexJoinExec => p }.size == 1)
       assert(distanceJoinDF.count() == expected)
     }
+
+    it("Passed ST_FrechetDistance <= distance") {
+      val sampleCount = 200
+      val distance = 2.0
+      val polygonDf = buildPolygonDf.limit(sampleCount).repartition(3)
+      val pointDf = buildPointDf.limit(sampleCount).repartition(5)
+      val expected = bruteForceDistanceJoinFrechet(sampleCount, distance, true)
+
+      var distanceJoinDF = pointDf.alias("pointDf").join(
+        broadcast(polygonDf).alias("polygonDF"), 
expr(s"ST_FrechetDistance(pointDf.pointshape, polygonDf.polygonshape) <= 
$distance"))
+      assert(distanceJoinDF.queryExecution.sparkPlan.collect { case p: 
BroadcastIndexJoinExec => p }.size == 1)
+      assert(distanceJoinDF.count() == expected)
+
+      distanceJoinDF = 
broadcast(pointDf).alias("pointDf").join(polygonDf.alias("polygonDf"), 
expr(s"ST_FrechetDistance(pointDf.pointshape, polygonDf.polygonshape) <= 
$distance"))
+
+      assert(distanceJoinDF.queryExecution.sparkPlan.collect { case p: 
BroadcastIndexJoinExec => p }.size == 1)
+      assert(distanceJoinDF.count() == expected)
+    }
   }
 
   describe("Sedona-SQL Broadcast Index Join Test for left semi joins") {
diff --git 
a/sql/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala 
b/sql/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
index dc6936dd..46978a21 100644
--- a/sql/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
+++ b/sql/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
@@ -21,7 +21,7 @@ package org.apache.sedona.sql
 import com.google.common.math.DoubleMath
 import org.apache.log4j.{Level, Logger}
 import org.apache.sedona.common.sphere.{Haversine, Spheroid}
-import org.apache.sedona.common.Functions.hausdorffDistance
+import org.apache.sedona.common.Functions.{hausdorffDistance, frechetDistance}
 import org.apache.sedona.spark.SedonaContext
 import org.apache.spark.sql.DataFrame
 import org.locationtech.jts.geom.{CoordinateSequence, 
CoordinateSequenceComparator}
@@ -139,4 +139,19 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll 
{
       }).sum
     }).sum
   }
+
+  protected def bruteForceDistanceJoinFrechet(sampleCount: Int, distance: 
Double, intersects: Boolean): Int = {
+    val inputPolygon = buildPolygonDf.limit(sampleCount).collect()
+    val inputPoint = buildPointDf.limit(sampleCount).collect()
+    inputPoint.map(row => {
+      val point = row.getAs[org.locationtech.jts.geom.Point](0)
+      inputPolygon.map(row => {
+        val polygon = row.getAs[org.locationtech.jts.geom.Polygon](0)
+        if (intersects)
+          if (frechetDistance(point, polygon) <= distance) 1 else 0
+        else
+          if (frechetDistance(point, polygon) < distance) 1 else 0
+      }).sum
+    }).sum
+  }
 }
diff --git 
a/sql/common/src/test/scala/org/apache/sedona/sql/predicateJoinTestScala.scala 
b/sql/common/src/test/scala/org/apache/sedona/sql/predicateJoinTestScala.scala
index efa7280b..749399ad 100644
--- 
a/sql/common/src/test/scala/org/apache/sedona/sql/predicateJoinTestScala.scala
+++ 
b/sql/common/src/test/scala/org/apache/sedona/sql/predicateJoinTestScala.scala
@@ -422,7 +422,7 @@ class predicateJoinTestScala extends TestBaseScala {
 
         //DensityFrac specified, < distance
         val expectedDensityNoIntersect = 
bruteForceDistanceJoinHausdorff(sampleCount, distance, densityFrac, false)
-        val distanceDensityNoIntersectDF = 
inputPoint.alias("pointDF").join(inputPolygon.alias("polygonDF"), 
expr(s"ST_HausdorffDistance(pointDF.pointshape, polygonDF.polygonshape, 
$densityFrac) < $distance"))
+        val distanceDensityNoIntersectDF = 
inputPoint.alias("pointDF").join(inputPolygon.alias("polygonDF"), 
expr(s"ST_HausdorffDistance(pointDF.pointshape, polygonDF.polygonshape, 
$densityFrac) <= $distance"))
         assert(distanceDensityNoIntersectDF.queryExecution.sparkPlan.collect { 
case p: DistanceJoinExec => p }.size === 1)
         assert(distanceDensityNoIntersectDF.count() == 
expectedDensityNoIntersect)
 
@@ -434,10 +434,31 @@ class predicateJoinTestScala extends TestBaseScala {
 
         //DensityFrac not specified, < distance
         val expectedDefaultNoIntersects = 
bruteForceDistanceJoinHausdorff(sampleCount, distance, 0.0, false)
-        val distanceDefaultNoIntersectsDF = 
inputPoint.alias("pointDF").join(inputPolygon.alias("polygonDF"), 
expr(s"ST_HausdorffDistance(pointDF.pointshape, polygonDF.polygonshape, 
$densityFrac) < $distance"))
+        val distanceDefaultNoIntersectsDF = 
inputPoint.alias("pointDF").join(inputPolygon.alias("polygonDF"), 
expr(s"ST_HausdorffDistance(pointDF.pointshape, polygonDF.polygonshape, 
$densityFrac) <= $distance"))
         assert(distanceDefaultNoIntersectsDF.queryExecution.sparkPlan.collect 
{ case p: DistanceJoinExec => p }.size === 1)
         assert(distanceDefaultIntersectsDF.count() == 
expectedDefaultNoIntersects)
       })
     }
+
+    it("Passed ST_FrechetDistance in a spatial join") {
+      val sampleCount = 200
+      val distanceCandidates = Seq(1, 2, 5, 10)
+      val inputPoint = buildPointDf.limit(sampleCount).repartition(5)
+      val inputPolygon = buildPolygonDf.limit(sampleCount).repartition(3)
+
+      distanceCandidates.foreach(distance => {
+        // <= distance
+        val expectedIntersects = bruteForceDistanceJoinFrechet(sampleCount, 
distance, true)
+        val distanceDefaultIntersectsDF = 
inputPoint.alias("pointDF").join(inputPolygon.alias("polygonDF"), 
expr(s"ST_FrechetDistance(pointDF.pointshape, polygonDF.polygonshape) <= 
$distance"))
+        assert(distanceDefaultIntersectsDF.queryExecution.sparkPlan.collect { 
case p: DistanceJoinExec => p }.size === 1)
+        assert(distanceDefaultIntersectsDF.count() == expectedIntersects)
+
+        // < distance
+        val expectedNoIntersects = bruteForceDistanceJoinFrechet(sampleCount, 
distance, false)
+        val distanceDefaultNoIntersectsDF = 
inputPoint.alias("pointDF").join(inputPolygon.alias("polygonDF"), 
expr(s"ST_FrechetDistance(pointDF.pointshape, polygonDF.polygonshape) <= 
$distance"))
+        assert(distanceDefaultNoIntersectsDF.queryExecution.sparkPlan.collect 
{ case p: DistanceJoinExec => p }.size === 1)
+        assert(distanceDefaultIntersectsDF.count() == expectedNoIntersects)
+      })
+    }
   }
 }

Reply via email to