Hello Jia,

Thank you so much for your support.

We were able to complete our task and to perform a few runs with
different numbers of partitions.
So far, we obtained the best performance when running on 20 nodes with
the number of partitions set to 2000. With this configuration, it
took approximately 45 minutes to write the join's output.
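
To put the partition counts in perspective, here is a rough back-of-the-envelope
sketch. It assumes the 50 million points are spread evenly across partitions,
which real spatial data only approximates; the helper name is ours, and 4 is
simply the value shown in the example call quoted further down.

```python
# Rough per-partition load, assuming an even spread of points across partitions
# (an idealisation; actual spatial partitions are rarely perfectly balanced).
TOTAL_POINTS = 50_000_000  # "50mln events" from the original report


def points_per_partition(total_points: int, num_partitions: int) -> int:
    """Return the ceiling of total_points / num_partitions."""
    return -(-total_points // num_partitions)


for n in (4, 1000, 2000):
    load = points_per_partition(TOTAL_POINTS, n)
    print(f"{n:>5} partitions -> ~{load:,} points each")
```

With 2000 partitions each task handles on the order of tens of thousands of
points rather than millions, which is consistent with the improvement we saw.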

We then tried to perform the same join as a broadcast join, as you suggested,
to see whether we could achieve better results. However, we obtained the
following error when calling an action such as broadcast_join.show() on the
output:

Py4JJavaError: An error occurred while calling o699.showString.
: scala.MatchError: SpatialIndex polygonshape#422: geometry, QUADTREE,
[id=#3312]

We would be grateful if you could help us with this.


The broadcast join was performed as follows:

broadcast_join = points_df.alias('points').join(
    f.broadcast(polygons_df).alias('polygons'),
    f.expr('ST_Contains(polygons.polygonshape, points.pointshape)'))
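
For clarity, this is what we understand a broadcast join to do conceptually:
the small polygon side is held in full on every worker, and the large points
side is streamed past it. The sketch below is plain Python, not Sedona's
implementation. The names broadcast_join_sketch, bbox and contains are
hypothetical, polygons are simplified to single rings without holes, and a
bounding-box check stands in for a real spatial index.

```python
from typing import Dict, Iterable, Iterator, List, Tuple

Point = Tuple[float, float]  # (x, y), i.e. (longitude, latitude)
Ring = List[Point]           # simple polygon without holes (a simplification)


def bbox(ring: Ring) -> Tuple[float, float, float, float]:
    """Bounding box (min_x, min_y, max_x, max_y) of a ring."""
    xs = [x for x, _ in ring]
    ys = [y for _, y in ring]
    return min(xs), min(ys), max(xs), max(ys)


def contains(ring: Ring, point: Point) -> bool:
    """Ray-casting point-in-polygon test; boundary points get no special handling."""
    x, y = point
    inside = False
    j = len(ring) - 1
    for i in range(len(ring)):
        xi, yi = ring[i]
        xj, yj = ring[j]
        # Does the edge (j -> i) straddle the horizontal line through the point?
        if (yi > y) != (yj > y):
            x_cross = xi + (y - yi) * (xj - xi) / (yj - yi)
            if x < x_cross:
                inside = not inside
        j = i
    return inside


def broadcast_join_sketch(
    points: Iterable[Tuple[str, Point]],
    polygons: Dict[str, Ring],
) -> Iterator[Tuple[str, str]]:
    """Yield (point_id, polygon_id) pairs where the polygon contains the point."""
    # "Broadcast" side: small enough to materialise fully, with a cheap pre-filter.
    indexed = [(pid, ring, bbox(ring)) for pid, ring in polygons.items()]
    # "Streamed" side: every point is checked against the in-memory polygons.
    for point_id, (x, y) in points:
        for pid, ring, (min_x, min_y, max_x, max_y) in indexed:
            if min_x <= x <= max_x and min_y <= y <= max_y and contains(ring, (x, y)):
                yield point_id, pid


square = [(0.0, 0.0), (2.0, 0.0), (2.0, 2.0), (0.0, 2.0)]
pairs = list(broadcast_join_sketch([("a", (1.0, 1.0)), ("b", (3.0, 1.0))], {"sq": square}))
print(pairs)
```

The point of the sketch is only that no shuffle of the points side is needed,
which is why we expected the broadcast variant to help with 10k polygons.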

Attached you can find the pseudo-code we used to test the broadcast join.


Looking forward to hearing from you.


Kind regards,

Pietro Greselin


On Wed, 28 Jul 2021 at 00:46, Jia Yu <ji...@apache.org> wrote:

> Hi Pietro,
>
> A few tips to optimize your join:
>
> 1. Mix DF and RDD together and use RDD API for the join part. See the
> example here:
> https://github.com/apache/incubator-sedona/blob/master/binder/ApacheSedonaSQL_SpatialJoin_AirportsPerCountry.ipynb
>
> 2. When using point_rdd.spatialPartitioning(GridType.KDBTREE, 4), try
> a large number of partitions (say, 1000 or more).
>
> If this approach doesn't work, consider broadcast join if needed.
> Broadcast the polygon side:
> https://sedona.apache.org/api/sql/Optimizer/#broadcast-join
>
> Thanks,
> Jia
>
>
> On Tue, Jul 27, 2021 at 2:21 AM pietro greselin <p.grese...@gmail.com>
> wrote:
>
>> To whom it may concern,
>>
>> We would like to report the following Sedona behaviour and ask your
>> opinion on how we can optimize it.
>>
>> Our aim is to perform an inner spatial join between points_df and
>> polygons_df, matching each point in points_df with the polygon from
>> polygons_df that contains it.
>> Below you can find more details about the two dataframes we are considering:
>> - points_df: it contains 50 million events with latitude and longitude
>> rounded to the third decimal digit;
>> - polygons_df: it contains 10k multi-polygons with 600 vertices on
>> average.
>>
>> The issue we are reporting is a very long computing time: the spatial
>> join query never completes, even when running on a cluster with 40 workers
>> with 4 cores each.
>> No error is printed by the driver, but we are receiving the following
>> warning:
>> WARN org.apache.sedona.core.spatialOperator.JoinQuery: UseIndex is true,
>> but no index exists. Will build index on the fly.
>>
>> We were able to run the same spatial join successfully when
>> considering only a very small sample of events.
>> Do you have any suggestions on how we can achieve the same result on
>> higher volumes of data, or on how we can optimize the join?
>>
>> Attached you can find the pseudo-code we are running.
>>
>> Looking forward to hearing from you.
>>
>> Kind regards,
>> Pietro Greselin
>>
>
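from pyspark.sql import functions as f  # provides f.broadcast and f.expr used below

# `spark` is assumed to be an active SparkSession with the Sedona SQL functions registered.

def toSpatialPoints(df, lat_column_name, lon_column_name):
    # ST_Point takes (x, y), i.e. (longitude, latitude).
    df.createOrReplaceTempView("points_df")
    return (
        spark.sql("SELECT *, ST_Point({}, {}) AS pointshape FROM points_df".format(lon_column_name, lat_column_name))
        .drop(lat_column_name, lon_column_name)
    )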

def toSpatialPolygons(df, wkt_column_name):
    # Parse the WKT string column into a geometry column named polygonshape.
    df.createOrReplaceTempView("polygons_df")
    return (
        spark.sql("SELECT *, ST_GeomFromWKT({}) AS polygonshape FROM polygons_df".format(wkt_column_name))
        .drop(wkt_column_name)
    )


maps = spark.read.parquet(maps_path).select('AREA_ID', 'WKT')
polygons_df = toSpatialPolygons(maps, 'WKT')

events = spark.read.parquet(events_path).select('ID', 'LATITUDE', 'LONGITUDE')
points_df = toSpatialPoints(events, 'LATITUDE', 'LONGITUDE')

# Broadcast the smaller (polygon) side and join on point-in-polygon containment.
broadcast_join = points_df.alias('points').join(
    f.broadcast(polygons_df).alias('polygons'),
    f.expr('ST_Contains(polygons.polygonshape, points.pointshape)'),
)
broadcast_join.show()
