There already is an org.apache.beam.sdk.io.gcp.bigquery.WriteResult class.

On Tue, Jul 17, 2018 at 9:44 AM Eugene Kirpichov <kirpic...@google.com> wrote:
> Hmm, I think this approach has some complications:
>
> - Using JobStatus ties the result to BigQuery batch load jobs, but the return type ought to be the same regardless of which method of writing is used (including potential future BigQuery APIs - they are evolving), or how many BigQuery load jobs are involved in writing a given window (it can be multiple).
> - Returning a success/failure indicator makes it prone to users ignoring the failure: the default behavior should be that, if the pipeline succeeds, all data was successfully written - if users want different error handling, e.g. a dead-letter queue, they should have to specify it explicitly.
>
> I would recommend returning a PCollection of a type that's invariant to which load method is used (streaming writes, a single load job, multiple load jobs, etc.). If it's unclear what that type should be, you could introduce an empty type, e.g. "class BigQueryWriteResult {}", just for the sake of signaling success, and add something to it later.
>
> On Tue, Jul 17, 2018 at 12:30 AM Carlos Alonso <car...@mrcalonso.com> wrote:
>
>> All good so far. I've been a bit sidetracked, but I more or less have the idea of using the JobStatus as part of the collection, so that not only is completion signaled but the result (success/failure) can also be accessed. How does that sound?
>>
>> Regards
>>
>> On Tue, Jul 17, 2018 at 3:07 AM Eugene Kirpichov <kirpic...@google.com> wrote:
>>
>>> Hi Carlos,
>>>
>>> Any updates / roadblocks you've hit?
>>>
>>> On Tue, Jul 3, 2018 at 7:13 AM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>
>>>> Awesome!! Thanks for the heads up - very exciting, this is going to make a lot of people happy :)
>>>>
>>>> On Tue, Jul 3, 2018, 3:40 AM Carlos Alonso <car...@mrcalonso.com> wrote:
>>>>
>>>>> + dev@beam.apache.org
>>>>>
>>>>> Just a quick email to let you know that I'm starting to develop this.
>>>>>
>>>>> On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>
>>>>>> Hi Carlos,
>>>>>>
>>>>>> Thank you for expressing interest in taking this on! Let me give you a few pointers to start, and I'll be happy to help everywhere along the way.
>>>>>>
>>>>>> Basically, we want BigQueryIO.write() to return something (e.g. a PCollection) that can be used as input to Wait.on(). Currently it returns a WriteResult, which only contains a PCollection<TableRow> of failed inserts - that one cannot be used directly; instead, we should add another component to WriteResult that represents the result of successfully writing some data.
>>>>>>
>>>>>> Given that BQIO supports dynamic destination writes, I think it makes sense for that to be a PCollection<KV<DestinationT, ???>>, so that in theory we could sequence different destinations independently (currently Wait.on() does not provide such a feature, but it could); this will require changing WriteResult to WriteResult<DestinationT>. As for what the "???" might be - it is something that represents the result of successfully writing a window of data. I think it can even be Void, or "?" (a wildcard type) for now, until we figure out something better.
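>>>>>>
>>>>>> In sketch form - the names below are hypothetical, just to make the shape concrete, and the POutput plumbing is elided - WriteResult could grow into something like:
>>>>>>
>>>>>>     public class WriteResult<DestinationT> implements POutput {
>>>>>>       private PCollection<TableRow> failedInserts;                  // exists today
>>>>>>       private PCollection<KV<DestinationT, Void>> successfulWrites; // the new part
>>>>>>
>>>>>>       // Existing: rows that failed to be written via streaming inserts.
>>>>>>       public PCollection<TableRow> getFailedInserts() { return failedInserts; }
>>>>>>
>>>>>>       // New: one element per (destination, window) successfully written,
>>>>>>       // whichever write method was used under the hood.
>>>>>>       public PCollection<KV<DestinationT, Void>> getSuccessfulWrites() { return successfulWrites; }
>>>>>>     }
>>>>>>
>>>>>> which user code could then sequence on, e.g. otherData.apply(Wait.on(writeResult.getSuccessfulWrites())).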
>>>>>>
>>>>>> Implementing this would require roughly the following work:
>>>>>>
>>>>>> - Add this PCollection<KV<DestinationT, ?>> to WriteResult.
>>>>>> - Modify the BatchLoads transform to provide it on both codepaths: expandTriggered() and expandUntriggered().
>>>>>>   - expandTriggered() itself writes via two codepaths: single-partition and multi-partition. Both need to be handled - we need to get a PCollection<KV<DestinationT, ?>> from each of them, and Flatten these two PCollections together to get the final result. The single-partition codepath (writeSinglePartition) under the hood already uses WriteTables, which returns a KV<DestinationT, ...>, so it's directly usable. The multi-partition codepath ends in WriteRenameTriggered - unfortunately, this codepath drops DestinationT along the way and will need to be refactored a bit to keep it until the end.
>>>>>>   - expandUntriggered() should be treated the same way.
>>>>>> - Modify the StreamingWriteTables transform to provide it.
>>>>>>   - Here, too, the challenge is to propagate the DestinationT type all the way to the end of StreamingWriteTables - it will need to be refactored. After such a refactoring, returning a KV<DestinationT, ...> should be easy.
>>>>>>
>>>>>> Another challenge with all of this is backwards compatibility, in terms of both API and pipeline update. Pipeline update is much less of a concern for the BatchLoads codepath, because it's typically used in batch-mode pipelines that don't get updated. I would recommend starting with this, perhaps even with only the untriggered codepath (it is much more commonly used) - that will pave the way for future work.
>>>>>>
>>>>>> Hope this helps - please ask more if something is unclear!
>>>>>>
>>>>>> On Fri, Apr 20, 2018 at 12:48 AM Carlos Alonso <car...@mrcalonso.com> wrote:
>>>>>>
>>>>>>> Hey Eugene!!
>>>>>>>
>>>>>>> I'd gladly take a stab at it, although I'm not sure how much time I'll be able to put into it... but yeah, let's try it.
>>>>>>>
>>>>>>> Where should I begin? Is there a Jira issue, or shall I file one?
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Yes, you're both right - BigQueryIO.write() is currently not implemented in a way that can be used with Wait.on(). It would certainly be a welcome contribution to change this - many people have expressed interest in specifically waiting for BigQuery writes. Would either of you be interested in helping out?
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <car...@mrcalonso.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Simon, I think your explanation was very accurate, at least to my understanding. I'd also be interested in getting feedback about batch load results into the pipeline... hopefully someone can suggest something; otherwise we could propose submitting a Jira, or even better, a PR!! :)
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <simon.kitch...@unbelievable-machine.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi All,
>>>>>>>>>>
>>>>>>>>>> I need to write some data to BigQuery (batch-mode) and then send a Pubsub message to trigger further processing.
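>>>>>>>>>>
>>>>>>>>>> In sketch form, the pattern I'm after is roughly this (rows, notifications, tableSpec and the topic are placeholders - rows being a PCollection<TableRow>, notifications a PCollection<PubsubMessage> derived from the same input - and writeSignal stands for whatever output of the write completes only once the data is actually in BigQuery):
>>>>>>>>>>
>>>>>>>>>>     WriteResult writeResult = rows.apply("Write to BQ",
>>>>>>>>>>         BigQueryIO.writeTableRows()
>>>>>>>>>>             .to(tableSpec)
>>>>>>>>>>             .withMethod(BigQueryIO.Write.Method.FILE_LOADS));
>>>>>>>>>>
>>>>>>>>>>     // Hold back each window of notifications until the corresponding window
>>>>>>>>>>     // has been written, then publish to kick off the downstream processing.
>>>>>>>>>>     notifications
>>>>>>>>>>         .apply(Wait.on(writeSignal))
>>>>>>>>>>         .apply(PubsubIO.writeMessages().to("projects/my-project/topics/after-load"));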
>>>>>>>>>>
>>>>>>>>>> I found this thread, titled "Callbacks/other functions run after a PDone/output transform", on the user list, which was very relevant:
>>>>>>>>>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>>>>>>>>
>>>>>>>>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>>>>>>>>
>>>>>>>>>> Unfortunately, it appears that the Wait.on transform does not work with BigQueryIO in FILE_LOADS mode - or at least I cannot get it to work. Advice appreciated.
>>>>>>>>>>
>>>>>>>>>> Here's (most of) the relevant test code:
>>>>>>>>>>
>>>>>>>>>>     Pipeline p = Pipeline.create(options);
>>>>>>>>>>     PCollection<String> lines = p.apply("Read Input",
>>>>>>>>>>         Create.of("line1", "line2", "line3", "line4"));
>>>>>>>>>>
>>>>>>>>>>     TableFieldSchema f1 = new TableFieldSchema().setName("value").setType("STRING");
>>>>>>>>>>     TableSchema s2 = new TableSchema().setFields(Collections.singletonList(f1));
>>>>>>>>>>
>>>>>>>>>>     WriteResult writeResult = lines.apply("Write and load data",
>>>>>>>>>>         BigQueryIO.<String>write()
>>>>>>>>>>             .to(options.getTableSpec())
>>>>>>>>>>             .withFormatFunction(new SlowFormatter())
>>>>>>>>>>             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>>>>>>>>>             // .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
>>>>>>>>>>             .withSchema(s2)
>>>>>>>>>>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>>>>             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>>>>>>
>>>>>>>>>>     lines.apply(Wait.on(writeResult.getFailedInserts()))
>>>>>>>>>>         .apply(ParDo.of(new OnCompletion()));
>>>>>>>>>>
>>>>>>>>>> where:
>>>>>>>>>> + the format function SlowFormatter prints out each line and sleeps briefly, for testing purposes, and
>>>>>>>>>> + the DoFn OnCompletion just prints out the contents of each line
>>>>>>>>>> (both are sketched in full a little further down).
>>>>>>>>>>
>>>>>>>>>> In production code, OnCompletion would be fed some collection derived from lines, e.g. min/max record id, and the operation would be "send Pubsub message" rather than print.
>>>>>>>>>>
>>>>>>>>>> My expectation is that SlowFormatter would run for each line, then the data would be uploaded, then OnCompletion would print each line. And indeed that is what happens when STREAMING_INSERTS is used. However, with FILE_LOADS, OnCompletion runs before the upload takes place.
>>>>>>>>>>
>>>>>>>>>> I use WriteResult.getFailedInserts as that is the only "output" that BigQueryIO.write() generates, AFAICT. I don't expect any failed records, but believe the collection can be used as a "signal" for the Wait.on - i.e. its output is "complete for window" only after all data has been uploaded, which is what I need. And that does seem to work for STREAMING_INSERTS.
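>>>>>>>>>>
>>>>>>>>>> As mentioned above, the two helpers are trivial - roughly:
>>>>>>>>>>
>>>>>>>>>>     // Formats a line into a TableRow, printing it and sleeping briefly so that
>>>>>>>>>>     // the ordering of formatting vs. OnCompletion output is easy to observe.
>>>>>>>>>>     static class SlowFormatter implements SerializableFunction<String, TableRow> {
>>>>>>>>>>       @Override
>>>>>>>>>>       public TableRow apply(String line) {
>>>>>>>>>>         System.out.println("Formatting: " + line);
>>>>>>>>>>         try {
>>>>>>>>>>           Thread.sleep(100); // small sleep, for testing purposes only
>>>>>>>>>>         } catch (InterruptedException e) {
>>>>>>>>>>           Thread.currentThread().interrupt();
>>>>>>>>>>         }
>>>>>>>>>>         return new TableRow().set("value", line);
>>>>>>>>>>       }
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     // Just prints each element it receives once the Wait.on gate opens.
>>>>>>>>>>     static class OnCompletion extends DoFn<String, Void> {
>>>>>>>>>>       @ProcessElement
>>>>>>>>>>       public void processElement(ProcessContext c) {
>>>>>>>>>>         System.out.println("After write: " + c.element());
>>>>>>>>>>       }
>>>>>>>>>>     }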
>>>>>>>>>>
>>>>>>>>>> I suspect the reason this does not work for FILE_LOADS is that the method BatchLoads.writeResult returns a WriteResult that wraps an "empty" failedInserts collection, i.e. data which is not connected to the batch load job that is triggered:
>>>>>>>>>>
>>>>>>>>>>     private WriteResult writeResult(Pipeline p) {
>>>>>>>>>>       PCollection<TableRow> empty =
>>>>>>>>>>           p.apply("CreateEmptyFailedInserts",
>>>>>>>>>>               Create.empty(TypeDescriptor.of(TableRow.class)));
>>>>>>>>>>       return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>> Because that collection is built with Create.empty and has no dependency on the load job, it is already "complete" for every window as soon as the pipeline starts, so Wait.on releases the main input immediately.
>>>>>>>>>>
>>>>>>>>>> Note that BatchLoads does "synchronously" invoke BigQuery load jobs: once a job is submitted, the code repeatedly polls the job status until it reaches DONE or FAILED. However, that information does not appear to be exposed anywhere (unlike streaming, which effectively exposes completion state via the failedInserts stream).
>>>>>>>>>>
>>>>>>>>>> If I have misunderstood something, corrections welcome! If not, suggestions for workarounds or alternate solutions are also welcome :-)
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Simon