Just opened this PR: https://github.com/apache/beam/pull/6055 to get
feedback ASAP. Basically what it does is return the job status in a
PCollection of BigQueryWriteResult objects

On Fri, Jul 20, 2018 at 11:57 PM Reuven Lax <re...@google.com> wrote:

> There already is a org.apache.beam.sdk.io.gcp.bigquery.WriteResult class.
>
> On Tue, Jul 17, 2018 at 9:44 AM Eugene Kirpichov <kirpic...@google.com>
> wrote:
>
>> Hmm, I think this approach has some complications:
>> - Using JobStatus makes it tied to using BigQuery batch load jobs, but
>> the return type ought to be the same regardless of which method of writing
>> is used (including potential future BigQuery APIs - they are evolving), or
>> how many BigQuery load jobs are involved in writing a given window (it can
>> be multiple).
>> - Returning a success/failure indicator makes it prone to users ignoring
>> the failure: the default behavior should be that, if the pipeline succeeds,
>> that means all data was successfully written - if users want different
>> error handling, e.g. a deadletter queue, they should have to specify it
>> explicitly.
>>
>> I would recommend to return a PCollection of a type that's invariant to
>> which load method is used (streaming writes, load jobs, multiple load jobs
>> etc.). If it's unclear what type that should be, you could introduce an
>> empty type e.g. "class BigQueryWriteResult {}" just for the sake of
>> signaling success, and later add something to it.
>>
>> On Tue, Jul 17, 2018 at 12:30 AM Carlos Alonso <car...@mrcalonso.com>
>> wrote:
>>
>>> All good so far. I've been a bit side tracked but more or less I have
>>> the idea of using the JobStatus as part of the collection so that not only
>>> the completion is signaled, but also the result (success/failure) can be
>>> accessed, how does it sound?
>>>
>>> Regards
>>>
>>> On Tue, Jul 17, 2018 at 3:07 AM Eugene Kirpichov <kirpic...@google.com>
>>> wrote:
>>>
>>>> Hi Carlos,
>>>>
>>>> Any updates / roadblocks you hit?
>>>>
>>>>
>>>> On Tue, Jul 3, 2018 at 7:13 AM Eugene Kirpichov <kirpic...@google.com>
>>>> wrote:
>>>>
>>>>> Awesome!! Thanks for the heads up, very exciting, this is going to
>>>>> make a lot of people happy :)
>>>>>
>>>>> On Tue, Jul 3, 2018, 3:40 AM Carlos Alonso <car...@mrcalonso.com>
>>>>> wrote:
>>>>>
>>>>>> + dev@beam.apache.org
>>>>>>
>>>>>> Just a quick email to let you know that I'm starting developing this.
>>>>>>
>>>>>> On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov <
>>>>>> kirpic...@google.com> wrote:
>>>>>>
>>>>>>> Hi Carlos,
>>>>>>>
>>>>>>> Thank you for expressing interest in taking this on! Let me give you
>>>>>>> a few pointers to start, and I'll be happy to help everywhere along the 
>>>>>>> way.
>>>>>>>
>>>>>>> Basically we want BigQueryIO.write() to return something (e.g. a
>>>>>>> PCollection) that can be used as input to Wait.on().
>>>>>>> Currently it returns a WriteResult, which only contains a
>>>>>>> PCollection<TableRow> of failed inserts - that one can not be used
>>>>>>> directly, instead we should add another component to WriteResult that
>>>>>>> represents the result of successfully writing some data.
>>>>>>>
>>>>>>> Given that BQIO supports dynamic destination writes, I think it
>>>>>>> makes sense for that to be a PCollection<KV<DestinationT, ???>> so that 
>>>>>>> in
>>>>>>> theory we could sequence different destinations independently (currently
>>>>>>> Wait.on() does not provide such a feature, but it could); and it will
>>>>>>> require changing WriteResult to be WriteResult<DestinationT>. As for 
>>>>>>> what
>>>>>>> the "???" might be - it is something that represents the result of
>>>>>>> successfully writing a window of data. I think it can even be Void, or 
>>>>>>> "?"
>>>>>>> (wildcard type) for now, until we figure out something better.
>>>>>>>
>>>>>>> Implementing this would require roughly the following work:
>>>>>>> - Add this PCollection<KV<DestinationT, ?>> to WriteResult
>>>>>>> - Modify the BatchLoads transform to provide it on both codepaths:
>>>>>>> expandTriggered() and expandUntriggered()
>>>>>>> ...- expandTriggered() itself writes via 2 codepaths:
>>>>>>> single-partition and multi-partition. Both need to be handled - we need 
>>>>>>> to
>>>>>>> get a PCollection<KV<DestinationT, ?>> from each of them, and Flatten 
>>>>>>> these
>>>>>>> two PCollections together to get the final result. The single-partition
>>>>>>> codepath (writeSinglePartition) under the hood already uses WriteTables
>>>>>>> that returns a KV<DestinationT, ...> so it's directly usable. The
>>>>>>> multi-partition codepath ends in WriteRenameTriggered - unfortunately, 
>>>>>>> this
>>>>>>> codepath drops DestinationT along the way and will need to be 
>>>>>>> refactored a
>>>>>>> bit to keep it until the end.
>>>>>>> ...- expandUntriggered() should be treated the same way.
>>>>>>> - Modify the StreamingWriteTables transform to provide it
>>>>>>> ...- Here also, the challenge is to propagate the DestinationT type
>>>>>>> all the way until the end of StreamingWriteTables - it will need to be
>>>>>>> refactored. After such a refactoring, returning a KV<DestinationT, ...>
>>>>>>> should be easy.
>>>>>>>
>>>>>>> Another challenge with all of this is backwards compatibility in
>>>>>>> terms of API and pipeline update.
>>>>>>> Pipeline update is much less of a concern for the BatchLoads
>>>>>>> codepath, because it's typically used in batch-mode pipelines that don't
>>>>>>> get updated. I would recommend to start with this, perhaps even with 
>>>>>>> only
>>>>>>> the untriggered codepath (it is much more commonly used) - that will 
>>>>>>> pave
>>>>>>> the way for future work.
>>>>>>>
>>>>>>> Hope this helps, please ask more if something is unclear!
>>>>>>>
>>>>>>> On Fri, Apr 20, 2018 at 12:48 AM Carlos Alonso <car...@mrcalonso.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hey Eugene!!
>>>>>>>>
>>>>>>>> I’d gladly take a stab on it although I’m not sure how much
>>>>>>>> available time I might have to put into but... yeah, let’s try it.
>>>>>>>>
>>>>>>>> Where should I begin? Is there a Jira issue or shall I file one?
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>> On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <
>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Yes, you're both right - BigQueryIO.write() is currently not
>>>>>>>>> implemented in a way that it can be used with Wait.on(). It would 
>>>>>>>>> certainly
>>>>>>>>> be a welcome contribution to change this - many people expressed 
>>>>>>>>> interest
>>>>>>>>> in specifically waiting for BigQuery writes. Is any of you interested 
>>>>>>>>> in
>>>>>>>>> helping out?
>>>>>>>>>
>>>>>>>>> Thanks.
>>>>>>>>>
>>>>>>>>> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <
>>>>>>>>> car...@mrcalonso.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Simon, I think your explanation was very accurate, at least to
>>>>>>>>>> my understanding. I'd also be interested in getting batch load 
>>>>>>>>>> result's
>>>>>>>>>> feedback on the pipeline... hopefully someone may suggest something,
>>>>>>>>>> otherwise we could propose submitting a Jira, or even better, a PR!! 
>>>>>>>>>> :)
>>>>>>>>>>
>>>>>>>>>> Thanks!
>>>>>>>>>>
>>>>>>>>>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <
>>>>>>>>>> simon.kitch...@unbelievable-machine.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi All,
>>>>>>>>>>>
>>>>>>>>>>> I need to write some data to BigQuery (batch-mode) and then send
>>>>>>>>>>> a Pubsub message to trigger further processing.
>>>>>>>>>>>
>>>>>>>>>>> I found this thread titled "Callbacks/other functions run after
>>>>>>>>>>> a PDone/output transform" on the user-list which was very relevant:
>>>>>>>>>>>
>>>>>>>>>>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>>>>>>>>>
>>>>>>>>>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>>>>>>>>>
>>>>>>>>>>> Unfortunately, it appears that the Wait.on transform does not
>>>>>>>>>>> work with BiqQueryIO in FILE_LOADS mode - or at least I cannot get 
>>>>>>>>>>> it to
>>>>>>>>>>> work. Advice appreciated.
>>>>>>>>>>>
>>>>>>>>>>> Here's (most of) the relevant test code:
>>>>>>>>>>>         Pipeline p = Pipeline.create(options);
>>>>>>>>>>>         PCollection<String> lines = p.apply("Read Input",
>>>>>>>>>>> Create.of("line1", "line2", "line3", "line4"));
>>>>>>>>>>>
>>>>>>>>>>>         TableFieldSchema f1 = new
>>>>>>>>>>> TableFieldSchema().setName("value").setType("string");
>>>>>>>>>>>         TableSchema s2 = new
>>>>>>>>>>> TableSchema().setFields(Collections.singletonList(f1));
>>>>>>>>>>>
>>>>>>>>>>>         WriteResult writeResult = lines.apply("Write and load
>>>>>>>>>>> data", BigQueryIO.<String>write() //
>>>>>>>>>>>                 .to(options.getTableSpec()) //
>>>>>>>>>>>                 .withFormatFunction(new SlowFormatter()) //
>>>>>>>>>>>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>>>>>>>>>> //
>>>>>>>>>>> //
>>>>>>>>>>> .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS) //
>>>>>>>>>>>                 .withSchema(s2)
>>>>>>>>>>>
>>>>>>>>>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>>>>> //
>>>>>>>>>>>
>>>>>>>>>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> lines.apply(Wait.on(writeResult.getFailedInserts())).apply(ParDo.of(new
>>>>>>>>>>> OnCompletion()));
>>>>>>>>>>>
>>>>>>>>>>> where
>>>>>>>>>>> + format-function "SlowFormatter" prints out each line and has a
>>>>>>>>>>> small sleep for testing purposes, and
>>>>>>>>>>> + DoFn OnCompletion just prints out the contents of each line
>>>>>>>>>>>
>>>>>>>>>>> In production code, OnCompletion would be fed some collection
>>>>>>>>>>> derived from lines, eg min/max record id, and the operation would 
>>>>>>>>>>> be "send
>>>>>>>>>>> pubsub message" rather than print..
>>>>>>>>>>>
>>>>>>>>>>> My expectation is that the "SlowFormatter" would run for each
>>>>>>>>>>> line, then the data would be uploaded, then OnCompletion would 
>>>>>>>>>>> print each
>>>>>>>>>>> line. And indeed that happens when STREAMING_INSERTS is used. 
>>>>>>>>>>> However for
>>>>>>>>>>> FILE_LOADS, LinePrinter runs before the upload takes place.
>>>>>>>>>>>
>>>>>>>>>>> I use WriteResult.getFailedInserts as that is the only "output"
>>>>>>>>>>> that BiqQueryIO.write() generates AFAICT. I don't expect any failed
>>>>>>>>>>> records, but believe that it can be used as a "signal" for the 
>>>>>>>>>>> Wait.on - ie
>>>>>>>>>>> the output is "complete for window" only after all data has been 
>>>>>>>>>>> uploaded,
>>>>>>>>>>> which is what I need. And that does seem to work for 
>>>>>>>>>>> STREAMING_LOADS.
>>>>>>>>>>>
>>>>>>>>>>> I suspect the reason that this does not work for FILE_LOADS is
>>>>>>>>>>> that method BatchLoads.writeResult returns a WriteResult that wraps 
>>>>>>>>>>> an
>>>>>>>>>>> "empty" failedInserts collection, ie data which is not connected to 
>>>>>>>>>>> the
>>>>>>>>>>> batch-load-job that is triggered:
>>>>>>>>>>>   private WriteResult writeResult(Pipeline p) {
>>>>>>>>>>>     PCollection<TableRow> empty =
>>>>>>>>>>>         p.apply("CreateEmptyFailedInserts",
>>>>>>>>>>> Create.empty(TypeDescriptor.of(TableRow.class)));
>>>>>>>>>>>     return WriteResult.in(p, new TupleTag<>("failedInserts"),
>>>>>>>>>>> empty);
>>>>>>>>>>>   }
>>>>>>>>>>>
>>>>>>>>>>> Note that BatchLoads does "synchronously" invoke BigQuery load
>>>>>>>>>>> jobs; once a job is submitted the code repeatedly polls the job 
>>>>>>>>>>> status
>>>>>>>>>>> until it reaches DONE or FAILED. However that information does not 
>>>>>>>>>>> appear
>>>>>>>>>>> to be exposed anywhere (unlike streaming which effectively exposes
>>>>>>>>>>> completion-state via the failedInserts stream).
>>>>>>>>>>>
>>>>>>>>>>> If I have misunderstood something, corrections welcome! If not,
>>>>>>>>>>> suggestions for workarounds or alternate solutions are also welcome 
>>>>>>>>>>> :-)
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Simon
>>>>>>>>>>>
>>>>>>>>>>>

Reply via email to