[ https://issues.apache.org/jira/browse/BEAM-10248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17169069#comment-17169069 ]
Chamikara Madhusanka Jayalath commented on BEAM-10248:
------------------------------------------------------

Fix for [1] was submitted and I confirmed that [2] is the intended behavior (we retry three times and fail for batch, but will keep retrying the work item for Dataflow streaming). Resolving this.

[https://github.com/apache/beam/pull/12431]

As mentioned before, [3] is a known limitation of the BQ sink.

> Beam does not set correct region for BigQuery when requesting load job status
> -----------------------------------------------------------------------------
>
>                 Key: BEAM-10248
>                 URL: https://issues.apache.org/jira/browse/BEAM-10248
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>    Affects Versions: 2.22.0
>         Environment: - Beam 2.22.0
>                      - DirectRunner & DataflowRunner
>                      - Java 11
>            Reporter: Graham Polley
>            Assignee: Chamikara Madhusanka Jayalath
>            Priority: P1
>         Attachments: Untitled document.pdf
>
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> I am using `FILE_LOADS` (micro-batching) from Pub/Sub and writing to BigQuery. My BigQuery dataset is in region `australia-southeast1`.
>
> If the load job into BigQuery fails for some reason (e.g. the table does not exist, or the schema has changed), BigQuery returns an error for the load.
>
> Beam then enters a loop of retrying the load job. However, instead of using a new job id (with the suffix incremented by 1), it wrongly tries to reinsert the job under the same job id. This happens because, when Beam looks up whether the job id already exists, it does not take the region of the dataset into account. It defaults to the US, but the job was created in `australia-southeast1`, so the lookup returns `null`.
>
> The bug is on line 308 of `BigQueryHelpers.java`. It should not return `null`; it returns `null` because it is looking in the wrong region.
>
> If I *test* using a BigQuery dataset in the US region, Beam correctly finds the job id and begins to retry the job with a new job id (suffixed with the retry count). We have seen this bug in other areas of Beam before, and in other tools/services on GCP, e.g. Cloud Composer.
>
> However, that leads me to my next problem/bug. Even if that is fixed, the number of retries is set to `Integer.MAX_VALUE`, so Beam will keep retrying the job 2,147,483,647 times. This is not good.
>
> The exception is swallowed by the Beam SDK and never propagated back up the stack for users to catch and handle. So, if a load job fails, there is no way for users to handle it and react.
>
> I attempted to set `withFailedInsertRetryPolicy()` to transient errors only, but it is not supported with `FILE_LOADS`. I also tried to get a handle on the error via the `WriteResult` object returned from the BigQuery sink/write, but that does not work either. Users need a way to catch and handle failed load jobs when using `FILE_LOADS`.
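>
> To make the region problem concrete, here is a minimal sketch of the status lookup using the generated BigQuery v2 Java client (which, as far as I can tell, is what Beam wraps internally). The helper is illustrative, not Beam's actual code; the point is the `setLocation()` call on `jobs.get`, which appears to be the missing piece:

{code:java}
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.Job;
import java.io.IOException;

public class LoadJobStatusSketch {

    // Illustrative helper, not Beam's code. jobs.get is location-scoped:
    // if setLocation() is omitted, BigQuery searches the US multi-region,
    // a job created in australia-southeast1 is not found, and the caller
    // sees "no such job" (the null described above).
    static Job getLoadJobStatus(
            Bigquery client, String projectId, String jobId, String location)
            throws IOException {
        return client
                .jobs()
                .get(projectId, jobId)
                .setLocation(location) // e.g. "australia-southeast1"
                .execute();
    }
}
{code}

> With the location passed through, the lookup finds the existing job, and the retry path can generate a new job id with the incremented suffix as intended.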
>
> Pipeline code:

{code:java}
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition.CREATE_NEVER;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition.WRITE_APPEND;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class TemplatePipeline {

    private static final String TOPIC =
            "projects/etl-demo-269105/topics/test-micro-batching";
    private static final String BIGQUERY_DESTINATION_FILTERED =
            "etl-demo-269105:etl_demo_fun.micro_batch_test_xxx";

    public static void main(String[] args) throws Exception {
        try {
            PipelineOptionsFactory.register(DataflowPipelineOptions.class);
            DataflowPipelineOptions options = PipelineOptionsFactory
                    .fromArgs(args)
                    .withoutStrictParsing()
                    .as(DataflowPipelineOptions.class);
            Pipeline pipeline = Pipeline.create(options);

            // Read from Pub/Sub and window into 1-second fixed windows.
            PCollection<PubsubMessage> messages = pipeline
                    .apply(PubsubIO.readMessagesWithAttributes()
                            .fromTopic(String.format(TOPIC, options.getProject())))
                    .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))));

            // Micro-batch into BigQuery via FILE_LOADS, triggering every 5 seconds.
            WriteResult result = messages.apply(ParDo.of(new RowToBQRow()))
                    .apply(BigQueryIO.writeTableRows()
                            .to(String.format(BIGQUERY_DESTINATION_FILTERED, options.getProject()))
                            .withCreateDisposition(CREATE_NEVER)
                            .withWriteDisposition(WRITE_APPEND)
                            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                            .withTriggeringFrequency(Duration.standardSeconds(5))
                            .withNumFileShards(1)
                            .withExtendedErrorInfo()
                            .withSchema(getTableSchema()));

            // Commented-out attempt using getFailedInsertsWithErr():
            // result.getFailedInsertsWithErr().apply(ParDo.of(new DoFn<BigQueryInsertError, String>() {
            //     @ProcessElement
            //     public void processElement(ProcessContext c) {
            //         for (ErrorProto err : c.element().getError().getErrors()) {
            //             throw new RuntimeException(err.getMessage());
            //         }
            //     }
            // }));

            // Attempt to observe failed loads; never fires with FILE_LOADS.
            result.getFailedInserts().apply(ParDo.of(new DoFn<TableRow, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    System.out.println(c.element());
                    c.output("foo");
                    throw new RuntimeException("Failed to load");
                }

                @FinishBundle
                public void finishUp(FinishBundleContext finishBundleContext) {
                    System.out.println("Got here");
                }
            }));

            pipeline.run();
        } catch (Exception e) {
            e.printStackTrace();
            throw new Exception(e);
        }
    }

    private static TableSchema getTableSchema() {
        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("timestamp").setType("INTEGER"));
        fields.add(new TableFieldSchema().setName("payload").setType("STRING"));
        return new TableSchema().setFields(fields);
    }

    public static class RowToBQRow extends DoFn<PubsubMessage, TableRow> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String payload = new String(c.element().getPayload(), StandardCharsets.UTF_8);
            c.output(new TableRow()
                    .set("timestamp", System.currentTimeMillis())
                    .set("payload", payload));
        }
    }
}
{code}

> A stack trace is attached showing the same job id being used/inserted to BigQuery on each retry.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)