robertwb commented on a change in pull request #13488:
URL: https://github.com/apache/beam/pull/13488#discussion_r543754989



##########
File path: sdks/python/apache_beam/dataframe/io.py
##########
@@ -31,16 +33,28 @@
 _DEFAULT_BYTES_CHUNKSIZE = 1 << 20
 
 
-def read_csv(path, *args, **kwargs):
+def read_csv(path, *args, splittable=False, **kwargs):
   """Emulates `pd.read_csv` from Pandas, but as a Beam PTransform.
 
   Use this as
 
       df = p | beam.dataframe.io.read_csv(...)
 
   to get a deferred Beam dataframe representing the contents of the file.
+
+  If your files are large and records do not contain quoted newlines, you may
+  pass the extra argument splittable=True to enable dynamic splitting for this
+  read.

Review comment:
       Good point. Done.

##########
File path: sdks/python/apache_beam/dataframe/io.py
##########
@@ -192,14 +207,134 @@ def expand(self, root):
                 self.reader,
                 self.args,
                 self.kwargs,
+                self.binary,
                 self.incremental,
-                self.splittable,
-                self.binary)))
+                self.splitter)))
     from apache_beam.dataframe import convert
     return convert.to_dataframe(
         pcoll, proxy=_prefix_range_index_with(':', sample[:0]))
 
 
+class _Splitter:
+  def empty_buffer(self):
+    raise NotImplementedError(self)
+
+  def read_header(self, handle):
+    raise NotImplementedError(self)
+
+  def read_to_record_boundary(self, buffered, handle):
+    raise NotImplementedError(self)
+
+
+class _DelimSplitter(_Splitter):
+  def __init__(self, delim, read_chunk_size=_DEFAULT_BYTES_CHUNKSIZE):

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/dataframe/io.py
##########
@@ -192,14 +207,133 @@ def expand(self, root):
                 self.reader,
                 self.args,
                 self.kwargs,
+                self.binary,
                 self.incremental,
-                self.splittable,
-                self.binary)))
+                self.splitter)))
     from apache_beam.dataframe import convert
     return convert.to_dataframe(
         pcoll, proxy=_prefix_range_index_with(':', sample[:0]))
 
 
+class _Splitter:
+  def empty_buffer(self):
+    raise NotImplementedError(self)
+
+  def read_header(self, handle):
+    raise NotImplementedError(self)
+
+  def read_to_record_boundary(self, buffered, handle):
+    raise NotImplementedError(self)

Review comment:
       I actually tried to add typehints initially; they work poorly due to the 
fact that this works both on bytes and strings (but it's hard for mypy to know 
that it will be all one or all the other). But I added some comments at least. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to