Maximilian Michels created BEAM-10389:
-----------------------------------------
Summary: SqlTransform only allows one registered RowCoder schema
Key: BEAM-10389
URL: https://issues.apache.org/jira/browse/BEAM-10389
Project: Beam
Issue Type: Bug
Components: sdk-py-core
Reporter: Maximilian Michels
The current workflow for using the SqlTransform is:
{code:python}
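import typing

import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform

# Register the NamedTuple with RowCoder so the PCollection carries a schema.
Row = typing.NamedTuple("Row", [("col1", int), ("col2", str)])
beam.coders.registry.register_coder(Row, beam.coders.RowCoder)

with self.create_pipeline() as p:
  output = (
      p
      | 'Create' >> beam.Create([Row(x, str(x)) for x in range(5)])
      | 'Sql' >> SqlTransform(
          """SELECT col1, col2 || '*' || col2 as col2,
                    power(col1, 2) as col3
             FROM PCOLLECTION
          """))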
{code}
This works fine, but when multiple row schemas are registered like this:
{code:python}
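Row = typing.NamedTuple("Row", [("col1", int), ("col2", str)])
beam.coders.registry.register_coder(Row, beam.coders.RowCoder)

# Second schema. Its definition is not shown in the report; it is assumed
# here to mirror Row, since it is built and queried the same way below.
Row2 = typing.NamedTuple("Row2", [("col1", int), ("col2", str)])
beam.coders.registry.register_coder(Row2, beam.coders.RowCoder)

with self.create_pipeline() as p:
  output = (
      p
      | 'Create' >> beam.Create([Row(x, str(x)) for x in range(5)])
      | 'Sql' >> SqlTransform(
          """SELECT col1, col2 || '*' || col2 as col2,
                    power(col1, 2) as col3
             FROM PCOLLECTION
          """))
  output2 = (
      p
      | 'Create2' >> beam.Create([Row2(x, str(x)) for x in range(5)])
      | 'Sql2' >> SqlTransform(
          """SELECT col1, col2 || '*' || col2 as col2,
                    power(col1, 2) as col3
             FROM PCOLLECTION
          """))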
{code}
This yields:
{noformat}
RuntimeError: Re-used coder id: ref_Coder_RowCoder_1
{noformat}
Source:
https://github.com/apache/beam/blob/a8f390704925d3a371b007ccbfcfc28a48b312d1/sdks/python/apache_beam/transforms/external.py#L419
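
To illustrate the failure mode, here is a simplified toy model, not Beam's actual code. It assumes that each external expansion numbers its coders starting from 1, and that the pipeline keeps a single table of coder ids that rejects an id re-used for a different definition. The names CoderTable and expand are hypothetical and exist only for this sketch.

```python
# Toy model (NOT Beam's implementation) of a pipeline-wide coder table that
# rejects a coder id re-used for a different coder definition.


class CoderTable:
    """Hypothetical stand-in for the pipeline's id -> coder mapping."""

    def __init__(self):
        self._id_to_proto = {}

    def merge(self, expanded_coders):
        """Merge the coders returned by one expansion into the table."""
        for coder_id, proto in expanded_coders.items():
            existing = self._id_to_proto.get(coder_id)
            if existing is not None and existing != proto:
                raise RuntimeError('Re-used coder id: %s' % coder_id)
            self._id_to_proto[coder_id] = proto


def expand(schema_fields):
    """Hypothetical expansion: assumed to restart coder numbering at 1."""
    return {'ref_Coder_RowCoder_1': ('row', tuple(schema_fields))}


table = CoderTable()

# The first expansion registers ref_Coder_RowCoder_1 without conflict.
table.merge(expand([('col1', 'int'), ('col2', 'str')]))

# The second expansion yields the same id for a different schema.
try:
    table.merge(expand([('col1', 'int'), ('col2', 'str'), ('col3', 'float')]))
    error = None
except RuntimeError as exc:
    error = str(exc)

print(error)
```

Under these assumptions, the second merge fails because the id is identical while the coder definition differs. If that is what happens in practice, giving each expansion's coder ids a unique prefix, or deduplicating coders by content, would avoid the collision.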