[
https://issues.apache.org/jira/browse/BEAM-7342?focusedWorklogId=251179&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-251179
]
ASF GitHub Bot logged work on BEAM-7342:
----------------------------------------
Author: ASF GitHub Bot
Created on: 30/May/19 17:52
Start Date: 30/May/19 17:52
Worklog Time Spent: 10m
Work Description: lukecwik commented on pull request #8607: [BEAM-7342]
Extend SyntheticPipeline map steps to be splittable.
URL: https://github.com/apache/beam/pull/8607#discussion_r289095136
##########
File path: sdks/python/apache_beam/testing/synthetic_pipeline.py
##########
@@ -116,6 +118,149 @@ def process(self, element):
     yield element
+
+
+class NonLiquidShardingOffsetRangeTracker(OffsetRestrictionTracker):
+  """An OffsetRestrictionTracker that doesn't allow splitting."""
+
+  def try_split(self, split_offset):
+    return  # Don't split.
+
+  def checkpoint(self):
+    return  # Don't split.
+
+
+class SyntheticSDFStepRestrictionProvider(RestrictionProvider):
+  """A `RestrictionProvider` for SyntheticSDFStep.
+
+  initial_restriction and split operate on num_records and ignore the
+  source description (element). Splits into initial_splitting_num_bundles
+  bundles. Returns size_estimate_override as the restriction size, if set;
+  otherwise uses the element size.
+
+  If initial_splitting_uneven_chunks is set, produces unevenly sized chunks.
+  """
+
+  def __init__(self, num_records, initial_splitting_num_bundles,
+               initial_splitting_uneven_chunks, disable_liquid_sharding,
+               size_estimate_override):
+    self._num_records = num_records
+    self._initial_splitting_num_bundles = initial_splitting_num_bundles
+    self._initial_splitting_uneven_chunks = initial_splitting_uneven_chunks
+    self._disable_liquid_sharding = disable_liquid_sharding
+    self._size_estimate_override = size_estimate_override
+
+  def initial_restriction(self, element):
+    return (0, self._num_records)
+
+  def create_tracker(self, restriction):
+    if self._disable_liquid_sharding:
+      return NonLiquidShardingOffsetRangeTracker(restriction[0],
+                                                 restriction[1])
+    else:
+      # OffsetRange.new_tracker returns a RangeTracker, not a
+      # RestrictionTracker.
+      return OffsetRestrictionTracker(restriction[0], restriction[1])
+
+  def split_randomly(self, restriction):
+    """Randomly split the restriction into the right number of bundles."""
+    elems = restriction[1] - restriction[0]
+    bundles = self._initial_splitting_num_bundles
+    random_nums = [np.random.randint(0, elems - 1)
+                   for _ in range(0, bundles - 1)]
+    random_nums.append(0)
+    random_nums.append(elems)
+    random_nums.sort()
+    for i in range(1, len(random_nums)):
+      yield (restriction[0] + random_nums[i - 1],
+             restriction[0] + random_nums[i])
+
+  def split(self, element, restriction):
+    elems = restriction[1] - restriction[0]
+    if (self._initial_splitting_uneven_chunks and
+        self._initial_splitting_num_bundles > 1 and elems > 1):
+      return self.split_randomly(restriction)
+    else:
+      offsets_per_split = max(
+          1, elems // self._initial_splitting_num_bundles)
+      result = list(
+          OffsetRange(restriction[0], restriction[1]).split(
+              offsets_per_split, offsets_per_split // 2))
+      return [(x.start, x.stop) for x in result]
+
+  def restriction_size(self, element, restriction):
+    if self._size_estimate_override is not None:
+      return self._size_estimate_override
+    element_size = len(element) if isinstance(element, str) else 1
+    return (restriction[1] - restriction[0]) * element_size
+
+
+def getSyntheticSDFStep(per_element_delay_sec=0,
+                        per_bundle_delay_sec=0,
+                        output_records_per_input_record=1,
+                        output_filter_ratio=0,
+                        initial_splitting_num_bundles=8,
+                        initial_splitting_uneven_chunks=False,
+                        disable_liquid_sharding=False,
+                        size_estimate_override=None):
+  """Returns a SyntheticSDFStep with the given parameters."""
+
+  class SyntheticSDFStep(beam.DoFn):
+    """A SplittableDoFn whose behavior can be controlled through
+    prespecified parameters.
+    """
+
+    def __init__(self, per_element_delay_sec_arg, per_bundle_delay_sec_arg,
+                 output_filter_ratio_arg):
+      if per_element_delay_sec_arg and per_element_delay_sec_arg < 1e-3:
+        raise ValueError(
+            'Per element sleep time must be at least 1e-3 seconds. '
+            'Received: %r' % per_element_delay_sec_arg)
+      self._per_element_delay_sec = per_element_delay_sec_arg
+      self._per_bundle_delay_sec = per_bundle_delay_sec_arg
+      self._output_filter_ratio = output_filter_ratio_arg
+
+    def start_bundle(self):
+      self._start_time = time.time()
+
+    def finish_bundle(self):
+      # The target is for the enclosing stage to take as close as possible
+      # to the given number of seconds, so we only sleep enough to make up
+      # for overheads not incurred elsewhere.
+      to_sleep = self._per_bundle_delay_sec - (time.time() - self._start_time)
+
+      # Ignore sub-millisecond sleep times.
+      if to_sleep >= 1e-3:
+        time.sleep(to_sleep)
+
+    def process(self,
+                element,
+                restriction_tracker=beam.DoFn.RestrictionParam(
+                    SyntheticSDFStepRestrictionProvider(
+                        output_records_per_input_record,
+                        initial_splitting_num_bundles,
+                        initial_splitting_uneven_chunks,
+                        disable_liquid_sharding,
+                        size_estimate_override))):
+      filter_element = False
+      if self._output_filter_ratio > 0:
+        if np.random.random() < self._output_filter_ratio:
+          filter_element = True
+
+      for k in range(*restriction_tracker.current_restriction()):
+        if not restriction_tracker.try_claim(k):
+          return
+
+        if self._per_element_delay_sec > 0:
Review comment:
In the synthetic pipeline map step, the per-element delay is how much time we
should wait per **input** element, not per output that we produce. I think we
should divide the wait time by the number of elements we are going to emit and
perform these smaller sleeps to make progress through the SDF, wdyt?
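The suggestion above could be sketched roughly as follows. This is a minimal illustration, not code from the PR; the function name `emit_with_spread_delay` and its signature are hypothetical:

```python
import time


def emit_with_spread_delay(element, per_element_delay_sec, num_outputs):
    """Spread one input element's delay evenly across its emitted outputs."""
    # Dividing the per-input delay by the fan-out lets the SDF make claimable
    # progress between outputs instead of sleeping once up front.
    per_output_delay = per_element_delay_sec / max(1, num_outputs)
    for _ in range(num_outputs):
        time.sleep(per_output_delay)  # one small sleep per emitted output
        yield element
```

With a zero delay this simply fans the element out `num_outputs` times.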
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 251179)
Time Spent: 3h 20m (was: 3h 10m)
> Extend SyntheticPipeline map steps to be able to be splittable (Beam Python
> SDK)
> --------------------------------------------------------------------------------
>
> Key: BEAM-7342
> URL: https://issues.apache.org/jira/browse/BEAM-7342
> Project: Beam
> Issue Type: New Feature
> Components: testing
> Environment: Beam Python
> Reporter: Lara Schmidt
> Assignee: Lara Schmidt
> Priority: Minor
> Original Estimate: 1m
> Time Spent: 3h 20m
> Remaining Estimate: 0h
>
> Add the ability for map steps to be configured to be splittable.
> Possible configuration options:
> - uneven bundle sizes
> - possible incorrect sizing returned
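The "uneven bundle sizes" option could work along these lines, independent of Beam. This is an illustrative sketch only; the name `split_unevenly` and the `seed` parameter are assumptions, not part of the PR:

```python
import random


def split_unevenly(start, stop, num_bundles, seed=None):
    """Split [start, stop) into num_bundles contiguous, unevenly sized chunks."""
    rng = random.Random(seed)
    elems = stop - start
    # Pick num_bundles - 1 random cut points, then bound them by 0 and elems
    # so the chunks cover the whole range without gaps or overlap.
    cuts = sorted(rng.randint(0, elems) for _ in range(num_bundles - 1))
    bounds = [0] + cuts + [elems]
    return [(start + bounds[i], start + bounds[i + 1])
            for i in range(len(bounds) - 1)]
```

Note that random cut points can coincide, so some chunks may be empty, mirroring how a splittable source may hand back zero-element restrictions.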
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)