Hi Trang,

The slow join performance is most likely caused by having too few partitions.

The most straightforward way to increase the number of partitions is to
repartition both input DataFrames right after you load them from disk.

e.g.,

var tripDf = spark.read(XXX)
// DataFrame has no numPartitions method; read the current count from the underlying RDD
tripDf = tripDf.repartition(tripDf.rdd.getNumPartitions * 5)

var poiDf = spark.read(XXX)
poiDf = poiDf.repartition(poiDf.rdd.getNumPartitions * 5)
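
With more partitions, Spark can schedule more parallel tasks for the join
stage, which is usually what relieves the bottleneck.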

Then perform the SQL spatial join.
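
For example, here is a minimal sketch of that step (my assumptions: the two
DataFrames are registered as temp views named trips and pois, and both
geometry columns are named geom):

tripDf.createOrReplaceTempView("trips")
poiDf.createOrReplaceTempView("pois")

// Sedona plans this as a spatial join over the repartitioned inputs
val joinedDf = spark.sql(
  """SELECT t.*, p.*
    |FROM trips t, pois p
    |WHERE ST_Intersects(t.geom, p.geom)""".stripMargin)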
====

If you want to use the RDD API instead, please read [1] and [2].

[1]
https://sedona.apache.org/1.3.1-incubating/tutorial/sql/#dataframe-to-spatialrdd
[2]
https://sedona.apache.org/1.3.1-incubating/tutorial/sql/#spatialpairrdd-to-dataframe
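
Roughly, the flow described in [1] and [2] looks like the sketch below. Treat
it as an outline only: I'm reusing the tripDf/poiDf variables from above and
the tripColumns/poiColumns name lists from your snippet, and numPartitions is
a placeholder for whatever parallelism you want.

import org.apache.sedona.core.enums.{GridType, IndexType}
import org.apache.sedona.core.spatialOperator.{JoinQuery, SpatialPredicate}
import org.apache.sedona.sql.utils.Adapter

// DataFrame -> SpatialRDD; the non-geometry columns travel along as user data [1]
val tripRDD = Adapter.toSpatialRdd(tripDf, "geom")
val poiRDD = Adapter.toSpatialRdd(poiDf, "geom")
tripRDD.analyze()
poiRDD.analyze()

// Both sides must share the SAME spatial partitioner before the join
tripRDD.spatialPartitioning(GridType.KDBTREE, numPartitions)
poiRDD.spatialPartitioning(tripRDD.getPartitioner)

// Build the index on the spatially partitioned RDD because the join will use it
tripRDD.buildIndex(IndexType.QUADTREE, true)

val joinRes = JoinQuery.SpatialJoinQueryFlat(tripRDD, poiRDD, true, SpatialPredicate.INTERSECTS)

// SpatialPairRDD -> DataFrame; passing both column lists is what brings the attributes back [2]
val df = Adapter.toDf(joinRes, tripColumns, poiColumns, spark)

Note that both RDDs are spatially partitioned here; in your snippet only
tripRDD is, which can also cause trouble at join time.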

Thanks,
Jia



On Sat, Jan 7, 2023 at 11:14 PM Trang Nguyen <[email protected]> wrote:

> Hi,
>
> I'm a newbie to Sedona and am running into performance issues using the
> dataframe API to handle spatial joins with st_intersects. From the
> execution plan, I see that this is largely due to too few partitions
> getting used for the spatial join.
>
> I've tried to increase the number of partitions by setting:
> sparkSession.conf.set("spark.sql.shuffle.partitions",
> appConf.getInputPartitions.toString)
> sparkSession.conf.set("geospark.join.numpartition",
> appConf.getInputPartitions.toString)
> sparkSession.conf.set("spark.default.parallelism",
> appConf.getInputPartitions.toString)
>
> These setting changes seem to make minimal difference.
>
> I'm now trying to convert the dataframes to be joined into SpatialRDDs
> so that I can set the number of partitions for the spatial join.
> However, I am running into a different issue when I try to convert back
> from the spatial joined result into a dataframe because the extra
> attributes from the original dataframes are not getting propagated through
> the join.
>
> I am using the Adapter class for the conversion.
>
>
> val tripRDD = Adapter.toSpatialRdd(trips, "geom", tripColumns)
> val rddWithOtherAttributes = tripRDD.rawSpatialRDD.rdd.map[String](f=> {
>   f.getUserData.toString
> })
> tripRDD.analyze()
>
> val poiRDD = Adapter.toSpatialRdd(poiDS.toDF, "geom", poiColumns)
> poiRDD.analyze()
>
> tripRDD.spatialPartitioning(GridType.KDBTREE, appConf.getInputPartitions)
>
> val joinRes = JoinQuery.SpatialJoinQueryFlat(tripRDD, poiRDD, usingIndex,
> spatialPredicate)
>
> val df = Adapter.toDf(joinRes, tripColumns, poiColumns, sparkSession)
>
> df.show
>
> How can I get the original attributes to be propagated as part of the
> join? I searched but couldn't find any documentation on this.
> By specifying the columns to be carried through in
> Adapter.toSpatialRdd, I assumed that the attributes would be carried
> through into the join as well.
>
> Here is the error I am seeing:
> java.lang.RuntimeException: Error while encoding:
> java.lang.RuntimeException: java.lang.String is not a valid external type
> for schema of geometry
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null
> else newInstance(class
> org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).serialize AS
> leftgeometry#2424
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null
> else staticinvoke(class org.apache.spark.unsafe.types.UTF8String,
> StringType, fromString,
> validateexternaltype(getexternalrowfield(assertnotnull(input[0,
> org.apache.spark.sql.Row, true]), 1, leftgeometry), StringType), true,
> false, true) AS leftgeometry#2425
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null
> else staticinvoke(class org.apache.spark.unsafe.types.UTF8String,
> StringType, fromString,
> validateexternaltype(getexternalrowfield(assertnotnull(input[0,
> org.apache.spark.sql.Row, true]), 2, trip_id), StringType), true, false,
> true) AS trip_id#2426
> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null
> else newInstance(class
> org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).serialize AS
> rightgeometry#2427
>                at
> org.apache.spark.sql.errors.QueryExecutionErrors$.expressionEncodingError(QueryExecutionErrors.scala:1052)
>                at
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:210)
>                at
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:193)
>
> Thanks,
> Trang
>
