I don't know that SDF vs. BoundedSources changes things here; for both,
one can implement take(n) by running until one has N elements and then
canceling the pipeline.
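
A rough sketch of that count-and-cancel pattern with the Java SDK and the
DirectRunner (class, method, and field names here are illustrative, not
existing Beam API; error handling omitted):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import org.apache.beam.runners.direct.DirectOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;

    public class TakeViaCancel {
      // A static collection only works because the DirectRunner runs in-process;
      // a distributed runner would need e.g. a message queue instead.
      private static final ConcurrentLinkedQueue<String> COLLECTED =
          new ConcurrentLinkedQueue<>();

      static class CollectFn extends DoFn<String, Void> {
        @ProcessElement
        public void processElement(@Element String element) {
          COLLECTED.add(element);
        }
      }

      public static List<String> take(String filePattern, int n) throws Exception {
        DirectOptions options = PipelineOptionsFactory.create().as(DirectOptions.class);
        options.setBlockOnRun(false); // let run() return while the pipeline executes

        Pipeline p = Pipeline.create(options);
        p.apply(TextIO.read().from(filePattern)).apply(ParDo.of(new CollectFn()));

        PipelineResult result = p.run();
        while (COLLECTED.size() < n && result.getState() == PipelineResult.State.RUNNING) {
          Thread.sleep(100); // poll until we have enough elements or the pipeline ends
        }
        if (result.getState() == PipelineResult.State.RUNNING) {
          result.cancel(); // stop the still-running read: this is the "take(n)" part
        }
        List<String> taken = new ArrayList<>(COLLECTED);
        return taken.subList(0, Math.min(n, taken.size()));
      }
    }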

One could have a more sophisticated First(n) operator that has a
"back-edge" to checkpoint/split the upstream operators once a sufficient
number of elements has been observed.

Note that neither of these solves the issue that Read + GBK + take(N)
would still have to do the full Read + GBK for a batch pipeline.

On Thu, Jan 21, 2021 at 10:25 AM Kenneth Knowles <[email protected]> wrote:

> I forgot two things:
>
> 1. I totally agree that this is a good opportunity to make Beam more
> useful. Different engines sometimes have their own similar abilities, but
> making this available across the runners, xlang transforms, etc., is way
> cool.
> 2. You can of course do the same trick for a distributed runner by using a
> message queue between the pipeline and the controller program. And an
> interactive Beam Java, or improving/unifying the concepts between
> Python/Java/SQL (Go?), would be great. Not sure how much code can be reused.
>
> Kenn
>
> On Thu, Jan 21, 2021 at 10:15 AM Kenneth Knowles <[email protected]> wrote:
>
>> I think the approach used in the SQL CLI to implement a LIMIT clause may
>> work for some cases. It only works in the same process with the
>> DirectRunner. It doesn't sample at the source, because you never know what
>> will happen in the query. Instead it counts outputs and then cancels the
>> job when it has enough:
>> https://github.com/apache/beam/blob/a72460272354747a54449358f5df414be4b6d72c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java#L200
>>
>> However, if your pipeline starts with a read of 1000s of files, it may
>> require a different pattern for invoking the SDF:
>>
>> 1. initial splits do not matter much, probably
>> 2. you want to checkpoint and emit values so that the end output of the
>> pipeline can receive them to cancel it; you don't want to read a whole
>> restriction like in a batch case
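
A rough sketch of an SDF written that way with the Java SDK (the restriction
size and "record" format below are made-up placeholders, not a real file
format):

    import org.apache.beam.sdk.io.range.OffsetRange;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

    // Emits a batch of records and then voluntarily checkpoints by returning
    // ProcessContinuation.resume(), so elements can reach the end of the
    // pipeline (and a counting/cancelling consumer) without the whole
    // restriction having to be read first.
    class EagerEmittingReadFn extends DoFn<String, String> {
      private static final long BATCH_SIZE = 100;

      @GetInitialRestriction
      public OffsetRange initialRestriction(@Element String file) {
        // Placeholder: pretend every "file" has 1,000,000 records.
        return new OffsetRange(0, 1_000_000);
      }

      @ProcessElement
      public ProcessContinuation process(
          @Element String file,
          RestrictionTracker<OffsetRange, Long> tracker,
          OutputReceiver<String> out) {
        long start = tracker.currentRestriction().getFrom();
        for (long i = start; tracker.tryClaim(i); ++i) {
          out.output(file + "#" + i); // stand-in for actually reading record i
          if (i - start + 1 >= BATCH_SIZE) {
            // Ask the runner to checkpoint here and resume the rest later.
            return ProcessContinuation.resume();
          }
        }
        return ProcessContinuation.stop();
      }
    }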
>>
>> I don't know the status of this, if it needs special treatment or not.
>> There may also be the issue that SDF is more actively developed in portable
>> runners and less so in classic runners.
>>
>> Kenn
>>
>> On Wed, Jan 6, 2021 at 9:05 AM Ismaël Mejía <[email protected]> wrote:
>>
>>> > Those are good points. Do you know if the Interactive Runner has been
>>> tried in those instances? If so, what were the shortcomings?
>>>
>>> I am not aware of experiences or shortcomings with the Interactive
>>> Runner. The issue is that the Interactive Runner is based on Python,
>>> and all the tools I mention above are Java-based, so Python probably
>>> won't be a valid alternative.
>>>
>>> What is concerning for me is that in other similar systems (e.g.
>>> Spark, Flink) a developer can consistently do a `.take(n)` read from a
>>> data source and have results in constant time, almost independently of
>>> the size of the targeted data. This allows developers to iterate faster
>>> and improves the developer experience.
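
For reference, this is the kind of usage being described, here with Spark's
Java API and a placeholder path; take(n) only reads as many partitions as it
needs, so it returns quickly regardless of input size:

    import java.util.List;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class SparkTakeDemo {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
            new SparkConf().setMaster("local[*]").setAppName("take-demo"));
        // take(10) runs jobs over an increasing number of partitions until it
        // has 10 elements, instead of reading the whole input.
        List<String> firstTen = sc.textFile("/path/to/large/dataset/*").take(10);
        firstTen.forEach(System.out::println);
        sc.close();
      }
    }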
>>>
>>> What is not clear to me yet is how we can achieve this in a clean
>>> way, given all the 'wrappings' we already have at translation time. I
>>> don't know if there could be a way to override some default
>>> translation(s) to achieve this. Any ideas?
>>>
>>>
>>> On Tue, Jan 5, 2021 at 10:26 PM Sam Rohde <[email protected]> wrote:
>>> >
>>> > Hi Ismael,
>>> >
>>> > Those are good points. Do you know if the Interactive Runner has been
>>> tried in those instances? If so, what were the shortcomings?
>>> >
>>> > I can also see the use of sampling for performance benchmarking. We
>>> have seen others send in known elements which are tracked throughout the
>>> pipeline to generate timings for each transform/stage.
>>> >
>>> > -Sam
>>> >
>>> > On Fri, Dec 18, 2020 at 8:24 AM Ismaël Mejía <[email protected]>
>>> wrote:
>>> >>
>>> >> Hello,
>>> >>
>>> >> The use of the direct runner for interactive local use cases has
>>> >> increased over the years in Beam due to projects like Scio, Kettle/Hop
>>> >> and our own SQL CLI. All these tools have one thing in common: they
>>> >> show a sample of some source input to the user and interactively apply
>>> >> transforms to it to help users build pipelines more rapidly.
>>> >>
>>> >> If you build a pipeline today to produce this sample using Beam's
>>> >> Sample transform from a set of files, the read of the files happens
>>> >> first and then the sample, so the more files there are or the bigger
>>> >> they are, the longer it takes to produce the sample, even though the
>>> >> number of elements expected to be read is constant.
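
For concreteness, the shape of the pipeline being described (file pattern and
sample size are placeholders); the TextIO read still scans every matched file
in full, and Sample only limits what comes out afterwards:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.transforms.Sample;

    public class SampleFromFiles {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        p.apply(TextIO.read().from("/data/events/*.json")) // reads all matched files
            .apply(Sample.any(100)); // even though only 100 elements are wanted
        p.run().waitUntilFinish();
      }
    }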
>>> >>
>>> >> During Beam Summit last year there were some discussions about how we
>>> >> could improve this scenario (and others), but I have the impression no
>>> >> further discussion happened on the mailing list, so I wanted to know
>>> >> if there are some ideas about how we can get the direct runner to
>>> >> improve this case.
>>> >>
>>> >> It seems to me that we can still 'force' the count with some static
>>> >> field because it is not a distributed case, but I don't know how we
>>> >> can stop reading once we have the number of sampled elements in a
>>> >> generic way, especially now that it seems a bit harder to do with the
>>> >> pure DoFn (SDF) APIs vs the old Source ones, but that's just a guess.
>>> >>
>>> >> Does anyone have an idea of how we could generalize this and, of
>>> >> course, if you see the value of such a use case, other ideas for
>>> >> improvements?
>>> >>
>>> >> Regards,
>>> >> Ismaël
>>>
>>
