I don't know that SDF vs. BoundedSource changes things here: for both, one can implement take(n) by running until one has n elements and then canceling the pipeline.
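To make the count-then-cancel idea concrete, here is a minimal sketch in plain Python rather than Beam. Everything in it is an assumption for illustration: `read_files`, `take`, and the `stats` counter are hypothetical stand-ins, and closing the generator plays the role of canceling the pipeline.

```python
from typing import Dict, Generator, List


def read_files(
    files: Dict[str, List[str]], stats: Dict[str, int]
) -> Generator[str, None, None]:
    """Hypothetical stand-in for a source: lazily yields records file by file."""
    for name, records in files.items():
        stats["files_opened"] += 1
        for record in records:
            stats["records_read"] += 1
            yield record


def take(source: Generator[str, None, None], n: int) -> List[str]:
    """Collect up to n elements, then cancel the upstream read."""
    out: List[str] = []
    if n > 0:
        for element in source:
            out.append(element)
            if len(out) >= n:
                break
    # Closing the generator stands in for canceling the pipeline:
    # no further files are opened and no further records are read.
    source.close()
    return out


# 1000 small in-memory "files" of 100 records each (illustrative data only).
files = {f"file-{i}": [f"file-{i}/rec-{j}" for j in range(100)] for i in range(1000)}
stats = {"files_opened": 0, "records_read": 0}

sample = take(read_files(files, stats), 5)
```

In this toy setup only one file is opened and five records are read out of 100,000. A real runner would also have to deal with bundling and parallel readers, which this sketch ignores.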
One could have a more sophisticated First(n) operator that has a "back-edge" to checkpoint/split the upstream operators once a sufficient number of elements has been observed. Note that neither of these solves the issue that Read + GBK + take(N) would have to do the full Read+GBK for a batch pipeline.

On Thu, Jan 21, 2021 at 10:25 AM Kenneth Knowles <[email protected]> wrote:
> I forgot two things:
>
> 1. I totally agree that this is a good opportunity to make Beam more
> useful. Different engines have their own similar abilities some time, but
> making it available across the runners and xlang transforms, etc, is way
> cool.
> 2. You can of course do the same trick for a distributed runner by using a
> message queue between the pipeline and the controller program. And
> interactive Beam Java, or improving/unifying the concepts between
> Python/Java/SQL (Go?) would be great. Not sure how much code can be reused.
>
> Kenn
>
> On Thu, Jan 21, 2021 at 10:15 AM Kenneth Knowles <[email protected]> wrote:
>
>> I think the approach used in the SQL CLI to implement a LIMIT clause may
>> work for some cases. It only works in the same process with the
>> DirectRunner. It doesn't sample at the source, because you never know what
>> will happen in the query. Instead it counts outputs and then cancels the
>> job when it has enough:
>> https://github.com/apache/beam/blob/a72460272354747a54449358f5df414be4b6d72c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java#L200
>>
>> However if your pipeline starts with a read of 1000s of files it may be a
>> different pattern for invoking SDF:
>>
>> 1. initial splits do not matter much, probably
>> 2. you want to checkpoint and emit values so that the end output of the
>> pipeline can receive them to cancel it; you don't want to read a whole
>> restriction like in a batch case
>>
>> I don't know the status of this, if it needs special treatment or not.
>> There may also be the issue that SDF is more actively developed in portable
>> runners and less so in classic runners.
>>
>> Kenn
>>
>> On Wed, Jan 6, 2021 at 9:05 AM Ismaël Mejía <[email protected]> wrote:
>>
>>> > Those are good points. Do you know if the Interactive Runner has been
>>> tried in those instances? If so, what were the shortcomings?
>>>
>>> I am not aware of experiences or shortcomings with the Interactive
>>> Runner. The issue is that the Interactive runner is based on python
>>> and all the tools I mention above are Java-based so Python probably
>>> won't be a valid alternative.
>>>
>>> What is concerning for me is that in other similar systems (e.g.
>>> Spark, Flink) a developer can consistently do a `.take(n)` read from a
>>> data source and have results in constant time almost independently of
>>> the size of the targeted data. This allows to iterate faster and
>>> improve the developer experience.
>>>
>>> What is not clear for me yet is how we can achieve this in a clean
>>> way, given all the 'wrappings' we already have in translation time. I
>>> don't know if there could be a way to override some default
>>> translation(s) to achieve this. Any ideas maybe?
>>>
>>>
>>> On Tue, Jan 5, 2021 at 10:26 PM Sam Rohde <[email protected]> wrote:
>>> >
>>> > Hi Ismael,
>>> >
>>> > Those are good points. Do you know if the Interactive Runner has been
>>> tried in those instances? If so, what were the shortcomings?
>>> >
>>> > I can also see the use of sampling for a performance benchmarking
>>> reason. We have seen others send in known elements which are tracked
>>> throughout the pipeline to generate timings for each transform/stage.
>>> >
>>> > -Sam
>>> >
>>> > On Fri, Dec 18, 2020 at 8:24 AM Ismaël Mejía <[email protected]>
>>> wrote:
>>> >>
>>> >> Hello,
>>> >>
>>> >> The use of direct runner for interactive local use cases has increased
>>> >> with the years on Beam due to projects like Scio, Kettle/Hop and our
>>> >> own SQL CLI. All these tools have in common one thing, they show a
>>> >> sample of some source input to the user and interactively apply
>>> >> transforms to it to help users build Pipelines more rapidly.
>>> >>
>>> >> If you build a pipeline today to produce this sample using the Beam’s
>>> >> Sample transform from a set of files, the read of the files happens
>>> >> first and then the sample, so the more files or the bigger they are
>>> >> the longer it takes to produce the sample even if the number of
>>> >> elements expected to read is constant.
>>> >>
>>> >> During Beam Summit last year there were some discussions about how we
>>> >> could improve this scenario (and others) but I have the impression no
>>> >> further discussions happened in the mailing list, so I wanted to know
>>> >> if there are some ideas about how we can get direct runner to improve
>>> >> this case.
>>> >>
>>> >> It seems to me that we can still ‘force’ the count with some static
>>> >> field because it is not a distributed case but I don’t know how we can
>>> >> stop reading once we have the number of sampled elements in a generic
>>> >> way, specially now it seems to me a bit harder to do with pure DoFn
>>> >> (SDF) APIs vs old Source ones, but well that’s just a guess.
>>> >>
>>> >> Does anyone have an idea of how could we generalize this and of course
>>> >> if you see the value of such use case, other ideas for improvements?
>>> >>
>>> >> Regards,
>>> >> Ismaël
>>> >>
