This PR has been submitted and Watch is now available. The next PR, https://github.com/apache/beam/pull/3607, is in review; it adds TextIO.read().watchForNewFiles(). (I will extend it to Avro and provide utility transforms for authors of similar IOs.)

On Wed, Jul 19, 2017 at 8:37 PM Eugene Kirpichov <kirpic...@google.com> wrote:

> Hi all,
>
> A PR for this is in review https://github.com/apache/beam/pull/3565
>
> I'm very excited about this, because:
> - I think it turned out to be a very neat API and I'm looking forward to
> people coming up with more use cases for it
> - I was pleasantly surprised by being able to come up with a good way to
> test this in great detail
> - This PR validated that SDF is the right API for this sort of thing:
> developing this transform did not require changes to SDF API or semantics,
> even for a highly nontrivial aspect like watermarks
> - I believe it's in good shape for being used in
> TextIO.read()/readAll().watchForNewFiles(), which will be the first use
> case of SDF for a previously impossible but much wanted IO connector! I'm
> going to implement these tomorrow.
>
> On Thu, Jun 29, 2017 at 4:43 PM Eugene Kirpichov <kirpic...@google.com>
> wrote:
>
>> Hi all,
>>
>> Please take a look at this short proposal that came out of implementing
>> http://s.apache.org/textio-sdf. I think it's a nice generalization. I
>> would welcome comments on the proposed API, or corner cases of semantics
>> that I haven't thought about, or more generalizations, etc.
>>
>> http://s.apache.org/beam-watch-transform
>>
>> We propose a PTransform Watch.growthOf(poll function) that repeatedly
>> polls sets associated with each of its inputs and continuously produces new
>> elements in each set until a per-set termination condition is reached. It
>> is a generalization of "watch filepattern for matching files".
>>
>> Code snippet:
>>
>> PCollection<InputT> inputs = …;
>> PCollection<KV<InputT, OutputT>> outputs = inputs.apply(
>>     Watch.growthOf((InputT input, PollReceiver<OutputT> out) → {
>>       … out.put(timestamp, value) …
>>       return Watch.outputCanGrow().withWatermark( … );
>>       // or:
>>       return Watch.outputIsFinal();
>>     })
>>     .withPollInterval(10 sec)
>>     .withTerminationPerInput(
>>         Watch.afterEitherOf(
>>             Watch.afterTotalOf(5 min),
>>             Watch.afterOutputStableFor(1 min)))
>>     .withOutputCoder( … ))
>>
>> Thanks!
>>
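
The semantics proposed above (poll an input's set repeatedly, emit only the elements not seen before, stop once a termination condition is reached) can be sketched without Beam. What follows is a minimal single-input simulation in plain Java, not the Watch implementation: the class WatchSketch, the method watch and all of its parameters are hypothetical names chosen for illustration, time is a simulated integer clock in seconds, and watermarks and the outputIsFinal case are left out. The two stopping rules are loosely modelled on afterTotalOf and afterOutputStableFor combined with afterEitherOf.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.IntFunction;

// Hypothetical sketch of the polling loop for ONE input; not Beam code.
final class WatchSketch {

    /**
     * Polls a growing set and returns its elements in order of first appearance.
     *
     * @param pollAtSec       returns the full current contents of the set at a simulated time
     * @param pollIntervalSec simulated seconds between polls
     * @param maxTotalSec     stop once this much simulated time has elapsed
     * @param stableForSec    stop once no new element has appeared for this long
     */
    static <T> List<T> watch(IntFunction<List<T>> pollAtSec,
                             int pollIntervalSec,
                             int maxTotalSec,
                             int stableForSec) {
        Set<T> seen = new HashSet<>();       // elements already emitted
        List<T> emitted = new ArrayList<>(); // output, in emission order
        int lastGrowthSec = 0;               // time of the last poll that found something new

        for (int now = 0; ; now += pollIntervalSec) {
            for (T element : pollAtSec.apply(now)) {
                if (seen.add(element)) {     // true only for elements not seen before
                    emitted.add(element);
                    lastGrowthSec = now;
                }
            }
            boolean totalElapsed = now >= maxTotalSec;
            boolean outputStable = now - lastGrowthSec >= stableForSec;
            if (totalElapsed || outputStable) { // "either of" the two conditions
                return emitted;
            }
        }
    }

    public static void main(String[] args) {
        // Simulated "directory": file b appears at t=20s, file c at t=50s.
        IntFunction<List<String>> files = now ->
                now < 20 ? List.of("a")
                        : now < 50 ? List.of("a", "b")
                        : List.of("a", "b", "c");

        // Poll every 10s; stop after 5 min total or 1 min without new files.
        System.out.println(watch(files, 10, 300, 60)); // prints [a, b, c]
    }
}
```

In this run the loop stops at simulated time 110s, because the last new element appeared at 50s and the output then stayed unchanged for 60s. In the proposal this bookkeeping is kept per input element, which is what "per-set termination condition" refers to.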