[ https://issues.apache.org/jira/browse/SPARK-35787?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17366632#comment-17366632 ]

Vidmantas Drasutis commented on SPARK-35787:
--------------------------------------------

More details about our case (Scala code):
We load a GeoJSON file with polygons. The data contains [Geohash|https://en.wikipedia.org/wiki/Geohash] strings. From each geohash we compute its central point and check whether that point lies within the loaded polygons using the "org.locationtech.jts.geom" library.
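For reference, turning a geohash into its cell centre is a small, self-contained computation: each base32 digit contributes five bits that alternately bisect the longitude and latitude intervals. A minimal pure-Scala sketch of it (an illustrative standalone version, not the geohash library call used in the code below):

{code:scala}
object GeohashCenter {
  private val Base32 = "0123456789bcdefghjkmnpqrstuvwxyz"

  /** Decode a geohash to the (lat, lon) of its cell centre. */
  def decodeCenter(geohash: String): (Double, Double) = {
    var latMin = -90.0;  var latMax = 90.0
    var lonMin = -180.0; var lonMax = 180.0
    var isLon = true // geohash bits alternate lon, lat, lon, ...
    for (c <- geohash.toLowerCase; bit <- 4 to 0 by -1) {
      val idx = Base32.indexOf(c)
      require(idx >= 0, s"invalid geohash character: $c")
      val high = ((idx >> bit) & 1) == 1 // keep upper or lower half-interval
      if (isLon) {
        val mid = (lonMin + lonMax) / 2
        if (high) lonMin = mid else lonMax = mid
      } else {
        val mid = (latMin + latMax) / 2
        if (high) latMin = mid else latMax = mid
      }
      isLon = !isLon
    }
    ((latMin + latMax) / 2, (lonMin + lonMax) / 2)
  }
}
{code}

For example, decodeCenter("ezs42") yields approximately (42.605, -5.603), the classic worked example from the Wikipedia article.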




 
{code:scala}
// UDF stage: map each geohash to the polygons that contain its centre point.
private def doWork(context: QueryContext, before: Stage.State, root: Node[PolygonDefinition], hitCacheEnabled: Boolean): Stage.State = {

  val getPolygons = udf((geohash: String) => {
    PolygonHitTest.getPolygonsForGeoHashFromHierarchy(geohash, root, hitCacheEnabled)
  })

  val result =
    for (input <- before.df) yield {
      input
        .withColumn(polygons, getPolygons(col(geohash)))
        .withColumn(polygon, explode(col(polygons)))
        .drop(geohash, polygons)
    }

  SparkDebug.show(result, "Polygon mapping")
  before.copy(df = result)
}

// Decode the geohash to its centre point, then walk the polygon hierarchy.
def getPolygonsForGeoHashFromHierarchy(geoHash: String, root: Node[PolygonDefinition], hitCacheEnabled: Boolean = false): Seq[String] = {
  val latLong = GeoHash.decodeHash(geoHash)
  val point = geometryFactory.createPoint(new Coordinate(latLong.getLon, latLong.getLat))
  root.data.id :: getPolygonsForPointFromHierarchyV2(point, root.children.toList, hitCacheEnabled)
}

// Depth-first walk: descend only into nodes whose bounding box contains the
// point; run the exact polygon test only at the deepest matching level.
private def getPolygonsForPointFromHierarchyV2(point: Point, hierarchy: List[Node[PolygonDefinition]], hitCacheEnabled: Boolean): List[String] = {
  for (node <- hierarchy) {
    if (node.data.isPointWithinBoundingBox(point)) {
      val hits = getPolygonsForPointFromHierarchyV2(point, node.children.toList, hitCacheEnabled)
      if (hits.isEmpty) {
        if (node.data.isPointWithinPolygon(point, hitCacheEnabled)) {
          return List(node.data.id)
        }
      } else {
        return node.data.id :: hits
      }
    }
  }
  Nil
}

// Optional memoisation of the exact point-in-polygon test (per polygon).
private[this] lazy val hitCache: mutable.Set[Point] =
  java.util.concurrent.ConcurrentHashMap.newKeySet[Point]().asScala

private[this] lazy val missCache: mutable.Set[Point] =
  java.util.concurrent.ConcurrentHashMap.newKeySet[Point]().asScala

def isPointWithinPolygon(point: Point, hitCacheEnabled: Boolean): Boolean = {
  if (hitCacheEnabled) {
    if (hitCache.contains(point)) {
      true
    } else if (missCache.contains(point)) {
      false
    } else {
      val hit = point.within(geometry)
      if (hit) hitCache.add(point) else missCache.add(point)
      hit
    }
  } else {
    point.within(geometry)
  }
}
{code}

*Note:* we have an option to enable caching (hitCacheEnabled), which we had always kept turned off, since it requires more memory and does not always give any benefit. But when I enabled this PolygonHitTest caching, query performance on new and old Spark was/is the same - fast.

Still, we have other parts of the product where there is nothing like this to tweak, and we see the same slowdown there.
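The hit/miss cache in isPointWithinPolygon is essentially a memoised boolean predicate. A stripped-down, self-contained sketch of the same pattern (hypothetical names, plain Scala, no JTS involved):

{code:scala}
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._ // scala.collection.JavaConverters on Scala 2.12

// Memoise an expensive boolean predicate with two concurrent key sets,
// mirroring the hitCache/missCache pair above: a repeated input never
// re-runs the expensive test.
final class CachedPredicate[A](expensive: A => Boolean) {
  private val hits   = ConcurrentHashMap.newKeySet[A]().asScala
  private val misses = ConcurrentHashMap.newKeySet[A]().asScala
  @volatile var evaluations = 0 // counts real predicate calls, for illustration

  def apply(a: A): Boolean =
    if (hits.contains(a)) true
    else if (misses.contains(a)) false
    else {
      evaluations += 1
      val result = expensive(a)
      if (result) hits.add(a) else misses.add(a)
      result
    }
}
{code}

This only pays off when the same input recurs often, which matches the observation above that the cache does not always give a benefit.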

 

> Does anyone has performance issue after upgrade from 3.0 to 3.1?
> ----------------------------------------------------------------
>
>                 Key: SPARK-35787
>                 URL: https://issues.apache.org/jira/browse/SPARK-35787
>             Project: Spark
>          Issue Type: Question
>          Components: Spark Core
>    Affects Versions: 3.1.2
>            Reporter: Vidmantas Drasutis
>            Priority: Major
>         Attachments: Execution_plan_difference.png, 
> spark_3.0_execution_plan_details_fast.txt, 
> spark_3.1_execution_plan_details_slow.txt, spark_job_info_1.png, 
> spark_job_info_2.png
>
>
> Hello.
>  
> We had been using Spark 3.0.2 and the query executed in ~100 seconds.
> After we upgraded to Spark 3.1.1 (we also tried 3.1.2 - same slow performance),
> our query execution time grew to ~260 seconds - roughly a 2.5-3x increase.
>  
> We tried a fairly simple query.
> The query uses a UDF (*org.apache.spark.sql.functions*), which explodes data
> and does a polygon hit test. Nothing changed in our code from the query
> perspective.
>  It is a single-VM (one box) cluster.
>  
> Maybe anyone faced similar issue?
> Attached some details from spark dashboard.
>  
> *It looks like a UDF-related slowdown: queries that do not use UDFs perform
> the same, while queries that use UDFs got slower starting from 3.1.*
>  
>  
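One way to narrow such a regression down (an illustrative diagnostic, not something from the report): run the UDF body as a plain function over a sample of inputs, outside Spark. If the standalone timing is unchanged across environments, the slowdown sits in Spark's UDF invocation path rather than in the function body itself. A minimal timing harness:

{code:scala}
// Time a function body applied to a batch of inputs, with no Spark machinery.
// Equal standalone timings across Spark versions would point at the UDF
// dispatch path, not the body.
def timeBody[A, B](inputs: Seq[A])(body: A => B): (Long, Seq[B]) = {
  val t0 = System.nanoTime()
  val out = inputs.map(body)
  val elapsedMs = (System.nanoTime() - t0) / 1000000
  (elapsedMs, out)
}
{code}

For example, timeBody(geohashSample)(gh => getPolygonsForGeoHashFromHierarchy(gh, root)) over the same sample on both clusters would separate body cost from Spark overhead (geohashSample is a hypothetical sample collection).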



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
