johnjcasey commented on code in PR #27681: URL: https://github.com/apache/beam/pull/27681#discussion_r1280696718
########## sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java: ########## @@ -0,0 +1,647 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.googleads; + +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.ads.googleads.lib.GoogleAdsClient; +import com.google.ads.googleads.v14.errors.GoogleAdsError; +import com.google.ads.googleads.v14.errors.GoogleAdsException; +import com.google.ads.googleads.v14.errors.GoogleAdsFailure; +import com.google.ads.googleads.v14.errors.InternalErrorEnum; +import com.google.ads.googleads.v14.errors.QuotaErrorEnum; +import com.google.ads.googleads.v14.services.GoogleAdsRow; +import com.google.ads.googleads.v14.services.GoogleAdsServiceClient; +import com.google.ads.googleads.v14.services.SearchGoogleAdsStreamRequest; +import com.google.ads.googleads.v14.services.SearchGoogleAdsStreamResponse; +import com.google.auto.value.AutoValue; +import com.google.protobuf.Message; +import 
com.google.protobuf.util.Durations; +import java.io.IOException; +import java.io.Serializable; +import java.util.List; +import java.util.Optional; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.ProcessContext; +import org.apache.beam.sdk.transforms.DoFn.ProcessElement; +import org.apache.beam.sdk.transforms.DoFn.Setup; +import org.apache.beam.sdk.transforms.DoFn.Teardown; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.BackOffUtils; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.Sleeper; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.RateLimiter; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; + +/** + * {@link GoogleAdsV14} provides an API to read Google Ads API v14 reports. + * + * <p>The Google Ads API does not use service account credentials in the same way as Google Cloud + * Platform APIs do. Service account credentials are typically only used to delegate (using + * domain-wide delegation) access through end user accounts. Providing credentials using the OAuth2 + * desktop flow may be preferable over domain wide delegation. 
Defaults for OAuth 2.0 credentials, + * refresh token and developer token can be provided using the following flags: + * + * <pre> + * --googleAdsClientId=your-client-id + * --googleAdsClientSecret=your-client-secret + * --googleAdsRefreshToken=your-refresh-token + * --googleAdsDeveloperToken=your-developer-token + * </pre> + * + * <p>Use {@link GoogleAdsV14#read()} to read a bounded {@link PCollection} of {@link GoogleAdsRow} + * from a query using {@link Read#withQuery(String)} and one or a few customer IDs using either + * {@link Read#withCustomerId(Long)} or {@link Read#withCustomerIds(List)}. Alternatively, use + * {@link GoogleAdsV14#readAll()} to read either a bounded or unbounded {@link PCollection} of + * {@link GoogleAdsRow} from a {@link PCollection} of {@link SearchGoogleAdsStreamRequest}. + * + * <p>For example, using {@link GoogleAdsV14#read()}: + * + * <pre>{@code + * Pipeline p = Pipeline.create(); + * PCollection<GoogleAdsRow> rows = + * p.apply( + * GoogleAdsIO.v14() + * .read() + * .withCustomerId(1234567890l) + * .withQuery( + * "SELECT" + * + "campaign.id," + * + "campaign.name," + * + "campaign.status" + * + "FROM campaign")); + * p.run(); + * }</pre> + * + * <p>Alternatively, using {@link GoogleAdsV14#readAll()} to execute requests from a {@link + * PCollection} of {@link SearchGoogleAdsStreamRequest}: + * + * <pre>{@code + * Pipeline p = Pipeline.create(); + * PCollection<SearchGoogleAdsStreamRequest> requests = + * p.apply( + * Create.of( + * ImmutableList.of( + * SearchGoogleAdsStreamRequest.newBuilder() + * .setCustomerId(Long.toString(1234567890l)) + * .setQuery( + * "SELECT" + * + "campaign.id," + * + "campaign.name," + * + "campaign.status" + * + "FROM campaign") + * .build()))); + * PCollection<GoogleAdsRow> rows = requests.apply(GoogleAdsIO.v14().readAll()); + * p.run(); + * }</pre> + * + * <h2>Client-side rate limiting</h2> + * + * On construction of a {@link GoogleAdsV14#read()} or {@link GoogleAdsV14#readAll()} transform a + * 
default rate limiting policy is provided to stay well under the rate limit for the Google Ads + * API, but this limit is only local to a single worker and operates without any knowledge of other + * applications using the same developer token for any customer ID. The Google Ads API enforces + * global limits from the developer token down to the customer ID and it is recommended to host a + * shared rate limiting service to coordinate traffic to the Google Ads API across all applications + * using the same developer token. Users of these transforms are strongly advised to implement their + * own {@link RateLimitPolicy} and {@link RateLimitPolicyFactory} to interact with a shared rate + * limiting service for any production workloads. + * + * @see GoogleAdsIO#v14() + * @see GoogleAdsOptions + * @see <a href="https://developers.google.com/google-ads/api/docs/best-practices/overview">Best + * Practices in the Google Ads documentation</a> + */ +public class GoogleAdsV14 { Review Comment: By convention, we wouldn't split out our IOs by API version, and if we wanted to update to some new API version, we would do that in a way that was ideally invisible to the users. I'm not familiar enough with the Google Ads API to know if that is reasonable or not, but I thought I would bring it up here. ########## sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java: ########## @@ -0,0 +1,647 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.googleads; + +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.ads.googleads.lib.GoogleAdsClient; +import com.google.ads.googleads.v14.errors.GoogleAdsError; +import com.google.ads.googleads.v14.errors.GoogleAdsException; +import com.google.ads.googleads.v14.errors.GoogleAdsFailure; +import com.google.ads.googleads.v14.errors.InternalErrorEnum; +import com.google.ads.googleads.v14.errors.QuotaErrorEnum; +import com.google.ads.googleads.v14.services.GoogleAdsRow; +import com.google.ads.googleads.v14.services.GoogleAdsServiceClient; +import com.google.ads.googleads.v14.services.SearchGoogleAdsStreamRequest; +import com.google.ads.googleads.v14.services.SearchGoogleAdsStreamResponse; +import com.google.auto.value.AutoValue; +import com.google.protobuf.Message; +import com.google.protobuf.util.Durations; +import java.io.IOException; +import java.io.Serializable; +import java.util.List; +import java.util.Optional; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.ProcessContext; +import org.apache.beam.sdk.transforms.DoFn.ProcessElement; +import org.apache.beam.sdk.transforms.DoFn.Setup; +import org.apache.beam.sdk.transforms.DoFn.Teardown; +import 
org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.BackOffUtils; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.Sleeper; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.RateLimiter; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; + +/** + * {@link GoogleAdsV14} provides an API to read Google Ads API v14 reports. + * + * <p>The Google Ads API does not use service account credentials in the same way as Google Cloud + * Platform APIs do. Service account credentials are typically only used to delegate (using + * domain-wide delegation) access through end user accounts. Providing credentials using the OAuth2 + * desktop flow may be preferable over domain wide delegation. Defaults for OAuth 2.0 credentials, + * refresh token and developer token can be provided using the following flags: + * + * <pre> + * --googleAdsClientId=your-client-id + * --googleAdsClientSecret=your-client-secret + * --googleAdsRefreshToken=your-refresh-token + * --googleAdsDeveloperToken=your-developer-token + * </pre> + * + * <p>Use {@link GoogleAdsV14#read()} to read a bounded {@link PCollection} of {@link GoogleAdsRow} + * from a query using {@link Read#withQuery(String)} and one or a few customer IDs using either + * {@link Read#withCustomerId(Long)} or {@link Read#withCustomerIds(List)}. 
Alternatively, use + * {@link GoogleAdsV14#readAll()} to read either a bounded or unbounded {@link PCollection} of + * {@link GoogleAdsRow} from a {@link PCollection} of {@link SearchGoogleAdsStreamRequest}. + * + * <p>For example, using {@link GoogleAdsV14#read()}: + * + * <pre>{@code + * Pipeline p = Pipeline.create(); + * PCollection<GoogleAdsRow> rows = + * p.apply( + * GoogleAdsIO.v14() + * .read() + * .withCustomerId(1234567890l) + * .withQuery( + * "SELECT" + * + "campaign.id," + * + "campaign.name," + * + "campaign.status" + * + "FROM campaign")); + * p.run(); + * }</pre> + * + * <p>Alternatively, using {@link GoogleAdsV14#readAll()} to execute requests from a {@link + * PCollection} of {@link SearchGoogleAdsStreamRequest}: + * + * <pre>{@code + * Pipeline p = Pipeline.create(); + * PCollection<SearchGoogleAdsStreamRequest> requests = + * p.apply( + * Create.of( + * ImmutableList.of( + * SearchGoogleAdsStreamRequest.newBuilder() + * .setCustomerId(Long.toString(1234567890l)) + * .setQuery( + * "SELECT" + * + "campaign.id," + * + "campaign.name," + * + "campaign.status" + * + "FROM campaign") + * .build()))); + * PCollection<GoogleAdsRow> rows = requests.apply(GoogleAdsIO.v14().readAll()); + * p.run(); + * }</pre> + * + * <h2>Client-side rate limiting</h2> + * + * On construction of a {@link GoogleAdsV14#read()} or {@link GoogleAdsV14#readAll()} transform a + * default rate limiting policy is provided to stay well under the rate limit for the Google Ads + * API, but this limit is only local to a single worker and operates without any knowledge of other + * applications using the same developer token for any customer ID. The Google Ads API enforces + * global limits from the developer token down to the customer ID and it is recommended to host a + * shared rate limiting service to coordinate traffic to the Google Ads API across all applications + * using the same developer token. 
Users of these transforms are strongly advised to implement their + * own {@link RateLimitPolicy} and {@link RateLimitPolicyFactory} to interact with a shared rate + * limiting service for any production workloads. + * + * @see GoogleAdsIO#v14() + * @see GoogleAdsOptions + * @see <a href="https://developers.google.com/google-ads/api/docs/best-practices/overview">Best + * Practices in the Google Ads documentation</a> + */ +public class GoogleAdsV14 { + static final GoogleAdsV14 INSTANCE = new GoogleAdsV14(); + + private GoogleAdsV14() {} + + public Read read() { + return new AutoValue_GoogleAdsV14_Read.Builder() + .setGoogleAdsClientFactory(DefaultGoogleAdsClientFactory.getInstance()) + .setRateLimitPolicyFactory(() -> new DefaultRateLimitPolicy()) + .build(); + } + + public ReadAll readAll() { + return new AutoValue_GoogleAdsV14_ReadAll.Builder() + .setGoogleAdsClientFactory(DefaultGoogleAdsClientFactory.getInstance()) + .setRateLimitPolicyFactory(() -> new DefaultRateLimitPolicy()) + .build(); + } + + /** + * A {@link PTransform} that reads the results of a Google Ads query as {@link GoogleAdsRow} + * objects. 
+ * + * @see GoogleAdsIO#v14() + * @see #readAll() + */ + @AutoValue + public abstract static class Read extends PTransform<PBegin, PCollection<GoogleAdsRow>> { + abstract @Nullable String getDeveloperToken(); + + abstract @Nullable Long getLoginCustomerId(); + + abstract @Nullable List<Long> getCustomerIds(); + + abstract @Nullable String getQuery(); + + abstract GoogleAdsClientFactory getGoogleAdsClientFactory(); + + abstract RateLimitPolicyFactory getRateLimitPolicyFactory(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setDeveloperToken(@Nullable String developerToken); + + abstract Builder setLoginCustomerId(@Nullable Long loginCustomerId); + + abstract Builder setCustomerIds(List<Long> customerId); + + abstract Builder setQuery(String query); + + abstract Builder setGoogleAdsClientFactory(GoogleAdsClientFactory googleAdsClientFactory); + + abstract Builder setRateLimitPolicyFactory(RateLimitPolicyFactory rateLimitPolicyFactory); + + abstract Read build(); + } + + /** + * Creates and returns a new {@link Read} transform with the specified developer token. A + * developer token is required to access the Google Ads API. + * + * @param developerToken The developer token to set. + * @return A new {@link Read} transform with the specified developer token. + * @see GoogleAdsClient + */ + public Read withDeveloperToken(@Nullable String developerToken) { + return toBuilder().setDeveloperToken(developerToken).build(); + } + + /** + * Creates and returns a new {@link Read} transform with the specified login customer ID. A + * login customer ID is only required for manager accounts. + * + * @param loginCustomerId The login customer ID to set. + * @return A new {@link Read} transform with the specified login customer ID. 
+ * @see GoogleAdsClient + */ + public Read withLoginCustomerId(@Nullable Long loginCustomerId) { + return toBuilder().setLoginCustomerId(loginCustomerId).build(); + } + + /** + * Creates and returns a new {@link Read} transform with the specified customer IDs to query. + * + * @param customerIds + * @return A new {@link Read} transform with the specified customer IDs to query. + * @see SearchGoogleAdsStreamRequest + * @see #withQuery(String) + */ + public Read withCustomerIds(List<Long> customerIds) { + checkArgumentNotNull(customerIds, "customerIds cannot be null"); + checkArgument(customerIds.size() > 0, "customerIds cannot be empty"); + + return toBuilder().setCustomerIds(ImmutableList.copyOf(customerIds)).build(); + } + + /** + * Creates and returns a new {@link Read} transform with the specified customer ID to query. + * + * @param customerId + * @return A new {@link Read} transform with the specified customer ID to query. + * @see SearchGoogleAdsStreamRequest + * @see #withQuery(String) + */ + public Read withCustomerId(Long customerId) { + checkArgumentNotNull(customerId, "customerId cannot be null"); + + return withCustomerIds(ImmutableList.of(customerId)); + } + + /** + * Creates and returns a new {@link Read} transform with the specified query. The query will be + * executed for each customer ID. + * + * @param query + * @return A new {@link Read} transform with the specified query. + * @see SearchGoogleAdsStreamRequest + * @see #withCustomerId(Long) + * @see #withCustomerIds(List) + */ + public Read withQuery(String query) { + checkArgumentNotNull(query, "query cannot be null"); + checkArgument(!query.isEmpty(), "query cannot be empty"); + + return toBuilder().setQuery(query).build(); + } + + /** + * Creates and returns a new {@link Read} transform with the specified client factory. A {@link + * GoogleAdsClientFactory} builds the {@link GoogleAdsClient} used to construct service clients. 
+ * The {@link DefaultGoogleAdsClientFactory} should be sufficient for most purposes unless the + * construction of {@link GoogleAdsClient} requires customization. + * + * @param googleAdsClientFactory + * @return A new {@link Read} transform with the specified client factory. + * @see GoogleAdsClient + */ + public Read withGoogleAdsClientFactory(GoogleAdsClientFactory googleAdsClientFactory) { + checkArgumentNotNull(googleAdsClientFactory, "googleAdsClientFactory cannot be null"); + + return toBuilder().setGoogleAdsClientFactory(googleAdsClientFactory).build(); + } + + /** + * Creates and returns a new {@link Read} transform with the specified rate limit policy + * factory. A {@link RateLimitPolicyFactory} builds the {@link RateLimitPolicy} used to limit + * the number of requests made by {@link ReadAll.ReadAllFn}. The Google Ads API enforces global + * limits from the developer token down to the customer ID and it is recommended to host a + * shared rate limiting service to coordinate traffic to the Google Ads API across all + * applications using the same developer token. Users of these transforms are strongly advised + * to implement their own {@link RateLimitPolicy} and {@link RateLimitPolicyFactory} to interact + * with a shared rate limiting service for any production workloads. + * + * @param rateLimitPolicyFactory + * @return A new {@link Read} transform with the specified rate limit policy factory. + * @see GoogleAdsClient + */ + public Read withRateLimitPolicy(RateLimitPolicyFactory rateLimitPolicyFactory) { + checkArgumentNotNull(rateLimitPolicyFactory, "rateLimitPolicyFactory cannot be null"); + + return toBuilder().setRateLimitPolicyFactory(rateLimitPolicyFactory).build(); + } + + @Override + public PCollection<GoogleAdsRow> expand(PBegin input) { + String query = getQuery(); + List<Long> customerIds = getCustomerIds(); Review Comment: This pattern is limiting. 
Because the customer IDs are specified at pipeline construction time, they cannot be dynamic. It's likely worth reworking this IO to instead consume a PCollection of those IDs, so that they can be dynamically generated by some upstream source. ########## sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java: ########## @@ -0,0 +1,647 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.sdk.io.googleads; + +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.ads.googleads.lib.GoogleAdsClient; +import com.google.ads.googleads.v14.errors.GoogleAdsError; +import com.google.ads.googleads.v14.errors.GoogleAdsException; +import com.google.ads.googleads.v14.errors.GoogleAdsFailure; +import com.google.ads.googleads.v14.errors.InternalErrorEnum; +import com.google.ads.googleads.v14.errors.QuotaErrorEnum; +import com.google.ads.googleads.v14.services.GoogleAdsRow; +import com.google.ads.googleads.v14.services.GoogleAdsServiceClient; +import com.google.ads.googleads.v14.services.SearchGoogleAdsStreamRequest; +import com.google.ads.googleads.v14.services.SearchGoogleAdsStreamResponse; +import com.google.auto.value.AutoValue; +import com.google.protobuf.Message; +import com.google.protobuf.util.Durations; +import java.io.IOException; +import java.io.Serializable; +import java.util.List; +import java.util.Optional; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.ProcessContext; +import org.apache.beam.sdk.transforms.DoFn.ProcessElement; +import org.apache.beam.sdk.transforms.DoFn.Setup; +import org.apache.beam.sdk.transforms.DoFn.Teardown; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.BackOffUtils; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.Sleeper; +import org.apache.beam.sdk.values.PBegin; +import 
org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.RateLimiter; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; + +/** + * {@link GoogleAdsV14} provides an API to read Google Ads API v14 reports. + * + * <p>The Google Ads API does not use service account credentials in the same way as Google Cloud + * Platform APIs do. Service account credentials are typically only used to delegate (using + * domain-wide delegation) access through end user accounts. Providing credentials using the OAuth2 + * desktop flow may be preferable over domain wide delegation. Defaults for OAuth 2.0 credentials, + * refresh token and developer token can be provided using the following flags: + * + * <pre> + * --googleAdsClientId=your-client-id + * --googleAdsClientSecret=your-client-secret + * --googleAdsRefreshToken=your-refresh-token + * --googleAdsDeveloperToken=your-developer-token + * </pre> + * + * <p>Use {@link GoogleAdsV14#read()} to read a bounded {@link PCollection} of {@link GoogleAdsRow} + * from a query using {@link Read#withQuery(String)} and one or a few customer IDs using either + * {@link Read#withCustomerId(Long)} or {@link Read#withCustomerIds(List)}. Alternatively, use + * {@link GoogleAdsV14#readAll()} to read either a bounded or unbounded {@link PCollection} of + * {@link GoogleAdsRow} from a {@link PCollection} of {@link SearchGoogleAdsStreamRequest}. 
+ * + * <p>For example, using {@link GoogleAdsV14#read()}: + * + * <pre>{@code + * Pipeline p = Pipeline.create(); + * PCollection<GoogleAdsRow> rows = + * p.apply( + * GoogleAdsIO.v14() + * .read() + * .withCustomerId(1234567890l) + * .withQuery( + * "SELECT" + * + "campaign.id," + * + "campaign.name," + * + "campaign.status" + * + "FROM campaign")); + * p.run(); + * }</pre> + * + * <p>Alternatively, using {@link GoogleAdsV14#readAll()} to execute requests from a {@link + * PCollection} of {@link SearchGoogleAdsStreamRequest}: + * + * <pre>{@code + * Pipeline p = Pipeline.create(); + * PCollection<SearchGoogleAdsStreamRequest> requests = + * p.apply( + * Create.of( + * ImmutableList.of( + * SearchGoogleAdsStreamRequest.newBuilder() + * .setCustomerId(Long.toString(1234567890l)) + * .setQuery( + * "SELECT" + * + "campaign.id," + * + "campaign.name," + * + "campaign.status" + * + "FROM campaign") + * .build()))); + * PCollection<GoogleAdsRow> rows = requests.apply(GoogleAdsIO.v14().readAll()); + * p.run(); + * }</pre> + * + * <h2>Client-side rate limiting</h2> + * + * On construction of a {@link GoogleAdsV14#read()} or {@link GoogleAdsV14#readAll()} transform a + * default rate limiting policy is provided to stay well under the rate limit for the Google Ads + * API, but this limit is only local to a single worker and operates without any knowledge of other + * applications using the same developer token for any customer ID. The Google Ads API enforces + * global limits from the developer token down to the customer ID and it is recommended to host a + * shared rate limiting service to coordinate traffic to the Google Ads API across all applications + * using the same developer token. Users of these transforms are strongly advised to implement their + * own {@link RateLimitPolicy} and {@link RateLimitPolicyFactory} to interact with a shared rate + * limiting service for any production workloads. 
+ * + * @see GoogleAdsIO#v14() + * @see GoogleAdsOptions + * @see <a href="https://developers.google.com/google-ads/api/docs/best-practices/overview">Best + * Practices in the Google Ads documentation</a> + */ +public class GoogleAdsV14 { + static final GoogleAdsV14 INSTANCE = new GoogleAdsV14(); + + private GoogleAdsV14() {} + + public Read read() { + return new AutoValue_GoogleAdsV14_Read.Builder() + .setGoogleAdsClientFactory(DefaultGoogleAdsClientFactory.getInstance()) + .setRateLimitPolicyFactory(() -> new DefaultRateLimitPolicy()) + .build(); + } + + public ReadAll readAll() { + return new AutoValue_GoogleAdsV14_ReadAll.Builder() + .setGoogleAdsClientFactory(DefaultGoogleAdsClientFactory.getInstance()) + .setRateLimitPolicyFactory(() -> new DefaultRateLimitPolicy()) + .build(); + } + + /** + * A {@link PTransform} that reads the results of a Google Ads query as {@link GoogleAdsRow} + * objects. + * + * @see GoogleAdsIO#v14() + * @see #readAll() + */ + @AutoValue + public abstract static class Read extends PTransform<PBegin, PCollection<GoogleAdsRow>> { + abstract @Nullable String getDeveloperToken(); + + abstract @Nullable Long getLoginCustomerId(); + + abstract @Nullable List<Long> getCustomerIds(); + + abstract @Nullable String getQuery(); + + abstract GoogleAdsClientFactory getGoogleAdsClientFactory(); + + abstract RateLimitPolicyFactory getRateLimitPolicyFactory(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setDeveloperToken(@Nullable String developerToken); + + abstract Builder setLoginCustomerId(@Nullable Long loginCustomerId); + + abstract Builder setCustomerIds(List<Long> customerId); + + abstract Builder setQuery(String query); + + abstract Builder setGoogleAdsClientFactory(GoogleAdsClientFactory googleAdsClientFactory); + + abstract Builder setRateLimitPolicyFactory(RateLimitPolicyFactory rateLimitPolicyFactory); + + abstract Read build(); + } + + /** + * Creates and returns a new 
{@link Read} transform with the specified developer token. A + * developer token is required to access the Google Ads API. + * + * @param developerToken The developer token to set. + * @return A new {@link Read} transform with the specified developer token. + * @see GoogleAdsClient + */ + public Read withDeveloperToken(@Nullable String developerToken) { + return toBuilder().setDeveloperToken(developerToken).build(); + } + + /** + * Creates and returns a new {@link Read} transform with the specified login customer ID. A + * login customer ID is only required for manager accounts. + * + * @param loginCustomerId The login customer ID to set. + * @return A new {@link Read} transform with the specified login customer ID. + * @see GoogleAdsClient + */ + public Read withLoginCustomerId(@Nullable Long loginCustomerId) { + return toBuilder().setLoginCustomerId(loginCustomerId).build(); + } + + /** + * Creates and returns a new {@link Read} transform with the specified customer IDs to query. + * + * @param customerIds + * @return A new {@link Read} transform with the specified customer IDs to query. + * @see SearchGoogleAdsStreamRequest + * @see #withQuery(String) + */ + public Read withCustomerIds(List<Long> customerIds) { + checkArgumentNotNull(customerIds, "customerIds cannot be null"); + checkArgument(customerIds.size() > 0, "customerIds cannot be empty"); + + return toBuilder().setCustomerIds(ImmutableList.copyOf(customerIds)).build(); + } + + /** + * Creates and returns a new {@link Read} transform with the specified customer ID to query. + * + * @param customerId + * @return A new {@link Read} transform with the specified customer ID to query. 
+ * @see SearchGoogleAdsStreamRequest + * @see #withQuery(String) + */ + public Read withCustomerId(Long customerId) { + checkArgumentNotNull(customerId, "customerId cannot be null"); + + return withCustomerIds(ImmutableList.of(customerId)); + } + + /** + * Creates and returns a new {@link Read} transform with the specified query. The query will be + * executed for each customer ID. + * + * @param query + * @return A new {@link Read} transform with the specified query. + * @see SearchGoogleAdsStreamRequest + * @see #withCustomerId(Long) + * @see #withCustomerIds(List) + */ + public Read withQuery(String query) { + checkArgumentNotNull(query, "query cannot be null"); + checkArgument(!query.isEmpty(), "query cannot be empty"); + + return toBuilder().setQuery(query).build(); + } + + /** + * Creates and returns a new {@link Read} transform with the specified client factory. A {@link + * GoogleAdsClientFactory} builds the {@link GoogleAdsClient} used to construct service clients. + * The {@link DefaultGoogleAdsClientFactory} should be sufficient for most purposes unless the + * construction of {@link GoogleAdsClient} requires customization. + * + * @param googleAdsClientFactory + * @return A new {@link Read} transform with the specified client factory. + * @see GoogleAdsClient + */ + public Read withGoogleAdsClientFactory(GoogleAdsClientFactory googleAdsClientFactory) { + checkArgumentNotNull(googleAdsClientFactory, "googleAdsClientFactory cannot be null"); + + return toBuilder().setGoogleAdsClientFactory(googleAdsClientFactory).build(); + } + + /** + * Creates and returns a new {@link Read} transform with the specified rate limit policy + * factory. A {@link RateLimitPolicyFactory} builds the {@link RateLimitPolicy} used to limit + * the number of requests made by {@link ReadAll.ReadAllFn}. 
The Google Ads API enforces global + * limits from the developer token down to the customer ID and it is recommended to host a + * shared rate limiting service to coordinate traffic to the Google Ads API across all + * applications using the same developer token. Users of these transforms are strongly advised + * to implement their own {@link RateLimitPolicy} and {@link RateLimitPolicyFactory} to interact + * with a shared rate limiting service for any production workloads. + * + * @param rateLimitPolicyFactory + * @return A new {@link Read} transform with the specified rate limit policy factory. + * @see GoogleAdsClient + */ + public Read withRateLimitPolicy(RateLimitPolicyFactory rateLimitPolicyFactory) { + checkArgumentNotNull(rateLimitPolicyFactory, "rateLimitPolicyFactory cannot be null"); + + return toBuilder().setRateLimitPolicyFactory(rateLimitPolicyFactory).build(); + } + + @Override + public PCollection<GoogleAdsRow> expand(PBegin input) { + String query = getQuery(); + List<Long> customerIds = getCustomerIds(); + checkArgumentNotNull(query, "withQuery() is required"); + checkArgumentNotNull(customerIds, "either withCustomerId() or withCustomerIds() is required"); + + return input + .apply(Create.of(customerIds)) + .apply( + MapElements.into(TypeDescriptor.of(SearchGoogleAdsStreamRequest.class)) + .via( + customerId -> + SearchGoogleAdsStreamRequest.newBuilder() + .setCustomerId(Long.toString(customerId)) + .setQuery(query) + .build())) + .apply( + INSTANCE + .readAll() + .withDeveloperToken(getDeveloperToken()) + .withLoginCustomerId(getLoginCustomerId()) + .withGoogleAdsClientFactory(getGoogleAdsClientFactory()) + .withRateLimitPolicy(getRateLimitPolicyFactory())); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder + .addIfNotDefault( + DisplayData.item("customerIds", String.valueOf(getCustomerIds())) + .withLabel("Customer IDs"), + "null") + 
.addIfNotNull(DisplayData.item("query", String.valueOf(getQuery())).withLabel("Query")); + } + } + + /** + * A {@link PTransform} that reads the results of many {@link SearchGoogleAdsStreamRequest} + * objects as {@link GoogleAdsRow} objects. * + * + * @see GoogleAdsIO#v14() + * @see #readAll() + */ + @AutoValue + public abstract static class ReadAll + extends PTransform<PCollection<SearchGoogleAdsStreamRequest>, PCollection<GoogleAdsRow>> { + abstract @Nullable String getDeveloperToken(); + + abstract @Nullable Long getLoginCustomerId(); + + abstract GoogleAdsClientFactory getGoogleAdsClientFactory(); + + abstract RateLimitPolicyFactory getRateLimitPolicyFactory(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setDeveloperToken(@Nullable String developerToken); + + abstract Builder setLoginCustomerId(@Nullable Long loginCustomerId); + + abstract Builder setGoogleAdsClientFactory(GoogleAdsClientFactory googleAdsClientFactory); + + abstract Builder setRateLimitPolicyFactory(RateLimitPolicyFactory rateLimitPolicyFactory); + + abstract ReadAll build(); + } + + /** + * Creates and returns a new {@link ReadAll} transform with the specified developer token. A + * developer token is required to access the Google Ads API. + * + * @param developerToken The developer token to set. + * @return A new {@link ReadAll} transform with the specified developer token. + * @see GoogleAdsClient + */ + public ReadAll withDeveloperToken(@Nullable String developerToken) { + return toBuilder().setDeveloperToken(developerToken).build(); + } + + /** + * Creates and returns a new {@link ReadAll} transform with the specified login customer ID. A + * login customer ID is only required for manager accounts. + * + * @param loginCustomerId The login customer ID to set. + * @return A new {@link ReadAll} transform with the specified login customer ID. 
+ * @see GoogleAdsClient + */ + public ReadAll withLoginCustomerId(@Nullable Long loginCustomerId) { + return toBuilder().setLoginCustomerId(loginCustomerId).build(); + } + + /** + * Creates and returns a new {@link ReadAll} transform with the specified client factory. A + * {@link GoogleAdsClientFactory} builds the {@link GoogleAdsClient} used to construct service + * clients. The {@link DefaultGoogleAdsClientFactory} should be sufficient for most purposes + * unless the construction of {@link GoogleAdsClient} requires customization. + * + * @param googleAdsClientFactory + * @return A new {@link ReadAll} transform with the specified client factory. + * @see GoogleAdsClient + */ + public ReadAll withGoogleAdsClientFactory(GoogleAdsClientFactory googleAdsClientFactory) { + checkArgumentNotNull(googleAdsClientFactory, "googleAdsClientFactory cannot be null"); + + return toBuilder().setGoogleAdsClientFactory(googleAdsClientFactory).build(); + } + + /** + * Creates and returns a new {@link ReadAll} transform with the specified rate limit policy + * factory. A {@link RateLimitPolicyFactory} builds the {@link RateLimitPolicy} used to limit + * the number of requests made by {@link ReadAll.ReadAllFn}. The Google Ads API enforces global + * limits from the developer token down to the customer ID and it is recommended to host a + * shared rate limiting service to coordinate traffic to the Google Ads API across all + * applications using the same developer token. Users of these transforms are strongly advised + * to implement their own {@link RateLimitPolicy} and {@link RateLimitPolicyFactory} to interact + * with a shared rate limiting service for any production workloads. + * + * @param rateLimitPolicyFactory + * @return A new {@link ReadAll} transform with the specified rate limit policy factory. 
+ * @see GoogleAdsClient + */ + public ReadAll withRateLimitPolicy(RateLimitPolicyFactory rateLimitPolicyFactory) { + checkArgumentNotNull(rateLimitPolicyFactory, "rateLimitPolicyFactory cannot be null"); + + return toBuilder().setRateLimitPolicyFactory(rateLimitPolicyFactory).build(); + } + + @Override + public PCollection<GoogleAdsRow> expand(PCollection<SearchGoogleAdsStreamRequest> input) { + GoogleAdsOptions options = input.getPipeline().getOptions().as(GoogleAdsOptions.class); + + checkArgument( + options.getGoogleAdsDeveloperToken() != null || getDeveloperToken() != null, + "either --googleAdsDeveloperToken or .withDeveloperToken() is required"); + + return input.apply(ParDo.of(new ReadAllFn(this))); + } + + /** + * A {@link DoFn} that reads reports from Google Ads for each query using the {@code + * SearchStream} method. + */ + @VisibleForTesting + static class ReadAllFn extends DoFn<SearchGoogleAdsStreamRequest, GoogleAdsRow> { + private static final int MAX_RETRIES = 5; + private static final FluentBackoff BACKOFF = + FluentBackoff.DEFAULT + .withExponent(2.0) + .withInitialBackoff(Duration.standardSeconds(30)) + .withMaxRetries(MAX_RETRIES); + + @VisibleForTesting static Sleeper sleeper = Sleeper.DEFAULT; + + private final GoogleAdsV14.ReadAll spec; + + private transient @Nullable GoogleAdsClient googleAdsClient; + private transient @Nullable GoogleAdsServiceClient googleAdsServiceClient; + private transient @Nullable RateLimitPolicy rateLimitPolicy; + + ReadAllFn(GoogleAdsV14.ReadAll spec) { + this.spec = spec; + } + + @Setup + public void setup(PipelineOptions options) { + GoogleAdsOptions adsOptions = options.as(GoogleAdsOptions.class); + + googleAdsClient = + spec.getGoogleAdsClientFactory() + .newGoogleAdsClient( + adsOptions, spec.getDeveloperToken(), null, spec.getLoginCustomerId()); + googleAdsServiceClient = googleAdsClient.getVersion14().createGoogleAdsServiceClient(); + rateLimitPolicy = spec.getRateLimitPolicyFactory().getRateLimitPolicy(); + 
} + + @ProcessElement + public void processElement(ProcessContext c) throws IOException, InterruptedException { + GoogleAdsClient googleAdsClient = checkStateNotNull(this.googleAdsClient); + GoogleAdsServiceClient googleAdsServiceClient = + checkStateNotNull(this.googleAdsServiceClient); + RateLimitPolicy rateLimitPolicy = checkStateNotNull(this.rateLimitPolicy); + + BackOff backoff = BACKOFF.backoff(); + BackOff nextBackoff = backoff; + GoogleAdsException lastException = null; + + SearchGoogleAdsStreamRequest request = c.element(); + String developerToken = googleAdsClient.getDeveloperToken(); + String customerId = request.getCustomerId(); + + do { + rateLimitPolicy.onBeforeRequest(developerToken, customerId, request); Review Comment: What are the consequences of breaking the rate limit? I ask because the default rate limiter won't scale with parallel reads. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
