[ https://issues.apache.org/jira/browse/BEAM-12915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17487625#comment-17487625 ]
Rogan Morrow commented on BEAM-12915:
-------------------------------------
[~eddiewang] It did work for me with `environment_type=EXTERNAL`, but there is
a caveat that may explain why you are still not seeing parallelism. Setting
`--experiments=pre_optimize=all` only causes Beam to shuffle the list of
file paths, not the rows read from those files. So if you are reading from
fewer files than there are slots in Flink, you will not see full parallelism.
And if some of your files contain more rows than others, the load across your
slots will be imbalanced.
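A toy model of this caveat, written in plain Python. It is not Beam's or Flink's actual scheduling: the round-robin placement and the helper names `rows_per_slot` and `busy_slots` are hypothetical. It only illustrates the point above: when whole files are the unit of work, at most min(files, slots) slots can be busy, and each slot's load follows the sizes of the files it happens to receive.

```python
def rows_per_slot(file_row_counts, num_slots):
    """Spread whole files over slots round-robin and return rows per slot.

    Simplified model (assumption): each file is one indivisible unit of work,
    mirroring the point that only file paths are shuffled, not rows.
    """
    loads = [0] * num_slots
    for index, rows in enumerate(file_row_counts):
        loads[index % num_slots] += rows
    return loads


def busy_slots(loads):
    """Count slots that received at least one row."""
    return sum(1 for rows in loads if rows > 0)


# 160 equally sized files on 32 slots: every slot gets 5 files.
even = rows_per_slot([1000] * 160, 32)
print(busy_slots(even), max(even))  # 32 5000

# Only 10 files on 32 slots: at most 10 slots ever do work.
few = rows_per_slot([1000] * 10, 32)
print(busy_slots(few), max(few))  # 10 1000

# 32 files on 32 slots, but one file is 20x larger: one slot dominates.
skewed = rows_per_slot([20000] + [1000] * 31, 32)
print(busy_slots(skewed), max(skewed), min(skewed))  # 32 20000 1000
```

In a real pipeline the placement is not round-robin (a random-key shuffle can put two files on one slot while another slot stays idle), so treat the round-robin result as a best case for the number of busy slots.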
> No parallelism when using SDFBoundedSourceReader with Flink
> -----------------------------------------------------------
>
> Key: BEAM-12915
> URL: https://issues.apache.org/jira/browse/BEAM-12915
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.32.0
> Reporter: Rogan Morrow
> Priority: P2
>
> Background: I am using TFX pipelines with Flink as the runner for Beam (a Flink
> session cluster deployed with
> [flink-on-k8s-operator|https://github.com/GoogleCloudPlatform/flink-on-k8s-operator]).
> The Flink cluster has 2 TaskManagers with 16 cores each, and parallelism is
> set to 32. TFX components call {{beam.io.ReadFromTFRecord}} to load data,
> passing in a glob file pattern. I have a dataset of TFRecords split across
> 160 files. When I run the component, processing for all 160 files ends
> up in a single Flink subtask, i.e. the parallelism is effectively 1. See the
> images below:
> !https://i.imgur.com/ppba0AL.png!
> !https://i.imgur.com/rSTFATn.png!
>
> I have tried all manner of Beam/Flink options and different versions of
> Beam/Flink, but the behaviour remains the same.
> Furthermore, the behaviour affects anything that uses
> {{apache_beam.io.iobase.SDFBoundedSourceReader}}; e.g.
> {{apache_beam.io.parquetio.ReadFromParquet}} has the same issue. Either
> I'm missing some obscure setting in my configuration, or this is a bug in
> the Flink runner.
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)