jrmccluskey commented on code in PR #29175:
URL: https://github.com/apache/beam/pull/29175#discussion_r1378090494


##########
sdks/python/apache_beam/transforms/util.py:
##########
@@ -728,6 +836,86 @@ def expand(self, pcoll):
               self._batch_size_estimator, self._element_size_fn))
 
 
+@typehints.with_input_types(T)
+@typehints.with_output_types(List[T])
+class StatefulBatchElements(PTransform):
+  """A Transform that batches elements for amortized processing.
+
+  This transform is designed to precede operations whose processing cost
+  is of the form
+
+      time = fixed_cost + num_elements * per_element_cost
+
+  where the per element cost is (often significantly) smaller than the fixed
+  cost and could be amortized over multiple elements.  It consumes a PCollection
+  of element type T and produces a PCollection of element type List[T].
+
+  This transform attempts to find the best batch size between the minimum
+  and maximum parameters by profiling the time taken by (fused) downstream
+  operations. For a fixed batch size, set the min and max to be equal.
+
+  Elements are batched per-window and batches are emitted in the window
+  corresponding to their contents. Each batch is emitted with a timestamp at
+  the end of its window.
+
+  Args:
+    min_batch_size: (optional) the smallest size of a batch
+    max_batch_size: (optional) the largest size of a batch
+    target_batch_overhead: (optional) a target for fixed_cost / time,
+        as used in the formula above
+    target_batch_duration_secs: (optional) a target for total time per bundle,
+        in seconds, excluding fixed cost
+    target_batch_duration_secs_including_fixed_cost: (optional) a target for
+        total time per bundle, in seconds, including fixed cost
+    element_size_fn: (optional) A mapping of an element to its contribution to
+        batch size, defaulting to every element having size 1.  When provided,
+        attempts to provide batches of optimal total size which may consist of
+        a varying number of elements.
+    variance: (optional) the permitted (relative) amount of deviation from the
+        (estimated) ideal batch size used to produce a wider base for
+        linear interpolation
+    clock: (optional) an alternative to time.time for measuring the cost of
+        downstream operations (mostly for testing)
+    record_metrics: (optional) whether or not to record beam metrics on
+        distributions of the batch size. Defaults to True.
+  """
+  def __init__(
+      self,
+      min_batch_size=1,
+      max_batch_size=10000,
+      target_batch_overhead=.05,
+      target_batch_duration_secs=10,
+      target_batch_duration_secs_including_fixed_cost=None,
+      max_batch_duration_secs=100,

Review Comment:
   That is correct, it's the only additional parameter. The separate transform
makes testing easier IMO, but from an actual implementation standpoint it
isn't particularly difficult to manage. I'm not opposed to rerouting
BatchElements when the param is set, since that satisfies the "optional route"
rollout step we discussed offline.
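
For illustration only, a rough sketch of what that rerouting could look like. It assumes the combined transform would default `max_batch_duration_secs` to `None` so the stateful path stays opt-in. The names `InMemoryBatching`, `StatefulBatching`, and `choose_batching` are hypothetical stand-ins, not Beam APIs, and the real change would do this dispatch inside `BatchElements.expand`.

```python
from dataclasses import dataclass
from typing import Optional, Union


@dataclass
class InMemoryBatching:
  """Hypothetical stand-in for the existing bundle-scoped batching path."""
  min_batch_size: int
  max_batch_size: int


@dataclass
class StatefulBatching:
  """Hypothetical stand-in for the new stateful batching path."""
  min_batch_size: int
  max_batch_size: int
  max_batch_duration_secs: float


def choose_batching(
    min_batch_size: int = 1,
    max_batch_size: int = 10000,
    max_batch_duration_secs: Optional[float] = None,
) -> Union[InMemoryBatching, StatefulBatching]:
  """Picks a batching path based on whether the extra parameter is set.

  Assumption: None means "not set", so existing callers keep today's behavior.
  """
  if max_batch_size < min_batch_size:
    raise ValueError('max_batch_size must be >= min_batch_size')
  if max_batch_duration_secs is None:
    # Parameter not provided: keep the existing, non-stateful route.
    return InMemoryBatching(min_batch_size, max_batch_size)
  # Parameter provided: opt in to the stateful route.
  return StatefulBatching(
      min_batch_size, max_batch_size, max_batch_duration_secs)
```

With this shape, existing pipelines would see no behavior change unless they pass the new parameter, which is the point of the optional route.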



