Abacn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1212282164


##########
sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py:
##########
@@ -170,43 +171,51 @@ def run(argv=None, save_main_session=True):
   pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
   with beam.Pipeline(options=pipeline_options) as p:
-
-    lines = p | ReadFromText(known_args.input)
-
-    # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
-    split_lines_result = (
-        lines
-        | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
-            SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
-            SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
-            main='words'))
-
-    # split_lines_result is an object of type DoOutputsTuple. It supports
-    # accessing result in alternative ways.
-    words, _, _ = split_lines_result
-    short_words = 
split_lines_result[SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
-    character_count = split_lines_result.tag_character_count
-
-    # pylint: disable=expression-not-assigned
-    (
-        character_count
-        | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
-        | beam.GroupByKey()
-        | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
-        | 'write chars' >> WriteToText(known_args.output + '-chars'))
-
-    # pylint: disable=expression-not-assigned
-    (
-        short_words
-        | 'count short words' >> CountWords()
-        |
-        'write short words' >> WriteToText(known_args.output + '-short-words'))
-
-    # pylint: disable=expression-not-assigned
-    (
-        words
-        | 'count words' >> CountWords()
-        | 'write words' >> WriteToText(known_args.output + '-words'))
+    try:
+      lines = p | ReadFromText(known_args.input)
+
+      # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
+      split_lines_result = (
+          lines
+          | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
+              SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
+              SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
+              main='words'))
+
+      # split_lines_result is an object of type DoOutputsTuple. It supports
+      # accessing result in alternative ways.
+      words, _, _ = split_lines_result
+      short_words = split_lines_result[
+          SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
+      character_count = split_lines_result.tag_character_count
+
+      # pylint: disable=expression-not-assigned
+      (
+          character_count
+          | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
+          | beam.GroupByKey()
+          | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
+          | 'write chars' >> WriteToText(known_args.output + '-chars'))
+
+      # pylint: disable=expression-not-assigned
+      (
+          short_words
+          | 'count short words' >> CountWords()
+          | 'write short words' >>
+          WriteToText(known_args.output + '-short-words'))
+
+      # pylint: disable=expression-not-assigned
+      (
+          words
+          | 'count words' >> CountWords()
+          | 'write words' >> WriteToText(known_args.output + '-words'))
+    except BeamIOError as err:

Review Comment:
   I see — the error happens when no credentials are provided. Log:
   
   ```
   $ python -m apache_beam.examples.wordcount_minimal --output 
/Users/.../Desktop/outputoutput.txt
   INFO:root:Missing pipeline option (runner). Executing pipeline using the 
default runner: DirectRunner.
   INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 
seconds.
   INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
   WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server 
unavailable on attempt 1 of 3. Reason: timed out
   WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server 
unavailable on attempt 2 of 3. Reason: timed out
   WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server 
unavailable on attempt 3 of 3. Reason: [Errno 64] Host is down
   WARNING:google.auth._default:Authentication failed using Compute Engine 
authentication due to unavailable metadata server.
   WARNING:apache_beam.internal.gcp.auth:Unable to find default credentials to 
use: Your default credentials were not found. To set up Application Default 
Credentials, see 
https://cloud.google.com/docs/authentication/external/set-up-adc for more 
information.
   Connecting anonymously.
   WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server 
unavailable on attempt 1 of 3. Reason: [Errno 64] Host is down
   WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server 
unavailable on attempt 2 of 3. Reason: [Errno 64] Host is down
   WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server 
unavailable on attempt 3 of 3. Reason: [Errno 64] Host is down
   WARNING:google.auth._default:Authentication failed using Compute Engine 
authentication due to unavailable metadata server.
   Traceback (most recent call last):
   File 
"/Users/.../beam/sdks/python/apache_beam/examples/wordcount_minimal.py", line 
148, in <module>
       main()
     File 
"/Users/.../beam/sdks/python/apache_beam/examples/wordcount_minimal.py", line 
123, in main
       lines = p | ReadFromText(known_args.input)
     File "/Users/.../beam/sdks/python/apache_beam/io/textio.py", line 781, in 
__init__
       self._source = self._source_class(
     File "/Users/.../beam/sdks/python/apache_beam/io/textio.py", line 140, in 
__init__
       super().__init__(
     File "/Users/.../beam/sdks/python/apache_beam/io/filebasedsource.py", line 
127, in __init__
       self._validate()
     File "/Users/.../beam/sdks/python/apache_beam/options/value_provider.py", 
line 193, in _f
       return fnc(self, *args, **kwargs)
     File "/Users/.../beam/sdks/python/apache_beam/io/filebasedsource.py", line 
188, in _validate
       match_result = FileSystems.match([pattern], limits=[1])[0]
     File "/Users/.../beam/sdks/python/apache_beam/io/filesystems.py", line 
204, in match
       return filesystem.match(patterns, limits)
     File "/Users/.../beam/sdks/python/apache_beam/io/filesystem.py", line 804, 
in match
       raise BeamIOError("Match operation failed", exceptions)
   ```
   
   The missing gcloud credentials caused pipeline expansion to fail at `lines = p | 
ReadFromText(known_args.input)`, which is the very beginning of the test. 
   
   
https://github.com/apache/beam/blob/7179cce5624c304d5f8f7ba69dc73e96b9823d31/sdks/python/apache_beam/examples/wordcount_minimal.py#LL125-L125
   
   The pipeline is not initiated successfully: its expansion is incomplete, and 
the lines after ReadFromText and before the `except` clause are never executed. 
This is a breaking change — reading from a public bucket anonymously now fails.
   
   Note that wordcount is the "hello world" example, and first-time users are guided 
to run it. We provide the public bucket so that users do not need a Google Cloud 
account to run these examples. We should fix the code rather than catch the error; 
as written, the change essentially skips the test.
   
   If the cloud-storage client does not support anonymous connections (which would be 
surprising), we would need to make this migration opt-in instead of a replacement.
   
   On master, the test runs successfully and a file is written. Log:
   ```
   INFO:root:Missing pipeline option (runner). Executing pipeline using the 
default runner: DirectRunner.
   INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 
seconds.
   INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
   WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server 
unavailable on attempt 1 of 3. Reason: timed out
   WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server 
unavailable on attempt 2 of 3. Reason: timed out
   WARNING:google.auth.compute_engine._metadata:Compute Engine Metadata server 
unavailable on attempt 3 of 3. Reason: [Errno 64] Host is down
   WARNING:google.auth._default:Authentication failed using Compute Engine 
authentication due to unavailable metadata server.
   WARNING:apache_beam.internal.gcp.auth:Unable to find default credentials to 
use: Your default credentials were not found. To set up Application Default 
Credentials, see 
https://cloud.google.com/docs/authentication/external/set-up-adc for more 
information.
   Connecting anonymously.
   INFO:root:Default Python SDK image for environment is 
apache/beam_python3.8_sdk:2.49.0.dev
   
INFO:apache_beam.runners.portability.fn_api_runner.translations:====================
 <function annotate_downstream_side_inputs at 0x11255ee50> ====================
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to