This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new bcea54f [BEAM-11361] Dynamic splitting of SDF IOs. (#13443)
bcea54f is described below
commit bcea54f20574ea6658e7afe934213a7246bbcfaf
Author: Robert Bradshaw <[email protected]>
AuthorDate: Thu Dec 3 12:37:49 2020 -0800
[BEAM-11361] Dynamic splitting of SDF IOs. (#13443)
For now, only json lines are supported. CSV files are more complicated.
---
sdks/python/apache_beam/dataframe/io.py | 143 ++++++++++++++++++++-
sdks/python/apache_beam/dataframe/io_test.py | 41 ++++++
sdks/python/apache_beam/io/restriction_trackers.py | 17 +++
3 files changed, 195 insertions(+), 6 deletions(-)
diff --git a/sdks/python/apache_beam/dataframe/io.py b/sdks/python/apache_beam/dataframe/io.py
index b1af416..2f6404d 100644
--- a/sdks/python/apache_beam/dataframe/io.py
+++ b/sdks/python/apache_beam/dataframe/io.py
@@ -27,6 +27,9 @@ from apache_beam import io
from apache_beam.dataframe import frame_base
from apache_beam.io import fileio
+_DEFAULT_LINES_CHUNKSIZE = 10_000
+_DEFAULT_BYTES_CHUNKSIZE = 1 << 20
+
def read_csv(path, *args, **kwargs):
"""Emulates `pd.read_csv` from Pandas, but as a Beam PTransform.
@@ -62,6 +65,7 @@ def read_json(path, *args, **kwargs):
args,
kwargs,
incremental=kwargs.get('lines', False),
+ splittable=kwargs.get('lines', False),
binary=False)
@@ -143,7 +147,14 @@ def _prefix_range_index_with(prefix, df):
class _ReadFromPandas(beam.PTransform):
def __init__(
- self, reader, path, args, kwargs, incremental=False, binary=True):
+ self,
+ reader,
+ path,
+ args,
+ kwargs,
+ incremental=False,
+ splittable=False,
+ binary=True):
if 'compression' in kwargs:
raise NotImplementedError('compression')
if not isinstance(path, str):
@@ -153,6 +164,7 @@ class _ReadFromPandas(beam.PTransform):
self.args = args
self.kwargs = kwargs
self.incremental = incremental
+ self.splittable = splittable
self.binary = binary
def expand(self, root):
@@ -181,15 +193,105 @@ class _ReadFromPandas(beam.PTransform):
self.args,
self.kwargs,
self.incremental,
+ self.splittable,
self.binary)))
from apache_beam.dataframe import convert
return convert.to_dataframe(
pcoll, proxy=_prefix_range_index_with(':', sample[:0]))
-# TODO(robertwb): Actually make an SDF.
-class _ReadFromPandasDoFn(beam.DoFn):
- def __init__(self, reader, args, kwargs, incremental, binary):
+class _TruncatingFileHandle(object):
+ """A wrapper of a file-like object representing the restriction of the
+ underlying handle according to the given SDF restriction tracker, breaking
+ the file only after the given delimiter.
+
+ For example, if the underlying restriction is [103, 607) and each line were
+ exactly 10 characters long (i.e. every 10th character was a newline), then this
+ would give a view of a 500-byte file consisting of bytes 110 to 609
+ (inclusive) of the underlying file.
+
+ As with all SDF trackers, the endpoint may change dynamically during reading.
+ """
+ def __init__(
+ self,
+ underlying,
+ tracker,
+ delim=b'\n',
+ chunk_size=_DEFAULT_BYTES_CHUNKSIZE):
+ self._underlying = underlying
+ self._tracker = tracker
+ self._buffer_start_pos = self._tracker.current_restriction().start
+ self._delim = delim
+ self._chunk_size = chunk_size
+
+ self._buffer = self._empty = self._delim[:0]
+ self._done = False
+ if self._buffer_start_pos > 0:
+ # Seek to first delimiter after the start position.
+ self._underlying.seek(self._buffer_start_pos)
+ if self.buffer_to_delim():
+ line_start = self._buffer.index(self._delim) + len(self._delim)
+ self._buffer_start_pos += line_start
+ self._buffer = self._buffer[line_start:]
+ else:
+ self._done = True
+
+ def readable(self):
+ return True
+
+ def writable(self):
+ return False
+
+ def seekable(self):
+ return False
+
+ @property
+ def closed(self):
+ return False
+
+ def __iter__(self):
+ # For pandas is_file_like.
+ raise NotImplementedError()
+
+ def buffer_to_delim(self, offset=0):
+ """Read enough of the file such that the buffer contains the delimiter, or
+ end-of-file is reached.
+ """
+ if self._delim in self._buffer[offset:]:
+ return True
+ while True:
+ chunk = self._underlying.read(self._chunk_size)
+ self._buffer += chunk
+ if self._delim in chunk:
+ return True
+ elif not chunk:
+ return False
+
+ def read(self, size=-1):
+ if self._done:
+ return self._empty
+ elif size == -1:
+ self._buffer += self._underlying.read()
+ elif not self._buffer:
+ self._buffer = self._underlying.read(size)
+
+ if self._tracker.try_claim(self._buffer_start_pos + len(self._buffer)):
+ res = self._buffer
+ self._buffer = self._empty
+ self._buffer_start_pos += len(res)
+ else:
+ offset = self._tracker.current_restriction().stop - self._buffer_start_pos
+ if self.buffer_to_delim(offset):
+ end_of_line = self._buffer.index(self._delim, offset)
+ res = self._buffer[:end_of_line + len(self._delim)]
+ else:
+ res = self._buffer
+ self._done = True
+ return res
+
+
+class _ReadFromPandasDoFn(beam.DoFn, beam.RestrictionProvider):
+ def __init__(self, reader, args, kwargs, incremental, splittable, binary):
# avoid pickling issues
if reader.__module__.startswith('pandas.'):
reader = reader.__name__
@@ -197,23 +299,52 @@ class _ReadFromPandasDoFn(beam.DoFn):
self.args = args
self.kwargs = kwargs
self.incremental = incremental
+ self.splittable = splittable
self.binary = binary
- def process(self, readable_file):
+ def initial_restriction(self, readable_file):
+ return beam.io.restriction_trackers.OffsetRange(
+ 0, readable_file.metadata.size_in_bytes)
+
+ def restriction_size(self, readable_file, restriction):
+ return restriction.size()
+
+ def create_tracker(self, restriction):
+ tracker = beam.io.restriction_trackers.OffsetRestrictionTracker(restriction)
+ if self.splittable:
+ return tracker
+ else:
+ return beam.io.restriction_trackers.UnsplittableRestrictionTracker(
+ tracker)
+
+ def process(self, readable_file, tracker=beam.DoFn.RestrictionParam()):
reader = self.reader
if isinstance(reader, str):
reader = getattr(pd, self.reader)
with readable_file.open() as handle:
+ if self.incremental:
+ # We can get progress even if we can't split.
+ # TODO(robertwb): We could consider trying to get progress for
+ # non-incremental sources that are read linearly, as long as they
+ # don't try to seek. This could be deceptive as progress would
+ # advance to 100% the instant the (large) read was done, discounting
+ # any downstream processing.
+ handle = _TruncatingFileHandle(handle, tracker)
if not self.binary:
handle = TextIOWrapper(handle)
if self.incremental:
if 'chunksize' not in self.kwargs:
- self.kwargs['chunksize'] = 10_000
+ self.kwargs['chunksize'] = _DEFAULT_LINES_CHUNKSIZE
frames = reader(handle, *self.args, **self.kwargs)
else:
frames = [reader(handle, *self.args, **self.kwargs)]
for df in frames:
yield _prefix_range_index_with(readable_file.metadata.path + ':', df)
+ if not self.incremental:
+ # Satisfy the SDF contract by claiming the whole range.
+ # Do this after emitting the frames to avoid advancing progress to 100%
+ # prior to that.
+ tracker.try_claim(tracker.current_restriction().stop)
class _WriteToPandas(beam.PTransform):
diff --git a/sdks/python/apache_beam/dataframe/io_test.py b/sdks/python/apache_beam/dataframe/io_test.py
index a86961a..de67eff 100644
--- a/sdks/python/apache_beam/dataframe/io_test.py
+++ b/sdks/python/apache_beam/dataframe/io_test.py
@@ -26,6 +26,7 @@ import shutil
import sys
import tempfile
import unittest
+from io import StringIO
import pandas as pd
from pandas.testing import assert_frame_equal
@@ -34,6 +35,7 @@ from parameterized import parameterized
import apache_beam as beam
from apache_beam.dataframe import convert
from apache_beam.dataframe import io
+from apache_beam.io import restriction_trackers
from apache_beam.testing.util import assert_that
@@ -160,6 +162,45 @@ class IOTest(unittest.TestCase):
os.system('head -n 100 ' + dest + '*')
raise
+ def _run_truncating_file_handle_test(
+ self, s, splits, delim=' ', chunk_size=10):
+ split_results = []
+ next_range = restriction_trackers.OffsetRange(0, len(s))
+ for split in list(splits) + [None]:
+ tracker = restriction_trackers.OffsetRestrictionTracker(next_range)
+ handle = io._TruncatingFileHandle(
+ StringIO(s), tracker, delim=delim, chunk_size=chunk_size)
+ data = ''
+ chunk = handle.read(1)
+ if split is not None:
+ _, next_range = tracker.try_split(split)
+ while chunk:
+ data += chunk
+ chunk = handle.read(7)
+ split_results.append(data)
+ return split_results
+
+ def test_truncating_filehandle(self):
+ self.assertEqual(
+ self._run_truncating_file_handle_test('a b c d e', [0.5]),
+ ['a b c ', 'd e'])
+ self.assertEqual(
+ self._run_truncating_file_handle_test('aaaaaaaaaaaaaaXaaa b', [0.5]),
+ ['aaaaaaaaaaaaaaXaaa ', 'b'])
+ self.assertEqual(
+ self._run_truncating_file_handle_test(
+ 'aa bbbbbbbbbbbbbbbbbbbbbbbbbb ccc ', [0.01, 0.5]),
+ ['aa ', 'bbbbbbbbbbbbbbbbbbbbbbbbbb ', 'ccc '])
+
+ numbers = 'x'.join(str(k) for k in range(1000))
+ splits = self._run_truncating_file_handle_test(
+ numbers, [0.1] * 20, delim='x')
+ self.assertEqual(numbers, ''.join(splits))
+ self.assertTrue(s.endswith('x') for s in splits[:-1])
+ self.assertLess(max(len(s) for s in splits), len(numbers) * 0.9 + 10)
+ self.assertGreater(
+ min(len(s) for s in splits), len(numbers) * 0.9**20 * 0.1)
+
if __name__ == '__main__':
unittest.main()
diff --git a/sdks/python/apache_beam/io/restriction_trackers.py b/sdks/python/apache_beam/io/restriction_trackers.py
index 2420c0b..00cefee 100644
--- a/sdks/python/apache_beam/io/restriction_trackers.py
+++ b/sdks/python/apache_beam/io/restriction_trackers.py
@@ -164,3 +164,20 @@ class OffsetRestrictionTracker(RestrictionTracker):
def is_bounded(self):
return True
+
+
+class UnsplittableRestrictionTracker(RestrictionTracker):
+ """An `iobase.RestrictionTracker` that wraps another but does not split."""
+ def __init__(self, underling_tracker):
+ self._underling_tracker = underling_tracker
+
+ def try_split(self, fraction_of_remainder):
+ return False
+
+ # __getattribute__ is used rather than __getattr__ to override the
+ # stubs in the baseclass.
+ def __getattribute__(self, name):
+ if name.startswith('_') or name in ('try_split', ):
+ return super(UnsplittableRestrictionTracker, self).__getattribute__(name)
+ else:
+ return getattr(self._underling_tracker, name)