Add ParDos: add ParDoPayloadTranslator to PTransformTranslator
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/790e7fe6 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/790e7fe6 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/790e7fe6 Branch: refs/heads/master Commit: 790e7fe6653b926044d3dfecdccbc2fda9c998f0 Parents: 6a7eeeb Author: Thomas Groh <[email protected]> Authored: Tue Mar 21 15:06:58 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Thu May 18 15:41:17 2017 -0700 ---------------------------------------------------------------------- .../runners/core/construction/PTransforms.java | 8 +- .../beam/runners/core/construction/ParDos.java | 317 +++++++++++++++++++ .../construction/RunnerPCollectionView.java | 88 +++++ .../runners/core/construction/ParDosTest.java | 229 ++++++++++++++ .../src/main/proto/beam_runner_api.proto | 10 + .../sdk/transforms/windowing/GlobalWindows.java | 6 +- 6 files changed, 653 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/790e7fe6/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java index d25d342..16276b9 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java @@ -24,10 +24,12 @@ import com.google.common.collect.ImmutableMap; import java.io.IOException; import java.util.List; import java.util.Map; +import org.apache.beam.runners.core.construction.ParDos.ParDoPayloadTranslator; import 
org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; @@ -39,9 +41,9 @@ import org.apache.beam.sdk.values.TupleTag; public class PTransforms { private static final Map<Class<? extends PTransform>, TransformPayloadTranslator> KNOWN_PAYLOAD_TRANSLATORS = - ImmutableMap.<Class<? extends PTransform>, TransformPayloadTranslator>builder().build(); - // TODO: ParDoPayload, WindowIntoPayload, ReadPayload, CombinePayload - // TODO: "Flatten Payload", etc? + ImmutableMap.<Class<? extends PTransform>, TransformPayloadTranslator>builder() + .put(ParDo.MultiOutput.class, ParDoPayloadTranslator.create()) + .build(); // TODO: Load via service loader. private PTransforms() {} http://git-wip-us.apache.org/repos/asf/beam/blob/790e7fe6/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java new file mode 100644 index 0000000..b2b29df --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.value.AutoValue; +import com.google.common.base.Optional; +import com.google.protobuf.Any; +import com.google.protobuf.ByteString; +import com.google.protobuf.BytesValue; +import com.google.protobuf.InvalidProtocolBufferException; +import java.io.IOException; +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import org.apache.beam.runners.core.construction.PTransforms.TransformPayloadTranslator; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.Parameter.Type; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput.Builder; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.StateSpec; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.TimerSpec; +import org.apache.beam.sdk.runners.AppliedPTransform; +import 
org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Materializations; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.ParDo.MultiOutput; +import org.apache.beam.sdk.transforms.ViewFn; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; + +/** + * Utilities for interacting with {@link ParDo} instances and {@link ParDoPayload} protos. + */ +public class ParDos { + /** + * The URN for a {@link ParDoPayload}. + */ + public static final String PAR_DO_PAYLOAD_URN = "urn:beam:pardo:v1"; + /** + * The URN for an unknown Java {@link DoFn}. + */ + public static final String CUSTOM_JAVA_DO_FN_URN = "urn:beam:dofn:javasdk:0.1"; + /** + * The URN for an unknown Java {@link ViewFn}. + */ + public static final String CUSTOM_JAVA_VIEW_FN_URN = "urn:beam:viewfn:javasdk:0.1"; + /** + * The URN for an unknown Java {@link WindowMappingFn}. 
+ */ + public static final String CUSTOM_JAVA_WINDOW_MAPPING_FN_URN = + "urn:beam:windowmappingfn:javasdk:0.1"; + + /** + * A {@link TransformPayloadTranslator} for {@link ParDo}. + */ + public static class ParDoPayloadTranslator + implements PTransforms.TransformPayloadTranslator<ParDo.MultiOutput<?, ?>> { + public static TransformPayloadTranslator create() { + return new ParDoPayloadTranslator(); + } + + private ParDoPayloadTranslator() {} + + @Override + public FunctionSpec translate( + AppliedPTransform<?, ?, MultiOutput<?, ?>> transform, SdkComponents components) { + ParDoPayload payload = toProto(transform.getTransform(), components); + return RunnerApi.FunctionSpec.newBuilder() + .setUrn(PAR_DO_PAYLOAD_URN) + .setParameter(Any.pack(payload)) + .build(); + } + } + + public static ParDoPayload toProto(ParDo.MultiOutput<?, ?> parDo, SdkComponents components) { + DoFnSignature signature = DoFnSignatures.getSignature(parDo.getFn().getClass()); + Map<String, StateDeclaration> states = signature.stateDeclarations(); + Map<String, TimerDeclaration> timers = signature.timerDeclarations(); + List<Parameter> parameters = signature.processElement().extraParameters(); + + ParDoPayload.Builder builder = ParDoPayload.newBuilder(); + builder.setDoFn(toProto(parDo.getFn(), parDo.getMainOutputTag())); + for (PCollectionView<?> sideInput : parDo.getSideInputs()) { + builder.putSideInputs(sideInput.getTagInternal().getId(), toProto(sideInput)); + } + for (Parameter parameter : parameters) { + Optional<RunnerApi.Parameter> protoParameter = toProto(parameter); + if (protoParameter.isPresent()) { + builder.addParameters(protoParameter.get()); + } + } + for (Map.Entry<String, StateDeclaration> state : states.entrySet()) { + StateSpec spec = toProto(state.getValue()); + builder.putStateSpecs(state.getKey(), spec); + } + for (Map.Entry<String, TimerDeclaration> timer : timers.entrySet()) { + TimerSpec spec = toProto(timer.getValue()); + builder.putTimerSpecs(timer.getKey(), spec); + 
} + return builder.build(); + } + + public static DoFn<?, ?> getDoFn(ParDoPayload payload) throws InvalidProtocolBufferException { + return doFnAndMainOutputTagFromProto(payload.getDoFn()).getDoFn(); + } + + public static TupleTag<?> getMainOutputTag(ParDoPayload payload) + throws InvalidProtocolBufferException { + return doFnAndMainOutputTagFromProto(payload.getDoFn()).getMainOutputTag(); + } + + // TODO: Implement + private static StateSpec toProto(StateDeclaration state) { + throw new UnsupportedOperationException("Not yet supported"); + } + + // TODO: Implement + private static TimerSpec toProto(TimerDeclaration timer) { + throw new UnsupportedOperationException("Not yet supported"); + } + + @AutoValue + abstract static class DoFnAndMainOutput implements Serializable { + public static DoFnAndMainOutput of( + DoFn<?, ?> fn, TupleTag<?> tag) { + return new AutoValue_ParDos_DoFnAndMainOutput(fn, tag); + } + + abstract DoFn<?, ?> getDoFn(); + abstract TupleTag<?> getMainOutputTag(); + } + + private static SdkFunctionSpec toProto(DoFn<?, ?> fn, TupleTag<?> tag) { + return SdkFunctionSpec.newBuilder() + .setSpec( + FunctionSpec.newBuilder() + .setUrn(CUSTOM_JAVA_DO_FN_URN) + .setParameter( + Any.pack( + BytesValue.newBuilder() + .setValue( + ByteString.copyFrom( + SerializableUtils.serializeToByteArray( + DoFnAndMainOutput.of(fn, tag)))) + .build()))) + .build(); + } + + private static DoFnAndMainOutput doFnAndMainOutputTagFromProto(SdkFunctionSpec fnSpec) + throws InvalidProtocolBufferException { + checkArgument(fnSpec.getSpec().getUrn().equals(CUSTOM_JAVA_DO_FN_URN)); + byte[] serializedFn = + fnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(); + return (DoFnAndMainOutput) + SerializableUtils.deserializeFromByteArray(serializedFn, "Custom DoFn And Main Output tag"); + } + + private static Optional<RunnerApi.Parameter> toProto(Parameter parameter) { + return parameter.match( + new Cases.WithDefault<Optional<RunnerApi.Parameter>>() { + 
@Override + public Optional<RunnerApi.Parameter> dispatch(WindowParameter p) { + return Optional.of(RunnerApi.Parameter.newBuilder().setType(Type.WINDOW).build()); + } + + @Override + public Optional<RunnerApi.Parameter> dispatch(RestrictionTrackerParameter p) { + return Optional.of( + RunnerApi.Parameter.newBuilder().setType(Type.RESTRICTION_TRACKER).build()); + } + + @Override + protected Optional<RunnerApi.Parameter> dispatchDefault(Parameter p) { + return Optional.absent(); + } + }); + } + + private static SideInput toProto(PCollectionView<?> view) { + Builder builder = SideInput.newBuilder(); + builder.setAccessPattern( + FunctionSpec.newBuilder() + .setUrn(view.getViewFn().getMaterialization().getUrn()) + .build()); + builder.setViewFn(toProto(view.getViewFn())); + builder.setWindowMappingFn(toProto(view.getWindowMappingFn())); + return builder.build(); + } + + public static PCollectionView<?> fromProto( + SideInput sideInput, String id, RunnerApi.PTransform parDoTransform, Components components) + throws IOException { + TupleTag<?> tag = new TupleTag<>(id); + WindowMappingFn<?> windowMappingFn = windowMappingFnFromProto(sideInput.getWindowMappingFn()); + ViewFn<?, ?> viewFn = viewFnFromProto(sideInput.getViewFn()); + + RunnerApi.PCollection inputCollection = + components.getPcollectionsOrThrow(parDoTransform.getInputsOrThrow(id)); + WindowingStrategy<?, ?> windowingStrategy = + WindowingStrategies.fromProto( + components.getWindowingStrategiesOrThrow(inputCollection.getWindowingStrategyId()), + components); + Coder<?> elemCoder = + Coders.fromProto(components.getCodersOrThrow(inputCollection.getCoderId()), components); + Coder<Iterable<WindowedValue<?>>> coder = + (Coder) + IterableCoder.of( + FullWindowedValueCoder.of( + elemCoder, windowingStrategy.getWindowFn().windowCoder())); + checkArgument( + sideInput.getAccessPattern().getUrn().equals(Materializations.ITERABLE_MATERIALIZATION_URN), + "Unknown View Materialization URN %s", + 
sideInput.getAccessPattern().getUrn()); + + PCollectionView<?> view = + new RunnerPCollectionView<>( + (TupleTag<Iterable<WindowedValue<?>>>) tag, + (ViewFn<Iterable<WindowedValue<?>>, ?>) viewFn, + windowMappingFn, + windowingStrategy, + coder); + return view; + } + + private static SdkFunctionSpec toProto(ViewFn<?, ?> viewFn) { + return SdkFunctionSpec.newBuilder() + .setSpec( + FunctionSpec.newBuilder() + .setUrn(CUSTOM_JAVA_VIEW_FN_URN) + .setParameter( + Any.pack( + BytesValue.newBuilder() + .setValue( + ByteString.copyFrom(SerializableUtils.serializeToByteArray(viewFn))) + .build()))) + .build(); + } + + private static ViewFn<?, ?> viewFnFromProto(SdkFunctionSpec viewFn) + throws InvalidProtocolBufferException { + FunctionSpec spec = viewFn.getSpec(); + checkArgument( + spec.getUrn().equals(CUSTOM_JAVA_VIEW_FN_URN), + "Can't deserialize unknown %s type %s", + ViewFn.class.getSimpleName(), + spec.getUrn()); + return (ViewFn<?, ?>) + SerializableUtils.deserializeFromByteArray( + spec.getParameter().unpack(BytesValue.class).getValue().toByteArray(), "Custom ViewFn"); + } + + private static SdkFunctionSpec toProto(WindowMappingFn<?> windowMappingFn) { + return SdkFunctionSpec.newBuilder() + .setSpec( + FunctionSpec.newBuilder() + .setUrn(CUSTOM_JAVA_WINDOW_MAPPING_FN_URN) + .setParameter( + Any.pack( + BytesValue.newBuilder() + .setValue( + ByteString.copyFrom( + SerializableUtils.serializeToByteArray(windowMappingFn))) + .build()))) + .build(); + } + + private static WindowMappingFn<?> windowMappingFnFromProto(SdkFunctionSpec windowMappingFn) + throws InvalidProtocolBufferException { + FunctionSpec spec = windowMappingFn.getSpec(); + checkArgument( + spec.getUrn().equals(CUSTOM_JAVA_WINDOW_MAPPING_FN_URN), + "Can't deserialize unknown %s type %s", + WindowMappingFn.class.getSimpleName(), + spec.getUrn()); + return (WindowMappingFn<?>) + SerializableUtils.deserializeFromByteArray( + spec.getParameter().unpack(BytesValue.class).getValue().toByteArray(), + "Custom 
WinodwMappingFn"); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/790e7fe6/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java new file mode 100644 index 0000000..89e8784 --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.core.construction; + +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput; +import org.apache.beam.sdk.transforms.ViewFn; +import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PValueBase; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; + +/** A {@link PCollectionView} created from the components of a {@link SideInput}. */ +class RunnerPCollectionView<T> extends PValueBase implements PCollectionView<T> { + private final TupleTag<Iterable<WindowedValue<?>>> tag; + private final ViewFn<Iterable<WindowedValue<?>>, T> viewFn; + private final WindowMappingFn<?> windowMappingFn; + private final WindowingStrategy<?, ?> windowingStrategy; + private final Coder<Iterable<WindowedValue<?>>> coder; + + /** + * Create a new {@link RunnerPCollectionView} from the provided components. 
+ */ + RunnerPCollectionView( + TupleTag<Iterable<WindowedValue<?>>> tag, + ViewFn<Iterable<WindowedValue<?>>, T> viewFn, + WindowMappingFn<?> windowMappingFn, + @Nullable WindowingStrategy<?, ?> windowingStrategy, + @Nullable Coder<Iterable<WindowedValue<?>>> coder) { + this.tag = tag; + this.viewFn = viewFn; + this.windowMappingFn = windowMappingFn; + this.windowingStrategy = windowingStrategy; + this.coder = coder; + } + + @Nullable + @Override + public PCollection<?> getPCollection() { + throw new IllegalStateException( + String.format("Cannot call getPCollection on a %s", getClass().getSimpleName())); + } + + @Override + public TupleTag<Iterable<WindowedValue<?>>> getTagInternal() { + return tag; + } + + @Override + public ViewFn<Iterable<WindowedValue<?>>, T> getViewFn() { + return viewFn; + } + + @Override + public WindowMappingFn<?> getWindowMappingFn() { + return windowMappingFn; + } + + @Override + public WindowingStrategy<?, ?> getWindowingStrategyInternal() { + return windowingStrategy; + } + + @Override + public Coder<Iterable<WindowedValue<?>>> getCoderInternal() { + return coder; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/790e7fe6/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java new file mode 100644 index 0000000..74edec1 --- /dev/null +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.ImmutableList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput; +import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.CombiningState; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Combine.BinaryCombineLongFn; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; 
+import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.ParDo.MultiOutput; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.hamcrest.Matchers; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +/** Tests for {@link ParDos}. */ +@RunWith(Parameterized.class) +public class ParDosTest { + public static TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + + private static PCollectionView<Long> singletonSideInput = + p.apply("GenerateSingleton", GenerateSequence.from(0L).to(1L)) + .apply(View.<Long>asSingleton()); + private static PCollectionView<Map<Long, Iterable<String>>> multimapSideInput = + p.apply("CreateMultimap", Create.of(KV.of(1L, "foo"), KV.of(1L, "bar"), KV.of(2L, "spam"))) + .setCoder(KvCoder.of(VarLongCoder.of(), StringUtf8Coder.of())) + .apply(View.<Long, String>asMultimap()); + + private static PCollection<KV<Long, String>> mainInput = + p.apply("CreateMainInput", Create.empty(KvCoder.of(VarLongCoder.of(), StringUtf8Coder.of()))); + + @Parameters(name = "{index}: {0}") + public static Iterable<ParDo.MultiOutput<?, ?>> data() { + return ImmutableList.<ParDo.MultiOutput<?, ?>>of( + ParDo.of(new DropElementsFn()).withOutputTags(new TupleTag<Void>(), TupleTagList.empty()), + ParDo.of(new DropElementsFn()) + .withOutputTags(new TupleTag<Void>(), TupleTagList.empty()) + .withSideInputs(singletonSideInput, 
multimapSideInput), + ParDo.of(new DropElementsFn()) + .withOutputTags( + new TupleTag<Void>(), + TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>() {})) + .withSideInputs(singletonSideInput, multimapSideInput), + ParDo.of(new DropElementsFn()) + .withOutputTags( + new TupleTag<Void>(), + TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>() {}))); + } + + @Parameter(0) + public ParDo.MultiOutput<KV<Long, String>, Void> parDo; + + @Test + public void testToAndFromProto() throws Exception { + SdkComponents components = SdkComponents.create(); + ParDoPayload payload = ParDos.toProto(parDo, components); + + assertThat(ParDos.getDoFn(payload), Matchers.<DoFn<?, ?>>equalTo(parDo.getFn())); + assertThat( + ParDos.getMainOutputTag(payload), Matchers.<TupleTag<?>>equalTo(parDo.getMainOutputTag())); + for (PCollectionView<?> view : parDo.getSideInputs()) { + payload.getSideInputsOrThrow(view.getTagInternal().getId()); + } + } + + @Test + public void toAndFromTransformProto() throws Exception { + Map<TupleTag<?>, PValue> inputs = new HashMap<>(); + inputs.put(new TupleTag<KV<Long, String>>() {}, mainInput); + inputs.putAll(parDo.getAdditionalInputs()); + PCollectionTuple output = mainInput.apply(parDo); + + SdkComponents components = SdkComponents.create(); + String transformId = + components.registerPTransform( + AppliedPTransform.<PCollection<KV<Long, String>>, PCollection<Void>, MultiOutput>of( + "foo", inputs, output.expand(), parDo, p), + Collections.<AppliedPTransform<?, ?, ?>>emptyList()); + + Components protoComponents = components.toComponents(); + RunnerApi.PTransform protoTransform = + protoComponents.getTransformsOrThrow(transformId); + ParDoPayload parDoPayload = protoTransform.getSpec().getParameter().unpack(ParDoPayload.class); + for (PCollectionView<?> view : parDo.getSideInputs()) { + SideInput sideInput = parDoPayload.getSideInputsOrThrow(view.getTagInternal().getId()); + PCollectionView<?> restoredView = + 
ParDos.fromProto( + sideInput, view.getTagInternal().getId(), protoTransform, protoComponents); + assertThat(restoredView.getTagInternal(), equalTo(view.getTagInternal())); + assertThat(restoredView.getViewFn(), instanceOf(view.getViewFn().getClass())); + assertThat( + restoredView.getWindowMappingFn(), instanceOf(view.getWindowMappingFn().getClass())); + assertThat( + restoredView.getWindowingStrategyInternal(), + Matchers.<WindowingStrategy<?, ?>>equalTo( + view.getWindowingStrategyInternal().fixDefaults())); + assertThat(restoredView.getCoderInternal(), equalTo(view.getCoderInternal())); + } + } + + private static class DropElementsFn extends DoFn<KV<Long, String>, Void> { + @ProcessElement + public void proc(ProcessContext context, BoundedWindow window) { + context.output(null); + } + + @Override + public boolean equals(Object other) { + return other instanceof DropElementsFn; + } + + @Override + public int hashCode() { + return DropElementsFn.class.hashCode(); + } + } + + @SuppressWarnings("unused") + private static class StateTimerDropElementsFn extends DoFn<KV<Long, String>, Void> { + private static final String BAG_STATE_ID = "bagState"; + private static final String COMBINING_STATE_ID = "combiningState"; + private static final String EVENT_TIMER_ID = "eventTimer"; + private static final String PROCESSING_TIMER_ID = "processingTimer"; + + @StateId(BAG_STATE_ID) + private final StateSpec<BagState<String>> bagState = StateSpecs.bag(StringUtf8Coder.of()); + + @StateId(COMBINING_STATE_ID) + private final StateSpec<CombiningState<Long, long[], Long>> combiningState = + StateSpecs.combining( + new BinaryCombineLongFn() { + @Override + public long apply(long left, long right) { + return Math.max(left, right); + } + + @Override + public long identity() { + return Long.MIN_VALUE; + } + }); + + @TimerId(EVENT_TIMER_ID) + private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @TimerId(PROCESSING_TIMER_ID) + private final TimerSpec 
processingTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + @ProcessElement + public void dropInput( + ProcessContext context, + BoundedWindow window, + @StateId(BAG_STATE_ID) BagState<String> bagStateState, + @StateId(COMBINING_STATE_ID) CombiningState<Long, long[], Long> combiningStateState, + @TimerId(EVENT_TIMER_ID) Timer eventTimerTimer, + @TimerId(PROCESSING_TIMER_ID) Timer processingTimerTimer) { + context.output(null); + } + + @OnTimer(EVENT_TIMER_ID) + public void onEventTime(OnTimerContext context) {} + + @OnTimer(PROCESSING_TIMER_ID) + public void onProcessingTime(OnTimerContext context) {} + + @Override + public boolean equals(Object other) { + return other instanceof StateTimerDropElementsFn; + } + + @Override + public int hashCode() { + return StateTimerDropElementsFn.class.hashCode(); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/790e7fe6/sdks/common/runner-api/src/main/proto/beam_runner_api.proto ---------------------------------------------------------------------- diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto index bf4df2a..c8722e6 100644 --- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto +++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto @@ -248,10 +248,20 @@ message Parameter { message StateSpec { // TODO: AST for state spec + string id = 1; + Type type = 2; + + enum Type { + VALUE = 0; + BAG = 1; + MAP = 2; + SET = 3; + } } message TimerSpec { // TODO: AST for timer spec + string id = 1; } enum IsBounded { http://git-wip-us.apache.org/repos/asf/beam/blob/790e7fe6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java index 1103a24..d48d26b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.transforms.windowing; +import com.google.auto.value.AutoValue; import java.util.Collection; import java.util.Collections; import org.apache.beam.sdk.coders.Coder; @@ -61,10 +62,11 @@ public class GlobalWindows extends NonMergingWindowFn<Object, GlobalWindow> { @Override public WindowMappingFn<GlobalWindow> getDefaultWindowMappingFn() { - return new GlobalWindowMappingFn(); + return new AutoValue_GlobalWindows_GlobalWindowMappingFn(); } - static class GlobalWindowMappingFn extends WindowMappingFn<GlobalWindow> { + @AutoValue + abstract static class GlobalWindowMappingFn extends WindowMappingFn<GlobalWindow> { @Override public GlobalWindow getSideInputWindow(BoundedWindow mainWindow) { return GlobalWindow.INSTANCE;
