gemini-code-assist[bot] commented on code in PR #38363: URL: https://github.com/apache/beam/pull/38363#discussion_r3357296557
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnHelpers.java: ########## @@ -0,0 +1,517 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; + +import java.io.Closeable; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.runners.core.DoFnRunner; +import org.apache.beam.runners.core.SideInputReader; +import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StateNamespaces; +import org.apache.beam.runners.core.StateTag; +import org.apache.beam.runners.core.StateTags; +import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; +import org.apache.beam.runners.dataflow.worker.counters.Counter; +import org.apache.beam.runners.dataflow.worker.counters.CounterFactory; +import 
org.apache.beam.runners.dataflow.worker.counters.CounterName; +import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter; +import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver; +import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.StreamingOptions; +import org.apache.beam.sdk.state.State; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.transforms.DoFnSchemaInformation; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.util.DoFnInfo; +import org.apache.beam.sdk.util.WindowedValueMultiReceiver; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowedValue; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings({ + "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +class SimpleParDoFnHelpers<InputT, OutputT, W extends BoundedWindow> { + private static final Logger LOG = LoggerFactory.getLogger(SimpleParDoFnHelpers.class); + + // TODO: Remove once Distributions has shipped. 
+ @VisibleForTesting + static final String OUTPUTS_PER_ELEMENT_EXPERIMENT = "outputs_per_element_counter"; + + private static final String COUNTER_NAME = "per-element-output-count"; + + final PipelineOptions options; + final DoFnInstanceManager doFnInstanceManager; + + private final SideInputReader sideInputReader; + final DataflowOperationContext operationContext; + private final TupleTag<OutputT> mainOutputTag; + private final Map<TupleTag<?>, Integer> outputTupleTagsToReceiverIndices; + private final List<TupleTag<?>> sideOutputTags; + final DataflowExecutionContext.DataflowStepContext stepContext; + final DataflowExecutionContext.DataflowStepContext userStepContext; + private final CounterFactory counterFactory; + private final DoFnRunnerFactory runnerFactory; + final boolean hasStreamingSideInput; + final OutputsPerElementTracker outputsPerElementTracker; + private final DoFnSchemaInformation doFnSchemaInformation; + private final Map<String, PCollectionView<?>> sideInputMapping; + + // Various DoFn helpers, null between bundles + @Nullable DoFnRunner<InputT, OutputT> fnRunner; + @Nullable DoFnInfo<InputT, OutputT> fnInfo; + private Receiver @Nullable [] receivers; + + // This may additionally be null if it is not a real DoFn but an OldDoFn or + // GroupAlsoByWindowViaWindowSetDoFn + protected @Nullable DoFnSignature fnSignature; + + SimpleParDoFnHelpers( + PipelineOptions options, + DoFnInstanceManager doFnInstanceManager, + SideInputReader sideInputReader, + TupleTag<OutputT> mainOutputTag, + Map<TupleTag<?>, Integer> outputTupleTagsToReceiverIndices, + DataflowExecutionContext.DataflowStepContext stepContext, + DataflowOperationContext operationContext, + DoFnSchemaInformation doFnSchemaInformation, + Map<String, PCollectionView<?>> sideInputMapping, + DoFnRunnerFactory runnerFactory) { + this.options = options; + this.doFnInstanceManager = doFnInstanceManager; + + // We vend a freshly deserialized version for each run + this.sideInputReader = 
sideInputReader; + this.operationContext = operationContext; + checkArgument(!outputTupleTagsToReceiverIndices.isEmpty(), "expected at least one output"); + this.mainOutputTag = mainOutputTag; + this.outputTupleTagsToReceiverIndices = outputTupleTagsToReceiverIndices; + ImmutableList.Builder<TupleTag<?>> sideOutputTagsBuilder = ImmutableList.builder(); + for (TupleTag<?> tag : outputTupleTagsToReceiverIndices.keySet()) { + if (!mainOutputTag.equals(tag)) { + sideOutputTagsBuilder.add(tag); + } + } + this.sideOutputTags = sideOutputTagsBuilder.build(); + this.stepContext = stepContext; + + // StepContext provides a TimerInternals and StateInternals for use by the system - this class. + // For the user, we request a user-scoped StepContext to provide a user-scoped + // StateInternals and TimerInternals. + this.userStepContext = stepContext.namespacedToUser(); + + this.counterFactory = operationContext.counterFactory(); + this.runnerFactory = runnerFactory; + this.hasStreamingSideInput = + options.as(StreamingOptions.class).isStreaming() && !sideInputReader.isEmpty(); + this.outputsPerElementTracker = createOutputsPerElementTracker(); + this.doFnSchemaInformation = doFnSchemaInformation; + this.sideInputMapping = sideInputMapping; + } + + boolean hasState() { + return fnSignature != null && !fnSignature.stateDeclarations().isEmpty(); + } + + void startBundle(Receiver... 
receivers) throws Exception { + checkArgument( + receivers.length == outputTupleTagsToReceiverIndices.size(), + "unexpected number of receivers for DoFn"); + + this.receivers = receivers; + } + + void reallyStartBundle() throws Exception { + checkState(fnRunner == null, "bundle already started (or not properly finished)"); + + WindowedValueMultiReceiver outputManager = + new WindowedValueMultiReceiver() { + final Map<TupleTag<?>, OutputReceiver> undeclaredOutputs = new HashMap<>(); + + private @Nullable Receiver getReceiverOrNull(TupleTag<?> tag) { + Integer receiverIndex = outputTupleTagsToReceiverIndices.get(tag); + if (receiverIndex != null) { + return receivers[receiverIndex]; + } else { + return undeclaredOutputs.get(tag); + } + } + + @Override + public <TagT> void output(TupleTag<TagT> tag, WindowedValue<TagT> output) { + outputsPerElementTracker.onOutput(); + Receiver receiver = getReceiverOrNull(tag); + if (receiver == null) { + // A new undeclared output. + // TODO: plumb through the operationName, so that we can + // name implicit outputs after it. + String outputName = "implicit-" + tag.getId(); + // TODO: plumb through the counter prefix, so we can + // make it available to the OutputReceiver class in case + // it wants to use it in naming output counterFactory. (It + // doesn't today.) + OutputReceiver undeclaredReceiver = new OutputReceiver(); + + ElementCounter outputCounter = + new DataflowOutputCounter( + outputName, counterFactory, stepContext.getNameContext()); + undeclaredReceiver.addOutputCounter(outputCounter); + undeclaredOutputs.put(tag, undeclaredReceiver); + receiver = undeclaredReceiver; + } + + try { + receiver.process(output); + } catch (RuntimeException | Error e) { + // Rethrow unchecked exceptions as-is to avoid excessive nesting + // via a chain of DoFn's. + throw e; + } catch (Exception e) { + // This should never happen in practice with DoFn's, but can happen + // with other Receivers. 
+ throw new RuntimeException(e); + } + } + }; + fnInfo = (DoFnInfo) doFnInstanceManager.get(); + fnSignature = DoFnSignatures.getSignature(fnInfo.getDoFn().getClass()); + + fnRunner = + runnerFactory.createRunner( + fnInfo.getDoFn(), + options, + mainOutputTag, + sideOutputTags, + fnInfo.getSideInputViews(), + sideInputReader, + fnInfo.getInputCoder(), + fnInfo.getOutputCoders(), + fnInfo.getWindowingStrategy(), + stepContext, + userStepContext, + outputManager, + doFnSchemaInformation, + sideInputMapping); + fnRunner.startBundle(); + } + + void finishBundle(StreamingSideInputProcessor<?, ?> sideInputProcessor) throws Exception { + if (fnRunner != null) { + fnRunner.finishBundle(); + if (sideInputProcessor != null) { + sideInputProcessor.handleFinishBundle(); + } + doFnInstanceManager.complete(fnInfo); + fnRunner = null; + fnInfo = null; + fnSignature = null; + sideInputProcessor = null; + } + } + + void abort() throws Exception { + doFnInstanceManager.abort(fnInfo); + fnRunner = null; + fnInfo = null; + } + + @VisibleForTesting static final String CLEANUP_TIMER_ID = "cleanup-timer"; + + enum TimerType { + USER { + @Override + public void processTimer( + SimpleParDoFnHelpers doFn, + TimerInternals.TimerData timer, + StreamingSideInputProcessor sideInputProcessor) + throws Exception { + doFn.processUserTimer(timer, sideInputProcessor); + } + }, + FAIL_USER { + @Override + public void processTimer( + SimpleParDoFnHelpers doFn, + TimerInternals.TimerData timer, + StreamingSideInputProcessor sideInputProcessor) + throws Exception { + throw new UnsupportedOperationException( + "Attempt to deliver a timer to a DoFn, but timers are not supported here."); + } + }, + SYSTEM { + @Override + public void processTimer( + SimpleParDoFnHelpers doFn, + TimerInternals.TimerData timer, + StreamingSideInputProcessor sideInputProcessor) + throws Exception { + doFn.processSystemTimer(timer, sideInputProcessor); + } + }; + + public abstract void processTimer( + SimpleParDoFnHelpers 
doFn, + TimerInternals.TimerData timer, + StreamingSideInputProcessor sideInputProcessor) + throws Exception; + }; + + void processTimers( + TimerType mode, + DataflowExecutionContext.DataflowStepContext context, + Coder<BoundedWindow> windowCoder, + Runnable startKey, + StreamingSideInputProcessor<?, ?> sideInputProcessor) + throws Exception { + TimerInternals.TimerData timer = context.getNextFiredTimer(windowCoder); + + if (timer != null && fnRunner == null) { + // If we need to run reallyStartBundle in here, we need to make sure to switch the state + // sampler into the start state. + try (Closeable start = operationContext.enterStart()) { + reallyStartBundle(); + startKey.run(); + } + } + + while (timer != null) { + mode.processTimer(this, timer, sideInputProcessor); + timer = context.getNextFiredTimer(windowCoder); + } + } Review Comment:  Because Java passes object references by value, the `sideInputProcessor` argument is evaluated once, at the call site, before `processTimers` starts running. If `fnRunner` is initially `null`, the value passed is therefore `null`. Even though `startKey.run()` (which calls `onStartKey()`) subsequently initializes `sideInputProcessor` on the caller side, the local parameter `sideInputProcessor` inside `processTimers` remains `null`. As a result, `mode.processTimer` will receive `null` and the side input checks for timers will be bypassed.

To fix this, pass a `java.util.function.Supplier<StreamingSideInputProcessor<?, ?>>` instead of the processor directly.
```java
void processTimers(
    TimerType mode,
    DataflowExecutionContext.DataflowStepContext context,
    Coder<BoundedWindow> windowCoder,
    Runnable startKey,
    java.util.function.Supplier<StreamingSideInputProcessor<?, ?>> sideInputProcessorSupplier)
    throws Exception {
  TimerInternals.TimerData timer = context.getNextFiredTimer(windowCoder);

  if (timer != null && fnRunner == null) {
    // If we need to run reallyStartBundle in here, we need to make sure to switch the state
    // sampler into the start state.
    try (Closeable start = operationContext.enterStart()) {
      reallyStartBundle();
      startKey.run();
    }
  }

  while (timer != null) {
    mode.processTimer(this, timer, sideInputProcessorSupplier.get());
    timer = context.getNextFiredTimer(windowCoder);
  }
}
```
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java: ########## @@ -355,180 +177,33 @@ public void processTimers() throws Exception { // exist without actually decoding them. Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) - (fnInfo != null ? fnInfo : doFnInstanceManager.peek()) + (helpers.fnInfo != null ? 
helpers.fnInfo : helpers.doFnInstanceManager.peek()) .getWindowingStrategy() .getWindowFn() .windowCoder(); - processTimers(TimerType.USER, userStepContext, windowCoder); - processTimers(TimerType.SYSTEM, stepContext, windowCoder); - } - - private void processUserTimer(TimerData timer) throws Exception { - if (fnSignature.timerDeclarations().containsKey(timer.getTimerId()) - || fnSignature.timerFamilyDeclarations().containsKey(timer.getTimerFamilyId())) { - BoundedWindow window = ((WindowNamespace) timer.getNamespace()).getWindow(); - fnRunner.onTimer( - timer.getTimerId(), - timer.getTimerFamilyId(), - this.stepContext.stateInternals().getKey(), - window, - timer.getTimestamp(), - timer.getOutputTimestamp(), - timer.getDomain(), - timer.causedByDrain()); - } - } - - private void processSystemTimer(TimerData timer) throws Exception { - - // Timer owned by this class, for cleaning up state in expired windows - if (timer.getTimerId().equals(CLEANUP_TIMER_ID)) { - checkState( - timer.getDomain().equals(TimeDomain.EVENT_TIME), - "%s received cleanup timer with domain not EVENT_TIME: %s", - this, - timer); - - checkState( - timer.getNamespace() instanceof WindowNamespace, - "%s received cleanup timer not for a %s: %s", - this, - WindowNamespace.class.getSimpleName(), - timer); - - BoundedWindow window = ((WindowNamespace) timer.getNamespace()).getWindow(); - Instant targetTime = earliestAllowableCleanupTime(window, fnInfo.getWindowingStrategy()); - - checkState( - !targetTime.isAfter(timer.getTimestamp()), - "%s received state cleanup timer for window %s " - + " that is before the appropriate cleanup time %s", - this, - window, - targetTime); - - fnRunner.onWindowExpiration( - window, timer.getOutputTimestamp(), this.stepContext.stateInternals().getKey()); - - // This is for a timer for a window that is expired, so clean it up. 
- for (StateDeclaration stateDecl : fnSignature.stateDeclarations().values()) { - StateTag<?> tag; - try { - tag = - StateTags.tagForSpec( - stateDecl.id(), (StateSpec) stateDecl.field().get(fnInfo.getDoFn())); - } catch (IllegalAccessException e) { - throw new RuntimeException( - String.format( - "Error accessing %s for %s", - StateSpec.class.getName(), fnInfo.getDoFn().getClass().getName()), - e); - } - - StateInternals stateInternals = userStepContext.stateInternals(); - org.apache.beam.sdk.state.State state = stateInternals.state(timer.getNamespace(), tag); - state.clear(); - } - } + helpers.processTimers( + SimpleParDoFnHelpers.TimerType.USER, + helpers.userStepContext, + windowCoder, + this::onStartKey, + sideInputProcessor); + helpers.processTimers( + SimpleParDoFnHelpers.TimerType.SYSTEM, + helpers.stepContext, + windowCoder, + this::onStartKey, + sideInputProcessor); Review Comment:  Change the `sideInputProcessor` argument to a lambda supplier `() -> sideInputProcessor` to avoid passing a stale `null` reference when the bundle is started lazily during timer processing.
```java
helpers.processTimers(
    SimpleParDoFnHelpers.TimerType.USER,
    helpers.userStepContext,
    windowCoder,
    this::onStartKey,
    () -> sideInputProcessor);
helpers.processTimers(
    SimpleParDoFnHelpers.TimerType.SYSTEM,
    helpers.stepContext,
    windowCoder,
    this::onStartKey,
    () -> sideInputProcessor);
```
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkKitemSideInputParDoFn.java: ########## @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker; + +import com.google.api.client.util.Lists; +import com.google.common.collect.Iterables; +import java.io.Closeable; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.apache.beam.runners.core.KeyedWorkItem; +import org.apache.beam.runners.core.KeyedWorkItems; +import org.apache.beam.runners.core.SideInputReader; +import org.apache.beam.runners.core.StateNamespaces; +import org.apache.beam.runners.core.StateTag; +import org.apache.beam.runners.core.StateTags; +import org.apache.beam.runners.dataflow.worker.util.ValueInEmptyWindows; +import org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoFn; +import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.transforms.DoFnSchemaInformation; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.DoFnInfo; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowedValue; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.checkerframework.checker.nullness.qual.Nullable; + 
+@SuppressWarnings({ + "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +/* Similar to {@link SimpleParDoFn} but for splittable ProcessFns. */ +public class StreamingKeyedWorkKitemSideInputParDoFn<K, InputT, OutputT, W extends BoundedWindow> + implements ParDoFn { + private final StateTag<ValueState<K>> keyAddr; + private final Coder<InputT> inputCoder; + private final SimpleParDoFnHelpers<KeyedWorkItem<K, InputT>, OutputT, W> helpers; + protected @Nullable StreamingSideInputProcessor<InputT, W> sideInputProcessor; + + StreamingKeyedWorkKitemSideInputParDoFn( + PipelineOptions options, + DoFnInstanceManager doFnInstanceManager, + SideInputReader sideInputReader, + TupleTag<OutputT> mainOutputTag, + Map<TupleTag<?>, Integer> outputTupleTagsToReceiverIndices, + DataflowExecutionContext.DataflowStepContext stepContext, + DataflowOperationContext operationContext, + DoFnSchemaInformation doFnSchemaInformation, + Map<String, PCollectionView<?>> sideInputMapping, + DoFnRunnerFactory runnerFactory, + Coder<K> keyCoder, + Coder<InputT> inputCoder) { + helpers = + new SimpleParDoFnHelpers<>( + options, + doFnInstanceManager, + sideInputReader, + mainOutputTag, + outputTupleTagsToReceiverIndices, + stepContext, + operationContext, + doFnSchemaInformation, + sideInputMapping, + runnerFactory); + this.keyAddr = StateTags.makeSystemTagInternal(StateTags.value("key", keyCoder)); + this.inputCoder = inputCoder; + } + + ValueState<K> keyValue() { + return helpers.stepContext.stateInternals().state(StateNamespaces.global(), keyAddr); + } + + @Override + public void startBundle(Receiver... receivers) throws Exception { + helpers.startBundle(receivers); + if (helpers.hasStreamingSideInput) { + // There is non-trivial setup that needs to be performed for watermark propagation + // even on empty bundles. 
+ helpers.reallyStartBundle(); + onStartKey(); + } + } + + protected void onStartKey() { + if (helpers.hasStreamingSideInput) { + sideInputProcessor = + new StreamingSideInputProcessor<>( + new StreamingSideInputFetcher<InputT, W>( + helpers.fnInfo.getSideInputViews(), + inputCoder, + (WindowingStrategy<?, W>) helpers.fnInfo.getWindowingStrategy(), + (StreamingModeExecutionContext.StreamingModeStepContext) + helpers.userStepContext)); + } + + if (sideInputProcessor != null) { + boolean hasState = helpers.hasState(); + + // TODO(relax): We should be able to get this without writing it to state! + K key = keyValue().read(); + + sideInputProcessor.tryUnblockElementsAndTimers( + (unblockedElements, unblockedTimers) -> { + if (!Iterables.isEmpty(unblockedElements) || !Iterables.isEmpty(unblockedTimers)) { + helpers.fnRunner.processElement( + new ValueInEmptyWindows<>( + KeyedWorkItems.workItem(key, unblockedTimers, unblockedElements))); + } + if (hasState) { + List<W> windows = + (List<W>) + StreamSupport.stream(unblockedElements.spliterator(), false) + .flatMap(wv -> wv.getWindows().stream()) + .collect(Collectors.toList()); + // These elements are now processed. Register cleanup timers for all the unblocked + // windows. + helpers.registerStateCleanup( + (WindowingStrategy<?, W>) getDoFnInfo().getWindowingStrategy(), windows); + } + }); + } + } + + @Override + @SuppressWarnings("unchecked") + public void processElement(Object untypedElem) throws Exception { + if (helpers.fnRunner == null) { + // If we need to run reallyStartBundle in here, we need to make sure to switch the state + // sampler into the start state. 
+ try (Closeable start = helpers.operationContext.enterStart()) { + helpers.reallyStartBundle(); + onStartKey(); + } + } + helpers.outputsPerElementTracker.onProcessElement(); + + WindowedValue<KeyedWorkItem<K, InputT>> elem = + (WindowedValue<KeyedWorkItem<K, InputT>>) untypedElem; + onProcessWindowedValue(elem); + + helpers.outputsPerElementTracker.onProcessElementSuccess(); + } + + @Override + public void processTimers() throws Exception { + + // Note: We need to get windowCoder to decode the timers. If we haven't already deserialized + // the fnInfo, we peek at a new instance to retrieve that. If this extra deserialization becomes + // excessively costly, we could either (1) have the DoFnInstanceManager remember the associated + // windowCoder (allowing us to get it without a DoFnInfo instance) or (2) check whether timers + // exist without actually decoding them. + Coder<BoundedWindow> windowCoder = + (Coder<BoundedWindow>) + (helpers.fnInfo != null ? helpers.fnInfo : helpers.doFnInstanceManager.peek()) + .getWindowingStrategy() + .getWindowFn() + .windowCoder(); + helpers.processTimers( + SimpleParDoFnHelpers.TimerType.FAIL_USER, + helpers.userStepContext, + windowCoder, + this::onStartKey, + sideInputProcessor); + helpers.processTimers( + SimpleParDoFnHelpers.TimerType.SYSTEM, + helpers.stepContext, + windowCoder, + this::onStartKey, + sideInputProcessor); Review Comment:  Change the `sideInputProcessor` argument to a lambda supplier `() -> sideInputProcessor` to avoid passing a stale `null` reference when the bundle is started lazily during timer processing. 
```java
helpers.processTimers(
    SimpleParDoFnHelpers.TimerType.FAIL_USER,
    helpers.userStepContext,
    windowCoder,
    this::onStartKey,
    () -> sideInputProcessor);
helpers.processTimers(
    SimpleParDoFnHelpers.TimerType.SYSTEM,
    helpers.stepContext,
    windowCoder,
    this::onStartKey,
    () -> sideInputProcessor);
```
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkKitemSideInputParDoFn.java: ########## @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker; + +import com.google.api.client.util.Lists; +import com.google.common.collect.Iterables; +import java.io.Closeable; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.apache.beam.runners.core.KeyedWorkItem; +import org.apache.beam.runners.core.KeyedWorkItems; +import org.apache.beam.runners.core.SideInputReader; +import org.apache.beam.runners.core.StateNamespaces; +import org.apache.beam.runners.core.StateTag; +import org.apache.beam.runners.core.StateTags; +import org.apache.beam.runners.dataflow.worker.util.ValueInEmptyWindows; +import org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoFn; +import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.transforms.DoFnSchemaInformation; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.DoFnInfo; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowedValue; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.checkerframework.checker.nullness.qual.Nullable; + +@SuppressWarnings({ + "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +/* Similar to {@link SimpleParDoFn} but for splittable ProcessFns. 
*/ +public class StreamingKeyedWorkKitemSideInputParDoFn<K, InputT, OutputT, W extends BoundedWindow> + implements ParDoFn { + private final StateTag<ValueState<K>> keyAddr; + private final Coder<InputT> inputCoder; + private final SimpleParDoFnHelpers<KeyedWorkItem<K, InputT>, OutputT, W> helpers; + protected @Nullable StreamingSideInputProcessor<InputT, W> sideInputProcessor; + + StreamingKeyedWorkKitemSideInputParDoFn( + PipelineOptions options, + DoFnInstanceManager doFnInstanceManager, + SideInputReader sideInputReader, + TupleTag<OutputT> mainOutputTag, + Map<TupleTag<?>, Integer> outputTupleTagsToReceiverIndices, + DataflowExecutionContext.DataflowStepContext stepContext, + DataflowOperationContext operationContext, + DoFnSchemaInformation doFnSchemaInformation, + Map<String, PCollectionView<?>> sideInputMapping, + DoFnRunnerFactory runnerFactory, + Coder<K> keyCoder, + Coder<InputT> inputCoder) { + helpers = + new SimpleParDoFnHelpers<>( + options, + doFnInstanceManager, + sideInputReader, + mainOutputTag, + outputTupleTagsToReceiverIndices, + stepContext, + operationContext, + doFnSchemaInformation, + sideInputMapping, + runnerFactory); + this.keyAddr = StateTags.makeSystemTagInternal(StateTags.value("key", keyCoder)); + this.inputCoder = inputCoder; + } + + ValueState<K> keyValue() { + return helpers.stepContext.stateInternals().state(StateNamespaces.global(), keyAddr); + } + + @Override + public void startBundle(Receiver... receivers) throws Exception { + helpers.startBundle(receivers); + if (helpers.hasStreamingSideInput) { + // There is non-trivial setup that needs to be performed for watermark propagation + // even on empty bundles. 
+ helpers.reallyStartBundle(); + onStartKey(); + } + } + + protected void onStartKey() { + if (helpers.hasStreamingSideInput) { + sideInputProcessor = + new StreamingSideInputProcessor<>( + new StreamingSideInputFetcher<InputT, W>( + helpers.fnInfo.getSideInputViews(), + inputCoder, + (WindowingStrategy<?, W>) helpers.fnInfo.getWindowingStrategy(), + (StreamingModeExecutionContext.StreamingModeStepContext) + helpers.userStepContext)); + } + + if (sideInputProcessor != null) { + boolean hasState = helpers.hasState(); + + // TODO(relax): We should be able to get this without writing it to state! + K key = keyValue().read(); + + sideInputProcessor.tryUnblockElementsAndTimers( + (unblockedElements, unblockedTimers) -> { + if (!Iterables.isEmpty(unblockedElements) || !Iterables.isEmpty(unblockedTimers)) { + helpers.fnRunner.processElement( + new ValueInEmptyWindows<>( + KeyedWorkItems.workItem(key, unblockedTimers, unblockedElements))); + } Review Comment:  If `key` is null when there are unblocked elements or timers, calling `KeyedWorkItems.workItem(key, ...)` will likely result in a `NullPointerException` or invalid state downstream. It is safer to enforce that `key` is non-null using `Objects.requireNonNull`.
```java
sideInputProcessor.tryUnblockElementsAndTimers(
    (unblockedElements, unblockedTimers) -> {
      if (!Iterables.isEmpty(unblockedElements) || !Iterables.isEmpty(unblockedTimers)) {
        java.util.Objects.requireNonNull(key, "Key is null but there are unblocked elements or timers");
        helpers.fnRunner.processElement(
            new ValueInEmptyWindows<>(
                KeyedWorkItems.workItem(key, unblockedTimers, unblockedElements)));
      }
```
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkKitemSideInputParDoFn.java: ########## @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker; + +import com.google.api.client.util.Lists; +import com.google.common.collect.Iterables; +import java.io.Closeable; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.apache.beam.runners.core.KeyedWorkItem; +import org.apache.beam.runners.core.KeyedWorkItems; +import org.apache.beam.runners.core.SideInputReader; +import org.apache.beam.runners.core.StateNamespaces; +import org.apache.beam.runners.core.StateTag; +import org.apache.beam.runners.core.StateTags; +import org.apache.beam.runners.dataflow.worker.util.ValueInEmptyWindows; +import org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoFn; +import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.transforms.DoFnSchemaInformation; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.DoFnInfo; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; 
+import org.apache.beam.sdk.values.WindowedValue; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.checkerframework.checker.nullness.qual.Nullable; + +@SuppressWarnings({ + "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +/* Similar to {@link SimpleParDoFn} but for splittable ProcessFns. */ +public class StreamingKeyedWorkKitemSideInputParDoFn<K, InputT, OutputT, W extends BoundedWindow> + implements ParDoFn { + private final StateTag<ValueState<K>> keyAddr; + private final Coder<InputT> inputCoder; + private final SimpleParDoFnHelpers<KeyedWorkItem<K, InputT>, OutputT, W> helpers; + protected @Nullable StreamingSideInputProcessor<InputT, W> sideInputProcessor; + + StreamingKeyedWorkKitemSideInputParDoFn( + PipelineOptions options, + DoFnInstanceManager doFnInstanceManager, + SideInputReader sideInputReader, + TupleTag<OutputT> mainOutputTag, + Map<TupleTag<?>, Integer> outputTupleTagsToReceiverIndices, + DataflowExecutionContext.DataflowStepContext stepContext, + DataflowOperationContext operationContext, + DoFnSchemaInformation doFnSchemaInformation, + Map<String, PCollectionView<?>> sideInputMapping, + DoFnRunnerFactory runnerFactory, + Coder<K> keyCoder, + Coder<InputT> inputCoder) { + helpers = + new SimpleParDoFnHelpers<>( + options, + doFnInstanceManager, + sideInputReader, + mainOutputTag, + outputTupleTagsToReceiverIndices, + stepContext, + operationContext, + doFnSchemaInformation, + sideInputMapping, + runnerFactory); + this.keyAddr = StateTags.makeSystemTagInternal(StateTags.value("key", keyCoder)); + this.inputCoder = inputCoder; + } + + ValueState<K> keyValue() { + return helpers.stepContext.stateInternals().state(StateNamespaces.global(), keyAddr); + } + + @Override + public void startBundle(Receiver... 
receivers) throws Exception { + helpers.startBundle(receivers); + if (helpers.hasStreamingSideInput) { + // There is non-trivial setup that needs to be performed for watermark propagation + // even on empty bundles. + helpers.reallyStartBundle(); + onStartKey(); + } + } + + protected void onStartKey() { + if (helpers.hasStreamingSideInput) { + sideInputProcessor = + new StreamingSideInputProcessor<>( + new StreamingSideInputFetcher<InputT, W>( + helpers.fnInfo.getSideInputViews(), + inputCoder, + (WindowingStrategy<?, W>) helpers.fnInfo.getWindowingStrategy(), + (StreamingModeExecutionContext.StreamingModeStepContext) + helpers.userStepContext)); + } + + if (sideInputProcessor != null) { + boolean hasState = helpers.hasState(); + + // TODO(relax): We should be able to get this without writing it to state! + K key = keyValue().read(); + + sideInputProcessor.tryUnblockElementsAndTimers( + (unblockedElements, unblockedTimers) -> { + if (!Iterables.isEmpty(unblockedElements) || !Iterables.isEmpty(unblockedTimers)) { + helpers.fnRunner.processElement( + new ValueInEmptyWindows<>( + KeyedWorkItems.workItem(key, unblockedTimers, unblockedElements))); + } + if (hasState) { + List<W> windows = + (List<W>) + StreamSupport.stream(unblockedElements.spliterator(), false) + .flatMap(wv -> wv.getWindows().stream()) + .collect(Collectors.toList()); + // These elements are now processed. Register cleanup timers for all the unblocked + // windows. + helpers.registerStateCleanup( + (WindowingStrategy<?, W>) getDoFnInfo().getWindowingStrategy(), windows); + } + }); + } + } + + @Override + @SuppressWarnings("unchecked") + public void processElement(Object untypedElem) throws Exception { + if (helpers.fnRunner == null) { + // If we need to run reallyStartBundle in here, we need to make sure to switch the state + // sampler into the start state. 
+ try (Closeable start = helpers.operationContext.enterStart()) { + helpers.reallyStartBundle(); + onStartKey(); + } + } + helpers.outputsPerElementTracker.onProcessElement(); + + WindowedValue<KeyedWorkItem<K, InputT>> elem = + (WindowedValue<KeyedWorkItem<K, InputT>>) untypedElem; + onProcessWindowedValue(elem); + + helpers.outputsPerElementTracker.onProcessElementSuccess(); + } + + @Override + public void processTimers() throws Exception { + + // Note: We need to get windowCoder to decode the timers. If we haven't already deserialized + // the fnInfo, we peek at a new instance to retrieve that. If this extra deserialization becomes + // excessively costly, we could either (1) have the DoFnInstanceManager remember the associated + // windowCoder (allowing us to get it without a DoFnInfo instance) or (2) check whether timers + // exist without actually decoding them. + Coder<BoundedWindow> windowCoder = + (Coder<BoundedWindow>) + (helpers.fnInfo != null ? helpers.fnInfo : helpers.doFnInstanceManager.peek()) + .getWindowingStrategy() + .getWindowFn() + .windowCoder(); + helpers.processTimers( + SimpleParDoFnHelpers.TimerType.FAIL_USER, + helpers.userStepContext, + windowCoder, + this::onStartKey, + sideInputProcessor); + helpers.processTimers( + SimpleParDoFnHelpers.TimerType.SYSTEM, + helpers.stepContext, + windowCoder, + this::onStartKey, + sideInputProcessor); + } + + @Override + public void finishBundle() throws Exception { + helpers.finishBundle(sideInputProcessor); + this.sideInputProcessor = null; + } + + @Override + public void abort() throws Exception { + helpers.abort(); + } + + protected void onProcessWindowedValue(WindowedValue<KeyedWorkItem<K, InputT>> elem) { + // TODO: Get rid of this! + final K key = elem.getValue().key(); + keyValue().write(key); + + boolean hasState = helpers.hasState(); + Collection<W> windowsProcessed; + if (sideInputProcessor != null) { + windowsProcessed = hasState ? 
Lists.newArrayList() : Collections.emptyList(); + KeyedWorkItem<K, InputT> unblocked = sideInputProcessor.handleProcessKeyedWorkItem(elem); + if (!Iterables.isEmpty(unblocked.elementsIterable()) + || !Iterables.isEmpty(unblocked.timersIterable())) { + helpers.fnRunner.processElement(elem.withValue(unblocked)); + } + if (hasState) { + windowsProcessed = + (Collection<W>) + StreamSupport.stream(unblocked.elementsIterable().spliterator(), false) + .flatMap(wv -> wv.getWindows().stream()) + .collect(Collectors.toList()); + } + } else { + helpers.fnRunner.processElement(elem); + windowsProcessed = (Collection<W>) elem.getWindows(); + } Review Comment:  In the `else` branch, `elem` is of type `WindowedValue<KeyedWorkItem<K, InputT>>`. Calling `elem.getWindows()` will return the windows of the `KeyedWorkItem` wrapper (which are typically empty, e.g. when wrapped in `ValueInEmptyWindows`), rather than the actual windows of the elements inside the `KeyedWorkItem`. This means state cleanup will not be registered for the correct windows.

You should extract the windows from the elements inside the `KeyedWorkItem`, similar to how it is done in the `if` branch.
```java
    } else {
      helpers.fnRunner.processElement(elem);
      windowsProcessed =
          (Collection<W>)
              StreamSupport.stream(elem.getValue().elementsIterable().spliterator(), false)
                  .flatMap(wv -> wv.getWindows().stream())
                  .collect(Collectors.toList());
    }
```
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkKitemSideInputParDoFn.java: ########## @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker; + +import com.google.api.client.util.Lists; +import com.google.common.collect.Iterables; +import java.io.Closeable; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.apache.beam.runners.core.KeyedWorkItem; +import org.apache.beam.runners.core.KeyedWorkItems; +import org.apache.beam.runners.core.SideInputReader; +import org.apache.beam.runners.core.StateNamespaces; +import org.apache.beam.runners.core.StateTag; +import org.apache.beam.runners.core.StateTags; +import org.apache.beam.runners.dataflow.worker.util.ValueInEmptyWindows; +import org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoFn; +import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.transforms.DoFnSchemaInformation; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.DoFnInfo; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowedValue; +import org.apache.beam.sdk.values.WindowingStrategy; +import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.checkerframework.checker.nullness.qual.Nullable; + +@SuppressWarnings({ + "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +/* Similar to {@link SimpleParDoFn} but for splittable ProcessFns. */ +public class StreamingKeyedWorkKitemSideInputParDoFn<K, InputT, OutputT, W extends BoundedWindow> Review Comment:  There is a typo in the class name `StreamingKeyedWorkKitemSideInputParDoFn` (contains `WorkKitem` instead of `WorkItem`). Please rename the class and the file to `StreamingKeyedWorkItemSideInputParDoFn`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
