http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 6aff3b0..8b5e8c2 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -53,14 +53,13 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.apache.beam.sdk.options.BigQueryOptions; import org.apache.beam.sdk.options.GcsOptions; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff; -import org.apache.beam.sdk.util.IntervalBoundedExponentialBackOff; +import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.Transport; +import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,14 +71,14 @@ class BigQueryServicesImpl implements BigQueryServices { private static final Logger LOG = LoggerFactory.getLogger(BigQueryServicesImpl.class); - // The maximum number of attempts to execute a BigQuery RPC. - private static final int MAX_RPC_ATTEMPTS = 10; + // The maximum number of retries to execute a BigQuery RPC. + private static final int MAX_RPC_RETRIES = 9; // The initial backoff for executing a BigQuery RPC. 
- private static final long INITIAL_RPC_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1); + private static final Duration INITIAL_RPC_BACKOFF = Duration.standardSeconds(1); // The initial backoff for polling the status of a BigQuery job. - private static final long INITIAL_JOB_STATUS_POLL_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1); + private static final Duration INITIAL_JOB_STATUS_POLL_BACKOFF = Duration.standardSeconds(1); @Override public JobService getJobService(BigQueryOptions options) { @@ -121,9 +120,9 @@ class BigQueryServicesImpl implements BigQueryServices { /** * {@inheritDoc} * - * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + * <p>Tries executing the RPC for at most {@code MAX_RPC_RETRIES} times until it succeeds. * - * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. + * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts. */ @Override public void startLoadJob( @@ -139,9 +138,9 @@ class BigQueryServicesImpl implements BigQueryServices { /** * {@inheritDoc} * - * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + * <p>Tries executing the RPC for at most {@code MAX_RPC_RETRIES} times until it succeeds. * - * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. + * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts. */ @Override public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig) @@ -157,9 +156,9 @@ class BigQueryServicesImpl implements BigQueryServices { /** * {@inheritDoc} * - * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + * <p>Tries executing the RPC for at most {@code MAX_RPC_RETRIES} times until it succeeds. * - * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. + * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts. 
*/ @Override public void startQueryJob(JobReference jobRef, JobConfigurationQuery queryConfig) @@ -175,9 +174,9 @@ class BigQueryServicesImpl implements BigQueryServices { /** * {@inheritDoc} * - * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + * <p>Tries executing the RPC for at most {@code MAX_RPC_RETRIES} times until it succeeds. * - * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. + * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts. */ @Override public void startCopyJob(JobReference jobRef, JobConfigurationTableCopy copyConfig) @@ -194,7 +193,8 @@ class BigQueryServicesImpl implements BigQueryServices { ApiErrorExtractor errorExtractor, Bigquery client) throws IOException, InterruptedException { BackOff backoff = - new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS); + FluentBackoff.DEFAULT + .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff(); startJob(job, errorExtractor, client, Sleeper.DEFAULT, backoff); } @@ -227,15 +227,17 @@ class BigQueryServicesImpl implements BigQueryServices { throw new IOException( String.format( "Unable to insert job: %s, aborting after %d .", - jobRef.getJobId(), MAX_RPC_ATTEMPTS), + jobRef.getJobId(), MAX_RPC_RETRIES), lastException); } @Override public Job pollJob(JobReference jobRef, int maxAttempts) throws InterruptedException { - BackOff backoff = new AttemptBoundedExponentialBackOff( - maxAttempts, INITIAL_JOB_STATUS_POLL_BACKOFF_MILLIS); + BackOff backoff = + FluentBackoff.DEFAULT + .withMaxRetries(maxAttempts).withInitialBackoff(INITIAL_JOB_STATUS_POLL_BACKOFF) + .backoff(); return pollJob(jobRef, Sleeper.DEFAULT, backoff); } @@ -270,12 +272,13 @@ class BigQueryServicesImpl implements BigQueryServices { .setQuery(query)) .setDryRun(true)); BackOff backoff = - new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS); + FluentBackoff.DEFAULT + 
.withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff(); return executeWithRetries( client.jobs().insert(projectId, job), String.format( "Unable to dry run query: %s, aborting after %d retries.", - query, MAX_RPC_ATTEMPTS), + query, MAX_RPC_RETRIES), Sleeper.DEFAULT, backoff).getStatistics(); } @@ -289,15 +292,14 @@ class BigQueryServicesImpl implements BigQueryServices { // The maximum number of rows to upload per InsertAll request. private static final long MAX_ROWS_PER_BATCH = 500; - // The maximum number of times to retry inserting rows into BigQuery. - private static final int MAX_INSERT_ATTEMPTS = 5; - - // The initial backoff after a failure inserting rows into BigQuery. - private static final long INITIAL_INSERT_BACKOFF_INTERVAL_MS = 200L; + private static final FluentBackoff INSERT_BACKOFF_FACTORY = + FluentBackoff.DEFAULT.withInitialBackoff(Duration.millis(200)).withMaxRetries(5); - // Backoff time bounds for rate limit exceeded errors. - private static final long INITIAL_RATE_LIMIT_EXCEEDED_BACKOFF_MS = TimeUnit.SECONDS.toMillis(1); - private static final long MAX_RATE_LIMIT_EXCEEDED_BACKOFF_MS = TimeUnit.MINUTES.toMillis(2); + // A backoff for rate limit exceeded errors. Retries forever. + private static final FluentBackoff DEFAULT_BACKOFF_FACTORY = + FluentBackoff.DEFAULT + .withInitialBackoff(Duration.standardSeconds(1)) + .withMaxBackoff(Duration.standardMinutes(2)); private final ApiErrorExtractor errorExtractor; private final Bigquery client; @@ -335,20 +337,21 @@ class BigQueryServicesImpl implements BigQueryServices { /** * {@inheritDoc} * - * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + * <p>Tries executing the RPC for at most {@code MAX_RPC_RETRIES} times until it succeeds. * - * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. + * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts. 
*/ @Override public Table getTable(String projectId, String datasetId, String tableId) throws IOException, InterruptedException { BackOff backoff = - new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS); + FluentBackoff.DEFAULT + .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff(); return executeWithRetries( client.tables().get(projectId, datasetId, tableId), String.format( "Unable to get table: %s, aborting after %d retries.", - tableId, MAX_RPC_ATTEMPTS), + tableId, MAX_RPC_RETRIES), Sleeper.DEFAULT, backoff); } @@ -356,20 +359,21 @@ class BigQueryServicesImpl implements BigQueryServices { /** * {@inheritDoc} * - * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + * <p>Tries executing the RPC for at most {@code MAX_RPC_RETRIES} times until it succeeds. * - * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. + * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts. 
*/ @Override public void deleteTable(String projectId, String datasetId, String tableId) throws IOException, InterruptedException { BackOff backoff = - new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS); + FluentBackoff.DEFAULT + .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff(); executeWithRetries( client.tables().delete(projectId, datasetId, tableId), String.format( "Unable to delete table: %s, aborting after %d retries.", - tableId, MAX_RPC_ATTEMPTS), + tableId, MAX_RPC_RETRIES), Sleeper.DEFAULT, backoff); } @@ -378,12 +382,13 @@ class BigQueryServicesImpl implements BigQueryServices { public boolean isTableEmpty(String projectId, String datasetId, String tableId) throws IOException, InterruptedException { BackOff backoff = - new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS); + FluentBackoff.DEFAULT + .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff(); TableDataList dataList = executeWithRetries( client.tabledata().list(projectId, datasetId, tableId), String.format( "Unable to list table data: %s, aborting after %d retries.", - tableId, MAX_RPC_ATTEMPTS), + tableId, MAX_RPC_RETRIES), Sleeper.DEFAULT, backoff); return dataList.getRows() == null || dataList.getRows().isEmpty(); @@ -392,20 +397,21 @@ class BigQueryServicesImpl implements BigQueryServices { /** * {@inheritDoc} * - * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + * <p>Tries executing the RPC for at most {@code MAX_RPC_RETRIES} times until it succeeds. * - * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. + * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts. 
*/ @Override public Dataset getDataset(String projectId, String datasetId) throws IOException, InterruptedException { BackOff backoff = - new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS); + FluentBackoff.DEFAULT + .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff(); return executeWithRetries( client.datasets().get(projectId, datasetId), String.format( "Unable to get dataset: %s, aborting after %d retries.", - datasetId, MAX_RPC_ATTEMPTS), + datasetId, MAX_RPC_RETRIES), Sleeper.DEFAULT, backoff); } @@ -413,21 +419,21 @@ class BigQueryServicesImpl implements BigQueryServices { /** * {@inheritDoc} * - * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + * <p>Tries executing the RPC for at most {@code MAX_RPC_RETRIES} times until it succeeds. * - * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. + * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts. */ @Override public void createDataset( String projectId, String datasetId, String location, String description) throws IOException, InterruptedException { BackOff backoff = - new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS); + FluentBackoff.DEFAULT + .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff(); createDataset(projectId, datasetId, location, description, Sleeper.DEFAULT, backoff); } - @VisibleForTesting - void createDataset( + private void createDataset( String projectId, String datasetId, String location, @@ -464,27 +470,28 @@ class BigQueryServicesImpl implements BigQueryServices { throw new IOException( String.format( "Unable to create dataset: %s, aborting after %d .", - datasetId, MAX_RPC_ATTEMPTS), + datasetId, MAX_RPC_RETRIES), lastException); } /** * {@inheritDoc} * - * <p>Tries executing the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. 
+ * <p>Tries executing the RPC for at most {@code MAX_RPC_RETRIES} times until it succeeds. * - * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts. + * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts. */ @Override public void deleteDataset(String projectId, String datasetId) throws IOException, InterruptedException { BackOff backoff = - new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS); + FluentBackoff.DEFAULT + .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff(); executeWithRetries( client.datasets().delete(projectId, datasetId), String.format( "Unable to delete table: %s, aborting after %d retries.", - datasetId, MAX_RPC_ATTEMPTS), + datasetId, MAX_RPC_RETRIES), Sleeper.DEFAULT, backoff); } @@ -502,9 +509,7 @@ class BigQueryServicesImpl implements BigQueryServices { + "as many elements as rowList"); } - AttemptBoundedExponentialBackOff backoff = new AttemptBoundedExponentialBackOff( - MAX_INSERT_ATTEMPTS, - INITIAL_INSERT_BACKOFF_INTERVAL_MS); + BackOff backoff = INSERT_BACKOFF_FACTORY.backoff(); long retTotalDataSize = 0; List<TableDataInsertAllResponse.InsertErrors> allErrors = new ArrayList<>(); @@ -547,8 +552,7 @@ class BigQueryServicesImpl implements BigQueryServices { executor.submit(new Callable<List<TableDataInsertAllResponse.InsertErrors>>() { @Override public List<TableDataInsertAllResponse.InsertErrors> call() throws IOException { - BackOff backoff = new IntervalBoundedExponentialBackOff( - MAX_RATE_LIMIT_EXCEEDED_BACKOFF_MS, INITIAL_RATE_LIMIT_EXCEEDED_BACKOFF_MS); + BackOff backoff = DEFAULT_BACKOFF_FACTORY.backoff(); while (true) { try { return insert.execute().getInsertErrors(); @@ -603,21 +607,24 @@ class BigQueryServicesImpl implements BigQueryServices { throw new RuntimeException(e.getCause()); } - if (!allErrors.isEmpty() && !backoff.atMaxAttempts()) { - try { - Thread.sleep(backoff.nextBackOffMillis()); - } catch (InterruptedException e) { - 
Thread.currentThread().interrupt(); - throw new IOException( - "Interrupted while waiting before retrying insert of " + retryRows); - } - LOG.info("Retrying failed inserts to BigQuery"); - rowsToPublish = retryRows; - idsToPublish = retryIds; - allErrors.clear(); - } else { + if (allErrors.isEmpty()) { + break; + } + long nextBackoffMillis = backoff.nextBackOffMillis(); + if (nextBackoffMillis == BackOff.STOP) { break; } + try { + Thread.sleep(nextBackoffMillis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException( + "Interrupted while waiting before retrying insert of " + retryRows, e); + } + LOG.info("Retrying failed inserts to BigQuery"); + rowsToPublish = retryRows; + idsToPublish = retryIds; + allErrors.clear(); } if (!allErrors.isEmpty()) { throw new IOException("Insert failed: " + allErrors); @@ -628,7 +635,7 @@ class BigQueryServicesImpl implements BigQueryServices { } private static class BigQueryJsonReaderImpl implements BigQueryJsonReader { - BigQueryTableRowIterator iterator; + private BigQueryTableRowIterator iterator; private BigQueryJsonReaderImpl(BigQueryTableRowIterator iterator) { this.iterator = iterator; @@ -706,7 +713,6 @@ class BigQueryServicesImpl implements BigQueryServices { /** * Identical to {@link BackOffUtils#next} but without checked IOException. - * @throws InterruptedException */ private static boolean nextBackOff(Sleeper sleeper, BackOff backoff) throws InterruptedException { try {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java index 729da97..677c661 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java @@ -58,7 +58,7 @@ import java.util.Objects; import java.util.Random; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; -import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff; +import org.apache.beam.sdk.util.FluentBackoff; import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -426,7 +426,8 @@ class BigQueryTableRowIterator implements AutoCloseable { Object... 
errorArgs) throws IOException, InterruptedException { Sleeper sleeper = Sleeper.DEFAULT; BackOff backOff = - new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_TIME.getMillis()); + FluentBackoff.DEFAULT + .withMaxRetries(MAX_RETRIES).withInitialBackoff(INITIAL_BACKOFF_TIME).backoff(); T result = null; while (true) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java index 6bd03b5..45871f1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java @@ -76,12 +76,13 @@ import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; -import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff; +import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.RetryHttpRequestInitializer; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; +import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -862,16 +863,11 @@ public class DatastoreV1 { private final V1DatastoreFactory datastoreFactory; // Current batch of mutations to be written. 
private final List<Mutation> mutations = new ArrayList<>(); - /** - * Since a bundle is written in batches, we should retry the commit of a batch in order to - * prevent transient errors from causing the bundle to fail. - */ - private static final int MAX_RETRIES = 5; - /** - * Initial backoff time for exponential backoff for retry attempts. - */ - private static final int INITIAL_BACKOFF_MILLIS = 5000; + private static final int MAX_RETRIES = 5; + private static final FluentBackoff BUNDLE_WRITE_BACKOFF = + FluentBackoff.DEFAULT + .withMaxRetries(MAX_RETRIES).withInitialBackoff(Duration.standardSeconds(5)); DatastoreWriterFn(String projectId) { this(projectId, new V1DatastoreFactory()); @@ -906,10 +902,10 @@ public class DatastoreV1 { /** * Writes a batch of mutations to Cloud Datastore. * - * <p>If a commit fails, it will be retried (up to {@link DatastoreWriterFn#MAX_RETRIES} - * times). All mutations in the batch will be committed again, even if the commit was partially - * successful. If the retry limit is exceeded, the last exception from the Cloud Datastore will - * be thrown. + * <p>If a commit fails, it will be retried up to {@link #MAX_RETRIES} times. All + * mutations in the batch will be committed again, even if the commit was partially + * successful. If the retry limit is exceeded, the last exception from Cloud Datastore will be + * thrown. * * @throws DatastoreException if the commit fails or IOException or InterruptedException if * backing off between retries fails. @@ -917,7 +913,7 @@ public class DatastoreV1 { private void flushBatch() throws DatastoreException, IOException, InterruptedException { LOG.debug("Writing batch of {} mutations", mutations.size()); Sleeper sleeper = Sleeper.DEFAULT; - BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS); + BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff(); while (true) { // Batch upsert entities. 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java index eb5fbe6..16cb004 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java @@ -56,7 +56,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.JobServiceImpl; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.FastNanoClockAndSleeper; -import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff; +import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.RetryHttpRequestInitializer; import org.apache.beam.sdk.util.Transport; import org.junit.Before; @@ -117,9 +117,8 @@ public class BigQueryServicesImplTest { when(response.getContent()).thenReturn(toStream(testJob)); Sleeper sleeper = new FastNanoClockAndSleeper(); - BackOff backoff = new AttemptBoundedExponentialBackOff( - 5 /* attempts */, 1000 /* initialIntervalMillis */); - JobServiceImpl.startJob(testJob, new ApiErrorExtractor(), bigquery, sleeper, backoff); + JobServiceImpl.startJob( + testJob, new ApiErrorExtractor(), bigquery, sleeper, FluentBackoff.DEFAULT.backoff()); verify(response, times(1)).getStatusCode(); verify(response, times(1)).getContent(); @@ -141,9 +140,8 @@ public class BigQueryServicesImplTest { when(response.getStatusCode()).thenReturn(409); // 409 means already exists Sleeper 
sleeper = new FastNanoClockAndSleeper(); - BackOff backoff = new AttemptBoundedExponentialBackOff( - 5 /* attempts */, 1000 /* initialIntervalMillis */); - JobServiceImpl.startJob(testJob, new ApiErrorExtractor(), bigquery, sleeper, backoff); + JobServiceImpl.startJob( + testJob, new ApiErrorExtractor(), bigquery, sleeper, FluentBackoff.DEFAULT.backoff()); verify(response, times(1)).getStatusCode(); verify(response, times(1)).getContent(); @@ -169,9 +167,8 @@ public class BigQueryServicesImplTest { .thenReturn(toStream(testJob)); Sleeper sleeper = new FastNanoClockAndSleeper(); - BackOff backoff = new AttemptBoundedExponentialBackOff( - 5 /* attempts */, 1000 /* initialIntervalMillis */); - JobServiceImpl.startJob(testJob, new ApiErrorExtractor(), bigquery, sleeper, backoff); + JobServiceImpl.startJob( + testJob, new ApiErrorExtractor(), bigquery, sleeper, FluentBackoff.DEFAULT.backoff()); verify(response, times(2)).getStatusCode(); verify(response, times(2)).getContent(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java index a596bb3..b680a0e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java @@ -54,8 +54,9 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff; +import 
org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.RetryHttpRequestInitializer; +import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -232,7 +233,7 @@ class V1TestUtil { // Number of times to retry on update failure private static final int MAX_RETRIES = 5; //Initial backoff time for exponential backoff for retry attempts. - private static final int INITIAL_BACKOFF_MILLIS = 5000; + private static final Duration INITIAL_BACKOFF = Duration.standardSeconds(5); // Returns true if a Datastore key is complete. A key is complete if its last element // has either an id or a name. @@ -279,7 +280,9 @@ class V1TestUtil { private void flushBatch() throws DatastoreException, IOException, InterruptedException { LOG.info("Writing batch of {} entities", entities.size()); Sleeper sleeper = Sleeper.DEFAULT; - BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS); + BackOff backoff = + FluentBackoff.DEFAULT + .withMaxRetries(MAX_RETRIES).withInitialBackoff(INITIAL_BACKOFF).backoff(); while (true) { // Batch mutate entities.