damondouglas commented on code in PR #29401: URL: https://github.com/apache/beam/pull/29401#discussion_r1398681092
########## sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleWithExternalResource.java: ########## @@ -0,0 +1,409 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.requestresponse; + +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.URI; +import java.util.Optional; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.PeriodicImpulse; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteSource; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * Throttles a {@link T} {@link PCollection} using an external resource. + * + * <p>{@link ThrottleWithExternalResource} makes use of {@link PeriodicImpulse} as it needs to + * coordinate three {@link PTransform}s concurrently. Usage of {@link ThrottleWithExternalResource} + * should consider the impact of {@link PeriodicImpulse} on the pipeline. + * + * <p>Usage of {@link ThrottleWithExternalResource} is completely optional and serves as one of many + * methods by {@link RequestResponseIO} to protect against API overuse. Usage should not depend on + * {@link ThrottleWithExternalResource} alone to achieve API overuse prevention for several reasons. + * The underlying external resource may not scale at all or as fast as a Beam Runner. The external + * resource itself may be an API with its own quota that {@link ThrottleWithExternalResource} does + * not consider. + * + * <p>{@link ThrottleWithExternalResource} makes use of several {@link Caller}s that work together + * to achieve its aim of throttling a {@link T} {@link PCollection}. A {@link RefresherT} is a + * {@link Caller} that takes an {@link Instant} and refreshes a shared {@link Quota}. An {@link + * EnqueuerT} enqueues a {@link T} element while a {@link DequeuerT} dequeues said element when the + * {@link ReporterT} reports that the stored {@link Quota#getNumRequests} is >0. Finally, a {@link + * DecrementerT} decrements from the shared {@link Quota} value, additionally reporting the value + * after performing the action. + * + * <p>{@link ThrottleWithExternalResource} instantiates and applies two {@link Call} {@link + * PTransform}s using the aforementioned {@link Caller}s {@link RefresherT} and {@link EnqueuerT}. 
+ * {@link ThrottleWithExternalResource} calls {@link ReporterT}, {@link DequeuerT}, {@link + * DecrementerT} within its {@link DoFn}, emitting the dequeued {@link T} when the {@link ReporterT} + * reports a value >0. As an additional safety check, the DoFn checks whether the {@link Quota} + * value after {@link DecrementerT}'s action is <0, signaling that multiple workers are attempting + * the same too fast and therefore exits the DoFn allowing for the next refresh. + * + * <p>{@link ThrottleWithExternalResource} flattens errors emitted from {@link EnqueuerT}, {@link + * RefresherT}, and its own {@link DoFn} into a single {@link ApiIOError} {@link PCollection} that + * is encapsulated, with a {@link T} {@link PCollection} output into a {@link Call.Result}. + */ +class ThrottleWithExternalResource< + @NonNull T, + ReporterT extends Caller<@NonNull String, @NonNull Long> & SetupTeardown, + EnqueuerT extends Caller<@NonNull T, Void> & SetupTeardown, + DequeuerT extends Caller<@NonNull Instant, @NonNull T> & SetupTeardown, + DecrementerT extends Caller<@NonNull Instant, @NonNull Long> & SetupTeardown, + RefresherT extends Caller<@NonNull Instant, Void> & SetupTeardown> + extends PTransform<@NonNull PCollection<@NonNull T>, Call.@NonNull Result<@NonNull T>> { + + /** + * Instantiate a {@link ThrottleWithExternalResource} using a {@link RedisClient}. + * + * <p><a href="https://redis.io">Redis</a> is designed for multiple workloads, simultaneously + * reading and writing to a shared instance. See <a + * href="https://redis.io/docs/get-started/faq/">Redis FAQ</a> for more information on important + * considerations when using Redis as {@link ThrottleWithExternalResource}'s external resource. 
+ */ + static <@NonNull T> + ThrottleWithExternalResource< + @NonNull T, + RedisReporter, + RedisEnqueuer<@NonNull T>, + RedisDequeuer<@NonNull T>, + RedisDecrementer, + RedisRefresher> + usingRedis( + URI uri, + String quotaIdentifier, + String queueKey, + Quota quota, + Coder<@NonNull T> coder) + throws Coder.NonDeterministicException { + return new ThrottleWithExternalResource< + @NonNull T, + RedisReporter, + RedisEnqueuer<@NonNull T>, + RedisDequeuer<@NonNull T>, + RedisDecrementer, + RedisRefresher>( + quota, + quotaIdentifier, + coder, + new RedisReporter(uri), + new RedisEnqueuer<>(uri, queueKey, coder), + new RedisDequeuer<>(uri, coder, queueKey), + new RedisDecrementer(uri, queueKey), + new RedisRefresher(uri, quota, quotaIdentifier)); + } + + private static final Duration THROTTLE_INTERVAL = Duration.standardSeconds(1L); + + private final @NonNull Quota quota; + private final @NonNull String quotaIdentifier; + private final @NonNull Coder<@NonNull T> coder; + private final @NonNull ReporterT reporterT; + private final @NonNull EnqueuerT enqueuerT; + private final @NonNull DequeuerT dequeuerT; + private final @NonNull DecrementerT decrementerT; + private final @NonNull RefresherT refresherT; + + ThrottleWithExternalResource( + @NonNull Quota quota, + @NonNull String quotaIdentifier, + @NonNull Coder<@NonNull T> coder, + @NonNull ReporterT reporterT, + @NonNull EnqueuerT enqueuerT, + @NonNull DequeuerT dequeuerT, + @NonNull DecrementerT decrementerT, + @NonNull RefresherT refresherT) + throws Coder.NonDeterministicException { + this.quotaIdentifier = quotaIdentifier; + this.reporterT = reporterT; + coder.verifyDeterministic(); + checkArgument(!quotaIdentifier.isEmpty()); + this.quota = quota; + this.coder = coder; + this.enqueuerT = enqueuerT; + this.dequeuerT = dequeuerT; + this.decrementerT = decrementerT; + this.refresherT = refresherT; + } + + @Override + public Call.@NonNull Result<@NonNull T> expand(PCollection<@NonNull T> input) { + Pipeline 
pipeline = input.getPipeline(); + + // Refresh known quota to control the throttle rate. + Call.Result<Void> refreshResult = + pipeline + .apply("quota/impulse", PeriodicImpulse.create().withInterval(quota.getInterval())) Review Comment: I personally think these timings will be fine. This is not the only thing preventing API overuse (as commented in the code comments). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
