Hi,

I'm a newbie to Sedona and am running into performance issues using the 
DataFrame API for spatial joins with ST_Intersects. From the execution plan, 
I can see that this is largely due to too few partitions being used for the 
spatial join.

I've tried to increase the number of partitions by setting:
sparkSession.conf.set("spark.sql.shuffle.partitions", 
appConf.getInputPartitions.toString)
sparkSession.conf.set("geospark.join.numpartition", 
appConf.getInputPartitions.toString)
sparkSession.conf.set("spark.default.parallelism", 
appConf.getInputPartitions.toString)

These setting changes seem to make minimal difference.
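In case it matters, this is roughly how I've been checking whether the settings 
actually take effect (my own sanity-check snippet, so the variable names are 
from my app):

```scala
// Sanity check (my own debugging snippet): confirm the session picked up the
// settings and see how many partitions actually feed the join input.
println(sparkSession.conf.get("spark.sql.shuffle.partitions"))
println(sparkSession.conf.get("spark.default.parallelism"))
println(s"trips input partitions: ${trips.rdd.getNumPartitions}")
```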

I'm now trying to convert the DataFrames to be joined into SpatialRDDs so 
that I can set the number of partitions for the spatial join myself.
However, I'm running into a different issue when I try to convert the spatial 
join result back into a DataFrame: the extra attributes from the original 
DataFrames are not getting propagated through the join.

I am using the Adapter class for the conversion.


val tripRDD = Adapter.toSpatialRdd(trips, "geom", tripColumns)
// Sanity check that the extra attributes made it into the user data
val rddWithOtherAttributes = tripRDD.rawSpatialRDD.rdd.map[String](f => f.getUserData.toString)
tripRDD.analyze()

val poiRDD = Adapter.toSpatialRdd(poiDS.toDF, "geom", poiColumns)
poiRDD.analyze()

tripRDD.spatialPartitioning(GridType.KDBTREE, appConf.getInputPartitions)

val joinRes = JoinQuery.SpatialJoinQueryFlat(tripRDD, poiRDD, usingIndex, spatialPredicate)

val df = Adapter.toDf(joinRes, tripColumns, poiColumns, sparkSession)
df.show
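For reference, this is the fuller RDD-level pattern I pieced together from the 
Sedona tutorial. The spatialPartitioning call on the poi side (reusing the trip 
grids) and the index build are my reading of the docs, not something I've 
confirmed works:

```scala
import org.apache.sedona.core.enums.{GridType, IndexType}
import org.apache.sedona.core.spatialOperator.{JoinQuery, SpatialPredicate}
import org.apache.sedona.sql.utils.Adapter

// My understanding from the docs (please correct me if wrong): both sides
// must be partitioned by the same grids, and the index is built on the
// spatially partitioned object RDD before the join runs.
val tripRDD = Adapter.toSpatialRdd(trips, "geom", tripColumns)
tripRDD.analyze()
val poiRDD = Adapter.toSpatialRdd(poiDS.toDF, "geom", poiColumns)
poiRDD.analyze()

tripRDD.spatialPartitioning(GridType.KDBTREE, appConf.getInputPartitions)
poiRDD.spatialPartitioning(tripRDD.getPartitioner) // reuse the same grids
tripRDD.buildIndex(IndexType.QUADTREE, true)       // true = index the partitioned RDD

val joinRes = JoinQuery.SpatialJoinQueryFlat(tripRDD, poiRDD, true, SpatialPredicate.INTERSECTS)
val df = Adapter.toDf(joinRes, tripColumns, poiColumns, sparkSession)
```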

How can I get the original attributes to be propagated as part of the join? I 
searched but couldn't find any documentation on this.
Since Adapter.toSpatialRdd lets me specify the columns to carry through, I 
assumed those attributes would be carried through into the join as well.

Here is the error I am seeing:
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: 
java.lang.String is not a valid external type for schema of geometry
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else 
newInstance(class org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).serialize AS 
leftgeometry#2424
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, 
org.apache.spark.sql.Row, true]), 1, leftgeometry), StringType), true, false, 
true) AS leftgeometry#2425
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, 
org.apache.spark.sql.Row, true]), 2, trip_id), StringType), true, false, true) 
AS trip_id#2426
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else 
newInstance(class org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).serialize AS 
rightgeometry#2427
               at 
org.apache.spark.sql.errors.QueryExecutionErrors$.expressionEncodingError(QueryExecutionErrors.scala:1052)
               at 
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:210)
               at 
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:193)

Thanks,
Trang
