Abacn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1212282164
##########
sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py:
##########
@@ -170,43 +171,51 @@ def run(argv=None, save_main_session=True):
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
with beam.Pipeline(options=pipeline_options) as p:
-
- lines = p | ReadFromText(known_args.input)
-
- # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
- split_lines_result = (
- lines
- | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
- SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
- SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
- main='words'))
-
- # split_lines_result is an object of type DoOutputsTuple. It supports
- # accessing result in alternative ways.
- words, _, _ = split_lines_result
- short_words =
split_lines_result[SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
- character_count = split_lines_result.tag_character_count
-
- # pylint: disable=expression-not-assigned
- (
- character_count
- | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
- | beam.GroupByKey()
- | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
- | 'write chars' >> WriteToText(known_args.output + '-chars'))
-
- # pylint: disable=expression-not-assigned
- (
- short_words
- | 'count short words' >> CountWords()
- |
- 'write short words' >> WriteToText(known_args.output + '-short-words'))
-
- # pylint: disable=expression-not-assigned
- (
- words
- | 'count words' >> CountWords()
- | 'write words' >> WriteToText(known_args.output + '-words'))
+ try:
+ lines = p | ReadFromText(known_args.input)
+
+ # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
+ split_lines_result = (
+ lines
+ | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
+ SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
+ SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
+ main='words'))
+
+ # split_lines_result is an object of type DoOutputsTuple. It supports
+ # accessing result in alternative ways.
+ words, _, _ = split_lines_result
+ short_words = split_lines_result[
+ SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
+ character_count = split_lines_result.tag_character_count
+
+ # pylint: disable=expression-not-assigned
+ (
+ character_count
+ | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
+ | beam.GroupByKey()
+ | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
+ | 'write chars' >> WriteToText(known_args.output + '-chars'))
+
+ # pylint: disable=expression-not-assigned
+ (
+ short_words
+ | 'count short words' >> CountWords()
+ | 'write short words' >>
+ WriteToText(known_args.output + '-short-words'))
+
+ # pylint: disable=expression-not-assigned
+ (
+ words
+ | 'count words' >> CountWords()
+ | 'write words' >> WriteToText(known_args.output + '-words'))
+ except BeamIOError as err:
Review Comment:
I see — the error happens when no credentials are provided. Log:
```
$ python -m apache_beam.examples.wordcount_minimal --output
/Users/.../Desktop/outputoutput.txt
INFO:root:Missing pipeline option (runner). Executing pipeline using the
default runner: DirectRunner.
INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60
seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server
unavailable on attempt 1 of 3. Reason: timed out
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server
unavailable on attempt 2 of 3. Reason: timed out
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server
unavailable on attempt 3 of 3. Reason: [Errno 64] Host is down
WARNING:google.auth._default:Authentication failed using Compute Engine
authentication due to unavailable metadata server.
WARNING:apache_beam.internal.gcp.auth:Unable to find default credentials to
use: Your default credentials were not found. To set up Application Default
Credentials, see
https://cloud.google.com/docs/authentication/external/set-up-adc for more
information.
Connecting anonymously.
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server
unavailable on attempt 1 of 3. Reason: [Errno 64] Host is down
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server
unavailable on attempt 2 of 3. Reason: [Errno 64] Host is down
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server
unavailable on attempt 3 of 3. Reason: [Errno 64] Host is down
WARNING:google.auth._default:Authentication failed using Compute Engine
authentication due to unavailable metadata server.
Traceback (most recent call last):
File
"/Users/.../beam/sdks/python/apache_beam/examples/wordcount_minimal.py", line
148, in <module>
main()
File
"/Users/.../beam/sdks/python/apache_beam/examples/wordcount_minimal.py", line
123, in main
lines = p | ReadFromText(known_args.input)
File "/Users/.../beam/sdks/python/apache_beam/io/textio.py", line 781, in
__init__
self._source = self._source_class(
File "/Users/.../beam/sdks/python/apache_beam/io/textio.py", line 140, in
__init__
super().__init__(
File "/Users/.../beam/sdks/python/apache_beam/io/filebasedsource.py", line
127, in __init__
self._validate()
File "/Users/.../beam/sdks/python/apache_beam/options/value_provider.py",
line 193, in _f
return fnc(self, *args, **kwargs)
File "/Users/.../beam/sdks/python/apache_beam/io/filebasedsource.py", line
188, in _validate
match_result = FileSystems.match([pattern], limits=[1])[0]
File "/Users/.../beam/sdks/python/apache_beam/io/filesystems.py", line
204, in match
return filesystem.match(patterns, limits)
File "/Users/.../beam/sdks/python/apache_beam/io/filesystem.py", line 804,
in match
raise BeamIOError("Match operation failed", exceptions)
```
Missing gcloud credentials caused pipeline expansion to fail at
`lines = p | ReadFromText(known_args.input)`, which is the very beginning of
the test:
https://github.com/apache/beam/blob/7179cce5624c304d5f8f7ba69dc73e96b9823d31/sdks/python/apache_beam/examples/wordcount_minimal.py#LL125C16-L125C16
The pipeline is not initiated successfully: pipeline expansion is incomplete,
and the lines after ReadFromText and before the `except` clause are never
executed.
This is a breaking change — reading from a public bucket anonymously now fails.
Note that wordcount is the "hello world" example, and first-time users are
guided to run it. We provide the public bucket so that users do not need a
Google Cloud account to run these examples. We should fix the test.
If the Cloud Storage client library does not support anonymous connections
(which would be a surprise), we would need to make this migration opt-in
instead of a replacement.
On master, the test runs successfully and a file gets written. Log:
```
INFO:root:Missing pipeline option (runner). Executing pipeline using the
default runner: DirectRunner.
INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60
seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server
unavailable on attempt 1 of 3. Reason: timed out
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server
unavailable on attempt 2 of 3. Reason: timed out
WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server
unavailable on attempt 3 of 3. Reason: [Errno 64] Host is down
WARNING:google.auth._default:Authentication failed using Compute Engine
authentication due to unavailable metadata server.
WARNING:apache_beam.internal.gcp.auth:Unable to find default credentials to
use: Your default credentials were not found. To set up Application Default
Credentials, see
https://cloud.google.com/docs/authentication/external/set-up-adc for more
information.
Connecting anonymously.
INFO:root:Default Python SDK image for environment is
apache/beam_python3.8_sdk:2.49.0.dev
INFO:apache_beam.runners.portability.fn_api_runner.translations:====================
<function annotate_downstream_side_inputs at 0x11255ee50> ====================
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]