[ https://issues.apache.org/jira/browse/BEAM-7342?focusedWorklogId=244421&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-244421 ]

ASF GitHub Bot logged work on BEAM-7342:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 17/May/19 23:05
            Start Date: 17/May/19 23:05
    Worklog Time Spent: 10m 
    Work Description: boyuanzz commented on pull request #8607: [BEAM-7342] Extend SyntheticPipeline map steps to be splittable.
URL: https://github.com/apache/beam/pull/8607#discussion_r285312101
 
 

 ##########
 File path: sdks/python/apache_beam/testing/synthetic_pipeline.py
 ##########
 @@ -115,6 +115,108 @@ def process(self, element):
       for _ in range(self._output_records_per_input_record):
         yield element
 
+class SyntheticSDFStepRestrictionProvider(RestrictionProvider):
+  """A `RestrictionProvider` for SyntheticSDFStep.
+
+  `initial_restriction` and `split` operate on num_records and ignore the
+  source description (element).
+
+  """
+
+  def __init__(self, num_records, initial_splitting_num_bundles):
+    self._num_records = num_records
+    self._initial_splitting_num_bundles = initial_splitting_num_bundles
+
+  def initial_restriction(self, element):
+    return (0, self._num_records)
+
+  def create_tracker(self, restriction):
+    return restriction_trackers.OffsetRestrictionTracker(
+        restriction[0], restriction[1])
+
+  def split(self, element, restriction):
+    bundle_ranges = []
+    start_position, stop_position = restriction
+    if self._initial_splitting_num_bundles < 2:
+      bundle_ranges.append((start_position, stop_position))
+      return bundle_ranges
+    num_bundles = self._initial_splitting_num_bundles
+    num_records_per_bundle = (stop_position - start_position) // num_bundles
+
+    for i in range(0, num_bundles):
+      final_position = start_position + num_records_per_bundle
+      if i == num_bundles - 1:
+        final_position = stop_position  # Final bundle goes to end
+      bundle_ranges.append((start_position, final_position))
+      start_position = final_position
+    return bundle_ranges
+
+  def restriction_size(self, element, restriction):
+    (start, stop) = restriction
+    element_size = len(element) if isinstance(element, str) else 1
+    return (stop - start) * element_size
+
+""" A function which returns a SyntheticSDFStep with given parameters. """
 
 Review comment:
   We would prefer the comment to go below the function signature, as a docstring:
   ```python
   def getSyntheticSDFStep():
     """some comments"""
   ```
   Otherwise, the lint check may complain. You can run `./gradlew :sdks:python:lintPy27` to run the lint check.
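The `split` method in the diff divides the offset range using integer division and lets the final bundle absorb the remainder, while `restriction_size` weights the record count by the element's length when the element is a `str`. A minimal standalone sketch of that arithmetic with Beam stripped out so it runs on its own; `split_offset_range` is a hypothetical name, and `restriction_size` is reproduced as a plain function rather than a method of the provider:

```python
def split_offset_range(start, stop, num_bundles):
  """Split [start, stop) into num_bundles contiguous sub-ranges.

  Mirrors the arithmetic of SyntheticSDFStepRestrictionProvider.split in the
  diff: each bundle gets (stop - start) // num_bundles records and the final
  bundle absorbs the remainder.
  """
  if num_bundles < 2:
    return [(start, stop)]
  per_bundle = (stop - start) // num_bundles
  ranges = []
  for i in range(num_bundles):
    # The last bundle always ends at stop, picking up any remainder.
    end = stop if i == num_bundles - 1 else start + per_bundle
    ranges.append((start, end))
    start = end
  return ranges


def restriction_size(element, restriction):
  """Size estimate from the diff: record count weighted by str length."""
  start, stop = restriction
  element_size = len(element) if isinstance(element, str) else 1
  return (stop - start) * element_size


print(split_offset_range(0, 10, 3))  # [(0, 3), (3, 6), (6, 10)]
print(restriction_size('abcd', (0, 10)))  # 40
```

Running it also shows an edge case: when the range holds fewer records than `num_bundles` (for example `split_offset_range(0, 2, 3)`), integer division gives zero records per bundle, so the leading bundles are empty and the last one holds every record.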
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 


Issue Time Tracking
-------------------

    Worklog Id:     (was: 244421)
    Time Spent: 20m  (was: 10m)

> Extend SyntheticPipeline map steps to be able to be splittable (Beam Python 
> SDK)
> --------------------------------------------------------------------------------
>
>                 Key: BEAM-7342
>                 URL: https://issues.apache.org/jira/browse/BEAM-7342
>             Project: Beam
>          Issue Type: New Feature
>          Components: testing
>         Environment: Beam Python
>            Reporter: Lara Schmidt
>            Assignee: Lara Schmidt
>            Priority: Minor
>   Original Estimate: 1m
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Add the ability for map steps to be configured to be splittable. 
> Possible configuration options:
>  - uneven bundle sizes
>  - possible incorrect sizing returned
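The issue lists uneven bundle sizes and incorrect size reporting as candidate configuration options. A minimal sketch of how those could be expressed, under stated assumptions: `split_uneven`, `reported_size`, and the `size_estimate_multiplier` parameter are hypothetical names that are not part of Beam or of PR #8607, and the geometric (1, 2, 4, ...) weighting is just one assumed way to make bundles uneven.

```python
def split_uneven(start, stop, num_bundles):
  """Hypothetical option: split [start, stop) into bundles whose sizes grow
  geometrically (weights 1, 2, 4, ...), so the bundles are deliberately uneven.
  """
  if num_bundles < 2:
    return [(start, stop)]
  total = stop - start
  weights = [2 ** i for i in range(num_bundles)]
  weight_sum = sum(weights)
  ranges = []
  cumulative = 0
  bundle_start = start
  for weight in weights:
    cumulative += weight
    # Cumulative boundaries are non-decreasing and the last one equals stop,
    # so bundles never overlap and always cover the whole range.
    bundle_stop = start + total * cumulative // weight_sum
    ranges.append((bundle_start, bundle_stop))
    bundle_start = bundle_stop
  return ranges


def reported_size(restriction, size_estimate_multiplier=1.0):
  """Hypothetical option: report a deliberately skewed size estimate.

  A multiplier of 1.0 gives the true record count; other values simulate a
  provider that returns incorrect sizing.
  """
  start, stop = restriction
  return (stop - start) * size_estimate_multiplier


print(split_uneven(0, 70, 3))  # [(0, 10), (10, 30), (30, 70)]
print(reported_size((0, 10), 0.5))  # 5.0
```

Deriving each boundary from cumulative weights means the final boundary always lands exactly on `stop`, so no separate remainder case is needed.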



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
