Just opened this PR to get feedback ASAP: https://github.com/apache/beam/pull/6055. Basically, it returns the job status in a PCollection of BigQueryWriteResult objects.
On Fri, Jul 20, 2018 at 11:57 PM Reuven Lax <re...@google.com> wrote:

> There already is a org.apache.beam.sdk.io.gcp.bigquery.WriteResult class.
>
> On Tue, Jul 17, 2018 at 9:44 AM Eugene Kirpichov <kirpic...@google.com> wrote:
>
>> Hmm, I think this approach has some complications:
>> - Using JobStatus ties the result to BigQuery batch load jobs, but the return type ought to be the same regardless of which method of writing is used (including potential future BigQuery APIs - they are evolving), or how many BigQuery load jobs are involved in writing a given window (there can be multiple).
>> - Returning a success/failure indicator makes it easy for users to ignore failures: the default behavior should be that, if the pipeline succeeds, all data was successfully written - if users want different error handling, e.g. a dead-letter queue, they should have to specify it explicitly.
>>
>> I would recommend returning a PCollection of a type that's invariant to which load method is used (streaming writes, load jobs, multiple load jobs, etc.). If it's unclear what that type should be, you could introduce an empty type, e.g. "class BigQueryWriteResult {}", just for the sake of signaling success, and later add something to it.
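To make that suggestion concrete, a minimal sketch of such a marker type and how it might be consumed via Wait.on(); the getSuccessfulWrites() accessor and AfterWriteFn are hypothetical names that do not exist in WriteResult at this point:

    /** Empty marker type signaling a successful write; fields can be added later. */
    public class BigQueryWriteResult implements Serializable {}

    // Hypothetical usage once WriteResult exposes the new signal collection:
    WriteResult result = rows.apply("WriteToBQ", BigQueryIO.writeTableRows().to(table));
    PCollection<KV<TableDestination, BigQueryWriteResult>> successSignal =
        result.getSuccessfulWrites();  // hypothetical accessor
    downstream.apply(Wait.on(successSignal))
        .apply("AfterWrite", ParDo.of(new AfterWriteFn()));  // hypothetical DoFn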
>> On Tue, Jul 17, 2018 at 12:30 AM Carlos Alonso <car...@mrcalonso.com> wrote:
>>
>>> All good so far. I've been a bit sidetracked, but more or less I have the idea of using the JobStatus as part of the collection, so that not only is completion signaled, but the result (success/failure) can also be accessed. How does that sound?
>>>
>>> Regards
>>>
>>> On Tue, Jul 17, 2018 at 3:07 AM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>
>>>> Hi Carlos,
>>>>
>>>> Any updates / roadblocks you hit?
>>>>
>>>> On Tue, Jul 3, 2018 at 7:13 AM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>
>>>>> Awesome!! Thanks for the heads up, very exciting, this is going to make a lot of people happy :)
>>>>>
>>>>> On Tue, Jul 3, 2018, 3:40 AM Carlos Alonso <car...@mrcalonso.com> wrote:
>>>>>
>>>>>> + dev@beam.apache.org
>>>>>>
>>>>>> Just a quick email to let you know that I'm starting to develop this.
>>>>>>
>>>>>> On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>
>>>>>>> Hi Carlos,
>>>>>>>
>>>>>>> Thank you for expressing interest in taking this on! Let me give you a few pointers to start, and I'll be happy to help everywhere along the way.
>>>>>>>
>>>>>>> Basically we want BigQueryIO.write() to return something (e.g. a PCollection) that can be used as input to Wait.on(). Currently it returns a WriteResult, which only contains a PCollection<TableRow> of failed inserts - that one cannot be used directly; instead we should add another component to WriteResult that represents the result of successfully writing some data.
>>>>>>>
>>>>>>> Given that BQIO supports dynamic destination writes, I think it makes sense for that to be a PCollection<KV<DestinationT, ???>> so that in theory we could sequence different destinations independently (currently Wait.on() does not provide such a feature, but it could); and it will require changing WriteResult to be WriteResult<DestinationT>. As for what the "???" might be - it is something that represents the result of successfully writing a window of data. I think it can even be Void, or "?" (wildcard type) for now, until we figure out something better.
>>>>>>>
>>>>>>> Implementing this would require roughly the following work:
>>>>>>> - Add this PCollection<KV<DestinationT, ?>> to WriteResult
>>>>>>> - Modify the BatchLoads transform to provide it on both codepaths: expandTriggered() and expandUntriggered()
>>>>>>> ...- expandTriggered() itself writes via 2 codepaths: single-partition and multi-partition. Both need to be handled - we need to get a PCollection<KV<DestinationT, ?>> from each of them, and Flatten these two PCollections together to get the final result (see the sketch after this message). The single-partition codepath (writeSinglePartition) under the hood already uses WriteTables, which returns a KV<DestinationT, ...>, so it's directly usable. The multi-partition codepath ends in WriteRenameTriggered - unfortunately, this codepath drops DestinationT along the way and will need to be refactored a bit to keep it until the end.
>>>>>>> ...- expandUntriggered() should be treated the same way.
>>>>>>> - Modify the StreamingWriteTables transform to provide it
>>>>>>> ...- Here also, the challenge is to propagate the DestinationT type all the way to the end of StreamingWriteTables - it will need to be refactored. After such a refactoring, returning a KV<DestinationT, ...> should be easy.
>>>>>>>
>>>>>>> Another challenge with all of this is backwards compatibility, in terms of both API and pipeline update. Pipeline update is much less of a concern for the BatchLoads codepath, because it's typically used in batch-mode pipelines that don't get updated. I would recommend starting with this, perhaps even with only the untriggered codepath (it is much more commonly used) - that will pave the way for future work.
>>>>>>>
>>>>>>> Hope this helps, please ask more if something is unclear!
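As a rough illustration of the Flatten step above, assuming both branches have been refactored to emit a PCollection<KV<DestinationT, Void>> (the helper name is hypothetical, sketched against BatchLoads' type parameters):

    // Merge the completion signals of the single-partition and multi-partition
    // branches into one collection, which then becomes the success component
    // of WriteResult<DestinationT>.
    private PCollection<KV<DestinationT, Void>> flattenWriteSignals(
        PCollection<KV<DestinationT, Void>> singlePartitionDone,
        PCollection<KV<DestinationT, Void>> multiPartitionDone) {
      return PCollectionList.of(singlePartitionDone)
          .and(multiPartitionDone)
          .apply("FlattenWriteSignals", Flatten.pCollections());
    }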
>>>>>>> On Fri, Apr 20, 2018 at 12:48 AM Carlos Alonso <car...@mrcalonso.com> wrote:
>>>>>>>
>>>>>>>> Hey Eugene!!
>>>>>>>>
>>>>>>>> I'd gladly take a stab at it, although I'm not sure how much available time I might have to put into it, but... yeah, let's try it.
>>>>>>>>
>>>>>>>> Where should I begin? Is there a Jira issue or shall I file one?
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>> On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Yes, you're both right - BigQueryIO.write() is currently not implemented in a way that lets it be used with Wait.on(). It would certainly be a welcome contribution to change this - many people have expressed interest in specifically waiting for BigQuery writes. Is either of you interested in helping out?
>>>>>>>>>
>>>>>>>>> Thanks.
>>>>>>>>>
>>>>>>>>> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <car...@mrcalonso.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Simon, I think your explanation was very accurate, at least to my understanding. I'd also be interested in getting the batch load result fed back into the pipeline... hopefully someone may suggest something; otherwise we could propose filing a Jira, or even better, a PR!! :)
>>>>>>>>>>
>>>>>>>>>> Thanks!
>>>>>>>>>>
>>>>>>>>>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <simon.kitch...@unbelievable-machine.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi All,
>>>>>>>>>>>
>>>>>>>>>>> I need to write some data to BigQuery (batch-mode) and then send a Pubsub message to trigger further processing.
>>>>>>>>>>>
>>>>>>>>>>> I found this thread titled "Callbacks/other functions run after a PDone/output transform" on the user-list, which was very relevant: https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>>>>>>>>>
>>>>>>>>>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>>>>>>>>>
>>>>>>>>>>> Unfortunately, it appears that the Wait.on transform does not work with BigQueryIO in FILE_LOADS mode - or at least I cannot get it to work. Advice appreciated.
>>>>>>>>>>>
>>>>>>>>>>> Here's (most of) the relevant test code:
>>>>>>>>>>>
>>>>>>>>>>>     Pipeline p = Pipeline.create(options);
>>>>>>>>>>>     PCollection<String> lines = p.apply("Read Input",
>>>>>>>>>>>         Create.of("line1", "line2", "line3", "line4"));
>>>>>>>>>>>
>>>>>>>>>>>     TableFieldSchema f1 = new TableFieldSchema().setName("value").setType("STRING");
>>>>>>>>>>>     TableSchema s2 = new TableSchema().setFields(Collections.singletonList(f1));
>>>>>>>>>>>
>>>>>>>>>>>     WriteResult writeResult = lines.apply("Write and load data",
>>>>>>>>>>>         BigQueryIO.<String>write()
>>>>>>>>>>>             .to(options.getTableSpec())
>>>>>>>>>>>             .withFormatFunction(new SlowFormatter())
>>>>>>>>>>>             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>>>>>>>>>>             // .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
>>>>>>>>>>>             .withSchema(s2)
>>>>>>>>>>>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>>>>>             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>>>>>>>
>>>>>>>>>>>     lines.apply(Wait.on(writeResult.getFailedInserts()))
>>>>>>>>>>>         .apply(ParDo.of(new OnCompletion()));
>>>>>>>>>>>
>>>>>>>>>>> where
>>>>>>>>>>> + format-function "SlowFormatter" prints out each line and sleeps briefly, for testing purposes, and
>>>>>>>>>>> + DoFn OnCompletion just prints out the contents of each line.
>>>>>>>>>>>
>>>>>>>>>>> In production code, OnCompletion would be fed some collection derived from lines, e.g. min/max record id, and the operation would be "send pubsub message" rather than print (see the sketch below).
>>>>>>>>>>>
>>>>>>>>>>> My expectation is that the "SlowFormatter" would run for each line, then the data would be uploaded, then OnCompletion would print each line. And indeed that happens when STREAMING_INSERTS is used. However, with FILE_LOADS, OnCompletion runs before the upload takes place.
>>>>>>>>>>>
>>>>>>>>>>> I use WriteResult.getFailedInserts as that is the only "output" that BigQueryIO.write() generates, AFAICT. I don't expect any failed records, but believe the collection can be used as a "signal" for Wait.on - i.e. the output is "complete for window" only after all data has been uploaded, which is what I need. And that does seem to work for STREAMING_INSERTS.
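The production pattern Simon describes - write, wait on the signal, then publish - would look roughly like the following; the topic name and SummaryFn are placeholders, and getFailedInserts() is used only because it is the sole output WriteResult exposes today:

    // Gate a per-window summary on the BigQuery write signal, then notify
    // downstream consumers via Pubsub.
    lines
        .apply(Wait.on(writeResult.getFailedInserts()))
        .apply("BuildSummary", ParDo.of(new SummaryFn()))  // hypothetical DoFn<String, String>
        .apply(PubsubIO.writeStrings().to("projects/my-project/topics/bq-load-done"));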
>>>>>>>>>>> I suspect the reason this does not work for FILE_LOADS is that the method BatchLoads.writeResult returns a WriteResult that wraps an "empty" failedInserts collection, i.e. data which is not connected to the batch-load job that is triggered:
>>>>>>>>>>>
>>>>>>>>>>>     private WriteResult writeResult(Pipeline p) {
>>>>>>>>>>>       PCollection<TableRow> empty =
>>>>>>>>>>>           p.apply("CreateEmptyFailedInserts",
>>>>>>>>>>>               Create.empty(TypeDescriptor.of(TableRow.class)));
>>>>>>>>>>>       return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>> Note that BatchLoads does "synchronously" invoke BigQuery load jobs: once a job is submitted, the code repeatedly polls the job status until it reaches DONE or FAILED. However, that information does not appear to be exposed anywhere (unlike streaming, which effectively exposes completion state via the failedInserts stream).
>>>>>>>>>>>
>>>>>>>>>>> If I have misunderstood something, corrections welcome! If not, suggestions for workarounds or alternate solutions are also welcome :-)
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Simon
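A possible direction for the fix, following Eugene's plan earlier in the thread: have writeResult wrap a collection that only becomes complete-for-window once the load jobs for that window have finished, instead of an eagerly-created empty one. This is only a sketch; the jobsDone parameter and the extended WriteResult.in signature do not exist in Beam at this point:

    // Hypothetical reworked BatchLoads.writeResult: thread a completion signal
    // (produced by the WriteTables/WriteRename steps) into the WriteResult so
    // that Wait.on has something meaningful to observe on the FILE_LOADS path.
    private WriteResult writeResult(
        Pipeline p, PCollection<KV<TableDestination, Void>> jobsDone) {
      PCollection<TableRow> empty =
          p.apply("CreateEmptyFailedInserts",
              Create.empty(TypeDescriptor.of(TableRow.class)));
      // Assumes WriteResult.in is extended to also carry the success signal.
      return WriteResult.in(p, new TupleTag<>("failedInserts"), empty, jobsDone);
    }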