Hi all,

Please take a look at this short proposal, which came out of implementing http://s.apache.org/textio-sdf. I think it's a nice generalization. I would welcome comments on the proposed API, corner cases of the semantics that I haven't thought about, further generalizations, etc.
http://s.apache.org/beam-watch-transform

We propose a PTransform Watch.growthOf(poll function) that repeatedly polls sets associated with each of its inputs and continuously produces new elements in each set until a per-set termination condition is reached. It is a generalization of "watch filepattern for matching files".

Code snippet:

    PCollection<InputT> inputs = …;
    PCollection<KV<InputT, OutputT>> outputs = inputs.apply(
        Watch.growthOf((InputT input, PollReceiver<OutputT> out) -> {
              …
              out.put(timestamp, value)
              …
              return Watch.outputCanGrow().withWatermark( … );
              // or: return Watch.outputIsFinal();
            })
            .withPollInterval(10 sec)
            .withTerminationPerInput(
                Watch.afterEitherOf(
                    Watch.afterTotalOf(5 min),
                    Watch.afterOutputStableFor(1 min)))
            .withOutputCoder( … ))

Thanks!
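P.S. In case it helps the discussion of semantics, here is a rough standalone sketch of the polling loop for a single input. It is not the proposed Beam API: WatchSketch, PollFn, PollResult and watch are hypothetical names made up for illustration, time is simulated in whole seconds rather than read from a clock, and timestamps and watermarks are left out entirely. It only tries to show deduplication of outputs across polls and the "either total time elapsed or output stable for a while" termination from the snippet above.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class WatchSketch {
    /** Result of one poll: the set contents seen by this poll, and whether the set is final. */
    static final class PollResult<T> {
        final List<T> outputs;
        final boolean isFinal;

        PollResult(List<T> outputs, boolean isFinal) {
            this.outputs = outputs;
            this.isFinal = isFinal;
        }
    }

    /** Hypothetical poll function: (input, simulated time in seconds) -> current set contents. */
    interface PollFn<I, O> {
        PollResult<O> poll(I input, long nowSec);
    }

    /**
     * Polls one input every pollIntervalSec simulated seconds and collects each distinct
     * output once. Stops when the poll reports a final set, when maxTotalSec has elapsed,
     * or when no new output has appeared for stableForSec.
     */
    static <I, O> List<O> watch(
            I input, PollFn<I, O> fn, long pollIntervalSec, long maxTotalSec, long stableForSec) {
        Set<O> seen = new LinkedHashSet<>();
        long lastGrowth = 0;
        for (long now = 0; ; now += pollIntervalSec) {
            PollResult<O> result = fn.poll(input, now);
            for (O output : result.outputs) {
                if (seen.add(output)) {
                    lastGrowth = now; // the set grew during this poll
                }
            }
            if (result.isFinal) break;                    // analogous to outputIsFinal()
            if (now >= maxTotalSec) break;                // analogous to afterTotalOf(...)
            if (now - lastGrowth >= stableForSec) break;  // analogous to afterOutputStableFor(...)
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        // Simulated source: a new "file" appears every 20 seconds, three files in total.
        PollFn<String, String> listFiles = (prefix, now) -> {
            List<String> files = new ArrayList<>();
            for (int i = 0; i <= Math.min(now / 20, 2); i++) {
                files.add(prefix + i);
            }
            return new PollResult<>(files, false);
        };
        // Poll every 10 s; stop after 5 min total or after 1 min without growth.
        System.out.println(watch("file-", listFiles, 10, 300, 60));
        // prints [file-0, file-1, file-2]
    }
}
```

In this simulation the last new element shows up at t = 40 s, so the loop stops at t = 100 s via the stability condition, well before the 5 minute cap. The real transform would of course run per input element and emit results incrementally rather than returning a list at the end.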