Unless I'm really missing something, I don't think so. As I said, it goes
through an iterator, and after processing each stream-side row we do a
shouldStop check. The generated code looks like

  protected void processNext() throws java.io.IOException {
    /*** PRODUCE: Project [id#79L] */

    /*** PRODUCE: BroadcastHashJoin [id#79L], [id#82L], Inner, BuildRight, None */

    /*** PRODUCE: Range 0, 1, 8, 100, [id#79L] */

    // initialize Range
    if (!range_initRange) {
      range_initRange = true;
      initRange(partitionIndex);
    }

    while (!range_overflow && range_number < range_partitionEnd) {
      long range_value = range_number;
      range_number += 1L;
      if (range_number < range_value ^ 1L < 0) {
        range_overflow = true;
      }

      /*** CONSUME: BroadcastHashJoin [id#79L], [id#82L], Inner, BuildRight, None */

      // generate join key for stream side

      // find matches from HashedRelation
      UnsafeRow bhj_matched = false ? null : (UnsafeRow)bhj_relation.getValue(range_value);
      if (bhj_matched == null) continue;

      bhj_metricValue.add(1);

      /*** CONSUME: Project [id#79L] */

      System.out.println("i got one row");

      /*** CONSUME: WholeStageCodegen */

      project_rowWriter.write(0, range_value);
      append(project_result);

      if (shouldStop()) return;
    }
  }
}


shouldStop is true once we go past the limit.
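
To make the mechanism easier to follow, here is a minimal, self-contained
sketch of how a shouldStop-style check can cut the loop short. It is a
simplified model and not Spark's actual classes: BufferedRows, RangeJoin
and streamRowsProcessedForLimit are hypothetical names, and the sketch
assumes that shouldStop() simply reports that a produced row is buffered
and waiting, with the limit taking effect because the consumer stops
pulling after enough rows.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Set;

public class ShouldStopSketch {

    // Hypothetical stand-in for the buffering iterator that generated code extends.
    static abstract class BufferedRows {
        private final ArrayDeque<Long> buffer = new ArrayDeque<>();

        protected void append(long row) { buffer.add(row); }

        // Assumption: "should stop" means a produced row is waiting to be consumed.
        protected boolean shouldStop() { return !buffer.isEmpty(); }

        protected abstract void processNext();

        public boolean hasNext() {
            if (buffer.isEmpty()) processNext();
            return !buffer.isEmpty();
        }

        public long next() { return buffer.remove(); }
    }

    // Range scan on the stream side, probing an in-memory build side.
    static class RangeJoin extends BufferedRows {
        private final long end;
        private final Set<Long> buildSide;
        private long number = 0;
        long streamRowsProcessed = 0;

        RangeJoin(long end, Set<Long> buildSide) {
            this.end = end;
            this.buildSide = buildSide;
        }

        @Override
        protected void processNext() {
            while (number < end) {
                long value = number++;
                streamRowsProcessed++;
                if (!buildSide.contains(value)) continue; // no match on the build side
                append(value);
                if (shouldStop()) return; // hand control back to the consumer
            }
        }
    }

    // Pulls at most `limit` rows and reports how many stream rows were scanned.
    static long streamRowsProcessedForLimit(int limit) {
        Set<Long> build = new HashSet<>();
        for (long key = 0; key < 1000; key += 2) build.add(key); // even keys only
        RangeJoin join = new RangeJoin(1000, build);
        int taken = 0;
        while (taken < limit && join.hasNext()) {
            join.next();
            taken++;
        }
        return join.streamRowsProcessed;
    }

    public static void main(String[] args) {
        System.out.println(streamRowsProcessedForLimit(10)); // prints 19
    }
}
```

With a limit of 10 the sketch scans only stream rows 0 through 18 out of
1000, because the consumer never asks for an eleventh row.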



On Mon, Apr 18, 2016 at 9:44 PM, Zhan Zhang <zzh...@hortonworks.com> wrote:

> From the physical plan, the limit is one level above the
> WholeStageCodegen, so I don’t think shouldStop would work here. To make
> it work, the limit has to be part of the WholeStageCodegen.
>
> Correct me if I am wrong.
>
> Thanks.
>
> Zhan Zhang
>
> On Apr 18, 2016, at 11:09 AM, Reynold Xin <r...@databricks.com> wrote:
>
> I could be wrong but I think we currently do that through whole stage
> codegen. After processing every row on the stream side, the generated code
> for broadcast join checks whether it has hit the limit or not (through this
> thing called shouldStop).
>
> It is not the optimal solution, because a single stream-side row
> might produce multiple hits, but it is usually not a problem.
>
>
> On Mon, Apr 18, 2016 at 10:46 AM, Andrew Ray <ray.and...@gmail.com> wrote:
>
>> While you can't automatically push the limit *through* the join, we could
>> push it *into* the join (stop processing after generating 10 records). I
>> believe that is what Rajesh is suggesting.
>>
>> On Tue, Apr 12, 2016 at 7:46 AM, Herman van Hövell tot Westerflier <
>> hvanhov...@questtec.nl> wrote:
>>
>>> I am not sure if you can push a limit through a join. This becomes
>>> problematic if not all keys are present on both sides; in such a case a
>>> limit can produce fewer rows than the set limit.
>>>
>>> This might be a rare case in which whole stage codegen is slower, due to
>>> the fact that we need to buffer the result of such a stage. You could try
>>> to disable it by setting "spark.sql.codegen.wholeStage" to false.
>>>
>>> 2016-04-12 14:32 GMT+02:00 Rajesh Balamohan <rajesh.balamo...@gmail.com>:
>>>
>>>> Hi,
>>>>
>>>> I ran the following query in Spark (latest master codebase) and it took
>>>> a lot of time to complete even though it was a broadcast hash join.
>>>>
>>>> It appears that the limit is applied only after the complete join has
>>>> been computed. Shouldn't the limit be pushed into BroadcastHashJoin (so
>>>> that it stops processing after generating 10 rows)? Please let me know
>>>> if my understanding of this is wrong.
>>>>
>>>>
>>>> select l_partkey from lineitem, partsupp where ps_partkey=l_partkey
>>>> limit 10;
>>>>
>>>> == Physical Plan ==
>>>> CollectLimit 10
>>>> +- WholeStageCodegen
>>>>    :  +- Project [l_partkey#893]
>>>>    :     +- BroadcastHashJoin [l_partkey#893], [ps_partkey#908], Inner, BuildRight, None
>>>>    :        :- Project [l_partkey#893]
>>>>    :        :  +- Filter isnotnull(l_partkey#893)
>>>>    :        :     +- Scan HadoopFiles[l_partkey#893] Format: ORC, PushedFilters: [IsNotNull(l_partkey)], ReadSchema: struct<l_partkey:int>
>>>>    :        +- INPUT
>>>>    +- BroadcastExchange HashedRelationBroadcastMode(true,List(cast(ps_partkey#908 as bigint)),List(ps_partkey#908))
>>>>       +- WholeStageCodegen
>>>>          :  +- Project [ps_partkey#908]
>>>>          :     +- Filter isnotnull(ps_partkey#908)
>>>>          :        +- Scan HadoopFiles[ps_partkey#908] Format: ORC, PushedFilters: [IsNotNull(ps_partkey)], ReadSchema: struct<ps_partkey:int>
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> ~Rajesh.B
>>>>
>>>
>>>
>>
>
>
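
The distinction drawn in the quoted messages, pushing the limit through
the join versus pushing it into the join, can be illustrated with a small
sketch. It uses plain Java collections rather than Spark, and the class
and method names (LimitVersusJoin, limitAfterJoin, limitBeforeJoin) are
made up for illustration. The join is modelled as a filter of the stream
side against a set of build-side keys.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class LimitVersusJoin {

    // Limit applied to the join output. The stream is lazy, so probing
    // stops as soon as `limit` matches have been produced.
    static List<Integer> limitAfterJoin(List<Integer> streamSide, Set<Integer> buildSide, int limit) {
        return streamSide.stream()
                .filter(buildSide::contains)
                .limit(limit)
                .collect(Collectors.toList());
    }

    // Limit pushed below the join. Only the first `limit` stream rows are
    // probed, so the result can contain fewer rows than the limit.
    static List<Integer> limitBeforeJoin(List<Integer> streamSide, Set<Integer> buildSide, int limit) {
        return streamSide.stream()
                .limit(limit)
                .filter(buildSide::contains)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> streamSide = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        Set<Integer> buildSide = new HashSet<>(Arrays.asList(6, 7, 8, 9, 10));
        System.out.println(limitAfterJoin(streamSide, buildSide, 3));  // prints [6, 7, 8]
        System.out.println(limitBeforeJoin(streamSide, buildSide, 3)); // prints []
    }
}
```

Here the first three stream rows have no partner on the build side, so
limiting before the join returns nothing, while limiting the join output
still returns three rows and stops probing once it has them.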
