Abacn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1210677588


##########
sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py:
##########
@@ -170,43 +171,51 @@ def run(argv=None, save_main_session=True):
   pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
   with beam.Pipeline(options=pipeline_options) as p:
-
-    lines = p | ReadFromText(known_args.input)
-
-    # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
-    split_lines_result = (
-        lines
-        | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
-            SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
-            SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
-            main='words'))
-
-    # split_lines_result is an object of type DoOutputsTuple. It supports
-    # accessing result in alternative ways.
-    words, _, _ = split_lines_result
-    short_words = split_lines_result[SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
-    character_count = split_lines_result.tag_character_count
-
-    # pylint: disable=expression-not-assigned
-    (
-        character_count
-        | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
-        | beam.GroupByKey()
-        | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
-        | 'write chars' >> WriteToText(known_args.output + '-chars'))
-
-    # pylint: disable=expression-not-assigned
-    (
-        short_words
-        | 'count short words' >> CountWords()
-        |
-        'write short words' >> WriteToText(known_args.output + '-short-words'))
-
-    # pylint: disable=expression-not-assigned
-    (
-        words
-        | 'count words' >> CountWords()
-        | 'write words' >> WriteToText(known_args.output + '-words'))
+    try:
+      lines = p | ReadFromText(known_args.input)
+
+      # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
+      split_lines_result = (
+          lines
+          | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
+              SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
+              SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
+              main='words'))
+
+      # split_lines_result is an object of type DoOutputsTuple. It supports
+      # accessing result in alternative ways.
+      words, _, _ = split_lines_result
+      short_words = split_lines_result[
+          SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
+      character_count = split_lines_result.tag_character_count
+
+      # pylint: disable=expression-not-assigned
+      (
+          character_count
+          | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
+          | beam.GroupByKey()
+          | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
+          | 'write chars' >> WriteToText(known_args.output + '-chars'))
+
+      # pylint: disable=expression-not-assigned
+      (
+          short_words
+          | 'count short words' >> CountWords()
+          | 'write short words' >>
+          WriteToText(known_args.output + '-short-words'))
+
+      # pylint: disable=expression-not-assigned
+      (
+          words
+          | 'count words' >> CountWords()
+          | 'write words' >> WriteToText(known_args.output + '-words'))
+    except BeamIOError as err:

Review Comment:
   OK, I am going to verify that the pipeline still runs on the full graph for
the tests listed in #26774.
   
   In any case, a change that needs to modify the word count example is
concerning in terms of being a breaking change. We had better find a way that
does not need to change unrelated tests so substantially.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to