[ https://issues.apache.org/jira/browse/BEAM-4654?focusedWorklogId=120179&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-120179 ]
ASF GitHub Bot logged work on BEAM-4654:
----------------------------------------
Author: ASF GitHub Bot
Created on: 07/Jul/18 02:15
Start Date: 07/Jul/18 02:15
Worklog Time Spent: 10m
Work Description: lukecwik closed pull request #5883: [BEAM-4654] Treat
timers as PCollections within proto representation in the Java SDK.
URL: https://github.com/apache/beam/pull/5883
This is a PR merged from a forked repository.
Because GitHub does not show the original diff of a pull request from a fork
once it has been merged, the diff is reproduced below for the sake of provenance:
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index 3da7871d6db..ee93a6fbc8f 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -23,9 +23,13 @@
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import java.io.IOException;
+import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -36,10 +40,12 @@
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms;
import
org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms.SplittableParDoComponents;
+import
org.apache.beam.runners.core.construction.ParDoTranslation.ParDoTranslator;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.common.ReflectHelpers.ObjectsClassComparator;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
@@ -47,8 +53,8 @@
import org.apache.beam.sdk.values.TupleTag;
/**
- * Utilities for converting {@link PTransform PTransforms} to and from {@link
RunnerApi Runner API
- * protocol buffers}.
+ * Utilities for converting {@link PTransform PTransforms} to {@link RunnerApi
Runner API protocol
+ * buffers}.
*/
public class PTransformTranslation {
@@ -87,31 +93,16 @@
public static final String MULTIMAP_SIDE_INPUT =
getUrn(RunnerApi.StandardSideInputTypes.Enum.MULTIMAP);
- private static final Map<Class<? extends PTransform>,
TransformPayloadTranslator>
- KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators();
+ private static final Collection<TransformTranslator<?>> KNOWN_TRANSLATORS =
+ loadKnownTranslators();
- private static Map<Class<? extends PTransform>, TransformPayloadTranslator>
- loadTransformPayloadTranslators() {
- HashMap<Class<? extends PTransform>, TransformPayloadTranslator>
translators = new HashMap<>();
-
- for (TransformPayloadTranslatorRegistrar registrar :
- ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
-
- Map<Class<? extends PTransform>, TransformPayloadTranslator>
newTranslators =
- (Map) registrar.getTransformPayloadTranslators();
-
- Set<Class<? extends PTransform>> alreadyRegistered =
- Sets.intersection(translators.keySet(), newTranslators.keySet());
-
- if (!alreadyRegistered.isEmpty()) {
- throw new IllegalArgumentException(
- String.format(
- "Classes already registered: %s", Joiner.on(",
").join(alreadyRegistered)));
- }
-
- translators.putAll(newTranslators);
- }
- return ImmutableMap.copyOf(translators);
+ private static Collection<TransformTranslator<?>> loadKnownTranslators() {
+ return ImmutableSortedSet.<TransformTranslator<?>>orderedBy(
+ (Comparator) ObjectsClassComparator.INSTANCE)
+ .add(new RawPTransformTranslator())
+ .add(new KnownTransformPayloadTranslator())
+ .add(ParDoTranslator.create())
+ .build();
}
private PTransformTranslation() {}
@@ -126,62 +117,13 @@ private PTransformTranslation() {}
List<AppliedPTransform<?, ?, ?>> subtransforms,
SdkComponents components)
throws IOException {
- // TODO include DisplayData https://issues.apache.org/jira/browse/BEAM-2645
- RunnerApi.PTransform.Builder transformBuilder =
RunnerApi.PTransform.newBuilder();
- for (Map.Entry<TupleTag<?>, PValue> taggedInput :
appliedPTransform.getInputs().entrySet()) {
- checkArgument(
- taggedInput.getValue() instanceof PCollection,
- "Unexpected input type %s",
- taggedInput.getValue().getClass());
- transformBuilder.putInputs(
- toProto(taggedInput.getKey()),
- components.registerPCollection((PCollection<?>)
taggedInput.getValue()));
- }
- for (Map.Entry<TupleTag<?>, PValue> taggedOutput :
appliedPTransform.getOutputs().entrySet()) {
- // TODO: Remove gating
- if (taggedOutput.getValue() instanceof PCollection) {
- checkArgument(
- taggedOutput.getValue() instanceof PCollection,
- "Unexpected output type %s",
- taggedOutput.getValue().getClass());
- transformBuilder.putOutputs(
- toProto(taggedOutput.getKey()),
- components.registerPCollection((PCollection<?>)
taggedOutput.getValue()));
- }
- }
- for (AppliedPTransform<?, ?, ?> subtransform : subtransforms) {
-
transformBuilder.addSubtransforms(components.getExistingPTransformId(subtransform));
- }
- transformBuilder.setUniqueName(appliedPTransform.getFullName());
- transformBuilder.setDisplayData(
-
DisplayDataTranslation.toProto(DisplayData.from(appliedPTransform.getTransform())));
-
- PTransform<?, ?> transform = appliedPTransform.getTransform();
-
- // A RawPTransform directly vends its payload. Because it will generally be
- // a subclass, we cannot do dictionary lookup in KNOWN_PAYLOAD_TRANSLATORS.
- if (transform instanceof RawPTransform) {
- // The raw transform was parsed in the context of other components; this
puts it in the
- // context of our current serialization
- FunctionSpec spec = ((RawPTransform<?, ?>)
transform).migrate(components);
-
- // A composite transform is permitted to have a null spec. There are
also some pseudo-
- // primitives not yet supported by the portability framework that have
null specs
- if (spec != null) {
- transformBuilder.setSpec(spec);
- }
- } else if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) {
- FunctionSpec spec =
- KNOWN_PAYLOAD_TRANSLATORS
- .get(transform.getClass())
- .translate(appliedPTransform, components);
- if (spec != null) {
- transformBuilder.setSpec(spec);
- }
- }
-
- return transformBuilder.build();
+ TransformTranslator<?> transformTranslator =
+ Iterables.find(
+ KNOWN_TRANSLATORS,
+ (translator) ->
translator.canTranslate(appliedPTransform.getTransform()),
+ DefaultUnknownTransformTranslator.INSTANCE);
+ return transformTranslator.translate(appliedPTransform, subtransforms,
components);
}
/**
@@ -204,18 +146,12 @@ private static String toProto(TupleTag<?> tag) {
/** Returns the URN for the transform if it is known, otherwise {@code
null}. */
@Nullable
public static String urnForTransformOrNull(PTransform<?, ?> transform) {
-
- // A RawPTransform directly vends its URN. Because it will generally be
- // a subclass, we cannot do dictionary lookup in KNOWN_PAYLOAD_TRANSLATORS.
- if (transform instanceof RawPTransform) {
- return ((RawPTransform) transform).getUrn();
- }
-
- TransformPayloadTranslator translator =
KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass());
- if (translator == null) {
- return null;
- }
- return translator.getUrn(transform);
+ TransformTranslator<?> transformTranslator =
+ Iterables.find(
+ KNOWN_TRANSLATORS,
+ (translator) -> translator.canTranslate(transform),
+ DefaultUnknownTransformTranslator.INSTANCE);
+ return ((TransformTranslator) transformTranslator).getUrn(transform);
}
/** Returns the URN for the transform if it is known, otherwise throws. */
@@ -235,16 +171,207 @@ public static String
urnForTransformOrNull(RunnerApi.PTransform transform) {
}
/**
- * A bi-directional translator between a Java-based {@link PTransform} and a
protobuf payload for
- * that transform.
+ * A translator between a Java-based {@link PTransform} and a protobuf for
that transform.
*
* <p>When going to a protocol buffer message, the translator produces a
payload corresponding to
- * the Java representation while registering components that payload
references.
+ * the Java representation while registering components that transform
references.
+ */
+ public interface TransformTranslator<T extends PTransform<?, ?>> {
+ @Nullable
+ String getUrn(T transform);
+
+ boolean canTranslate(PTransform<?, ?> pTransform);
+
+ RunnerApi.PTransform translate(
+ AppliedPTransform<?, ?, ?> appliedPTransform,
+ List<AppliedPTransform<?, ?, ?>> subtransforms,
+ SdkComponents components)
+ throws IOException;
+ }
+
+ /** Translates all unknown transforms to have an empty {@link FunctionSpec}
and unset URN. */
+ private static class DefaultUnknownTransformTranslator
+ implements TransformTranslator<PTransform<?, ?>> {
+ private static final TransformTranslator<?> INSTANCE = new
DefaultUnknownTransformTranslator();
+
+ @Override
+ public String getUrn(PTransform<?, ?> transform) {
+ return null;
+ }
+
+ @Override
+ public boolean canTranslate(PTransform<?, ?> pTransform) {
+ return true;
+ }
+
+ @Override
+ public RunnerApi.PTransform translate(
+ AppliedPTransform<?, ?, ?> appliedPTransform,
+ List<AppliedPTransform<?, ?, ?>> subtransforms,
+ SdkComponents components)
+ throws IOException {
+ return translateAppliedPTransform(appliedPTransform, subtransforms,
components).build();
+ }
+ }
+
+ /**
+ * Translates {@link RawPTransform} by extracting the {@link FunctionSpec}
and migrating over all
+ * referenced components.
+ */
+ private static class RawPTransformTranslator implements
TransformTranslator<RawPTransform<?, ?>> {
+ @Override
+ public String getUrn(RawPTransform transform) {
+ return transform.getUrn();
+ }
+
+ @Override
+ public boolean canTranslate(PTransform<?, ?> pTransform) {
+ return pTransform instanceof RawPTransform;
+ }
+
+ @Override
+ public RunnerApi.PTransform translate(
+ AppliedPTransform<?, ?, ?> appliedPTransform,
+ List<AppliedPTransform<?, ?, ?>> subtransforms,
+ SdkComponents components)
+ throws IOException {
+ RunnerApi.PTransform.Builder transformBuilder =
+ translateAppliedPTransform(appliedPTransform, subtransforms,
components);
+
+ PTransform<?, ?> transform = appliedPTransform.getTransform();
+
+ // The raw transform was parsed in the context of other components; this
puts it in the
+ // context of our current serialization
+ FunctionSpec spec = ((RawPTransform<?, ?>)
transform).migrate(components);
+
+ // A composite transform is permitted to have a null spec. There are
also some pseudo-
+ // primitives not yet supported by the portability framework that have
null specs
+ if (spec != null) {
+ transformBuilder.setSpec(spec);
+ }
+
+ return transformBuilder.build();
+ }
+ }
+
+ /**
+ * Translates a set of registered transforms whose content only differs
based by differences in
+ * their {@link FunctionSpec}s and URNs.
+ */
+ private static class KnownTransformPayloadTranslator<T extends PTransform<?,
?>>
+ implements TransformTranslator<T> {
+ private static final Map<Class<? extends PTransform>,
TransformPayloadTranslator>
+ KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators();
+
+ private static Map<Class<? extends PTransform>, TransformPayloadTranslator>
+ loadTransformPayloadTranslators() {
+ HashMap<Class<? extends PTransform>, TransformPayloadTranslator>
translators =
+ new HashMap<>();
+
+ for (TransformPayloadTranslatorRegistrar registrar :
+ ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
+
+ Map<Class<? extends PTransform>, TransformPayloadTranslator>
newTranslators =
+ (Map) registrar.getTransformPayloadTranslators();
+
+ Set<Class<? extends PTransform>> alreadyRegistered =
+ Sets.intersection(translators.keySet(), newTranslators.keySet());
+
+ if (!alreadyRegistered.isEmpty()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Classes already registered: %s", Joiner.on(",
").join(alreadyRegistered)));
+ }
+
+ translators.putAll(newTranslators);
+ }
+ return ImmutableMap.copyOf(translators);
+ }
+
+ @Override
+ public boolean canTranslate(PTransform pTransform) {
+ return KNOWN_PAYLOAD_TRANSLATORS.containsKey(pTransform.getClass());
+ }
+
+ @Override
+ public String getUrn(PTransform transform) {
+ return
KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass()).getUrn(transform);
+ }
+
+ @Override
+ public RunnerApi.PTransform translate(
+ AppliedPTransform<?, ?, ?> appliedPTransform,
+ List<AppliedPTransform<?, ?, ?>> subtransforms,
+ SdkComponents components)
+ throws IOException {
+ RunnerApi.PTransform.Builder transformBuilder =
+ translateAppliedPTransform(appliedPTransform, subtransforms,
components);
+
+ FunctionSpec spec =
+ KNOWN_PAYLOAD_TRANSLATORS
+ .get(appliedPTransform.getTransform().getClass())
+ .translate(appliedPTransform, components);
+ if (spec != null) {
+ transformBuilder.setSpec(spec);
+ }
+ return transformBuilder.build();
+ }
+ }
+
+ /**
+ * Translates an {@link AppliedPTransform} by:
*
- * <p>When "rehydrating" a protocol buffer message, the translator returns a
{@link RawPTransform}
- * - because the transform may not be Java-based, it is not possible to
rebuild a Java-based
- * {@link PTransform}. The resulting {@link RawPTransform} subclass
encapsulates the knowledge of
- * which components are referenced in the payload.
+ * <ul>
+ * <li>adding an input to the PTransform for each {@link
AppliedPTransform#getInputs()}.
+ * <li>adding an output to the PTransform for each {@link
AppliedPTransform#getOutputs()}.
+ * <li>adding a PCollection for each {@link
AppliedPTransform#getOutputs()}.
+ * <li>adding a reference to each subtransform.
+ * <li>set the unique name.
+ * <li>set the display data.
+ * </ul>
+ */
+ static RunnerApi.PTransform.Builder translateAppliedPTransform(
+ AppliedPTransform<?, ?, ?> appliedPTransform,
+ List<AppliedPTransform<?, ?, ?>> subtransforms,
+ SdkComponents components)
+ throws IOException {
+ RunnerApi.PTransform.Builder transformBuilder =
RunnerApi.PTransform.newBuilder();
+ for (Map.Entry<TupleTag<?>, PValue> taggedInput :
appliedPTransform.getInputs().entrySet()) {
+ checkArgument(
+ taggedInput.getValue() instanceof PCollection,
+ "Unexpected input type %s",
+ taggedInput.getValue().getClass());
+ transformBuilder.putInputs(
+ toProto(taggedInput.getKey()),
+ components.registerPCollection((PCollection<?>)
taggedInput.getValue()));
+ }
+ for (Map.Entry<TupleTag<?>, PValue> taggedOutput :
appliedPTransform.getOutputs().entrySet()) {
+ // TODO: Remove gating
+ if (taggedOutput.getValue() instanceof PCollection) {
+ checkArgument(
+ taggedOutput.getValue() instanceof PCollection,
+ "Unexpected output type %s",
+ taggedOutput.getValue().getClass());
+ transformBuilder.putOutputs(
+ toProto(taggedOutput.getKey()),
+ components.registerPCollection((PCollection<?>)
taggedOutput.getValue()));
+ }
+ }
+ for (AppliedPTransform<?, ?, ?> subtransform : subtransforms) {
+
transformBuilder.addSubtransforms(components.getExistingPTransformId(subtransform));
+ }
+
+ transformBuilder.setUniqueName(appliedPTransform.getFullName());
+ transformBuilder.setDisplayData(
+
DisplayDataTranslation.toProto(DisplayData.from(appliedPTransform.getTransform())));
+ return transformBuilder;
+ }
+
+ /**
+ * A translator between a Java-based {@link PTransform} and a protobuf
payload for that transform.
+ *
+ * <p>When going to a protocol buffer message, the translator produces a
payload corresponding to
+ * the Java representation while registering components that payload
references.
*/
public interface TransformPayloadTranslator<T extends PTransform<?, ?>> {
String getUrn(T transform);
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index 8a0c3431031..b2ab9c8f21c 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -24,7 +24,6 @@
import static
org.apache.beam.sdk.transforms.reflect.DoFnSignatures.getStateSpecOrThrow;
import static
org.apache.beam.sdk.transforms.reflect.DoFnSignatures.getTimerSpecOrThrow;
-import com.google.auto.service.AutoService;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
@@ -47,8 +46,11 @@
import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput.Builder;
import
org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
+import
org.apache.beam.runners.core.construction.PTransformTranslation.TransformTranslator;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
@@ -72,6 +74,7 @@
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.util.DoFnAndMainOutput;
import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
@@ -88,13 +91,13 @@
"urn:beam:windowmappingfn:javasdk:0.1";
/** A {@link TransformPayloadTranslator} for {@link ParDo}. */
- public static class ParDoPayloadTranslator
- implements TransformPayloadTranslator<MultiOutput<?, ?>> {
- public static TransformPayloadTranslator create() {
- return new ParDoPayloadTranslator();
+ public static class ParDoTranslator implements
TransformTranslator<MultiOutput<?, ?>> {
+
+ public static TransformTranslator create() {
+ return new ParDoTranslator();
}
- private ParDoPayloadTranslator() {}
+ private ParDoTranslator() {}
@Override
public String getUrn(ParDo.MultiOutput<?, ?> transform) {
@@ -102,25 +105,58 @@ public String getUrn(ParDo.MultiOutput<?, ?> transform) {
}
@Override
- public FunctionSpec translate(
- AppliedPTransform<?, ?, MultiOutput<?, ?>> transform, SdkComponents
components)
- throws IOException {
- ParDoPayload payload =
- translateParDo(transform.getTransform(), transform.getPipeline(),
components);
- return RunnerApi.FunctionSpec.newBuilder()
- .setUrn(PAR_DO_TRANSFORM_URN)
- .setPayload(payload.toByteString())
- .build();
+ public boolean canTranslate(PTransform<?, ?> pTransform) {
+ return pTransform instanceof ParDo.MultiOutput;
}
- /** Registers {@link ParDoPayloadTranslator}. */
- @AutoService(TransformPayloadTranslatorRegistrar.class)
- public static class Registrar implements
TransformPayloadTranslatorRegistrar {
- @Override
- public Map<? extends Class<? extends PTransform>, ? extends
TransformPayloadTranslator>
- getTransformPayloadTranslators() {
- return Collections.singletonMap(ParDo.MultiOutput.class, new
ParDoPayloadTranslator());
+ @Override
+ public RunnerApi.PTransform translate(
+ AppliedPTransform<?, ?, ?> appliedPTransform,
+ List<AppliedPTransform<?, ?, ?>> subtransforms,
+ SdkComponents components)
+ throws IOException {
+ RunnerApi.PTransform.Builder builder =
+ PTransformTranslation.translateAppliedPTransform(
+ appliedPTransform, subtransforms, components);
+
+ ParDoPayload payload =
+ translateParDo(
+ (ParDo.MultiOutput) appliedPTransform.getTransform(),
+ appliedPTransform.getPipeline(),
+ components);
+ builder.setSpec(
+ RunnerApi.FunctionSpec.newBuilder()
+ .setUrn(PAR_DO_TRANSFORM_URN)
+ .setPayload(payload.toByteString())
+ .build());
+
+ String mainInputId = getMainInputId(builder, payload);
+ PCollection<KV<?, ?>> mainInput =
+ (PCollection) appliedPTransform.getInputs().get(new
TupleTag(mainInputId));
+
+ // https://s.apache.org/beam-portability-timers
+ // Add a PCollection and coder for each timer. Also treat them as inputs
and outputs.
+ for (String localTimerName : payload.getTimerSpecsMap().keySet()) {
+ PCollection<?> timerPCollection =
+ PCollection.createPrimitiveOutputInternal(
+ // Create a dummy pipeline since we don't want to modify the
current
+ // users view of the pipeline they have constructed.
+ Pipeline.create(),
+ mainInput.getWindowingStrategy(),
+ mainInput.isBounded(),
+ KvCoder.of(
+ ((KvCoder) mainInput.getCoder()).getKeyCoder(),
+ // TODO: Add support for timer payloads to the SDK
+ // We currently assume that all payloads are unspecified.
+ Timer.Coder.of(VoidCoder.of())));
+ timerPCollection.setName(
+ String.format("%s.%s", appliedPTransform.getFullName(),
localTimerName));
+ String timerPCollectionId =
components.registerPCollection(timerPCollection);
+ builder.putInputs(localTimerName, timerPCollectionId);
+ builder.putOutputs(localTimerName, timerPCollectionId);
}
+
+ return builder.build();
}
}
@@ -300,11 +336,17 @@ public static TupleTagList
getAdditionalOutputTags(AppliedPTransform<?, ?, ?> ap
"Unexpected payload type %s",
ptransform.getSpec().getUrn());
ParDoPayload payload =
ParDoPayload.parseFrom(ptransform.getSpec().getPayload());
- String mainInputId =
- Iterables.getOnlyElement(
- Sets.difference(
- ptransform.getInputsMap().keySet(),
payload.getSideInputsMap().keySet()));
- return
components.getPcollectionsOrThrow(ptransform.getInputsOrThrow(mainInputId));
+ return components.getPcollectionsOrThrow(
+ ptransform.getInputsOrThrow(getMainInputId(ptransform, payload)));
+ }
+
+ /** Returns the main input id of the ptransform. */
+ private static String getMainInputId(
+ RunnerApi.PTransformOrBuilder ptransform, RunnerApi.ParDoPayload
payload) {
+ return Iterables.getOnlyElement(
+ Sets.difference(
+ ptransform.getInputsMap().keySet(),
+ Sets.union(payload.getSideInputsMap().keySet(),
payload.getTimerSpecsMap().keySet())));
}
public static RunnerApi.StateSpec translateStateSpec(
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
index 572d24d77e9..68975dd4924 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
@@ -20,6 +20,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import com.google.common.collect.ImmutableList;
@@ -29,10 +30,12 @@
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.state.BagState;
@@ -50,6 +53,8 @@
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
@@ -101,6 +106,8 @@
new TupleTag<>(),
TupleTagList.of(new TupleTag<byte[]>() {}).and(new
TupleTag<Integer>() {})),
ParDo.of(new SplittableDropElementsFn())
+ .withOutputTags(new TupleTag<>(), TupleTagList.empty()),
+ ParDo.of(new StateTimerDropElementsFn())
.withOutputTags(new TupleTag<>(), TupleTagList.empty()));
}
@@ -108,7 +115,7 @@
public ParDo.MultiOutput<KV<Long, String>, Void> parDo;
@Test
- public void testToAndFromProto() throws Exception {
+ public void testToProto() throws Exception {
SdkComponents components = SdkComponents.create();
components.registerEnvironment(Environment.newBuilder().setUrl("java").build());
ParDoPayload payload = ParDoTranslation.translateParDo(parDo, p,
components);
@@ -121,7 +128,7 @@ public void testToAndFromProto() throws Exception {
}
@Test
- public void toAndFromTransformProto() throws Exception {
+ public void toTransformProto() throws Exception {
Map<TupleTag<?>, PValue> inputs = new HashMap<>();
inputs.put(new TupleTag<KV<Long, String>>() {}, mainInput);
inputs.putAll(parDo.getAdditionalInputs());
@@ -163,6 +170,35 @@ public void toAndFromTransformProto() throws Exception {
assertThat(
ParDoTranslation.getMainInput(protoTransform, components),
equalTo(components.getPcollectionsOrThrow(mainInputId)));
+
+ // Validate that the timer PCollections are added correctly.
+ DoFnSignature signature = DoFnSignatures.signatureForDoFn(parDo.getFn());
+
+ for (String localTimerName : signature.timerDeclarations().keySet()) {
+ RunnerApi.PCollection timerPCollection =
+ components.getPcollectionsOrThrow(String.format("foo.%s",
localTimerName));
+ assertEquals(
+ components.getPcollectionsOrThrow(mainInputId).getIsBounded(),
+ timerPCollection.getIsBounded());
+ assertEquals(
+
components.getPcollectionsOrThrow(mainInputId).getWindowingStrategyId(),
+ timerPCollection.getWindowingStrategyId());
+ ModelCoders.KvCoderComponents timerKvCoderComponents =
+ ModelCoders.getKvCoderComponents(
+ components.getCodersOrThrow(timerPCollection.getCoderId()));
+ Coder<?> timerKeyCoder =
+ CoderTranslation.fromProto(
+
components.getCodersOrThrow(timerKvCoderComponents.keyCoderId()),
+ rehydratedComponents);
+ assertEquals(VarLongCoder.of(), timerKeyCoder);
+ Coder<?> timerValueCoder =
+ CoderTranslation.fromProto(
+
components.getCodersOrThrow(timerKvCoderComponents.valueCoderId()),
+ rehydratedComponents);
+ assertEquals(
+
org.apache.beam.runners.core.construction.Timer.Coder.of(VoidCoder.of()),
+ timerValueCoder);
+ }
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 120179)
Time Spent: 1h 20m (was: 1h 10m)
> Update pipeline translation for timers inside Java SDK
> ------------------------------------------------------
>
> Key: BEAM-4654
> URL: https://issues.apache.org/jira/browse/BEAM-4654
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-java-core
> Reporter: Luke Cwik
> Assignee: Luke Cwik
> Priority: Major
> Labels: portability
> Fix For: 2.6.0
>
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> Add the timer PCollection and treat timers as inputs/outputs of the ParDo
> PTransform.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)