TheNeuralBit commented on a change in pull request #13488:
URL: https://github.com/apache/beam/pull/13488#discussion_r542981589
##########
File path: sdks/python/apache_beam/dataframe/io.py
##########
@@ -192,14 +207,133 @@ def expand(self, root):
self.reader,
self.args,
self.kwargs,
+ self.binary,
self.incremental,
- self.splittable,
- self.binary)))
+ self.splitter)))
from apache_beam.dataframe import convert
return convert.to_dataframe(
pcoll, proxy=_prefix_range_index_with(':', sample[:0]))
+class _Splitter:
+ def empty_buffer(self):
+ raise NotImplementedError(self)
+
+ def read_header(self, handle):
+ raise NotImplementedError(self)
+
+ def read_to_record_boundary(self, buffered, handle):
+ raise NotImplementedError(self)
Review comment:
Could you add docstrings and/or typehints for this class so it's clear
what implementations should do?
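For illustration, a sketch of the kind of documented interface this comment asks for. The docstring wording, type annotations, and return shapes are assumptions inferred from the method names, not the PR's final code:
```python
from typing import BinaryIO, Tuple


class _Splitter:
  """Describes how a file is split into records for parallel reading.

  Implementations define where record boundaries fall, so that a reader
  starting at an arbitrary byte offset can resynchronize on the start of
  the next complete record.
  """
  def empty_buffer(self) -> bytes:
    """Returns an empty buffer of the appropriate type (e.g. b'')."""
    raise NotImplementedError(self)

  def read_header(self, handle: BinaryIO) -> Tuple[bytes, bytes]:
    """Reads the file header from handle.

    Assumed to return (header_bytes, leftover_bytes), where leftover_bytes
    is any data read past the header that belongs to the first record.
    """
    raise NotImplementedError(self)

  def read_to_record_boundary(self, buffered: bytes, handle: BinaryIO) -> bytes:
    """Reads from handle until the next record boundary.

    buffered is data already read but not yet consumed; the returned data
    is assumed to end exactly at a record boundary.
    """
    raise NotImplementedError(self)
```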
##########
File path: sdks/python/apache_beam/dataframe/io.py
##########
@@ -192,14 +207,134 @@ def expand(self, root):
self.reader,
self.args,
self.kwargs,
+ self.binary,
self.incremental,
- self.splittable,
- self.binary)))
+ self.splitter)))
from apache_beam.dataframe import convert
return convert.to_dataframe(
pcoll, proxy=_prefix_range_index_with(':', sample[:0]))
+class _Splitter:
+ def empty_buffer(self):
+ raise NotImplementedError(self)
+
+ def read_header(self, handle):
+ raise NotImplementedError(self)
+
+ def read_to_record_boundary(self, buffered, handle):
+ raise NotImplementedError(self)
+
+
+class _DelimSplitter(_Splitter):
+ def __init__(self, delim, read_chunk_size=_DEFAULT_BYTES_CHUNKSIZE):
Review comment:
Might be helpful to clarify that this is splitting on delimiters between
_records_; it's easy to get mixed up in the CSV case, where there's also a
field delimiter.
```suggestion
class _RecordDelimSplitter(_Splitter):
""" A _Splitter that splits on delimiters between records. """
def __init__(self, delim, read_chunk_size=_DEFAULT_BYTES_CHUNKSIZE):
```
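To make the record-vs-field distinction concrete, a toy illustration (not part of the PR):
```python
# Two different delimiters are in play when splitting a CSV file:
#   - the field delimiter separates values within one record (usually ',')
#   - the record delimiter separates records from each other (usually '\n')
csv_bytes = b'id,name\n1,alice\n2,bob\n'

# A splitter like the one above cares only about the *record* delimiter:
# a reader dropped at an arbitrary offset scans forward to the next b'\n'
# to find the start of a complete record. The b',' is never consulted.
records = csv_bytes.split(b'\n')  # [b'id,name', b'1,alice', b'2,bob', b'']
```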
##########
File path: sdks/python/apache_beam/dataframe/io.py
##########
@@ -31,16 +33,28 @@
_DEFAULT_BYTES_CHUNKSIZE = 1 << 20
-def read_csv(path, *args, **kwargs):
+def read_csv(path, *args, splittable=False, **kwargs):
"""Emulates `pd.read_csv` from Pandas, but as a Beam PTransform.
Use this as
df = p | beam.dataframe.io.read_csv(...)
to get a deferred Beam dataframe representing the contents of the file.
+
+ If your files are large and records do not contain quoted newlines, you may
+ pass the extra argument splittable=True to enable dynamic splitting for this
+ read.
Review comment:
Can we make this warn that using splittable=True with quoted newlines
could lead to data loss?
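For context, a minimal illustration of why quoted newlines are dangerous here (this shows naive newline splitting, not the PR's actual code path):
```python
# A CSV record whose quoted field legitimately contains a newline:
csv_bytes = b'id,note\n1,"line one\nline two"\n2,ok\n'

# Splitting on the record delimiter alone treats the embedded newline as a
# record boundary, cutting the quoted field in half:
print(csv_bytes.split(b'\n'))
# [b'id,note', b'1,"line one', b'line two"', b'2,ok', b'']

# A shard that starts reading just after b'line one' would emit a bogus
# partial record, so splittable=True on such data risks silent corruption
# or data loss -- hence the requested warning in the docstring.
```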
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]