[ 
https://issues.apache.org/jira/browse/BEAM-5105?focusedWorklogId=152098&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152098
 ]

ASF GitHub Bot logged work on BEAM-5105:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Oct/18 19:41
            Start Date: 07/Oct/18 19:41
    Worklog Time Spent: 10m 
      Work Description: reuvenlax closed pull request #6416: [BEAM-5105] Better 
parallelize BigQuery load jobs
URL: https://github.com/apache/beam/pull/6416
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff once a foreign pull request (from
a fork) is merged, the diff is reproduced below for the sake of
provenance:

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 83af04125df..7469a19a451 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -20,15 +20,18 @@
 
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
 import com.google.api.services.bigquery.model.Dataset;
 import com.google.api.services.bigquery.model.Job;
-import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.JobStatus;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.api.services.bigquery.model.TimePartitioning;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 import com.google.common.hash.Hashing;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -40,10 +43,12 @@
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.ResolveOptions;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.BackOffAdapter;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,24 +84,210 @@ public RetryJobIdResult(RetryJobId jobId, boolean 
shouldRetry) {
     }
   }
 
+  // A class that waits for pending jobs, retrying them according to policy if 
they fail.
+  static class PendingJobManager {
+    private static class JobInfo {
+      private final PendingJob pendingJob;
+      @Nullable private final SerializableFunction<PendingJob, Exception> 
onSuccess;
+
+      public JobInfo(PendingJob pendingJob, SerializableFunction<PendingJob, 
Exception> onSuccess) {
+        this.pendingJob = pendingJob;
+        this.onSuccess = onSuccess;
+      }
+    }
+
+    private List<JobInfo> pendingJobs = Lists.newArrayList();
+    private final BackOff backOff;
+
+    PendingJobManager() {
+      this(
+          BackOffAdapter.toGcpBackOff(
+              FluentBackoff.DEFAULT
+                  .withMaxRetries(Integer.MAX_VALUE)
+                  .withInitialBackoff(Duration.standardSeconds(1))
+                  .withMaxBackoff(Duration.standardMinutes(1))
+                  .backoff()));
+    }
+
+    PendingJobManager(BackOff backOff) {
+      this.backOff = backOff;
+    }
+
+    // Add a pending job and a function to call when the job has completed 
successfully.
+    PendingJobManager addPendingJob(
+        PendingJob pendingJob, @Nullable SerializableFunction<PendingJob, 
Exception> onSuccess) {
+      this.pendingJobs.add(new JobInfo(pendingJob, onSuccess));
+      return this;
+    }
+
+    void waitForDone() throws Exception {
+      LOG.info("Waiting for jobs to complete.");
+      Sleeper sleeper = Sleeper.DEFAULT;
+      while (!pendingJobs.isEmpty()) {
+        List<JobInfo> retryJobs = Lists.newArrayList();
+        for (JobInfo jobInfo : pendingJobs) {
+          if (jobInfo.pendingJob.pollJob()) {
+            // Job has completed successfully.
+            LOG.info("Job {} completed successfully.", 
jobInfo.pendingJob.currentJobId);
+            Exception e = jobInfo.onSuccess.apply(jobInfo.pendingJob);
+            if (e != null) {
+              throw e;
+            }
+          } else {
+            // Job failed, schedule it again.
+            LOG.info("Job {} failed. retrying.", 
jobInfo.pendingJob.currentJobId);
+            retryJobs.add(jobInfo);
+          }
+        }
+        pendingJobs = retryJobs;
+        if (!pendingJobs.isEmpty()) {
+          // Sleep before retrying.
+          nextBackOff(sleeper, backOff);
+          // Run the jobs to retry. If a job has hit the maximum number of 
retries then runJob
+          // will raise an exception.
+          for (JobInfo job : pendingJobs) {
+            job.pendingJob.runJob();
+          }
+        }
+      }
+    }
+
+    /** Identical to {@link BackOffUtils#next} but without checked 
IOException. */
+    private static boolean nextBackOff(Sleeper sleeper, BackOff backOff)
+        throws InterruptedException {
+      try {
+        return BackOffUtils.next(sleeper, backOff);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  static class PendingJob {
+    private final SerializableFunction<RetryJobId, Void> executeJob;
+    private final SerializableFunction<RetryJobId, Job> pollJob;
+    private final SerializableFunction<RetryJobId, Job> lookupJob;
+    private final int maxRetries;
+    private int currentAttempt;
+    RetryJobId currentJobId;
+    Job lastJobAttempted;
+    boolean started;
+
+    PendingJob(
+        SerializableFunction<RetryJobId, Void> executeJob,
+        SerializableFunction<RetryJobId, Job> pollJob,
+        SerializableFunction<RetryJobId, Job> lookupJob,
+        int maxRetries,
+        String jobIdPrefix) {
+      this.executeJob = executeJob;
+      this.pollJob = pollJob;
+      this.lookupJob = lookupJob;
+      this.maxRetries = maxRetries;
+      this.currentAttempt = 0;
+      currentJobId = new RetryJobId(jobIdPrefix, 0);
+      this.started = false;
+    }
+
+    // Run the job.
+    void runJob() throws IOException {
+      ++currentAttempt;
+      if (!shouldRetry()) {
+        throw new RuntimeException(
+            String.format(
+                "Failed to create job with prefix %s, "
+                    + "reached max retries: %d, last failed job: %s.",
+                currentJobId.getJobIdPrefix(),
+                maxRetries,
+                BigQueryHelpers.jobToPrettyString(lastJobAttempted)));
+      }
+
+      try {
+        this.started = false;
+        executeJob.apply(currentJobId);
+      } catch (RuntimeException e) {
+        LOG.warn("Job {} failed with {}", currentJobId.getJobId(), e);
+        // It's possible that the job actually made it to BQ even though we 
got a failure here.
+        // For example, the response from BQ may have timed out returning. 
getRetryJobId will
+        // return the correct job id to use on retry, or a job id to continue 
polling (if it turns
+        // out that the job has not actually failed yet).
+        RetryJobIdResult result = getRetryJobId(currentJobId, lookupJob);
+        currentJobId = result.jobId;
+        if (result.shouldRetry) {
+          // Otherwise the jobs either never started or started and failed. 
Try the job again with
+          // the job id returned by getRetryJobId.
+          LOG.info("Will retry with job id {}", currentJobId.getJobId());
+          return;
+        }
+      }
+      LOG.info("job {} started", currentJobId.getJobId());
+      // The job has reached BigQuery and is in either the PENDING state or 
has completed
+      // successfully.
+      this.started = true;
+    }
+
+    // Poll the status of the job. Returns true if the job has completed 
successfully and false
+    // otherwise.
+    boolean pollJob() throws IOException {
+      if (started) {
+        Job job = pollJob.apply(currentJobId);
+        this.lastJobAttempted = job;
+        Status jobStatus = parseStatus(job);
+        switch (jobStatus) {
+          case SUCCEEDED:
+            LOG.info("Load job {} succeeded. Statistics: {}", currentJobId, 
job.getStatistics());
+            return true;
+          case UNKNOWN:
+            // This might happen if BigQuery's job listing is slow. Retry with 
the same
+            // job id.
+            LOG.info(
+                "Load job {} finished in unknown state: {}: {}",
+                currentJobId,
+                job.getStatus(),
+                shouldRetry() ? "will retry" : "will not retry");
+            return false;
+          case FAILED:
+            String oldJobId = currentJobId.getJobId();
+            currentJobId = BigQueryHelpers.getRetryJobId(currentJobId, 
lookupJob).jobId;
+            LOG.info(
+                "Load job {} failed, {}: {}. Next job id {}",
+                oldJobId,
+                shouldRetry() ? "will retry" : "will not retry",
+                job.getStatus(),
+                currentJobId);
+            return false;
+          default:
+            throw new IllegalStateException(
+                String.format(
+                    "Unexpected status [%s] of load job: %s.",
+                    job.getStatus(), BigQueryHelpers.jobToPrettyString(job)));
+        }
+      }
+      return false;
+    }
+
+    boolean shouldRetry() {
+      return currentAttempt < maxRetries + 1;
+    }
+  }
+
   static class RetryJobId {
     private final String jobIdPrefix;
     private final int retryIndex;
 
-    public RetryJobId(String jobIdPrefix, int retryIndex) {
+    RetryJobId(String jobIdPrefix, int retryIndex) {
       this.jobIdPrefix = jobIdPrefix;
       this.retryIndex = retryIndex;
     }
 
-    public String getJobIdPrefix() {
+    String getJobIdPrefix() {
       return jobIdPrefix;
     }
 
-    public int getRetryIndex() {
+    int getRetryIndex() {
       return retryIndex;
     }
 
-    public String getJobId() {
+    String getJobId() {
       return jobIdPrefix + "-" + retryIndex;
     }
 
@@ -107,17 +298,11 @@ public String toString() {
   }
 
   static RetryJobIdResult getRetryJobId(
-      RetryJobId currentJobId, String projectId, String bqLocation, JobService 
jobService)
-      throws InterruptedException {
+      RetryJobId currentJobId, SerializableFunction<RetryJobId, Job> 
lookupJob) {
     for (int retryIndex = currentJobId.getRetryIndex(); ; retryIndex++) {
       RetryJobId jobId = new RetryJobId(currentJobId.getJobIdPrefix(), 
retryIndex);
-      JobReference jobRef =
-          new JobReference()
-              .setProjectId(projectId)
-              .setJobId(jobId.getJobId())
-              .setLocation(bqLocation);
       try {
-        Job loadJob = jobService.getJob(jobRef);
+        Job loadJob = lookupJob.apply(jobId);
         if (loadJob == null) {
           LOG.info("job id {} not found, so retrying with that id", jobId);
           // This either means that the original job was never properly issued 
(on the first
@@ -147,8 +332,8 @@ static RetryJobIdResult getRetryJobId(
         // This job has failed, so we assume the data cannot enter BigQuery. 
We will check the next
         // job in the sequence (with the same unique prefix) to see if is 
either pending/succeeded
         // or can be used to generate a retry job.
-        LOG.info("job {} is failed. Checking the next job id.", jobId);
-      } catch (IOException e) {
+        LOG.info("job {} is failed. Checking the next job id", jobId);
+      } catch (RuntimeException e) {
         LOG.info("caught exception while querying job {}", jobId);
         return new RetryJobIdResult(jobId, true);
       }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
index 86f2966b2e7..6e5e77f9958 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -18,10 +18,6 @@
 
 package org.apache.beam.sdk.io.gcp.bigquery;
 
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobConfigurationTableCopy;
 import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.TableReference;
@@ -32,21 +28,17 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.RetryJobId;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.RetryJobIdResult;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.PendingJobManager;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.BackOffAdapter;
-import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,6 +59,23 @@
   private final CreateDisposition firstPaneCreateDisposition;
   private final int maxRetryJobs;
 
+  private static class PendingJobData {
+    final BigQueryHelpers.PendingJob retryJob;
+    final TableDestination tableDestination;
+    final List<TableReference> tempTables;
+
+    public PendingJobData(
+        BigQueryHelpers.PendingJob retryJob,
+        TableDestination tableDestination,
+        List<TableReference> tempTables) {
+      this.retryJob = retryJob;
+      this.tableDestination = tableDestination;
+      this.tempTables = tempTables;
+    }
+  }
+  // All pending copy jobs.
+  private List<PendingJobData> pendingJobs = Lists.newArrayList();
+
   public WriteRename(
       BigQueryServices bqServices,
       PCollectionView<String> jobIdToken,
@@ -80,6 +89,11 @@ public WriteRename(
     this.maxRetryJobs = maxRetryJobs;
   }
 
+  @StartBundle
+  public void startBundle(StartBundleContext c) {
+    pendingJobs.clear();
+  }
+
   @ProcessElement
   public void processElement(ProcessContext c) throws Exception {
     Multimap<TableDestination, String> tempTables = ArrayListMultimap.create();
@@ -88,58 +102,78 @@ public void processElement(ProcessContext c) throws 
Exception {
     }
     for (Map.Entry<TableDestination, Collection<String>> entry : 
tempTables.asMap().entrySet()) {
       // Process each destination table.
-      writeRename(entry.getKey(), entry.getValue(), c);
+      // Do not copy if no temp tables are provided.
+      if (!entry.getValue().isEmpty()) {
+        pendingJobs.add(startWriteRename(entry.getKey(), entry.getValue(), c));
+      }
+    }
+  }
+
+  @FinishBundle
+  public void finishBundle(FinishBundleContext c) throws Exception {
+    DatasetService datasetService =
+        
bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class));
+    PendingJobManager jobManager = new PendingJobManager();
+    for (PendingJobData pendingJob : pendingJobs) {
+      jobManager.addPendingJob(
+          pendingJob.retryJob,
+          j -> {
+            try {
+              if (pendingJob.tableDestination.getTableDescription() != null) {
+                TableReference ref = 
pendingJob.tableDestination.getTableReference();
+                datasetService.patchTableDescription(
+                    ref.clone()
+                        
.setTableId(BigQueryHelpers.stripPartitionDecorator(ref.getTableId())),
+                    pendingJob.tableDestination.getTableDescription());
+              }
+              removeTemporaryTables(datasetService, pendingJob.tempTables);
+              return null;
+            } catch (IOException | InterruptedException e) {
+              return e;
+            }
+          });
     }
+    jobManager.waitForDone();
   }
 
-  private void writeRename(
+  private PendingJobData startWriteRename(
       TableDestination finalTableDestination, Iterable<String> tempTableNames, 
ProcessContext c)
       throws Exception {
     WriteDisposition writeDisposition =
         (c.pane().getIndex() == 0) ? firstPaneWriteDisposition : 
WriteDisposition.WRITE_APPEND;
     CreateDisposition createDisposition =
         (c.pane().getIndex() == 0) ? firstPaneCreateDisposition : 
CreateDisposition.CREATE_NEVER;
-    List<String> tempTablesJson = Lists.newArrayList(tempTableNames);
-    // Do not copy if no temp tables are provided
-    if (tempTablesJson.isEmpty()) {
-      return;
-    }
-
-    List<TableReference> tempTables = Lists.newArrayList();
-    for (String table : tempTablesJson) {
-      tempTables.add(BigQueryHelpers.fromJsonString(table, 
TableReference.class));
-    }
+    List<TableReference> tempTables =
+        StreamSupport.stream(tempTableNames.spliterator(), false)
+            .map(table -> BigQueryHelpers.fromJsonString(table, 
TableReference.class))
+            .collect(Collectors.toList());
+    ;
 
     // Make sure each destination table gets a unique job id.
     String jobIdPrefix =
         BigQueryHelpers.createJobId(
             c.sideInput(jobIdToken), finalTableDestination, -1, 
c.pane().getIndex());
 
-    copy(
-        
bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
-        
bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
-        jobIdPrefix,
-        finalTableDestination.getTableReference(),
-        tempTables,
-        writeDisposition,
-        createDisposition,
-        finalTableDestination.getTableDescription());
-
-    DatasetService tableService =
-        
bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class));
-    removeTemporaryTables(tableService, tempTables);
+    BigQueryHelpers.PendingJob retryJob =
+        startCopy(
+            
bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
+            
bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
+            jobIdPrefix,
+            finalTableDestination.getTableReference(),
+            tempTables,
+            writeDisposition,
+            createDisposition);
+    return new PendingJobData(retryJob, finalTableDestination, tempTables);
   }
 
-  private void copy(
+  private BigQueryHelpers.PendingJob startCopy(
       JobService jobService,
       DatasetService datasetService,
       String jobIdPrefix,
       TableReference ref,
       List<TableReference> tempTables,
       WriteDisposition writeDisposition,
-      CreateDisposition createDisposition,
-      @Nullable String tableDescription)
-      throws InterruptedException, IOException {
+      CreateDisposition createDisposition) {
     JobConfigurationTableCopy copyConfig =
         new JobConfigurationTableCopy()
             .setSourceTables(tempTables)
@@ -147,89 +181,60 @@ private void copy(
             .setWriteDisposition(writeDisposition.name())
             .setCreateDisposition(createDisposition.name());
 
-    String projectId = ref.getProjectId();
-    Job lastFailedCopyJob = null;
-    RetryJobId jobId = new RetryJobId(jobIdPrefix, 0);
     String bqLocation =
         BigQueryHelpers.getDatasetLocation(datasetService, ref.getProjectId(), 
ref.getDatasetId());
-    BackOff backoff =
-        BackOffAdapter.toGcpBackOff(
-            FluentBackoff.DEFAULT
-                .withMaxRetries(maxRetryJobs)
-                .withInitialBackoff(Duration.standardSeconds(1))
-                .withMaxBackoff(Duration.standardMinutes(1))
-                .backoff());
-    Sleeper sleeper = Sleeper.DEFAULT;
-    int i = 0;
-    do {
-      ++i;
-      JobReference jobRef =
-          new JobReference()
-              .setProjectId(projectId)
-              .setJobId(jobId.getJobId())
-              .setLocation(bqLocation);
-      LOG.info("Starting copy job for table {} using  {}, attempt {}", ref, 
jobRef, i);
-      try {
-        jobService.startCopyJob(jobRef, copyConfig);
-      } catch (IOException e) {
-        LOG.warn("Copy job {} failed with {}", jobRef, e);
-        // It's possible that the job actually made it to BQ even though we 
got a failure here.
-        // For example, the response from BQ may have timed out returning. 
getRetryJobId will
-        // return the correct job id to use on retry, or a job id to continue 
polling (if it turns
-        // out the the job has not actually failed yet).
-        RetryJobIdResult result =
-            BigQueryHelpers.getRetryJobId(jobId, projectId, bqLocation, 
jobService);
-        jobId = result.jobId;
-        if (result.shouldRetry) {
-          // Try the load again with the new job id.
-          continue;
-        }
-        // Otherwise,the job has reached BigQuery and is in either the PENDING 
state or has
-        // completed successfully.
-      }
-      Job copyJob = jobService.pollJob(jobRef, 
BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
-      Status jobStatus = BigQueryHelpers.parseStatus(copyJob);
-      switch (jobStatus) {
-        case SUCCEEDED:
-          if (tableDescription != null) {
-            datasetService.patchTableDescription(ref, tableDescription);
-          }
-          return;
-        case UNKNOWN:
-          // This might happen if BigQuery's job listing is slow. Retry with 
the same
-          // job id.
-          LOG.info(
-              "Copy job {} finished in unknown state: {}: {}",
-              jobRef,
-              copyJob.getStatus(),
-              (i < maxRetryJobs - 1) ? "will retry" : "will not retry");
-          lastFailedCopyJob = copyJob;
-          continue;
-        case FAILED:
-          lastFailedCopyJob = copyJob;
-          jobId = BigQueryHelpers.getRetryJobId(jobId, projectId, bqLocation, 
jobService).jobId;
-          continue;
-        default:
-          throw new IllegalStateException(
-              String.format(
-                  "Unexpected status [%s] of load job: %s.",
-                  jobStatus, BigQueryHelpers.jobToPrettyString(copyJob)));
-      }
-    } while (nextBackOff(sleeper, backoff));
-    throw new RuntimeException(
-        String.format(
-            "Failed to create copy job with id prefix %s, "
-                + "reached max retries: %d, last failed copy job: %s.",
-            jobIdPrefix, maxRetryJobs, 
BigQueryHelpers.jobToPrettyString(lastFailedCopyJob)));
-  }
 
-  /** Identical to {@link BackOffUtils#next} but without checked IOException. 
*/
-  private static boolean nextBackOff(Sleeper sleeper, BackOff backoff) throws 
InterruptedException {
-    try {
-      return BackOffUtils.next(sleeper, backoff);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
+    String projectId = ref.getProjectId();
+    BigQueryHelpers.PendingJob retryJob =
+        new BigQueryHelpers.PendingJob(
+            jobId -> {
+              JobReference jobRef =
+                  new JobReference()
+                      .setProjectId(projectId)
+                      .setJobId(jobId.getJobId())
+                      .setLocation(bqLocation);
+              LOG.info(
+                  "Starting copy job for table {} using  {}, job id iteration 
{}",
+                  ref,
+                  jobRef,
+                  jobId.getRetryIndex());
+              try {
+                jobService.startCopyJob(jobRef, copyConfig);
+              } catch (IOException | InterruptedException e) {
+                LOG.warn("Copy job {} failed with {}", jobRef, e);
+                throw new RuntimeException(e);
+              }
+              return null;
+            },
+            // Function to poll the result of a load job.
+            jobId -> {
+              JobReference jobRef =
+                  new JobReference()
+                      .setProjectId(projectId)
+                      .setJobId(jobId.getJobId())
+                      .setLocation(bqLocation);
+              try {
+                return jobService.pollJob(jobRef, 
BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+            },
+            // Function to lookup a job.
+            jobId -> {
+              JobReference jobRef =
+                  new JobReference()
+                      .setProjectId(projectId)
+                      .setJobId(jobId.getJobId())
+                      .setLocation(bqLocation);
+              try {
+                return jobService.getJob(jobRef);
+              } catch (InterruptedException | IOException e) {
+                throw new RuntimeException(e);
+              }
+            },
+            maxRetryJobs,
+            jobIdPrefix);
+    return retryJob;
   }
 
   static void removeTemporaryTables(DatasetService tableService, 
List<TableReference> tempTables) {
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index a509fa2eda7..e055da6dec0 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -20,10 +20,6 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobConfigurationLoad;
 import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.TableReference;
@@ -42,9 +38,8 @@
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.RetryJobId;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.RetryJobIdResult;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.PendingJob;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.PendingJobManager;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
@@ -57,11 +52,10 @@
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Repeatedly;
 import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.BackOffAdapter;
-import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -69,7 +63,6 @@
 import org.apache.beam.sdk.values.ShardedKey;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
-import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -108,15 +101,40 @@
       extends DoFn<KV<ShardedKey<DestinationT>, List<String>>, 
KV<TableDestination, String>> {
     private Map<DestinationT, String> jsonSchemas = Maps.newHashMap();
 
+    // Represents a pending BigQuery load job.
+    private class PendingJobData {
+      final BoundedWindow window;
+      final BigQueryHelpers.PendingJob retryJob;
+      final List<String> partitionFiles;
+      final TableDestination tableDestination;
+      final TableReference tableReference;
+
+      public PendingJobData(
+          BoundedWindow window,
+          BigQueryHelpers.PendingJob retryJob,
+          List<String> partitionFiles,
+          TableDestination tableDestination,
+          TableReference tableReference) {
+        this.window = window;
+        this.retryJob = retryJob;
+        this.partitionFiles = partitionFiles;
+        this.tableDestination = tableDestination;
+        this.tableReference = tableReference;
+      }
+    }
+    // All pending load jobs.
+    private List<PendingJobData> pendingJobs = Lists.newArrayList();
+
     @StartBundle
     public void startBundle(StartBundleContext c) {
       // Clear the map on each bundle so we can notice side-input updates.
       // (alternative is to use a cache with a TTL).
       jsonSchemas.clear();
+      pendingJobs.clear();
     }
 
     @ProcessElement
-    public void processElement(ProcessContext c) throws Exception {
+    public void processElement(ProcessContext c, BoundedWindow window) throws 
Exception {
       dynamicDestinations.setSideInputAccessorFromProcessContext(c);
       DestinationT destination = c.element().getKey().getKey();
       TableSchema tableSchema;
@@ -166,22 +184,64 @@ public void processElement(ProcessContext c) throws 
Exception {
           (c.pane().getIndex() == 0) ? firstPaneWriteDisposition : 
WriteDisposition.WRITE_APPEND;
       CreateDisposition createDisposition =
           (c.pane().getIndex() == 0) ? firstPaneCreateDisposition : 
CreateDisposition.CREATE_NEVER;
-      load(
-          
bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
-          
bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
-          jobIdPrefix,
-          tableReference,
-          tableDestination.getTimePartitioning(),
-          tableSchema,
-          partitionFiles,
-          writeDisposition,
-          createDisposition,
-          tableDestination.getTableDescription());
-      c.output(
-          mainOutputTag, KV.of(tableDestination, 
BigQueryHelpers.toJsonString(tableReference)));
-      for (String file : partitionFiles) {
-        c.output(temporaryFilesTag, file);
+
+      BigQueryHelpers.PendingJob retryJob =
+          startLoad(
+              
bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
+              
bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
+              jobIdPrefix,
+              tableReference,
+              tableDestination.getTimePartitioning(),
+              tableSchema,
+              partitionFiles,
+              writeDisposition,
+              createDisposition);
+      pendingJobs.add(
+          new PendingJobData(window, retryJob, partitionFiles, 
tableDestination, tableReference));
+    }
+
+    @FinishBundle
+    public void finishBundle(FinishBundleContext c) throws Exception {
+      DatasetService datasetService =
+          
bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class));
+
+      PendingJobManager jobManager = new PendingJobManager();
+      for (PendingJobData pendingJob : pendingJobs) {
+        jobManager =
+            jobManager.addPendingJob(
+                pendingJob.retryJob,
+                // Lambda called when the job is done.
+                j -> {
+                  try {
+                    if (pendingJob.tableDestination.getTableDescription() != 
null) {
+                      TableReference ref = pendingJob.tableReference;
+                      datasetService.patchTableDescription(
+                          ref.clone()
+                              .setTableId(
+                                  
BigQueryHelpers.stripPartitionDecorator(ref.getTableId())),
+                          pendingJob.tableDestination.getTableDescription());
+                    }
+                    c.output(
+                        mainOutputTag,
+                        KV.of(
+                            pendingJob.tableDestination,
+                            
BigQueryHelpers.toJsonString(pendingJob.tableReference)),
+                        pendingJob.window.maxTimestamp(),
+                        pendingJob.window);
+                    for (String file : pendingJob.partitionFiles) {
+                      c.output(
+                          temporaryFilesTag,
+                          file,
+                          pendingJob.window.maxTimestamp(),
+                          pendingJob.window);
+                    }
+                    return null;
+                  } catch (IOException | InterruptedException e) {
+                    return e;
+                  }
+                });
       }
+      jobManager.waitForDone();
     }
   }
 
@@ -248,7 +308,7 @@ public WriteTables(
     return writeTablesOutputs.get(mainOutputTag);
   }
 
-  private void load(
+  private PendingJob startLoad(
       JobService jobService,
       DatasetService datasetService,
       String jobIdPrefix,
@@ -257,9 +317,7 @@ private void load(
       @Nullable TableSchema schema,
       List<String> gcsUris,
       WriteDisposition writeDisposition,
-      CreateDisposition createDisposition,
-      @Nullable String tableDescription)
-      throws InterruptedException, IOException {
+      CreateDisposition createDisposition) {
     JobConfigurationLoad loadConfig =
         new JobConfigurationLoad()
             .setDestinationTable(ref)
@@ -273,103 +331,61 @@ private void load(
       loadConfig.setTimePartitioning(timePartitioning);
     }
     String projectId = loadJobProjectId == null ? ref.getProjectId() : 
loadJobProjectId.get();
-    Job lastFailedLoadJob = null;
     String bqLocation =
         BigQueryHelpers.getDatasetLocation(datasetService, ref.getProjectId(), 
ref.getDatasetId());
 
-    BackOff backoff =
-        BackOffAdapter.toGcpBackOff(
-            FluentBackoff.DEFAULT
-                .withMaxRetries(maxRetryJobs)
-                .withInitialBackoff(Duration.standardSeconds(1))
-                .withMaxBackoff(Duration.standardMinutes(1))
-                .backoff());
-    Sleeper sleeper = Sleeper.DEFAULT;
-    // First attempt is always jobIdPrefix-0.
-    RetryJobId jobId = new RetryJobId(jobIdPrefix, 0);
-    int i = 0;
-    do {
-      ++i;
-      JobReference jobRef =
-          new JobReference()
-              .setProjectId(projectId)
-              .setJobId(jobId.getJobId())
-              .setLocation(bqLocation);
-
-      LOG.info("Loading {} files into {} using job {}, attempt {}", 
gcsUris.size(), ref, jobRef, i);
-      try {
-        jobService.startLoadJob(jobRef, loadConfig);
-      } catch (IOException e) {
-        LOG.warn("Load job {} failed with {}", jobRef, e);
-        // It's possible that the job actually made it to BQ even though we 
got a failure here.
-        // For example, the response from BQ may have timed out returning. 
getRetryJobId will
-        // return the correct job id to use on retry, or a job id to continue 
polling (if it turns
-        // out the the job has not actually failed yet).
-        RetryJobIdResult result =
-            BigQueryHelpers.getRetryJobId(jobId, projectId, bqLocation, 
jobService);
-        jobId = result.jobId;
-        if (result.shouldRetry) {
-          // Try the load again with the new job id.
-          continue;
-        }
-        // Otherwise,the job has reached BigQuery and is in either the PENDING 
state or has
-        // completed successfully.
-      }
-      LOG.info("Load job {} started", jobRef);
-      // Try to wait until the job is done (succeeded or failed).
-      Job loadJob = jobService.pollJob(jobRef, 
BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
-
-      Status jobStatus = BigQueryHelpers.parseStatus(loadJob);
-      switch (jobStatus) {
-        case SUCCEEDED:
-          LOG.info("Load job {} succeeded. Statistics: {}", jobRef, 
loadJob.getStatistics());
-          if (tableDescription != null) {
-            datasetService.patchTableDescription(
-                
ref.clone().setTableId(BigQueryHelpers.stripPartitionDecorator(ref.getTableId())),
-                tableDescription);
-          }
-          return;
-        case UNKNOWN:
-          // This might happen if BigQuery's job listing is slow. Retry with 
the same
-          // job id.
-          LOG.info(
-              "Load job {} finished in unknown state: {}: {}",
-              jobRef,
-              loadJob.getStatus(),
-              (i < maxRetryJobs - 1) ? "will retry" : "will not retry");
-          lastFailedLoadJob = loadJob;
-          continue;
-        case FAILED:
-          lastFailedLoadJob = loadJob;
-          jobId = BigQueryHelpers.getRetryJobId(jobId, projectId, bqLocation, 
jobService).jobId;
-          LOG.info(
-              "Load job {} failed, {}: {}. Next job id {}",
-              jobRef,
-              (i < maxRetryJobs - 1) ? "will retry" : "will not retry",
-              loadJob.getStatus(),
-              jobId);
-          continue;
-        default:
-          throw new IllegalStateException(
-              String.format(
-                  "Unexpected status [%s] of load job: %s.",
-                  loadJob.getStatus(), 
BigQueryHelpers.jobToPrettyString(loadJob)));
-      }
-    } while (nextBackOff(sleeper, backoff));
-    throw new RuntimeException(
-        String.format(
-            "Failed to create load job with id prefix %s, "
-                + "reached max retries: %d, last failed load job: %s.",
-            jobIdPrefix, maxRetryJobs, 
BigQueryHelpers.jobToPrettyString(lastFailedLoadJob)));
-  }
-
-  /** Identical to {@link BackOffUtils#next} but without checked IOException. 
*/
-  private static boolean nextBackOff(Sleeper sleeper, BackOff backoff) throws 
InterruptedException {
-    try {
-      return BackOffUtils.next(sleeper, backoff);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
+    PendingJob retryJob =
+        new PendingJob(
+            // Function to load the data.
+            jobId -> {
+              JobReference jobRef =
+                  new JobReference()
+                      .setProjectId(projectId)
+                      .setJobId(jobId.getJobId())
+                      .setLocation(bqLocation);
+              LOG.info(
+                  "Loading {} files into {} using job {}, job id iteration {}",
+                  gcsUris.size(),
+                  ref,
+                  jobRef,
+                  jobId.getRetryIndex());
+              try {
+                jobService.startLoadJob(jobRef, loadConfig);
+              } catch (IOException | InterruptedException e) {
+                LOG.warn("Load job {} failed with {}", jobRef, e.toString());
+                throw new RuntimeException(e);
+              }
+              return null;
+            },
+            // Function to poll the result of a load job.
+            jobId -> {
+              JobReference jobRef =
+                  new JobReference()
+                      .setProjectId(projectId)
+                      .setJobId(jobId.getJobId())
+                      .setLocation(bqLocation);
+              try {
+                return jobService.pollJob(jobRef, 
BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+            },
+            // Function to lookup a job.
+            jobId -> {
+              JobReference jobRef =
+                  new JobReference()
+                      .setProjectId(projectId)
+                      .setJobId(jobId.getJobId())
+                      .setLocation(bqLocation);
+              try {
+                return jobService.getJob(jobRef);
+              } catch (InterruptedException | IOException e) {
+                throw new RuntimeException(e);
+              }
+            },
+            maxRetryJobs,
+            jobIdPrefix);
+    return retryJob;
   }
 
   static void removeTemporaryFiles(Iterable<String> files) throws IOException {
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java
index 89a2842b84d..9542b9a0f07 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpersTest.java
@@ -20,23 +20,36 @@
 import static org.junit.Assert.assertEquals;
 
 import com.google.api.client.util.Data;
+import com.google.api.services.bigquery.model.ErrorProto;
+import com.google.api.services.bigquery.model.Job;
+import com.google.api.services.bigquery.model.JobReference;
+import com.google.api.services.bigquery.model.JobStatus;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
+import com.google.common.collect.ImmutableSet;
+import java.util.Random;
+import java.util.Set;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ShardedKeyCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.PendingJob;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.PendingJobManager;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.util.BackOffAdapter;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Duration;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.testng.collections.Sets;
 
 /** Tests for {@link BigQueryHelpers}. */
 @RunWith(JUnit4.class)
@@ -131,4 +144,63 @@ public void testComplexCoderSerializable() {
             KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), 
TableRowInfoCoder.of()),
             IntervalWindow.getCoder()));
   }
+
+  @Test
+  public void testPendingJobManager() throws Exception {
+    PendingJobManager jobManager =
+        new PendingJobManager(
+            BackOffAdapter.toGcpBackOff(
+                FluentBackoff.DEFAULT
+                    .withMaxRetries(Integer.MAX_VALUE)
+                    .withInitialBackoff(Duration.millis(10))
+                    .withMaxBackoff(Duration.millis(10))
+                    .backoff()));
+
+    Set<String> succeeded = Sets.newHashSet();
+    for (int i = 0; i < 5; i++) {
+      Job currentJob = new Job();
+      currentJob.setKind(" bigquery#job");
+      PendingJob pendingJob =
+          new PendingJob(
+              retryId -> {
+                if (new Random().nextInt(2) == 0) {
+                  throw new RuntimeException("Failing to start.");
+                }
+                currentJob.setJobReference(
+                    new JobReference()
+                        .setProjectId("")
+                        .setLocation("")
+                        .setJobId(retryId.getJobId()));
+                return null;
+              },
+              retryId -> {
+                if (retryId.getRetryIndex() < 5) {
+                  currentJob.setStatus(new JobStatus().setErrorResult(new 
ErrorProto()));
+                } else {
+                  currentJob.setStatus(new JobStatus().setErrorResult(null));
+                }
+                return currentJob;
+              },
+              retryId -> {
+                if 
(retryId.getJobId().equals(currentJob.getJobReference().getJobId())) {
+                  return currentJob;
+                } else {
+                  return null;
+                }
+              },
+              100,
+              "JOB_" + i);
+      jobManager.addPendingJob(
+          pendingJob,
+          j -> {
+            succeeded.add(j.currentJobId.getJobId());
+            return null;
+          });
+    }
+
+    jobManager.waitForDone();
+    Set<String> expectedJobs =
+        ImmutableSet.of("JOB_0-5", "JOB_1-5", "JOB_2-5", "JOB_3-5", "JOB_4-5");
+    assertEquals(expectedJobs, succeeded);
+  }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index 971fed8d52f..14a37d4abe1 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -757,7 +757,7 @@ public void testWriteUnknown() throws Exception {
                 .withoutValidation());
 
     thrown.expect(RuntimeException.class);
-    thrown.expectMessage("Failed to create load job");
+    thrown.expectMessage("Failed to create job");
     p.run();
   }
 
@@ -777,9 +777,9 @@ public void testWriteFailedJobs() throws Exception {
                 .withoutValidation());
 
     thrown.expect(RuntimeException.class);
-    thrown.expectMessage("Failed to create load job with id prefix");
+    thrown.expectMessage("Failed to create job with prefix");
     thrown.expectMessage("reached max retries");
-    thrown.expectMessage("last failed load job");
+    thrown.expectMessage("last failed job");
 
     p.run();
   }
@@ -1281,6 +1281,7 @@ public void testWriteRename() throws Exception {
     DoFnTester<Iterable<KV<TableDestination, String>>, Void> tester = 
DoFnTester.of(writeRename);
     tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken);
     tester.processElement(tempTablesElement);
+    tester.finishBundle();
 
     for (Map.Entry<TableDestination, Collection<String>> entry : 
tempTables.asMap().entrySet()) {
       TableDestination tableDestination = entry.getKey();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 152098)
    Time Spent: 2h 20m  (was: 2h 10m)

> Move load job poll to finishBundle() method to better parallelize execution
> ---------------------------------------------------------------------------
>
>                 Key: BEAM-5105
>                 URL: https://issues.apache.org/jira/browse/BEAM-5105
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-gcp
>            Reporter: Chamikara Jayalath
>            Priority: Major
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> It appears that when we write to BigQuery using WriteTablesDoFn we start a 
> load job and wait for that job to finish.
> [https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java#L318]
>  
> In cases where we are trying to write a PCollection of tables (for example, 
> when users use the dynamic destinations feature) this relies on dynamic work 
> rebalancing to parallelize execution of load jobs. If the runner does not 
> support dynamic work rebalancing or does not execute dynamic work rebalancing 
> for some reason, this could have significant performance drawbacks. For 
> example, scheduling times for load jobs will add up.
>  
> A better approach might be to start load jobs in the process() method but wait 
> for all load jobs to finish in the finishBundle() method. This will parallelize 
> any overheads as well as job execution (assuming more than one job is 
> scheduled by BQ).
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to