Ashutosh Dixit created BEAM-12633:
-------------------------------------
Summary: How to get failed insert record for file load insertion
in BigQuery.
Key: BEAM-12633
URL: https://issues.apache.org/jira/browse/BEAM-12633
Project: Beam
Issue Type: Improvement
Components: beam-community, io-java-gcp
Reporter: Ashutosh Dixit
I'm using Apache Beam (Java SDK) to insert records into BigQuery using the batch
load method (file loads). I want to retrieve the records that fail during
insertion.
Is it possible to set a retry policy for failed records?
At the moment I get an error after 1000 retries.
Below is my code:
{code:java}
public static void insertToBigQueryDataLake(
    final PCollectionTuple dataStoresCollectionTuple,
    final TupleTag<KV<DataLake, PayloadSpecs>> dataLakeValidTag,
    final Long loadJobTriggerFrequency,
    final Integer loadJobNumShard) {
  WriteResult writeResult = dataStoresCollectionTuple
      .get(dataLakeValidTag)
      .apply(TRANSFORMATION_NAME,
          DataLakeTableProcessor.dataLakeTableProcessorTransform())
      .apply(
          WRITING_EVENTS_NAME,
          BigQueryIO.<KV<DataLake, TableRowSpecs>>write()
              .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
              .withTriggeringFrequency(Duration.standardMinutes(loadJobTriggerFrequency))
              .withNumFileShards(loadJobNumShard)
              .to(new DynamicTableRowDestinations<>(IS_DATA_LAKE))
              .withFormatFunction(BigQueryServiceImpl::dataLakeTableRow));

  writeResult.getFailedInserts().apply(ParDo.of(new DoFn<TableRow, Void>() {
    @ProcessElement
    public void processElement(final ProcessContext processContext) throws IOException {
      System.out.println("Table Row : " + processContext.element().toPrettyString());
    }
  }));
}
{code}