[ 
https://issues.apache.org/jira/browse/BEAM-10596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

gecko655 updated BEAM-10596:
----------------------------
    Description: 
h3. Description:

`fileio.WriteToFiles` ignores the option `shards=3` given to its constructor 
unless I set `max_writers_per_bundle` to `0`.
It reproduces with InteractiveRunner or DirectRunner, but does not reproduce 
with DataflowRunner.

h3. Example:

Suppose I have the following pipeline (with interactive runner):
{code:python}
import apache_beam as beam
import apache_beam.io.fileio as fileio
import apache_beam.runners.interactive.interactive_beam as ib

user_ids = list(map(lambda x: 'user_id' + str(x), range(0, 10000)))
with beam.Pipeline(InteractiveRunner()) as pipeline:
    user_list =  pipeline | 'create pcollection' >> beam.Create(user_ids)
    write_sharded_csv = user_list | 'write sharded csv files' >> 
fileio.WriteToFiles(
            path='/tmp/data/',
            shards=3,
            file_naming=fileio.default_file_naming(prefix='userlist', 
suffix='.csv'),
            # max_writers_per_bundle=0,
        )
    ib.show(write_sharded_csv)
{code}
This pipeline is implemented to...
 - Creates PCollection of strings: 'user_id1', 'user_id2', ... 'user_id10000'
 - Writes the user ids to 3 local files with sharding.

The code does not work as intended. It writes whole user ids to only 1 file.
The code DOES work as intended after I added the `max_writers_per_bundle=0` 
argument to the `WriteToFiles` constructor.
The code also works if I use GCP's DataflowRunner instead of InteractiveRunner.

Is the behavior intentional or bug?
I couldn't understand why `max_writers_per_bundle` is related to the sharding 
behavior. I couldn't find any documentation about this.


  was:
h3. Description:

`fileio.WriteToFiles` ignores the option `shards=3` given to its constructor 
unless I set `max_writers_per_bundle` to `0`
h3. Example:

Suppose I have the following pipeline (with interactive runner):
{code:python}
import apache_beam as beam
import apache_beam.io.fileio as fileio
import apache_beam.runners.interactive.interactive_beam as ib

user_ids = list(map(lambda x: 'user_id' + str(x), range(0, 10000)))
with beam.Pipeline(InteractiveRunner()) as pipeline:
    user_list =  pipeline | 'create pcollection' >> beam.Create(user_ids)
    write_sharded_csv = user_list | 'write sharded csv files' >> 
fileio.WriteToFiles(
            path='/tmp/data/',
            shards=3,
            file_naming=fileio.default_file_naming(prefix='userlist', 
suffix='.csv'),
            # max_writers_per_bundle=0,
        )
    ib.show(write_sharded_csv)
{code}
This pipeline is implemented to...
 - Creates PCollection of strings: 'user_id1', 'user_id2', ... 'user_id10000'
 - Writes the user ids to 3 local files with sharding.

The code does not work as intended. It writes the user ids to only 1 file.
 The code DOES work as intended after I added the `max_writers_per_bundle=0` 
argument to the `WriteToFiles` constructor.

Is the behavior intentional or bug?
I couldn't understand why `max_writers_per_bundle` is related to the sharding 
behavior. I couldn't find any documentation about this.


    Environment: 
- Python 3.7.6
- `apache-beam==2.23.0`
- Reproducing is done in GCP's jupyter notebook environment. 
https://cloud.google.com/dataflow/docs/guides/interactive-pipeline-development

  was:
- Python 3.7.6
- `apache-beam==2.24.0.dev0`
- Reproducing is done in GCP's jupyter notebook environment. 
https://cloud.google.com/dataflow/docs/guides/interactive-pipeline-development

        Summary: Sharding with fileio.WriteToFiles need to set 
`max_writers_per_bundle=0` when using InteractiveRunner or DirectRunner?  (was: 
Sharding with fileio.WriteToFiles need to set `max_writers_per_bundle=0`?)

> Sharding with fileio.WriteToFiles need to set `max_writers_per_bundle=0` when 
> using InteractiveRunner or DirectRunner?
> ----------------------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-10596
>                 URL: https://issues.apache.org/jira/browse/BEAM-10596
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.24.0
>         Environment: - Python 3.7.6
> - `apache-beam==2.23.0`
> - Reproducing is done in GCP's jupyter notebook environment. 
> https://cloud.google.com/dataflow/docs/guides/interactive-pipeline-development
>            Reporter: gecko655
>            Priority: P2
>         Attachments: スクリーンショット 2020-07-29 16.01.32.png
>
>
> h3. Description:
> `fileio.WriteToFiles` ignores the option `shards=3` given to its constructor 
> unless I set `max_writers_per_bundle` to `0`.
> It reproduces with InteractiveRunner or DirectRunner, but does not reproduce 
> with DataflowRunner.
> h3. Example:
> Suppose I have the following pipeline (with interactive runner):
> {code:python}
> import apache_beam as beam
> import apache_beam.io.fileio as fileio
> import apache_beam.runners.interactive.interactive_beam as ib
> user_ids = list(map(lambda x: 'user_id' + str(x), range(0, 10000)))
> with beam.Pipeline(InteractiveRunner()) as pipeline:
>     user_list =  pipeline | 'create pcollection' >> beam.Create(user_ids)
>     write_sharded_csv = user_list | 'write sharded csv files' >> 
> fileio.WriteToFiles(
>             path='/tmp/data/',
>             shards=3,
>             file_naming=fileio.default_file_naming(prefix='userlist', 
> suffix='.csv'),
>             # max_writers_per_bundle=0,
>         )
>     ib.show(write_sharded_csv)
> {code}
> This pipeline is implemented to...
>  - Creates PCollection of strings: 'user_id1', 'user_id2', ... 'user_id10000'
>  - Writes the user ids to 3 local files with sharding.
> The code does not work as intended. It writes whole user ids to only 1 file.
> The code DOES work as intended after I added the `max_writers_per_bundle=0` 
> argument to the `WriteToFiles` constructor.
> The code also works if I use GCP's DataflowRunner instead of 
> InteractiveRunner.
> Is the behavior intentional or bug?
> I couldn't understand why `max_writers_per_bundle` is related to the sharding 
> behavior. I couldn't find any documentation about this.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to