Try TextIO.readAll(), which takes a PCollection of filepatterns as input. That PCollection can be produced by the UNLOAD step, which emits a single element (the filepattern) after the unload completes. It's available in Beam 2.2, which hasn't been released yet, but the release vote will likely start this week; meanwhile, you can run against a snapshot.
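A minimal sketch of that wiring in Beam Java. It assumes Beam 2.2's TextIO.readAll(), a Redshift JDBC driver on the classpath, and an S3 FileSystem registered with Beam so that `s3://` filepatterns resolve. The names `RedshiftUnloadSketch`, `UnloadFn` and `unloadSql`, along with the JDBC URL, query and IAM role, are hypothetical placeholders. Step 1 reuses the `Create.of((Void) null)` run-once pattern from the thread and emits the filepattern only after the UNLOAD statement returns. UNLOAD names its output files with the `TO` prefix, so `prefix + "*"` is assumed to match them.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class RedshiftUnloadSketch {

  // Hypothetical helper: builds an UNLOAD statement. Single quotes inside the
  // query are doubled because the query is embedded in a quoted literal.
  static String unloadSql(String query, String s3Prefix, String iamRole) {
    return "UNLOAD ('" + query.replace("'", "''") + "') TO '" + s3Prefix
        + "' IAM_ROLE '" + iamRole + "'";
  }

  // Step 1 (hypothetical DoFn): runs UNLOAD over JDBC, then emits one filepattern.
  static class UnloadFn extends DoFn<Void, String> {
    private final String jdbcUrl;
    private final String sql;
    private final String filepattern;

    UnloadFn(String jdbcUrl, String sql, String filepattern) {
      this.jdbcUrl = jdbcUrl;
      this.sql = sql;
      this.filepattern = filepattern;
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
      // A retried bundle re-runs this, so the UNLOAD must be idempotent
      // (e.g. add ALLOWOVERWRITE so a rerun replaces earlier files).
      try (Connection conn = DriverManager.getConnection(jdbcUrl);
          Statement stmt = conn.createStatement()) {
        stmt.execute(sql);
      }
      // Emitted only after UNLOAD returns, so reading cannot start earlier.
      c.output(filepattern);
    }
  }

  public static void main(String[] args) {
    // Placeholders: the JDBC URL, query and IAM role are hypothetical.
    String jdbcUrl = "jdbc:redshift://example-host:5439/db?user=u&password=p";
    String prefix = "s3://bucket/tmp-prefix/";
    String sql = unloadSql("select * from my_table", prefix,
        "arn:aws:iam::000000000000:role/unload-role");

    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    PCollection<String> lines =
        p.apply("Once", Create.of((Void) null).withCoder(VoidCoder.of()))
            .apply("Unload", ParDo.of(new UnloadFn(jdbcUrl, sql, prefix + "*")))
            // Step 2: TextIO.readAll() expands and reads each filepattern it receives.
            .apply("ReadUnloaded", TextIO.readAll());
    // Downstream processing of `lines` would go here.
    p.run().waitUntilFinish();
  }
}
```

Because `UnloadFn` is an ordinary single-element `ParDo`, a retry re-issues the UNLOAD. That is the idempotence caveat from the thread applied to step 1, and it is why the comment suggests ALLOWOVERWRITE.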
On Wed, Sep 27, 2017 at 5:40 PM Jacob Marble wrote:
> After playing with this for a day, I can't figure out how to make step 2
> start *after* step 1 completes.
>
> The natural way to accomplish step 2 is TextIO.Read, which only accepts
> PBegin as input and has no side input option. Wrapping TextIO.Read in an
> adaptor doesn't help, because the graph must still be built with
> TextIO.Read.expand(PBegin).
>
> I tried extending TextIO.Read directly, but that doesn't work because an
> AutoValue class can't extend another.
>
> I thought about using TextSource directly, but it's package-private.
>
> So my thought is that either (1) TextIO.Read should accept side input, if
> only to block execution until the side input is complete, or (2) TextSource
> should be made public, probably the more natural of these two options. For
> now, I'll have to go with (3): copy the FileSource source code into a new
> class that I can use directly.
>
> Does anyone have thoughts here?
>
> On Wed, Sep 27, 2017 at 9:25 AM, Jacob Marble wrote:
>
> > Reuven, I think I found an example of the pattern you describe in
> > JdbcIO.Read.expand(). Thanks for this.
> >
> > On Wed, Sep 27, 2017 at 9:13 AM, Reuven Lax wrote:
> >
> > > Create is essentially a BoundedSource under the covers.
> > >
> > > There are multiple ways to handle step 3. One is to produce a
> > > PCollection<String> containing the filenames. You could then attach a
> > > Void key (using WithKeys), GBK the filenames together, and delete them
> > > in the next step.
> > >
> > > Reuven
> > >
> > > On Wed, Sep 27, 2017 at 9:04 AM, Jacob Marble wrote:
> > >
> > > > Thanks, Reuven, that makes sense for step 1. After sending my original
> > > > message, I started down the path of BoundedSource, but I think this
> > > > could be better.
> > > >
> > > > Do you know any trick for step 3?
> > > >
> > > > On Wed, Sep 27, 2017 at 8:58 AM, Reuven Lax wrote:
> > > >
> > > > > A common pattern is the following:
> > > > >
> > > > > p.apply(Create.of((Void) null))
> > > > >  .apply(MapElements.into(TypeDescriptors.voids())
> > > > >      .via((Void v) -> { /* once operation */ return v; }));
> > > > >
> > > > > Of course, as is always the case with any Beam DoFn, your operation
> > > > > might be executed multiple times (e.g. if something fails before the
> > > > > runner commits the fact that the operation has succeeded). You need
> > > > > to ensure that the operation is idempotent.
> > > > >
> > > > > Reuven
> > > > >
> > > > > On Wed, Sep 27, 2017 at 8:51 AM, Jacob Marble wrote:
> > > > >
> > > > > > I have been thinking about a Redshift reader/writer, basically to
> > > > > > wrap UNLOAD and COPY in a PTransform. For example, the steps to
> > > > > > UNLOAD into a PCollection:
> > > > > >
> > > > > > 1) JDBC to Redshift - UNLOAD
> > > > > >    <http://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD.html>
> > > > > >    TO 's3://bucket/tmp-prefix'
> > > > > > 2) S3 to PCollection - work in progress
> > > > > >    <https://github.com/Kochava/beam-s3>
> > > > > > 3) delete tmp files from S3
> > > > > >
> > > > > > To implement steps 1 and 3, I can't see a way to perform a task
> > > > > > exactly once, globally, in a PTransform. Sure, I could do those
> > > > > > steps in main() or even in a separate script, but the result isn't
> > > > > > code that can be shared and reused very well.
> > > > > >
> > > > > > Am I missing something? Seems like the kind of problem that I
> > > > > > shouldn't be the first to encounter.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jacob
> > > > >
> > > >
> > > > --
> > > > Jacob
> >
> > --
> > Jacob
>
> --
> Jacob
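For step 3, here is a sketch of Reuven's suggestion in the quoted thread: attach a Void key with WithKeys, GBK the filenames together, then delete them in the next step. It assumes the filenames already arrive as a `PCollection<String>`. `DeleteTempFiles` and `DeleteFn` are hypothetical names. Deletion goes through Beam's `FileSystems` API, so it should work for any scheme that has a registered Beam FileSystem. The grouping only guarantees that the delete sees every filename in its input. Whether it runs after the read has finished depends on how that filename collection is produced upstream, which this sketch does not cover.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical transform: deletes every file named in the input collection.
public class DeleteTempFiles extends PTransform<PCollection<String>, PCollection<Void>> {

  @Override
  public PCollection<Void> expand(PCollection<String> filenames) {
    return filenames
        // Give every filename the same (null) Void key...
        .apply("VoidKey", WithKeys.<Void, String>of((Void) null))
        .setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of()))
        // ...so that GroupByKey gathers all of them into a single group.
        .apply("GatherAll", GroupByKey.<Void, String>create())
        .apply("Delete", ParDo.of(new DeleteFn()));
  }

  // Deletes the whole group of files in one call; emits nothing.
  static class DeleteFn extends DoFn<KV<Void, Iterable<String>>, Void> {
    @ProcessElement
    public void processElement(ProcessContext c) throws IOException {
      List<ResourceId> ids = new ArrayList<>();
      for (String name : c.element().getValue()) {
        ids.add(FileSystems.matchNewResource(name, false /* isDirectory */));
      }
      // IGNORE_MISSING_FILES keeps a retried delete from failing on files
      // that an earlier attempt already removed.
      FileSystems.delete(ids, StandardMoveOptions.IGNORE_MISSING_FILES);
    }
  }
}
```

Usage, assuming `unloadedFiles` is a hypothetical `PCollection<String>` of file paths: `unloadedFiles.apply("Cleanup", new DeleteTempFiles());`. Funnelling everything through a single key means one worker performs the delete. That is acceptable for a handful of UNLOAD part files, but it would serialize the work for very large file lists.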
