[
https://issues.apache.org/jira/browse/SPARK-35787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17366632#comment-17366632
]
Vidmantas Drasutis commented on SPARK-35787:
--------------------------------------------
More details about our case (Scala code):
We load a GeoJSON file containing polygons.
Our data contains
[Geohashes|https://en.wikipedia.org/wiki/Geohash].
From each Geohash we compute its central point and check whether that point
lies within any of the loaded polygons, using the "org.locationtech.jts.geom" library.
{code:java}
// Excerpt: `polygons`, `geohash`, and `polygon` are column-name constants
// defined elsewhere, as are `geometryFactory` and `geometry`.
private def doWork(context: QueryContext, before: Stage.State,
                   root: Node[PolygonDefinition], hitCacheEnabled: Boolean): Stage.State = {
  val getPolygons = udf((geohash: String) =>
    PolygonHitTest.getPolygonsForGeoHashFromHierarchy(geohash, root, hitCacheEnabled))
  val result =
    for (input <- before.df) yield {
      input
        .withColumn(polygons, getPolygons(col(geohash)))
        .withColumn(polygon, explode(col(polygons)))
        .drop(geohash, polygons)
    }
  SparkDebug.show(result, "Polygon mapping")
  before.copy(df = result)
}

def getPolygonsForGeoHashFromHierarchy(geoHash: String, root: Node[PolygonDefinition],
                                       hitCacheEnabled: Boolean = false): Seq[String] = {
  // Decode the geohash to its central point, then search the polygon hierarchy.
  val latLong = GeoHash.decodeHash(geoHash)
  val point = geometryFactory.createPoint(new Coordinate(latLong.getLon, latLong.getLat))
  root.data.id :: getPolygonsForPointFromHierarchyV2(point, root.children.toList, hitCacheEnabled)
}

private def getPolygonsForPointFromHierarchyV2(point: Point,
    hierarchy: List[Node[PolygonDefinition]], hitCacheEnabled: Boolean): List[String] = {
  for (node <- hierarchy) {
    // Cheap bounding-box test prunes subtrees before the exact polygon test.
    if (node.data.isPointWithinBoundingBox(point)) {
      val hits = getPolygonsForPointFromHierarchyV2(point, node.children.toList, hitCacheEnabled)
      if (hits.isEmpty) {
        if (node.data.isPointWithinPolygon(point, hitCacheEnabled)) {
          return List(node.data.id)
        }
      } else {
        return node.data.id :: hits
      }
    }
  }
  Nil
}

// Thread-safe hit/miss caches backed by ConcurrentHashMap key sets.
private[this] lazy val hitCache: mutable.Set[Point] =
  java.util.concurrent.ConcurrentHashMap.newKeySet[Point]().asScala
private[this] lazy val missCache: mutable.Set[Point] =
  java.util.concurrent.ConcurrentHashMap.newKeySet[Point]().asScala

def isPointWithinPolygon(point: Point, hitCacheEnabled: Boolean): Boolean = {
  if (hitCacheEnabled) {
    if (hitCache.contains(point)) true
    else if (missCache.contains(point)) false
    else {
      val hit = point.within(geometry)
      if (hit) hitCache.add(point) else missCache.add(point)
      hit
    }
  } else {
    point.within(geometry)
  }
}
{code}
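The hierarchy traversal above can be sketched in plain Scala with simplified stand-ins: a hypothetical axis-aligned `Box` replaces the JTS bounding-box and exact polygon tests, and `PolyNode` and the sample tree are illustrative, not from our code.

```scala
// Minimal sketch of the bounding-box-pruned hierarchy search, assuming
// axis-aligned rectangles stand in for JTS bounding boxes and polygons.
case class Pt(x: Double, y: Double)
case class Box(minX: Double, minY: Double, maxX: Double, maxY: Double) {
  def contains(p: Pt): Boolean =
    p.x >= minX && p.x <= maxX && p.y >= minY && p.y <= maxY
}
case class PolyNode(id: String, box: Box, children: List[PolyNode])

// Mirrors getPolygonsForPointFromHierarchyV2: descend only into nodes whose
// bounding box contains the point; return the chain of matching ids.
def search(p: Pt, hierarchy: List[PolyNode]): List[String] = {
  for (node <- hierarchy) {
    if (node.box.contains(p)) {
      val hits = search(p, node.children)
      if (hits.nonEmpty) return node.id :: hits
      else return List(node.id) // rectangle == exact shape in this sketch
    }
  }
  Nil
}

val tree = List(
  PolyNode("country", Box(0, 0, 100, 100),
    List(PolyNode("city", Box(10, 10, 20, 20), Nil))))

val inCity  = search(Pt(15, 15), tree)   // List("country", "city")
val outside = search(Pt(200, 200), tree) // Nil
```

The bounding-box check is cheap, so whole subtrees are skipped without ever running the expensive exact polygon test.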
*Note:* we have an option to enable caching (hitCacheEnabled), which we had
always kept turned off, since it needs more memory and does not always give a
benefit. But when I enabled this polygon hit-test caching, the query
performance of the new and old Spark was/is the same - fast.
But we still have other product parts where we have no such knob to tweak,
and there we are seeing the slowdown.
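The hit/miss cache described in the note boils down to a small, self-contained memoization pattern. In this sketch the `slowTest` predicate and `evalCount` counter are illustrative stand-ins for the JTS `point.within(geometry)` call:

```scala
import scala.collection.mutable
import scala.jdk.CollectionConverters._

// Thread-safe memoization of an expensive boolean test, mirroring the
// hitCache/missCache pattern above. `evalCount` just shows the saving.
var evalCount = 0
def slowTest(p: (Double, Double)): Boolean = { evalCount += 1; p._1 > 0 }

val hitCache: mutable.Set[(Double, Double)] =
  java.util.concurrent.ConcurrentHashMap.newKeySet[(Double, Double)]().asScala
val missCache: mutable.Set[(Double, Double)] =
  java.util.concurrent.ConcurrentHashMap.newKeySet[(Double, Double)]().asScala

def cachedTest(p: (Double, Double)): Boolean =
  if (hitCache.contains(p)) true
  else if (missCache.contains(p)) false
  else {
    val hit = slowTest(p)
    if (hit) hitCache.add(p) else missCache.add(p)
    hit
  }

cachedTest((1.0, 2.0))  // evaluates slowTest
cachedTest((1.0, 2.0))  // served from hitCache
cachedTest((-1.0, 0.0)) // evaluates slowTest
cachedTest((-1.0, 0.0)) // served from missCache
// evalCount is now 2, not 4
```

Presumably the cache amortizes the per-row UDF work across repeated points, which would explain why enabling it hides the 3.0-vs-3.1 difference.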
> Does anyone has performance issue after upgrade from 3.0 to 3.1?
> ----------------------------------------------------------------
>
> Key: SPARK-35787
> URL: https://issues.apache.org/jira/browse/SPARK-35787
> Project: Spark
> Issue Type: Question
> Components: Spark Core
> Affects Versions: 3.1.2
> Reporter: Vidmantas Drasutis
> Priority: Major
> Attachments: Execution_plan_difference.png,
> spark_3.0_execution_plan_details_fast.txt,
> spark_3.1_execution_plan_details_slow.txt, spark_job_info_1.png,
> spark_job_info_2.png
>
>
> Hello.
>
> We were using Spark 3.0.2 and our query executed in ~100 seconds.
> After we upgraded to Spark 3.1.1 (we also tried 3.1.2 - same slow
> performance), the query execution time grew to ~260 seconds - a huge
> increase of 250-300%.
>
> We tried a quite simple query.
> The query uses a UDF (*org.apache.spark.sql.functions*) which explodes data
> and does a polygon hit test. Nothing changed in our code from the query
> perspective.
> It is a one-VM, single-box cluster.
>
> Has anyone faced a similar issue?
> Some details from the Spark dashboard are attached.
>
> *It looks like the slowdown is UDF-related: queries that do not use UDFs
> perform the same as before, while queries that use UDFs show decreased
> performance starting from 3.1.*
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]