This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b2afd6d [SEDONA-314] Support optimized join on ST_HausdorffDistance 
(#878)
8b2afd6d is described below

commit 8b2afd6d018acb6fd2713d3c880acddef9e21918
Author: Nilesh Gajwani <[email protected]>
AuthorDate: Fri Jun 30 01:16:09 2023 -0700

    [SEDONA-314] Support optimized join on ST_HausdorffDistance (#878)
---
 docs/api/sql/Optimizer.md                          | 18 +++++++++--
 .../sql/sedona_sql/expressions/Functions.scala     |  3 +-
 .../strategy/join/JoinQueryDetector.scala          | 26 +++++++++++++++
 .../sedona/sql/BroadcastIndexJoinSuite.scala       | 37 ++++++++++++++++++++++
 .../org/apache/sedona/sql/TestBaseScala.scala      | 22 +++++++++++++
 .../apache/sedona/sql/predicateJoinTestScala.scala | 34 ++++++++++++++++++++
 6 files changed, 135 insertions(+), 5 deletions(-)

diff --git a/docs/api/sql/Optimizer.md b/docs/api/sql/Optimizer.md
index 3fa0242b..1fe54ef7 100644
--- a/docs/api/sql/Optimizer.md
+++ b/docs/api/sql/Optimizer.md
@@ -46,7 +46,7 @@ RangeJoin polygonshape#20: geometry, pointshape#43: geometry, 
false
 
 ## Distance join
 
-Introduction: Find geometries from A and geometries from B such that the 
distance of each geometry pair is less or equal than a certain distance. It 
supports the planar Euclidean distance calculator `ST_Distance` and the 
meter-based geodesic distance calculators `ST_DistanceSpheroid` and 
`ST_DistanceSphere`.
+Introduction: Find geometries from A and geometries from B such that the 
distance of each geometry pair is less than or equal to a certain distance. It 
supports the planar Euclidean distance calculators `ST_Distance` and 
`ST_HausdorffDistance` and the meter-based geodesic distance calculators 
`ST_DistanceSpheroid` and `ST_DistanceSphere`.
 
 Spark SQL Example for planar Euclidean distance:
 
@@ -57,6 +57,12 @@ FROM pointdf1, pointdf2
 WHERE ST_Distance(pointdf1.pointshape1,pointdf2.pointshape2) < 2
 ```
 
+```sql
+SELECT *
+FROM pointDf, polygonDf
+WHERE ST_HausdorffDistance(pointDf.pointshape, polygonDf.polygonshape, 0.3) < 2
+```
+
 *Consider ==intersects within a certain distance==*
 ```sql
 SELECT *
@@ -64,6 +70,12 @@ FROM pointdf1, pointdf2
 WHERE ST_Distance(pointdf1.pointshape1,pointdf2.pointshape2) <= 2
 ```
 
+```sql
+SELECT *
+FROM pointDf, polygonDf
+WHERE ST_HausdorffDistance(pointDf.pointshape, polygonDf.polygonshape) <= 2
+```
+
 Spark SQL Physical plan:
 ```
 == Physical Plan ==
@@ -75,7 +87,7 @@ DistanceJoin pointshape1#12: geometry, pointshape2#33: 
geometry, 2.0, true
 ```
 
 !!!warning
-       If you use `ST_Distance` as the predicate, Sedona doesn't control the 
distance's unit (degree or meter). It is same with the geometry. If your 
coordinates are in the longitude and latitude system, the unit of `distance` 
should be degree instead of meter or mile. To change the geometry's unit, 
please either transform the coordinate reference system to a meter-based 
system. See [ST_Transform](Function.md#st_transform). If you don't want to 
transform your data, please consider using `ST_Di [...]
+       If you use planar Euclidean distance functions like `ST_Distance` or 
`ST_HausdorffDistance` as the predicate, Sedona doesn't control the distance's 
unit (degree or meter). It is the same as the geometry's unit. If your 
coordinates are in the longitude and latitude system, the unit of `distance` 
should be degree instead of meter or mile. To change the geometry's unit, 
please transform the coordinate reference system to a meter-based system. See 
[ST_Transform](Function.md#st_transform). If  [...]
 
 Spark SQL Example for meter-based geodesic distance `ST_DistanceSpheroid` 
(works for `ST_DistanceSphere` too):
 
@@ -126,7 +138,7 @@ BroadcastIndexJoin pointshape#52: geometry, BuildRight, 
BuildRight, false ST_Con
       +- FileScan csv
 ```
 
-This also works for distance joins with `ST_Distance`, `ST_DistanceSpheroid` 
or `ST_DistanceSphere`:
+This also works for distance joins with `ST_Distance`, `ST_DistanceSpheroid`, 
`ST_DistanceSphere` or `ST_HausdorffDistance`:
 
 ```scala
 pointDf1.alias("pointDf1").join(broadcast(pointDf2).alias("pointDf2"), 
expr("ST_Distance(pointDf1.pointshape, pointDf2.pointshape) <= 2"))
diff --git 
a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
 
b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
index c661fc92..df2d6155 100644
--- 
a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
+++ 
b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
@@ -1036,7 +1036,7 @@ case class ST_BoundingDiagonal(inputExpressions: 
Seq[Expression])
 }
 
 case class ST_HausdorffDistance(inputExpressions: Seq[Expression])
-  extends InferredExpression(inferrableFunction3(Functions.hausdorffDistance)) 
{
+  extends InferredExpression(inferrableFunction3(Functions.hausdorffDistance), 
inferrableFunction2(Functions.hausdorffDistance)) {
   protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = 
{
     copy(inputExpressions = newChildren)
   }
@@ -1062,4 +1062,3 @@ case class ST_Degrees(inputExpressions: Seq[Expression])
     copy(inputExpressions = newChildren)
   }
 }
-
diff --git 
a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala
 
b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala
index 8a8f411b..d6d4a2bc 100644
--- 
a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala
+++ 
b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala
@@ -149,6 +149,32 @@ class JoinQueryDetector(sparkSession: SparkSession) 
extends Strategy {
           Some(JoinQueryDetection(left, right, leftShape, rightShape, 
SpatialPredicate.INTERSECTS, true, condition, Some(distance)))
         case Some(And(_, LessThan(ST_DistanceSpheroid(Seq(leftShape, 
rightShape)), distance))) =>
           Some(JoinQueryDetection(left, right, leftShape, rightShape, 
SpatialPredicate.INTERSECTS, true, condition, Some(distance)))
+        //ST_HausdorffDistanceDefault
+        case Some(LessThanOrEqual(ST_HausdorffDistance(Seq(leftShape, 
rightShape)), distance)) =>
+          Some(JoinQueryDetection(left, right, leftShape, rightShape, 
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+        case Some(And(LessThanOrEqual(ST_HausdorffDistance(Seq(leftShape, 
rightShape)), distance), _)) =>
+          Some(JoinQueryDetection(left, right, leftShape, rightShape, 
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+        case Some(And(_, LessThanOrEqual(ST_HausdorffDistance(Seq(leftShape, 
rightShape)), distance))) =>
+          Some(JoinQueryDetection(left, right, leftShape, rightShape, 
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+        case Some(LessThan(ST_HausdorffDistance(Seq(leftShape, rightShape)), 
distance)) =>
+          Some(JoinQueryDetection(left, right, leftShape, rightShape, 
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+        case Some(And(LessThan(ST_HausdorffDistance(Seq(leftShape, 
rightShape)), distance), _)) =>
+          Some(JoinQueryDetection(left, right, leftShape, rightShape, 
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+        case Some(And(_, LessThan(ST_HausdorffDistance(Seq(leftShape, 
rightShape)), distance))) =>
+          Some(JoinQueryDetection(left, right, leftShape, rightShape, 
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+        //ST_HausdorffDistanceDensityFrac
+        case Some(LessThanOrEqual(ST_HausdorffDistance(Seq(leftShape, 
rightShape, _)), distance)) =>
+          Some(JoinQueryDetection(left, right, leftShape, rightShape, 
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+        case Some(And(LessThanOrEqual(ST_HausdorffDistance(Seq(leftShape, 
rightShape, _)), distance), _)) =>
+          Some(JoinQueryDetection(left, right, leftShape, rightShape, 
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+        case Some(And(_, LessThanOrEqual(ST_HausdorffDistance(Seq(leftShape, 
rightShape, _)), distance))) =>
+          Some(JoinQueryDetection(left, right, leftShape, rightShape, 
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+        case Some(LessThan(ST_HausdorffDistance(Seq(leftShape, rightShape, 
_)), distance)) =>
+          Some(JoinQueryDetection(left, right, leftShape, rightShape, 
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+        case Some(And(LessThan(ST_HausdorffDistance(Seq(leftShape, rightShape, 
_)), distance), _)) =>
+          Some(JoinQueryDetection(left, right, leftShape, rightShape, 
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
+        case Some(And(_, LessThan(ST_HausdorffDistance(Seq(leftShape, 
rightShape, _)), distance))) =>
+          Some(JoinQueryDetection(left, right, leftShape, rightShape, 
SpatialPredicate.INTERSECTS, false, condition, Some(distance)))
         case _ =>
           None
       }
diff --git 
a/sql/common/src/test/scala/org/apache/sedona/sql/BroadcastIndexJoinSuite.scala 
b/sql/common/src/test/scala/org/apache/sedona/sql/BroadcastIndexJoinSuite.scala
index 6f569128..d2232278 100644
--- 
a/sql/common/src/test/scala/org/apache/sedona/sql/BroadcastIndexJoinSuite.scala
+++ 
b/sql/common/src/test/scala/org/apache/sedona/sql/BroadcastIndexJoinSuite.scala
@@ -350,6 +350,43 @@ class BroadcastIndexJoinSuite extends TestBaseScala {
         assert(distanceJoinDf.count() == expected)
       })
     }
+
+    it("Passed ST_HausdorffDistance with densityFrac <= distance in a 
broadcast join") {
+      val sampleCount = 100
+      val distance = 1.0
+      val densityFrac = 0.5
+      val polygonDf = buildPolygonDf.limit(sampleCount).repartition(3)
+      val pointDf = buildPointDf.limit(sampleCount).repartition(5)
+      val expected = bruteForceDistanceJoinHausdorff(sampleCount, distance, 
densityFrac, true)
+
+      var distanceJoinDF = pointDf.alias("pointDf").join(
+        broadcast(polygonDf).alias("polygonDf"), 
expr(s"ST_HausdorffDistance(pointDf.pointshape, polygonDf.polygonshape, 
$densityFrac) <= $distance"))
+      assert(distanceJoinDF.queryExecution.sparkPlan.collect{case p: 
BroadcastIndexJoinExec => p}.size == 1)
+      assert(distanceJoinDF.count() == expected)
+
+      distanceJoinDF = 
broadcast(pointDf).alias("pointDf").join(polygonDf.alias("polygonDf"), 
expr(s"ST_HausdorffDistance(pointDf.pointshape, polygonDf.polygonshape, 
$densityFrac) <= $distance"))
+
+      assert(distanceJoinDF.queryExecution.sparkPlan.collect { case p: 
BroadcastIndexJoinExec => p }.size == 1)
+      assert(distanceJoinDF.count() == expected)
+    }
+
+    it("Passed ST_HausdorffDistance <= distance in a broadcast join") {
+      val sampleCount = 200
+      val distance = 2.0
+      val polygonDf = buildPolygonDf.limit(sampleCount).repartition(3)
+      val pointDf = buildPointDf.limit(sampleCount).repartition(5)
+      val expected = bruteForceDistanceJoinHausdorff(sampleCount, distance, 0, 
true)
+
+      var distanceJoinDF = pointDf.alias("pointDf").join(
+        broadcast(polygonDf).alias("polygonDf"), 
expr(s"ST_HausdorffDistance(pointDf.pointshape, polygonDf.polygonshape) <= 
$distance"))
+      assert(distanceJoinDF.queryExecution.sparkPlan.collect { case p: 
BroadcastIndexJoinExec => p }.size == 1)
+      assert(distanceJoinDF.count() == expected)
+
+      distanceJoinDF = 
broadcast(pointDf).alias("pointDf").join(polygonDf.alias("polygonDf"), 
expr(s"ST_HausdorffDistance(pointDf.pointshape, polygonDf.polygonshape) <= 
$distance"))
+
+      assert(distanceJoinDF.queryExecution.sparkPlan.collect { case p: 
BroadcastIndexJoinExec => p }.size == 1)
+      assert(distanceJoinDF.count() == expected)
+    }
   }
 
   describe("Sedona-SQL Broadcast Index Join Test for left semi joins") {
diff --git 
a/sql/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala 
b/sql/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
index 21f31a3b..dc6936dd 100644
--- a/sql/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
+++ b/sql/common/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
@@ -21,6 +21,7 @@ package org.apache.sedona.sql
 import com.google.common.math.DoubleMath
 import org.apache.log4j.{Level, Logger}
 import org.apache.sedona.common.sphere.{Haversine, Spheroid}
+import org.apache.sedona.common.Functions.hausdorffDistance
 import org.apache.sedona.spark.SedonaContext
 import org.apache.spark.sql.DataFrame
 import org.locationtech.jts.geom.{CoordinateSequence, 
CoordinateSequenceComparator}
@@ -117,4 +118,25 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll 
{
     }).sum
   }
 
+  protected def bruteForceDistanceJoinHausdorff(sampleCount: Int, distance: 
Double, densityFrac: Double, intersects: Boolean): Int = {
+    val inputPolygon = buildPolygonDf.limit(sampleCount).collect()
+    val inputPoint = buildPointDf.limit(sampleCount).collect()
+    inputPoint.map(row => {
+      val point = row.getAs[org.locationtech.jts.geom.Point](0)
+      inputPolygon.map(row => {
+        val polygon = row.getAs[org.locationtech.jts.geom.Polygon](0)
+        if (densityFrac == 0) {
+          if (intersects)
+            if (hausdorffDistance(point, polygon) <= distance) 1 else 0
+          else
+            if (hausdorffDistance(point, polygon) < distance) 1 else 0
+        } else {
+          if (intersects)
+            if (hausdorffDistance(point, polygon, densityFrac) <= distance) 1 
else 0
+          else
+            if (hausdorffDistance(point, polygon, densityFrac) < distance) 1 
else 0
+        }
+      }).sum
+    }).sum
+  }
 }
diff --git 
a/sql/common/src/test/scala/org/apache/sedona/sql/predicateJoinTestScala.scala 
b/sql/common/src/test/scala/org/apache/sedona/sql/predicateJoinTestScala.scala
index 66ee94ca..efa7280b 100644
--- 
a/sql/common/src/test/scala/org/apache/sedona/sql/predicateJoinTestScala.scala
+++ 
b/sql/common/src/test/scala/org/apache/sedona/sql/predicateJoinTestScala.scala
@@ -403,7 +403,41 @@ class predicateJoinTestScala extends TestBaseScala {
         assert(distanceJoinDf.queryExecution.sparkPlan.collect { case p: 
DistanceJoinExec => p }.size === 1)
         assert(distanceJoinDf.count() == expected)
       })
+    }
+
+    it("Passed ST_HausdorffDistance in a spatial join") {
+      val sampleCount = 100
+      val distanceCandidates = Seq(1, 2, 5, 10)
+      val densityFrac = 0.6
+      val inputPoint = buildPointDf.limit(sampleCount).repartition(5)
+      val inputPolygon = buildPolygonDf.limit(sampleCount).repartition(3)
 
+      distanceCandidates.foreach(distance => {
+
+        //DensityFrac specified, <= distance
+        val expectedDensityIntersects = 
bruteForceDistanceJoinHausdorff(sampleCount, distance, densityFrac, true)
+        val distanceDensityIntersectsDF = 
inputPoint.alias("pointDF").join(inputPolygon.alias("polygonDF"), 
expr(s"ST_HausdorffDistance(pointDF.pointshape, polygonDF.polygonshape, 
$densityFrac) <= $distance"))
+        assert(distanceDensityIntersectsDF.queryExecution.sparkPlan.collect { 
case p: DistanceJoinExec => p }.size === 1)
+        assert(distanceDensityIntersectsDF.count() == 
expectedDensityIntersects)
+
+        //DensityFrac specified, < distance
+        val expectedDensityNoIntersect = 
bruteForceDistanceJoinHausdorff(sampleCount, distance, densityFrac, false)
+        val distanceDensityNoIntersectDF = 
inputPoint.alias("pointDF").join(inputPolygon.alias("polygonDF"), 
expr(s"ST_HausdorffDistance(pointDF.pointshape, polygonDF.polygonshape, 
$densityFrac) < $distance"))
+        assert(distanceDensityNoIntersectDF.queryExecution.sparkPlan.collect { 
case p: DistanceJoinExec => p }.size === 1)
+        assert(distanceDensityNoIntersectDF.count() == 
expectedDensityNoIntersect)
+
+        //DensityFrac not specified, <= distance
+        val expectedDefaultIntersects = 
bruteForceDistanceJoinHausdorff(sampleCount, distance, 0.0, true)
+        val distanceDefaultIntersectsDF = 
inputPoint.alias("pointDF").join(inputPolygon.alias("polygonDF"), 
expr(s"ST_HausdorffDistance(pointDF.pointshape, polygonDF.polygonshape) <= 
$distance"))
+        assert(distanceDefaultIntersectsDF.queryExecution.sparkPlan.collect { 
case p: DistanceJoinExec => p }.size === 1)
+        assert(distanceDefaultIntersectsDF.count() == 
expectedDefaultIntersects)
+
+        //DensityFrac not specified, < distance
+        val expectedDefaultNoIntersects = 
bruteForceDistanceJoinHausdorff(sampleCount, distance, 0.0, false)
+        val distanceDefaultNoIntersectsDF = 
inputPoint.alias("pointDF").join(inputPolygon.alias("polygonDF"), 
expr(s"ST_HausdorffDistance(pointDF.pointshape, polygonDF.polygonshape) < 
$distance"))
+        assert(distanceDefaultNoIntersectsDF.queryExecution.sparkPlan.collect 
{ case p: DistanceJoinExec => p }.size === 1)
+        assert(distanceDefaultNoIntersectsDF.count() == 
expectedDefaultNoIntersects)
+      })
     }
   }
 }

Reply via email to