rahuliyer95 opened a new issue, #22854:
URL: https://github.com/apache/beam/issues/22854
### What happened?
When using an expansion service with Beam SDK 2.41.0, type inference
fails with the following exception:
```
Traceback (most recent call last):
File
"/opt/twitter_mde/package/eepython37/886055eae06dbd005860a8b21c6f43355dd92462660a6eda8e580552513a78a2/lib/python3.7/runpy.py",
line 193, in _run_module_as_main
"__main__", mod_spec)
File
"/opt/twitter_mde/package/eepython37/886055eae06dbd005860a8b21c6f43355dd92462660a6eda8e580552513a78a2/lib/python3.7/runpy.py",
line 85, in _run_code
exec(code, run_globals)
File
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/__main__.py",
line 87, in <module>
bootstrap_pex(__entry_point__)
File
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.bootstrap/pex/pex_bootstrapper.py",
line 591, in bootstrap_pex
pex.PEX(entry_point).execute()
File
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.bootstrap/pex/pex.py",
line 528, in execute
sys.exit(self._wrap_coverage(self._wrap_profiling, self._execute))
File
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.bootstrap/pex/pex.py",
line 435, in _wrap_coverage
return runner(*args)
File
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.bootstrap/pex/pex.py",
line 466, in _wrap_profiling
return runner(*args)
File
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.bootstrap/pex/pex.py",
line 556, in _execute
EntryPoint.parse("run = {}".format(self._pex_info.entry_point))
File
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.bootstrap/pex/pex.py",
line 739, in execute_entry
return self.execute_module(entry_point.module)
File
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.bootstrap/pex/pex.py",
line 747, in execute_module
runpy.run_module(module_name, run_name="__main__", alter_sys=True)
File
"/opt/twitter_mde/package/eepython37/886055eae06dbd005860a8b21c6f43355dd92462660a6eda8e580552513a78a2/lib/python3.7/runpy.py",
line 205, in run_module
return _run_module_code(code, init_globals, run_name, mod_spec)
File
"/opt/twitter_mde/package/eepython37/886055eae06dbd005860a8b21c6f43355dd92462660a6eda8e580552513a78a2/lib/python3.7/runpy.py",
line 96, in _run_module_code
mod_name, mod_spec, pkg_name, script_name)
File
"/opt/twitter_mde/package/eepython37/886055eae06dbd005860a8b21c6f43355dd92462660a6eda8e580552513a78a2/lib/python3.7/runpy.py",
line 85, in _run_code
exec(code, run_globals)
File
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/twitter/udp/integration_tests/jobs/dal_read_job.py",
line 180, in <module>
main()
File
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/twitter/udp/integration_tests/jobs/dal_read_job.py",
line 157, in main
>> dal_rows.ReadRows("tweetsource-public", "public_tweets",
opts.date).with_output_types(beam.Row)
File
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/transforms/ptransform.py",
line 1095, in __ror__
return self.transform.__ror__(pvalueish, self.label)
File
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/transforms/ptransform.py",
line 617, in __ror__
result = p.apply(self, pvalueish, label)
File
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/pipeline.py",
line 663, in apply
return self.apply(transform, pvalueish)
File
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/pipeline.py",
line 709, in apply
pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
File
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/runners/dataflow/dataflow_runner.py",
line 141, in apply
return super().apply(transform, input, options)
File
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/runners/runner.py",
line 185, in apply
return m(transform, input, options)
File
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/runners/runner.py",
line 215, in apply_PTransform
return transform.expand(input)
File
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/transforms/external.py",
line 547, in expand
pcoll_id in self._expanded_transform.outputs.items()
File
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/transforms/external.py",
line 547, in <dictcomp>
pcoll_id in self._expanded_transform.outputs.items()
File
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/runners/pipeline_context.py",
line 115, in get_by_id
self._id_to_proto[id], self._pipeline_context)
File
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/pvalue.py",
line 210, in from_runner_api
element_type=context.element_type_from_coder_id(proto.coder_id),
File
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/runners/pipeline_context.py",
line 269, in element_type_from_coder_id
self.coders[coder_id].to_type_hint())
File
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/runners/pipeline_context.py",
line 163, in __getitem__
return self.get_by_id(id)
File
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/runners/pipeline_context.py",
line 115, in get_by_id
self._id_to_proto[id], self._pipeline_context)
File
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/coders/coders.py",
line 385, in from_runner_api
context)
File
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/coders/row_coder.py",
line 111, in from_runner_api_parameter
return RowCoder(schema)
File
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/coders/row_coder.py",
line 63, in __init__
self._type_hint = named_tuple_from_schema(self.schema)
File
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/typehints/schemas.py",
line 462, in named_tuple_from_schema
schema_registry=schema_registry)
File
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/typehints/schemas.py",
line 179, in typing_from_runner_api
schema_registry=schema_registry).typing_from_runner_api(fieldtype_proto)
File
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/typehints/schemas.py",
line 407, in typing_from_runner_api
field_py_type = self.typing_from_runner_api(field.type)
File
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/typehints/schemas.py",
line 365, in typing_from_runner_api
return Optional[base]
File
"/opt/twitter_mde/package/eepython37/886055eae06dbd005860a8b21c6f43355dd92462660a6eda8e580552513a78a2/lib/python3.7/typing.py",
line 254, in inner
return func(*args, **kwds)
File
"/opt/twitter_mde/package/eepython37/886055eae06dbd005860a8b21c6f43355dd92462660a6eda8e580552513a78a2/lib/python3.7/typing.py",
line 357, in __getitem__
arg = _type_check(parameters, "Optional[t] requires a single type.")
File
"/opt/twitter_mde/package/eepython37/886055eae06dbd005860a8b21c6f43355dd92462660a6eda8e580552513a78a2/lib/python3.7/typing.py",
line 142, in _type_check
raise TypeError(f"{msg} Got {arg!r:.100}.")
TypeError: Optional[t] requires a single type. Got
Row(location=typing.Union[typing.Sequence[str], NoneType],
empty__=typing.Union[bool, NoneType]).
```
This is how we use the external transform:
```
import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions,
PipelineOptions
from apache_beam.transforms.external import ExpansionAndArtifactRetrievalStub
import grpc
import typing
External = typing.NamedTuple(
"External",
[
("id", str),
],
)
creds = grpc.ssl_channel_credentials()
channel = grpc.secure_channel("localhost:10001", creds)
grpc.channel_ready_future(channel).result()
stub = ExpansionAndArtifactRetrievalStub(channel)
with beam.Pipeline(options=PipelineOptions()) as p:
p | "Read" >> beam.ExternalTransform("URN",
NamedTupleBasedPayloadBuilder(External("123")), stub)
```
The same code works fine with SDK 2.40.0.
### Issue Priority
Priority: 1
### Issue Component
Component: sdk-py-core
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]