[ https://issues.apache.org/jira/browse/BEAM-4653?focusedWorklogId=125236&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-125236 ]
ASF GitHub Bot logged work on BEAM-4653: ---------------------------------------- Author: ASF GitHub Bot Created on: 19/Jul/18 20:24 Start Date: 19/Jul/18 20:24 Worklog Time Spent: 10m Work Description: lukecwik closed pull request #5898: [BEAM-4653] Add support to the Java SDK harness to execute timers. URL: https://github.com/apache/beam/pull/5898 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 6003aa23f0d..433533ed4fd 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -961,8 +961,24 @@ public TimerInternalsTimer( @Override public void set(Instant target) { - verifyAbsoluteTimeDomain(); - verifyTargetTime(target); + // Verifies that the time domain of this timer is acceptable for absolute timers. + if (!TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { + throw new IllegalStateException( + "Can only set relative timers in processing time domain. Use #setRelative()"); + } + + // Ensures that the target time is reasonable. For event time timers this means that the time + // should be prior to window GC time. 
+ if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { + Instant windowExpiry = window.maxTimestamp().plus(allowedLateness); + checkArgument( + !target.isAfter(windowExpiry), + "Attempted to set event time timer for %s but that is after" + + " the expiration of window %s", + target, + windowExpiry); + } + setUnderlyingTimer(target); } @@ -1006,30 +1022,6 @@ private Instant minTargetAndGcTime(Instant target) { return target; } - /** - * Ensures that the target time is reasonable. For event time timers this means that the time - * should be prior to window GC time. - */ - private void verifyTargetTime(Instant target) { - if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { - Instant windowExpiry = window.maxTimestamp().plus(allowedLateness); - checkArgument( - !target.isAfter(windowExpiry), - "Attempted to set event time timer for %s but that is after" - + " the expiration of window %s", - target, - windowExpiry); - } - } - - /** Verifies that the time domain of this timer is acceptable for absolute timers. */ - private void verifyAbsoluteTimeDomain() { - if (!TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) { - throw new IllegalStateException( - "Cannot only set relative timers in processing time domain." + " Use #setRelative()"); - } - } - /** * Sets the timer for the target time without checking anything about whether it is a reasonable * thing to do. 
For example, absolute processing time timers are not really sensible since the diff --git a/sdks/java/harness/build.gradle b/sdks/java/harness/build.gradle index bdf20048bab..ce6b2db3370 100644 --- a/sdks/java/harness/build.gradle +++ b/sdks/java/harness/build.gradle @@ -59,4 +59,5 @@ dependencies { testCompile library.java.junit testCompile library.java.mockito_core testCompile library.java.slf4j_jdk14 + testCompile project(path: ":beam-sdks-java-core", configuration: "shadowTest") } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java index cb3c731545c..6ec31551288 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java @@ -22,7 +22,7 @@ import com.google.auto.service.AutoService; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Multimap; +import com.google.common.collect.ListMultimap; import java.io.IOException; import java.util.Collection; import java.util.Map; @@ -87,7 +87,7 @@ Map<String, PCollection> pCollections, Map<String, RunnerApi.Coder> coders, Map<String, RunnerApi.WindowingStrategy> windowingStrategies, - Multimap<String, FnDataReceiver<WindowedValue<?>>> pCollectionIdsToConsumers, + ListMultimap<String, FnDataReceiver<WindowedValue<?>>> pCollectionIdsToConsumers, Consumer<ThrowingRunnable> addStartFunction, Consumer<ThrowingRunnable> addFinishFunction, BundleSplitListener splitListener) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java index 8d5ab91d25d..02f3babc817 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java +++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java @@ -22,7 +22,7 @@ import com.google.auto.service.AutoService; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Multimap; +import com.google.common.collect.ListMultimap; import java.io.IOException; import java.util.Map; import java.util.function.Consumer; @@ -81,7 +81,7 @@ Map<String, PCollection> pCollections, Map<String, RunnerApi.Coder> coders, Map<String, RunnerApi.WindowingStrategy> windowingStrategies, - Multimap<String, FnDataReceiver<WindowedValue<?>>> pCollectionIdsToConsumers, + ListMultimap<String, FnDataReceiver<WindowedValue<?>>> pCollectionIdsToConsumers, Consumer<ThrowingRunnable> addStartFunction, Consumer<ThrowingRunnable> addFinishFunction, BundleSplitListener splitListener) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java index 1d40ca3dad9..323edfb3371 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java @@ -21,7 +21,7 @@ import com.google.auto.service.AutoService; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Multimap; +import com.google.common.collect.ListMultimap; import java.io.IOException; import java.util.Collection; import java.util.Map; @@ -79,7 +79,7 @@ Map<String, PCollection> pCollections, Map<String, Coder> coders, Map<String, RunnerApi.WindowingStrategy> windowingStrategies, - Multimap<String, FnDataReceiver<WindowedValue<?>>> pCollectionIdsToConsumers, + ListMultimap<String, FnDataReceiver<WindowedValue<?>>> pCollectionIdsToConsumers, Consumer<ThrowingRunnable> addStartFunction, Consumer<ThrowingRunnable> addFinishFunction, BundleSplitListener splitListener) { diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java index 178c4c1c0d8..e2d21b71ac7 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java @@ -21,7 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; -import com.google.common.collect.Multimap; +import com.google.common.collect.ListMultimap; import java.io.IOException; import java.util.Collection; import java.util.Map; @@ -125,7 +125,7 @@ void finishBundle() throws Exception { Map<String, PCollection> pCollections, Map<String, RunnerApi.Coder> coders, Map<String, RunnerApi.WindowingStrategy> windowingStrategies, - Multimap<String, FnDataReceiver<WindowedValue<?>>> pCollectionIdsToConsumers, + ListMultimap<String, FnDataReceiver<WindowedValue<?>>> pCollectionIdsToConsumers, Consumer<ThrowingRunnable> addStartFunction, Consumer<ThrowingRunnable> addFinishFunction, BundleSplitListener splitListener) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/DoFnPTransformRunnerFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/DoFnPTransformRunnerFactory.java index f7dcb5fd79d..61a341eaed2 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/DoFnPTransformRunnerFactory.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/DoFnPTransformRunnerFactory.java @@ -24,7 +24,6 @@ import com.google.common.collect.Iterables; import com.google.common.collect.ListMultimap; import com.google.common.collect.Maps; -import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import java.io.IOException; import java.util.Map; @@ -41,16 +40,22 @@ import 
org.apache.beam.runners.core.construction.PCollectionViewTranslation; import org.apache.beam.runners.core.construction.ParDoTranslation; import org.apache.beam.runners.core.construction.RehydratedComponents; +import org.apache.beam.runners.core.construction.Timer; +import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.fn.data.FnDataReceiver; import org.apache.beam.sdk.fn.function.ThrowingRunnable; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Materializations; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; @@ -66,6 +71,9 @@ void processElement(WindowedValue<T> input) throws Exception; + void processTimer( + String timerId, TimeDomain timeDomain, WindowedValue<KV<Object, Timer>> input); + void finishBundle() throws Exception; } @@ -80,7 +88,7 @@ public final RunnerT createRunnerForPTransform( Map<String, PCollection> pCollections, Map<String, RunnerApi.Coder> coders, Map<String, RunnerApi.WindowingStrategy> windowingStrategies, - Multimap<String, FnDataReceiver<WindowedValue<?>>> pCollectionIdsToConsumers, + ListMultimap<String, FnDataReceiver<WindowedValue<?>>> pCollectionIdsToConsumers, Consumer<ThrowingRunnable> addStartFunction, Consumer<ThrowingRunnable> addFinishFunction, BundleSplitListener splitListener) { @@ -103,12 +111,28 @@ public final RunnerT createRunnerForPTransform( addStartFunction.accept(runner::startBundle); Iterable<String> mainInput = 
Sets.difference( - pTransform.getInputsMap().keySet(), context.parDoPayload.getSideInputsMap().keySet()); + pTransform.getInputsMap().keySet(), + Sets.union( + context.parDoPayload.getSideInputsMap().keySet(), + context.parDoPayload.getTimerSpecsMap().keySet())); for (String localInputName : mainInput) { pCollectionIdsToConsumers.put( pTransform.getInputsOrThrow(localInputName), (FnDataReceiver) (FnDataReceiver<WindowedValue<TransformInputT>>) runner::processElement); } + + // Register as a consumer for each timer PCollection. + for (String localName : context.parDoPayload.getTimerSpecsMap().keySet()) { + TimeDomain timeDomain = + DoFnSignatures.getTimerSpecOrThrow( + context.doFnSignature.timerDeclarations().get(localName), context.doFn) + .getTimeDomain(); + pCollectionIdsToConsumers.put( + pTransform.getInputsOrThrow(localName), + (timer) -> + runner.processTimer(localName, timeDomain, (WindowedValue<KV<Object, Timer>>) timer)); + } + addFinishFunction.accept(runner::finishBundle); return runner; } @@ -123,6 +147,7 @@ public final RunnerT createRunnerForPTransform( final Supplier<String> processBundleInstructionId; final RehydratedComponents rehydratedComponents; final DoFn<InputT, OutputT> doFn; + final DoFnSignature doFnSignature; final TupleTag<OutputT> mainOutputTag; final Coder<?> inputCoder; final Coder<?> keyCoder; @@ -131,7 +156,7 @@ public final RunnerT createRunnerForPTransform( final Map<TupleTag<?>, SideInputSpec> tagToSideInputSpecMap; Map<TupleTag<?>, Coder<?>> outputCoders; final ParDoPayload parDoPayload; - final ListMultimap<TupleTag<?>, FnDataReceiver<WindowedValue<?>>> tagToConsumer; + final ListMultimap<String, FnDataReceiver<WindowedValue<?>>> localNameToConsumer; final BundleSplitListener splitListener; Context( @@ -143,7 +168,7 @@ public final RunnerT createRunnerForPTransform( Map<String, PCollection> pCollections, Map<String, RunnerApi.Coder> coders, Map<String, RunnerApi.WindowingStrategy> windowingStrategies, - Multimap<String, 
FnDataReceiver<WindowedValue<?>>> pCollectionIdsToConsumers, + ListMultimap<String, FnDataReceiver<WindowedValue<?>>> pCollectionIdsToConsumers, BundleSplitListener splitListener) { this.pipelineOptions = pipelineOptions; this.beamFnStateClient = beamFnStateClient; @@ -155,17 +180,23 @@ public final RunnerT createRunnerForPTransform( try { rehydratedComponents = RehydratedComponents.forComponents( - RunnerApi.Components.newBuilder() - .putAllCoders(coders) - .putAllWindowingStrategies(windowingStrategies) - .build()); + RunnerApi.Components.newBuilder() + .putAllCoders(coders) + .putAllPcollections(pCollections) + .putAllWindowingStrategies(windowingStrategies) + .build()) + .withPipeline(Pipeline.create()); parDoPayload = ParDoPayload.parseFrom(pTransform.getSpec().getPayload()); doFn = (DoFn) ParDoTranslation.getDoFn(parDoPayload); + doFnSignature = DoFnSignatures.signatureForDoFn(doFn); mainOutputTag = (TupleTag) ParDoTranslation.getMainOutputTag(parDoPayload); String mainInputTag = Iterables.getOnlyElement( Sets.difference( - pTransform.getInputsMap().keySet(), parDoPayload.getSideInputsMap().keySet())); + pTransform.getInputsMap().keySet(), + Sets.union( + parDoPayload.getSideInputsMap().keySet(), + parDoPayload.getTimerSpecsMap().keySet()))); PCollection mainInput = pCollections.get(pTransform.getInputsOrThrow(mainInputTag)); inputCoder = rehydratedComponents.getCoder(mainInput.getCoderId()); if (inputCoder instanceof KvCoder @@ -225,13 +256,13 @@ public final RunnerT createRunnerForPTransform( throw new IllegalArgumentException("Malformed ParDoPayload", exn); } - ImmutableListMultimap.Builder<TupleTag<?>, FnDataReceiver<WindowedValue<?>>> - tagToConsumerBuilder = ImmutableListMultimap.builder(); + ImmutableListMultimap.Builder<String, FnDataReceiver<WindowedValue<?>>> + localNameToConsumerBuilder = ImmutableListMultimap.builder(); for (Map.Entry<String, String> entry : pTransform.getOutputsMap().entrySet()) { - tagToConsumerBuilder.putAll( - new 
TupleTag<>(entry.getKey()), pCollectionIdsToConsumers.get(entry.getValue())); + localNameToConsumerBuilder.putAll( + entry.getKey(), pCollectionIdsToConsumers.get(entry.getValue())); } - tagToConsumer = tagToConsumerBuilder.build(); + localNameToConsumer = localNameToConsumerBuilder.build(); tagToSideInputSpecMap = tagToSideInputSpecMapBuilder.build(); this.splitListener = splitListener; } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FlattenRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FlattenRunner.java index f50c49944c6..8b764206cfe 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FlattenRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FlattenRunner.java @@ -22,7 +22,7 @@ import com.google.auto.service.AutoService; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Multimap; +import com.google.common.collect.ListMultimap; import java.io.IOException; import java.util.Map; import java.util.function.Consumer; @@ -65,7 +65,7 @@ Map<String, PCollection> pCollections, Map<String, Coder> coders, Map<String, RunnerApi.WindowingStrategy> windowingStrategies, - Multimap<String, FnDataReceiver<WindowedValue<?>>> pCollectionIdsToConsumers, + ListMultimap<String, FnDataReceiver<WindowedValue<?>>> pCollectionIdsToConsumers, Consumer<ThrowingRunnable> addStartFunction, Consumer<ThrowingRunnable> addFinishFunction, BundleSplitListener splitListener) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index 392bc46eac5..b8a02f673a6 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -23,6 +23,7 @@ import com.google.auto.service.AutoService; import 
com.google.common.collect.ImmutableMap; +import java.io.IOException; import java.util.Collection; import java.util.Iterator; import java.util.Map; @@ -30,7 +31,9 @@ import org.apache.beam.fn.harness.DoFnPTransformRunnerFactory.Context; import org.apache.beam.fn.harness.state.FnApiStateAccessor; import org.apache.beam.runners.core.DoFnRunner; +import org.apache.beam.runners.core.LateDataUtils; import org.apache.beam.runners.core.construction.PTransformTranslation; +import org.apache.beam.runners.core.construction.Timer; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.fn.data.FnDataReceiver; import org.apache.beam.sdk.options.PipelineOptions; @@ -39,7 +42,6 @@ import org.apache.beam.sdk.state.State; import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.state.TimeDomain; -import org.apache.beam.sdk.state.Timer; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; @@ -50,15 +52,19 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.FieldAccessDeclaration; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RowParameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.DateTimeUtils; +import org.joda.time.Duration; import org.joda.time.Instant; /** 
@@ -91,16 +97,16 @@ private final Context<InputT, OutputT> context; private final Collection<FnDataReceiver<WindowedValue<OutputT>>> mainOutputConsumers; private FnApiStateAccessor stateAccessor; - private final DoFnSignature doFnSignature; private final DoFnInvoker<InputT, OutputT> doFnInvoker; private final DoFn<InputT, OutputT>.StartBundleContext startBundleContext; private final ProcessBundleContext processContext; + private final OnTimerContext onTimerContext; private final DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext; /** Only valid during {@link #processElement}, null otherwise. */ private WindowedValue<InputT> currentElement; - /** Only valid during {@link #processElement}, null otherwise. */ + /** Only valid during {@link #processElement} and {@link #processTimer}, null otherwise. */ private BoundedWindow currentWindow; /** Following fields are only valid if a Schema is set, null otherwise. */ @@ -109,13 +115,18 @@ @Nullable private final SchemaCoder<OutputT> mainOutputSchemaCoder; @Nullable private final FieldAccessDescriptor fieldAccessDescriptor; + /** Only valid during {@link #processTimer}, null otherwise. */ + private WindowedValue<KV<Object, Timer>> currentTimer; + + /** Only valid during {@link #processTimer}, null otherwise. 
*/ + private TimeDomain currentTimeDomain; + FnApiDoFnRunner(Context<InputT, OutputT> context) { this.context = context; this.mainOutputConsumers = (Collection<FnDataReceiver<WindowedValue<OutputT>>>) - (Collection) context.tagToConsumer.get(context.mainOutputTag); - this.doFnSignature = DoFnSignatures.signatureForDoFn(context.doFn); + (Collection) context.localNameToConsumer.get(context.mainOutputTag.getId()); this.doFnInvoker = DoFnInvokers.invokerFor(context.doFn); this.doFnInvoker.invokeSetup(); @@ -127,7 +138,8 @@ public PipelineOptions getPipelineOptions() { } }; this.processContext = new ProcessBundleContext(); - finishBundleContext = + this.onTimerContext = new OnTimerContext(); + this.finishBundleContext = this.context.doFn.new FinishBundleContext() { @Override public PipelineOptions getPipelineOptions() { @@ -145,7 +157,7 @@ public void output(OutputT output, Instant timestamp, BoundedWindow window) { public <T> void output( TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) { Collection<FnDataReceiver<WindowedValue<T>>> consumers = - (Collection) context.tagToConsumer.get(tag); + (Collection) context.localNameToConsumer.get(tag.getId()); if (consumers == null) { throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); } @@ -235,6 +247,25 @@ public void processElement(WindowedValue<InputT> elem) { } } + @Override + public void processTimer( + String timerId, TimeDomain timeDomain, WindowedValue<KV<Object, Timer>> timer) { + currentTimer = timer; + currentTimeDomain = timeDomain; + try { + Iterator<BoundedWindow> windowIterator = + (Iterator<BoundedWindow>) timer.getWindows().iterator(); + while (windowIterator.hasNext()) { + currentWindow = windowIterator.next(); + doFnInvoker.invokeOnTimer(timerId, onTimerContext); + } + } finally { + currentTimer = null; + currentTimeDomain = null; + currentWindow = null; + } + } + @Override public void finishBundle() { doFnInvoker.invokeFinishBundle(finishBundleContext); @@ 
-256,6 +287,125 @@ public void finishBundle() { } } + private class FnApiTimer implements org.apache.beam.sdk.state.Timer { + private final String timerId; + private final TimeDomain timeDomain; + private final Instant currentTimestamp; + private final Duration allowedLateness; + private final WindowedValue<?> currentElementOrTimer; + + private Duration period = Duration.ZERO; + private Duration offset = Duration.ZERO; + + FnApiTimer(String timerId, WindowedValue<KV<?, ?>> currentElementOrTimer) { + this.timerId = timerId; + this.currentElementOrTimer = currentElementOrTimer; + + TimerDeclaration timerDeclaration = context.doFnSignature.timerDeclarations().get(timerId); + this.timeDomain = + DoFnSignatures.getTimerSpecOrThrow(timerDeclaration, context.doFn).getTimeDomain(); + + switch (timeDomain) { + case EVENT_TIME: + this.currentTimestamp = currentElementOrTimer.getTimestamp(); + break; + case PROCESSING_TIME: + this.currentTimestamp = new Instant(DateTimeUtils.currentTimeMillis()); + break; + case SYNCHRONIZED_PROCESSING_TIME: + this.currentTimestamp = new Instant(DateTimeUtils.currentTimeMillis()); + break; + default: + throw new IllegalArgumentException(String.format("Unknown time domain %s", timeDomain)); + } + + try { + this.allowedLateness = + context + .rehydratedComponents + .getPCollection(context.pTransform.getInputsOrThrow(timerId)) + .getWindowingStrategy() + .getAllowedLateness(); + } catch (IOException e) { + throw new IllegalArgumentException( + String.format("Unable to get allowed lateness for timer %s", timerId)); + } + } + + @Override + public void set(Instant absoluteTime) { + // Verifies that the time domain of this timer is acceptable for absolute timers. + if (!TimeDomain.EVENT_TIME.equals(timeDomain)) { + throw new IllegalArgumentException( + "Can only set relative timers in processing time domain. Use #setRelative()"); + } + + // Ensures that the target time is reasonable. 
For event time timers this means that the time + // should be prior to window GC time. + if (TimeDomain.EVENT_TIME.equals(timeDomain)) { + Instant windowExpiry = LateDataUtils.garbageCollectionTime(currentWindow, allowedLateness); + checkArgument( + !absoluteTime.isAfter(windowExpiry), + "Attempted to set event time timer for %s but that is after" + + " the expiration of window %s", + absoluteTime, + windowExpiry); + } + + output(absoluteTime); + } + + @Override + public void setRelative() { + Instant target; + if (period.equals(Duration.ZERO)) { + target = currentTimestamp.plus(offset); + } else { + long millisSinceStart = currentTimestamp.plus(offset).getMillis() % period.getMillis(); + target = + millisSinceStart == 0 + ? currentTimestamp + : currentTimestamp.plus(period).minus(millisSinceStart); + } + target = minTargetAndGcTime(target); + output(target); + } + + @Override + public org.apache.beam.sdk.state.Timer offset(Duration offset) { + this.offset = offset; + return this; + } + + @Override + public org.apache.beam.sdk.state.Timer align(Duration period) { + this.period = period; + return this; + } + + /** + * For event time timers the target time should be prior to window GC time. So it returns + * min(time to set, GC Time of window). 
+ */ + private Instant minTargetAndGcTime(Instant target) { + if (TimeDomain.EVENT_TIME.equals(timeDomain)) { + Instant windowExpiry = LateDataUtils.garbageCollectionTime(currentWindow, allowedLateness); + if (target.isAfter(windowExpiry)) { + return windowExpiry; + } + } + return target; + } + + private void output(Instant scheduledTime) { + Object key = ((KV) currentElementOrTimer.getValue()).getKey(); + Collection<FnDataReceiver<WindowedValue<KV<Object, Timer>>>> consumers = + (Collection) context.localNameToConsumer.get(timerId); + + outputTo(consumers, currentElementOrTimer.withValue(KV.of(key, Timer.of(scheduledTime)))); + } + } + /** * Provides arguments for a {@link DoFnInvoker} for {@link DoFn.ProcessElement @ProcessElement}. */ @@ -333,17 +483,18 @@ public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) { @Override public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException("TODO: Add support for timers"); + throw new UnsupportedOperationException( + "Cannot access OnTimerContext outside of @OnTimer methods."); } @Override public RestrictionTracker<?, ?> restrictionTracker() { - throw new UnsupportedOperationException("TODO: Add support for SplittableDoFn"); + throw new UnsupportedOperationException("RestrictionTracker parameters are not supported."); } @Override public State state(String stateId) { - StateDeclaration stateDeclaration = doFnSignature.stateDeclarations().get(stateId); + StateDeclaration stateDeclaration = context.doFnSignature.stateDeclarations().get(stateId); checkNotNull(stateDeclaration, "No state declaration found for %s", stateId); StateSpec<?> spec; try { @@ -355,8 +506,13 @@ public State state(String stateId) { } @Override - public Timer timer(String timerId) { - throw new UnsupportedOperationException("TODO: Add support for timers"); + public org.apache.beam.sdk.state.Timer timer(String timerId) { + checkState( + currentElement.getValue() 
instanceof KV, + "Accessing timer in unkeyed context. Current element is not a KV: %s.", + currentElement.getValue()); + + return new FnApiTimer(timerId, (WindowedValue) currentElement); } @Override @@ -387,7 +543,7 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) { @Override public <T> void output(TupleTag<T> tag, T output) { Collection<FnDataReceiver<WindowedValue<T>>> consumers = - (Collection) context.tagToConsumer.get(tag); + (Collection) context.localNameToConsumer.get(tag.getId()); if (consumers == null) { throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); } @@ -400,7 +556,7 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) { @Override public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { Collection<FnDataReceiver<WindowedValue<T>>> consumers = - (Collection) context.tagToConsumer.get(tag); + (Collection) context.localNameToConsumer.get(tag.getId()); if (consumers == null) { throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); } @@ -433,4 +589,178 @@ public void updateWatermark(Instant watermark) { throw new UnsupportedOperationException("TODO: Add support for SplittableDoFn"); } } + + /** Provides arguments for a {@link DoFnInvoker} for {@link DoFn.OnTimer @OnTimer}. 
*/ + private class OnTimerContext extends DoFn<InputT, OutputT>.OnTimerContext + implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { + + private OnTimerContext() { + context.doFn.super(); + } + + @Override + public BoundedWindow window() { + return currentWindow; + } + + @Override + public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException( + "Cannot access paneInfo outside of @ProcessElement methods."); + } + + @Override + public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException( + "Cannot access StartBundleContext outside of @StartBundle method."); + } + + @Override + public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext( + DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException( + "Cannot access FinishBundleContext outside of @FinishBundle method."); + } + + @Override + public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException( + "Cannot access ProcessContext outside of @ProcessElement method."); + } + + @Override + public InputT element(DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException("Element parameters are not supported."); + } + + @Override + public Instant timestamp(DoFn<InputT, OutputT> doFn) { + return timestamp(); + } + + @Override + public Row asRow(@Nullable String id) { + throw new UnsupportedOperationException( + "Cannot access element outside of @ProcessElement method."); + } + + @Override + public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) { + return timeDomain(); + } + + @Override + public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) { + return DoFnOutputReceivers.windowedReceiver(this, null); + } + + @Override + public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) { + return DoFnOutputReceivers.rowReceiver(this, null, mainOutputSchemaCoder); + } + 
+ @Override + public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) { + return DoFnOutputReceivers.windowedMultiReceiver(this); + } + + @Override + public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { + return this; + } + + @Override + public RestrictionTracker<?, ?> restrictionTracker() { + throw new UnsupportedOperationException("RestrictionTracker parameters are not supported."); + } + + @Override + public State state(String stateId) { + StateDeclaration stateDeclaration = context.doFnSignature.stateDeclarations().get(stateId); + checkNotNull(stateDeclaration, "No state declaration found for %s", stateId); + StateSpec<?> spec; + try { + spec = (StateSpec<?>) stateDeclaration.field().get(context.doFn); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + return spec.bind(stateId, stateAccessor); + } + + @Override + public org.apache.beam.sdk.state.Timer timer(String timerId) { + checkState( + currentTimer.getValue() instanceof KV, + "Accessing timer in unkeyed context. 
Current timer is not a KV: %s.", + currentTimer); + + return new FnApiTimer(timerId, (WindowedValue) currentTimer); + } + + @Override + public PipelineOptions getPipelineOptions() { + return context.pipelineOptions; + } + + @Override + public PipelineOptions pipelineOptions() { + return context.pipelineOptions; + } + + @Override + public void output(OutputT output) { + outputTo( + mainOutputConsumers, + WindowedValue.of(output, currentTimer.getTimestamp(), currentWindow, PaneInfo.NO_FIRING)); + } + + @Override + public void outputWithTimestamp(OutputT output, Instant timestamp) { + checkArgument( + !currentTimer.getTimestamp().isAfter(timestamp), + "Output time %s can not be before timer timestamp %s.", + timestamp, + currentTimer.getTimestamp()); + outputTo( + mainOutputConsumers, + WindowedValue.of(output, timestamp, currentWindow, PaneInfo.NO_FIRING)); + } + + @Override + public <T> void output(TupleTag<T> tag, T output) { + Collection<FnDataReceiver<WindowedValue<T>>> consumers = + (Collection) context.localNameToConsumer.get(tag.getId()); + if (consumers == null) { + throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); + } + outputTo( + consumers, + WindowedValue.of(output, currentTimer.getTimestamp(), currentWindow, PaneInfo.NO_FIRING)); + } + + @Override + public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + checkArgument( + !currentTimer.getTimestamp().isAfter(timestamp), + "Output time %s can not be before timer timestamp %s.", + timestamp, + currentTimer.getTimestamp()); + Collection<FnDataReceiver<WindowedValue<T>>> consumers = + (Collection) context.localNameToConsumer.get(tag.getId()); + if (consumers == null) { + throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); + } + outputTo(consumers, WindowedValue.of(output, timestamp, currentWindow, PaneInfo.NO_FIRING)); + } + + @Override + public TimeDomain timeDomain() { + return currentTimeDomain; + } + + @Override + 
public Instant timestamp() { + return currentTimer.getTimestamp(); + } + } } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/MapFnRunners.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/MapFnRunners.java index e2c61de7183..f0e8c684b16 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/MapFnRunners.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/MapFnRunners.java @@ -21,7 +21,7 @@ import static com.google.common.collect.Iterables.getOnlyElement; import com.google.common.collect.Iterables; -import com.google.common.collect.Multimap; +import com.google.common.collect.ListMultimap; import java.io.IOException; import java.util.Collection; import java.util.Map; @@ -108,7 +108,7 @@ private Factory(MapperFactory<InputT, OutputT> mapperFactory) { Map<String, PCollection> pCollections, Map<String, RunnerApi.Coder> coders, Map<String, RunnerApi.WindowingStrategy> windowingStrategies, - Multimap<String, FnDataReceiver<WindowedValue<?>>> pCollectionIdsToConsumers, + ListMultimap<String, FnDataReceiver<WindowedValue<?>>> pCollectionIdsToConsumers, Consumer<ThrowingRunnable> addStartFunction, Consumer<ThrowingRunnable> addFinishFunction, BundleSplitListener splitListener) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java index edac09ea135..3e663d803c7 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java @@ -17,7 +17,7 @@ */ package org.apache.beam.fn.harness; -import com.google.common.collect.Multimap; +import com.google.common.collect.ListMultimap; import java.io.IOException; import java.util.Map; import java.util.function.Consumer; @@ -70,7 +70,7 @@ T createRunnerForPTransform( Map<String, PCollection> pCollections, 
Map<String, Coder> coders, Map<String, RunnerApi.WindowingStrategy> windowingStrategies, - Multimap<String, FnDataReceiver<WindowedValue<?>>> pCollectionIdsToConsumers, + ListMultimap<String, FnDataReceiver<WindowedValue<?>>> pCollectionIdsToConsumers, Consumer<ThrowingRunnable> addStartFunction, Consumer<ThrowingRunnable> addFinishFunction, BundleSplitListener splitListener) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java index 1af56c73e55..6ade4bbe234 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java @@ -36,9 +36,11 @@ import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.SplittableProcessElementInvoker; import org.apache.beam.runners.core.construction.PTransformTranslation; +import org.apache.beam.runners.core.construction.Timer; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.fn.data.FnDataReceiver; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; @@ -82,7 +84,7 @@ context, windowedCoder, (Collection<FnDataReceiver<WindowedValue<OutputT>>>) - (Collection) context.tagToConsumer.get(context.mainOutputTag), + (Collection) context.localNameToConsumer.get(context.mainOutputTag.getId()), Iterables.getOnlyElement(context.pTransform.getInputsMap().keySet())); } } @@ -196,7 +198,7 @@ public void outputWindowedValue( Collection<? 
extends BoundedWindow> windows, PaneInfo pane) { Collection<FnDataReceiver<WindowedValue<AdditionalOutputT>>> consumers = - (Collection) context.tagToConsumer.get(tag); + (Collection) context.localNameToConsumer.get(tag.getId()); if (consumers == null) { throw new IllegalArgumentException( String.format("Unknown output tag %s", tag)); @@ -247,6 +249,12 @@ public void outputWindowedValue( } } + @Override + public void processTimer( + String timerId, TimeDomain timeDomain, WindowedValue<KV<Object, Timer>> input) { + throw new UnsupportedOperationException("Timers are unsupported in a SplittableDoFn."); + } + @Override public void finishBundle() { doFnInvoker.invokeFinishBundle(finishBundleContext); diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java index 4007426c17f..c76357cdf7a 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java @@ -386,7 +386,7 @@ public Object createRunnerForPTransform( Map<String, PCollection> pCollections, Map<String, Coder> coders, Map<String, WindowingStrategy> windowingStrategies, - Multimap<String, FnDataReceiver<WindowedValue<?>>> pCollectionIdsToConsumers, + ListMultimap<String, FnDataReceiver<WindowedValue<?>>> pCollectionIdsToConsumers, Consumer<ThrowingRunnable> addStartFunction, Consumer<ThrowingRunnable> addFinishFunction, BundleSplitListener splitListener) { diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java index d885d1fa005..976731151ef 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java +++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java @@ -31,11 +31,11 @@ import static org.mockito.Mockito.when; import com.google.common.base.Suppliers; -import com.google.common.collect.HashMultimap; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; -import com.google.common.collect.Multimap; +import com.google.common.collect.ListMultimap; import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; import java.util.ArrayList; @@ -127,7 +127,7 @@ public void testCreatingAndProcessingBeamFnDataReadRunner() throws Exception { List<WindowedValue<String>> outputValues = new ArrayList<>(); - Multimap<String, FnDataReceiver<WindowedValue<?>>> consumers = HashMultimap.create(); + ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers = ArrayListMultimap.create(); String localOutputId = "outputPC"; consumers.put( localOutputId, (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) outputValues::add); diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java index 0b0fbf598fc..8714a425a12 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java @@ -33,10 +33,10 @@ import static org.mockito.Mockito.when; import com.google.common.base.Suppliers; -import com.google.common.collect.HashMultimap; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; -import com.google.common.collect.Multimap; +import com.google.common.collect.ListMultimap; import java.io.IOException; import java.util.ArrayList; import 
java.util.List; @@ -117,7 +117,7 @@ public void setUp() { public void testCreatingAndProcessingBeamFnDataWriteRunner() throws Exception { String bundleId = "57L"; - Multimap<String, FnDataReceiver<WindowedValue<?>>> consumers = HashMultimap.create(); + ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers = ArrayListMultimap.create(); List<ThrowingRunnable> startFunctions = new ArrayList<>(); List<ThrowingRunnable> finishFunctions = new ArrayList<>(); diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java index fa8282796a6..e5567ef8abd 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java @@ -26,10 +26,10 @@ import static org.junit.Assert.fail; import com.google.common.base.Suppliers; -import com.google.common.collect.HashMultimap; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import com.google.common.collect.Multimap; +import com.google.common.collect.ListMultimap; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -126,7 +126,7 @@ public void testStart() throws Exception { public void testCreatingAndProcessingSourceFromFactory() throws Exception { List<WindowedValue<String>> outputValues = new ArrayList<>(); - Multimap<String, FnDataReceiver<WindowedValue<?>>> consumers = HashMultimap.create(); + ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers = ArrayListMultimap.create(); consumers.put( "outputPC", (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) outputValues::add); List<ThrowingRunnable> startFunctions = new ArrayList<>(); diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java index 5cab09e503d..7ba5c7be5fa 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java @@ -24,9 +24,9 @@ import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.Assert.assertThat; -import com.google.common.collect.HashMultimap; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Iterables; -import com.google.common.collect.Multimap; +import com.google.common.collect.ListMultimap; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; @@ -123,7 +123,7 @@ public void createPipeline() throws Exception { @Test public void testPrecombine() throws Exception { // Create a map of consumers and an output target to check output values. - Multimap<String, FnDataReceiver<WindowedValue<?>>> consumers = HashMultimap.create(); + ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers = ArrayListMultimap.create(); Deque<WindowedValue<KV<String, Integer>>> mainOutputValues = new ArrayDeque<>(); consumers.put( Iterables.getOnlyElement(pTransform.getOutputsMap().values()), @@ -190,7 +190,7 @@ public void testPrecombine() throws Exception { @Test public void testMergeAccumulators() throws Exception { // Create a map of consumers and an output target to check output values. 
- Multimap<String, FnDataReceiver<WindowedValue<?>>> consumers = HashMultimap.create(); + ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers = ArrayListMultimap.create(); Deque<WindowedValue<KV<String, Integer>>> mainOutputValues = new ArrayDeque<>(); consumers.put( Iterables.getOnlyElement(pTransform.getOutputsMap().values()), @@ -245,7 +245,7 @@ public void testMergeAccumulators() throws Exception { @Test public void testExtractOutputs() throws Exception { // Create a map of consumers and an output target to check output values. - Multimap<String, FnDataReceiver<WindowedValue<?>>> consumers = HashMultimap.create(); + ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers = ArrayListMultimap.create(); Deque<WindowedValue<KV<String, Integer>>> mainOutputValues = new ArrayDeque<>(); consumers.put( Iterables.getOnlyElement(pTransform.getOutputsMap().values()), @@ -300,7 +300,7 @@ public void testExtractOutputs() throws Exception { @Test public void testCombineGroupedValues() throws Exception { // Create a map of consumers and an output target to check output values. 
- Multimap<String, FnDataReceiver<WindowedValue<?>>> consumers = HashMultimap.create(); + ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers = ArrayListMultimap.create(); Deque<WindowedValue<KV<String, Integer>>> mainOutputValues = new ArrayDeque<>(); consumers.put( Iterables.getOnlyElement(pTransform.getOutputsMap().values()), diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FlattenRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FlattenRunnerTest.java index b9393c29e5a..3ba0495d295 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FlattenRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FlattenRunnerTest.java @@ -26,10 +26,8 @@ import com.google.common.base.Suppliers; import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.HashMultimap; import com.google.common.collect.Iterables; import com.google.common.collect.ListMultimap; -import com.google.common.collect.Multimap; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -71,7 +69,7 @@ public void testCreatingAndProcessingDoFlatten() throws Exception { .build(); List<WindowedValue<String>> mainOutputValues = new ArrayList<>(); - Multimap<String, FnDataReceiver<WindowedValue<?>>> consumers = HashMultimap.create(); + ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers = ArrayListMultimap.create(); consumers.put( "mainOutputTarget", (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) mainOutputValues::add); diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java index df5d62ce744..5b2ee9ccee9 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java @@ -18,6 +18,7 @@ 
package org.apache.beam.fn.harness; +import static org.apache.beam.sdk.util.WindowedValue.timestampedValueInGlobalWindow; import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -28,18 +29,20 @@ import static org.junit.Assert.fail; import com.google.common.base.Suppliers; -import com.google.common.collect.HashMultimap; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; -import com.google.common.collect.Multimap; +import com.google.common.collect.ListMultimap; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.ServiceLoader; import org.apache.beam.fn.harness.state.FakeBeamFnStateClient; import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey; import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.PipelineTranslation; import org.apache.beam.runners.core.construction.SdkComponents; @@ -52,7 +55,12 @@ import org.apache.beam.sdk.state.CombiningState; import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.testing.ResetDateTimeProvider; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; @@ -76,6 +84,7 @@ import org.hamcrest.collection.IsMapContaining; import org.joda.time.Duration; import 
org.joda.time.Instant; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -84,6 +93,8 @@ @RunWith(JUnit4.class) public class FnApiDoFnRunnerTest implements Serializable { + @Rule public transient ResetDateTimeProvider dateTimeProvider = new ResetDateTimeProvider(); + public static final String TEST_PTRANSFORM_ID = "pTransformId"; private static class ConcatCombineFn extends CombineFn<String, String, String> { @@ -173,7 +184,7 @@ public void testUsingUserState() throws Exception { bagUserStateKey("combine", "X"), encode("X0"))); List<WindowedValue<String>> mainOutputValues = new ArrayList<>(); - Multimap<String, FnDataReceiver<WindowedValue<?>>> consumers = HashMultimap.create(); + ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers = ArrayListMultimap.create(); consumers.put( outputPCollectionId, (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) mainOutputValues::add); @@ -332,7 +343,7 @@ public void testBasicWithSideInputsAndOutputs() throws Exception { List<WindowedValue<String>> mainOutputValues = new ArrayList<>(); List<WindowedValue<String>> additionalOutputValues = new ArrayList<>(); - Multimap<String, FnDataReceiver<WindowedValue<?>>> consumers = HashMultimap.create(); + ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers = ArrayListMultimap.create(); consumers.put( outputPCollectionId, (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) mainOutputValues::add); @@ -459,7 +470,7 @@ public void testSideInputIsAccessibleForDownstreamCallers() throws Exception { FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(stateData); List<WindowedValue<Iterable<String>>> mainOutputValues = new ArrayList<>(); - Multimap<String, FnDataReceiver<WindowedValue<?>>> consumers = HashMultimap.create(); + ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers = ArrayListMultimap.create(); consumers.put( 
Iterables.getOnlyElement(pTransform.getOutputsMap().values()), (FnDataReceiver) (FnDataReceiver<WindowedValue<Iterable<String>>>) mainOutputValues::add); @@ -505,10 +516,223 @@ public void testSideInputIsAccessibleForDownstreamCallers() throws Exception { assertEquals(stateData, fakeClient.getData()); } + private static class TestTimerfulDoFn extends DoFn<KV<String, String>, String> { + @TimerId("event") + private final TimerSpec eventTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @TimerId("processing") + private final TimerSpec processingTimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + @ProcessElement + public void processElement( + ProcessContext context, + @TimerId("event") Timer eventTimeTimer, + @TimerId("processing") Timer processingTimeTimer) { + context.output("main" + context.element().getKey()); + eventTimeTimer.set(context.timestamp().plus(1L)); + processingTimeTimer.offset(Duration.millis(2L)); + processingTimeTimer.setRelative(); + } + + @OnTimer("event") + public void eventTimer( + OnTimerContext context, + @TimerId("event") Timer eventTimeTimer, + @TimerId("processing") Timer processingTimeTimer) { + context.output("event"); + eventTimeTimer.set(context.timestamp().plus(11L)); + processingTimeTimer.offset(Duration.millis(12L)); + processingTimeTimer.setRelative(); + } + + @OnTimer("processing") + public void processingTimer( + OnTimerContext context, + @TimerId("event") Timer eventTimeTimer, + @TimerId("processing") Timer processingTimeTimer) { + context.output("processing"); + eventTimeTimer.set(context.timestamp().plus(21L)); + processingTimeTimer.offset(Duration.millis(22L)); + processingTimeTimer.setRelative(); + } + } + + @Test + public void testTimers() throws Exception { + dateTimeProvider.setDateTimeFixed(10000L); + + Pipeline p = Pipeline.create(); + PCollection<KV<String, String>> valuePCollection = + p.apply(Create.of(KV.of("unused", "unused"))); + PCollection<String> outputPCollection = + 
valuePCollection.apply(TEST_PTRANSFORM_ID, ParDo.of(new TestTimerfulDoFn())); + + SdkComponents sdkComponents = SdkComponents.create(); + sdkComponents.registerEnvironment(Environment.getDefaultInstance()); + // Note that the pipeline translation for timers creates a loop between the ParDo with + // the timer and the PCollection for that timer. This loop is unrolled by runners + // during execution which we redo here manually. + RunnerApi.Pipeline pProto = PipelineTranslation.toProto(p, sdkComponents); + String inputPCollectionId = sdkComponents.registerPCollection(valuePCollection); + String outputPCollectionId = sdkComponents.registerPCollection(outputPCollection); + String eventTimerInputPCollectionId = "pTransformId/ParMultiDo(TestTimerful).event"; + String eventTimerOutputPCollectionId = "pTransformId/ParMultiDo(TestTimerful).event.output"; + String processingTimerInputPCollectionId = "pTransformId/ParMultiDo(TestTimerful).processing"; + String processingTimerOutputPCollectionId = + "pTransformId/ParMultiDo(TestTimerful).processing.output"; + + RunnerApi.PTransform pTransform = + pProto + .getComponents() + .getTransformsOrThrow( + pProto.getComponents().getTransformsOrThrow(TEST_PTRANSFORM_ID).getSubtransforms(0)) + .toBuilder() + // We need to re-write the "output" PCollections that a runner would have inserted + // on the way to a output sink. 
+ .putOutputs("event", eventTimerOutputPCollectionId) + .putOutputs("processing", processingTimerOutputPCollectionId) + .build(); + + FakeBeamFnStateClient fakeClient = new FakeBeamFnStateClient(Collections.emptyMap()); + + List<WindowedValue<String>> mainOutputValues = new ArrayList<>(); + List<WindowedValue<KV<String, Timer>>> eventTimerOutputValues = new ArrayList<>(); + List<WindowedValue<KV<String, Timer>>> processingTimerOutputValues = new ArrayList<>(); + ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers = ArrayListMultimap.create(); + consumers.put( + outputPCollectionId, + (FnDataReceiver) (FnDataReceiver<WindowedValue<String>>) mainOutputValues::add); + consumers.put( + eventTimerOutputPCollectionId, + (FnDataReceiver) + (FnDataReceiver<WindowedValue<KV<String, Timer>>>) eventTimerOutputValues::add); + consumers.put( + processingTimerOutputPCollectionId, + (FnDataReceiver) + (FnDataReceiver<WindowedValue<KV<String, Timer>>>) processingTimerOutputValues::add); + + List<ThrowingRunnable> startFunctions = new ArrayList<>(); + List<ThrowingRunnable> finishFunctions = new ArrayList<>(); + + new FnApiDoFnRunner.Factory<>() + .createRunnerForPTransform( + PipelineOptionsFactory.create(), + null /* beamFnDataClient */, + fakeClient, + TEST_PTRANSFORM_ID, + pTransform, + Suppliers.ofInstance("57L")::get, + ImmutableMap.<String, RunnerApi.PCollection>builder() + .putAll(pProto.getComponents().getPcollectionsMap()) + // We need to insert the "output" PCollections that a runner would have inserted + // on the way to a output sink. 
+ .put( + eventTimerOutputPCollectionId, + pProto.getComponents().getPcollectionsOrThrow(eventTimerInputPCollectionId)) + .put( + processingTimerOutputPCollectionId, + pProto + .getComponents() + .getPcollectionsOrThrow(processingTimerInputPCollectionId)) + .build(), + pProto.getComponents().getCodersMap(), + pProto.getComponents().getWindowingStrategiesMap(), + consumers, + startFunctions::add, + finishFunctions::add, + null /* splitListener */); + + Iterables.getOnlyElement(startFunctions).run(); + mainOutputValues.clear(); + + assertThat( + consumers.keySet(), + containsInAnyOrder( + inputPCollectionId, + outputPCollectionId, + eventTimerInputPCollectionId, + eventTimerOutputPCollectionId, + processingTimerInputPCollectionId, + processingTimerOutputPCollectionId)); + + // Ensure that bag user state that is initially empty or populated works. + // Ensure that the key order does not matter when we traverse over KV pairs. + FnDataReceiver<WindowedValue<?>> mainInput = + Iterables.getOnlyElement(consumers.get(inputPCollectionId)); + FnDataReceiver<WindowedValue<?>> eventTimerInput = + Iterables.getOnlyElement(consumers.get(eventTimerInputPCollectionId)); + FnDataReceiver<WindowedValue<?>> processingTimerInput = + Iterables.getOnlyElement(consumers.get(processingTimerInputPCollectionId)); + mainInput.accept(timestampedValueInGlobalWindow(KV.of("X", "X1"), new Instant(1000L))); + mainInput.accept(timestampedValueInGlobalWindow(KV.of("Y", "Y1"), new Instant(1100L))); + mainInput.accept(timestampedValueInGlobalWindow(KV.of("X", "X2"), new Instant(1200L))); + mainInput.accept(timestampedValueInGlobalWindow(KV.of("Y", "Y2"), new Instant(1300L))); + eventTimerInput.accept(timerInGlobalWindow("A", new Instant(1400L), new Instant(2400L))); + eventTimerInput.accept(timerInGlobalWindow("B", new Instant(1500L), new Instant(2500L))); + eventTimerInput.accept(timerInGlobalWindow("A", new Instant(1600L), new Instant(2600L))); + processingTimerInput.accept(timerInGlobalWindow("C", 
new Instant(1700L), new Instant(2700L))); + processingTimerInput.accept(timerInGlobalWindow("D", new Instant(1800L), new Instant(2800L))); + processingTimerInput.accept(timerInGlobalWindow("C", new Instant(1900L), new Instant(2900L))); + assertThat( + mainOutputValues, + contains( + timestampedValueInGlobalWindow("mainX", new Instant(1000L)), + timestampedValueInGlobalWindow("mainY", new Instant(1100L)), + timestampedValueInGlobalWindow("mainX", new Instant(1200L)), + timestampedValueInGlobalWindow("mainY", new Instant(1300L)), + timestampedValueInGlobalWindow("event", new Instant(1400L)), + timestampedValueInGlobalWindow("event", new Instant(1500L)), + timestampedValueInGlobalWindow("event", new Instant(1600L)), + timestampedValueInGlobalWindow("processing", new Instant(1700L)), + timestampedValueInGlobalWindow("processing", new Instant(1800L)), + timestampedValueInGlobalWindow("processing", new Instant(1900L)))); + assertThat( + eventTimerOutputValues, + contains( + timerInGlobalWindow("X", new Instant(1000L), new Instant(1001L)), + timerInGlobalWindow("Y", new Instant(1100L), new Instant(1101L)), + timerInGlobalWindow("X", new Instant(1200L), new Instant(1201L)), + timerInGlobalWindow("Y", new Instant(1300L), new Instant(1301L)), + timerInGlobalWindow("A", new Instant(1400L), new Instant(1411L)), + timerInGlobalWindow("B", new Instant(1500L), new Instant(1511L)), + timerInGlobalWindow("A", new Instant(1600L), new Instant(1611L)), + timerInGlobalWindow("C", new Instant(1700L), new Instant(1721L)), + timerInGlobalWindow("D", new Instant(1800L), new Instant(1821L)), + timerInGlobalWindow("C", new Instant(1900L), new Instant(1921L)))); + assertThat( + processingTimerOutputValues, + contains( + timerInGlobalWindow("X", new Instant(1000L), new Instant(10002L)), + timerInGlobalWindow("Y", new Instant(1100L), new Instant(10002L)), + timerInGlobalWindow("X", new Instant(1200L), new Instant(10002L)), + timerInGlobalWindow("Y", new Instant(1300L), new Instant(10002L)), + 
timerInGlobalWindow("A", new Instant(1400L), new Instant(10012L)), + timerInGlobalWindow("B", new Instant(1500L), new Instant(10012L)), + timerInGlobalWindow("A", new Instant(1600L), new Instant(10012L)), + timerInGlobalWindow("C", new Instant(1700L), new Instant(10022L)), + timerInGlobalWindow("D", new Instant(1800L), new Instant(10022L)), + timerInGlobalWindow("C", new Instant(1900L), new Instant(10022L)))); + mainOutputValues.clear(); + + Iterables.getOnlyElement(finishFunctions).run(); + assertThat(mainOutputValues, empty()); + + assertEquals(ImmutableMap.of(), fakeClient.getData()); + mainOutputValues.clear(); + } + private <T> WindowedValue<T> valueInWindow(T value, BoundedWindow window) { return WindowedValue.of(value, window.maxTimestamp(), window, PaneInfo.ON_TIME_AND_ONLY_FIRING); } + private <T> + WindowedValue<KV<T, org.apache.beam.runners.core.construction.Timer>> timerInGlobalWindow( + T value, Instant valueTimestamp, Instant scheduledTimestamp) { + return timestampedValueInGlobalWindow( + KV.of(value, org.apache.beam.runners.core.construction.Timer.of(scheduledTimestamp)), + valueTimestamp); + } + /** * Produces a multimap side input {@link StateKey} for the test PTransform id in the global * window. 
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnersTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnersTest.java index 93ca8086de6..34336c1bb35 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnersTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnersTest.java @@ -25,10 +25,10 @@ import static org.junit.Assert.assertThat; import com.google.common.base.Suppliers; -import com.google.common.collect.HashMultimap; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; -import com.google.common.collect.Multimap; +import com.google.common.collect.ListMultimap; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -62,7 +62,7 @@ @Test public void testValueOnlyMapping() throws Exception { List<WindowedValue<?>> outputConsumer = new ArrayList<>(); - Multimap<String, FnDataReceiver<WindowedValue<?>>> consumers = HashMultimap.create(); + ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers = ArrayListMultimap.create(); consumers.put("outputPC", outputConsumer::add); List<ThrowingRunnable> startFunctions = new ArrayList<>(); @@ -98,7 +98,7 @@ public void testValueOnlyMapping() throws Exception { @Test public void testFullWindowedValueMapping() throws Exception { List<WindowedValue<?>> outputConsumer = new ArrayList<>(); - Multimap<String, FnDataReceiver<WindowedValue<?>>> consumers = HashMultimap.create(); + ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers = ArrayListMultimap.create(); consumers.put("outputPC", outputConsumer::add); List<ThrowingRunnable> startFunctions = new ArrayList<>(); @@ -133,7 +133,7 @@ public void testFullWindowedValueMapping() throws Exception { @Test public void testFullWindowedValueMappingWithCompressedWindow() throws Exception { List<WindowedValue<?>> outputConsumer = new 
ArrayList<>(); - Multimap<String, FnDataReceiver<WindowedValue<?>>> consumers = HashMultimap.create(); + ListMultimap<String, FnDataReceiver<WindowedValue<?>>> consumers = ArrayListMultimap.create(); consumers.put("outputPC", outputConsumer::add); List<ThrowingRunnable> startFunctions = new ArrayList<>(); diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java index d08eb326ec3..a8f0ff0db52 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java @@ -27,7 +27,7 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Multimap; +import com.google.common.collect.ListMultimap; import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; import java.util.ArrayList; @@ -365,7 +365,8 @@ public Object createRunnerForPTransform( Map<String, PCollection> pCollections, Map<String, Coder> coders, Map<String, WindowingStrategy> windowingStrategies, - Multimap<String, FnDataReceiver<WindowedValue<?>>> pCollectionIdsToConsumers, + ListMultimap<String, FnDataReceiver<WindowedValue<?>>> + pCollectionIdsToConsumers, Consumer<ThrowingRunnable> addStartFunction, Consumer<ThrowingRunnable> addFinishFunction, BundleSplitListener splitListener) @@ -426,7 +427,8 @@ public Object createRunnerForPTransform( Map<String, PCollection> pCollections, Map<String, Coder> coders, Map<String, WindowingStrategy> windowingStrategies, - Multimap<String, FnDataReceiver<WindowedValue<?>>> pCollectionIdsToConsumers, + ListMultimap<String, FnDataReceiver<WindowedValue<?>>> + pCollectionIdsToConsumers, Consumer<ThrowingRunnable> addStartFunction, Consumer<ThrowingRunnable> addFinishFunction, BundleSplitListener 
splitListener) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 125236) Time Spent: 3h 40m (was: 3.5h) > Java SDK harness should support user timers > ------------------------------------------- > > Key: BEAM-4653 > URL: https://issues.apache.org/jira/browse/BEAM-4653 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-harness > Reporter: Luke Cwik > Assignee: Luke Cwik > Priority: Major > Labels: portability > Time Spent: 3h 40m > Remaining Estimate: 0h > > Wire up the onTimer method in the Java SDK harness FnApiDoFnRunner, connecting > it to the RemoteGrpcPort read/write transforms that are responsible for > producing/consuming timers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)