This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ab361e8bf5 [BEAM-14250] Amended the workaround (#17531)
5ab361e8bf5 is described below

commit 5ab361e8bf5748c392def153fdbe1d2f773170c2
Author: Ning Kang <[email protected]>
AuthorDate: Wed May 4 12:17:52 2022 -0700

    [BEAM-14250] Amended the workaround (#17531)
    
    Made row coder work with stringified typehint when a
    NamedTupleInMainModule is registered with a RowCoder.
---
 sdks/python/apache_beam/coders/row_coder.py  | 5 +++++
 sdks/python/apache_beam/coders/typecoders.py | 4 ++--
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/coders/row_coder.py 
b/sdks/python/apache_beam/coders/row_coder.py
index 077da8ed516..bf15764b5aa 100644
--- a/sdks/python/apache_beam/coders/row_coder.py
+++ b/sdks/python/apache_beam/coders/row_coder.py
@@ -112,6 +112,11 @@ class RowCoder(FastCoder):
 
   @classmethod
   def from_type_hint(cls, type_hint, registry):
+    # TODO(BEAM-14250): Remove once all runners are portable.
+    if isinstance(type_hint, str):
+      import importlib
+      main_module = importlib.import_module('__main__')
+      type_hint = getattr(main_module, type_hint, type_hint)
     schema = schema_from_element_type(type_hint)
     return cls(schema)
 
diff --git a/sdks/python/apache_beam/coders/typecoders.py 
b/sdks/python/apache_beam/coders/typecoders.py
index ba2df530372..2a05bea090a 100644
--- a/sdks/python/apache_beam/coders/typecoders.py
+++ b/sdks/python/apache_beam/coders/typecoders.py
@@ -119,7 +119,7 @@ class CoderRegistry(object):
     if typehint_type.__module__ == '__main__':
       # See https://issues.apache.org/jira/browse/BEAM-14250
       # TODO(robertwb): Remove once all runners are portable.
-      typehint_type = str(typehint_type)
+      typehint_type = getattr(typehint_type, '__name__', str(typehint_type))
     self._register_coder_internal(typehint_type, typehint_coder_class)
 
   def get_coder(self, typehint):
@@ -127,7 +127,7 @@ class CoderRegistry(object):
     if typehint and typehint.__module__ == '__main__':
       # See https://issues.apache.org/jira/browse/BEAM-14250
       # TODO(robertwb): Remove once all runners are portable.
-      typehint = str(typehint)
+      typehint = getattr(typehint, '__name__', str(typehint))
     coder = self._coders.get(
         typehint.__class__
         if isinstance(typehint, typehints.TypeConstraint) else typehint,

Reply via email to