http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java index d6464dd..16bb1b4 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java @@ -32,9 +32,6 @@ import static com.google.datastore.v1.client.DatastoreHelper.makeUpsert; import static com.google.datastore.v1.client.DatastoreHelper.makeValue; import com.google.api.client.http.HttpRequestInitializer; -import com.google.api.client.util.BackOff; -import com.google.api.client.util.BackOffUtils; -import com.google.api.client.util.Sleeper; import com.google.auth.Credentials; import com.google.auth.http.HttpCredentialsAdapter; import com.google.auto.value.AutoValue; @@ -89,8 +86,11 @@ import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; import org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.BackOffUtils; import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.RetryHttpRequestInitializer; +import org.apache.beam.sdk.util.Sleeper; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection;
http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java index ef51650..b41490f 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java @@ -67,7 +67,8 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.DatasetServiceIm import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.JobServiceImpl; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.ExpectedLogs; -import org.apache.beam.sdk.testing.FastNanoClockAndSleeper; +import org.apache.beam.sdk.util.BackOffAdapter; +import org.apache.beam.sdk.util.FastNanoClockAndSleeper; import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.RetryHttpRequestInitializer; import org.apache.beam.sdk.util.Transport; @@ -133,7 +134,8 @@ public class BigQueryServicesImplTest { Sleeper sleeper = new FastNanoClockAndSleeper(); JobServiceImpl.startJob( - testJob, new ApiErrorExtractor(), bigquery, sleeper, FluentBackoff.DEFAULT.backoff()); + testJob, new ApiErrorExtractor(), bigquery, sleeper, + BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff())); verify(response, times(1)).getStatusCode(); verify(response, times(1)).getContent(); @@ -157,7 +159,8 @@ public class BigQueryServicesImplTest { Sleeper sleeper = new FastNanoClockAndSleeper(); JobServiceImpl.startJob( - testJob, new ApiErrorExtractor(), bigquery, sleeper, FluentBackoff.DEFAULT.backoff()); + testJob, new ApiErrorExtractor(), bigquery, sleeper, + BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff())); verify(response, times(1)).getStatusCode(); verify(response, times(1)).getContent(); @@ -185,7 +188,8 @@ public class BigQueryServicesImplTest { Sleeper sleeper = new FastNanoClockAndSleeper(); JobServiceImpl.startJob( - testJob, new ApiErrorExtractor(), bigquery, sleeper, FluentBackoff.DEFAULT.backoff()); + testJob, new ApiErrorExtractor(), bigquery, sleeper, + BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff())); verify(response, times(2)).getStatusCode(); verify(response, times(2)).getContent(); @@ -500,7 +504,8 @@ public class BigQueryServicesImplTest { DatasetServiceImpl dataService = new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); - dataService.insertAll(ref, rows, null, TEST_BACKOFF.backoff(), new MockSleeper()); + dataService.insertAll(ref, rows, null, + BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()), new MockSleeper()); verify(response, times(2)).getStatusCode(); verify(response, times(2)).getContent(); verify(response, times(2)).getContentType(); @@ -536,7 +541,8 @@ public class BigQueryServicesImplTest { DatasetServiceImpl dataService = new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); - dataService.insertAll(ref, rows, insertIds, TEST_BACKOFF.backoff(), new MockSleeper()); + dataService.insertAll(ref, rows, insertIds, + BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()), new MockSleeper()); verify(response, times(2)).getStatusCode(); verify(response, times(2)).getContent(); verify(response, times(2)).getContentType(); @@ -577,7 +583,8 @@ public class BigQueryServicesImplTest { // Expect it to fail. try { - dataService.insertAll(ref, rows, null, TEST_BACKOFF.backoff(), new MockSleeper()); + dataService.insertAll(ref, rows, null, + BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()), new MockSleeper()); fail(); } catch (IOException e) { assertThat(e, instanceOf(IOException.class)); @@ -617,7 +624,8 @@ public class BigQueryServicesImplTest { new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create()); try { - dataService.insertAll(ref, rows, null, TEST_BACKOFF.backoff(), new MockSleeper()); + dataService.insertAll(ref, rows, null, + BackOffAdapter.toGcpBackOff(TEST_BACKOFF.backoff()), new MockSleeper()); fail(); } catch (RuntimeException e) { verify(response, times(1)).getStatusCode(); http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java index ed6a0be..2045bb7 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java @@ -66,6 +66,7 @@ import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService; +import org.apache.beam.sdk.util.BackOffAdapter; import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.util.Transport; @@ -187,11 +188,12 @@ class FakeJobService implements JobService, Serializable { public Job pollJob(JobReference jobRef, int maxAttempts) throws InterruptedException { BackOff backoff = - FluentBackoff.DEFAULT - .withMaxRetries(maxAttempts) - .withInitialBackoff(Duration.millis(10)) - .withMaxBackoff(Duration.standardSeconds(1)) - .backoff(); + BackOffAdapter.toGcpBackOff( + FluentBackoff.DEFAULT + .withMaxRetries(maxAttempts) + .withInitialBackoff(Duration.millis(10)) + .withMaxBackoff(Duration.standardSeconds(1)) + .backoff()); Sleeper sleeper = Sleeper.DEFAULT; try { do { http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java index 85c27dd..dc91638 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java @@ -26,9 +26,6 @@ import static com.google.datastore.v1.client.DatastoreHelper.makeUpsert; import static com.google.datastore.v1.client.DatastoreHelper.makeValue; import com.google.api.client.http.HttpRequestInitializer; -import com.google.api.client.util.BackOff; -import com.google.api.client.util.BackOffUtils; -import com.google.api.client.util.Sleeper; import com.google.auth.Credentials; import com.google.auth.http.HttpCredentialsAdapter; import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; @@ -57,8 +54,11 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.BackOffUtils; import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.RetryHttpRequestInitializer; +import org.apache.beam.sdk.util.Sleeper; import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcher.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcher.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcher.java index 2bdfffa..6dc810b 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcher.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcher.java @@ -49,6 +49,7 @@ import javax.annotation.Nonnull; import javax.annotation.concurrent.NotThreadSafe; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.testing.SerializableMatcher; +import org.apache.beam.sdk.util.BackOffAdapter; import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.Transport; import org.hamcrest.Description; @@ -118,7 +119,8 @@ public class BigqueryMatcher extends TypeSafeMatcher<PipelineResult> queryContent.setQuery(query); response = queryWithRetries( - bigqueryClient, queryContent, Sleeper.DEFAULT, BACKOFF_FACTORY.backoff()); + bigqueryClient, queryContent, Sleeper.DEFAULT, + BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff())); } catch (IOException | InterruptedException e) { if (e instanceof InterruptedIOException) { Thread.currentThread().interrupt(); http://git-wip-us.apache.org/repos/asf/beam/blob/f6a0c674/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcherTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcherTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcherTest.java index 5fcdce9..2b03909 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcherTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/testing/BigqueryMatcherTest.java @@ -36,7 +36,8 @@ import com.google.common.collect.Lists; import java.io.IOException; import java.math.BigInteger; import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.testing.FastNanoClockAndSleeper; +import org.apache.beam.sdk.util.BackOffAdapter; +import org.apache.beam.sdk.util.FastNanoClockAndSleeper; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -131,7 +132,7 @@ public class BigqueryMatcherTest { mockBigqueryClient, new QueryRequest(), fastClock, - BigqueryMatcher.BACKOFF_FACTORY.backoff()); + BackOffAdapter.toGcpBackOff(BigqueryMatcher.BACKOFF_FACTORY.backoff())); } finally { verify(mockJobs, atLeast(BigqueryMatcher.MAX_QUERY_RETRIES)) .query(eq(projectId), eq(new QueryRequest())); @@ -151,7 +152,7 @@ public class BigqueryMatcherTest { mockBigqueryClient, new QueryRequest(), fastClock, - BigqueryMatcher.BACKOFF_FACTORY.backoff()); + BackOffAdapter.toGcpBackOff(BigqueryMatcher.BACKOFF_FACTORY.backoff())); } finally { verify(mockJobs, atLeast(BigqueryMatcher.MAX_QUERY_RETRIES)) .query(eq(projectId), eq(new QueryRequest()));
