Repository: beam Updated Branches: refs/heads/master f62586a08 -> 56e4251de
[BEAM-1347] Add DoFnRunner specific to Fn Api. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7da08c98 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7da08c98 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7da08c98 Branch: refs/heads/master Commit: 7da08c981dcf49f91595a1b78abbaeb84ccbf287 Parents: 2295b90 Author: Luke Cwik <[email protected]> Authored: Fri Jun 23 14:34:36 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Fri Jul 7 13:44:22 2017 -0700 ---------------------------------------------------------------------- sdks/java/harness/pom.xml | 10 + .../beam/runners/core/FnApiDoFnRunner.java | 483 ++++++++++++++++--- .../beam/runners/core/FnApiDoFnRunnerTest.java | 7 +- 3 files changed, 438 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/7da08c98/sdks/java/harness/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/harness/pom.xml b/sdks/java/harness/pom.xml index 9cfadc2..fe5c2f1 100644 --- a/sdks/java/harness/pom.xml +++ b/sdks/java/harness/pom.xml @@ -83,6 +83,11 @@ <dependency> <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-core-construction-java</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> <artifactId>beam-runners-google-cloud-dataflow-java</artifactId> </dependency> @@ -150,6 +155,11 @@ </dependency> <dependency> + <groupId>joda-time</groupId> + <artifactId>joda-time</artifactId> + </dependency> + + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/7da08c98/sdks/java/harness/src/main/java/org/apache/beam/runners/core/FnApiDoFnRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/FnApiDoFnRunner.java index adf735a..b3cf3a7 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/FnApiDoFnRunner.java @@ -27,49 +27,59 @@ import com.google.common.collect.Multimap; import com.google.protobuf.ByteString; import com.google.protobuf.BytesValue; import com.google.protobuf.InvalidProtocolBufferException; -import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; +import java.util.Iterator; import java.util.Map; import java.util.Objects; import java.util.function.Consumer; import java.util.function.Supplier; import org.apache.beam.fn.harness.data.BeamFnDataClient; -import org.apache.beam.fn.harness.fake.FakeStepContext; import org.apache.beam.fn.harness.fn.ThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingRunnable; -import org.apache.beam.runners.core.DoFnRunners.OutputManager; +import org.apache.beam.runners.core.construction.ParDoTranslation; import org.apache.beam.runners.dataflow.util.DoFnInfo; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.state.State; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.OnTimerContext; +import org.apache.beam.sdk.transforms.DoFn.ProcessContext; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; +import org.joda.time.Instant; /** - * Classes associated with converting {@link RunnerApi.PTransform}s to {@link DoFnRunner}s. - * - * <p>TODO: Move DoFnRunners into SDK harness and merge the methods below into it removing this - * class. + * A {@link DoFnRunner} specific to integrating with the Fn Api. This is to remove the layers + * of abstraction caused by StateInternals/TimerInternals since they model state and timer + * concepts differently. */ -public class FnApiDoFnRunner { - - private static final String URN = "urn:org.apache.beam:dofn:java:0.1"; - - /** A registrar which provides a factory to handle Java {@link DoFn}s. */ +public class FnApiDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> { + /** + * A registrar which provides a factory to handle Java {@link DoFn}s. + */ @AutoService(PTransformRunnerFactory.Registrar.class) public static class Registrar implements PTransformRunnerFactory.Registrar { @Override public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() { - return ImmutableMap.of(URN, new Factory()); + return ImmutableMap.of(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN, new Factory()); } } - /** A factory for {@link DoFnRunner}s. */ + /** A factory for {@link FnApiDoFnRunner}. */ static class Factory<InputT, OutputT> implements PTransformRunnerFactory<DoFnRunner<InputT, OutputT>> { @@ -105,9 +115,9 @@ public class FnApiDoFnRunner { throw new IllegalArgumentException( String.format("Unable to unwrap DoFn %s", pTransform.getSpec()), e); } - DoFnInfo<?, ?> doFnInfo = - (DoFnInfo<?, ?>) - SerializableUtils.deserializeFromByteArray(serializedFn.toByteArray(), "DoFnInfo"); + @SuppressWarnings({"unchecked", "rawtypes"}) + DoFnInfo<InputT, OutputT> doFnInfo = (DoFnInfo) SerializableUtils.deserializeFromByteArray( + serializedFn.toByteArray(), "DoFnInfo"); // Verify that the DoFnInfo tag to output map matches the output map on the PTransform. checkArgument( @@ -119,54 +129,26 @@ public class FnApiDoFnRunner { doFnInfo.getOutputMap()); ImmutableMultimap.Builder<TupleTag<?>, - ThrowingConsumer<WindowedValue<OutputT>>> tagToOutput = + ThrowingConsumer<WindowedValue<?>>> tagToOutputMapBuilder = ImmutableMultimap.builder(); for (Map.Entry<Long, TupleTag<?>> entry : doFnInfo.getOutputMap().entrySet()) { @SuppressWarnings({"unchecked", "rawtypes"}) - Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers = - (Collection) outputMap.get(Long.toString(entry.getKey())); - tagToOutput.putAll(entry.getValue(), consumers); + Collection<ThrowingConsumer<WindowedValue<?>>> consumers = + outputMap.get(Long.toString(entry.getKey())); + tagToOutputMapBuilder.putAll(entry.getValue(), consumers); } + ImmutableMultimap<TupleTag<?>, ThrowingConsumer<WindowedValue<?>>> tagToOutputMap = + tagToOutputMapBuilder.build(); + @SuppressWarnings({"unchecked", "rawtypes"}) - Map<TupleTag<?>, Collection<ThrowingConsumer<WindowedValue<?>>>> tagBasedOutputMap = - (Map) tagToOutput.build().asMap(); - - OutputManager outputManager = - new OutputManager() { - Map<TupleTag<?>, Collection<ThrowingConsumer<WindowedValue<?>>>> tupleTagToOutput = - tagBasedOutputMap; - - @Override - public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { - try { - Collection<ThrowingConsumer<WindowedValue<?>>> consumers = - tupleTagToOutput.get(tag); - if (consumers == null) { - /* This is a normal case, e.g., if a DoFn has output but that output is not - * consumed. Drop the output. */ - return; - } - for (ThrowingConsumer<WindowedValue<?>> consumer : consumers) { - consumer.accept(output); - } - } catch (Throwable t) { - throw new RuntimeException(t); - } - } - }; - - @SuppressWarnings({"unchecked", "rawtypes", "deprecation"}) - DoFnRunner<InputT, OutputT> runner = - DoFnRunners.simpleRunner( - pipelineOptions, - (DoFn) doFnInfo.getDoFn(), - NullSideInputReader.empty(), /* TODO */ - outputManager, - (TupleTag) doFnInfo.getOutputMap().get(doFnInfo.getMainOutput()), - new ArrayList<>(doFnInfo.getOutputMap().values()), - new FakeStepContext(), - (WindowingStrategy) doFnInfo.getWindowingStrategy()); + DoFnRunner<InputT, OutputT> runner = new FnApiDoFnRunner<>( + pipelineOptions, + doFnInfo.getDoFn(), + (Collection<ThrowingConsumer<WindowedValue<OutputT>>>) (Collection) + tagToOutputMap.get(doFnInfo.getOutputMap().get(doFnInfo.getMainOutput())), + tagToOutputMap, + doFnInfo.getWindowingStrategy()); // Register the appropriate handlers. addStartFunction.accept(runner::startBundle); @@ -179,4 +161,387 @@ public class FnApiDoFnRunner { return runner; } } + + ////////////////////////////////////////////////////////////////////////////////////////////////// + + private final PipelineOptions pipelineOptions; + private final DoFn<InputT, OutputT> doFn; + private final Collection<ThrowingConsumer<WindowedValue<OutputT>>> mainOutputConsumers; + private final Multimap<TupleTag<?>, ThrowingConsumer<WindowedValue<?>>> outputMap; + private final DoFnInvoker<InputT, OutputT> doFnInvoker; + private final StartBundleContext startBundleContext; + private final ProcessBundleContext processBundleContext; + private final FinishBundleContext finishBundleContext; + + /** + * The lifetime of this member is only valid during {@link #processElement(WindowedValue)}. + */ + private WindowedValue<InputT> currentElement; + + /** + * The lifetime of this member is only valid during {@link #processElement(WindowedValue)}. + */ + private BoundedWindow currentWindow; + + FnApiDoFnRunner( + PipelineOptions pipelineOptions, + DoFn<InputT, OutputT> doFn, + Collection<ThrowingConsumer<WindowedValue<OutputT>>> mainOutputConsumers, + Multimap<TupleTag<?>, ThrowingConsumer<WindowedValue<?>>> outputMap, + WindowingStrategy windowingStrategy) { + this.pipelineOptions = pipelineOptions; + this.doFn = doFn; + this.mainOutputConsumers = mainOutputConsumers; + this.outputMap = outputMap; + this.doFnInvoker = DoFnInvokers.invokerFor(doFn); + this.startBundleContext = new StartBundleContext(); + this.processBundleContext = new ProcessBundleContext(); + this.finishBundleContext = new FinishBundleContext(); + } + + @Override + public void startBundle() { + doFnInvoker.invokeStartBundle(startBundleContext); + } + + @Override + public void processElement(WindowedValue<InputT> elem) { + currentElement = elem; + try { + Iterator<BoundedWindow> windowIterator = + (Iterator<BoundedWindow>) elem.getWindows().iterator(); + while (windowIterator.hasNext()) { + currentWindow = windowIterator.next(); + doFnInvoker.invokeProcessElement(processBundleContext); + } + } finally { + currentElement = null; + currentWindow = null; + } + } + + @Override + public void onTimer( + String timerId, + BoundedWindow window, + Instant timestamp, + TimeDomain timeDomain) { + throw new UnsupportedOperationException("TODO: Add support for timers"); + } + + @Override + public void finishBundle() { + doFnInvoker.invokeFinishBundle(finishBundleContext); + } + + /** + * Outputs the given element to the specified set of consumers wrapping any exceptions. + */ + private <T> void outputTo( + Collection<ThrowingConsumer<WindowedValue<T>>> consumers, + WindowedValue<T> output) { + Iterator<ThrowingConsumer<WindowedValue<T>>> consumerIterator; + try { + for (ThrowingConsumer<WindowedValue<T>> consumer : consumers) { + consumer.accept(output); + } + } catch (Throwable t) { + throw UserCodeException.wrap(t); + } + } + + /** + * Provides arguments for a {@link DoFnInvoker} for {@link DoFn.StartBundle @StartBundle}. + */ + private class StartBundleContext + extends DoFn<InputT, OutputT>.StartBundleContext + implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { + + private StartBundleContext() { + doFn.super(); + } + + @Override + public PipelineOptions getPipelineOptions() { + return pipelineOptions; + } + + @Override + public PipelineOptions pipelineOptions() { + return pipelineOptions; + } + + @Override + public BoundedWindow window() { + throw new UnsupportedOperationException( + "Cannot access window outside of @ProcessElement and @OnTimer methods."); + } + + @Override + public DoFn<InputT, OutputT>.StartBundleContext startBundleContext( + DoFn<InputT, OutputT> doFn) { + return this; + } + + @Override + public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext( + DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException( + "Cannot access FinishBundleContext outside of @FinishBundle method."); + } + + @Override + public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException( + "Cannot access ProcessContext outside of @ProcessElement method."); + } + + @Override + public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException( + "Cannot access OnTimerContext outside of @OnTimer methods."); + } + + @Override + public RestrictionTracker<?> restrictionTracker() { + throw new UnsupportedOperationException( + "Cannot access RestrictionTracker outside of @ProcessElement method."); + } + + @Override + public State state(String stateId) { + throw new UnsupportedOperationException( + "Cannot access state outside of @ProcessElement and @OnTimer methods."); + } + + @Override + public Timer timer(String timerId) { + throw new UnsupportedOperationException( + "Cannot access timers outside of @ProcessElement and @OnTimer methods."); + } + } + + /** + * Provides arguments for a {@link DoFnInvoker} for {@link DoFn.ProcessElement @ProcessElement}. + */ + private class ProcessBundleContext + extends DoFn<InputT, OutputT>.ProcessContext + implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { + + private ProcessBundleContext() { + doFn.super(); + } + + @Override + public BoundedWindow window() { + return currentWindow; + } + + @Override + public DoFn.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException( + "Cannot access StartBundleContext outside of @StartBundle method."); + } + + @Override + public DoFn.FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException( + "Cannot access FinishBundleContext outside of @FinishBundle method."); + } + + @Override + public ProcessContext processContext(DoFn<InputT, OutputT> doFn) { + return this; + } + + @Override + public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException("TODO: Add support for timers"); + } + + @Override + public RestrictionTracker<?> restrictionTracker() { + throw new UnsupportedOperationException("TODO: Add support for SplittableDoFn"); + } + + @Override + public State state(String stateId) { + throw new UnsupportedOperationException("TODO: Add support for state"); + } + + @Override + public Timer timer(String timerId) { + throw new UnsupportedOperationException("TODO: Add support for timers"); + } + + @Override + public PipelineOptions getPipelineOptions() { + return pipelineOptions; + } + + @Override + public PipelineOptions pipelineOptions() { + return pipelineOptions; + } + + @Override + public void output(OutputT output) { + outputTo(mainOutputConsumers, + WindowedValue.of( + output, + currentElement.getTimestamp(), + currentWindow, + currentElement.getPane())); + } + + @Override + public void outputWithTimestamp(OutputT output, Instant timestamp) { + outputTo(mainOutputConsumers, + WindowedValue.of( + output, + timestamp, + currentWindow, + currentElement.getPane())); + } + + @Override + public <T> void output(TupleTag<T> tag, T output) { + Collection<ThrowingConsumer<WindowedValue<T>>> consumers = (Collection) outputMap.get(tag); + if (consumers == null) { + throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); + } + outputTo(consumers, + WindowedValue.of( + output, + currentElement.getTimestamp(), + currentWindow, + currentElement.getPane())); + } + + @Override + public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + Collection<ThrowingConsumer<WindowedValue<T>>> consumers = (Collection) outputMap.get(tag); + if (consumers == null) { + throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); + } + outputTo(consumers, + WindowedValue.of( + output, + timestamp, + currentWindow, + currentElement.getPane())); + } + + @Override + public InputT element() { + return currentElement.getValue(); + } + + @Override + public <T> T sideInput(PCollectionView<T> view) { + throw new UnsupportedOperationException("TODO: Support side inputs"); + } + + @Override + public Instant timestamp() { + return currentElement.getTimestamp(); + } + + @Override + public PaneInfo pane() { + return currentElement.getPane(); + } + + @Override + public void updateWatermark(Instant watermark) { + throw new UnsupportedOperationException("TODO: Add support for SplittableDoFn"); + } + } + + /** + * Provides arguments for a {@link DoFnInvoker} for {@link DoFn.FinishBundle @FinishBundle}. + */ + private class FinishBundleContext + extends DoFn<InputT, OutputT>.FinishBundleContext + implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { + + private FinishBundleContext() { + doFn.super(); + } + + @Override + public PipelineOptions getPipelineOptions() { + return pipelineOptions; + } + + @Override + public PipelineOptions pipelineOptions() { + return pipelineOptions; + } + + @Override + public BoundedWindow window() { + throw new UnsupportedOperationException( + "Cannot access window outside of @ProcessElement and @OnTimer methods."); + } + + @Override + public DoFn<InputT, OutputT>.StartBundleContext startBundleContext( + DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException( + "Cannot access StartBundleContext outside of @StartBundle method."); + } + + @Override + public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext( + DoFn<InputT, OutputT> doFn) { + return this; + } + + @Override + public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException( + "Cannot access ProcessContext outside of @ProcessElement method."); + } + + @Override + public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException( + "Cannot access OnTimerContext outside of @OnTimer methods."); + } + + @Override + public RestrictionTracker<?> restrictionTracker() { + throw new UnsupportedOperationException( + "Cannot access RestrictionTracker outside of @ProcessElement method."); + } + + @Override + public State state(String stateId) { + throw new UnsupportedOperationException( + "Cannot access state outside of @ProcessElement and @OnTimer methods."); + } + + @Override + public Timer timer(String timerId) { + throw new UnsupportedOperationException( + "Cannot access timers outside of @ProcessElement and @OnTimer methods."); + } + + @Override + public void output(OutputT output, Instant timestamp, BoundedWindow window) { + outputTo(mainOutputConsumers, + WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING)); + } + + @Override + public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) { + Collection<ThrowingConsumer<WindowedValue<T>>> consumers = (Collection) outputMap.get(tag); + if (consumers == null) { + throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); + } + outputTo(consumers, + WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING)); + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/7da08c98/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java index ae5cbac..c4df77a 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java @@ -44,6 +44,7 @@ import java.util.ServiceLoader; import org.apache.beam.fn.harness.fn.ThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingRunnable; import org.apache.beam.runners.core.PTransformRunnerFactory.Registrar; +import org.apache.beam.runners.core.construction.ParDoTranslation; import org.apache.beam.runners.dataflow.util.CloudObjects; import org.apache.beam.runners.dataflow.util.DoFnInfo; import org.apache.beam.sdk.coders.Coder; @@ -71,7 +72,6 @@ public class FnApiDoFnRunnerTest { WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE); private static final String STRING_CODER_SPEC_ID = "999L"; private static final RunnerApi.Coder STRING_CODER_SPEC; - private static final String URN = "urn:org.apache.beam:dofn:java:0.1"; static { try { @@ -132,7 +132,7 @@ public class FnApiDoFnRunnerTest { Long.parseLong(mainOutputId), TestDoFn.mainOutput, Long.parseLong(additionalOutputId), TestDoFn.additionalOutput)); RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder() - .setUrn("urn:org.apache.beam:dofn:java:0.1") + .setUrn(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN) .setParameter(Any.pack(BytesValue.newBuilder() .setValue(ByteString.copyFrom(SerializableUtils.serializeToByteArray(doFnInfo))) .build())) @@ -200,7 +200,8 @@ public class FnApiDoFnRunnerTest { for (Registrar registrar : ServiceLoader.load(Registrar.class)) { if (registrar instanceof FnApiDoFnRunner.Registrar) { - assertThat(registrar.getPTransformRunnerFactories(), IsMapContaining.hasKey(URN)); + assertThat(registrar.getPTransformRunnerFactories(), + IsMapContaining.hasKey(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN)); return; } }
