I think the approach used in the SQL CLI to implement a LIMIT clause may
work for some cases. It only works when the pipeline runs in the same
process, with the DirectRunner. It doesn't sample at the source, because
you never know what the rest of the query will do with the data. Instead
it counts outputs and then cancels the job once it has enough:
https://github.com/apache/beam/blob/a72460272354747a54449358f5df414be4b6d72c/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java#L200
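
To illustrate the count-and-cancel idea in isolation, here is a minimal
sketch in plain Java. It is not the BeamEnumerableConverter code and uses no
Beam APIs: the "job" is just a background task draining an iterator, and the
names `LimitByCancel` and `collectWithLimit` are made up for this example. It
also assumes the source never yields null elements.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class LimitByCancel {
  /**
   * Runs a stand-in "job" that emits every element of {@code source}, counts
   * the outputs on the caller side, and cancels the job once {@code limit}
   * outputs have arrived. Hypothetical helper, not a Beam API.
   */
  public static <T> List<T> collectWithLimit(Iterator<T> source, int limit) {
    // Small bounded queue: the job can only run slightly ahead of the consumer.
    BlockingQueue<T> outputs = new ArrayBlockingQueue<>(16);
    ExecutorService executor = Executors.newSingleThreadExecutor();

    // The "job": reads the whole source unless it is interrupted.
    Future<?> job = executor.submit(() -> {
      try {
        while (source.hasNext()) {
          outputs.put(source.next());
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // cancelled by the consumer
      }
    });

    List<T> collected = new ArrayList<>();
    try {
      while (collected.size() < limit) {
        T value = outputs.poll(50, TimeUnit.MILLISECONDS);
        if (value != null) {
          collected.add(value);
        } else if (job.isDone() && outputs.isEmpty()) {
          break; // source exhausted before the limit was reached
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      job.cancel(true); // enough outputs (or nothing left): stop the job
      executor.shutdownNow();
    }
    return collected;
  }
}
```

Note that nothing here limits the read itself: the job keeps producing until
it is cancelled, which is why this only helps when the outputs reach the
counter quickly.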

However, if your pipeline starts with a read of thousands of files, it may
call for a different pattern of invoking SDF:

1. initial splits do not matter much, probably
2. you want to checkpoint and emit values so that the end output of the
pipeline can receive them and cancel it; you don't want to read a whole
restriction as in the batch case
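
As a rough sketch of point 2, again in plain Java with no Beam APIs: the
reader below processes a restriction in small chunks, hands back the residual
after each chunk (a stand-in for a checkpoint), and the caller stops asking
for more once the sample is full. `Range`, `processChunk` and
`readUntilLimit` are hypothetical names, and the offsets stand in for real
records.

```java
import java.util.ArrayList;
import java.util.List;

public class CheckpointingReadSketch {
  /** Half-open range of record offsets [from, to), standing in for a restriction. */
  static final class Range {
    final long from;
    final long to;

    Range(long from, long to) {
      this.from = from;
      this.to = to;
    }

    boolean isEmpty() {
      return from >= to;
    }
  }

  /** Reads at most maxRecords from the restriction, emits them, returns the residual. */
  static Range processChunk(Range restriction, int maxRecords, List<Long> out) {
    long end = Math.min(restriction.to, restriction.from + maxRecords);
    for (long offset = restriction.from; offset < end; offset++) {
      out.add(offset); // stand-in for emitting the record at this offset
    }
    return new Range(end, restriction.to);
  }

  /**
   * Fills {@code sample} with up to {@code limit} records and returns how many
   * records were actually read from a source of {@code totalRecords} records.
   */
  public static long readUntilLimit(
      long totalRecords, int chunkSize, int limit, List<Long> sample) {
    Range residual = new Range(0, totalRecords);
    List<Long> emitted = new ArrayList<>();
    long read = 0;
    // Check the limit between chunks instead of reading the whole restriction.
    while (!residual.isEmpty() && sample.size() < limit) {
      emitted.clear();
      residual = processChunk(residual, chunkSize, emitted);
      read += emitted.size();
      for (Long value : emitted) {
        if (sample.size() < limit) {
          sample.add(value);
        }
      }
    }
    return read;
  }
}
```

With small chunks the amount read is bounded by the limit rounded up to a
chunk boundary, rather than by the size of the restriction.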

I don't know the status of this, or whether it needs special treatment.
There may also be the issue that SDF is more actively developed in portable
runners and less so in classic runners.

Kenn

On Wed, Jan 6, 2021 at 9:05 AM Ismaël Mejía wrote:

> > Those are good points. Do you know if the Interactive Runner has been
> > tried in those instances? If so, what were the shortcomings?
>
> I am not aware of experiences or shortcomings with the Interactive
> Runner. The issue is that the Interactive Runner is based on Python
> and all the tools I mention above are Java-based, so Python probably
> won't be a valid alternative.
>
> What concerns me is that in other similar systems (e.g.
> Spark, Flink) a developer can consistently do a `.take(n)` read from a
> data source and have results in constant time, almost independently of
> the size of the targeted data. This allows developers to iterate faster
> and improves the developer experience.
>
> What is not yet clear to me is how we can achieve this in a clean
> way, given all the 'wrappings' we already have at translation time. I
> don't know if there could be a way to override some default
> translation(s) to achieve this. Any ideas?
>
>
> On Tue, Jan 5, 2021 at 10:26 PM Sam Rohde wrote:
> >
> > Hi Ismael,
> >
> > Those are good points. Do you know if the Interactive Runner has been
> > tried in those instances? If so, what were the shortcomings?
> >
> > I can also see the use of sampling for performance benchmarking.
> > We have seen others send in known elements which are tracked
> > throughout the pipeline to generate timings for each transform/stage.
> >
> > -Sam
> >
> > On Fri, Dec 18, 2020 at 8:24 AM Ismaël Mejía wrote:
> >>
> >> Hello,
> >>
> >> The use of the direct runner for interactive local use cases has
> >> increased over the years in Beam due to projects like Scio, Kettle/Hop
> >> and our own SQL CLI. All these tools have one thing in common: they
> >> show a sample of some source input to the user and interactively apply
> >> transforms to it to help users build pipelines more rapidly.
> >>
> >> If you build a pipeline today to produce this sample using Beam’s
> >> Sample transform from a set of files, the read of the files happens
> >> first and then the sample, so the more files there are or the bigger
> >> they are, the longer it takes to produce the sample, even if the number
> >> of elements expected to be read is constant.
> >>
> >> During Beam Summit last year there were some discussions about how we
> >> could improve this scenario (and others), but I have the impression no
> >> further discussions happened on the mailing list, so I wanted to know
> >> if there are some ideas about how we can get the direct runner to
> >> improve this case.
> >>
> >> It seems to me that we can still ‘force’ the count with some static
> >> field, because it is not a distributed case, but I don’t know how we
> >> can stop reading in a generic way once we have the number of sampled
> >> elements, especially now that it seems a bit harder to do with the
> >> pure DoFn (SDF) APIs than with the old Source ones. But that’s just a
> >> guess.
> >>
> >> Does anyone have an idea of how we could generalize this and, of
> >> course, if you see the value of such a use case, other ideas for
> >> improvements?
> >>
> >> Regards,
> >> Ismaël
>