[ https://issues.apache.org/jira/browse/BEAM-1630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16275258#comment-16275258 ]
ASF GitHub Bot commented on BEAM-1630:
--------------------------------------
chamikaramj commented on a change in pull request #4064: [BEAM-1630] Adds
support for processing Splittable DoFns using DirectRunner.
URL: https://github.com/apache/beam/pull/4064#discussion_r153369747
##########
File path: sdks/python/apache_beam/io/restriction_trackers.py
##########
@@ -0,0 +1,117 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""`iobase.RestrictionTracker` implementations provided with Apache Beam."""
+
+import threading
+
+from apache_beam.io.iobase import RestrictionTracker
+from apache_beam.io.range_trackers import OffsetRangeTracker
+
+
+class OffsetRange(object):
+
+  def __init__(self, start, stop):
+    if start > stop:
+      raise ValueError(
+          'Start offset must not be larger than the stop offset. '
+          'Received %d and %d respectively.' % (start, stop))
+    self.start = start
+    self.stop = stop
+
+  def __eq__(self, other):
+    if not isinstance(other, OffsetRange):
+      return False
+
+    return self.start == other.start and self.stop == other.stop
+
+  def __ne__(self, other):
+    if not isinstance(other, OffsetRange):
+      return True
+
+    return not (self.start == other.start and self.stop == other.stop)
+
+  def split(self, desired_num_offsets_per_split, min_num_offsets_per_split=1):
+    current_split_start = self.start
+    max_split_size = max(desired_num_offsets_per_split,
+                         min_num_offsets_per_split)
+    while current_split_start < self.stop:
+      current_split_stop = min(current_split_start + max_split_size, self.stop)
+      remaining = self.stop - current_split_stop
+
+      # Avoiding a small split at the end.
+      if (remaining < desired_num_offsets_per_split / 4 or
+          remaining < min_num_offsets_per_split):
+        current_split_stop = self.stop
+
+      yield OffsetRange(current_split_start, current_split_stop)
+      current_split_start = current_split_stop
+
+  def new_tracker(self):
+    return OffsetRangeTracker(self.start, self.stop)
+
+
+class OffsetRestrictionTracker(RestrictionTracker):
+  """An `iobase.RestrictionTracker` implementation for byte offsets."""
+
+  def __init__(self, start_position, stop_position):
+    self._range = OffsetRange(start_position, stop_position)
+    self._current_position = None
+    self._last_claim_attempt = None
+    self._checkpointed = False
+    self._lock = threading.Lock()
+
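+  def check_done(self):
+    # Before any claim attempt, treat start - 1 as the last attempt so that an
+    # empty range counts as done and a non-empty one does not (comparing None
+    # with an int raises TypeError on Python 3).
+    last_attempt = (self._range.start - 1 if self._last_claim_attempt is None
+                    else self._last_claim_attempt)
+    if last_attempt < self._range.stop - 1:
+      raise ValueError(
+          'OffsetRestrictionTracker is not done since work in range [%s, %s) '
+          'has not been claimed.' % (last_attempt + 1, self._range.stop))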
+
+  def current_restriction(self):
+    return (self._range.start, self._range.stop)
+
+  def start_position(self):
+    return self._range.start
+
+  def stop_position(self):
+    return self._range.stop
+
+  def try_claim(self, position):
+    with self._lock:
+      self._last_claim_attempt = position
+      if self._range.start <= position < self._range.stop:
+        self._current_position = position
+        return True
+
+      return False
+
+  def checkpoint(self):
+    with self._lock:
+      residual_range = (
+          (self._range.start, self._range.stop)
+          if self._current_position is None
+          else (self._current_position + 1, self._range.stop))
+      # If self._current_position is 'None' no records have been claimed so
+      # residual should start from self._range.start.
Review comment:
Done.
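To see how the splitting loop in `OffsetRange.split` behaves, the following minimal sketch copies its logic into a hypothetical standalone function, `split_offsets`, that yields `(start, stop)` tuples instead of `OffsetRange` objects so it runs without Beam installed. A split is stretched to the end of the range whenever the offsets left after it would number fewer than a quarter of the desired split size, or fewer than the minimum split size.

```python
def split_offsets(start, stop, desired_num_offsets_per_split,
                  min_num_offsets_per_split=1):
  """Yields (start, stop) pairs that together cover [start, stop).

  Hypothetical standalone copy of the OffsetRange.split loop quoted above.
  """
  current_split_start = start
  max_split_size = max(desired_num_offsets_per_split,
                       min_num_offsets_per_split)
  while current_split_start < stop:
    current_split_stop = min(current_split_start + max_split_size, stop)
    remaining = stop - current_split_stop

    # Absorb a too-small tail into the current split instead of emitting it
    # as a separate range.
    if (remaining < desired_num_offsets_per_split / 4 or
        remaining < min_num_offsets_per_split):
      current_split_stop = stop

    yield (current_split_start, current_split_stop)
    current_split_start = current_split_stop


# 17 offsets, desired size 8: the 1-offset tail is merged.
print(list(split_offsets(0, 17, 8)))  # [(0, 8), (8, 17)]
# 10 offsets, desired size 4: the 2-offset tail becomes its own split.
print(list(split_offsets(0, 10, 4)))  # [(0, 4), (4, 8), (8, 10)]
```

The two calls show both outcomes: a 1-offset tail is below 8 / 4 and is absorbed, while a 2-offset tail is not below 4 / 4 and is kept as a separate split.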
> Add Splittable DoFn to Python SDK
> ---------------------------------
>
> Key: BEAM-1630
> URL: https://issues.apache.org/jira/browse/BEAM-1630
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-core
> Reporter: Chamikara Jayalath
> Assignee: Chamikara Jayalath
>
> Splittable DoFn [1] is currently being implemented for Java SDK [2]. We
> should add this to Python SDK as well.
> The following document proposes an API for this.
> https://docs.google.com/document/d/1h_zprJrOilivK2xfvl4L42vaX4DMYGfH1YDmi-s_ozM/edit?usp=sharing
> [1] https://s.apache.org/splittable-do-fn
> [2] https://lists.apache.org/thread.html/0ce61ac162460a149d5c93cdface37cc383f8030fe86ca09e5699b18@%3Cdev.beam.apache.org%3E
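A Splittable DoFn works through its restriction by claiming each position before processing it and stopping as soon as a claim fails. The sketch below illustrates that loop with a hypothetical, dependency-free `ToyOffsetTracker` class and `process` helper (neither is part of Beam); `try_claim` and the residual computation mirror the `OffsetRestrictionTracker` code in the review above. Because the quoted `checkpoint` body is cut off, shrinking the tracker's own range so it ends just after the last claimed position is an assumption.

```python
import threading


class ToyOffsetTracker(object):
  """Hypothetical stand-in for OffsetRestrictionTracker (not Beam's API)."""

  def __init__(self, start, stop):
    self._start = start
    self._stop = stop
    self._current_position = None
    self._lock = threading.Lock()

  def try_claim(self, position):
    # Same rule as the quoted try_claim: only positions in [start, stop)
    # can be claimed.
    with self._lock:
      if self._start <= position < self._stop:
        self._current_position = position
        return True
      return False

  def checkpoint(self):
    # Residual follows the quoted checkpoint: everything after the last
    # claimed position, or the whole range if nothing was claimed.
    # Assumption: the tracker's own range is then cut back, so any later
    # claim past the checkpoint fails.
    with self._lock:
      if self._current_position is None:
        residual = (self._start, self._stop)
        self._stop = self._start
      else:
        residual = (self._current_position + 1, self._stop)
        self._stop = self._current_position + 1
      return residual


def process(tracker, start, stop, checkpoint_after=None):
  """Claims offsets in order and stops at the first failed claim."""
  processed = []
  residual = None
  for position in range(start, stop):
    if not tracker.try_claim(position):
      break
    processed.append(position)
    # Simulate the runner requesting a checkpoint after N positions.
    if checkpoint_after is not None and len(processed) == checkpoint_after:
      residual = tracker.checkpoint()
  return processed, residual


tracker = ToyOffsetTracker(0, 10)
print(process(tracker, 0, 10, checkpoint_after=3))  # ([0, 1, 2], (3, 10))
```

After the checkpoint, the claim for position 3 fails, so the loop ends and `[3, 10)` is left as the residual for later processing.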
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)