This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0856c22a9e9 Side Input improvements (#38363)
0856c22a9e9 is described below
commit 0856c22a9e92f7a7b5fb072139fdad2b14160228
Author: Reuven Lax <[email protected]>
AuthorDate: Tue Jun 9 11:10:17 2026 -0700
Side Input improvements (#38363)
* side-input improvements
* remove files
* add Override
* foo
* foo
* fix splittable dofn
* fix
* foo
* fix windows
* fix compilation
* foo
---
.../apache/beam/runners/core/SimpleDoFnRunner.java | 26 +-
.../dataflow/worker/SimpleDoFnRunnerFactory.java | 12 -
.../runners/dataflow/worker/SimpleParDoFn.java | 533 ++++-----------------
...impleParDoFn.java => SimpleParDoFnHelpers.java} | 409 ++++++++--------
.../worker/SplittableProcessFnFactory.java | 20 +-
.../StreamingKeyedWorkItemSideInputParDoFn.java | 246 ++++++++++
.../worker/StreamingSideInputDoFnRunner.java | 41 +-
.../dataflow/worker/StreamingSideInputFetcher.java | 34 +-
.../worker/StreamingSideInputProcessor.java | 132 +++++
.../dataflow/worker/UserParDoFnFactory.java | 56 ++-
.../dataflow/worker/SimpleParDoFnHelpersTest.java | 133 +++++
.../runners/dataflow/worker/SimpleParDoFnTest.java | 6 +-
...eamingKeyedWorkItemSideInputDoFnRunnerTest.java | 3 +-
...StreamingKeyedWorkItemSideInputParDoFnTest.java | 488 +++++++++++++++++++
.../worker/StreamingSideInputProcessorTest.java | 215 +++++++++
.../dataflow/worker/UserParDoFnFactoryTest.java | 27 +-
runners/prism/java/build.gradle | 3 +
.../beam/sdk/testing/UsesSideInputsInTimer.java | 27 ++
.../java/org/apache/beam/sdk/transforms/DoFn.java | 16 +
.../sdk/transforms/reflect/DoFnSignatures.java | 9 +-
.../org/apache/beam/sdk/transforms/ParDoTest.java | 149 ++++++
.../apache/beam/fn/harness/FnApiDoFnRunner.java | 28 ++
22 files changed, 1854 insertions(+), 759 deletions(-)
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 9859d672b3e..470e22a6699 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -870,8 +870,17 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
}
@Override
- public Object sideInput(String tagId) {
- throw new UnsupportedOperationException("SideInput parameters are not supported.");
+ public @Nullable Object sideInput(String tagId) {
+ PCollectionView<?> view =
+ checkStateNotNull(sideInputMapping.get(tagId), "Side input tag %s not found", tagId);
+ return sideInput(view);
+ }
+
+ @Override
+ public <T> T sideInput(PCollectionView<T> view) {
+ checkNotNull(view, "View passed to sideInput cannot be null");
+ return SimpleDoFnRunner.this.sideInput(
+ view, view.getWindowMappingFn().getSideInputWindow(window()));
}
@Override
@@ -1196,8 +1205,17 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
}
@Override
- public Object sideInput(String tagId) {
- throw new UnsupportedOperationException("SideInput parameters are not supported.");
+ public @Nullable Object sideInput(String tagId) {
+ PCollectionView<?> view =
+ checkStateNotNull(sideInputMapping.get(tagId), "Side input tag %s not found", tagId);
+ return sideInput(view);
+ }
+
+ @Override
+ public <T> T sideInput(PCollectionView<T> view) {
+ checkNotNull(view, "View passed to sideInput cannot be null");
+ return SimpleDoFnRunner.this.sideInput(
+ view, view.getWindowMappingFn().getSideInputWindow(window()));
}
@Override
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleDoFnRunnerFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleDoFnRunnerFactory.java
index 52fcec439aa..5286fc1aae9 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleDoFnRunnerFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleDoFnRunnerFactory.java
@@ -24,7 +24,6 @@ import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
@@ -68,17 +67,6 @@ class SimpleDoFnRunnerFactory<InputT, OutputT> implements DoFnRunnerFactory<Inpu
windowingStrategy,
doFnSchemaInformation,
sideInputMapping);
- boolean hasStreamingSideInput =
- options.as(StreamingOptions.class).isStreaming() &&
!sideInputReader.isEmpty();
- if (hasStreamingSideInput) {
- return new StreamingSideInputDoFnRunner<>(
- fnRunner,
- new StreamingSideInputFetcher<>(
- sideInputViews,
- inputCoder,
- windowingStrategy,
- (StreamingModeExecutionContext.StreamingModeStepContext)
userStepContext));
- }
return fnRunner;
}
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
index 434d46c20a5..34dff6b8835 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
@@ -17,53 +17,26 @@
*/
package org.apache.beam.runners.dataflow.worker;
-import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
-import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
-
import java.io.Closeable;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.Map;
-import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.SideInputReader;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
-import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.runners.core.StateTags;
-import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
-import org.apache.beam.runners.dataflow.worker.counters.Counter;
-import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
-import org.apache.beam.runners.dataflow.worker.counters.CounterName;
-import
org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter;
-import
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoFn;
import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.StreamingOptions;
-import org.apache.beam.sdk.state.StateSpec;
-import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.DoFnInfo;
-import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.checkerframework.checker.nullness.qual.Nullable;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* A base class providing simple set up, processing, and tear down for a
wrapped {@link
@@ -76,41 +49,9 @@ import org.slf4j.LoggerFactory;
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
-public class SimpleParDoFn<InputT, OutputT> implements ParDoFn {
-
- // TODO: Remove once Distributions has shipped.
- @VisibleForTesting
- static final String OUTPUTS_PER_ELEMENT_EXPERIMENT =
"outputs_per_element_counter";
-
- private static final String COUNTER_NAME = "per-element-output-count";
-
- private static final Logger LOG =
LoggerFactory.getLogger(SimpleParDoFn.class);
-
- protected final PipelineOptions options;
- private final DoFnInstanceManager doFnInstanceManager;
-
- private final SideInputReader sideInputReader;
- private final DataflowOperationContext operationContext;
- private final TupleTag<OutputT> mainOutputTag;
- private final Map<TupleTag<?>, Integer> outputTupleTagsToReceiverIndices;
- private final List<TupleTag<?>> sideOutputTags;
- private final DataflowExecutionContext.DataflowStepContext stepContext;
- private final DataflowExecutionContext.DataflowStepContext userStepContext;
- private final CounterFactory counterFactory;
- private final DoFnRunnerFactory runnerFactory;
- private final boolean hasStreamingSideInput;
- private final OutputsPerElementTracker outputsPerElementTracker;
- private final DoFnSchemaInformation doFnSchemaInformation;
- private final Map<String, PCollectionView<?>> sideInputMapping;
-
- // Various DoFn helpers, null between bundles
- private @Nullable DoFnRunner<InputT, OutputT> fnRunner;
- @Nullable DoFnInfo<InputT, OutputT> fnInfo;
- private Receiver @Nullable [] receivers;
-
- // This may additionally be null if it is not a real DoFn but an OldDoFn or
- // GroupAlsoByWindowViaWindowSetDoFn
- private @Nullable DoFnSignature fnSignature;
+public class SimpleParDoFn<InputT, OutputT, W extends BoundedWindow>
implements ParDoFn {
+ private final SimpleParDoFnHelpers<InputT, OutputT, W> helpers;
+ private @Nullable StreamingSideInputProcessor<InputT, W> sideInputProcessor;
/** Creates a {@link SimpleParDoFn} using basic information about the step
being executed. */
SimpleParDoFn(
@@ -124,225 +65,106 @@ public class SimpleParDoFn<InputT, OutputT> implements ParDoFn {
DoFnSchemaInformation doFnSchemaInformation,
Map<String, PCollectionView<?>> sideInputMapping,
DoFnRunnerFactory runnerFactory) {
- this.options = options;
- this.doFnInstanceManager = doFnInstanceManager;
-
- // We vend a freshly deserialized version for each run
- this.sideInputReader = sideInputReader;
- this.operationContext = operationContext;
- checkArgument(!outputTupleTagsToReceiverIndices.isEmpty(), "expected at
least one output");
- this.mainOutputTag = mainOutputTag;
- this.outputTupleTagsToReceiverIndices = outputTupleTagsToReceiverIndices;
- ImmutableList.Builder<TupleTag<?>> sideOutputTagsBuilder =
ImmutableList.builder();
- for (TupleTag<?> tag : outputTupleTagsToReceiverIndices.keySet()) {
- if (!mainOutputTag.equals(tag)) {
- sideOutputTagsBuilder.add(tag);
- }
- }
- this.sideOutputTags = sideOutputTagsBuilder.build();
- this.stepContext = stepContext;
-
- // StepContext provides a TimerInternals and StateInternals for use by the
system - this class.
- // For the user, we request a user-scoped StepContext to provide a
user-scoped
- // StateInternals and TimerInternals.
- this.userStepContext = stepContext.namespacedToUser();
-
- this.counterFactory = operationContext.counterFactory();
- this.runnerFactory = runnerFactory;
- this.hasStreamingSideInput =
- options.as(StreamingOptions.class).isStreaming() &&
!sideInputReader.isEmpty();
- this.outputsPerElementTracker = createOutputsPerElementTracker();
- this.doFnSchemaInformation = doFnSchemaInformation;
- this.sideInputMapping = sideInputMapping;
- }
-
- private OutputsPerElementTracker createOutputsPerElementTracker() {
- // TODO: Remove once Distributions has shipped.
- if (!hasExperiment(OUTPUTS_PER_ELEMENT_EXPERIMENT)) {
- return NoopOutputsPerElementTracker.INSTANCE;
- }
-
- // TODO: Remove log statement when functionality is enabled by default.
- LOG.info("{} counter enabled.", COUNTER_NAME);
-
- return new OutputsPerElementTrackerImpl();
- }
-
- private boolean hasExperiment(String experiment) {
- List<String> experiments =
options.as(DataflowPipelineDebugOptions.class).getExperiments();
- return experiments != null && experiments.contains(experiment);
- }
-
- /** Simple state tracker to calculate PerElementOutputCount counter. */
- private interface OutputsPerElementTracker {
-
- void onOutput();
-
- void onProcessElement();
-
- void onProcessElementSuccess();
- }
-
- private class OutputsPerElementTrackerImpl implements
OutputsPerElementTracker {
-
- private long outputsPerElement;
- private final Counter<Long, CounterFactory.CounterDistribution> counter;
-
- public OutputsPerElementTrackerImpl() {
- this.counter =
- counterFactory.distribution(
-
CounterName.named(COUNTER_NAME).withOriginalName(stepContext.getNameContext()));
- }
-
- @Override
- public void onProcessElement() {
- reset();
- }
-
- @Override
- public void onOutput() {
- outputsPerElement++;
- }
-
- @Override
- public void onProcessElementSuccess() {
- counter.addValue(outputsPerElement);
- reset();
- }
-
- private void reset() {
- outputsPerElement = 0L;
- }
- }
-
- /** No-op {@link OutputsPerElementTracker} implementation used when the
counter is disabled. */
- private static class NoopOutputsPerElementTracker implements
OutputsPerElementTracker {
-
- private NoopOutputsPerElementTracker() {}
-
- public static final OutputsPerElementTracker INSTANCE = new
NoopOutputsPerElementTracker();
-
- @Override
- public void onOutput() {}
-
- @Override
- public void onProcessElement() {}
-
- @Override
- public void onProcessElementSuccess() {}
+ this.helpers =
+ new SimpleParDoFnHelpers<>(
+ options,
+ doFnInstanceManager,
+ sideInputReader,
+ mainOutputTag,
+ outputTupleTagsToReceiverIndices,
+ stepContext,
+ operationContext,
+ doFnSchemaInformation,
+ sideInputMapping,
+ runnerFactory);
}
@Override
public void startBundle(Receiver... receivers) throws Exception {
- checkArgument(
- receivers.length == outputTupleTagsToReceiverIndices.size(),
- "unexpected number of receivers for DoFn");
-
- this.receivers = receivers;
- if (hasStreamingSideInput) {
+ helpers.startBundle(receivers);
+ if (helpers.hasStreamingSideInput) {
// There is non-trivial setup that needs to be performed for watermark
propagation
// even on empty bundles.
- reallyStartBundle();
- }
- }
-
- private void reallyStartBundle() throws Exception {
- checkState(fnRunner == null, "bundle already started (or not properly
finished)");
-
- WindowedValueMultiReceiver outputManager =
- new WindowedValueMultiReceiver() {
- final Map<TupleTag<?>, OutputReceiver> undeclaredOutputs = new
HashMap<>();
-
- private @Nullable Receiver getReceiverOrNull(TupleTag<?> tag) {
- Integer receiverIndex = outputTupleTagsToReceiverIndices.get(tag);
- if (receiverIndex != null) {
- return receivers[receiverIndex];
- } else {
- return undeclaredOutputs.get(tag);
- }
- }
-
- @Override
- public <TagT> void output(TupleTag<TagT> tag, WindowedValue<TagT>
output) {
- outputsPerElementTracker.onOutput();
- Receiver receiver = getReceiverOrNull(tag);
- if (receiver == null) {
- // A new undeclared output.
- // TODO: plumb through the operationName, so that we can
- // name implicit outputs after it.
- String outputName = "implicit-" + tag.getId();
- // TODO: plumb through the counter prefix, so we can
- // make it available to the OutputReceiver class in case
- // it wants to use it in naming output counterFactory. (It
- // doesn't today.)
- OutputReceiver undeclaredReceiver = new OutputReceiver();
-
- ElementCounter outputCounter =
- new DataflowOutputCounter(
- outputName, counterFactory,
stepContext.getNameContext());
- undeclaredReceiver.addOutputCounter(outputCounter);
- undeclaredOutputs.put(tag, undeclaredReceiver);
- receiver = undeclaredReceiver;
+ helpers.reallyStartBundle();
+ onStartKey();
+ }
+ }
+
+ protected void onStartKey() {
+ // TODO(relax): This assumes single-key bundles, which will change! Refactor this to not make
+ // this assumption.
+ if (helpers.hasStreamingSideInput) {
+ sideInputProcessor =
+ new StreamingSideInputProcessor<>(
+ new StreamingSideInputFetcher<InputT, W>(
+ helpers.fnInfo.getSideInputViews(),
+ helpers.fnInfo.getInputCoder(),
+ (WindowingStrategy<?, W>)
helpers.fnInfo.getWindowingStrategy(),
+ (StreamingModeExecutionContext.StreamingModeStepContext)
+ helpers.userStepContext));
+
+ boolean hasState = helpers.hasState();
+ sideInputProcessor.tryUnblockElements(
+ unblockedElements -> {
+ for (WindowedValue<InputT> unblockedElement : unblockedElements) {
+ helpers.fnRunner.processElement(unblockedElement);
+ if (hasState) {
+ // These elements are now processed. Register cleanup timers
for all the unblocked
+ // windows.
+ helpers.registerStateCleanup(
+ (WindowingStrategy<?, W>)
getDoFnInfo().getWindowingStrategy(),
+ (Collection<W>) unblockedElement.getWindows());
+ }
}
-
- try {
- receiver.process(output);
- } catch (RuntimeException | Error e) {
- // Rethrow unchecked exceptions as-is to avoid excessive nesting
- // via a chain of DoFn's.
- throw e;
- } catch (Exception e) {
- // This should never happen in practice with DoFn's, but can
happen
- // with other Receivers.
- throw new RuntimeException(e);
- }
- }
- };
- fnInfo = (DoFnInfo) doFnInstanceManager.get();
- fnSignature = DoFnSignatures.getSignature(fnInfo.getDoFn().getClass());
-
- fnRunner =
- runnerFactory.createRunner(
- fnInfo.getDoFn(),
- options,
- mainOutputTag,
- sideOutputTags,
- fnInfo.getSideInputViews(),
- sideInputReader,
- fnInfo.getInputCoder(),
- fnInfo.getOutputCoders(),
- fnInfo.getWindowingStrategy(),
- stepContext,
- userStepContext,
- outputManager,
- doFnSchemaInformation,
- sideInputMapping);
-
- fnRunner.startBundle();
+ });
+ }
}
@Override
@SuppressWarnings("unchecked")
public void processElement(Object untypedElem) throws Exception {
- if (fnRunner == null) {
+ if (helpers.fnRunner == null) {
// If we need to run reallyStartBundle in here, we need to make sure to
switch the state
// sampler into the start state.
- try (Closeable start = operationContext.enterStart()) {
- reallyStartBundle();
+ try (Closeable start = helpers.operationContext.enterStart()) {
+ helpers.reallyStartBundle();
+ onStartKey();
}
}
+ helpers.outputsPerElementTracker.onProcessElement();
WindowedValue<InputT> elem = (WindowedValue<InputT>) untypedElem;
-
- if (fnSignature != null && fnSignature.stateDeclarations().size() > 0) {
- registerStateCleanup(
- (WindowingStrategy<?, BoundedWindow>)
getDoFnInfo().getWindowingStrategy(),
- (Collection<BoundedWindow>) elem.getWindows());
+ onProcessWindowedValue(elem);
+
+ helpers.outputsPerElementTracker.onProcessElementSuccess();
+ }
+
+ protected void onProcessWindowedValue(WindowedValue<InputT> elem) {
+ boolean hasState = helpers.hasState();
+
+ Collection<W> windowsProcessed;
+ if (sideInputProcessor != null) {
+ windowsProcessed = hasState ? Lists.newArrayList() :
Collections.emptyList();
+ for (Iterator<? extends WindowedValue<InputT>> it =
+ sideInputProcessor.handleProcessElement(elem);
+ it.hasNext(); ) {
+ WindowedValue<InputT> toProcess = it.next();
+ helpers.fnRunner.processElement(toProcess);
+ if (hasState) {
+ windowsProcessed.addAll((Collection<W>) toProcess.getWindows());
+ // If the element was blocked, don't register a cleanup timer. The
timer will be
+ // registered
+ // when the window is unblocked ensuring that it is not processed
until the element is.
+ }
+ }
+ } else {
+ helpers.fnRunner.processElement(elem);
+ windowsProcessed = (Collection<W>) elem.getWindows();
+ }
+ if (hasState) {
+ helpers.registerStateCleanup(
+ (WindowingStrategy<?, W>) getDoFnInfo().getWindowingStrategy(),
windowsProcessed);
}
-
- outputsPerElementTracker.onProcessElement();
- fnRunner.processElement(elem);
- outputsPerElementTracker.onProcessElementSuccess();
}
@Override
@@ -355,180 +177,33 @@ public class SimpleParDoFn<InputT, OutputT> implements ParDoFn {
// exist without actually decoding them.
Coder<BoundedWindow> windowCoder =
(Coder<BoundedWindow>)
- (fnInfo != null ? fnInfo : doFnInstanceManager.peek())
+ (helpers.fnInfo != null ? helpers.fnInfo :
helpers.doFnInstanceManager.peek())
.getWindowingStrategy()
.getWindowFn()
.windowCoder();
- processTimers(TimerType.USER, userStepContext, windowCoder);
- processTimers(TimerType.SYSTEM, stepContext, windowCoder);
- }
-
- private void processUserTimer(TimerData timer) throws Exception {
- if (fnSignature.timerDeclarations().containsKey(timer.getTimerId())
- ||
fnSignature.timerFamilyDeclarations().containsKey(timer.getTimerFamilyId())) {
- BoundedWindow window = ((WindowNamespace)
timer.getNamespace()).getWindow();
- fnRunner.onTimer(
- timer.getTimerId(),
- timer.getTimerFamilyId(),
- this.stepContext.stateInternals().getKey(),
- window,
- timer.getTimestamp(),
- timer.getOutputTimestamp(),
- timer.getDomain(),
- timer.causedByDrain());
- }
- }
-
- private void processSystemTimer(TimerData timer) throws Exception {
-
- // Timer owned by this class, for cleaning up state in expired windows
- if (timer.getTimerId().equals(CLEANUP_TIMER_ID)) {
- checkState(
- timer.getDomain().equals(TimeDomain.EVENT_TIME),
- "%s received cleanup timer with domain not EVENT_TIME: %s",
- this,
- timer);
-
- checkState(
- timer.getNamespace() instanceof WindowNamespace,
- "%s received cleanup timer not for a %s: %s",
- this,
- WindowNamespace.class.getSimpleName(),
- timer);
-
- BoundedWindow window = ((WindowNamespace)
timer.getNamespace()).getWindow();
- Instant targetTime = earliestAllowableCleanupTime(window,
fnInfo.getWindowingStrategy());
-
- checkState(
- !targetTime.isAfter(timer.getTimestamp()),
- "%s received state cleanup timer for window %s "
- + " that is before the appropriate cleanup time %s",
- this,
- window,
- targetTime);
-
- fnRunner.onWindowExpiration(
- window, timer.getOutputTimestamp(),
this.stepContext.stateInternals().getKey());
-
- // This is for a timer for a window that is expired, so clean it up.
- for (StateDeclaration stateDecl :
fnSignature.stateDeclarations().values()) {
- StateTag<?> tag;
- try {
- tag =
- StateTags.tagForSpec(
- stateDecl.id(), (StateSpec)
stateDecl.field().get(fnInfo.getDoFn()));
- } catch (IllegalAccessException e) {
- throw new RuntimeException(
- String.format(
- "Error accessing %s for %s",
- StateSpec.class.getName(),
fnInfo.getDoFn().getClass().getName()),
- e);
- }
-
- StateInternals stateInternals = userStepContext.stateInternals();
- org.apache.beam.sdk.state.State state =
stateInternals.state(timer.getNamespace(), tag);
- state.clear();
- }
- }
+ helpers.processTimers(
+ SimpleParDoFnHelpers.TimerType.USER,
+ helpers.userStepContext,
+ windowCoder,
+ this::onStartKey,
+ () -> sideInputProcessor);
+ helpers.processTimers(
+ SimpleParDoFnHelpers.TimerType.SYSTEM,
+ helpers.stepContext,
+ windowCoder,
+ this::onStartKey,
+ () -> sideInputProcessor);
}
@Override
public void finishBundle() throws Exception {
- if (fnRunner != null) {
- fnRunner.finishBundle();
- doFnInstanceManager.complete(fnInfo);
- fnRunner = null;
- fnInfo = null;
- fnSignature = null;
- }
+ helpers.finishBundle(sideInputProcessor);
+ this.sideInputProcessor = null;
}
@Override
public void abort() throws Exception {
- doFnInstanceManager.abort(fnInfo);
- fnRunner = null;
- fnInfo = null;
- }
-
- @VisibleForTesting static final String CLEANUP_TIMER_ID = "cleanup-timer";
-
- private enum TimerType {
- USER {
- @Override
- public void processTimer(SimpleParDoFn doFn, TimerData timer) throws
Exception {
- doFn.processUserTimer(timer);
- }
- },
- SYSTEM {
- @Override
- public void processTimer(SimpleParDoFn doFn, TimerData timer) throws
Exception {
- doFn.processSystemTimer(timer);
- }
- };
-
- public abstract void processTimer(SimpleParDoFn doFn, TimerData timer)
throws Exception;
- };
-
- private void processTimers(
- TimerType mode,
- DataflowExecutionContext.DataflowStepContext context,
- Coder<BoundedWindow> windowCoder)
- throws Exception {
- TimerData timer = context.getNextFiredTimer(windowCoder);
-
- if (timer != null && fnRunner == null) {
- // If we need to run reallyStartBundle in here, we need to make sure to
switch the state
- // sampler into the start state.
- try (Closeable start = operationContext.enterStart()) {
- reallyStartBundle();
- }
- }
-
- while (timer != null) {
- mode.processTimer(this, timer);
- timer = context.getNextFiredTimer(windowCoder);
- }
- }
-
- private <W extends BoundedWindow> void registerStateCleanup(
- WindowingStrategy<?, W> windowingStrategy, Collection<W>
windowsToCleanup) {
- Coder<W> windowCoder = windowingStrategy.getWindowFn().windowCoder();
-
- for (W window : windowsToCleanup) {
- // The stepContext is the thing that know if it is batch or streaming,
hence
- // whether state needs to be cleaned up or will simply be discarded so
the
- // timer can be ignored.
- Instant cleanupTime = earliestAllowableCleanupTime(window,
windowingStrategy);
- // Set a cleanup timer for state at the end of the window to trigger
onWindowExpiration and
- // garbage collect state. We avoid doing this for the global window if
there is no window
- // expiration set as the state will be up when the pipeline terminates.
Setting the timer
- // leads to a unbounded growth of timers for pipelines with many unique
keys in the global
- // window.
- if (cleanupTime.isBefore(GlobalWindow.INSTANCE.maxTimestamp())
- || fnSignature.onWindowExpiration() != null) {
- // If the DoFn has OnWindowExpiration, then set the watermark hold so
that the watermark
- // does
- // not advance until OnWindowExpiration completes.
- Instant cleanupOutputTimestamp =
- fnSignature.onWindowExpiration() == null
- ? cleanupTime
- : cleanupTime.minus(Duration.millis(1L));
- stepContext.setStateCleanupTimer(
- CLEANUP_TIMER_ID, window, windowCoder, cleanupTime,
cleanupOutputTimestamp);
- }
- }
- }
-
- private Instant earliestAllowableCleanupTime(
- BoundedWindow window, WindowingStrategy windowingStrategy) {
- Instant cleanupTime =
- window
- .maxTimestamp()
- .plus(windowingStrategy.getAllowedLateness())
- .plus(Duration.millis(1L));
- return cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)
- ? BoundedWindow.TIMESTAMP_MAX_VALUE
- : cleanupTime;
+ helpers.abort();
}
/**
@@ -540,6 +215,6 @@ public class SimpleParDoFn<InputT, OutputT> implements ParDoFn {
@VisibleForTesting
@Nullable
DoFnInfo<?, ?> getDoFnInfo() {
- return fnInfo;
+ return helpers.fnInfo;
}
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnHelpers.java
similarity index 76%
copy from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
copy to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnHelpers.java
index 434d46c20a5..964cf2323d5 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnHelpers.java
@@ -25,29 +25,29 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Supplier;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
-import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.worker.counters.Counter;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
import org.apache.beam.runners.dataflow.worker.counters.CounterName;
import
org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter;
import
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoFn;
import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -65,18 +65,12 @@ import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * A base class providing simple set up, processing, and tear down for a
wrapped {@link
- * GroupAlsoByWindowFn}.
- *
- * <p>Subclasses override just a method to provide a {@link DoFnInfo} for the
wrapped {@link
- * GroupAlsoByWindowFn}.
- */
@SuppressWarnings({
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
-public class SimpleParDoFn<InputT, OutputT> implements ParDoFn {
+class SimpleParDoFnHelpers<InputT, OutputT, W extends BoundedWindow> {
+ private static final Logger LOG =
LoggerFactory.getLogger(SimpleParDoFnHelpers.class);
// TODO: Remove once Distributions has shipped.
@VisibleForTesting
@@ -84,36 +78,33 @@ public class SimpleParDoFn<InputT, OutputT> implements ParDoFn {
private static final String COUNTER_NAME = "per-element-output-count";
- private static final Logger LOG =
LoggerFactory.getLogger(SimpleParDoFn.class);
-
- protected final PipelineOptions options;
- private final DoFnInstanceManager doFnInstanceManager;
+ final PipelineOptions options;
+ final DoFnInstanceManager doFnInstanceManager;
private final SideInputReader sideInputReader;
- private final DataflowOperationContext operationContext;
+ final DataflowOperationContext operationContext;
private final TupleTag<OutputT> mainOutputTag;
private final Map<TupleTag<?>, Integer> outputTupleTagsToReceiverIndices;
private final List<TupleTag<?>> sideOutputTags;
- private final DataflowExecutionContext.DataflowStepContext stepContext;
- private final DataflowExecutionContext.DataflowStepContext userStepContext;
+ final DataflowExecutionContext.DataflowStepContext stepContext;
+ final DataflowExecutionContext.DataflowStepContext userStepContext;
private final CounterFactory counterFactory;
private final DoFnRunnerFactory runnerFactory;
- private final boolean hasStreamingSideInput;
- private final OutputsPerElementTracker outputsPerElementTracker;
+ final boolean hasStreamingSideInput;
+ final OutputsPerElementTracker outputsPerElementTracker;
private final DoFnSchemaInformation doFnSchemaInformation;
private final Map<String, PCollectionView<?>> sideInputMapping;
// Various DoFn helpers, null between bundles
- private @Nullable DoFnRunner<InputT, OutputT> fnRunner;
+ @Nullable DoFnRunner<InputT, OutputT> fnRunner;
@Nullable DoFnInfo<InputT, OutputT> fnInfo;
private Receiver @Nullable [] receivers;
// This may additionally be null if it is not a real DoFn but an OldDoFn or
// GroupAlsoByWindowViaWindowSetDoFn
- private @Nullable DoFnSignature fnSignature;
+ protected @Nullable DoFnSignature fnSignature;
- /** Creates a {@link SimpleParDoFn} using basic information about the step
being executed. */
- SimpleParDoFn(
+ SimpleParDoFnHelpers(
PipelineOptions options,
DoFnInstanceManager doFnInstanceManager,
SideInputReader sideInputReader,
@@ -156,97 +147,19 @@ public class SimpleParDoFn<InputT, OutputT> implements
ParDoFn {
this.sideInputMapping = sideInputMapping;
}
- private OutputsPerElementTracker createOutputsPerElementTracker() {
- // TODO: Remove once Distributions has shipped.
- if (!hasExperiment(OUTPUTS_PER_ELEMENT_EXPERIMENT)) {
- return NoopOutputsPerElementTracker.INSTANCE;
- }
-
- // TODO: Remove log statement when functionality is enabled by default.
- LOG.info("{} counter enabled.", COUNTER_NAME);
-
- return new OutputsPerElementTrackerImpl();
- }
-
- private boolean hasExperiment(String experiment) {
- List<String> experiments =
options.as(DataflowPipelineDebugOptions.class).getExperiments();
- return experiments != null && experiments.contains(experiment);
- }
-
- /** Simple state tracker to calculate PerElementOutputCount counter. */
- private interface OutputsPerElementTracker {
-
- void onOutput();
-
- void onProcessElement();
-
- void onProcessElementSuccess();
- }
-
- private class OutputsPerElementTrackerImpl implements
OutputsPerElementTracker {
-
- private long outputsPerElement;
- private final Counter<Long, CounterFactory.CounterDistribution> counter;
-
- public OutputsPerElementTrackerImpl() {
- this.counter =
- counterFactory.distribution(
-
CounterName.named(COUNTER_NAME).withOriginalName(stepContext.getNameContext()));
- }
-
- @Override
- public void onProcessElement() {
- reset();
- }
-
- @Override
- public void onOutput() {
- outputsPerElement++;
- }
-
- @Override
- public void onProcessElementSuccess() {
- counter.addValue(outputsPerElement);
- reset();
- }
-
- private void reset() {
- outputsPerElement = 0L;
- }
- }
-
- /** No-op {@link OutputsPerElementTracker} implementation used when the
counter is disabled. */
- private static class NoopOutputsPerElementTracker implements
OutputsPerElementTracker {
-
- private NoopOutputsPerElementTracker() {}
-
- public static final OutputsPerElementTracker INSTANCE = new
NoopOutputsPerElementTracker();
-
- @Override
- public void onOutput() {}
-
- @Override
- public void onProcessElement() {}
-
- @Override
- public void onProcessElementSuccess() {}
+ boolean hasState() {
+ return fnSignature != null && !fnSignature.stateDeclarations().isEmpty();
}
- @Override
- public void startBundle(Receiver... receivers) throws Exception {
+ void startBundle(Receiver... receivers) throws Exception {
checkArgument(
receivers.length == outputTupleTagsToReceiverIndices.size(),
"unexpected number of receivers for DoFn");
this.receivers = receivers;
- if (hasStreamingSideInput) {
- // There is non-trivial setup that needs to be performed for watermark
propagation
- // even on empty bundles.
- reallyStartBundle();
- }
}
- private void reallyStartBundle() throws Exception {
+ void reallyStartBundle() throws Exception {
checkState(fnRunner == null, "bundle already started (or not properly
finished)");
WindowedValueMultiReceiver outputManager =
@@ -317,56 +230,103 @@ public class SimpleParDoFn<InputT, OutputT> implements
ParDoFn {
outputManager,
doFnSchemaInformation,
sideInputMapping);
-
fnRunner.startBundle();
}
- @Override
- @SuppressWarnings("unchecked")
- public void processElement(Object untypedElem) throws Exception {
- if (fnRunner == null) {
+  /**
+   * Finishes the current bundle, if one was started.
+   *
+   * <p>When a runner exists: finishes it, lets {@code sideInputProcessor} (when non-null) handle
+   * the end of the bundle, calls {@code doFnInstanceManager.complete(fnInfo)} and clears the
+   * per-bundle fields. Does nothing when no runner exists.
+   *
+   * @param sideInputProcessor processor to notify after the runner has finished; may be null. The
+   *     caller keeps ownership of this reference and must drop it itself.
+   * @throws Exception if finishing the runner or completing the DoFn instance fails
+   */
+  void finishBundle(StreamingSideInputProcessor<?, ?> sideInputProcessor) throws Exception {
+    if (fnRunner != null) {
+      fnRunner.finishBundle();
+      if (sideInputProcessor != null) {
+        sideInputProcessor.handleFinishBundle();
+      }
+      doFnInstanceManager.complete(fnInfo);
+      fnRunner = null;
+      fnInfo = null;
+      fnSignature = null;
+      // The parameter is deliberately not nulled here: assigning to a parameter is invisible to
+      // the caller, so such a store would be dead code.
+    }
+  }
+
+ void abort() throws Exception {
+ doFnInstanceManager.abort(fnInfo);
+ fnRunner = null;
+ fnInfo = null;
+ }
+
+ @VisibleForTesting static final String CLEANUP_TIMER_ID = "cleanup-timer";
+
+ enum TimerType {
+ USER {
+ @Override
+ public void processTimer(
+ SimpleParDoFnHelpers doFn,
+ TimerInternals.TimerData timer,
+ Supplier<StreamingSideInputProcessor<?, ?>> sideInputProcessor)
+ throws Exception {
+ doFn.processUserTimer(timer, sideInputProcessor.get());
+ }
+ },
+ FAIL_USER {
+ @Override
+ public void processTimer(
+ SimpleParDoFnHelpers doFn,
+ TimerInternals.TimerData timer,
+ Supplier<StreamingSideInputProcessor<?, ?>> sideInputProcessor)
+ throws Exception {
+ throw new UnsupportedOperationException(
+ "Attempt to deliver a timer to a DoFn, but timers are not
supported here.");
+ }
+ },
+ SYSTEM {
+ @Override
+ public void processTimer(
+ SimpleParDoFnHelpers doFn,
+ TimerInternals.TimerData timer,
+ Supplier<StreamingSideInputProcessor<?, ?>> sideInputProcessor)
+ throws Exception {
+ doFn.processSystemTimer(timer, sideInputProcessor.get());
+ }
+ };
+
+ public abstract void processTimer(
+ SimpleParDoFnHelpers doFn,
+ TimerInternals.TimerData timer,
+ Supplier<StreamingSideInputProcessor<?, ?>> sideInputProcessor)
+ throws Exception;
+ };
+
+ void processTimers(
+ TimerType mode,
+ DataflowExecutionContext.DataflowStepContext context,
+ Coder<BoundedWindow> windowCoder,
+ Runnable startKey,
+ Supplier<StreamingSideInputProcessor<?, ?>> sideInputProcessor)
+ throws Exception {
+ TimerInternals.TimerData timer = context.getNextFiredTimer(windowCoder);
+
+ if (timer != null && fnRunner == null) {
// If we need to run reallyStartBundle in here, we need to make sure to
switch the state
// sampler into the start state.
try (Closeable start = operationContext.enterStart()) {
reallyStartBundle();
+ startKey.run();
}
}
- WindowedValue<InputT> elem = (WindowedValue<InputT>) untypedElem;
-
- if (fnSignature != null && fnSignature.stateDeclarations().size() > 0) {
- registerStateCleanup(
- (WindowingStrategy<?, BoundedWindow>)
getDoFnInfo().getWindowingStrategy(),
- (Collection<BoundedWindow>) elem.getWindows());
+ while (timer != null) {
+ mode.processTimer(this, timer, sideInputProcessor);
+ timer = context.getNextFiredTimer(windowCoder);
}
-
- outputsPerElementTracker.onProcessElement();
- fnRunner.processElement(elem);
- outputsPerElementTracker.onProcessElementSuccess();
- }
-
- @Override
- public void processTimers() throws Exception {
-
- // Note: We need to get windowCoder to decode the timers. If we haven't
already deserialized
- // the fnInfo, we peek at a new instance to retrieve that. If this extra
deserialization becomes
- // excessively costly, we could either (1) have the DoFnInstanceManager
remember the associated
- // windowCoder (allowing us to get it without a DoFnInfo instance) or (2)
check whether timers
- // exist without actually decoding them.
- Coder<BoundedWindow> windowCoder =
- (Coder<BoundedWindow>)
- (fnInfo != null ? fnInfo : doFnInstanceManager.peek())
- .getWindowingStrategy()
- .getWindowFn()
- .windowCoder();
- processTimers(TimerType.USER, userStepContext, windowCoder);
- processTimers(TimerType.SYSTEM, stepContext, windowCoder);
}
- private void processUserTimer(TimerData timer) throws Exception {
+ protected void processUserTimer(
+ TimerInternals.TimerData timer, StreamingSideInputProcessor<InputT, W>
sideInputProcessor) {
if (fnSignature.timerDeclarations().containsKey(timer.getTimerId())
||
fnSignature.timerFamilyDeclarations().containsKey(timer.getTimerFamilyId())) {
- BoundedWindow window = ((WindowNamespace)
timer.getNamespace()).getWindow();
+ BoundedWindow window = ((StateNamespaces.WindowNamespace)
timer.getNamespace()).getWindow();
+ if (sideInputProcessor != null) {
+ sideInputProcessor.handleProcessTimer(timer);
+ }
fnRunner.onTimer(
timer.getTimerId(),
timer.getTimerFamilyId(),
@@ -379,8 +339,9 @@ public class SimpleParDoFn<InputT, OutputT> implements
ParDoFn {
}
}
- private void processSystemTimer(TimerData timer) throws Exception {
-
+ private void processSystemTimer(
+ TimerInternals.TimerData timer, StreamingSideInputProcessor<InputT, W>
sideInputProcessor)
+ throws Exception {
// Timer owned by this class, for cleaning up state in expired windows
if (timer.getTimerId().equals(CLEANUP_TIMER_ID)) {
checkState(
@@ -390,13 +351,20 @@ public class SimpleParDoFn<InputT, OutputT> implements
ParDoFn {
timer);
checkState(
- timer.getNamespace() instanceof WindowNamespace,
+ timer.getNamespace() instanceof StateNamespaces.WindowNamespace,
"%s received cleanup timer not for a %s: %s",
this,
- WindowNamespace.class.getSimpleName(),
+ StateNamespaces.WindowNamespace.class.getSimpleName(),
timer);
- BoundedWindow window = ((WindowNamespace)
timer.getNamespace()).getWindow();
+ if (sideInputProcessor != null) {
+ // We must call this to ensure the side-input is cached for onWindowExpiration.
+ // Since we don't set cleanup timers until we actually call processElement, the
+ // window must be unblocked here.
+ sideInputProcessor.handleProcessTimer(timer);
+ }
+
+ BoundedWindow window = ((StateNamespaces.WindowNamespace)
timer.getNamespace()).getWindow();
Instant targetTime = earliestAllowableCleanupTime(window,
fnInfo.getWindowingStrategy());
checkState(
@@ -411,7 +379,7 @@ public class SimpleParDoFn<InputT, OutputT> implements
ParDoFn {
window, timer.getOutputTimestamp(),
this.stepContext.stateInternals().getKey());
// This is for a timer for a window that is expired, so clean it up.
- for (StateDeclaration stateDecl :
fnSignature.stateDeclarations().values()) {
+ for (DoFnSignature.StateDeclaration stateDecl :
fnSignature.stateDeclarations().values()) {
StateTag<?> tag;
try {
tag =
@@ -426,71 +394,100 @@ public class SimpleParDoFn<InputT, OutputT> implements
ParDoFn {
}
StateInternals stateInternals = userStepContext.stateInternals();
- org.apache.beam.sdk.state.State state =
stateInternals.state(timer.getNamespace(), tag);
+ State state = stateInternals.state(timer.getNamespace(), tag);
state.clear();
}
}
}
- @Override
- public void finishBundle() throws Exception {
- if (fnRunner != null) {
- fnRunner.finishBundle();
- doFnInstanceManager.complete(fnInfo);
- fnRunner = null;
- fnInfo = null;
- fnSignature = null;
+ private OutputsPerElementTracker createOutputsPerElementTracker() {
+ // TODO: Remove once Distributions has shipped.
+ if (!hasExperiment(OUTPUTS_PER_ELEMENT_EXPERIMENT)) {
+ return NoopOutputsPerElementTracker.INSTANCE;
}
+
+ // TODO: Remove log statement when functionality is enabled by default.
+ LOG.info("{} counter enabled.", COUNTER_NAME);
+
+ return new OutputsPerElementTrackerImpl();
}
- @Override
- public void abort() throws Exception {
- doFnInstanceManager.abort(fnInfo);
- fnRunner = null;
- fnInfo = null;
+ private boolean hasExperiment(String experiment) {
+ List<String> experiments =
options.as(DataflowPipelineDebugOptions.class).getExperiments();
+ return experiments != null && experiments.contains(experiment);
}
- @VisibleForTesting static final String CLEANUP_TIMER_ID = "cleanup-timer";
+ /** Simple state tracker to calculate PerElementOutputCount counter. */
+ interface OutputsPerElementTracker {
- private enum TimerType {
- USER {
- @Override
- public void processTimer(SimpleParDoFn doFn, TimerData timer) throws
Exception {
- doFn.processUserTimer(timer);
- }
- },
- SYSTEM {
- @Override
- public void processTimer(SimpleParDoFn doFn, TimerData timer) throws
Exception {
- doFn.processSystemTimer(timer);
- }
- };
+ void onOutput();
- public abstract void processTimer(SimpleParDoFn doFn, TimerData timer)
throws Exception;
- };
+ void onProcessElement();
- private void processTimers(
- TimerType mode,
- DataflowExecutionContext.DataflowStepContext context,
- Coder<BoundedWindow> windowCoder)
- throws Exception {
- TimerData timer = context.getNextFiredTimer(windowCoder);
+ void onProcessElementSuccess();
+ }
- if (timer != null && fnRunner == null) {
- // If we need to run reallyStartBundle in here, we need to make sure to
switch the state
- // sampler into the start state.
- try (Closeable start = operationContext.enterStart()) {
- reallyStartBundle();
- }
+ private class OutputsPerElementTrackerImpl implements
OutputsPerElementTracker {
+
+ private long outputsPerElement;
+ private final Counter<Long, CounterFactory.CounterDistribution> counter;
+
+ public OutputsPerElementTrackerImpl() {
+ this.counter =
+ counterFactory.distribution(
+
CounterName.named(COUNTER_NAME).withOriginalName(stepContext.getNameContext()));
}
- while (timer != null) {
- mode.processTimer(this, timer);
- timer = context.getNextFiredTimer(windowCoder);
+ @Override
+ public void onProcessElement() {
+ reset();
+ }
+
+ @Override
+ public void onOutput() {
+ outputsPerElement++;
+ }
+
+ @Override
+ public void onProcessElementSuccess() {
+ counter.addValue(outputsPerElement);
+ reset();
+ }
+
+ private void reset() {
+ outputsPerElement = 0L;
}
}
- private <W extends BoundedWindow> void registerStateCleanup(
+ /** No-op {@link OutputsPerElementTracker} implementation used when the
counter is disabled. */
+ private static class NoopOutputsPerElementTracker implements
OutputsPerElementTracker {
+
+ private NoopOutputsPerElementTracker() {}
+
+ public static final OutputsPerElementTracker INSTANCE = new
NoopOutputsPerElementTracker();
+
+ @Override
+ public void onOutput() {}
+
+ @Override
+ public void onProcessElement() {}
+
+ @Override
+ public void onProcessElementSuccess() {}
+ }
+
+ Instant earliestAllowableCleanupTime(BoundedWindow window, WindowingStrategy
windowingStrategy) {
+ Instant cleanupTime =
+ window
+ .maxTimestamp()
+ .plus(windowingStrategy.getAllowedLateness())
+ .plus(Duration.millis(1L));
+ return cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)
+ ? BoundedWindow.TIMESTAMP_MAX_VALUE
+ : cleanupTime;
+ }
+
+ protected void registerStateCleanup(
WindowingStrategy<?, W> windowingStrategy, Collection<W>
windowsToCleanup) {
Coder<W> windowCoder = windowingStrategy.getWindowFn().windowCoder();
@@ -518,28 +515,4 @@ public class SimpleParDoFn<InputT, OutputT> implements
ParDoFn {
}
}
}
-
- private Instant earliestAllowableCleanupTime(
- BoundedWindow window, WindowingStrategy windowingStrategy) {
- Instant cleanupTime =
- window
- .maxTimestamp()
- .plus(windowingStrategy.getAllowedLateness())
- .plus(Duration.millis(1L));
- return cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)
- ? BoundedWindow.TIMESTAMP_MAX_VALUE
- : cleanupTime;
- }
-
- /**
- * Returns the {@link DoFnInfo} currently being used by this {@link
SimpleParDoFn}.
- *
- * <p>May be null if no element has been processed yet, or if the {@link
SimpleParDoFn} has
- * finished.
- */
- @VisibleForTesting
- @Nullable
- DoFnInfo<?, ?> getDoFnInfo() {
- return fnInfo;
- }
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java
index 3ad443ee2a2..b55d73cf579 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java
@@ -44,7 +44,6 @@ import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.util.DoFnInfo;
@@ -65,7 +64,8 @@ import org.joda.time.Duration;
})
class SplittableProcessFnFactory {
static final ParDoFnFactory createDefault() {
- return new UserParDoFnFactory(new ProcessFnExtractor(), new
SplittableDoFnRunnerFactory());
+ return new UserParDoFnFactory(
+ new ProcessFnExtractor(), new SplittableDoFnRunnerFactory(), true);
}
private static class ProcessFnExtractor implements
UserParDoFnFactory.DoFnExtractor {
@@ -174,22 +174,6 @@ class SplittableProcessFnFactory {
sideInputMapping);
DoFnRunner<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT>
fnRunner =
new DataflowProcessFnRunner<>(simpleRunner);
- boolean hasStreamingSideInput =
- options.as(StreamingOptions.class).isStreaming() &&
!sideInputReader.isEmpty();
- KeyedWorkItemCoder<byte[], KV<InputT, RestrictionT>> kwiCoder =
- (KeyedWorkItemCoder<byte[], KV<InputT, RestrictionT>>) inputCoder;
- if (hasStreamingSideInput) {
- fnRunner =
- new StreamingKeyedWorkItemSideInputDoFnRunner<>(
- fnRunner,
- ByteArrayCoder.of(),
- new StreamingSideInputFetcher<>(
- sideInputViews,
- kwiCoder.getElementCoder(),
- processFn.getInputWindowingStrategy(),
- (StreamingModeExecutionContext.StreamingModeStepContext)
userStepContext),
- userStepContext);
- }
return fnRunner;
}
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputParDoFn.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputParDoFn.java
new file mode 100644
index 00000000000..225bc6af0ea
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputParDoFn.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import com.google.api.client.util.Lists;
+import com.google.common.collect.Iterables;
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.dataflow.worker.util.ValueInEmptyWindows;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoFn;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.DoFnInfo;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Similar to {@link SimpleParDoFn} but for splittable ProcessFns. */
+@SuppressWarnings({
+ "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
+ "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class StreamingKeyedWorkItemSideInputParDoFn<K, InputT, OutputT, W
extends BoundedWindow>
+ implements ParDoFn {
+ private final StateTag<ValueState<K>> keyAddr;
+ private final Coder<InputT> inputCoder;
+ private final SimpleParDoFnHelpers<KeyedWorkItem<K, InputT>, OutputT, W>
helpers;
+ protected @Nullable StreamingSideInputProcessor<InputT, W>
sideInputProcessor;
+
+ StreamingKeyedWorkItemSideInputParDoFn(
+ PipelineOptions options,
+ DoFnInstanceManager doFnInstanceManager,
+ SideInputReader sideInputReader,
+ TupleTag<OutputT> mainOutputTag,
+ Map<TupleTag<?>, Integer> outputTupleTagsToReceiverIndices,
+ DataflowExecutionContext.DataflowStepContext stepContext,
+ DataflowOperationContext operationContext,
+ DoFnSchemaInformation doFnSchemaInformation,
+ Map<String, PCollectionView<?>> sideInputMapping,
+ DoFnRunnerFactory runnerFactory,
+ Coder<K> keyCoder,
+ Coder<InputT> inputCoder) {
+ helpers =
+ new SimpleParDoFnHelpers<>(
+ options,
+ doFnInstanceManager,
+ sideInputReader,
+ mainOutputTag,
+ outputTupleTagsToReceiverIndices,
+ stepContext,
+ operationContext,
+ doFnSchemaInformation,
+ sideInputMapping,
+ runnerFactory);
+ this.keyAddr = StateTags.makeSystemTagInternal(StateTags.value("key",
keyCoder));
+ this.inputCoder = inputCoder;
+ }
+
+ ValueState<K> keyValue() {
+ return
helpers.stepContext.stateInternals().state(StateNamespaces.global(), keyAddr);
+ }
+
+ @Override
+ public void startBundle(Receiver... receivers) throws Exception {
+ helpers.startBundle(receivers);
+ if (helpers.hasStreamingSideInput) {
+ // There is non-trivial setup that needs to be performed for watermark propagation
+ // even on empty bundles.
+ helpers.reallyStartBundle();
+ onStartKey();
+ }
+ }
+
+ protected void onStartKey() {
+ if (helpers.hasStreamingSideInput) {
+ sideInputProcessor =
+ new StreamingSideInputProcessor<>(
+ new StreamingSideInputFetcher<InputT, W>(
+ helpers.fnInfo.getSideInputViews(),
+ inputCoder,
+ (WindowingStrategy<?, W>)
helpers.fnInfo.getWindowingStrategy(),
+ (StreamingModeExecutionContext.StreamingModeStepContext)
+ helpers.userStepContext));
+ }
+
+ if (sideInputProcessor != null) {
+ boolean hasState = helpers.hasState();
+
+ // TODO(relax): We should be able to get this without writing it to state!
+ @Nullable K key = keyValue().read();
+ if (key != null) {
+ sideInputProcessor.tryUnblockElementsAndTimers(
+ (unblockedElements, unblockedTimers) -> {
+ if (!Iterables.isEmpty(unblockedElements) ||
!Iterables.isEmpty(unblockedTimers)) {
+ helpers.fnRunner.processElement(
+ new ValueInEmptyWindows<>(
+ KeyedWorkItems.workItem(key, unblockedTimers,
unblockedElements)));
+ }
+ if (hasState) {
+ List<W> windows =
+ (List<W>)
+ StreamSupport.stream(unblockedElements.spliterator(),
false)
+ .flatMap(wv -> wv.getWindows().stream())
+ .collect(Collectors.toList());
+ // These elements are now processed. Register cleanup timers for all the
+ // unblocked windows.
+ helpers.registerStateCleanup(
+ (WindowingStrategy<?, W>)
getDoFnInfo().getWindowingStrategy(), windows);
+ }
+ });
+ }
+ }
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void processElement(Object untypedElem) throws Exception {
+ if (helpers.fnRunner == null) {
+ // If we need to run reallyStartBundle in here, we need to make sure to switch the
+ // state sampler into the start state.
+ try (Closeable start = helpers.operationContext.enterStart()) {
+ helpers.reallyStartBundle();
+ onStartKey();
+ }
+ }
+ helpers.outputsPerElementTracker.onProcessElement();
+
+ WindowedValue<KeyedWorkItem<K, InputT>> elem =
+ (WindowedValue<KeyedWorkItem<K, InputT>>) untypedElem;
+ onProcessWindowedValue(elem);
+
+ helpers.outputsPerElementTracker.onProcessElementSuccess();
+ }
+
+ @Override
+ public void processTimers() throws Exception {
+
+ // Note: We need to get windowCoder to decode the timers. If we haven't already
+ // deserialized the fnInfo, we peek at a new instance to retrieve that. If this
+ // extra deserialization becomes excessively costly, we could either (1) have the
+ // DoFnInstanceManager remember the associated windowCoder (allowing us to get it
+ // without a DoFnInfo instance) or (2) check whether timers exist without actually
+ // decoding them.
+ Coder<BoundedWindow> windowCoder =
+ (Coder<BoundedWindow>)
+ (helpers.fnInfo != null ? helpers.fnInfo :
helpers.doFnInstanceManager.peek())
+ .getWindowingStrategy()
+ .getWindowFn()
+ .windowCoder();
+ helpers.processTimers(
+ SimpleParDoFnHelpers.TimerType.FAIL_USER,
+ helpers.userStepContext,
+ windowCoder,
+ this::onStartKey,
+ () -> sideInputProcessor);
+ helpers.processTimers(
+ SimpleParDoFnHelpers.TimerType.SYSTEM,
+ helpers.stepContext,
+ windowCoder,
+ this::onStartKey,
+ () -> sideInputProcessor);
+ }
+
+ @Override
+ public void finishBundle() throws Exception {
+ helpers.finishBundle(sideInputProcessor);
+ this.sideInputProcessor = null;
+ }
+
+ @Override
+ public void abort() throws Exception {
+ helpers.abort();
+ }
+
+ protected void onProcessWindowedValue(WindowedValue<KeyedWorkItem<K,
InputT>> elem) {
+ // TODO: Get rid of this!
+ final K key = elem.getValue().key();
+ keyValue().write(key);
+
+ boolean hasState = helpers.hasState();
+ Collection<W> windowsProcessed;
+ if (sideInputProcessor != null) {
+ windowsProcessed = hasState ? Lists.newArrayList() :
Collections.emptyList();
+ WindowedValue<KeyedWorkItem<K, InputT>> unblocked =
+ sideInputProcessor.handleProcessKeyedWorkItem(elem);
+ if (!Iterables.isEmpty(unblocked.getValue().elementsIterable())
+ || !Iterables.isEmpty(unblocked.getValue().timersIterable())) {
+ helpers.fnRunner.processElement(unblocked);
+ }
+ if (hasState) {
+ windowsProcessed.addAll((Collection<W>) unblocked.getWindows());
+ }
+ } else {
+ helpers.fnRunner.processElement(elem);
+ windowsProcessed = (Collection<W>) elem.getWindows();
+ }
+ if (hasState) {
+ helpers.registerStateCleanup(
+ (WindowingStrategy<?, W>) getDoFnInfo().getWindowingStrategy(),
windowsProcessed);
+ }
+ }
+
+ /**
+ * Returns the {@link DoFnInfo} currently being used by this {@link
+ * StreamingKeyedWorkItemSideInputParDoFn}.
+ *
+ * <p>May be null if no element has been processed yet, or if the bundle has been finished or
+ * aborted.
+ */
+ @VisibleForTesting
+ @Nullable
+ DoFnInfo<?, ?> getDoFnInfo() {
+ return helpers.fnInfo;
+ }
+}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java
index 3b7891c5378..a64d1a970d3 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java
@@ -17,9 +17,8 @@
*/
package org.apache.beam.runners.dataflow.worker;
-import java.util.Set;
+import java.util.Iterator;
import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -37,43 +36,33 @@ import org.joda.time.Instant;
public class StreamingSideInputDoFnRunner<InputT, OutputT, W extends
BoundedWindow>
implements DoFnRunner<InputT, OutputT> {
private final DoFnRunner<InputT, OutputT> simpleDoFnRunner;
- private final StreamingSideInputFetcher<InputT, W> sideInputFetcher;
+ private final StreamingSideInputProcessor<InputT, W> sideInputProcessor;
public StreamingSideInputDoFnRunner(
DoFnRunner<InputT, OutputT> simpleDoFnRunner,
StreamingSideInputFetcher<InputT, W> sideInputFetcher) {
this.simpleDoFnRunner = simpleDoFnRunner;
- this.sideInputFetcher = sideInputFetcher;
+ this.sideInputProcessor = new
StreamingSideInputProcessor<>(sideInputFetcher);
}
@Override
public void startBundle() {
simpleDoFnRunner.startBundle();
- sideInputFetcher.prefetchBlockedMap();
-
- // Find the set of ready windows.
- Set<W> readyWindows = sideInputFetcher.getReadyWindows();
-
- Iterable<BagState<WindowedValue<InputT>>> elementsBags =
- sideInputFetcher.prefetchElements(readyWindows);
-
- // Run the DoFn code now that all side inputs are ready.
- for (BagState<WindowedValue<InputT>> elementsBag : elementsBags) {
- Iterable<WindowedValue<InputT>> elements = elementsBag.read();
- for (WindowedValue<InputT> elem : elements) {
- simpleDoFnRunner.processElement(elem);
- }
- elementsBag.clear();
- }
- sideInputFetcher.releaseBlockedWindows(readyWindows);
+ sideInputProcessor.tryUnblockElements(
+ unblocked -> {
+ for (WindowedValue<InputT> elem : unblocked) {
+ simpleDoFnRunner.processElement(elem);
+ }
+ });
}
@Override
public void processElement(WindowedValue<InputT> compressedElem) {
- for (WindowedValue<InputT> elem : compressedElem.explodeWindows()) {
- if (!sideInputFetcher.storeIfBlocked(elem)) {
- simpleDoFnRunner.processElement(elem);
- }
+ for (Iterator<? extends WindowedValue<InputT>> it =
+ sideInputProcessor.handleProcessElement(compressedElem);
+ it.hasNext(); ) {
+ WindowedValue<InputT> elem = it.next();
+ simpleDoFnRunner.processElement(elem);
}
}
@@ -94,7 +83,7 @@ public class StreamingSideInputDoFnRunner<InputT, OutputT, W
extends BoundedWind
@Override
public void finishBundle() {
simpleDoFnRunner.finishBundle();
- sideInputFetcher.persist();
+ sideInputProcessor.handleFinishBundle();
}
@Override
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcher.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcher.java
index 76913baa6aa..0369b82be73 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcher.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcher.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.dataflow.worker;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -83,7 +84,6 @@ public class StreamingSideInputFetcher<InputT, W extends
BoundedWindow> {
this.stepContext = stepContext;
this.mainWindowCoder = windowingStrategy.getWindowFn().windowCoder();
-
this.sideInputViews = new HashMap<>();
for (PCollectionView<?> view : views) {
sideInputViews.put(view.getTagInternal().getId(), view);
@@ -188,11 +188,7 @@ public class StreamingSideInputFetcher<InputT, W extends
BoundedWindow> {
return timers;
}
- /** Compute the set of side inputs that are not yet ready for the given main
input window. */
- public boolean storeIfBlocked(WindowedValue<InputT> elem) {
- @SuppressWarnings("unchecked")
- W window = (W) Iterables.getOnlyElement(elem.getWindows());
-
+ private Set<Windmill.GlobalDataRequest> checkIfBlocked(W window) {
Set<Windmill.GlobalDataRequest> blocked = blockedMap().get(window);
if (blocked == null) {
for (PCollectionView<?> view : sideInputViews.values()) {
@@ -205,7 +201,16 @@ public class StreamingSideInputFetcher<InputT, W extends
BoundedWindow> {
}
}
}
- if (blocked != null) {
+ return blocked == null ? Collections.emptySet() : blocked;
+ }
+
+ /** Compute the set of side inputs that are not yet ready for the given main
input window. */
+ public boolean storeIfBlocked(WindowedValue<InputT> elem) {
+ @SuppressWarnings("unchecked")
+ W window = (W) Iterables.getOnlyElement(elem.getWindows());
+
+ Set<Windmill.GlobalDataRequest> blocked = checkIfBlocked(window);
+ if (!blocked.isEmpty()) {
elementBag(window).add(elem);
watermarkHold(window).add(elem.getTimestamp());
stepContext.addBlockingSideInputs(blocked);
@@ -223,17 +228,12 @@ public class StreamingSideInputFetcher<InputT, W extends
BoundedWindow> {
@SuppressWarnings("unchecked")
WindowNamespace<W> windowNamespace = (WindowNamespace<W>)
timer.getNamespace();
W window = windowNamespace.getWindow();
-
- boolean blocked = false;
- for (PCollectionView<?> view : sideInputViews.values()) {
- if (!stepContext.issueSideInputFetch(view, window,
SideInputState.UNKNOWN)) {
- blocked = true;
- }
- }
- if (blocked) {
+ Set<Windmill.GlobalDataRequest> blocked = checkIfBlocked(window);
+ if (!blocked.isEmpty()) {
timerBag(window).add(timer);
+ return true;
}
- return blocked;
+ return false;
}
public void persist() {
@@ -332,7 +332,7 @@ public class StreamingSideInputFetcher<InputT, W extends
BoundedWindow> {
.build();
}
- private static class GlobalDataRequestCoder extends
AtomicCoder<GlobalDataRequest> {
+ static class GlobalDataRequestCoder extends AtomicCoder<GlobalDataRequest> {
private final Class<Windmill.GlobalDataRequest> protoMessageClass =
Windmill.GlobalDataRequest.class;
private transient Parser<Windmill.GlobalDataRequest> memoizedParser;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputProcessor.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputProcessor.java
new file mode 100644
index 00000000000..34c1a06d54d
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputProcessor.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.WindowedValue;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+
+/** Helper class for handling elements blocked on side inputs. */
+@SuppressWarnings("nullness" //
TODO(https://github.com/apache/beam/issues/20497)
+)
+class StreamingSideInputProcessor<InputT, W extends BoundedWindow> {
+ private final StreamingSideInputFetcher<InputT, W> sideInputFetcher;
+
+ public StreamingSideInputProcessor(StreamingSideInputFetcher<InputT, W>
sideInputFetcher) {
+ this.sideInputFetcher = sideInputFetcher;
+ }
+
+ /**
+ * Handles startBundle. Passes any unblocked elements to the consumer for processing, then
+ * clears their buffers and releases the windows that were unblocked.
+ */
+ void tryUnblockElements(Consumer<Iterable<WindowedValue<InputT>>> consumer) {
+ sideInputFetcher.prefetchBlockedMap();
+
+ // Find the set of ready windows.
+ Set<W> readyWindows = sideInputFetcher.getReadyWindows();
+
+ Iterable<BagState<WindowedValue<InputT>>> elementBags =
+ sideInputFetcher.prefetchElements(readyWindows);
+ Iterable<WindowedValue<InputT>> releasedElements =
+ Iterables.concat(Iterables.transform(elementBags, BagState::read));
+ consumer.accept(releasedElements);
+ elementBags.forEach(BagState::clear);
+ sideInputFetcher.releaseBlockedWindows(readyWindows);
+ }
+
+ void tryUnblockElementsAndTimers(
+ BiConsumer<Iterable<WindowedValue<InputT>>, Iterable<TimerInternals.TimerData>> consumer) {
+ sideInputFetcher.prefetchBlockedMap();
+
+ // Find the set of ready windows.
+ Set<W> readyWindows = sideInputFetcher.getReadyWindows();
+
+ Iterable<BagState<TimerInternals.TimerData>> timerBags =
+ sideInputFetcher.prefetchTimers(readyWindows);
+ // Read from the bags prefetched above (the same ones cleared below) instead of prefetching twice.
+ Iterable<TimerInternals.TimerData> releasedTimers =
+ Iterables.concat(Iterables.transform(timerBags, BagState::read));
+ Iterable<BagState<WindowedValue<InputT>>> elementBags =
+ sideInputFetcher.prefetchElements(readyWindows);
+ Iterable<WindowedValue<InputT>> releasedElements =
+ Iterables.concat(Iterables.transform(elementBags, BagState::read));
+
+ consumer.accept(releasedElements, releasedTimers);
+ timerBags.forEach(BagState::clear);
+ elementBags.forEach(BagState::clear);
+ sideInputFetcher.releaseBlockedWindows(readyWindows);
+ }
+
+ void handleFinishBundle() {
+ sideInputFetcher.persist();
+ }
+
+ /*
+ Handles processElement. Explodes the element by window and buffers the values for which the
+ side input is blocked. Returns an iterator over the unblocked values, which should be processed.
+ */
+ Iterator<? extends WindowedValue<InputT>> handleProcessElement(
+ WindowedValue<InputT> compressedElem) {
+ // Note: We could write this as a three-line stream expression, but side
effects are discouraged
+ // in Java streams.
+ return Iterators.filter(
+ compressedElem.explodeWindows().iterator(),
+ (WindowedValue<InputT> e) -> !sideInputFetcher.storeIfBlocked(e));
+ }
+
+ <K> WindowedValue<KeyedWorkItem<K, InputT>> handleProcessKeyedWorkItem(
+ WindowedValue<KeyedWorkItem<K, InputT>> elem) {
+ List<WindowedValue<InputT>> readyInputs =
+ Lists.newArrayList(
+ Iterables.filter(
+ elem.getValue().elementsIterable(),
+ input -> !sideInputFetcher.storeIfBlocked(input)));
+
+ List<TimerInternals.TimerData> readyTimers =
+ Lists.newArrayList(
+ Iterables.filter(
+ elem.getValue().timersIterable(),
+ timer -> !sideInputFetcher.storeIfBlocked(timer)));
+ KeyedWorkItem<K, InputT> keyedWorkItem =
+ KeyedWorkItems.workItem(elem.getValue().key(), readyTimers,
readyInputs);
+
+ return elem.withValue(keyedWorkItem);
+ }
+
+ void handleProcessTimer(TimerInternals.TimerData timer) {
+ // We must call this to ensure the side-input is cached for the timer.
However since a user
+ // timer can only
+ // be set via element processing (or another timer) in the same window,
the window should be
+ // unblocked once
+ // we get here.
+ Preconditions.checkState(!sideInputFetcher.storeIfBlocked(timer));
+ }
+}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactory.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactory.java
index a8d5975e45e..9466ad60d41 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactory.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactory.java
@@ -24,18 +24,22 @@ import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Pr
import com.google.api.services.dataflow.model.SideInputInfo;
import java.util.List;
import java.util.Map;
+import org.apache.beam.runners.core.KeyedWorkItemCoder;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.dataflow.BatchStatefulParDoOverrides;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.util.CloudObject;
import org.apache.beam.runners.dataflow.util.PropertyNames;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoFn;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.DoFnInfo;
import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
@@ -52,7 +56,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
})
class UserParDoFnFactory implements ParDoFnFactory {
static UserParDoFnFactory createDefault() {
- return new UserParDoFnFactory(new UserDoFnExtractor(),
SimpleDoFnRunnerFactory.INSTANCE);
+ return new UserParDoFnFactory(new UserDoFnExtractor(),
SimpleDoFnRunnerFactory.INSTANCE, false);
}
interface DoFnExtractor {
@@ -74,10 +78,15 @@ class UserParDoFnFactory implements ParDoFnFactory {
private final DoFnExtractor doFnExtractor;
private final DoFnRunnerFactory runnerFactory;
+ private final boolean streamingKeyedWorkItem;
- UserParDoFnFactory(DoFnExtractor doFnExtractor, DoFnRunnerFactory
runnerFactory) {
+ UserParDoFnFactory(
+ DoFnExtractor doFnExtractor,
+ DoFnRunnerFactory runnerFactory,
+ boolean streamingKeyedWorkItem) {
this.doFnExtractor = doFnExtractor;
this.runnerFactory = runnerFactory;
+ this.streamingKeyedWorkItem = streamingKeyedWorkItem;
}
@Override
@@ -144,17 +153,38 @@ class UserParDoFnFactory implements ParDoFnFactory {
writerFn.getDataCoder(),
(Coder<BoundedWindow>)
doFnInfo.getWindowingStrategy().getWindowFn().windowCoder());
} else {
- return new SimpleParDoFn(
- options,
- instanceManager,
- sideInputReader,
- doFnInfo.getMainOutput(),
- outputTupleTagsToReceiverIndices,
- stepContext,
- operationContext,
- doFnInfo.getDoFnSchemaInformation(),
- doFnInfo.getSideInputMapping(),
- runnerFactory);
+ boolean hasStreamingSideInput =
+ options.as(StreamingOptions.class).isStreaming() &&
!sideInputReader.isEmpty();
+
+ if (streamingKeyedWorkItem && hasStreamingSideInput) {
+ KeyedWorkItemCoder<byte[], KV<?, ?>> kwiCoder =
+ (KeyedWorkItemCoder<byte[], KV<?, ?>>) doFnInfo.getInputCoder();
+ return new StreamingKeyedWorkItemSideInputParDoFn<>(
+ options,
+ instanceManager,
+ sideInputReader,
+ doFnInfo.getMainOutput(),
+ outputTupleTagsToReceiverIndices,
+ stepContext,
+ operationContext,
+ doFnInfo.getDoFnSchemaInformation(),
+ doFnInfo.getSideInputMapping(),
+ runnerFactory,
+ ByteArrayCoder.of(),
+ kwiCoder.getElementCoder());
+ } else {
+ return new SimpleParDoFn(
+ options,
+ instanceManager,
+ sideInputReader,
+ doFnInfo.getMainOutput(),
+ outputTupleTagsToReceiverIndices,
+ stepContext,
+ operationContext,
+ doFnInfo.getDoFnSchemaInformation(),
+ doFnInfo.getSideInputMapping(),
+ runnerFactory);
+ }
}
}
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnHelpersTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnHelpersTest.java
new file mode 100644
index 00000000000..6bbbf953967
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnHelpersTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.SideInputReader;
+import
org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
+import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.DoFnInfo;
+import org.apache.beam.sdk.values.TupleTag;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+@RunWith(JUnit4.class)
+public class SimpleParDoFnHelpersTest {
+ private PipelineOptions options;
+ @Mock DoFnInstanceManager doFnInstanceManager;
+ @Mock SideInputReader sideInputReader;
+ @Mock DataflowStepContext stepContext;
+ @Mock DataflowStepContext userStepContext;
+ @Mock DataflowOperationContext operationContext;
+ @Mock DoFnRunnerFactory<String, String> runnerFactory;
+ @Mock DoFnRunner<String, String> mockRunner;
+
+ @Mock StreamingSideInputProcessor<String, GlobalWindow> sideInputProcessor;
+
+ @Mock DoFnInfo<String, String> doFnInfo;
+ @Mock CounterFactory counterFactory;
+
+ private static class TestDoFn extends DoFn<String, String> {
+ @ProcessElement
+ public void processElement() {}
+ }
+
+ private TestDoFn doFn = new TestDoFn();
+
+ private SimpleParDoFnHelpers<String, String, GlobalWindow> helpers;
+
+ @Before
+ @SuppressWarnings("unchecked")
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ options = PipelineOptionsFactory.create();
+ when(stepContext.namespacedToUser()).thenReturn(userStepContext);
+ when(operationContext.counterFactory()).thenReturn(counterFactory);
+
+ when(doFnInstanceManager.get()).thenReturn((DoFnInfo) doFnInfo);
+ when(doFnInfo.getDoFn()).thenReturn(doFn);
+
+ when(runnerFactory.createRunner(
+ any(), any(), any(), any(), any(), any(), any(), any(), any(),
any(), any(), any(),
+ any(), any()))
+ .thenReturn(mockRunner);
+
+ helpers =
+ new SimpleParDoFnHelpers<>(
+ options,
+ doFnInstanceManager,
+ sideInputReader,
+ new TupleTag<>("main"),
+ ImmutableMap.of(new TupleTag<>("main"), 0),
+ stepContext,
+ operationContext,
+ DoFnSchemaInformation.create(),
+ ImmutableMap.of(),
+ runnerFactory);
+ }
+
+ @Test
+ public void testReallyStartBundle() throws Exception {
+ helpers.startBundle(mock(Receiver.class));
+ helpers.reallyStartBundle();
+
+ verify(runnerFactory)
+ .createRunner(
+ any(), any(), any(), any(), any(), any(), any(), any(), any(),
any(), any(), any(),
+ any(), any());
+ verify(mockRunner).startBundle();
+ }
+
+ @Test
+ public void testFinishBundle() throws Exception {
+ helpers.startBundle(mock(Receiver.class));
+ helpers.reallyStartBundle();
+
+ helpers.finishBundle(sideInputProcessor);
+
+ verify(mockRunner).finishBundle();
+ verify(sideInputProcessor).handleFinishBundle();
+ verify(doFnInstanceManager).complete(any());
+ }
+
+ @Test
+ public void testAbort() throws Exception {
+ helpers.startBundle(mock(Receiver.class));
+ helpers.reallyStartBundle();
+
+ helpers.abort();
+
+ verify(doFnInstanceManager).abort(any());
+ }
+}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java
index 9c9f5386f44..5b1d7a8d366 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java
@@ -92,7 +92,7 @@ public class SimpleParDoFnTest {
// TODO: Remove once Distributions has shipped.
options
.as(DataflowPipelineDebugOptions.class)
-
.setExperiments(Lists.newArrayList(SimpleParDoFn.OUTPUTS_PER_ELEMENT_EXPERIMENT));
+
.setExperiments(Lists.newArrayList(SimpleParDoFnHelpers.OUTPUTS_PER_ELEMENT_EXPERIMENT));
operationContext = TestOperationContext.create();
stepContext =
@@ -489,7 +489,7 @@ public class SimpleParDoFnTest {
}
@ProcessElement
- public void processElement(ProcessContext c) throws Exception {
+ public void processElement() throws Exception {
assertThat(startCalled, equalTo(true));
assertThat(tracker.getCurrentState(),
equalTo(operationContext.getProcessState()));
}
@@ -558,7 +558,7 @@ public class SimpleParDoFnTest {
public void testOutputsPerElementCounterDisabledViaExperiment() throws
Exception {
DataflowPipelineDebugOptions debugOptions =
options.as(DataflowPipelineDebugOptions.class);
List<String> experiments = debugOptions.getExperiments();
- experiments.remove(SimpleParDoFn.OUTPUTS_PER_ELEMENT_EXPERIMENT);
+ experiments.remove(SimpleParDoFnHelpers.OUTPUTS_PER_ELEMENT_EXPERIMENT);
debugOptions.setExperiments(experiments);
List<CounterUpdate> counterUpdates = executeParDoFnCounterTest(0);
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java
index e12ddd95f91..654707aae91 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java
@@ -202,8 +202,7 @@ public class StreamingKeyedWorkItemSideInputDoFnRunnerTest {
(WindowedValue<KV<String, Integer>> windowedValue) ->
outputManager.output(mainOutputTag, windowedValue),
stepContext);
- return new StreamingKeyedWorkItemSideInputDoFnRunner<
- String, Integer, KV<String, Integer>, IntervalWindow>(
+ return new StreamingKeyedWorkItemSideInputDoFnRunner<>(
simpleDoFnRunner, keyCoder, sideInputFetcher, stepContext);
}
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputParDoFnTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputParDoFnTest.java
new file mode 100644
index 00000000000..2fed6fd405a
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputParDoFnTest.java
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.InMemoryStateInternals;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
+import
org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker;
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.runners.core.SimpleDoFnRunner;
+import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessFn;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import
org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputState;
+import org.apache.beam.runners.dataflow.worker.util.ValueInEmptyWindows;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.DoFnInfo;
+import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
+import org.apache.beam.sdk.values.CausedByDrain;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowedValue;
+import org.apache.beam.sdk.values.WindowedValues;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Unit tests for {@link StreamingKeyedWorkItemSideInputParDoFn}. */
+@RunWith(JUnit4.class)
+public class StreamingKeyedWorkItemSideInputParDoFnTest {
+ private static final FixedWindows WINDOW_FN =
FixedWindows.of(Duration.millis(10));
+ private static final TupleTag<KV<String, Integer>> MAIN_OUTPUT_TAG = new
TupleTag<>();
+
+ private final InMemoryStateInternals<String> state =
InMemoryStateInternals.forKey("a");
+
+ @Mock private StreamingModeExecutionContext.StepContext stepContext;
+ @Mock private TimerInternals mockTimerInternals;
+ @Mock private SideInputReader mockSideInputReader;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ when(stepContext.stateInternals()).thenReturn((StateInternals) state);
+ when(stepContext.timerInternals()).thenReturn(mockTimerInternals);
+ when(stepContext.namespacedToUser()).thenReturn(stepContext);
+ when(mockSideInputReader.isEmpty()).thenReturn(false);
+ }
+
+ @Test
+ public void testInvokeProcessElement() throws Exception {
+ PCollectionView<String> view = createView();
+
+ when(stepContext.issueSideInputFetch(
+ eq(view), any(BoundedWindow.class), eq(SideInputState.UNKNOWN)))
+ .thenReturn(false);
+ when(stepContext.issueSideInputFetch(
+ eq(view), any(BoundedWindow.class),
eq(SideInputState.KNOWN_READY)))
+ .thenReturn(true);
+
+
when(mockTimerInternals.currentInputWatermarkTime()).thenReturn(Instant.ofEpochMilli(15L));
+ StreamingKeyedWorkItemSideInputParDoFn<String, Integer, KV<String,
Integer>, IntervalWindow>
+ runner = createRunner(view);
+
+ TestReceiver receiver = new TestReceiver();
+ runner.startBundle(receiver);
+
+ KeyedWorkItem<String, Integer> elemsWorkItem =
+ KeyedWorkItems.elementsWorkItem(
+ "a",
+ ImmutableList.of(
+ createDatum(13, 13L),
+ createDatum(16, 16L), // side inputs non-ready element
+ createDatum(18, 18L)));
+
+ runner.processElement(new ValueInEmptyWindows<>(elemsWorkItem));
+
+ // Initially blocked! No output.
+ assertEquals(0, receiver.outputs.size());
+
+
when(mockTimerInternals.currentInputWatermarkTime()).thenReturn(Instant.ofEpochMilli(20));
+ runner.processElement(
+ new ValueInEmptyWindows<>(
+ KeyedWorkItems.<String, Integer>timersWorkItem(
+ "a",
+ ImmutableList.of(
+ timerData(window(10, 20), Instant.ofEpochMilli(19),
Timer.Type.WATERMARK)))));
+
+ // Timer is blocked too!
+ assertEquals(0, receiver.outputs.size());
+
+ // Now make it ready!
+ IntervalWindow readyWindow = window(10, 20);
+ Windmill.GlobalDataId id =
+ Windmill.GlobalDataId.newBuilder()
+ .setTag(view.getTagInternal().getId())
+ .setVersion(
+ ByteString.copyFrom(
+ CoderUtils.encodeToByteArray(IntervalWindow.getCoder(),
readyWindow)))
+ .build();
+
+ when(stepContext.getSideInputNotifications())
+ .thenReturn(Arrays.<Windmill.GlobalDataId>asList(id));
+
+ runner.finishBundle();
+
+ runner.startBundle(receiver);
+
+ // We don't check for output here because we just wanted to see if the
runner works
+ // without exceptions. The issue was lifecycle of the runner bundle
(finishBundle, startBundle).
+ }
+
+ static class TestSplittableDoFn extends DoFn<Integer, KV<String, Integer>> {
+ @ProcessElement
+ public void processElement(ProcessContext c, RestrictionTracker<String, ?>
tracker) {
+ c.output(KV.of(tracker.currentRestriction(), c.element()));
+ }
+
+ @GetInitialRestriction
+ public String getInitialRestriction(@Element Integer element) {
+ return "restriction";
+ }
+
+ @NewTracker
+ public RestrictionTracker<String, ?> newTracker(@Restriction String
restriction) {
+ return new RestrictionTracker<String, Object>() {
+ @Override
+ public boolean tryClaim(Object position) {
+ return true;
+ }
+
+ @Override
+ public String currentRestriction() {
+ return restriction;
+ }
+
+ @Override
+ public SplitResult<String> trySplit(double fractionOfRemainder) {
+ return null;
+ }
+
+ @Override
+ public void checkDone() {}
+
+ @Override
+ public IsBounded isBounded() {
+ return IsBounded.BOUNDED;
+ }
+ };
+ }
+ }
+
+ @Test
+ public void testSplittableProcessElement() throws Exception {
+ PCollectionView<String> view = createView();
+
+ when(stepContext.issueSideInputFetch(
+ eq(view), any(BoundedWindow.class), eq(SideInputState.UNKNOWN)))
+ .thenReturn(false);
+ when(stepContext.issueSideInputFetch(
+ eq(view), any(BoundedWindow.class),
eq(SideInputState.KNOWN_READY)))
+ .thenReturn(true);
+
+
when(mockTimerInternals.currentInputWatermarkTime()).thenReturn(Instant.ofEpochMilli(15L));
+ StreamingKeyedWorkItemSideInputParDoFn<
+ byte[], KV<Integer, String>, KV<String, Integer>, IntervalWindow>
+ runner = createSplittableRunner(view);
+
+ TestReceiver receiver = new TestReceiver();
+ runner.startBundle(receiver);
+
+ KeyedWorkItem<byte[], KV<Integer, String>> elemsWorkItem =
+ KeyedWorkItems.elementsWorkItem(
+ new byte[] {1}, ImmutableList.of(createDatum(KV.of(13,
"restriction"), 13L)));
+
+ runner.processElement(new ValueInEmptyWindows<>(elemsWorkItem));
+
+ // Initially blocked! No output.
+ assertEquals(0, receiver.outputs.size());
+ runner.finishBundle();
+
+ // Now make it ready!
+ IntervalWindow readyWindow = window(10, 20);
+ Windmill.GlobalDataId id =
+ Windmill.GlobalDataId.newBuilder()
+ .setTag(view.getTagInternal().getId())
+ .setVersion(
+ ByteString.copyFrom(
+ CoderUtils.encodeToByteArray(IntervalWindow.getCoder(),
readyWindow)))
+ .build();
+
+ when(stepContext.getSideInputNotifications())
+ .thenReturn(Arrays.<Windmill.GlobalDataId>asList(id));
+
+ runner.startBundle(receiver);
+
+ // Note: unblocking logic would run here if the environment is fully
mocked to push
+ // blocked items back into processing. For the purpose of testing
SplittableDoFn initialization,
+ // this suffices.
+ }
+
+ private <T> WindowedValue<T> createDatum(T element, long timestampMillis) {
+ Instant timestamp = Instant.ofEpochMilli(timestampMillis);
+ return WindowedValues.of(
+ element, timestamp, Arrays.asList(WINDOW_FN.assignWindow(timestamp)),
PaneInfo.NO_FIRING);
+ }
+
+ private TimerData timerData(IntervalWindow window, Instant timestamp,
Timer.Type type) {
+ return TimerData.of(
+ StateNamespaces.window(IntervalWindow.getCoder(), window),
+ timestamp,
+ timestamp,
+ type == Windmill.Timer.Type.WATERMARK ? TimeDomain.EVENT_TIME :
TimeDomain.PROCESSING_TIME,
+ CausedByDrain.NORMAL);
+ }
+
+ private IntervalWindow window(long start, long end) {
+ return new IntervalWindow(Instant.ofEpochMilli(start),
Instant.ofEpochMilli(end));
+ }
+
+ private PCollectionView<String> createView() {
+ return TestPipeline.create()
+ .apply(Create.empty(StringUtf8Coder.of()))
+ .apply(Window.<String>into(WINDOW_FN))
+ .apply(View.<String>asSingleton());
+ }
+
+ static class TestReceiver implements Receiver {
+ List<WindowedValue<KV<String, Integer>>> outputs = new ArrayList<>();
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void process(Object outputElem) {
+ outputs.add((WindowedValue<KV<String, Integer>>) outputElem);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private StreamingKeyedWorkItemSideInputParDoFn<
+ String, Integer, KV<String, Integer>, IntervalWindow>
+ createRunner(PCollectionView<String> view) throws Exception {
+ Coder<String> keyCoder = StringUtf8Coder.of();
+ Coder<Integer> inputCoder = BigEndianIntegerCoder.of();
+
+ WindowingStrategy<Object, IntervalWindow> windowingStrategy =
WindowingStrategy.of(WINDOW_FN);
+
+ DoFn<KeyedWorkItem<String, Integer>, KV<String, Integer>> theDoFn =
+ new DoFn<KeyedWorkItem<String, Integer>, KV<String, Integer>>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ KeyedWorkItem<String, Integer> kwi = c.element();
+ for (WindowedValue<Integer> wv : kwi.elementsIterable()) {
+ c.output(KV.of(kwi.key(), wv.getValue()));
+ }
+ }
+ };
+
+ DoFnInfo<KeyedWorkItem<String, Integer>, KV<String, Integer>> fnInfo =
+ DoFnInfo.forFn(
+ theDoFn,
+ windowingStrategy,
+ ImmutableList.of(view),
+ (Coder) null,
+ MAIN_OUTPUT_TAG,
+ DoFnSchemaInformation.create(),
+ Collections.emptyMap());
+
+ DoFnRunnerFactory<KeyedWorkItem<String, Integer>, KV<String, Integer>>
runnerFactory =
+ new DoFnRunnerFactory<KeyedWorkItem<String, Integer>, KV<String,
Integer>>() {
+ @Override
+ public DoFnRunner<KeyedWorkItem<String, Integer>, KV<String,
Integer>> createRunner(
+ DoFn<KeyedWorkItem<String, Integer>, KV<String, Integer>> fn,
+ PipelineOptions options,
+ TupleTag<KV<String, Integer>> mainOutputTag,
+ List<TupleTag<?>> sideOutputTags,
+ Iterable<PCollectionView<?>> sideInputViews,
+ SideInputReader sideInputReader,
+ Coder<KeyedWorkItem<String, Integer>> inputCoder,
+ Map<TupleTag<?>, Coder<?>> outputCoders,
+ WindowingStrategy<?, ?> windowingStrategy,
+ DataflowExecutionContext.DataflowStepContext stepContext,
+ DataflowExecutionContext.DataflowStepContext userStepContext,
+ WindowedValueMultiReceiver outputManager2,
+ DoFnSchemaInformation doFnSchemaInformation,
+ Map<String, PCollectionView<?>> sideInputMapping) {
+ return new SimpleDoFnRunner<>(
+ options,
+ fn,
+ sideInputReader,
+ outputManager2,
+ mainOutputTag,
+ sideOutputTags,
+ stepContext,
+ inputCoder,
+ outputCoders,
+ windowingStrategy,
+ doFnSchemaInformation,
+ sideInputMapping);
+ }
+ };
+
+ PipelineOptions options = PipelineOptionsFactory.create();
+ options.as(StreamingOptions.class).setStreaming(true);
+
+ return new StreamingKeyedWorkItemSideInputParDoFn<>(
+ options,
+ DoFnInstanceManagers.singleInstance(fnInfo),
+ mockSideInputReader,
+ MAIN_OUTPUT_TAG,
+ ImmutableMap.of(MAIN_OUTPUT_TAG, 0),
+ stepContext,
+ TestOperationContext.create(),
+ DoFnSchemaInformation.create(),
+ Collections.emptyMap(),
+ runnerFactory,
+ keyCoder,
+ inputCoder);
+ }
+
+ @SuppressWarnings("unchecked")
+ private StreamingKeyedWorkItemSideInputParDoFn<
+ byte[], KV<Integer, String>, KV<String, Integer>, IntervalWindow>
+ createSplittableRunner(PCollectionView<String> view) throws Exception {
+ ByteArrayCoder keyCoder = ByteArrayCoder.of();
+ Coder<KV<Integer, String>> inputCoder =
+ KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of());
+
+ WindowingStrategy<Integer, IntervalWindow> windowingStrategy =
+ (WindowingStrategy) WindowingStrategy.of(WINDOW_FN);
+
+ TestSplittableDoFn theDoFn = new TestSplittableDoFn();
+
+ ProcessFn<Integer, KV<String, Integer>, String, Object, Object> processFn =
+ new ProcessFn<Integer, KV<String, Integer>, String, Object, Object>(
+ theDoFn,
+ BigEndianIntegerCoder.of(),
+ StringUtf8Coder.of(),
+ (Coder) StringUtf8Coder.of(), // watermarkEstimatorStateCoder
+ windowingStrategy,
+ Collections.emptyMap());
+ processFn.setup(PipelineOptionsFactory.create());
+
+ DoFnInfo<KeyedWorkItem<byte[], KV<Integer, String>>, KV<String, Integer>>
fnInfo =
+ DoFnInfo.forFn(
+ processFn,
+ windowingStrategy,
+ ImmutableList.of(view),
+ (Coder) null,
+ MAIN_OUTPUT_TAG,
+ DoFnSchemaInformation.create(),
+ Collections.emptyMap());
+
+ DoFnRunnerFactory<KeyedWorkItem<byte[], KV<Integer, String>>, KV<String,
Integer>>
+ runnerFactory =
+ new DoFnRunnerFactory<
+ KeyedWorkItem<byte[], KV<Integer, String>>, KV<String,
Integer>>() {
+ @Override
+ public DoFnRunner<KeyedWorkItem<byte[], KV<Integer, String>>,
KV<String, Integer>>
+ createRunner(
+ DoFn<KeyedWorkItem<byte[], KV<Integer, String>>,
KV<String, Integer>> fn,
+ PipelineOptions options,
+ TupleTag<KV<String, Integer>> mainOutputTag,
+ List<TupleTag<?>> sideOutputTags,
+ Iterable<PCollectionView<?>> sideInputViews,
+ SideInputReader sideInputReader,
+ Coder<KeyedWorkItem<byte[], KV<Integer, String>>>
inputCoder,
+ Map<TupleTag<?>, Coder<?>> outputCoders,
+ WindowingStrategy<?, ?> windowingStrategy,
+ DataflowExecutionContext.DataflowStepContext stepContext,
+ DataflowExecutionContext.DataflowStepContext
userStepContext,
+ WindowedValueMultiReceiver outputManager2,
+ DoFnSchemaInformation doFnSchemaInformation,
+ Map<String, PCollectionView<?>> sideInputMapping) {
+
+ ProcessFn<Integer, KV<String, Integer>, String, Object,
Object> fn2 =
+ (ProcessFn<Integer, KV<String, Integer>, String, Object,
Object>) fn;
+ fn2.setStateInternalsFactory(key -> (StateInternals)
stepContext.stateInternals());
+ fn2.setTimerInternalsFactory(key ->
stepContext.timerInternals());
+ fn2.setSideInputReader(sideInputReader);
+ fn2.setProcessElementInvoker(
+ new OutputAndTimeBoundedSplittableProcessElementInvoker<
+ Integer, KV<String, Integer>, String, Object, Object>(
+ fn2.getFn(),
+ options,
+ outputManager2,
+ mainOutputTag,
+ sideInputReader,
+ Executors.newSingleThreadScheduledExecutor(),
+ 10000,
+ Duration.standardSeconds(10),
+ () -> null));
+
+ return new SimpleDoFnRunner<>(
+ options,
+ fn,
+ sideInputReader,
+ outputManager2,
+ mainOutputTag,
+ sideOutputTags,
+ stepContext,
+ inputCoder,
+ outputCoders,
+ windowingStrategy,
+ doFnSchemaInformation,
+ sideInputMapping);
+ }
+ };
+
+ PipelineOptions options = PipelineOptionsFactory.create();
+ options.as(StreamingOptions.class).setStreaming(true);
+
+ return new StreamingKeyedWorkItemSideInputParDoFn<>(
+ options,
+ DoFnInstanceManagers.singleInstance(fnInfo),
+ mockSideInputReader,
+ MAIN_OUTPUT_TAG,
+ ImmutableMap.of(MAIN_OUTPUT_TAG, 0),
+ stepContext,
+ TestOperationContext.create(),
+ DoFnSchemaInformation.create(),
+ Collections.emptyMap(),
+ runnerFactory,
+ keyCoder,
+ inputCoder);
+ }
+}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputProcessorTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputProcessorTest.java
new file mode 100644
index 00000000000..19e22b03883
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputProcessorTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.api.client.util.Lists;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.CausedByDrain;
+import org.apache.beam.sdk.values.WindowedValue;
+import org.apache.beam.sdk.values.WindowedValues;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Unit tests for {@link StreamingSideInputProcessor}. */
+@RunWith(JUnit4.class)
+public class StreamingSideInputProcessorTest {
+
+ @Mock private StreamingSideInputFetcher<String, IntervalWindow> mockFetcher;
+ private StreamingSideInputProcessor<String, IntervalWindow> processor;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ processor = new StreamingSideInputProcessor<>(mockFetcher);
+ }
+
+ @Test
+ public void testTryUnblockElementsNoReadyWindows() {
+ // Given
+ doNothing().when(mockFetcher).prefetchBlockedMap();
+ when(mockFetcher.getReadyWindows()).thenReturn(Collections.emptySet());
+
+ // When
+ processor.tryUnblockElements(unblocked -> assertThat(unblocked,
emptyIterable()));
+
+ // Then
+ verify(mockFetcher).prefetchBlockedMap();
+ verify(mockFetcher).getReadyWindows();
+ }
+
+ @Test
+ public void testTryUnblockElementsWithReadyWindows() {
+ // Given
+ IntervalWindow window1 = new IntervalWindow(Instant.ofEpochMilli(0),
Instant.ofEpochMilli(10));
+ IntervalWindow window2 = new IntervalWindow(Instant.ofEpochMilli(10),
Instant.ofEpochMilli(20));
+ Set<IntervalWindow> readyWindows = new HashSet<>(Arrays.asList(window1,
window2));
+
+ WindowedValue<String> element1 =
+ WindowedValues.of(
+ "e1", Instant.ofEpochMilli(5), Arrays.asList(window1),
PaneInfo.NO_FIRING);
+ WindowedValue<String> element2 =
+ WindowedValues.of(
+ "e2", Instant.ofEpochMilli(15), Arrays.asList(window2),
PaneInfo.NO_FIRING);
+
+ @SuppressWarnings("unchecked")
+ BagState<WindowedValue<String>> mockBag1 = mock(BagState.class);
+ @SuppressWarnings("unchecked")
+ BagState<WindowedValue<String>> mockBag2 = mock(BagState.class);
+
+ when(mockBag1.read()).thenReturn(Arrays.asList(element1));
+ when(mockBag2.read()).thenReturn(Arrays.asList(element2));
+
+ doNothing().when(mockFetcher).prefetchBlockedMap();
+ when(mockFetcher.getReadyWindows()).thenReturn(readyWindows);
+
when(mockFetcher.prefetchElements(readyWindows)).thenReturn(Arrays.asList(mockBag1,
mockBag2));
+ doNothing().when(mockFetcher).releaseBlockedWindows(readyWindows);
+
+ // When
+ processor.tryUnblockElements(
+ unblocked -> assertThat(unblocked, containsInAnyOrder(element1,
element2)));
+
+ // Then
+ verify(mockFetcher).prefetchBlockedMap();
+ verify(mockFetcher).getReadyWindows();
+ verify(mockFetcher).prefetchElements(readyWindows);
+ verify(mockBag1).read();
+ verify(mockBag1).clear();
+ verify(mockBag2).read();
+ verify(mockBag2).clear();
+ verify(mockFetcher).releaseBlockedWindows(readyWindows);
+ }
+
+ @Test
+ public void testHandleFinishBundle() {
+ // Given
+ doNothing().when(mockFetcher).persist();
+
+ // When
+ processor.handleFinishBundle();
+
+ // Then
+ verify(mockFetcher).persist();
+ }
+
+ @Test
+ public void testHandleProcessElementBlocked() {
+ // Given
+ IntervalWindow window = new IntervalWindow(Instant.ofEpochMilli(0),
Instant.ofEpochMilli(10));
+ WindowedValue<String> compressedElement =
+ WindowedValues.of("e", Instant.ofEpochMilli(5), Arrays.asList(window),
PaneInfo.NO_FIRING);
+
+
when(mockFetcher.storeIfBlocked(any(WindowedValue.class))).thenReturn(true);
+
+ // When
+ Iterator<? extends WindowedValue<String>> unblocked =
+ processor.handleProcessElement(compressedElement);
+
+ // Then
+ assertFalse(unblocked.hasNext());
+ for (WindowedValue<String> exploded : compressedElement.explodeWindows()) {
+ verify(mockFetcher).storeIfBlocked(exploded);
+ }
+ }
+
+ @Test
+ public void testHandleProcessElementUnblocked() {
+ // Given
+ IntervalWindow window1 = new IntervalWindow(Instant.ofEpochMilli(0),
Instant.ofEpochMilli(10));
+ IntervalWindow window2 = new IntervalWindow(Instant.ofEpochMilli(10),
Instant.ofEpochMilli(20));
+ WindowedValue<String> compressedElement =
+ WindowedValues.of(
+ "e", Instant.ofEpochMilli(5), Arrays.asList(window1, window2),
PaneInfo.NO_FIRING);
+
+
when(mockFetcher.storeIfBlocked(any(WindowedValue.class))).thenReturn(false);
+
+ // When
+ Iterator<? extends WindowedValue<String>> unblocked =
+ processor.handleProcessElement(compressedElement);
+ // Then
+ assertThat(
+ Lists.newArrayList(unblocked),
+ containsInAnyOrder(
+ Iterables.toArray(compressedElement.explodeWindows(),
WindowedValue.class)));
+ for (WindowedValue<String> exploded : compressedElement.explodeWindows()) {
+ verify(mockFetcher).storeIfBlocked(exploded);
+ }
+ }
+
+ @Test
+ public void testHandleProcessTimerSuccess() {
+ // Given
+ TimerData testTimer =
+ TimerData.of(
+ StateNamespaces.global(),
+ Instant.ofEpochMilli(1000),
+ Instant.ofEpochMilli(2000),
+ TimeDomain.EVENT_TIME,
+ CausedByDrain.NORMAL);
+ when(mockFetcher.storeIfBlocked(testTimer)).thenReturn(false);
+
+ // When
+ processor.handleProcessTimer(testTimer);
+
+ // Then
+ verify(mockFetcher).storeIfBlocked(testTimer);
+ }
+
+ @Test
+ public void testHandleProcessTimerThrowsPreconditionFail() {
+ // Given
+ TimerData testTimer =
+ TimerData.of(
+ StateNamespaces.global(),
+ Instant.ofEpochMilli(1000),
+ Instant.ofEpochMilli(2000),
+ TimeDomain.EVENT_TIME,
+ CausedByDrain.NORMAL);
+ when(mockFetcher.storeIfBlocked(testTimer)).thenReturn(true);
+
+ // When & Then
+ assertThrows(IllegalStateException.class, () ->
processor.handleProcessTimer(testTimer));
+ verify(mockFetcher).storeIfBlocked(testTimer);
+ }
+}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java
index 9d3fa9b211b..43331d11a7e 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java
@@ -323,10 +323,6 @@ public class UserParDoFnFactoryTest {
private CloudObject getCloudObject(DoFn<?, ?> fn, WindowingStrategy<?, ?>
windowingStrategy) {
CloudObject object = CloudObject.forClassName("DoFn");
- @SuppressWarnings({
- "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
- "unchecked"
- })
DoFnInfo<?, ?> info =
DoFnInfo.forFn(
fn,
@@ -377,13 +373,14 @@ public class UserParDoFnFactoryTest {
Receiver rcvr = new OutputReceiver();
parDoFn.startBundle(rcvr);
- IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new
Instant(10));
+ IntervalWindow firstWindow =
+ new IntervalWindow(Instant.ofEpochMilli(0), Instant.ofEpochMilli(10));
parDoFn.processElement(
- WindowedValues.of("foo", new Instant(1), firstWindow,
PaneInfo.NO_FIRING));
+ WindowedValues.of("foo", Instant.ofEpochMilli(1), firstWindow,
PaneInfo.NO_FIRING));
verify(stepContext)
.setStateCleanupTimer(
- SimpleParDoFn.CLEANUP_TIMER_ID,
+ SimpleParDoFnHelpers.CLEANUP_TIMER_ID,
firstWindow,
IntervalWindow.getCoder(),
firstWindow.maxTimestamp().plus(Duration.millis(1L)),
@@ -436,14 +433,14 @@ public class UserParDoFnFactoryTest {
GlobalWindow globalWindow = GlobalWindow.INSTANCE;
parDoFn.processElement(
- WindowedValues.of("foo", new Instant(1), globalWindow,
PaneInfo.NO_FIRING));
+ WindowedValues.of("foo", Instant.ofEpochMilli(1), globalWindow,
PaneInfo.NO_FIRING));
assertThat(
globalWindow.maxTimestamp().plus(allowedLateness),
greaterThan(BoundedWindow.TIMESTAMP_MAX_VALUE));
verify(stepContext)
.setStateCleanupTimer(
- SimpleParDoFn.CLEANUP_TIMER_ID,
+ SimpleParDoFnHelpers.CLEANUP_TIMER_ID,
globalWindow,
GlobalWindow.Coder.INSTANCE,
BoundedWindow.TIMESTAMP_MAX_VALUE,
@@ -459,7 +456,7 @@ public class UserParDoFnFactoryTest {
when(stepContext.getNextFiredTimer((Coder) GlobalWindow.Coder.INSTANCE))
.thenReturn(
TimerData.of(
- SimpleParDoFn.CLEANUP_TIMER_ID,
+ SimpleParDoFnHelpers.CLEANUP_TIMER_ID,
globalWindowNamespace,
BoundedWindow.TIMESTAMP_MAX_VALUE,
BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.millis(1)),
@@ -516,8 +513,10 @@ public class UserParDoFnFactoryTest {
Receiver rcvr = new OutputReceiver();
parDoFn.startBundle(rcvr);
- IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new
Instant(9));
- IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new
Instant(19));
+ IntervalWindow firstWindow =
+ new IntervalWindow(Instant.ofEpochMilli(0), Instant.ofEpochMilli(9));
+ IntervalWindow secondWindow =
+ new IntervalWindow(Instant.ofEpochMilli(10), Instant.ofEpochMilli(19));
Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
StateNamespace firstWindowNamespace = StateNamespaces.window(windowCoder,
firstWindow);
@@ -535,7 +534,7 @@ public class UserParDoFnFactoryTest {
when(stepContext.getNextFiredTimer(windowCoder))
.thenReturn(
TimerData.of(
- SimpleParDoFn.CLEANUP_TIMER_ID,
+ SimpleParDoFnHelpers.CLEANUP_TIMER_ID,
firstWindowNamespace,
firstWindow.maxTimestamp().plus(Duration.millis(1L)),
firstWindow.maxTimestamp().plus(Duration.millis(1L)),
@@ -552,7 +551,7 @@ public class UserParDoFnFactoryTest {
when(stepContext.getNextFiredTimer((Coder) windowCoder))
.thenReturn(
TimerData.of(
- SimpleParDoFn.CLEANUP_TIMER_ID,
+ SimpleParDoFnHelpers.CLEANUP_TIMER_ID,
secondWindowNamespace,
secondWindow.maxTimestamp().plus(Duration.millis(1L)),
secondWindow.maxTimestamp().plus(Duration.millis(1L)),
diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle
index 9357515f36c..03049f29331 100644
--- a/runners/prism/java/build.gradle
+++ b/runners/prism/java/build.gradle
@@ -162,6 +162,9 @@ def sickbayTests = [
// java.lang.IllegalStateException: java.io.EOFException
'org.apache.beam.sdk.transforms.ViewTest.testSideInputWithNestedIterables',
+ // Triggers an index-out-of-bounds error in Prism
+ 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testTimerSideInput',
+
// Missing output due to processing time timer skew.
'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testProcessElementSkew',
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSideInputsInTimer.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSideInputsInTimer.java
new file mode 100644
index 00000000000..8320c1451d1
--- /dev/null
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSideInputsInTimer.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.annotations.Internal;
+
+/**
+ * Category tag for validation tests which use side inputs in OnTimer and OnWindowExpiration. Tests
+ * tagged with {@link UsesSideInputsInTimer} should be run for runners which support side inputs.
+ */
+@Internal
+public class UsesSideInputsInTimer {}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index a366ded4fe2..bfd04908b99 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -359,6 +359,14 @@ public abstract class DoFn<InputT extends @Nullable
Object, OutputT extends @Nul
/** Returns the time domain of the current timer. */
public abstract TimeDomain timeDomain();
+ /**
+ * Returns the value of the side input.
+ *
+ * @throws IllegalArgumentException if this is not a side input
+ */
+ @Pure
+ public abstract <T> T sideInput(PCollectionView<T> view);
+
@Pure
public abstract org.apache.beam.sdk.values.CausedByDrain causedByDrain();
}
@@ -368,6 +376,14 @@ public abstract class DoFn<InputT extends @Nullable
Object, OutputT extends @Nul
/** Returns the window in which the window expiration is firing. */
@Pure
public abstract BoundedWindow window();
+
+ /**
+ * Returns the value of the side input.
+ *
+ * @throws IllegalArgumentException if this is not a side input
+ */
+ @Pure
+ public abstract <T> T sideInput(PCollectionView<T> view);
}
/**
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 0bd2c1c888f..2983fc94021 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -197,7 +197,8 @@ public class DoFnSignatures {
Parameter.TimerIdParameter.class,
Parameter.FireTimestampParameter.class,
Parameter.CausedByDrainParameter.class,
- Parameter.KeyParameter.class);
+ Parameter.KeyParameter.class,
+ Parameter.SideInputParameter.class);
private static final ImmutableList<Class<? extends Parameter>>
ALLOWED_ON_TIMER_FAMILY_PARAMETERS =
@@ -215,7 +216,8 @@ public class DoFnSignatures {
Parameter.TimerIdParameter.class,
Parameter.FireTimestampParameter.class,
Parameter.CausedByDrainParameter.class,
- Parameter.KeyParameter.class);
+ Parameter.KeyParameter.class,
+ Parameter.SideInputParameter.class);
private static final Collection<Class<? extends Parameter>>
ALLOWED_ON_WINDOW_EXPIRATION_PARAMETERS =
@@ -226,7 +228,8 @@ public class DoFnSignatures {
Parameter.TaggedOutputReceiverParameter.class,
Parameter.StateParameter.class,
Parameter.TimestampParameter.class,
- Parameter.KeyParameter.class);
+ Parameter.KeyParameter.class,
+ Parameter.SideInputParameter.class);
private static final Collection<Class<? extends Parameter>>
ALLOWED_GET_INITIAL_RESTRICTION_PARAMETERS =
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 8a273127b4f..0c984d01c8f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -114,6 +114,7 @@ import org.apache.beam.sdk.testing.UsesProcessingTimeTimers;
import org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput;
import org.apache.beam.sdk.testing.UsesSetState;
import org.apache.beam.sdk.testing.UsesSideInputs;
+import org.apache.beam.sdk.testing.UsesSideInputsInTimer;
import org.apache.beam.sdk.testing.UsesSideInputsWithDifferentCoders;
import org.apache.beam.sdk.testing.UsesStatefulParDo;
import org.apache.beam.sdk.testing.UsesStrictTimerOrdering;
@@ -3678,6 +3679,154 @@ public class ParDoTest implements Serializable {
pipeline.run();
}
+ @Test
+ @Category({
+ ValidatesRunner.class,
+ UsesStatefulParDo.class,
+ UsesSideInputs.class,
+ UsesSideInputsInTimer.class,
+ UsesTestStream.class,
+ UsesTimersInParDo.class,
+ UsesTriggeredSideInputs.class,
+ UsesOnWindowExpiration.class
+ })
+ public void testTimerSideInput() {
+ // SideInput tag id
+ final String sideInputTag1 = "tag1";
+
+ final PCollectionView<Integer> sideInput =
+ pipeline
+ .apply("CreateSideInput1", Create.of(2))
+ .apply("ViewSideInput1", View.asSingleton());
+
+ DoFn<KV<Integer, Integer>, KV<Integer, Integer>> doFn =
+ new DoFn<KV<Integer, Integer>, KV<Integer, Integer>>() {
+ @TimerId("timer")
+ private final TimerSpec timerSpec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @StateId("foo")
+ private final StateSpec<ValueState<Integer>> stateSpec =
StateSpecs.value();
+
+ @ProcessElement
+ public void process(@Timestamp Instant ts, @TimerId("timer") Timer
timer) {
+ timer.align(Duration.standardSeconds(10)).setRelative();
+ }
+
+ @OnTimer("timer")
+ public void onTimer(
+ OutputReceiver<KV<Integer, Integer>> o,
+ @DoFn.SideInput(sideInputTag1) Integer sideInput,
+ @Key Integer key) {
+ o.output(KV.of(key, sideInput));
+ }
+
+ @OnWindowExpiration
+ public void onWindowExpiration(
+ @DoFn.SideInput(sideInputTag1) Integer sideInput,
+ OutputReceiver<KV<Integer, Integer>> o,
+ @Key Integer key) {
+ o.output(KV.of(key, sideInput));
+ }
+ };
+
+ final int numTestElements = 10;
+ final Instant now = new Instant(0);
+ TestStream.Builder<KV<Integer, Integer>> builder =
+ TestStream.create(KvCoder.of(VarIntCoder.of(), VarIntCoder.of()))
+ .advanceWatermarkTo(new Instant(0));
+
+ for (int i = 0; i < numTestElements; i++) {
+ builder =
+ builder.addElements(
+ TimestampedValue.of(KV.of(i % 2, i),
now.plus(Duration.millis(i * 1000))));
+ if ((i + 1) % 10 == 0) {
+ builder = builder.advanceWatermarkTo(now.plus(Duration.millis((i +
1) * 1000)));
+ }
+ }
+ List<KV<Integer, Integer>> expected =
+ IntStream.rangeClosed(0, 1)
+ .boxed()
+ .flatMap(i -> ImmutableList.of(KV.of(i, 2), KV.of(i,
2)).stream())
+ .collect(Collectors.toList());
+
+ PCollection<KV<Integer, Integer>> output =
+ pipeline
+ .apply(builder.advanceWatermarkToInfinity())
+ .apply(ParDo.of(doFn).withSideInput(sideInputTag1, sideInput));
+ PAssert.that(output).containsInAnyOrder(expected);
+ pipeline.run();
+ }
+
+ @Test
+ @Category({
+ ValidatesRunner.class,
+ UsesStatefulParDo.class,
+ UsesSideInputs.class,
+ UsesSideInputsInTimer.class,
+ UsesTimersInParDo.class,
+ UsesTriggeredSideInputs.class
+ })
+ public void testSideInputNotReadyTimer() {
+ final String sideInputTag = "tag1";
+
+ // Create a side input that is delayed by 15 seconds using Thread.sleep
+ DoFn<KV<String, String>, String> delayFn =
+ new DoFn<KV<String, String>, String>() {
+ @ProcessElement
+ public void process(OutputReceiver<String> o) throws
InterruptedException {
+ Thread.sleep(java.time.Duration.ofSeconds(15).toMillis());
+ o.output("side-value");
+ }
+ };
+
+ PCollectionView<String> sideInput =
+ pipeline
+ .apply("CreateSideSource", Create.of(KV.of("dummyKey", "")))
+ .apply("DelaySideInput", ParDo.of(delayFn))
+ .apply(View.asSingleton());
+
+ // Main input in global window
+ DoFn<KV<String, String>, String> fn =
+ new DoFn<KV<String, String>, String>() {
+ @TimerId("timer")
+ private final TimerSpec timerSpec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @StateId("dummy")
+ private final StateSpec<ValueState<Integer>> dummy =
StateSpecs.value();
+
+ @ProcessElement
+ public void process(
+ @Timestamp Instant ts,
+ @TimerId("timer") Timer timer,
+ @DoFn.SideInput(sideInputTag) String sideInputValue,
+ OutputReceiver<String> o) {
+ // Set timer to fire at current timestamp + 1 millis
+ timer.offset(Duration.millis(1)).setRelative();
+ o.output(sideInputValue);
+ }
+
+ @OnTimer("timer")
+ public void onTimer(
+ OutputReceiver<String> o, @DoFn.SideInput(sideInputTag) String
sideInputValue) {
+ o.output(sideInputValue);
+ }
+
+ @OnWindowExpiration
+ public void onWindowExpiration(
+ OutputReceiver<String> o, @DoFn.SideInput(sideInputTag) String
sideInputValue) {
+ o.output(sideInputValue);
+ }
+ };
+
+ PCollection<String> output =
+ pipeline
+ .apply("CreateMainKV", Create.of(KV.of("key", "main-elem")))
+ .apply(ParDo.of(fn).withSideInput(sideInputTag, sideInput));
+
+ PAssert.that(output).containsInAnyOrder("side-value", "side-value",
"side-value");
+ pipeline.run();
+ }
+
@Test
@Category({
ValidatesRunner.class,
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index d92a84ea9ff..3e4675ab074 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -2362,6 +2362,11 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
return currentWindow;
}
+ @Override
+ public <T> T sideInput(PCollectionView<T> view) {
+ return stateAccessor.get(view, currentWindow);
+ }
+
@Override
public OutputBuilder<OutputT> builder(OutputT value) {
return WindowedValues.<OutputT>builder()
@@ -2489,6 +2494,15 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
return (K) currentTimer.getUserKey();
}
+ @Override
+ public @Nullable Object sideInput(String tagId) {
+ PCollectionView<?> view = sideInputMapping.get(tagId);
+ if (view == null) {
+ throw new IllegalArgumentException("Unknown side input: " + tagId);
+ }
+ return stateAccessor.get(view, currentWindow);
+ }
+
@Override
public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
return context;
@@ -2649,6 +2663,11 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
return currentWindow;
}
+ @Override
+ public <T> T sideInput(PCollectionView<T> view) {
+ return stateAccessor.get(view, currentWindow);
+ }
+
@Override
public CausedByDrain causedByDrain() {
return causedByDrain;
@@ -2800,6 +2819,15 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
return currentTimer.getFireTimestamp();
}
+ @Override
+ public @Nullable Object sideInput(String tagId) {
+ PCollectionView<?> view = sideInputMapping.get(tagId);
+ if (view == null) {
+ throw new IllegalArgumentException("Unknown side input: " + tagId);
+ }
+ return stateAccessor.get(view, currentWindow);
+ }
+
@Override
public K key() {
return (K) currentTimer.getUserKey();