Hi Trang,

1. For a Sedona SQL join, you usually don't need to set any other
parameters via conf. Repartitioning the inputs will simply work. The new
number of partitions may be preserved in the final result; this is
something Sedona tries to achieve, but it is not guaranteed.
2. For RDD Join, as shown in the two links I gave you:

var spatialRDD = Adapter.toSpatialRdd(spatialDf, "usacounty")

This will bring the field names into the SpatialRDD (saved in the
SpatialRDD.fieldNames attribute).

When you finish your spatial join on the two SpatialRDDs and get the
joinResult (which is a pair RDD), run:

import scala.collection.JavaConversions._
var joinResultDf = Adapter.toDf(joinResultPairRDD, leftRdd.fieldNames,
rightRdd.fieldNames, sparkSession)

This will give you a dataframe with the original column names. If it
doesn't work, try swapping the fieldNames: Adapter.toDf(joinResultPairRDD,
rightRdd.fieldNames, leftRdd.fieldNames, sparkSession). A fuller
end-to-end sketch follows below.
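
Putting the steps together, here is a minimal end-to-end sketch (the
variable names leftDf/rightDf, the geometry column "geom", and
numPartitions are illustrative placeholders, not from your code):

import scala.collection.JavaConversions._
import org.apache.sedona.core.enums.GridType
import org.apache.sedona.core.spatialOperator.{JoinQuery, SpatialPredicate}
import org.apache.sedona.sql.utils.Adapter

// Convert both DataFrames; column names are kept in SpatialRDD.fieldNames
var leftRdd = Adapter.toSpatialRdd(leftDf, "geom")
var rightRdd = Adapter.toSpatialRdd(rightDf, "geom")
leftRdd.analyze()
rightRdd.analyze()

// Partition one side, then reuse the same partitioner on the other side
leftRdd.spatialPartitioning(GridType.KDBTREE, numPartitions)
rightRdd.spatialPartitioning(leftRdd.getPartitioner)

// Flat join: returns a JavaPairRDD of matching geometry pairs
val joinResultPairRDD =
  JoinQuery.SpatialJoinQueryFlat(leftRdd, rightRdd, true, SpatialPredicate.INTERSECTS)

// Back to a DataFrame, carrying the original column names from both sides
var joinResultDf = Adapter.toDf(joinResultPairRDD,
  leftRdd.fieldNames, rightRdd.fieldNames, sparkSession)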


On Sat, Jan 7, 2023 at 11:30 PM Trang Nguyen <[email protected]> wrote:

> Hi Jia,
>
> Thanks for the quick response. I also tried repartitioning trips to
> different values as below, but I never see the count I specified during
> the spatial join itself.
> I'm not exactly sure why, but I am now trying against the spatial RDDs
> directly. Is there a way to propagate the custom fields across?
>
> Thanks
> Trang
>
> -----Original Message-----
> From: Jia Yu <[email protected]>
> Sent: Saturday, January 7, 2023 10:26 PM
> To: [email protected]
> Subject: Re: Propagating user defined attributes to spatial join
>
> Hi Trang,
>
> The slow join performance issue is mostly caused by too few partitions.
>
> The most straightforward way to increase the number of partitions is to
> repartition both input DataFrames right after you load them from disk.
>
> e.g.,
>
> var tripDf = spark.read.load(XXX)
> tripDf = tripDf.repartition(tripDf.rdd.getNumPartitions * 5)
>
> var poiDf = spark.read.load(XXX)
> poiDf = poiDf.repartition(poiDf.rdd.getNumPartitions * 5)
>
> Then perform the SQL spatial join
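>
> For example (a minimal sketch; the view names "trips"/"pois" and the
> geometry column "geom" are illustrative, not from your job):
>
> tripDf.createOrReplaceTempView("trips")
> poiDf.createOrReplaceTempView("pois")
> // Sedona will plan this as a spatial join on the ST_Intersects predicate
> var joinDf = spark.sql(
>   "SELECT t.*, p.* FROM trips t, pois p WHERE ST_Intersects(t.geom, p.geom)")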
> ====
>
> If you want to use RDD API,
>
> Please read [1] and [2]
>
> [1]
>
> https://sedona.apache.org/1.3.1-incubating/tutorial/sql/#dataframe-to-spatialrdd
> [2]
>
> https://sedona.apache.org/1.3.1-incubating/tutorial/sql/#spatialpairrdd-to-dataframe
>
> Thanks,
> Jia
>
>
>
> On Sat, Jan 7, 2023 at 11:14 PM Trang Nguyen <[email protected]>
> wrote:
>
> > Hi,
> >
> > I'm a newbie to Sedona and am running into performance issues using the
> > dataframe API to handle spatial joins with st_intersects. From the
> > execution plan, I see that this is largely due to too few partitions
> > being used for the spatial join.
> >
> > I've tried to increase the partition size by setting:
> > sparkSession.conf.set("spark.sql.shuffle.partitions",
> > appConf.getInputPartitions.toString)
> > sparkSession.conf.set("geospark.join.numpartition",
> > appConf.getInputPartitions.toString)
> > sparkSession.conf.set("spark.default.parallelism",
> > appConf.getInputPartitions.toString)
> >
> > The setting changes seem to make minimal difference.
> >
> > I'm now trying to convert the dataframes to be joined into spatialRDDs
> > so that I can set the number of partitions for the spatial join.
> > However, I am running into a different issue when I try to convert back
> > from the spatial join result into a dataframe, because the extra
> > attributes from the original dataframes are not getting propagated
> > through the join.
> >
> > I am using the Adapter class for the conversion.
> >
> >
> > val tripRDD = Adapter.toSpatialRdd(trips, "geom", tripColumns)
> > val rddWithOtherAttributes = tripRDD.rawSpatialRDD.rdd.map[String](f => {
> >   f.getUserData.toString
> > })
> > tripRDD.analyze()
> >
> > val poiRDD = Adapter.toSpatialRdd(poiDS.toDF, "geom", poiColumns)
> > poiRDD.analyze()
> >
> > tripRDD.spatialPartitioning(GridType.KDBTREE, appConf.getInputPartitions)
> >
> > val joinRes = JoinQuery.SpatialJoinQueryFlat(tripRDD, poiRDD, usingIndex,
> > spatialPredicate)
> >
> > val df = Adapter.toDf(joinRes, tripColumns, poiColumns, sparkSession)
> >
> > df.show
> >
> > How can I get the original attributes to be propagated as part of the
> > join? I searched the documentation but couldn't find anything on this.
> > By specifying the columns to be carried through in Adapter.toSpatialRdd,
> > I assumed the attributes would be carried through into the join as well.
> >
> > Here is the error I am seeing:
> > java.lang.RuntimeException: Error while encoding:
> > java.lang.RuntimeException: java.lang.String is not a valid external type for schema of geometry
> > if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else newInstance(class org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).serialize AS leftgeometry#2424
> > if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, leftgeometry), StringType), true, false, true) AS leftgeometry#2425
> > if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, trip_id), StringType), true, false, true) AS trip_id#2426
> > if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else newInstance(class org.apache.spark.sql.sedona_sql.UDT.GeometryUDT).serialize AS rightgeometry#2427
> >   at org.apache.spark.sql.errors.QueryExecutionErrors$.expressionEncodingError(QueryExecutionErrors.scala:1052)
> >   at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:210)
> >   at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:193)
> >
> > Thanks,
> > Trang
> >
>
