[ 
https://issues.apache.org/jira/browse/BEAM-4654?focusedWorklogId=120179&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-120179
 ]

ASF GitHub Bot logged work on BEAM-4654:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Jul/18 02:15
            Start Date: 07/Jul/18 02:15
    Worklog Time Spent: 10m 
      Work Description: lukecwik closed pull request #5883: [BEAM-4654] Treat timers as PCollections within proto representation in the Java SDK.
URL: https://github.com/apache/beam/pull/5883
 
 
   

This pull request was merged from a forked repository (a foreign pull
request). Because GitHub hides the original diff of a fork-based pull
request once it has been merged, the diff is reproduced below for the
sake of provenance:

diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index 3da7871d6db..ee93a6fbc8f 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -23,9 +23,13 @@
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -36,10 +40,12 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms;
 import 
org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms.SplittableParDoComponents;
+import 
org.apache.beam.runners.core.construction.ParDoTranslation.ParDoTranslator;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.common.ReflectHelpers.ObjectsClassComparator;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
@@ -47,8 +53,8 @@
 import org.apache.beam.sdk.values.TupleTag;
 
 /**
- * Utilities for converting {@link PTransform PTransforms} to and from {@link 
RunnerApi Runner API
- * protocol buffers}.
+ * Utilities for converting {@link PTransform PTransforms} to {@link RunnerApi 
Runner API protocol
+ * buffers}.
  */
 public class PTransformTranslation {
 
@@ -87,31 +93,16 @@
   public static final String MULTIMAP_SIDE_INPUT =
       getUrn(RunnerApi.StandardSideInputTypes.Enum.MULTIMAP);
 
-  private static final Map<Class<? extends PTransform>, 
TransformPayloadTranslator>
-      KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators();
+  private static final Collection<TransformTranslator<?>> KNOWN_TRANSLATORS =
+      loadKnownTranslators();
 
-  private static Map<Class<? extends PTransform>, TransformPayloadTranslator>
-      loadTransformPayloadTranslators() {
-    HashMap<Class<? extends PTransform>, TransformPayloadTranslator> 
translators = new HashMap<>();
-
-    for (TransformPayloadTranslatorRegistrar registrar :
-        ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
-
-      Map<Class<? extends PTransform>, TransformPayloadTranslator> 
newTranslators =
-          (Map) registrar.getTransformPayloadTranslators();
-
-      Set<Class<? extends PTransform>> alreadyRegistered =
-          Sets.intersection(translators.keySet(), newTranslators.keySet());
-
-      if (!alreadyRegistered.isEmpty()) {
-        throw new IllegalArgumentException(
-            String.format(
-                "Classes already registered: %s", Joiner.on(", 
").join(alreadyRegistered)));
-      }
-
-      translators.putAll(newTranslators);
-    }
-    return ImmutableMap.copyOf(translators);
+  private static Collection<TransformTranslator<?>> loadKnownTranslators() {
+    return ImmutableSortedSet.<TransformTranslator<?>>orderedBy(
+            (Comparator) ObjectsClassComparator.INSTANCE)
+        .add(new RawPTransformTranslator())
+        .add(new KnownTransformPayloadTranslator())
+        .add(ParDoTranslator.create())
+        .build();
   }
 
   private PTransformTranslation() {}
@@ -126,62 +117,13 @@ private PTransformTranslation() {}
       List<AppliedPTransform<?, ?, ?>> subtransforms,
       SdkComponents components)
       throws IOException {
-    // TODO include DisplayData https://issues.apache.org/jira/browse/BEAM-2645
-    RunnerApi.PTransform.Builder transformBuilder = 
RunnerApi.PTransform.newBuilder();
-    for (Map.Entry<TupleTag<?>, PValue> taggedInput : 
appliedPTransform.getInputs().entrySet()) {
-      checkArgument(
-          taggedInput.getValue() instanceof PCollection,
-          "Unexpected input type %s",
-          taggedInput.getValue().getClass());
-      transformBuilder.putInputs(
-          toProto(taggedInput.getKey()),
-          components.registerPCollection((PCollection<?>) 
taggedInput.getValue()));
-    }
-    for (Map.Entry<TupleTag<?>, PValue> taggedOutput : 
appliedPTransform.getOutputs().entrySet()) {
-      // TODO: Remove gating
-      if (taggedOutput.getValue() instanceof PCollection) {
-        checkArgument(
-            taggedOutput.getValue() instanceof PCollection,
-            "Unexpected output type %s",
-            taggedOutput.getValue().getClass());
-        transformBuilder.putOutputs(
-            toProto(taggedOutput.getKey()),
-            components.registerPCollection((PCollection<?>) 
taggedOutput.getValue()));
-      }
-    }
-    for (AppliedPTransform<?, ?, ?> subtransform : subtransforms) {
-      
transformBuilder.addSubtransforms(components.getExistingPTransformId(subtransform));
-    }
 
-    transformBuilder.setUniqueName(appliedPTransform.getFullName());
-    transformBuilder.setDisplayData(
-        
DisplayDataTranslation.toProto(DisplayData.from(appliedPTransform.getTransform())));
-
-    PTransform<?, ?> transform = appliedPTransform.getTransform();
-
-    // A RawPTransform directly vends its payload. Because it will generally be
-    // a subclass, we cannot do dictionary lookup in KNOWN_PAYLOAD_TRANSLATORS.
-    if (transform instanceof RawPTransform) {
-      // The raw transform was parsed in the context of other components; this 
puts it in the
-      // context of our current serialization
-      FunctionSpec spec = ((RawPTransform<?, ?>) 
transform).migrate(components);
-
-      // A composite transform is permitted to have a null spec. There are 
also some pseudo-
-      // primitives not yet supported by the portability framework that have 
null specs
-      if (spec != null) {
-        transformBuilder.setSpec(spec);
-      }
-    } else if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) {
-      FunctionSpec spec =
-          KNOWN_PAYLOAD_TRANSLATORS
-              .get(transform.getClass())
-              .translate(appliedPTransform, components);
-      if (spec != null) {
-        transformBuilder.setSpec(spec);
-      }
-    }
-
-    return transformBuilder.build();
+    TransformTranslator<?> transformTranslator =
+        Iterables.find(
+            KNOWN_TRANSLATORS,
+            (translator) -> 
translator.canTranslate(appliedPTransform.getTransform()),
+            DefaultUnknownTransformTranslator.INSTANCE);
+    return transformTranslator.translate(appliedPTransform, subtransforms, 
components);
   }
 
   /**
@@ -204,18 +146,12 @@ private static String toProto(TupleTag<?> tag) {
   /** Returns the URN for the transform if it is known, otherwise {@code 
null}. */
   @Nullable
   public static String urnForTransformOrNull(PTransform<?, ?> transform) {
-
-    // A RawPTransform directly vends its URN. Because it will generally be
-    // a subclass, we cannot do dictionary lookup in KNOWN_PAYLOAD_TRANSLATORS.
-    if (transform instanceof RawPTransform) {
-      return ((RawPTransform) transform).getUrn();
-    }
-
-    TransformPayloadTranslator translator = 
KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass());
-    if (translator == null) {
-      return null;
-    }
-    return translator.getUrn(transform);
+    TransformTranslator<?> transformTranslator =
+        Iterables.find(
+            KNOWN_TRANSLATORS,
+            (translator) -> translator.canTranslate(transform),
+            DefaultUnknownTransformTranslator.INSTANCE);
+    return ((TransformTranslator) transformTranslator).getUrn(transform);
   }
 
   /** Returns the URN for the transform if it is known, otherwise throws. */
@@ -235,16 +171,207 @@ public static String 
urnForTransformOrNull(RunnerApi.PTransform transform) {
   }
 
   /**
-   * A bi-directional translator between a Java-based {@link PTransform} and a 
protobuf payload for
-   * that transform.
+   * A translator between a Java-based {@link PTransform} and a protobuf for 
that transform.
    *
    * <p>When going to a protocol buffer message, the translator produces a 
payload corresponding to
-   * the Java representation while registering components that payload 
references.
+   * the Java representation while registering components that transform 
references.
+   */
+  public interface TransformTranslator<T extends PTransform<?, ?>> {
+    @Nullable
+    String getUrn(T transform);
+
+    boolean canTranslate(PTransform<?, ?> pTransform);
+
+    RunnerApi.PTransform translate(
+        AppliedPTransform<?, ?, ?> appliedPTransform,
+        List<AppliedPTransform<?, ?, ?>> subtransforms,
+        SdkComponents components)
+        throws IOException;
+  }
+
+  /** Translates all unknown transforms to have an empty {@link FunctionSpec} 
and unset URN. */
+  private static class DefaultUnknownTransformTranslator
+      implements TransformTranslator<PTransform<?, ?>> {
+    private static final TransformTranslator<?> INSTANCE = new 
DefaultUnknownTransformTranslator();
+
+    @Override
+    public String getUrn(PTransform<?, ?> transform) {
+      return null;
+    }
+
+    @Override
+    public boolean canTranslate(PTransform<?, ?> pTransform) {
+      return true;
+    }
+
+    @Override
+    public RunnerApi.PTransform translate(
+        AppliedPTransform<?, ?, ?> appliedPTransform,
+        List<AppliedPTransform<?, ?, ?>> subtransforms,
+        SdkComponents components)
+        throws IOException {
+      return translateAppliedPTransform(appliedPTransform, subtransforms, 
components).build();
+    }
+  }
+
+  /**
+   * Translates {@link RawPTransform} by extracting the {@link FunctionSpec} 
and migrating over all
+   * referenced components.
+   */
+  private static class RawPTransformTranslator implements 
TransformTranslator<RawPTransform<?, ?>> {
+    @Override
+    public String getUrn(RawPTransform transform) {
+      return transform.getUrn();
+    }
+
+    @Override
+    public boolean canTranslate(PTransform<?, ?> pTransform) {
+      return pTransform instanceof RawPTransform;
+    }
+
+    @Override
+    public RunnerApi.PTransform translate(
+        AppliedPTransform<?, ?, ?> appliedPTransform,
+        List<AppliedPTransform<?, ?, ?>> subtransforms,
+        SdkComponents components)
+        throws IOException {
+      RunnerApi.PTransform.Builder transformBuilder =
+          translateAppliedPTransform(appliedPTransform, subtransforms, 
components);
+
+      PTransform<?, ?> transform = appliedPTransform.getTransform();
+
+      // The raw transform was parsed in the context of other components; this 
puts it in the
+      // context of our current serialization
+      FunctionSpec spec = ((RawPTransform<?, ?>) 
transform).migrate(components);
+
+      // A composite transform is permitted to have a null spec. There are 
also some pseudo-
+      // primitives not yet supported by the portability framework that have 
null specs
+      if (spec != null) {
+        transformBuilder.setSpec(spec);
+      }
+
+      return transformBuilder.build();
+    }
+  }
+
+  /**
+   * Translates a set of registered transforms whose content only differs 
based by differences in
+   * their {@link FunctionSpec}s and URNs.
+   */
+  private static class KnownTransformPayloadTranslator<T extends PTransform<?, 
?>>
+      implements TransformTranslator<T> {
+    private static final Map<Class<? extends PTransform>, 
TransformPayloadTranslator>
+        KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators();
+
+    private static Map<Class<? extends PTransform>, TransformPayloadTranslator>
+        loadTransformPayloadTranslators() {
+      HashMap<Class<? extends PTransform>, TransformPayloadTranslator> 
translators =
+          new HashMap<>();
+
+      for (TransformPayloadTranslatorRegistrar registrar :
+          ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
+
+        Map<Class<? extends PTransform>, TransformPayloadTranslator> 
newTranslators =
+            (Map) registrar.getTransformPayloadTranslators();
+
+        Set<Class<? extends PTransform>> alreadyRegistered =
+            Sets.intersection(translators.keySet(), newTranslators.keySet());
+
+        if (!alreadyRegistered.isEmpty()) {
+          throw new IllegalArgumentException(
+              String.format(
+                  "Classes already registered: %s", Joiner.on(", 
").join(alreadyRegistered)));
+        }
+
+        translators.putAll(newTranslators);
+      }
+      return ImmutableMap.copyOf(translators);
+    }
+
+    @Override
+    public boolean canTranslate(PTransform pTransform) {
+      return KNOWN_PAYLOAD_TRANSLATORS.containsKey(pTransform.getClass());
+    }
+
+    @Override
+    public String getUrn(PTransform transform) {
+      return 
KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass()).getUrn(transform);
+    }
+
+    @Override
+    public RunnerApi.PTransform translate(
+        AppliedPTransform<?, ?, ?> appliedPTransform,
+        List<AppliedPTransform<?, ?, ?>> subtransforms,
+        SdkComponents components)
+        throws IOException {
+      RunnerApi.PTransform.Builder transformBuilder =
+          translateAppliedPTransform(appliedPTransform, subtransforms, 
components);
+
+      FunctionSpec spec =
+          KNOWN_PAYLOAD_TRANSLATORS
+              .get(appliedPTransform.getTransform().getClass())
+              .translate(appliedPTransform, components);
+      if (spec != null) {
+        transformBuilder.setSpec(spec);
+      }
+      return transformBuilder.build();
+    }
+  }
+
+  /**
+   * Translates an {@link AppliedPTransform} by:
    *
-   * <p>When "rehydrating" a protocol buffer message, the translator returns a 
{@link RawPTransform}
-   * - because the transform may not be Java-based, it is not possible to 
rebuild a Java-based
-   * {@link PTransform}. The resulting {@link RawPTransform} subclass 
encapsulates the knowledge of
-   * which components are referenced in the payload.
+   * <ul>
+   *   <li>adding an input to the PTransform for each {@link 
AppliedPTransform#getInputs()}.
+   *   <li>adding an output to the PTransform for each {@link 
AppliedPTransform#getOutputs()}.
+   *   <li>adding a PCollection for each {@link 
AppliedPTransform#getOutputs()}.
+   *   <li>adding a reference to each subtransform.
+   *   <li>set the unique name.
+   *   <li>set the display data.
+   * </ul>
+   */
+  static RunnerApi.PTransform.Builder translateAppliedPTransform(
+      AppliedPTransform<?, ?, ?> appliedPTransform,
+      List<AppliedPTransform<?, ?, ?>> subtransforms,
+      SdkComponents components)
+      throws IOException {
+    RunnerApi.PTransform.Builder transformBuilder = 
RunnerApi.PTransform.newBuilder();
+    for (Map.Entry<TupleTag<?>, PValue> taggedInput : 
appliedPTransform.getInputs().entrySet()) {
+      checkArgument(
+          taggedInput.getValue() instanceof PCollection,
+          "Unexpected input type %s",
+          taggedInput.getValue().getClass());
+      transformBuilder.putInputs(
+          toProto(taggedInput.getKey()),
+          components.registerPCollection((PCollection<?>) 
taggedInput.getValue()));
+    }
+    for (Map.Entry<TupleTag<?>, PValue> taggedOutput : 
appliedPTransform.getOutputs().entrySet()) {
+      // TODO: Remove gating
+      if (taggedOutput.getValue() instanceof PCollection) {
+        checkArgument(
+            taggedOutput.getValue() instanceof PCollection,
+            "Unexpected output type %s",
+            taggedOutput.getValue().getClass());
+        transformBuilder.putOutputs(
+            toProto(taggedOutput.getKey()),
+            components.registerPCollection((PCollection<?>) 
taggedOutput.getValue()));
+      }
+    }
+    for (AppliedPTransform<?, ?, ?> subtransform : subtransforms) {
+      
transformBuilder.addSubtransforms(components.getExistingPTransformId(subtransform));
+    }
+
+    transformBuilder.setUniqueName(appliedPTransform.getFullName());
+    transformBuilder.setDisplayData(
+        
DisplayDataTranslation.toProto(DisplayData.from(appliedPTransform.getTransform())));
+    return transformBuilder;
+  }
+
+  /**
+   * A translator between a Java-based {@link PTransform} and a protobuf 
payload for that transform.
+   *
+   * <p>When going to a protocol buffer message, the translator produces a 
payload corresponding to
+   * the Java representation while registering components that payload 
references.
    */
   public interface TransformPayloadTranslator<T extends PTransform<?, ?>> {
     String getUrn(T transform);
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index 8a0c3431031..b2ab9c8f21c 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -24,7 +24,6 @@
 import static 
org.apache.beam.sdk.transforms.reflect.DoFnSignatures.getStateSpecOrThrow;
 import static 
org.apache.beam.sdk.transforms.reflect.DoFnSignatures.getTimerSpecOrThrow;
 
-import com.google.auto.service.AutoService;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
@@ -47,8 +46,11 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
 import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput.Builder;
 import 
org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
+import 
org.apache.beam.runners.core.construction.PTransformTranslation.TransformTranslator;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.StateSpecs;
@@ -72,6 +74,7 @@
 import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.util.DoFnAndMainOutput;
 import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
@@ -88,13 +91,13 @@
       "urn:beam:windowmappingfn:javasdk:0.1";
 
   /** A {@link TransformPayloadTranslator} for {@link ParDo}. */
-  public static class ParDoPayloadTranslator
-      implements TransformPayloadTranslator<MultiOutput<?, ?>> {
-    public static TransformPayloadTranslator create() {
-      return new ParDoPayloadTranslator();
+  public static class ParDoTranslator implements 
TransformTranslator<MultiOutput<?, ?>> {
+
+    public static TransformTranslator create() {
+      return new ParDoTranslator();
     }
 
-    private ParDoPayloadTranslator() {}
+    private ParDoTranslator() {}
 
     @Override
     public String getUrn(ParDo.MultiOutput<?, ?> transform) {
@@ -102,25 +105,58 @@ public String getUrn(ParDo.MultiOutput<?, ?> transform) {
     }
 
     @Override
-    public FunctionSpec translate(
-        AppliedPTransform<?, ?, MultiOutput<?, ?>> transform, SdkComponents 
components)
-        throws IOException {
-      ParDoPayload payload =
-          translateParDo(transform.getTransform(), transform.getPipeline(), 
components);
-      return RunnerApi.FunctionSpec.newBuilder()
-          .setUrn(PAR_DO_TRANSFORM_URN)
-          .setPayload(payload.toByteString())
-          .build();
+    public boolean canTranslate(PTransform<?, ?> pTransform) {
+      return pTransform instanceof ParDo.MultiOutput;
     }
 
-    /** Registers {@link ParDoPayloadTranslator}. */
-    @AutoService(TransformPayloadTranslatorRegistrar.class)
-    public static class Registrar implements 
TransformPayloadTranslatorRegistrar {
-      @Override
-      public Map<? extends Class<? extends PTransform>, ? extends 
TransformPayloadTranslator>
-          getTransformPayloadTranslators() {
-        return Collections.singletonMap(ParDo.MultiOutput.class, new 
ParDoPayloadTranslator());
+    @Override
+    public RunnerApi.PTransform translate(
+        AppliedPTransform<?, ?, ?> appliedPTransform,
+        List<AppliedPTransform<?, ?, ?>> subtransforms,
+        SdkComponents components)
+        throws IOException {
+      RunnerApi.PTransform.Builder builder =
+          PTransformTranslation.translateAppliedPTransform(
+              appliedPTransform, subtransforms, components);
+
+      ParDoPayload payload =
+          translateParDo(
+              (ParDo.MultiOutput) appliedPTransform.getTransform(),
+              appliedPTransform.getPipeline(),
+              components);
+      builder.setSpec(
+          RunnerApi.FunctionSpec.newBuilder()
+              .setUrn(PAR_DO_TRANSFORM_URN)
+              .setPayload(payload.toByteString())
+              .build());
+
+      String mainInputId = getMainInputId(builder, payload);
+      PCollection<KV<?, ?>> mainInput =
+          (PCollection) appliedPTransform.getInputs().get(new 
TupleTag(mainInputId));
+
+      // https://s.apache.org/beam-portability-timers
+      // Add a PCollection and coder for each timer. Also treat them as inputs 
and outputs.
+      for (String localTimerName : payload.getTimerSpecsMap().keySet()) {
+        PCollection<?> timerPCollection =
+            PCollection.createPrimitiveOutputInternal(
+                // Create a dummy pipeline since we don't want to modify the 
current
+                // users view of the pipeline they have constructed.
+                Pipeline.create(),
+                mainInput.getWindowingStrategy(),
+                mainInput.isBounded(),
+                KvCoder.of(
+                    ((KvCoder) mainInput.getCoder()).getKeyCoder(),
+                    // TODO: Add support for timer payloads to the SDK
+                    // We currently assume that all payloads are unspecified.
+                    Timer.Coder.of(VoidCoder.of())));
+        timerPCollection.setName(
+            String.format("%s.%s", appliedPTransform.getFullName(), 
localTimerName));
+        String timerPCollectionId = 
components.registerPCollection(timerPCollection);
+        builder.putInputs(localTimerName, timerPCollectionId);
+        builder.putOutputs(localTimerName, timerPCollectionId);
       }
+
+      return builder.build();
     }
   }
 
@@ -300,11 +336,17 @@ public static TupleTagList 
getAdditionalOutputTags(AppliedPTransform<?, ?, ?> ap
         "Unexpected payload type %s",
         ptransform.getSpec().getUrn());
     ParDoPayload payload = 
ParDoPayload.parseFrom(ptransform.getSpec().getPayload());
-    String mainInputId =
-        Iterables.getOnlyElement(
-            Sets.difference(
-                ptransform.getInputsMap().keySet(), 
payload.getSideInputsMap().keySet()));
-    return 
components.getPcollectionsOrThrow(ptransform.getInputsOrThrow(mainInputId));
+    return components.getPcollectionsOrThrow(
+        ptransform.getInputsOrThrow(getMainInputId(ptransform, payload)));
+  }
+
+  /** Returns the main input id of the ptransform. */
+  private static String getMainInputId(
+      RunnerApi.PTransformOrBuilder ptransform, RunnerApi.ParDoPayload 
payload) {
+    return Iterables.getOnlyElement(
+        Sets.difference(
+            ptransform.getInputsMap().keySet(),
+            Sets.union(payload.getSideInputsMap().keySet(), 
payload.getTimerSpecsMap().keySet())));
   }
 
   public static RunnerApi.StateSpec translateStateSpec(
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
index 572d24d77e9..68975dd4924 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
@@ -20,6 +20,7 @@
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.ImmutableList;
@@ -29,10 +30,12 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
 import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.state.BagState;
@@ -50,6 +53,8 @@
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
@@ -101,6 +106,8 @@
                   new TupleTag<>(),
                   TupleTagList.of(new TupleTag<byte[]>() {}).and(new 
TupleTag<Integer>() {})),
           ParDo.of(new SplittableDropElementsFn())
+              .withOutputTags(new TupleTag<>(), TupleTagList.empty()),
+          ParDo.of(new StateTimerDropElementsFn())
               .withOutputTags(new TupleTag<>(), TupleTagList.empty()));
     }
 
@@ -108,7 +115,7 @@
     public ParDo.MultiOutput<KV<Long, String>, Void> parDo;
 
     @Test
-    public void testToAndFromProto() throws Exception {
+    public void testToProto() throws Exception {
       SdkComponents components = SdkComponents.create();
       
components.registerEnvironment(Environment.newBuilder().setUrl("java").build());
       ParDoPayload payload = ParDoTranslation.translateParDo(parDo, p, 
components);
@@ -121,7 +128,7 @@ public void testToAndFromProto() throws Exception {
     }
 
     @Test
-    public void toAndFromTransformProto() throws Exception {
+    public void toTransformProto() throws Exception {
       Map<TupleTag<?>, PValue> inputs = new HashMap<>();
       inputs.put(new TupleTag<KV<Long, String>>() {}, mainInput);
       inputs.putAll(parDo.getAdditionalInputs());
@@ -163,6 +170,35 @@ public void toAndFromTransformProto() throws Exception {
       assertThat(
           ParDoTranslation.getMainInput(protoTransform, components),
           equalTo(components.getPcollectionsOrThrow(mainInputId)));
+
+      // Validate that the timer PCollections are added correctly.
+      DoFnSignature signature = DoFnSignatures.signatureForDoFn(parDo.getFn());
+
+      for (String localTimerName : signature.timerDeclarations().keySet()) {
+        RunnerApi.PCollection timerPCollection =
+            components.getPcollectionsOrThrow(String.format("foo.%s", 
localTimerName));
+        assertEquals(
+            components.getPcollectionsOrThrow(mainInputId).getIsBounded(),
+            timerPCollection.getIsBounded());
+        assertEquals(
+            
components.getPcollectionsOrThrow(mainInputId).getWindowingStrategyId(),
+            timerPCollection.getWindowingStrategyId());
+        ModelCoders.KvCoderComponents timerKvCoderComponents =
+            ModelCoders.getKvCoderComponents(
+                components.getCodersOrThrow(timerPCollection.getCoderId()));
+        Coder<?> timerKeyCoder =
+            CoderTranslation.fromProto(
+                
components.getCodersOrThrow(timerKvCoderComponents.keyCoderId()),
+                rehydratedComponents);
+        assertEquals(VarLongCoder.of(), timerKeyCoder);
+        Coder<?> timerValueCoder =
+            CoderTranslation.fromProto(
+                
components.getCodersOrThrow(timerKvCoderComponents.valueCoderId()),
+                rehydratedComponents);
+        assertEquals(
+            
org.apache.beam.runners.core.construction.Timer.Coder.of(VoidCoder.of()),
+            timerValueCoder);
+      }
     }
   }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 120179)
    Time Spent: 1h 20m  (was: 1h 10m)

> Update pipeline translation for timers inside Java SDK
> ------------------------------------------------------
>
>                 Key: BEAM-4654
>                 URL: https://issues.apache.org/jira/browse/BEAM-4654
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-core
>            Reporter: Luke Cwik
>            Assignee: Luke Cwik
>            Priority: Major
>              Labels: portability
>             Fix For: 2.6.0
>
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> Add the timer PCollection and treat timers as inputs/outputs of the ParDo PTransform.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to