This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c90fd8320a5 Use bytes instead of mibs. (#31457)
c90fd8320a5 is described below
commit c90fd8320a571573e9375318818bdb1a7c15d421
Author: tvalentyn <[email protected]>
AuthorDate: Thu May 30 11:21:53 2024 -0700
Use bytes instead of mibs. (#31457)
* Use bytes.
* Update sdks/python/apache_beam/runners/worker/data_plane.py
---
sdks/python/apache_beam/runners/worker/data_plane.py | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py
index ad4790a59e1..3dd6bdbe9ae 100644
--- a/sdks/python/apache_beam/runners/worker/data_plane.py
+++ b/sdks/python/apache_beam/runners/worker/data_plane.py
@@ -135,12 +135,12 @@ class SizeBasedBufferingClosableOutputStream(ClosableOutputStream):
close_callback=None, # type: Optional[Callable[[bytes], None]]
flush_callback=None, # type: Optional[Callable[[bytes], None]]
size_flush_threshold=_DEFAULT_SIZE_FLUSH_THRESHOLD, # type: int
- large_buffer_warn_threshold_mib = 512 # type: int
+ large_buffer_warn_threshold_bytes = 512 << 20 # type: int
):
super().__init__(close_callback)
self._flush_callback = flush_callback
self._size_flush_threshold = size_flush_threshold
- self._large_buffer_warn_threshold_mib = large_buffer_warn_threshold_mib
+ self._large_buffer_warn_threshold_bytes = large_buffer_warn_threshold_bytes
# This must be called explicitly to avoid flushing partial elements.
def maybe_flush(self):
@@ -152,8 +152,8 @@ class SizeBasedBufferingClosableOutputStream(ClosableOutputStream):
# type: () -> None
if self._flush_callback:
size = self.size()
- if (self._large_buffer_warn_threshold_mib and
- size > self._large_buffer_warn_threshold_mib << 20):
+ if (self._large_buffer_warn_threshold_bytes and
+ size > self._large_buffer_warn_threshold_bytes):
if size > _FLUSH_MAX_SIZE:
raise ValueError(
f'Buffer size {size} exceeds GRPC limit {_FLUSH_MAX_SIZE}. '
@@ -167,7 +167,7 @@ class SizeBasedBufferingClosableOutputStream(ClosableOutputStream):
if self._large_flush_last_observed_timestamp + 600 < time.time():
self._large_flush_last_observed_timestamp = time.time()
_LOGGER.warning(
- 'Data output stream buffer size %s exceeds %s MB. '
+ 'Data output stream buffer size %s exceeds %s bytes. '
'This is likely due to a large element in a PCollection. '
'Large elements increase pipeline RAM requirements and '
'can cause runtime errors. '
@@ -176,7 +176,7 @@ class SizeBasedBufferingClosableOutputStream(ClosableOutputStream):
'storage systems, and use PCollections to pass their metadata, '
'or use a custom coder that reduces the element\'s size.',
size,
- self._large_buffer_warn_threshold_mib)
+ self._large_buffer_warn_threshold_bytes)
self._flush_callback(self.get())
self._clear()