Repository: incubator-beam Updated Branches: refs/heads/python-sdk 560fe79f8 -> 66f324b35
Use batch GCS operations during FileSink write finalization Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/313191e1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/313191e1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/313191e1 Branch: refs/heads/python-sdk Commit: 313191e129b884e4e14e9f503a757147d368217c Parents: 560fe79 Author: Charles Chen <c...@google.com> Authored: Thu Nov 10 11:54:08 2016 -0800 Committer: Robert Bradshaw <rober...@gmail.com> Committed: Tue Nov 15 08:48:47 2016 -0800 ---------------------------------------------------------------------- sdks/python/apache_beam/io/fileio.py | 177 ++++++++++++++++--------- sdks/python/apache_beam/io/fileio_test.py | 2 +- sdks/python/apache_beam/io/gcsio.py | 78 +++++++++++ sdks/python/apache_beam/io/gcsio_test.py | 103 +++++++++++++- sdks/python/apache_beam/utils/retry.py | 3 + 5 files changed, 298 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/313191e1/sdks/python/apache_beam/io/fileio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index 669bfc9..ef20a7c 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -31,6 +31,7 @@ import zlib import weakref from apache_beam import coders +from apache_beam.io import gcsio from apache_beam.io import iobase from apache_beam.io import range_trackers from apache_beam.runners.dataflow.native_io import iobase as dataflow_io @@ -451,8 +452,6 @@ class ChannelFactory(object): 'was %s' % type(compression_type)) if path.startswith('gs://'): - # pylint: disable=wrong-import-order, wrong-import-position - from apache_beam.io import gcsio raw_file = gcsio.GcsIO().open( path, mode, @@ -470,40 
+469,92 @@ class ChannelFactory(object): return isinstance(fileobj, _CompressedFile) @staticmethod - def rename(src, dst): + def rename(src, dest): if src.startswith('gs://'): - assert dst.startswith('gs://'), dst - # pylint: disable=wrong-import-order, wrong-import-position - from apache_beam.io import gcsio - gcsio.GcsIO().rename(src, dst) + if not dest.startswith('gs://'): + raise ValueError('Destination %r must be GCS path.' % dest) + gcsio.GcsIO().rename(src, dest) else: try: - os.rename(src, dst) + os.rename(src, dest) except OSError as err: raise IOError(err) @staticmethod - def copytree(src, dst): + def rename_batch(src_dest_pairs): + """Renames each (src, dest) pair, batching the gs:// renames. + + Returns: + List of (src, dest, exception) tuples, one per rename that failed. + + Raises: + ValueError: if a gs:// source is paired with a non-GCS destination. + """ + # Filter out local and GCS operations. + local_src_dest_pairs = [] + gcs_src_dest_pairs = [] + for src, dest in src_dest_pairs: + if src.startswith('gs://'): + if not dest.startswith('gs://'): + raise ValueError('Destination %r must be GCS path.' % dest) + gcs_src_dest_pairs.append((src, dest)) + else: + local_src_dest_pairs.append((src, dest)) + + # Execute local operations. + exceptions = [] + for src, dest in local_src_dest_pairs: + try: + ChannelFactory.rename(src, dest) + except Exception as e: # pylint: disable=broad-except + exceptions.append((src, dest, e)) + + # Execute GCS operations. + exceptions += ChannelFactory._rename_gcs_batch(gcs_src_dest_pairs) + + return exceptions + + @staticmethod + def _rename_gcs_batch(src_dest_pairs): + """Renames gs:// files by batched copies followed by batched deletes. + + Returns a list of (src, dest, exception) tuples for failed operations. + """ + # Prepare batches of at most gcsio.MAX_BATCH_OPERATION_SIZE pairs. + gcs_batches = [] + gcs_current_batch = [] + for src, dest in src_dest_pairs: + gcs_current_batch.append((src, dest)) + if len(gcs_current_batch) == gcsio.MAX_BATCH_OPERATION_SIZE: + gcs_batches.append(gcs_current_batch) + gcs_current_batch = [] + if gcs_current_batch: + gcs_batches.append(gcs_current_batch) + + # Execute GCS renames if any and return exceptions.
+ exceptions = [] + for batch in gcs_batches: + copy_statuses = gcsio.GcsIO().copy_batch(batch) + copy_succeeded = [] + for src, dest, exception in copy_statuses: + if exception: + exceptions.append((src, dest, exception)) + else: + copy_succeeded.append((src, dest)) + delete_batch = [src for src, dest in copy_succeeded] + delete_statuses = gcsio.GcsIO().delete_batch(delete_batch) + # Delete statuses come back in request order, so index i matches + # copy_succeeded[i]; unpack only the destination path from that pair. + for i, (src, exception) in enumerate(delete_statuses): + _, dest = copy_succeeded[i] + if exception: + exceptions.append((src, dest, exception)) + return exceptions + + @staticmethod + def copytree(src, dest): if src.startswith('gs://'): - assert dst.startswith('gs://'), dst + if not dest.startswith('gs://'): + raise ValueError('Destination %r must be GCS path.' % dest) assert src.endswith('/'), src - assert dst.endswith('/'), dst - # pylint: disable=wrong-import-order, wrong-import-position - from apache_beam.io import gcsio - gcsio.GcsIO().copytree(src, dst) + assert dest.endswith('/'), dest + gcsio.GcsIO().copytree(src, dest) else: try: - if os.path.exists(dst): - shutil.rmtree(dst) - shutil.copytree(src, dst) + if os.path.exists(dest): + shutil.rmtree(dest) + shutil.copytree(src, dest) except OSError as err: raise IOError(err) @staticmethod def exists(path): if path.startswith('gs://'): - # pylint: disable=wrong-import-order, wrong-import-position - from apache_beam.io import gcsio return gcsio.GcsIO().exists(path) else: return os.path.exists(path) @@ -511,8 +562,6 @@ class ChannelFactory(object): @staticmethod def rmdir(path): if path.startswith('gs://'): - # pylint: disable=wrong-import-order, wrong-import-position - from apache_beam.io import gcsio gcs = gcsio.GcsIO() if not path.endswith('/'): path += '/' @@ -528,8 +577,6 @@ class ChannelFactory(object): @staticmethod def rm(path): if path.startswith('gs://'): - # pylint: disable=wrong-import-order, wrong-import-position - from apache_beam.io import gcsio gcsio.GcsIO().delete(path) else: try: @@ -540,8 +587,6 @@ class
ChannelFactory(object): @staticmethod def glob(path): if path.startswith('gs://'): - # pylint: disable=wrong-import-order, wrong-import-position - from apache_beam.io import gcsio return gcsio.GcsIO().glob(path) else: return glob.glob(path) @@ -554,8 +599,6 @@ class ChannelFactory(object): path: a string that gives the path of a single file. """ if path.startswith('gs://'): - # pylint: disable=wrong-import-order, wrong-import-position - from apache_beam.io import gcsio return gcsio.GcsIO().size(path) else: return os.path.getsize(path) @@ -803,7 +846,6 @@ class FileSink(iobase.Sink): def finalize_write(self, init_result, writer_results): writer_results = sorted(writer_results) num_shards = len(writer_results) - channel_factory = ChannelFactory() min_threads = min(num_shards, FileSink._MAX_RENAME_THREADS) num_threads = max(1, min_threads) @@ -815,55 +857,66 @@ class FileSink(iobase.Sink): ]) rename_ops.append((shard, final_name)) + # Group the rename operations into batches of at most + # gcsio.MAX_BATCH_OPERATION_SIZE; each batch becomes one thread-pool task. + batches = [] + current_batch = [] + for rename_op in rename_ops: + current_batch.append(rename_op) + if len(current_batch) == gcsio.MAX_BATCH_OPERATION_SIZE: + batches.append(current_batch) + current_batch = [] + if current_batch: + batches.append(current_batch) + logging.info( - 'Starting finalize_write threads with num_shards: %d, num_threads: %d', - num_shards, num_threads) + 'Starting finalize_write threads with num_shards: %d, ' + 'batches: %d, num_threads: %d', + num_shards, len(batches), num_threads) start_time = time.time() # Use a thread pool for renaming operations. - def _rename_file(rename_op): - """_rename_file executes single (old_name, new_name) rename operation.""" - old_name, final_name = rename_op - try: - channel_factory.rename(old_name, final_name) - except IOError as e: - # May have already been copied.
- try: - exists = channel_factory.exists(final_name) - except Exception as exists_e: # pylint: disable=broad-except - logging.warning('Exception when checking if file %s exists: ' - '%s', final_name, exists_e) - # Returning original exception after logging the exception from - # exists() call. - return (None, e) - if not exists: - logging.warning(('IOError in _rename_file. old_name: %s, ' - 'final_name: %s, err: %s'), old_name, final_name, e) - return (None, e) - except Exception as e: # pylint: disable=broad-except - logging.warning(('Exception in _rename_file. old_name: %s, ' - 'final_name: %s, err: %s'), old_name, final_name, e) - return (None, e) - return (final_name, None) + def _rename_batch(batch): + """Renames one batch of (src, dest) pairs via ChannelFactory.rename_batch. + + Returns the list of exceptions that should be reported. An IOError is not + reported when dest already exists, since the file may already have been + renamed (e.g. by an earlier attempt). + """ + exceptions = [] + exception_infos = ChannelFactory.rename_batch(batch) + for src, dest, exception in exception_infos: + if exception: + should_report = True + if isinstance(exception, IOError): + # May have already been copied. + try: + if ChannelFactory.exists(dest): + should_report = False + except Exception as exists_e: # pylint: disable=broad-except + logging.warning('Exception when checking if file %s exists: ' + '%s', dest, exists_e) + if should_report: + logging.warning(('Exception in _rename_batch. src: %s, ' + 'dest: %s, err: %s'), src, dest, exception) + exceptions.append(exception) + return exceptions # ThreadPool crashes in old versions of Python (< 2.7.5) if created from a # child thread.
(http://bugs.python.org/issue10015) if not hasattr(threading.current_thread(), '_children'): threading.current_thread()._children = weakref.WeakKeyDictionary() - rename_results = ThreadPool(num_threads).map(_rename_file, rename_ops) + exception_batches = ThreadPool(num_threads).map(_rename_batch, batches) - for final_name, err in rename_results: - if err: - logging.warning('Error when processing rename_results: %s', err) - raise err - else: - yield final_name + all_exceptions = [] + for exceptions in exception_batches: + if exceptions: + all_exceptions += exceptions + if all_exceptions: + # Format the message here; Exception() does not interpolate %s args. + raise Exception('Encountered exceptions in finalize_write: %s' % + all_exceptions) + + for _, final_name in rename_ops: + yield final_name logging.info('Renamed %d shards in %.2f seconds.', num_shards, time.time() - start_time) try: - channel_factory.rmdir(init_result) + ChannelFactory.rmdir(init_result) except IOError: # May have already been removed. pass http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/313191e1/sdks/python/apache_beam/io/fileio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py index 7e6e60b..b55fa19 100644 --- a/sdks/python/apache_beam/io/fileio_test.py +++ b/sdks/python/apache_beam/io/fileio_test.py @@ -848,7 +848,7 @@ class TestFileSink(unittest.TestCase): res2 = writer2.close() os.remove(res2) - with self.assertRaises(IOError): + with self.assertRaises(Exception): list(sink.finalize_write(init_token, [res1, res2])) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/313191e1/sdks/python/apache_beam/io/gcsio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py index 9fcce5b..1b08994 100644 --- a/sdks/python/apache_beam/io/gcsio.py +++ b/sdks/python/apache_beam/io/gcsio.py @@ -31,6 +31,7 @@ import threading
import traceback from apitools.base.py.exceptions import HttpError +from apitools.base.py.batch import BatchApiRequest import apitools.base.py.transfer as transfer from apache_beam.internal import auth @@ -50,6 +51,11 @@ DEFAULT_READ_BUFFER_SIZE = 1024 * 1024 WRITE_CHUNK_SIZE = 8 * 1024 * 1024 +# Maximum number of operations permitted in GcsIO.copy_batch() and +# GcsIO.delete_batch(). +MAX_BATCH_OPERATION_SIZE = 100 + + def parse_gcs_path(gcs_path): """Return the bucket and object names of the given gs:// path.""" match = re.match('^gs://([^/]+)/(.+)$', gcs_path) @@ -167,6 +173,39 @@ class GcsIO(object): return raise + # We intentionally do not decorate this method with a retry, as retrying is + # handled in BatchApiRequest.Execute(). + def delete_batch(self, paths): + """Deletes the objects at the given GCS paths. + + Args: + paths: List of GCS file paths in the form gs://<bucket>/<name>, + not to exceed MAX_BATCH_OPERATION_SIZE in length. + + Returns: List of tuples of (path, exception) in the same order as the paths + argument, where exception is None if the operation succeeded or + the relevant exception if the operation failed. + """ + if not paths: + # Nothing to delete; avoid issuing a batch request with no operations. + return [] + batch_request = BatchApiRequest( + retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES) + for path in paths: + bucket, object_path = parse_gcs_path(path) + request = storage.StorageObjectsDeleteRequest( + bucket=bucket, object=object_path) + batch_request.Add(self.client.objects, 'Delete', request) + api_calls = batch_request.Execute(self.client._http) # pylint: disable=protected-access + result_statuses = [] + for i, api_call in enumerate(api_calls): + path = paths[i] + exception = None + if api_call.is_error: + exception = api_call.exception + # Return success when the file doesn't exist anymore for idempotency.
+ if isinstance(exception, HttpError) and exception.status_code == 404: + exception = None + result_statuses.append((path, exception)) + return result_statuses + @retry.with_exponential_backoff( retry_filter=retry.retry_on_server_errors_and_timeout_filter) def copy(self, src, dest): @@ -193,6 +232,45 @@ class GcsIO(object): raise GcsIOError(errno.ENOENT, 'Source file not found: %s' % src) raise + # We intentionally do not decorate this method with a retry, as retrying is + # handled in BatchApiRequest.Execute(). + def copy_batch(self, src_dest_pairs): + """Copies the given GCS objects from src to dest. + + Args: + src_dest_pairs: list of (src, dest) tuples of gs://<bucket>/<name> file + paths to copy from src to dest, not to exceed + MAX_BATCH_OPERATION_SIZE in length. + + Returns: List of tuples of (src, dest, exception) in the same order as the + src_dest_pairs argument, where exception is None if the operation + succeeded or the relevant exception if the operation failed. + """ + if not src_dest_pairs: + # Nothing to copy; avoid issuing a batch request with no operations. + return [] + batch_request = BatchApiRequest( + retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES) + for src, dest in src_dest_pairs: + src_bucket, src_path = parse_gcs_path(src) + dest_bucket, dest_path = parse_gcs_path(dest) + request = storage.StorageObjectsCopyRequest( + sourceBucket=src_bucket, + sourceObject=src_path, + destinationBucket=dest_bucket, + destinationObject=dest_path) + batch_request.Add(self.client.objects, 'Copy', request) + api_calls = batch_request.Execute(self.client._http) # pylint: disable=protected-access + result_statuses = [] + for i, api_call in enumerate(api_calls): + src, dest = src_dest_pairs[i] + exception = None + if api_call.is_error: + exception = api_call.exception + # Translate 404 to the appropriate not found exception.
+ if isinstance(exception, HttpError) and exception.status_code == 404: + exception = ( + GcsIOError(errno.ENOENT, 'Source file not found: %s' % src)) + result_statuses.append((src, dest, exception)) + return result_statuses + # We intentionally do not decorate this method with a retry, since the # underlying copy and delete operations are already idempotent operations # protected by retry decorators. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/313191e1/sdks/python/apache_beam/io/gcsio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcsio_test.py b/sdks/python/apache_beam/io/gcsio_test.py index 2e9945a..95c8e58 100644 --- a/sdks/python/apache_beam/io/gcsio_test.py +++ b/sdks/python/apache_beam/io/gcsio_test.py @@ -16,6 +16,7 @@ # """Tests for Google Cloud Storage client.""" +import errno import logging import multiprocessing import os @@ -24,11 +25,11 @@ import threading import unittest import httplib2 +import mock from apitools.base.py.exceptions import HttpError from apache_beam.internal.clients import storage from apache_beam.io import gcsio -from mock import patch class FakeGcsClient(object): @@ -37,6 +38,8 @@ class FakeGcsClient(object): def __init__(self): self.objects = FakeGcsObjects() + # Referenced as self.client._http in GcsIO.copy_batch() and GcsIO.delete_batch().
+ self._http = object() class FakeFile(object): @@ -165,6 +168,33 @@ class FakeGcsObjects(object): return result +class FakeApiCall(object): + """Result of one fake batch operation: its exception, if any.""" + + def __init__(self, exception): + self.exception = exception + self.is_error = exception is not None + + +class FakeBatchApiRequest(object): + """Fake BatchApiRequest that runs each added call directly on Execute().""" + + def __init__(self, **unused_kwargs): + self.operations = [] + + def Add(self, service, method, request): # pylint: disable=invalid-name + self.operations.append((service, method, request)) + + def Execute(self, unused_http, **unused_kwargs): # pylint: disable=invalid-name + api_calls = [] + for service, method, request in self.operations: + exception = None + try: + getattr(service, method)(request) + except Exception as e: # pylint: disable=broad-except + exception = e + api_calls.append(FakeApiCall(exception)) + return api_calls + + class TestGCSPathParser(unittest.TestCase): def test_gcs_path(self): @@ -201,7 +231,7 @@ class TestGCSIO(unittest.TestCase): self.assertFalse(self.gcs.exists(file_name + 'xyz')) self.assertTrue(self.gcs.exists(file_name)) - @patch.object(FakeGcsObjects, 'Get') + @mock.patch.object(FakeGcsObjects, 'Get') def test_exists_failure(self, mock_get): # Raising an error other than 404. Raising 404 is a valid failure for # exists() call. @@ -251,6 +281,37 @@ class TestGCSIO(unittest.TestCase): self.assertFalse( gcsio.parse_gcs_path(file_name) in self.client.objects.files) + # Passing the fake as `new` swaps it in for the test and restores afterwards. + @mock.patch('apache_beam.io.gcsio.BatchApiRequest', FakeBatchApiRequest) + def test_delete_batch(self): + file_name_pattern = 'gs://gcsio-test/delete_me_%d' + file_size = 1024 + num_files = 10 + + # Test deletion of non-existent files.
+ result = self.gcs.delete_batch( + [file_name_pattern % i for i in range(num_files)]) + self.assertTrue(result) + for i, (file_name, exception) in enumerate(result): + self.assertEqual(file_name, file_name_pattern % i) + self.assertIsNone(exception) + self.assertFalse(self.gcs.exists(file_name_pattern % i)) + + # Insert some files. + for i in range(num_files): + self._insert_random_file(self.client, file_name_pattern % i, file_size) + + # Check files inserted properly. + for i in range(num_files): + self.assertTrue(self.gcs.exists(file_name_pattern % i)) + + # Execute batch delete and check that every deletion reported success. + result = self.gcs.delete_batch( + [file_name_pattern % i for i in range(num_files)]) + for _, exception in result: + self.assertIsNone(exception) + + # Check files deleted properly. + for i in range(num_files): + self.assertFalse(self.gcs.exists(file_name_pattern % i)) + def test_copy(self): src_file_name = 'gs://gcsio-test/source' dest_file_name = 'gs://gcsio-test/dest' @@ -271,6 +332,44 @@ class TestGCSIO(unittest.TestCase): self.assertRaises(IOError, self.gcs.copy, 'gs://gcsio-test/non-existent', 'gs://gcsio-test/non-existent-destination') + # Passing the fake as `new` swaps it in for the test and restores afterwards. + @mock.patch('apache_beam.io.gcsio.BatchApiRequest', FakeBatchApiRequest) + def test_copy_batch(self): + from_name_pattern = 'gs://gcsio-test/copy_me_%d' + to_name_pattern = 'gs://gcsio-test/destination_%d' + file_size = 1024 + num_files = 10 + + # Test copy of non-existent files. + result = self.gcs.copy_batch( + [(from_name_pattern % i, to_name_pattern % i) + for i in range(num_files)]) + self.assertTrue(result) + for i, (src, dest, exception) in enumerate(result): + self.assertEqual(src, from_name_pattern % i) + self.assertEqual(dest, to_name_pattern % i) + self.assertIsInstance(exception, IOError) + self.assertEqual(exception.errno, errno.ENOENT) + self.assertFalse(self.gcs.exists(from_name_pattern % i)) + self.assertFalse(self.gcs.exists(to_name_pattern % i)) + + # Insert some files.
+ for i in range(num_files): + self._insert_random_file(self.client, from_name_pattern % i, file_size) + + # Check files inserted properly. + for i in range(num_files): + self.assertTrue(self.gcs.exists(from_name_pattern % i)) + + # Execute batch copy. + self.gcs.copy_batch([(from_name_pattern % i, to_name_pattern % i) + for i in range(num_files)]) + + # Check files copied properly. + for i in range(num_files): + self.assertTrue(self.gcs.exists(from_name_pattern % i)) + self.assertTrue(self.gcs.exists(to_name_pattern % i)) + def test_copytree(self): src_dir_name = 'gs://gcsio-test/source/' dest_dir_name = 'gs://gcsio-test/dest/' http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/313191e1/sdks/python/apache_beam/utils/retry.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/retry.py b/sdks/python/apache_beam/utils/retry.py index 67cbeb2..1f5af88 100644 --- a/sdks/python/apache_beam/utils/retry.py +++ b/sdks/python/apache_beam/utils/retry.py @@ -98,6 +98,9 @@ def retry_on_server_errors_and_timeout_filter(exception): return retry_on_server_errors_filter(exception) +# HTTP status codes that GcsIO.copy_batch() and GcsIO.delete_batch() pass to +# BatchApiRequest as retryable_codes. +SERVER_ERROR_OR_TIMEOUT_CODES = [408, 500, 502, 503, 504, 598, 599] + + class Clock(object): """A simple clock implementing sleep()."""