This PR has been merged, and Watch is now available.

The next PR, https://github.com/apache/beam/pull/3607, is in review; it adds
TextIO.read().watchForNewFiles()! (I will extend it to Avro and provide
utility transforms for authors of similar IOs.)

On Wed, Jul 19, 2017 at 8:37 PM Eugene Kirpichov <kirpic...@google.com>
wrote:

> Hi all,
>
> A PR for this is in review https://github.com/apache/beam/pull/3565
>
> I'm very excited about this, because:
> - I think it turned out to be a very neat API and I'm looking forward to
> people coming up with more use cases for it
> - I was pleasantly surprised by being able to come up with a good way to
> test this in great detail
> - This PR validated that SDF is the right API for this sort of thing:
> developing this transform did not require changes to SDF API or semantics,
> even for a highly nontrivial aspect like watermarks
> - I believe it's in good shape to be used in
> TextIO.read()/readAll().watchForNewFiles(), which will be the first use
> case of SDF for a previously impossible but much-wanted IO connector! I'm
> going to implement these tomorrow.
>
> On Thu, Jun 29, 2017 at 4:43 PM Eugene Kirpichov <kirpic...@google.com>
> wrote:
>
>> Hi all,
>>
>> Please take a look at this short proposal that came out of implementing
>> http://s.apache.org/textio-sdf. I think it's a nice generalization. I
>> would welcome comments on the proposed API, or corner cases of semantics
>> that I haven't thought about, or more generalizations, etc.
>>
>> http://s.apache.org/beam-watch-transform
>>
>> We propose a PTransform Watch.growthOf(poll function) that, for each of
>> its inputs, repeatedly polls an associated set and continuously emits the
>> new elements that appear in that set, until a per-set termination
>> condition is reached. It is a generalization of "watch filepattern for
>> matching files".
>>
>> Code snippet:
>>
>> PCollection<InputT> inputs = …;
>> PCollection<KV<InputT, OutputT>> outputs = inputs.apply(
>>     Watch.growthOf((InputT input, PollReceiver<OutputT> out) -> {
>>            … out.put(timestamp, value) …
>>            return Watch.outputCanGrow().withWatermark( … );
>>            // or:
>>            return Watch.outputIsFinal();
>>          })
>>          .withPollInterval(Duration.standardSeconds(10))
>>          .withTerminationPerInput(
>>              Watch.afterEitherOf(
>>                Watch.afterTotalOf(Duration.standardMinutes(5)),
>>                Watch.afterOutputStableFor(Duration.standardMinutes(1))))
>>          .withOutputCoder( … ));
>>
>> Thanks!
>>
>
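
The growth semantics described in the proposal above can be illustrated with a small, self-contained Java sketch that does not depend on Beam. It assumes a hypothetical `GrowthSketch` class with a `PollResult` type (the current contents of one input's set plus an "is final" flag) and a `watch` method; none of these are Beam's actual API, and the poll interval, timestamps, and watermarks are deliberately omitted. For a single input, the poll function is called repeatedly and each element is emitted the first time it appears, until the poll reports that the output is final or a poll limit is reached.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

public class GrowthSketch {
  // Hypothetical result of one poll (not Beam's API): the current contents of
  // the watched set, plus a flag saying whether the set can still grow.
  public static final class PollResult<T> {
    final List<T> current;
    final boolean isFinal;

    public PollResult(List<T> current, boolean isFinal) {
      this.current = current;
      this.isFinal = isFinal;
    }
  }

  // Polls the set for one input until the poll reports it is final or
  // maxPolls is reached; returns each element the first time it is seen.
  public static <InputT, OutputT> List<OutputT> watch(
      InputT input, Function<InputT, PollResult<OutputT>> pollFn, int maxPolls) {
    Set<OutputT> seen = new HashSet<>();
    List<OutputT> emitted = new ArrayList<>();
    for (int i = 0; i < maxPolls; i++) {
      PollResult<OutputT> result = pollFn.apply(input);
      for (OutputT value : result.current) {
        if (seen.add(value)) {
          emitted.add(value); // new element: emit it exactly once
        }
      }
      if (result.isFinal) {
        break; // the poll function declared the output final
      }
    }
    return emitted;
  }

  public static void main(String[] args) {
    // Simulated file listing that grows on each poll, then becomes final.
    List<List<String>> listings = List.of(
        List.of("a.txt"),
        List.of("a.txt", "b.txt"),
        List.of("a.txt", "b.txt", "c.txt"));
    int[] pollCount = {0};
    Function<String, PollResult<String>> pollFn = pattern -> {
      int i = Math.min(pollCount[0]++, listings.size() - 1);
      return new PollResult<>(listings.get(i), i == listings.size() - 1);
    };
    System.out.println(watch("/data/*.txt", pollFn, 10)); // [a.txt, b.txt, c.txt]
  }
}
```

Each file is emitted once even though `a.txt` appears in every poll, which mirrors the "watch filepattern for matching files" case that the proposal generalizes.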

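The termination conditions in the proposal's snippet (`afterTotalOf`, `afterOutputStableFor`, `afterEitherOf`) can be modeled as predicates over two durations: the total time an input has been watched and the time since its output last grew. The Java sketch below reuses the proposal's names for readability, but it is a hypothetical, stateless simplification, not Beam's implementation; it uses `java.time.Duration` rather than Joda-Time so that it compiles without extra dependencies.

```java
import java.time.Duration;

public class TerminationSketch {
  // Hypothetical per-input termination condition (not Beam's API): given how
  // long the input has been watched in total and how long since its output
  // last grew, decide whether to stop polling it.
  @FunctionalInterface
  public interface TerminationCondition {
    boolean shouldStop(Duration totalElapsed, Duration sinceLastNewOutput);
  }

  // Stop once the input has been watched for at least `limit` in total.
  public static TerminationCondition afterTotalOf(Duration limit) {
    return (total, stable) -> total.compareTo(limit) >= 0;
  }

  // Stop once no new output has appeared for at least `limit`.
  public static TerminationCondition afterOutputStableFor(Duration limit) {
    return (total, stable) -> stable.compareTo(limit) >= 0;
  }

  // Stop as soon as either of the two conditions says to stop.
  public static TerminationCondition afterEitherOf(
      TerminationCondition a, TerminationCondition b) {
    return (total, stable) -> a.shouldStop(total, stable) || b.shouldStop(total, stable);
  }

  public static void main(String[] args) {
    // Same values as the proposal: 5 minutes total, or 1 minute without growth.
    TerminationCondition cond = afterEitherOf(
        afterTotalOf(Duration.ofMinutes(5)),
        afterOutputStableFor(Duration.ofMinutes(1)));
    System.out.println(cond.shouldStop(Duration.ofMinutes(2), Duration.ofSeconds(30))); // false
    System.out.println(cond.shouldStop(Duration.ofMinutes(2), Duration.ofSeconds(90))); // true
    System.out.println(cond.shouldStop(Duration.ofMinutes(5), Duration.ZERO)); // true
  }
}
```

With these values, polling continues at 2 minutes total with 30 seconds since the last new output, and stops once the output has been stable for 90 seconds or 5 minutes have elapsed in total.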