This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5ab361e8bf5 [BEAM-14250] Amended the workaround (#17531)
5ab361e8bf5 is described below
commit 5ab361e8bf5748c392def153fdbe1d2f773170c2
Author: Ning Kang <[email protected]>
AuthorDate: Wed May 4 12:17:52 2022 -0700
[BEAM-14250] Amended the workaround (#17531)
Made row coder work with stringified typehint when a
NamedTupleInMainModule is registered with a RowCoder.
---
sdks/python/apache_beam/coders/row_coder.py | 5 +++++
sdks/python/apache_beam/coders/typecoders.py | 4 ++--
2 files changed, 7 insertions(+), 2 deletions(-)
diff --git a/sdks/python/apache_beam/coders/row_coder.py b/sdks/python/apache_beam/coders/row_coder.py
index 077da8ed516..bf15764b5aa 100644
--- a/sdks/python/apache_beam/coders/row_coder.py
+++ b/sdks/python/apache_beam/coders/row_coder.py
@@ -112,6 +112,11 @@ class RowCoder(FastCoder):
@classmethod
def from_type_hint(cls, type_hint, registry):
+ # TODO(BEAM-14250): Remove once all runners are portable.
+ if isinstance(type_hint, str):
+ import importlib
+ main_module = importlib.import_module('__main__')
+ type_hint = getattr(main_module, type_hint, type_hint)
schema = schema_from_element_type(type_hint)
return cls(schema)
diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py
index ba2df530372..2a05bea090a 100644
--- a/sdks/python/apache_beam/coders/typecoders.py
+++ b/sdks/python/apache_beam/coders/typecoders.py
@@ -119,7 +119,7 @@ class CoderRegistry(object):
if typehint_type.__module__ == '__main__':
# See https://issues.apache.org/jira/browse/BEAM-14250
# TODO(robertwb): Remove once all runners are portable.
- typehint_type = str(typehint_type)
+ typehint_type = getattr(typehint_type, '__name__', str(typehint_type))
self._register_coder_internal(typehint_type, typehint_coder_class)
def get_coder(self, typehint):
@@ -127,7 +127,7 @@ class CoderRegistry(object):
if typehint and typehint.__module__ == '__main__':
# See https://issues.apache.org/jira/browse/BEAM-14250
# TODO(robertwb): Remove once all runners are portable.
- typehint = str(typehint)
+ typehint = getattr(typehint, '__name__', str(typehint))
coder = self._coders.get(
typehint.__class__
if isinstance(typehint, typehints.TypeConstraint) else typehint,