boyuanzz commented on a change in pull request #14439:
URL: https://github.com/apache/beam/pull/14439#discussion_r609076402



##########
File path: sdks/python/apache_beam/io/iobase.py
##########
@@ -1499,33 +1499,36 @@ def source(self):
     return self._source_bundle.source
 
   def try_split(self, fraction_of_remainder):
-    consumed_fraction = self.range_tracker().fraction_consumed()
-    fraction = (
-        consumed_fraction + (1 - consumed_fraction) * fraction_of_remainder)
-    position = self.range_tracker().position_at_fraction(fraction)
-    # Need to stash current stop_pos before splitting since
-    # range_tracker.split will update its stop_pos if splits
-    # successfully.
-    stop_pos = self._source_bundle.stop_position
-    split_result = self.range_tracker().try_split(position)
-    if split_result:
-      split_pos, split_fraction = split_result
-      primary_weight = self._source_bundle.weight * split_fraction
-      residual_weight = self._source_bundle.weight - primary_weight
-      # Update self to primary weight and end position.
-      self._source_bundle = SourceBundle(
-          primary_weight,
-          self._source_bundle.source,
-          self._source_bundle.start_position,
-          split_pos)
-      return (
-          self,
-          _SDFBoundedSourceRestriction(
-              SourceBundle(
-                  residual_weight,
-                  self._source_bundle.source,
-                  split_pos,
-                  stop_pos)))
+    try:
+      consumed_fraction = self.range_tracker().fraction_consumed()
+      fraction = (
+          consumed_fraction + (1 - consumed_fraction) * fraction_of_remainder)
+      position = self.range_tracker().position_at_fraction(fraction)
+      # Need to stash current stop_pos before splitting since
+      # range_tracker.split will update its stop_pos if splits
+      # successfully.
+      stop_pos = self._source_bundle.stop_position
+      split_result = self.range_tracker().try_split(position)
+      if split_result:
+        split_pos, split_fraction = split_result
+        primary_weight = self._source_bundle.weight * split_fraction
+        residual_weight = self._source_bundle.weight - primary_weight
+        # Update self to primary weight and end position.
+        self._source_bundle = SourceBundle(
+            primary_weight,
+            self._source_bundle.source,
+            self._source_bundle.start_position,
+            split_pos)
+        return (
+            self,
+            _SDFBoundedSourceRestriction(
+                SourceBundle(
+                    residual_weight,
+                    self._source_bundle.source,
+                    split_pos,
+                    stop_pos)))
+    except Exception:
+      return None

Review comment:
       Done. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to