On Thu, Jan 21, 2021 at 12:55 PM Ismaël Mejía <[email protected]> wrote:

> Thanks Kenn! That sounds like a good and achievable strategy to get
> the first/limit results. I will check the code to see if we can reuse
> this logic; the extra question is whether it may fit in the direct runner
> for the general use case (not only SQL), maybe via some PipelineOptions
> of the runner.
>
> > Note that both of these don't solve the issue that Read + GBK + take(N)
> would have to do the full Read+GBK for a batch pipeline.
>
> Just to confirm that I understand correctly Robert,


This is not just for Read. In batch, you may have [any large pipeline] +
GBK + take(N), and due to GBK being a barrier there's no way to not execute
all of [any large pipeline].
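
For concreteness, a rough Java sketch of that shape (the source, transforms and
sample size here are purely illustrative):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Sample;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class GbkThenTakeShape {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // [any large pipeline]: here just a big read, but it could be arbitrarily deep.
    PCollection<String> lines = p.apply(TextIO.read().from("gs://my-bucket/logs/*"));

    // Count.perElement() contains a GroupByKey, which is a barrier in batch:
    // nothing is emitted downstream until every upstream element has been produced.
    PCollection<KV<String, Long>> counts = lines.apply(Count.perElement());

    // take(N): even though we only want 10 grouped results, the full read + GBK
    // has to run first.
    counts.apply(Sample.<KV<String, Long>>fixedSizeGlobally(10));

    p.run().waitUntilFinish();
  }
}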


> you mention this
> for example for the case of IOs where we can match 1000s of
> `ReadableFiles`, and we will necessarily end up distributing and
> reading those thousands until we have the take(N) results. You mean we
> cannot avoid this.
>

Yes, you'll match thousands of files, but the concurrency of your read
depends on how many workers you have. If, say, you have 16 worker threads
you'll quickly reach N records and be able to abort much earlier.


> I was wondering if with SDF we could have a generic solution
> (especially now that most translations are based on SDF), maybe some
> sort of 'BoundedRestrictionTracker' to deal with the limit and then
> stop producing output. Maybe Boyuan, Luke or Robert can have an idea
> if this approach is really viable or there can be issues. I am saying
> this in the context of finding a solution for all runners.
>

Something like "Please abort the pipeline once PCollection X has at least N
records?" The one way SDF plays into this is it provides the runner the
ability to say to these (potentially long-running usefns) "please stop
gracefully as soon as you can."
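
To make that concrete, below is a minimal sketch of a file-reading SDF; the names
(ReadRecordsFn, countRecords, readRecordAt) and the record format are made up. The
point is only that the read loop goes through tryClaim(), which is the hook that
lets a runner split off the remaining restriction and have the DoFn stop
gracefully instead of reading the whole file.

import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

// Hypothetical SDF that reads fixed-size records given a file name as input.
class ReadRecordsFn extends DoFn<String, String> {

  @GetInitialRestriction
  public OffsetRange initialRestriction(@Element String file) {
    // Assumption: we can cheaply determine how many records the file holds.
    return new OffsetRange(0, countRecords(file));
  }

  @NewTracker
  public OffsetRangeTracker newTracker(@Restriction OffsetRange range) {
    return new OffsetRangeTracker(range);
  }

  @ProcessElement
  public void process(
      @Element String file,
      RestrictionTracker<OffsetRange, Long> tracker,
      OutputReceiver<String> out) {
    // When the runner splits the restriction (e.g. because downstream already has
    // enough output), tryClaim() returns false and we stop instead of reading the
    // rest of the file.
    for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
      out.output(readRecordAt(file, i));
    }
  }

  private static long countRecords(String file) { /* hypothetical */ return 0; }
  private static String readRecordAt(String file, long i) { /* hypothetical */ return ""; }
}

Something still has to decide when "enough" output has been seen and ask the
runner to split or cancel, which is the part that remains open here.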


>
> On Thu, Jan 21, 2021 at 8:34 PM Robert Bradshaw <[email protected]>
> wrote:
> >
> > I don't know that SDF vs. BoundedSources changes things here--for both
> one can implement take(n) by running until one has N elements and then
> canceling the pipeline.
> >
> > One could have a more sophisticated First(n) operator that has a
> "back-edge" to checkpoint/splits the upstream operators once a sufficient
> number of elements has been observed.
> >
> > Note that both of these don't solve the issue that Read + GBK + take(N)
> would have to do the full Read+GBK for a batch pipeline.
> >
> > On Thu, Jan 21, 2021 at 10:25 AM Kenneth Knowles <[email protected]>
> wrote:
> >>
> >> I forgot two things:
> >>
> >> 1. I totally agree that this is a good opportunity to make Beam more
> useful. Different engines sometimes have similar abilities of their own, but
> making this available across runners, xlang transforms, etc., is way
> cool.
> >> 2. You can of course do the same trick for a distributed runner by
> using a message queue between the pipeline and the controller program. An
> interactive Beam Java, or improving/unifying the concepts between
> Python/Java/SQL (Go?), would be great. Not sure how much code can be reused.
> >>
> >> Kenn
> >>
> >> On Thu, Jan 21, 2021 at 10:15 AM Kenneth Knowles <[email protected]>
> wrote:
> >>>
> >>> I think the approach used in the SQL CLI to implement a LIMIT clause
> may work for some cases. It only works in the same process with the
> DirectRunner. It doesn't sample at the source, because you never know what
> will happen in the query. Instead it counts outputs and then cancels the
> job when it has enough:
> https://github.com/apache/beam/blob/a72460272354747a54449358f5df414be4b6d72c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java#L200
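
For reference, the counting-and-cancel idea can also be sketched in a
runner-agnostic way with metrics instead of the in-process collector the SQL CLI
uses. This is only a sketch; the metric names, polling loop, limit and the
--blockOnRun flag are illustrative assumptions, not what BeamEnumerableConverter
actually does:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.transforms.DoFn;

// Pass-through DoFn attached to the PCollection we want to limit.
class CountSeenFn<T> extends DoFn<T, T> {
  private final Counter seen = Metrics.counter("limit", "seen");  // arbitrary names

  @ProcessElement
  public void process(@Element T element, OutputReceiver<T> out) {
    seen.inc();
    out.output(element);
  }
}

class LimitDriver {
  // Controller side: run without blocking, poll the counter, cancel once we have N.
  // (With the DirectRunner this needs --blockOnRun=false so that run() returns.)
  static void runWithLimit(Pipeline p, long n) throws Exception {
    PipelineResult result = p.run();
    MetricsFilter filter =
        MetricsFilter.builder()
            .addNameFilter(MetricNameFilter.named("limit", "seen"))
            .build();
    while (result.getState() == PipelineResult.State.RUNNING) {
      long seen = 0;
      for (MetricResult<Long> c : result.metrics().queryMetrics(filter).getCounters()) {
        seen += c.getAttempted();
      }
      if (seen >= n) {
        result.cancel();  // stop the job once enough elements have been observed
        break;
      }
      Thread.sleep(1000);
    }
  }
}

Attempted values are used because some runners only report committed metrics once
the job finishes.
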
> >>>
> >>> However, if your pipeline starts with a read of 1000s of files, it may
> require a different pattern for invoking SDF:
> >>>
> >>> 1. initial splits do not matter much, probably
> >>> 2. you want to checkpoint and emit values so that the end output of
> the pipeline can receive them to cancel it; you don't want to read a whole
> restriction like in a batch case
> >>>
> >>> I don't know the status of this, if it needs special treatment or not.
> There may also be the issue that SDF is more actively developed in portable
> runners and less so in classic runners.
> >>>
> >>> Kenn
> >>>
> >>> On Wed, Jan 6, 2021 at 9:05 AM Ismaël Mejía <[email protected]> wrote:
> >>>>
> >>>> > Those are good points. Do you know if the Interactive Runner has
> been tried in those instances? If so, what were the shortcomings?
> >>>>
> >>>> I am not aware of experiences or shortcomings with the Interactive
> >>>> Runner. The issue is that the Interactive Runner is based on Python,
> >>>> and all the tools I mention above are Java-based, so Python probably
> >>>> won't be a valid alternative.
> >>>>
> >>>> What is concerning for me is that in other similar systems (e.g.
> >>>> Spark, Flink) a developer can consistently do a `.take(n)` read from a
> >>>> data source and have results in constant time almost independently of
> >>>> the size of the targeted data. This allows developers to iterate faster
> >>>> and improves the developer experience.
> >>>>
> >>>> What is not clear to me yet is how we can achieve this in a clean
> >>>> way, given all the 'wrappings' we already have at translation time. I
> >>>> don't know if there could be a way to override some default
> >>>> translation(s) to achieve this. Any ideas maybe?
> >>>>
> >>>>
> >>>> On Tue, Jan 5, 2021 at 10:26 PM Sam Rohde <[email protected]> wrote:
> >>>> >
> >>>> > Hi Ismael,
> >>>> >
> >>>> > Those are good points. Do you know if the Interactive Runner has
> been tried in those instances? If so, what were the shortcomings?
> >>>> >
> >>>> > I can also see the use of sampling for performance benchmarking
> >>>> > reasons. We have seen others send in known elements which are tracked
> >>>> > throughout the pipeline to generate timings for each transform/stage.
> >>>> >
> >>>> > -Sam
> >>>> >
> >>>> > On Fri, Dec 18, 2020 at 8:24 AM Ismaël Mejía <[email protected]>
> wrote:
> >>>> >>
> >>>> >> Hello,
> >>>> >>
> >>>> >> The use of the direct runner for interactive local use cases has
> >>>> >> increased over the years in Beam due to projects like Scio, Kettle/Hop
> >>>> >> and our own SQL CLI. All these tools have one thing in common: they
> >>>> >> show a sample of some source input to the user and interactively apply
> >>>> >> transforms to it to help users build Pipelines more rapidly.
> >>>> >>
> >>>> >> If you build a pipeline today to produce this sample using Beam’s
> >>>> >> Sample transform from a set of files, the read of the files happens
> >>>> >> first and then the sample, so the more files there are, or the bigger
> >>>> >> they are, the longer it takes to produce the sample, even if the
> >>>> >> number of elements expected to be read is constant.
> >>>> >>
> >>>> >> During Beam Summit last year there were some discussions about how we
> >>>> >> could improve this scenario (and others), but I have the impression
> >>>> >> that no further discussion happened on the mailing list, so I wanted
> >>>> >> to know if there are ideas about how we can get the direct runner to
> >>>> >> improve this case.
> >>>> >>
> >>>> >> It seems to me that we can still ‘force’ the count with some static
> >>>> >> field because it is not a distributed case, but I don’t know how we
> >>>> >> can stop reading once we have the number of sampled elements in a
> >>>> >> generic way; especially now, it seems a bit harder to do with the pure
> >>>> >> DoFn (SDF) APIs vs the old Source ones, but that’s just a guess.
> >>>> >>
> >>>> >> Does anyone have an idea of how we could generalize this and, of
> >>>> >> course, if you see the value of such a use case, other ideas for
> >>>> >> improvements?
> >>>> >>
> >>>> >> Regards,
> >>>> >> Ismaël
>
