FluentBackoff: a replacement for a variety of custom backoff implementations
We have 3 different backoff classes, which don't really have that much different functionality. Add a single, flexible backoff implementation that can be used to replace all three classes. Additionally, this new backoff actually supports more functionality than any of the other three did -- you can limit retries, cap the exponential growth of an individual backoff, and cap the cumulative time spent in backoff; prior implementations did not allow all 3. This also makes the parameters self-obvious (Duration, not number-that-is-also-millis) where appropriate. This initial PR should have no functional changes. * Implement FluentBackoff * Replace other custom BackOff implementations with FluentBackoff Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3f485666 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3f485666 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3f485666 Branch: refs/heads/gearpump-runner Commit: 3f48566618552c4b0fa026aa3a75ef6f1875da63 Parents: c92e45d Author: Dan Halperin <dhalp...@google.com> Authored: Wed Aug 24 22:35:26 2016 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Mon Sep 12 17:40:12 2016 -0700 ---------------------------------------------------------------------- .../beam/examples/common/ExampleUtils.java | 7 +- .../runners/dataflow/DataflowPipelineJob.java | 72 +++--- .../beam/runners/dataflow/util/PackageUtil.java | 31 +-- .../dataflow/DataflowPipelineJobTest.java | 32 ++- .../sdk/io/BoundedReadFromUnboundedSource.java | 8 +- ...AttemptAndTimeBoundedExponentialBackOff.java | 172 -------------- .../util/AttemptBoundedExponentialBackOff.java | 85 ------- .../org/apache/beam/sdk/util/FluentBackoff.java | 229 +++++++++++++++++++ .../java/org/apache/beam/sdk/util/GcsUtil.java | 18 +- .../util/IntervalBoundedExponentialBackOff.java | 1 + ...mptAndTimeBoundedExponentialBackOffTest.java | 212 ----------------- .../AttemptBoundedExponentialBackOffTest.java | 84 ------- .../apache/beam/sdk/util/FluentBackoffTest.java | 226 ++++++++++++++++++ .../org/apache/beam/sdk/util/GcsUtilTest.java | 10 +- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 21 +- .../io/gcp/bigquery/BigQueryServicesImpl.java | 152 ++++++------ .../gcp/bigquery/BigQueryTableRowIterator.java | 5 +- .../beam/sdk/io/gcp/datastore/DatastoreV1.java | 26 +-- .../gcp/bigquery/BigQueryServicesImplTest.java | 17 +- .../beam/sdk/io/gcp/datastore/V1TestUtil.java | 9 +- 20 files changed, 675 insertions(+), 742 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java index eadb580..2e8dcf6 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java +++ b/examples/java/src/main/java/org/apache/beam/examples/common/ExampleUtils.java @@ -45,8 +45,9 @@ import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.options.BigQueryOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PubsubOptions; -import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff; +import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.Transport; +import org.joda.time.Duration; /** * The utility class that sets up and tears down external resources, @@ -79,7 +80,9 @@ public class ExampleUtils { */ public void setup() throws IOException { Sleeper sleeper = Sleeper.DEFAULT; - BackOff backOff = new AttemptBoundedExponentialBackOff(3, 200); + BackOff backOff = + FluentBackoff.DEFAULT + .withMaxRetries(3).withInitialBackoff(Duration.millis(200)).backoff(); Throwable lastException = null; try { do { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java index 9a515fa..dad59f2 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java @@ -34,7 +34,6 @@ import java.io.IOException; import java.net.SocketTimeoutException; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms; import org.apache.beam.runners.dataflow.internal.DataflowMetricUpdateExtractor; @@ -44,8 +43,7 @@ import org.apache.beam.sdk.AggregatorRetrievalException; import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.util.AttemptAndTimeBoundedExponentialBackOff; -import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff; +import org.apache.beam.sdk.util.FluentBackoff; import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,14 +93,27 @@ public class DataflowPipelineJob implements PipelineResult { /** * The polling interval for job status and messages information. */ - static final long MESSAGES_POLLING_INTERVAL = TimeUnit.SECONDS.toMillis(2); - static final long STATUS_POLLING_INTERVAL = TimeUnit.SECONDS.toMillis(2); + static final Duration MESSAGES_POLLING_INTERVAL = Duration.standardSeconds(2); + static final Duration STATUS_POLLING_INTERVAL = Duration.standardSeconds(2); + + static final double DEFAULT_BACKOFF_EXPONENT = 1.5; /** - * The amount of polling attempts for job status and messages information. + * The amount of polling retries for job status and messages information. */ - static final int MESSAGES_POLLING_ATTEMPTS = 12; - static final int STATUS_POLLING_ATTEMPTS = 5; + static final int MESSAGES_POLLING_RETRIES = 11; + static final int STATUS_POLLING_RETRIES = 4; + + private static final FluentBackoff MESSAGES_BACKOFF_FACTORY = + FluentBackoff.DEFAULT + .withInitialBackoff(MESSAGES_POLLING_INTERVAL) + .withMaxRetries(MESSAGES_POLLING_RETRIES) + .withExponent(DEFAULT_BACKOFF_EXPONENT); + protected static final FluentBackoff STATUS_BACKOFF_FACTORY = + FluentBackoff.DEFAULT + .withInitialBackoff(STATUS_POLLING_INTERVAL) + .withMaxRetries(STATUS_POLLING_RETRIES) + .withExponent(DEFAULT_BACKOFF_EXPONENT); /** * Constructs the job. @@ -214,21 +225,23 @@ public class DataflowPipelineJob implements PipelineResult { MonitoringUtil monitor = new MonitoringUtil(projectId, dataflowOptions.getDataflowClient()); long lastTimestamp = 0; - BackOff backoff = - duration.getMillis() > 0 - ? new AttemptAndTimeBoundedExponentialBackOff( - MESSAGES_POLLING_ATTEMPTS, - MESSAGES_POLLING_INTERVAL, - duration.getMillis(), - AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ATTEMPTS, - nanoClock) - : new AttemptBoundedExponentialBackOff( - MESSAGES_POLLING_ATTEMPTS, MESSAGES_POLLING_INTERVAL); + BackOff backoff; + if (!duration.isLongerThan(Duration.ZERO)) { + backoff = MESSAGES_BACKOFF_FACTORY.backoff(); + } else { + backoff = MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(duration).backoff(); + } + + // This function tracks the cumulative time from the *first request* to enforce the wall-clock + // limit. Any backoff instance could, at best, track the the time since the first attempt at a + // given request. Thus, we need to track the cumulative time ourselves. + long startNanos = nanoClock.nanoTime(); + State state; do { // Get the state of the job before listing messages. This ensures we always fetch job // messages after the job finishes to ensure we have all them. - state = getStateWithRetries(1, sleeper); + state = getStateWithRetries(STATUS_BACKOFF_FACTORY.withMaxRetries(0).backoff(), sleeper); boolean hasError = state == State.UNKNOWN; if (messageHandler != null && !hasError) { @@ -250,7 +263,16 @@ public class DataflowPipelineJob implements PipelineResult { } if (!hasError) { + // Reset the backoff. backoff.reset(); + // If duration is set, update the new cumulative sleep time to be the remaining + // part of the total input sleep duration. + if (duration.isLongerThan(Duration.ZERO)) { + long nanosConsumed = nanoClock.nanoTime() - startNanos; + Duration consumed = Duration.millis((nanosConsumed + 999999) / 1000000); + backoff = + MESSAGES_BACKOFF_FACTORY.withMaxCumulativeBackoff(duration.minus(consumed)).backoff(); + } // Check if the job is done. if (state.isTerminal()) { return state; @@ -287,7 +309,7 @@ public class DataflowPipelineJob implements PipelineResult { return terminalState; } - return getStateWithRetries(STATUS_POLLING_ATTEMPTS, Sleeper.DEFAULT); + return getStateWithRetries(STATUS_BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT); } /** @@ -299,7 +321,7 @@ public class DataflowPipelineJob implements PipelineResult { * @return The state of the job or State.UNKNOWN in case of failure. */ @VisibleForTesting - State getStateWithRetries(int attempts, Sleeper sleeper) { + State getStateWithRetries(BackOff attempts, Sleeper sleeper) { if (terminalState != null) { return terminalState; } @@ -318,17 +340,13 @@ public class DataflowPipelineJob implements PipelineResult { * Attempts to get the underlying {@link Job}. Uses exponential backoff on failure up to the * maximum number of passed in attempts. * - * @param attempts The amount of attempts to make. + * @param backoff the {@link BackOff} used to control retries. * @param sleeper Object used to do the sleeps between attempts. * @return The underlying {@link Job} object. * @throws IOException When the maximum number of retries is exhausted, the last exception is * thrown. */ - @VisibleForTesting - Job getJobWithRetries(int attempts, Sleeper sleeper) throws IOException { - AttemptBoundedExponentialBackOff backoff = - new AttemptBoundedExponentialBackOff(attempts, STATUS_POLLING_INTERVAL); - + private Job getJobWithRetries(BackOff backoff, Sleeper sleeper) throws IOException { // Retry loop ends in return or throw while (true) { try { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java index bf1f666..6d910ba 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java @@ -18,7 +18,7 @@ package org.apache.beam.runners.dataflow.util; import com.fasterxml.jackson.core.Base64Variants; -import com.google.api.client.util.BackOffUtils; +import com.google.api.client.util.BackOff; import com.google.api.client.util.Sleeper; import com.google.api.services.dataflow.model.DataflowPackage; import com.google.cloud.hadoop.util.ApiErrorExtractor; @@ -37,10 +37,11 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Objects; -import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff; +import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.util.ZipFiles; +import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,11 +55,15 @@ public class PackageUtil { /** * The initial interval to use between package staging attempts. */ - private static final long INITIAL_BACKOFF_INTERVAL_MS = 5000L; + private static final Duration INITIAL_BACKOFF_INTERVAL = Duration.standardSeconds(5); /** - * The maximum number of attempts when staging a file. + * The maximum number of retries when staging a file. */ - private static final int MAX_ATTEMPTS = 5; + private static final int MAX_RETRIES = 4; + + private static final FluentBackoff BACKOFF_FACTORY = + FluentBackoff.DEFAULT + .withMaxRetries(MAX_RETRIES).withInitialBackoff(INITIAL_BACKOFF_INTERVAL); /** * Translates exceptions from API calls. @@ -199,9 +204,7 @@ public class PackageUtil { } // Upload file, retrying on failure. - AttemptBoundedExponentialBackOff backoff = new AttemptBoundedExponentialBackOff( - MAX_ATTEMPTS, - INITIAL_BACKOFF_INTERVAL_MS); + BackOff backoff = BACKOFF_FACTORY.backoff(); while (true) { try { LOG.debug("Uploading classpath element {} to {}", classpathElement, target); @@ -219,15 +222,17 @@ public class PackageUtil { + "'gcloud auth login'.", classpathElement, target); LOG.error(errorMessage); throw new IOException(errorMessage, e); - } else if (!backoff.atMaxAttempts()) { - LOG.warn("Upload attempt failed, sleeping before retrying staging of classpath: {}", - classpathElement, e); - BackOffUtils.next(retrySleeper, backoff); - } else { + } + long sleep = backoff.nextBackOffMillis(); + if (sleep == BackOff.STOP) { // Rethrow last error, to be included as a cause in the catch below. LOG.error("Upload failed, will NOT retry staging of classpath: {}", classpathElement, e); throw e; + } else { + LOG.warn("Upload attempt failed, sleeping before retrying staging of classpath: {}", + classpathElement, e); + retrySleeper.sleep(sleep); } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java index 22b5400..226140a 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java @@ -60,7 +60,6 @@ import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.joda.time.Duration; @@ -111,22 +110,21 @@ public class DataflowPipelineJobTest { * AttemptBoundedExponentialBackOff given the number of retries and * an initial polling interval. * - * @param pollingIntervalMillis The initial polling interval given. - * @param attempts The number of attempts made + * @param pollingInterval The initial polling interval given. + * @param retries The number of retries made * @param timeSleptMillis The amount of time slept by the clock. This is checked * against the valid interval. */ - void checkValidInterval(long pollingIntervalMillis, int attempts, long timeSleptMillis) { + private void checkValidInterval(Duration pollingInterval, int retries, long timeSleptMillis) { long highSum = 0; long lowSum = 0; - for (int i = 1; i < attempts; i++) { + for (int i = 0; i < retries; i++) { double currentInterval = - pollingIntervalMillis - * Math.pow(AttemptBoundedExponentialBackOff.DEFAULT_MULTIPLIER, i - 1); - double offset = - AttemptBoundedExponentialBackOff.DEFAULT_RANDOMIZATION_FACTOR * currentInterval; - highSum += Math.round(currentInterval + offset); - lowSum += Math.round(currentInterval - offset); + pollingInterval.getMillis() + * Math.pow(DataflowPipelineJob.DEFAULT_BACKOFF_EXPONENT, i); + double randomOffset = 0.5 * currentInterval; + highSum += Math.round(currentInterval + randomOffset); + lowSum += Math.round(currentInterval - randomOffset); } assertThat(timeSleptMillis, allOf(greaterThanOrEqualTo(lowSum), lessThanOrEqualTo(highSum))); } @@ -228,7 +226,7 @@ public class DataflowPipelineJobTest { assertEquals(null, state); long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime); checkValidInterval(DataflowPipelineJob.MESSAGES_POLLING_INTERVAL, - DataflowPipelineJob.MESSAGES_POLLING_ATTEMPTS, timeDiff); + DataflowPipelineJob.MESSAGES_POLLING_RETRIES, timeDiff); } @Test @@ -246,8 +244,8 @@ public class DataflowPipelineJobTest { State state = job.waitUntilFinish(Duration.millis(4), null, fastClock, fastClock); assertEquals(null, state); long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime); - // Should only sleep for the 4 ms remaining. - assertEquals(timeDiff, 4L); + // Should only have slept for the 4 ms allowed. + assertEquals(4L, timeDiff); } @Test @@ -268,7 +266,7 @@ public class DataflowPipelineJobTest { assertEquals( State.RUNNING, - job.getStateWithRetries(DataflowPipelineJob.STATUS_POLLING_ATTEMPTS, fastClock)); + job.getStateWithRetries(DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff(), fastClock)); } @Test @@ -286,10 +284,10 @@ public class DataflowPipelineJobTest { long startTime = fastClock.nanoTime(); assertEquals( State.UNKNOWN, - job.getStateWithRetries(DataflowPipelineJob.STATUS_POLLING_ATTEMPTS, fastClock)); + job.getStateWithRetries(DataflowPipelineJob.STATUS_BACKOFF_FACTORY.backoff(), fastClock)); long timeDiff = TimeUnit.NANOSECONDS.toMillis(fastClock.nanoTime() - startTime); checkValidInterval(DataflowPipelineJob.STATUS_POLLING_INTERVAL, - DataflowPipelineJob.STATUS_POLLING_ATTEMPTS, timeDiff); + DataflowPipelineJob.STATUS_POLLING_RETRIES, timeDiff); } @Test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java index 28d7746..b41c655 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java @@ -34,7 +34,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.RemoveDuplicates; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.util.IntervalBoundedExponentialBackOff; +import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.ValueWithRecordId; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -52,6 +52,10 @@ class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PCollection<T private final UnboundedSource<T, ?> source; private final long maxNumRecords; private final Duration maxReadTime; + private static final FluentBackoff BACKOFF_FACTORY = + FluentBackoff.DEFAULT + .withInitialBackoff(Duration.millis(10)) + .withMaxBackoff(Duration.standardSeconds(10)); /** * Returns a new {@link BoundedReadFromUnboundedSource} that reads a bounded amount @@ -241,7 +245,7 @@ class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PCollection<T private boolean advanceWithBackoff() throws IOException { // Try reading from the source with exponential backoff - BackOff backoff = new IntervalBoundedExponentialBackOff(10000L, 10L); + BackOff backoff = BACKOFF_FACTORY.backoff(); long nextSleep = backoff.nextBackOffMillis(); while (nextSleep != BackOff.STOP) { if (endTime != null && Instant.now().isAfter(endTime)) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOff.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOff.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOff.java deleted file mode 100644 index d8050e0..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOff.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.api.client.util.BackOff; -import com.google.api.client.util.NanoClock; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.util.concurrent.TimeUnit; - -/** - * Extension of {@link AttemptBoundedExponentialBackOff} that bounds the total time that the backoff - * is happening as well as the amount of retries. Acts exactly as a AttemptBoundedExponentialBackOff - * unless the time interval has expired since the object was created. At this point, it will always - * return BackOff.STOP. Calling reset() resets both the timer and the number of retry attempts, - * unless a custom ResetPolicy (ResetPolicy.ATTEMPTS or ResetPolicy.TIMER) is passed to the - * constructor. - * - * <p>Implementation is not thread-safe. - */ -public class AttemptAndTimeBoundedExponentialBackOff extends AttemptBoundedExponentialBackOff { - private long endTimeMillis; - private long maximumTotalWaitTimeMillis; - private ResetPolicy resetPolicy; - private final NanoClock nanoClock; - // NanoClock.SYSTEM has a max elapsed time of 292 years or 2^63 ns. Here, we choose 2^53 ns as - // a smaller but still huge limit. - private static final long MAX_ELAPSED_TIME_MILLIS = 1L << 53; - - /** - * A ResetPolicy controls the behavior of this BackOff when reset() is called. By default, both - * the number of attempts and the time bound for the BackOff are reset, but an alternative - * ResetPolicy may be set to only reset one of these two. - */ - public static enum ResetPolicy { - ALL, - ATTEMPTS, - TIMER - } - - /** - * Constructs an instance of AttemptAndTimeBoundedExponentialBackoff. - * - * @param maximumNumberOfAttempts The maximum number of attempts it will make. - * @param initialIntervalMillis The original interval to wait between attempts in milliseconds. - * @param maximumTotalWaitTimeMillis The maximum total time that this object will - * allow more attempts in milliseconds. - */ - public AttemptAndTimeBoundedExponentialBackOff( - int maximumNumberOfAttempts, long initialIntervalMillis, long maximumTotalWaitTimeMillis) { - this( - maximumNumberOfAttempts, - initialIntervalMillis, - maximumTotalWaitTimeMillis, - ResetPolicy.ALL, - NanoClock.SYSTEM); - } - - /** - * Constructs an instance of AttemptAndTimeBoundedExponentialBackoff. - * - * @param maximumNumberOfAttempts The maximum number of attempts it will make. - * @param initialIntervalMillis The original interval to wait between attempts in milliseconds. - * @param maximumTotalWaitTimeMillis The maximum total time that this object will - * allow more attempts in milliseconds. - * @param resetPolicy The ResetPolicy specifying the properties of this BackOff that are subject - * to being reset. - */ - public AttemptAndTimeBoundedExponentialBackOff( - int maximumNumberOfAttempts, - long initialIntervalMillis, - long maximumTotalWaitTimeMillis, - ResetPolicy resetPolicy) { - this( - maximumNumberOfAttempts, - initialIntervalMillis, - maximumTotalWaitTimeMillis, - resetPolicy, - NanoClock.SYSTEM); - } - - /** - * Constructs an instance of AttemptAndTimeBoundedExponentialBackoff. - * - * @param maximumNumberOfAttempts The maximum number of attempts it will make. - * @param initialIntervalMillis The original interval to wait between attempts in milliseconds. - * @param maximumTotalWaitTimeMillis The maximum total time that this object will - * allow more attempts in milliseconds. - * @param resetPolicy The ResetPolicy specifying the properties of this BackOff that are subject - * to being reset. - * @param nanoClock clock used to measure the time that has passed. - */ - public AttemptAndTimeBoundedExponentialBackOff( - int maximumNumberOfAttempts, - long initialIntervalMillis, - long maximumTotalWaitTimeMillis, - ResetPolicy resetPolicy, - NanoClock nanoClock) { - super(maximumNumberOfAttempts, initialIntervalMillis); - checkArgument( - maximumTotalWaitTimeMillis > 0, "Maximum total wait time must be greater than zero."); - checkArgument( - maximumTotalWaitTimeMillis < MAX_ELAPSED_TIME_MILLIS, - "Maximum total wait time must be less than " + MAX_ELAPSED_TIME_MILLIS + " milliseconds"); - checkArgument(resetPolicy != null, "resetPolicy may not be null"); - checkArgument(nanoClock != null, "nanoClock may not be null"); - this.maximumTotalWaitTimeMillis = maximumTotalWaitTimeMillis; - this.resetPolicy = resetPolicy; - this.nanoClock = nanoClock; - // Set the end time for this BackOff. Note that we cannot simply call reset() here since the - // resetPolicy may not be set to reset the time bound. - endTimeMillis = getTimeMillis() + maximumTotalWaitTimeMillis; - } - - @Override - @SuppressFBWarnings(value = "UR_UNINIT_READ_CALLED_FROM_SUPER_CONSTRUCTOR", - justification = "Explicitly handled in implementation.") - public void reset() { - // reset() is called in the constructor of the parent class before resetPolicy and nanoClock are - // set. In this case, we call the parent class's reset() method and return. - if (resetPolicy == null) { - super.reset(); - return; - } - // Reset the number of attempts. - if (resetPolicy == ResetPolicy.ALL || resetPolicy == ResetPolicy.ATTEMPTS) { - super.reset(); - } - // Reset the time bound. - if (resetPolicy == ResetPolicy.ALL || resetPolicy == ResetPolicy.TIMER) { - endTimeMillis = getTimeMillis() + maximumTotalWaitTimeMillis; - } - } - - public void setEndtimeMillis(long endTimeMillis) { - this.endTimeMillis = endTimeMillis; - } - - @Override - public long nextBackOffMillis() { - if (atMaxAttempts()) { - return BackOff.STOP; - } - long backoff = Math.min(super.nextBackOffMillis(), endTimeMillis - getTimeMillis()); - return (backoff > 0 ? backoff : BackOff.STOP); - } - - private long getTimeMillis() { - return TimeUnit.NANOSECONDS.toMillis(nanoClock.nanoTime()); - } - - @Override - public boolean atMaxAttempts() { - return super.atMaxAttempts() || getTimeMillis() >= endTimeMillis; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOff.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOff.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOff.java deleted file mode 100644 index 5707293..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOff.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.api.client.util.BackOff; - - -/** - * Implementation of {@link BackOff} that increases the back off period for each retry attempt - * using a randomization function that grows exponentially. - * - * <p>Example: The initial interval is .5 seconds and the maximum number of retries is 10. - * For 10 tries the sequence will be (values in seconds): - * - * <pre> - * retry# retry_interval randomized_interval - * 1 0.5 [0.25, 0.75] - * 2 0.75 [0.375, 1.125] - * 3 1.125 [0.562, 1.687] - * 4 1.687 [0.8435, 2.53] - * 5 2.53 [1.265, 3.795] - * 6 3.795 [1.897, 5.692] - * 7 5.692 [2.846, 8.538] - * 8 8.538 [4.269, 12.807] - * 9 12.807 [6.403, 19.210] - * 10 {@link BackOff#STOP} - * </pre> - * - * <p>Implementation is not thread-safe. - */ -public class AttemptBoundedExponentialBackOff implements BackOff { - public static final double DEFAULT_MULTIPLIER = 1.5; - public static final double DEFAULT_RANDOMIZATION_FACTOR = 0.5; - private final int maximumNumberOfAttempts; - private final long initialIntervalMillis; - private int currentAttempt; - - public AttemptBoundedExponentialBackOff(int maximumNumberOfAttempts, long initialIntervalMillis) { - checkArgument(maximumNumberOfAttempts > 0, - "Maximum number of attempts must be greater than zero."); - checkArgument(initialIntervalMillis > 0, "Initial interval must be greater than zero."); - this.maximumNumberOfAttempts = maximumNumberOfAttempts; - this.initialIntervalMillis = initialIntervalMillis; - reset(); - } - - @Override - public void reset() { - currentAttempt = 1; - } - - @Override - public long nextBackOffMillis() { - if (currentAttempt >= maximumNumberOfAttempts) { - return BackOff.STOP; - } - double currentIntervalMillis = initialIntervalMillis - * Math.pow(DEFAULT_MULTIPLIER, currentAttempt - 1); - double randomOffset = (Math.random() * 2 - 1) - * DEFAULT_RANDOMIZATION_FACTOR * currentIntervalMillis; - currentAttempt += 1; - return Math.round(currentIntervalMillis + randomOffset); - } - - public boolean atMaxAttempts() { - return currentAttempt >= maximumNumberOfAttempts; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java new file mode 100644 index 0000000..479d7a8 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.api.client.util.BackOff; +import com.google.common.base.MoreObjects; +import org.joda.time.Duration; + +/** + * A fluent builder for {@link BackOff} objects that allows customization of the retry algorithm. + * + * @see #DEFAULT for the default configuration parameters. + */ +public final class FluentBackoff { + + private static final double DEFAULT_EXPONENT = 1.5; + private static final double DEFAULT_RANDOMIZATION_FACTOR = 0.5; + private static final Duration DEFAULT_MIN_BACKOFF = Duration.standardSeconds(1); + private static final Duration DEFAULT_MAX_BACKOFF = Duration.standardDays(1000); + private static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE; + private static final Duration DEFAULT_MAX_CUM_BACKOFF = Duration.standardDays(1000); + + private final double exponent; + private final Duration initialBackoff; + private final Duration maxBackoff; + private final Duration maxCumulativeBackoff; + private final int maxRetries; + + /** + * By default the {@link BackOff} created by this builder will use exponential backoff (base + * exponent 1.5) with an initial backoff of 1 second. These parameters can be overridden with + * {@link #withExponent(double)} and {@link #withInitialBackoff(Duration)}, + * respectively, and the maximum backoff after exponential increase can be capped using {@link + * FluentBackoff#withMaxBackoff(Duration)}. + * + * <p>The default {@link BackOff} does not limit the number of retries. To limit the backoff, the + * maximum total number of retries can be set using {@link #withMaxRetries(int)}. The + * total time spent in backoff can be time-bounded as well by configuring {@link + * #withMaxCumulativeBackoff(Duration)}. If either of these limits are reached, calls + * to {@link BackOff#nextBackOffMillis()} will return {@link BackOff#STOP} to signal that no more + * retries should continue. + */ + public static final FluentBackoff DEFAULT = new FluentBackoff( + DEFAULT_EXPONENT, + DEFAULT_MIN_BACKOFF, DEFAULT_MAX_BACKOFF, DEFAULT_MAX_CUM_BACKOFF, + DEFAULT_MAX_RETRIES); + + /** + * Instantiates a {@link BackOff} that will obey the current configuration. + * + * @see FluentBackoff + */ + public BackOff backoff() { + return new BackoffImpl(this); + } + + /** + * Returns a copy of this {@link FluentBackoff} that instead uses the specified exponent to + * control the exponential growth of delay. + * + * <p>Does not modify this object. + * + * @see FluentBackoff + */ + public FluentBackoff withExponent(double exponent) { + checkArgument(exponent > 0, "exponent %s must be greater than 0", exponent); + return new FluentBackoff( + exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries); + } + + /** + * Returns a copy of this {@link FluentBackoff} that instead uses the specified initial backoff + * duration. + * + * <p>Does not modify this object. + * + * @see FluentBackoff + */ + public FluentBackoff withInitialBackoff(Duration initialBackoff) { + checkArgument( + initialBackoff.isLongerThan(Duration.ZERO), + "initialBackoff %s must be at least 1 millisecond", + initialBackoff); + return new FluentBackoff( + exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries); + } + + /** + * Returns a copy of this {@link FluentBackoff} that limits the maximum backoff of an individual + * attempt to the specified duration. + * + * <p>Does not modify this object. + * + * @see FluentBackoff + */ + public FluentBackoff withMaxBackoff(Duration maxBackoff) { + checkArgument( + maxBackoff.getMillis() > 0, + "maxBackoff %s must be at least 1 millisecond", + maxBackoff); + return new FluentBackoff( + exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries); + } + + /** + * Returns a copy of this {@link FluentBackoff} that limits the total time spent in backoff + * returned across all calls to {@link BackOff#nextBackOffMillis()}. + * + * <p>Does not modify this object. + * + * @see FluentBackoff + */ + public FluentBackoff withMaxCumulativeBackoff(Duration maxCumulativeBackoff) { + checkArgument(maxCumulativeBackoff.isLongerThan(Duration.ZERO), + "maxCumulativeBackoff %s must be at least 1 millisecond", maxCumulativeBackoff); + return new FluentBackoff( + exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries); + } + + /** + * Returns a copy of this {@link FluentBackoff} that limits the total number of retries, aka + * the total number of calls to {@link BackOff#nextBackOffMillis()} before returning + * {@link BackOff#STOP}. + * + * <p>Does not modify this object. + * + * @see FluentBackoff + */ + public FluentBackoff withMaxRetries(int maxRetries) { + checkArgument(maxRetries >= 0, "maxRetries %s cannot be negative", maxRetries); + return new FluentBackoff( + exponent, initialBackoff, maxBackoff, maxCumulativeBackoff, maxRetries); + } + + public String toString() { + return MoreObjects.toStringHelper(FluentBackoff.class) + .add("exponent", exponent) + .add("initialBackoff", initialBackoff) + .add("maxBackoff", maxBackoff) + .add("maxRetries", maxRetries) + .add("maxCumulativeBackoff", maxCumulativeBackoff) + .toString(); + } + + private static class BackoffImpl implements BackOff { + + // Customization of this backoff. + private final FluentBackoff backoffConfig; + // Current state + private Duration currentCumulativeBackoff; + private int currentRetry; + + @Override + public void reset() { + currentRetry = 0; + currentCumulativeBackoff = Duration.ZERO; + } + + @Override + public long nextBackOffMillis() { + // Maximum number of retries reached. + if (currentRetry >= backoffConfig.maxRetries) { + return BackOff.STOP; + } + // Maximum cumulative backoff reached. + if (currentCumulativeBackoff.compareTo(backoffConfig.maxCumulativeBackoff) >= 0) { + return BackOff.STOP; + } + + double currentIntervalMillis = + Math.min( + backoffConfig.initialBackoff.getMillis() + * Math.pow(backoffConfig.exponent, currentRetry), + backoffConfig.maxBackoff.getMillis()); + double randomOffset = + (Math.random() * 2 - 1) * DEFAULT_RANDOMIZATION_FACTOR * currentIntervalMillis; + long nextBackoffMillis = Math.round(currentIntervalMillis + randomOffset); + // Cap to limit on cumulative backoff + Duration remainingCumulative = + backoffConfig.maxCumulativeBackoff.minus(currentCumulativeBackoff); + nextBackoffMillis = Math.min(nextBackoffMillis, remainingCumulative.getMillis()); + + // Update state and return backoff. + currentCumulativeBackoff = currentCumulativeBackoff.plus(nextBackoffMillis); + currentRetry += 1; + return nextBackoffMillis; + } + + private BackoffImpl(FluentBackoff backoffConfig) { + this.backoffConfig = backoffConfig; + this.reset(); + } + + public String toString() { + return MoreObjects.toStringHelper(BackoffImpl.class) + .add("backoffConfig", backoffConfig) + .add("currentRetry", currentRetry) + .add("currentCumulativeBackoff", currentCumulativeBackoff) + .toString(); + } + } + + private FluentBackoff( + double exponent, Duration initialBackoff, Duration maxBackoff, Duration maxCumulativeBackoff, + int maxRetries) { + this.exponent = exponent; + this.initialBackoff = initialBackoff; + this.maxBackoff = maxBackoff; + this.maxRetries = maxRetries; + this.maxCumulativeBackoff = maxCumulativeBackoff; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java index 44a182e..41c372e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java @@ -66,6 +66,7 @@ import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.GcsOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.gcsfs.GcsPath; +import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -119,6 +120,9 @@ public class GcsUtil { */ private static final int MAX_CONCURRENT_BATCHES = 256; + private static final FluentBackoff BACKOFF_FACTORY = + FluentBackoff.DEFAULT.withMaxRetries(3).withInitialBackoff(Duration.millis(200)); + ///////////////////////////////////////////////////////////////////////////// /** Client for the GCS API. */ @@ -177,7 +181,7 @@ public class GcsUtil { // the request has strong global consistency. ResilientOperation.retry( ResilientOperation.getGoogleRequestCallable(getObject), - new AttemptBoundedExponentialBackOff(3, 200), + BACKOFF_FACTORY.backoff(), RetryDeterminer.SOCKET_ERRORS, IOException.class); return ImmutableList.of(gcsPattern); @@ -216,7 +220,7 @@ public class GcsUtil { try { objects = ResilientOperation.retry( ResilientOperation.getGoogleRequestCallable(listObject), - new AttemptBoundedExponentialBackOff(3, 200), + BACKOFF_FACTORY.backoff(), RetryDeterminer.SOCKET_ERRORS, IOException.class); } catch (Exception e) { @@ -257,7 +261,10 @@ public class GcsUtil { * if the resource does not exist. */ public long fileSize(GcsPath path) throws IOException { - return fileSize(path, new AttemptBoundedExponentialBackOff(4, 200), Sleeper.DEFAULT); + return fileSize( + path, + BACKOFF_FACTORY.backoff(), + Sleeper.DEFAULT); } /** @@ -335,7 +342,10 @@ public class GcsUtil { * be accessible otherwise the permissions exception will be propagated. */ public boolean bucketExists(GcsPath path) throws IOException { - return bucketExists(path, new AttemptBoundedExponentialBackOff(4, 200), Sleeper.DEFAULT); + return bucketExists( + path, + BACKOFF_FACTORY.backoff(), + Sleeper.DEFAULT); } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java index 519776a..6fac6dc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IntervalBoundedExponentialBackOff.java @@ -49,6 +49,7 @@ import com.google.api.client.util.BackOff; * * <p>Implementation is not thread-safe. */ +@Deprecated public class IntervalBoundedExponentialBackOff implements BackOff { public static final double DEFAULT_MULTIPLIER = 1.5; public static final double DEFAULT_RANDOMIZATION_FACTOR = 0.5; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOffTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOffTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOffTest.java deleted file mode 100644 index 59e0fb7..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptAndTimeBoundedExponentialBackOffTest.java +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.allOf; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.lessThan; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import com.google.api.client.util.BackOff; -import org.apache.beam.sdk.testing.FastNanoClockAndSleeper; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Unit tests for {@link AttemptAndTimeBoundedExponentialBackOff}. */ -@RunWith(JUnit4.class) -public class AttemptAndTimeBoundedExponentialBackOffTest { - @Rule public ExpectedException exception = ExpectedException.none(); - @Rule public FastNanoClockAndSleeper fastClock = new FastNanoClockAndSleeper(); - - @Test - public void testUsingInvalidInitialInterval() throws Exception { - exception.expect(IllegalArgumentException.class); - exception.expectMessage("Initial interval must be greater than zero."); - new AttemptAndTimeBoundedExponentialBackOff(10, 0L, 1000L); - } - - @Test - public void testUsingInvalidTimeInterval() throws Exception { - exception.expect(IllegalArgumentException.class); - exception.expectMessage("Maximum total wait time must be greater than zero."); - new AttemptAndTimeBoundedExponentialBackOff(10, 2L, 0L); - } - - @Test - public void testUsingInvalidMaximumNumberOfRetries() throws Exception { - exception.expect(IllegalArgumentException.class); - exception.expectMessage("Maximum number of attempts must be greater than zero."); - new AttemptAndTimeBoundedExponentialBackOff(-1, 10L, 1000L); - } - - @Test - public void testThatFixedNumberOfAttemptsExits() throws Exception { - BackOff backOff = - new AttemptAndTimeBoundedExponentialBackOff( - 3, - 500L, - 1000L, - AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ALL, - fastClock); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L))); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L))); - assertEquals(BackOff.STOP, backOff.nextBackOffMillis()); - } - - @Test - public void testThatResettingAllowsReuse() throws Exception { - AttemptBoundedExponentialBackOff backOff = - new AttemptAndTimeBoundedExponentialBackOff( - 3, - 500, - 1000L, - AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ALL, - fastClock); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L))); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L))); - assertEquals(BackOff.STOP, backOff.nextBackOffMillis()); - backOff.reset(); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L))); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L))); - assertEquals(BackOff.STOP, backOff.nextBackOffMillis()); - - backOff = - new AttemptAndTimeBoundedExponentialBackOff( - 30, - 500, - 1000L, - AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ALL, - fastClock); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L))); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L))); - fastClock.sleep(2000L); - assertEquals(BackOff.STOP, backOff.nextBackOffMillis()); - backOff.reset(); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L))); - } - - @Test - public void testThatResettingAttemptsAllowsReuse() throws Exception { - AttemptBoundedExponentialBackOff backOff = - new AttemptAndTimeBoundedExponentialBackOff( - 3, - 500, - 1000, - AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ATTEMPTS, - fastClock); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L))); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L))); - assertEquals(BackOff.STOP, backOff.nextBackOffMillis()); - backOff.reset(); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L))); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L))); - assertEquals(BackOff.STOP, backOff.nextBackOffMillis()); - } - - @Test - public void testThatResettingAttemptsDoesNotAllowsReuse() throws Exception { - AttemptBoundedExponentialBackOff backOff = - new AttemptAndTimeBoundedExponentialBackOff( - 30, - 500, - 1000L, - AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ATTEMPTS, - fastClock); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L))); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L))); - fastClock.sleep(2000L); - assertEquals(BackOff.STOP, backOff.nextBackOffMillis()); - backOff.reset(); - assertEquals(BackOff.STOP, backOff.nextBackOffMillis()); - } - - @Test - public void testThatResettingTimerAllowsReuse() throws Exception { - AttemptBoundedExponentialBackOff backOff = - new AttemptAndTimeBoundedExponentialBackOff( - 30, - 500, - 1000L, - AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.TIMER, - fastClock); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L))); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L))); - fastClock.sleep(2000L); - assertEquals(BackOff.STOP, backOff.nextBackOffMillis()); - backOff.reset(); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(561L), lessThan(1688L))); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(843L), lessThan(2531L))); - } - - @Test - public void testThatResettingTimerDoesNotAllowReuse() throws Exception { - AttemptBoundedExponentialBackOff backOff = - new AttemptAndTimeBoundedExponentialBackOff( - 3, - 500, - 1000L, - AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.TIMER, - fastClock); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L))); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L))); - assertEquals(BackOff.STOP, backOff.nextBackOffMillis()); - backOff.reset(); - assertEquals(BackOff.STOP, backOff.nextBackOffMillis()); - } - - @Test - public void testTimeBound() throws Exception { - AttemptBoundedExponentialBackOff backOff = - new AttemptAndTimeBoundedExponentialBackOff( - 3, 500L, 5L, AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ALL, fastClock); - assertEquals(backOff.nextBackOffMillis(), 5L); - } - - @Test - public void testAtMaxAttempts() throws Exception { - AttemptBoundedExponentialBackOff backOff = - new AttemptAndTimeBoundedExponentialBackOff( - 3, - 500L, - 1000L, - AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ALL, - fastClock); - assertFalse(backOff.atMaxAttempts()); - backOff.nextBackOffMillis(); - assertFalse(backOff.atMaxAttempts()); - backOff.nextBackOffMillis(); - assertTrue(backOff.atMaxAttempts()); - assertEquals(BackOff.STOP, backOff.nextBackOffMillis()); - } - - @Test - public void testAtMaxTime() throws Exception { - AttemptBoundedExponentialBackOff backOff = - new AttemptAndTimeBoundedExponentialBackOff( - 3, 500L, 1L, AttemptAndTimeBoundedExponentialBackOff.ResetPolicy.ALL, fastClock); - fastClock.sleep(2); - assertTrue(backOff.atMaxAttempts()); - assertEquals(BackOff.STOP, backOff.nextBackOffMillis()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOffTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOffTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOffTest.java deleted file mode 100644 index 3cfa961..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AttemptBoundedExponentialBackOffTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.allOf; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.lessThan; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import com.google.api.client.util.BackOff; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Unit tests for {@link AttemptBoundedExponentialBackOff}. */ -@RunWith(JUnit4.class) -public class AttemptBoundedExponentialBackOffTest { - @Rule public ExpectedException exception = ExpectedException.none(); - - @Test - public void testUsingInvalidInitialInterval() throws Exception { - exception.expect(IllegalArgumentException.class); - exception.expectMessage("Initial interval must be greater than zero."); - new AttemptBoundedExponentialBackOff(10, 0L); - } - - @Test - public void testUsingInvalidMaximumNumberOfRetries() throws Exception { - exception.expect(IllegalArgumentException.class); - exception.expectMessage("Maximum number of attempts must be greater than zero."); - new AttemptBoundedExponentialBackOff(-1, 10L); - } - - @Test - public void testThatFixedNumberOfAttemptsExits() throws Exception { - BackOff backOff = new AttemptBoundedExponentialBackOff(3, 500); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L))); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L))); - assertEquals(BackOff.STOP, backOff.nextBackOffMillis()); - } - - @Test - public void testThatResettingAllowsReuse() throws Exception { - BackOff backOff = new AttemptBoundedExponentialBackOff(3, 500); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L))); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L))); - assertEquals(BackOff.STOP, backOff.nextBackOffMillis()); - backOff.reset(); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(249L), lessThan(751L))); - assertThat(backOff.nextBackOffMillis(), allOf(greaterThan(374L), lessThan(1126L))); - assertEquals(BackOff.STOP, backOff.nextBackOffMillis()); - } - - @Test - public void testAtMaxAttempts() throws Exception { - AttemptBoundedExponentialBackOff backOff = new AttemptBoundedExponentialBackOff(3, 500); - assertFalse(backOff.atMaxAttempts()); - backOff.nextBackOffMillis(); - assertFalse(backOff.atMaxAttempts()); - backOff.nextBackOffMillis(); - assertTrue(backOff.atMaxAttempts()); - assertEquals(BackOff.STOP, backOff.nextBackOffMillis()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FluentBackoffTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FluentBackoffTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FluentBackoffTest.java new file mode 100644 index 0000000..20b03cf --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FluentBackoffTest.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +import com.google.api.client.util.BackOff; +import java.io.IOException; +import org.joda.time.Duration; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link FluentBackoff}. + */ +@RunWith(JUnit4.class) +public class FluentBackoffTest { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + private final FluentBackoff defaultBackoff = FluentBackoff.DEFAULT; + + @Test + public void testInvalidExponent() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("exponent -2.0 must be greater than 0"); + defaultBackoff.withExponent(-2.0); + } + + @Test + public void testInvalidInitialBackoff() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("initialBackoff PT0S must be at least 1 millisecond"); + defaultBackoff.withInitialBackoff(Duration.ZERO); + } + + @Test + public void testInvalidMaxBackoff() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("maxBackoff PT0S must be at least 1 millisecond"); + defaultBackoff.withMaxBackoff(Duration.ZERO); + } + + @Test + public void testInvalidMaxRetries() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("maxRetries -1 cannot be negative"); + defaultBackoff.withMaxRetries(-1); + } + + @Test + public void testInvalidCumulativeBackoff() { + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("maxCumulativeBackoff PT-0.002S must be at least 1 millisecond"); + defaultBackoff.withMaxCumulativeBackoff(Duration.millis(-2)); + } + + /** + * Tests with bounded interval, custom exponent, and unlimited retries. + */ + @Test + public void testBoundedIntervalWithReset() throws Exception { + BackOff backOff = + FluentBackoff.DEFAULT + .withInitialBackoff(Duration.millis(500)) + .withMaxBackoff(Duration.standardSeconds(1)).backoff(); + assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), lessThan(751L))); + assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(374L), lessThan(1126L))); + assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), + lessThanOrEqualTo(1500L))); + assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), + lessThanOrEqualTo(1500L))); + assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), + lessThanOrEqualTo(1500L))); + assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), + lessThanOrEqualTo(1500L))); + assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), + lessThanOrEqualTo(1500L))); + + // Reset, should go back to short times. + backOff.reset(); + assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), lessThan(751L))); + assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(374L), lessThan(1126L))); + assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), + lessThanOrEqualTo(1500L))); + assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(500L), + lessThanOrEqualTo(1500L))); + + } + + /** + * Tests with bounded interval, custom exponent, limited retries, and a reset. + */ + @Test + public void testMaxRetriesWithReset() throws Exception { + BackOff backOff = + FluentBackoff.DEFAULT + .withInitialBackoff(Duration.millis(500)) + .withMaxRetries(1) + .backoff(); + assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), lessThan(751L))); + assertThat(backOff.nextBackOffMillis(), equalTo(BackOff.STOP)); + assertThat(backOff.nextBackOffMillis(), equalTo(BackOff.STOP)); + assertThat(backOff.nextBackOffMillis(), equalTo(BackOff.STOP)); + assertThat(backOff.nextBackOffMillis(), equalTo(BackOff.STOP)); + + backOff.reset(); + assertThat(backOff.nextBackOffMillis(), allOf(greaterThanOrEqualTo(249L), lessThan(751L))); + assertThat(backOff.nextBackOffMillis(), equalTo(BackOff.STOP)); + } + + private static long countMaximumBackoff(BackOff backOff) throws IOException { + long cumulativeBackoffMillis = 0; + long currentBackoffMillis = backOff.nextBackOffMillis(); + while (currentBackoffMillis != BackOff.STOP) { + cumulativeBackoffMillis += currentBackoffMillis; + currentBackoffMillis = backOff.nextBackOffMillis(); + } + return cumulativeBackoffMillis; + } + + /** + * Tests with bounded interval, custom exponent, limited cumulative time, and a reset. + */ + @Test + public void testBoundedIntervalAndCumTimeWithReset() throws Exception { + BackOff backOff = + FluentBackoff.DEFAULT + .withInitialBackoff(Duration.millis(500)) + .withMaxBackoff(Duration.standardSeconds(1)) + .withMaxCumulativeBackoff(Duration.standardMinutes(1)).backoff(); + + assertThat(countMaximumBackoff(backOff), equalTo(Duration.standardMinutes(1).getMillis())); + + backOff.reset(); + assertThat(countMaximumBackoff(backOff), equalTo(Duration.standardMinutes(1).getMillis())); + // sanity check: should get 0 if we don't reset + assertThat(countMaximumBackoff(backOff), equalTo(0L)); + + backOff.reset(); + assertThat(countMaximumBackoff(backOff), equalTo(Duration.standardMinutes(1).getMillis())); + } + + /** + * Tests with bounded interval, custom exponent, limited cumulative time and retries. + */ + @Test + public void testBoundedIntervalAndCumTimeAndRetriesWithReset() throws Exception { + BackOff backOff = + FluentBackoff.DEFAULT + .withInitialBackoff(Duration.millis(500)) + .withMaxBackoff(Duration.standardSeconds(1)) + .withMaxCumulativeBackoff(Duration.standardMinutes(1)) + .backoff(); + + long cumulativeBackoffMillis = 0; + long currentBackoffMillis = backOff.nextBackOffMillis(); + while (currentBackoffMillis != BackOff.STOP) { + cumulativeBackoffMillis += currentBackoffMillis; + currentBackoffMillis = backOff.nextBackOffMillis(); + } + assertThat(cumulativeBackoffMillis, equalTo(Duration.standardMinutes(1).getMillis())); + } + + @Test + public void testFluentBackoffToString() throws IOException { + FluentBackoff config = FluentBackoff.DEFAULT + .withExponent(3.4) + .withMaxRetries(4) + .withInitialBackoff(Duration.standardSeconds(3)) + .withMaxBackoff(Duration.standardHours(1)) + .withMaxCumulativeBackoff(Duration.standardDays(1)); + + assertEquals( + "FluentBackoff{exponent=3.4, initialBackoff=PT3S, maxBackoff=PT3600S," + + " maxRetries=4, maxCumulativeBackoff=PT86400S}", + config.toString()); + } + @Test + public void testBackoffImplToString() throws IOException { + FluentBackoff config = FluentBackoff.DEFAULT + .withExponent(3.4) + .withMaxRetries(4) + .withInitialBackoff(Duration.standardSeconds(3)) + .withMaxBackoff(Duration.standardHours(1)) + .withMaxCumulativeBackoff(Duration.standardDays(1)); + BackOff backOff = config.backoff(); + + assertEquals( + "BackoffImpl{backoffConfig=" + config.toString() + "," + + " currentRetry=0, currentCumulativeBackoff=PT0S}", + backOff.toString()); + + // backoff once, ignoring result + backOff.nextBackOffMillis(); + + // currentRetry is exact, we can test it. + assertThat(backOff.toString(), containsString("currentRetry=1")); + // currentCumulativeBackoff is not exact; we cannot even check that it's non-zero (randomness). + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java index 681b0aa..9504b4c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GcsUtilTest.java @@ -365,7 +365,7 @@ public class GcsUtilTest { Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); - BackOff mockBackOff = new AttemptBoundedExponentialBackOff(3, 200); + BackOff mockBackOff = FluentBackoff.DEFAULT.withMaxRetries(2).backoff(); when(mockStorage.objects()).thenReturn(mockStorageObjects); when(mockStorageObjects.get("testbucket", "testobject")).thenReturn(mockStorageGet); @@ -376,7 +376,7 @@ public class GcsUtilTest { assertEquals(1000, gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject"), mockBackOff, new FastNanoClockAndSleeper())); - assertEquals(mockBackOff.nextBackOffMillis(), BackOff.STOP); + assertEquals(BackOff.STOP, mockBackOff.nextBackOffMillis()); } @Test @@ -390,7 +390,7 @@ public class GcsUtilTest { Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); - BackOff mockBackOff = new AttemptBoundedExponentialBackOff(3, 200); + BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); when(mockStorage.buckets()).thenReturn(mockStorageObjects); when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); @@ -413,7 +413,7 @@ public class GcsUtilTest { Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); - BackOff mockBackOff = new AttemptBoundedExponentialBackOff(3, 200); + BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); GoogleJsonResponseException expectedException = googleJsonResponseException(HttpStatusCodes.STATUS_CODE_FORBIDDEN, "Waves hand mysteriously", "These aren't the buckets your looking for"); @@ -438,7 +438,7 @@ public class GcsUtilTest { Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); - BackOff mockBackOff = new AttemptBoundedExponentialBackOff(3, 200); + BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); when(mockStorage.buckets()).thenReturn(mockStorageObjects); when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f485666/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 304dc82..6dde581 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -24,9 +24,6 @@ import static com.google.common.base.Preconditions.checkState; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.api.client.json.JsonFactory; -import com.google.api.client.util.BackOff; -import com.google.api.client.util.BackOffUtils; -import com.google.api.client.util.Sleeper; import com.google.api.services.bigquery.Bigquery; import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfigurationExtract; @@ -69,7 +66,6 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -108,7 +104,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff; import org.apache.beam.sdk.util.FileIOChannelFactory; import org.apache.beam.sdk.util.GcsIOChannelFactory; import org.apache.beam.sdk.util.GcsUtil; @@ -129,6 +124,7 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; +import org.joda.time.Duration; import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -294,7 +290,7 @@ public class BigQueryIO { * * <p>If the project id is omitted, the default project id is used. */ - public static TableReference parseTableSpec(String tableSpec) { + static TableReference parseTableSpec(String tableSpec) { Matcher match = TABLE_SPEC.matcher(tableSpec); if (!match.matches()) { throw new IllegalArgumentException( @@ -953,14 +949,14 @@ public class BigQueryIO { * ... */ private abstract static class BigQuerySourceBase extends BoundedSource<TableRow> { - // The maximum number of attempts to verify temp files. - private static final int MAX_FILES_VERIFY_ATTEMPTS = 10; + // The maximum number of retries to verify temp files. + private static final int MAX_FILES_VERIFY_RETRIES = 9; // The maximum number of retries to poll a BigQuery job. protected static final int JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE; // The initial backoff for verifying temp files. - private static final long INITIAL_FILES_VERIFY_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1); + private static final Duration INITIAL_FILES_VERIFY_BACKOFF = Duration.standardSeconds(1); protected final String jobIdToken; protected final String extractDestinationDir; @@ -1055,14 +1051,7 @@ public class BigQueryIO { }}; List<BoundedSource<TableRow>> avroSources = Lists.newArrayList(); - BackOff backoff = new AttemptBoundedExponentialBackOff( - MAX_FILES_VERIFY_ATTEMPTS, INITIAL_FILES_VERIFY_BACKOFF_MILLIS); for (String fileName : files) { - while (BackOffUtils.next(Sleeper.DEFAULT, backoff)) { - if (IOChannelUtils.getFactory(fileName).getSizeBytes(fileName) != -1) { - break; - } - } avroSources.add(new TransformingSource<>( AvroSource.from(fileName), function, getDefaultOutputCoder())); }