damondouglas commented on code in PR #30123:
URL: https://github.com/apache/beam/pull/30123#discussion_r1473310832


##########
sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleWithoutExternalResource.java:
##########
@@ -17,41 +17,651 @@
  */
 package org.apache.beam.io.requestresponse;
 
+import static org.apache.beam.io.requestresponse.Monitoring.incIfPresent;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.sdk.values.TypeDescriptors.integers;
+import static org.apache.beam.sdk.values.TypeDescriptors.kvs;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
 import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+import java.util.Spliterator;
+import java.util.SplittableRandom;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metric;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.commons.math3.random.RandomDataGenerator;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
 
 /**
  * {@link ThrottleWithoutExternalResource} throttles a {@link RequestT} {@link 
PCollection} emitting
  * a {@link RequestT} {@link PCollection} at a maximally configured rate, 
without using an external
  * resource.
  */
-// TODO(damondouglas): expand what "without external resource" means with 
respect to "with external
-//   resource" when the other throttle transforms implemented.
-//   See: https://github.com/apache/beam/issues/28932
 class ThrottleWithoutExternalResource<RequestT>
-    extends PTransform<PCollection<RequestT>, PCollection<RequestT>> {
+    extends PTransform<PCollection<RequestT>, Result<RequestT>> {
+
+  static final String DISTRIBUTION_METRIC_NAME = 
"milliseconds_between_element_emissions";
+  static final String INPUT_ELEMENTS_COUNTER_NAME = "input_elements_count";
+  static final String OUTPUT_ELEMENTS_COUNTER_NAME = "output_elements_count";
+
+  private final TupleTag<RequestT> outputTag = new TupleTag<RequestT>() {};
+  private final TupleTag<ApiIOError> errorTag = new TupleTag<ApiIOError>() {};
+
+  /**
+   * Instantiates a {@link ThrottleWithoutExternalResource} with the 
maximumRate of {@link Rate} and
+   * without collecting metrics.
+   */
+  static <RequestT> ThrottleWithoutExternalResource<RequestT> of(Rate 
maximumRate) {
+    return new ThrottleWithoutExternalResource<>(
+        Configuration.builder().setMaximumRate(maximumRate).build());
+  }
+
+  /** Returns {@link ThrottleWithoutExternalResource} with metrics collection 
turned on. */
+  ThrottleWithoutExternalResource<RequestT> withMetricsCollected() {
+    return new ThrottleWithoutExternalResource<>(
+        configuration.toBuilder().setCollectMetrics(true).build());
+  }
 
-  // TODO(damondouglas): remove suppress warnings when finally utilized in a 
future PR.
-  @SuppressWarnings({"unused"})
-  private final Configuration<RequestT> configuration;
+  private final Configuration configuration;
 
-  private ThrottleWithoutExternalResource(Configuration<RequestT> 
configuration) {
+  private ThrottleWithoutExternalResource(Configuration configuration) {
     this.configuration = configuration;
   }
 
   @Override
-  public PCollection<RequestT> expand(PCollection<RequestT> input) {
-    // TODO(damondouglas): expand in a future PR.
-    return input;
+  public Result<RequestT> expand(PCollection<RequestT> input) {
+    ListCoder<RequestT> listCoder = ListCoder.of(input.getCoder());
+    Coder<KV<Integer, List<RequestT>>> kvCoder = KvCoder.of(VarIntCoder.of(), 
listCoder);
+
+    PCollectionTuple pct =
+        input
+            // Break up the PCollection into fixed channels assigned to an int 
key [0,
+            // Rate::numElements).
+            .apply(AssignChannelFn.class.getSimpleName(), assignChannels())
+            // Apply GlobalWindows to prevent multiple window assignment.
+            .apply(

Review Comment:
   Thank you for pointing this out. If there is more than one window, downstream 
splits will have restrictions aligned to multiple windows, which defeats the 
purpose of using the splittable DoFn to throttle the PCollection.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to