I think I missed one thing in the second solution.

Drill needs to report the source locations of skipped records, which are
known only at the Scan operator. It seems hop-by-hop delivery is needed to
carry that information.
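
To make this concrete, here is a rough sketch of what a skip message might
carry and how a sink could check the threshold. It is only an illustration:
SkipReport, SkipSink, the field names and the file paths are all made up for
this example and are not part of Drill or of the proposal. It also says
nothing about the route the message takes (hop-by-hop or straight to a sink);
it only shows that the source location has to be attached where it is known,
at the Scan.

```python
from dataclasses import dataclass, field


@dataclass
class SkipReport:
    """One sideband message produced at a Scan (hypothetical shape)."""
    op: str             # operator id, e.g. "1:1:1"
    source: str         # file or split the records came from; known only at Scan
    records: list[int]  # positions of the skipped records within that source


@dataclass
class SkipSink:
    """Hypothetical sink: the one place that sees the query-wide skip count."""
    threshold: int
    total: int = 0
    by_source: dict[str, list[int]] = field(default_factory=dict)

    def accept(self, report: SkipReport) -> bool:
        """Record a report; return True once the threshold is exceeded."""
        self.by_source.setdefault(report.source, []).extend(report.records)
        self.total += len(report.records)
        return self.total > self.threshold


# Two scans report skips; the second report pushes the total past the limit.
sink = SkipSink(threshold=4)
first = sink.accept(SkipReport("1:1:1", "/data/a.json", [123, 456, 789]))
second = sink.accept(SkipReport("1:2:1", "/data/b.json", [12, 34]))
```

Here `first` is False (3 skips, limit 4) and `second` is True (5 skips), at
which point the sink would ask the foreman to fail the query, and
`sink.by_source` holds the locations to report to the user.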

On Sun, Dec 13, 2015 at 4:36 PM, Jacques Nadeau <[email protected]> wrote:

> If your goal is early termination, sending the messages back as quickly as
> possible to the Screen or similar centralized operator will allow you to
> respond quickly. Remember that there will likely be many fragments
> executing in parallel.
>
> --
> Jacques Nadeau
> CTO and Co-Founder, Dremio
>
> On Sun, Dec 13, 2015 at 2:46 PM, Hsuan Yi Chu <[email protected]> wrote:
>
> > On Sun, Dec 13, 2015 at 9:15 AM, Jacques Nadeau <[email protected]>
> > wrote:
> >
> > > You seem to be mixing multiple things in your response.
> > >
> > > - Why do you say this is complex? It is very simple. Is it because you
> don't
> > > know how it would be implemented? I'm offering to do the vast majority
> of
> > > the work to implement the framework so you shouldn't use that as a
> gauge.
> > > - It is designed to provide for multiple different use cases, not just
> > your
> > > own. As such, you should expect it to be more general. There is
> clearly a
> > > need to provide these messages in direction other than straight up the
> > > operator tree. There is also a need to provide sideband messages
> outside
> > > the context of a record batch. (We shouldn't be creating fake empty
> > record
> > > batches just to send sideband messages, that caused us problems before
> on
> > > the UserRpc and I think we shouldn't compound purposes.)
> > > - You should evaluate whether it would solve the use case you
> presented.
> > I
> > > believe it will.
> > >
> > > As far as your proposed implementation goes: I think you are
> confounding
> > > communication with the user with traversal of the operator tree. I
> would
> > > assume that each operator may be able to skip records. When you
> > accumulate
> > > that information, you would want to know how many skips there were for
> > each
> > > operator. The info might look like:
> > >
> > > skips: [
> > > { op: 1:1:1, records: [123,456,789]}
> > > { op: 1:2:1, records: [123,456,789]}
> > > { op: 1:1:2, records: [123,456,789]}
> > > { op: 1:2:2, records: [123,456,789]}
> > > ]
> > >
> > > In this case, there is no need for operator 1:1:1 to know about
> operator
> > > 1:1:2's skips. It shouldn't even need to manage or move that data. So I
> > > believe your requirements are actually to provide a stream of skip
> > records
> > > to a separate writer that should be on the edge of the plan. The more I
> > > talk through this, I'm wondering if sideband messages should take the
> > same
> > > shape as a separate record batch and that we need to provide a separate
> > > subtree/fragment for this purpose. Sideband in that case would be a tee
> > in
> > > the plan.
> > >
> >
> > For the case of skipping records, we will have a threshold, which defines
> > the bound on # of skipped records before Drill fails the query. Thus, if
> > operator 1:1:1 can be informed of how many records have been skipped in
> the
> > upstream operators, we could fail the query earlier.
> >
> > With this in mind, we could have two solutions to fail the query
> earlier:
> > 1. Let the sideband message hop from upstream to downstream. On the way,
> > each operator decides to fail the query if the threshold is exceeded.
> >
> > 2. While each operator does work independently, the sideband sink
> operator
> > would be the one and only one that knows how many records
> > have been skipped. Once the threshold is exceeded, this sink operator
> will
> > be responsible to stop the query (via another sideband message to inform
> > the foreman).
> >
> > When I read the proposal, I was thinking about the first solution.
> > Certainly, the second one seems to leverage sideband better.
> >
> > For example, imagine this tree:
> > >
> > >
> > >
> >
> https://docs.google.com/drawings/d/19w7lbpnajsmQPUqzxlb2JP2jr6MsGU9RDOmQO1N05uU/edit
> > >
> > > As you can see, I believe that the vast majority of the issues that you
> > > want to manage with your skip record design can be managed by
> providing a
> > > couple of simple tools: sideband, sideband sink operator (basically a
> > > custom version of the union receiver), and an enhancement to the Screen
> > > operator to support a secondary incoming stream with a defined schema
> > that
> > > will be transformed into a set of warnings (this also allows fine
> grained
> > > warnings or use an aggregate in the secondary tree for aggregate
> > warnings).
> > >
> > > The key goal here is trying to avoid the introduction of a new or more
> > > complicated interfaces at the execution layer and instead use the
> logical
> > > layer to manage things. I believe this also extends to the concept of
> > > $recordIdentifier (or similar). This should simply be a virtual field
> > > produced by all record readers (when requested) that includes relevant
> > > provenance information. If you want to know which records are
> > problematic,
> > > ask for the identifier and then record in a separate file. Basically,
> > let's
> > > use the highly efficient infrastructure we already have to do new
> things
> > > rather than implementing a new set of classes and concepts.
> > >
> > >
> > > --
> > > Jacques Nadeau
> > > CTO and Co-Founder, Dremio
> > >
> > > On Fri, Dec 11, 2015 at 1:16 PM, Hsuan Yi Chu <[email protected]>
> > wrote:
> > >
> > > > The design scope is very general, but, for the applications we are
> > > thinking
> > > > about now, this is a bit complex and will make the solutions a little
> > bit
> > > > indirect. Especially, this one "data to be sent between any two
> > > > three-coordinate locations" implies sideband data goes in teleport?
> > This
> > > is
> > > > a bit too involved. And even for advanced pushdown, it is not
> > necessary
> > > to
> > > > be that flexible for communications.
> > > >
> > > > My original picture of "sideband" is that the additional information
> > > should
> > > > be "associated with" RecordBatch. That means this additional
> > information
> > > > should be attached to a particular RecordBatch and cannot run on its own.
> > > >
> > > > As the RecordBatch flows from upstream to downstream, the operator
> can
> > > > optionally access or update the sideband message.
> > > > For example, in the application of record-skipping, operator can see
> > how
> > > > many records were skipped so far and increment the count if more are
> > > > skipped.
> > > >
> > > > If we go with this design, the place we need to change is on the
> > receiver
> > > > side, which needs to decode the sideband info from the incoming
> > buffers.
> > > >
> > > > On Tue, Dec 8, 2015 at 7:10 PM, Jacques Nadeau <[email protected]>
> > > wrote:
> > > >
> > > > > inline
> > > > >
> > > > > It seems that SidebandTunnel is point-to-point. That is, there is
> one
> > > > > > producer and one consumer. No broadcast or topics (multiple
> > consumers
> > > > of
> > > > > > the same message). Order is preserved. At-most-once (i.e. may
> lose
> > > data
> > > > > in
> > > > > > event of failure). Producer and consumer may be on the same node
> or
> > > > > > different nodes. Correct?
> > > > > >
> > > > >
> > > > > Yes, you are correct in all of this. Since we don't use UDP in
> Drill,
> > > we
> > > > do
> > > > > broadcast as a collection of individual p2p calls, all using the
> same
> > > > > message (and multiple reference counts if using raw bytes).
> > > > >
> > > > >
> > > > > >
> > > > > > I’m not sure SidebandTunnel.close is necessary. I would presume
> > that
> > > a
> > > > > > SidebandTunnel is closed when its associated statement is closed,
> > and
> > > > > only
> > > > > > then.
> > > > > >
> > > > >
> > > > > I started without it. My thought was that we may need to signal
> that
> > > > you've
> > > > > gotten all of a sideband stream prior to the close of a particular
> > > > > fragment. If I'm on the downside of an operation reporting multiple
> > > > skips,
> > > > > I may want to hold off on reporting to the user until I got all of
> > the
> > > > > messages. One option is for the sender to send a discrete message
> via
> > > the
> > > > > Tunnel close. The other option is an implicit message when the
> > fragment
> > > is
> > > > > completed. I like the latter from a cleanliness perspective but
> think
> > > the
> > > > > former may be required. I'm ok for not exposing at the tunnel level
> > > > > publicly initially and we can always expose later. I would love
> to
> > > hear
> > > > > whether people think there is going to be a need/use case to
> continue
> > > > > fragment operation but have another operator know that a sideband
> > > stream
> > > > is
> > > > > complete. Maybe when sending a downstream set of samples on the
> first
> > > 1mm
> > > > > records of a larger scan?
> > > > >
> > > > >
> > > > > > Also, would it be easier if the tunnels were defined as part of
> the
> > > > DAG,
> > > > > > and DAG initialization time was the only time that they could be
> > > > created?
> > > > > >
> > > > >
> > > > > That is a really good question. I need to think about it a bit. I'm
> > not
> > > > > sure it is easier given my initial proposal is to piggy-back on the
> > > > > DataTunnel, (which is independent of DAG initialization).  However,
> > it
> > > > > might be cleaner if operators have to declare this relationship at
> > > > > initialization time and it is all managed 'outside'.
> > > > >
> > > > > Thanks for the feedback. Will need to think further on your last
> > point
> > > > > especially.
> > > > >
> > > > >
> > > > > >
> > > > > > Julian
> > > > > >
> > > > > >
> > > > > > > On Dec 8, 2015, at 11:00 AM, Jacques Nadeau <
> [email protected]>
> > > > > wrote:
> > > > > > >
> > > > > > > Please see some initial thoughts attached. Would love feedback
> > and
> > > > > > thoughts
> > > > > > > from others on how we can shape this.
> > > > > > >
> > > > > > > https://gist.github.com/jacques-n/84b13e704e0e3829ca99
> > > > > > >
> > > > > > > --
> > > > > > > Jacques Nadeau
> > > > > > > CTO and Co-Founder, Dremio
> > > > > > >
> > > > > > > On Thu, Dec 3, 2015 at 8:17 AM, Zelaine Fong <
> [email protected]
> > >
> > > > > wrote:
> > > > > > >
> > > > > > >> Yes, it would be great to get your thoughts so we can assess
> the
> > > > scope
> > > > > > of
> > > > > > >> what's involved.
> > > > > > >>
> > > > > > >> Thanks.
> > > > > > >>
> > > > > > >> -- Zelaine
> > > > > > >>
> > > > > > >> On Wed, Dec 2, 2015 at 7:29 PM, Jacques Nadeau <
> > > [email protected]>
> > > > > > wrote:
> > > > > > >>
> > > > > > >>> Definitely agree that we shouldn't boil the ocean.  That
> said,
> > I
> > > > > don't
> > > > > > >>> think we should make RecordBatch interface changes without
> > > > deliberate
> > > > > > >>> design. Same for RPC protocol changes. Part of my internal
> > > struggle
> > > > > > with
> > > > > > >>> the warning patch is exactly this lack of broader design. I
> > think
> > > > > this
> > > > > > is
> > > > > > > >>> especially true given the drive to support backwards
> > > > compatibility.
> > > > > > >>>
> > > > > > >>> I don't think we're talking about a massive undertaking. I'll
> > try
> > > > to
> > > > > > >> write
> > > > > > >>> up some thoughts later this week to get the ball rolling.
> Sound
> > > > good?
> > > > > > >>>
> > > > > > >>> --
> > > > > > >>> Jacques Nadeau
> > > > > > >>> CTO and Co-Founder, Dremio
> > > > > > >>> +1 on having a framework.
> > > > > > >>> OTOH, as with the warnings implementation, we might want to
> go
> > > > ahead
> > > > > > >> with a
> > > > > > >>> simpler implementation while we get a more generic framework
> > > design
> > > > > in
> > > > > > >>> place.
> > > > > > >>>
> > > > > > >>> Jacques, do you have any preliminary thoughts on the
> framework?
> > > > > > >>>
> > > > > > >>> On Tue, Dec 1, 2015 at 2:08 PM, Julian Hyde <
> [email protected]>
> > > > > wrote:
> > > > > > >>>
> > > > > > >>>> +1 for a sideband mechanism.
> > > > > > >>>>
> > > > > > >>>> Sideband can also allow correlated restart of sub-queries.
> > > > > > >>>>
> > > > > > >>>> In sideband use cases you described, the messages ran in the
> > > > > opposite
> > > > > > >>>> direction to the data. Would the sideband also run in the
> same
> > > > > > >> direction
> > > > > > >>> as
> > > > > > >>>> the data? If so it could carry warnings, rejected rows,
> > progress
> > > > > > >>>> indications, and (for online aggregation[1]) notifications
> > that
> > > a
> > > > > > >> better
> > > > > > >>>> approximate query result is available.
> > > > > > >>>>
> > > > > > >>>> Julian
> > > > > > >>>>
> > > > > > >>>> [1] https://en.wikipedia.org/wiki/Online_aggregation
> > > > > > >>>>
> > > > > > >>>>
> > > > > > >>>>
> > > > > > >>>>> On Dec 1, 2015, at 1:51 PM, Jacques Nadeau <
> > [email protected]
> > > >
> > > > > > >> wrote:
> > > > > > >>>>>
> > > > > > >>>>> This seems like a form of sideband communication. I think
> we
> > > > should
> > > > > > >>> have
> > > > > > >>>> a
> > > > > > >>>>> framework for this type of thing in general rather than a
> > > one-off
> > > > > for
> > > > > > >>>> this
> > > > > > >>>>> particular need. Other forms of sideband might be small
> table
> > > > > > >>> bloomfilter
> > > > > > >>>>> generation and pushdown into hbase, separate file
> > > > > > >>> assignment/partitioning
> > > > > > >>>>> providers balancing/generating scanner workloads,
> statistics
> > > > > > >> generation
> > > > > > >>>> for
> > > > > > >>>>> adaptive execution, etc.
> > > > > > >>>>>
> > > > > > >>>>> --
> > > > > > >>>>> Jacques Nadeau
> > > > > > >>>>> CTO and Co-Founder, Dremio
> > > > > > >>>>>
> > > > > > >>>>> On Tue, Dec 1, 2015 at 11:35 AM, Hsuan Yi Chu <
> > > > [email protected]
> > > > > >
> > > > > > >>>> wrote:
> > > > > > >>>>>
> > > > > > >>>>>> I am trying to deal with the following scenario:
> > > > > > >>>>>>
> > > > > > >>>>>> A bunch of minor fragments are doing things in parallel.
> > Each
> > > of
> > > > > > >> them
> > > > > > >>>> could
> > > > > > >>>>>> skip some records. Since the downstream minor fragment
> needs
> > > to
> > > > > know
> > > > > > >>> the
> > > > > > >>>>>> sum of skipped-record-counts (in order to just display or
> > see
> > > if
> > > > > the
> > > > > > >>>> number
> > > > > > >>>>>> exceeds the threshold) in the upstreams, each upstream
> minor
> > > > > > >> fragment
> > > > > > >>>> needs
> > > > > > >>>>>> to pass this scalar with RecordBatch.
> > > > > > >>>>>>
> > > > > > > >>>>>> Since this seems to impact the protocol of RecordBatch, I
> am
> > > > > looking
> > > > > > >>> for
> > > > > > >>>>>> some advice here.
> > > > > > >>>>>>
> > > > > > >>>>>> Thanks.
> > > > > > >>>>>>
> > > > > > >>>>
> > > > > > >>>>
> > > > > > >>>
> > > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
