Yes, I thought of this, but:
- The Distinct transform needs to apply per input (probably easy).
- You still need an SDF to run the set expansion repeatedly.
- It's not clear when to terminate the repeated expansion in this implementation.
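The three objections above can be made concrete with a small single-process simulation of "expand the set repeatedly, then deduplicate". This is a hypothetical plain-Java sketch, not Beam code. `PollingDistinctSketch`, the `poll` callback, and the "stop after N consecutive polls with nothing new" rule are assumptions made for illustration. It shows that the deduplication state must be keyed per input, that a loop still has to drive the repeated expansion, and that stopping needs an explicit policy.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.BiFunction;

// Hypothetical sketch: PollingDistinctSketch is NOT a Beam API.
final class PollingDistinctSketch {

  // Polls every input once per round. Each input keeps its own "seen" set, so
  // deduplication is per input (a Distinct keyed by input). An input stops
  // being polled after maxIdlePolls consecutive rounds with no new elements
  // (an assumed termination rule); maxRounds is a hard safety cap.
  static Map<String, List<String>> run(
      List<String> inputs,
      BiFunction<String, Integer, Set<String>> poll,
      int maxIdlePolls,
      int maxRounds) {
    Map<String, Set<String>> seen = new HashMap<>();
    Map<String, Integer> idlePolls = new HashMap<>();
    Map<String, List<String>> emitted = new HashMap<>();
    for (String input : inputs) {
      seen.put(input, new HashSet<>());
      idlePolls.put(input, 0);
      emitted.put(input, new ArrayList<>());
    }
    for (int round = 0; round < maxRounds; round++) {
      boolean anyActive = false;
      for (String input : inputs) {
        if (idlePolls.get(input) >= maxIdlePolls) {
          continue; // terminated: nothing new for maxIdlePolls polls
        }
        anyActive = true;
        boolean foundNew = false;
        // Sort the snapshot so the emission order is deterministic.
        for (String element : new TreeSet<>(poll.apply(input, round))) {
          if (seen.get(input).add(element)) { // add() is false for duplicates
            emitted.get(input).add(element);
            foundNew = true;
          }
        }
        idlePolls.put(input, foundNew ? 0 : idlePolls.get(input) + 1);
      }
      if (!anyActive) {
        break; // every input has terminated
      }
    }
    return emitted;
  }
}
```

The `maxRounds` cap exists only so the sketch always halts. Picking the idle-poll threshold is exactly the open termination question, and in a pipeline the `seen` sets would have to live in durable state, which is the growth problem discussed further down the thread.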
On Tue, Jul 11, 2017 at 10:14 PM Reuven Lax <re...@google.com.invalid> wrote:

As a thought experiment: could this be done by expanding the set into a PCollection and running it through a Distinct (in the global window, trigger every element) transform?

On Tue, Jul 11, 2017 at 9:48 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:

In the current version, the transform is intended to watch a set that is continuously growing; do you mean a GCS bucket that eventually contains more files than can fit in a state tag?

I agree that this will eventually become an issue; I can see a few solutions:
- I suspect many such sets are highly compressible, so we can use a coder that compresses things and get some headroom.
- When an element disappears from a set, we can remove it from the state (without emitting anything into the transform's output - just for GC purposes). Of course this assumes that elements actually disappear from the set (e.g. get removed from the GCS bucket).
- There might be a way to shard the set using a GBK. I'm not quite sure what it would look like in the transform, in particular what the termination condition would look like, because polling would need to happen before the GBK, while termination conditions such as "no new elements observed" depend on information that only exists in the shards after the GBK.

On Tue, Jul 11, 2017 at 9:23 PM Reuven Lax <re...@google.com.invalid> wrote:

BTW - I am worried about SDF storing everything in a single tag for watch. The problem is that a streaming pipeline can run "forever." So someone watching a GCS bucket "forever" will eventually crash due to the value getting too large. Is there any reasonable way to garbage collect this state?
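Two of the mitigations Eugene lists (a compressing encoding of the stored set, and dropping elements that have disappeared from the watched set) can be modelled in plain Java. This is a hypothetical model of the "already seen" state, not Beam's Watch implementation. The class name, the `gs://my-bucket/...` paths, and the use of GZIP as the compressor are assumptions for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.zip.GZIPOutputStream;

// Hypothetical model of the "seen" set a Watch-like transform keeps in state.
// NOT a Beam API.
final class WatchStateGcSketch {

  // Processes one poll: returns elements that are new relative to state, then
  // garbage-collects state. Elements missing from the snapshot are removed
  // from state WITHOUT being emitted, so state tracks the current contents of
  // the watched set rather than its whole history. Caveat: an element that is
  // removed and later re-added will be emitted a second time.
  static List<String> poll(Set<String> state, Set<String> snapshot) {
    List<String> newElements = new ArrayList<>();
    for (String element : new TreeSet<>(snapshot)) {
      if (!state.contains(element)) {
        newElements.add(element);
      }
    }
    state.retainAll(snapshot); // GC: forget elements that disappeared
    state.addAll(newElements); // remember elements emitted in this poll
    return newElements;
  }

  // Size in bytes of the state encoded as newline-separated UTF-8, optionally
  // GZIP-compressed, to estimate the headroom a compressing coder might buy.
  static int encodedSize(Set<String> state, boolean compress) {
    byte[] raw = String.join("\n", new TreeSet<>(state)).getBytes(StandardCharsets.UTF_8);
    if (!compress) {
      return raw.length;
    }
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (GZIPOutputStream gzip = new GZIPOutputStream(bytes)) {
      gzip.write(raw);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.size(); // gzip stream is finished once closed
  }
}
```

Pruning only helps when elements actually leave the watched set, which is the caveat stated above; for a set that only grows, state still grows without bound, which is why compression and GBK-based sharding were raised as well.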
On Tue, Jul 11, 2017 at 9:08 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:

The first PR has been submitted - enjoy TextIO.readAll(), which reads a PCollection of filenames!
I've started working on the SDF-based Watch transform http://s.apache.org/beam-watch-transform, and after that will be able to implement the incremental features in TextIO.

On Tue, Jun 27, 2017 at 1:55 PM Eugene Kirpichov <kirpic...@google.com> wrote:

Thanks all. The first PR is out for review: https://github.com/apache/beam/pull/3443
Next work (watching for new files) is in progress, based on https://github.com/apache/beam/pull/3360

On Tue, Jun 27, 2017 at 11:22 AM Kenneth Knowles <k...@google.com.invalid> wrote:

+1

This is a really nice doc and plan.

On Tue, Jun 27, 2017 at 1:49 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

+1

This sounds very good and there is a clear implementation path!

On 24. Jun 2017, at 20:55, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

Fair enough ;)

Let me review the different JIRAs and provide some feedback.

Regards
JB

On Jun 24, 2017, 20:54, at 20:54, Eugene Kirpichov <kirpic...@google.com.INVALID> wrote:

Hi JB,
I haven't yet thought about how this work can be parallelized. For now I'd like to just get feedback on the approach :)
But glad that you're willing to help out - let's discuss this too a bit later!
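The idea behind TextIO.readAll() in the July 11 message (the input is itself a collection of filenames, possibly unbounded, rather than a single filepattern fixed at pipeline construction) can be illustrated with a plain-Java stream analogue. This is a hypothetical sketch, not the Beam implementation; `ReadAllSketch` and its methods are illustrative names only.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical plain-Java analogue of reading a collection of filenames.
// NOT the Beam TextIO implementation.
final class ReadAllSketch {

  // Lazily maps a (possibly unbounded) stream of filenames to the lines of
  // those files, in order. flatMap closes each per-file stream after its
  // contents have been consumed.
  static Stream<String> readAll(Stream<Path> filenames) {
    return filenames.flatMap(file -> {
      try {
        return Files.lines(file, StandardCharsets.UTF_8);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    });
  }

  // Convenience for bounded inputs: reads every file and collects all lines.
  static List<String> readAllLines(List<Path> filenames) {
    return readAll(filenames.stream()).collect(Collectors.toList());
  }
}
```

Taking a `Stream<Path>` rather than a fixed pattern mirrors the point of the feature: filenames can keep arriving (for example from PubSub or Kafka, as the abstract puts it) and each one is read as it shows up.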
On Sat, Jun 24, 2017 at 11:51 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

Thanks Eugene

I will pick up some.

Regards
JB

On Jun 24, 2017, 20:00, at 20:00, Eugene Kirpichov <kirpic...@google.com.INVALID> wrote:

Filed JIRAs for the proposed features and linked them to the doc:
https://issues.apache.org/jira/browse/BEAM-2511 TextIO should support reading a PCollection of filenames
https://issues.apache.org/jira/browse/BEAM-2512 TextIO should support watching for new files
https://issues.apache.org/jira/browse/BEAM-2513 TextIO should support watching files for new entries

On Fri, Jun 23, 2017 at 4:32 PM Eugene Kirpichov <kirpic...@google.com> wrote:

Hi all,

I've written up a proposal for incrementally delivering a bunch of useful new features in TextIO based on Splittable DoFn. It's applicable to other file-based connectors; TextIO is just one good example. Let me know what you think!

https://s.apache.org/textio-sdf

Copy of abstract:

Users have often expressed interest in several new features for reading files - in particular, incremental reading of log files (streaming of new files matching a pattern and new entries in each file) and reading a PCollection of filenames (in particular, an unbounded collection arriving from a stream such as PubSub or Kafka).

Splittable DoFn <http://s.apache.org/splittable-do-fn> (SDF) enables these features. This document proposes an API for them, using the example of TextIO, and proposes a plan for delivering them subject to availability of SDF in different runners. Some availability constraints are circumvented by Running Splittable DoFn via Source API <http://s.apache.org/sdf-via-source>.

TL;DR Read a collection of filepatterns arriving on PubSub via files.apply(TextIO.readEach()). Tail a filepattern via TextIO.read().watchForNewFiles().watchFilesForNewEntries(). Coming to a Beam SDK near you in small pieces.

I think I'm gonna start working on the first steps of the proposed plan, in parallel with this discussion, because I'm excited :)
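The tailing behaviour in the TL;DR (pick up new files matching a pattern, and new lines appended to files already seen) can be approximated in a single process by polling a directory and remembering a byte offset per file. This is a hypothetical sketch, not how watchForNewFiles() or watchFilesForNewEntries() work in Beam. The class name, the directory-plus-glob matching, and the "emit complete lines only" rule are assumptions for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical single-process tailer. NOT Beam's
// watchForNewFiles()/watchFilesForNewEntries().
final class TailSketch {
  private final Path dir;
  private final String glob;                               // e.g. "*.log"
  private final Map<Path, Long> offsets = new HashMap<>(); // bytes consumed per file

  TailSketch(Path dir, String glob) {
    this.dir = dir;
    this.glob = glob;
  }

  // One polling round: discovers files matching the glob (new files start at
  // offset 0) and returns complete lines appended since the previous poll.
  // A trailing partial line stays unread until its '\n' arrives.
  List<String> poll() {
    TreeSet<Path> files = new TreeSet<>(); // sorted for a deterministic order
    try (DirectoryStream<Path> matches = Files.newDirectoryStream(dir, glob)) {
      for (Path file : matches) {
        files.add(file);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    List<String> lines = new ArrayList<>();
    for (Path file : files) {
      long offset = offsets.getOrDefault(file, 0L);
      try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
        long size = channel.size();
        if (size <= offset) {
          continue; // nothing appended since the last poll
        }
        ByteBuffer buf = ByteBuffer.allocate(Math.toIntExact(size - offset));
        channel.position(offset);
        while (buf.hasRemaining() && channel.read(buf) > 0) {
          // keep reading until the buffer is full or EOF
        }
        byte[] bytes = Arrays.copyOf(buf.array(), buf.position());
        int lastNewline = -1;
        for (int i = bytes.length - 1; i >= 0; i--) {
          if (bytes[i] == '\n') {
            lastNewline = i;
            break;
          }
        }
        if (lastNewline < 0) {
          continue; // only a partial line so far
        }
        String chunk = new String(bytes, 0, lastNewline, StandardCharsets.UTF_8);
        lines.addAll(Arrays.asList(chunk.split("\n", -1)));
        offsets.put(file, offset + lastNewline + 1);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
    return lines;
  }
}
```

Cutting at the last `'\n'` is safe for UTF-8 because that byte never occurs inside a multi-byte sequence. Like the Watch discussion earlier in the thread, this sketch has no termination condition, and its `offsets` map grows with every file ever matched.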