All good so far. I've been a bit sidetracked, but I more or less have the idea of using the JobStatus as part of the collection, so that not only is completion signalled, but the result (success/failure) can also be accessed. How does that sound?
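Roughly, something along these lines (the accessor and value types below are just placeholders to illustrate the idea, not a proposed final API):

    // Hypothetical: each element pairs the destination with the terminal
    // JobStatus of the load job that wrote it, so downstream code can tell
    // success from failure per destination.
    PCollection<KV<TableDestination, JobStatus>> loadResults =
        writeResult.getSuccessfulLoads();

    // Wait.on() still covers plain sequencing, and a ParDo over loadResults
    // can inspect the JobStatus whenever the outcome matters.
    otherInput.apply(Wait.on(loadResults))
              .apply(ParDo.of(new OnCompletion()));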
Regards

On Tue, Jul 17, 2018 at 3:07 AM Eugene Kirpichov <[email protected]> wrote:

> Hi Carlos,
>
> Any updates / roadblocks you hit?
>
> On Tue, Jul 3, 2018 at 7:13 AM Eugene Kirpichov <[email protected]> wrote:
>
>> Awesome!! Thanks for the heads up, very exciting, this is going to make a lot of people happy :)
>>
>> On Tue, Jul 3, 2018, 3:40 AM Carlos Alonso <[email protected]> wrote:
>>
>>> + [email protected]
>>>
>>> Just a quick email to let you know that I'm starting to develop this.
>>>
>>> On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov <[email protected]> wrote:
>>>
>>>> Hi Carlos,
>>>>
>>>> Thank you for expressing interest in taking this on! Let me give you a few pointers to start, and I'll be happy to help everywhere along the way.
>>>>
>>>> Basically we want BigQueryIO.write() to return something (e.g. a PCollection) that can be used as input to Wait.on(). Currently it returns a WriteResult, which only contains a PCollection<TableRow> of failed inserts - that one cannot be used directly; instead we should add another component to WriteResult that represents the result of successfully writing some data.
>>>>
>>>> Given that BQIO supports dynamic destination writes, I think it makes sense for that to be a PCollection<KV<DestinationT, ???>> so that in theory we could sequence different destinations independently (currently Wait.on() does not provide such a feature, but it could), and it will require changing WriteResult to be WriteResult<DestinationT>. As for what the "???" might be - it is something that represents the result of successfully writing a window of data. I think it can even be Void, or "?" (wildcard type) for now, until we figure out something better.
>>>>
>>>> Implementing this would require roughly the following work:
>>>> - Add this PCollection<KV<DestinationT, ?>> to WriteResult.
>>>> - Modify the BatchLoads transform to provide it on both codepaths: expandTriggered() and expandUntriggered().
>>>>   - expandTriggered() itself writes via 2 codepaths: single-partition and multi-partition. Both need to be handled - we need to get a PCollection<KV<DestinationT, ?>> from each of them, and Flatten these two PCollections together to get the final result (see the sketch below). The single-partition codepath (writeSinglePartition) under the hood already uses WriteTables, which returns a KV<DestinationT, ...>, so it's directly usable. The multi-partition codepath ends in WriteRenameTriggered - unfortunately, this codepath drops DestinationT along the way and will need to be refactored a bit to keep it until the end.
>>>>   - expandUntriggered() should be treated the same way.
>>>> - Modify the StreamingWriteTables transform to provide it.
>>>>   - Here also, the challenge is to propagate the DestinationT type all the way to the end of StreamingWriteTables - it will need to be refactored. After such a refactoring, returning a KV<DestinationT, ...> should be easy.
>>>>
>>>> Another challenge with all of this is backwards compatibility in terms of API and pipeline update. Pipeline update is much less of a concern for the BatchLoads codepath, because it's typically used in batch-mode pipelines that don't get updated. I would recommend starting with this, perhaps even with only the untriggered codepath (it is much more commonly used) - that will pave the way for future work.
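>>>> As a rough illustration of the Flatten step mentioned above (the variable names are made up; the real codepaths would produce these collections):
>>>>
>>>>     // Hypothetical per-destination results of the two triggered codepaths.
>>>>     PCollection<KV<DestinationT, Void>> singlePartitionResults = ...;
>>>>     PCollection<KV<DestinationT, Void>> multiPartitionResults = ...;
>>>>
>>>>     // Flattened into the one collection that WriteResult would expose
>>>>     // and that users could pass to Wait.on().
>>>>     PCollection<KV<DestinationT, Void>> successfulWrites =
>>>>         PCollectionList.of(singlePartitionResults)
>>>>             .and(multiPartitionResults)
>>>>             .apply("FlattenWriteResults", Flatten.pCollections());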
>>>> Hope this helps, please ask more if something is unclear!
>>>>
>>>> On Fri, Apr 20, 2018 at 12:48 AM Carlos Alonso <[email protected]> wrote:
>>>>
>>>>> Hey Eugene!!
>>>>>
>>>>> I'd gladly take a stab at it, although I'm not sure how much available time I'll be able to put into it, but... yeah, let's try it.
>>>>>
>>>>> Where should I begin? Is there a Jira issue or shall I file one?
>>>>>
>>>>> Thanks!
>>>>>
>>>>> On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <[email protected]> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Yes, you're both right - BigQueryIO.write() is currently not implemented in a way that it can be used with Wait.on(). It would certainly be a welcome contribution to change this - many people have expressed interest in specifically waiting for BigQuery writes. Is either of you interested in helping out?
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <[email protected]> wrote:
>>>>>>
>>>>>>> Hi Simon, I think your explanation was very accurate, at least to my understanding. I'd also be interested in getting the batch load's result fed back into the pipeline... hopefully someone may suggest something; otherwise we could propose submitting a Jira, or even better, a PR!! :)
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I need to write some data to BigQuery (batch-mode) and then send a Pubsub message to trigger further processing.
>>>>>>>>
>>>>>>>> I found this thread titled "Callbacks/other functions run after a PDone/output transform" on the user-list, which was very relevant:
>>>>>>>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>>>>>>
>>>>>>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>>>>>>
>>>>>>>> Unfortunately, it appears that the Wait.on transform does not work with BigQueryIO in FILE_LOADS mode - or at least I cannot get it to work. Advice appreciated.
>>>>>>>> Here's (most of) the relevant test code:
>>>>>>>>
>>>>>>>>     Pipeline p = Pipeline.create(options);
>>>>>>>>     PCollection<String> lines = p.apply("Read Input",
>>>>>>>>         Create.of("line1", "line2", "line3", "line4"));
>>>>>>>>
>>>>>>>>     TableFieldSchema f1 = new TableFieldSchema().setName("value").setType("string");
>>>>>>>>     TableSchema s2 = new TableSchema().setFields(Collections.singletonList(f1));
>>>>>>>>
>>>>>>>>     WriteResult writeResult = lines.apply("Write and load data",
>>>>>>>>         BigQueryIO.<String>write()
>>>>>>>>             .to(options.getTableSpec())
>>>>>>>>             .withFormatFunction(new SlowFormatter())
>>>>>>>>             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>>>>>>>             // .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
>>>>>>>>             .withSchema(s2)
>>>>>>>>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>>             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>>>>
>>>>>>>>     lines.apply(Wait.on(writeResult.getFailedInserts()))
>>>>>>>>          .apply(ParDo.of(new OnCompletion()));
>>>>>>>>
>>>>>>>> where:
>>>>>>>> + the format function "SlowFormatter" prints out each line and sleeps briefly, for testing purposes, and
>>>>>>>> + the DoFn OnCompletion just prints out the contents of each line.
>>>>>>>>
>>>>>>>> In production code, OnCompletion would be fed some collection derived from lines, e.g. min/max record id, and the operation would be "send a Pubsub message" rather than print.
>>>>>>>>
>>>>>>>> My expectation is that "SlowFormatter" would run for each line, then the data would be uploaded, and then OnCompletion would print each line. And indeed that happens when STREAMING_INSERTS is used. However, with FILE_LOADS, OnCompletion runs before the upload takes place.
>>>>>>>>
>>>>>>>> I use WriteResult.getFailedInserts as that is the only "output" that BigQueryIO.write() generates, AFAICT. I don't expect any failed records, but believe it can be used as a "signal" for the Wait.on - i.e. the output is "complete for window" only after all data has been uploaded, which is what I need. And that does seem to work for STREAMING_INSERTS.
>>>>>>>>
>>>>>>>> I suspect the reason this does not work for FILE_LOADS is that the method BatchLoads.writeResult returns a WriteResult that wraps an "empty" failedInserts collection, i.e. data which is not connected to the batch-load job that is triggered:
>>>>>>>>
>>>>>>>>     private WriteResult writeResult(Pipeline p) {
>>>>>>>>       PCollection<TableRow> empty =
>>>>>>>>           p.apply("CreateEmptyFailedInserts",
>>>>>>>>               Create.empty(TypeDescriptor.of(TableRow.class)));
>>>>>>>>       return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
>>>>>>>>     }
>>>>>>>>
>>>>>>>> Note that BatchLoads does "synchronously" invoke BigQuery load jobs; once a job is submitted, the code repeatedly polls the job status until it reaches DONE or FAILED. However, that information does not appear to be exposed anywhere (unlike streaming, which effectively exposes completion state via the failedInserts stream).
>>>>>>>>
>>>>>>>> If I have misunderstood something, corrections welcome! If not, suggestions for workarounds or alternate solutions are also welcome :-)
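>>>>>>>> To spell out the pattern I'm assuming, a minimal sketch (WriteWithRealSignal is a made-up stand-in for any write whose output only "completes" once the write has finished):
>>>>>>>>
>>>>>>>>     // The signal is produced by the write itself, so per window it only
>>>>>>>>     // becomes complete after the write finishes, and Wait.on holds back
>>>>>>>>     // the downstream ParDo until then.
>>>>>>>>     PCollection<Void> signal = lines.apply("Write", new WriteWithRealSignal());
>>>>>>>>     lines.apply(Wait.on(signal)).apply(ParDo.of(new OnCompletion()));
>>>>>>>>
>>>>>>>>     // By contrast, BatchLoads' failedInserts comes from Create.empty(),
>>>>>>>>     // independent of the load job, so it completes immediately and the
>>>>>>>>     // downstream ParDo is released before the load finishes.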
>>>>>>>> Thanks,
>>>>>>>> Simon