[ 
https://issues.apache.org/jira/browse/BEAM-10248?focusedWorklogId=464915&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-464915
 ]

ASF GitHub Bot logged work on BEAM-10248:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 31/Jul/20 07:24
            Start Date: 31/Jul/20 07:24
    Worklog Time Spent: 10m 
      Work Description: chamikaramj opened a new pull request #12431:
URL: https://github.com/apache/beam/pull/12431


   **Please** add a meaningful description for your change here
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
    - [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   
------------------------------------------------------------------------------------------------
   
   Lang | SDK | Dataflow | Flink | Samza | Spark | Twister2
   --- | --- | --- | --- | --- | --- | ---
   Go | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/)
 | --- | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/)
 | --- | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
 | ---
   Java | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)<br>[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/)
   Python | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/)<br>[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/)
 | --- | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/)
 | ---
   XLang | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/)
 | --- | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/)
 | --- | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/)
 | ---
   
   Pre-Commit Tests Status (on master branch)
   
------------------------------------------------------------------------------------------------
   
   --- |Java | Python | Go | Website
   --- | --- | --- | --- | ---
   Non-portable | [![Build 
Status](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build
 
Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/)<br>[![Build
 
Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/)
   Portable | --- | [![Build 
Status](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/)
 | --- | ---
   
   See 
[.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md)
 for trigger phrase, status and link of all Jenkins jobs.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

            Worklog Id:     (was: 464915)
    Remaining Estimate: 0h
            Time Spent: 10m

> Beam does not set correct region for BigQuery when requesting load job status
> -----------------------------------------------------------------------------
>
>                 Key: BEAM-10248
>                 URL: https://issues.apache.org/jira/browse/BEAM-10248
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>    Affects Versions: 2.22.0
>         Environment: -Beam 2.22.0
> -DirectRunner & DataflowRunner
> -Java 11
>            Reporter: Graham Polley
>            Assignee: Chamikara Madhusanka Jayalath
>            Priority: P1
>         Attachments: Untitled document.pdf
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> I am using `FILE_LOADS` (micro-batching) from Pub/Sub and writing to 
> BigQuery. My BigQuery dataset is in region `australia-southeast1`.
> If the load job into BigQuery fails for some reason (e.g. table does not 
> exist, or schema has changed), then an error from BigQuery is returned for 
> the load.
> Beam then enters into a loop of retrying the load job. However, instead of 
> using a new job id (where the suffix is incremented by 1), it wrongly tries 
> to reinsert the job using the same job id. This is because when it tries to 
> look up if the job id already exists, it does not take into consideration the 
> region where the dataset is. Instead, it defaults to the US but the job was 
> created in `australia-southeast1`, so it returns `null`.
> Bug is on LN308 of `BigQueryHelpers.java`. It should not return `null`. It 
> returns `null` because it's looking in the wrong region.
> If I *test* using a dataset in BigQuery that is in the US region, then it 
> correctly finds the job id and begins to retry the job with a new job id 
> (suffixed with the retry count). We have seen this bug in other areas of Beam 
> before and in other tools/services on GCP e.g. Cloud Composer.
> However, that leads me to my next problem/bug. Even if that is fixed, the 
> number of retries is set to `Integer.MX_VALUE`, so Beam will keep retrying 
> the job 2,147,483,647 times. This is not good.
> The exception is swallowed up by the Beam SDK and never propagated back up 
> the stack for users to catch and handle. So, if a load job fails there is no 
> way to handle it and react for users.
> I attempted to set `withFailedInsertRetryPolicy()` to transient errors only, 
> but it is not supported with `FILE_LOADS`. I also tried, using the 
> `WriteResult` object returned from the bigQuery sink/write, to get a handle 
> on the error but it does not work. Users need a way to catch and catch failed 
> load jobs when using `FILE_LOADS`.
>  
> {code:java}
> public class TemplatePipeline {
>  private static final String TOPIC = 
> "projects/etl-demo-269105/topics/test-micro-batching";
>  private static final String BIGQUERY_DESTINATION_FILTERED = 
> "etl-demo-269105:etl_demo_fun.micro_batch_test_xxx";
>  public static void main(String[] args) throws Exception {
>  try {
>  PipelineOptionsFactory.register(DataflowPipelineOptions.class);
>  DataflowPipelineOptions options = PipelineOptionsFactory
>  .fromArgs(args)
>  .withoutStrictParsing()
>  .as(DataflowPipelineOptions.class);
>  Pipeline pipeline = Pipeline.create(options);
>  PCollection<PubsubMessage> messages = pipeline
>  .apply(PubsubIO.readMessagesWithAttributes().fromTopic(String.format(TOPIC, 
> options.getProject())))
>  .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))));
>  WriteResult result = messages.apply(ParDo.of(new RowToBQRow()))
>  .apply(BigQueryIO.writeTableRows()
>  .to(String.format(BIGQUERY_DESTINATION_FILTERED, options.getProject()))
>  .withCreateDisposition(CREATE_NEVER)
>  .withWriteDisposition(WRITE_APPEND)
>  .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>  .withTriggeringFrequency(Duration.standardSeconds(5))
>  .withNumFileShards(1)
>  .withExtendedErrorInfo()
>  .withSchema(getTableSchema()));
> // result.getFailedInsertsWithErr().apply(ParDo.of(new 
> DoFn<BigQueryInsertError, String>() {
> // @ProcessElement
> // public void processElement(ProcessContext c) {
> // for(ErrorProto err : c.element().getError().getErrors()){
> // throw new RuntimeException(err.getMessage());
> // }
> //
> // }
> // }));
>  result.getFailedInserts().apply(ParDo.of(new DoFn<TableRow, String>() {
>  @ProcessElement
>  public void processElement(ProcessContext c) {
>  System.out.println(c.element());
>  c.output("foo");
>  throw new RuntimeException("Failed to load");
>  }
>  @FinishBundle
>  public void finishUp(FinishBundleContext finishBundleContextc){
>  System.out.println("Got here");
>  }
>  }));
>  pipeline.run();
>  } catch (Exception e) {
>  e.printStackTrace();
>  throw new Exception(e);
>  }
>  }
>  private static TableSchema getTableSchema() {
>  List<TableFieldSchema> fields = new ArrayList<>();
>  fields.add(new TableFieldSchema().setName("timestamp").setType("INTEGER"));
>  fields.add(new TableFieldSchema().setName("payload").setType("STRING"));
>  return new TableSchema().setFields(fields);
>  }
>  public static class RowToBQRow extends DoFn<PubsubMessage, TableRow> {
>  @ProcessElement
>  public void processElement(ProcessContext c) {
>  String payload = new String(c.element().getPayload(), 
> StandardCharsets.UTF_8);
>  c.output(new TableRow()
>  .set("timestamp", System.currentTimeMillis())
>  .set("payload", payload)
>  );
>  }
>  }
> }{code}
> Stack trace is attached showing problem of same job id being used/inserted to 
> BigQuery on each retry.
> {noformat}
>  {noformat}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to