Hi all,

A PR for this is in review: https://github.com/apache/beam/pull/3565

I'm very excited about this, because:
- I think it turned out to be a very neat API, and I'm looking forward to
people coming up with more use cases for it.
- I was pleasantly surprised to find a good way to test it in great
detail.
- This PR validated that SDF (Splittable DoFn) is the right API for this
sort of thing: developing this transform did not require any changes to the
SDF API or semantics, even for a highly nontrivial aspect like watermarks.
- I believe it's in good shape to be used in
TextIO.read()/readAll().watchForNewFiles(), which will be the first use
of SDF for a previously impossible but much-wanted IO connector! I'm
going to implement these tomorrow.
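For intuition, the filepattern-watching case that watchForNewFiles() targets boils down to repeatedly listing the files that match a pattern and reporting only the ones not seen on an earlier poll. The sketch below is plain Java using java.nio.file, not Beam code: the class NewFilePoller and its poll() method are hypothetical names, it only lists a single directory (no recursive filepatterns), and it does nothing about timestamps, watermarks or stopping, which the proposal leaves to the Watch transform.

```java
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;

/** Hypothetical illustration: one poll of "watch a filepattern for matching files". */
public class NewFilePoller {
  private final Path dir;
  private final PathMatcher matcher;
  private final Set<Path> seen = new HashSet<>(); // files already reported by earlier polls

  NewFilePoller(Path dir, String glob) {
    this.dir = dir;
    // Glob is matched against the file name only, e.g. "*.txt".
    this.matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
  }

  /** Returns matching files in dir that no earlier call to poll() has returned. */
  List<Path> poll() throws IOException {
    List<Path> fresh = new ArrayList<>();
    try (Stream<Path> files = Files.list(dir)) {
      files.filter(p -> matcher.matches(p.getFileName()))
          .sorted()
          .forEach(p -> {
            if (seen.add(p)) {
              fresh.add(p);
            }
          });
    }
    return fresh;
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("watch-demo");
    NewFilePoller poller = new NewFilePoller(dir, "*.txt");
    Files.createFile(dir.resolve("a.txt"));
    Files.createFile(dir.resolve("ignored.csv"));
    System.out.println(poller.poll()); // only .../a.txt
    Files.createFile(dir.resolve("b.txt"));
    System.out.println(poller.poll()); // only .../b.txt
  }
}
```

A real connector would also need to decide when to stop polling and how to advance the watermark; that is exactly the part the proposal generalizes into per-input termination conditions.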

On Thu, Jun 29, 2017 at 4:43 PM Eugene Kirpichov <[email protected]>
wrote:

> Hi all,
>
> Please take a look at this short proposal that came out of implementing
> http://s.apache.org/textio-sdf. I think it's a nice generalization. I
> would welcome comments on the proposed API, or corner cases of semantics
> that I haven't thought about, or more generalizations, etc.
>
> http://s.apache.org/beam-watch-transform
>
> We propose a PTransform, Watch.growthOf(poll function), that repeatedly
> polls the set associated with each of its inputs and continuously emits
> elements newly added to each set, until a per-set termination condition is
> reached. It is a generalization of "watch filepattern for matching files".
>
> Code snippet:
>
> PCollection<InputT> inputs = …;
> PCollection<KV<InputT, OutputT>> outputs = inputs.apply(
>     Watch.growthOf((InputT input, PollReceiver<OutputT> out) -> {
>            // … out.put(timestamp, value) for each element found …
>            return Watch.outputCanGrow().withWatermark(…);
>            // or, once the set can no longer grow:
>            // return Watch.outputIsFinal();
>          })
>          .withPollInterval(Duration.standardSeconds(10))
>          .withTerminationPerInput(
>              Watch.afterEitherOf(
>                Watch.afterTotalOf(Duration.standardMinutes(5)),
>                Watch.afterOutputStableFor(Duration.standardMinutes(1))))
>          .withOutputCoder(…));
>
> Thanks!
>
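A minimal, single-process sketch of the semantics described in the quoted proposal: poll the set for one input repeatedly, emit each element the first time it appears, and stop once the poll function reports the set as final or a termination condition fires. This is plain Java 16+, not Beam: PollResult, Termination, watch() and the afterTotalOf/afterOutputStableFor/afterEitherOf helpers are hypothetical stand-ins for the proposal's names, time is counted in poll rounds instead of Durations, and timestamps, watermarks and parallelism are left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;

/** Toy model of the proposed Watch.growthOf semantics (hypothetical, not Beam code). */
public class WatchModel {

  /** Hypothetical poll result: the set's current contents and whether it can still grow. */
  record PollResult<T>(Set<T> contents, boolean isFinal) {}

  /** Hypothetical per-input termination condition, checked after every poll. */
  interface Termination {
    boolean shouldStop(int totalPolls, int pollsSinceNewOutput);
  }

  static Termination afterTotalOf(int polls) {
    return (total, stable) -> total >= polls;
  }

  static Termination afterOutputStableFor(int polls) {
    return (total, stable) -> stable >= polls;
  }

  static Termination afterEitherOf(Termination a, Termination b) {
    return (total, stable) -> a.shouldStop(total, stable) || b.shouldStop(total, stable);
  }

  /** Polls one input's set, emitting each element once; returns the number of polls made. */
  static <I, O> int watch(I input, Function<I, PollResult<O>> pollFn,
      Termination termination, Consumer<O> emit) {
    Set<O> seen = new LinkedHashSet<>();
    int polls = 0;
    int pollsSinceNewOutput = 0;
    while (true) {
      PollResult<O> result = pollFn.apply(input);
      polls++;
      boolean grew = false;
      for (O element : result.contents()) {
        if (seen.add(element)) { // only elements not emitted by an earlier poll
          emit.accept(element);
          grew = true;
        }
      }
      pollsSinceNewOutput = grew ? 0 : pollsSinceNewOutput + 1;
      if (result.isFinal() || termination.shouldStop(polls, pollsSinceNewOutput)) {
        return polls;
      }
      // A real runner would wait for the poll interval here.
    }
  }

  /** Simulated set that gains one element on each of its first three polls, then stays put. */
  static List<String> demo() {
    List<List<String>> snapshots =
        List.of(List.of("a"), List.of("a", "b"), List.of("a", "b", "c"));
    int[] calls = {0};
    Function<String, PollResult<String>> pollFn = input -> {
      int i = Math.min(calls[0]++, snapshots.size() - 1);
      return new PollResult<>(new LinkedHashSet<>(snapshots.get(i)), false);
    };
    List<String> emitted = new ArrayList<>();
    int polls = watch("input-1", pollFn,
        afterEitherOf(afterTotalOf(100), afterOutputStableFor(2)), emitted::add);
    System.out.println(emitted + " after " + polls + " polls"); // [a, b, c] after 5 polls
    return emitted;
  }

  public static void main(String[] args) {
    demo();
  }
}
```

Treating termination conditions as small composable predicates is one way to get combinators like afterEitherOf almost for free; in this toy version the "output stable" condition fires after two consecutive polls that add nothing new.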
