This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 290e3721b50 WriteToJson - force num_shards (#38484)
290e3721b50 is described below
commit 290e3721b503faf84bd83bf0b99644ccf9c87491
Author: Derrick Williams <[email protected]>
AuthorDate: Wed May 20 16:31:00 2026 -0400
WriteToJson - force num_shards (#38484)
* force number of shards
* update logic based on gemini
* address another gemini comment
* address one more round
---
sdks/python/apache_beam/dataframe/io.py | 20 +++++++++++++++-----
1 file changed, 15 insertions(+), 5 deletions(-)
diff --git a/sdks/python/apache_beam/dataframe/io.py b/sdks/python/apache_beam/dataframe/io.py
index 02423f517ee..cabf6ccd5c8 100644
--- a/sdks/python/apache_beam/dataframe/io.py
+++ b/sdks/python/apache_beam/dataframe/io.py
@@ -680,17 +680,27 @@ class _WriteToPandas(beam.PTransform):
self.binary = binary
def expand(self, pcoll):
- if 'file_naming' in self.kwargs:
+ kwargs = dict(self.kwargs)
+ if 'file_naming' in kwargs:
dir, name = self.path, ''
else:
dir, name = io.filesystems.FileSystems.split(self.path)
+ num_shards = kwargs.pop('num_shards', None)
+ max_writers_per_bundle = kwargs.pop('max_writers_per_bundle', None)
+ write_to_files_kwargs = {}
+ if num_shards is not None:
+ write_to_files_kwargs['shards'] = num_shards
+ write_to_files_kwargs['max_writers_per_bundle'] = 0
+ elif max_writers_per_bundle is not None:
+ write_to_files_kwargs['max_writers_per_bundle'] = max_writers_per_bundle
+
+ file_naming = kwargs.pop('file_naming', fileio.default_file_naming(name))
return pcoll | fileio.WriteToFiles(
path=dir,
- shards=self.kwargs.pop('num_shards', None),
- file_naming=self.kwargs.pop(
- 'file_naming', fileio.default_file_naming(name)),
+ file_naming=file_naming,
sink=lambda _: _WriteToPandasFileSink(
- self.writer, self.args, self.kwargs, self.incremental, self.binary))
+ self.writer, self.args, kwargs, self.incremental, self.binary),
+ **write_to_files_kwargs)
class _WriteToPandasFileSink(fileio.FileSink):