Hi,
I'm new to Sedona and am running into performance issues using the DataFrame
API to handle spatial joins with st_intersects. From the execution plan, I can
see that this is largely due to too few partitions being used for the spatial
join.
I've tried to increase the number of partitions by setting:

sparkSession.conf.set("spark.sql.shuffle.partitions", appConf.getInputPartitions.toString)
sparkSession.conf.set("geospark.join.numpartition", appConf.getInputPartitions.toString)
sparkSession.conf.set("spark.default.parallelism", appConf.getInputPartitions.toString)
These setting changes seem to make minimal difference.
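For reference, I'm judging this by the partition count of the join output,
roughly like this (joinedDf is just a placeholder name for the joined
DataFrame):

// How many partitions does the join output actually use?
println(joinedDf.rdd.getNumPartitions)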
I'm now trying to convert the DataFrames to be joined into SpatialRDDs so
that I can set the number of partitions for the spatial join myself.
However, I am running into a different issue when I convert the spatial join
result back into a DataFrame: the extra attributes from the original
DataFrames are not getting propagated through the join. I am using the
Adapter class for the conversion.
import org.apache.sedona.core.enums.GridType
import org.apache.sedona.core.spatialOperator.JoinQuery
import org.apache.sedona.sql.utils.Adapter

val tripRDD = Adapter.toSpatialRdd(trips, "geom", tripColumns)
tripRDD.analyze()
// Sanity check that the extra columns ride along as user data:
val rddWithOtherAttributes = tripRDD.rawSpatialRDD.rdd.map[String](f => f.getUserData.toString)

val poiRDD = Adapter.toSpatialRdd(poiDS.toDF, "geom", poiColumns)
poiRDD.analyze()

tripRDD.spatialPartitioning(GridType.KDBTREE, appConf.getInputPartitions)
// Both sides must share the same partitioner for the join.
poiRDD.spatialPartitioning(tripRDD.getPartitioner)

// usingIndex and spatialPredicate are defined earlier in my code.
val joinRes = JoinQuery.SpatialJoinQueryFlat(tripRDD, poiRDD, usingIndex, spatialPredicate)

val df = Adapter.toDf(joinRes, tripColumns, poiColumns, sparkSession)
df.show()
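My expectation is that df comes back with a leftgeometry column, the trip
attribute columns, a rightgeometry column, and the poi attribute columns.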
How can I get the original attributes to propagate through the join? I
searched but couldn't find anything about this in the documentation.
By specifying the columns to carry through in Adapter.toSpatialRdd, I
assumed that the attributes would be carried through into the join as well.
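As a minimal illustration of what I expect (the WKT literal and column name
here are made up for the example):

// Toy DataFrame: one geometry column plus one attribute column.
val testDf = sparkSession.sql(
  "SELECT ST_GeomFromWKT('POINT (1 2)') AS geom, 't1' AS trip_id")
val testRdd = Adapter.toSpatialRdd(testDf, "geom", Seq("trip_id"))
testRdd.analyze()
// I expect each geometry to carry the remaining columns as user data.
testRdd.rawSpatialRDD.rdd.collect().foreach(g => println(g.getUserData))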
Here is the error I am seeing:
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of geometry
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else newInstance(class org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).serialize AS leftgeometry#2424
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, leftgeometry), StringType), true, false, true) AS leftgeometry#2425
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, trip_id), StringType), true, false, true) AS trip_id#2426
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else newInstance(class org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).serialize AS rightgeometry#2427
  at org.apache.spark.sql.errors.QueryExecutionErrors$.expressionEncodingError(QueryExecutionErrors.scala:1052)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:210)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:193)
Thanks,
Trang