Hi all,

I've been adding new features to TextIO and AvroIO recently; see e.g. https://github.com/apache/beam/pull/3725. The features are:
- withHintMatchesManyFiles()
- readAll(), which reads a PCollection of filepatterns
- configurable treatment of filepatterns that match no files
- watchForNewFiles(), which incrementally watches for new files matching the filepatterns
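For anyone unfamiliar with watchForNewFiles(), its polling-with-termination behavior can be sketched in plain Java. This is a hypothetical, Beam-free simulation: the WatchSketch class, its watch() method and the snapshot lists are illustrative assumptions, not Beam API. Each snapshot stands for one directory listing per poll, each file is emitted only the first time it is seen, and watching stops once no new file has appeared for a given number of minutes (measured from the start of watching if nothing has appeared yet).

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simulation of "watch a filepattern for new files".
// Time is simulated: snapshots.get(i) is what a directory listing would
// return at minute i * pollIntervalMinutes.
class WatchSketch {
  static List<String> watch(List<List<String>> snapshots,
                            int pollIntervalMinutes,
                            int stopAfterMinutes) {
    Set<String> seen = new LinkedHashSet<>();
    List<String> emitted = new ArrayList<>();
    int lastNewOutputMinute = 0;
    for (int i = 0; i < snapshots.size(); i++) {
      int now = i * pollIntervalMinutes;
      boolean sawNew = false;
      for (String file : snapshots.get(i)) {
        if (seen.add(file)) {  // emit each file only the first time it is seen
          emitted.add(file);
          sawNew = true;
        }
      }
      if (sawNew) {
        lastNewOutputMinute = now;
      } else if (now - lastNewOutputMinute >= stopAfterMinutes) {
        break;  // termination: no new output for stopAfterMinutes
      }
    }
    return emitted;
  }

  public static void main(String[] args) {
    List<List<String>> snapshots = new ArrayList<>();
    snapshots.add(List.of("a"));                              // minute 0
    for (int i = 1; i < 8; i++) snapshots.add(List.of("a", "b"));  // minutes 1..7
    snapshots.add(List.of("a", "b", "c"));                    // minute 8
    // Poll every 1 minute, stop after 5 minutes without new files.
    System.out.println(watch(snapshots, 1, 5));  // [a, b]
  }
}
```

With one poll per minute and a 5-minute idle limit, main() prints [a, b]: file c only shows up at minute 8, after watching has already stopped at minute 6.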
However, these features also make sense for other file-based IOs (TFRecordIO, XmlIO, the in-review WholeFileIO), and adding them explicitly to each of these requires a lot of boilerplate and reeks of a lack of modularity. I don't want to add this much duplicated code to each of them, nor to require authors of new such IOs to add it.

Note that all of these features are available on the recently added Match.filepatterns() transform, which converts a PCollection<String> to a PCollection<MatchResult.Metadata> (file path and size). The boilerplate in file-based IOs ends up simply passing these properties on to the Match transform.

Because of this, I'd like to propose the following recommendation for file-based IOs. A file-based FooIO should include:
- A read transform that reads the data from a filepattern specified at pipeline construction time as a PCollection<Foo>: FooIO.read().from(filepattern), or something analogous.
- A transform FooIO.readAllMatches() that converts a PCollection<Metadata> to a PCollection<Foo>.

Then FooIO.read() handles the common case, and the user can solve all advanced cases by combining Match.filepatterns() with FooIO.readAllMatches():

    // Read files in a filepattern but don't fail if it's empty
    PCollection<Foo> foos = p.apply(Create.of(myFilepattern))
        .apply(Match.filepatterns().withEmptyMatchTreatment(ALLOW))
        .apply(FooIO.readAllMatches());

    // Read new filepatterns arriving over PubSub, and for each filepattern
    // continuously watch for new files matching it, polling every 1 minute,
    // and stop polling a filepattern if no new files appear for 5 minutes
    PCollection<String> filepatterns = p.apply(PubsubIO.readStrings()...);
    PCollection<Foo> foos = filepatterns
        .apply(Match.filepatterns().continuously(
            Duration.standardMinutes(1),
            afterTimeSinceNewOutput(Duration.standardMinutes(5))))
        .apply(FooIO.readAllMatches());

Adding explicit support for these configuration options to FooIO.read(), and adding a FooIO.readAll(), should be optional.

WDYT?
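To illustrate the proposed division of labor outside Beam, here is a minimal stdlib-only Java sketch. MatchThenRead, Metadata, matchFilepatterns() and readAllMatches() are hypothetical stand-ins for Match.filepatterns(), MatchResult.Metadata and FooIO.readAllMatches(). Globs are resolved against a single local directory, and the two-value empty-match enum only loosely mirrors the ALLOW option used above.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: matching is one reusable step, and a format-specific
// reader only consumes already-matched file metadata.
class MatchThenRead {
  enum EmptyMatchTreatment { ALLOW, DISALLOW }

  // Stand-in for MatchResult.Metadata: a file path and its size in bytes.
  static final class Metadata {
    final Path path;
    final long sizeBytes;

    Metadata(Path path, long sizeBytes) {
      this.path = path;
      this.sizeBytes = sizeBytes;
    }
  }

  // Stand-in for Match.filepatterns(): expand each glob within dir.
  static List<Metadata> matchFilepatterns(Path dir, List<String> globs,
      EmptyMatchTreatment treatment) throws IOException {
    List<Metadata> out = new ArrayList<>();
    for (String glob : globs) {
      List<Path> matched = new ArrayList<>();
      try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, glob)) {
        for (Path p : stream) matched.add(p);
      }
      if (matched.isEmpty() && treatment == EmptyMatchTreatment.DISALLOW) {
        throw new IOException("No files matched " + glob);
      }
      matched.sort(null);  // directory listing order is unspecified
      for (Path p : matched) out.add(new Metadata(p, Files.size(p)));
    }
    return out;
  }

  // Stand-in for a line-based FooIO.readAllMatches(): it never sees a pattern.
  static List<String> readAllMatches(List<Metadata> files) throws IOException {
    List<String> records = new ArrayList<>();
    for (Metadata m : files) records.addAll(Files.readAllLines(m.path));
    return records;
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("match-then-read");
    Files.write(dir.resolve("a.txt"), List.of("1", "2"));
    Files.write(dir.resolve("b.txt"), List.of("3"));
    // "*.csv" matches nothing, which is tolerated because of ALLOW.
    List<Metadata> matched = matchFilepatterns(
        dir, List.of("*.txt", "*.csv"), EmptyMatchTreatment.ALLOW);
    System.out.println(readAllMatches(matched));  // [1, 2, 3]
  }
}
```

The point of the design shows in readAllMatches(): because it only receives matched files, empty-match handling (and, in the real proposal, continuous watching) lives in one reusable matching step instead of being duplicated in every format-specific reader.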