Hi Trang,

The slow join performance is mostly caused by too few partitions.
The most straightforward way to increase the number of partitions is to
repartition both input DataFrames right after you load them from disk,
e.g.:

    var tripDf = spark.read.load(XXX)
    tripDf = tripDf.repartition(tripDf.rdd.getNumPartitions * 5)
    var poiDf = spark.read.load(XXX)
    poiDf = poiDf.repartition(poiDf.rdd.getNumPartitions * 5)

(A DataFrame has no numPartitions method of its own; you read the current
count off the underlying RDD with rdd.getNumPartitions.)

Then perform the SQL spatial join as usual; there is a rough sketch of
the whole flow below.

====

If you want to use the RDD API instead, please read [1] and [2]; a
sketch of that flow follows the SQL one below.

[1] https://sedona.apache.org/1.3.1-incubating/tutorial/sql/#dataframe-to-spatialrdd
[2] https://sedona.apache.org/1.3.1-incubating/tutorial/sql/#spatialpairrdd-to-dataframe
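Here is a minimal, untested sketch of the SQL route. The paths, the
SparkSession setup, and the column names (geom, trip_id, poi_id) are
placeholders for your own:

    import org.apache.sedona.sql.utils.SedonaSQLRegistrator
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("spatial-join").getOrCreate()
    SedonaSQLRegistrator.registerAll(spark) // registers the ST_* functions

    // Repartition right after loading so everything downstream,
    // including the spatial join, runs with more parallelism.
    var tripDf = spark.read.load("/data/trips.parquet")
    tripDf = tripDf.repartition(tripDf.rdd.getNumPartitions * 5)
    var poiDf = spark.read.load("/data/pois.parquet")
    poiDf = poiDf.repartition(poiDf.rdd.getNumPartitions * 5)

    tripDf.createOrReplaceTempView("trips")
    poiDf.createOrReplaceTempView("pois")

    // Sedona plans ST_Intersects as a spatial join; the non-geometry
    // columns come through like in any other SQL join.
    val joined = spark.sql(
      """SELECT t.trip_id, p.poi_id, t.geom AS trip_geom, p.geom AS poi_geom
        |FROM trips t JOIN pois p ON ST_Intersects(t.geom, p.geom)""".stripMargin)
    joined.show()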
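If you go the RDD route instead, the round trip looks roughly like this
(again untested, same placeholder names). Two things matter for your
attribute question: list the attribute columns in Adapter.toSpatialRdd so
they travel as user data, and pass the field-name lists to Adapter.toDf
in the order of the flat join result: the query-window side (poiRDD
below) comes back as the left geometry, so its fields go first. A
mismatch there may be what produces the encoding error in your stack
trace. Also note that both RDDs must be partitioned with the same
partitioner before the join; in your snippet poiRDD is never spatially
partitioned.

    import org.apache.sedona.core.enums.{GridType, IndexType}
    import org.apache.sedona.core.spatialOperator.{JoinQuery, SpatialPredicate}
    import org.apache.sedona.sql.utils.Adapter

    // Name the attribute columns so they ride along as user data.
    val tripRDD = Adapter.toSpatialRdd(tripDf, "geom", Seq("trip_id"))
    val poiRDD = Adapter.toSpatialRdd(poiDf, "geom", Seq("poi_id"))
    tripRDD.analyze()
    poiRDD.analyze()

    // The RDD API lets you choose the partition count directly here.
    tripRDD.spatialPartitioning(GridType.KDBTREE, 200) // e.g. 200 partitions
    poiRDD.spatialPartitioning(tripRDD.getPartitioner) // reuse the same grid
    tripRDD.buildIndex(IndexType.QUADTREE, true)       // index the partitioned data

    val joinedPairs = JoinQuery.SpatialJoinQueryFlat(
      tripRDD, poiRDD, true, SpatialPredicate.INTERSECTS)

    // Left fields = query-window side (poi); right fields = object side (trip).
    val joinedDf = Adapter.toDf(joinedPairs, Seq("poi_id"), Seq("trip_id"), spark)
    joinedDf.show()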
Thanks,
Jia

On Sat, Jan 7, 2023 at 11:14 PM Trang Nguyen <[email protected]> wrote:

> Hi,
>
> I'm a newbie to Sedona and am running into performance issues using the
> DataFrame API to handle spatial joins with ST_Intersects. From the
> execution plan, I see that this is largely due to too few partitions
> being used for the spatial join.
>
> I've tried to increase the number of partitions by setting:
>
>     sparkSession.conf.set("spark.sql.shuffle.partitions",
>       appConf.getInputPartitions.toString)
>     sparkSession.conf.set("geospark.join.numpartition",
>       appConf.getInputPartitions.toString)
>     sparkSession.conf.set("spark.default.parallelism",
>       appConf.getInputPartitions.toString)
>
> These setting changes seem to make minimal difference.
>
> I'm now trying to convert the DataFrames to be joined into SpatialRDDs
> so that I can set the number of partitions for the spatial join.
> However, I am running into a different issue when I convert the joined
> result back into a DataFrame: the extra attributes from the original
> DataFrames are not getting propagated through the join.
>
> I am using the Adapter class for the conversion:
>
>     val tripRDD = Adapter.toSpatialRdd(trips, "geom", tripColumns)
>     val rddWithOtherAttributes =
>       tripRDD.rawSpatialRDD.rdd.map[String](f => f.getUserData.toString)
>     tripRDD.analyze()
>
>     val poiRDD = Adapter.toSpatialRdd(poiDS.toDF, "geom", poiColumns)
>     poiRDD.analyze()
>
>     tripRDD.spatialPartitioning(GridType.KDBTREE, appConf.getInputPartitions)
>
>     val joinRes = JoinQuery.SpatialJoinQueryFlat(tripRDD, poiRDD,
>       usingIndex, spatialPredicate)
>     val df = Adapter.toDf(joinRes, tripColumns, poiColumns, sparkSession)
>     df.show
>
> How can I get the original attributes to propagate through the join? I
> searched the documentation but couldn't find anything on this. By
> specifying the columns in Adapter.toSpatialRdd, I assumed the attributes
> would be carried through into the join as well.
>
> Here is the error I am seeing:
>
>     java.lang.RuntimeException: Error while encoding:
>     java.lang.RuntimeException: java.lang.String is not a valid external
>     type for schema of geometry
>     if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt)
>       null else newInstance(class
>       org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).serialize
>       AS leftgeometry#2424
>     if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt)
>       null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String,
>       StringType, fromString,
>       validateexternaltype(getexternalrowfield(assertnotnull(input[0,
>       org.apache.spark.sql.Row, true]), 1, leftgeometry), StringType),
>       true, false, true) AS leftgeometry#2425
>     if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt)
>       null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String,
>       StringType, fromString,
>       validateexternaltype(getexternalrowfield(assertnotnull(input[0,
>       org.apache.spark.sql.Row, true]), 2, trip_id), StringType),
>       true, false, true) AS trip_id#2426
>     if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt)
>       null else newInstance(class
>       org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).serialize
>       AS rightgeometry#2427
>     at org.apache.spark.sql.errors.QueryExecutionErrors$.expressionEncodingError(QueryExecutionErrors.scala:1052)
>     at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:210)
>     at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:193)
>
> Thanks,
> Trang
