Hello Sedona community,

I have been benchmarking the following geospatial range join query (and similar queries) using Sedona v1.5.0, on serverless Spark clusters provisioned via AWS Glue notebooks. The PySpark dataframes 't' and 'g' are both loaded from Parquet files hosted on Amazon S3 and are partitioned in nested folder structures by region, year, month, and date. Specifically, 'g' is a polygon layer with millions of rows representing polygons across Canada, and 't' is a point layer with about 200 columns and up to tens of billions of rows, depending on the region/year/month/date chosen, representing points across Canada and the U.S.
result = sedona.sql("""
    select t.device_uid, g.grid_id
    from t, g
    where st_contains(ST_GeomFromWKT(g.geometry), ST_GeomFromWKT(t.geometry))
""")

I was able to increase the Spark cluster size in AWS Glue to handle larger volumes of the 't' dataset. For instance, as a stress test, I completed a query with 't' amounting to 14 billion rows in Canada in a bit over 3 hours, on a Spark cluster with 2 TB of memory. In general I am impressed with the performance; my questions below concern potential avenues to improve it a bit further, and a deeper understanding of Sedona performance tuning. I am attaching the result of running 'explain' on my query. Any comments and suggestions are greatly appreciated, and I apologize for the long message.

1) GeoParquet-level predicate pushdown

Based on https://sedona.apache.org/1.5.0/api/sql/Optimizer/, my understanding is that GeoParquet-level predicate pushdown happens automatically. I have experimented with roughly equal volumes of 't' in Canada and the U.S. respectively. When 't' is on the order of hundreds of millions of objects in the U.S., the above query completes in 1/6 of the time taken by the same volume of 't' data in Canada, with an identical Spark cluster size and configuration. That seems to confirm GeoParquet-level predicate pushdown is at play and causes 'polygons in Canada containing points in the U.S.' to return right away. When I increased the volume of 't' data 10x, to the order of billions of point objects, the U.S. case was still faster than the Canada case, but only marginally, on a Spark cluster 8x larger in both cases. Given this seemingly diminishing benefit of GeoParquet-level predicate pushdown, I wonder whether I am hitting a limitation of Sedona's predicate pushdown mechanism, or whether it is due to the way Spark clusters scale up in AWS Glue. In general, how can one confirm, based on the Spark logs etc.,
that GeoParquet-level predicate pushdown is active? Can it be turned on and off? And if so, can one see statistics on the volumes of data and files scanned?

A related note and question: https://sedona.apache.org/1.5.0/api/sql/Optimizer/, under "Push spatial predicates to GeoParquet", states that "To maximize the performance of Sedona GeoParquet filter pushdown, we suggest that you sort the data by their geohash values and then save as a GeoParquet file". In my case, the large point layer 't' does have a geohash6 column, and I took the following steps to see if that would help:

a) select * from 't' order by t.geohash6
b) save the result to S3 as GeoParquet files, following the same partitioning scheme
c) run the 'g' contains 't' query against the newly ordered 't' dataset, with a Spark cluster of the same size and configuration

Based on my results, this does help performance a little, but so marginally that it is negligible. I am a little confused as to whether GeoParquet-level predicate pushdown is active or not.

2) The PySpark dataframe function repartition(n, column)

I have experimented with the repartition function on the dataset 't', i.e. the layer consisting of a large number of points. In my experience, increasing the count n helps performance up to a certain point, beyond which having more partitions can actually lead to slower queries, and may also cause the Spark cluster to fail with an out-of-memory error. If I understand correctly, finding the right partition count is a matter of experimentation. Is there something in the Spark logs that could indicate the number of partitions is too low or too high? A second sub-question regarding repartition: should one repartition all dataframes involved in a query to improve performance, or just the biggest one?
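For reference, my repartition experiments look roughly like the following sketch (the partition count and the choice of geohash6 as the partitioning column are just what I have been trying; the attached plan actually shows a plain round-robin repartition(50000)):

```python
# Rough sketch of the repartition experiment on the big point layer 't'.
# The partition count n is found by trial and error; hash-partitioning by
# the geohash6 column is an alternative to plain round-robin repartition(n).
def prepare_points(t_df, n, by_geohash=True):
    if by_geohash:
        # Hash-partition on geohash6 so spatially nearby points tend to
        # land in the same partition.
        return t_df.repartition(n, "geohash6")
    # Round-robin shuffle, as in the attached plan (Repartition 50000, true).
    return t_df.repartition(n)
```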
In my case, since the point layer 't' is much larger than the polygon layer 'g' in terms of both object count and data volume in GB on S3, I have only repartitioned 't' so far.

3) Anything else that you would recommend trying to improve the performance of my query?

Thank you!
Hanxi
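P.S. One more piece of context for question 3: the attached plan shows the point geometry being produced by a Python UDF (convert_wkt) and then re-parsed with ST_GeomFromWKT inside the join predicate. A variation I have been meaning to try (just a sketch; the temp view names are placeholders) constructs the geometries once before the join, using ST_Point for the points and ST_GeomFromWKT for the polygons:

```python
# Sketch: build geometry columns up front instead of inside the join
# predicate. ST_Point(lon, lat) would replace the convert_wkt Python UDF +
# ST_GeomFromWKT round-trip visible in the attached plan.
POINTS_SQL = """
    SELECT device_uid,
           ST_Point(location_longitude, location_latitude) AS geom
    FROM t
"""
POLYS_SQL = """
    SELECT grid_id, ST_GeomFromWKT(geometry) AS geom
    FROM g
"""
JOIN_SQL = """
    SELECT p.device_uid, q.grid_id
    FROM points p, polys q
    WHERE ST_Contains(q.geom, p.geom)
"""

def run(sedona):
    # 'sedona' is the SedonaContext/SparkSession with views t and g registered.
    sedona.sql(POINTS_SQL).createOrReplaceTempView("points")
    sedona.sql(POLYS_SQL).createOrReplaceTempView("polys")
    return sedona.sql(JOIN_SQL)
```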
== Parsed Logical Plan == 'Project ['t.device_uid, 'g.grid_id] +- 'Filter 'st_contains('ST_GeomFromWKT('g.geometry), 'ST_GeomFromWKT('t.geometry)) +- 'Join Inner :- 'SubqueryAlias t : +- 'UnresolvedRelation [tutela1], [], false +- 'SubqueryAlias g +- 'UnresolvedRelation [grid], [], false == Analyzed Logical Plan == device_uid: string, grid_id: bigint Project [device_uid#14, grid_id#714L] +- Filter **org.apache.spark.sql.sedona_sql.expressions.ST_Contains** +- Join Inner :- SubqueryAlias t : +- SubqueryAlias tutela1 : +- View (`tutela1`, [device_uid#14,device_manufacturer#15,device_manufacturerbrandname#16,device_model#17,device_modelbrandname#18,device_os#19,device_osbrandname#20,device_language#21,device_locale#22,device_useragent#23,device_isrooted#24,device_yearreleased#25L,device_primaryhardwaretype#26,device_chipsetmanufacturer#27,device_chipsetbrandname#28,device_chipsetmodel#29,device_cpubrandname#30,device_cpucores#31,device_cpumaxfrequency#32,device_gpuname#33,device_csdcapable#34,device_hscsdcapable#35,device_gprscapable#36,device_edgecapable#37,device_hsdpacapable#38,device_umtscapable#39,device_hspaevolvedcapable#40,device_ltecapable#41,device_lteadvancedcapable#42,device_ltecategorycapable#43L,device_voltecapable#44,device_wificapable#45,device_vowificapable#46,device_rcscapable#47,device_voiceovercellularcapable#48,device_2gcapable#49,device_3gcapable#50,device_4gcapable#51,device_5gcapable#52,device_gpscapable#53,device_screenwidth#54L,device_screenheight#55L,device_storage#56L,device_usedstorage#57L,device_freestorage#58L,device_memory#59L,device_usedmemory#60L,device_freememory#61L,device_cpu#62,device_batterylevel#63,device_batterystate#64,device_systemuptime#65L,device_speed#66,device_bearing#67,device_simslotcount#68L,device_esimcount#69L,device_simsize#70,device_simmcc#71,device_simmnc#72,device_simserviceprovider#73,device_simserviceproviderbrandname#74,device_simspecificserviceprovider#75,device_simcountry#76,device_simstate#77,device_simservi
cetype#78,device_simaccesspointname#79,device_sdkversion#80,device_sdkconfiguration#81,connection_start#82,connection_end#83,connection_length#84L,connection_timezone#85L,connection_type#86,connection_category#87,connection_technology#88,connection_generationcategory#89,connection_mcc#90,connection_mnc#91,connection_towermcc#92,connection_towermnc#93,connection_lac#94L,connection_cid#95L,connection_enodebid#96,connection_sectorid#97L,connection_towercelltype#98,connection_serviceprovider#99,connection_serviceproviderbrandname#100,connection_mobilechannel#101L,connection_mobilefrequency#102,connection_band#103,connection_bsic#104L,connection_pci#105L,connection_psc#106L,connection_cpid#107L,connection_bandwidth#108L,connection_bssid#109,connection_ssid#110,connection_wififrequency#111L,connection_wifichannel#112L,connection_wifichannelwidth#113,connection_wifimanufacturer#114,connection_internetasn#115,connection_internetserviceprovider#116,connection_internetserviceproviderorganization#117,location_latitude#118,location_longitude#119,location_altitude#120,location_horizontalaccuracy#121,location_verticalaccuracy#122,location_availability#123,location_horizontalaccuracyassessment#124,location_assessmentmodelversion#125,location_country#126,location_region#127,location_city#128,location_geohash6#129,location_geohash7#130,location_geohash8#131,qos_date#132,qos_localdate#133,qos_uploadthroughput#134,qos_uploadthroughputdnslookup#135L,qos_uploadthroughputtimetofirstaccess#136L,qos_uploadthroughputteststatus#137,qos_uploadthroughputtestserver#138,qos_uploadthroughputtestsize#139L,qos_downloadthroughput#140,qos_downloadthroughputdnslookup#141L,qos_downloadthroughputtimetofirstaccess#142L,qos_downloadthroughputtimetofirstbyte#143L,qos_downloadthroughputteststatus#144,qos_downloadthroughputtestserver#145,qos_downloadthroughputtestsize#146L,qos_latencymin#147,qos_latencyaverage#148,qos_jittermin#149,qos_jitteraverage#150,qos_packetlossdiscardpercentage#151,qos_packetlosslostp
ercentage#152,qos_serverresponsednslookup#153L,qos_serverresponseteststatus#154,qos_serverresponsetestserver#155,qos_serverresponsetestconfigpacketdelay#156L,qos_serverresponsetestconfigpacketsize#157L,qos_serverresponsetestconfigsentpackets#158L,qos_icmplatencymin#159,qos_icmplatencyaverage#160,qos_icmppacketlosslostpercentage#161,qos_icmpteststatus#162,qos_icmptraceroutehops#163,qos_icmptraceroutehopcount#164L,qos_icmptracerouteteststatus#165,qos_icmptestserver#166,qos_icmptestconfigpacketdelay#167L,qos_icmptestconfigpacketsize#168L,qos_icmptestconfigsentpackets#169L,qos_icmptestconfigttl#170L,qos_icmptestconfigbytessent#171L,qos_icmptestconfigarguments#172,qos_icmptraceroutetestconfigpacketdelay#173L,qos_icmptraceroutetestconfigmaxhopcount#174L,qos_icmptraceroutetestsentpackets#175L,qos_visiblecelltowers#176L,qos_cellbandwidths#177,qos_linkspeed#178L,qos_linkspeedreceived#179L,qos_linkdownstreambandwidth#180L,qos_linkupstreambandwidth#181L,qos_signalstrength#182L,qos_signallevel#183L,qos_asu#184L,qos_deltatransmittedbytes#185L,qos_deltareceivedbytes#186L,qos_deltatransceivedbytes#187L,qos_rsrp#188L,qos_rsrq#189L,qos_rssnr#190L,qos_cqi#191L,qos_ta#192L,qos_ecio#193L,qos_evdosnr#194L,qos_gsmbiterrorrate#195L,qos_ecno#196L,qos_newradiocsirsrp#197L,qos_newradiocsirsrq#198L,qos_newradiocsisinr#199L,qos_newradiossrsrp#200L,qos_newradiossrsrq#201L,qos_newradiosssinr#202L,qos_newradiostate#203,qos_newradioendcstate#204,qos_newradiofrequencyrange#205,qos_appstate#206,qos_voiceservicestate#207,qos_ltevoiceduplexmode#208,qos_vpnconnected#209,qos_userinteraction#210,qos_dozemode#211,qos_networkrestrictions#212,qos_callstate#213,qos_castate#214,qos_mobiledataroamingstate#215,qos_tetheringstate#216,qos_dnsaddresses#217,qos_airplanemode#218,qos_testtrigger#219L,meta_createddate#220,device_typeallocationcode#221,device_hardwareclassification#222,device_esimstate#223,connection_cellbands#224,connection_cellplmns#225,connection_equivalenthomeplmns#226,connection_meteredstate#227,c
onnection_networkcapabilities#228,connection_cellulardataallowed#229,connection_cellulardatadisabledreason#230,connection_wifiprotocol#231,location_applocationaccuracy#232,location_continent#233,qos_icmptraceroutemaxhopcount#234L,qos_rscp#235L,qos_cqitableindex#236L,qos_csicqitableindex#237L,qos_csicqireport#238,qos_appstandbybucket#239,meta_functionsversion#240,qos_day#241,region#242,year#243,month#244,day#245,geometry#479]) : +- Repartition 50000, true : +- Project [device_uid#14, device_manufacturer#15, device_manufacturerbrandname#16, device_model#17, device_modelbrandname#18, device_os#19, device_osbrandname#20, device_language#21, device_locale#22, device_useragent#23, device_isrooted#24, device_yearreleased#25L, device_primaryhardwaretype#26, device_chipsetmanufacturer#27, device_chipsetbrandname#28, device_chipsetmodel#29, device_cpubrandname#30, device_cpucores#31, device_cpumaxfrequency#32, device_gpuname#33, device_csdcapable#34, device_hscsdcapable#35, device_gprscapable#36, device_edgecapable#37, ... 209 more fields] : +- Relation [device_uid#14,device_manufacturer#15,device_manufacturerbrandname#16,device_model#17,device_modelbrandname#18,device_os#19,device_osbrandname#20,device_language#21,device_locale#22,device_useragent#23,device_isrooted#24,device_yearreleased#25L,device_primaryhardwaretype#26,device_chipsetmanufacturer#27,device_chipsetbrandname#28,device_chipsetmodel#29,device_cpubrandname#30,device_cpucores#31,device_cpumaxfrequency#32,device_gpuname#33,device_csdcapable#34,device_hscsdcapable#35,device_gprscapable#36,device_edgecapable#37,... 
208 more fields] parquet +- SubqueryAlias g +- SubqueryAlias grid +- View (`grid`, [grid_id#714L,population_2016#715,dwellings_2016#716,population_2011#717,dwellings_2011#718,population_2006#719,dwellings_2006#720,population_2001#721,dwellings_2001#722,population_1996#723,dwellings_1996#724,geometry#725,centroid#726,tier_1#727,tier_2#728,tier_3#729,tier_4#730,tier_5#731]) +- Relation [grid_id#714L,population_2016#715,dwellings_2016#716,population_2011#717,dwellings_2011#718,population_2006#719,dwellings_2006#720,population_2001#721,dwellings_2001#722,population_1996#723,dwellings_1996#724,geometry#725,centroid#726,tier_1#727,tier_2#728,tier_3#729,tier_4#730,tier_5#731] parquet == Optimized Logical Plan == Project [device_uid#14, grid_id#714L] +- Join Inner, **org.apache.spark.sql.sedona_sql.expressions.ST_Contains** :- Repartition 50000, true : +- Project [device_uid#14, pythonUDF0#752 AS geometry#479] : +- BatchEvalPython [convert_wkt(location_longitude#119, location_latitude#118)#478], [pythonUDF0#752] : +- Project [device_uid#14, location_latitude#118, location_longitude#119] : +- Relation [device_uid#14,device_manufacturer#15,device_manufacturerbrandname#16,device_model#17,device_modelbrandname#18,device_os#19,device_osbrandname#20,device_language#21,device_locale#22,device_useragent#23,device_isrooted#24,device_yearreleased#25L,device_primaryhardwaretype#26,device_chipsetmanufacturer#27,device_chipsetbrandname#28,device_chipsetmodel#29,device_cpubrandname#30,device_cpucores#31,device_cpumaxfrequency#32,device_gpuname#33,device_csdcapable#34,device_hscsdcapable#35,device_gprscapable#36,device_edgecapable#37,... 
208 more fields] parquet
+- Project [grid_id#714L, geometry#725]
+- Relation [grid_id#714L,population_2016#715,dwellings_2016#716,population_2011#717,dwellings_2011#718,population_2006#719,dwellings_2006#720,population_2001#721,dwellings_2001#722,population_1996#723,dwellings_1996#724,geometry#725,centroid#726,tier_1#727,tier_2#728,tier_3#729,tier_4#730,tier_5#731] parquet

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [device_uid#14, grid_id#714L]
+- RangeJoin **org.apache.spark.sql.sedona_sql.expressions.ST_GeomFromWKT**, **org.apache.spark.sql.sedona_sql.expressions.ST_GeomFromWKT**, WITHIN
:- Exchange RoundRobinPartitioning(50000), REPARTITION_BY_NUM, [id=#60]
: +- Project [device_uid#14, pythonUDF0#752 AS geometry#479]
: +- BatchEvalPython [convert_wkt(location_longitude#119, location_latitude#118)#478], [pythonUDF0#752]
: +- Project [device_uid#14, location_latitude#118, location_longitude#119]
: +- FileScan parquet [device_uid#14,location_latitude#118,location_longitude#119,region#242,year#243,month#244,day#245] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3://000crcdatalake-hanxi/tutela_ca_all], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<device_uid:string,location_latitude:double,location_longitude:double>
+- Project [grid_id#714L, geometry#725]
+- FileScan parquet [grid_id#714L,geometry#725,tier_1#727,tier_2#728,tier_3#729,tier_4#730,tier_5#731] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3://000crcdatalake/ised_grid], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<grid_id:bigint,geometry:string>