[
https://issues.apache.org/jira/browse/BEAM-12633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17476500#comment-17476500
]
Kenneth Knowles commented on BEAM-12633:
----------------------------------------
Is there any functionality for this [~reuvenlax] [~chamikara]
> How to get failed insert record for file load insertion in BigQuery.
> --------------------------------------------------------------------
>
> Key: BEAM-12633
> URL: https://issues.apache.org/jira/browse/BEAM-12633
> Project: Beam
> Issue Type: Improvement
> Components: io-java-gcp
> Reporter: Ashutosh Dixit
> Priority: P2
>
> I'm using Apache Beam (Java SDK) to insert record in BigQuery using Batch
> load method (File loads). I want to retrieve those records which failed
> during insertion.
> Is it possible to have a retry policy on failed records?
> Actually getting error after retying 1000 times.
> Below is my code:
>
> {{}}
> {code:java}
> public static void insertToBigQueryDataLake(
> final PCollectionTuple dataStoresCollectionTuple,
> final TupleTag<KV<DataLake, PayloadSpecs>> dataLakeValidTag,
> final Long loadJobTriggerFrequency,
> final Integer loadJobNumShard) {
> WriteResult writeResult = dataStoresCollectionTuple
> .get(dataLakeValidTag)
> .apply(TRANSFORMATION_NAME,
> DataLakeTableProcessor.dataLakeTableProcessorTransform())
> .apply(
> WRITING_EVENTS_NAME,
> BigQueryIO.<KV<DataLake, TableRowSpecs>>write()
> .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>
> .withTriggeringFrequency(Duration.standardMinutes(loadJobTriggerFrequency))
> .withNumFileShards(loadJobNumShard)
> .to(new
> DynamicTableRowDestinations<>(IS_DATA_LAKE))
>
> .withFormatFunction(BigQueryServiceImpl::dataLakeTableRow));
> writeResult.getFailedInserts().apply(ParDo.of(new DoFn<TableRow, Void>() {
> @ProcessElement
> public void processElement(final ProcessContext processContext)
> throws IOException {
> System.out.println("Table Row : " +
> processContext.element().toPrettyString());
> }
> }));
> }{code}
> {{}}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)