BjornPrime commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1212279790
##########
sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py:
##########
@@ -170,43 +171,51 @@ def run(argv=None, save_main_session=True):
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
with beam.Pipeline(options=pipeline_options) as p:
-
- lines = p | ReadFromText(known_args.input)
-
- # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
- split_lines_result = (
- lines
- | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
- SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
- SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
- main='words'))
-
- # split_lines_result is an object of type DoOutputsTuple. It supports
- # accessing result in alternative ways.
- words, _, _ = split_lines_result
- short_words = split_lines_result[SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
- character_count = split_lines_result.tag_character_count
-
- # pylint: disable=expression-not-assigned
- (
- character_count
- | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
- | beam.GroupByKey()
- | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
- | 'write chars' >> WriteToText(known_args.output + '-chars'))
-
- # pylint: disable=expression-not-assigned
- (
- short_words
- | 'count short words' >> CountWords()
- |
- 'write short words' >> WriteToText(known_args.output + '-short-words'))
-
- # pylint: disable=expression-not-assigned
- (
- words
- | 'count words' >> CountWords()
- | 'write words' >> WriteToText(known_args.output + '-words'))
+ try:
+ lines = p | ReadFromText(known_args.input)
+
+ # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
+ split_lines_result = (
+ lines
+ | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
+ SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
+ SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
+ main='words'))
+
+ # split_lines_result is an object of type DoOutputsTuple. It supports
+ # accessing result in alternative ways.
+ words, _, _ = split_lines_result
+ short_words = split_lines_result[
+ SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
+ character_count = split_lines_result.tag_character_count
+
+ # pylint: disable=expression-not-assigned
+ (
+ character_count
+ | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
+ | beam.GroupByKey()
+ | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
+ | 'write chars' >> WriteToText(known_args.output + '-chars'))
+
+ # pylint: disable=expression-not-assigned
+ (
+ short_words
+ | 'count short words' >> CountWords()
+ | 'write short words' >>
+ WriteToText(known_args.output + '-short-words'))
+
+ # pylint: disable=expression-not-assigned
+ (
+ words
+ | 'count words' >> CountWords()
+ | 'write words' >> WriteToText(known_args.output + '-words'))
+ except BeamIOError as err:
Review Comment:
Did you test whether the apitools implementation actually writes to GCS?
That's what I'm concerned about, because my understanding is that neither
implementation actually has credentials available to log into GCS (not even
defaults) on the remote runs. The only difference is that apitools doesn't
complain about that and the GCS client does (which is probably better, even if
inconvenient in this exact scenario).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]