[
https://issues.apache.org/jira/browse/BEAM-10596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
gecko655 updated BEAM-10596:
----------------------------
Comment: was deleted
(was: I could not reproduce the issue with apache-beam==2.23.0 and
DataflowRunner.
`pip freeze` shows that apache-beam==2.24.0.dev0 is installed, but I don't think
2.24.0 has been officially released yet.
h4. pip freeze output (apache-beam==2.24.0.dev0)
!スクリーンショット 2020-07-29 16.01.32.png!
{noformat}
# Editable install with no version control (apache-beam==2.24.0.dev0)
-e /root/apache-beam-custom/packages/beam/sdks/python
{noformat}
h4. official releases (2.23.0-RC2)
https://github.com/apache/beam/tags
So I think this might not be an issue with Beam as a whole, but rather with either
Jupyter's InteractiveRunner or beam-2.24.0.dev0 (GCP's customized build?).)
> Sharding with fileio.WriteToFiles requires setting `max_writers_per_bundle=0`
> when using InteractiveRunner or DirectRunner?
> ----------------------------------------------------------------------------------------------------------------------
>
> Key: BEAM-10596
> URL: https://issues.apache.org/jira/browse/BEAM-10596
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Affects Versions: 2.24.0
> Environment: - Python 3.7.6
> - `apache-beam==2.23.0`
> - Reproduced in GCP's Jupyter notebook environment:
> https://cloud.google.com/dataflow/docs/guides/interactive-pipeline-development
> Reporter: gecko655
> Priority: P2
>
> h3. Description:
> `fileio.WriteToFiles` ignores the option `shards=3` given to its constructor
> unless I set `max_writers_per_bundle` to `0`.
> The issue reproduces with InteractiveRunner and DirectRunner, but does not
> reproduce with DataflowRunner.
> h3. Example:
> Suppose I have the following pipeline (with InteractiveRunner):
> {code:python}
> import apache_beam as beam
> import apache_beam.io.fileio as fileio
> import apache_beam.runners.interactive.interactive_beam as ib
> from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
>
> user_ids = list(map(lambda x: 'user_id' + str(x), range(0, 10000)))
>
> with beam.Pipeline(InteractiveRunner()) as pipeline:
>     user_list = pipeline | 'create pcollection' >> beam.Create(user_ids)
>     write_sharded_csv = user_list | 'write sharded csv files' >> fileio.WriteToFiles(
>         path='/tmp/data/',
>         shards=3,
>         file_naming=fileio.default_file_naming(prefix='userlist', suffix='.csv'),
>         # max_writers_per_bundle=0,
>     )
>     ib.show(write_sharded_csv)
> {code}
> This pipeline is intended to:
> - Create a PCollection of strings: 'user_id0', 'user_id1', ... 'user_id9999'
> - Write the user ids to 3 local files with sharding.
> The code does not work as intended: all of the user ids are written to a single file.
> The code DOES work as intended after I add the `max_writers_per_bundle=0`
> argument to the `WriteToFiles` constructor.
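> For reference, a minimal sketch of the working variant (identical to the pipeline
> above, with only the previously commented-out argument enabled; path and file
> naming are unchanged):
> {code:python}
> fileio.WriteToFiles(
>     path='/tmp/data/',
>     shards=3,
>     file_naming=fileio.default_file_naming(prefix='userlist', suffix='.csv'),
>     # With this set, the output was split into 3 shards in my test.
>     max_writers_per_bundle=0,
> )
> {code}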
> The code also works as intended if I use GCP's DataflowRunner instead of
> InteractiveRunner.
> Is this behavior intentional, or is it a bug?
> I don't understand why `max_writers_per_bundle` is related to the sharding
> behavior, and I couldn't find any documentation about it.
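> One way to check the observed behavior is to count the files produced under the
> output path after running the pipeline (a quick sketch; assumes the default file
> naming from the example above, i.e. files starting with 'userlist' under /tmp/data/):
> {code:python}
> import glob
>
> # With shards=3 honored, three 'userlist*.csv' files are expected here;
> # without max_writers_per_bundle=0, only a single file shows up.
> output_files = glob.glob('/tmp/data/userlist*.csv')
> print(len(output_files), output_files)
> {code}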
--
This message was sent by Atlassian Jira
(v8.3.4#803005)