Unless I'm really missing something, I don't think so. As I said, it goes through an iterator, and after processing each stream-side row we do a shouldStop check. The generated code looks like:
    protected void processNext() throws java.io.IOException {
      /*** PRODUCE: Project [id#79L] */
      /*** PRODUCE: BroadcastHashJoin [id#79L], [id#82L], Inner, BuildRight, None */
      /*** PRODUCE: Range 0, 1, 8, 100, [id#79L] */

      // initialize Range
      if (!range_initRange) {
        range_initRange = true;
        initRange(partitionIndex);
      }

      while (!range_overflow && range_number < range_partitionEnd) {
        long range_value = range_number;
        range_number += 1L;
        if (range_number < range_value ^ 1L < 0) {
          range_overflow = true;
        }

        /*** CONSUME: BroadcastHashJoin [id#79L], [id#82L], Inner, BuildRight, None */

        // generate join key for stream side

        // find matches from HashedRelation
        UnsafeRow bhj_matched = false ? null : (UnsafeRow) bhj_relation.getValue(range_value);
        if (bhj_matched == null) continue;

        bhj_metricValue.add(1);

        /*** CONSUME: Project [id#79L] */

        System.out.println("i got one row");

        /*** CONSUME: WholeStageCodegen */

        project_rowWriter.write(0, range_value);
        append(project_result);

        if (shouldStop()) return;   // <-- the shouldStop check
      }
    }

shouldStop() is true once we go past the limit, so the loop exits early.

On Mon, Apr 18, 2016 at 9:44 PM, Zhan Zhang <zzh...@hortonworks.com> wrote:

> From the physical plan, the limit is one level up from the WholeStageCodegen; thus, I don't think shouldStop would work here. To make it work, the limit has to be part of the WholeStageCodegen.
>
> Correct me if I am wrong.
>
> Thanks.
>
> Zhan Zhang
>
> On Apr 18, 2016, at 11:09 AM, Reynold Xin <r...@databricks.com> wrote:
>
> I could be wrong but I think we currently do that through whole-stage codegen.
> After processing every row on the stream side, the generated code for broadcast join checks whether it has hit the limit or not (through this thing called shouldStop).
>
> It is not the most optimal solution, because a single stream-side row might output multiple hits, but it is usually not a problem.
>
> On Mon, Apr 18, 2016 at 10:46 AM, Andrew Ray <ray.and...@gmail.com> wrote:
>
>> While you can't automatically push the limit *through* the join, we could push it *into* the join (stop processing after generating 10 records). I believe that is what Rajesh is suggesting.
>>
>> On Tue, Apr 12, 2016 at 7:46 AM, Herman van Hövell tot Westerflier <hvanhov...@questtec.nl> wrote:
>>
>>> I am not sure if you can push a limit through a join. This becomes problematic if not all keys are present on both sides; in such a case a limit can produce fewer rows than the set limit.
>>>
>>> This might be a rare case in which whole-stage codegen is slower, due to the fact that we need to buffer the result of such a stage. You could try to disable it by setting "spark.sql.codegen.wholeStage" to false.
>>>
>>> 2016-04-12 14:32 GMT+02:00 Rajesh Balamohan <rajesh.balamo...@gmail.com>:
>>>
>>>> Hi,
>>>>
>>>> I ran the following query in Spark (latest master codebase) and it took a lot of time to complete even though it was a broadcast hash join.
>>>>
>>>> It appears that the limit computation is done only after computing the complete join. Shouldn't the limit condition be pushed into BroadcastHashJoin (wherein it would have to stop processing after generating 10 rows)? Please let me know if my understanding of this is wrong.
>>>> select l_partkey from lineitem, partsupp where ps_partkey=l_partkey limit 10;
>>>>
>>>> == Physical Plan ==
>>>> CollectLimit 10
>>>> +- WholeStageCodegen
>>>>    :  +- Project [l_partkey#893]
>>>>    :     +- BroadcastHashJoin [l_partkey#893], [ps_partkey#908], Inner, BuildRight, None
>>>>    :        :- Project [l_partkey#893]
>>>>    :        :  +- Filter isnotnull(l_partkey#893)
>>>>    :        :     +- Scan HadoopFiles[l_partkey#893] Format: ORC, PushedFilters: [IsNotNull(l_partkey)], ReadSchema: struct<l_partkey:int>
>>>>    :        +- INPUT
>>>>    +- BroadcastExchange HashedRelationBroadcastMode(true,List(cast(ps_partkey#908 as bigint)),List(ps_partkey#908))
>>>>       +- WholeStageCodegen
>>>>          :  +- Project [ps_partkey#908]
>>>>          :     +- Filter isnotnull(ps_partkey#908)
>>>>          :        +- Scan HadoopFiles[ps_partkey#908] Format: ORC, PushedFilters: [IsNotNull(ps_partkey)], ReadSchema: struct<ps_partkey:int>
>>>>
>>>> --
>>>> ~Rajesh.B
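[Editorial note] To make the shouldStop() mechanism discussed in this thread concrete, here is a rough, self-contained sketch of the pull-based produce/consume pattern in plain Java. All names here (ShouldStopSketch, pullMore, and so on) are made up for illustration; this is not Spark's actual BufferedRowIterator or generated code, but it shows how a downstream limit pulling rows one at a time causes the generated loop to scan only as many input rows as the limit needs, even over a huge range.

```java
import java.util.ArrayDeque;

// Sketch of the whole-stage-codegen produce loop plus the shouldStop() hook
// that lets a downstream limit cut the scan short. Hypothetical names; the
// real mechanism lives in Spark's BufferedRowIterator and CollectLimit.
public class ShouldStopSketch {
    private final ArrayDeque<Long> buffer = new ArrayDeque<>(); // rows ready for the parent
    private long next = 0;                  // Range operator state
    private final long rangeEnd;
    private final int limit;                // downstream limit (e.g. LIMIT 10)
    private int produced = 0;
    public long rowsScanned = 0;            // how far the inner loop actually ran

    public ShouldStopSketch(long rangeEnd, int limit) {
        this.rangeEnd = rangeEnd;
        this.limit = limit;
    }

    // True when a row is buffered: the produce loop should yield to the consumer.
    boolean shouldStop() { return !buffer.isEmpty(); }

    void append(long row) { buffer.add(row); }

    // Corresponds to the generated processNext(): produce rows until the
    // consumer has one buffered, then return to the caller.
    void processNext() {
        while (next < rangeEnd) {
            long value = next++;
            rowsScanned++;
            append(value);                  // CONSUME: project + append
            if (shouldStop()) return;       // the check the thread is about
        }
    }

    // Drive it the way a limit would: stop pulling after `limit` rows.
    public int collect() {
        while (produced < limit && (shouldStop() || pullMore())) {
            buffer.poll();
            produced++;
        }
        return produced;
    }

    private boolean pullMore() { processNext(); return !buffer.isEmpty(); }

    public static void main(String[] args) {
        ShouldStopSketch s = new ShouldStopSketch(1_000_000L, 10);
        System.out.println(s.collect());    // 10 rows delivered
        System.out.println(s.rowsScanned);  // only 10 of 1,000,000 input rows scanned
    }
}
```

The point of the sketch: even though the limit operator sits one level above the code-generated stage, the stage returns control to it after every buffered row, so reaching the limit stops the scan early without the limit being compiled into the stage.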
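[Editorial note] Herman's and Andrew's distinction (limit *through* vs. *into* the join) can be demonstrated without Spark. With an inner join, taking the first N stream-side rows before joining can yield fewer than N outputs when some keys have no match on the build side, whereas stopping after N join *outputs* is always safe. A minimal sketch in plain Java with made-up names, under the assumption of a build side containing only even keys:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class LimitPushdownSketch {
    // Build side of the join: only even keys exist in the hash relation.
    static final Set<Long> BUILD_KEYS = new HashSet<>(
            Arrays.asList(0L, 2L, 4L, 6L, 8L, 10L, 12L, 14L, 16L, 18L));

    // Wrong: push the limit *through* the join (limit stream rows first).
    public static int limitThroughJoin(int limit) {
        int out = 0;
        for (long key = 0; key < limit; key++) {   // take first `limit` stream rows
            if (BUILD_KEYS.contains(key)) out++;   // join: emit only on a match
        }
        return out;                                // may come up short of `limit`
    }

    // Right: push the limit *into* the join (stop after `limit` outputs).
    public static int limitIntoJoin(int limit) {
        int out = 0;
        for (long key = 0; ; key++) {              // scan the stream side
            if (BUILD_KEYS.contains(key)) out++;   // emit on match
            if (out >= limit) return out;          // stop once enough rows produced
        }
    }

    public static void main(String[] args) {
        System.out.println(limitThroughJoin(10));  // prints 5: odd keys had no match
        System.out.println(limitIntoJoin(10));     // prints 10
    }
}
```

This is why the limit cannot simply be moved below the join in the physical plan, but stopping the join's output early (the shouldStop pattern) preserves the query's semantics.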