Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 1530a1727 -> ad4dc87a4
Improve the speed of getting file sizes


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7a059d37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7a059d37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7a059d37

Branch: refs/heads/python-sdk
Commit: 7a059d37e71b62702e8cdeafec6956fc7e1e38c4
Parents: 1530a17
Author: Sourabh Bajaj <sourabhba...@google.com>
Authored: Mon Nov 21 15:50:21 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Nov 28 17:40:37 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/io/filebasedsource.py | 42 ++++++++++++++--------
 sdks/python/apache_beam/io/fileio.py          | 31 ++++++++++++++++
 sdks/python/apache_beam/io/fileio_test.py     | 41 +++++++++++++++++++++
 sdks/python/apache_beam/io/gcsio.py           | 25 +++++++++++++
 sdks/python/apache_beam/io/gcsio_test.py      | 38 ++++++++++++++++++++
 5 files changed, 163 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a059d37/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 7d8f686..14eaf27 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -107,7 +107,8 @@ class FileBasedSource(iobase.BoundedSource):
     if self._concat_source is None:
       single_file_sources = []
       file_names = [f for f in fileio.ChannelFactory.glob(self._pattern)]
-      sizes = FileBasedSource._estimate_sizes_in_parallel(file_names)
+      sizes = FileBasedSource._estimate_sizes_of_files(file_names,
+                                                       self._pattern)
 
       # We create a reference for FileBasedSource that will be serialized along
       # with each _SingleFileSource. To prevent this FileBasedSource from having
@@ -144,22 +145,32 @@ class FileBasedSource(iobase.BoundedSource):
           compression_type=self._compression_type)
 
   @staticmethod
-  def _estimate_sizes_in_parallel(file_names):
+  def _estimate_sizes_of_files(file_names, pattern=None):
+    """Returns the sizes of the given files as a list, in the same order as
+    the provided file names. If pattern is specified, we use the
+    size_of_files_in_glob method to fetch the sizes of all files matching
+    the glob in a single call, for performance, instead of one by one.
+    """
     if not file_names:
       return []
     elif len(file_names) == 1:
       return [fileio.ChannelFactory.size_in_bytes(file_names[0])]
     else:
-      # ThreadPool crashes in old versions of Python (< 2.7.5) if created from a
-      # child thread. (http://bugs.python.org/issue10015)
-      if not hasattr(threading.current_thread(), '_children'):
-        threading.current_thread()._children = weakref.WeakKeyDictionary()
-      pool = ThreadPool(
-          min(MAX_NUM_THREADS_FOR_SIZE_ESTIMATION, len(file_names)))
-      try:
-        return pool.map(fileio.ChannelFactory.size_in_bytes, file_names)
-      finally:
-        pool.terminate()
+      if pattern is None:
+        # ThreadPool crashes in old versions of Python (< 2.7.5) if created
+        # from a child thread. (http://bugs.python.org/issue10015)
+        if not hasattr(threading.current_thread(), '_children'):
+          threading.current_thread()._children = weakref.WeakKeyDictionary()
+        pool = ThreadPool(
+            min(MAX_NUM_THREADS_FOR_SIZE_ESTIMATION, len(file_names)))
+        try:
+          return pool.map(fileio.ChannelFactory.size_in_bytes, file_names)
+        finally:
+          pool.terminate()
+      else:
+        file_sizes = fileio.ChannelFactory.size_of_files_in_glob(pattern,
+                                                                 file_names)
+        return [file_sizes[f] for f in file_names]
 
   def _validate(self):
     """Validate if there are actual files in the specified glob pattern
@@ -179,7 +190,10 @@ class FileBasedSource(iobase.BoundedSource):
     file_names = [f for f in fileio.ChannelFactory.glob(self._pattern)]
     if (len(file_names) <=
         FileBasedSource.MIN_NUMBER_OF_FILES_TO_STAT):
-      return sum(self._estimate_sizes_in_parallel(file_names))
+      # We're reading very few files, so stat them by name without the
+      # pattern; otherwise the glob-based optimization might end up reading
+      # much more data than these few files actually need.
+      return sum(self._estimate_sizes_of_files(file_names))
     else:
       # Estimating size of a random sample.
       # TODO: better support distributions where file sizes are not
@@ -188,7 +202,7 @@ class FileBasedSource(iobase.BoundedSource):
           int(len(file_names) *
               FileBasedSource.MIN_FRACTION_OF_FILES_TO_STAT))
       sample = random.sample(file_names, sample_size)
-      estimate = self._estimate_sizes_in_parallel(sample)
+      estimate = self._estimate_sizes_of_files(sample, self._pattern)
       return int(
           sum(estimate) * (float(len(file_names)) / len(sample)))


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a059d37/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 30044c3..c71a730 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -606,6 +606,37 @@ class ChannelFactory(object):
     else:
       return os.path.getsize(path)
 
+  @staticmethod
+  def size_of_files_in_glob(path, file_names=None):
+    """Returns a map of file names to sizes.
+
+    Args:
+      path: a file path pattern matching the files to return sizes for
+      file_names: list of file names that we need sizes for; this is added
+        to support eventually consistent sources where two expansions of
+        the glob might yield different files.
+    """
+    if path.startswith('gs://'):
+      file_sizes = gcsio.GcsIO().size_of_files_in_glob(path)
+      if file_names is None:
+        return file_sizes
+      else:
+        result = {}
+        # We need to make sure we fetched the size for all the files as the
+        # list API in GCS is eventually consistent, so directly call size
+        # for any files that may be missing.
+        for file_name in file_names:
+          if file_name in file_sizes:
+            result[file_name] = file_sizes[file_name]
+          else:
+            result[file_name] = ChannelFactory.size_in_bytes(file_name)
+        return result
+    else:
+      if file_names is None:
+        file_names = ChannelFactory.glob(path)
+      return {file_name: ChannelFactory.size_in_bytes(file_name)
+              for file_name in file_names}
+
 
 class _CompressedFile(object):
   """Somewhat limited file wrapper for easier handling of compressed files."""


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a059d37/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 098ace1..a68d484 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -582,6 +582,47 @@ class TestTextFileSource(unittest.TestCase):
     self.progress_with_offsets(lines, start_offset=14)
     self.progress_with_offsets(lines, start_offset=20, end_offset=20)
 
+  @mock.patch('apache_beam.io.fileio.gcsio')
+  def test_size_of_files_in_glob_complete(self, *unused_args):
+    # Prepare mocks.
+    gcsio_mock = mock.MagicMock()
+    fileio.gcsio.GcsIO = lambda: gcsio_mock
+    file_names = ['gs://bucket/file1', 'gs://bucket/file2']
+    gcsio_mock.size_of_files_in_glob.return_value = {
+        'gs://bucket/file1': 1,
+        'gs://bucket/file2': 2
+    }
+    expected_results = {
+        'gs://bucket/file1': 1,
+        'gs://bucket/file2': 2
+    }
+    self.assertEqual(
+        fileio.ChannelFactory.size_of_files_in_glob(
+            'gs://bucket/*', file_names),
+        expected_results)
+    gcsio_mock.size_of_files_in_glob.assert_called_once_with('gs://bucket/*')
+
+  @mock.patch('apache_beam.io.fileio.gcsio')
+  def test_size_of_files_in_glob_incomplete(self, *unused_args):
+    # Prepare mocks.
+    gcsio_mock = mock.MagicMock()
+    fileio.gcsio.GcsIO = lambda: gcsio_mock
+    file_names = ['gs://bucket/file1', 'gs://bucket/file2']
+    gcsio_mock.size_of_files_in_glob.return_value = {
+        'gs://bucket/file1': 1
+    }
+    gcsio_mock.size.return_value = 2
+    expected_results = {
+        'gs://bucket/file1': 1,
+        'gs://bucket/file2': 2
+    }
+    self.assertEqual(
+        fileio.ChannelFactory.size_of_files_in_glob(
+            'gs://bucket/*', file_names),
+        expected_results)
+    gcsio_mock.size_of_files_in_glob.assert_called_once_with('gs://bucket/*')
+    gcsio_mock.size.assert_called_once_with('gs://bucket/file2')
+
 
 class TestNativeTextFileSink(unittest.TestCase):


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a059d37/sdks/python/apache_beam/io/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py
index 4f310be..9adb946 100644
--- a/sdks/python/apache_beam/io/gcsio.py
+++ b/sdks/python/apache_beam/io/gcsio.py
@@ -353,6 +353,31 @@ class GcsIO(object):
         bucket=bucket, object=object_path)
     return self.client.objects.Get(request).size
 
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  def size_of_files_in_glob(self, pattern):
+    """Returns the sizes of all the files in the glob as a dictionary.
+
+    Args:
+      pattern: a file path pattern matching the files to return sizes for
+    """
+    bucket, name_pattern = parse_gcs_path(pattern)
+    # Get the prefix with which we can list objects in the given bucket.
+    prefix = re.match('^[^[*?]*', name_pattern).group(0)
+    request = storage.StorageObjectsListRequest(bucket=bucket, prefix=prefix)
+    file_sizes = {}
+    while True:
+      response = self.client.objects.List(request)
+      for item in response.items:
+        if fnmatch.fnmatch(item.name, name_pattern):
+          file_name = 'gs://%s/%s' % (item.bucket, item.name)
+          file_sizes[file_name] = item.size
+      if response.nextPageToken:
+        request.pageToken = response.nextPageToken
+      else:
+        break
+    return file_sizes
+
 
 class GcsBufferedReader(object):
   """A class for reading Google Cloud Storage files."""


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a059d37/sdks/python/apache_beam/io/gcsio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio_test.py b/sdks/python/apache_beam/io/gcsio_test.py
index 95c8e58..9d44e17 100644
--- a/sdks/python/apache_beam/io/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcsio_test.py
@@ -652,6 +652,44 @@ class TestGCSIO(unittest.TestCase):
     self.assertEqual(
         set(self.gcs.glob(file_pattern)), set(expected_file_names))
 
+  def test_size_of_files_in_glob(self):
+    bucket_name = 'gcsio-test'
+    object_names = [
+        ('cow/cat/fish', 2),
+        ('cow/cat/blubber', 3),
+        ('cow/dog/blubber', 4),
+        ('apple/dog/blubber', 5),
+        ('apple/fish/blubber', 6),
+        ('apple/fish/blowfish', 7),
+        ('apple/fish/bambi', 8),
+        ('apple/fish/balloon', 9),
+        ('apple/fish/cat', 10),
+        ('apple/fish/cart', 11),
+        ('apple/fish/carl', 12),
+        ('apple/dish/bat', 13),
+        ('apple/dish/cat', 14),
+        ('apple/dish/carl', 15),
+    ]
+    for (object_name, size) in object_names:
+      file_name = 'gs://%s/%s' % (bucket_name, object_name)
+      self._insert_random_file(self.client, file_name, size)
+    test_cases = [
+        ('gs://gcsio-test/cow/*', [
+            ('cow/cat/fish', 2),
+            ('cow/cat/blubber', 3),
+            ('cow/dog/blubber', 4),
+        ]),
+        ('gs://gcsio-test/apple/fish/car?', [
+            ('apple/fish/cart', 11),
+            ('apple/fish/carl', 12),
+        ])
+    ]
+    for file_pattern, expected_object_names in test_cases:
+      expected_file_sizes = {'gs://%s/%s' % (bucket_name, o): s
+                             for (o, s) in expected_object_names}
+      self.assertEqual(
+          self.gcs.size_of_files_in_glob(file_pattern), expected_file_sizes)
+
 
 class TestPipeStream(unittest.TestCase):
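
A note on the listing trick in gcsio.py above: re.match('^[^[*?]*', name_pattern)
peels off the longest literal prefix of the glob (everything before the first
'[', '*' or '?'), so a single List request scoped to that prefix replaces one
size call per file; fnmatch then applies the full pattern to each listed name.
Below is a minimal standalone sketch of the same idea, where
match_glob_with_prefix and the in-memory object_names list are hypothetical
stand-ins for the storage client, not part of the commit:

    import fnmatch
    import re


    def match_glob_with_prefix(name_pattern, object_names):
      # Longest literal prefix of the pattern: everything before the first
      # glob metacharacter ('[', '*' or '?').
      prefix = re.match('^[^[*?]*', name_pattern).group(0)
      # Stand-in for the storage List API: only names under the prefix.
      candidates = [n for n in object_names if n.startswith(prefix)]
      # The full glob is then applied to each listed name.
      return [n for n in candidates if fnmatch.fnmatch(n, name_pattern)]


    names = ['apple/fish/cart', 'apple/fish/carl', 'apple/fish/cat',
             'cow/cat/fish']
    print(match_glob_with_prefix('apple/fish/car?', names))
    # -> ['apple/fish/cart', 'apple/fish/carl']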
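
The non-GCS branch of ChannelFactory.size_of_files_in_glob simply globs and
stats each match, which makes for a quick local smoke test. A sketch, assuming
the python-sdk branch at this commit is on the path; file names and sizes are
illustrative:

    import os
    import tempfile

    from apache_beam.io import fileio

    tmp_dir = tempfile.mkdtemp()
    for name, size in [('a.txt', 3), ('b.txt', 5)]:
      with open(os.path.join(tmp_dir, name), 'wb') as f:
        f.write(b'x' * size)

    # A single call maps each matching file name to its size in bytes.
    print(fileio.ChannelFactory.size_of_files_in_glob(
        os.path.join(tmp_dir, '*.txt')))
    # -> {'<tmp_dir>/a.txt': 3, '<tmp_dir>/b.txt': 5}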
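
And on the sampling path in estimate_size: when there are many files, only a
fraction is stat'ed and the total is extrapolated linearly, which is why the
pattern is still passed there (one glob listing sizes the whole sample). A toy
illustration of the arithmetic only; estimate_total_size is a hypothetical
helper, and the real code samples file names and calls
_estimate_sizes_of_files rather than in-memory sizes:

    import random


    def estimate_total_size(sizes, fraction=0.1, min_files_to_stat=5):
      # Stat at least min_files_to_stat files, or the configured fraction.
      sample_size = max(min_files_to_stat, int(len(sizes) * fraction))
      sample = random.sample(sizes, sample_size)
      # Scale the sampled total up by the inverse of the sampling rate.
      return int(sum(sample) * (float(len(sizes)) / len(sample)))


    # 1000 files of roughly 100 bytes each: the estimate lands near 100000.
    print(estimate_total_size([random.randint(90, 110) for _ in range(1000)]))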