tvalentyn commented on code in PR #37803:
URL: https://github.com/apache/beam/pull/37803#discussion_r2915305745


##########
sdks/python/apache_beam/testing/benchmarks/cloudml/pipelines/workflow.py:
##########
@@ -117,6 +117,7 @@ def setup_pipeline(p, args):
                           use_deep_copy_optimization=True):
       decoded_input_data = (
           input_data | 'DecodeForAnalyze' >> input_tfxio.BeamSource())
+      decoded_input_data |= 'Reshuffle' >> beam.transforms.Reshuffle()  # pylint: disable=no-value-for-parameter

Review Comment:
   > I added the Reshuffle after DecodeForAnalyze to redistribute the decoded data across workers before AnalyzeDataset. Decoding can leave data concentrated on a few workers; Reshuffle breaks that up and gives Analyze a more parallel load. It is the same idea as the Reshuffle after reads that I found in the Datastore/Spanner IOs, which is used there to parallelize work.
   
   This is out of scope for the failure you are fixing. We already have a flag to enable or disable the shuffle, depending on whether the benchmark requires it:
https://github.com/apache/beam/blob/e6fcdd72ed621a01570775d4d5acd3b7845e2c60/sdks/python/apache_beam/testing/benchmarks/cloudml/pipelines/workflow.py#L123
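   For context on the rationale quoted above, here is a toy, dependency-free model of what a Reshuffle does: each element is paired with a random key and routed by that key alone, so elements that started "on one worker" end up spread across many. This is an illustrative sketch only. `reshuffle_model` is a hypothetical helper, the modulo routing is a stand-in for the runner's shuffle, and none of this is Beam's actual implementation.

```python
import random


def reshuffle_model(elements, num_workers, seed=0):
    """Toy model of a Reshuffle (hypothetical helper, not Beam's API).

    Each element is paired with a random 32-bit key; the key alone decides
    which simulated worker receives the element, and the key is then dropped.
    Returns a dict mapping worker index -> list of elements.
    """
    rng = random.Random(seed)  # seeded so the example is reproducible
    placement = {}
    for elem in elements:
        key = rng.getrandbits(32)   # random key, independent of the element
        worker = key % num_workers  # stand-in for the runner's shuffle routing
        placement.setdefault(worker, []).append(elem)
    return placement


# All 1000 elements start in a single list, i.e. "on one worker", as after a
# decode step that did not fan out.
decoded = list(range(1000))
placement = reshuffle_model(decoded, num_workers=8)
for worker in sorted(placement):
    print(worker, len(placement[worker]))
```

   Elements are neither dropped nor duplicated in this model; only their placement changes.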



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.