What would be the benefit of that, compared to what TextIO already exposes?

On Thu, Sep 28, 2017, 8:20 AM Jacob Marble <[email protected]> wrote:

> Looks good to me, should do exactly what I need.
>
> Any thoughts on making TextSource public? I would be happy to submit the PR
> myself.
>
> On Wed, Sep 27, 2017 at 5:56 PM, Eugene Kirpichov <[email protected]> wrote:
>
> > Try TextIO.readAll(), which takes a PCollection of filepatterns as input,
> > and the PCollection can be produced by the UNLOAD step, emitting a single
> > element (the filepattern) after the unload completes. It's available in
> > Beam 2.2, which hasn't been released yet, but the release vote will likely
> > start this week, and meanwhile you can run against a snapshot.
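To make the ordering idea concrete without pulling in Beam, here is a JDK-only sketch (not Beam API, and not how TextIO.readAll() is implemented). A hypothetical fakeUnload writes part files to a local temp directory instead of S3 and returns a glob; the read step consumes that glob, so it cannot begin until the unload has returned. The ordering comes from the data dependency alone. All class and method names here are illustrative assumptions.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class UnloadThenRead {
    // Hypothetical stand-in for the UNLOAD step: writes two part files into a
    // local directory (not S3) and returns a glob for them only once writing
    // has finished.
    static String fakeUnload(Path dir) {
        try {
            Files.write(dir.resolve("part_000"), Arrays.asList("a", "b"));
            Files.write(dir.resolve("part_001"), Arrays.asList("c"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return "part_*";
    }

    // Expands the glob inside dir and returns every line of every matching
    // file, in file-name order.
    static List<String> readAllMatching(Path dir, String glob) {
        List<Path> matches = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, glob)) {
            for (Path p : stream) {
                matches.add(p);
            }
            matches.sort(null); // Path is Comparable, so null means natural order
            List<String> lines = new ArrayList<>();
            for (Path p : matches) {
                lines.addAll(Files.readAllLines(p));
            }
            return lines;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // The read consumes the glob the unload produces, so it cannot start
    // until the unload has returned; no explicit "wait" step is needed.
    static List<String> runInTempDir() {
        try {
            Path dir = Files.createTempDirectory("unload");
            return CompletableFuture
                    .supplyAsync(() -> fakeUnload(dir))
                    .thenApply(glob -> readAllMatching(dir, glob))
                    .join();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The temp files are left in place here; removing them is the separate cleanup step discussed further down the thread.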
> >
> > On Wed, Sep 27, 2017 at 5:40 PM Jacob Marble <[email protected]> wrote:
> >
> > > After playing with this for a day, I can't figure out how to make step 2
> > > start *after* step 1 completes.
> > >
> > > The natural way to accomplish step 2 is TextIO.Read, which only accepts
> > > PBegin as input and has no side input option. Wrapping TextIO.Read in an
> > > adaptor doesn't help, because the graph must still be built with
> > > TextIO.Read.expand(PBegin).
> > >
> > > I tried extending TextIO.Read directly, but that doesn't work because an
> > > AutoValue class can't extend another.
> > >
> > > I thought about using TextSource directly, but it's package-private.
> > >
> > > So my thought is that either (1) TextIO.Read should accept side input,
> > > if only to block execution until the side input is complete, or (2)
> > > TextSource should be made public, probably the more natural of these two
> > > options. For now, I'll have to go with (3): copy the FileSource source
> > > code into a new class that I can use directly.
> > >
> > > Does anyone have thoughts here?
> > >
> > > On Wed, Sep 27, 2017 at 9:25 AM, Jacob Marble <[email protected]> wrote:
> > >
> > > > Reuven, I think I found an example of the pattern you describe in
> > > > JdbcIO.Read.expand(). Thanks for this.
> > > >
> > > > On Wed, Sep 27, 2017 at 9:13 AM, Reuven Lax <[email protected]> wrote:
> > > >
> > > >> Create is essentially a BoundedSource under the covers.
> > > >>
> > > >> There are multiple ways to handle step 3. One is to produce a
> > > >> PCollection<String> containing the filenames. You could then attach a
> > > >> Void key (using WithKeys), group the filenames together with a
> > > >> GroupByKey (GBK), and delete them in the next step.
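A JDK-only model of the Void-key grouping (an illustration of the idea, not Beam code): attaching the same null key to every filename and then grouping leaves exactly one group, so the delete step receives the whole list in a single call. SingleKeyGrouping, deleteBatches and the filenames used with them are hypothetical; the delete step here only records the batch rather than touching S3.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SingleKeyGrouping {
    // Models "WithKeys with a constant Void key, then GroupByKey": every
    // filename gets the same null key, so all of them land in one group.
    static Map<Void, List<String>> groupUnderVoidKey(List<String> filenames) {
        Map<Void, List<String>> groups = new HashMap<>(); // HashMap permits a null key
        for (String name : filenames) {
            List<String> group = groups.get(null);
            if (group == null) {
                group = new ArrayList<>();
                groups.put(null, group);
            }
            group.add(name);
        }
        return groups;
    }

    // Models the downstream delete step: it runs once per group, and since
    // there is only one group it sees the complete list in one call.
    // Returns the batches it would delete instead of deleting anything.
    static List<List<String>> deleteBatches(Map<Void, List<String>> groups) {
        List<List<String>> batches = new ArrayList<>();
        for (List<String> batch : groups.values()) {
            batches.add(batch);
        }
        return batches;
    }
}
```

The design point is simply that a constant key collapses the per-file elements into one unit of work, which is what lets a single step act on all of them together.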
> > > >>
> > > >> Reuven
> > > >>
> > > >> On Wed, Sep 27, 2017 at 9:04 AM, Jacob Marble <[email protected]>
> > > >> wrote:
> > > >>
> > > >> > Thanks, Reuven, that makes sense for step 1. After sending my original
> > > >> > message, I started down the path of BoundedSource, but I think this
> > > >> > could be better.
> > > >> >
> > > >> > Do you know any trick for step 3?
> > > >> >
> > > >> > On Wed, Sep 27, 2017 at 8:58 AM, Reuven Lax <[email protected]>
> > > >> > wrote:
> > > >> >
> > > >> > > A common pattern is the following:
> > > >> > >
> > > >> > > p.apply(Create.of((Void) null))
> > > >> > >   .apply(MapElements.into(TypeDescriptors.voids())
> > > >> > >       .via((Void v) -> { /* once operation */ return null; }));
> > > >> > >
> > > >> > > Of course as is always the case with any Beam DoFn, your operation
> > > >> > > might be executed multiple times (e.g. if something fails before the
> > > >> > > runner commits the fact that the operation has succeeded). You need
> > > >> > > to ensure that the operation is idempotent.
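One way to make a cleanup operation idempotent, sketched with java.nio.file on local temp files as a stand-in for S3 (an assumption; real S3 client calls would differ): Files.deleteIfExists treats an already-missing file as a no-op, so a retried run does not fail. IdempotentCleanup and simulateRetry are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

final class IdempotentCleanup {
    // Deletes each path if it is still there and returns how many were
    // actually removed. Running it again (as a retried bundle might) is
    // harmless: already-deleted files are skipped, not treated as errors.
    static int deleteAll(List<Path> paths) {
        int deleted = 0;
        for (Path p : paths) {
            try {
                if (Files.deleteIfExists(p)) {
                    deleted++;
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return deleted;
    }

    // Creates two temp files, then runs the cleanup twice to simulate a
    // retry. Returns the number of files removed by each attempt.
    static int[] simulateRetry() {
        try {
            Path a = Files.createTempFile("unload", ".part");
            Path b = Files.createTempFile("unload", ".part");
            List<Path> paths = Arrays.asList(a, b);
            int first = deleteAll(paths);
            int second = deleteAll(paths);
            return new int[] {first, second};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With Files.delete instead of Files.deleteIfExists, the second attempt would throw NoSuchFileException, which is exactly the non-idempotent behavior a retried step should avoid.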
> > > >> > >
> > > >> > > Reuven
> > > >> > >
> > > >> > > On Wed, Sep 27, 2017 at 8:51 AM, Jacob Marble <[email protected]>
> > > >> > > wrote:
> > > >> > >
> > > >> > > > I have been thinking about a Redshift reader/writer, basically to
> > > >> > > > wrap UNLOAD and COPY in a PTransform. For example, the steps to
> > > >> > > > UNLOAD into a PCollection:
> > > >> > > >
> > > >> > > > 1) JDBC to Redshift - UNLOAD
> > > >> > > >    <http://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD.html>
> > > >> > > >    TO 's3://bucket/tmp-prefix'
> > > >> > > > 2) S3 to PCollection - work in progress
> > > >> > > >    <https://github.com/Kochava/beam-s3>
> > > >> > > > 3) delete tmp files from S3
> > > >> > > >
> > > >> > > > To implement steps 1 and 3, I can't see a way to perform a task
> > > >> > > > exactly once, globally, in a PTransform. Sure, I could do those
> > > >> > > > steps in main() or even in a separate script, but the result isn't
> > > >> > > > code that can be shared and reused very well.
> > > >> > > >
> > > >> > > > Am I missing something? It seems like the kind of problem that I
> > > >> > > > shouldn't be the first to encounter.
> > > >> > > >
> > > >> > > > Thanks,
> > > >> > > >
> > > >> > > > Jacob
> > > >> > > >
> > > >> > >
> > > >> >
> > > >> >
> > > >> >
> > > >> > --
> > > >> > Jacob
> > > >> >
> > > >>
> > > >
> > > >
> > > >
> > > > --
> > > > Jacob
> > > >
> > >
> > >
> > >
> > > --
> > > Jacob
> > >
> >
>
>
>
> --
> Jacob
>
