On Thu, Jan 21, 2021 at 12:55 PM Ismaël Mejía <[email protected]> wrote:
> Thanks Kenn! That sounds like a good and achievable strategy to get the first/limit results. I will check the code to see if we can reuse this logic. The extra question is whether we could fit it into the direct runner for the general use case (not only SQL), maybe via some PipelineOptions of the runner.
>
> > Note that both of these don't solve the issue that Read + GBK + take(N) would have to do the full Read+GBK for a batch pipeline.
>
> Just to confirm that I understand correctly Robert,

This is not just for Read. In batch, you may have [any large pipeline] + GBK + take(N), and due to GBK being a barrier there's no way to not execute all of [any large pipeline].

> you mention this for example for the case of IOs where we can match 1000s of `ReadableFiles` and we will necessarily end up distributing and reading the thousands until we have the take(N) results. You mean we cannot avoid this.
>

Yes, you'll match thousands of files, but the concurrency of your read depends on how many workers you have. If, say, you have 16 worker threads, you'll quickly reach N records and be able to abort much earlier.

> I was wondering if with SDF we could have a generic solution (especially now that most translations are based on SDF), maybe some sort of 'BoundedRestrictionTracker' to deal with the limit and then stop producing output. Maybe Boyuan, Luke or Robert can have an idea if this approach is really viable or if there can be issues. I am saying this in the context of finding a solution for all runners.
>

Something like "Please abort the pipeline once PCollection X has at least N records"? The one way SDF plays into this is that it provides the runner the ability to say to these (potentially long-running usefns) "please stop gracefully as soon as you can."

>
> On Thu, Jan 21, 2021 at 8:34 PM Robert Bradshaw <[email protected]> wrote:
> >
> > I don't know that SDF vs.
> > BoundedSources changes things here--for both one can implement take(n) by running until one has N elements and then canceling the pipeline.
> >
> > One could have a more sophisticated First(n) operator that has a "back-edge" to checkpoint/split the upstream operators once a sufficient number of elements has been observed.
> >
> > Note that both of these don't solve the issue that Read + GBK + take(N) would have to do the full Read+GBK for a batch pipeline.
> >
> > On Thu, Jan 21, 2021 at 10:25 AM Kenneth Knowles <[email protected]> wrote:
> >>
> >> I forgot two things:
> >>
> >> 1. I totally agree that this is a good opportunity to make Beam more useful. Different engines sometimes have their own similar abilities, but making it available across the runners and xlang transforms, etc., is way cool.
> >> 2. You can of course do the same trick for a distributed runner by using a message queue between the pipeline and the controller program. And interactive Beam Java, or improving/unifying the concepts between Python/Java/SQL (Go?) would be great. Not sure how much code can be reused.
> >>
> >> Kenn
> >>
> >> On Thu, Jan 21, 2021 at 10:15 AM Kenneth Knowles <[email protected]> wrote:
> >>>
> >>> I think the approach used in the SQL CLI to implement a LIMIT clause may work for some cases. It only works in the same process with the DirectRunner. It doesn't sample at the source, because you never know what will happen in the query. Instead it counts outputs and then cancels the job when it has enough:
> >>> https://github.com/apache/beam/blob/a72460272354747a54449358f5df414be4b6d72c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java#L200
> >>>
> >>> However, if your pipeline starts with a read of 1000s of files, it may be a different pattern for invoking SDF:
> >>>
> >>> 1. initial splits probably do not matter much
> >>> 2. you want to checkpoint and emit values so that the end output of the pipeline can receive them to cancel it; you don't want to read a whole restriction like in a batch case
> >>>
> >>> I don't know the status of this, or whether it needs special treatment. There may also be the issue that SDF is more actively developed in portable runners and less so in classic runners.
> >>>
> >>> Kenn
> >>>
> >>> On Wed, Jan 6, 2021 at 9:05 AM Ismaël Mejía <[email protected]> wrote:
> >>>>
> >>>> > Those are good points. Do you know if the Interactive Runner has been tried in those instances? If so, what were the shortcomings?
> >>>>
> >>>> I am not aware of experiences or shortcomings with the Interactive Runner. The issue is that the Interactive Runner is based on Python and all the tools I mention above are Java-based, so Python probably won't be a valid alternative.
> >>>>
> >>>> What concerns me is that in other similar systems (e.g. Spark, Flink) a developer can consistently do a `.take(n)` read from a data source and get results in constant time, almost independently of the size of the targeted data. This lets developers iterate faster and improves the developer experience.
> >>>>
> >>>> What is not clear to me yet is how we can achieve this in a clean way, given all the 'wrappings' we already have at translation time. I don't know if there could be a way to override some default translation(s) to achieve this. Any ideas maybe?
> >>>>
> >>>>
> >>>> On Tue, Jan 5, 2021 at 10:26 PM Sam Rohde <[email protected]> wrote:
> >>>> >
> >>>> > Hi Ismael,
> >>>> >
> >>>> > Those are good points. Do you know if the Interactive Runner has been tried in those instances? If so, what were the shortcomings?
> >>>> >
> >>>> > I can also see the use of sampling for a performance benchmarking reason.
> >>>> > We have seen others send in known elements which are tracked throughout the pipeline to generate timings for each transform/stage.
> >>>> >
> >>>> > -Sam
> >>>> >
> >>>> > On Fri, Dec 18, 2020 at 8:24 AM Ismaël Mejía <[email protected]> wrote:
> >>>> >>
> >>>> >> Hello,
> >>>> >>
> >>>> >> The use of the direct runner for interactive local use cases has increased over the years in Beam due to projects like Scio, Kettle/Hop and our own SQL CLI. All these tools have one thing in common: they show a sample of some source input to the user and interactively apply transforms to it to help users build Pipelines more rapidly.
> >>>> >>
> >>>> >> If you build a pipeline today to produce this sample using Beam’s Sample transform from a set of files, the read of the files happens first and then the sample, so the more files there are or the bigger they are, the longer it takes to produce the sample, even if the number of elements expected to be read is constant.
> >>>> >>
> >>>> >> During Beam Summit last year there were some discussions about how we could improve this scenario (and others), but I have the impression no further discussions happened on the mailing list, so I wanted to know if there are some ideas about how we can get the direct runner to improve this case.
> >>>> >>
> >>>> >> It seems to me that we can still ‘force’ the count with some static field because it is not a distributed case, but I don’t know how we can stop reading once we have the number of sampled elements in a generic way. Especially now, it seems to me a bit harder to do with pure DoFn (SDF) APIs than with the old Source ones, but well, that’s just a guess.
> >>>> >>
> >>>> >> Does anyone have an idea of how we could generalize this and, of course, if you see the value of such a use case, other ideas for improvements?
> >>>> >>
> >>>> >> Regards,
> >>>> >> Ismaël
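The count-and-cancel idea discussed in this thread (count outputs, stop once N have arrived, and rely on a bounded pool of workers so that only a few files are ever in flight) can be sketched outside Beam. The snippet below is a minimal Python simulation under stated assumptions, not Beam code: `read_file` and `take_with_cancel` are hypothetical names, threads in a `ThreadPoolExecutor` stand in for runner workers, and a `threading.Event` stands in for cancelling the job or asking a long-running splittable DoFn to stop early.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def read_file(file_id, records_per_file, stop, emit):
    """Hypothetical stand-in for reading one file record by record."""
    for i in range(records_per_file):
        # Cooperative check between records, loosely like a runner asking
        # a long-running splittable DoFn to stop as soon as it can.
        if stop.is_set():
            return
        emit((file_id, i))


def take_with_cancel(num_files, records_per_file, n, num_workers=16):
    """Returns (first n records seen, number of files actually opened)."""
    results = []
    opened = []
    lock = threading.Lock()
    stop = threading.Event()  # hypothetical stand-in for job cancellation

    def emit(record):
        # Counting sink: keep at most n records, then signal "cancel".
        with lock:
            if len(results) < n:
                results.append(record)
            if len(results) >= n:
                stop.set()

    def task(file_id):
        # Files whose turn comes after the limit is reached are never opened.
        if stop.is_set():
            return
        with lock:
            opened.append(file_id)
        read_file(file_id, records_per_file, stop, emit)

    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        # list() waits for all tasks and re-raises any worker exception.
        list(pool.map(task, range(num_files)))
    return results, len(opened)


records, files_opened = take_with_cancel(num_files=1000, records_per_file=100, n=50)
print(len(records), files_opened)
```

With 1,000 simulated files of 100 records each and 16 workers, only the files already in flight when the limit is hit get opened; every later file is skipped. In Beam Java, the rough counterpart of `stop.set()` would be calling `cancel()` on the `PipelineResult`. As Robert points out, this does not help when a GroupByKey sits before take(N) in a batch pipeline, because nothing reaches the counting sink until the barrier has consumed all of its input.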
