tvalentyn commented on code in PR #37803:
URL: https://github.com/apache/beam/pull/37803#discussion_r2915305745
##########
sdks/python/apache_beam/testing/benchmarks/cloudml/pipelines/workflow.py:
##########
@@ -117,6 +117,7 @@ def setup_pipeline(p, args):
use_deep_copy_optimization=True):
decoded_input_data = (
input_data | 'DecodeForAnalyze' >> input_tfxio.BeamSource())
+ decoded_input_data |= 'Reshuffle' >> beam.transforms.Reshuffle() # pylint: disable=no-value-for-parameter
Review Comment:
> I added the Reshuffle after DecodeForAnalyze to redistribute the decoded data
> across workers before AnalyzeDataset. Decoding can leave data concentrated on a
> few workers; Reshuffle breaks that up and gives Analyze a more parallel load.
> It is the same idea as the Reshuffle-after-read pattern I found in the
> Datastore/Spanner IOs, where it is used to parallelize work.
This is out of scope for the failure you are fixing. We already have a flag
to enable or disable the shuffle step depending on whether the benchmark requires it:
https://github.com/apache/beam/blob/e6fcdd72ed621a01570775d4d5acd3b7845e2c60/sdks/python/apache_beam/testing/benchmarks/cloudml/pipelines/workflow.py#L123
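For illustration only, here is a toy, pure-Python model of the redistribution idea in the quoted explanation. Beam's `Reshuffle` pairs each element with a random key, groups by that key, and then drops the keys, which lets the runner spread out elements that an earlier step left on a few workers. This is not Beam code. The function name `simulate_reshuffle`, the modeling of workers as lists, and the `shuffle` parameter are all assumptions. `shuffle` is a hypothetical stand-in for the existing flag linked above, whose real name is not shown here.

```python
import random
from collections import defaultdict


def simulate_reshuffle(elements, num_workers, shuffle=True, seed=None):
    """Toy model of Beam's Reshuffle (NOT the Beam API).

    Returns one list per modeled worker. `shuffle` is a hypothetical
    stand-in for the benchmark's existing shuffle flag.
    """
    if not shuffle:
        # No shuffle: everything stays on the worker that decoded it,
        # modeled here as worker 0.
        return [list(elements)] + [[] for _ in range(num_workers - 1)]
    rng = random.Random(seed)
    per_worker = defaultdict(list)
    for element in elements:
        # Pair each element with a random key (as Reshuffle does internally).
        key = rng.getrandbits(32)
        # The runner groups by key and assigns keys to workers; here we model
        # that assignment as key modulo the number of workers.
        per_worker[key % num_workers].append(element)
    # Keys are dropped: each worker only sees its share of the elements.
    return [per_worker[w] for w in range(num_workers)]


skewed = list(range(1000))
before = simulate_reshuffle(skewed, num_workers=4, shuffle=False)
after = simulate_reshuffle(skewed, num_workers=4, shuffle=True, seed=0)
print([len(w) for w in before])  # [1000, 0, 0, 0]
print([len(w) for w in after])
```

With `shuffle=False` all elements stay on one modeled worker. With `shuffle=True` they spread roughly evenly. Keeping the step behind a flag lets each benchmark opt in only when the extra shuffle cost is worth it.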