[ 
https://issues.apache.org/jira/browse/BEAM-2717?focusedWorklogId=158567&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-158567
 ]

ASF GitHub Bot logged work on BEAM-2717:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Oct/18 07:43
            Start Date: 25/Oct/18 07:43
    Worklog Time Spent: 10m 
      Work Description: robertwb commented on a change in pull request #6813: 
[BEAM-2717] Emit coders in generated protos.
URL: https://github.com/apache/beam/pull/6813#discussion_r228064034
 
 

 ##########
 File path: sdks/python/apache_beam/pipeline.py
 ##########
 @@ -603,12 +605,36 @@ def visit_value(self, value, _):
     self.visit(Visitor())
     return Visitor.ok
 
-  def to_runner_api(self, return_context=False, context=None):
+  def to_runner_api(
+      self, return_context=False, context=None, use_fake_coders=False):
     """For internal use only; no backwards-compatibility guarantees."""
     from apache_beam.runners import pipeline_context
     from apache_beam.portability.api import beam_runner_api_pb2
     if context is None:
-      context = pipeline_context.PipelineContext()
+      context = pipeline_context.PipelineContext(
+          use_fake_coders=use_fake_coders)
+
+    # The RunnerAPI spec requires certain transforms to have KV inputs
+    # (and corresponding outputs).
+    # Currently we only upgrade to KV pairs.  If there is a need for more
+    # general shapes, potential conflicts will have to be resolved.
+    class ForceKvInputTypes(PipelineVisitor):
+      def enter_composite_transform(self, transform_node):
+        self.visit_transform(transform_node)
+
+      def visit_transform(self, transform_node):
+        if transform_node.transform and transform_node.transform.runner_api_requires_keyed_input():
+          pcoll = transform_node.inputs[0]
+          pcoll.element_type = typehints.coerce_to_kv_type(
+              pcoll.element_type, transform_node.full_label)
+          if len(transform_node.outputs) == 1:
 
 Review comment:
   Done, and yes, it's sufficient. 
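
   Editorial illustration (not part of the original comment): the hunk above
   upgrades a transform's input element type to a KV pair when the transform
   requires keyed input. The rough idea can be sketched with the standard
   `typing` module alone. The function name `coerce_to_kv_sketch` and its
   exact rules are assumptions made for this sketch; it is not Beam's
   `typehints.coerce_to_kv_type`, whose behavior may differ.

```python
from typing import Any, Tuple, get_args, get_origin


def coerce_to_kv_sketch(element_type, label="<unknown transform>"):
    """Hypothetical stand-in for the coercion idea; not Beam's implementation."""
    # An unknown or Any element type is upgraded to the most general pair.
    if element_type is None or element_type is Any:
        return Tuple[Any, Any]
    # A fixed two-element tuple type is already KV-shaped; keep it unchanged.
    if get_origin(element_type) is tuple:
        args = get_args(element_type)
        if len(args) == 2 and Ellipsis not in args:
            return element_type
    # Anything else conflicts with the keyed-input requirement.
    raise ValueError(
        "Input to %r must be compatible with a key-value pair, got %r."
        % (label, element_type))
```

   Under these assumptions, an `Any` input becomes `Tuple[Any, Any]`, an
   existing two-element tuple type is left alone, and any other shape is
   reported as a conflict, which mirrors the diff's note that only upgrades
   to KV pairs are currently handled.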

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 158567)
    Time Spent: 1h 20m  (was: 1h 10m)

> Infer coders in SDK prior to handing off pipeline to Runner
> -----------------------------------------------------------
>
>                 Key: BEAM-2717
>                 URL: https://issues.apache.org/jira/browse/BEAM-2717
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Robert Bradshaw
>            Priority: Minor
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> Currently all runners have to duplicate this work, and there's also a hack 
> storing the element type rather than the coder in the Runner protos.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
