damondouglas commented on code in PR #30123:
URL: https://github.com/apache/beam/pull/30123#discussion_r1476567150
##########
sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleWithoutExternalResource.java:
##########
@@ -17,41 +17,651 @@
*/
package org.apache.beam.io.requestresponse;
+import static org.apache.beam.io.requestresponse.Monitoring.incIfPresent;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.sdk.values.TypeDescriptors.integers;
+import static org.apache.beam.sdk.values.TypeDescriptors.kvs;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+import java.util.Spliterator;
+import java.util.SplittableRandom;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metric;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.commons.math3.random.RandomDataGenerator;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
/**
* {@link ThrottleWithoutExternalResource} throttles a {@link RequestT} {@link
PCollection} emitting
* a {@link RequestT} {@link PCollection} at a maximally configured rate,
without using an external
* resource.
*/
-// TODO(damondouglas): expand what "without external resource" means with
respect to "with external
-// resource" when the other throttle transforms implemented.
-// See: https://github.com/apache/beam/issues/28932
class ThrottleWithoutExternalResource<RequestT>
- extends PTransform<PCollection<RequestT>, PCollection<RequestT>> {
+ extends PTransform<PCollection<RequestT>, Result<RequestT>> {
+
+ static final String DISTRIBUTION_METRIC_NAME =
"milliseconds_between_element_emissions";
+ static final String INPUT_ELEMENTS_COUNTER_NAME = "input_elements_count";
+ static final String OUTPUT_ELEMENTS_COUNTER_NAME = "output_elements_count";
+
+ private final TupleTag<RequestT> outputTag = new TupleTag<RequestT>() {};
+ private final TupleTag<ApiIOError> errorTag = new TupleTag<ApiIOError>() {};
+
+ /**
+ * Instantiates a {@link ThrottleWithoutExternalResource} with the
maximumRate of {@link Rate} and
+ * without collecting metrics.
+ */
+ static <RequestT> ThrottleWithoutExternalResource<RequestT> of(Rate
maximumRate) {
+ return new ThrottleWithoutExternalResource<>(
+ Configuration.builder().setMaximumRate(maximumRate).build());
+ }
+
+ /** Returns {@link ThrottleWithoutExternalResource} with metrics collection
turned on. */
+ ThrottleWithoutExternalResource<RequestT> withMetricsCollected() {
+ return new ThrottleWithoutExternalResource<>(
+ configuration.toBuilder().setCollectMetrics(true).build());
+ }
- // TODO(damondouglas): remove suppress warnings when finally utilized in a
future PR.
- @SuppressWarnings({"unused"})
- private final Configuration<RequestT> configuration;
+ private final Configuration configuration;
- private ThrottleWithoutExternalResource(Configuration<RequestT>
configuration) {
+ private ThrottleWithoutExternalResource(Configuration configuration) {
this.configuration = configuration;
}
@Override
- public PCollection<RequestT> expand(PCollection<RequestT> input) {
- // TODO(damondouglas): expand in a future PR.
- return input;
+ public Result<RequestT> expand(PCollection<RequestT> input) {
+ ListCoder<RequestT> listCoder = ListCoder.of(input.getCoder());
+ Coder<KV<Integer, List<RequestT>>> kvCoder = KvCoder.of(VarIntCoder.of(),
listCoder);
+
+ PCollectionTuple pct =
+ input
+ // Break up the PCollection into fixed channels assigned to an int
key [0,
+ // Rate::numElements).
+ .apply(AssignChannelFn.class.getSimpleName(), assignChannels())
+ // Apply GlobalWindows to prevent multiple window assignment.
+ .apply(
+ GlobalWindows.class.getSimpleName(),
+ Window.<KV<Integer, RequestT>>into(new GlobalWindows())
+
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
+ .discardingFiredPanes())
+ // Apply GroupByKey to convert PCollection of KV<Integer,
RequestT> to KV<Integer,
+ // Iterable<RequestT>>.
+ .apply(GroupByKey.class.getSimpleName(), GroupByKey.create())
+ // Convert KV<Integer, Iterable<RequestT>> to KV<Integer,
List<RequestT>> for cleaner
+ // processing by ThrottleFn; IterableCoder uses a List for
IterableLikeCoder's
+ // structuralValue.
+ .apply("ConvertToList", toList())
+ .setCoder(kvCoder)
+ // Finally apply a splittable DoFn by splitting the
Iterable<RequestT>, controlling the
+ // output via the watermark estimator.
+ .apply(ThrottleFn.class.getSimpleName(), throttle());
+
+ Result<RequestT> result = Result.of(input.getCoder(), outputTag, errorTag,
pct);
+
+ // If configured to collect metrics, assign a single key to the global
window timestamp and
+ // apply ComputeMetricsFn.
+ if (configuration.getCollectMetrics()) {
+ result
+ .getResponses()
+ .apply(
+ BoundedWindow.class.getSimpleName(),
+ WithKeys.of(ignored ->
BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()))
+ .setCoder(KvCoder.of(VarLongCoder.of(), input.getCoder()))
+ .apply(ComputeMetricsFn.class.getSimpleName(), computeMetrics())
+ .setCoder(input.getCoder());
+ }
+
+ return result;
+ }
+
+ private ParDo.MultiOutput<KV<Integer, List<RequestT>>, RequestT> throttle() {
+ return ParDo.of(new ThrottleFn<RequestT>(configuration, outputTag))
+ .withOutputTags(outputTag, TupleTagList.of(errorTag));
+ }
+
+ /**
+ * This {@link DoFn} is inspired by {@link
org.apache.beam.sdk.transforms.PeriodicSequence}'s DoFn
+ * implementation with the exception that instead of emitting an {@link
Instant}, it emits a
+ * {@link RequestT}. Additionally, it uses an Integer based {@link
OffsetRange} and its associated
+ * {@link OffsetRangeTracker}. The reason for using an Integer based offset
range is due to Java
+ * collection sizes limit to int instead of long. Splittable DoFns provide
access to hold the
+ * watermark, and along with an output with timestamp, allow the DoFn to
emit elements as
+ * prescribed intervals.
+ */
+ static class ThrottleFn<RequestT> extends DoFn<KV<Integer, List<RequestT>>,
RequestT> {
+
+ private final Configuration configuration;
+ private final TupleTag<RequestT> outputTag;
+ private @MonotonicNonNull Counter inputElementsCounter = null;
+ private @MonotonicNonNull Counter outputElementsCounter = null;
+
+ ThrottleFn(Configuration configuration, TupleTag<RequestT> outputTag) {
+ this.configuration = configuration;
+ this.outputTag = outputTag;
+ }
+
+ @Setup
+ public void setup() {
+ if (configuration.getCollectMetrics()) {
+ inputElementsCounter = Metrics.counter(ThrottleFn.class,
INPUT_ELEMENTS_COUNTER_NAME);
+ outputElementsCounter = Metrics.counter(ThrottleFn.class,
OUTPUT_ELEMENTS_COUNTER_NAME);
+ }
+ }
+
+ /**
+ * Instantiates an initial {@link RestrictionTracker.IsBounded#BOUNDED}
{@link OffsetRange}
+ * restriction from [-1, {@link List#size()}). Defaults to [-1, 0) for
null {@link
+ * KV#getValue()} elements.
+ */
+ @GetInitialRestriction
+ public OffsetRange getInitialRange(@Element KV<Integer, List<RequestT>>
element) {
+ int size = 0;
+ if (element.getValue() != null) {
+ size = element.getValue().size();
+ }
+ return OffsetRange.ofSize(size);
+ }
+
+ /** Instantiates an {@link OffsetRangeTracker} from an {@link OffsetRange}
instance. */
+ @NewTracker
+ public RestrictionTracker<OffsetRange, Integer> newTracker(
+ @Restriction OffsetRange restriction) {
+ return new OffsetRangeTracker(restriction);
+ }
+
+ /** Simply returns the {@link OffsetRange} restriction. */
+ @TruncateRestriction
+ public RestrictionTracker.TruncateResult<OffsetRange> truncate(
+ @Restriction OffsetRange restriction) {
+ return new RestrictionTracker.TruncateResult<OffsetRange>() {
+ @Override
+ public ThrottleWithoutExternalResource.@Nullable OffsetRange
getTruncatedRestriction() {
+ return restriction;
+ }
+ };
+ }
+
+ /**
+ * The {@link GetInitialWatermarkEstimatorState} initializes to this
DoFn's output watermark to
+ * a negative infinity timestamp via {@link
BoundedWindow#TIMESTAMP_MIN_VALUE}. The {@link
+ * Instant} returned by this method provides the runner the value is
passes as an argument to
+ * this DoFn's {@link #newWatermarkEstimator}.
+ */
+ @GetInitialWatermarkEstimatorState
+ public Instant getInitialWatermarkState() {
+ return BoundedWindow.TIMESTAMP_MIN_VALUE;
+ }
+
+ /**
+ * This DoFn uses a {@link WatermarkEstimators.Manual} as its {@link
NewWatermarkEstimator},
+ * instantiated from an {@link Instant}. The state argument in this method
comes from the return
+ * of the {@link #getInitialWatermarkState}.
+ */
+ @NewWatermarkEstimator
+ public WatermarkEstimator<Instant> newWatermarkEstimator(
+ @WatermarkEstimatorState Instant state) {
+ return new WatermarkEstimators.Manual(state);
+ }
+
+ @ProcessElement
+ public void process(
+ @Element KV<Integer, List<RequestT>> element,
+ ManualWatermarkEstimator<Instant> estimator,
+ RestrictionTracker<OffsetRange, Integer> tracker,
+ MultiOutputReceiver receiver) {
+
+ int size = 0;
+ if (element.getValue() != null) {
+ size = element.getValue().size();
+ }
+
+ incIfPresent(inputElementsCounter, size);
+
+ if (element.getValue() == null || element.getValue().isEmpty()) {
+ return;
+ }
+
+ while (tracker.tryClaim(tracker.currentRestriction().getCurrent() + 1)) {
+ Instant nextEmittedTimestamp =
+
estimator.currentWatermark().plus(configuration.getMaximumRate().getInterval());
Review Comment:
State parameters are not allowed in @ProcessElement methods of splittable
DoFns. Compare ALLOWED_SPLITTABLE_PROCESS_ELEMENT_PARAMETERS
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java#L145-L158
with ALLOWED_NON_SPLITTABLE_PROCESS_ELEMENT_PARAMETERS
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java#L127-L142
(only the non-splittable list permits state parameters). For example,
https://gist.github.com/damondouglas/d4606679ceab30446b0225716ac2b8fd
throws:
```
Exception in thread "main" java.lang.IllegalArgumentException:
StateNotCompatibleWithSDF, @ProcessElement process(ValueState, KV,
ManualWatermarkEstimator, RestrictionTracker, OutputReceiver): Illegal
parameter type: StateParameter{referent=StateDeclaration{id=some-state,
field=private final org.apache.beam.sdk.state.StateSpec
com.github.damondouglas.journal.StateNotCompatibleWithSDF.someStateSpec,
stateType=org.apache.beam.sdk.state.ValueState<java.lang.Long>},
alwaysFetched=false}
at
org.apache.beam.sdk.transforms.reflect.DoFnSignatures$ErrorReporter.throwIllegalArgument(DoFnSignatures.java:2409)
at
org.apache.beam.sdk.transforms.reflect.DoFnSignatures.checkParameterOneOf(DoFnSignatures.java:1307)
at
org.apache.beam.sdk.transforms.reflect.DoFnSignatures.analyzeProcessElementMethod(DoFnSignatures.java:1276)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]