damccorm commented on code in PR #30123:
URL: https://github.com/apache/beam/pull/30123#discussion_r1490006653


##########
sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Throttle.java:
##########
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponse;
+
+import static org.apache.beam.io.requestresponse.Monitoring.incIfPresent;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.sdk.values.TypeDescriptors.integers;
+import static org.apache.beam.sdk.values.TypeDescriptors.kvs;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+import java.util.Spliterator;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metric;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.GroupIntoBatches;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.commons.math3.random.RandomDataGenerator;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Throttle a {@link PCollection} of {@link RequestT} elements.
+ *
+ * <pre>
+ * {@link Throttle} returns the same {@link RequestT}
+ * {@link PCollection}, but with elements emitted at a
+ * slower rate. Throttling is a best effort to decrease a {@link PCollection} 
throughput to a
+ * maximum of a configured {@link Rate} parameter.
+ * </pre>
+ *
+ * <h2>Basic Usage</h2>
+ *
+ * {@link Throttle} minimally requires specifying a maximum {@link Rate} 
parameter.
+ *
+ * <pre>{@code
+ * PCollection<RequestT> original = ...
+ *
+ * PCollection<RequestT> throttled = original.apply(Throttle.of(Rate.of(10, 
Duration.standardSeconds(1L))));
+ *
+ * }</pre>
+ *
+ * <h2>Applying metrics</h2>
+ *
+ * Additionally, usage can enable optional metrics to the measure counts of 
input and outputs.
+ *
+ * <pre>{@code
+ * PCollection<RequestT> original = ...
+ *
+ * PCollection<RequestT> throttled = original.apply(
+ *   Throttle.of(Rate.of(10, Duration.standardSeconds(1L)))
+ *           .withMetricsCollected()
+ * );
+ *
+ * }</pre>
+ *
+ * <h2>Additional streaming configuration</h2>
+ *
+ * In a streaming context, {@link Throttle} uses {@link GroupIntoBatches} to 
group elements of the
+ * input {@link PCollection} prior to throttling. Therefore, it needs to know 
how it should apply
+ * {@link GroupIntoBatches} to the input {@link PCollection}. The following 
takes the additional
+ * parameters via {@link #withStreamingConfiguration} that it forwards when 
instantiating and
+ * applying {@link GroupIntoBatches}.
+ *
+ * <pre>{@code
+ * PCollection<RequestT> original = ...
+ *
+ * PCollection<RequestT> throttled = original.apply(
+ *  Throttle.of(Rate.of(10, Duration.standardSeconds(1L)))
+ *          .withStreamingConfiguration(bufferingSize)
+ * );
+ *
+ *   // or
+ *
+ * PCollection<RequestT> throttled = original.apply(
+ *  Throttle.of(Rate.of(10, Duration.standardSeconds(1L)))
+ *          .withStreamingConfiguration(bufferingSize, maxBufferingDuration)
+ * );
+ *
+ * }</pre>
+ *
+ * <h2>Throttle Algorithm</h2>
+ *
+ * The following discusses the algorithm for how {@link Throttle} reduces the 
throughput of a {@code
+ * PCollection<RequestT>}.
+ *
+ * <p>First, the transform processes the original {@code 
PCollection<RequestT>} into a {@code
+ * PCollection<KV<Integer, RequestT>>} via random assignment of the key using 
{@link
+ * RandomDataGenerator#nextInt}. The result is a key space: [0, {@link 
Rate#getNumElements()}) such
+ * that each keyed channel is throttled at a rate of {@link 
Rate#getInterval()}. Next, for unbounded
+ * {@link PCollection}s i.e. streaming, the transform applies {@link 
GroupIntoBatches}; for bounded
+ * it applies {@link GroupByKey}. Then the transform converts the resulting, 
{@code
+ * PCollection<KV<Integer, Iterable<RequestT>>>} into a {@code 
PCollection<KV<Integer,
+ * List<RequestT>>>}. This is done to simplify the coding of the downstream <a
+ * href="https://beam.apache.org/documentation/programming-guide/#splittable-dofns">Splittable
+ * DoFn</a>. Next the transform applies {@link GlobalWindows} to the {@code 
PCollection<KV<Integer,
+ * List<RequestT>>>} prior to applying to the splittable DoFn. This splittable 
DoFn performs the
+ * actual work of throttling by holding the watermark and performing a {@link
+ * DoFn.ProcessContinuation#withResumeDelay(Duration)} of {@link 
Rate#getInterval()} if there are
+ * remaining elements to process. Finally, the transform applies the original 
input's {@link
+ * WindowingStrategy} to the returning {@code PCollection<RequestT>}.
+ */
+public class Throttle<RequestT> extends PTransform<PCollection<RequestT>, 
PCollection<RequestT>> {
+  static final String INPUT_ELEMENTS_COUNTER_NAME = "input_elements_count";
+  static final String OUTPUT_ELEMENTS_COUNTER_NAME = "output_elements_count";
+
+  /**
+   * Instantiates a {@link Throttle} with the maximumRate of {@link Rate} and 
without collecting
+   * metrics.
+   */
+  static <RequestT> Throttle<RequestT> of(Rate maximumRate) {
+    return new 
Throttle<>(Configuration.builder().setMaximumRate(maximumRate).build());
+  }
+
+  /** Returns {@link Throttle} with metrics collection turned on. */
+  Throttle<RequestT> withMetricsCollected() {
+    return new 
Throttle<>(configuration.toBuilder().setCollectMetrics(true).build());
+  }
+
+  /**
+   * Configures {@link Throttle} with additional parameters for use in 
streaming contexts. Calls
+   * {@link #withStreamingConfiguration(long, Duration)} with {@code null} 
{@link Duration}
+   * argument. See {@link #withStreamingConfiguration(long, Duration)} for 
more details.
+   */
+  Throttle<RequestT> withStreamingConfiguration(long bufferingSize) {
+    return withStreamingConfiguration(bufferingSize, null);
+  }
+
+  /**
+   * Configures {@link Throttle} for use with {@link 
PCollection.IsBounded#UNBOUNDED} {@link
+   * PCollection}s. In a streaming context, {@link Throttle} applies the input 
{@code
+   * PCollection<RequestT>} to {@link GroupIntoBatches} prior to throttling. 
Therefore, it requires
+   * additional configuration for how to handle streaming contexts.
+   */
+  Throttle<RequestT> withStreamingConfiguration(
+      long bufferingSize, @Nullable Duration maxBufferingDuration) {
+    return new Throttle<>(
+        configuration
+            .toBuilder()
+            .setStreamBufferingSize(bufferingSize)
+            .setStreamMaxBufferingDuration(maxBufferingDuration)
+            .build());
+  }
+
+  private final Configuration configuration;
+
+  private Throttle(Configuration configuration) {
+    this.configuration = configuration;
+  }
+
+  @Override
+  public PCollection<RequestT> expand(PCollection<RequestT> input) {
+    ListCoder<RequestT> listCoder = ListCoder.of(input.getCoder());
+    Coder<KV<Integer, List<RequestT>>> kvCoder = KvCoder.of(VarIntCoder.of(), 
listCoder);
+
+    PTransform<PCollection<KV<Integer, RequestT>>, PCollection<KV<Integer, 
Iterable<RequestT>>>>
+        groupingTransform = GroupByKey.create();

Review Comment:
   Is there a reason we can't just always use GroupIntoBatches? I think it will tend to perform better because it won't require the whole previous stage to complete before we start sending requests.
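To make the question concrete, here is a minimal sketch of what `expand()` could look like if `GroupIntoBatches` were used for bounded and unbounded inputs alike. It reuses the helpers and imports quoted from the PR above (`assignChannels()`, `toList()`, `throttle()`, the configuration getters); it illustrates the reviewer's suggestion and is not the committed implementation.

```java
@Override
public PCollection<RequestT> expand(PCollection<RequestT> input) {
  // Assumes a buffering size is always supplied, since GroupIntoBatches needs a
  // batch size even for bounded inputs.
  long bufferingSize =
      checkStateNotNull(
          configuration.getStreamBufferingSize(),
          "Throttle requires a buffering size via Throttle#withStreamingConfiguration");
  GroupIntoBatches<Integer, RequestT> group = GroupIntoBatches.ofSize(bufferingSize);
  if (configuration.getStreamMaxBufferingDuration() != null) {
    group =
        group.withMaxBufferingDuration(
            checkStateNotNull(configuration.getStreamMaxBufferingDuration()));
  }
  return input
      // Same channel assignment as Step 1 in the quoted code.
      .apply(AssignChannelFn.class.getSimpleName(), assignChannels())
      // Single grouping path for both bounded and unbounded inputs.
      .apply(GroupIntoBatches.class.getSimpleName(), group)
      // Same list conversion and throttling SDF as Steps 2 and 3 in the quoted code.
      .apply("ConvertToList", toList())
      .setCoder(KvCoder.of(VarIntCoder.of(), ListCoder.of(input.getCoder())))
      .apply(ThrottleFn.class.getSimpleName(), throttle());
}
```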



##########
sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Throttle.java:
##########
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponse;
+
+import static org.apache.beam.io.requestresponse.Monitoring.incIfPresent;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.sdk.values.TypeDescriptors.integers;
+import static org.apache.beam.sdk.values.TypeDescriptors.kvs;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+import java.util.Spliterator;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metric;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.GroupIntoBatches;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.commons.math3.random.RandomDataGenerator;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Throttle a {@link PCollection} of {@link RequestT} elements.
+ *
+ * <pre>
+ * {@link Throttle} returns the same {@link RequestT}
+ * {@link PCollection}, but with elements emitted at a
+ * slower rate. Throttling is a best effort to decrease a {@link PCollection} 
throughput to a
+ * maximum of a configured {@link Rate} parameter.
+ * </pre>
+ *
+ * <h2>Basic Usage</h2>
+ *
+ * {@link Throttle} minimally requires specifying a maximum {@link Rate} 
parameter.
+ *
+ * <pre>{@code
+ * PCollection<RequestT> original = ...
+ *
+ * PCollection<RequestT> throttled = original.apply(Throttle.of(Rate.of(10, 
Duration.standardSeconds(1L))));
+ *
+ * }</pre>
+ *
+ * <h2>Applying metrics</h2>
+ *
+ * Additionally, usage can enable optional metrics to the measure counts of 
input and outputs.
+ *
+ * <pre>{@code
+ * PCollection<RequestT> original = ...
+ *
+ * PCollection<RequestT> throttled = original.apply(
+ *   Throttle.of(Rate.of(10, Duration.standardSeconds(1L)))
+ *           .withMetricsCollected()
+ * );
+ *
+ * }</pre>
+ *
+ * <h2>Additional streaming configuration</h2>
+ *
+ * In a streaming context, {@link Throttle} uses {@link GroupIntoBatches} to 
group elements of the
+ * input {@link PCollection} prior to throttling. Therefore, it needs to know 
how it should apply
+ * {@link GroupIntoBatches} to the input {@link PCollection}. The following 
takes the additional
+ * parameters via {@link #withStreamingConfiguration} that it forwards when 
instantiating and
+ * applying {@link GroupIntoBatches}.
+ *
+ * <pre>{@code
+ * PCollection<RequestT> original = ...
+ *
+ * PCollection<RequestT> throttled = original.apply(
+ *  Throttle.of(Rate.of(10, Duration.standardSeconds(1L)))
+ *          .withStreamingConfiguration(bufferingSize)
+ * );
+ *
+ *   // or
+ *
+ * PCollection<RequestT> throttled = original.apply(
+ *  Throttle.of(Rate.of(10, Duration.standardSeconds(1L)))
+ *          .withStreamingConfiguration(bufferingSize, maxBufferingDuration)
+ * );
+ *
+ * }</pre>
+ *
+ * <h2>Throttle Algorithm</h2>
+ *
+ * The following discusses the algorithm for how {@link Throttle} reduces the 
throughput of a {@code
+ * PCollection<RequestT>}.
+ *
+ * <p>First, the transform processes the original {@code 
PCollection<RequestT>} into a {@code
+ * PCollection<KV<Integer, RequestT>>} via random assignment of the key using 
{@link
+ * RandomDataGenerator#nextInt}. The result is a key space: [0, {@link 
Rate#getNumElements()}) such
+ * that each keyed channel is throttled at a rate of {@link 
Rate#getInterval()}. Next, for unbounded
+ * {@link PCollection}s i.e. streaming, the transform applies {@link 
GroupIntoBatches}; for bounded
+ * it applies {@link GroupByKey}. Then the transform converts the resulting, 
{@code
+ * PCollection<KV<Integer, Iterable<RequestT>>>} into a {@code 
PCollection<KV<Integer,
+ * List<RequestT>>>}. This is done to simplify the coding of the downstream <a
+ * href="https://beam.apache.org/documentation/programming-guide/#splittable-dofns">Splittable
+ * DoFn</a>. Next the transform applies {@link GlobalWindows} to the {@code 
PCollection<KV<Integer,
+ * List<RequestT>>>} prior to applying to the splittable DoFn. This splittable 
DoFn performs the
+ * actual work of throttling by holding the watermark and performing a {@link
+ * DoFn.ProcessContinuation#withResumeDelay(Duration)} of {@link 
Rate#getInterval()} if there are
+ * remaining elements to process. Finally, the transform applies the original 
input's {@link
+ * WindowingStrategy} to the returning {@code PCollection<RequestT>}.
+ */
+public class Throttle<RequestT> extends PTransform<PCollection<RequestT>, 
PCollection<RequestT>> {
+  static final String INPUT_ELEMENTS_COUNTER_NAME = "input_elements_count";
+  static final String OUTPUT_ELEMENTS_COUNTER_NAME = "output_elements_count";
+
+  /**
+   * Instantiates a {@link Throttle} with the maximumRate of {@link Rate} and 
without collecting
+   * metrics.
+   */
+  static <RequestT> Throttle<RequestT> of(Rate maximumRate) {
+    return new 
Throttle<>(Configuration.builder().setMaximumRate(maximumRate).build());
+  }
+
+  /** Returns {@link Throttle} with metrics collection turned on. */
+  Throttle<RequestT> withMetricsCollected() {
+    return new 
Throttle<>(configuration.toBuilder().setCollectMetrics(true).build());
+  }
+
+  /**
+   * Configures {@link Throttle} with additional parameters for use in 
streaming contexts. Calls
+   * {@link #withStreamingConfiguration(long, Duration)} with {@code null} 
{@link Duration}
+   * argument. See {@link #withStreamingConfiguration(long, Duration)} for 
more details.
+   */
+  Throttle<RequestT> withStreamingConfiguration(long bufferingSize) {
+    return withStreamingConfiguration(bufferingSize, null);
+  }
+
+  /**
+   * Configures {@link Throttle} for use with {@link 
PCollection.IsBounded#UNBOUNDED} {@link
+   * PCollection}s. In a streaming context, {@link Throttle} applies the input 
{@code
+   * PCollection<RequestT>} to {@link GroupIntoBatches} prior to throttling. 
Therefore, it requires
+   * additional configuration for how to handle streaming contexts.
+   */
+  Throttle<RequestT> withStreamingConfiguration(
+      long bufferingSize, @Nullable Duration maxBufferingDuration) {
+    return new Throttle<>(
+        configuration
+            .toBuilder()
+            .setStreamBufferingSize(bufferingSize)
+            .setStreamMaxBufferingDuration(maxBufferingDuration)
+            .build());
+  }
+
+  private final Configuration configuration;
+
+  private Throttle(Configuration configuration) {
+    this.configuration = configuration;
+  }
+
+  @Override
+  public PCollection<RequestT> expand(PCollection<RequestT> input) {
+    ListCoder<RequestT> listCoder = ListCoder.of(input.getCoder());
+    Coder<KV<Integer, List<RequestT>>> kvCoder = KvCoder.of(VarIntCoder.of(), 
listCoder);
+
+    PTransform<PCollection<KV<Integer, RequestT>>, PCollection<KV<Integer, 
Iterable<RequestT>>>>
+        groupingTransform = GroupByKey.create();
+    String groupingStepName = GroupByKey.class.getSimpleName();
+    if (input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) {
+      groupingStepName = GroupIntoBatches.class.getSimpleName();
+      long bufferingSize =
+          checkStateNotNull(
+              configuration.getStreamBufferingSize(),
+              "Unbounded PCollection is missing streaming configuration; 
configure Throttle for use with unbounded PCollections using 
Throttle#withStreamingConfiguration");
+      GroupIntoBatches<Integer, RequestT> groupIntoBatches = 
GroupIntoBatches.ofSize(bufferingSize);
+      if (configuration.getStreamMaxBufferingDuration() != null) {
+        Duration maxBufferingDuration =
+            checkStateNotNull(configuration.getStreamMaxBufferingDuration());
+        groupIntoBatches = 
groupIntoBatches.withMaxBufferingDuration(maxBufferingDuration);
+      }
+      groupingTransform = groupIntoBatches;
+    }
+
+    return input
+        // Step 1. Break up the PCollection into fixed channels assigned to an 
int key [0,
+        // Rate::numElements).
+        .apply(AssignChannelFn.class.getSimpleName(), assignChannels())
+        .apply(groupingStepName, groupingTransform)
+
+        // Step 2. Convert KV<Integer, Iterable<RequestT>> to KV<Integer, 
List<RequestT>>.
+        // Working with a List<RequestT> is cleaner than an Iterable in 
Splittable DoFns.
+        // IterableCoder uses a List for IterableLikeCoder's structuralValue.
+        .apply("ConvertToList", toList())
+        .setCoder(kvCoder)
+
+        // Step 3. Apply the splittable DoFn that performs the actual work of 
throttling.
+        .apply(ThrottleFn.class.getSimpleName(), throttle());
+  }
+
+  private ParDo.SingleOutput<KV<Integer, List<RequestT>>, RequestT> throttle() 
{
+    return ParDo.of(new ThrottleFn());
+  }
+
+  @DoFn.BoundedPerElement
+  private class ThrottleFn extends DoFn<KV<Integer, List<RequestT>>, RequestT> 
{
+    private @MonotonicNonNull Counter inputElementsCounter = null;
+    private @MonotonicNonNull Counter outputElementsCounter = null;
+
+    @Setup
+    public void setup() {
+      if (configuration.getCollectMetrics()) {
+        inputElementsCounter = Metrics.counter(Throttle.class, 
INPUT_ELEMENTS_COUNTER_NAME);
+        outputElementsCounter = Metrics.counter(Throttle.class, 
OUTPUT_ELEMENTS_COUNTER_NAME);
+      }
+    }
+
+    /**
+     * Instantiates an initial {@link RestrictionTracker.IsBounded#BOUNDED} 
{@link OffsetRange}
+     * restriction from [-1, {@link List#size()}). Defaults to [-1, 0) for 
null {@link
+     * KV#getValue()} elements.
+     */
+    @GetInitialRestriction
+    public OffsetRange getInitialRange(@Element KV<Integer, List<RequestT>> 
element) {
+      int size = 0;
+      if (element.getValue() != null) {
+        size = element.getValue().size();
+      }
+      incIfPresent(inputElementsCounter, size);
+      return new OffsetRange(-1, size);
+    }
+
+    /**
+     * The {@link GetInitialWatermarkEstimatorState} initializes to this 
DoFn's output watermark to
+     * a negative infinity timestamp via {@link 
BoundedWindow#TIMESTAMP_MIN_VALUE}. The {@link
+     * Instant} returned by this method provides the runner the value is 
passes as an argument to
+     * this DoFn's {@link #newWatermarkEstimator}.
+     */
+    @GetInitialWatermarkEstimatorState
+    public Instant getInitialWatermarkState() {
+      return BoundedWindow.TIMESTAMP_MIN_VALUE;
+    }
+
+    /**
+     * This DoFn uses a {@link WatermarkEstimators.Manual} as its {@link 
NewWatermarkEstimator},
+     * instantiated from an {@link Instant}. The state argument in this method 
comes from the return
+     * of the {@link #getInitialWatermarkState}.
+     */
+    @NewWatermarkEstimator
+    public WatermarkEstimator<Instant> newWatermarkEstimator(
+        @WatermarkEstimatorState Instant state) {
+      return new WatermarkEstimators.Manual(state);
+    }
+
+    /** Instantiates an {@link OffsetRangeTracker} from an {@link OffsetRange} 
instance. */
+    @NewTracker
+    public RestrictionTracker<OffsetRange, Long> newTracker(@Restriction 
OffsetRange restriction) {
+      return new OffsetRangeTracker(restriction);
+    }
+
+    /** Simply returns the {@link OffsetRange} restriction. */
+    @TruncateRestriction
+    public RestrictionTracker.TruncateResult<OffsetRange> truncate(
+        @Restriction OffsetRange restriction) {
+      return new RestrictionTracker.TruncateResult<OffsetRange>() {
+        @Override
+        public OffsetRange getTruncatedRestriction() {
+          return restriction;
+        }
+      };
+    }
+
+    /**
+     * Emits the next {@code element.getValue().get(position)} from the {@link
+     * OffsetRange#getFrom()} if {@link RestrictionTracker#tryClaim} is true 
and sets the watermark.
+     * If remaining items exist, it returns {@link
+     * DoFn.ProcessContinuation#withResumeDelay(Duration)} with {@link
+     * Configuration#getMaximumRate()}'s {@link Rate#getInterval()}, otherwise 
{@link
+     * ProcessContinuation#stop()}.
+     */
+    @ProcessElement
+    public ProcessContinuation process(
+        @Element KV<Integer, List<RequestT>> element,
+        @Timestamp Instant timestamp,
+        ManualWatermarkEstimator<Instant> estimator,
+        RestrictionTracker<OffsetRange, Long> tracker,
+        OutputReceiver<RequestT> receiver) {
+
+      if (element.getValue() == null || element.getValue().isEmpty()) {
+        return ProcessContinuation.stop();
+      }
+
+      long position = tracker.currentRestriction().getFrom();
+      if (position < 0) {
+        position = 0;
+      }
+
+      if (!tracker.tryClaim(position)) {
+        return ProcessContinuation.stop();
+      }
+
+      RequestT value = element.getValue().get((int) position);
+      estimator.setWatermark(timestamp);
+      receiver.output(value);

Review Comment:
   One possible approach is to have a stateful step using OrderedListState where you wrap each element with its desired firing time, and then use an SDF kind of like this one to return a process continuation until we're ready to fire the element (this gets around the issues you've run into with timers not firing correctly in batch mode). This may still have watermark concerns, though (mentioned elsewhere).
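A rough, hypothetical sketch of the buffering half of that idea, just to make the shape concrete. The class name `BufferWithFiringTimeFn` and the firing-time policy are made up for illustration; draining elements that are still buffered when the bundle ends (via the SDF-style poll, a timer, or `@OnWindowExpiration`) is deliberately left out.

```java
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.OrderedListState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

/** Buffers each element into OrderedListState keyed by its desired firing time. */
class BufferWithFiringTimeFn<T> extends DoFn<KV<Integer, T>, KV<Integer, T>> {

  @StateId("buffer")
  private final StateSpec<OrderedListState<T>> bufferSpec;

  private final Duration interval;

  BufferWithFiringTimeFn(Coder<T> elementCoder, Duration interval) {
    this.bufferSpec = StateSpecs.orderedList(elementCoder);
    this.interval = interval;
  }

  @ProcessElement
  public void process(
      @Element KV<Integer, T> element,
      @Timestamp Instant timestamp,
      @StateId("buffer") OrderedListState<T> buffer,
      OutputReceiver<KV<Integer, T>> receiver) {
    // Placeholder policy: fire one interval after the element's timestamp.
    Instant fireAt = timestamp.plus(interval);
    buffer.add(TimestampedValue.of(element.getValue(), fireAt));

    // Emit whatever is already due; anything still buffered would be drained later by
    // the downstream SDF-style poll (or timers / @OnWindowExpiration), omitted here.
    Instant now = Instant.now();
    for (TimestampedValue<T> due : buffer.readRange(BoundedWindow.TIMESTAMP_MIN_VALUE, now)) {
      receiver.output(KV.of(element.getKey(), due.getValue()));
    }
    buffer.clearRange(BoundedWindow.TIMESTAMP_MIN_VALUE, now);
  }
}
```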



##########
sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Throttle.java:
##########
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponse;
+
+import static org.apache.beam.io.requestresponse.Monitoring.incIfPresent;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.sdk.values.TypeDescriptors.integers;
+import static org.apache.beam.sdk.values.TypeDescriptors.kvs;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+import java.util.Spliterator;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metric;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.GroupIntoBatches;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.commons.math3.random.RandomDataGenerator;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Throttle a {@link PCollection} of {@link RequestT} elements.
+ *
+ * <pre>
+ * {@link Throttle} returns the same {@link RequestT}
+ * {@link PCollection}, but with elements emitted at a
+ * slower rate. Throttling is a best effort to decrease a {@link PCollection} 
throughput to a
+ * maximum of a configured {@link Rate} parameter.
+ * </pre>
+ *
+ * <h2>Basic Usage</h2>
+ *
+ * {@link Throttle} minimally requires specifying a maximum {@link Rate} 
parameter.
+ *
+ * <pre>{@code
+ * PCollection<RequestT> original = ...
+ *
+ * PCollection<RequestT> throttled = original.apply(Throttle.of(Rate.of(10, 
Duration.standardSeconds(1L))));
+ *
+ * }</pre>
+ *
+ * <h2>Applying metrics</h2>
+ *
+ * Additionally, usage can enable optional metrics to the measure counts of 
input and outputs.
+ *
+ * <pre>{@code
+ * PCollection<RequestT> original = ...
+ *
+ * PCollection<RequestT> throttled = original.apply(
+ *   Throttle.of(Rate.of(10, Duration.standardSeconds(1L)))
+ *           .withMetricsCollected()
+ * );
+ *
+ * }</pre>
+ *
+ * <h2>Additional streaming configuration</h2>
+ *
+ * In a streaming context, {@link Throttle} uses {@link GroupIntoBatches} to 
group elements of the
+ * input {@link PCollection} prior to throttling. Therefore, it needs to know 
how it should apply
+ * {@link GroupIntoBatches} to the input {@link PCollection}. The following 
takes the additional
+ * parameters via {@link #withStreamingConfiguration} that it forwards when 
instantiating and
+ * applying {@link GroupIntoBatches}.
+ *
+ * <pre>{@code
+ * PCollection<RequestT> original = ...
+ *
+ * PCollection<RequestT> throttled = original.apply(
+ *  Throttle.of(Rate.of(10, Duration.standardSeconds(1L)))
+ *          .withStreamingConfiguration(bufferingSize)
+ * );
+ *
+ *   // or
+ *
+ * PCollection<RequestT> throttled = original.apply(
+ *  Throttle.of(Rate.of(10, Duration.standardSeconds(1L)))
+ *          .withStreamingConfiguration(bufferingSize, maxBufferingDuration)
+ * );
+ *
+ * }</pre>
+ *
+ * <h2>Throttle Algorithm</h2>
+ *
+ * The following discusses the algorithm for how {@link Throttle} reduces the 
throughput of a {@code
+ * PCollection<RequestT>}.
+ *
+ * <p>First, the transform processes the original {@code 
PCollection<RequestT>} into a {@code
+ * PCollection<KV<Integer, RequestT>>} via random assignment of the key using 
{@link
+ * RandomDataGenerator#nextInt}. The result is a key space: [0, {@link 
Rate#getNumElements()}) such
+ * that each keyed channel is throttled at a rate of {@link 
Rate#getInterval()}. Next, for unbounded
+ * {@link PCollection}s i.e. streaming, the transform applies {@link 
GroupIntoBatches}; for bounded
+ * it applies {@link GroupByKey}. Then the transform converts the resulting, 
{@code
+ * PCollection<KV<Integer, Iterable<RequestT>>>} into a {@code 
PCollection<KV<Integer,
+ * List<RequestT>>>}. This is done to simplify the coding of the downstream <a
+ * href="https://beam.apache.org/documentation/programming-guide/#splittable-dofns">Splittable
+ * DoFn</a>. Next the transform applies {@link GlobalWindows} to the {@code 
PCollection<KV<Integer,
+ * List<RequestT>>>} prior to applying to the splittable DoFn. This splittable 
DoFn performs the

Review Comment:
   Nit: this part of the javadoc is out of date, I think.



##########
sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Throttle.java:
##########
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponse;
+
+import static org.apache.beam.io.requestresponse.Monitoring.incIfPresent;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.sdk.values.TypeDescriptors.integers;
+import static org.apache.beam.sdk.values.TypeDescriptors.kvs;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+import java.util.Spliterator;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metric;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.GroupIntoBatches;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.commons.math3.random.RandomDataGenerator;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Throttle a {@link PCollection} of {@link RequestT} elements.
+ *
+ * <pre>
+ * {@link Throttle} returns the same {@link RequestT}
+ * {@link PCollection}, but with elements emitted at a
+ * slower rate. Throttling is a best effort to decrease a {@link PCollection} 
throughput to a
+ * maximum of a configured {@link Rate} parameter.
+ * </pre>
+ *
+ * <h2>Basic Usage</h2>
+ *
+ * {@link Throttle} minimally requires specifying a maximum {@link Rate} 
parameter.
+ *
+ * <pre>{@code
+ * PCollection<RequestT> original = ...
+ *
+ * PCollection<RequestT> throttled = original.apply(Throttle.of(Rate.of(10, 
Duration.standardSeconds(1L))));
+ *
+ * }</pre>
+ *
+ * <h2>Applying metrics</h2>
+ *
+ * Additionally, usage can enable optional metrics to the measure counts of 
input and outputs.
+ *
+ * <pre>{@code
+ * PCollection<RequestT> original = ...
+ *
+ * PCollection<RequestT> throttled = original.apply(
+ *   Throttle.of(Rate.of(10, Duration.standardSeconds(1L)))
+ *           .withMetricsCollected()
+ * );
+ *
+ * }</pre>
+ *
+ * <h2>Additional streaming configuration</h2>
+ *
+ * In a streaming context, {@link Throttle} uses {@link GroupIntoBatches} to 
group elements of the
+ * input {@link PCollection} prior to throttling. Therefore, it needs to know 
how it should apply
+ * {@link GroupIntoBatches} to the input {@link PCollection}. The following 
takes the additional
+ * parameters via {@link #withStreamingConfiguration} that it forwards when 
instantiating and
+ * applying {@link GroupIntoBatches}.
+ *
+ * <pre>{@code
+ * PCollection<RequestT> original = ...
+ *
+ * PCollection<RequestT> throttled = original.apply(
+ *  Throttle.of(Rate.of(10, Duration.standardSeconds(1L)))
+ *          .withStreamingConfiguration(bufferingSize)
+ * );
+ *
+ *   // or
+ *
+ * PCollection<RequestT> throttled = original.apply(
+ *  Throttle.of(Rate.of(10, Duration.standardSeconds(1L)))
+ *          .withStreamingConfiguration(bufferingSize, maxBufferingDuration)
+ * );
+ *
+ * }</pre>
+ *
+ * <h2>Throttle Algorithm</h2>
+ *
+ * The following discusses the algorithm for how {@link Throttle} reduces the 
throughput of a {@code
+ * PCollection<RequestT>}.
+ *
+ * <p>First, the transform processes the original {@code 
PCollection<RequestT>} into a {@code
+ * PCollection<KV<Integer, RequestT>>} via random assignment of the key using 
{@link
+ * RandomDataGenerator#nextInt}. The result is a key space: [0, {@link 
Rate#getNumElements()}) such
+ * that each keyed channel is throttled at a rate of {@link 
Rate#getInterval()}. Next, for unbounded
+ * {@link PCollection}s i.e. streaming, the transform applies {@link 
GroupIntoBatches}; for bounded
+ * it applies {@link GroupByKey}. Then the transform converts the resulting, 
{@code
+ * PCollection<KV<Integer, Iterable<RequestT>>>} into a {@code 
PCollection<KV<Integer,
+ * List<RequestT>>>}. This is done to simplify the coding of the downstream <a
+ * href="https://beam.apache.org/documentation/programming-guide/#splittable-dofns">Splittable
+ * DoFn</a>. Next the transform applies {@link GlobalWindows} to the {@code 
PCollection<KV<Integer,
+ * List<RequestT>>>} prior to applying to the splittable DoFn. This splittable 
DoFn performs the
+ * actual work of throttling by holding the watermark and performing a {@link
+ * DoFn.ProcessContinuation#withResumeDelay(Duration)} of {@link 
Rate#getInterval()} if there are
+ * remaining elements to process. Finally, the transform applies the original 
input's {@link
+ * WindowingStrategy} to the returning {@code PCollection<RequestT>}.
+ */
+public class Throttle<RequestT> extends PTransform<PCollection<RequestT>, 
PCollection<RequestT>> {
+  static final String INPUT_ELEMENTS_COUNTER_NAME = "input_elements_count";
+  static final String OUTPUT_ELEMENTS_COUNTER_NAME = "output_elements_count";
+
+  /**
+   * Instantiates a {@link Throttle} with the maximumRate of {@link Rate} and 
without collecting
+   * metrics.
+   */
+  static <RequestT> Throttle<RequestT> of(Rate maximumRate) {
+    return new 
Throttle<>(Configuration.builder().setMaximumRate(maximumRate).build());
+  }
+
+  /** Returns {@link Throttle} with metrics collection turned on. */
+  Throttle<RequestT> withMetricsCollected() {
+    return new 
Throttle<>(configuration.toBuilder().setCollectMetrics(true).build());
+  }
+
+  /**
+   * Configures {@link Throttle} with additional parameters for use in 
streaming contexts. Calls
+   * {@link #withStreamingConfiguration(long, Duration)} with {@code null} 
{@link Duration}
+   * argument. See {@link #withStreamingConfiguration(long, Duration)} for 
more details.
+   */
+  Throttle<RequestT> withStreamingConfiguration(long bufferingSize) {
+    return withStreamingConfiguration(bufferingSize, null);
+  }
+
+  /**
+   * Configures {@link Throttle} for use with {@link 
PCollection.IsBounded#UNBOUNDED} {@link
+   * PCollection}s. In a streaming context, {@link Throttle} applies the input 
{@code
+   * PCollection<RequestT>} to {@link GroupIntoBatches} prior to throttling. 
Therefore, it requires
+   * additional configuration for how to handle streaming contexts.
+   */
+  Throttle<RequestT> withStreamingConfiguration(
+      long bufferingSize, @Nullable Duration maxBufferingDuration) {
+    return new Throttle<>(
+        configuration
+            .toBuilder()
+            .setStreamBufferingSize(bufferingSize)
+            .setStreamMaxBufferingDuration(maxBufferingDuration)
+            .build());
+  }
+
+  private final Configuration configuration;
+
+  private Throttle(Configuration configuration) {
+    this.configuration = configuration;
+  }
+
+  @Override
+  public PCollection<RequestT> expand(PCollection<RequestT> input) {
+    ListCoder<RequestT> listCoder = ListCoder.of(input.getCoder());
+    Coder<KV<Integer, List<RequestT>>> kvCoder = KvCoder.of(VarIntCoder.of(), 
listCoder);
+
+    PTransform<PCollection<KV<Integer, RequestT>>, PCollection<KV<Integer, 
Iterable<RequestT>>>>
+        groupingTransform = GroupByKey.create();
+    String groupingStepName = GroupByKey.class.getSimpleName();
+    if (input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) {
+      groupingStepName = GroupIntoBatches.class.getSimpleName();
+      long bufferingSize =
+          checkStateNotNull(
+              configuration.getStreamBufferingSize(),
+              "Unbounded PCollection is missing streaming configuration; 
configure Throttle for use with unbounded PCollections using 
Throttle#withStreamingConfiguration");
+      GroupIntoBatches<Integer, RequestT> groupIntoBatches = 
GroupIntoBatches.ofSize(bufferingSize);
+      if (configuration.getStreamMaxBufferingDuration() != null) {
+        Duration maxBufferingDuration =
+            checkStateNotNull(configuration.getStreamMaxBufferingDuration());
+        groupIntoBatches = 
groupIntoBatches.withMaxBufferingDuration(maxBufferingDuration);
+      }
+      groupingTransform = groupIntoBatches;
+    }
+
+    return input
+        // Step 1. Break up the PCollection into fixed channels assigned to an 
int key [0,
+        // Rate::numElements).
+        .apply(AssignChannelFn.class.getSimpleName(), assignChannels())
+        .apply(groupingStepName, groupingTransform)
+
+        // Step 2. Convert KV<Integer, Iterable<RequestT>> to KV<Integer, 
List<RequestT>>.
+        // Working with a List<RequestT> is cleaner than an Iterable in 
Splittable DoFns.
+        // IterableCoder uses a List for IterableLikeCoder's structuralValue.
+        .apply("ConvertToList", toList())
+        .setCoder(kvCoder)
+
+        // Step 3. Apply the splittable DoFn that performs the actual work of 
throttling.
+        .apply(ThrottleFn.class.getSimpleName(), throttle());
+  }
+
+  private ParDo.SingleOutput<KV<Integer, List<RequestT>>, RequestT> throttle() 
{
+    return ParDo.of(new ThrottleFn());
+  }
+
+  @DoFn.BoundedPerElement
+  private class ThrottleFn extends DoFn<KV<Integer, List<RequestT>>, RequestT> 
{
+    private @MonotonicNonNull Counter inputElementsCounter = null;
+    private @MonotonicNonNull Counter outputElementsCounter = null;
+
+    @Setup
+    public void setup() {
+      if (configuration.getCollectMetrics()) {
+        inputElementsCounter = Metrics.counter(Throttle.class, 
INPUT_ELEMENTS_COUNTER_NAME);
+        outputElementsCounter = Metrics.counter(Throttle.class, 
OUTPUT_ELEMENTS_COUNTER_NAME);
+      }
+    }
+
+    /**
+     * Instantiates an initial {@link RestrictionTracker.IsBounded#BOUNDED} 
{@link OffsetRange}
+     * restriction from [-1, {@link List#size()}). Defaults to [-1, 0) for 
null {@link
+     * KV#getValue()} elements.
+     */
+    @GetInitialRestriction
+    public OffsetRange getInitialRange(@Element KV<Integer, List<RequestT>> 
element) {
+      int size = 0;
+      if (element.getValue() != null) {
+        size = element.getValue().size();
+      }
+      incIfPresent(inputElementsCounter, size);
+      return new OffsetRange(-1, size);
+    }
+
+    /**
+     * The {@link GetInitialWatermarkEstimatorState} initializes to this 
DoFn's output watermark to
+     * a negative infinity timestamp via {@link 
BoundedWindow#TIMESTAMP_MIN_VALUE}. The {@link
+     * Instant} returned by this method provides the runner the value is 
passes as an argument to
+     * this DoFn's {@link #newWatermarkEstimator}.
+     */
+    @GetInitialWatermarkEstimatorState
+    public Instant getInitialWatermarkState() {
+      return BoundedWindow.TIMESTAMP_MIN_VALUE;
+    }
+
+    /**
+     * This DoFn uses a {@link WatermarkEstimators.Manual} as its {@link 
NewWatermarkEstimator},
+     * instantiated from an {@link Instant}. The state argument in this method 
comes from the return
+     * of the {@link #getInitialWatermarkState}.
+     */
+    @NewWatermarkEstimator
+    public WatermarkEstimator<Instant> newWatermarkEstimator(
+        @WatermarkEstimatorState Instant state) {
+      return new WatermarkEstimators.Manual(state);
+    }
+
+    /** Instantiates an {@link OffsetRangeTracker} from an {@link OffsetRange} 
instance. */
+    @NewTracker
+    public RestrictionTracker<OffsetRange, Long> newTracker(@Restriction 
OffsetRange restriction) {
+      return new OffsetRangeTracker(restriction);
+    }
+
+    /** Simply returns the {@link OffsetRange} restriction. */
+    @TruncateRestriction
+    public RestrictionTracker.TruncateResult<OffsetRange> truncate(
+        @Restriction OffsetRange restriction) {
+      return new RestrictionTracker.TruncateResult<OffsetRange>() {
+        @Override
+        public OffsetRange getTruncatedRestriction() {
+          return restriction;
+        }
+      };
+    }
+
+    /**
+     * Emits the next {@code element.getValue().get(position)} from the {@link
+     * OffsetRange#getFrom()} if {@link RestrictionTracker#tryClaim} is true 
and sets the watermark.
+     * If remaining items exist, it returns {@link
+     * DoFn.ProcessContinuation#withResumeDelay(Duration)} with {@link
+     * Configuration#getMaximumRate()}'s {@link Rate#getInterval()}, otherwise 
{@link
+     * ProcessContinuation#stop()}.
+     */
+    @ProcessElement
+    public ProcessContinuation process(
+        @Element KV<Integer, List<RequestT>> element,
+        @Timestamp Instant timestamp,
+        ManualWatermarkEstimator<Instant> estimator,
+        RestrictionTracker<OffsetRange, Long> tracker,
+        OutputReceiver<RequestT> receiver) {
+
+      if (element.getValue() == null || element.getValue().isEmpty()) {
+        return ProcessContinuation.stop();
+      }
+
+      long position = tracker.currentRestriction().getFrom();
+      if (position < 0) {
+        position = 0;
+      }
+
+      if (!tracker.tryClaim(position)) {
+        return ProcessContinuation.stop();
+      }
+
+      RequestT value = element.getValue().get((int) position);
+      estimator.setWatermark(timestamp);
+      receiver.output(value);

Review Comment:
   I keep getting hung up on this function, even with the current approach (which I think has addressed many of my concerns). Basically, my remaining question is: in streaming mode, does this actually guarantee the throttling behavior we're expecting? Let's say, for example, that we have a throttling limit of 100 elements per hour, and then the following happens:
   
   ```
   We get 100 elements all at once
   These elements are grouped into batches of size 1, then processed by this transform, and all of them fire
   maxBufferingDuration time passes (but less than an hour has passed, so we should still be throttled)
   We get 100 elements all at once again
   ```
   Won't those 100 elements come in, get put into batches of size 1, and then immediately fire (since they have a new restriction tracker/watermark)?
   
   --------------------------------------
   
   FWIW, I do think this is the type of conversation that would have benefited greatly from a design doc, where we could have had threaded conversations more quickly. I'm not asking you to retrofit one now, but I am asking that we consider more detailed designs in the future to avoid getting bogged down like we have. Each time we've returned to this PR, we've done so with very different approaches, which is expensive for you as the implementer and for me as the reviewer.
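For what it's worth, the scenario above is the kind of thing a TestStream-based test could probe directly. The sketch below is hypothetical and not from the PR; it assumes a 100-elements-per-hour Rate, a runner that supports TestStream, and omits the imports for TestStream, StringUtf8Coder, and Duration.

```java
// Two bursts of 100 elements separated by only 10 minutes of processing time.
// Applying Throttle to this stream and inspecting when outputs arrive would show
// whether the second burst is still held back by the hour-long interval.
TestStream.Builder<String> builder = TestStream.create(StringUtf8Coder.of());
for (int i = 0; i < 100; i++) {
  builder = builder.addElements("request-" + i);
}
builder = builder.advanceProcessingTime(Duration.standardMinutes(10)); // < 1 hour
for (int i = 100; i < 200; i++) {
  builder = builder.addElements("request-" + i);
}
TestStream<String> twoBursts = builder.advanceWatermarkToInfinity();
```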



##########
sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/Throttle.java:
##########
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponse;
+
+import static org.apache.beam.io.requestresponse.Monitoring.incIfPresent;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.sdk.values.TypeDescriptors.integers;
+import static org.apache.beam.sdk.values.TypeDescriptors.kvs;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+import java.util.Spliterator;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metric;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.GroupIntoBatches;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.commons.math3.random.RandomDataGenerator;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Throttle a {@link PCollection} of {@link RequestT} elements.
+ *
+ * <pre>
+ * {@link Throttle} returns the same {@link RequestT}
+ * {@link PCollection}, but with elements emitted at a
+ * slower rate. Throttling is a best effort to decrease a {@link PCollection} 
throughput to a
+ * maximum of a configured {@link Rate} parameter.
+ * </pre>
+ *
+ * <h2>Basic Usage</h2>
+ *
+ * {@link Throttle} minimally requires specifying a maximum {@link Rate} 
parameter.
+ *
+ * <pre>{@code
+ * PCollection<RequestT> original = ...
+ *
+ * PCollection<RequestT> throttled = original.apply(Throttle.of(Rate.of(10, 
Duration.standardSeconds(1L))));
+ *
+ * }</pre>
+ *
+ * <h2>Applying metrics</h2>
+ *
+ * Additionally, usage can enable optional metrics to the measure counts of 
input and outputs.
+ *
+ * <pre>{@code
+ * PCollection<RequestT> original = ...
+ *
+ * PCollection<RequestT> throttled = original.apply(
+ *   Throttle.of(Rate.of(10, Duration.standardSeconds(1L)))
+ *           .withMetricsCollected()
+ * );
+ *
+ * }</pre>
+ *
+ * <h2>Additional streaming configuration</h2>
+ *
+ * In a streaming context, {@link Throttle} uses {@link GroupIntoBatches} to 
group elements of the
+ * input {@link PCollection} prior to throttling. Therefore, it needs to know 
how it should apply
+ * {@link GroupIntoBatches} to the input {@link PCollection}. The following 
takes the additional
+ * parameters via {@link #withStreamingConfiguration} that it forwards when 
instantiating and
+ * applying {@link GroupIntoBatches}.
+ *
+ * <pre>{@code
+ * PCollection<RequestT> original = ...
+ *
+ * PCollection<RequestT> throttled = original.apply(
+ *  Throttle.of(Rate.of(10, Duration.standardSeconds(1L)))
+ *          .withStreamingConfiguration(bufferingSize)
+ * );
+ *
+ *   // or
+ *
+ * PCollection<RequestT> throttled = original.apply(
+ *  Throttle.of(Rate.of(10, Duration.standardSeconds(1L)))
+ *          .withStreamingConfiguration(bufferingSize, maxBufferingDuration)
+ * );
+ *
+ * }</pre>
+ *
+ * <h2>Throttle Algorithm</h2>
+ *
+ * The following discusses the algorithm for how {@link Throttle} reduces the 
throughput of a {@code
+ * PCollection<RequestT>}.
+ *
+ * <p>First, the transform processes the original {@code 
PCollection<RequestT>} into a {@code
+ * PCollection<KV<Integer, RequestT>>} via random assignment of the key using 
{@link
+ * RandomDataGenerator#nextInt}. The result is a key space: [0, {@link 
Rate#getNumElements()}) such
+ * that each keyed channel is throttled at a rate of {@link 
Rate#getInterval()}. Next, for unbounded
+ * {@link PCollection}s i.e. streaming, the transform applies {@link 
GroupIntoBatches}; for bounded
+ * it applies {@link GroupByKey}. Then the transform converts the resulting, 
{@code
+ * PCollection<KV<Integer, Iterable<RequestT>>>} into a {@code 
PCollection<KV<Integer,
+ * List<RequestT>>>}. This is done to simplify the coding of the downstream <a
+ * href="https://beam.apache.org/documentation/programming-guide/#splittable-dofns">Splittable
+ * DoFn</a>. Next the transform applies {@link GlobalWindows} to the {@code 
PCollection<KV<Integer,
+ * List<RequestT>>>} prior to applying to the splittable DoFn. This splittable 
DoFn performs the
+ * actual work of throttling by holding the watermark and performing a {@link
+ * DoFn.ProcessContinuation#withResumeDelay(Duration)} of {@link 
Rate#getInterval()} if there are
+ * remaining elements to process. Finally, the transform applies the original 
input's {@link
+ * WindowingStrategy} to the returning {@code PCollection<RequestT>}.
+ */
+public class Throttle<RequestT> extends PTransform<PCollection<RequestT>, 
PCollection<RequestT>> {
+  static final String INPUT_ELEMENTS_COUNTER_NAME = "input_elements_count";
+  static final String OUTPUT_ELEMENTS_COUNTER_NAME = "output_elements_count";
+
+  /**
+   * Instantiates a {@link Throttle} with the maximumRate of {@link Rate} and 
without collecting
+   * metrics.
+   */
+  static <RequestT> Throttle<RequestT> of(Rate maximumRate) {
+    return new 
Throttle<>(Configuration.builder().setMaximumRate(maximumRate).build());
+  }
+
+  /** Returns {@link Throttle} with metrics collection turned on. */
+  Throttle<RequestT> withMetricsCollected() {
+    return new 
Throttle<>(configuration.toBuilder().setCollectMetrics(true).build());
+  }
+
+  /**
+   * Configures {@link Throttle} with additional parameters for use in 
streaming contexts. Calls
+   * {@link #withStreamingConfiguration(long, Duration)} with {@code null} 
{@link Duration}
+   * argument. See {@link #withStreamingConfiguration(long, Duration)} for 
more details.
+   */
+  Throttle<RequestT> withStreamingConfiguration(long bufferingSize) {
+    return withStreamingConfiguration(bufferingSize, null);
+  }
+
+  /**
+   * Configures {@link Throttle} for use with {@link 
PCollection.IsBounded#UNBOUNDED} {@link
+   * PCollection}s. In a streaming context, {@link Throttle} applies the input 
{@code
+   * PCollection<RequestT>} to {@link GroupIntoBatches} prior to throttling. 
Therefore, it requires
+   * additional configuration for how to handle streaming contexts.
+   */
+  Throttle<RequestT> withStreamingConfiguration(
+      long bufferingSize, @Nullable Duration maxBufferingDuration) {
+    return new Throttle<>(
+        configuration
+            .toBuilder()
+            .setStreamBufferingSize(bufferingSize)
+            .setStreamMaxBufferingDuration(maxBufferingDuration)
+            .build());
+  }
+
+  private final Configuration configuration;
+
+  private Throttle(Configuration configuration) {
+    this.configuration = configuration;
+  }
+
+  @Override
+  public PCollection<RequestT> expand(PCollection<RequestT> input) {
+    ListCoder<RequestT> listCoder = ListCoder.of(input.getCoder());
+    Coder<KV<Integer, List<RequestT>>> kvCoder = KvCoder.of(VarIntCoder.of(), 
listCoder);
+
+    PTransform<PCollection<KV<Integer, RequestT>>, PCollection<KV<Integer, 
Iterable<RequestT>>>>
+        groupingTransform = GroupByKey.create();
+    String groupingStepName = GroupByKey.class.getSimpleName();
+    if (input.isBounded().equals(PCollection.IsBounded.UNBOUNDED)) {
+      groupingStepName = GroupIntoBatches.class.getSimpleName();
+      long bufferingSize =
+          checkStateNotNull(
+              configuration.getStreamBufferingSize(),
+              "Unbounded PCollection is missing streaming configuration; 
configure Throttle for use with unbounded PCollections using 
Throttle#withStreamingConfiguration");
+      GroupIntoBatches<Integer, RequestT> groupIntoBatches = 
GroupIntoBatches.ofSize(bufferingSize);
+      if (configuration.getStreamMaxBufferingDuration() != null) {
+        Duration maxBufferingDuration =
+            checkStateNotNull(configuration.getStreamMaxBufferingDuration());
+        groupIntoBatches = 
groupIntoBatches.withMaxBufferingDuration(maxBufferingDuration);
+      }
+      groupingTransform = groupIntoBatches;
+    }
+
+    return input
+        // Step 1. Break up the PCollection into fixed channels assigned to an 
int key [0,
+        // Rate::numElements).
+        .apply(AssignChannelFn.class.getSimpleName(), assignChannels())
+        .apply(groupingStepName, groupingTransform)
+
+        // Step 2. Convert KV<Integer, Iterable<RequestT>> to KV<Integer, 
List<RequestT>>.
+        // Working with a List<RequestT> is cleaner than an Iterable in 
Splittable DoFns.
+        // IterableCoder uses a List for IterableLikeCoder's structuralValue.
+        .apply("ConvertToList", toList())
+        .setCoder(kvCoder)
+
+        // Step 3. Apply the splittable DoFn that performs the actual work of 
throttling.
+        .apply(ThrottleFn.class.getSimpleName(), throttle());
+  }
+
+  private ParDo.SingleOutput<KV<Integer, List<RequestT>>, RequestT> throttle() 
{
+    return ParDo.of(new ThrottleFn());
+  }
+
+  @DoFn.BoundedPerElement
+  private class ThrottleFn extends DoFn<KV<Integer, List<RequestT>>, RequestT> 
{
+    private @MonotonicNonNull Counter inputElementsCounter = null;
+    private @MonotonicNonNull Counter outputElementsCounter = null;
+
+    @Setup
+    public void setup() {
+      if (configuration.getCollectMetrics()) {
+        inputElementsCounter = Metrics.counter(Throttle.class, 
INPUT_ELEMENTS_COUNTER_NAME);
+        outputElementsCounter = Metrics.counter(Throttle.class, 
OUTPUT_ELEMENTS_COUNTER_NAME);
+      }
+    }
+
+    /**
+     * Instantiates an initial {@link RestrictionTracker.IsBounded#BOUNDED} 
{@link OffsetRange}
+     * restriction from [-1, {@link List#size()}). Defaults to [-1, 0) for 
null {@link
+     * KV#getValue()} elements.
+     */
+    @GetInitialRestriction
+    public OffsetRange getInitialRange(@Element KV<Integer, List<RequestT>> 
element) {
+      int size = 0;
+      if (element.getValue() != null) {
+        size = element.getValue().size();
+      }
+      incIfPresent(inputElementsCounter, size);
+      return new OffsetRange(-1, size);
+    }
+
+    /**
+     * The {@link GetInitialWatermarkEstimatorState} initializes to this 
DoFn's output watermark to
+     * a negative infinity timestamp via {@link 
BoundedWindow#TIMESTAMP_MIN_VALUE}. The {@link
+     * Instant} returned by this method provides the runner the value is 
passes as an argument to
+     * this DoFn's {@link #newWatermarkEstimator}.
+     */
+    @GetInitialWatermarkEstimatorState
+    public Instant getInitialWatermarkState() {
+      return BoundedWindow.TIMESTAMP_MIN_VALUE;
+    }
+
+    /**
+     * This DoFn uses a {@link WatermarkEstimators.Manual} as its {@link 
NewWatermarkEstimator},
+     * instantiated from an {@link Instant}. The state argument in this method 
comes from the return
+     * of the {@link #getInitialWatermarkState}.
+     */
+    @NewWatermarkEstimator
+    public WatermarkEstimator<Instant> newWatermarkEstimator(
+        @WatermarkEstimatorState Instant state) {
+      return new WatermarkEstimators.Manual(state);

Review Comment:
   One question I have (genuinely not sure of the answer) is how putting a watermark estimator in the middle of a pipeline impacts the previous watermark. For example, let's say I have the following structure:
   
   ```
   1) Read from Pub/Sub
   2) Throttle
   3) GroupByKey
   ```
   
   If I have a constant stream of elements coming from Pub/Sub, will the GroupByKey _ever_ fire? Basically, I can imagine the Pub/Sub watermark slowly rising, but since Throttle initializes a new estimator each time it encounters an element, its watermark might never advance (until it has processed all elements, which would never happen).
   
   The longer I stare at this (I have been looking at this PR for a while this afternoon), the more I'm just generally concerned with using an SDF this way. An SDF is by nature meant to process a _single_ element (or at least a bounded set of elements). Here we're using an SDF on a potentially unbounded dataset, in an environment where we don't control the windowing, triggering, or watermark, and I basically have no clue how it will behave (but I'd guess poorly).
   
   I think I've expressed my preference for a timer-based approach before (with the caveat that timers don't work in batch), but I think these concerns just reaffirm that preference.
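For reference, here is a minimal, hypothetical sketch of the timer-based alternative mentioned above. It is not from the PR, the class name is made up, and it only applies where per-key processing-time timers are supported (i.e. streaming): each key buffers its elements and a self-rescheduling timer releases one element per interval.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

/** Releases at most one buffered element per key per interval via a processing-time timer. */
class TimerThrottleFn<T> extends DoFn<KV<Integer, T>, T> {

  @StateId("queue")
  private final StateSpec<BagState<T>> queueSpec;

  @StateId("armed")
  private final StateSpec<ValueState<Boolean>> armedSpec = StateSpecs.value(BooleanCoder.of());

  @TimerId("release")
  private final TimerSpec releaseSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  private final Duration interval;

  TimerThrottleFn(Coder<T> elementCoder, Duration interval) {
    this.queueSpec = StateSpecs.bag(elementCoder);
    this.interval = interval;
  }

  @ProcessElement
  public void process(
      @Element KV<Integer, T> element,
      @StateId("queue") BagState<T> queue,
      @StateId("armed") ValueState<Boolean> armed,
      @TimerId("release") Timer release) {
    queue.add(element.getValue());
    // Arm the timer only once per key; it re-arms itself while the queue is non-empty.
    if (!Boolean.TRUE.equals(armed.read())) {
      release.offset(interval).setRelative();
      armed.write(true);
    }
  }

  @OnTimer("release")
  public void onRelease(
      @StateId("queue") BagState<T> queue,
      @StateId("armed") ValueState<Boolean> armed,
      @TimerId("release") Timer release,
      OutputReceiver<T> receiver) {
    Iterator<T> iterator = queue.read().iterator();
    if (iterator.hasNext()) {
      receiver.output(iterator.next());
    }
    // BagState has no "remove first", so re-add the remainder (fine for a sketch).
    List<T> remainder = new ArrayList<>();
    iterator.forEachRemaining(remainder::add);
    queue.clear();
    remainder.forEach(queue::add);
    if (remainder.isEmpty()) {
      armed.write(false);
    } else {
      release.offset(interval).setRelative();
    }
  }
}
```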



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
