This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c90fd8320a5 Use bytes instead of mibs. (#31457)
c90fd8320a5 is described below

commit c90fd8320a571573e9375318818bdb1a7c15d421
Author: tvalentyn <[email protected]>
AuthorDate: Thu May 30 11:21:53 2024 -0700

    Use bytes instead of mibs. (#31457)
    
    * Use bytes.
    
    * Update sdks/python/apache_beam/runners/worker/data_plane.py
---
 sdks/python/apache_beam/runners/worker/data_plane.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py
index ad4790a59e1..3dd6bdbe9ae 100644
--- a/sdks/python/apache_beam/runners/worker/data_plane.py
+++ b/sdks/python/apache_beam/runners/worker/data_plane.py
@@ -135,12 +135,12 @@ class SizeBasedBufferingClosableOutputStream(ClosableOutputStream):
       close_callback=None,  # type: Optional[Callable[[bytes], None]]
       flush_callback=None,  # type: Optional[Callable[[bytes], None]]
       size_flush_threshold=_DEFAULT_SIZE_FLUSH_THRESHOLD,  # type: int
-      large_buffer_warn_threshold_mib = 512  # type: int
+      large_buffer_warn_threshold_bytes = 512 << 20  # type: int
   ):
     super().__init__(close_callback)
     self._flush_callback = flush_callback
     self._size_flush_threshold = size_flush_threshold
-    self._large_buffer_warn_threshold_mib = large_buffer_warn_threshold_mib
+    self._large_buffer_warn_threshold_bytes = large_buffer_warn_threshold_bytes
 
   # This must be called explicitly to avoid flushing partial elements.
   def maybe_flush(self):
@@ -152,8 +152,8 @@ class SizeBasedBufferingClosableOutputStream(ClosableOutputStream):
     # type: () -> None
     if self._flush_callback:
       size = self.size()
-      if (self._large_buffer_warn_threshold_mib and
-          size > self._large_buffer_warn_threshold_mib << 20):
+      if (self._large_buffer_warn_threshold_bytes and
+          size > self._large_buffer_warn_threshold_bytes):
         if size > _FLUSH_MAX_SIZE:
           raise ValueError(
               f'Buffer size {size} exceeds GRPC limit {_FLUSH_MAX_SIZE}. '
@@ -167,7 +167,7 @@ class SizeBasedBufferingClosableOutputStream(ClosableOutputStream):
         if self._large_flush_last_observed_timestamp + 600 < time.time():
           self._large_flush_last_observed_timestamp = time.time()
           _LOGGER.warning(
-              'Data output stream buffer size %s exceeds %s MB. '
+              'Data output stream buffer size %s exceeds %s bytes. '
               'This is likely due to a large element in a PCollection. '
               'Large elements increase pipeline RAM requirements and '
               'can cause runtime errors. '
@@ -176,7 +176,7 @@ class SizeBasedBufferingClosableOutputStream(ClosableOutputStream):
               'storage systems, and use PCollections to pass their metadata, '
               'or use a custom coder that reduces the element\'s size.',
               size,
-              self._large_buffer_warn_threshold_mib)
+              self._large_buffer_warn_threshold_bytes)
 
       self._flush_callback(self.get())
       self._clear()

Reply via email to