[
https://issues.apache.org/jira/browse/BEAM-11671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
hiryu updated BEAM-11671:
-------------------------
Description:
When using Spark PortableRunner, the job server takes care of translating the
Beam pipeline into a Spark job and submitting it to a Spark cluster for
execution.
However, simple jobs (e.g. Wordcount) run with low parallelism on an
actual Spark cluster, because the stages produced by the job server
translation are split into very few tasks (described in detail here:
[https://stackoverflow.com/questions/64878908/low-parallelism-when-running-apache-beam-wordcount-pipeline-on-spark-with-python]).
Investigation shows that the job server explicitly sets the number of
partitions for translated Spark stages based on calls to
{{defaultParallelism}}, which is _not_ a robust way to infer
the number of executors or to partition Spark jobs (see the
accepted answer to the above SO question for a detailed explanation:
[https://stackoverflow.com/questions/64878908/low-parallelism-when-running-apache-beam-wordcount-pipeline-on-spark-with-python/65616752#65616752]).
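To make the effect concrete, the sketch below models how the partition count
caps the number of tasks a stage can run at once. It is a plain-Python
illustration, not job server code: the function names and the example numbers
(a cluster with 16 cores, stages with 2, 16 or 64 partitions) are assumptions
chosen for illustration, and each task is assumed to occupy one core.

```python
import math


def concurrent_tasks(num_partitions: int, total_cores: int) -> int:
    """Upper bound on tasks of one stage running at the same time.

    Spark schedules one task per partition, so a stage can never run
    more tasks than it has partitions, however many cores exist.
    Assumes each task occupies one core.
    """
    return min(num_partitions, total_cores)


def scheduling_waves(num_partitions: int, total_cores: int) -> int:
    """Number of rounds needed to run every task of the stage."""
    return math.ceil(num_partitions / total_cores)


if __name__ == "__main__":
    # Hypothetical cluster: 4 executors with 4 cores each.
    total_cores = 16
    for partitions in (2, 16, 64):
        busy = concurrent_tasks(partitions, total_cores)
        waves = scheduling_waves(partitions, total_cores)
        print(f"{partitions} partitions: {busy}/{total_cores} cores busy, "
              f"{waves} wave(s)")
```

Under these assumptions, a stage with only 2 partitions keeps 2 of the 16
cores busy, no matter how many executors the cluster provides.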
As it stands, the job server cannot scale in a production environment
unless its source is manually modified and recompiled to work around the
{{defaultParallelism}} issue. Suggested solutions (not mutually
exclusive):
* change the job server logic to infer the number of available executors and
the number of partitions/tasks in the translated stages in a more robust way;
* allow the user to configure, via pipeline options, the default parallelism
to be used by the job server for translating jobs (this is what's done by the
Flink portable runner).
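
The second suggestion could look roughly like the sketch below: an explicit
user-supplied value takes precedence, and the value derived from
{{defaultParallelism}} is used only as a fallback. This is plain Python for
illustration, not the job server's actual code; {{resolve_num_partitions}} and
its parameter names are hypothetical.

```python
from typing import Optional


def resolve_num_partitions(configured: Optional[int],
                           default_parallelism: int) -> int:
    """Pick the partition count for a translated stage.

    configured: value the user supplied via a pipeline option
        (hypothetical), or None if the option was not set.
    default_parallelism: value reported by the cluster at translation
        time, used only when the user supplied nothing.
    """
    if configured is None:
        return default_parallelism
    if configured < 1:
        raise ValueError(f"parallelism must be >= 1, got {configured}")
    return configured


if __name__ == "__main__":
    # Option not set: fall back to the cluster-reported value.
    print(resolve_num_partitions(None, 2))
    # Option set: the user's value overrides the fallback.
    print(resolve_num_partitions(32, 2))
```

Keeping the fallback means pipelines that do not set the option would behave
as they do today.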
was:
When using Spark PortableRunner, the job server takes care of translating the
Beam pipeline into a Spark job and submitting it to a Spark cluster for
execution.
However, simple jobs (e.g. Wordcount) are executed with low parallelism on an
actual Spark cluster: this is due to the fact that the stages resulting from
the job server translation are split in a very low number of tasks (this is
described in detail here:
https://stackoverflow.com/questions/64878908/low-parallelism-when-running-apache-beam-wordcount-pipeline-on-spark-with-python).
Investigations have shown that the job server defines explicitly the number of
partitions for translated Spark stages based on calls to
{{defaultParallelism}}, which is however _not_ a robust method for inferring
the number of executors and for partitioning Spark jobs (again, see the
accepted answer to the above SO issue for the detailed explanation:
[https://stackoverflow.com/questions/64878908/low-parallelism-when-running-apache-beam-wordcount-pipeline-on-spark-with-python/65616752#65616752]).
As of now, this issue prevents the scalability of the job server in a
production environment without manually modifying the job server source and
recompiling. Possible suggested solutions:
* change the job server logic to infer the number of available executors and
the number of partitions/tasks in the translated stages in a more robust way;
* allow the user to configure, via pipeline options, the default parallelism
to be used by the job server for translating jobs (this is what's done by the
Flink portable runner).
> Spark PortableRunner (Python SDK) low parallelism
> --------------------------------------------------
>
> Key: BEAM-11671
> URL: https://issues.apache.org/jira/browse/BEAM-11671
> Project: Beam
> Issue Type: Improvement
> Components: jobserver, runner-spark
> Affects Versions: 2.26.0
> Reporter: hiryu
> Priority: P2