damondouglas commented on code in PR #30123:
URL: https://github.com/apache/beam/pull/30123#discussion_r1473329149


##########
sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleWithoutExternalResource.java:
##########
@@ -17,41 +17,651 @@
  */
 package org.apache.beam.io.requestresponse;
 
+import static org.apache.beam.io.requestresponse.Monitoring.incIfPresent;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.sdk.values.TypeDescriptors.integers;
+import static org.apache.beam.sdk.values.TypeDescriptors.kvs;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
 import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+import java.util.Spliterator;
+import java.util.SplittableRandom;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metric;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.commons.math3.random.RandomDataGenerator;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
 
 /**
  * {@link ThrottleWithoutExternalResource} throttles a {@link RequestT} {@link 
PCollection} emitting
  * a {@link RequestT} {@link PCollection} at a maximally configured rate, 
without using an external
  * resource.
  */
-// TODO(damondouglas): expand what "without external resource" means with 
respect to "with external
-//   resource" when the other throttle transforms implemented.
-//   See: https://github.com/apache/beam/issues/28932
 class ThrottleWithoutExternalResource<RequestT>
-    extends PTransform<PCollection<RequestT>, PCollection<RequestT>> {
+    extends PTransform<PCollection<RequestT>, Result<RequestT>> {
+
+  static final String DISTRIBUTION_METRIC_NAME = 
"milliseconds_between_element_emissions";
+  static final String INPUT_ELEMENTS_COUNTER_NAME = "input_elements_count";
+  static final String OUTPUT_ELEMENTS_COUNTER_NAME = "output_elements_count";
+
+  private final TupleTag<RequestT> outputTag = new TupleTag<RequestT>() {};
+  private final TupleTag<ApiIOError> errorTag = new TupleTag<ApiIOError>() {};
+
+  /**
+   * Instantiates a {@link ThrottleWithoutExternalResource} with the 
maximumRate of {@link Rate} and
+   * without collecting metrics.
+   */
+  static <RequestT> ThrottleWithoutExternalResource<RequestT> of(Rate 
maximumRate) {
+    return new ThrottleWithoutExternalResource<>(
+        Configuration.builder().setMaximumRate(maximumRate).build());
+  }
+
+  /** Returns {@link ThrottleWithoutExternalResource} with metrics collection 
turned on. */
+  ThrottleWithoutExternalResource<RequestT> withMetricsCollected() {
+    return new ThrottleWithoutExternalResource<>(
+        configuration.toBuilder().setCollectMetrics(true).build());
+  }
 
-  // TODO(damondouglas): remove suppress warnings when finally utilized in a 
future PR.
-  @SuppressWarnings({"unused"})
-  private final Configuration<RequestT> configuration;
+  private final Configuration configuration;
 
-  private ThrottleWithoutExternalResource(Configuration<RequestT> 
configuration) {
+  private ThrottleWithoutExternalResource(Configuration configuration) {
     this.configuration = configuration;
   }
 
   @Override
-  public PCollection<RequestT> expand(PCollection<RequestT> input) {
-    // TODO(damondouglas): expand in a future PR.
-    return input;
+  public Result<RequestT> expand(PCollection<RequestT> input) {
+    ListCoder<RequestT> listCoder = ListCoder.of(input.getCoder());
+    Coder<KV<Integer, List<RequestT>>> kvCoder = KvCoder.of(VarIntCoder.of(), 
listCoder);
+
+    PCollectionTuple pct =
+        input
+            // Break up the PCollection into fixed channels assigned to an int 
key [0,
+            // Rate::numElements).
+            .apply(AssignChannelFn.class.getSimpleName(), assignChannels())
+            // Apply GlobalWindows to prevent multiple window assignment.
+            .apply(
+                GlobalWindows.class.getSimpleName(),
+                Window.<KV<Integer, RequestT>>into(new GlobalWindows())
+                    
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
+                    .discardingFiredPanes())
+            // Apply GroupByKey to convert PCollection of KV<Integer, 
RequestT> to KV<Integer,
+            // Iterable<RequestT>>.
+            .apply(GroupByKey.class.getSimpleName(), GroupByKey.create())
+            // Convert KV<Integer, Iterable<RequestT>> to KV<Integer, 
List<RequestT>> for cleaner
+            // processing by ThrottleFn; IterableCoder uses a List for 
IterableLikeCoder's
+            // structuralValue.
+            .apply("ConvertToList", toList())
+            .setCoder(kvCoder)
+            // Finally apply a splittable DoFn by splitting the 
Iterable<RequestT>, controlling the
+            // output via the watermark estimator.
+            .apply(ThrottleFn.class.getSimpleName(), throttle());
+
+    Result<RequestT> result = Result.of(input.getCoder(), outputTag, errorTag, 
pct);
+
+    // If configured to collect metrics, assign a single key to the global 
window timestamp and
+    // apply ComputeMetricsFn.
+    if (configuration.getCollectMetrics()) {
+      result
+          .getResponses()
+          .apply(
+              BoundedWindow.class.getSimpleName(),
+              WithKeys.of(ignored -> 
BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()))
+          .setCoder(KvCoder.of(VarLongCoder.of(), input.getCoder()))
+          .apply(ComputeMetricsFn.class.getSimpleName(), computeMetrics())
+          .setCoder(input.getCoder());
+    }
+
+    return result;
+  }
+
+  private ParDo.MultiOutput<KV<Integer, List<RequestT>>, RequestT> throttle() {
+    return ParDo.of(new ThrottleFn<RequestT>(configuration, outputTag))
+        .withOutputTags(outputTag, TupleTagList.of(errorTag));
+  }
+
+  /**
+   * This {@link DoFn} is inspired by {@link 
org.apache.beam.sdk.transforms.PeriodicSequence}'s DoFn
+   * implementation with the exception that instead of emitting an {@link 
Instant}, it emits a
+   * {@link RequestT}. Additionally, it uses an Integer based {@link 
OffsetRange} and its associated
+   * {@link OffsetRangeTracker}. The reason for using an Integer based offset 
range is due to Java
+   * collection sizes limit to int instead of long. Splittable DoFns provide 
access to hold the
+   * watermark, and along with an output with timestamp, allow the DoFn to 
emit elements as
+   * prescribed intervals.
+   */
+  static class ThrottleFn<RequestT> extends DoFn<KV<Integer, List<RequestT>>, 
RequestT> {
+
+    private final Configuration configuration;
+    private final TupleTag<RequestT> outputTag;
+    private @MonotonicNonNull Counter inputElementsCounter = null;
+    private @MonotonicNonNull Counter outputElementsCounter = null;
+
+    ThrottleFn(Configuration configuration, TupleTag<RequestT> outputTag) {
+      this.configuration = configuration;
+      this.outputTag = outputTag;
+    }
+
+    @Setup
+    public void setup() {
+      if (configuration.getCollectMetrics()) {
+        inputElementsCounter = Metrics.counter(ThrottleFn.class, 
INPUT_ELEMENTS_COUNTER_NAME);
+        outputElementsCounter = Metrics.counter(ThrottleFn.class, 
OUTPUT_ELEMENTS_COUNTER_NAME);
+      }
+    }
+
+    /**
+     * Instantiates an initial {@link RestrictionTracker.IsBounded#BOUNDED} 
{@link OffsetRange}
+     * restriction from [-1, {@link List#size()}). Defaults to [-1, 0) for 
null {@link
+     * KV#getValue()} elements.
+     */
+    @GetInitialRestriction
+    public OffsetRange getInitialRange(@Element KV<Integer, List<RequestT>> 
element) {
+      int size = 0;
+      if (element.getValue() != null) {
+        size = element.getValue().size();
+      }
+      return OffsetRange.ofSize(size);
+    }
+
+    /** Instantiates an {@link OffsetRangeTracker} from an {@link OffsetRange} 
instance. */
+    @NewTracker
+    public RestrictionTracker<OffsetRange, Integer> newTracker(
+        @Restriction OffsetRange restriction) {
+      return new OffsetRangeTracker(restriction);
+    }
+
+    /** Simply returns the {@link OffsetRange} restriction. */
+    @TruncateRestriction
+    public RestrictionTracker.TruncateResult<OffsetRange> truncate(
+        @Restriction OffsetRange restriction) {
+      return new RestrictionTracker.TruncateResult<OffsetRange>() {
+        @Override
+        public ThrottleWithoutExternalResource.@Nullable OffsetRange 
getTruncatedRestriction() {
+          return restriction;
+        }
+      };
+    }
+
+    /**
+     * The {@link GetInitialWatermarkEstimatorState} initializes to this 
DoFn's output watermark to
+     * a negative infinity timestamp via {@link 
BoundedWindow#TIMESTAMP_MIN_VALUE}. The {@link
+     * Instant} returned by this method provides the runner the value is 
passes as an argument to
+     * this DoFn's {@link #newWatermarkEstimator}.
+     */
+    @GetInitialWatermarkEstimatorState
+    public Instant getInitialWatermarkState() {
+      return BoundedWindow.TIMESTAMP_MIN_VALUE;
+    }
+
+    /**
+     * This DoFn uses a {@link WatermarkEstimators.Manual} as its {@link 
NewWatermarkEstimator},
+     * instantiated from an {@link Instant}. The state argument in this method 
comes from the return
+     * of the {@link #getInitialWatermarkState}.
+     */
+    @NewWatermarkEstimator
+    public WatermarkEstimator<Instant> newWatermarkEstimator(
+        @WatermarkEstimatorState Instant state) {
+      return new WatermarkEstimators.Manual(state);
+    }
+
+    @ProcessElement
+    public void process(
+        @Element KV<Integer, List<RequestT>> element,
+        ManualWatermarkEstimator<Instant> estimator,
+        RestrictionTracker<OffsetRange, Integer> tracker,
+        MultiOutputReceiver receiver) {
+
+      int size = 0;
+      if (element.getValue() != null) {
+        size = element.getValue().size();
+      }
+
+      incIfPresent(inputElementsCounter, size);
+
+      if (element.getValue() == null || element.getValue().isEmpty()) {
+        return;
+      }
+
+      while (tracker.tryClaim(tracker.currentRestriction().getCurrent() + 1)) {

Review Comment:
   A constraint of a Splittable DoFn's ProcessElement method is that the 
restriction can only be accessed through the restriction tracker; i.e., the 
ProcessElement method cannot take the restriction itself as an argument, only 
the RestrictionTracker. There is also an implementation dependency between 
emitting elements and calling tryClaim. I'd have to open the IDE and reproduce 
the error to recall the exact message, but it was something along the lines of 
"you can't emit elements until you call tryClaim". These components are tightly 
coupled, so I kept the implementation as close as possible to the limited 
examples and documentation available for Splittable DoFns.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to