[ 
https://issues.apache.org/jira/browse/BEAM-12803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17416936#comment-17416936
 ] 

Jonathan Hourany edited comment on BEAM-12803 at 9/18/21, 2:21 AM:
-------------------------------------------------------------------

Good news! I was able to build and test my changes locally, and they seem to be
working.

This works and correctly counts the 3 rows:
{code:python}
import json
from typing import NamedTuple, Optional

import apache_beam as beam
from apache_beam import coders
from apache_beam.transforms.sql import SqlTransform


class TestTuple(NamedTuple):
    foo: int
    bar: Optional[str]


coders.registry.register_coder(TestTuple, coders.RowCoder)


with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | beam.Create(
                [
                    '{"foo": 1, "bar": "test"}',
                    '{"foo": 3, "bar": "test"}',
                    '{"foo": 5, "bar": null}',
                ]
            )
        | beam.Map(json.loads)
        | beam.Map(lambda x: TestTuple(**x)).with_output_types(TestTuple)
        | SqlTransform("""
            SELECT COUNT(*) AS num_lines
            FROM PCOLLECTION
            """)
        | beam.Map(lambda row: f"Num Rows: {row.num_lines}")
        | beam.Map(print)
    )
{code}
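
As a quick cross-check of the expected count that needs neither Beam nor the Java expansion service, the parsing and counting steps can be sketched with the standard library alone. This is only a plain-Python stand-in for the pipeline above, not a use of SqlTransform: the `lines` and `rows` names are made up for the sketch, and `len(rows)` merely mimics `SELECT COUNT(*)`.

```python
import json
from typing import NamedTuple, Optional


class TestTuple(NamedTuple):
    foo: int
    bar: Optional[str]


# Same three JSON strings that beam.Create receives in the pipeline.
lines = [
    '{"foo": 1, "bar": "test"}',
    '{"foo": 3, "bar": "test"}',
    '{"foo": 5, "bar": null}',
]

# Mirror the two beam.Map steps: parse the JSON, then build the NamedTuple.
# JSON null becomes None, which the Optional[str] field allows.
rows = [TestTuple(**json.loads(line)) for line in lines]

# Plain-Python stand-in for SELECT COUNT(*) AS num_lines.
num_lines = len(rows)
print(f"Num Rows: {num_lines}")
```

If the Beam pipeline prints anything other than a count of 3 for this input, the difference comes from the pipeline or the SQL step rather than from the JSON parsing.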
----
 

As does passing a previously assigned PCollection to SqlTransform:
{code:python}
with beam.Pipeline() as pipeline:
    test_tbl = (
        pipeline
        | beam.Create(
                [
                    '{"foo": 1, "bar": "test"}',
                    '{"foo": 3, "bar": "test"}',
                    '{"foo": 5, "bar": null}',
                ]
            )
        | beam.Map(json.loads)
        | beam.Map(lambda x: TestTuple(**x)).with_output_types(TestTuple)
    )
    ( test_tbl
        | SqlTransform("""
            SELECT COUNT(*) AS num_lines
            FROM PCOLLECTION
            """)
        | beam.Map(lambda row: f"Num Rows: {row.num_lines}")
        | beam.Map(print)
    )
{code}
----
 

However, what I can't get to work is passing a dict that maps names to
PCollections and then using a PCollection's name in the SqlTransform query:
{code:python}
with beam.Pipeline() as pipeline:
    test_tbl = (
        pipeline
        | beam.Create(
                [
                    '{"foo": 1, "bar": "test"}',
                    '{"foo": 3, "bar": "test"}',
                    '{"foo": 5, "bar": null}',
                ]
            )
        | beam.Map(json.loads)
        | beam.Map(lambda x: TestTuple(**x)).with_output_types(TestTuple)
    )
    
    ({"test_tbl": test_tbl}
        | SqlTransform("""
            SELECT COUNT(*) AS num_lines
            FROM test_tbl
            """)
        | beam.Map(lambda row: f"Num Rows: {row.num_lines}")
        | beam.Map(print)
    )
{code}
This leads to a "RuntimeError: Unable to parse query" exception, where Calcite
reports "Object 'test_tbl' not found". I can't tell whether I'm passing the KV
pair in incorrectly, referencing the table incorrectly inside the query, -or if
this is a Py39 issue.- Edit: I rebuilt everything in Python 3.7 and got the same
error, so it looks like I'm just doing something wrong here.

If it is just user error, then I'll get a PR up soon with the details I found on
why some tests fail only on Python 3.9.


> SqlTransform doesn't work on python 3.9
> ---------------------------------------
>
>                 Key: BEAM-12803
>                 URL: https://issues.apache.org/jira/browse/BEAM-12803
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: sean teeling
>            Assignee: Brian Hulette
>            Priority: P2
>
> Working example below -(Is there no way to paste pre-formatted code into 
> jira?!)- (EDIT: I added the appropriate "code" block)
> {code:python}
> import itertools
> import csv
> import io
> import apache_beam as beam
> from apache_beam.dataframe.io import read_csv
> from apache_beam.transforms.sql import SqlTransform
> def parse_csv(val):
>   def lower_headers(iterator):
>     return itertools.chain([next(iterator).lower()], iterator)
>   return csv.DictReader(lower_headers(io.TextIOWrapper(val.open())))
> class BeamTransformBuilder():
>   def build(self, pipeline):
>     practices = (
>         pipeline
>           | beam.io.fileio.MatchFiles("data.csv")
>           | beam.io.fileio.ReadMatches()
>           | beam.Reshuffle()
>           | beam.FlatMap(parse_csv)
>           | beam.Map(lambda x: beam.Row(id="test-id"))
>           | SqlTransform("""
>                 SELECT
>                 id
>                 FROM PCOLLECTION""")
>         )
>     practices | beam.Map(print)
> def main():
>   builder = BeamTransformBuilder()
>   with beam.Pipeline('DirectRunner') as p:
>     builder.build(p)
> if __name__ == '__main__':
>   main()
> {code}
>  
>  Results in the error:
>  
> {code:java}
>   File 
> "/usr/local/lib/python3.9/site-packages/apache_beam/typehints/schemas.py", 
> line 185, in typing_to_runner_api
>     element_type = typing_to_runner_api(_get_args(type_)[0])
> IndexError: tuple index out of range
> {code}
>  
>  
> Tested on Python 3.9.6. 
>  
> Annoyingly, it is difficult to test this on other Python versions. 
> There's no documentation on how to set up a Docker container that uses 
> DirectRunner and run it locally. There's barely any documentation on which 
> Python versions are supported. And using pyenv with pip install apache-beam 
> requires a lot of other downloads that conflict when other versions are 
> already installed.
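
For readers unfamiliar with the error in the quoted traceback, the following standard-library sketch shows one way an `IndexError: tuple index out of range` can arise from indexing a type's argument tuple. It is an illustration under assumptions, not a diagnosis: `typing.get_args` stands in for Beam's internal `_get_args` helper (which is not inspected here), `first_type_arg` is a hypothetical name, and whether an unparameterized generic is what triggers the failure in this issue is not confirmed by the thread.

```python
import typing
from typing import Sequence


def first_type_arg(type_):
    """Return the first type argument of a typing construct, or None if it has none.

    Hypothetical defensive helper: it checks for an empty argument tuple
    instead of indexing it unconditionally.
    """
    args = typing.get_args(type_)
    return args[0] if args else None


# A parameterized generic carries its element type as an argument.
print(first_type_arg(Sequence[int]))

# An unparameterized generic has no arguments, so get_args returns ().
print(typing.get_args(Sequence))

# Indexing that empty tuple directly raises the same kind of error
# as the one shown in the traceback.
try:
    typing.get_args(Sequence)[0]
except IndexError as exc:
    print(f"IndexError: {exc}")
```

The guard in `first_type_arg` is only one possible way to handle the empty case; what Beam should return for a type without arguments is a design decision for the actual fix.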



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
