This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch release-2.56.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.56.0 by this push:
new 674ad975f6f Change caching of global window inputs to be guarded by
experiment (#31013) (#31035)
674ad975f6f is described below
commit 674ad975f6f6ba2b6fad744aa7c1b1fbd509c275
Author: Sam Whittle <[email protected]>
AuthorDate: Thu Apr 18 14:55:16 2024 +0200
Change caching of global window inputs to be guarded by experiment (#31013)
(#31035)
* Change caching of global window inputs to be guarded by experiment
disable_global_windowed_args_caching
---
sdks/python/apache_beam/runners/common.pxd | 4 +-
sdks/python/apache_beam/runners/common.py | 75 ++++++++++++++++++++----------
2 files changed, 54 insertions(+), 25 deletions(-)
diff --git a/sdks/python/apache_beam/runners/common.pxd
b/sdks/python/apache_beam/runners/common.pxd
index 9fb44af6377..683bf8fcac1 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -100,7 +100,9 @@ cdef class PerWindowInvoker(DoFnInvoker):
cdef dict kwargs_for_process_batch
cdef list placeholders_for_process_batch
cdef bint has_windowed_inputs
- cdef bint cache_globally_windowed_args
+ cdef bint recalculate_window_args
+ cdef bint has_cached_window_args
+ cdef bint has_cached_window_batch_args
cdef object process_method
cdef object process_batch_method
cdef bint is_splittable
diff --git a/sdks/python/apache_beam/runners/common.py
b/sdks/python/apache_beam/runners/common.py
index 82ff939dbae..7a1cef4005e 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -761,6 +761,17 @@ class PerWindowInvoker(DoFnInvoker):
self.current_window_index = None
self.stop_window_index = None
+ # TODO(https://github.com/apache/beam/issues/28776): Remove caching after
+ # fully rolling out.
+ # If true, always recalculate window args. If false, has_cached_window_args
+ # and has_cached_window_batch_args will be set to true if the corresponding
+ # self.args_for_process(_batch) have been updated and should be reused directly.
+ self.recalculate_window_args = (
+ self.has_windowed_inputs or 'disable_global_windowed_args_caching' in
+ RuntimeValueProvider.experiments)
+ self.has_cached_window_args = False
+ self.has_cached_window_batch_args = False
+
# Try to prepare all the arguments that can just be filled in
# without any additional work. in the process function.
# Also cache all the placeholders needed in the process function.
@@ -921,16 +932,23 @@ class PerWindowInvoker(DoFnInvoker):
additional_kwargs,
):
# type: (...) -> Optional[SplitResultResidual]
- if self.has_windowed_inputs:
- assert len(windowed_value.windows) <= 1
- window, = windowed_value.windows
+ if self.has_cached_window_args:
+ args_for_process, kwargs_for_process = (
+ self.args_for_process, self.kwargs_for_process)
else:
- window = GlobalWindow()
- side_inputs = [si[window] for si in self.side_inputs]
- side_inputs.extend(additional_args)
- args_for_process, kwargs_for_process = util.insert_values_in_args(
- self.args_for_process, self.kwargs_for_process,
- side_inputs)
+ if self.has_windowed_inputs:
+ assert len(windowed_value.windows) <= 1
+ window, = windowed_value.windows
+ else:
+ window = GlobalWindow()
+ side_inputs = [si[window] for si in self.side_inputs]
+ side_inputs.extend(additional_args)
+ args_for_process, kwargs_for_process = util.insert_values_in_args(
+ self.args_for_process, self.kwargs_for_process, side_inputs)
+ if not self.recalculate_window_args:
+ self.args_for_process, self.kwargs_for_process = (
+ args_for_process, kwargs_for_process)
+ self.has_cached_window_args = True
# Extract key in the case of a stateful DoFn. Note that in the case of a
# stateful DoFn, we set during __init__ self.has_windowed_inputs to be
@@ -1012,20 +1030,29 @@ class PerWindowInvoker(DoFnInvoker):
):
# type: (...) -> Optional[SplitResultResidual]
- if self.has_windowed_inputs:
- assert isinstance(windowed_batch, HomogeneousWindowedBatch)
- assert len(windowed_batch.windows) <= 1
- window, = windowed_batch.windows
+ if self.has_cached_window_batch_args:
+ args_for_process_batch, kwargs_for_process_batch = (
+ self.args_for_process_batch, self.kwargs_for_process_batch)
else:
- window = GlobalWindow()
- side_inputs = [si[window] for si in self.side_inputs]
- side_inputs.extend(additional_args)
- (args_for_process_batch, kwargs_for_process_batch) = (
- util.insert_values_in_args(
- self.args_for_process_batch,
- self.kwargs_for_process_batch,
- side_inputs,
- ))
+ if self.has_windowed_inputs:
+ assert isinstance(windowed_batch, HomogeneousWindowedBatch)
+ assert len(windowed_batch.windows) <= 1
+ window, = windowed_batch.windows
+ else:
+ window = GlobalWindow()
+ side_inputs = [si[window] for si in self.side_inputs]
+ side_inputs.extend(additional_args)
+ args_for_process_batch, kwargs_for_process_batch = (
+ util.insert_values_in_args(
+ self.args_for_process_batch,
+ self.kwargs_for_process_batch,
+ side_inputs,
+ )
+ )
+ if not self.recalculate_window_args:
+ self.args_for_process_batch, self.kwargs_for_process_batch = (
+ args_for_process_batch, kwargs_for_process_batch)
+ self.has_cached_window_batch_args = True
for i, p in self.placeholders_for_process_batch:
if core.DoFn.ElementParam == p:
@@ -1541,8 +1568,8 @@ class _OutputHandler(OutputHandler):
tagged_receivers, # type: Mapping[Optional[str], Receiver]
per_element_output_counter,
output_batch_converter, # type: Optional[BatchConverter]
- process_yields_batches, # type: bool,
- process_batch_yields_elements, # type: bool,
+ process_yields_batches, # type: bool
+ process_batch_yields_elements, # type: bool
):
"""Initializes ``_OutputHandler``.