I sent a PR for all of this: https://github.com/apache/beam/pull/3799
On Mon, Aug 28, 2017 at 8:45 AM Eugene Kirpichov wrote:
> Thanks. I think I agree that file-based IOs (at least widely used ones)
> should, for convenience, still provide FooIO.read().from(filepattern), and,
> for performance until SDF has full support in all runners, implement it via
> a BoundedSource.
>
> The second case with Create.of(filepattern) illustrates when the
> filepattern is not known at construction time but rather there's a
> collection of filepatterns: it's a separate use case.
>
> On Mon, Aug 28, 2017 at 2:23 AM Etienne Chauchot wrote:
>
>> Hi Eugene,
>>
>> +1 to this, it is nice to add this common behavior to all the file-based
>> IOs. I find the design elegant; I just have one minor API comment. I
>> would prefer
>>
>> p.apply(FooIO.read().from(filepattern))
>>
>> to
>>
>> p.apply(Create.of(filepattern))
>>
>> IMHO, it is more readable and analogous to the other APIs.
>>
>> Etienne
>>
>> On 18/08/2017 at 23:38, Eugene Kirpichov wrote:
>> > Hi all,
>> >
>> > I've been adding new features to TextIO and AvroIO recently, see e.g.
>> > https://github.com/apache/beam/pull/3725. The features are:
>> > - withHintMatchesManyFiles()
>> > - readAll() that reads a PCollection of filepatterns
>> > - configurable treatment of filepatterns that match no files
>> > - watchForNewFiles() that incrementally watches for new files matching
>> > the filepatterns
>> >
>> > However, these features also make sense for other file-based IOs
>> > (TFRecordIO, XmlIO, the in-review WholeFileIO), and adding them explicitly
>> > to each of these requires a lot of boilerplate and reeks of lack of
>> > modularity. I don't want to add this much duplicated code to each of them,
>> > nor to require authors of new such IOs to add it.
>> >
>> > Note that all of these features are available on the recently added
>> > Match.filepatterns() transform, which converts a PCollection<String> to a
>> > PCollection<MatchResult.Metadata> (file path and size). The boilerplate in
>> > file-based IOs ends up simply passing the properties on to the Match
>> > transform.
>> >
>> > Because of this, I'd like to propose the following recommendation for
>> > file-based IOs. A file-based FooIO should include:
>> > - A read transform that reads the data from a filepattern specified at
>> > pipeline construction time, FooIO.read().from(filepattern) or something
>> > analogous, as a PCollection<Foo>
>> > - A transform FooIO.readAllMatches() that converts a PCollection<Metadata>
>> > to a PCollection<Foo>
>> >
>> > Then FooIO.read() handles the common case, and the user can solve all
>> > advanced cases by combining Match.filepatterns() with
>> > FooIO.readAllMatches():
>> >
>> > // Read files in a filepattern but don't fail if it's empty
>> > PCollection<Foo> foos = p.apply(Create.of(myFilepattern))
>> >     .apply(Match.filepatterns().withEmptyMatchTreatment(ALLOW))
>> >     .apply(FooIO.readAllMatches());
>> >
>> > // Read new filepatterns arriving over PubSub, and for each filepattern
>> > // continuously watch for new files matching it, polling every 1 minute,
>> > // and stop polling a filepattern if no new files appear for 5 minutes
>> > PCollection<String> filepatterns = p.apply(PubsubIO.readStrings()...);
>> > PCollection<Foo> foos = filepatterns
>> >     .apply(Match.filepatterns().continuously(
>> >         Duration.standardMinutes(1),
>> >         afterTimeSinceNewOutput(Duration.standardMinutes(5))))
>> >     .apply(FooIO.readAllMatches());
>> >
>> > Adding explicit support for these configuration options to FooIO.read(),
>> > and adding a FooIO.readAll(), should be optional.
>> >
>> > WDYT?
>> >
>>
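The layering proposed in the thread (one shared matching step that expands filepatterns into (path, size) metadata and applies the empty-match policy, a per-format readAllMatches step, and read() as a convenience composed from the two) can be illustrated with a minimal, framework-free Java sketch. This is not Beam code: `Metadata`, `EmptyMatchTreatment`, `matchFilepatterns`, `readAllMatches` and `read` below are hypothetical stand-ins that work on plain lists, filepatterns are assumed to be `java.nio` globs relative to a base directory, and a "Foo" record is assumed to be one line of text.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FileIoCompositionSketch {
    // Hypothetical stand-in for MatchResult.Metadata: file path and size.
    record Metadata(Path path, long sizeBytes) {}

    // Hypothetical stand-in for the empty-match policy discussed in the thread.
    enum EmptyMatchTreatment { ALLOW, DISALLOW }

    // Shared matching step, loosely analogous to Match.filepatterns():
    // expands each glob (relative to baseDir) into sorted (path, size) metadata.
    static List<Metadata> matchFilepatterns(
            Path baseDir, List<String> filepatterns, EmptyMatchTreatment treatment)
            throws IOException {
        List<Metadata> result = new ArrayList<>();
        for (String pattern : filepatterns) {
            PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + pattern);
            List<Metadata> matches;
            try (Stream<Path> files = Files.walk(baseDir)) {
                matches = files.filter(Files::isRegularFile)
                        .filter(p -> matcher.matches(baseDir.relativize(p)))
                        .sorted()
                        .map(p -> new Metadata(p, size(p)))
                        .collect(Collectors.toList());
            }
            // The empty-match policy lives here once, not in every format-specific reader.
            if (matches.isEmpty() && treatment == EmptyMatchTreatment.DISALLOW) {
                throw new IllegalArgumentException("No files matched: " + pattern);
            }
            result.addAll(matches);
        }
        return result;
    }

    private static long size(Path p) {
        try {
            return Files.size(p);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Format-specific step, loosely analogous to FooIO.readAllMatches():
    // turns already-matched files into records (here, lines of text).
    static List<String> readAllMatches(List<Metadata> matches) throws IOException {
        List<String> records = new ArrayList<>();
        for (Metadata m : matches) {
            records.addAll(Files.readAllLines(m.path()));
        }
        return records;
    }

    // Convenience entry point, loosely analogous to FooIO.read().from(filepattern),
    // built purely by composition. DISALLOW as the default is an assumption.
    static List<String> read(Path baseDir, String filepattern) throws IOException {
        return readAllMatches(
                matchFilepatterns(baseDir, List.of(filepattern), EmptyMatchTreatment.DISALLOW));
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("foo-io-sketch");
        Files.write(dir.resolve("a.txt"), List.of("alpha", "beta"));
        Files.write(dir.resolve("b.txt"), List.of("gamma"));
        Files.write(dir.resolve("c.log"), List.of("ignored"));

        // Common case: one filepattern known up front.
        System.out.println(read(dir, "*.txt")); // prints [alpha, beta, gamma]
        // Advanced case: compose the shared matcher (allowing empty matches) with the reader.
        System.out.println(readAllMatches(
                matchFilepatterns(dir, List.of("*.csv"), EmptyMatchTreatment.ALLOW))); // prints []
    }
}
```

The design point the sketch tries to show is that only `readAllMatches` knows anything about the file format; glob expansion and the empty-match policy are written once and reused, which is the duplication the proposal aims to remove from each FooIO.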
