[
https://issues.apache.org/jira/browse/BEAM-10248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17288986#comment-17288986
]
Matteo Martignon commented on BEAM-10248:
-----------------------------------------
Yes, it would be more convenient to handle these errors programmatically,
similar to what is already implemented for STREAMING_INSERTS. [~chamikara],
do you think this could become a new feature in the coming months?
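For reference, this is roughly the STREAMING_INSERTS handling I have in mind; a
minimal sketch, assuming an existing `tableRows` PCollection and `schema`, with
a hypothetical destination table:
{code:java}
// Sketch only: with STREAMING_INSERTS a retry policy can be set, and failed
// rows come back on the WriteResult for the user to handle; FILE_LOADS has
// no equivalent today.
WriteResult result = tableRows.apply(BigQueryIO.writeTableRows()
    .to("my-project:my_dataset.my_table") // hypothetical destination
    .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
    .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
    .withExtendedErrorInfo()
    .withSchema(schema));

// Failed rows, with error details, arrive as an ordinary PCollection that
// can be logged, dead-lettered, etc.
result.getFailedInsertsWithErr()
    .apply(ParDo.of(new DoFn<BigQueryInsertError, Void>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        for (ErrorProto err : c.element().getError().getErrors()) {
          System.err.println(err.getMessage());
        }
      }
    }));
{code}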
Thanks
> Beam does not set correct region for BigQuery when requesting load job status
> -----------------------------------------------------------------------------
>
> Key: BEAM-10248
> URL: https://issues.apache.org/jira/browse/BEAM-10248
> Project: Beam
> Issue Type: Bug
> Components: io-java-gcp
> Affects Versions: 2.22.0
> Environment: -Beam 2.22.0
> -DirectRunner & DataflowRunner
> -Java 11
> Reporter: Graham Polley
> Assignee: Chamikara Madhusanka Jayalath
> Priority: P1
> Fix For: 2.24.0
>
> Attachments: Untitled document.pdf
>
> Time Spent: 50m
> Remaining Estimate: 0h
>
> I am using `FILE_LOADS` (micro-batching) from Pub/Sub and writing to
> BigQuery. My BigQuery dataset is in region `australia-southeast1`.
> If the load job into BigQuery fails for some reason (e.g. table does not
> exist, or schema has changed), then an error from BigQuery is returned for
> the load.
> Beam then enters a loop, retrying the load job. However, instead of using a
> new job id (with the suffix incremented by 1), it wrongly tries to re-insert
> the job under the same job id. This is because, when it looks up whether the
> job id already exists, it does not take into account the region of the
> dataset. Instead, it defaults to the US, but the job was created in
> `australia-southeast1`, so the lookup returns `null`.
> The bug is on line 308 of `BigQueryHelpers.java`. It should not return
> `null`; it returns `null` because it is looking in the wrong region.
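> For illustration, a region-aware lookup with the BigQuery API client that
> Beam uses would look roughly like this; a minimal sketch, where `client`,
> `projectId`, `jobId` and `location` are hypothetical placeholders:
> {code:java}
> import com.google.api.services.bigquery.Bigquery;
> import com.google.api.services.bigquery.model.Job;
> import java.io.IOException;
>
> class RegionAwareJobLookup {
>   // Sketch only: jobs.get accepts a location; without it, BigQuery looks
>   // for the job in the default US location and the lookup comes back empty.
>   static Job getJob(Bigquery client, String projectId, String jobId, String location)
>       throws IOException {
>     return client.jobs()
>         .get(projectId, jobId)
>         .setLocation(location) // e.g. "australia-southeast1"
>         .execute();
>   }
> }
> {code}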
> If I *test* using a dataset in BigQuery that is in the US region, then it
> correctly finds the job id and begins to retry the job with a new job id
> (suffixed with the retry count). We have seen this bug in other areas of
> Beam before, and in other tools/services on GCP, e.g. Cloud Composer.
> However, that leads me to my next problem/bug. Even if that is fixed, the
> number of retries is set to `Integer.MAX_VALUE`, so Beam will keep retrying
> the job 2,147,483,647 times. This is not good.
> The exception is swallowed by the Beam SDK and never propagated back up the
> stack for users to catch and handle. So, if a load job fails, there is no
> way for users to handle it and react.
> I attempted to set `withFailedInsertRetryPolicy()` to transient errors only,
> but it is not supported with `FILE_LOADS`. I also tried to get a handle on
> the error via the `WriteResult` object returned from the BigQuery
> sink/write, but it does not work. Users need a way to catch and handle
> failed load jobs when using `FILE_LOADS`.
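> To illustrate the kind of handling that is missing, a failure can at least
> be detected out-of-band with the google-cloud-bigquery client; a minimal
> sketch, where the job id and region are hypothetical placeholders:
> {code:java}
> import com.google.cloud.bigquery.BigQuery;
> import com.google.cloud.bigquery.BigQueryOptions;
> import com.google.cloud.bigquery.Job;
> import com.google.cloud.bigquery.JobId;
>
> class LoadJobMonitor {
>   // Sketch only: polls a known load job id in the correct region and
>   // surfaces its error, which the Beam SDK does not expose for FILE_LOADS.
>   static void checkLoadJob(String jobId) {
>     BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
>     Job job = bigquery.getJob(
>         JobId.newBuilder().setJob(jobId).setLocation("australia-southeast1").build());
>     if (job != null && job.getStatus().getError() != null) {
>       // React here: alert, dead-letter, fail the surrounding workflow, etc.
>       System.err.println("Load job failed: " + job.getStatus().getError());
>     }
>   }
> }
> {code}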
>
> {code:java}
> import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition.CREATE_NEVER;
> import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition.WRITE_APPEND;
>
> import com.google.api.services.bigquery.model.TableFieldSchema;
> import com.google.api.services.bigquery.model.TableRow;
> import com.google.api.services.bigquery.model.TableSchema;
> import java.nio.charset.StandardCharsets;
> import java.util.ArrayList;
> import java.util.List;
> import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
> import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
> import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
> import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.transforms.windowing.FixedWindows;
> import org.apache.beam.sdk.transforms.windowing.Window;
> import org.apache.beam.sdk.values.PCollection;
> import org.joda.time.Duration;
>
> public class TemplatePipeline {
>
>   private static final String TOPIC =
>       "projects/etl-demo-269105/topics/test-micro-batching";
>   private static final String BIGQUERY_DESTINATION_FILTERED =
>       "etl-demo-269105:etl_demo_fun.micro_batch_test_xxx";
>
>   public static void main(String[] args) throws Exception {
>     try {
>       PipelineOptionsFactory.register(DataflowPipelineOptions.class);
>       DataflowPipelineOptions options = PipelineOptionsFactory
>           .fromArgs(args)
>           .withoutStrictParsing()
>           .as(DataflowPipelineOptions.class);
>       Pipeline pipeline = Pipeline.create(options);
>
>       // Read from Pub/Sub and window into 1-second fixed windows.
>       PCollection<PubsubMessage> messages = pipeline
>           .apply(PubsubIO.readMessagesWithAttributes()
>               .fromTopic(String.format(TOPIC, options.getProject())))
>           .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))));
>
>       // Write to BigQuery with FILE_LOADS (micro-batched load jobs).
>       WriteResult result = messages.apply(ParDo.of(new RowToBQRow()))
>           .apply(BigQueryIO.writeTableRows()
>               .to(String.format(BIGQUERY_DESTINATION_FILTERED, options.getProject()))
>               .withCreateDisposition(CREATE_NEVER)
>               .withWriteDisposition(WRITE_APPEND)
>               .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>               .withTriggeringFrequency(Duration.standardSeconds(5))
>               .withNumFileShards(1)
>               .withExtendedErrorInfo()
>               .withSchema(getTableSchema()));
>
>       // Attempts to handle failed loads; neither branch is ever invoked for
>       // FILE_LOADS failures.
>       // result.getFailedInsertsWithErr().apply(ParDo.of(new DoFn<BigQueryInsertError, String>() {
>       //   @ProcessElement
>       //   public void processElement(ProcessContext c) {
>       //     for (ErrorProto err : c.element().getError().getErrors()) {
>       //       throw new RuntimeException(err.getMessage());
>       //     }
>       //   }
>       // }));
>       result.getFailedInserts().apply(ParDo.of(new DoFn<TableRow, String>() {
>         @ProcessElement
>         public void processElement(ProcessContext c) {
>           System.out.println(c.element());
>           c.output("foo");
>           throw new RuntimeException("Failed to load");
>         }
>
>         @FinishBundle
>         public void finishUp(FinishBundleContext finishBundleContext) {
>           System.out.println("Got here");
>         }
>       }));
>
>       pipeline.run();
>     } catch (Exception e) {
>       e.printStackTrace();
>       throw new Exception(e);
>     }
>   }
>
>   private static TableSchema getTableSchema() {
>     List<TableFieldSchema> fields = new ArrayList<>();
>     fields.add(new TableFieldSchema().setName("timestamp").setType("INTEGER"));
>     fields.add(new TableFieldSchema().setName("payload").setType("STRING"));
>     return new TableSchema().setFields(fields);
>   }
>
>   public static class RowToBQRow extends DoFn<PubsubMessage, TableRow> {
>     @ProcessElement
>     public void processElement(ProcessContext c) {
>       String payload = new String(c.element().getPayload(), StandardCharsets.UTF_8);
>       c.output(new TableRow()
>           .set("timestamp", System.currentTimeMillis())
>           .set("payload", payload));
>     }
>   }
> }{code}
> A stack trace is attached showing the same job id being used/inserted into
> BigQuery on each retry.
>
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)