[
https://issues.apache.org/jira/browse/BEAM-12803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17416936#comment-17416936
]
Jonathan Hourany edited comment on BEAM-12803 at 9/20/21, 8:27 PM:
-------------------------------------------------------------------
Good news! I was able to build and test my changes locally, and it seems to be
working.
This pipeline works and correctly counts the 3 lines:
{code:python}
import json
from typing import NamedTuple, Optional

import apache_beam as beam
from apache_beam import coders
from apache_beam.transforms.sql import SqlTransform


class TestTuple(NamedTuple):
    foo: int
    bar: Optional[str]


coders.registry.register_coder(TestTuple, coders.RowCoder)

with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | beam.Create(
            [
                '{"foo": 1, "bar": "test"}',
                '{"foo": 3, "bar": "test"}',
                '{"foo": 5, "bar": null}',
            ]
        )
        | beam.Map(json.loads)
        | beam.Map(lambda x: TestTuple(**x)).with_output_types(TestTuple)
        | SqlTransform("""
            SELECT COUNT(*) AS num_lines
            FROM PCOLLECTION
        """)
        | beam.Map(lambda row: f"Num Rows: {row.num_lines}")
        | beam.Map(print)
    )
{code}
----
As does passing an existing PCollection to SqlTransform:
{code:python}
with beam.Pipeline() as pipeline:
    test_tbl = (
        pipeline
        | beam.Create(
            [
                '{"foo": 1, "bar": "test"}',
                '{"foo": 3, "bar": "test"}',
                '{"foo": 5, "bar": null}',
            ]
        )
        | beam.Map(json.loads)
        | beam.Map(lambda x: TestTuple(**x)).with_output_types(TestTuple)
    )

    (
        test_tbl
        | SqlTransform("""
            SELECT COUNT(*) AS num_lines
            FROM PCOLLECTION
        """)
        | beam.Map(lambda row: f"Num Rows: {row.num_lines}")
        | beam.Map(print)
    )
{code}
----
However, what I can't seem to get to work is passing a name-to-PCollection dict
and referring to the PCollection by name inside SqlTransform:
{code:python}
with beam.Pipeline() as pipeline:
    test_tbl = (
        pipeline
        | beam.Create(
            [
                '{"foo": 1, "bar": "test"}',
                '{"foo": 3, "bar": "test"}',
                '{"foo": 5, "bar": null}',
            ]
        )
        | beam.Map(json.loads)
        | beam.Map(lambda x: TestTuple(**x)).with_output_types(TestTuple)
    )

    (
        {"test_tbl": test_tbl}
        | SqlTransform("""
            SELECT COUNT(*) AS num_lines
            FROM test_tbl
        """)
        | beam.Map(lambda row: f"Num Rows: {row.num_lines}")
        | beam.Map(print)
    )
{code}
This leads to "RuntimeError: Unable to parse query" exception where Calcite
reports "Object 'test_tbl' not found". I can't tell if I'm passing the KV pair
in wrong somehow/accessing table incorrectly inside the query, -or if this is a
Py39 issue.- Edit: I rebuilt everything in Python 3.7 and got the same error,
so it's looking like I'm just doing something wrong here.
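To narrow it down, here's a minimal sketch I may try (a hypothetical debugging
aid of my own; DumpInputTags is a throwaway transform, not a Beam API) that
checks whether a dict of PCollections even reaches a transform's expand() with
the expected tags, independent of the SQL expansion service:
{code:python}
import apache_beam as beam


class DumpInputTags(beam.PTransform):
    """Throwaway transform: print the tags Beam hands to expand()."""

    def expand(self, pcolls):
        # For a dict-shaped input, pcolls arrives as a dict of
        # tag -> PCollection at pipeline-construction time.
        print("tags seen by expand():", sorted(pcolls.keys()))
        # Pass one branch through so the pipeline stays well-formed.
        return pcolls["test_tbl"]


with beam.Pipeline() as pipeline:
    test_tbl = pipeline | beam.Create([1, 2, 3])
    {"test_tbl": test_tbl} | DumpInputTags()
{code}
If the tag prints correctly here, the dict is being applied fine on the Python
side and the problem is somewhere in the expansion-service handoff.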
If it is just user error, then I'll get a PR up soon with the details I found on
why some tests fail only on Python 3.9.
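Separately, for the 3.9-only failures: my working assumption (based on the
IndexError from typing_to_runner_api in this ticket's traceback, not anything
confirmed in the Beam code yet) is the Python 3.9 change to bare typing
generics, which this standalone snippet illustrates:
{code:python}
import sys
import typing

# Bare (un-parameterized) typing generics lost their implicit TypeVar args
# in Python 3.9, so code that indexes __args__[0] now sees an empty tuple.
args = getattr(typing.Sequence, "__args__", ())
print(sys.version_info[:2], args)
# Python 3.7/3.8 prints e.g. (3, 8) (~T,) -> args[0] succeeds
# Python 3.9 prints e.g.     (3, 9) ()    -> args[0] raises IndexError,
#                                            matching this ticket's traceback
{code}
If that assumption holds, the fix would be to special-case bare generics before
indexing into their args.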
> SqlTransform doesn't work on python 3.9
> ---------------------------------------
>
> Key: BEAM-12803
> URL: https://issues.apache.org/jira/browse/BEAM-12803
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: sean teeling
> Assignee: Brian Hulette
> Priority: P2
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Working example below -(Is there no way to paste pre-formatted code into
> Jira?!)- (EDIT: I added the appropriate "code" block)
> {code:python}
> import itertools
> import csv
> import io
>
> import apache_beam as beam
> from apache_beam.dataframe.io import read_csv
> from apache_beam.transforms.sql import SqlTransform
>
>
> def parse_csv(val):
>     def lower_headers(iterator):
>         return itertools.chain([next(iterator).lower()], iterator)
>
>     return csv.DictReader(lower_headers(io.TextIOWrapper(val.open())))
>
>
> class BeamTransformBuilder():
>     def build(self, pipeline):
>         practices = (
>             pipeline
>             | beam.io.fileio.MatchFiles("data.csv")
>             | beam.io.fileio.ReadMatches()
>             | beam.Reshuffle()
>             | beam.FlatMap(parse_csv)
>             | beam.Map(lambda x: beam.Row(id="test-id"))
>             | SqlTransform("""
>                 SELECT
>                     id
>                 FROM PCOLLECTION""")
>         )
>         practices | beam.Map(print)
>
>
> def main():
>     builder = BeamTransformBuilder()
>     with beam.Pipeline('DirectRunner') as p:
>         builder.build(p)
>
>
> if __name__ == '__main__':
>     main()
> {code}
>
> Results in the error:
>
> {code:java}
>   File "/usr/local/lib/python3.9/site-packages/apache_beam/typehints/schemas.py", line 185, in typing_to_runner_api
>     element_type = typing_to_runner_api(_get_args(type_)[0])
> IndexError: tuple index out of range
> {code}
>
>
> Tested on Python 3.9.6.
>
> Annoyingly, it is difficult to test this out on other Python versions.
> There's no documentation on how to set up a Docker container using the
> DirectRunner and run it locally, and barely any documentation on which Python
> versions are supported. And with pyenv, pip install apache-beam requires a lot
> of other downloads that conflict when other versions are already installed.