jiayuasu commented on code in PR #2946:
URL: https://github.com/apache/sedona/pull/2946#discussion_r3216723732
##########
spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetSpatialFilter.scala:
##########
@@ -88,4 +89,57 @@ object GeoParquetSpatialFilter {
}
override def simpleString: String = s"$columnName ${predicateType.name} $queryWindow"
}
+
+ /**
+ * Pushdown filter for predicates that operate on a Box2D-typed column (e.g.
+ * `ST_BoxIntersects(box_col, lit_box)` or `ST_BoxContains(box_col, lit_box)`).
+ *
+ * Per-file evaluation: walks the file's GeoParquet column metadata to find the geometry column
+ * whose covering metadata points at `box2dColumnName`, then prunes using that geometry column's
+ * recorded bbox.
+ *
+ * Both intersects and contains map to a file-level INTERSECTS check: per-row containment
+ * implies per-row intersection, which implies the file's union envelope must intersect the
+ * query box for any row to match. If no geometry column references this Box2D column as its
+ * covering, the file is kept (cannot prune safely).
+ *
+ * @param box2dColumnName
+ * the Box2D column referenced by the predicate
+ * @param queryBox
+ * the literal Box2D from the predicate's RHS
+ */
+ case class Box2DLeafFilter(box2dColumnName: String, queryBox: Box2D)
+ extends GeoParquetSpatialFilter {
+
+ override def evaluate(columns: Map[String, GeometryFieldMetaData]): Boolean = {
+ // Find the geometry column whose covering metadata points at this Box2D column.
+ val matchingGeomEntry = columns.find { case (_, field) =>
+ field.covering.exists(_.bbox.xmin.headOption.contains(box2dColumnName))
+ }
+
+ matchingGeomEntry match {
+ case Some((_, field)) =>
+ // Use the geometry column's recorded bbox to prune. The union of per-row Box2D values
+ // is a superset of the geometry column's bbox (covering boxes are at least as wide as
+ // their geometries), so if the geom-column bbox does not intersect the query box, no
+ // row's Box2D can intersect either. May leave some files unpruned when Box2D values
+ // are conservatively wider than geometries, but never produces false negatives.
Review Comment:
Valid concern. I went through it again and you are right: when a per-row
Box2D is wider than the per-row geometry envelope (which the spec permits),
the union of the Box2D values is a superset of the file's geometry bbox, so
pruning on the geometry bbox can produce false negatives. Addressed in
23829ab4 by:
1. Documenting the assumption explicitly in the Box2DLeafFilter scaladoc:
pushdown is sound when each per-row Box2D equals the per-row geometry
envelope (true for ST_Box2D(geom)-derived columns, including the
auto-generated `<geom>_bbox` path).
2. Adding an opt-out conf `spark.sedona.geoparquet.box2dFilterPushDown`
(default on). Users with non-minimal covering columns can disable it.
3. Filing #2949 to track the proper fix based on Parquet column statistics,
which uses the Box2D column's own xmin/ymin/xmax/ymax stats. Once that lands,
the opt-out can default to off or be removed.
Sedona's own writer (#2886) produces exact envelopes via ST_Box2D(geom), so
the common path is sound. The conservatively-wider case mostly arises with
files written by sedona-db's Float32 writer, where users would want this
disabled until #2949 lands.
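A minimal, self-contained sketch of the failure mode described above, assuming a hypothetical `Box` case class in place of Sedona's `Box2D` and plain sequences in place of file metadata. It contrasts pruning on the geometry column's file-level bbox (the original approach) with pruning on the union of the per-row Box2D values, which is roughly what min/max statistics on the Box2D column would provide (the direction tracked in #2949). All names are illustrative, not Sedona APIs.

```scala
// Hypothetical stand-in for Sedona's Box2D (not the real class).
final case class Box(xmin: Double, ymin: Double, xmax: Double, ymax: Double) {
  // Closed-interval envelope test, same comparisons as the pushed-down check.
  def intersects(o: Box): Boolean =
    !(xmax < o.xmin || xmin > o.xmax || ymax < o.ymin || ymin > o.ymax)
}

object PruningSketch {
  // Bounding envelope of a non-empty sequence of boxes.
  def union(boxes: Seq[Box]): Box =
    Box(boxes.map(_.xmin).min, boxes.map(_.ymin).min, boxes.map(_.xmax).max, boxes.map(_.ymax).max)

  // Ground truth for one file: does any row's Box2D intersect the query box?
  def anyRowMatches(rowBoxes: Seq[Box], query: Box): Boolean =
    rowBoxes.exists(_.intersects(query))

  // Original approach: keep the file iff the geometry column's file-level bbox intersects.
  def keepByGeomBbox(geomEnvelopes: Seq[Box], query: Box): Boolean =
    union(geomEnvelopes).intersects(query)

  // Statistics-style approach: keep the file iff the union of the per-row Box2D values
  // (min xmin, min ymin, max xmax, max ymax over the Box2D column) intersects.
  def keepByBox2DUnion(rowBoxes: Seq[Box], query: Box): Boolean =
    union(rowBoxes).intersects(query)
}
```

With a geometry envelope of `Box(0.0, 0.0, 0.0, 0.0)`, a padded Box2D of `Box(-1.0, -1.0, 1.0, 1.0)` (for instance, coordinates rounded outward when stored at lower precision) and a query of `Box(0.5, 0.5, 2.0, 2.0)`, `anyRowMatches` is `true` but `keepByGeomBbox` returns `false`: the file is wrongly pruned. `keepByBox2DUnion` keeps it, because every row box lies inside that union, so a matching row always implies an intersecting union.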
##########
spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetSpatialFilter.scala:
##########
@@ -88,4 +89,57 @@ object GeoParquetSpatialFilter {
}
override def simpleString: String = s"$columnName ${predicateType.name} $queryWindow"
}
+
+ /**
+ * Pushdown filter for predicates that operate on a Box2D-typed column (e.g.
+ * `ST_BoxIntersects(box_col, lit_box)` or `ST_BoxContains(box_col, lit_box)`).
+ *
+ * Per-file evaluation: walks the file's GeoParquet column metadata to find the geometry column
+ * whose covering metadata points at `box2dColumnName`, then prunes using that geometry column's
+ * recorded bbox.
+ *
+ * Both intersects and contains map to a file-level INTERSECTS check: per-row containment
+ * implies per-row intersection, which implies the file's union envelope must intersect the
+ * query box for any row to match. If no geometry column references this Box2D column as its
+ * covering, the file is kept (cannot prune safely).
+ *
+ * @param box2dColumnName
+ * the Box2D column referenced by the predicate
+ * @param queryBox
+ * the literal Box2D from the predicate's RHS
+ */
+ case class Box2DLeafFilter(box2dColumnName: String, queryBox: Box2D)
+ extends GeoParquetSpatialFilter {
+
+ override def evaluate(columns: Map[String, GeometryFieldMetaData]): Boolean = {
+ // Find the geometry column whose covering metadata points at this Box2D column.
+ val matchingGeomEntry = columns.find { case (_, field) =>
+ field.covering.exists(_.bbox.xmin.headOption.contains(box2dColumnName))
+ }
+
+ matchingGeomEntry match {
+ case Some((_, field)) =>
+ // Use the geometry column's recorded bbox to prune. The union of per-row Box2D values
+ // is a superset of the geometry column's bbox (covering boxes are at least as wide as
+ // their geometries), so if the geom-column bbox does not intersect the query box, no
+ // row's Box2D can intersect either. May leave some files unpruned when Box2D values
+ // are conservatively wider than geometries, but never produces false negatives.
+ val bbox = field.bbox.getOrElse(return true)
+ if (bbox.isEmpty) return true
+ val fileXMin = bbox(0)
+ val fileYMin = bbox(1)
+ val fileXMax = bbox(2)
+ val fileYMax = bbox(3)
+ !(fileXMax < queryBox.getXMin || fileXMin > queryBox.getXMax
+ || fileYMax < queryBox.getYMin || fileYMin > queryBox.getYMax)
+ case None =>
+ // No geometry column references this Box2D column as covering — cannot prune safely.
+ true
Review Comment:
Fixed in 23829ab4: `find` is replaced with a `collect` + `Seq(field)` pattern
that requires exactly one match. Multiple matches now fall back to keeping the
file rather than picking an arbitrary one.
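A minimal sketch of the exactly-one-match lookup, using hypothetical, simplified case classes in place of `GeometryFieldMetaData` and its covering types. Only the `covering.bbox.xmin.headOption` check mirrors the diff; everything else is illustrative.

```scala
// Hypothetical, simplified stand-ins for the GeoParquet column metadata types.
final case class BboxCovering(xmin: Seq[String]) // column path, e.g. Seq("geom_bbox", "xmin")
final case class Covering(bbox: BboxCovering)
final case class GeometryField(bbox: Option[Seq[Double]], covering: Option[Covering])

object CoveringLookup {
  // Returns the single geometry field whose covering points at the Box2D column.
  // Zero or several candidates yield None, which the caller treats as "keep the file".
  def uniqueCoveringField(
      columns: Map[String, GeometryField],
      box2dColumnName: String): Option[GeometryField] = {
    val matches = columns.collect {
      case (_, field) if field.covering.exists(_.bbox.xmin.headOption.contains(box2dColumnName)) =>
        field
    }.toSeq
    matches match {
      case Seq(field) => Some(field) // exactly one candidate: safe to use its bbox
      case _          => None        // ambiguous or missing: do not prune
    }
  }
}
```

Returning `None` for both the "no match" and "multiple matches" cases keeps the caller's decision in one place: anything other than a unique covering relationship means the file cannot be pruned safely.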
##########
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/optimization/SpatialFilterPushDownForGeoParquet.scala:
##########
@@ -42,10 +43,12 @@ import org.apache.spark.sql.execution.datasources.PushableColumnBase
import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetFileFormatBase
import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetSpatialFilter
import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetSpatialFilter.AndFilter
+import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetSpatialFilter.Box2DLeafFilter
import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetSpatialFilter.LeafFilter
import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetSpatialFilter.OrFilter
-import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
-import org.apache.spark.sql.sedona_sql.expressions.{ST_AsEWKT, ST_Buffer, ST_Contains, ST_CoveredBy, ST_Covers, ST_Crosses, ST_DWithin, ST_Distance, ST_DistanceSphere, ST_DistanceSpheroid, ST_Equals, ST_Intersects, ST_OrderingEquals, ST_Overlaps, ST_Touches, ST_Within}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sedona_sql.UDT.{Box2DUDT, GeometryUDT}
Review Comment:
Removed in 23829ab4.
##########
spark/common/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala:
##########
@@ -317,6 +318,73 @@ class GeoParquetSpatialFilterPushDownSuite extends TestBaseScala with TableDrive
assert(getPushedDownSpatialFilter(dfFiltered).isEmpty)
}
}
+
+ it("Push down ST_BoxIntersects against a Box2D covering column") {
+ val (box2dDf, box2dDir, box2dMetaMap) = setupBox2DCoveringFixture()
+ try {
+ // Q1 region only (region 1, center +10/+10)
+ val q1Filter =
+ "ST_BoxIntersects(geom_bbox, ST_MakeBox2D(ST_Point(5.0, 5.0), ST_Point(15.0, 15.0)))"
+ verifyBox2DFilter(box2dDf, box2dMetaMap, q1Filter, Seq(1))
+
+ // Window covering Q2 and Q4 (negative X) — should preserve regions 0 and 2
+ val leftHalfFilter =
+ "ST_BoxIntersects(geom_bbox, ST_MakeBox2D(ST_Point(-20.0, -20.0), ST_Point(-1.0, 20.0)))"
+ verifyBox2DFilter(box2dDf, box2dMetaMap, leftHalfFilter, Seq(0, 2))
+
+ // Disjoint window prunes everything
+ val disjointFilter =
+ "ST_BoxIntersects(geom_bbox, ST_MakeBox2D(ST_Point(100.0, 100.0), ST_Point(200.0, 200.0)))"
+ verifyBox2DFilter(box2dDf, box2dMetaMap, disjointFilter, Seq.empty)
+ } finally {
+ FileUtils.deleteDirectory(new File(box2dDir).getParentFile)
+ }
+ }
+
+ it("Push down ST_BoxContains against a Box2D covering column") {
+ val (box2dDf, box2dDir, box2dMetaMap) = setupBox2DCoveringFixture()
+ try {
+ // ST_BoxContains(box_col, lit_box) pushes down as INTERSECTS at the file level. A tiny
+ // query box inside Q1 prunes everything except region 1.
+ val containsFilter =
+ "ST_BoxContains(geom_bbox, ST_MakeBox2D(ST_Point(9.0, 9.0), ST_Point(10.0, 10.0)))"
+ verifyBox2DFilter(box2dDf, box2dMetaMap, containsFilter, Seq(1))
+ } finally {
+ FileUtils.deleteDirectory(new File(box2dDir).getParentFile)
+ }
+ }
+ }
+
+ private def setupBox2DCoveringFixture()
+ : (DataFrame, String, Map[Int, Seq[GeoParquetMetaData]]) = {
+ val box2dParent =
+ Files.createTempDirectory("sedona_geoparquet_box2d_").toFile.getAbsolutePath
+ val box2dDir = box2dParent + "/data"
+ val withBox = df.withColumn("geom_bbox", expr("ST_Box2D(geom)"))
+ withBox.coalesce(1).write.partitionBy("region").format("geoparquet").save(box2dDir)
+ val box2dDf = sparkSession.read.format("geoparquet").load(box2dDir)
+ val box2dMetaMap = readGeoParquetMetaDataMap(box2dDir)
+ (box2dDf, box2dDir, box2dMetaMap)
+ }
+
+ private def verifyBox2DFilter(
+ box2dDf: DataFrame,
+ box2dMetaMap: Map[Int, Seq[GeoParquetMetaData]],
+ condition: String,
+ expectedPreservedRegions: Seq[Int]): Unit = {
+ val dfFiltered = box2dDf.where(condition)
+ val pushed = getPushedDownSpatialFilter(dfFiltered)
+ assert(pushed.isDefined, s"Expected filter push-down for: $condition")
+ val preserved = box2dMetaMap
+ .filter { case (_, metaDataList) =>
+ metaDataList.exists(metadata => pushed.get.evaluate(metadata.columns))
+ }
Review Comment:
Added in 23829ab4. `verifyBox2DFilter` now computes the expected result with
`spark.sedona.geoparquet.box2dFilterPushDown=false` and compares against the
pushed-down result, so any over-pruning regression would surface as a test
failure.
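The comparison in the updated `verifyBox2DFilter` can be modeled without Spark. In this sketch (all names hypothetical), a "file" is just its rows, the pushdown-disabled run scans every file, and the pushdown-enabled run scans only the files the file-level filter keeps. A file is over-pruned when it holds a matching row but the filter drops it; any such file makes the two results differ.

```scala
object OverPruningCheck {
  // Files that contain at least one row satisfying `predicate` but are dropped by `keepFile`.
  def overPrunedFiles[R](
      files: Map[Int, Seq[R]],
      predicate: R => Boolean,
      keepFile: Seq[R] => Boolean): Set[Int] =
    files.collect {
      case (id, rows) if rows.exists(predicate) && !keepFile(rows) => id
    }.toSet

  // Compares the pushdown-disabled result (scan all files) with the pushdown-enabled one
  // (scan only kept files) as multisets, so duplicate rows are counted correctly.
  def resultsMatch[R](
      files: Map[Int, Seq[R]],
      predicate: R => Boolean,
      keepFile: Seq[R] => Boolean): Boolean = {
    def counts(rows: Iterable[R]): Map[R, Int] =
      rows.groupBy(identity).map { case (row, same) => row -> same.size }
    val baseline = files.values.flatten.filter(predicate)
    val pushed = files.values.filter(keepFile).flatten.filter(predicate)
    counts(baseline) == counts(pushed)
  }
}
```

Checking `overPrunedFiles` in addition to `resultsMatch` points directly at the offending file (region) when a regression appears, instead of only reporting that the row sets differ.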
--
This is an automated message from the Apache Git Service.