robertwb commented on a change in pull request #11185: [BEAM-8019] Some 
generalizations to support cross-language transforms.
URL: https://github.com/apache/beam/pull/11185#discussion_r397542542
 
 

 ##########
 File path: sdks/python/apache_beam/pvalue.py
 ##########
 @@ -181,11 +185,32 @@ def from_(pcoll):
     """
     return PCollection(pcoll.pipeline, is_bounded=pcoll.is_bounded)
 
+  def _recursively_add_external_coders(
+      self, new_context, old_context, coder_proto):
+    for component_coder_id in coder_proto.component_coder_ids:
+      component_coder_proto = (
+          old_context.coders.get_id_to_proto_map()[component_coder_id])
+      new_context.coders.get_id_to_proto_map()[component_coder_id] = (
+          component_coder_proto)
+      self._recursively_add_external_coders(
+          new_context, old_context, component_coder_proto)
+
   def to_runner_api(self, context):
     # type: (PipelineContext) -> beam_runner_api_pb2.PCollection
+
+    if isinstance(self.element_type, ElementTypeHolder):
+      # This is potentially an coder in an external SDK that cannot be directly
+      # parsed in Python SDK.
+      coder_proto = self.element_type.coder
+      coder_id = context.coders.get_by_proto(coder_proto, deduplicate=True)
+      self._recursively_add_external_coders(
+          context, self.element_type.context, coder_proto)
+    else:
+      coder_id = context.coder_id_from_element_type(self.element_type)
 
 Review comment:
   Could you make this work generically rather than having an 
isinstance(self.element_type, ElementTypeHolder) check above?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to