On Sat, Jan 7, 2023 at 11:26 PM Jia Yu <[email protected]> wrote:
> Hi Trang,
>
> The slow join performance issue is mostly caused by too few partitions.
>
> The most straightforward way to increase the number of partitions is to
> repartition both input DataFrames right after you load them from disk.
>
> e.g.,
>
> var tripDf = spark.read(XXX)
> tripDf = tripDf.repartition(tripDf.rdd.getNumPartitions * 5)
>
> var poiDf = spark.read(XXX)
> poiDf = poiDf.repartition(poiDf.rdd.getNumPartitions * 5)
>
> Then perform the SQL spatial join.
> ====
>
> If you want to use the RDD API, please read [1] and [2].
>
> [1] https://sedona.apache.org/1.3.1-incubating/tutorial/sql/#dataframe-to-spatialrdd
> [2] https://sedona.apache.org/1.3.1-incubating/tutorial/sql/#spatialpairrdd-to-dataframe
>
> Thanks,
> Jia
>
> On Sat, Jan 7, 2023 at 11:14 PM Trang Nguyen <[email protected]> wrote:
>
>> Hi,
>>
>> I'm a newbie to Sedona and am running into performance issues using the
>> DataFrame API to handle spatial joins with st_intersects. From the
>> execution plan, I see that this is largely due to too few partitions
>> being used for the spatial join.
>>
>> I've tried to increase the number of partitions by setting:
>>
>> sparkSession.conf.set("spark.sql.shuffle.partitions", appConf.getInputPartitions.toString)
>> sparkSession.conf.set("geospark.join.numpartition", appConf.getInputPartitions.toString)
>> sparkSession.conf.set("spark.default.parallelism", appConf.getInputPartitions.toString)
>>
>> These setting changes seem to make minimal difference.
>>
>> I'm now trying to convert the DataFrames to be joined into SpatialRDDs
>> so that I can set the number of partitions for the spatial join.
>> However, I am running into a different issue when I try to convert the
>> joined result back into a DataFrame, because the extra attributes from
>> the original DataFrames are not getting propagated through the join.
>>
>> I am using the Adapter class for the conversion.
>>
>> val tripRDD = Adapter.toSpatialRdd(trips, "geom", tripColumns)
>> val rddWithOtherAttributes = tripRDD.rawSpatialRDD.rdd.map[String](f => {
>>   f.getUserData.toString
>> })
>> tripRDD.analyze()
>>
>> val poiRDD = Adapter.toSpatialRdd(poiDS.toDF, "geom", poiColumns)
>> poiRDD.analyze()
>>
>> tripRDD.spatialPartitioning(GridType.KDBTREE, appConf.getInputPartitions)
>>
>> val joinRes = JoinQuery.SpatialJoinQueryFlat(tripRDD, poiRDD, usingIndex, spatialPredicate)
>>
>> val df = Adapter.toDf(joinRes, tripColumns, poiColumns, sparkSession)
>> df.show
>>
>> How can I get the original attributes to be propagated as part of the
>> join? I searched the documentation but couldn't find anything on this.
>> By specifying the columns to be carried through in Adapter.toSpatialRdd,
>> I assumed that the attributes would be carried through into the join as
>> well.
>>
>> Here is the error I am seeing:
>>
>> java.lang.RuntimeException: Error while encoding:
>> java.lang.RuntimeException: java.lang.String is not a valid external type
>> for schema of geometry
>> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt)
>> null else newInstance(class
>> org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).serialize AS
>> leftgeometry#2424
>> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt)
>> null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String,
>> StringType, fromString,
>> validateexternaltype(getexternalrowfield(assertnotnull(input[0,
>> org.apache.spark.sql.Row, true]), 1, leftgeometry), StringType), true,
>> false, true) AS leftgeometry#2425
>> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt)
>> null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String,
>> StringType, fromString,
>> validateexternaltype(getexternalrowfield(assertnotnull(input[0,
>> org.apache.spark.sql.Row, true]), 2, trip_id), StringType), true, false,
>> true) AS trip_id#2426
>> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt)
>> null else newInstance(class
>> org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).serialize AS
>> rightgeometry#2427
>>     at org.apache.spark.sql.errors.QueryExecutionErrors$.expressionEncodingError(QueryExecutionErrors.scala:1052)
>>     at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:210)
>>     at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:193)
>>
>> Thanks,
>> Trang
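[Editor's sketch of the RDD-API flow above, in Scala. Two points are assumptions drawn from the Sedona tutorial, not a confirmed diagnosis of this exact error: (1) the geometry column ("geom") should not be repeated in the attribute lists passed to Adapter.toSpatialRdd, since the encoding error shows a duplicate string-typed leftgeometry column; (2) both RDDs must be spatially partitioned with the same partitioner before JoinQuery. The column names and appConf are placeholders from the thread.]

```scala
import org.apache.sedona.core.enums.GridType
import org.apache.sedona.core.spatialOperator.{JoinQuery, SpatialPredicate}
import org.apache.sedona.sql.utils.Adapter

// Carry only the non-geometry attributes; the geometry column is named
// separately and should not appear again in the attribute list.
val tripColumns = Seq("trip_id")   // assumption: geometry column excluded
val poiColumns  = Seq("poi_id")    // hypothetical attribute column

val tripRDD = Adapter.toSpatialRdd(trips, "geom", tripColumns)
val poiRDD  = Adapter.toSpatialRdd(poiDS.toDF, "geom", poiColumns)
tripRDD.analyze()
poiRDD.analyze()

// Both sides must share the SAME spatial partitioner before the join.
tripRDD.spatialPartitioning(GridType.KDBTREE, appConf.getInputPartitions)
poiRDD.spatialPartitioning(tripRDD.getPartitioner)

val joinRes =
  JoinQuery.SpatialJoinQueryFlat(tripRDD, poiRDD, true, SpatialPredicate.INTERSECTS)

// toDf re-attaches the carried attributes, yielding the schema
// leftgeometry, <tripColumns...>, rightgeometry, <poiColumns...>.
val df = Adapter.toDf(joinRes, tripColumns, poiColumns, sparkSession)
df.show()
```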

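[Editor's sketch of Jia's repartition suggestion, filled out end to end. The parquet paths and the x5 multiplier are hypothetical placeholders; the partition count of a DataFrame is read via .rdd.getNumPartitions, since DataFrame itself has no numPartitions method.]

```scala
// Placeholder input paths -- substitute the real sources.
var tripDf = spark.read.parquet("/path/to/trips")
tripDf = tripDf.repartition(tripDf.rdd.getNumPartitions * 5)

var poiDf = spark.read.parquet("/path/to/pois")
poiDf = poiDf.repartition(poiDf.rdd.getNumPartitions * 5)

tripDf.createOrReplaceTempView("trips")
poiDf.createOrReplaceTempView("pois")

// The SQL spatial join now starts from the larger partition count.
val joined = spark.sql(
  """SELECT t.trip_id, p.poi_id
    |FROM trips t, pois p
    |WHERE ST_Intersects(t.geom, p.geom)""".stripMargin)
```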