Just some initial thoughts... the pull model for iterator-based operators within a thread, combined with the push model across threads via Exchanges, is fairly well understood (based on Volcano), and it would be good to build upon it to handle cluster overload situations. A hash join, for instance, need not 'pull' its probe side until the build side is done, since the build side may even produce 0 rows. In a pure push model, such cases would be difficult to manage.
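To make that concrete, here is a minimal pull-model hash join sketch in plain Java. The interfaces and names are made up for illustration (this is not Drill's actual HashJoinBatch/RecordBatch API): next() drains the build side into a hash table first, and only starts pulling the probe side once the build is complete, so a zero-row build never touches the probe side at all.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pull-model hash join -- illustrative only, not Drill's HashJoinBatch.
class PullHashJoin {

  /** A pull-style operator: each call returns the next batch of rows, or null when exhausted. */
  interface BatchIterator {
    List<Object[]> next();
  }

  private final BatchIterator buildSide;
  private final BatchIterator probeSide;
  private final Map<Object, Object[]> hashTable = new HashMap<>();
  private boolean buildDone = false;

  PullHashJoin(BatchIterator buildSide, BatchIterator probeSide) {
    this.buildSide = buildSide;
    this.probeSide = probeSide;
  }

  /** The parent operator pulls joined batches from here. */
  List<Object[]> next() {
    if (!buildDone) {
      // Drain the build side completely before ever touching the probe side.
      List<Object[]> batch;
      while ((batch = buildSide.next()) != null) {
        for (Object[] row : batch) {
          hashTable.put(row[0], row);          // join key assumed to be column 0
        }
      }
      buildDone = true;
      // Zero-row build: an inner join can finish without pulling the probe side at all.
      if (hashTable.isEmpty()) {
        return null;
      }
    }
    // Only now do we pull from the probe side.
    List<Object[]> probeBatch = probeSide.next();
    if (probeBatch == null) {
      return null;
    }
    List<Object[]> out = new ArrayList<>();
    for (Object[] row : probeBatch) {
      if (hashTable.containsKey(row[0])) {
        out.add(row);                          // output projection elided
      }
    }
    return out;
  }
}

The decision of when (and whether) to consume the probe side sits entirely inside the operator here; in a pure push model that scheduling decision would have to live somewhere outside it.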
I need to think more about the various alternatives proposed in this email thread by Jacques and Hanifi; a couple of rough sketches of the ideas are appended after the quoted thread below.

Aman

On Tue, Mar 1, 2016 at 8:06 AM, Jacques Nadeau <[email protected]> wrote:

> The main point here is cluster behavior during heavy load periods. The goal
> would be to improve cluster responsiveness and performance in those
> situations. While related to the discussion on workload management, I think
> of this mostly as an independent topic relating to how we do thread
> modeling.
>
> --
> Jacques Nadeau
> CTO and Co-Founder, Dremio
>
> On Mon, Feb 29, 2016 at 10:15 PM, Neeraja Rentachintala
> <[email protected]> wrote:
>
> > Jacques,
> > can you provide more context on what user/customer problem these changes
> > that you & Hanifi discussed are trying to solve?
> > Is it part of better resource utilization, concurrency/multi-tenancy
> > handling, or both?
> > It will help to understand that as background for the discussion.
> >
> > -Neeraja
> >
> > On Mon, Feb 29, 2016 at 9:36 PM, Jacques Nadeau <[email protected]>
> > wrote:
> >
> > > Hanifi and I had a great conversation late last week about how Drill
> > > currently provides parallelization. Hanifi suggested we move to a model
> > > whereby there is a fixed threadpool for all Drill work and we treat all
> > > operator and/or fragment operations as tasks that can be scheduled
> > > within that pool. This would serve the following purposes:
> > >
> > > 1. Reduce the number of threads that Drill creates
> > > 2. Decrease wasteful context switching (especially in high-concurrency
> > > scenarios)
> > > 3. Provide more predictable SLAs for Drill infrastructure tasks such as
> > > heartbeats/RPC, cancellations/planning, queue management, etc. (a key
> > > hot-button for Vicki :)
> > >
> > > For reference, this is already the threading model we use for the RPC
> > > threads and is a fairly standard asynchronous programming model. When
> > > Hanifi and I met, we brainstormed on what types of changes might need
> > > to be done and ultimately thought that, in order to do this, we'd
> > > realistically want to move iterator trees from a pull model to a push
> > > model within a node.
> > >
> > > After spending more time thinking about this idea, I had the following
> > > thoughts:
> > >
> > > - We could probably accomplish the same behavior staying with a pull
> > > model and using IterOutcome.NOT_YET to return.
> > > - In order for this to work effectively, all code would need to be
> > > non-blocking (including reading from disk, writing to sockets, waiting
> > > for ZooKeeper responses, etc.)
> > > - Task length (or coarseness) would need to be quantized appropriately.
> > > While operating at RootExec.next() might be attractive, it is too
> > > coarse to get reasonable sharing, and we'd need to figure out ways to
> > > have time-based exit within operators.
> > > - With this approach, one of the biggest challenges would be reworking
> > > all the operators to be able to unwind the stack to exit execution (to
> > > yield their thread).
> > >
> > > Given those challenges, I think there may be another, simpler solution
> > > that could cover items 2 & 3 above without dealing with all the issues
> > > that we would have to deal with in the proposal that Hanifi suggested.
> > > At its core, I see the biggest issue as dealing with the
> > > unwinding/rewinding that would be required to move between threads.
> > > This is very similar to how we needed to unwind in the case of memory
> > > allocation before we supported realloc, and it causes substantial extra
> > > code complexity. As such, I suggest we use a pause approach that uses
> > > something similar to a semaphore for the number of active threads we
> > > allow. This could be done using the existing shouldContinue() mechanism,
> > > where we suspend or reacquire thread use as we pass through this method.
> > > We'd also create some alternative shouldContinue methods such as
> > > shouldContinue(Lock toLock) and shouldContinue(Queue queueToTakeFrom),
> > > etc., so that shouldContinue would naturally wrap blocking calls with
> > > the right logic. This would be a fairly simple set of changes, and we
> > > could see how well it improves issues 2 & 3 above.
> > >
> > > On top of this, I think we still need to implement automatic
> > > parallelization scaling of the cluster. Even rudimentary monitoring of
> > > cluster load and parallel reduction of max_width_per_node would
> > > substantially improve the behavior of the cluster under heavy
> > > concurrent loads. (And note, I think that this is required no matter
> > > what we implement above.)
> > >
> > > Thoughts?
> > > Jacques
> > >
> > > --
> > > Jacques Nadeau
> > > CTO and Co-Founder, Dremio
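As promised above, a couple of rough sketches, purely to make the alternatives easier to compare.

First, the fixed-threadpool model Hanifi proposed, as I understand it: fragments become tasks that run a bounded slice of work and then go back to the shared pool. All names here are hypothetical and the real FragmentExecutor / IterOutcome plumbing is elided.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative sketch only -- not existing Drill code.
class FragmentScheduler {
  enum Outcome { OK, NOT_YET, DONE }

  /** One bounded slice of fragment work (e.g. process a batch or exit on a time budget). */
  interface FragmentTask {
    Outcome runSlice();
  }

  private final ExecutorService pool;

  FragmentScheduler(int threads) {
    // Single fixed pool for all query work on the node.
    this.pool = Executors.newFixedThreadPool(threads);
  }

  void schedule(FragmentTask task) {
    pool.submit(() -> {
      Outcome outcome = task.runSlice();
      switch (outcome) {
        case OK:
        case NOT_YET:
          // Naive resubmission; a real scheduler would park NOT_YET tasks
          // until upstream data or a socket becomes ready, rather than spin.
          schedule(task);
          break;
        case DONE:
          // Fragment finished; cleanup handled elsewhere.
          break;
      }
    });
  }
}

This is where the issues Jacques lists bite: every slice has to be non-blocking and has to be able to unwind the operator stack to give the thread back.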
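Second, one possible shape for the semaphore-gated shouldContinue() variants in the simpler pause approach. Again this is only a sketch under my own assumptions (class name, signatures, and the permit-per-thread scheme are made up, not existing Drill code): a fixed pool of "active thread" permits that a fragment gives up whenever it is about to block on a lock or a queue, and reacquires afterwards.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;

// Illustrative sketch only -- not existing Drill code. Cancellation checks
// that the real shouldContinue() performs are elided.
class ThreadGovernor {
  private final Semaphore activeThreads;

  ThreadGovernor(int maxActiveThreads) {
    // Fair semaphore caps how many fragment threads actually run at once.
    this.activeThreads = new Semaphore(maxActiveThreads, true);
  }

  /** Plain checkpoint: briefly give up the permit so waiting fragments can run. */
  void shouldContinue() throws InterruptedException {
    activeThreads.release();
    activeThreads.acquire();
  }

  /** Wrap a blocking lock acquisition; don't hold a permit while blocked. Caller unlocks later. */
  void shouldContinue(Lock toLock) throws InterruptedException {
    activeThreads.release();
    try {
      toLock.lockInterruptibly();
    } finally {
      activeThreads.acquireUninterruptibly();
    }
  }

  /** Wrap a blocking queue take, e.g. waiting on an incoming batch queue. */
  <T> T shouldContinue(BlockingQueue<T> queueToTakeFrom) throws InterruptedException {
    activeThreads.release();
    try {
      return queueToTakeFrom.take();
    } finally {
      activeThreads.acquireUninterruptibly();
    }
  }
}

The appeal of this shape is that operators keep their existing pull-model stacks; only the points that already pass through shouldContinue() (or block) need to change.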
