[ 
https://issues.apache.org/jira/browse/BEAM-5040?focusedWorklogId=129407&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-129407
 ]

ASF GitHub Bot logged work on BEAM-5040:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 31/Jul/18 17:57
            Start Date: 31/Jul/18 17:57
    Worklog Time Spent: 10m 
      Work Description: reuvenlax closed pull request #6080: [BEAM-5040] Fix 
retry bug for BigQuery jobs.
URL: https://github.com/apache/beam/pull/6080
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 160a4971f31..16c89e7f781 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -111,8 +111,7 @@
   // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job 
finishes.
   static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
 
-  // The maximum number of retry jobs.
-  static final int MAX_RETRY_JOBS = 3;
+  static final int DEFAULT_MAX_RETRY_JOBS = 3;
 
   private BigQueryServices bigQueryServices;
   private final WriteDisposition writeDisposition;
@@ -129,6 +128,9 @@
   private ValueProvider<String> customGcsTempLocation;
   private ValueProvider<String> loadJobProjectId;
 
+  // The maximum number of times to retry failed load or copy jobs.
+  private int maxRetryJobs = DEFAULT_MAX_RETRY_JOBS;
+
   BatchLoads(
       WriteDisposition writeDisposition,
       CreateDisposition createDisposition,
@@ -169,6 +171,14 @@ public void setTriggeringFrequency(Duration 
triggeringFrequency) {
     this.triggeringFrequency = triggeringFrequency;
   }
 
+  public int getMaxRetryJobs() {
+    return maxRetryJobs;
+  }
+
+  public void setMaxRetryJobs(int maxRetryJobs) {
+    this.maxRetryJobs = maxRetryJobs;
+  }
+
   public void setNumFileShards(int numFileShards) {
     this.numFileShards = numFileShards;
   }
@@ -287,7 +297,11 @@ private WriteResult 
expandTriggered(PCollection<KV<DestinationT, TableRow>> inpu
             "WriteRenameTriggered",
             ParDo.of(
                     new WriteRename(
-                        bigQueryServices, loadJobIdPrefixView, 
writeDisposition, createDisposition))
+                        bigQueryServices,
+                        loadJobIdPrefixView,
+                        writeDisposition,
+                        createDisposition,
+                        maxRetryJobs))
                 .withSideInputs(loadJobIdPrefixView));
     writeSinglePartition(partitions.get(singlePartitionTag), 
loadJobIdPrefixView);
     return writeResult(p);
@@ -343,7 +357,11 @@ public WriteResult 
expandUntriggered(PCollection<KV<DestinationT, TableRow>> inp
             "WriteRenameUntriggered",
             ParDo.of(
                     new WriteRename(
-                        bigQueryServices, loadJobIdPrefixView, 
writeDisposition, createDisposition))
+                        bigQueryServices,
+                        loadJobIdPrefixView,
+                        writeDisposition,
+                        createDisposition,
+                        maxRetryJobs))
                 .withSideInputs(loadJobIdPrefixView));
     writeSinglePartition(partitions.get(singlePartitionTag), 
loadJobIdPrefixView);
     return writeResult(p);
@@ -513,7 +531,8 @@ public void processElement(ProcessContext c) {
                 CreateDisposition.CREATE_IF_NEEDED,
                 sideInputs,
                 dynamicDestinations,
-                loadJobProjectId));
+                loadJobProjectId,
+                maxRetryJobs));
   }
 
   // In the case where the files fit into a single load job, there's no need 
to write temporary
@@ -543,7 +562,8 @@ void writeSinglePartition(
                 createDisposition,
                 sideInputs,
                 dynamicDestinations,
-                loadJobProjectId));
+                loadJobProjectId,
+                maxRetryJobs));
   }
 
   private WriteResult writeResult(Pipeline p) {
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 0943f914597..83af04125df 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -22,6 +22,7 @@
 
 import com.google.api.services.bigquery.model.Dataset;
 import com.google.api.services.bigquery.model.Job;
+import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.JobStatus;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableSchema;
@@ -39,9 +40,12 @@
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.ResolveOptions;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** A set of helper functions and classes used by {@link BigQueryIO}. */
 public class BigQueryHelpers {
@@ -55,6 +59,102 @@
           + " an earlier stage of the pipeline, this validation can be 
disabled using"
           + " #withoutValidation.";
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(BigQueryHelpers.class);
+
+  // Given a potential failure and a current job-id, return the next job-id to 
be used on retry.
+  // Algorithm is as follows (given input of job-id-prefix-N):
+  //   If BigQuery has no status for job-id-prefix-n, we should retry with the 
same id.
+  //   If job-id-prefix-n is in the PENDING or successful states, no retry is 
needed.
+  //   Otherwise (job-id-prefix-n completed with errors), try again with 
job-id-prefix-(n+1).
+  //
+  // We continue to loop through these job ids until we find one that has 
either succeeded, or that
+  // has not been issued yet.
+  static class RetryJobIdResult {
+    public final RetryJobId jobId;
+    public final boolean shouldRetry;
+
+    public RetryJobIdResult(RetryJobId jobId, boolean shouldRetry) {
+      this.jobId = jobId;
+      this.shouldRetry = shouldRetry;
+    }
+  }
+
+  static class RetryJobId {
+    private final String jobIdPrefix;
+    private final int retryIndex;
+
+    public RetryJobId(String jobIdPrefix, int retryIndex) {
+      this.jobIdPrefix = jobIdPrefix;
+      this.retryIndex = retryIndex;
+    }
+
+    public String getJobIdPrefix() {
+      return jobIdPrefix;
+    }
+
+    public int getRetryIndex() {
+      return retryIndex;
+    }
+
+    public String getJobId() {
+      return jobIdPrefix + "-" + retryIndex;
+    }
+
+    @Override
+    public String toString() {
+      return getJobId();
+    }
+  }
+
+  static RetryJobIdResult getRetryJobId(
+      RetryJobId currentJobId, String projectId, String bqLocation, JobService 
jobService)
+      throws InterruptedException {
+    for (int retryIndex = currentJobId.getRetryIndex(); ; retryIndex++) {
+      RetryJobId jobId = new RetryJobId(currentJobId.getJobIdPrefix(), 
retryIndex);
+      JobReference jobRef =
+          new JobReference()
+              .setProjectId(projectId)
+              .setJobId(jobId.getJobId())
+              .setLocation(bqLocation);
+      try {
+        Job loadJob = jobService.getJob(jobRef);
+        if (loadJob == null) {
+          LOG.info("job id {} not found, so retrying with that id", jobId);
+          // This either means that the original job was never properly issued 
(on the first
+          // iteration of the loop) or that we've found a retry id that has 
not been used yet. Try
+          // again with this job id.
+          return new RetryJobIdResult(jobId, true);
+        }
+        JobStatus jobStatus = loadJob.getStatus();
+        if (jobStatus == null) {
+          LOG.info("job status for {} not found, so retrying with that job 
id", jobId);
+          return new RetryJobIdResult(jobId, true);
+        }
+        if ("PENDING".equals(jobStatus.getState()) || 
"RUNNING".equals(jobStatus.getState())) {
+          // The job id has been issued and is currently pending. This can 
happen after receiving
+          // an error from the load or copy job creation (e.g. that error 
might come because the
+          // job already exists). Return to the caller which job id is pending 
(it might not be the
+          // one passed in) so the caller can then wait for this job to finish.
+          LOG.info("job {} in pending or running state, so continuing with 
that job id", jobId);
+          return new RetryJobIdResult(jobId, false);
+        }
+        if (jobStatus.getErrorResult() == null
+            && (jobStatus.getErrors() == null || 
jobStatus.getErrors().isEmpty())) {
+          // Import succeeded. No retry needed.
+          LOG.info("job {} succeeded, so not retrying ", jobId);
+          return new RetryJobIdResult(jobId, false);
+        }
+        // This job has failed, so we assume the data cannot enter BigQuery. 
We will check the next
+        // job in the sequence (with the same unique prefix) to see if it is 
either pending/succeeded
+        // or can be used to generate a retry job.
+        LOG.info("job {} is failed. Checking the next job id.", jobId);
+      } catch (IOException e) {
+        LOG.info("caught exception while querying job {}", jobId);
+        return new RetryJobIdResult(jobId, true);
+      }
+    }
+  }
+
   /** Status of a BigQuery job or request. */
   enum Status {
     SUCCEEDED,
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index e639f0c2acf..9b1515f9bd6 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1705,6 +1705,11 @@ public WriteResult expand(PCollection<T> input) {
         if (getMaxFileSize() != null) {
           batchLoads.setMaxFileSize(getMaxFileSize());
         }
+        // When running in streaming (unbounded mode) we want to retry failed 
load jobs
+        // almost indefinitely. Failing the bundle is expensive, so we set a fairly 
high limit on retries.
+        if (IsBounded.UNBOUNDED.equals(input.isBounded())) {
+          batchLoads.setMaxRetryJobs(1000);
+        }
         batchLoads.setTriggeringFrequency(getTriggeringFrequency());
         batchLoads.setNumFileShards(getNumFileShards());
         return rowsWithDestination.apply(batchLoads);
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index b52cfec7592..12bbd359a9f 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -65,7 +65,7 @@ void startCopyJob(JobReference jobRef, 
JobConfigurationTableCopy copyConfig)
      *
      * <p>Returns null if the {@code maxAttempts} retries reached.
      */
-    Job pollJob(JobReference jobRef, int maxAttempts) throws 
InterruptedException, IOException;
+    Job pollJob(JobReference jobRef, int maxAttempts) throws 
InterruptedException;
 
     /** Dry runs the query in the given project. */
     JobStatistics dryRunQuery(String projectId, JobConfigurationQuery 
queryConfig, String location)
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 05fe01bc487..fd4b4396330 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -257,8 +257,16 @@ Job pollJob(JobReference jobRef, Sleeper sleeper, BackOff 
backoff) throws Interr
                   .get(jobRef.getProjectId(), jobRef.getJobId())
                   .setLocation(jobRef.getLocation())
                   .execute();
+          if (job == null) {
+            LOG.info("Still waiting for BigQuery job {} to start", jobRef);
+            continue;
+          }
           JobStatus status = job.getStatus();
-          if (status != null && "DONE".equals(status.getState())) {
+          if (status == null) {
+            LOG.info("Still waiting for BigQuery job {} to enter pending 
state", jobRef);
+            continue;
+          }
+          if ("DONE".equals(status.getState())) {
             LOG.info("BigQuery job {} completed in state DONE", jobRef);
             return job;
           }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
index c9d08f350c9..86f2966b2e7 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -18,6 +18,9 @@
 
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobConfigurationTableCopy;
 import com.google.api.services.bigquery.model.JobReference;
@@ -30,6 +33,8 @@
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.RetryJobId;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.RetryJobIdResult;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
@@ -37,8 +42,11 @@
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.BackOffAdapter;
+import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,16 +65,19 @@
   // append to the table, and so use CREATE_NEVER and WRITE_APPEND 
dispositions respectively.
   private final WriteDisposition firstPaneWriteDisposition;
   private final CreateDisposition firstPaneCreateDisposition;
+  private final int maxRetryJobs;
 
   public WriteRename(
       BigQueryServices bqServices,
       PCollectionView<String> jobIdToken,
       WriteDisposition writeDisposition,
-      CreateDisposition createDisposition) {
+      CreateDisposition createDisposition,
+      int maxRetryJobs) {
     this.bqServices = bqServices;
     this.jobIdToken = jobIdToken;
     this.firstPaneWriteDisposition = writeDisposition;
     this.firstPaneCreateDisposition = createDisposition;
+    this.maxRetryJobs = maxRetryJobs;
   }
 
   @ProcessElement
@@ -138,10 +149,44 @@ private void copy(
 
     String projectId = ref.getProjectId();
     Job lastFailedCopyJob = null;
-    for (int i = 0; i < BatchLoads.MAX_RETRY_JOBS; ++i) {
-      String jobId = jobIdPrefix + "-" + i;
-      JobReference jobRef = new 
JobReference().setProjectId(projectId).setJobId(jobId);
-      jobService.startCopyJob(jobRef, copyConfig);
+    RetryJobId jobId = new RetryJobId(jobIdPrefix, 0);
+    String bqLocation =
+        BigQueryHelpers.getDatasetLocation(datasetService, ref.getProjectId(), 
ref.getDatasetId());
+    BackOff backoff =
+        BackOffAdapter.toGcpBackOff(
+            FluentBackoff.DEFAULT
+                .withMaxRetries(maxRetryJobs)
+                .withInitialBackoff(Duration.standardSeconds(1))
+                .withMaxBackoff(Duration.standardMinutes(1))
+                .backoff());
+    Sleeper sleeper = Sleeper.DEFAULT;
+    int i = 0;
+    do {
+      ++i;
+      JobReference jobRef =
+          new JobReference()
+              .setProjectId(projectId)
+              .setJobId(jobId.getJobId())
+              .setLocation(bqLocation);
+      LOG.info("Starting copy job for table {} using  {}, attempt {}", ref, 
jobRef, i);
+      try {
+        jobService.startCopyJob(jobRef, copyConfig);
+      } catch (IOException e) {
+        LOG.warn("Copy job {} failed with {}", jobRef, e);
+        // It's possible that the job actually made it to BQ even though we 
got a failure here.
+        // For example, the response from BQ may have timed out returning. 
getRetryJobId will
+        // return the correct job id to use on retry, or a job id to continue 
polling (if it turns
+        // out that the job has not actually failed yet).
+        RetryJobIdResult result =
+            BigQueryHelpers.getRetryJobId(jobId, projectId, bqLocation, 
jobService);
+        jobId = result.jobId;
+        if (result.shouldRetry) {
+          // Try the load again with the new job id.
+          continue;
+        }
+        // Otherwise, the job has reached BigQuery and is in either the PENDING 
state or has
+        // completed successfully.
+      }
       Job copyJob = jobService.pollJob(jobRef, 
BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
       Status jobStatus = BigQueryHelpers.parseStatus(copyJob);
       switch (jobStatus) {
@@ -151,12 +196,18 @@ private void copy(
           }
           return;
         case UNKNOWN:
-          throw new RuntimeException(
-              String.format(
-                  "UNKNOWN status of copy job [%s]: %s.",
-                  jobId, BigQueryHelpers.jobToPrettyString(copyJob)));
+          // This might happen if BigQuery's job listing is slow. Retry with 
the same
+          // job id.
+          LOG.info(
+              "Copy job {} finished in unknown state: {}: {}",
+              jobRef,
+              copyJob.getStatus(),
+              (i < maxRetryJobs - 1) ? "will retry" : "will not retry");
+          lastFailedCopyJob = copyJob;
+          continue;
         case FAILED:
           lastFailedCopyJob = copyJob;
+          jobId = BigQueryHelpers.getRetryJobId(jobId, projectId, bqLocation, 
jobService).jobId;
           continue;
         default:
           throw new IllegalStateException(
@@ -164,14 +215,21 @@ private void copy(
                   "Unexpected status [%s] of load job: %s.",
                   jobStatus, BigQueryHelpers.jobToPrettyString(copyJob)));
       }
-    }
+    } while (nextBackOff(sleeper, backoff));
     throw new RuntimeException(
         String.format(
             "Failed to create copy job with id prefix %s, "
                 + "reached max retries: %d, last failed copy job: %s.",
-            jobIdPrefix,
-            BatchLoads.MAX_RETRY_JOBS,
-            BigQueryHelpers.jobToPrettyString(lastFailedCopyJob)));
+            jobIdPrefix, maxRetryJobs, 
BigQueryHelpers.jobToPrettyString(lastFailedCopyJob)));
+  }
+
+  /** Identical to {@link BackOffUtils#next} but without checked IOException. 
*/
+  private static boolean nextBackOff(Sleeper sleeper, BackOff backoff) throws 
InterruptedException {
+    try {
+      return BackOffUtils.next(sleeper, backoff);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   static void removeTemporaryTables(DatasetService tableService, 
List<TableReference> tempTables) {
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index 90407927191..b6ad1378dbf 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -20,6 +20,9 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobConfigurationLoad;
 import com.google.api.services.bigquery.model.JobReference;
@@ -39,6 +42,8 @@
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.RetryJobId;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.RetryJobIdResult;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
@@ -55,6 +60,8 @@
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Repeatedly;
 import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.BackOffAdapter;
+import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -62,6 +69,7 @@
 import org.apache.beam.sdk.values.ShardedKey;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
+import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -93,6 +101,7 @@
   private final TupleTag<KV<TableDestination, String>> mainOutputTag;
   private final TupleTag<String> temporaryFilesTag;
   private final ValueProvider<String> loadJobProjectId;
+  private final int maxRetryJobs;
 
   private class WriteTablesDoFn
       extends DoFn<KV<ShardedKey<DestinationT>, List<String>>, 
KV<TableDestination, String>> {
@@ -190,7 +199,8 @@ public WriteTables(
       CreateDisposition createDisposition,
       List<PCollectionView<?>> sideInputs,
       DynamicDestinations<?, DestinationT> dynamicDestinations,
-      @Nullable ValueProvider<String> loadJobProjectId) {
+      @Nullable ValueProvider<String> loadJobProjectId,
+      int maxRetryJobs) {
     this.singlePartition = singlePartition;
     this.bqServices = bqServices;
     this.loadJobIdPrefixView = loadJobIdPrefixView;
@@ -201,6 +211,7 @@ public WriteTables(
     this.mainOutputTag = new TupleTag<>("WriteTablesMainOutput");
     this.temporaryFilesTag = new TupleTag<>("TemporaryFiles");
     this.loadJobProjectId = loadJobProjectId;
+    this.maxRetryJobs = maxRetryJobs;
   }
 
   @Override
@@ -261,16 +272,47 @@ private void load(
     Job lastFailedLoadJob = null;
     String bqLocation =
         BigQueryHelpers.getDatasetLocation(datasetService, ref.getProjectId(), 
ref.getDatasetId());
-    for (int i = 0; i < BatchLoads.MAX_RETRY_JOBS; ++i) {
-      String jobId = jobIdPrefix + "-" + i;
 
+    BackOff backoff =
+        BackOffAdapter.toGcpBackOff(
+            FluentBackoff.DEFAULT
+                .withMaxRetries(maxRetryJobs)
+                .withInitialBackoff(Duration.standardSeconds(1))
+                .withMaxBackoff(Duration.standardMinutes(1))
+                .backoff());
+    Sleeper sleeper = Sleeper.DEFAULT;
+    // First attempt is always jobIdPrefix-0.
+    RetryJobId jobId = new RetryJobId(jobIdPrefix, 0);
+    int i = 0;
+    do {
+      ++i;
       JobReference jobRef =
-          new 
JobReference().setProjectId(projectId).setJobId(jobId).setLocation(bqLocation);
+          new JobReference()
+              .setProjectId(projectId)
+              .setJobId(jobId.getJobId())
+              .setLocation(bqLocation);
 
       LOG.info("Loading {} files into {} using job {}, attempt {}", 
gcsUris.size(), ref, jobRef, i);
-      jobService.startLoadJob(jobRef, loadConfig);
+      try {
+        jobService.startLoadJob(jobRef, loadConfig);
+      } catch (IOException e) {
+        LOG.warn("Load job {} failed with {}", jobRef, e);
+        // It's possible that the job actually made it to BQ even though we 
got a failure here.
+        // For example, the response from BQ may have timed out returning. 
getRetryJobId will
+        // return the correct job id to use on retry, or a job id to continue 
polling (if it turns
+        // out that the job has not actually failed yet).
+        RetryJobIdResult result =
+            BigQueryHelpers.getRetryJobId(jobId, projectId, bqLocation, 
jobService);
+        jobId = result.jobId;
+        if (result.shouldRetry) {
+          // Try the load again with the new job id.
+          continue;
+        }
+        // Otherwise, the job has reached BigQuery and is in either the PENDING 
state or has
+        // completed successfully.
+      }
       LOG.info("Load job {} started", jobRef);
-
+      // Try to wait until the job is done (succeeded or failed).
       Job loadJob = jobService.pollJob(jobRef, 
BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
 
       Status jobStatus = BigQueryHelpers.parseStatus(loadJob);
@@ -284,18 +326,24 @@ private void load(
           }
           return;
         case UNKNOWN:
-          LOG.info("Load job {} finished in unknown state: {}", jobRef, 
loadJob.getStatus());
-          throw new RuntimeException(
-              String.format(
-                  "UNKNOWN status of load job [%s]: %s.",
-                  jobId, BigQueryHelpers.jobToPrettyString(loadJob)));
-        case FAILED:
+          // This might happen if BigQuery's job listing is slow. Retry with 
the same
+          // job id.
           LOG.info(
-              "Load job {} failed, {}: {}",
+              "Load job {} finished in unknown state: {}: {}",
               jobRef,
-              (i < BatchLoads.MAX_RETRY_JOBS - 1) ? "will retry" : "will not 
retry",
-              loadJob.getStatus());
+              loadJob.getStatus(),
+              (i < maxRetryJobs - 1) ? "will retry" : "will not retry");
+          lastFailedLoadJob = loadJob;
+          continue;
+        case FAILED:
           lastFailedLoadJob = loadJob;
+          jobId = BigQueryHelpers.getRetryJobId(jobId, projectId, bqLocation, 
jobService).jobId;
+          LOG.info(
+              "Load job {} failed, {}: {}. Next job id {}",
+              jobRef,
+              (i < maxRetryJobs - 1) ? "will retry" : "will not retry",
+              loadJob.getStatus(),
+              jobId);
           continue;
         default:
           throw new IllegalStateException(
@@ -303,14 +351,21 @@ private void load(
                   "Unexpected status [%s] of load job: %s.",
                   loadJob.getStatus(), 
BigQueryHelpers.jobToPrettyString(loadJob)));
       }
-    }
+    } while (nextBackOff(sleeper, backoff));
     throw new RuntimeException(
         String.format(
             "Failed to create load job with id prefix %s, "
                 + "reached max retries: %d, last failed load job: %s.",
-            jobIdPrefix,
-            BatchLoads.MAX_RETRY_JOBS,
-            BigQueryHelpers.jobToPrettyString(lastFailedLoadJob)));
+            jobIdPrefix, maxRetryJobs, 
BigQueryHelpers.jobToPrettyString(lastFailedLoadJob)));
+  }
+
+  /** Identical to {@link BackOffUtils#next} but without checked IOException. 
*/
+  private static boolean nextBackOff(Sleeper sleeper, BackOff backoff) throws 
InterruptedException {
+    try {
+      return BackOffUtils.next(sleeper, backoff);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   static void removeTemporaryFiles(Iterable<String> files) throws IOException {
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index 39e81e4cff0..b4d0a6b14f1 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -1182,6 +1182,7 @@ public void testWriteTables() throws Exception {
         p.apply("CreateJobId", Create.of("jobId")).apply(View.asSingleton());
     List<PCollectionView<?>> sideInputs = ImmutableList.of(jobIdTokenView);
 
+    fakeJobService.setNumFailuresExpected(3);
     WriteTables<String> writeTables =
         new WriteTables<>(
             false,
@@ -1191,7 +1192,8 @@ public void testWriteTables() throws Exception {
             BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED,
             sideInputs,
             new IdentityDynamicTables(),
-            null);
+            null,
+            4);
 
     PCollection<KV<TableDestination, String>> writeTablesOutput =
         writeTablesInput.apply(writeTables);
@@ -1272,7 +1274,8 @@ public void testWriteRename() throws Exception {
             fakeBqServices,
             jobIdTokenView,
             BigQueryIO.Write.WriteDisposition.WRITE_EMPTY,
-            BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED);
+            BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED,
+            3);
 
     DoFnTester<Iterable<KV<TableDestination, String>>, Void> tester = 
DoFnTester.of(writeRename);
     tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken);
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
index 97da8214dcb..ad92387b433 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
@@ -84,6 +84,10 @@
   // the next 2 will return the job as running, and only then will the job 
report as done.
   private static final int GET_JOBS_TRANSITION_INTERVAL = 2;
 
+  // The number of times to simulate a failure and trigger a retry.
+  private int numFailuresExpected;
+  private int numFailures = 0;
+
   private final FakeDatasetService datasetService;
 
   private static class JobInfo {
@@ -102,7 +106,16 @@
   private static com.google.common.collect.Table<String, String, 
JobStatistics> dryRunQueryResults;
 
   public FakeJobService() {
+    this(0);
+  }
+
+  public FakeJobService(int numFailures) {
     this.datasetService = new FakeDatasetService();
+    this.numFailuresExpected = numFailures;
+  }
+
+  public void setNumFailuresExpected(int numFailuresExpected) {
+    this.numFailuresExpected = numFailuresExpected;
   }
 
   public static void setUp() {
@@ -247,10 +260,17 @@ public Job getJob(JobReference jobRef) {
         }
         try {
           ++job.getJobCount;
-          if (job.getJobCount == GET_JOBS_TRANSITION_INTERVAL + 1) {
-            job.job.getStatus().setState("RUNNING");
-          } else if (job.getJobCount == 2 * GET_JOBS_TRANSITION_INTERVAL + 1) {
-            job.job.setStatus(runJob(job.job));
+          if (!"FAILED".equals(job.job.getStatus().getState())) {
+            if (numFailures < numFailuresExpected) {
+              ++numFailures;
+              throw new Exception("Failure number " + numFailures);
+            }
+
+            if (job.getJobCount == GET_JOBS_TRANSITION_INTERVAL + 1) {
+              job.job.getStatus().setState("RUNNING");
+            } else if (job.getJobCount == 2 * GET_JOBS_TRANSITION_INTERVAL + 
1) {
+              job.job.setStatus(runJob(job.job));
+            }
           }
         } catch (Exception e) {
           job.job
@@ -261,6 +281,9 @@ public Job getJob(JobReference jobRef) {
                       .setMessage(
                           String.format(
                               "Job %s failed: %s", job.job.getConfiguration(), 
e.toString())));
+          List<ResourceId> sourceFiles =
+              filesForLoadJobs.get(jobRef.getProjectId(), jobRef.getJobId());
+          FileSystems.delete(sourceFiles);
         }
         return JSON_FACTORY.fromString(JSON_FACTORY.toString(job.job), 
Job.class);
       }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 129407)
    Time Spent: 2h 20m  (was: 2h 10m)

> BigQueryIO retries infinitely in WriteTable and WriteRename
> -----------------------------------------------------------
>
>                 Key: BEAM-5040
>                 URL: https://issues.apache.org/jira/browse/BEAM-5040
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>    Affects Versions: 2.5.0
>            Reporter: Reuven Lax
>            Assignee: Reuven Lax
>            Priority: Major
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> BigQueryIO retries infinitely in WriteTable and WriteRename
> Several failure scenarios with the current code:
>  # It's possible for a load job to return failure even though it actually 
> succeeded (e.g. the reply might have timed out). In this case, BigQueryIO 
> will retry the job which will fail again (because the job id has already been 
> used), leading to indefinite retries. Correct behavior is to stop retrying as 
> the load job has succeeded.
>  # It's possible for a load job to be accepted by BigQuery, but then to fail 
> on the BigQuery side. In this case a retry with the same job id will fail as 
> that job id has already been used. BigQueryIO will sometimes detect this, but 
> if the worker has restarted it will instead issue a load with the old job id 
> and go into a retry loop. Correct behavior is to generate a new deterministic 
> job id and retry using that new job id.
>  # In many cases of worker restart, BigQueryIO ends up in infinite retry 
> loops.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to