BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1210626027
##########
sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py:
##########
@@ -170,43 +171,51 @@ def run(argv=None, save_main_session=True):
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
with beam.Pipeline(options=pipeline_options) as p:
-
- lines = p | ReadFromText(known_args.input)
-
- # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
- split_lines_result = (
- lines
- | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
- SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
- SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
- main='words'))
-
- # split_lines_result is an object of type DoOutputsTuple. It supports
- # accessing result in alternative ways.
- words, _, _ = split_lines_result
- short_words = split_lines_result[SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
- character_count = split_lines_result.tag_character_count
-
- # pylint: disable=expression-not-assigned
- (
- character_count
- | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
- | beam.GroupByKey()
- | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
- | 'write chars' >> WriteToText(known_args.output + '-chars'))
-
- # pylint: disable=expression-not-assigned
- (
- short_words
- | 'count short words' >> CountWords()
- |
- 'write short words' >> WriteToText(known_args.output + '-short-words'))
-
- # pylint: disable=expression-not-assigned
- (
- words
- | 'count words' >> CountWords()
- | 'write words' >> WriteToText(known_args.output + '-words'))
+ try:
+ lines = p | ReadFromText(known_args.input)
+
+ # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
+ split_lines_result = (
+ lines
+ | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
+ SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
+ SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
+ main='words'))
+
+ # split_lines_result is an object of type DoOutputsTuple. It supports
+ # accessing result in alternative ways.
+ words, _, _ = split_lines_result
+ short_words = split_lines_result[
+ SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
+ character_count = split_lines_result.tag_character_count
+
+ # pylint: disable=expression-not-assigned
+ (
+ character_count
+ | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
+ | beam.GroupByKey()
+ | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
+ | 'write chars' >> WriteToText(known_args.output + '-chars'))
+
+ # pylint: disable=expression-not-assigned
+ (
+ short_words
+ | 'count short words' >> CountWords()
+ | 'write short words' >>
+ WriteToText(known_args.output + '-short-words'))
+
+ # pylint: disable=expression-not-assigned
+ (
+ words
+ | 'count words' >> CountWords()
+ | 'write words' >> WriteToText(known_args.output + '-words'))
+ except BeamIOError as err:
Review Comment:
I have created an issue (#26774) to update the test environments to include
google.auth credentials, the absence of which is causing these tests to fail.
For the moment, the try-except blocks only catch the specific error that is
caused by that deficiency in our testing environments. With the apitools
implementation, a lack of appropriate credentials didn't raise an exception, but
GCS still couldn't be accessed. These changes therefore don't alter the
functioning of the test; they preserve it by handling the error that GCS
(correctly) raises in a situation where apitools wouldn't. Does that address
your concern?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]