There already is an org.apache.beam.sdk.io.gcp.bigquery.WriteResult class.

On Tue, Jul 17, 2018 at 9:44 AM Eugene Kirpichov <kirpic...@google.com>
wrote:

> Hmm, I think this approach has some complications:
> - Using JobStatus ties the result to BigQuery batch load jobs, but the
> return type ought to be the same regardless of which method of writing is
> used (including potential future BigQuery APIs - they are evolving), or how
> many BigQuery load jobs are involved in writing a given window (it can be
> multiple).
> - Returning a success/failure indicator makes it prone to users ignoring
> the failure: the default behavior should be that, if the pipeline succeeds,
> that means all data was successfully written - if users want different
> error handling, e.g. a deadletter queue, they should have to specify it
> explicitly.
>
> I would recommend returning a PCollection of a type that's invariant to
> which load method is used (streaming writes, load jobs, multiple load jobs
> etc.). If it's unclear what type that should be, you could introduce an
> empty type e.g. "class BigQueryWriteResult {}" just for the sake of
> signaling success, and later add something to it.
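>
> For illustration, a rough sketch of that (the getSuccessfulWrites()
> accessor is a made-up name here, not an existing API):
>
>   import java.io.Serializable;
>   import org.apache.beam.sdk.transforms.Wait;
>   import org.apache.beam.sdk.values.PCollection;
>
>   /** Empty marker type: fields (destination, job metadata, ...) can be
>    *  added later without breaking users that only Wait.on() it. */
>   public class BigQueryWriteResult implements Serializable {}
>
>   // Hypothetical usage once WriteResult exposes such a collection:
>   PCollection<BigQueryWriteResult> signal = writeResult.getSuccessfulWrites();
>   PCollection<String> gated = otherData.apply(Wait.on(signal));
>   // Elements of "gated" become available only after the corresponding
>   // window of the BigQuery write has completed.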
>
> On Tue, Jul 17, 2018 at 12:30 AM Carlos Alonso <car...@mrcalonso.com>
> wrote:
>
>> All good so far. I've been a bit sidetracked, but more or less I have
>> the idea of using the JobStatus as part of the collection so that not
>> only the completion is signaled, but also the result (success/failure)
>> can be accessed. How does that sound?
>>
>> Regards
>>
>> On Tue, Jul 17, 2018 at 3:07 AM Eugene Kirpichov <kirpic...@google.com>
>> wrote:
>>
>>> Hi Carlos,
>>>
>>> Any updates / roadblocks you hit?
>>>
>>>
>>> On Tue, Jul 3, 2018 at 7:13 AM Eugene Kirpichov <kirpic...@google.com>
>>> wrote:
>>>
>>>> Awesome!! Thanks for the heads up, very exciting, this is going to make
>>>> a lot of people happy :)
>>>>
>>>> On Tue, Jul 3, 2018, 3:40 AM Carlos Alonso <car...@mrcalonso.com>
>>>> wrote:
>>>>
>>>>> + dev@beam.apache.org
>>>>>
>>>>> Just a quick email to let you know that I'm starting to develop this.
>>>>>
>>>>> On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov <
>>>>> kirpic...@google.com> wrote:
>>>>>
>>>>>> Hi Carlos,
>>>>>>
>>>>>> Thank you for expressing interest in taking this on! Let me give you
>>>>>> a few pointers to start, and I'll be happy to help all along the way.
>>>>>>
>>>>>> Basically we want BigQueryIO.write() to return something (e.g. a
>>>>>> PCollection) that can be used as input to Wait.on().
>>>>>> Currently it returns a WriteResult, which only contains a
>>>>>> PCollection<TableRow> of failed inserts - that one cannot be used
>>>>>> directly; instead, we should add another component to WriteResult that
>>>>>> represents the result of successfully writing some data.
>>>>>>
>>>>>> Given that BQIO supports dynamic destination writes, I think it makes
>>>>>> sense for that to be a PCollection<KV<DestinationT, ???>> so that in 
>>>>>> theory
>>>>>> we could sequence different destinations independently (currently 
>>>>>> Wait.on()
>>>>>> does not provide such a feature, but it could); and it will require
>>>>>> changing WriteResult to be WriteResult<DestinationT>. As for what the 
>>>>>> "???"
>>>>>> might be - it is something that represents the result of successfully
>>>>>> writing a window of data. I think it can even be Void, or "?" (wildcard
>>>>>> type) for now, until we figure out something better.
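>>>>>>
>>>>>> Concretely, the new shape might look roughly like this (just a sketch;
>>>>>> the getSuccessfulWrites() name and the Void placeholder are
>>>>>> provisional):
>>>>>>
>>>>>>   import com.google.api.services.bigquery.model.TableRow;
>>>>>>   import org.apache.beam.sdk.values.KV;
>>>>>>   import org.apache.beam.sdk.values.PCollection;
>>>>>>
>>>>>>   public abstract class WriteResult<DestinationT> {
>>>>>>     // Existing output: the failed streaming inserts.
>>>>>>     public abstract PCollection<TableRow> getFailedInserts();
>>>>>>
>>>>>>     // Proposed output: one element per (destination, window) that was
>>>>>>     // successfully written, usable as input to Wait.on().
>>>>>>     public abstract PCollection<KV<DestinationT, Void>>
>>>>>>         getSuccessfulWrites();
>>>>>>   }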
>>>>>>
>>>>>> Implementing this would require roughly the following work:
>>>>>> - Add this PCollection<KV<DestinationT, ?>> to WriteResult
>>>>>> - Modify the BatchLoads transform to provide it on both codepaths:
>>>>>> expandTriggered() and expandUntriggered()
>>>>>> ...- expandTriggered() itself writes via 2 codepaths:
>>>>>> single-partition and multi-partition. Both need to be handled - we need
>>>>>> to get a PCollection<KV<DestinationT, ?>> from each of them, and Flatten
>>>>>> these two PCollections together to get the final result (see the sketch
>>>>>> after this list). The single-partition codepath (writeSinglePartition)
>>>>>> under the hood already uses WriteTables, which returns a
>>>>>> KV<DestinationT, ...>, so it's directly usable. The multi-partition
>>>>>> codepath ends in WriteRenameTriggered - unfortunately, this codepath
>>>>>> drops DestinationT along the way and will need to be refactored a bit
>>>>>> to keep it until the end.
>>>>>> ...- expandUntriggered() should be treated the same way.
>>>>>> - Modify the StreamingWriteTables transform to provide it
>>>>>> ...- Here also, the challenge is to propagate the DestinationT type
>>>>>> all the way until the end of StreamingWriteTables - it will need to be
>>>>>> refactored. After such a refactoring, returning a KV<DestinationT, ...>
>>>>>> should be easy.
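>>>>>>
>>>>>> For the Flatten step mentioned above, something along these lines
>>>>>> (illustrative, not the actual BatchLoads code; assumes DestinationT is
>>>>>> a type parameter of the enclosing transform):
>>>>>>
>>>>>>   import org.apache.beam.sdk.transforms.Flatten;
>>>>>>   import org.apache.beam.sdk.values.KV;
>>>>>>   import org.apache.beam.sdk.values.PCollection;
>>>>>>   import org.apache.beam.sdk.values.PCollectionList;
>>>>>>
>>>>>>   private PCollection<KV<DestinationT, Void>> combineResults(
>>>>>>       PCollection<KV<DestinationT, Void>> singlePartitionResults,
>>>>>>       PCollection<KV<DestinationT, Void>> multiPartitionResults) {
>>>>>>     return PCollectionList.of(singlePartitionResults)
>>>>>>         .and(multiPartitionResults)
>>>>>>         .apply("FlattenWriteResults", Flatten.pCollections());
>>>>>>   }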
>>>>>>
>>>>>> Another challenge with all of this is backwards compatibility in
>>>>>> terms of API and pipeline update.
>>>>>> Pipeline update is much less of a concern for the BatchLoads
>>>>>> codepath, because it's typically used in batch-mode pipelines that don't
>>>>>> get updated. I would recommend starting with this, perhaps even with only
>>>>>> the untriggered codepath (it is much more commonly used) - that will pave
>>>>>> the way for future work.
>>>>>>
>>>>>> Hope this helps, please ask more if something is unclear!
>>>>>>
>>>>>> On Fri, Apr 20, 2018 at 12:48 AM Carlos Alonso <car...@mrcalonso.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey Eugene!!
>>>>>>>
>>>>>>> I’d gladly take a stab at it, although I’m not sure how much
>>>>>>> available time I might have to put into it, but... yeah, let’s try it.
>>>>>>>
>>>>>>> Where should I begin? Is there a Jira issue or shall I file one?
>>>>>>>
>>>>>>> Thanks!
>>>>>>> On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <kirpic...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Yes, you're both right - BigQueryIO.write() is currently not
>>>>>>>> implemented in a way that allows it to be used with Wait.on(). It
>>>>>>>> would certainly be a welcome contribution to change this - many people
>>>>>>>> have expressed interest in specifically waiting for BigQuery writes.
>>>>>>>> Are either of you interested in helping out?
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <car...@mrcalonso.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Simon, I think your explanation was very accurate, at least to
>>>>>>>>> my understanding. I'd also be interested in getting feedback about
>>>>>>>>> batch load results in the pipeline... hopefully someone may suggest
>>>>>>>>> something; otherwise we could propose filing a Jira or, even better,
>>>>>>>>> a PR!! :)
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <
>>>>>>>>> simon.kitch...@unbelievable-machine.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi All,
>>>>>>>>>>
>>>>>>>>>> I need to write some data to BigQuery (batch-mode) and then send
>>>>>>>>>> a Pubsub message to trigger further processing.
>>>>>>>>>>
>>>>>>>>>> I found this thread titled "Callbacks/other functions run after a
>>>>>>>>>> PDone/output transform" on the user-list which was very relevant:
>>>>>>>>>>
>>>>>>>>>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>>>>>>>>
>>>>>>>>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>>>>>>>>
>>>>>>>>>> Unfortunately, it appears that the Wait.on transform does not
>>>>>>>>>> work with BigQueryIO in FILE_LOADS mode - or at least I cannot get
>>>>>>>>>> it to work. Advice appreciated.
>>>>>>>>>>
>>>>>>>>>> Here's (most of) the relevant test code:
>>>>>>>>>>         Pipeline p = Pipeline.create(options);
>>>>>>>>>>         PCollection<String> lines = p.apply("Read Input",
>>>>>>>>>>             Create.of("line1", "line2", "line3", "line4"));
>>>>>>>>>>
>>>>>>>>>>         TableFieldSchema f1 = new TableFieldSchema()
>>>>>>>>>>             .setName("value").setType("string");
>>>>>>>>>>         TableSchema s2 = new TableSchema()
>>>>>>>>>>             .setFields(Collections.singletonList(f1));
>>>>>>>>>>
>>>>>>>>>>         WriteResult writeResult = lines.apply("Write and load data",
>>>>>>>>>>             BigQueryIO.<String>write()
>>>>>>>>>>                 .to(options.getTableSpec())
>>>>>>>>>>                 .withFormatFunction(new SlowFormatter())
>>>>>>>>>>                 .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>>>>>>>>>                 // .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
>>>>>>>>>>                 .withSchema(s2)
>>>>>>>>>>                 .withCreateDisposition(
>>>>>>>>>>                     BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>>>>>                 .withWriteDisposition(
>>>>>>>>>>                     BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>>>>>>>>
>>>>>>>>>>         lines.apply(Wait.on(writeResult.getFailedInserts()))
>>>>>>>>>>             .apply(ParDo.of(new OnCompletion()));
>>>>>>>>>>
>>>>>>>>>> where
>>>>>>>>>> + format-function "SlowFormatter" prints out each line and has a
>>>>>>>>>> small sleep for testing purposes, and
>>>>>>>>>> + DoFn "OnCompletion" just prints out the contents of each line
>>>>>>>>>> (both are sketched below)
>>>>>>>>>>
>>>>>>>>>> In production code, OnCompletion would be fed some collection
>>>>>>>>>> derived from lines, e.g. min/max record id, and the operation would
>>>>>>>>>> be "send a pubsub message" rather than print.
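>>>>>>>>>>
>>>>>>>>>> For reference, the two helpers look roughly as follows (simplified
>>>>>>>>>> sketches, not the verbatim test code):
>>>>>>>>>>
>>>>>>>>>>   static class SlowFormatter
>>>>>>>>>>       implements SerializableFunction<String, TableRow> {
>>>>>>>>>>     @Override
>>>>>>>>>>     public TableRow apply(String line) {
>>>>>>>>>>       System.out.println("Formatting: " + line);
>>>>>>>>>>       try {
>>>>>>>>>>         Thread.sleep(100); // small sleep for testing purposes
>>>>>>>>>>       } catch (InterruptedException e) {
>>>>>>>>>>         Thread.currentThread().interrupt();
>>>>>>>>>>       }
>>>>>>>>>>       return new TableRow().set("value", line);
>>>>>>>>>>     }
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>>   static class OnCompletion extends DoFn<String, Void> {
>>>>>>>>>>     @ProcessElement
>>>>>>>>>>     public void processElement(ProcessContext c) {
>>>>>>>>>>       System.out.println("Completed: " + c.element());
>>>>>>>>>>     }
>>>>>>>>>>   }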
>>>>>>>>>>
>>>>>>>>>> My expectation is that the "SlowFormatter" would run for each
>>>>>>>>>> line, then the data would be uploaded, then OnCompletion would print
>>>>>>>>>> each line. And indeed that happens when STREAMING_INSERTS is used.
>>>>>>>>>> However, for FILE_LOADS, OnCompletion runs before the upload takes
>>>>>>>>>> place.
>>>>>>>>>>
>>>>>>>>>> I use WriteResult.getFailedInserts as that is the only "output"
>>>>>>>>>> that BigQueryIO.write() generates, AFAICT. I don't expect any failed
>>>>>>>>>> records, but believe it can be used as a "signal" for the Wait.on -
>>>>>>>>>> i.e. the output is "complete for window" only after all data has
>>>>>>>>>> been uploaded, which is what I need. And that does seem to work for
>>>>>>>>>> STREAMING_INSERTS.
>>>>>>>>>>
>>>>>>>>>> I suspect the reason this does not work for FILE_LOADS is that
>>>>>>>>>> method BatchLoads.writeResult returns a WriteResult that wraps an
>>>>>>>>>> "empty" failedInserts collection, i.e. data which is not connected
>>>>>>>>>> to the batch-load job that is triggered:
>>>>>>>>>>
>>>>>>>>>>   private WriteResult writeResult(Pipeline p) {
>>>>>>>>>>     PCollection<TableRow> empty =
>>>>>>>>>>         p.apply("CreateEmptyFailedInserts",
>>>>>>>>>>             Create.empty(TypeDescriptor.of(TableRow.class)));
>>>>>>>>>>     return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>> Note that BatchLoads does "synchronously" invoke BigQuery load
>>>>>>>>>> jobs; once a job is submitted, the code repeatedly polls the job
>>>>>>>>>> status until it reaches DONE or FAILED. However, that information
>>>>>>>>>> does not appear to be exposed anywhere (unlike streaming, which
>>>>>>>>>> effectively exposes completion-state via the failedInserts stream).
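>>>>>>>>>>
>>>>>>>>>> Conceptually, that polling looks like the following (my own
>>>>>>>>>> illustration using the BigQuery API client, not the actual Beam
>>>>>>>>>> code; note BigQuery reports failures via errorResult once the job
>>>>>>>>>> state is DONE):
>>>>>>>>>>
>>>>>>>>>>   import com.google.api.services.bigquery.Bigquery;
>>>>>>>>>>   import com.google.api.services.bigquery.model.Job;
>>>>>>>>>>
>>>>>>>>>>   static void waitForLoadJob(Bigquery client, String projectId,
>>>>>>>>>>       String jobId) throws Exception {
>>>>>>>>>>     while (true) {
>>>>>>>>>>       Job job = client.jobs().get(projectId, jobId).execute();
>>>>>>>>>>       String state = job.getStatus().getState(); // PENDING/RUNNING/DONE
>>>>>>>>>>       if ("DONE".equals(state)) {
>>>>>>>>>>         if (job.getStatus().getErrorResult() != null) {
>>>>>>>>>>           throw new RuntimeException("Load job failed: "
>>>>>>>>>>               + job.getStatus().getErrorResult().getMessage());
>>>>>>>>>>         }
>>>>>>>>>>         return; // job completed successfully
>>>>>>>>>>       }
>>>>>>>>>>       Thread.sleep(1000); // back off between polls
>>>>>>>>>>     }
>>>>>>>>>>   }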
>>>>>>>>>>
>>>>>>>>>> If I have misunderstood something, corrections welcome! If not,
>>>>>>>>>> suggestions for workarounds or alternate solutions are also welcome 
>>>>>>>>>> :-)
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Simon
>>>>>>>>>>
>>>>>>>>>>
