kennknowles commented on code in PR #38363:
URL: https://github.com/apache/beam/pull/38363#discussion_r3382577640
##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java:
##########
@@ -3678,6 +3679,154 @@ public void processElement(
pipeline.run();
}
+ @Test
+ @Category({
+ ValidatesRunner.class,
+ UsesStatefulParDo.class,
+ UsesSideInputs.class,
+ UsesSideInputsInTimer.class,
+ UsesTestStream.class,
+ UsesTimersInParDo.class,
+ UsesTriggeredSideInputs.class,
+ UsesOnWindowExpiration.class
+ })
+ public void testTimerSideInput() {
+ // SideInput tag id
+ final String sideInputTag1 = "tag1";
+
+ final PCollectionView<Integer> sideInput =
+ pipeline
+ .apply("CreateSideInput1", Create.of(2))
+ .apply("ViewSideInput1", View.asSingleton());
+
+ DoFn<KV<Integer, Integer>, KV<Integer, Integer>> doFn =
+ new DoFn<KV<Integer, Integer>, KV<Integer, Integer>>() {
+ @TimerId("timer")
+ private final TimerSpec timerSpec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @StateId("foo")
+ private final StateSpec<ValueState<Integer>> stateSpec =
StateSpecs.value();
+
+ @ProcessElement
+ public void process(@Timestamp Instant ts, @TimerId("timer") Timer
timer) {
+ timer.align(Duration.standardSeconds(10)).setRelative();
+ }
+
+ @OnTimer("timer")
+ public void onTimer(
+ OutputReceiver<KV<Integer, Integer>> o,
+ @DoFn.SideInput(sideInputTag1) Integer sideInput,
+ @Key Integer key) {
+ o.output(KV.of(key, sideInput));
+ }
+
+ @OnWindowExpiration
+ public void onWindowExpiration(
+ @DoFn.SideInput(sideInputTag1) Integer sideInput,
+ OutputReceiver<KV<Integer, Integer>> o,
+ @Key Integer key) {
+ o.output(KV.of(key, sideInput));
+ }
+ };
+
+ final int numTestElements = 10;
+ final Instant now = new Instant(0);
+ TestStream.Builder<KV<Integer, Integer>> builder =
+ TestStream.create(KvCoder.of(VarIntCoder.of(), VarIntCoder.of()))
+ .advanceWatermarkTo(new Instant(0));
+
+ for (int i = 0; i < numTestElements; i++) {
+ builder =
+ builder.addElements(
+ TimestampedValue.of(KV.of(i % 2, i),
now.plus(Duration.millis(i * 1000))));
+ if ((i + 1) % 10 == 0) {
+ builder = builder.advanceWatermarkTo(now.plus(Duration.millis((i +
1) * 1000)));
+ }
+ }
+ List<KV<Integer, Integer>> expected =
+ IntStream.rangeClosed(0, 1)
+ .boxed()
+ .flatMap(i -> ImmutableList.of(KV.of(i, 2), KV.of(i,
2)).stream())
+ .collect(Collectors.toList());
+
+ PCollection<KV<Integer, Integer>> output =
+ pipeline
+ .apply(builder.advanceWatermarkToInfinity())
+ .apply(ParDo.of(doFn).withSideInput(sideInputTag1, sideInput));
+ PAssert.that(output).containsInAnyOrder(expected);
+ pipeline.run();
+ }
+
+ @Test
+ @Category({
+ ValidatesRunner.class,
+ UsesStatefulParDo.class,
+ UsesSideInputs.class,
+ UsesSideInputsInTimer.class,
+ UsesTimersInParDo.class,
+ UsesTriggeredSideInputs.class
+ })
+ public void testSideInputNotReadyTimer() {
+ final String sideInputTag = "tag1";
+
+ // Create a side input that is delayed by 5 seconds using Thread.sleep
+ DoFn<KV<String, String>, String> delayFn =
+ new DoFn<KV<String, String>, String>() {
+ @ProcessElement
+ public void process(OutputReceiver<String> o) throws
InterruptedException {
+ Thread.sleep(java.time.Duration.ofSeconds(15).toMillis());
+ o.output("side-value");
+ }
+ };
Review Comment:
We should look into implementing it. Even implementing the coalesce
points as `sleep(15000)` would match this test's behavior while giving us a
starting point for robustness.
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnHelpersTest.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.SideInputReader;
+import
org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
+import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.DoFnInfo;
+import org.apache.beam.sdk.values.TupleTag;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+@RunWith(JUnit4.class)
+public class SimpleParDoFnHelpersTest {
+ private PipelineOptions options;
+ @Mock DoFnInstanceManager doFnInstanceManager;
+ @Mock SideInputReader sideInputReader;
+ @Mock DataflowStepContext stepContext;
+ @Mock DataflowStepContext userStepContext;
+ @Mock DataflowOperationContext operationContext;
+ @Mock DoFnRunnerFactory<String, String> runnerFactory;
+ @Mock DoFnRunner<String, String> mockRunner;
+
+ @Mock StreamingSideInputProcessor<String, GlobalWindow> sideInputProcessor;
+
+ @Mock DoFnInfo<String, String> doFnInfo;
+ @Mock CounterFactory counterFactory;
+
+ private static class TestDoFn extends DoFn<String, String> {
+ @ProcessElement
+ public void processElement() {}
+ }
+
+ private TestDoFn doFn = new TestDoFn();
+
+ private SimpleParDoFnHelpers<String, String, GlobalWindow> helpers;
+
+ @Before
+ @SuppressWarnings("unchecked")
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ options = PipelineOptionsFactory.create();
+ when(stepContext.namespacedToUser()).thenReturn(userStepContext);
+ when(operationContext.counterFactory()).thenReturn(counterFactory);
+
+ when(doFnInstanceManager.get()).thenReturn((DoFnInfo) doFnInfo);
+ when(doFnInfo.getDoFn()).thenReturn(doFn);
+
+ when(runnerFactory.createRunner(
+ any(), any(), any(), any(), any(), any(), any(), any(), any(),
any(), any(), any(),
+ any(), any()))
+ .thenReturn(mockRunner);
+
+ helpers =
+ new SimpleParDoFnHelpers<>(
+ options,
+ doFnInstanceManager,
+ sideInputReader,
+ new TupleTag<>("main"),
+ ImmutableMap.of(new TupleTag<>("main"), 0),
+ stepContext,
+ operationContext,
+ DoFnSchemaInformation.create(),
+ ImmutableMap.of(),
+ runnerFactory);
+ }
+
+ @Test
+ public void testReallyStartBundle() throws Exception {
Review Comment:
What does this actually test? It looks like a lot of mocking that essentially
re-reads the implementation's code to check for the presence of a couple of
calls. Does this add any value? Does it overly constrain implementation
details? (When I see mocking like this, I assume "no" / "yes" respectively
unless there's a strong argument otherwise.)
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputParDoFn.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import com.google.api.client.util.Lists;
+import com.google.common.collect.Iterables;
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.dataflow.worker.util.ValueInEmptyWindows;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoFn;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.DoFnInfo;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({
+ "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
+ "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+/* Similar to {@link SimpleParDoFn} but for splittable ProcessFns. */
+public class StreamingKeyedWorkKitemSideInputParDoFn<K, InputT, OutputT, W
extends BoundedWindow>
+ implements ParDoFn {
+ private final StateTag<ValueState<K>> keyAddr;
+ private final Coder<InputT> inputCoder;
+ private final SimpleParDoFnHelpers<KeyedWorkItem<K, InputT>, OutputT, W>
helpers;
+ protected @Nullable StreamingSideInputProcessor<InputT, W>
sideInputProcessor;
+
/**
 * Creates the ParDoFn. All DoFn-related arguments are forwarded to a {@link
 * SimpleParDoFnHelpers}; {@code keyCoder} is used to build the system state tag under which the
 * current key is stored, and {@code inputCoder} is kept for constructing side-input fetchers.
 *
 * <p>NOTE(review): the class and this constructor are spelled "Kitem", while the file is named
 * StreamingKeyedWorkItemSideInputParDoFn.java. A public top-level class must match its file name,
 * so the class and constructor should be renamed together.
 */
StreamingKeyedWorkKitemSideInputParDoFn(
    PipelineOptions options,
    DoFnInstanceManager doFnInstanceManager,
    SideInputReader sideInputReader,
    TupleTag<OutputT> mainOutputTag,
    Map<TupleTag<?>, Integer> outputTupleTagsToReceiverIndices,
    DataflowExecutionContext.DataflowStepContext stepContext,
    DataflowOperationContext operationContext,
    DoFnSchemaInformation doFnSchemaInformation,
    Map<String, PCollectionView<?>> sideInputMapping,
    DoFnRunnerFactory runnerFactory,
    Coder<K> keyCoder,
    Coder<InputT> inputCoder) {
  this.inputCoder = inputCoder;
  this.keyAddr = StateTags.makeSystemTagInternal(StateTags.value("key", keyCoder));
  this.helpers =
      new SimpleParDoFnHelpers<>(
          options,
          doFnInstanceManager,
          sideInputReader,
          mainOutputTag,
          outputTupleTagsToReceiverIndices,
          stepContext,
          operationContext,
          doFnSchemaInformation,
          sideInputMapping,
          runnerFactory);
}
+
+ ValueState<K> keyValue() {
+ return
helpers.stepContext.stateInternals().state(StateNamespaces.global(), keyAddr);
+ }
+
+ @Override
+ public void startBundle(Receiver... receivers) throws Exception {
+ helpers.startBundle(receivers);
+ if (helpers.hasStreamingSideInput) {
+ // There is non-trivial setup that needs to be performed for watermark
propagation
+ // even on empty bundles.
+ helpers.reallyStartBundle();
+ onStartKey();
+ }
+ }
+
+ protected void onStartKey() {
+ if (helpers.hasStreamingSideInput) {
+ sideInputProcessor =
+ new StreamingSideInputProcessor<>(
+ new StreamingSideInputFetcher<InputT, W>(
+ helpers.fnInfo.getSideInputViews(),
+ inputCoder,
+ (WindowingStrategy<?, W>)
helpers.fnInfo.getWindowingStrategy(),
+ (StreamingModeExecutionContext.StreamingModeStepContext)
+ helpers.userStepContext));
+ }
+
+ if (sideInputProcessor != null) {
+ boolean hasState = helpers.hasState();
+
+ // TODO(relax): We should be able to get this without writing it to
state!
+ K key = keyValue().read();
+
+ sideInputProcessor.tryUnblockElementsAndTimers(
+ (unblockedElements, unblockedTimers) -> {
+ if (!Iterables.isEmpty(unblockedElements) ||
!Iterables.isEmpty(unblockedTimers)) {
+ helpers.fnRunner.processElement(
+ new ValueInEmptyWindows<>(
+ KeyedWorkItems.workItem(key, unblockedTimers,
unblockedElements)));
+ }
+ if (hasState) {
+ List<W> windows =
+ (List<W>)
+ StreamSupport.stream(unblockedElements.spliterator(),
false)
+ .flatMap(wv -> wv.getWindows().stream())
+ .collect(Collectors.toList());
+ // These elements are now processed. Register cleanup timers for
all the unblocked
+ // windows.
+ helpers.registerStateCleanup(
+ (WindowingStrategy<?, W>)
getDoFnInfo().getWindowingStrategy(), windows);
+ }
+ });
+ }
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void processElement(Object untypedElem) throws Exception {
+ if (helpers.fnRunner == null) {
+ // If we need to run reallyStartBundle in here, we need to make sure to
switch the state
+ // sampler into the start state.
+ try (Closeable start = helpers.operationContext.enterStart()) {
+ helpers.reallyStartBundle();
+ onStartKey();
+ }
+ }
+ helpers.outputsPerElementTracker.onProcessElement();
+
+ WindowedValue<KeyedWorkItem<K, InputT>> elem =
+ (WindowedValue<KeyedWorkItem<K, InputT>>) untypedElem;
+ onProcessWindowedValue(elem);
+
+ helpers.outputsPerElementTracker.onProcessElementSuccess();
+ }
+
+ @Override
+ public void processTimers() throws Exception {
+
+ // Note: We need to get windowCoder to decode the timers. If we haven't
already deserialized
+ // the fnInfo, we peek at a new instance to retrieve that. If this extra
deserialization becomes
+ // excessively costly, we could either (1) have the DoFnInstanceManager
remember the associated
+ // windowCoder (allowing us to get it without a DoFnInfo instance) or (2)
check whether timers
+ // exist without actually decoding them.
+ Coder<BoundedWindow> windowCoder =
+ (Coder<BoundedWindow>)
+ (helpers.fnInfo != null ? helpers.fnInfo :
helpers.doFnInstanceManager.peek())
+ .getWindowingStrategy()
+ .getWindowFn()
+ .windowCoder();
+ helpers.processTimers(
+ SimpleParDoFnHelpers.TimerType.FAIL_USER,
+ helpers.userStepContext,
+ windowCoder,
+ this::onStartKey,
+ sideInputProcessor);
+ helpers.processTimers(
+ SimpleParDoFnHelpers.TimerType.SYSTEM,
+ helpers.stepContext,
+ windowCoder,
+ this::onStartKey,
+ sideInputProcessor);
+ }
+
+ @Override
+ public void finishBundle() throws Exception {
+ helpers.finishBundle(sideInputProcessor);
+ this.sideInputProcessor = null;
+ }
+
+ @Override
+ public void abort() throws Exception {
+ helpers.abort();
+ }
+
+ protected void onProcessWindowedValue(WindowedValue<KeyedWorkItem<K,
InputT>> elem) {
+ // TODO: Get rid of this!
+ final K key = elem.getValue().key();
+ keyValue().write(key);
+
+ boolean hasState = helpers.hasState();
+ Collection<W> windowsProcessed;
+ if (sideInputProcessor != null) {
+ windowsProcessed = hasState ? Lists.newArrayList() :
Collections.emptyList();
+ KeyedWorkItem<K, InputT> unblocked =
sideInputProcessor.handleProcessKeyedWorkItem(elem);
+ if (!Iterables.isEmpty(unblocked.elementsIterable())
+ || !Iterables.isEmpty(unblocked.timersIterable())) {
+ helpers.fnRunner.processElement(elem.withValue(unblocked));
+ }
+ if (hasState) {
+ windowsProcessed =
+ (Collection<W>)
+
StreamSupport.stream(unblocked.elementsIterable().spliterator(), false)
+ .flatMap(wv -> wv.getWindows().stream())
+ .collect(Collectors.toList());
+ }
+ } else {
+ helpers.fnRunner.processElement(elem);
+ windowsProcessed = (Collection<W>) elem.getWindows();
+ }
Review Comment:
Is this accurate? The windows for the elements are in `.elementsIterable()`, not on the outer `elem` (`elem.getWindows()`).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]