[ https://issues.apache.org/jira/browse/BEAM-13464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17483664#comment-17483664 ]

Thorsten Kienle commented on BEAM-13464:
----------------------------------------

[~chamikara]: In my opinion, BigQueryIO should emit the successfully written records; otherwise, WriteResult.getSuccessfulInserts() wouldn't make much sense.


The method org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.BatchAndInsertElements#finishBundle should probably be extended with the following code:
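{code:java}
// finishBundle has no current element, so each successful row is emitted
// with its own timestamp and window.
for (ValueInSingleWindow<TableRow> row : successfulInserts) {
  context.output(SUCCESSFUL_ROWS_TAG, row.getValue(), row.getTimestamp(), row.getWindow());
}
{code}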
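The proposed fix relies on one Beam detail: inside finishBundle there is no current element to inherit a timestamp and window from, which is why the code above passes row.getTimestamp() and row.getWindow() explicitly (FinishBundleContext.output takes a tag, value, timestamp and window). Below is a Beam-free model of that buffer-then-flush lifecycle, only to illustrate the pattern. It is a sketch under assumptions, not the actual BatchedStreamingWrite code: BatchingWriter, TimestampedRow (a stand-in for ValueInSingleWindow<TableRow>) and the rule that an empty row fails to insert are all hypothetical.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Beam-free model of the buffer-then-flush pattern in a batching DoFn.
// All names are hypothetical; this is not BatchedStreamingWrite itself.
public class FinishBundleEmitSketch {

    // Hypothetical stand-in for ValueInSingleWindow<TableRow>: a value plus
    // the timestamp and window it arrived in.
    record TimestampedRow(String value, long timestamp, String window) {}

    static final class BatchingWriter {
        private final List<TimestampedRow> buffer = new ArrayList<>();
        private final List<TimestampedRow> successfulOutput = new ArrayList<>();

        // Like processElement: only buffer the row, nothing is written yet.
        void processElement(TimestampedRow row) {
            buffer.add(row);
        }

        // Like finishBundle: write the buffered batch, then emit every
        // successful row. There is no "current element" here to inherit a
        // timestamp and window from, so each output passes them explicitly.
        void finishBundle() {
            List<TimestampedRow> successfulInserts = new ArrayList<>();
            for (TimestampedRow row : buffer) {
                if (insert(row)) {
                    successfulInserts.add(row);
                }
            }
            for (TimestampedRow row : successfulInserts) {
                output(row.value(), row.timestamp(), row.window());
            }
            buffer.clear();
        }

        // Hypothetical insert rule for the model: an empty row "fails".
        private boolean insert(TimestampedRow row) {
            return !row.value().isEmpty();
        }

        // Mirrors the shape of output(tag, value, timestamp, window).
        private void output(String value, long timestamp, String window) {
            successfulOutput.add(new TimestampedRow(value, timestamp, window));
        }

        List<TimestampedRow> successfulOutput() {
            return successfulOutput;
        }
    }

    public static void main(String[] args) {
        BatchingWriter writer = new BatchingWriter();
        writer.processElement(new TimestampedRow("cat-1", 1000L, "w1"));
        writer.processElement(new TimestampedRow("", 2000L, "w1"));
        writer.processElement(new TimestampedRow("cat-2", 3000L, "w2"));
        System.out.println("before finishBundle: " + writer.successfulOutput().size());
        writer.finishBundle();
        System.out.println("after finishBundle: " + writer.successfulOutput());
    }
}
{code}

In this model nothing reaches the successful output until finishBundle runs; if the second loop in finishBundle were missing (the situation described in this issue), the rows would still be written but the successful output would stay empty.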

> BigQueryIO.write() never emits successfully written records
> -----------------------------------------------------------
>
>                 Key: BEAM-13464
>                 URL: https://issues.apache.org/jira/browse/BEAM-13464
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>    Affects Versions: 2.34.0
>            Reporter: Thorsten Kienle
>            Priority: P2
>
> In the following code snippet, the data is successfully inserted into
> BigQuery, but the PCollection returned by getSuccessfulInserts() never
> contains any data, so nothing is logged.
> {code:java}
> WriteResult writeResultCategory =
>     categoryWithHeader
>         .apply("BQ Insert Category",
>             BigQueryIO.<CategoryWithHeader>write()
>                 .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
>                 .withWriteDisposition(WriteDisposition.WRITE_APPEND)
>                 .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
>                 .withExtendedErrorInfo()
>                 .withFailedInsertRetryPolicy(InsertRetryPolicy.neverRetry())
>                 .useBeamSchema()
>                 .to(getTableReference(tableCategory)));
>
> writeResultCategory
>     .getSuccessfulInserts()
>     .apply("Log successful inserts", ParDo.of(new DoFn<TableRow, Void>() {
>         @ProcessElement
>         public void process(ProcessContext c) {
>             LOG.info("Successfully inserted Category: " + c.element());
>         }
>     }));
> {code}
> The problematic code might be in
> {noformat}
> org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.BatchAndInsertElements#finishBundle{noformat}
> The rows collected in successfulInserts are never emitted from there.
> Compare this with the following method, where successfulInserts are
> emitted:
> {noformat}
> org.apache.beam.sdk.io.gcp.bigquery.BatchedStreamingWrite.InsertBatchedElements#processElement{noformat}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
