rahuliyer95 opened a new issue, #22854:
URL: https://github.com/apache/beam/issues/22854

   ### What happened?
   
   When using an expansion service with Beam SDK 2.41.0, type inference fails
   while expanding the external transform, with the following exception:
   ```
   Traceback (most recent call last):
     File 
"/opt/twitter_mde/package/eepython37/886055eae06dbd005860a8b21c6f43355dd92462660a6eda8e580552513a78a2/lib/python3.7/runpy.py",
 line 193, in _run_module_as_main
       "__main__", mod_spec)
     File 
"/opt/twitter_mde/package/eepython37/886055eae06dbd005860a8b21c6f43355dd92462660a6eda8e580552513a78a2/lib/python3.7/runpy.py",
 line 85, in _run_code
       exec(code, run_globals)
     File 
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/__main__.py",
 line 87, in <module>
       bootstrap_pex(__entry_point__)
     File 
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.bootstrap/pex/pex_bootstrapper.py",
 line 591, in bootstrap_pex
       pex.PEX(entry_point).execute()
     File 
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.bootstrap/pex/pex.py",
 line 528, in execute
       sys.exit(self._wrap_coverage(self._wrap_profiling, self._execute))
     File 
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.bootstrap/pex/pex.py",
 line 435, in _wrap_coverage
       return runner(*args)
     File 
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.bootstrap/pex/pex.py",
 line 466, in _wrap_profiling
       return runner(*args)
     File 
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.bootstrap/pex/pex.py",
 line 556, in _execute
       EntryPoint.parse("run = {}".format(self._pex_info.entry_point))
     File 
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.bootstrap/pex/pex.py",
 line 739, in execute_entry
       return self.execute_module(entry_point.module)
     File 
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.bootstrap/pex/pex.py",
 line 747, in execute_module
       runpy.run_module(module_name, run_name="__main__", alter_sys=True)
     File 
"/opt/twitter_mde/package/eepython37/886055eae06dbd005860a8b21c6f43355dd92462660a6eda8e580552513a78a2/lib/python3.7/runpy.py",
 line 205, in run_module
       return _run_module_code(code, init_globals, run_name, mod_spec)
     File 
"/opt/twitter_mde/package/eepython37/886055eae06dbd005860a8b21c6f43355dd92462660a6eda8e580552513a78a2/lib/python3.7/runpy.py",
 line 96, in _run_module_code
       mod_name, mod_spec, pkg_name, script_name)
     File 
"/opt/twitter_mde/package/eepython37/886055eae06dbd005860a8b21c6f43355dd92462660a6eda8e580552513a78a2/lib/python3.7/runpy.py",
 line 85, in _run_code
       exec(code, run_globals)
     File 
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/twitter/udp/integration_tests/jobs/dal_read_job.py",
 line 180, in <module>
       main()
     File 
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/twitter/udp/integration_tests/jobs/dal_read_job.py",
 line 157, in main
       >> dal_rows.ReadRows("tweetsource-public", "public_tweets", 
opts.date).with_output_types(beam.Row)
     File 
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/transforms/ptransform.py",
 line 1095, in __ror__
       return self.transform.__ror__(pvalueish, self.label)
     File 
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/transforms/ptransform.py",
 line 617, in __ror__
       result = p.apply(self, pvalueish, label)
     File 
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/pipeline.py",
 line 663, in apply
       return self.apply(transform, pvalueish)
     File 
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/pipeline.py",
 line 709, in apply
       pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
     File 
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/runners/dataflow/dataflow_runner.py",
 line 141, in apply
       return super().apply(transform, input, options)
     File 
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/runners/runner.py",
 line 185, in apply
       return m(transform, input, options)
     File 
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/runners/runner.py",
 line 215, in apply_PTransform
       return transform.expand(input)
     File 
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/transforms/external.py",
 line 547, in expand
       pcoll_id in self._expanded_transform.outputs.items()
     File 
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/transforms/external.py",
 line 547, in <dictcomp>
       pcoll_id in self._expanded_transform.outputs.items()
     File 
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/runners/pipeline_context.py",
 line 115, in get_by_id
       self._id_to_proto[id], self._pipeline_context)
     File 
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/pvalue.py",
 line 210, in from_runner_api
       element_type=context.element_type_from_coder_id(proto.coder_id),
     File 
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/runners/pipeline_context.py",
 line 269, in element_type_from_coder_id
       self.coders[coder_id].to_type_hint())
     File 
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/runners/pipeline_context.py",
 line 163, in __getitem__
       return self.get_by_id(id)
     File 
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/runners/pipeline_context.py",
 line 115, in get_by_id
       self._id_to_proto[id], self._pipeline_context)
     File 
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/coders/coders.py",
 line 385, in from_runner_api
       context)
     File 
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/coders/row_coder.py",
 line 111, in from_runner_api_parameter
       return RowCoder(schema)
     File 
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/coders/row_coder.py",
 line 63, in __init__
       self._type_hint = named_tuple_from_schema(self.schema)
     File 
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/typehints/schemas.py",
 line 462, in named_tuple_from_schema
       schema_registry=schema_registry)
     File 
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/typehints/schemas.py",
 line 179, in typing_from_runner_api
       schema_registry=schema_registry).typing_from_runner_api(fieldtype_proto)
     File 
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/typehints/schemas.py",
 line 407, in typing_from_runner_api
       field_py_type = self.typing_from_runner_api(field.type)
     File 
"/private/var/folders/yt/m4jhm2d5197c62c7v0kx77fc0000gp/T/tmps6_sxbu8/.deps/apache_beam-2.41.0-py3-none-any.whl/apache_beam/typehints/schemas.py",
 line 365, in typing_from_runner_api
       return Optional[base]
     File 
"/opt/twitter_mde/package/eepython37/886055eae06dbd005860a8b21c6f43355dd92462660a6eda8e580552513a78a2/lib/python3.7/typing.py",
 line 254, in inner
       return func(*args, **kwds)
     File 
"/opt/twitter_mde/package/eepython37/886055eae06dbd005860a8b21c6f43355dd92462660a6eda8e580552513a78a2/lib/python3.7/typing.py",
 line 357, in __getitem__
       arg = _type_check(parameters, "Optional[t] requires a single type.")
     File 
"/opt/twitter_mde/package/eepython37/886055eae06dbd005860a8b21c6f43355dd92462660a6eda8e580552513a78a2/lib/python3.7/typing.py",
 line 142, in _type_check
       raise TypeError(f"{msg} Got {arg!r:.100}.")
   TypeError: Optional[t] requires a single type. Got 
Row(location=typing.Union[typing.Sequence[str], NoneType], 
empty__=typing.Union[bool, NoneType]).
   ```
   
   This is how we use the external transform:
   ```
   # Minimal reproducer: applying an external (cross-language) transform fails
   # during pipeline construction on Beam 2.41.0, but works on 2.40.0.
   import typing

   import apache_beam as beam
   import grpc
   from apache_beam.options.pipeline_options import GoogleCloudOptions
   from apache_beam.options.pipeline_options import PipelineOptions
   from apache_beam.transforms.external import ExpansionAndArtifactRetrievalStub
   from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder

   # Configuration payload for the external transform; it is wrapped in a
   # NamedTupleBasedPayloadBuilder when the transform is applied below.
   External = typing.NamedTuple(
     "External",
     [
       ("id", str),
     ],
   )

   # Connect over TLS to the expansion service at localhost:10001 and block
   # until the channel is ready before constructing the pipeline.
   creds = grpc.ssl_channel_credentials()
   channel = grpc.secure_channel("localhost:10001", creds)
   grpc.channel_ready_future(channel).result()
   stub = ExpansionAndArtifactRetrievalStub(channel)

   with beam.Pipeline(options=PipelineOptions()) as p:
     # "URN" is presumably a placeholder for the real transform URN.
     p | "Read" >> beam.ExternalTransform(
         "URN", NamedTupleBasedPayloadBuilder(External("123")), stub)
   ```
   
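   The same code works fine with SDK 2.40.0. The `TypeError` is raised in
   `typing_from_runner_api` (`apache_beam/typehints/schemas.py`) when a nullable
   nested row field is wrapped in `Optional[...]`. Judging by its repr, the value
   passed to `Optional` appears to be a `Row` instance rather than a type.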
   
   ### Issue Priority
   
   Priority: 1
   
   ### Issue Component
   
   Component: sdk-py-core


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to