@Jinfeng & Aman

The proposal here is basically async execution. Pull or push are both fine.
The main requirement is to eliminate blocking calls on incoming and
outgoing buffers. In the pull model, as we discussed yesterday with some
of the Drillers, NOT_YET is the only state that operators need to handle
thoroughly. I do not see any state keeping besides NOT_YET across the DAG,
and I am not sure what additional state Jacques is referring to by
unwinding/rewinding.
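
To make the intent concrete, here is a minimal sketch of a non-blocking
next() in the pull model. The types (Outcome, IncomingBuffer,
NonBlockingOperator) are hypothetical stand-ins for illustration, not the
actual Drill interfaces:

    // Hypothetical types for illustration only; not the real Drill classes.
    enum Outcome { OK, NOT_YET, NONE }

    interface IncomingBuffer {
      Object poll();          // returns a batch if one is available, else null; never blocks
      boolean isFinished();   // true once the upstream sender is done
    }

    class NonBlockingOperator {
      private final IncomingBuffer incoming;

      NonBlockingOperator(IncomingBuffer incoming) {
        this.incoming = incoming;
      }

      Outcome next() {
        Object batch = incoming.poll();
        if (batch == null) {
          // No data yet: report NOT_YET and yield instead of blocking the thread.
          return incoming.isFinished() ? Outcome.NONE : Outcome.NOT_YET;
        }
        process(batch);
        return Outcome.OK;
      }

      private void process(Object batch) { /* operator-specific work */ }
    }

The only state an operator has to keep between calls is whatever it needs
to resume after a NOT_YET, which is why I do not expect any extra
unwinding/rewinding machinery.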

@Jinfeng
1: The new model is async. Regardless of whether it is pull or push, only
a single thread works on a fragment at a time. However, thread and
fragment are decoupled in the new model; that is, the same thread could
execute another fragment on the next tick. This substantially reduces the
number of threads spawned, from thousands down to a fraction of the number
of available cores. A rough scheduling sketch follows after point 3 below.

2: No. Each task runs through the entire fragment/operator tree.

3: Yes. Please refer to my analysis at [1], where Drill ends up with a
churn of context switches (~90,000/sec) over a window of about a minute,
causing threads to starve, timing out ZooKeeper and eventually choking the
entire system.
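
As an illustration of the decoupling in point 1 (the names
FragmentScheduler and FragmentTask are made up for this sketch, not a
proposed API): a fixed pool sized to the available cores, where a fragment
that still has work simply gets resubmitted, so its next tick may run on a
different worker thread:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Hypothetical sketch: fragments are tasks on a fixed-size pool, so a
    // worker thread may pick up a different fragment on its next tick.
    class FragmentScheduler {
      private final ExecutorService pool =
          Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

      interface FragmentTask {
        // Returns true if the fragment still has work (e.g. it saw NOT_YET).
        boolean runOneTick();
      }

      void schedule(FragmentTask task) {
        pool.submit(() -> {
          // Do a bounded slice of work without blocking, then give the thread back.
          boolean moreWork = task.runOneTick();
          if (moreWork) {
            schedule(task);   // requeue the fragment instead of holding the thread
          }
        });
      }
    }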


Thanks.
-Hanifi

1: https://issues.apache.org/jira/browse/DRILL-4325


2016-03-01 11:36 GMT-08:00 Hanifi GUNES <[email protected]>:

> Do you want to elaborate on, and possibly walk through an example of,
> how shouldContinue(...) behaves at fragment boundaries (entry/exit) and
> in the middle, considering back-pressure, inner pull loops like hash
> join, blocking semantics, etc.?
>
>
> Thanks.
> -Hanifi
>
>
>
> 2016-02-29 22:15 GMT-08:00 Neeraja Rentachintala <
> [email protected]>:
>
>> Jacques,
>> can you provide more context on what user/customer problem these changes
>> that you & Hanifi discussed are trying to solve?
>> Is it part of better resource utilization, concurrency/multi-tenancy
>> handling, or both?
>> It will help to understand that as background for the discussion.
>>
>> -Neeraja
>>
>> On Mon, Feb 29, 2016 at 9:36 PM, Jacques Nadeau <[email protected]>
>> wrote:
>>
>> > Hanifi and I had a great conversation late last week about how Drill
>> > currently provides parallelization. Hanifi suggested we move to a model
>> > whereby there is a fixed threadpool for all Drill work and we treat all
>> > operator and/or fragment operations as tasks that can be scheduled
>> > within that pool. This would serve the following purposes:
>> >
>> > 1. Reduce the number of threads that Drill creates
>> > 2. Decrease wasteful context switching (especially in high concurrency
>> > scenarios)
>> > 3. Provide more predictable SLAs for Drill infrastructure tasks such as
>> > heartbeats/RPC, cancellations/planning, and queue management, etc. (a
>> > key hot-button for Vicki :)
>> >
>> > For reference, this is already the threading model we use for the RPC
>> > threads and is a fairly standard asynchronous programming model. When
>> > Hanifi and I met, we brainstormed on what types of changes might need
>> > to be done and ultimately thought that in order to do this, we'd
>> > realistically want to move iterator trees from a pull model to a push
>> > model within a node.
>> >
>> > After spending more time thinking about this idea, I had the following
>> > thoughts:
>> >
>> > - We could probably accomplish the same behavior staying with a pull
>> > model and using IterOutcome.NOT_YET to return.
>> > - In order for this to work effectively, all code would need to be
>> > non-blocking (including reading from disk, writing to sockets, waiting
>> > for ZooKeeper responses, etc.)
>> > - Task length (or coarseness) would need to be quantized appropriately.
>> > While operating at the RootExec.next() level might be attractive, it is
>> > too coarse to get reasonable sharing, and we'd need to figure out ways
>> > to have time-based exit within operators.
>> > - With this approach, one of the biggest challenges would be reworking
>> > all the operators to be able to unwind the stack to exit execution (to
>> > yield their thread).
>> >
>> > Given those challenges, I think there may be another, simpler solution
>> > that could cover items 2 & 3 above without dealing with all the issues
>> > that we would have to deal with in the proposal that Hanifi suggested.
>> > At its core, I see the biggest issue as dealing with the
>> > unwinding/rewinding that would be required to move between threads.
>> > This is very similar to how we needed to unwind in the case of memory
>> > allocation before we supported realloc, and it causes substantial extra
>> > code complexity. As such, I suggest we use a pause approach that uses
>> > something similar to a semaphore for the number of active threads we
>> > allow. This could be done using the existing shouldContinue() mechanism,
>> > where we suspend or reacquire thread use as we pass through this method.
>> > We'd also create some alternative shouldContinue methods, such as
>> > shouldContinue(Lock toLock) and shouldContinue(Queue queueToTakeFrom),
>> > etc., so that shouldContinue would naturally wrap blocking calls with
>> > the right logic. This would be a fairly simple set of changes, and we
>> > could see how well it improves issues 2 & 3 above.
>> >
>> > On top of this, I think we still need to implement automatic
>> > parallelization scaling of the cluster. Even rudimentary monitoring of
>> > cluster load and parallel reduction of max_width_per_node would
>> > substantially improve the behavior of the cluster under heavy
>> > concurrent loads. (And note, I think that this is required no matter
>> > what we implement above.)
>> >
>> > Thoughts?
>> > Jacques
>> >
>> > --
>> > Jacques Nadeau
>> > CTO and Co-Founder, Dremio
>> >
>>
>
>
