[ https://issues.apache.org/jira/browse/SEDONA-233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17677707#comment-17677707 ]
Martin Andersson commented on SEDONA-233:
-----------------------------------------
[~jiayu] yes, a cache would be one way to work around the bug.
To fix the bug in Sedona it would be nice to find a more reliable way to get
the partitionId. I did a quick test and the xxxWithIndex operations on RDD
seem to work. Unfortunately there is no zipPartitionsWithIndex, but the join
could be performed in two steps: first the RDDs are joined as usual with
zipPartitions, except the deduplication isn't executed yet - all matches are
yielded. Then a deduplication step is executed with mapPartitionsWithIndex.
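Something along these lines could work (a minimal Scala sketch, since
zipPartitions only exists in the JVM RDD API; the method name, the
nested-loop matching and the reference-point dedup rule below are all
illustrative, not Sedona's actual code):
{code:scala}
import org.apache.spark.rdd.RDD
import org.locationtech.jts.geom.{Envelope, Geometry}

// Hypothetical two-step join. `grids` stands in for the partition extents
// that JudgementBase::initPartition currently looks up via
// TaskContext.partitionId.
def twoStepJoin(left: RDD[Geometry], right: RDD[Geometry],
                grids: Array[Envelope]): RDD[(Geometry, Geometry)] = {
  // Step 1: zipPartitions yields every match; deduplication is deferred.
  val allMatches = left.zipPartitions(right) { (lIter, rIter) =>
    val rGeoms = rIter.toList // naive nested-loop matching, for brevity
    for {
      l <- lIter
      r <- rGeoms
      if l.intersects(r)
    } yield (l, r)
  }
  // Step 2: mapPartitionsWithIndex supplies the real partition index, so
  // the grid extent lookup no longer depends on TaskContext.partitionId.
  allMatches.mapPartitionsWithIndex { (index, matches) =>
    val extent = grids(index)
    // Reference-point dedup: keep a pair only if the lower-left corner of
    // the envelope intersection falls inside this partition's extent.
    matches.filter { case (l, r) =>
      val ref = l.getEnvelopeInternal.intersection(r.getEnvelopeInternal)
      extent.contains(ref.getMinX, ref.getMinY)
    }
  }
}
{code}
The extra pass means all matches are materialized before deduplication, but
the grid index then comes from the RDD itself rather than the task context.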
> Incorrect results for several joins in a single stage
> -----------------------------------------------------
>
> Key: SEDONA-233
> URL: https://issues.apache.org/jira/browse/SEDONA-233
> Project: Apache Sedona
> Issue Type: Bug
> Reporter: Martin Andersson
> Priority: Major
> Attachments: image-2023-01-16-17-38-00-132.png
>
>
> Queries with several joins in a single stage lead to warning logs and
> possibly incorrect results. One way to trigger the error is to use the union
> operator.
> {code:python}
> from pyspark.sql import functions as f
>
> # geo_df is any DataFrame with a geometry column named "geom"
> joined_df = geo_df.alias("a").join(
>     geo_df.alias("b"), f.expr("st_intersects(a.geom, b.geom)"))
> joined_df.union(joined_df).count()
> {code}
> Logs:
> {code}
> 23/01/16 17:22:58 WARN JudgementBase: Didn't find partition extent for this partition: 8
> 23/01/16 17:22:58 WARN JudgementBase: Didn't find partition extent for this partition: 11
> 23/01/16 17:22:58 WARN JudgementBase: Didn't find partition extent for this partition: 12
> ...
> {code}
> Partitioned joins in Sedona assume that TaskContext.partitionId is the same
> as the grid id used for partitioning (JudgementBase::initPartition). That
> isn't true when Spark runs several joins in a single stage.
> In the example above, if 10 partitions are used in each join, Spark will run
> the two joins in a single stage with 20 tasks. The second join will have
> partition ids 10-19 instead of the expected 0-9, so it can produce incorrect
> results: if no partition extent is found for an id, no deduplication is
> performed, and if the id maps to the wrong extent, rows that shouldn't be
> eliminated may be dropped.
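> A quick way to see the numbering (a hypothetical Scala sketch, assuming an
> existing SparkContext sc; nothing Sedona-specific):
> {code:scala}
> import org.apache.spark.TaskContext
>
> val a = sc.parallelize(1 to 100, 10)
> val b = sc.parallelize(1 to 100, 10)
>
> // One stage, 20 tasks: TaskContext.getPartitionId() returns 0-19, so
> // tasks from the second RDD see ids 10-19 where a grid lookup keyed on
> // the task's partition id would expect 0-9.
> a.union(b)
>   .mapPartitions(_ => Iterator(TaskContext.getPartitionId()))
>   .collect()
>   .sorted // Array(0, 1, ..., 19)
> {code}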
> From the Spark UI, two joins in a single stage:
> !image-2023-01-16-17-38-00-132.png!