damondouglas commented on code in PR #30123:
URL: https://github.com/apache/beam/pull/30123#discussion_r1490153000


##########
sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Throttle.java:
##########
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponse;
+
+import static org.apache.beam.io.requestresponse.Monitoring.incIfPresent;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.sdk.values.TypeDescriptors.integers;
+import static org.apache.beam.sdk.values.TypeDescriptors.kvs;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+import java.util.Spliterator;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metric;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.GroupIntoBatches;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.commons.math3.random.RandomDataGenerator;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Throttle a {@link PCollection} of {@link RequestT} elements.
+ *
+ * <pre>
+ * {@link Throttle} returns the same {@link RequestT}
+ * {@link PCollection}, but with elements emitted at a
+ * slower rate. Throttling is a best effort to decrease a {@link PCollection} 
throughput to a
+ * maximum of a configured {@link Rate} parameter.
+ * </pre>
+ *
+ * <h2>Basic Usage</h2>
+ *
+ * {@link Throttle} minimally requires specifying a maximum {@link Rate} 
parameter.
+ *
+ * <pre>{@code
+ * PCollection<RequestT> original = ...
+ *
+ * PCollection<RequestT> throttled = original.apply(Throttle.of(Rate.of(10, 
Duration.standardSeconds(1L))));
+ *
+ * }</pre>
+ *
+ * <h2>Applying metrics</h2>
+ *
+ * Additionally, usage can enable optional metrics to measure the counts of inputs and outputs.
+ *
+ * <pre>{@code
+ * PCollection<RequestT> original = ...
+ *
+ * PCollection<RequestT> throttled = original.apply(
+ *   Throttle.of(Rate.of(10, Duration.standardSeconds(1L)))
+ *           .withMetricsCollected()
+ * );
+ *
+ * }</pre>
+ *
+ * <h2>Additional streaming configuration</h2>
+ *
+ * In a streaming context, {@link Throttle} uses {@link GroupIntoBatches} to 
group elements of the
+ * input {@link PCollection} prior to throttling. Therefore, it needs to know 
how it should apply
+ * {@link GroupIntoBatches} to the input {@link PCollection}. The following 
takes the additional
+ * parameters via {@link #withStreamingConfiguration} that it forwards when 
instantiating and
+ * applying {@link GroupIntoBatches}.
+ *
+ * <pre>{@code
+ * PCollection<RequestT> original = ...
+ *
+ * PCollection<RequestT> throttled = original.apply(
+ *  Throttle.of(Rate.of(10, Duration.standardSeconds(1L)))
+ *          .withStreamingConfiguration(bufferingSize)
+ * );
+ *
+ *   // or
+ *
+ * PCollection<RequestT> throttled = original.apply(
+ *  Throttle.of(Rate.of(10, Duration.standardSeconds(1L)))
+ *          .withStreamingConfiguration(bufferingSize, maxBufferingDuration)
+ * );
+ *
+ * }</pre>
+ *
+ * <h2>Throttle Algorithm</h2>
+ *
+ * The following discusses the algorithm for how {@link Throttle} reduces the 
throughput of a {@code
+ * PCollection<RequestT>}.
+ *
+ * <p>First, the transform processes the original {@code 
PCollection<RequestT>} into a {@code
+ * PCollection<KV<Integer, RequestT>>} via random assignment of the key using 
{@link
+ * RandomDataGenerator#nextInt}. The result is a key space: [0, {@link 
Rate#getNumElements()}) such
+ * that each keyed channel is throttled at a rate of {@link 
Rate#getInterval()}. Next, for unbounded
+ * {@link PCollection}s i.e. streaming, the transform applies {@link 
GroupIntoBatches}; for bounded
+ * it applies {@link GroupByKey}. Then the transform converts the resulting, 
{@code
+ * PCollection<KV<Integer, Iterable<RequestT>>>} into a {@code 
PCollection<KV<Integer,
+ * List<RequestT>>>}. This is done to simplify the coding of the downstream <a
+ * href="https://beam.apache.org/documentation/programming-guide/#splittable-dofns">Splittable
+ * DoFn</a>. Next the transform applies {@link GlobalWindows} to the {@code 
PCollection<KV<Integer,
+ * List<RequestT>>>} prior to applying to the splittable DoFn. This splittable 
DoFn performs the
+ * actual work of throttling by holding the watermark and performing a {@link
+ * DoFn.ProcessContinuation#withResumeDelay(Duration)} of {@link 
Rate#getInterval()} if there are
+ * remaining elements to process. Finally, the transform applies the original 
input's {@link
+ * WindowingStrategy} to the returning {@code PCollection<RequestT>}.
+ */
+public class Throttle<RequestT> extends PTransform<PCollection<RequestT>, 
PCollection<RequestT>> {
+  static final String INPUT_ELEMENTS_COUNTER_NAME = "input_elements_count";
+  static final String OUTPUT_ELEMENTS_COUNTER_NAME = "output_elements_count";
+
+  /**
+   * Instantiates a {@link Throttle} with the maximumRate of {@link Rate} and 
without collecting
+   * metrics.
+   */
+  static <RequestT> Throttle<RequestT> of(Rate maximumRate) {
+    return new 
Throttle<>(Configuration.builder().setMaximumRate(maximumRate).build());
+  }
+
+  /** Returns {@link Throttle} with metrics collection turned on. */
+  Throttle<RequestT> withMetricsCollected() {
+    return new 
Throttle<>(configuration.toBuilder().setCollectMetrics(true).build());
+  }
+
+  /**
+   * Configures {@link Throttle} with additional parameters for use in 
streaming contexts. Calls
+   * {@link #withStreamingConfiguration(long, Duration)} with {@code null} 
{@link Duration}
+   * argument. See {@link #withStreamingConfiguration(long, Duration)} for 
more details.
+   */
+  Throttle<RequestT> withStreamingConfiguration(long bufferingSize) {
+    return withStreamingConfiguration(bufferingSize, null);
+  }
+
+  /**
+   * Configures {@link Throttle} for use with {@link 
PCollection.IsBounded#UNBOUNDED} {@link
+   * PCollection}s. In a streaming context, {@link Throttle} applies the input 
{@code
+   * PCollection<RequestT>} to {@link GroupIntoBatches} prior to throttling. 
Therefore, it requires
+   * additional configuration for how to handle streaming contexts.
+   */
+  Throttle<RequestT> withStreamingConfiguration(
+      long bufferingSize, @Nullable Duration maxBufferingDuration) {
+    return new Throttle<>(
+        configuration
+            .toBuilder()
+            .setStreamBufferingSize(bufferingSize)
+            .setStreamMaxBufferingDuration(maxBufferingDuration)
+            .build());
+  }
+
+  private final Configuration configuration;
+
+  private Throttle(Configuration configuration) {
+    this.configuration = configuration;
+  }
+
+  @Override
+  public PCollection<RequestT> expand(PCollection<RequestT> input) {
+    ListCoder<RequestT> listCoder = ListCoder.of(input.getCoder());
+    Coder<KV<Integer, List<RequestT>>> kvCoder = KvCoder.of(VarIntCoder.of(), 
listCoder);
+
+    PTransform<PCollection<KV<Integer, RequestT>>, PCollection<KV<Integer, 
Iterable<RequestT>>>>
+        groupingTransform = GroupByKey.create();

Review Comment:
   Tests do not work in batch mode as it relies on event timers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to