[
https://issues.apache.org/jira/browse/BEAM-7443?focusedWorklogId=252112&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-252112
]
ASF GitHub Bot logged work on BEAM-7443:
----------------------------------------
Author: ASF GitHub Bot
Created on: 31/May/19 22:58
Start Date: 31/May/19 22:58
Worklog Time Spent: 10m
Work Description: lukecwik commented on pull request #8641: [BEAM-7443] Create a BoundedSource -> SDF wrapper in Python SDK
URL: https://github.com/apache/beam/pull/8641#discussion_r289573594
##########
File path: sdks/python/apache_beam/io/iobase.py
##########
@@ -1182,37 +1182,69 @@ def check_done(self):
def try_split(self, fraction_of_remainder):
"""Splits current restriction based on fraction_of_remainder.
- Invoked when SDK receiving ProcessBundleSplitRequest during processing
- bundle.
+ If splitting the current restriction is possible, the current restriction is
+ split into a primary and residual restriction pair. This invocation updates
+ the ``current_restriction()`` to be the primary restriction effectively
+ having the current ``DoFn.process()`` execution responsible for performing
+ the work that the primary restriction represents. The residual restriction
+ will be executed in a separate ``DoFn.process()`` invocation (likely in a
+ different process). The work performed by executing the primary and residual
+ restrictions as separate ``DoFn.process()`` invocations MUST be equivalent
+ to the work performed as if this split never occurred.
+
+ The ``fraction_of_remainder`` should be used in a best effort manner to
+ choose a primary and residual restriction based upon the fraction of the
+ remaining work that the current ``DoFn.process()`` invocation is responsible
+ for. For example, if a ``DoFn.process()`` was reading a file with a
+ restriction representing the offset range [100, 200) and has processed up to
+ offset 130 with a fraction_of_remainder of 0.7, the primary and residual
+ restrictions returned would be [100, 179), [179, 200) (note: current_offset
+ + fraction_of_remainder * remaining_work = 130 + 0.7 * 70 = 179).
+
+ It is very important for pipeline scaling and end to end pipeline execution
+ that try_split is implemented well.
Args:
- fraction_of_remainder: a fraction of (cur_pos, stop_pos).
+ fraction_of_remainder: A hint as to the fraction of work the primary
+ restriction should represent based upon the current known remaining amount
+ of work.
- Returns: ``None`` when current restriction has been checkpointed, or
- split_point is out of current restriction range. Otherwise, return
- ((start_pos, split_pos), (split_pos, stop_pos)).
+ Returns: ``(primary_restriction, residual_restriction)`` if a split was
+ possible, otherwise returns ``None``.
** Thread safety **
- Accessing to position and checkpoint status should be guarded by a single
- lock object.
+
+ Methods of the class ``RestrictionTracker`` including this method may get
+ invoked by different threads, hence must be made thread-safe, e.g. by using
+ a single lock object.
"""
raise NotImplementedError
def try_claim(self, position):
- """ Claims position as current_position.
+ """ Attempts to claim the block of work in the current restriction
+ identified by the given position.
+
+ If this succeeds, the DoFn MUST execute the entire block of work. If it
+ fails, the ``DoFn.process()`` MUST return ``None`` without performing any
+ additional work or emitting (note that emitting output or performing work
+ from ``DoFn.process()`` is also not allowed before the first call of this
+ method).
Args:
position: current position that wants to be claimed.
Returns: ``True`` if the position can be claimed as current_position.
Otherwise, returns ``False``.
** Thread safety **
- Accessing to position should be guarded by a single lock object.
+
+ Methods of the class ``RestrictionTracker`` including this method may get
+ invoked by different threads, hence must be made thread-safe, e.g. by using
+ a single lock object.
"""
raise NotImplementedError
def defer_remainder(self, watermark=None):
- """ Invokes checkpoint() in an SDF.process()
Review comment:
I filed https://issues.apache.org/jira/browse/BEAM-7472
I would add this as a TODO and just mark these methods as deprecated.
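The following is a minimal sketch of the try_split/try_claim contract described in the docstrings above. It models a tracker over an integer offset range [start, stop). The class name OffsetRangeTrackerSketch and the read_offsets helper are hypothetical illustrations. They are not Beam's own restriction tracker API. The sketch makes three assumptions:

* Split points are floored to an integer offset.
* The already-claimed offset must remain in the primary restriction.
* A single threading.Lock guards all state, following the thread-safety notes.

The sketch reproduces the worked example. After claiming up to offset 130 in [100, 200), try_split(0.7) returns [100, 179) and [179, 200).

```python
import threading


class OffsetRangeTrackerSketch(object):
  """Hypothetical tracker for the offset range [start, stop).

  Illustrates the try_split/try_claim contract; not Beam's implementation.
  """

  def __init__(self, start, stop):
    self._start = start
    self._stop = stop
    self._last_claimed = None  # No offset claimed yet.
    # One lock guards all state: the runner may call try_split() from a
    # different thread than the one executing DoFn.process().
    self._lock = threading.Lock()

  def current_restriction(self):
    with self._lock:
      return (self._start, self._stop)

  def try_claim(self, position):
    with self._lock:
      if self._last_claimed is not None and position <= self._last_claimed:
        raise ValueError('Offsets must be claimed in increasing order.')
      if position < self._start:
        raise ValueError('Offset is before the restriction start.')
      if position >= self._stop:
        # Outside the (possibly already split) restriction: caller must stop.
        return False
      self._last_claimed = position
      return True

  def try_split(self, fraction_of_remainder):
    with self._lock:
      # Remaining work is measured from the last claimed offset.
      current = (self._start if self._last_claimed is None
                 else self._last_claimed)
      remaining = self._stop - current
      split_point = current + int(fraction_of_remainder * remaining)
      # The claimed offset must stay in the primary and the residual must be
      # non-empty; otherwise no split is possible.
      if split_point <= current or split_point >= self._stop:
        return None
      primary = (self._start, split_point)
      residual = (split_point, self._stop)
      self._stop = split_point  # The primary becomes the current restriction.
      return primary, residual


def read_offsets(tracker, records):
  """Hypothetical DoFn.process()-style loop honoring try_claim()."""
  offset, _ = tracker.current_restriction()
  # Nothing is emitted before the first successful claim, and the loop
  # returns as soon as a claim fails.
  while tracker.try_claim(offset):
    yield records[offset]
    offset += 1
```

try_split shrinks the stop offset under the same lock that try_claim reads. As a result, any claim at or beyond the split point fails once the split has happened. A read_offsets loop running concurrently therefore stops at the end of the primary restriction and leaves the residual to a separate invocation.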
Issue Time Tracking
-------------------
Worklog Id: (was: 252112)
Time Spent: 4h (was: 3h 50m)
> BoundedSource->SDF needs a wrapper in Python SDK
> -------------------------------------------------
>
> Key: BEAM-7443
> URL: https://issues.apache.org/jira/browse/BEAM-7443
> Project: Beam
> Issue Type: New Feature
> Components: sdk-py-core
> Reporter: Boyuan Zhang
> Assignee: Boyuan Zhang
> Priority: Major
> Time Spent: 4h
> Remaining Estimate: 0h
>