[
https://issues.apache.org/jira/browse/BEAM-5105?focusedWorklogId=152058&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-152058
]
ASF GitHub Bot logged work on BEAM-5105:
----------------------------------------
Author: ASF GitHub Bot
Created on: 07/Oct/18 11:42
Start Date: 07/Oct/18 11:42
Worklog Time Spent: 10m
Work Description: reuvenlax commented on a change in pull request #6416:
[BEAM-5105] Better parallelize BigQuery load jobs
URL: https://github.com/apache/beam/pull/6416#discussion_r223210015
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
##########
@@ -79,24 +86,202 @@ public RetryJobIdResult(RetryJobId jobId, boolean
shouldRetry) {
}
}
+ // A class that waits for pending jobs, retrying them according to policy if
they fail.
+ static class PendingJobManager {
+ private static class JobInfo {
+ private final PendingJob pendingJob;
+ @Nullable private final SerializableFunction<PendingJob, Exception>
onSuccess;
+
+ public JobInfo(PendingJob pendingJob, SerializableFunction<PendingJob,
Exception> onSuccess) {
+ this.pendingJob = pendingJob;
+ this.onSuccess = onSuccess;
+ }
+ }
+
+ private List<JobInfo> pendingJobs = Lists.newArrayList();
+
+ // Add a pending job and a function to call when the job has completed
successfully.
+ PendingJobManager addPendingJob(
+ PendingJob pendingJob, @Nullable SerializableFunction<PendingJob,
Exception> onSuccess) {
+ this.pendingJobs.add(new JobInfo(pendingJob, onSuccess));
+ return this;
+ }
+
+ void waitForDone() throws Exception {
+ BackOff backoff =
+ BackOffAdapter.toGcpBackOff(
+ FluentBackoff.DEFAULT
+ .withMaxRetries(Integer.MAX_VALUE)
+ .withInitialBackoff(Duration.standardSeconds(1))
+ .withMaxBackoff(Duration.standardMinutes(1))
+ .backoff());
+ Sleeper sleeper = Sleeper.DEFAULT;
+ while (!pendingJobs.isEmpty()) {
+ List<JobInfo> retryJobs = Lists.newArrayList();
+ for (JobInfo jobInfo : pendingJobs) {
+ if (jobInfo.pendingJob.pollJob()) {
+ // Job has completed successfully.
+ Exception e = jobInfo.onSuccess.apply(null);
+ if (e != null) {
+ throw e;
+ }
+ } else {
+ // Job failed, start it again. If it has hit the maximum number of
retries then runJob
+ // will throw an exception.
+ jobInfo.pendingJob.runJob();
+ retryJobs.add(jobInfo);
+ }
+ }
+ pendingJobs = retryJobs;
+ if (!pendingJobs.isEmpty()) {
+ // Sleep before retrying.
+ nextBackOff(sleeper, backoff);
+ }
+ }
+ }
+
+ /** Identical to {@link BackOffUtils#next} but without checked
IOException. */
+ private static boolean nextBackOff(Sleeper sleeper, BackOff backoff)
+ throws InterruptedException {
+ try {
+ return BackOffUtils.next(sleeper, backoff);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ static class PendingJob {
+ private final SerializableFunction<RetryJobId, Void> executeJob;
+ private final SerializableFunction<RetryJobId, Job> pollJob;
+ private final String projectId;
+ private final String bqLocation;
+ private final JobService jobService;
+ private final int maxRetries;
+ private int currentAttempt;
+ RetryJobId currentJobId;
+ Job lastJobAttempted;
+ boolean started;
+
+ PendingJob(
+ SerializableFunction<RetryJobId, Void> executeJob,
+ SerializableFunction<RetryJobId, Job> pollJob,
+ String projectId,
+ String bqLocation,
+ JobService jobService,
+ int maxRetries,
+ String jobIdPrefix) {
+ this.executeJob = executeJob;
+ this.pollJob = pollJob;
+ this.projectId = projectId;
+ this.bqLocation = bqLocation;
+ this.jobService = jobService;
+ this.maxRetries = maxRetries;
+ this.currentAttempt = 0;
+ currentJobId = new RetryJobId(jobIdPrefix, 0);
+ this.started = false;
+ }
+
+ // Run the job.
+ void runJob() throws InterruptedException, IOException {
+ ++currentAttempt;
+ if (!shouldRetry()) {
+ throw new RuntimeException(
+ String.format(
+ "Failed to create job with prefix %s, "
+ + "reached max retries: %d, last failed job: %s.",
+ currentJobId.getJobIdPrefix(),
+ maxRetries,
+ BigQueryHelpers.jobToPrettyString(lastJobAttempted)));
+ }
+
+ try {
+ executeJob.apply(currentJobId);
+ } catch (RuntimeException e) {
+ LOG.warn("Job {} failed with {}", currentJobId.getJobId(), e);
+ // It's possible that the job actually made it to BQ even though we
got a failure here.
+ // For example, the response from BQ may have timed out returning.
getRetryJobId will
+ // return the correct job id to use on retry, or a job id to continue
polling (if it turns
out the job has not actually failed yet).
+ RetryJobIdResult result = getRetryJobId(currentJobId, projectId,
bqLocation, jobService);
+ currentJobId = result.jobId;
+ if (result.shouldRetry) {
+ // Otherwise the jobs either never started or started and failed.
Try the job again with
+ // the job id returned by getRetryJobId.
+ return;
+ }
+ }
+ LOG.info("job {} started", currentJobId.getJobId());
+ // The job has reached BigQuery and is in either the PENDING state or
has completed
+ // successfully.
+ this.started = true;
+ }
+
+ // Poll the status of the job. Returns true if the job has completed
successfully and false
+ // otherwise.
+ boolean pollJob() throws InterruptedException, IOException {
+ if (started) {
+ Job job = pollJob.apply(currentJobId);
+ this.lastJobAttempted = job;
+ Status jobStatus = parseStatus(job);
+ switch (jobStatus) {
+ case SUCCEEDED:
+ LOG.info("Load job {} succeeded. Statistics: {}", currentJobId,
job.getStatistics());
+ return true;
+ case UNKNOWN:
+ // This might happen if BigQuery's job listing is slow. Retry with
the same
+ // job id.
+ LOG.info(
+ "Load job {} finished in unknown state: {}: {}",
+ currentJobId,
+ job.getStatus(),
+ shouldRetry() ? "will retry" : "will not retry");
+ return false;
+ case FAILED:
+ String oldJobId = currentJobId.getJobId();
+ currentJobId =
+ BigQueryHelpers.getRetryJobId(currentJobId, projectId,
bqLocation, jobService)
+ .jobId;
+ LOG.info(
+ "Load job {} failed, {}: {}. Next job id {}",
+ oldJobId,
+ shouldRetry() ? "will retry" : "will not retry",
+ job.getStatus(),
+ currentJobId);
+ return false;
+ default:
Review comment:
no
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 152058)
Time Spent: 1.5h (was: 1h 20m)
> Move load job poll to finishBundle() method to better parallelize execution
> ---------------------------------------------------------------------------
>
> Key: BEAM-5105
> URL: https://issues.apache.org/jira/browse/BEAM-5105
> Project: Beam
> Issue Type: Improvement
> Components: io-java-gcp
> Reporter: Chamikara Jayalath
> Priority: Major
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> It appears that when we write to BigQuery using WriteTablesDoFn we start a
> load job and wait for that job to finish.
> [https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java#L318]
>
> In cases where we are trying to write a PCollection of tables (for example,
> when user use dynamic destinations feature) this relies on dynamic work
> rebalancing to parallellize execution of load jobs. If the runner does not
> support dynamic work rebalancing or does not execute dynamic work rebalancing
> from some reason this could have significant performance drawbacks. For
> example, scheduling times for load jobs will add up.
>
> A better approach might be to start load jobs at process() method but wait
> for all load jobs to finish at finishBundle() method. This will parallelize
> any overheads as well as job execution (assuming more than one job is
> schedule by BQ.).
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)