RussellSpitzer commented on issue #2533:
URL: https://github.com/apache/iceberg/issues/2533#issuecomment-834913139


   OK, so here is what happens.
   
   We build up a plan that contains a sort merge join.
   
   The sort merge join is implemented as a zipped-partitions RDD (`ZippedPartitionsRDD2` in the dump below):
   ```
   result = {MapPartitionsRDD@21132} "MapPartitionsRDD[49] at sql at 
<console>:26"
    isBarrier_ = false
    prev = {ZippedPartitionsRDD2@21135} "ZippedPartitionsRDD2[48] at sql at 
<console>:26"
     f = {SortMergeJoinExec$lambda@21156} 
"org.apache.spark.sql.execution.joins.SortMergeJoinExec$$Lambda$3615/2092599862@10710787"
      arg = {SortMergeJoinExec@21170} "SortMergeJoin [user_name#39, 
start_time#32], [user_name#36, end_time#35], FullOuter\n:- *(2) Sort 
[user_name#39 ASC NULLS FIRST, start_time#32 ASC NULLS FIRST], false, 0\n:  +- 
SortAggregate(key=[user_name#39], functions=[min(log_time#37), 
max(log_time#37)], output=[start_time#32, end_time#33, user_name#39, 
_row_from_source_#42])\n:     +- SortAggregate(key=[user_name#39], 
functions=[partial_min(log_time#37), partial_max(log_time#37)], 
output=[user_name#39, min#60, max#61])\n:        +- *(1) Sort [user_name#39 ASC 
NULLS FIRST], false, 0\n:           +- *(1) Project [log_time#37, 
user_name#39]\n:              +- *(1) Filter ((isnotnull(log_time#37) AND 
(log_time#37 >= 2021-05-06 12:05:00)) AND (log_time#37 < 2021-05-06 
12:10:00))\n:                 +- BatchScan[log_time#37, user_name#39] 
iceberg.iceberg_db.user_logs [filters=log_time IS NOT NULL, log_time >= 
'2021-05-06 12:05:00', log_time < '2021-05-06 12:10:00']\n+- *(8) Sort 
[user_name#36 ASC 
 NULLS FIRST, end_time#35 ASC NULLS FIRST], false,"
      ```
      
      This comes from SortMergeJoinExec:
      
      
https://github.com/apache/spark/blob/94cac5978cf33f99a9f28180c9c909d5c884c152/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala#L132
      
      That code zips the partitions of the two sides, but it is not built to 
handle zipping a 0-partition RDD with a 1-partition RDD.
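
To make the failure mode concrete, here is a minimal Python sketch. It is not Spark code: it models a partitioned dataset as a list of lists, and `zip_partitions` and `concat` are illustrative names invented for this example. The only behavior it borrows from Spark is that zipping partitions requires both sides to have the same number of partitions, so 0 partitions against 1 partition fails.

```python
from typing import Callable, Iterable, Iterator, List

# Assumption: a "partitioned dataset" is modeled as a list of partitions,
# each partition being a plain list of rows. This is a toy model, not Spark.
Partitioned = List[List[int]]


def zip_partitions(
    left: Partitioned,
    right: Partitioned,
    f: Callable[[Iterator[int], Iterator[int]], Iterable[int]],
) -> Partitioned:
    """Apply f to each pair of co-indexed partitions.

    Mirrors the precondition that both inputs must have the same
    number of partitions; otherwise there is nothing to pair up.
    """
    if len(left) != len(right):
        raise ValueError(
            "cannot zip datasets with unequal numbers of partitions: "
            f"[{len(left)}, {len(right)}]"
        )
    return [list(f(iter(l), iter(r))) for l, r in zip(left, right)]


def concat(l: Iterator[int], r: Iterator[int]) -> Iterator[int]:
    """Trivial per-partition function: left rows followed by right rows."""
    yield from l
    yield from r


if __name__ == "__main__":
    # Equal partition counts: works.
    print(zip_partitions([[1], [2]], [[3], [4]], concat))
    # A 0-partition side against a 1-partition side: raises ValueError.
    try:
        zip_partitions([], [[1]], concat)
    except ValueError as e:
        print("failed:", e)
```

An empty dataset with zero partitions is different from a dataset with one empty partition: `zip_partitions([[]], [[1]], concat)` succeeds in this model, while `zip_partitions([], [[1]], concat)` does not.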
      
      I think we just need to preempt our creation of a JoinPlan when either 
side of the join is an empty RDD, and bail out early in the MergePlanning.
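
The guard proposed above could look roughly like the following Python sketch. Everything in it is hypothetical: `JoinDecision` and `choose_plan` are names made up for illustration, the partitioned inputs are plain lists of lists, and what the planner should do instead of joining is not specified in this comment, so the sketch only reports the decision.

```python
from enum import Enum
from typing import List

# Assumption: each side of the join is modeled as a list of partitions.
Partitioned = List[List[str]]


class JoinDecision(Enum):
    """Hypothetical outcomes of the planning check."""
    SKIP_JOIN = "skip-join"              # bail out before building a join plan
    SORT_MERGE_JOIN = "sort-merge-join"  # safe to zip partitions


def choose_plan(left: Partitioned, right: Partitioned) -> JoinDecision:
    """Decide whether a join plan should be created at all.

    If either side has zero partitions, zipping partitions cannot work,
    so the check bails out before any join plan is built.
    """
    if len(left) == 0 or len(right) == 0:
        return JoinDecision.SKIP_JOIN
    return JoinDecision.SORT_MERGE_JOIN


if __name__ == "__main__":
    print(choose_plan([], [["alice"]]))           # one side has no partitions
    print(choose_plan([["alice"]], [["alice"]]))  # both sides have partitions
```

The check runs on partition counts only, so it stays cheap and does not need to read any rows.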


