I think I missed one thing in the second solution. Drill needs to report the locations of the sources of skipped records, which are known only at the Scan. It seems a hop-by-hop approach is needed to carry that information.
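Here is a minimal Java sketch of the hop-by-hop idea. It assumes hypothetical classes (`SkipLocation`, `SkipSideband`, `mergeAndCheck`) that are not part of Drill. The Scan records where each skipped record came from, and every downstream hop merges the incoming payload and checks the threshold.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: none of these classes exist in Drill.
final class HopByHopSkips {

  // Provenance of one skipped record; only the Scan can fill this in.
  static final class SkipLocation {
    final String file;
    final long rowOffset;

    SkipLocation(String file, long rowOffset) {
      this.file = file;
      this.rowOffset = rowOffset;
    }

    @Override
    public String toString() {
      return file + "@" + rowOffset;
    }
  }

  // Sideband payload that travels with each batch, hop by hop.
  static final class SkipSideband {
    private final List<SkipLocation> locations = new ArrayList<>();

    // Called by the operator that skipped the record (typically the Scan).
    void recordSkip(SkipLocation location) {
      locations.add(location);
    }

    // Called at each downstream hop to fold in what arrived from upstream.
    void mergeFrom(SkipSideband upstream) {
      locations.addAll(upstream.locations);
    }

    int count() {
      return locations.size();
    }

    List<SkipLocation> locations() {
      return Collections.unmodifiableList(locations);
    }
  }

  // One downstream hop: merge the incoming payload, then check the threshold.
  static boolean mergeAndCheck(SkipSideband incoming, SkipSideband local, int threshold) {
    local.mergeFrom(incoming);
    return local.count() > threshold;
  }

  public static void main(String[] args) {
    SkipSideband fromScan = new SkipSideband();
    fromScan.recordSkip(new SkipLocation("part-0.json", 123));
    fromScan.recordSkip(new SkipLocation("part-0.json", 456));

    SkipSideband atNextOperator = new SkipSideband();
    boolean fail = mergeAndCheck(fromScan, atNextOperator, 1);
    System.out.println(atNextOperator.locations() + " fail=" + fail);
    // prints: [part-0.json@123, part-0.json@456] fail=true
  }
}
```

The payload grows with every skip and is copied forward at each hop. That is the cost of carrying Scan-only provenance downstream this way.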
On Sun, Dec 13, 2015 at 4:36 PM, Jacques Nadeau wrote:
> If your goal is early termination, sending the messages back as quickly as possible to the Screen or a similar centralized operator will allow you to respond quickly. Remember that there will likely be many fragments executing in parallel.
>
> --
> Jacques Nadeau
> CTO and Co-Founder, Dremio
>
> On Sun, Dec 13, 2015 at 2:46 PM, Hsuan Yi Chu wrote:
>
> > On Sun, Dec 13, 2015 at 9:15 AM, Jacques Nadeau wrote:
> >
> > > You seem to be mixing multiple things in your response.
> > >
> > > - Why do you say this is complex? It is very simple. Is it because you don't know how it would be implemented? I'm offering to do the vast majority of the work to implement the framework, so you shouldn't use that as a gauge.
> > > - It is designed to provide for multiple different use cases, not just your own. As such, you should expect it to be more general. There is clearly a need to provide these messages in directions other than straight up the operator tree. There is also a need to provide sideband messages outside the context of a record batch. (We shouldn't be creating fake empty record batches just to send sideband messages; that caused us problems before on the UserRpc, and I think we should compound purposes.)
> > > - You should evaluate whether it would solve the use case you presented. I believe it will.
> > >
> > > As far as your proposed implementation goes: I think you are confounding communication with the user with traversal of the operator tree. I would assume that each operator may be able to skip records. When you accumulate that information, you would want to know how many skips there were for each operator.
> > > The info might look like:
> > >
> > > skips: [
> > > { op: 1:1:1, records: [123,456,789]}
> > > { op: 1:2:1, records: [123,456,789]}
> > > { op: 1:1:2, records: [123,456,789]}
> > > { op: 1:2:2, records: [123,456,789]}
> > > ]
> > >
> > > In this case, there is no need for operator 1:1:1 to know about operator 1:1:2's skips. It shouldn't even need to manage or move that data. So I believe your requirements are actually to provide a stream of skip records to a separate writer that should be on the edge of the plan. The more I talk through this, the more I wonder whether sideband messages should take the same shape as a separate record batch, and whether we need to provide a separate subtree/fragment for this purpose. Sideband in that case would be a tee in the plan.
> > >
> >
> > For the case of skipping records, we will have a threshold, which defines the bound on the number of skipped records before Drill fails the query. Thus, if operator 1:1:1 can be informed of how many records have been skipped in the upstream operators, we could fail the query earlier.
> >
> > With this in mind, we could have two solutions to fail the query earlier:
> > 1. Let the sideband message hop from upstream to downstream. On the way, each operator decides to fail the query if the threshold is exceeded.
> >
> > 2. While each operator does its work independently, the sideband sink operator would be the one and only operator that knows how many records have been skipped. Once the threshold is exceeded, this sink operator will be responsible for stopping the query (via another sideband message to inform the foreman).
> >
> > When I read the proposal, I was thinking about the first solution. Certainly, the second one seems to leverage sideband better.
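The second solution can be sketched in Java as follows. It assumes a hypothetical `SkipSink` class (not a Drill API) that receives skip reports from many parallel fragments. Only the sink holds the global total, and it signals cancellation exactly once, when the threshold is first exceeded. Operator ids follow the `1:1:1` style of the skips example above.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of "solution 2": only the sink knows the global skip count.
final class SkipSinkSketch {

  static final class SkipSink {
    private final long threshold;
    // Keyed by operator coordinate, e.g. "1:1:1" (major:minor:operator).
    private final Map<String, AtomicLong> perOperator = new ConcurrentHashMap<>();
    private final AtomicLong total = new AtomicLong();
    private final AtomicBoolean cancelRequested = new AtomicBoolean();

    SkipSink(long threshold) {
      this.threshold = threshold;
    }

    // Safe to call from many fragments concurrently. Returns true exactly once,
    // on the report that first pushes the total past the threshold; the caller
    // would then notify the Foreman to cancel the query.
    boolean report(String operatorId, long skipped) {
      perOperator.computeIfAbsent(operatorId, k -> new AtomicLong()).addAndGet(skipped);
      long now = total.addAndGet(skipped);
      return now > threshold && cancelRequested.compareAndSet(false, true);
    }

    // Per-operator breakdown, sorted by operator id, e.g. for user warnings.
    Map<String, Long> snapshot() {
      Map<String, Long> out = new TreeMap<>();
      perOperator.forEach((op, count) -> out.put(op, count.get()));
      return out;
    }

    long total() {
      return total.get();
    }
  }

  public static void main(String[] args) {
    SkipSink sink = new SkipSink(5);
    System.out.println(sink.report("1:1:1", 3)); // false (total 3)
    System.out.println(sink.report("1:2:1", 3)); // true  (total 6, first crossing)
    System.out.println(sink.report("1:1:2", 1)); // false (already signalled)
    System.out.println(sink.snapshot() + " total=" + sink.total());
    // prints: {1:1:1=3, 1:1:2=1, 1:2:1=3} total=7
  }
}
```

No operator needs to know about another operator's skips; each one only reports its own counts to the sink.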
> >
> > > For example, imagine this tree:
> > >
> > > https://docs.google.com/drawings/d/19w7lbpnajsmQPUqzxlb2JP2jr6MsGU9RDOmQO1N05uU/edit
> > >
> > > As you can see, I believe that the vast majority of the issues that you want to manage with your skip-record design can be managed by providing a couple of simple tools: sideband, a sideband sink operator (basically a custom version of the union receiver), and an enhancement to the Screen operator to support a secondary incoming stream with a defined schema that will be transformed into a set of warnings (this also allows fine-grained warnings, or an aggregate in the secondary tree for aggregate warnings).
> > >
> > > The key goal here is to avoid introducing new or more complicated interfaces at the execution layer and instead use the logical layer to manage things. I believe this also extends to the concept of $recordIdentifier (or similar). This should simply be a virtual field produced by all record readers (when requested) that includes relevant provenance information. If you want to know which records are problematic, ask for the identifier and then record it in a separate file. Basically, let's use the highly efficient infrastructure we already have to do new things rather than implementing a new set of classes and concepts.
> > >
> > > --
> > > Jacques Nadeau
> > > CTO and Co-Founder, Dremio
> > >
> > > On Fri, Dec 11, 2015 at 1:16 PM, Hsuan Yi Chu wrote:
> > >
> > > > The design scope is very general, but for the applications we are thinking about now, this is a bit complex and will make the solutions a little indirect. In particular, this one, "data to be sent between any two three-coordinate locations", implies that sideband data can teleport? This is a bit too involved.
> > > > And even for advanced pushdown, it is not necessary to be that flexible for communications.
> > > >
> > > > My original picture of "sideband" is that the additional information should be "associated with" a RecordBatch. That means this additional information should be attached to a particular RecordBatch and cannot travel on its own.
> > > >
> > > > As the RecordBatch flows from upstream to downstream, an operator can optionally access or update the sideband message. For example, in the application of record skipping, an operator can see how many records were skipped so far and increment the count if more are skipped.
> > > >
> > > > If we go with this design, the place we need to change is on the receiver side, which needs to decode the sideband info from the incoming buffers.
> > > >
> > > > On Tue, Dec 8, 2015 at 7:10 PM, Jacques Nadeau wrote:
> > > >
> > > > > inline
> > > > >
> > > > > > It seems that SidebandTunnel is point-to-point. That is, there is one producer and one consumer. No broadcast or topics (multiple consumers of the same message). Order is preserved. At-most-once (i.e. may lose data in the event of failure). Producer and consumer may be on the same node or different nodes. Correct?
> > > > > >
> > > > >
> > > > > Yes, you are correct in all of this. Since we don't use UDP in Drill, we do broadcast as a collection of individual p2p calls, all using the same message (and multiple reference counts if using raw bytes).
> > > > >
> > > > > > I’m not sure SidebandTunnel.close is necessary. I would presume that a SidebandTunnel is closed when its associated statement is closed, and only then.
> > > > > >
> > > > >
> > > > > I started without it.
> > > > > My thought was that we may need to signal that you've gotten all of a sideband stream prior to the close of a particular fragment. If I'm on the downside of an operation reporting multiple skips, I may want to hold off on reporting to the user until I've gotten all of the messages. One option is for the sender to send a discrete message via the Tunnel close. The other option is an implicit message when the fragment is completed. I like the latter from a cleanliness perspective but think the former may be required. I'm OK with not exposing this publicly at the tunnel level initially; we can always expose it later. I would love to hear whether people think there is going to be a need/use case to continue fragment operation but have another operator know that a sideband stream is complete. Maybe when sending a downstream set of samples on the first 1mm records of a larger scan?
> > > > >
> > > > > > Also, would it be easier if the tunnels were defined as part of the DAG, and DAG initialization time was the only time that they could be created?
> > > > > >
> > > > >
> > > > > That is a really good question. I need to think about it a bit. I'm not sure it is easier, given that my initial proposal is to piggy-back on the DataTunnel (which is independent of DAG initialization). However, it might be cleaner if operators have to declare this relationship at initialization time and it is all managed 'outside'.
> > > > >
> > > > > Thanks for the feedback. Will need to think further on your last point especially.
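The explicit-close option can be illustrated with a minimal in-process Java sketch of a point-to-point, order-preserving tunnel whose `close()` sends a discrete end-of-stream message. This lets the consumer know it has every message before the fragment itself finishes. `LocalSidebandTunnel` and `drain` are hypothetical names, not the interface in the proposal, and the sketch does not model at-most-once delivery across nodes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-process model; not the proposed SidebandTunnel interface.
final class TunnelSketch {

  static final class LocalSidebandTunnel<T> {
    private static final Object END_OF_STREAM = new Object();
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private volatile boolean closed; // written only by the single producer
    private boolean endSeen;         // touched only by the single consumer

    // Producer side. The queue is FIFO, so message order is preserved.
    void send(T message) {
      if (closed) {
        throw new IllegalStateException("tunnel already closed");
      }
      queue.add(message);
    }

    // Discrete end-of-stream signal, independent of fragment completion.
    void close() {
      if (!closed) {
        closed = true;
        queue.add(END_OF_STREAM);
      }
    }

    // Consumer side. Blocks for the next message; empty means the stream is complete.
    @SuppressWarnings("unchecked")
    Optional<T> receive() throws InterruptedException {
      if (endSeen) {
        return Optional.empty();
      }
      Object next = queue.take();
      if (next == END_OF_STREAM) {
        endSeen = true;
        return Optional.empty();
      }
      return Optional.of((T) next);
    }
  }

  // A downstream operator can hold off reporting until it has every message.
  static List<String> drain(LocalSidebandTunnel<String> tunnel) throws InterruptedException {
    List<String> received = new ArrayList<>();
    for (Optional<String> m = tunnel.receive(); m.isPresent(); m = tunnel.receive()) {
      received.add(m.get());
    }
    return received;
  }

  public static void main(String[] args) throws InterruptedException {
    LocalSidebandTunnel<String> tunnel = new LocalSidebandTunnel<>();
    Thread producer = new Thread(() -> {
      tunnel.send("op 1:1:1 skipped 3");
      tunnel.send("op 1:1:1 skipped 2");
      tunnel.close();
    });
    producer.start();
    System.out.println(drain(tunnel)); // [op 1:1:1 skipped 3, op 1:1:1 skipped 2]
    producer.join();
  }
}
```

With the implicit alternative, the end-of-stream marker would instead be enqueued by whatever code tears down the sending fragment.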
> > > > >
> > > > > >
> > > > > > Julian
> > > > > >
> > > > > > > On Dec 8, 2015, at 11:00 AM, Jacques Nadeau wrote:
> > > > > > >
> > > > > > > Please see some initial thoughts attached. Would love feedback and thoughts from others on how we can shape this.
> > > > > > >
> > > > > > > https://gist.github.com/jacques-n/84b13e704e0e3829ca99
> > > > > > >
> > > > > > > --
> > > > > > > Jacques Nadeau
> > > > > > > CTO and Co-Founder, Dremio
> > > > > > >
> > > > > > > On Thu, Dec 3, 2015 at 8:17 AM, Zelaine Fong wrote:
> > > > > > >
> > > > > > >> Yes, it would be great to get your thoughts so we can assess the scope of what's involved.
> > > > > > >>
> > > > > > >> Thanks.
> > > > > > >>
> > > > > > >> -- Zelaine
> > > > > > >>
> > > > > > >> On Wed, Dec 2, 2015 at 7:29 PM, Jacques Nadeau wrote:
> > > > > > >>
> > > > > > >>> Definitely agree that we shouldn't boil the ocean. That said, I don't think we should make RecordBatch interface changes without deliberate design. Same for RPC protocol changes. Part of my internal struggle with the warning patch is exactly this lack of broader design. I think this is especially true given the drive to support backwards compatibility.
> > > > > > >>>
> > > > > > >>> I don't think we're talking about a massive undertaking. I'll try to write up some thoughts later this week to get the ball rolling. Sound good?
> > > > > > >>>
> > > > > > >>> --
> > > > > > >>> Jacques Nadeau
> > > > > > >>> CTO and Co-Founder, Dremio
> > > > > > >>> +1 on having a framework.
> > > > > > >>> OTOH, as with the warnings implementation, we might want to go ahead with a simpler implementation while we get a more generic framework design in place.
> > > > > > >>>
> > > > > > >>> Jacques, do you have any preliminary thoughts on the framework?
> > > > > > >>>
> > > > > > >>> On Tue, Dec 1, 2015 at 2:08 PM, Julian Hyde wrote:
> > > > > > >>>
> > > > > > >>>> +1 for a sideband mechanism.
> > > > > > >>>>
> > > > > > >>>> Sideband can also allow correlated restart of sub-queries.
> > > > > > >>>>
> > > > > > >>>> In the sideband use cases you described, the messages ran in the opposite direction to the data. Would the sideband also run in the same direction as the data? If so, it could carry warnings, rejected rows, progress indications, and (for online aggregation[1]) notifications that a better approximate query result is available.
> > > > > > >>>>
> > > > > > >>>> Julian
> > > > > > >>>>
> > > > > > >>>> [1] https://en.wikipedia.org/wiki/Online_aggregation
> > > > > > >>>>
> > > > > > >>>>> On Dec 1, 2015, at 1:51 PM, Jacques Nadeau wrote:
> > > > > > >>>>>
> > > > > > >>>>> This seems like a form of sideband communication. I think we should have a framework for this type of thing in general rather than a one-off for this particular need.
> > > > > > >>>>> Other forms of sideband might be small-table Bloom filter generation and pushdown into HBase, separate file assignment/partitioning providers balancing/generating scanner workloads, statistics generation for adaptive execution, etc.
> > > > > > >>>>>
> > > > > > >>>>> --
> > > > > > >>>>> Jacques Nadeau
> > > > > > >>>>> CTO and Co-Founder, Dremio
> > > > > > >>>>>
> > > > > > >>>>> On Tue, Dec 1, 2015 at 11:35 AM, Hsuan Yi Chu wrote:
> > > > > > >>>>>
> > > > > > >>>>>> I am trying to deal with the following scenario:
> > > > > > >>>>>>
> > > > > > >>>>>> A bunch of minor fragments are doing things in parallel. Each of them could skip some records. Since the downstream minor fragment needs to know the sum of the skipped-record counts in the upstream fragments (either just to display it or to see whether it exceeds the threshold), each upstream minor fragment needs to pass this scalar along with the RecordBatch.
> > > > > > >>>>>>
> > > > > > >>>>>> Since this seems to affect the RecordBatch protocol, I am looking for some advice here.
> > > > > > >>>>>>
> > > > > > >>>>>> Thanks.
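The original question (a scalar piggy-backed on each batch) can be sketched in Java. This sketch assumes a hypothetical batch wrapper carrying a cumulative skip count; it is not Drill's RecordBatch interface. The receiver decodes the scalar, keeps the latest value per upstream minor fragment, and sums those values to compare against the threshold.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a scalar piggy-backed on each batch; not Drill's RecordBatch API.
final class BatchSidebandSketch {

  static final class BatchWithSkipCount {
    final int senderMinorFragment;
    final int recordCount; // stands in for the batch's actual data
    // Cumulative skips seen so far by the sending fragment (not a per-batch delta).
    final long cumulativeSkipped;

    BatchWithSkipCount(int senderMinorFragment, int recordCount, long cumulativeSkipped) {
      this.senderMinorFragment = senderMinorFragment;
      this.recordCount = recordCount;
      this.cumulativeSkipped = cumulativeSkipped;
    }
  }

  // Receiver side: keeps the latest count per upstream fragment and sums them.
  static final class SkipAwareReceiver {
    private final Map<Integer, Long> latestBySender = new HashMap<>();
    private final long threshold;

    SkipAwareReceiver(long threshold) {
      this.threshold = threshold;
    }

    // Returns true once the summed skip count exceeds the threshold.
    boolean onBatch(BatchWithSkipCount batch) {
      latestBySender.merge(batch.senderMinorFragment, batch.cumulativeSkipped, Long::max);
      return totalSkipped() > threshold;
    }

    long totalSkipped() {
      long sum = 0;
      for (long count : latestBySender.values()) {
        sum += count;
      }
      return sum;
    }
  }

  public static void main(String[] args) {
    SkipAwareReceiver receiver = new SkipAwareReceiver(10);
    System.out.println(receiver.onBatch(new BatchWithSkipCount(0, 4096, 2))); // false (2)
    System.out.println(receiver.onBatch(new BatchWithSkipCount(1, 4096, 4))); // false (2 + 4)
    System.out.println(receiver.onBatch(new BatchWithSkipCount(0, 4096, 5))); // false (5 + 4)
    System.out.println(receiver.onBatch(new BatchWithSkipCount(1, 1024, 6))); // true  (5 + 6)
  }
}
```

Because each sender reports a cumulative count rather than a per-batch delta, the receiver only needs the latest value from each sender. The cost, as raised in the thread, is a change to the RecordBatch and RPC protocol.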
