[ 
https://issues.apache.org/jira/browse/BEAM-4654?focusedWorklogId=120101&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-120101
 ]

ASF GitHub Bot logged work on BEAM-4654:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/Jul/18 22:13
            Start Date: 06/Jul/18 22:13
    Worklog Time Spent: 10m 
      Work Description: angoenka commented on a change in pull request #5883: 
[BEAM-4654] Treat timers as PCollections within proto representation in the 
Java SDK.
URL: https://github.com/apache/beam/pull/5883#discussion_r200781061
 
 

 ##########
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
 ##########
 @@ -88,39 +91,72 @@
       "urn:beam:windowmappingfn:javasdk:0.1";
 
   /** A {@link TransformPayloadTranslator} for {@link ParDo}. */
-  public static class ParDoPayloadTranslator
-      implements TransformPayloadTranslator<MultiOutput<?, ?>> {
-    public static TransformPayloadTranslator create() {
-      return new ParDoPayloadTranslator();
+  public static class ParDoTranslator implements 
TransformTranslator<MultiOutput<?, ?>> {
+
+    public static TransformTranslator create() {
+      return new ParDoTranslator();
     }
 
-    private ParDoPayloadTranslator() {}
+    private ParDoTranslator() {}
 
     @Override
     public String getUrn(ParDo.MultiOutput<?, ?> transform) {
       return PAR_DO_TRANSFORM_URN;
     }
 
     @Override
-    public FunctionSpec translate(
-        AppliedPTransform<?, ?, MultiOutput<?, ?>> transform, SdkComponents 
components)
-        throws IOException {
-      ParDoPayload payload =
-          translateParDo(transform.getTransform(), transform.getPipeline(), 
components);
-      return RunnerApi.FunctionSpec.newBuilder()
-          .setUrn(PAR_DO_TRANSFORM_URN)
-          .setPayload(payload.toByteString())
-          .build();
+    public boolean canTranslate(PTransform<?, ?> pTransform) {
+      return pTransform instanceof ParDo.MultiOutput;
     }
 
-    /** Registers {@link ParDoPayloadTranslator}. */
-    @AutoService(TransformPayloadTranslatorRegistrar.class)
-    public static class Registrar implements 
TransformPayloadTranslatorRegistrar {
-      @Override
-      public Map<? extends Class<? extends PTransform>, ? extends 
TransformPayloadTranslator>
-          getTransformPayloadTranslators() {
-        return Collections.singletonMap(ParDo.MultiOutput.class, new 
ParDoPayloadTranslator());
+    @Override
+    public RunnerApi.PTransform translate(
+        AppliedPTransform<?, ?, ?> appliedPTransform,
+        List<AppliedPTransform<?, ?, ?>> subtransforms,
+        SdkComponents components)
+        throws IOException {
+      RunnerApi.PTransform.Builder builder =
+          PTransformTranslation.translateAppliedPTransform(
+              appliedPTransform, subtransforms, components);
+
+      ParDoPayload payload =
+          translateParDo(
+              (ParDo.MultiOutput) appliedPTransform.getTransform(),
+              appliedPTransform.getPipeline(),
+              components);
+      builder.setSpec(
+          RunnerApi.FunctionSpec.newBuilder()
+              .setUrn(PAR_DO_TRANSFORM_URN)
+              .setPayload(payload.toByteString())
+              .build());
+
+      String mainInputId = getMainInputId(builder, payload);
+      PCollection<KV<?, ?>> mainInput =
+          (PCollection) appliedPTransform.getInputs().get(new 
TupleTag(mainInputId));
+
+      // https://s.apache.org/beam-portability-timers
+      // Add a PCollection and coder for each timer. Also treat them as inputs 
and outputs.
+      for (String localTimerName : payload.getTimerSpecsMap().keySet()) {
+        PCollection<?> timerPCollection =
 
 Review comment:
   How is TimerSpec encoded in this PCollectoon?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 120101)
    Time Spent: 50m  (was: 40m)

> Update pipeline translation for timers inside Java SDK
> ------------------------------------------------------
>
>                 Key: BEAM-4654
>                 URL: https://issues.apache.org/jira/browse/BEAM-4654
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-core
>            Reporter: Luke Cwik
>            Assignee: Luke Cwik
>            Priority: Major
>              Labels: portability
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> Add the timer PCollection and treat timers as inputs/outputs of the ParDo 
> PTransform.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to