Ashutosh Dixit created BEAM-12633:
-------------------------------------

             Summary: How to get failed insert record for file load insertion in BigQuery.
                 Key: BEAM-12633
                 URL: https://issues.apache.org/jira/browse/BEAM-12633
             Project: Beam
          Issue Type: Improvement
          Components: beam-community, io-java-gcp
            Reporter: Ashutosh Dixit


I'm using Apache Beam (Java SDK) to insert records into BigQuery using the batch 
load method (File loads). I want to retrieve the records that fail during 
insertion.

Is it possible to have a retry policy on failed records?

I am actually getting an error after retrying 1000 times.

Below is my code:

 

{code:java}
public static void insertToBigQueryDataLake(
        final PCollectionTuple dataStoresCollectionTuple,
        final TupleTag<KV<DataLake, PayloadSpecs>> dataLakeValidTag,
        final Long loadJobTriggerFrequency,
        final Integer loadJobNumShard) {


    WriteResult writeResult = dataStoresCollectionTuple
            .get(dataLakeValidTag)
            .apply(TRANSFORMATION_NAME, DataLakeTableProcessor.dataLakeTableProcessorTransform())
            .apply(
                    WRITING_EVENTS_NAME,
                    BigQueryIO.<KV<DataLake, TableRowSpecs>>write()
                            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                            .withTriggeringFrequency(Duration.standardMinutes(loadJobTriggerFrequency))
                            .withNumFileShards(loadJobNumShard)
                            .to(new DynamicTableRowDestinations<>(IS_DATA_LAKE))
                            .withFormatFunction(BigQueryServiceImpl::dataLakeTableRow));

    writeResult.getFailedInserts().apply(ParDo.of(new DoFn<TableRow, Void>() {
        @ProcessElement
        public void processElement(final ProcessContext processContext) throws IOException {
            System.out.println("Table Row : " + processContext.element().toPrettyString());
        }
    }));

}{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
