[
https://issues.apache.org/jira/browse/BEAM-7443?focusedWorklogId=253234&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-253234
]
ASF GitHub Bot logged work on BEAM-7443:
----------------------------------------
Author: ASF GitHub Bot
Created on: 03/Jun/19 17:45
Start Date: 03/Jun/19 17:45
Worklog Time Spent: 10m
Work Description: lukecwik commented on pull request #8641: [BEAM-7443]
Create a BoundedSource -> SDF wrapper in Python SDK
URL: https://github.com/apache/beam/pull/8641#discussion_r289958621
##########
File path: sdks/python/apache_beam/io/iobase.py
##########
@@ -1168,12 +1177,102 @@ def check_done(self):
invoked by different threads, hence must be made thread-safe, e.g. by using
a single lock object.
+ TODO(BEAM-7473): Remove thread safety requirements from API implementation.
+
Returns: ``True`` if current restriction has been fully processed.
Raises:
~exceptions.ValueError: if there is still any unclaimed work remaining.
"""
raise NotImplementedError
+ def try_split(self, fraction_of_remainder):
+ """Splits current restriction based on fraction_of_remainder.
+
+ If splitting the current restriction is possible, the current restriction is
+ split into a primary and residual restriction pair. This invocation updates
+ the ``current_restriction()`` to be the primary restriction effectively
+ having the current ``DoFn.process()`` execution responsible for performing
+ the work that the primary restriction represents. The residual restriction
+ will be executed in a separate ``DoFn.process()`` invocation (likely in a
+ different process). The work performed by executing the primary and residual
+ restrictions as separate ``DoFn.process()`` invocations MUST be equivalent
+ to the work performed as if this split never occurred.
+
+ The ``fraction_of_remainder`` should be used in a best effort manner to
+ choose a primary and residual restriction based upon the fraction of the
+ remaining work that the current ``DoFn.process()`` invocation is responsible
+ for. For example, if a ``DoFn.process()`` was reading a file with a
+ restriction representing the offset range [100, 200) and has processed up to
+ offset 130 with a fraction_of_remainder of 0.7, the primary and residual
+ restrictions returned would be [100, 179), [179, 200) (note: current_offset
+ + fraction_of_remainder * remaining_work = 130 + 0.7 * 70 = 179).
+
+ It is very important for pipeline scaling and end to end pipeline execution
+ that try_split is implemented well.
+
+ Args:
+ fraction_of_remainder: A hint as to the fraction of work the primary
+ restriction should represent based upon the current known remaining amount
+ of work.
+
+ Returns: ``(primary_restriction, residual_restriction)`` if a split was
+ possible, otherwise returns ``None``.
+
+ ** Thread safety **
+
+ Methods of the class ``RestrictionTracker`` including this method may get
+ invoked by different threads, hence must be made thread-safe, e.g. by using
+ a single lock object.
+
+ TODO(BEAM-7473): Remove thread safety requirements from API implementation.
+ """
+ raise NotImplementedError
+
+ def try_claim(self, position):
+ """ Attempts to claim the block of work in the current restriction
+ identified by the given position.
+
+ If this succeeds, the DoFn MUST execute the entire block of work. If it
+ fails, the ``DoFn.process()`` MUST return ``None`` without performing any
+ additional work or emitting output (note that emitting output or performing
+ work from ``DoFn.process()`` is also not allowed before the first call of
+ this method).
+
+ Args:
+ position: current position that wants to be claimed.
+
+ Returns: ``True`` if the position can be claimed as current_position.
+ Otherwise, returns ``False``.
+
+ ** Thread safety **
+
+ Methods of the class ``RestrictionTracker`` including this method may get
+ invoked by different threads, hence must be made thread-safe, e.g. by using
+ a single lock object.
+
+ TODO(BEAM-7473): Remove thread safety requirements from API implementation.
+ """
+ raise NotImplementedError
+
+ def defer_remainder(self, watermark=None):
+ """ Invokes checkpoint() in an SDF.process().
+
+ TODO(BEAM-7472): Mark defer_remainder as deprecated once SDF.process() uses
Review comment:
```suggestion
TODO(BEAM-7472): Remove defer_remainder once SDF.process() uses
```
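
For illustration, the split arithmetic and claim semantics described in the quoted docstrings can be sketched for a plain offset range. This is a minimal sketch under stated assumptions, not the tracker from the pull request: the class name `OffsetRangeTrackerSketch`, the tuple representation of a restriction, and the rounding rule are all hypothetical choices made here. It uses a single lock object, as the docstrings suggest.

```python
import threading


class OffsetRangeTrackerSketch(object):
  """Hypothetical tracker for a half-open offset range [start, stop)."""

  def __init__(self, start, stop):
    self._start = start
    self._stop = stop
    self._current = None  # last successfully claimed offset, if any
    self._lock = threading.Lock()  # single lock guarding all state

  def current_restriction(self):
    with self._lock:
      return (self._start, self._stop)

  def try_claim(self, position):
    # Claim succeeds only if the position lies inside the current restriction.
    with self._lock:
      if position < self._start or position >= self._stop:
        return False
      self._current = position
      return True

  def try_split(self, fraction_of_remainder):
    with self._lock:
      current = self._start if self._current is None else self._current
      remaining = self._stop - current
      # Assumption: round to the nearest whole offset.
      split_at = current + int(round(fraction_of_remainder * remaining))
      # Assumption: refuse splits that would leave the residual empty or
      # that would cut at or before an already claimed offset.
      if split_at <= current or split_at >= self._stop:
        return None
      primary = (self._start, split_at)
      residual = (split_at, self._stop)
      self._stop = split_at  # the primary becomes the current restriction
      return primary, residual


tracker = OffsetRangeTrackerSketch(100, 200)
claimed = tracker.try_claim(130)
split = tracker.try_split(0.7)  # 130 + 0.7 * 70 = 179
```

With the docstring's numbers, `split` is the pair `(100, 179)` and `(179, 200)`, and a later `try_claim(179)` fails because offset 179 now belongs to the residual.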
----------------------------------------------------------------
Issue Time Tracking
-------------------
Worklog Id: (was: 253234)
Time Spent: 4.5h (was: 4h 20m)
> BoundedSource->SDF needs a wrapper in Python SDK
> -------------------------------------------------
>
> Key: BEAM-7443
> URL: https://issues.apache.org/jira/browse/BEAM-7443
> Project: Beam
> Issue Type: New Feature
> Components: sdk-py-core
> Reporter: Boyuan Zhang
> Assignee: Boyuan Zhang
> Priority: Major
> Time Spent: 4.5h
> Remaining Estimate: 0h
>