[ https://issues.apache.org/jira/browse/BEAM-9029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenhai Pan updated BEAM-9029:
-----------------------------
    Description: 
Hi :)

There seem to be 2 bugs in the S3 filesystem support.

I tried to use S3 storage for a simple wordcount demo with DirectRunner.

The demo script:
{code:python}
def main():
    options = PipelineOptions().view_as(StandardOptions)
    options.runner = 'DirectRunner'
    pipeline = beam.Pipeline(options=options)
    (
        pipeline
        | ReadFromText("s3://mx-machine-learning/panwenhai/beam_test/test_data")
        | "extract_words" >> beam.FlatMap(lambda x: re.findall(r" [A-Za-z\']+", x))
        | beam.combiners.Count.PerElement()
        | beam.MapTuple(lambda word, count: "%s: %s" % (word, count))
        | WriteToText("s3://mx-machine-learning/panwenhai/beam_test/output")
    )
    result = pipeline.run()
    result.wait_until_finish()
    return
{code}
 

Error message 1:
{noformat}
apache_beam.io.filesystem.BeamIOError: Match operation failed with exceptions 
{'s3://mx-machine-learning/panwenhai/beam_test/output-*-of-00001': 
BeamIOError("List operation failed with exceptions 
{'s3://mx-machine-learning/panwenhai/beam_test/output-': S3ClientError('Tried 
to list nonexistent S3 path: 
s3://mx-machine-learning/panwenhai/beam_test/output-', 404)}")} [while running 
'WriteToText/Write/WriteImpl/PreFinalize'] with exceptions None{noformat}
 

After digging into the code, it seems the Boto3 client's list function raises an 
exception when asked to list a nonexistent S3 path 
(beam/sdks/python/apache_beam/io/aws/clients/s3/boto3_client.py line 111), and 
the S3IO class does not handle this exception in its list_prefix function 
(beam/sdks/python/apache_beam/io/aws/s3io.py line 121).

When the runner tries to list and delete any existing output files before 
finalizing the write, and no such files exist, it ends up listing a nonexistent 
S3 path, which triggers the exception above.

This should not be treated as an error: an empty prefix simply means there is 
nothing to clean up. I think list_prefix in S3IO can safely catch the 404 and 
return an empty result.
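A minimal, self-contained sketch of the idea. The S3ClientError and FakeClient 
classes below are stand-ins for the real Beam/Boto3 classes, and the attribute 
names (e.g. a numeric "code" on the error) are assumptions based on the 
traceback above, not the actual Beam API:
{code:python}
class S3ClientError(Exception):
    """Stand-in for the S3 client error seen in the traceback (has a code)."""
    def __init__(self, message, code):
        super().__init__(message)
        self.code = code


class FakeClient(object):
    """Hypothetical client whose list() raises a 404 on a nonexistent prefix."""
    def __init__(self, objects):
        self.objects = objects  # maps object name -> size

    def list(self, prefix):
        matches = {n: s for n, s in self.objects.items() if n.startswith(prefix)}
        if not matches:
            raise S3ClientError(
                'Tried to list nonexistent S3 path: %s' % prefix, 404)
        return matches.items()


def list_prefix(client, prefix):
    """Like S3IO.list_prefix, but a 404 simply yields an empty result."""
    file_sizes = {}
    try:
        for name, size in client.list(prefix):
            file_sizes[name] = size
    except S3ClientError as e:
        if e.code != 404:
            raise  # real errors (permissions, throttling, ...) still propagate
    return file_sizes


client = FakeClient({'s3://bucket/data/part-0': 10})
assert list_prefix(client, 's3://bucket/data/') == {'s3://bucket/data/part-0': 10}
assert list_prefix(client, 's3://bucket/output-') == {}  # no exception raised
{code}
This way the PreFinalize step sees "no existing output files" instead of a 
failed match.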

Error message 2:
{noformat}
File 
"/Users/wenhai.pan/venvs/tfx/lib/python3.7/site-packages/apache_beam-2.19.0.dev0-py3.7.egg/apache_beam/io/aws/s3filesystem.py",
 line 272, in delete
exceptions = {path: error for (path, error) in results
File 
"/Users/wenhai.pan/venvs/tfx/lib/python3.7/site-packages/apache_beam-2.19.0.dev0-py3.7.egg/apache_beam/io/aws/s3filesystem.py",
 line 272, in <dictcomp>
exceptions = {path: error for (path, error) in results
ValueError: too many values to unpack (expected 2) [while running 
'WriteToText/Write/WriteImpl/FinalizeWrite']{noformat}
 

When the runner tries to delete the temporary output directory, it triggers 
this exception. It is caused by unpacking (path, error) directly from 
"results", which is a dict 
(beam/sdks/python/apache_beam/io/aws/s3filesystem.py line 272); iterating a 
dict yields only its keys. I think we should iterate over results.items() here.
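A minimal repro of the unpacking bug and the .items() fix (the paths and the 
shape of "results" here are illustrative, modeled on the delete code in the 
traceback):
{code:python}
# "results" maps each path to its error, as in s3filesystem.delete.
results = {'s3://bucket/tmp/file-0': None,
           's3://bucket/tmp/file-1': IOError('access denied')}

# Buggy form: iterating a dict yields only its keys (strings), so unpacking
# each key into (path, error) raises "too many values to unpack (expected 2)".
try:
    exceptions = {path: error for (path, error) in results}
    raised = False
except ValueError:
    raised = True
assert raised

# Fixed form: .items() yields the (path, error) pairs the comprehension expects.
exceptions = {path: error for (path, error) in results.items()}
assert set(exceptions) == {'s3://bucket/tmp/file-0', 's3://bucket/tmp/file-1'}
{code}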

I have submitted a patch for these 2 bugs: 
https://github.com/apache/beam/pull/10459

 

Thank you.

  was:
Hi :)

There seem to be 2 bugs in the S3 filesystem support.
 
I tried to use S3 storage for a simple wordcount demo with DirectRunner.

The demo script:
{code:python}
def main():
    options = PipelineOptions().view_as(StandardOptions)
    options.runner = 'DirectRunner'
    pipeline = beam.Pipeline(options=options)
    (
        pipeline
        | ReadFromText("s3://mx-machine-learning/panwenhai/beam_test/test_data")
        | "extract_words" >> beam.FlatMap(lambda x: re.findall(r" [A-Za-z\']+", x))
        | beam.combiners.Count.PerElement()
        | beam.MapTuple(lambda word, count: "%s: %s" % (word, count))
        | WriteToText("s3://mx-machine-learning/panwenhai/beam_test/output")
    )
    result = pipeline.run()
    result.wait_until_finish()
    return
{code}
 


Error message 1:
{noformat}
apache_beam.io.filesystem.BeamIOError: Match operation failed with exceptions 
{'s3://mx-machine-learning/panwenhai/beam_test/output-*-of-00001': 
BeamIOError("List operation failed with exceptions 
{'s3://mx-machine-learning/panwenhai/beam_test/output-': S3ClientError('Tried 
to list nonexistent S3 path: 
s3://mx-machine-learning/panwenhai/beam_test/output-', 404)}")} [while running 
'WriteToText/Write/WriteImpl/PreFinalize'] with exceptions None{noformat}
 

After digging into the code, it seems the Boto3 client's list function raises an 
exception when asked to list a nonexistent S3 path 
(beam/sdks/python/apache_beam/io/aws/clients/s3/boto3_client.py line 111), and 
the S3IO class does not handle this exception in its list_prefix function 
(beam/sdks/python/apache_beam/io/aws/s3io.py line 121).

When the runner tries to list and delete any existing output files before 
finalizing the write, and no such files exist, it ends up listing a nonexistent 
S3 path, which triggers the exception above.

This should not be treated as an error: an empty prefix simply means there is 
nothing to clean up. I think list_prefix in S3IO can safely catch the 404 and 
return an empty result.


Error message 2:
{noformat}
File 
"/Users/wenhai.pan/venvs/tfx/lib/python3.7/site-packages/apache_beam-2.19.0.dev0-py3.7.egg/apache_beam/io/aws/s3filesystem.py",
 line 272, in delete
exceptions = {path: error for (path, error) in results
File 
"/Users/wenhai.pan/venvs/tfx/lib/python3.7/site-packages/apache_beam-2.19.0.dev0-py3.7.egg/apache_beam/io/aws/s3filesystem.py",
 line 272, in <dictcomp>
exceptions = {path: error for (path, error) in results
ValueError: too many values to unpack (expected 2) [while running 
'WriteToText/Write/WriteImpl/FinalizeWrite']{noformat}
 

When the runner tries to delete the temporary output directory, it triggers 
this exception. It is caused by unpacking (path, error) directly from 
"results", which is a dict 
(beam/sdks/python/apache_beam/io/aws/s3filesystem.py line 272); iterating a 
dict yields only its keys. I think we should iterate over results.items() here.

I have submitted a patch for these 2 bugs. Thank you.


> Two bugs in Python SDK S3 filesystem support
> --------------------------------------------
>
>                 Key: BEAM-9029
>                 URL: https://issues.apache.org/jira/browse/BEAM-9029
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Wenhai Pan
>            Priority: Major
>              Labels: pull-request-available
>   Original Estimate: 24h
>          Time Spent: 10m
>  Remaining Estimate: 23h 50m
>
> Hi :)
> There seem to be 2 bugs in the S3 filesystem support.
> I tried to use S3 storage for a simple wordcount demo with DirectRunner.
> The demo script:
> {code:python}
> def main():
>     options = PipelineOptions().view_as(StandardOptions)
>     options.runner = 'DirectRunner'
>     pipeline = beam.Pipeline(options=options)
>     (
>         pipeline
>         | ReadFromText("s3://mx-machine-learning/panwenhai/beam_test/test_data")
>         | "extract_words" >> beam.FlatMap(lambda x: re.findall(r" [A-Za-z\']+", x))
>         | beam.combiners.Count.PerElement()
>         | beam.MapTuple(lambda word, count: "%s: %s" % (word, count))
>         | WriteToText("s3://mx-machine-learning/panwenhai/beam_test/output")
>     )
>     result = pipeline.run()
>     result.wait_until_finish()
>     return
> {code}
>  
> Error message 1:
> {noformat}
> apache_beam.io.filesystem.BeamIOError: Match operation failed with exceptions 
> {'s3://mx-machine-learning/panwenhai/beam_test/output-*-of-00001': 
> BeamIOError("List operation failed with exceptions 
> {'s3://mx-machine-learning/panwenhai/beam_test/output-': S3ClientError('Tried 
> to list nonexistent S3 path: 
> s3://mx-machine-learning/panwenhai/beam_test/output-', 404)}")} [while 
> running 'WriteToText/Write/WriteImpl/PreFinalize'] with exceptions 
> None{noformat}
>  
> After digging into the code, it seems the Boto3 client's list function raises 
> an exception when asked to list a nonexistent S3 path 
> (beam/sdks/python/apache_beam/io/aws/clients/s3/boto3_client.py line 111), and 
> the S3IO class does not handle this exception in its list_prefix function 
> (beam/sdks/python/apache_beam/io/aws/s3io.py line 121).
> When the runner tries to list and delete the existing output file, if there 
> is no existing output file, it will try to list a nonexistent S3 path and 
> will trigger the exception.
> This should not be an issue here. I think we can ignore this exception safely 
> in the S3IO list_prefix function.
> Error Message 2:
> {noformat}
> File 
> "/Users/wenhai.pan/venvs/tfx/lib/python3.7/site-packages/apache_beam-2.19.0.dev0-py3.7.egg/apache_beam/io/aws/s3filesystem.py",
>  line 272, in delete
> exceptions = {path: error for (path, error) in results
> File 
> "/Users/wenhai.pan/venvs/tfx/lib/python3.7/site-packages/apache_beam-2.19.0.dev0-py3.7.egg/apache_beam/io/aws/s3filesystem.py",
>  line 272, in <dictcomp>
> exceptions = {path: error for (path, error) in results
> ValueError: too many values to unpack (expected 2) [while running 
> 'WriteToText/Write/WriteImpl/FinalizeWrite']{noformat}
>  
> When the runner tries to delete the temporary output directory, it triggers 
> this exception. It is caused by unpacking (path, error) directly from 
> "results", which is a dict 
> (beam/sdks/python/apache_beam/io/aws/s3filesystem.py line 272); iterating a 
> dict yields only its keys. I think we should iterate over results.items() here.
> I have submitted a patch for these 2 bugs: 
> https://github.com/apache/beam/pull/10459
>  
> Thank you.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
