gemini-code-assist[bot] commented on code in PR #38363: URL: https://github.com/apache/beam/pull/38363#discussion_r3184172218
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputProcessor.java: ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker; + +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.WindowedValue; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Helper class for handling elements blocked on side inputs. */ +class StreamingSideInputProcessor<InputT, W extends BoundedWindow> { + private final StreamingSideInputFetcher<InputT, W> sideInputFetcher; + + public StreamingSideInputProcessor(StreamingSideInputFetcher<InputT, W> sideInputFetcher) { + this.sideInputFetcher = sideInputFetcher; + } + + /** + * Handle's startBundle. 
If there are unblocked elements, process them and then return the set of + * windows that were unblocked. + */ + Iterator<WindowedValue<InputT>> tryUnblockElements() { + sideInputFetcher.prefetchBlockedMap(); + + // Find the set of ready windows. + Set<W> readyWindows = sideInputFetcher.getReadyWindows(); + + Iterable<BagState<WindowedValue<InputT>>> elementsBags = + sideInputFetcher.prefetchElements(readyWindows); + + // Return a lazy iterator to the released elements. This is a destructive iterator - it clears + // the bags after reading them. Bags can be paged in from the service, so we try to avoid + // materializing the whole + // bag into memory here. + Iterator<WindowedValue<InputT>> releasedElements = + new Iterator<WindowedValue<InputT>>() { + Iterator<BagState<WindowedValue<InputT>>> bagsIterator = elementsBags.iterator(); + @Nullable Iterator<WindowedValue<InputT>> currentBagElements; + @Nullable BagState<WindowedValue<InputT>> currentBag; + + @Override + public boolean hasNext() { + do { + if (currentBagElements == null || !currentBagElements.hasNext()) { + if (!advanceBag()) { + return false; + } + } + } while (!org.apache.beam.sdk.util.Preconditions.checkStateNotNull(currentBagElements) + .hasNext()); + return true; + } + + boolean advanceBag() { + // Once we finish reading a bag, clear it. 
+ clearCurrentBag(); + if (bagsIterator.hasNext()) { + currentBag = bagsIterator.next(); + currentBagElements = currentBag.read().iterator(); + return true; + } else { + return false; + } + } + + void clearCurrentBag() { + if (currentBag != null) { + currentBag.clear(); + currentBag = null; + currentBagElements = null; + } + } + + @Override + public WindowedValue<InputT> next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return org.apache.beam.sdk.util.Preconditions.checkStateNotNull(currentBagElements) + .next(); + } + }; + + sideInputFetcher.releaseBlockedWindows(readyWindows); + return releasedElements; + } + + void handleFinishBundle() { + sideInputFetcher.persist(); + } + + /* + Handle process element. Runs the elements that have an available side input, and buffers elements for which the + side input is blocked. Returns the list of elements that are unblocked and should be processed. + */ + Iterator<? extends WindowedValue<InputT>> handleProcessElement( + WindowedValue<InputT> compressedElem) { + // Note: We could write this as a three-line stream expression, but side effects are discouraged + // in Java streams. + return Iterators.filter( + compressedElem.explodeWindows().iterator(), + (WindowedValue<InputT> e) -> !sideInputFetcher.storeIfBlocked(e)); + } + + void handleProcessTimer(TimerInternals.TimerData timer) { + // We must call this to ensure the side-input is cached for the timer. However since a user + // timer can only + // be set via element processing (or another timer) in the same window, the window should be + // unblocked once + // we get here. + Preconditions.checkState(!sideInputFetcher.storeIfBlocked(timer)); Review Comment:  The use of `Preconditions.checkState(!sideInputFetcher.storeIfBlocked(timer))` assumes that side inputs are always ready when a timer fires. 
While this is generally true for user timers (which are set after an element is processed), it relies on the runner's guarantee that cleanup timers are only set after unblocking. If this assumption is ever violated, it will lead to an `IllegalStateException` rather than buffering the timer. Consider if it's safer to handle the blocked case by returning a boolean and letting the caller decide, or if this strict assertion is the intended behavior for the Dataflow runner. ########## sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java: ########## @@ -3678,6 +3679,154 @@ public void processElement( pipeline.run(); } + @Test + @Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesSideInputs.class, + UsesSideInputsInTimer.class, + UsesTestStream.class, + UsesTimersInParDo.class, + UsesTriggeredSideInputs.class, + UsesOnWindowExpiration.class + }) + public void testTimerSideInput() { + // SideInput tag id + final String sideInputTag1 = "tag1"; + + final PCollectionView<Integer> sideInput = + pipeline + .apply("CreateSideInput1", Create.of(2)) + .apply("ViewSideInput1", View.asSingleton()); + + DoFn<KV<Integer, Integer>, KV<Integer, Integer>> doFn = + new DoFn<KV<Integer, Integer>, KV<Integer, Integer>>() { + @TimerId("timer") + private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @StateId("foo") + private final StateSpec<ValueState<Integer>> stateSpec = StateSpecs.value(); + + @ProcessElement + public void process(@Timestamp Instant ts, @TimerId("timer") Timer timer) { + timer.align(Duration.standardSeconds(10)).setRelative(); + } + + @OnTimer("timer") + public void onTimer( + OutputReceiver<KV<Integer, Integer>> o, + @DoFn.SideInput(sideInputTag1) Integer sideInput, + @Key Integer key) { + o.output(KV.of(key, sideInput)); + } + + @OnWindowExpiration + public void onWindowExpiration( + @DoFn.SideInput(sideInputTag1) Integer sideInput, + OutputReceiver<KV<Integer, Integer>> o, + @Key Integer key) { + 
o.output(KV.of(key, sideInput)); + } + }; + + final int numTestElements = 10; + final Instant now = new Instant(0); + TestStream.Builder<KV<Integer, Integer>> builder = + TestStream.create(KvCoder.of(VarIntCoder.of(), VarIntCoder.of())) + .advanceWatermarkTo(new Instant(0)); + + for (int i = 0; i < numTestElements; i++) { + builder = + builder.addElements( + TimestampedValue.of(KV.of(i % 2, i), now.plus(Duration.millis(i * 1000)))); + if ((i + 1) % 10 == 0) { + builder = builder.advanceWatermarkTo(now.plus(Duration.millis((i + 1) * 1000))); + } + } + List<KV<Integer, Integer>> expected = + IntStream.rangeClosed(0, 1) + .boxed() + .flatMap(i -> ImmutableList.of(KV.of(i, 2), KV.of(i, 2)).stream()) + .collect(Collectors.toList()); + + PCollection<KV<Integer, Integer>> output = + pipeline + .apply(builder.advanceWatermarkToInfinity()) + .apply(ParDo.of(doFn).withSideInput(sideInputTag1, sideInput)); + PAssert.that(output).containsInAnyOrder(expected); + pipeline.run(); + } + + @Test + @Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesSideInputs.class, + UsesSideInputsInTimer.class, + UsesTimersInParDo.class, + UsesTriggeredSideInputs.class + }) + public void testSideInputNotReadyTimer() { + final String sideInputTag = "tag1"; + + // Create a side input that is delayed by 5 seconds using Thread.sleep + DoFn<KV<String, String>, String> delayFn = + new DoFn<KV<String, String>, String>() { + @ProcessElement + public void process(OutputReceiver<String> o) throws InterruptedException { + Thread.sleep(java.time.Duration.ofSeconds(15).toMillis()); Review Comment:  Using `Thread.sleep` with a duration of 15 seconds in a test can lead to slow build times and potential flakiness in CI environments. While it's used here to simulate side input delay, consider if there's a more deterministic way to test this behavior, perhaps by using `TestStream` with multiple bundles or a custom `PCollectionView` that allows controlling readiness without blocking the thread. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscribe@beam.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org
