boyuanzz commented on a change in pull request #13443:
URL: https://github.com/apache/beam/pull/13443#discussion_r533773617



##########
File path: sdks/python/apache_beam/dataframe/io.py
##########
@@ -181,39 +193,153 @@ def expand(self, root):
                 self.args,
                 self.kwargs,
                 self.incremental,
+                self.splittable,
                 self.binary)))
     from apache_beam.dataframe import convert
     return convert.to_dataframe(
         pcoll, proxy=_prefix_range_index_with(':', sample[:0]))
 
 
-# TODO(robertwb): Actually make an SDF.
-class _ReadFromPandasDoFn(beam.DoFn):
-  def __init__(self, reader, args, kwargs, incremental, binary):
+class _TruncatingFileHandle(object):
+  """A wrapper of a file-like object representing the restriction of the
+  underling handle according to the given SDF restriction tracker, breaking
+  the file only after the given delimiter.
+
+  For example, if the underling restriction is [103, 607) and each line were
+  exactly 10 characters long (i.e. every 10th charcter was a newline), than 
this
+  would give a view of a 500-byte file consisting of bytes bytes 110 to 609
+  (inclusive) of the underlying file.
+
+  As with all SDF trackers, the endpoint may change dynamically during reading.
+  """
+  def __init__(
+      self,
+      underlying,
+      tracker,
+      delim=b'\n',
+      chunk_size=_DEFAULT_BYTES_CHUNKSIZE):
+    self._underlying = underlying
+    self._tracker = tracker
+    self._buffer_start_pos = self._tracker.current_restriction().start
+    self._delim = delim
+    self._chunk_size = chunk_size
+
+    self._buffer = self._empty = self._delim[:0]
+    self._done = False
+    if self._buffer_start_pos > 0:
+      # Seek to first delimiter after the start position.
+      self._underlying.seek(self._buffer_start_pos)
+      if self.buffer_to_delim():
+        line_start = self._buffer.index(self._delim) + len(self._delim)
+        self._buffer_start_pos += line_start
+        self._buffer = self._buffer[line_start:]
+      else:
+        self._done = True
+
+  def readable(self):
+    return True
+
+  def writable(self):
+    return False
+
+  def seekable(self):
+    return False
+
+  @property
+  def closed(self):
+    return False
+
+  def __iter__(self):
+    # For pandas is_file_like.
+    raise NotImplementedError()
+
+  def buffer_to_delim(self, offset=0):
+    """Read enough of the file such that the buffer contains the delimiter, or
+    end-of-file is reached.
+    """
+    if self._delim in self._buffer[offset:]:
+      return True
+    while True:
+      chunk = self._underlying.read(self._chunk_size)
+      self._buffer += chunk
+      if self._delim in chunk:
+        return True
+      elif not chunk:
+        return False
+
+  def read(self, size=-1):
+    if self._done:
+      return self._empty
+    elif size == -1:
+      self._buffer += self._underlying.read()
+    elif not self._buffer:
+      self._buffer = self._underlying.read(size)
+
+    if self._tracker.try_claim(self._buffer_start_pos + len(self._buffer)):
+      res = self._buffer
+      self._buffer = self._empty
+      self._buffer_start_pos += len(res)
+    else:
+      offset = self._tracker.current_restriction().stop - 
self._buffer_start_pos
+      if self.buffer_to_delim(offset):
+        end_of_line = self._buffer.index(self._delim, offset)
+        res = self._buffer[:end_of_line + len(self._delim)]
+      else:
+        res = self._buffer
+      self._done = True
+    return res
+
+
+class _ReadFromPandasDoFn(beam.DoFn, beam.RestrictionProvider):
+  def __init__(self, reader, args, kwargs, incremental, splittable, binary):
     # avoid pickling issues
     if reader.__module__.startswith('pandas.'):
       reader = reader.__name__
     self.reader = reader
     self.args = args
     self.kwargs = kwargs
     self.incremental = incremental
+    self.splittable = splittable

Review comment:
       I want to make sure I understand how `incremental` and `splittable` work 
together:
   * If `incremental == True` and `splittable == True`, the read is
splittable and progress is reported.
   * If `incremental == True` and `splittable == False`, the read is not
splittable, but progress is still reported.
   * If `incremental == False` and `splittable == True`, the read is not
splittable and no progress is reported.
   * If `incremental == False` and `splittable == False`, the read is not
splittable and no progress is reported.
   
   Is that correct?

##########
File path: sdks/python/apache_beam/dataframe/io.py
##########
@@ -181,39 +193,153 @@ def expand(self, root):
                 self.args,
                 self.kwargs,
                 self.incremental,
+                self.splittable,
                 self.binary)))
     from apache_beam.dataframe import convert
     return convert.to_dataframe(
         pcoll, proxy=_prefix_range_index_with(':', sample[:0]))
 
 
-# TODO(robertwb): Actually make an SDF.
-class _ReadFromPandasDoFn(beam.DoFn):
-  def __init__(self, reader, args, kwargs, incremental, binary):
+class _TruncatingFileHandle(object):
+  """A wrapper of a file-like object representing the restriction of the
+  underling handle according to the given SDF restriction tracker, breaking
+  the file only after the given delimiter.
+
+  For example, if the underling restriction is [103, 607) and each line were
+  exactly 10 characters long (i.e. every 10th charcter was a newline), than 
this

Review comment:
       ```suggestion
     exactly 10 characters long (i.e. every 10th character was a newline), then 
this
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to