damondouglas commented on code in PR #30123:
URL: https://github.com/apache/beam/pull/30123#discussion_r1473321569
##########
sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleWithoutExternalResource.java:
##########
@@ -17,41 +17,651 @@
*/
package org.apache.beam.io.requestresponse;
+import static org.apache.beam.io.requestresponse.Monitoring.incIfPresent;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.sdk.values.TypeDescriptors.integers;
+import static org.apache.beam.sdk.values.TypeDescriptors.kvs;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+import java.util.Spliterator;
+import java.util.SplittableRandom;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metric;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.commons.math3.random.RandomDataGenerator;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
/**
* {@link ThrottleWithoutExternalResource} throttles a {@link RequestT} {@link PCollection},
* emitting its elements at no more than a configured maximum rate without using an external
* resource. The output is a {@link Result} holding the emitted requests and any {@link
* ApiIOError}s.
*/
-// TODO(damondouglas): expand what "without external resource" means with
respect to "with external
-// resource" when the other throttle transforms implemented.
-// See: https://github.com/apache/beam/issues/28932
class ThrottleWithoutExternalResource<RequestT>
- extends PTransform<PCollection<RequestT>, PCollection<RequestT>> {
+ extends PTransform<PCollection<RequestT>, Result<RequestT>> {
+
+ // Metric names. The input/output counter names are registered by ThrottleFn#setup when
+ // metrics collection is enabled.
+ static final String DISTRIBUTION_METRIC_NAME =
"milliseconds_between_element_emissions";
+ static final String INPUT_ELEMENTS_COUNTER_NAME = "input_elements_count";
+ static final String OUTPUT_ELEMENTS_COUNTER_NAME = "output_elements_count";
+
+ // Tags that split ThrottleFn's output into emitted requests (main output) and ApiIOError
+ // errors (additional output).
+ private final TupleTag<RequestT> outputTag = new TupleTag<RequestT>() {};
+ private final TupleTag<ApiIOError> errorTag = new TupleTag<ApiIOError>() {};
+
+ /**
+ * Instantiates a {@link ThrottleWithoutExternalResource} that emits elements at no more than
+ * the {@code maximumRate} {@link Rate}, without collecting metrics. Use {@link
+ * #withMetricsCollected()} to turn metrics collection on.
+ *
+ * @param maximumRate the maximum {@link Rate} at which elements are emitted
+ */
+ static <RequestT> ThrottleWithoutExternalResource<RequestT> of(Rate
maximumRate) {
+ return new ThrottleWithoutExternalResource<>(
+ Configuration.builder().setMaximumRate(maximumRate).build());
+ }
+
+ /**
+ * Returns a new {@link ThrottleWithoutExternalResource} with this instance's configuration and
+ * metrics collection turned on. This instance is left unchanged.
+ */
+ ThrottleWithoutExternalResource<RequestT> withMetricsCollected() {
+ return new ThrottleWithoutExternalResource<>(
+ configuration.toBuilder().setCollectMetrics(true).build());
+ }
- // TODO(damondouglas): remove suppress warnings when finally utilized in a
future PR.
- @SuppressWarnings({"unused"})
- private final Configuration<RequestT> configuration;
+ // Maximum rate and metrics-collection settings; also passed to ThrottleFn.
+ private final Configuration configuration;
- private ThrottleWithoutExternalResource(Configuration<RequestT>
configuration) {
+ // Private: instances are created through of(Rate) and withMetricsCollected().
+ private ThrottleWithoutExternalResource(Configuration configuration) {
this.configuration = configuration;
}
+ /**
+ * Throttles {@code input}: assigns each element to an int-keyed channel, groups each channel
+ * in the global window, and emits the grouped elements through {@link ThrottleFn} at the
+ * configured maximum rate. When metrics collection is enabled, the emitted responses are also
+ * passed to ComputeMetricsFn.
+ *
+ * @return a {@link Result} holding the emitted requests and any {@link ApiIOError}s
+ */
@Override
- public PCollection<RequestT> expand(PCollection<RequestT> input) {
- // TODO(damondouglas): expand in a future PR.
- return input;
+ public Result<RequestT> expand(PCollection<RequestT> input) {
+ ListCoder<RequestT> listCoder = ListCoder.of(input.getCoder());
+ Coder<KV<Integer, List<RequestT>>> kvCoder = KvCoder.of(VarIntCoder.of(),
listCoder);
+
+ PCollectionTuple pct =
+ input
+ // Break up the PCollection into fixed channels assigned to an int
key [0,
+ // Rate::numElements).
+ .apply(AssignChannelFn.class.getSimpleName(), assignChannels())
+ // Apply GlobalWindows to prevent multiple window assignment.
+ .apply(
+ GlobalWindows.class.getSimpleName(),
+ Window.<KV<Integer, RequestT>>into(new GlobalWindows())
+
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
+ .discardingFiredPanes())
+ // Apply GroupByKey to convert PCollection of KV<Integer,
RequestT> to KV<Integer,
+ // Iterable<RequestT>>.
+ .apply(GroupByKey.class.getSimpleName(), GroupByKey.create())
+ // Convert KV<Integer, Iterable<RequestT>> to KV<Integer,
List<RequestT>> for cleaner
+ // processing by ThrottleFn; IterableCoder uses a List for
IterableLikeCoder's
+ // structuralValue.
+ .apply("ConvertToList", toList())
+ .setCoder(kvCoder)
+ // Finally apply a splittable DoFn that splits each List<RequestT> and controls
+ // when elements are output via the watermark estimator.
+ .apply(ThrottleFn.class.getSimpleName(), throttle());
+
+ Result<RequestT> result = Result.of(input.getCoder(), outputTag, errorTag,
pct);
+
+ // If configured to collect metrics, key every emitted response with the same constant
+ // (BoundedWindow.TIMESTAMP_MAX_VALUE in millis) and apply ComputeMetricsFn.
+ if (configuration.getCollectMetrics()) {
+ result
+ .getResponses()
+ .apply(
+ BoundedWindow.class.getSimpleName(),
+ WithKeys.of(ignored ->
BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()))
+ .setCoder(KvCoder.of(VarLongCoder.of(), input.getCoder()))
+ .apply(ComputeMetricsFn.class.getSimpleName(), computeMetrics())
+ .setCoder(input.getCoder());
+ }
+
+ return result;
+ }
+
+ /**
+ * Returns the {@link ParDo} that applies {@link ThrottleFn}, with {@code outputTag} as the main
+ * output and {@code errorTag} as the additional error output.
+ */
+ private ParDo.MultiOutput<KV<Integer, List<RequestT>>, RequestT> throttle() {
+ return ParDo.of(new ThrottleFn<RequestT>(configuration, outputTag))
+ .withOutputTags(outputTag, TupleTagList.of(errorTag));
+ }
+
+ /**
+ * This {@link DoFn} is inspired by {@link org.apache.beam.sdk.transforms.PeriodicSequence}'s
+ * DoFn implementation, except that instead of emitting an {@link Instant}, it emits a {@link
+ * RequestT}. Additionally, it uses an Integer-based {@link OffsetRange} and its associated
+ * {@link OffsetRangeTracker}, because Java collection sizes are limited to int rather than long.
+ * Splittable DoFns can hold the watermark, which, combined with outputting elements with
+ * explicit timestamps, lets this DoFn emit elements at prescribed intervals.
+ */
+ static class ThrottleFn<RequestT> extends DoFn<KV<Integer, List<RequestT>>,
RequestT> {
+
+ private final Configuration configuration;
+ private final TupleTag<RequestT> outputTag;
+ // Created in setup() only when metrics collection is enabled; otherwise they stay null.
+ private @MonotonicNonNull Counter inputElementsCounter = null;
+ private @MonotonicNonNull Counter outputElementsCounter = null;
+
+ /**
+ * @param configuration supplies the maximum rate and whether to collect metrics
+ * @param outputTag the main-output tag for emitted requests
+ */
+ ThrottleFn(Configuration configuration, TupleTag<RequestT> outputTag) {
+ this.configuration = configuration;
+ this.outputTag = outputTag;
+ }
+
+ /**
+ * Creates the input and output element counters, namespaced by {@link ThrottleFn}, when
+ * metrics collection is enabled. Otherwise the counters remain null.
+ */
+ @Setup
+ public void setup() {
+ if (configuration.getCollectMetrics()) {
+ inputElementsCounter = Metrics.counter(ThrottleFn.class,
INPUT_ELEMENTS_COUNTER_NAME);
+ outputElementsCounter = Metrics.counter(ThrottleFn.class,
OUTPUT_ELEMENTS_COUNTER_NAME);
+ }
+ }
+
+ /**
+ * Instantiates an initial {@link RestrictionTracker.IsBounded#BOUNDED} {@link OffsetRange}
+ * restriction from [-1, {@link List#size()}) via {@code OffsetRange.ofSize}. A null {@link
+ * KV#getValue()} is treated as an empty list, yielding [-1, 0).
+ */
+ @GetInitialRestriction
+ public OffsetRange getInitialRange(@Element KV<Integer, List<RequestT>>
element) {
+ int size = 0;
+ if (element.getValue() != null) {
+ size = element.getValue().size();
+ }
+ return OffsetRange.ofSize(size);
+ }
+
+ /**
+ * Instantiates a new {@link OffsetRangeTracker} over the given {@link OffsetRange}
+ * restriction; claimed positions are Integers.
+ */
+ @NewTracker
+ public RestrictionTracker<OffsetRange, Integer> newTracker(
+ @Restriction OffsetRange restriction) {
+ return new OffsetRangeTracker(restriction);
+ }
+
+ /**
+ * Returns a {@link RestrictionTracker.TruncateResult} that wraps the restriction unchanged, so
+ * truncation keeps every remaining position of the restriction.
+ */
+ @TruncateRestriction
+ public RestrictionTracker.TruncateResult<OffsetRange> truncate(
+ @Restriction OffsetRange restriction) {
+ return new RestrictionTracker.TruncateResult<OffsetRange>() {
+ @Override
+ public ThrottleWithoutExternalResource.@Nullable OffsetRange
getTruncatedRestriction() {
+ return restriction;
+ }
+ };
+ }
+
+ /**
+ * Initializes this DoFn's output watermark to the minimum timestamp, {@link
+ * BoundedWindow#TIMESTAMP_MIN_VALUE}. The runner passes the returned {@link Instant} as the
+ * state argument to this DoFn's {@link #newWatermarkEstimator}.
+ */
+ @GetInitialWatermarkEstimatorState
+ public Instant getInitialWatermarkState() {
+ return BoundedWindow.TIMESTAMP_MIN_VALUE;
+ }
+
+ /**
+ * Creates a {@link WatermarkEstimators.Manual} estimator starting at {@code state}, so this
+ * DoFn controls its own output watermark.
+ *
+ * @param state the initial watermark, as returned by {@link #getInitialWatermarkState}
+ */
+ @NewWatermarkEstimator
+ public WatermarkEstimator<Instant> newWatermarkEstimator(
+ @WatermarkEstimatorState Instant state) {
+ return new WatermarkEstimators.Manual(state);
+ }
+
+ /**
+ * Emits the elements of the element's List value one at a time, in index order, as positions
+ * are claimed from the tracker. Each emission timestamp is the current watermark plus the
+ * configured maximum rate's interval. Null or empty lists emit nothing.
+ *
+ * <p>NOTE(review): the input counter is incremented by the full list size on every call. If the
+ * runner splits or resumes this restriction, the same element is processed again and may be
+ * counted more than once — confirm.
+ *
+ * <p>NOTE(review): the watermark starts at {@link BoundedWindow#TIMESTAMP_MIN_VALUE} (see
+ * getInitialWatermarkState), so the first computed timestamp is far in the past. Whether later
+ * emissions are actually spaced by the interval depends on the watermark that
+ * outputAndSetWatermark sets — confirm it does not stay behind Instant.now().
+ */
+ @ProcessElement
+ public void process(
+ @Element KV<Integer, List<RequestT>> element,
+ ManualWatermarkEstimator<Instant> estimator,
+ RestrictionTracker<OffsetRange, Integer> tracker,
+ MultiOutputReceiver receiver) {
+
+ int size = 0;
+ if (element.getValue() != null) {
+ size = element.getValue().size();
+ }
+
+ incIfPresent(inputElementsCounter, size);
+
+ if (element.getValue() == null || element.getValue().isEmpty()) {
+ return;
+ }
+
+ // Claim the next position; after a successful claim, getCurrent() is read as the list index.
+ while (tracker.tryClaim(tracker.currentRestriction().getCurrent() + 1)) {
+ Instant nextEmittedTimestamp =
+
estimator.currentWatermark().plus(configuration.getMaximumRate().getInterval());
+ int index = tracker.currentRestriction().getCurrent();
+ RequestT requestT = element.getValue().get(index);
+ outputAndSetWatermark(nextEmittedTimestamp, requestT, estimator,
receiver);
+ }
+ }
+
+ /**
+ * Waits until {@link Instant#now()} reaches {@code nextEmittedTimestamp}, then emits the
+ * element at that timestamp and sets the watermark to it.
+ */
+ private void outputAndSetWatermark(
+ Instant nextEmittedTimestamp,
+ RequestT requestT,
+ ManualWatermarkEstimator<Instant> estimator,
+ MultiOutputReceiver receiver) {
+ Instant now = Instant.now();
+ while (now.isBefore(nextEmittedTimestamp)) {
+ now = Instant.now();
Review Comment:
I am not returning a ProcessContinuation because, per the Beam programming guide, ["While the runner
tries to honor the resume time, this is not
guaranteed"](https://beam.apache.org/documentation/programming-guide/#user-initiated-checkpoint:~:text=While%20the%20runner%20tries%20to%20honor%20the%20resume%20time%2C%20this%20is%20not%20guaranteed.).
The main goal of this implementation is to control exactly when elements
are emitted, without handing control back to the runner.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]