danthev commented on a change in pull request #14723:
URL: https://github.com/apache/beam/pull/14723#discussion_r637178001



##########
File path: sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio.py
##########
@@ -276,15 +277,33 @@ class _Mutate(PTransform):
   Only idempotent Datastore mutation operations (upsert and delete) are
   supported, as the commits are retried when failures occur.
   """
-  def __init__(self, mutate_fn):
+
+  # Default hint for the expected number of workers in the ramp-up throttling
+  # step for write or delete operations.
+  _DEFAULT_HINT_NUM_WORKERS = 500

Review comment:
       I've thought about this before. Generally what I would expect is a slow start, with autoscaling quickly scaling up as the throttling limit increases, yielding a result similar to starting with the desired number of workers.
   However, I was weighing whether to report `throttlingMs`, or whether doing so stifles autoscaling (the Firestore implementation reports it). I thought I remembered previous tests showing not much of a difference, but after running a few more tests I now see a significant difference. This is an example I ran with the Java implementation and `maxNumWorkers` at 50, where I commented out incrementing the `throttlingMs` counter:
   ![Screen Shot 2021-05-21 at 11 35 15 AM](https://user-images.githubusercontent.com/76013657/119188283-4f347800-ba2f-11eb-851d-8102a3834230.png)
   
   That's about what I would expect: autoscaling scales up after 10-15 minutes, then the ramp-up proceeds normally. I just ran the same example with the original implementation, which reports `throttlingMs`, and Dataflow seems prone to getting stuck at 1 worker with very little throughput.
   
   So if I drop `throttlingMs`, autoscaling behavior should be fairly normal. Are there other autoscaling side effects I should consider?
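   For concreteness, here is a minimal sketch (my own, not code from this PR) of what reporting throttled time as a Beam metric could look like; the counter name `cumulativeThrottlingSeconds`, the helper class, and the loop shape are all assumptions:

```python
import time

from apache_beam.metrics.metric import Metrics
from apache_beam.utils.retry import FuzzedExponentialIntervals


class ThrottlingReporter(object):
  """Hypothetical helper: reports time spent throttled as a Beam counter."""
  def __init__(self):
    # Dataflow can factor reported throttling time into its autoscaling
    # decisions, which is the interaction discussed above. Whether the
    # service keys on this exact counter name is an assumption here.
    self._throttled_secs = Metrics.counter(
        'RampupThrottlingFn', 'cumulativeThrottlingSeconds')

  def backoff_while(self, over_budget):
    """Sleeps with fuzzed exponential backoff while over_budget() is true."""
    delays = iter(
        FuzzedExponentialIntervals(initial_delay_secs=1, num_retries=10))
    while over_budget():
      delay_secs = next(delays)
      time.sleep(delay_secs)
      self._throttled_secs.inc(int(delay_secs))
```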
   

##########
File path: sdks/python/apache_beam/io/gcp/datastore/v1new/rampup_throttling_fn.py
##########
@@ -0,0 +1,80 @@
+import datetime
+import logging
+import time
+from typing import TypeVar
+
+from apache_beam import typehints
+from apache_beam.io.gcp.datastore.v1new import util
+from apache_beam.transforms import DoFn
+from apache_beam.utils.retry import FuzzedExponentialIntervals
+
+T = TypeVar('T')
+
+_LOG = logging.getLogger(__name__)
+
+
[email protected]_input_types(T)
[email protected]_output_types(T)
+class RampupThrottlingFn(DoFn):
+  """A ``DoFn`` that throttles ramp-up following an exponential function.
+
+  An implementation of a client-side throttler that enforces a gradual ramp-up,
+  broadly in line with Datastore best practices. See also
+  https://cloud.google.com/datastore/docs/best-practices#ramping_up_traffic.
+  """
+  def to_runner_api_parameter(self, unused_context):
+    from apache_beam.internal import pickler
+    config = {
+        'num_workers': self._num_workers,
+    }
+    return 'beam:fn:rampup_throttling:v0', pickler.dumps(config)
+
+  _BASE_BUDGET = 500
+  _RAMP_UP_INTERVAL = datetime.timedelta(minutes=5)
+
+  def __init__(self, num_workers, *unused_args, **unused_kwargs):
+    """Initializes a ramp-up throttler transform.
+
+    Args:
+      num_workers: A hint for the expected number of workers, used to derive
+                   the local rate limit.
+    """
+    super(RampupThrottlingFn, self).__init__(*unused_args, **unused_kwargs)
+    self._num_workers = num_workers
+    self._successful_ops = util.MovingSum(window_ms=1000, bucket_ms=1000)
+    self._first_instant = datetime.datetime.now()
+
+  def _calc_max_ops_budget(
+      self,
+      first_instant: datetime.datetime,
+      current_instant: datetime.datetime):
+    """Function that returns per-second budget according to best practices.
+
+    The exact function is `500 / num_shards * 1.5^max(0, (x-5)/5)`, where x is
+    the number of minutes since start time.
+    """
+    timedelta_since_first = current_instant - first_instant
+    growth = max(
+        0.0, (timedelta_since_first - self._RAMP_UP_INTERVAL) /
+        self._RAMP_UP_INTERVAL)
+    max_ops_budget = int(self._BASE_BUDGET / self._num_workers * (1.5**growth))

Review comment:
       Unless I made a mistake translating this from Java, that "first instant" is serialized and kept per DoFn instance, so the computed per-second budget persists across bundles but applies only _per worker_. That's why I split the overall budget by the estimated worker count into a per-worker budget.
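   To make the per-worker split concrete, a quick worked example (my own sketch, not part of the PR) of the budget function as written above, using the default hint of 500 workers:

```python
def max_ops_budget(minutes_since_start, num_workers=500):
  """Per-worker budget: 500 / num_workers * 1.5 ** max(0, (x - 5) / 5)."""
  growth = max(0.0, (minutes_since_start - 5) / 5)
  return int(500 / num_workers * 1.5**growth)


# Each worker starts at 500 / 500 = 1 op/s, so the fleet-wide budget
# starts near the recommended 500 ops/s and grows 50% every 5 minutes:
print(max_ops_budget(0))   # 1 op/s per worker (~500 ops/s fleet-wide)
print(max_ops_budget(10))  # 1 (1.5 ops/s, truncated by int())
print(max_ops_budget(30))  # 7 (1.5**5 is roughly 7.6 ops/s per worker)
```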



