[
https://issues.apache.org/jira/browse/BEAM-7473?focusedWorklogId=345615&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-345615
]
ASF GitHub Bot logged work on BEAM-7473:
----------------------------------------
Author: ASF GitHub Bot
Created on: 18/Nov/19 22:17
Start Date: 18/Nov/19 22:17
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #10118: [BEAM-7473]
Pack RangeTracker into restriction
URL: https://github.com/apache/beam/pull/10118#discussion_r347636628
##########
File path: sdks/python/apache_beam/io/iobase.py
##########
@@ -1402,20 +1402,36 @@ class _SDFBoundedSourceWrapper(ptransform.PTransform):
   NOTE: This transform can only be used with beam_fn_api enabled.
   """
+
+  class _SDFBoundedSourceRestriction(object):
+    """ A restriction wraps SourceBundle and RangeTracker. """
+    def __init__(self, source_bundle, range_tracker=None):
+      self.source_bundle = source_bundle
+      self.range_tracker = range_tracker
+
+    def __reduce__(self):
+      # The instance of RangeTracker shouldn't be serialized.
+      return (self.__class__, (self.source_bundle, ))
+
+
   class _SDFBoundedSourceRestrictionTracker(RestrictionTracker):
     """An `iobase.RestrictionTracker` implementations for wrapping
     BoundedSource
-    with SDF.
+    with SDF. The tracked restriction is a (SourceBundle, RangeTracker) pair.
+    In order to save bytes sent across the wire, the RangeTracker is set as
+    system tracking RangeTracker only when current_restriction is called.
     Delegated RangeTracker guarantees synchronization safety.
     """
     def __init__(self, restriction):
-      if not isinstance(restriction, SourceBundle):
+      if not isinstance(restriction,
+                        _SDFBoundedSourceWrapper._SDFBoundedSourceRestriction):
         raise ValueError('Initializing SDFBoundedSourceRestrictionTracker'
-                         'requires a SourceBundle')
-      self._delegate_range_tracker = restriction.source.get_range_tracker(
-          restriction.start_position, restriction.stop_position)
-      self._source = restriction.source
-      self._weight = restriction.weight
+                         ' requires a _SDFBoundedSourceRestriction')
+      self._source = restriction.source_bundle.source
+      self._weight = restriction.source_bundle.weight
+      self._delegate_range_tracker = self._source.get_range_tracker(
Review comment:
How about just adding a range_tracker() method to
_SDFBoundedSourceRestriction? Then _delegate_range_tracker would not have to be
tracked here at all.
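One way to read the review suggestion: the restriction builds its RangeTracker lazily in a `range_tracker()` method, so the tracker class no longer needs its own `_delegate_range_tracker`, while `__reduce__` still serializes only the SourceBundle. This is a minimal, self-contained sketch under those assumptions, not the code that landed in PR #10118. `SourceBundle` here is a local stand-in that mirrors the field order of Beam's `iobase.SourceBundle` (weight, source, start_position, stop_position). `OffsetSource` and `OffsetTracker` are hypothetical placeholders for a real `BoundedSource` and `RangeTracker`.

```python
import collections

# Local stand-in mirroring the fields of Beam's iobase.SourceBundle.
SourceBundle = collections.namedtuple(
    'SourceBundle', 'weight source start_position stop_position')


class OffsetTracker(object):
  """Hypothetical RangeTracker stand-in over [start, stop)."""

  def __init__(self, start, stop):
    self.start, self.stop = start, stop


class OffsetSource(object):
  """Hypothetical BoundedSource stand-in; only get_range_tracker() is used."""

  def get_range_tracker(self, start_position, stop_position):
    return OffsetTracker(start_position, stop_position)


class SDFBoundedSourceRestriction(object):
  """Wraps a SourceBundle and creates its RangeTracker on first use."""

  def __init__(self, source_bundle, range_tracker=None):
    self.source_bundle = source_bundle
    # Underscored so the instance attribute does not shadow range_tracker().
    self._range_tracker = range_tracker

  def range_tracker(self):
    # Build the tracker lazily from the bundle and cache it on the restriction.
    if self._range_tracker is None:
      self._range_tracker = self.source_bundle.source.get_range_tracker(
          self.source_bundle.start_position, self.source_bundle.stop_position)
    return self._range_tracker

  def __reduce__(self):
    # Serialize only the bundle; the tracker is rebuilt after unpickling.
    return (self.__class__, (self.source_bundle, ))


bundle = SourceBundle(1.0, OffsetSource(), 0, 10)
restriction = SDFBoundedSourceRestriction(bundle)
tracker = restriction.range_tracker()  # created here, on the first call
```

Because `pickle` consults `__reduce__`, a pickled restriction carries only its bundle. After unpickling, the tracker is recreated from that bundle the first time `range_tracker()` is called.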
Issue Time Tracking
-------------------
Worklog Id: (was: 345615)
Time Spent: 1h 20m (was: 1h 10m)
> Update RestrictionTracker within Python to not be required to be thread safe
> ----------------------------------------------------------------------------
>
> Key: BEAM-7473
> URL: https://issues.apache.org/jira/browse/BEAM-7473
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-core, sdk-py-harness
> Reporter: Luke Cwik
> Assignee: Boyuan Zhang
> Priority: Major
> Fix For: 2.17.0
>
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> The commit
> [https://github.com/apache/beam/commit/8faffb2bcf28ccab5e9a95322743cc60df65077c#diff-ed95abb6bc30a9ed07faef5c3fea93f0]
> modified the Java SDK to remove the need for user restriction trackers to be
> thread safe; instead, the framework provides the necessary locking around a
> restriction tracker.
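The locking approach the issue describes can be sketched in Python as a wrapper that the framework, rather than the user, places around a restriction tracker. Every call takes one lock, so the user's tracker can be written without any synchronization of its own. This is a hedged illustration, not Beam's implementation. `CountingTracker` and `LockingTrackerWrapper` are hypothetical names. Only the method names `try_claim` and `try_split` mirror Beam's Python `RestrictionTracker`, and this sketch's `try_split` is simplified to return just a residual range.

```python
import threading


class CountingTracker(object):
  """Hypothetical user tracker over [start, stop); it does no locking itself."""

  def __init__(self, start, stop):
    self.position = start  # last claimed position (initially start)
    self.stop = stop

  def try_claim(self, position):
    # A claim succeeds only while the position is inside the current range.
    if position >= self.stop:
      return False
    self.position = position
    return True

  def try_split(self, fraction):
    # Shrink the range to end at split and hand back [split, stop) as residual.
    split = self.position + int((self.stop - self.position) * fraction)
    if split <= self.position:
      return None  # nothing left to split off
    residual = (split, self.stop)
    self.stop = split
    return residual


class LockingTrackerWrapper(object):
  """Framework-side wrapper: every call on the user tracker holds one lock."""

  def __init__(self, tracker):
    self._tracker = tracker
    self._lock = threading.Lock()

  def try_claim(self, position):
    with self._lock:
      return self._tracker.try_claim(position)

  def try_split(self, fraction):
    with self._lock:
      return self._tracker.try_split(fraction)


wrapped = LockingTrackerWrapper(CountingTracker(0, 10))
wrapped.try_claim(0)
residual = wrapped.try_split(0.5)  # (5, 10)
```

A single coarse lock is the simplest choice. Claims made on the processing thread and splits requested from another thread can never interleave inside the user's tracker. The cost is that one of them briefly blocks.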