Hi Tomas, thanks for reporting this bug! Is it possible to share your dataset so that other people can reproduce and debug it?
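In the meantime, here is a sketch of a synthetic repro others could try in spark-shell (Spark 3.x). The paths, row counts, and partitioning scheme are my assumptions modelled on your snippet, not your actual data, so it may or may not trigger the same plan. If the count comes back 0 with the broadcast hint and non-zero without it, that would point at the same DPP interaction; setting `spark.sql.optimizer.dynamicPartitionPruning.enabled` to `false` is another quick cross-check.

```scala
// Hypothetical repro (assumed shapes, not the reporter's real tables).
// In spark-shell, `spark` and its implicits are already in scope.
import org.apache.spark.sql.functions._

// "table-a": partitioned fact table, joined on (part_id, id).
spark.range(0, 7000000L)
  .withColumn("part_id", col("id") % 100)
  .write.partitionBy("part_id").mode("overwrite").parquet("/tmp/table-a")

// "table-b": broadcast side, with a part_ts partition to activate the DPP subquery.
spark.range(0, 7000000L)
  .withColumn("match_id", col("id"))
  .withColumn("match_part_id", col("id") % 100)
  .withColumn("part_ts", lit(20210304000L))
  .drop("id")
  .write.partitionBy("part_ts").mode("overwrite").parquet("/tmp/table-b")

val rightDF = spark.read.parquet("/tmp/table-a")
val leftDF = spark.read.parquet("/tmp/table-b")
  .where('part_ts === 20210304000L) // needed to activate the dynamic pruning subquery

val join = broadcast(leftDF).join(rightDF,
  $"match_part_id" === $"part_id" && $"match_id" === $"id")
join.count // 0 with the hint vs. a non-zero count without it would reproduce the bug
```

To check whether dynamic partition pruning is the culprit, run the same count with `spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "false")` and compare.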
On Thu, Apr 8, 2021 at 7:52 AM Tomas Bartalos <tomas.barta...@gmail.com> wrote:

> When I try to do a Broadcast Hash Join on a bigger table (6 Mil rows) I get
> an incorrect result of 0 rows.
>
> val rightDF = spark.read.format("parquet").load("table-a")
> val leftDF = spark.read.format("parquet").load("table-b")
>   // needed to activate dynamic pruning subquery
>   .where('part_ts === 20210304000L)
>
> // leftDF has 7 Mil rows ~ 120 MB
> val join = broadcast(leftDF).join(rightDF,
>   $"match_part_id" === $"part_id" && $"match_id" === $"id"
> )
> join.count
>
> res1: Long = 0
>
> I think it's connected with Dynamic Partition Pruning of the rightDF,
> which is happening according to the plan:
>
> PartitionFilters: [isnotnull(part_id#477L),
> dynamicpruningexpression(part_id#477L IN dynamicpruning#534)]
>
> ===== Subqueries =====
>
> Subquery:1 Hosting operator id = 6 Hosting Expression = part_id#477L IN
> dynamicpruning#534
> ReusedExchange (11)
>
> (11) ReusedExchange [Reuses operator id: 5]
> Output [4]: [match_part_id#487L, match_id#488L, UK#489, part_ts#490L]
>
> *Removing the broadcast hint OR shrinking the broadcasted table corrects
> the result*:
>
> val rightDF = spark.read.format("parquet").load("table-a")
> val leftDF = spark.read.format("parquet").load("table-b")
>   // needed to activate dynamic pruning subquery
>   .where('part_ts === 20210304000L)
>   // shrinks the broadcasted table to 18K rows
>   .where('match_id === 33358792)
>
> // leftDF has 18K rows
> val join = broadcast(leftDF).join(rightDF,
>   $"match_part_id" === $"part_id" && $"match_id" === $"id"
> )
> join.count
>
> res2: Long = 379701
>
> I would expect the broadcast to fail, but would never expect to get
> incorrect results without an exception. What do you think?
>
> BR,
>
> Tomas