[
https://issues.apache.org/jira/browse/BEAM-12272?focusedWorklogId=604854&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-604854
]
ASF GitHub Bot logged work on BEAM-12272:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 02/Jun/21 00:56
Start Date: 02/Jun/21 00:56
Worklog Time Spent: 10m
Work Description: danthev commented on a change in pull request #14723:
URL: https://github.com/apache/beam/pull/14723#discussion_r643578566
##########
File path: sdks/python/apache_beam/io/gcp/datastore/v1new/datastoreio.py
##########
@@ -276,15 +277,33 @@ class _Mutate(PTransform):
Only idempotent Datastore mutation operations (upsert and delete) are
supported, as the commits are retried when failures occur.
"""
- def __init__(self, mutate_fn):
+
+ # Default hint for the expected number of workers in the ramp-up throttling
+ # step for write or delete operations.
+ _DEFAULT_HINT_NUM_WORKERS = 500
Review comment:
I haven't been able to find a reference to throttling counters in the
Python Dataflow runner code. I've added a counter with the same name as the
others (`cumulativeThrottlingSeconds`) into the DoFn for potential
future-proofing; @chamikaramj, let me know if that is alright.
I have done a few more tests with the default value of 500, and even then
autoscaling quickly went up to whatever maximum I set (69 was the max quota
for my test account). We originally picked such a high default to make sure
unconfigured one-off pipelines don't cause much overload even at 2000 workers;
if the worker count tops out at a much lower number, a user can always
reconfigure it.
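For reference, here is a minimal sketch of the counter wiring described
above, assuming Beam's `Metrics` API. The class name and the delay-policy
helper are illustrative placeholders, not this PR's exact code:

import time

from apache_beam import DoFn
from apache_beam.metrics.metric import Metrics


class ThrottlingDoFn(DoFn):
  """Hypothetical DoFn that reports throttled time via a Beam counter."""
  def __init__(self):
    # Same counter name as the other connectors; whether the Python
    # Dataflow runner actually consumes it is uncertain, per the above.
    self._throttled_secs = Metrics.counter(
        ThrottlingDoFn, 'cumulativeThrottlingSeconds')

  def _compute_delay_secs(self, element):
    # Placeholder policy for illustration only; a real connector would
    # derive the delay from a ramp-up budget as in RampupThrottlingFn.
    return 0.0

  def process(self, element):
    delay_secs = self._compute_delay_secs(element)
    if delay_secs > 0:
      time.sleep(delay_secs)
      self._throttled_secs.inc(int(delay_secs))
    yield element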
##########
File path:
sdks/python/apache_beam/io/gcp/datastore/v1new/rampup_throttling_fn.py
##########
@@ -0,0 +1,80 @@
+import datetime
+import logging
+import time
+from typing import TypeVar
+
+from apache_beam import typehints
+from apache_beam.io.gcp.datastore.v1new import util
+from apache_beam.transforms import DoFn
+from apache_beam.utils.retry import FuzzedExponentialIntervals
+
+T = TypeVar('T')
+
+_LOG = logging.getLogger(__name__)
+
+
+@typehints.with_input_types(T)
+@typehints.with_output_types(T)
+class RampupThrottlingFn(DoFn):
+ """A ``DoFn`` that throttles ramp-up following an exponential function.
+
+ An implementation of a client-side throttler that enforces a gradual ramp-up,
+ broadly in line with Datastore best practices. See also
+ https://cloud.google.com/datastore/docs/best-practices#ramping_up_traffic.
+ """
+ def to_runner_api_parameter(self, unused_context):
+ from apache_beam.internal import pickler
+ config = {
+ 'num_workers': self._num_workers,
+ }
+ return 'beam:fn:rampup_throttling:v0', pickler.dumps(config)
+
+ _BASE_BUDGET = 500
+ _RAMP_UP_INTERVAL = datetime.timedelta(minutes=5)
+
+ def __init__(self, num_workers, *unused_args, **unused_kwargs):
+ """Initializes a ramp-up throttler transform.
+
+ Args:
+ num_workers: A hint for the expected number of workers, used to derive
+ the local rate limit.
+ """
+ super(RampupThrottlingFn, self).__init__(*unused_args, **unused_kwargs)
+ self._num_workers = num_workers
+ self._successful_ops = util.MovingSum(window_ms=1000, bucket_ms=1000)
+ self._first_instant = datetime.datetime.now()
+
+ def _calc_max_ops_budget(
+ self,
+ first_instant: datetime.datetime,
+ current_instant: datetime.datetime):
+ """Function that returns per-second budget according to best practices.
+
+ The exact function is `500 / num_shards * 1.5^max(0, (x-5)/5)`, where x is
Review comment:
Done.
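To make the budget formula concrete, here is a sketch of the computation and
the throttling loop, reconstructed from the imports in the diff above
(`util.MovingSum`, `FuzzedExponentialIntervals`). The standalone function
names are illustrative; this is an approximation, not necessarily the PR's
final code:

import datetime
import logging
import time

from apache_beam.io.gcp.datastore.v1new import util
from apache_beam.utils.retry import FuzzedExponentialIntervals

_LOG = logging.getLogger(__name__)

_BASE_BUDGET = 500
_RAMP_UP_INTERVAL = datetime.timedelta(minutes=5)


def calc_max_ops_budget(first_instant, current_instant, num_workers):
  # 500 / num_workers * 1.5^max(0, (x-5)/5), where x is minutes since the
  # first request: the budget is flat for 5 minutes, then grows 50% every 5.
  elapsed = current_instant - first_instant
  growth = max(0.0, (elapsed - _RAMP_UP_INTERVAL) / _RAMP_UP_INTERVAL)
  return max(1, int(_BASE_BUDGET / num_workers * (1.5**growth)))


def throttled_emit(element, successful_ops, first_instant, num_workers):
  # successful_ops is a util.MovingSum(window_ms=1000, bucket_ms=1000),
  # i.e. a one-second moving window of completed operations.
  backoff = iter(
      FuzzedExponentialIntervals(initial_delay_secs=1, num_retries=10000))
  while True:
    now = datetime.datetime.now()
    budget = calc_max_ops_budget(first_instant, now, num_workers)
    if successful_ops.sum(now.timestamp() * 1000) < budget:
      successful_ops.add(now.timestamp() * 1000, 1)
      return element
    delay_secs = next(backoff)
    _LOG.info('Delaying by %.1fs to conform to gradual ramp-up.', delay_secs)
    time.sleep(delay_secs)

With the default hint of 500 workers, each worker starts at 500 / 500 = 1
op/sec, and roughly 30 minutes in the per-worker budget is about
1.5^5, i.e. around 7 ops/sec.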
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 604854)
Time Spent: 4h 20m (was: 4h 10m)
> Python - Backport FirestoreIO connector's ramp-up to DatastoreIO connector
> --------------------------------------------------------------------------
>
> Key: BEAM-12272
> URL: https://issues.apache.org/jira/browse/BEAM-12272
> Project: Beam
> Issue Type: Improvement
> Components: io-py-gcp
> Reporter: Daniel Thevessen
> Assignee: Daniel Thevessen
> Priority: P2
> Time Spent: 4h 20m
> Remaining Estimate: 0h
>
> The FirestoreIO connector (BEAM-8376) for the Java SDK is currently out for
> review.
> Some of the features in the Firestore connector are new relative to the
> Datastore connector, so the Firestore team would like to add them to the
> Datastore connector as well. Most notably, the Firestore connector has a
> gradual ramp-up feature that slowly increases throughput in line with
> Datastore/Firestore best practices. Some minor adjustments to constants for
> batch sizing and backoff may also be worth looking at.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)