This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 290e3721b50 WriteToJson - force num_shards (#38484)
290e3721b50 is described below

commit 290e3721b503faf84bd83bf0b99644ccf9c87491
Author: Derrick Williams <[email protected]>
AuthorDate: Wed May 20 16:31:00 2026 -0400

    WriteToJson - force num_shards (#38484)
    
    * force number of shards
    
    * update logic based on gemini
    
    * address another gemini comment
    
    * address one more round
---
 sdks/python/apache_beam/dataframe/io.py | 20 +++++++++++++++-----
 1 file changed, 15 insertions(+), 5 deletions(-)

diff --git a/sdks/python/apache_beam/dataframe/io.py b/sdks/python/apache_beam/dataframe/io.py
index 02423f517ee..cabf6ccd5c8 100644
--- a/sdks/python/apache_beam/dataframe/io.py
+++ b/sdks/python/apache_beam/dataframe/io.py
@@ -680,17 +680,27 @@ class _WriteToPandas(beam.PTransform):
     self.binary = binary
 
   def expand(self, pcoll):
-    if 'file_naming' in self.kwargs:
+    kwargs = dict(self.kwargs)
+    if 'file_naming' in kwargs:
       dir, name = self.path, ''
     else:
       dir, name = io.filesystems.FileSystems.split(self.path)
+    num_shards = kwargs.pop('num_shards', None)
+    max_writers_per_bundle = kwargs.pop('max_writers_per_bundle', None)
+    write_to_files_kwargs = {}
+    if num_shards is not None:
+      write_to_files_kwargs['shards'] = num_shards
+      write_to_files_kwargs['max_writers_per_bundle'] = 0
+    elif max_writers_per_bundle is not None:
+      write_to_files_kwargs['max_writers_per_bundle'] = max_writers_per_bundle
+
+    file_naming = kwargs.pop('file_naming', fileio.default_file_naming(name))
     return pcoll | fileio.WriteToFiles(
         path=dir,
-        shards=self.kwargs.pop('num_shards', None),
-        file_naming=self.kwargs.pop(
-            'file_naming', fileio.default_file_naming(name)),
+        file_naming=file_naming,
         sink=lambda _: _WriteToPandasFileSink(
-            self.writer, self.args, self.kwargs, self.incremental, self.binary))
+            self.writer, self.args, kwargs, self.incremental, self.binary),
+        **write_to_files_kwargs)
 
 
 class _WriteToPandasFileSink(fileio.FileSink):

Reply via email to