sean teeling created BEAM-12804:
-----------------------------------

             Summary: Issue with SqlTransform in Python 3.7.11
                 Key: BEAM-12804
                 URL: https://issues.apache.org/jira/browse/BEAM-12804
             Project: Beam
          Issue Type: Bug
          Components: sdk-py-core
            Reporter: sean teeling


Applying a `SqlTransform` to a PCollection fails with an error (full traceback below).

 

python --version

Python 3.7.11

 

pip list | grep apache-beam

apache-beam        2.31.0

 

The following code works when the `SqlTransform` step is removed:

```
import itertools
import csv
import io

import apache_beam as beam
from apache_beam.dataframe.io import read_csv
from apache_beam.transforms.sql import SqlTransform


def parse_csv(val, encoding='utf-8'):
    """Yield the rows of a matched CSV file as dicts keyed by lower-cased header.

    Args:
        val: a readable file match (this script passes the output of
            ``ReadMatches``); its ``open()`` is expected to return a binary
            file object, since it is wrapped in ``io.TextIOWrapper``.
        encoding: text encoding used to decode the file. Defaults to UTF-8 so
            that decoding does not depend on the locale of the machine that
            happens to run this function.

    Yields:
        One dict per data row, mapping lower-cased column names to values.
        An empty file yields nothing.
    """
    # newline='' hands the raw line endings to the csv module, which it needs
    # to parse quoted fields containing embedded newlines correctly.
    # The ``with`` closes the underlying file once all rows are consumed.
    with io.TextIOWrapper(val.open(), encoding=encoding, newline='') as text:
        header = next(text, None)
        if header is None:
            # Empty file: no header and no rows (a bare next() would raise).
            return
        # Lower-case only the header line; data lines pass through untouched.
        yield from csv.DictReader(itertools.chain([header.lower()], text))


class BeamTransformBuilder:
    """Assembles the repro pipeline: CSV file -> rows -> SqlTransform -> print."""

    def build(self, pipeline):
        """Apply the CSV-reading, SQL and printing steps to ``pipeline``.

        Args:
            pipeline: the ``beam.Pipeline`` the transforms are applied to.
        """
        query = """
SELECT
id
FROM PCOLLECTION"""
        matched = pipeline | beam.io.fileio.MatchFiles("data.csv")
        readable = matched | beam.io.fileio.ReadMatches()
        shuffled = readable | beam.Reshuffle()
        records = shuffled | beam.FlatMap(parse_csv)
        # Every parsed record is replaced by the same constant row, so only
        # the ``id`` field reaches the SQL query.
        rows = records | beam.Map(lambda x: beam.Row(id="test-id"))
        practices = rows | SqlTransform(query)
        # A plain Python call: it runs while the graph is being built, not
        # once per element.
        print("should print stuff")
        practices | beam.Map(print)


def main():
    """Build the repro pipeline and run it on the local DirectRunner."""
    transform_builder = BeamTransformBuilder()
    # Per the Beam Pipeline API, the pipeline is executed when the ``with``
    # block exits.
    with beam.Pipeline('DirectRunner') as pipeline:
        transform_builder.build(pipeline)


if __name__ == '__main__':
    # Only run the pipeline when executed as a script, not on import.
    main()
```
 

Error message:

```
    main()
    main()
  File "./lib/transforms/care_site.py", line 38, in main
    builder.build(p)
  File "./lib/transforms/care_site.py", line 29, in build
    FROM PCOLLECTION""")
  File "/Users/<user>/.pyenv/versions/3.7.11/lib/python3.7/site-packages/apache_beam/pvalue.py", line 136, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/Users/<user>/.pyenv/versions/3.7.11/lib/python3.7/site-packages/apache_beam/pipeline.py", line 694, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/Users/<user>/.pyenv/versions/3.7.11/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 185, in apply
    return m(transform, input, options)
  File "/Users/<user>/.pyenv/versions/3.7.11/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 215, in apply_PTransform
    return transform.expand(input)
  File "/Users/<user>/.pyenv/versions/3.7.11/lib/python3.7/site-packages/apache_beam/transforms/external.py", line 295, in expand
    response = service.Expand(request)
  File "/Users/<user>/.pyenv/versions/3.7.11/lib/python3.7/site-packages/grpc/_channel.py", line 946, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/Users/<user>/.pyenv/versions/3.7.11/lib/python3.7/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.UNKNOWN
    details = ""
    debug_error_string = "{"created":"@1629934400.533958000","description":"Error received from peer ipv6:[::1]:50780","file":"src/core/lib/surface/call.cc","file_line":1070,"grpc_message":"","grpc_status":2}"
```



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to