Rename ParDos to ParDoTranslation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/44609383 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/44609383 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/44609383 Branch: refs/heads/master Commit: 446093836016dabf021d34ca0a858e313f493e2f Parents: 9b6728e Author: Kenneth Knowles <[email protected]> Authored: Tue May 23 15:28:49 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Tue May 23 15:53:41 2017 -0700 ---------------------------------------------------------------------- .../core/construction/ParDoTranslation.java | 348 +++++++++++++++++++ .../beam/runners/core/construction/ParDos.java | 348 ------------------- .../core/construction/ParDoTranslationTest.java | 234 +++++++++++++ .../runners/core/construction/ParDosTest.java | 233 ------------- 4 files changed, 582 insertions(+), 581 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/44609383/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java new file mode 100644 index 0000000..baed246 --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import com.google.common.base.Optional; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; +import com.google.protobuf.Any; +import com.google.protobuf.ByteString; +import com.google.protobuf.BytesValue; +import com.google.protobuf.InvalidProtocolBufferException; +import java.io.IOException; +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.Parameter.Type; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput.Builder; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.StateSpec; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.TimerSpec; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Materializations; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.ParDo.MultiOutput; +import org.apache.beam.sdk.transforms.ViewFn; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; + +/** + * Utilities for interacting with {@link ParDo} instances and {@link ParDoPayload} protos. + */ +public class ParDoTranslation { + /** + * The URN for a {@link ParDoPayload}. + */ + public static final String PAR_DO_PAYLOAD_URN = "urn:beam:pardo:v1"; + /** + * The URN for an unknown Java {@link DoFn}. + */ + public static final String CUSTOM_JAVA_DO_FN_URN = "urn:beam:dofn:javasdk:0.1"; + /** + * The URN for an unknown Java {@link ViewFn}. + */ + public static final String CUSTOM_JAVA_VIEW_FN_URN = "urn:beam:viewfn:javasdk:0.1"; + /** + * The URN for an unknown Java {@link WindowMappingFn}. + */ + public static final String CUSTOM_JAVA_WINDOW_MAPPING_FN_URN = + "urn:beam:windowmappingfn:javasdk:0.1"; + + /** + * A {@link TransformPayloadTranslator} for {@link ParDo}. + */ + public static class ParDoPayloadTranslator + implements PTransformTranslation.TransformPayloadTranslator<ParDo.MultiOutput<?, ?>> { + public static TransformPayloadTranslator create() { + return new ParDoPayloadTranslator(); + } + + private ParDoPayloadTranslator() {} + + @Override + public FunctionSpec translate( + AppliedPTransform<?, ?, MultiOutput<?, ?>> transform, SdkComponents components) { + ParDoPayload payload = toProto(transform.getTransform(), components); + return RunnerApi.FunctionSpec.newBuilder() + .setUrn(PAR_DO_PAYLOAD_URN) + .setParameter(Any.pack(payload)) + .build(); + } + + /** + * Registers {@link ParDoPayloadTranslator}. + */ + @AutoService(TransformPayloadTranslatorRegistrar.class) + public static class Registrar implements TransformPayloadTranslatorRegistrar { + @Override + public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator> + getTransformPayloadTranslators() { + return Collections.singletonMap(ParDo.MultiOutput.class, new ParDoPayloadTranslator()); + } + } + } + + public static ParDoPayload toProto(ParDo.MultiOutput<?, ?> parDo, SdkComponents components) { + DoFnSignature signature = DoFnSignatures.getSignature(parDo.getFn().getClass()); + Map<String, StateDeclaration> states = signature.stateDeclarations(); + Map<String, TimerDeclaration> timers = signature.timerDeclarations(); + List<Parameter> parameters = signature.processElement().extraParameters(); + + ParDoPayload.Builder builder = ParDoPayload.newBuilder(); + builder.setDoFn(toProto(parDo.getFn(), parDo.getMainOutputTag())); + for (PCollectionView<?> sideInput : parDo.getSideInputs()) { + builder.putSideInputs(sideInput.getTagInternal().getId(), toProto(sideInput)); + } + for (Parameter parameter : parameters) { + Optional<RunnerApi.Parameter> protoParameter = toProto(parameter); + if (protoParameter.isPresent()) { + builder.addParameters(protoParameter.get()); + } + } + for (Map.Entry<String, StateDeclaration> state : states.entrySet()) { + StateSpec spec = toProto(state.getValue()); + builder.putStateSpecs(state.getKey(), spec); + } + for (Map.Entry<String, TimerDeclaration> timer : timers.entrySet()) { + TimerSpec spec = toProto(timer.getValue()); + builder.putTimerSpecs(timer.getKey(), spec); + } + return builder.build(); + } + + public static DoFn<?, ?> getDoFn(ParDoPayload payload) throws InvalidProtocolBufferException { + return doFnAndMainOutputTagFromProto(payload.getDoFn()).getDoFn(); + } + + public static TupleTag<?> getMainOutputTag(ParDoPayload payload) + throws InvalidProtocolBufferException { + return doFnAndMainOutputTagFromProto(payload.getDoFn()).getMainOutputTag(); + } + + public static RunnerApi.PCollection getMainInput( + RunnerApi.PTransform ptransform, Components components) throws IOException { + checkArgument( + ptransform.getSpec().getUrn().equals(PAR_DO_PAYLOAD_URN), + "Unexpected payload type %s", + ptransform.getSpec().getUrn()); + ParDoPayload payload = ptransform.getSpec().getParameter().unpack(ParDoPayload.class); + String mainInputId = + Iterables.getOnlyElement( + Sets.difference( + ptransform.getInputsMap().keySet(), payload.getSideInputsMap().keySet())); + return components.getPcollectionsOrThrow(ptransform.getInputsOrThrow(mainInputId)); + } + + // TODO: Implement + private static StateSpec toProto(StateDeclaration state) { + throw new UnsupportedOperationException("Not yet supported"); + } + + // TODO: Implement + private static TimerSpec toProto(TimerDeclaration timer) { + throw new UnsupportedOperationException("Not yet supported"); + } + + @AutoValue + abstract static class DoFnAndMainOutput implements Serializable { + public static DoFnAndMainOutput of( + DoFn<?, ?> fn, TupleTag<?> tag) { + return new AutoValue_ParDoTranslation_DoFnAndMainOutput(fn, tag); + } + + abstract DoFn<?, ?> getDoFn(); + abstract TupleTag<?> getMainOutputTag(); + } + + private static SdkFunctionSpec toProto(DoFn<?, ?> fn, TupleTag<?> tag) { + return SdkFunctionSpec.newBuilder() + .setSpec( + FunctionSpec.newBuilder() + .setUrn(CUSTOM_JAVA_DO_FN_URN) + .setParameter( + Any.pack( + BytesValue.newBuilder() + .setValue( + ByteString.copyFrom( + SerializableUtils.serializeToByteArray( + DoFnAndMainOutput.of(fn, tag)))) + .build()))) + .build(); + } + + private static DoFnAndMainOutput doFnAndMainOutputTagFromProto(SdkFunctionSpec fnSpec) + throws InvalidProtocolBufferException { + checkArgument(fnSpec.getSpec().getUrn().equals(CUSTOM_JAVA_DO_FN_URN)); + byte[] serializedFn = + fnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(); + return (DoFnAndMainOutput) + SerializableUtils.deserializeFromByteArray(serializedFn, "Custom DoFn And Main Output tag"); + } + + private static Optional<RunnerApi.Parameter> toProto(Parameter parameter) { + return parameter.match( + new Cases.WithDefault<Optional<RunnerApi.Parameter>>() { + @Override + public Optional<RunnerApi.Parameter> dispatch(WindowParameter p) { + return Optional.of(RunnerApi.Parameter.newBuilder().setType(Type.WINDOW).build()); + } + + @Override + public Optional<RunnerApi.Parameter> dispatch(RestrictionTrackerParameter p) { + return Optional.of( + RunnerApi.Parameter.newBuilder().setType(Type.RESTRICTION_TRACKER).build()); + } + + @Override + protected Optional<RunnerApi.Parameter> dispatchDefault(Parameter p) { + return Optional.absent(); + } + }); + } + + private static SideInput toProto(PCollectionView<?> view) { + Builder builder = SideInput.newBuilder(); + builder.setAccessPattern( + FunctionSpec.newBuilder() + .setUrn(view.getViewFn().getMaterialization().getUrn()) + .build()); + builder.setViewFn(toProto(view.getViewFn())); + builder.setWindowMappingFn(toProto(view.getWindowMappingFn())); + return builder.build(); + } + + public static PCollectionView<?> fromProto( + SideInput sideInput, String id, RunnerApi.PTransform parDoTransform, Components components) + throws IOException { + TupleTag<?> tag = new TupleTag<>(id); + WindowMappingFn<?> windowMappingFn = windowMappingFnFromProto(sideInput.getWindowMappingFn()); + ViewFn<?, ?> viewFn = viewFnFromProto(sideInput.getViewFn()); + + RunnerApi.PCollection inputCollection = + components.getPcollectionsOrThrow(parDoTransform.getInputsOrThrow(id)); + WindowingStrategy<?, ?> windowingStrategy = + WindowingStrategies.fromProto( + components.getWindowingStrategiesOrThrow(inputCollection.getWindowingStrategyId()), + components); + Coder<?> elemCoder = + Coders.fromProto(components.getCodersOrThrow(inputCollection.getCoderId()), components); + Coder<Iterable<WindowedValue<?>>> coder = + (Coder) + IterableCoder.of( + FullWindowedValueCoder.of( + elemCoder, windowingStrategy.getWindowFn().windowCoder())); + checkArgument( + sideInput.getAccessPattern().getUrn().equals(Materializations.ITERABLE_MATERIALIZATION_URN), + "Unknown View Materialization URN %s", + sideInput.getAccessPattern().getUrn()); + + PCollectionView<?> view = + new RunnerPCollectionView<>( + (TupleTag<Iterable<WindowedValue<?>>>) tag, + (ViewFn<Iterable<WindowedValue<?>>, ?>) viewFn, + windowMappingFn, + windowingStrategy, + coder); + return view; + } + + private static SdkFunctionSpec toProto(ViewFn<?, ?> viewFn) { + return SdkFunctionSpec.newBuilder() + .setSpec( + FunctionSpec.newBuilder() + .setUrn(CUSTOM_JAVA_VIEW_FN_URN) + .setParameter( + Any.pack( + BytesValue.newBuilder() + .setValue( + ByteString.copyFrom(SerializableUtils.serializeToByteArray(viewFn))) + .build()))) + .build(); + } + + private static ViewFn<?, ?> viewFnFromProto(SdkFunctionSpec viewFn) + throws InvalidProtocolBufferException { + FunctionSpec spec = viewFn.getSpec(); + checkArgument( + spec.getUrn().equals(CUSTOM_JAVA_VIEW_FN_URN), + "Can't deserialize unknown %s type %s", + ViewFn.class.getSimpleName(), + spec.getUrn()); + return (ViewFn<?, ?>) + SerializableUtils.deserializeFromByteArray( + spec.getParameter().unpack(BytesValue.class).getValue().toByteArray(), "Custom ViewFn"); + } + + private static SdkFunctionSpec toProto(WindowMappingFn<?> windowMappingFn) { + return SdkFunctionSpec.newBuilder() + .setSpec( + FunctionSpec.newBuilder() + .setUrn(CUSTOM_JAVA_WINDOW_MAPPING_FN_URN) + .setParameter( + Any.pack( + BytesValue.newBuilder() + .setValue( + ByteString.copyFrom( + SerializableUtils.serializeToByteArray(windowMappingFn))) + .build()))) + .build(); + } + + private static WindowMappingFn<?> windowMappingFnFromProto(SdkFunctionSpec windowMappingFn) + throws InvalidProtocolBufferException { + FunctionSpec spec = windowMappingFn.getSpec(); + checkArgument( + spec.getUrn().equals(CUSTOM_JAVA_WINDOW_MAPPING_FN_URN), + "Can't deserialize unknown %s type %s", + WindowMappingFn.class.getSimpleName(), + spec.getUrn()); + return (WindowMappingFn<?>) + SerializableUtils.deserializeFromByteArray( + spec.getParameter().unpack(BytesValue.class).getValue().toByteArray(), + "Custom WinodwMappingFn"); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/44609383/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java deleted file mode 100644 index 12f2969..0000000 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java +++ /dev/null @@ -1,348 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.core.construction; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.auto.service.AutoService; -import com.google.auto.value.AutoValue; -import com.google.common.base.Optional; -import com.google.common.collect.Iterables; -import com.google.common.collect.Sets; -import com.google.protobuf.Any; -import com.google.protobuf.ByteString; -import com.google.protobuf.BytesValue; -import com.google.protobuf.InvalidProtocolBufferException; -import java.io.IOException; -import java.io.Serializable; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.IterableCoder; -import org.apache.beam.sdk.common.runner.v1.RunnerApi; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.Parameter.Type; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput.Builder; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.StateSpec; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.TimerSpec; -import org.apache.beam.sdk.runners.AppliedPTransform; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Materializations; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.ParDo.MultiOutput; -import org.apache.beam.sdk.transforms.ViewFn; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Cases; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration; -import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; -import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; -import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.WindowingStrategy; - -/** - * Utilities for interacting with {@link ParDo} instances and {@link ParDoPayload} protos. - */ -public class ParDos { - /** - * The URN for a {@link ParDoPayload}. - */ - public static final String PAR_DO_PAYLOAD_URN = "urn:beam:pardo:v1"; - /** - * The URN for an unknown Java {@link DoFn}. - */ - public static final String CUSTOM_JAVA_DO_FN_URN = "urn:beam:dofn:javasdk:0.1"; - /** - * The URN for an unknown Java {@link ViewFn}. - */ - public static final String CUSTOM_JAVA_VIEW_FN_URN = "urn:beam:viewfn:javasdk:0.1"; - /** - * The URN for an unknown Java {@link WindowMappingFn}. - */ - public static final String CUSTOM_JAVA_WINDOW_MAPPING_FN_URN = - "urn:beam:windowmappingfn:javasdk:0.1"; - - /** - * A {@link TransformPayloadTranslator} for {@link ParDo}. - */ - public static class ParDoPayloadTranslator - implements PTransformTranslation.TransformPayloadTranslator<ParDo.MultiOutput<?, ?>> { - public static TransformPayloadTranslator create() { - return new ParDoPayloadTranslator(); - } - - private ParDoPayloadTranslator() {} - - @Override - public FunctionSpec translate( - AppliedPTransform<?, ?, MultiOutput<?, ?>> transform, SdkComponents components) { - ParDoPayload payload = toProto(transform.getTransform(), components); - return RunnerApi.FunctionSpec.newBuilder() - .setUrn(PAR_DO_PAYLOAD_URN) - .setParameter(Any.pack(payload)) - .build(); - } - - /** - * Registers {@link ParDoPayloadTranslator}. - */ - @AutoService(TransformPayloadTranslatorRegistrar.class) - public static class Registrar implements TransformPayloadTranslatorRegistrar { - @Override - public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator> - getTransformPayloadTranslators() { - return Collections.singletonMap(ParDo.MultiOutput.class, new ParDoPayloadTranslator()); - } - } - } - - public static ParDoPayload toProto(ParDo.MultiOutput<?, ?> parDo, SdkComponents components) { - DoFnSignature signature = DoFnSignatures.getSignature(parDo.getFn().getClass()); - Map<String, StateDeclaration> states = signature.stateDeclarations(); - Map<String, TimerDeclaration> timers = signature.timerDeclarations(); - List<Parameter> parameters = signature.processElement().extraParameters(); - - ParDoPayload.Builder builder = ParDoPayload.newBuilder(); - builder.setDoFn(toProto(parDo.getFn(), parDo.getMainOutputTag())); - for (PCollectionView<?> sideInput : parDo.getSideInputs()) { - builder.putSideInputs(sideInput.getTagInternal().getId(), toProto(sideInput)); - } - for (Parameter parameter : parameters) { - Optional<RunnerApi.Parameter> protoParameter = toProto(parameter); - if (protoParameter.isPresent()) { - builder.addParameters(protoParameter.get()); - } - } - for (Map.Entry<String, StateDeclaration> state : states.entrySet()) { - StateSpec spec = toProto(state.getValue()); - builder.putStateSpecs(state.getKey(), spec); - } - for (Map.Entry<String, TimerDeclaration> timer : timers.entrySet()) { - TimerSpec spec = toProto(timer.getValue()); - builder.putTimerSpecs(timer.getKey(), spec); - } - return builder.build(); - } - - public static DoFn<?, ?> getDoFn(ParDoPayload payload) throws InvalidProtocolBufferException { - return doFnAndMainOutputTagFromProto(payload.getDoFn()).getDoFn(); - } - - public static TupleTag<?> getMainOutputTag(ParDoPayload payload) - throws InvalidProtocolBufferException { - return doFnAndMainOutputTagFromProto(payload.getDoFn()).getMainOutputTag(); - } - - public static RunnerApi.PCollection getMainInput( - RunnerApi.PTransform ptransform, Components components) throws IOException { - checkArgument( - ptransform.getSpec().getUrn().equals(PAR_DO_PAYLOAD_URN), - "Unexpected payload type %s", - ptransform.getSpec().getUrn()); - ParDoPayload payload = ptransform.getSpec().getParameter().unpack(ParDoPayload.class); - String mainInputId = - Iterables.getOnlyElement( - Sets.difference( - ptransform.getInputsMap().keySet(), payload.getSideInputsMap().keySet())); - return components.getPcollectionsOrThrow(ptransform.getInputsOrThrow(mainInputId)); - } - - // TODO: Implement - private static StateSpec toProto(StateDeclaration state) { - throw new UnsupportedOperationException("Not yet supported"); - } - - // TODO: Implement - private static TimerSpec toProto(TimerDeclaration timer) { - throw new UnsupportedOperationException("Not yet supported"); - } - - @AutoValue - abstract static class DoFnAndMainOutput implements Serializable { - public static DoFnAndMainOutput of( - DoFn<?, ?> fn, TupleTag<?> tag) { - return new AutoValue_ParDos_DoFnAndMainOutput(fn, tag); - } - - abstract DoFn<?, ?> getDoFn(); - abstract TupleTag<?> getMainOutputTag(); - } - - private static SdkFunctionSpec toProto(DoFn<?, ?> fn, TupleTag<?> tag) { - return SdkFunctionSpec.newBuilder() - .setSpec( - FunctionSpec.newBuilder() - .setUrn(CUSTOM_JAVA_DO_FN_URN) - .setParameter( - Any.pack( - BytesValue.newBuilder() - .setValue( - ByteString.copyFrom( - SerializableUtils.serializeToByteArray( - DoFnAndMainOutput.of(fn, tag)))) - .build()))) - .build(); - } - - private static DoFnAndMainOutput doFnAndMainOutputTagFromProto(SdkFunctionSpec fnSpec) - throws InvalidProtocolBufferException { - checkArgument(fnSpec.getSpec().getUrn().equals(CUSTOM_JAVA_DO_FN_URN)); - byte[] serializedFn = - fnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(); - return (DoFnAndMainOutput) - SerializableUtils.deserializeFromByteArray(serializedFn, "Custom DoFn And Main Output tag"); - } - - private static Optional<RunnerApi.Parameter> toProto(Parameter parameter) { - return parameter.match( - new Cases.WithDefault<Optional<RunnerApi.Parameter>>() { - @Override - public Optional<RunnerApi.Parameter> dispatch(WindowParameter p) { - return Optional.of(RunnerApi.Parameter.newBuilder().setType(Type.WINDOW).build()); - } - - @Override - public Optional<RunnerApi.Parameter> dispatch(RestrictionTrackerParameter p) { - return Optional.of( - RunnerApi.Parameter.newBuilder().setType(Type.RESTRICTION_TRACKER).build()); - } - - @Override - protected Optional<RunnerApi.Parameter> dispatchDefault(Parameter p) { - return Optional.absent(); - } - }); - } - - private static SideInput toProto(PCollectionView<?> view) { - Builder builder = SideInput.newBuilder(); - builder.setAccessPattern( - FunctionSpec.newBuilder() - .setUrn(view.getViewFn().getMaterialization().getUrn()) - .build()); - builder.setViewFn(toProto(view.getViewFn())); - builder.setWindowMappingFn(toProto(view.getWindowMappingFn())); - return builder.build(); - } - - public static PCollectionView<?> fromProto( - SideInput sideInput, String id, RunnerApi.PTransform parDoTransform, Components components) - throws IOException { - TupleTag<?> tag = new TupleTag<>(id); - WindowMappingFn<?> windowMappingFn = windowMappingFnFromProto(sideInput.getWindowMappingFn()); - ViewFn<?, ?> viewFn = viewFnFromProto(sideInput.getViewFn()); - - RunnerApi.PCollection inputCollection = - components.getPcollectionsOrThrow(parDoTransform.getInputsOrThrow(id)); - WindowingStrategy<?, ?> windowingStrategy = - WindowingStrategies.fromProto( - components.getWindowingStrategiesOrThrow(inputCollection.getWindowingStrategyId()), - components); - Coder<?> elemCoder = - Coders.fromProto(components.getCodersOrThrow(inputCollection.getCoderId()), components); - Coder<Iterable<WindowedValue<?>>> coder = - (Coder) - IterableCoder.of( - FullWindowedValueCoder.of( - elemCoder, windowingStrategy.getWindowFn().windowCoder())); - checkArgument( - sideInput.getAccessPattern().getUrn().equals(Materializations.ITERABLE_MATERIALIZATION_URN), - "Unknown View Materialization URN %s", - sideInput.getAccessPattern().getUrn()); - - PCollectionView<?> view = - new RunnerPCollectionView<>( - (TupleTag<Iterable<WindowedValue<?>>>) tag, - (ViewFn<Iterable<WindowedValue<?>>, ?>) viewFn, - windowMappingFn, - windowingStrategy, - coder); - return view; - } - - private static SdkFunctionSpec toProto(ViewFn<?, ?> viewFn) { - return SdkFunctionSpec.newBuilder() - .setSpec( - FunctionSpec.newBuilder() - .setUrn(CUSTOM_JAVA_VIEW_FN_URN) - .setParameter( - Any.pack( - BytesValue.newBuilder() - .setValue( - ByteString.copyFrom(SerializableUtils.serializeToByteArray(viewFn))) - .build()))) - .build(); - } - - private static ViewFn<?, ?> viewFnFromProto(SdkFunctionSpec viewFn) - throws InvalidProtocolBufferException { - FunctionSpec spec = viewFn.getSpec(); - checkArgument( - spec.getUrn().equals(CUSTOM_JAVA_VIEW_FN_URN), - "Can't deserialize unknown %s type %s", - ViewFn.class.getSimpleName(), - spec.getUrn()); - return (ViewFn<?, ?>) - SerializableUtils.deserializeFromByteArray( - spec.getParameter().unpack(BytesValue.class).getValue().toByteArray(), "Custom ViewFn"); - } - - private static SdkFunctionSpec toProto(WindowMappingFn<?> windowMappingFn) { - return SdkFunctionSpec.newBuilder() - .setSpec( - FunctionSpec.newBuilder() - .setUrn(CUSTOM_JAVA_WINDOW_MAPPING_FN_URN) - .setParameter( - Any.pack( - BytesValue.newBuilder() - .setValue( - ByteString.copyFrom( - SerializableUtils.serializeToByteArray(windowMappingFn))) - .build()))) - .build(); - } - - private static WindowMappingFn<?> windowMappingFnFromProto(SdkFunctionSpec windowMappingFn) - throws InvalidProtocolBufferException { - FunctionSpec spec = windowMappingFn.getSpec(); - checkArgument( - spec.getUrn().equals(CUSTOM_JAVA_WINDOW_MAPPING_FN_URN), - "Can't deserialize unknown %s type %s", - WindowMappingFn.class.getSimpleName(), - spec.getUrn()); - return (WindowMappingFn<?>) - SerializableUtils.deserializeFromByteArray( - spec.getParameter().unpack(BytesValue.class).getValue().toByteArray(), - "Custom WinodwMappingFn"); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/44609383/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java new file mode 100644 index 0000000..ec27957 --- /dev/null +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.ImmutableList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput; +import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.CombiningState; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Combine.BinaryCombineLongFn; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.ParDo.MultiOutput; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.hamcrest.Matchers; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +/** Tests for {@link ParDoTranslation}. */ +@RunWith(Parameterized.class) +public class ParDoTranslationTest { + public static TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + + private static PCollectionView<Long> singletonSideInput = + p.apply("GenerateSingleton", GenerateSequence.from(0L).to(1L)) + .apply(View.<Long>asSingleton()); + private static PCollectionView<Map<Long, Iterable<String>>> multimapSideInput = + p.apply("CreateMultimap", Create.of(KV.of(1L, "foo"), KV.of(1L, "bar"), KV.of(2L, "spam"))) + .setCoder(KvCoder.of(VarLongCoder.of(), StringUtf8Coder.of())) + .apply(View.<Long, String>asMultimap()); + + private static PCollection<KV<Long, String>> mainInput = + p.apply("CreateMainInput", Create.empty(KvCoder.of(VarLongCoder.of(), StringUtf8Coder.of()))); + + @Parameters(name = "{index}: {0}") + public static Iterable<ParDo.MultiOutput<?, ?>> data() { + return ImmutableList.<ParDo.MultiOutput<?, ?>>of( + ParDo.of(new DropElementsFn()).withOutputTags(new TupleTag<Void>(), TupleTagList.empty()), + ParDo.of(new DropElementsFn()) + .withOutputTags(new TupleTag<Void>(), TupleTagList.empty()) + .withSideInputs(singletonSideInput, multimapSideInput), + ParDo.of(new DropElementsFn()) + .withOutputTags( + new TupleTag<Void>(), + TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>() {})) + .withSideInputs(singletonSideInput, multimapSideInput), + ParDo.of(new DropElementsFn()) + .withOutputTags( + new TupleTag<Void>(), + TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>() {}))); + } + + @Parameter(0) + public ParDo.MultiOutput<KV<Long, String>, Void> parDo; + + @Test + public void testToAndFromProto() throws Exception { + SdkComponents components = SdkComponents.create(); + ParDoPayload payload = ParDoTranslation.toProto(parDo, components); + + assertThat(ParDoTranslation.getDoFn(payload), Matchers.<DoFn<?, ?>>equalTo(parDo.getFn())); + assertThat( + ParDoTranslation.getMainOutputTag(payload), + Matchers.<TupleTag<?>>equalTo(parDo.getMainOutputTag())); + for (PCollectionView<?> view : parDo.getSideInputs()) { + payload.getSideInputsOrThrow(view.getTagInternal().getId()); + } + } + + @Test + public void toAndFromTransformProto() throws Exception { + Map<TupleTag<?>, PValue> inputs = new HashMap<>(); + inputs.put(new TupleTag<KV<Long, String>>() {}, mainInput); + inputs.putAll(parDo.getAdditionalInputs()); + PCollectionTuple output = mainInput.apply(parDo); + + SdkComponents components = SdkComponents.create(); + String transformId = + components.registerPTransform( + AppliedPTransform.<PCollection<KV<Long, String>>, PCollection<Void>, MultiOutput>of( + "foo", inputs, output.expand(), parDo, p), + Collections.<AppliedPTransform<?, ?, ?>>emptyList()); + + Components protoComponents = components.toComponents(); + RunnerApi.PTransform protoTransform = + protoComponents.getTransformsOrThrow(transformId); + ParDoPayload parDoPayload = protoTransform.getSpec().getParameter().unpack(ParDoPayload.class); + for (PCollectionView<?> view : parDo.getSideInputs()) { + SideInput sideInput = parDoPayload.getSideInputsOrThrow(view.getTagInternal().getId()); + PCollectionView<?> restoredView = + ParDoTranslation.fromProto( + sideInput, view.getTagInternal().getId(), protoTransform, protoComponents); + assertThat(restoredView.getTagInternal(), equalTo(view.getTagInternal())); + assertThat(restoredView.getViewFn(), instanceOf(view.getViewFn().getClass())); + assertThat( + restoredView.getWindowMappingFn(), instanceOf(view.getWindowMappingFn().getClass())); + assertThat( + restoredView.getWindowingStrategyInternal(), + Matchers.<WindowingStrategy<?, ?>>equalTo( + view.getWindowingStrategyInternal().fixDefaults())); + assertThat(restoredView.getCoderInternal(), equalTo(view.getCoderInternal())); + } + String mainInputId = components.registerPCollection(mainInput); + assertThat( + ParDoTranslation.getMainInput(protoTransform, protoComponents), + equalTo(protoComponents.getPcollectionsOrThrow(mainInputId))); + } + + private static class DropElementsFn extends DoFn<KV<Long, String>, Void> { + @ProcessElement + public void proc(ProcessContext context, BoundedWindow window) { + context.output(null); + } + + @Override + public boolean equals(Object other) { + return other instanceof DropElementsFn; + } + + @Override + public int hashCode() { + return DropElementsFn.class.hashCode(); + } + } + + @SuppressWarnings("unused") + private static class StateTimerDropElementsFn extends DoFn<KV<Long, String>, Void> { + private static final String BAG_STATE_ID = "bagState"; + private static final String COMBINING_STATE_ID = "combiningState"; + private static final String EVENT_TIMER_ID = "eventTimer"; + private static final String PROCESSING_TIMER_ID = "processingTimer"; + + @StateId(BAG_STATE_ID) + private final StateSpec<BagState<String>> bagState = StateSpecs.bag(StringUtf8Coder.of()); + + @StateId(COMBINING_STATE_ID) + private final StateSpec<CombiningState<Long, long[], Long>> combiningState = + StateSpecs.combining( + new BinaryCombineLongFn() { + @Override + public long apply(long left, long right) { + return Math.max(left, right); + } + + @Override + public long identity() { + return Long.MIN_VALUE; + } + }); + + @TimerId(EVENT_TIMER_ID) + private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @TimerId(PROCESSING_TIMER_ID) + private final TimerSpec processingTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + @ProcessElement + public void dropInput( + ProcessContext context, + BoundedWindow window, + @StateId(BAG_STATE_ID) BagState<String> bagStateState, + @StateId(COMBINING_STATE_ID) CombiningState<Long, long[], Long> combiningStateState, + @TimerId(EVENT_TIMER_ID) Timer eventTimerTimer, + @TimerId(PROCESSING_TIMER_ID) Timer processingTimerTimer) { + context.output(null); + } + + @OnTimer(EVENT_TIMER_ID) + public void onEventTime(OnTimerContext context) {} + + @OnTimer(PROCESSING_TIMER_ID) + public void onProcessingTime(OnTimerContext context) {} + + @Override + public boolean equals(Object other) { + return other instanceof StateTimerDropElementsFn; + } + + @Override + public int hashCode() { + return StateTimerDropElementsFn.class.hashCode(); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/44609383/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java deleted file mode 100644 index b6f0b7d..0000000 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDosTest.java +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.core.construction; - -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; -import static org.junit.Assert.assertThat; - -import com.google.common.collect.ImmutableList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarLongCoder; -import org.apache.beam.sdk.common.runner.v1.RunnerApi; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput; -import org.apache.beam.sdk.io.GenerateSequence; -import org.apache.beam.sdk.runners.AppliedPTransform; -import org.apache.beam.sdk.state.BagState; -import org.apache.beam.sdk.state.CombiningState; -import org.apache.beam.sdk.state.StateSpec; -import org.apache.beam.sdk.state.StateSpecs; -import org.apache.beam.sdk.state.TimeDomain; -import org.apache.beam.sdk.state.Timer; -import org.apache.beam.sdk.state.TimerSpec; -import org.apache.beam.sdk.state.TimerSpecs; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Combine.BinaryCombineLongFn; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.ParDo.MultiOutput; -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.TupleTagList; -import org.apache.beam.sdk.values.WindowingStrategy; -import org.hamcrest.Matchers; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameter; -import org.junit.runners.Parameterized.Parameters; - -/** Tests for {@link ParDos}. */ -@RunWith(Parameterized.class) -public class ParDosTest { - public static TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); - - private static PCollectionView<Long> singletonSideInput = - p.apply("GenerateSingleton", GenerateSequence.from(0L).to(1L)) - .apply(View.<Long>asSingleton()); - private static PCollectionView<Map<Long, Iterable<String>>> multimapSideInput = - p.apply("CreateMultimap", Create.of(KV.of(1L, "foo"), KV.of(1L, "bar"), KV.of(2L, "spam"))) - .setCoder(KvCoder.of(VarLongCoder.of(), StringUtf8Coder.of())) - .apply(View.<Long, String>asMultimap()); - - private static PCollection<KV<Long, String>> mainInput = - p.apply("CreateMainInput", Create.empty(KvCoder.of(VarLongCoder.of(), StringUtf8Coder.of()))); - - @Parameters(name = "{index}: {0}") - public static Iterable<ParDo.MultiOutput<?, ?>> data() { - return ImmutableList.<ParDo.MultiOutput<?, ?>>of( - ParDo.of(new DropElementsFn()).withOutputTags(new TupleTag<Void>(), TupleTagList.empty()), - ParDo.of(new DropElementsFn()) - .withOutputTags(new TupleTag<Void>(), TupleTagList.empty()) - .withSideInputs(singletonSideInput, multimapSideInput), - ParDo.of(new DropElementsFn()) - .withOutputTags( - new TupleTag<Void>(), - TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>() {})) - .withSideInputs(singletonSideInput, multimapSideInput), - ParDo.of(new DropElementsFn()) - .withOutputTags( - new TupleTag<Void>(), - TupleTagList.of(new TupleTag<byte[]>() {}).and(new TupleTag<Integer>() {}))); - } - - @Parameter(0) - public ParDo.MultiOutput<KV<Long, String>, Void> parDo; - - @Test - public void testToAndFromProto() throws Exception { - SdkComponents components = SdkComponents.create(); - ParDoPayload payload = ParDos.toProto(parDo, components); - - assertThat(ParDos.getDoFn(payload), Matchers.<DoFn<?, ?>>equalTo(parDo.getFn())); - assertThat( - ParDos.getMainOutputTag(payload), Matchers.<TupleTag<?>>equalTo(parDo.getMainOutputTag())); - for (PCollectionView<?> view : parDo.getSideInputs()) { - payload.getSideInputsOrThrow(view.getTagInternal().getId()); - } - } - - @Test - public void toAndFromTransformProto() throws Exception { - Map<TupleTag<?>, PValue> inputs = new HashMap<>(); - inputs.put(new TupleTag<KV<Long, String>>() {}, mainInput); - inputs.putAll(parDo.getAdditionalInputs()); - PCollectionTuple output = mainInput.apply(parDo); - - SdkComponents components = SdkComponents.create(); - String transformId = - components.registerPTransform( - AppliedPTransform.<PCollection<KV<Long, String>>, PCollection<Void>, MultiOutput>of( - "foo", inputs, output.expand(), parDo, p), - Collections.<AppliedPTransform<?, ?, ?>>emptyList()); - - Components protoComponents = components.toComponents(); - RunnerApi.PTransform protoTransform = - protoComponents.getTransformsOrThrow(transformId); - ParDoPayload parDoPayload = protoTransform.getSpec().getParameter().unpack(ParDoPayload.class); - for (PCollectionView<?> view : parDo.getSideInputs()) { - SideInput sideInput = parDoPayload.getSideInputsOrThrow(view.getTagInternal().getId()); - PCollectionView<?> restoredView = - ParDos.fromProto( - sideInput, view.getTagInternal().getId(), protoTransform, protoComponents); - assertThat(restoredView.getTagInternal(), equalTo(view.getTagInternal())); - assertThat(restoredView.getViewFn(), instanceOf(view.getViewFn().getClass())); - assertThat( - restoredView.getWindowMappingFn(), instanceOf(view.getWindowMappingFn().getClass())); - assertThat( - restoredView.getWindowingStrategyInternal(), - Matchers.<WindowingStrategy<?, ?>>equalTo( - view.getWindowingStrategyInternal().fixDefaults())); - assertThat(restoredView.getCoderInternal(), equalTo(view.getCoderInternal())); - } - String mainInputId = components.registerPCollection(mainInput); - assertThat( - ParDos.getMainInput(protoTransform, protoComponents), - equalTo(protoComponents.getPcollectionsOrThrow(mainInputId))); - } - - private static class DropElementsFn extends DoFn<KV<Long, String>, Void> { - @ProcessElement - public void proc(ProcessContext context, BoundedWindow window) { - context.output(null); - } - - @Override - public boolean equals(Object other) { - return other instanceof DropElementsFn; - } - - @Override - public int hashCode() { - return DropElementsFn.class.hashCode(); - } - } - - @SuppressWarnings("unused") - private static class StateTimerDropElementsFn extends DoFn<KV<Long, String>, Void> { - private static final String BAG_STATE_ID = "bagState"; - private static final String COMBINING_STATE_ID = "combiningState"; - private static final String EVENT_TIMER_ID = "eventTimer"; - private static final String PROCESSING_TIMER_ID = "processingTimer"; - - @StateId(BAG_STATE_ID) - private final StateSpec<BagState<String>> bagState = StateSpecs.bag(StringUtf8Coder.of()); - - @StateId(COMBINING_STATE_ID) - private final StateSpec<CombiningState<Long, long[], Long>> combiningState = - StateSpecs.combining( - new BinaryCombineLongFn() { - @Override - public long apply(long left, long right) { - return Math.max(left, right); - } - - @Override - public long identity() { - return Long.MIN_VALUE; - } - }); - - @TimerId(EVENT_TIMER_ID) - private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME); - - @TimerId(PROCESSING_TIMER_ID) - private final TimerSpec processingTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); - - @ProcessElement - public void dropInput( - ProcessContext context, - BoundedWindow window, - @StateId(BAG_STATE_ID) BagState<String> bagStateState, - @StateId(COMBINING_STATE_ID) CombiningState<Long, long[], Long> combiningStateState, - @TimerId(EVENT_TIMER_ID) Timer eventTimerTimer, - @TimerId(PROCESSING_TIMER_ID) Timer processingTimerTimer) { - context.output(null); - } - - @OnTimer(EVENT_TIMER_ID) - public void onEventTime(OnTimerContext context) {} - - @OnTimer(PROCESSING_TIMER_ID) - public void onProcessingTime(OnTimerContext context) {} - - @Override - public boolean equals(Object other) { - return other instanceof StateTimerDropElementsFn; - } - - @Override - public int hashCode() { - return StateTimerDropElementsFn.class.hashCode(); - } - } -}
