gemini-code-assist[bot] commented on code in PR #38484:
URL: https://github.com/apache/beam/pull/38484#discussion_r3250531133


##########
sdks/python/apache_beam/dataframe/io.py:
##########
@@ -680,17 +680,26 @@ def __init__(
     self.binary = binary
 
   def expand(self, pcoll):
-    if 'file_naming' in self.kwargs:
+    kwargs = dict(self.kwargs)
+    if 'file_naming' in kwargs:
       dir, name = self.path, ''
     else:
       dir, name = io.filesystems.FileSystems.split(self.path)
+    num_shards = kwargs.pop('num_shards', None)
+    max_writers_per_bundle = kwargs.pop('max_writers_per_bundle', None)
+    write_to_files_kwargs = {}
+    if num_shards is not None:
+      write_to_files_kwargs['shards'] = num_shards
+      write_to_files_kwargs['max_writers_per_bundle'] = (
+          max_writers_per_bundle if max_writers_per_bundle is not None else 0)
+    elif max_writers_per_bundle is not None:
+      write_to_files_kwargs['max_writers_per_bundle'] = max_writers_per_bundle
     return pcoll | fileio.WriteToFiles(
         path=dir,
-        shards=self.kwargs.pop('num_shards', None),
-        file_naming=self.kwargs.pop(
-            'file_naming', fileio.default_file_naming(name)),
+        file_naming=kwargs.pop('file_naming', 
fileio.default_file_naming(name)),
         sink=lambda _: _WriteToPandasFileSink(
-            self.writer, self.args, self.kwargs, self.incremental, 
self.binary))
+            self.writer, self.args, kwargs, self.incremental, self.binary),

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   For better readability and to avoid mutating the `kwargs` dictionary inside 
the transform call, consider extracting `file_naming` beforehand. This makes it 
explicit that the `kwargs` passed to the `sink` factory has been fully prepared 
and does not contain Beam-specific parameters like `file_naming`. It also 
avoids relying on the left-to-right evaluation order of function arguments in 
Python.
   
   ```python
       file_naming = kwargs.pop('file_naming', fileio.default_file_naming(name))
       return pcoll | fileio.WriteToFiles(
           path=dir,
           file_naming=file_naming,
           sink=lambda _: _WriteToPandasFileSink(
               self.writer, self.args, kwargs, self.incremental, self.binary),
   ```



##########
sdks/python/apache_beam/dataframe/io.py:
##########
@@ -680,17 +680,26 @@ def __init__(
     self.binary = binary
 
   def expand(self, pcoll):
-    if 'file_naming' in self.kwargs:
+    kwargs = dict(self.kwargs)
+    if 'file_naming' in kwargs:
       dir, name = self.path, ''
     else:
       dir, name = io.filesystems.FileSystems.split(self.path)
+    num_shards = kwargs.pop('num_shards', None)
+    max_writers_per_bundle = kwargs.pop('max_writers_per_bundle', None)
+    write_to_files_kwargs = {}
+    if num_shards is not None:
+      write_to_files_kwargs['shards'] = num_shards
+      write_to_files_kwargs['max_writers_per_bundle'] = (
+          max_writers_per_bundle if max_writers_per_bundle is not None else 0)

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   To strictly guarantee that `num_shards` is honored, `max_writers_per_bundle` 
should be forced to `0` whenever `num_shards` is specified. If a user provides 
a non-zero value for `max_writers_per_bundle` along with `num_shards`, Beam 
might still use the 'unsharded' path if the number of writers in a bundle is 
low, which would result in more than `num_shards` files being produced. Since 
`num_shards` implies a global sharding requirement, forcing the shuffle by 
setting `max_writers_per_bundle=0` is the most robust approach to ensure the 
parameter is honored.
   
   ```python
         write_to_files_kwargs['max_writers_per_bundle'] = 0
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to