jmdobry opened a new issue, #29350:
URL: https://github.com/apache/beam/issues/29350
### What happened?
When using `BatchElements`, one can precisely control the size of each batch
by setting `min_batch_size` and `max_batch_size` to the same value and also
providing `element_size_fn`. It almost works, but there are cases where
exceeding `max_batch_size` in any way is unacceptable (for example, APIs that
have a maximum request payload size).
However, the `process` function of `_GlobalWindowsBatchingDoFn` blindly adds
an element to the batch _before_ checking whether adding that element would
exceed `max_batch_size`. This can be fixed by changing `process` to only add
an element to the current batch if the batch is empty OR adding the element
will not exceed `max_batch_size`. Example fixed implementation:
```python
if self._running_batch_size and (
    (self._running_batch_size + self._element_size_fn(element))
    >= self._target_batch_size):
  # Flush the current batch before adding the element that would overflow it.
  with self._batch_size_estimator.record_time(self._running_batch_size):
    yield window.GlobalWindows.windowed_value_at_end_of_window(self._batch)
  self._batch = []
  self._running_batch_size = 0
  self._target_batch_size = self._batch_size_estimator.next_batch_size()
self._batch.append(element)
self._running_batch_size += self._element_size_fn(element)
```
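To illustrate the intended behavior outside of Beam internals, here is a minimal stand-alone sketch of the same "check before append" logic. `flush_batches`, its parameters, and the sample payloads are illustrative names of my own, not Beam APIs; this sketch also uses a strict `>` comparison, so a batch is allowed to exactly reach the cap:

```python
def flush_batches(elements, max_batch_size, element_size_fn=len):
  """Yield batches whose total size never exceeds max_batch_size,
  except when a single element alone is larger than the cap."""
  batch = []
  running_size = 0
  for element in elements:
    size = element_size_fn(element)
    # Flush first if adding this element would overflow a non-empty batch,
    # mirroring the proposed fix above.
    if batch and running_size + size > max_batch_size:
      yield batch
      batch = []
      running_size = 0
    batch.append(element)
    running_size += size
  if batch:
    yield batch

payloads = ["aaaa", "bb", "ccc", "d", "eeeee"]
batches = list(flush_batches(payloads, max_batch_size=5))
# → [["aaaa"], ["bb", "ccc"], ["d"], ["eeeee"]]; no batch exceeds 5 bytes.
```

Note that a single element larger than the cap still forms its own oversized batch, since there is no way to split an element; callers with a hard payload limit would need to reject such elements upstream.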
### Issue Priority
Priority: 3 (minor)
### Issue Components
- [X] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner