danthev commented on a change in pull request #14723: URL: https://github.com/apache/beam/pull/14723#discussion_r650117509
########## File path: sdks/python/apache_beam/io/gcp/datastore/v1new/rampup_throttling_fn.py ########## @@ -0,0 +1,101 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import datetime +import logging +import time +from typing import TypeVar + +from apache_beam import typehints +from apache_beam.io.gcp.datastore.v1new import util +from apache_beam.metrics.metric import Metrics +from apache_beam.transforms import DoFn +from apache_beam.utils.retry import FuzzedExponentialIntervals + +T = TypeVar('T') + +_LOG = logging.getLogger(__name__) + + [email protected]_input_types(T) [email protected]_output_types(T) +class RampupThrottlingFn(DoFn): + """A ``DoFn`` that throttles ramp-up following an exponential function. + + An implementation of a client-side throttler that enforces a gradual ramp-up, + broadly in line with Datastore best practices. See also + https://cloud.google.com/datastore/docs/best-practices#ramping_up_traffic. + """ + def to_runner_api_parameter(self, unused_context): Review comment: Done. ########## File path: sdks/python/apache_beam/io/gcp/datastore/v1new/rampup_throttling_fn.py ########## @@ -0,0 +1,80 @@ +import datetime +import logging +import time +from typing import TypeVar + +from apache_beam import typehints +from apache_beam.io.gcp.datastore.v1new import util +from apache_beam.transforms import DoFn +from apache_beam.utils.retry import FuzzedExponentialIntervals + +T = TypeVar('T') + +_LOG = logging.getLogger(__name__) + + [email protected]_input_types(T) [email protected]_output_types(T) +class RampupThrottlingFn(DoFn): + """A ``DoFn`` that throttles ramp-up following an exponential function. + + An implementation of a client-side throttler that enforces a gradual ramp-up, + broadly in line with Datastore best practices. See also + https://cloud.google.com/datastore/docs/best-practices#ramping_up_traffic. + """ + def to_runner_api_parameter(self, unused_context): + from apache_beam.internal import pickler + config = { + 'num_workers': self._num_workers, + } + return 'beam:fn:rampup_throttling:v0', pickler.dumps(config) + + _BASE_BUDGET = 500 + _RAMP_UP_INTERVAL = datetime.timedelta(minutes=5) + + def __init__(self, num_workers, *unused_args, **unused_kwargs): + """Initializes a ramp-up throttler transform. + + Args: + num_workers: A hint for the expected number of workers, used to derive + the local rate limit. + """ + super(RampupThrottlingFn, self).__init__(*unused_args, **unused_kwargs) + self._num_workers = num_workers + self._successful_ops = util.MovingSum(window_ms=1000, bucket_ms=1000) + self._first_instant = datetime.datetime.now() + + def _calc_max_ops_budget( + self, + first_instant: datetime.datetime, + current_instant: datetime.datetime): + """Function that returns per-second budget according to best practices. + + The exact function is `500 / num_shards * 1.5^max(0, (x-5)/5)`, where x is + the number of minutes since start time. + """ + timedelta_since_first = current_instant - first_instant + growth = max( + 0.0, (timedelta_since_first - self._RAMP_UP_INTERVAL) / + self._RAMP_UP_INTERVAL) + max_ops_budget = int(self._BASE_BUDGET / self._num_workers * (1.5**growth)) Review comment: Right, if I understand this correctly runners can clone DoFns differently. The two main kinds of state for this DoFn are the first instant (the important one, constant upon initialization), and the per-second budget. Both are set in the constructor, so even if they are cloned for every bundle and the runner produces lots of bundles, the right information is kept and the model works out given active bundles are bounded by the number of workers. When a bundle switches from inactive to active, it may not account for a "half-full" per-second budget on a previous bundle depending on when it was cloned, but that resolves itself after at most a second. ########## File path: sdks/python/apache_beam/io/gcp/datastore/v1new/rampup_throttling_fn.py ########## @@ -0,0 +1,80 @@ +import datetime +import logging +import time +from typing import TypeVar + +from apache_beam import typehints +from apache_beam.io.gcp.datastore.v1new import util +from apache_beam.transforms import DoFn +from apache_beam.utils.retry import FuzzedExponentialIntervals + +T = TypeVar('T') + +_LOG = logging.getLogger(__name__) + + [email protected]_input_types(T) [email protected]_output_types(T) +class RampupThrottlingFn(DoFn): + """A ``DoFn`` that throttles ramp-up following an exponential function. + + An implementation of a client-side throttler that enforces a gradual ramp-up, + broadly in line with Datastore best practices. See also + https://cloud.google.com/datastore/docs/best-practices#ramping_up_traffic. + """ + def to_runner_api_parameter(self, unused_context): + from apache_beam.internal import pickler + config = { + 'num_workers': self._num_workers, + } + return 'beam:fn:rampup_throttling:v0', pickler.dumps(config) + + _BASE_BUDGET = 500 + _RAMP_UP_INTERVAL = datetime.timedelta(minutes=5) + + def __init__(self, num_workers, *unused_args, **unused_kwargs): + """Initializes a ramp-up throttler transform. + + Args: + num_workers: A hint for the expected number of workers, used to derive + the local rate limit. + """ + super(RampupThrottlingFn, self).__init__(*unused_args, **unused_kwargs) + self._num_workers = num_workers + self._successful_ops = util.MovingSum(window_ms=1000, bucket_ms=1000) + self._first_instant = datetime.datetime.now() + + def _calc_max_ops_budget( + self, + first_instant: datetime.datetime, + current_instant: datetime.datetime): + """Function that returns per-second budget according to best practices. + + The exact function is `500 / num_shards * 1.5^max(0, (x-5)/5)`, where x is + the number of minutes since start time. + """ + timedelta_since_first = current_instant - first_instant + growth = max( + 0.0, (timedelta_since_first - self._RAMP_UP_INTERVAL) / + self._RAMP_UP_INTERVAL) + max_ops_budget = int(self._BASE_BUDGET / self._num_workers * (1.5**growth)) Review comment: Right, if I understand this correctly runners can clone DoFns differently. The two main kinds of state for this DoFn are the first instant (the important one, constant upon initialization), and the per-second budget. Both are set in the constructor, so even if they are cloned for every bundle and the runner produces lots of bundles, the right information is kept and the model works out given active bundles are bounded by the number of workers. When a bundle switches from inactive to active, it may not account for a "half-full" per-second budget on a previous bundle depending on when it was cloned, but that resolves itself after at most a second. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
