gemini-code-assist[bot] commented on code in PR #38484:
URL: https://github.com/apache/beam/pull/38484#discussion_r3250531133
##########
sdks/python/apache_beam/dataframe/io.py:
##########
@@ -680,17 +680,26 @@ def __init__(
self.binary = binary
def expand(self, pcoll):
- if 'file_naming' in self.kwargs:
+ kwargs = dict(self.kwargs)
+ if 'file_naming' in kwargs:
dir, name = self.path, ''
else:
dir, name = io.filesystems.FileSystems.split(self.path)
+ num_shards = kwargs.pop('num_shards', None)
+ max_writers_per_bundle = kwargs.pop('max_writers_per_bundle', None)
+ write_to_files_kwargs = {}
+ if num_shards is not None:
+ write_to_files_kwargs['shards'] = num_shards
+ write_to_files_kwargs['max_writers_per_bundle'] = (
+ max_writers_per_bundle if max_writers_per_bundle is not None else 0)
+ elif max_writers_per_bundle is not None:
+ write_to_files_kwargs['max_writers_per_bundle'] = max_writers_per_bundle
return pcoll | fileio.WriteToFiles(
path=dir,
- shards=self.kwargs.pop('num_shards', None),
- file_naming=self.kwargs.pop(
- 'file_naming', fileio.default_file_naming(name)),
+ file_naming=kwargs.pop('file_naming', fileio.default_file_naming(name)),
sink=lambda _: _WriteToPandasFileSink(
- self.writer, self.args, self.kwargs, self.incremental, self.binary))
+ self.writer, self.args, kwargs, self.incremental, self.binary),
Review Comment:

For better readability and to avoid mutating the `kwargs` dictionary inside
the transform call, consider extracting `file_naming` beforehand. This makes it
explicit that the `kwargs` passed to the `sink` factory has been fully prepared
and does not contain Beam-specific parameters like `file_naming`. It also
avoids relying on the left-to-right evaluation order of function arguments in
Python.
```python
file_naming = kwargs.pop('file_naming', fileio.default_file_naming(name))
return pcoll | fileio.WriteToFiles(
    path=dir,
    file_naming=file_naming,
    sink=lambda _: _WriteToPandasFileSink(
        self.writer, self.args, kwargs, self.incremental, self.binary),
```
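To illustrate the point about mutation and evaluation order, here is a minimal, Beam-free sketch. `fake_write_to_files`, `expand_inline`, and `expand_extracted` are hypothetical stand-ins invented for this example; they are not part of `apache_beam`. Both variants produce the same result today, but the extracted form does not depend on `pop` running before the sink factory reads `kwargs`.

```python
def fake_write_to_files(path, file_naming, sink):
    # Hypothetical stand-in for fileio.WriteToFiles: it just invokes the
    # sink factory and reports what the factory saw.
    return file_naming, sink(None)


def expand_inline(user_kwargs):
    kwargs = dict(user_kwargs)  # copy, so the caller's dict is untouched
    return fake_write_to_files(
        path='out',
        # kwargs is mutated here, in the middle of the call expression.
        file_naming=kwargs.pop('file_naming', 'default'),
        # The lambda closes over kwargs and reads it only when called later.
        sink=lambda _: dict(kwargs))


def expand_extracted(user_kwargs):
    kwargs = dict(user_kwargs)
    # Mutation happens up front; kwargs is final before the call is built.
    file_naming = kwargs.pop('file_naming', 'default')
    return fake_write_to_files(
        path='out', file_naming=file_naming, sink=lambda _: dict(kwargs))


user = {'file_naming': 'custom', 'index': False}
print(expand_inline(user))
print(expand_extracted(user))
```

In both cases the sink receives only the non-Beam keys, and the caller's dictionary still contains `file_naming` because each function works on a copy.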
##########
sdks/python/apache_beam/dataframe/io.py:
##########
@@ -680,17 +680,26 @@ def __init__(
self.binary = binary
def expand(self, pcoll):
- if 'file_naming' in self.kwargs:
+ kwargs = dict(self.kwargs)
+ if 'file_naming' in kwargs:
dir, name = self.path, ''
else:
dir, name = io.filesystems.FileSystems.split(self.path)
+ num_shards = kwargs.pop('num_shards', None)
+ max_writers_per_bundle = kwargs.pop('max_writers_per_bundle', None)
+ write_to_files_kwargs = {}
+ if num_shards is not None:
+ write_to_files_kwargs['shards'] = num_shards
+ write_to_files_kwargs['max_writers_per_bundle'] = (
+ max_writers_per_bundle if max_writers_per_bundle is not None else 0)
Review Comment:

To strictly guarantee that `num_shards` is honored, `max_writers_per_bundle`
should be forced to `0` whenever `num_shards` is specified. If a user provides
a non-zero value for `max_writers_per_bundle` along with `num_shards`, Beam
might still use the 'unsharded' path if the number of writers in a bundle is
low, which would result in more than `num_shards` files being produced. Since
`num_shards` implies a global sharding requirement, forcing the shuffle by
setting `max_writers_per_bundle=0` is the most robust approach to ensure the
parameter is honored.
```python
write_to_files_kwargs['max_writers_per_bundle'] = 0
```
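As a rough sketch of the suggested behavior, the argument preparation could be isolated in a small helper. `build_write_to_files_kwargs` is a hypothetical name used only for this illustration; it does not exist in Beam, and the PR author may prefer a different policy, such as raising an error on conflicting values instead of silently overriding them.

```python
def build_write_to_files_kwargs(kwargs):
    """Pop sharding options from kwargs and return WriteToFiles arguments.

    Hypothetical helper for illustration; mutates the kwargs dict passed in.
    """
    num_shards = kwargs.pop('num_shards', None)
    max_writers_per_bundle = kwargs.pop('max_writers_per_bundle', None)

    write_to_files_kwargs = {}
    if num_shards is not None:
        write_to_files_kwargs['shards'] = num_shards
        # Always force 0 here, ignoring any user-supplied value, so that the
        # requested shard count takes precedence.
        write_to_files_kwargs['max_writers_per_bundle'] = 0
    elif max_writers_per_bundle is not None:
        write_to_files_kwargs['max_writers_per_bundle'] = max_writers_per_bundle
    return write_to_files_kwargs


print(build_write_to_files_kwargs({'num_shards': 3, 'max_writers_per_bundle': 20}))
print(build_write_to_files_kwargs({'max_writers_per_bundle': 20}))
```

When `num_shards` is absent, the helper passes a user-supplied `max_writers_per_bundle` through unchanged, and it adds nothing when neither option is given, so the `WriteToFiles` defaults still apply.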
--
This is an automated message from the Apache Git Service.