[ https://issues.apache.org/jira/browse/BEAM-12633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17416816#comment-17416816 ]

Beam JIRA Bot commented on BEAM-12633:
--------------------------------------

This issue is P2 but has been unassigned without any comment for 60 days so it 
has been labeled "stale-P2". If this issue is still affecting you, we care! 
Please comment and remove the label. Otherwise, in 14 days the issue will be 
moved to P3.

Please see https://beam.apache.org/contribute/jira-priorities/ for a detailed 
explanation of what these priorities mean.


> How to get failed insert records for file load insertion in BigQuery.
> ---------------------------------------------------------------------
>
>                 Key: BEAM-12633
>                 URL: https://issues.apache.org/jira/browse/BEAM-12633
>             Project: Beam
>          Issue Type: Improvement
>          Components: beam-community, io-java-gcp
>            Reporter: Ashutosh Dixit
>            Priority: P2
>              Labels: stale-P2
>
> I'm using Apache Beam (Java SDK) to insert records into BigQuery using the
> batch load method (file loads). I want to retrieve the records that failed
> during insertion.
> Is it possible to have a retry policy on failed records?
> Currently I get an error after the insert has been retried 1000 times.
> Below is my code:
>  
> {code:java}
> public static void insertToBigQueryDataLake(
>         final PCollectionTuple dataStoresCollectionTuple,
>         final TupleTag<KV<DataLake, PayloadSpecs>> dataLakeValidTag,
>         final Long loadJobTriggerFrequency,
>         final Integer loadJobNumShard) {
>     WriteResult writeResult = dataStoresCollectionTuple
>             .get(dataLakeValidTag)
>             .apply(TRANSFORMATION_NAME, DataLakeTableProcessor.dataLakeTableProcessorTransform())
>             .apply(
>                     WRITING_EVENTS_NAME,
>                     BigQueryIO.<KV<DataLake, TableRowSpecs>>write()
>                             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>                             .withTriggeringFrequency(Duration.standardMinutes(loadJobTriggerFrequency))
>                             .withNumFileShards(loadJobNumShard)
>                             .to(new DynamicTableRowDestinations<>(IS_DATA_LAKE))
>                             .withFormatFunction(BigQueryServiceImpl::dataLakeTableRow));
> 
>     writeResult.getFailedInserts().apply(ParDo.of(new DoFn<TableRow, Void>() {
>         @ProcessElement
>         public void processElement(final ProcessContext processContext) throws IOException {
>             System.out.println("Table Row : " + processContext.element().toPrettyString());
>         }
>     }));
> }
> {code}
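>  
> For reference, a minimal sketch of the streaming-inserts alternative, where
> BigQueryIO does support a retry policy and a dead-letter output via
> {{withFailedInsertRetryPolicy}}, {{withExtendedErrorInfo}} and
> {{getFailedInsertsWithErr}}. This is an untested sketch, not a fix for
> FILE_LOADS; {{rows}}, the class name and the destination table spec are
> placeholders:
>  
> {code:java}
> import com.google.api.services.bigquery.model.TableRow;
> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
> import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
> import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
> import org.apache.beam.sdk.transforms.MapElements;
> import org.apache.beam.sdk.values.PCollection;
> import org.apache.beam.sdk.values.TypeDescriptors;
> 
> public class FailedInsertsSketch {
> 
>     // "rows" and the table spec below are placeholders for illustration.
>     public static PCollection<String> writeWithDeadLetter(final PCollection<TableRow> rows) {
>         WriteResult result = rows.apply(
>                 "WriteToBigQuery",
>                 BigQueryIO.writeTableRows()
>                         .to("my-project:my_dataset.my_table") // placeholder destination
>                         .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
>                         // Retry policies apply to streaming inserts only.
>                         .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
>                         // Required so that getFailedInsertsWithErr() is populated.
>                         .withExtendedErrorInfo()
>                         // CREATE_NEVER avoids having to supply a schema here.
>                         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>                         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
> 
>         // Each permanently failed row arrives paired with the BigQuery error
>         // that caused it, instead of being retried indefinitely.
>         return result.getFailedInsertsWithErr()
>                 .apply(
>                         "FormatFailures",
>                         MapElements.into(TypeDescriptors.strings())
>                                 .via((BigQueryInsertError err) ->
>                                         err.getRow().toString() + " failed: " + err.getError()));
>     }
> }
> {code}
> With FILE_LOADS the rows are written by BigQuery load jobs, so individual
> failed rows are not surfaced through {{getFailedInserts()}}; the sketch
> above only applies when streaming inserts are acceptable.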



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
