chamikaramj commented on a change in pull request #14723:
URL: https://github.com/apache/beam/pull/14723#discussion_r636546473



##########
File path: 
sdks/python/apache_beam/io/gcp/datastore/v1new/rampup_throttling_fn.py
##########
@@ -0,0 +1,80 @@
+import datetime
+import logging
+import time
+from typing import TypeVar
+
+from apache_beam import typehints
+from apache_beam.io.gcp.datastore.v1new import util
+from apache_beam.transforms import DoFn
+from apache_beam.utils.retry import FuzzedExponentialIntervals
+
+T = TypeVar('T')
+
+_LOG = logging.getLogger(__name__)
+
+
+@typehints.with_input_types(T)
+@typehints.with_output_types(T)
+class RampupThrottlingFn(DoFn):
+  """A ``DoFn`` that throttles ramp-up following an exponential function.
+
+  An implementation of a client-side throttler that enforces a gradual ramp-up,
+  broadly in line with Datastore best practices. See also
+  https://cloud.google.com/datastore/docs/best-practices#ramping_up_traffic.
+  """
+  def to_runner_api_parameter(self, unused_context):
+    from apache_beam.internal import pickler
+    config = {
+        'num_workers': self._num_workers,
+    }
+    return 'beam:fn:rampup_throttling:v0', pickler.dumps(config)
+
+  _BASE_BUDGET = 500
+  _RAMP_UP_INTERVAL = datetime.timedelta(minutes=5)
+
+  def __init__(self, num_workers, *unused_args, **unused_kwargs):
+    """Initializes a ramp-up throttler transform.
+
+     Args:
+       num_workers: A hint for the expected number of workers, used to derive
+                    the local rate limit.
+     """
+    super(RampupThrottlingFn, self).__init__(*unused_args, **unused_kwargs)
+    self._num_workers = num_workers
+    self._successful_ops = util.MovingSum(window_ms=1000, bucket_ms=1000)
+    self._first_instant = datetime.datetime.now()
+
+  def _calc_max_ops_budget(
+      self,
+      first_instant: datetime.datetime,
+      current_instant: datetime.datetime):
+    """Function that returns per-second budget according to best practices.
+
+    The exact function is `500 / num_workers * 1.5^max(0, (x-5)/5)`, where x
+    is the number of minutes since start time.
+    """
+    timedelta_since_first = current_instant - first_instant
+    growth = max(
+        0.0, (timedelta_since_first - self._RAMP_UP_INTERVAL) /
+        self._RAMP_UP_INTERVAL)
+    max_ops_budget = int(self._BASE_BUDGET / self._num_workers * (1.5**growth))

Review comment:
       Note that a given DoFn executes a bundle [1], not the whole PCollection. 
By this point the runner (for example, Dataflow) has already split the 
PCollection into bundles, so here you are only limiting the execution of a 
single bundle (which happens on a single worker).
   
   The total number of bundles executed in parallel can be up to the number of 
available workers, and the number of available workers depends on the runner. 
For Dataflow batch jobs it is the number of VMs * the number of cores per VM by 
default. For streaming it can be higher (there may be a large number of 
parallel worker threads).
   
   [1] https://beam.apache.org/documentation/runtime/model/
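   For intuition, the per-worker budget formula from the diff can be sketched 
as a standalone function. This is a minimal sketch, not the Beam code itself; 
the worker count and timestamps below are hypothetical, purely for 
illustration:

```python
import datetime

# Constants mirroring the diff: 500 ops/s base budget (per Datastore
# best practices), ramped up by 1.5x every 5 minutes after the first 5.
BASE_BUDGET = 500
RAMP_UP_INTERVAL = datetime.timedelta(minutes=5)


def max_ops_budget(num_workers, first_instant, current_instant):
    """Per-worker ops/s budget: 500 / num_workers * 1.5^max(0, (x-5)/5)."""
    elapsed = current_instant - first_instant
    # timedelta / timedelta division yields a float number of intervals.
    growth = max(0.0, (elapsed - RAMP_UP_INTERVAL) / RAMP_UP_INTERVAL)
    return int(BASE_BUDGET / num_workers * (1.5 ** growth))


start = datetime.datetime(2021, 5, 1, 12, 0)
# During the first 5 minutes, each of 10 hypothetical workers gets 500/10 ops/s.
print(max_ops_budget(10, start, start))  # 50
# After 10 minutes the budget has grown by one factor of 1.5.
print(max_ops_budget(10, start, start + datetime.timedelta(minutes=10)))  # 75
```

   Note this limits only the local (single-bundle) rate, which is exactly the 
concern raised above: the aggregate rate across all concurrently executing 
bundles can exceed this budget.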




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
