On Sat, Jan 7, 2023 at 11:26 PM Jia Yu <[email protected]> wrote:

> Hi Trang,
>
> The slow join performance issue is mostly caused by too few partitions.
>
> The most straightforward way to increase the number of partitions is to
> repartition both input DataFrames right after you load them from disk.
>
> e.g.,
>
> var tripDf = spark.read(XXX)
> tripDf = tripDf.repartition(tripDf.rdd.getNumPartitions * 5)
>
> var poiDf = spark.read(XXX)
> poiDf = poiDf.repartition(poiDf.rdd.getNumPartitions * 5)
>
> Then perform the SQL spatial join
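>
> e.g. (a rough sketch, assuming both DataFrames have a geometry column
> named "geom" and that Sedona's SQL functions are registered in your
> session):
>
> tripDf.createOrReplaceTempView("trips")
> poiDf.createOrReplaceTempView("pois")
>
> // Sedona plans this as a spatial join on the ST_Intersects predicate
> val joinedDf = spark.sql(
>   """
>     |SELECT t.*, p.*
>     |FROM trips t JOIN pois p
>     |ON ST_Intersects(t.geom, p.geom)
>   """.stripMargin)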
> ====
>
> If you want to use the RDD API, please read [1] and [2]:
>
> [1] https://sedona.apache.org/1.3.1-incubating/tutorial/sql/#dataframe-to-spatialrdd
> [2] https://sedona.apache.org/1.3.1-incubating/tutorial/sql/#spatialpairrdd-to-dataframe
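>
> A rough sketch of the full round trip (it reuses the names from your
> snippet below; numPartitions, usingIndex and spatialPredicate are
> placeholders for values you already define):
>
> import scala.collection.JavaConverters._
> import org.apache.sedona.core.enums.GridType
> import org.apache.sedona.core.spatialOperator.JoinQuery
> import org.apache.sedona.sql.utils.Adapter
>
> // toSpatialRdd keeps the non-geometry columns as user data and
> // records their names in fieldNames
> val tripRDD = Adapter.toSpatialRdd(tripDf, "geom")
> val poiRDD = Adapter.toSpatialRdd(poiDf, "geom")
> tripRDD.analyze()
> poiRDD.analyze()
>
> // Partition BOTH sides with the SAME partitioner before the join
> tripRDD.spatialPartitioning(GridType.KDBTREE, numPartitions)
> poiRDD.spatialPartitioning(tripRDD.getPartitioner)
>
> val joinRes = JoinQuery.SpatialJoinQueryFlat(tripRDD, poiRDD, usingIndex, spatialPredicate)
>
> // The saved fieldNames bring the original attributes back as columns
> val resultDf = Adapter.toDf(joinRes,
>   tripRDD.fieldNames.asScala, poiRDD.fieldNames.asScala, spark)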
>
> Thanks,
> Jia
>
>
>
> On Sat, Jan 7, 2023 at 11:14 PM Trang Nguyen <[email protected]>
> wrote:
>
>> Hi,
>>
>> I'm a newbie to Sedona and am running into performance issues using the
>> DataFrame API to handle spatial joins with ST_Intersects. From the
>> execution plan, I see that this is largely due to too few partitions
>> being used for the spatial join.
>>
>> I've tried to increase the number of partitions by setting:
>> sparkSession.conf.set("spark.sql.shuffle.partitions", appConf.getInputPartitions.toString)
>> sparkSession.conf.set("geospark.join.numpartition", appConf.getInputPartitions.toString)
>> sparkSession.conf.set("spark.default.parallelism", appConf.getInputPartitions.toString)
>>
>> These setting changes seem to make minimal difference.
>>
>> I'm now trying to convert the DataFrames to be joined into SpatialRDDs
>> so that I can set the number of partitions for the spatial join.
>> However, I'm running into a different issue when I convert the joined
>> result back into a DataFrame: the extra attributes from the original
>> DataFrames are not getting propagated through the join.
>>
>> I am using the Adapter class for the conversion.
>>
>> val tripRDD = Adapter.toSpatialRdd(trips, "geom", tripColumns)
>> val rddWithOtherAttributes = tripRDD.rawSpatialRDD.rdd.map[String](f => {
>>   f.getUserData.toString
>> })
>> tripRDD.analyze()
>>
>> val poiRDD = Adapter.toSpatialRdd(poiDS.toDF, "geom", poiColumns)
>> poiRDD.analyze()
>>
>> tripRDD.spatialPartitioning(GridType.KDBTREE, appConf.getInputPartitions)
>>
>> val joinRes = JoinQuery.SpatialJoinQueryFlat(tripRDD, poiRDD, usingIndex, spatialPredicate)
>>
>> val df = Adapter.toDf(joinRes, tripColumns, poiColumns, sparkSession)
>> df.show
>>
>> How can I get the original attributes propagated as part of the join?
>> I searched but couldn't find any documentation on this. Since I specify
>> the columns to carry through in Adapter.toSpatialRdd, I assumed the
>> attributes would be carried into the join as well.
>>
>> Here is the error I am seeing:
>> java.lang.RuntimeException: Error while encoding:
>> java.lang.RuntimeException: java.lang.String is not a valid external type
>> for schema of geometry
>> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt)
>> null else newInstance(class
>> org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).serialize AS
>> leftgeometry#2424
>> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt)
>> null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String,
>> StringType, fromString,
>> validateexternaltype(getexternalrowfield(assertnotnull(input[0,
>> org.apache.spark.sql.Row, true]), 1, leftgeometry), StringType), true,
>> false, true) AS leftgeometry#2425
>> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt)
>> null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String,
>> StringType, fromString,
>> validateexternaltype(getexternalrowfield(assertnotnull(input[0,
>> org.apache.spark.sql.Row, true]), 2, trip_id), StringType), true, false,
>> true) AS trip_id#2426
>> if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt)
>> null else newInstance(class
>> org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).serialize AS
>> rightgeometry#2427
>>   at org.apache.spark.sql.errors.QueryExecutionErrors$.expressionEncodingError(QueryExecutionErrors.scala:1052)
>>   at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:210)
>>   at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:193)
>>
>> Thanks,
>> Trang
>>
>
