Abacn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1210677588
##########
sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py:
##########
@@ -170,43 +171,51 @@ def run(argv=None, save_main_session=True):
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
with beam.Pipeline(options=pipeline_options) as p:
-
- lines = p | ReadFromText(known_args.input)
-
- # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
- split_lines_result = (
- lines
- | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
- SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
- SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
- main='words'))
-
- # split_lines_result is an object of type DoOutputsTuple. It supports
- # accessing result in alternative ways.
- words, _, _ = split_lines_result
- short_words =
split_lines_result[SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
- character_count = split_lines_result.tag_character_count
-
- # pylint: disable=expression-not-assigned
- (
- character_count
- | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
- | beam.GroupByKey()
- | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
- | 'write chars' >> WriteToText(known_args.output + '-chars'))
-
- # pylint: disable=expression-not-assigned
- (
- short_words
- | 'count short words' >> CountWords()
- |
- 'write short words' >> WriteToText(known_args.output + '-short-words'))
-
- # pylint: disable=expression-not-assigned
- (
- words
- | 'count words' >> CountWords()
- | 'write words' >> WriteToText(known_args.output + '-words'))
+ try:
+ lines = p | ReadFromText(known_args.input)
+
+ # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
+ split_lines_result = (
+ lines
+ | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
+ SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
+ SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
+ main='words'))
+
+ # split_lines_result is an object of type DoOutputsTuple. It supports
+ # accessing result in alternative ways.
+ words, _, _ = split_lines_result
+ short_words = split_lines_result[
+ SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
+ character_count = split_lines_result.tag_character_count
+
+ # pylint: disable=expression-not-assigned
+ (
+ character_count
+ | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
+ | beam.GroupByKey()
+ | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
+ | 'write chars' >> WriteToText(known_args.output + '-chars'))
+
+ # pylint: disable=expression-not-assigned
+ (
+ short_words
+ | 'count short words' >> CountWords()
+ | 'write short words' >>
+ WriteToText(known_args.output + '-short-words'))
+
+ # pylint: disable=expression-not-assigned
+ (
+ words
+ | 'count words' >> CountWords()
+ | 'write words' >> WriteToText(known_args.output + '-words'))
+ except BeamIOError as err:
Review Comment:
OK, I am going to verify that the pipeline still runs on the full graph for the
tests listed in #26774.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]