[ 
https://issues.apache.org/jira/browse/BEAM-10248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17169069#comment-17169069
 ] 

Chamikara Madhusanka Jayalath commented on BEAM-10248:
------------------------------------------------------

The fix for [1] has been submitted, and I confirmed that [2] is the intended 
behavior (we retry three times and then fail for batch, but keep retrying the 
work item for Dataflow streaming). Resolving this.

[https://github.com/apache/beam/pull/12431]

 

As mentioned before, [3] is a known limitation of the BQ sink.
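
To illustrate the batch behavior described above (a bounded number of tries, then a hard failure), here is a minimal sketch. It is not Beam's implementation: the class name, `MAX_ATTEMPTS`, and `runWithRetries` are hypothetical, "three times" is assumed to mean three attempts in total, and the job id suffix scheme is taken from the issue description below.

```java
import java.util.function.Predicate;

// Hypothetical sketch of a bounded retry loop; not taken from the Beam code base.
final class BoundedLoadRetry {
    // Assumption: three attempts in total before giving up.
    static final int MAX_ATTEMPTS = 3;

    /**
     * Calls the submitter with a fresh job id per attempt
     * (prefix + "-" + attempt index) and returns the id that succeeded.
     * Throws once MAX_ATTEMPTS submissions have all failed.
     */
    static String runWithRetries(String jobIdPrefix, Predicate<String> submit) {
        for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
            String jobId = jobIdPrefix + "-" + attempt;
            if (submit.test(jobId)) {
                return jobId;
            }
        }
        throw new IllegalStateException(
            "Load job failed after " + MAX_ATTEMPTS + " attempts: " + jobIdPrefix);
    }

    public static void main(String[] args) {
        // Simulated submitter (hypothetical): succeeds only on the second attempt.
        String jobId = runWithRetries("beam_load_job", id -> id.endsWith("-1"));
        System.out.println("Succeeded with job id " + jobId);
    }
}
```

Throwing after the last attempt is what lets a batch pipeline fail visibly. A streaming runner that retries the whole work item would simply run this loop again.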

> Beam does not set correct region for BigQuery when requesting load job status
> -----------------------------------------------------------------------------
>
>                 Key: BEAM-10248
>                 URL: https://issues.apache.org/jira/browse/BEAM-10248
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>    Affects Versions: 2.22.0
>         Environment: -Beam 2.22.0
> -DirectRunner & DataflowRunner
> -Java 11
>            Reporter: Graham Polley
>            Assignee: Chamikara Madhusanka Jayalath
>            Priority: P1
>         Attachments: Untitled document.pdf
>
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> I am using `FILE_LOADS` (micro-batching) from Pub/Sub and writing to 
> BigQuery. My BigQuery dataset is in region `australia-southeast1`.
> If the load job into BigQuery fails for some reason (e.g. table does not 
> exist, or schema has changed), then an error from BigQuery is returned for 
> the load.
> Beam then enters into a loop of retrying the load job. However, instead of 
> using a new job id (where the suffix is incremented by 1), it wrongly tries 
> to reinsert the job using the same job id. This is because when it tries to 
> look up if the job id already exists, it does not take into consideration the 
> region where the dataset is. Instead, it defaults to the US but the job was 
> created in `australia-southeast1`, so it returns `null`.
> The bug is on line 308 of `BigQueryHelpers.java`. The lookup should not 
> return `null`; it does so only because it is looking in the wrong region.
> If I *test* using a dataset in BigQuery that is in the US region, then it 
> correctly finds the job id and begins to retry the job with a new job id 
> (suffixed with the retry count). We have seen this bug in other areas of Beam 
> before and in other tools/services on GCP e.g. Cloud Composer.
> However, that leads me to my next problem/bug. Even if that is fixed, the 
> number of retries is set to `Integer.MAX_VALUE`, so Beam will keep retrying 
> the job 2,147,483,647 times. This is not good.
> The exception is swallowed by the Beam SDK and never propagated back up 
> the stack for users to catch and handle. So, if a load job fails, users have 
> no way to handle it or react to it.
> I attempted to set `withFailedInsertRetryPolicy()` to transient errors only, 
> but it is not supported with `FILE_LOADS`. I also tried to get a handle on 
> the error through the `WriteResult` object returned from the BigQuery 
> sink/write, but that does not work either. Users need a way to catch and 
> handle failed load jobs when using `FILE_LOADS`.
>  
> {code:java}
> public class TemplatePipeline {
>   private static final String TOPIC =
>       "projects/etl-demo-269105/topics/test-micro-batching";
>   private static final String BIGQUERY_DESTINATION_FILTERED =
>       "etl-demo-269105:etl_demo_fun.micro_batch_test_xxx";
>
>   public static void main(String[] args) throws Exception {
>     try {
>       PipelineOptionsFactory.register(DataflowPipelineOptions.class);
>       DataflowPipelineOptions options = PipelineOptionsFactory
>           .fromArgs(args)
>           .withoutStrictParsing()
>           .as(DataflowPipelineOptions.class);
>       Pipeline pipeline = Pipeline.create(options);
>       PCollection<PubsubMessage> messages = pipeline
>           .apply(PubsubIO.readMessagesWithAttributes()
>               .fromTopic(String.format(TOPIC, options.getProject())))
>           .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))));
>       WriteResult result = messages.apply(ParDo.of(new RowToBQRow()))
>           .apply(BigQueryIO.writeTableRows()
>               .to(String.format(BIGQUERY_DESTINATION_FILTERED, options.getProject()))
>               .withCreateDisposition(CREATE_NEVER)
>               .withWriteDisposition(WRITE_APPEND)
>               .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>               .withTriggeringFrequency(Duration.standardSeconds(5))
>               .withNumFileShards(1)
>               .withExtendedErrorInfo()
>               .withSchema(getTableSchema()));
>       // result.getFailedInsertsWithErr().apply(ParDo.of(
>       //     new DoFn<BigQueryInsertError, String>() {
>       //   @ProcessElement
>       //   public void processElement(ProcessContext c) {
>       //     for (ErrorProto err : c.element().getError().getErrors()) {
>       //       throw new RuntimeException(err.getMessage());
>       //     }
>       //   }
>       // }));
>       result.getFailedInserts().apply(ParDo.of(new DoFn<TableRow, String>() {
>         @ProcessElement
>         public void processElement(ProcessContext c) {
>           System.out.println(c.element());
>           c.output("foo");
>           throw new RuntimeException("Failed to load");
>         }
>
>         @FinishBundle
>         public void finishUp(FinishBundleContext finishBundleContextc) {
>           System.out.println("Got here");
>         }
>       }));
>       pipeline.run();
>     } catch (Exception e) {
>       e.printStackTrace();
>       throw new Exception(e);
>     }
>   }
>
>   private static TableSchema getTableSchema() {
>     List<TableFieldSchema> fields = new ArrayList<>();
>     fields.add(new TableFieldSchema().setName("timestamp").setType("INTEGER"));
>     fields.add(new TableFieldSchema().setName("payload").setType("STRING"));
>     return new TableSchema().setFields(fields);
>   }
>
>   public static class RowToBQRow extends DoFn<PubsubMessage, TableRow> {
>     @ProcessElement
>     public void processElement(ProcessContext c) {
>       String payload = new String(c.element().getPayload(), StandardCharsets.UTF_8);
>       c.output(new TableRow()
>           .set("timestamp", System.currentTimeMillis())
>           .set("payload", payload));
>     }
>   }
> }{code}
> A stack trace is attached, showing the same job id being inserted into 
> BigQuery on each retry.
>  
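
The region problem described in the report can be modeled in a few lines. This is a simplified in-memory sketch built only from the description above, not Beam's or BigQuery's actual code: `RegionalJobLookup`, `DEFAULT_LOCATION`, and `nextJobId` are hypothetical names, and the "reuse the id when the lookup returns `null`" rule is an assumption drawn from the reporter's account.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a region-scoped job service.
final class RegionalJobLookup {
    // Assumption: lookups without an explicit location fall back to "US".
    static final String DEFAULT_LOCATION = "US";

    // location -> (job id -> status)
    private final Map<String, Map<String, String>> jobsByLocation = new HashMap<>();

    void insertJob(String location, String jobId, String status) {
        jobsByLocation.computeIfAbsent(location, k -> new HashMap<>()).put(jobId, status);
    }

    /** Returns the job status, or null if no such job exists in that location. */
    String getJobStatus(String location, String jobId) {
        String effective = (location == null) ? DEFAULT_LOCATION : location;
        Map<String, String> jobs = jobsByLocation.get(effective);
        return jobs == null ? null : jobs.get(jobId);
    }

    /**
     * Chooses the id for the next submission: if the lookup found nothing,
     * the same id is used again; otherwise the retry suffix is incremented.
     */
    static String nextJobId(String prefix, int retryIndex, String existingStatus) {
        int index = (existingStatus == null) ? retryIndex : retryIndex + 1;
        return prefix + "-" + index;
    }

    public static void main(String[] args) {
        RegionalJobLookup service = new RegionalJobLookup();
        service.insertJob("australia-southeast1", "load-0", "FAILED");

        // Lookup without a location misses the job, so the same id is reused.
        System.out.println(nextJobId("load", 0, service.getJobStatus(null, "load-0")));
        // Lookup with the dataset's location finds it, so the suffix advances.
        System.out.println(
            nextJobId("load", 0, service.getJobStatus("australia-southeast1", "load-0")));
    }
}
```

In this model, passing the dataset's location to the lookup is enough to make the retry pick a new job id, which matches the reporter's observation that the problem disappears for datasets in the US region.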



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
