BeamFileSystem implementation instead of IOChannelFactory
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1bc240bc Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1bc240bc Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1bc240bc Branch: refs/heads/master Commit: 1bc240bc637883cb1c04253264534c9d545fdefa Parents: 7c7bb82 Author: Sourabh Bajaj <[email protected]> Authored: Wed Mar 1 23:01:34 2017 -0800 Committer: Chamikara Jayalath <[email protected]> Committed: Wed Mar 22 14:27:01 2017 -0700 ---------------------------------------------------------------------- .../apache_beam/examples/snippets/snippets.py | 2 +- sdks/python/apache_beam/io/avroio.py | 3 +- sdks/python/apache_beam/io/filebasedsource.py | 83 +-- .../apache_beam/io/filebasedsource_test.py | 18 +- sdks/python/apache_beam/io/fileio.py | 588 +++---------------- sdks/python/apache_beam/io/fileio_test.py | 102 +--- sdks/python/apache_beam/io/filesystem.py | 439 ++++++++++++++ sdks/python/apache_beam/io/filesystems_util.py | 31 + sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 242 ++++++++ .../apache_beam/io/gcp/gcsfilesystem_test.py | 293 +++++++++ sdks/python/apache_beam/io/gcp/gcsio.py | 12 +- sdks/python/apache_beam/io/gcp/gcsio_test.py | 9 +- sdks/python/apache_beam/io/iobase.py | 4 +- sdks/python/apache_beam/io/localfilesystem.py | 236 ++++++++ .../apache_beam/io/localfilesystem_test.py | 185 ++++++ sdks/python/apache_beam/io/textio.py | 7 +- sdks/python/apache_beam/io/textio_test.py | 2 +- sdks/python/apache_beam/io/tfrecordio.py | 5 +- sdks/python/apache_beam/io/tfrecordio_test.py | 20 +- .../apache_beam/tests/pipeline_verifiers.py | 8 +- .../tests/pipeline_verifiers_test.py | 26 +- 21 files changed, 1613 insertions(+), 702 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/examples/snippets/snippets.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 18f1506..5cb5ee5 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -856,7 +856,7 @@ def model_textio_compressed(renames, expected): # [START model_textio_write_compressed] lines = p | 'ReadFromText' >> beam.io.ReadFromText( '/path/to/input-*.csv.gz', - compression_type=beam.io.fileio.CompressionTypes.GZIP) + compression_type=beam.io.filesystem.CompressionTypes.GZIP) # [END model_textio_write_compressed] beam.assert_that(lines, beam.equal_to(expected)) http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/avroio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py index 6fdd798..1c08c68 100644 --- a/sdks/python/apache_beam/io/avroio.py +++ b/sdks/python/apache_beam/io/avroio.py @@ -29,6 +29,7 @@ import apache_beam as beam from apache_beam.io import filebasedsource from apache_beam.io import fileio from apache_beam.io import iobase +from apache_beam.io.filesystem import CompressionTypes from apache_beam.io.iobase import Read from apache_beam.transforms import PTransform @@ -354,7 +355,7 @@ class _AvroSink(fileio.FileSink): mime_type=mime_type, # Compression happens at the block level using the supplied codec, and # not at the file level. 
- compression_type=fileio.CompressionTypes.UNCOMPRESSED) + compression_type=CompressionTypes.UNCOMPRESSED) self._schema = schema self._codec = codec http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/filebasedsource.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py index 582d673..a3e0667 100644 --- a/sdks/python/apache_beam/io/filebasedsource.py +++ b/sdks/python/apache_beam/io/filebasedsource.py @@ -25,16 +25,13 @@ for more details. For an example implementation of ``FileBasedSource`` see ``avroio.AvroSource``. """ -import random - from apache_beam.internal import pickler -from apache_beam.internal import util from apache_beam.io import concat_source -from apache_beam.io import fileio from apache_beam.io import iobase from apache_beam.io import range_trackers +from apache_beam.io.filesystem import CompressionTypes +from apache_beam.io.filesystems_util import get_filesystem from apache_beam.transforms.display import DisplayDataItem - MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 25 @@ -47,7 +44,7 @@ class FileBasedSource(iobase.BoundedSource): def __init__(self, file_pattern, min_bundle_size=0, - compression_type=fileio.CompressionTypes.AUTO, + compression_type=CompressionTypes.AUTO, splittable=True, validate=True): """Initializes ``FileBasedSource``. @@ -81,14 +78,15 @@ class FileBasedSource(iobase.BoundedSource): (self.__class__.__name__, file_pattern)) self._pattern = file_pattern + self._file_system = get_filesystem(file_pattern) self._concat_source = None self._min_bundle_size = min_bundle_size - if not fileio.CompressionTypes.is_valid_compression_type(compression_type): + if not CompressionTypes.is_valid_compression_type(compression_type): raise TypeError('compression_type must be CompressionType object but ' 'was %s' % type(compression_type)) self._compression_type = compression_type - if compression_type in (fileio.CompressionTypes.UNCOMPRESSED, - fileio.CompressionTypes.AUTO): + if compression_type in (CompressionTypes.UNCOMPRESSED, + CompressionTypes.AUTO): self._splittable = splittable else: # We can't split compressed files efficiently so turn off splitting. @@ -105,9 +103,8 @@ class FileBasedSource(iobase.BoundedSource): def _get_concat_source(self): if self._concat_source is None: single_file_sources = [] - file_names = [f for f in fileio.ChannelFactory.glob(self._pattern)] - sizes = FileBasedSource._estimate_sizes_of_files(file_names, - self._pattern) + match_result = self._file_system.match([self._pattern])[0] + files_metadata = match_result.metadata_list # We create a reference for FileBasedSource that will be serialized along # with each _SingleFileSource. To prevent this FileBasedSource from having @@ -115,23 +112,25 @@ class FileBasedSource(iobase.BoundedSource): # we clone it here. file_based_source_ref = pickler.loads(pickler.dumps(self)) - for index, file_name in enumerate(file_names): - if sizes[index] == 0: + for file_metadata in files_metadata: + file_name = file_metadata.path + file_size = file_metadata.size_in_bytes + if file_size == 0: continue # Ignoring empty file. # We determine splittability of this specific file. 
splittable = self.splittable if (splittable and - self._compression_type == fileio.CompressionTypes.AUTO): - compression_type = fileio.CompressionTypes.detect_compression_type( + self._compression_type == CompressionTypes.AUTO): + compression_type = CompressionTypes.detect_compression_type( file_name) - if compression_type != fileio.CompressionTypes.UNCOMPRESSED: + if compression_type != CompressionTypes.UNCOMPRESSED: splittable = False single_file_source = _SingleFileSource( file_based_source_ref, file_name, 0, - sizes[index], + file_size, min_bundle_size=self._min_bundle_size, splittable=splittable) single_file_sources.append(single_file_source) @@ -139,36 +138,16 @@ class FileBasedSource(iobase.BoundedSource): return self._concat_source def open_file(self, file_name): - return fileio.ChannelFactory.open( - file_name, 'rb', 'application/octet-stream', + return get_filesystem(file_name).open( + file_name, 'application/octet-stream', compression_type=self._compression_type) - @staticmethod - def _estimate_sizes_of_files(file_names, pattern=None): - """Returns the size of all the files as an ordered list based on the file - names that are provided here. If the pattern is specified here then we use - the size_of_files_in_glob method to get the size of files matching the glob - for performance improvements instead of getting the size one by one. - """ - if not file_names: - return [] - elif len(file_names) == 1: - return [fileio.ChannelFactory.size_in_bytes(file_names[0])] - else: - if pattern is None: - return util.run_using_threadpool( - fileio.ChannelFactory.size_in_bytes, file_names, - MAX_NUM_THREADS_FOR_SIZE_ESTIMATION) - else: - file_sizes = fileio.ChannelFactory.size_of_files_in_glob(pattern, - file_names) - return [file_sizes[f] for f in file_names] - def _validate(self): """Validate if there are actual files in the specified glob pattern """ # Limit the responses as we only want to check if something exists - if len(fileio.ChannelFactory.glob(self._pattern, limit=1)) <= 0: + match_result = self._file_system.match([self._pattern], limits=[1])[0] + if len(match_result.metadata_list) <= 0: raise IOError( 'No files found based on the file pattern %s' % self._pattern) @@ -180,24 +159,8 @@ class FileBasedSource(iobase.BoundedSource): stop_position=stop_position) def estimate_size(self): - file_names = [f for f in fileio.ChannelFactory.glob(self._pattern)] - # We're reading very few files so we can pass names file names to - # _estimate_sizes_of_files without pattern as otherwise we'll try to do - # optimization based on the pattern and might end up reading much more - # data than needed for a few files. - if (len(file_names) <= - FileBasedSource.MIN_NUMBER_OF_FILES_TO_STAT): - return sum(self._estimate_sizes_of_files(file_names)) - else: - # Estimating size of a random sample. - # TODO: better support distributions where file sizes are not - # approximately equal. 
- sample_size = max(FileBasedSource.MIN_NUMBER_OF_FILES_TO_STAT, - int(len(file_names) * - FileBasedSource.MIN_FRACTION_OF_FILES_TO_STAT)) - sample = random.sample(file_names, sample_size) - estimate = self._estimate_sizes_of_files(sample) - return int(sum(estimate) * (float(len(file_names)) / len(sample))) + match_result = self._file_system.match([self._pattern])[0] + return sum([f.size_in_bytes for f in match_result.metadata_list]) def read(self, range_tracker): return self._get_concat_source().read(range_tracker) http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/filebasedsource_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py index 7481c4c..7b7ec8a 100644 --- a/sdks/python/apache_beam/io/filebasedsource_test.py +++ b/sdks/python/apache_beam/io/filebasedsource_test.py @@ -29,9 +29,9 @@ import hamcrest as hc import apache_beam as beam from apache_beam.io import filebasedsource -from apache_beam.io import fileio from apache_beam.io import iobase from apache_beam.io import range_trackers +from apache_beam.io.filesystem import CompressionTypes # importing following private classes for testing from apache_beam.io.concat_source import ConcatSource @@ -409,7 +409,7 @@ class TestFileBasedSource(unittest.TestCase): pcoll = pipeline | 'Read' >> beam.Read(LineSource( filename, splittable=False, - compression_type=fileio.CompressionTypes.BZIP2)) + compression_type=CompressionTypes.BZIP2)) assert_that(pcoll, equal_to(lines)) pipeline.run() @@ -424,7 +424,7 @@ class TestFileBasedSource(unittest.TestCase): pcoll = pipeline | 'Read' >> beam.Read(LineSource( filename, splittable=False, - compression_type=fileio.CompressionTypes.GZIP)) + compression_type=CompressionTypes.GZIP)) assert_that(pcoll, equal_to(lines)) pipeline.run() @@ -442,7 +442,7 @@ class TestFileBasedSource(unittest.TestCase): pcoll = pipeline | 'Read' >> beam.Read(LineSource( file_pattern, splittable=False, - compression_type=fileio.CompressionTypes.BZIP2)) + compression_type=CompressionTypes.BZIP2)) assert_that(pcoll, equal_to(lines)) pipeline.run() @@ -461,7 +461,7 @@ class TestFileBasedSource(unittest.TestCase): pcoll = pipeline | 'Read' >> beam.Read(LineSource( file_pattern, splittable=False, - compression_type=fileio.CompressionTypes.GZIP)) + compression_type=CompressionTypes.GZIP)) assert_that(pcoll, equal_to(lines)) pipeline.run() @@ -475,7 +475,7 @@ class TestFileBasedSource(unittest.TestCase): pipeline = TestPipeline() pcoll = pipeline | 'Read' >> beam.Read(LineSource( filename, - compression_type=fileio.CompressionTypes.AUTO)) + compression_type=CompressionTypes.AUTO)) assert_that(pcoll, equal_to(lines)) pipeline.run() @@ -489,7 +489,7 @@ class TestFileBasedSource(unittest.TestCase): pipeline = TestPipeline() pcoll = pipeline | 'Read' >> beam.Read(LineSource( filename, - compression_type=fileio.CompressionTypes.AUTO)) + compression_type=CompressionTypes.AUTO)) assert_that(pcoll, equal_to(lines)) pipeline.run() @@ -508,7 +508,7 @@ class TestFileBasedSource(unittest.TestCase): pipeline = TestPipeline() pcoll = pipeline | 'Read' >> beam.Read(LineSource( file_pattern, - compression_type=fileio.CompressionTypes.AUTO)) + compression_type=CompressionTypes.AUTO)) assert_that(pcoll, equal_to(lines)) pipeline.run() @@ -530,7 +530,7 @@ class TestFileBasedSource(unittest.TestCase): pipeline = TestPipeline() pcoll = pipeline | 'Read' >> beam.Read(LineSource( 
file_pattern, - compression_type=fileio.CompressionTypes.AUTO)) + compression_type=CompressionTypes.AUTO)) assert_that(pcoll, equal_to(lines)) pipeline.run() http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/fileio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index 49a2082..0759ce4 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -18,146 +18,40 @@ from __future__ import absolute_import -import bz2 -import cStringIO -import glob import logging import os import re -import shutil import time -import zlib from apache_beam.internal import util from apache_beam.io import iobase +from apache_beam.io.filesystem import BeamIOError +from apache_beam.io.filesystem import CompressedFile as _CompressedFile +from apache_beam.io.filesystem import CompressionTypes +from apache_beam.io.filesystems_util import get_filesystem from apache_beam.transforms.display import DisplayDataItem -# TODO(sourabhbajaj): Fix the constant values after the new IO factory -# Current constants are copy pasted from gcsio.py till we fix this. -# Protect against environments where apitools library is not available. -# pylint: disable=wrong-import-order, wrong-import-position -try: - from apache_beam.io.gcp import gcsio - DEFAULT_READ_BUFFER_SIZE = gcsio.DEFAULT_READ_BUFFER_SIZE - MAX_BATCH_OPERATION_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE -except ImportError: - class FakeGcsIO(object): - def __getattr__(self, attr): - raise ImportError( - 'Google Cloud Platform IO not available, ' - 'please install apache_beam[gcp]') - gcsio = FakeGcsIO() - DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024 - MAX_BATCH_OPERATION_SIZE = 100 -# pylint: enable=wrong-import-order, wrong-import-position - - +MAX_BATCH_OPERATION_SIZE = 100 DEFAULT_SHARD_NAME_TEMPLATE = '-SSSSS-of-NNNNN' -class _CompressionType(object): - """Object representing single compression type.""" - - def __init__(self, identifier): - self.identifier = identifier - - def __eq__(self, other): - return (isinstance(other, _CompressionType) and - self.identifier == other.identifier) - - def __hash__(self): - return hash(self.identifier) - - def __ne__(self, other): - return not self.__eq__(other) - - def __str__(self): - return self.identifier - - def __repr__(self): - return '_CompressionType(%s)' % self.identifier - - -class CompressionTypes(object): - """Enum-like class representing known compression types.""" - - # Detect compression based on filename extension. - # - # The following extensions are currently recognized by auto-detection: - # .bz2 (implies BZIP2 as described below). - # .gz (implies GZIP as described below) - # Any non-recognized extension implies UNCOMPRESSED as described below. - AUTO = _CompressionType('auto') - - # BZIP2 compression. - BZIP2 = _CompressionType('bzip2') - - # GZIP compression (deflate with GZIP headers). - GZIP = _CompressionType('gzip') - - # Uncompressed (i.e., may be split). 
-  UNCOMPRESSED = _CompressionType('uncompressed')
-
-  @classmethod
-  def is_valid_compression_type(cls, compression_type):
-    """Returns true for valid compression types, false otherwise."""
-    return isinstance(compression_type, _CompressionType)
-
-  @classmethod
-  def mime_type(cls, compression_type, default='application/octet-stream'):
-    mime_types_by_compression_type = {
-        cls.BZIP2: 'application/x-bz2',
-        cls.GZIP: 'application/x-gzip',
-    }
-    return mime_types_by_compression_type.get(compression_type, default)
-
-  @classmethod
-  def detect_compression_type(cls, file_path):
-    """Returns the compression type of a file (based on its suffix)."""
-    compression_types_by_suffix = {'.bz2': cls.BZIP2, '.gz': cls.GZIP}
-    lowercased_path = file_path.lower()
-    for suffix, compression_type in compression_types_by_suffix.iteritems():
-      if lowercased_path.endswith(suffix):
-        return compression_type
-    return cls.UNCOMPRESSED
-
-
+
+# TODO(sourabhbajaj): Remove this after BFS API is used everywhere
 class ChannelFactory(object):
-  # TODO: Generalize into extensible framework.
-
   @staticmethod
   def mkdir(path):
-    if path.startswith('gs://'):
-      return
-    else:
-      try:
-        os.makedirs(path)
-      except OSError as err:
-        raise IOError(err)
+    bfs = get_filesystem(path)
+    bfs.mkdirs(path)
 
   @staticmethod
   def open(path,
            mode,
            mime_type='application/octet-stream',
           compression_type=CompressionTypes.AUTO):
-    if compression_type == CompressionTypes.AUTO:
-      compression_type = CompressionTypes.detect_compression_type(path)
-    elif not CompressionTypes.is_valid_compression_type(compression_type):
-      raise TypeError('compression_type must be CompressionType object but '
-                      'was %s' % type(compression_type))
-
-    if path.startswith('gs://'):
-      raw_file = gcsio.GcsIO().open(
-          path,
-          mode,
-          mime_type=CompressionTypes.mime_type(compression_type, mime_type))
-    else:
-      raw_file = open(path, mode)
-
-    if compression_type == CompressionTypes.UNCOMPRESSED:
-      return raw_file
-    else:
-      return _CompressedFile(raw_file, compression_type=compression_type)
+    bfs = get_filesystem(path)
+    if mode == 'rb':
+      return bfs.open(path, mime_type, compression_type)
+    elif mode == 'wb':
+      return bfs.create(path, mime_type, compression_type)
 
   @staticmethod
   def is_compressed(fileobj):
@@ -165,358 +59,66 @@ class ChannelFactory(object):
 
   @staticmethod
   def rename(src, dest):
-    if src.startswith('gs://'):
-      if not dest.startswith('gs://'):
-        raise ValueError('Destination %r must be GCS path.', dest)
-      gcsio.GcsIO().rename(src, dest)
-    else:
-      try:
-        os.rename(src, dest)
-      except OSError as err:
-        raise IOError(err)
+    bfs = get_filesystem(src)
+    bfs.rename([src], [dest])
 
   @staticmethod
   def rename_batch(src_dest_pairs):
-    # Filter out local and GCS operations.
-    local_src_dest_pairs = []
-    gcs_src_dest_pairs = []
-    for src, dest in src_dest_pairs:
-      if src.startswith('gs://'):
-        if not dest.startswith('gs://'):
-          raise ValueError('Destination %r must be GCS path.', dest)
-        gcs_src_dest_pairs.append((src, dest))
-      else:
-        local_src_dest_pairs.append((src, dest))
-
-    # Execute local operations.
-    exceptions = []
-    for src, dest in local_src_dest_pairs:
-      try:
-        ChannelFactory.rename(src, dest)
-      except Exception as e:  # pylint: disable=broad-except
-        exceptions.append((src, dest, e))
-
-    # Execute GCS operations.
-    exceptions += ChannelFactory._rename_gcs_batch(gcs_src_dest_pairs)
-
-    return exceptions
-
-  @staticmethod
-  def _rename_gcs_batch(src_dest_pairs):
-    # Prepare batches.
-    gcs_batches = []
-    gcs_current_batch = []
-    for src, dest in src_dest_pairs:
-      gcs_current_batch.append((src, dest))
-      if len(gcs_current_batch) == MAX_BATCH_OPERATION_SIZE:
-        gcs_batches.append(gcs_current_batch)
-        gcs_current_batch = []
-    if gcs_current_batch:
-      gcs_batches.append(gcs_current_batch)
-
-    # Execute GCS renames if any and return exceptions.
-    exceptions = []
-    for batch in gcs_batches:
-      copy_statuses = gcsio.GcsIO().copy_batch(batch)
-      copy_succeeded = []
-      for src, dest, exception in copy_statuses:
-        if exception:
-          exceptions.append((src, dest, exception))
-        else:
-          copy_succeeded.append((src, dest))
-      delete_batch = [src for src, dest in copy_succeeded]
-      delete_statuses = gcsio.GcsIO().delete_batch(delete_batch)
-      for i, (src, exception) in enumerate(delete_statuses):
-        dest = copy_succeeded[i]
-        if exception:
-          exceptions.append((src, dest, exception))
-    return exceptions
+    if not src_dest_pairs:
+      return []
+    sources = [s for s, _ in src_dest_pairs]
+    destinations = [d for _, d in src_dest_pairs]
+    bfs = get_filesystem(sources[0])
+    try:
+      bfs.rename(sources, destinations)
+      return []
+    except BeamIOError as exp:
+      return [(s, d, e) for (s, d), e in exp.exception_details.iteritems()]
 
   @staticmethod
   def copytree(src, dest):
-    if src.startswith('gs://'):
-      if not dest.startswith('gs://'):
-        raise ValueError('Destination %r must be GCS path.', dest)
-      assert src.endswith('/'), src
-      assert dest.endswith('/'), dest
-      gcsio.GcsIO().copytree(src, dest)
-    else:
-      try:
-        if os.path.exists(dest):
-          shutil.rmtree(dest)
-        shutil.copytree(src, dest)
-      except OSError as err:
-        raise IOError(err)
+    bfs = get_filesystem(src)
+    bfs.copy([src], [dest])
 
   @staticmethod
   def exists(path):
-    if path.startswith('gs://'):
-      return gcsio.GcsIO().exists(path)
-    else:
-      return os.path.exists(path)
+    bfs = get_filesystem(path)
+    return bfs.exists(path)
 
   @staticmethod
   def rmdir(path):
-    if path.startswith('gs://'):
-      gcs = gcsio.GcsIO()
-      if not path.endswith('/'):
-        path += '/'
-      # TODO: Threadpool?
-      for entry in gcs.glob(path + '*'):
-        gcs.delete(entry)
-    else:
-      try:
-        shutil.rmtree(path)
-      except OSError as err:
-        raise IOError(err)
+    bfs = get_filesystem(path)
+    bfs.delete([path])
 
   @staticmethod
   def rm(path):
-    if path.startswith('gs://'):
-      gcsio.GcsIO().delete(path)
-    else:
-      try:
-        os.remove(path)
-      except OSError as err:
-        raise IOError(err)
+    bfs = get_filesystem(path)
+    bfs.delete([path])
 
   @staticmethod
   def glob(path, limit=None):
-    if path.startswith('gs://'):
-      return gcsio.GcsIO().glob(path, limit)
-    else:
-      files = glob.glob(path)
-      return files[:limit]
+    bfs = get_filesystem(path)
+    match_result = bfs.match([path], [limit])[0]
+    return [f.path for f in match_result.metadata_list]
 
   @staticmethod
   def size_in_bytes(path):
-    """Returns the size of a file in bytes.
-
-    Args:
-      path: a string that gives the path of a single file.
-    """
-    if path.startswith('gs://'):
-      return gcsio.GcsIO().size(path)
-    else:
-      return os.path.getsize(path)
+    bfs = get_filesystem(path)
+    match_result = bfs.match([path])[0]
+    return match_result.metadata_list[0].size_in_bytes
 
   @staticmethod
   def size_of_files_in_glob(path, file_names=None):
-    """Returns a map of file names to sizes.
-
-    Args:
-      path: a file path pattern that reads the size of all the files
-      file_names: List of file names that we need size for, this is added to
-        support eventually consistent sources where two expantions of glob
-        might yield to different files.
- """ - if path.startswith('gs://'): - file_sizes = gcsio.GcsIO().size_of_files_in_glob(path) - if file_names is None: - return file_sizes - else: - result = {} - # We need to make sure we fetched the size for all the files as the - # list API in GCS is eventually consistent so directly call size for - # any files that may be missing. - for file_name in file_names: - if file_name in file_sizes: - result[file_name] = file_sizes[file_name] - else: - result[file_name] = ChannelFactory.size_in_bytes(file_name) - return result - else: - if file_names is None: - file_names = ChannelFactory.glob(path) - return {file_name: ChannelFactory.size_in_bytes(file_name) - for file_name in file_names} - - -class _CompressedFile(object): - """Somewhat limited file wrapper for easier handling of compressed files.""" - - # The bit mask to use for the wbits parameters of the zlib compressor and - # decompressor objects. - _gzip_mask = zlib.MAX_WBITS | 16 # Mask when using GZIP headers. - - def __init__(self, - fileobj, - compression_type=CompressionTypes.GZIP, - read_size=DEFAULT_READ_BUFFER_SIZE): - if not fileobj: - raise ValueError('File object must be opened file but was at %s' % - fileobj) - - if not CompressionTypes.is_valid_compression_type(compression_type): - raise TypeError('compression_type must be CompressionType object but ' - 'was %s' % type(compression_type)) - if compression_type in (CompressionTypes.AUTO, CompressionTypes.UNCOMPRESSED - ): - raise ValueError( - 'Cannot create object with unspecified or no compression') - - self._file = fileobj - self._compression_type = compression_type - - if self._file.tell() != 0: - raise ValueError('File object must be at position 0 but was %d' % - self._file.tell()) - self._uncompressed_position = 0 - - if self.readable(): - self._read_size = read_size - self._read_buffer = cStringIO.StringIO() - self._read_position = 0 - self._read_eof = False - - if self._compression_type == CompressionTypes.BZIP2: - self._decompressor = bz2.BZ2Decompressor() - else: - assert self._compression_type == CompressionTypes.GZIP - self._decompressor = zlib.decompressobj(self._gzip_mask) - else: - self._decompressor = None - - if self.writeable(): - if self._compression_type == CompressionTypes.BZIP2: - self._compressor = bz2.BZ2Compressor() - else: - assert self._compression_type == CompressionTypes.GZIP - self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, - zlib.DEFLATED, self._gzip_mask) - else: - self._compressor = None - - def readable(self): - mode = self._file.mode - return 'r' in mode or 'a' in mode - - def writeable(self): - mode = self._file.mode - return 'w' in mode or 'a' in mode - - def write(self, data): - """Write data to file.""" - if not self._compressor: - raise ValueError('compressor not initialized') - self._uncompressed_position += len(data) - compressed = self._compressor.compress(data) - if compressed: - self._file.write(compressed) - - def _fetch_to_internal_buffer(self, num_bytes): - """Fetch up to num_bytes into the internal buffer.""" - if (not self._read_eof and self._read_position > 0 and - (self._read_buffer.tell() - self._read_position) < num_bytes): - # There aren't enough number of bytes to accommodate a read, so we - # prepare for a possibly large read by clearing up all internal buffers - # but without dropping any previous held data. 
- self._read_buffer.seek(self._read_position) - data = self._read_buffer.read() - self._read_position = 0 - self._read_buffer.seek(0) - self._read_buffer.truncate(0) - self._read_buffer.write(data) - - while not self._read_eof and (self._read_buffer.tell() - self._read_position - ) < num_bytes: - # Continue reading from the underlying file object until enough bytes are - # available, or EOF is reached. - buf = self._file.read(self._read_size) - if buf: - decompressed = self._decompressor.decompress(buf) - del buf # Free up some possibly large and no-longer-needed memory. - self._read_buffer.write(decompressed) - else: - # EOF reached. - # Verify completeness and no corruption and flush (if needed by - # the underlying algorithm). - if self._compression_type == CompressionTypes.BZIP2: - # Having unused_data past end of stream would imply file corruption. - assert not self._decompressor.unused_data, 'Possible file corruption.' - try: - # EOF implies that the underlying BZIP2 stream must also have - # reached EOF. We expect this to raise an EOFError and we catch it - # below. Any other kind of error though would be problematic. - self._decompressor.decompress('dummy') - assert False, 'Possible file corruption.' - except EOFError: - pass # All is as expected! - else: - self._read_buffer.write(self._decompressor.flush()) - - # Record that we have hit the end of file, so we won't unnecessarily - # repeat the completeness verification step above. - self._read_eof = True - - def _read_from_internal_buffer(self, read_fn): - """Read from the internal buffer by using the supplied read_fn.""" - self._read_buffer.seek(self._read_position) - result = read_fn() - self._read_position += len(result) - self._uncompressed_position += len(result) - self._read_buffer.seek(0, os.SEEK_END) # Allow future writes. - return result - - def read(self, num_bytes): - if not self._decompressor: - raise ValueError('decompressor not initialized') - - self._fetch_to_internal_buffer(num_bytes) - return self._read_from_internal_buffer( - lambda: self._read_buffer.read(num_bytes)) - - def readline(self): - """Equivalent to standard file.readline(). Same return conventions apply.""" - if not self._decompressor: - raise ValueError('decompressor not initialized') - - io = cStringIO.StringIO() - while True: - # Ensure that the internal buffer has at least half the read_size. Going - # with half the _read_size (as opposed to a full _read_size) to ensure - # that actual fetches are more evenly spread out, as opposed to having 2 - # consecutive reads at the beginning of a read. - self._fetch_to_internal_buffer(self._read_size / 2) - line = self._read_from_internal_buffer( - lambda: self._read_buffer.readline()) - io.write(line) - if line.endswith('\n') or not line: - break # Newline or EOF reached. 
-
-    return io.getvalue()
-
-  def closed(self):
-    return not self._file or self._file.closed()
-
-  def close(self):
-    if self.readable():
-      self._read_buffer.close()
-
-    if self.writeable():
-      self._file.write(self._compressor.flush())
+    bfs = get_filesystem(path)
+    match_result = bfs.match([path])[0]
+    part_files = {f.path: f.size_in_bytes
+                  for f in match_result.metadata_list}
 
-    self._file.close()
+    if file_names is not None:
+      specific_files = {}
+      match_results = bfs.match(file_names)
+      for match_result in match_results:
+        for metadata in match_result.metadata_list:
+          specific_files[metadata.path] = metadata.size_in_bytes
 
-  def flush(self):
-    if self.writeable():
-      self._file.write(self._compressor.flush())
-      self._file.flush()
-
-  @property
-  def seekable(self):
-    # TODO: Add support for seeking to a file position.
-    return False
-
-  def tell(self):
-    """Returns current position in uncompressed file."""
-    return self._uncompressed_position
-
-  def __enter__(self):
-    return self
-
-  def __exit__(self, exception_type, exception_value, traceback):
-    self.close()
+      # dict.update mutates in place and returns None, so merge first and
+      # return the combined map.
+      part_files.update(specific_files)
+
+    return part_files
 
 
 class FileSink(iobase.Sink):
@@ -570,6 +172,7 @@ class FileSink(iobase.Sink):
     self.shard_name_format = self._template_to_format(shard_name_template)
     self.compression_type = compression_type
     self.mime_type = mime_type
+    self._file_system = get_filesystem(file_path_prefix)
 
   def display_data(self):
     return {'shards':
@@ -589,11 +192,8 @@
     The returned file handle is passed to ``write_[encoded_]record`` and
    ``close``.
     """
-    return ChannelFactory.open(
-        temp_path,
-        'wb',
-        mime_type=self.mime_type,
-        compression_type=self.compression_type)
+    return self._file_system.create(temp_path, self.mime_type,
+                                    self.compression_type)
 
   def write_record(self, file_handle, value):
     """Writes a single record go the file handle returned by ``open()``.
@@ -621,7 +221,7 @@
   def initialize_write(self):
     tmp_dir = self.file_path_prefix + self.file_name_suffix + time.strftime(
        '-temp-%Y-%m-%d_%H-%M-%S')
-    ChannelFactory().mkdir(tmp_dir)
+    self._file_system.mkdirs(tmp_dir)
     return tmp_dir
 
   def open_writer(self, init_result, uid):
@@ -639,75 +239,79 @@
     min_threads = min(num_shards, FileSink._MAX_RENAME_THREADS)
     num_threads = max(1, min_threads)
 
-    rename_ops = []
+    source_files = []
+    destination_files = []
     for shard_num, shard in enumerate(writer_results):
       final_name = ''.join([
           self.file_path_prefix, self.shard_name_format % dict(
              shard_num=shard_num, num_shards=num_shards), self.file_name_suffix
       ])
-      rename_ops.append((shard, final_name))
-
-    batches = []
-    current_batch = []
-    for rename_op in rename_ops:
-      current_batch.append(rename_op)
-      if len(current_batch) == MAX_BATCH_OPERATION_SIZE:
-        batches.append(current_batch)
-        current_batch = []
-    if current_batch:
-      batches.append(current_batch)
+      source_files.append(shard)
+      destination_files.append(final_name)
+
+    source_file_batch = [source_files[i:i + MAX_BATCH_OPERATION_SIZE]
+                         for i in xrange(0, len(source_files),
+                                         MAX_BATCH_OPERATION_SIZE)]
+    destination_file_batch = [destination_files[i:i + MAX_BATCH_OPERATION_SIZE]
+                              for i in xrange(0, len(destination_files),
+                                              MAX_BATCH_OPERATION_SIZE)]
 
     logging.info(
         'Starting finalize_write threads with num_shards: %d, '
         'batches: %d, num_threads: %d',
-        num_shards, len(batches), num_threads)
+        num_shards, len(source_file_batch), num_threads)
     start_time = time.time()
 
     # Use a thread pool for renaming operations.
def _rename_batch(batch): """_rename_batch executes batch rename operations.""" + source_files, destination_files = batch exceptions = [] - exception_infos = ChannelFactory.rename_batch(batch) - for src, dest, exception in exception_infos: - if exception: - logging.warning('Rename not successful: %s -> %s, %s', src, dest, - exception) - should_report = True - if isinstance(exception, IOError): - # May have already been copied. - try: - if ChannelFactory.exists(dest): - should_report = False - except Exception as exists_e: # pylint: disable=broad-except - logging.warning('Exception when checking if file %s exists: ' - '%s', dest, exists_e) - if should_report: - logging.warning(('Exception in _rename_batch. src: %s, ' - 'dest: %s, err: %s'), src, dest, exception) - exceptions.append(exception) - else: - logging.debug('Rename successful: %s -> %s', src, dest) - return exceptions + try: + self._file_system.rename(source_files, destination_files) + return exceptions + except BeamIOError as exp: + if exp.exception_details is None: + raise exp + for (src, dest), exception in exp.exception_details.iteritems(): + if exception: + logging.warning('Rename not successful: %s -> %s, %s', src, dest, + exception) + should_report = True + if isinstance(exception, IOError): + # May have already been copied. + try: + if self._file_system.exists(dest): + should_report = False + except Exception as exists_e: # pylint: disable=broad-except + logging.warning('Exception when checking if file %s exists: ' + '%s', dest, exists_e) + if should_report: + logging.warning(('Exception in _rename_batch. src: %s, ' + 'dest: %s, err: %s'), src, dest, exception) + exceptions.append(exception) + else: + logging.debug('Rename successful: %s -> %s', src, dest) + return exceptions exception_batches = util.run_using_threadpool( - _rename_batch, batches, num_threads) + _rename_batch, zip(source_file_batch, destination_file_batch), + num_threads) - all_exceptions = [] - for exceptions in exception_batches: - if exceptions: - all_exceptions += exceptions + all_exceptions = [e for exception_batch in exception_batches + for e in exception_batch] if all_exceptions: raise Exception('Encountered exceptions in finalize_write: %s', all_exceptions) - for shard, final_name in rename_ops: + for final_name in destination_files: yield final_name logging.info('Renamed %d shards in %.2f seconds.', num_shards, time.time() - start_time) try: - ChannelFactory.rmdir(init_result) + self._file_system.delete([init_result]) except IOError: # May have already been removed. pass http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/fileio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py index a963c67..6b7437d 100644 --- a/sdks/python/apache_beam/io/fileio_test.py +++ b/sdks/python/apache_beam/io/fileio_test.py @@ -16,7 +16,7 @@ # limitations under the License. 
# -"""Unit tests for local and GCS sources and sinks.""" +"""Unit tests for file sinks.""" import glob import logging @@ -26,60 +26,16 @@ import tempfile import unittest import hamcrest as hc -import mock import apache_beam as beam from apache_beam import coders from apache_beam.io import fileio +from apache_beam.io.filesystem import CompressedFile from apache_beam.test_pipeline import TestPipeline from apache_beam.transforms.display import DisplayData from apache_beam.transforms.display_test import DisplayDataItemMatcher -class TestChannelFactory(unittest.TestCase): - - @mock.patch('apache_beam.io.fileio.gcsio') - def test_size_of_files_in_glob_complete(self, *unused_args): - # Prepare mocks. - gcsio_mock = mock.MagicMock() - fileio.gcsio.GcsIO = lambda: gcsio_mock - file_names = ['gs://bucket/file1', 'gs://bucket/file2'] - gcsio_mock.size_of_files_in_glob.return_value = { - 'gs://bucket/file1': 1, - 'gs://bucket/file2': 2 - } - expected_results = { - 'gs://bucket/file1': 1, - 'gs://bucket/file2': 2 - } - self.assertEqual( - fileio.ChannelFactory.size_of_files_in_glob( - 'gs://bucket/*', file_names), - expected_results) - gcsio_mock.size_of_files_in_glob.assert_called_once_with('gs://bucket/*') - - @mock.patch('apache_beam.io.fileio.gcsio') - def test_size_of_files_in_glob_incomplete(self, *unused_args): - # Prepare mocks. - gcsio_mock = mock.MagicMock() - fileio.gcsio.GcsIO = lambda: gcsio_mock - file_names = ['gs://bucket/file1', 'gs://bucket/file2'] - gcsio_mock.size_of_files_in_glob.return_value = { - 'gs://bucket/file1': 1 - } - gcsio_mock.size.return_value = 2 - expected_results = { - 'gs://bucket/file1': 1, - 'gs://bucket/file2': 2 - } - self.assertEqual( - fileio.ChannelFactory.size_of_files_in_glob( - 'gs://bucket/*', file_names), - expected_results) - gcsio_mock.size_of_files_in_glob.assert_called_once_with('gs://bucket/*') - gcsio_mock.size.assert_called_once_with('gs://bucket/file2') - - # TODO: Refactor code so all io tests are using same library # TestCaseWithTempDirCleanup class. class _TestCaseWithTempDirCleanUp(unittest.TestCase): @@ -115,16 +71,16 @@ class _TestCaseWithTempDirCleanUp(unittest.TestCase): class TestCompressedFile(_TestCaseWithTempDirCleanUp): def test_seekable(self): - readable = fileio._CompressedFile(open(self._create_temp_file(), 'r')) + readable = CompressedFile(open(self._create_temp_file(), 'r')) self.assertFalse(readable.seekable) - writeable = fileio._CompressedFile(open(self._create_temp_file(), 'w')) + writeable = CompressedFile(open(self._create_temp_file(), 'w')) self.assertFalse(writeable.seekable) def test_tell(self): lines = ['line%d\n' % i for i in range(10)] tmpfile = self._create_temp_file() - writeable = fileio._CompressedFile(open(tmpfile, 'w')) + writeable = CompressedFile(open(tmpfile, 'w')) current_offset = 0 for line in lines: writeable.write(line) @@ -132,7 +88,7 @@ class TestCompressedFile(_TestCaseWithTempDirCleanUp): self.assertEqual(current_offset, writeable.tell()) writeable.close() - readable = fileio._CompressedFile(open(tmpfile)) + readable = CompressedFile(open(tmpfile)) current_offset = 0 while True: line = readable.readline() @@ -300,52 +256,6 @@ class TestFileSink(_TestCaseWithTempDirCleanUp): with self.assertRaises(Exception): list(sink.finalize_write(init_token, [res1, res2])) - @mock.patch('apache_beam.io.fileio.ChannelFactory.rename') - @mock.patch('apache_beam.io.fileio.gcsio') - def test_rename_batch(self, *unused_args): - # Prepare mocks. 
- gcsio_mock = mock.MagicMock() - fileio.gcsio.GcsIO = lambda: gcsio_mock - fileio.ChannelFactory.rename = mock.MagicMock() - to_rename = [ - ('gs://bucket/from1', 'gs://bucket/to1'), - ('gs://bucket/from2', 'gs://bucket/to2'), - ('/local/from1', '/local/to1'), - ('gs://bucket/from3', 'gs://bucket/to3'), - ('/local/from2', '/local/to2'), - ] - gcsio_mock.copy_batch.side_effect = [[ - ('gs://bucket/from1', 'gs://bucket/to1', None), - ('gs://bucket/from2', 'gs://bucket/to2', None), - ('gs://bucket/from3', 'gs://bucket/to3', None), - ]] - gcsio_mock.delete_batch.side_effect = [[ - ('gs://bucket/from1', None), - ('gs://bucket/from2', None), - ('gs://bucket/from3', None), - ]] - - # Issue batch rename. - fileio.ChannelFactory.rename_batch(to_rename) - - # Verify mocks. - expected_local_rename_calls = [ - mock.call('/local/from1', '/local/to1'), - mock.call('/local/from2', '/local/to2'), - ] - self.assertEqual(fileio.ChannelFactory.rename.call_args_list, - expected_local_rename_calls) - gcsio_mock.copy_batch.assert_called_once_with([ - ('gs://bucket/from1', 'gs://bucket/to1'), - ('gs://bucket/from2', 'gs://bucket/to2'), - ('gs://bucket/from3', 'gs://bucket/to3'), - ]) - gcsio_mock.delete_batch.assert_called_once_with([ - 'gs://bucket/from1', - 'gs://bucket/from2', - 'gs://bucket/from3', - ]) - if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/filesystem.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py new file mode 100644 index 0000000..14493c0 --- /dev/null +++ b/sdks/python/apache_beam/io/filesystem.py @@ -0,0 +1,439 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""File system abstraction for file-based sources and sinks.""" + +from __future__ import absolute_import + +import abc +import bz2 +import cStringIO +import os +import zlib + + +DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024 + + +class CompressionTypes(object): + """Enum-like class representing known compression types.""" + + # Detect compression based on filename extension. + # + # The following extensions are currently recognized by auto-detection: + # .bz2 (implies BZIP2 as described below). + # .gz (implies GZIP as described below) + # Any non-recognized extension implies UNCOMPRESSED as described below. + AUTO = 'auto' + + # BZIP2 compression. + BZIP2 = 'bzip2' + + # GZIP compression (deflate with GZIP headers). + GZIP = 'gzip' + + # Uncompressed (i.e., may be split). 
+ UNCOMPRESSED = 'uncompressed' + + @classmethod + def is_valid_compression_type(cls, compression_type): + """Returns True for valid compression types, False otherwise.""" + types = set([ + CompressionTypes.AUTO, + CompressionTypes.BZIP2, + CompressionTypes.GZIP, + CompressionTypes.UNCOMPRESSED + ]) + return compression_type in types + + @classmethod + def mime_type(cls, compression_type, default='application/octet-stream'): + mime_types_by_compression_type = { + cls.BZIP2: 'application/x-bz2', + cls.GZIP: 'application/x-gzip', + } + return mime_types_by_compression_type.get(compression_type, default) + + @classmethod + def detect_compression_type(cls, file_path): + """Returns the compression type of a file (based on its suffix).""" + compression_types_by_suffix = {'.bz2': cls.BZIP2, '.gz': cls.GZIP} + lowercased_path = file_path.lower() + for suffix, compression_type in compression_types_by_suffix.iteritems(): + if lowercased_path.endswith(suffix): + return compression_type + return cls.UNCOMPRESSED + + +class CompressedFile(object): + """Somewhat limited file wrapper for easier handling of compressed files.""" + + # The bit mask to use for the wbits parameters of the zlib compressor and + # decompressor objects. + _gzip_mask = zlib.MAX_WBITS | 16 # Mask when using GZIP headers. + + def __init__(self, + fileobj, + compression_type=CompressionTypes.GZIP, + read_size=DEFAULT_READ_BUFFER_SIZE): + if not fileobj: + raise ValueError('File object must not be None') + + if not CompressionTypes.is_valid_compression_type(compression_type): + raise TypeError('compression_type must be CompressionType object but ' + 'was %s' % type(compression_type)) + if compression_type in (CompressionTypes.AUTO, CompressionTypes.UNCOMPRESSED + ): + raise ValueError( + 'Cannot create object with unspecified or no compression') + + self._file = fileobj + self._compression_type = compression_type + + if self._file.tell() != 0: + raise ValueError('File object must be at position 0 but was %d' % + self._file.tell()) + self._uncompressed_position = 0 + + if self.readable(): + self._read_size = read_size + self._read_buffer = cStringIO.StringIO() + self._read_position = 0 + self._read_eof = False + + if self._compression_type == CompressionTypes.BZIP2: + self._decompressor = bz2.BZ2Decompressor() + else: + assert self._compression_type == CompressionTypes.GZIP + self._decompressor = zlib.decompressobj(self._gzip_mask) + else: + self._decompressor = None + + if self.writeable(): + if self._compression_type == CompressionTypes.BZIP2: + self._compressor = bz2.BZ2Compressor() + else: + assert self._compression_type == CompressionTypes.GZIP + self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, + zlib.DEFLATED, self._gzip_mask) + else: + self._compressor = None + + def readable(self): + mode = self._file.mode + return 'r' in mode or 'a' in mode + + def writeable(self): + mode = self._file.mode + return 'w' in mode or 'a' in mode + + def write(self, data): + """Write data to file.""" + if not self._compressor: + raise ValueError('compressor not initialized') + self._uncompressed_position += len(data) + compressed = self._compressor.compress(data) + if compressed: + self._file.write(compressed) + + def _fetch_to_internal_buffer(self, num_bytes): + """Fetch up to num_bytes into the internal buffer.""" + if (not self._read_eof and self._read_position > 0 and + (self._read_buffer.tell() - self._read_position) < num_bytes): + # There aren't enough number of bytes to accommodate a read, so we + # prepare for a possibly 
large read by clearing up all internal buffers + # but without dropping any previous held data. + self._read_buffer.seek(self._read_position) + data = self._read_buffer.read() + self._read_position = 0 + self._read_buffer.seek(0) + self._read_buffer.truncate(0) + self._read_buffer.write(data) + + while not self._read_eof and (self._read_buffer.tell() - self._read_position + ) < num_bytes: + # Continue reading from the underlying file object until enough bytes are + # available, or EOF is reached. + buf = self._file.read(self._read_size) + if buf: + decompressed = self._decompressor.decompress(buf) + del buf # Free up some possibly large and no-longer-needed memory. + self._read_buffer.write(decompressed) + else: + # EOF reached. + # Verify completeness and no corruption and flush (if needed by + # the underlying algorithm). + if self._compression_type == CompressionTypes.BZIP2: + # Having unused_data past end of stream would imply file corruption. + assert not self._decompressor.unused_data, 'Possible file corruption.' + try: + # EOF implies that the underlying BZIP2 stream must also have + # reached EOF. We expect this to raise an EOFError and we catch it + # below. Any other kind of error though would be problematic. + self._decompressor.decompress('dummy') + assert False, 'Possible file corruption.' + except EOFError: + pass # All is as expected! + else: + self._read_buffer.write(self._decompressor.flush()) + + # Record that we have hit the end of file, so we won't unnecessarily + # repeat the completeness verification step above. + self._read_eof = True + + def _read_from_internal_buffer(self, read_fn): + """Read from the internal buffer by using the supplied read_fn.""" + self._read_buffer.seek(self._read_position) + result = read_fn() + self._read_position += len(result) + self._uncompressed_position += len(result) + self._read_buffer.seek(0, os.SEEK_END) # Allow future writes. + return result + + def read(self, num_bytes): + if not self._decompressor: + raise ValueError('decompressor not initialized') + + self._fetch_to_internal_buffer(num_bytes) + return self._read_from_internal_buffer( + lambda: self._read_buffer.read(num_bytes)) + + def readline(self): + """Equivalent to standard file.readline(). Same return conventions apply.""" + if not self._decompressor: + raise ValueError('decompressor not initialized') + + io = cStringIO.StringIO() + while True: + # Ensure that the internal buffer has at least half the read_size. Going + # with half the _read_size (as opposed to a full _read_size) to ensure + # that actual fetches are more evenly spread out, as opposed to having 2 + # consecutive reads at the beginning of a read. + self._fetch_to_internal_buffer(self._read_size / 2) + line = self._read_from_internal_buffer( + lambda: self._read_buffer.readline()) + io.write(line) + if line.endswith('\n') or not line: + break # Newline or EOF reached. + + return io.getvalue() + + def closed(self): + return not self._file or self._file.closed() + + def close(self): + if self.readable(): + self._read_buffer.close() + + if self.writeable(): + self._file.write(self._compressor.flush()) + + self._file.close() + + def flush(self): + if self.writeable(): + self._file.write(self._compressor.flush()) + self._file.flush() + + @property + def seekable(self): + # TODO: Add support for seeking to a file position. 
+    return False
+
+  def tell(self):
+    """Returns current position in uncompressed file."""
+    return self._uncompressed_position
+
+  def __enter__(self):
+    return self
+
+  def __exit__(self, exception_type, exception_value, traceback):
+    self.close()
+
+
+class FileMetadata(object):
+  """Metadata about a file path that is the output of FileSystem.match
+  """
+  def __init__(self, path, size_in_bytes):
+    assert isinstance(path, basestring) and path, "Path should be a string"
+    assert isinstance(size_in_bytes, int) and size_in_bytes >= 0, \
+        "Size in bytes should be greater than or equal to zero"
+    self.path = path
+    self.size_in_bytes = size_in_bytes
+
+  def __eq__(self, other):
+    """Note: This is only used in tests where we verify that mock objects match.
+    """
+    return (isinstance(other, FileMetadata) and
+            self.path == other.path and
+            self.size_in_bytes == other.size_in_bytes)
+
+  def __hash__(self):
+    return hash((self.path, self.size_in_bytes))
+
+  def __ne__(self, other):
+    return not self.__eq__(other)
+
+  def __repr__(self):
+    return 'FileMetadata(%s, %s)' % (self.path, self.size_in_bytes)
+
+
+class MatchResult(object):
+  """Result from the ``FileSystem`` match operation which contains the list
+  of matched FileMetadata.
+  """
+  def __init__(self, pattern, metadata_list):
+    self.metadata_list = metadata_list
+    self.pattern = pattern
+
+
+class BeamIOError(IOError):
+  def __init__(self, msg, exception_details=None):
+    """Class representing the errors thrown in the batch file operations.
+
+    Args:
+      msg: Message string for the exception thrown
+      exception_details: Optional map of individual input to exception for
+        failed operations in batch. This parameter is optional, so when it is
+        specified the user can assume that all errors in the filesystem
+        operation have been reported. When the details are missing, the
+        operation may have failed anywhere, so the user should use match to
+        determine the current state of the system.
+    """
+    super(BeamIOError, self).__init__(msg)
+    self.exception_details = exception_details
+
+
+class FileSystem(object):
+  """A class that defines the functions that can be performed on a filesystem.
+
+  All methods are abstract and they are for file system providers to
+  implement. Clients should use the get_filesystem utility in
+  filesystems_util to obtain the correct file system based on the provided
+  file pattern scheme.
+  """
+  __metaclass__ = abc.ABCMeta
+
+  @staticmethod
+  def _get_compression_type(path, compression_type):
+    if compression_type == CompressionTypes.AUTO:
+      compression_type = CompressionTypes.detect_compression_type(path)
+    elif not CompressionTypes.is_valid_compression_type(compression_type):
+      raise TypeError('compression_type must be CompressionType object but '
+                      'was %s' % type(compression_type))
+    return compression_type
+
+  @abc.abstractmethod
+  def mkdirs(self, path):
+    """Recursively create directories for the provided path.
+
+    Args:
+      path: string path of the directory structure that should be created
+
+    Raises:
+      IOError if leaf directory already exists.
+    """
+    raise NotImplementedError
+
+  @abc.abstractmethod
+  def match(self, patterns, limits=None):
+    """Find all matching paths to the patterns provided.
+
+    Args:
+      patterns: list of string for the file path pattern to match against
+      limits: list of maximum number of responses that need to be fetched
+
+    Returns: list of ``MatchResult`` objects.
+ + Raises: + ``BeamIOError`` if any of the pattern match operations fail + """ + raise NotImplementedError + + @abc.abstractmethod + def create(self, path, mime_type, compression_type): + """Returns a write channel for the given file path. + + Args: + path: string path of the file object to be written to the system + mime_type: MIME type to specify the type of content in the file object + compression_type: Type of compression to be used for this object + + Returns: file handle with a close function for the user to use + """ + raise NotImplementedError + + @abc.abstractmethod + def open(self, path, mime_type, compression_type): + """Returns a read channel for the given file path. + + Args: + path: string path of the file object to be written to the system + mime_type: MIME type to specify the type of content in the file object + compression_type: Type of compression to be used for this object + + Returns: file handle with a close function for the user to use + """ + raise NotImplementedError + + @abc.abstractmethod + def copy(self, source_file_names, destination_file_names): + """Recursively copy the file tree from the source to the destination + + Args: + source_file_names: list of source file objects that needs to be copied + destination_file_names: list of destination of the new object + + Raises: + ``BeamIOError`` if any of the copy operations fail + """ + raise NotImplementedError + + @abc.abstractmethod + def rename(self, source_file_names, destination_file_names): + """Rename the files at the source list to the destination list. + Source and destination lists should be of the same size. + + Args: + source_file_names: List of file paths that need to be moved + destination_file_names: List of destination_file_names for the files + + Raises: + ``BeamIOError`` if any of the rename operations fail + """ + raise NotImplementedError + + @abc.abstractmethod + def exists(self, path): + """Check if the provided path exists on the FileSystem. + + Args: + path: string path that needs to be checked. + + Returns: boolean flag indicating if path exists + """ + raise NotImplementedError + + @abc.abstractmethod + def delete(self, paths): + """Deletes files or directories at the provided paths. + Directories will be deleted recursively. + + Args: + paths: list of paths that give the file objects to be deleted + + Raises: + ``BeamIOError`` if any of the delete operations fail + """ + raise NotImplementedError http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/filesystems_util.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filesystems_util.py b/sdks/python/apache_beam/io/filesystems_util.py new file mode 100644 index 0000000..47c2361 --- /dev/null +++ b/sdks/python/apache_beam/io/filesystems_util.py @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Utility functions for getting the correct file system for a file name."""
+
+from apache_beam.io.localfilesystem import LocalFileSystem
+
+
+# TODO(BEAM-1585): Add a mechanism to add user implemented file systems
+def get_filesystem(path):
+  """Function that returns the FileSystem object to use based on the path
+  provided in the input.
+  """
+  if path.startswith('gs://'):
+    # Imported lazily so that the GCP dependencies stay optional.
+    from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
+    return GCSFileSystem()
+  else:
+    return LocalFileSystem()

http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
new file mode 100644
index 0000000..5aef0ab
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -0,0 +1,242 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""GCS file system implementation for accessing files on GCS."""
+
+from __future__ import absolute_import
+
+from apache_beam.io.filesystem import BeamIOError
+from apache_beam.io.filesystem import CompressedFile
+from apache_beam.io.filesystem import CompressionTypes
+from apache_beam.io.filesystem import FileMetadata
+from apache_beam.io.filesystem import FileSystem
+from apache_beam.io.filesystem import MatchResult
+from apache_beam.io.gcp import gcsio
+
+
+class GCSFileSystem(FileSystem):
+  """A GCS ``FileSystem`` implementation for accessing files on GCS.
+  """
+
+  def mkdirs(self, path):
+    """Recursively create directories for the provided path.
+
+    Args:
+      path: string path of the directory structure that should be created
+
+    Raises:
+      IOError if leaf directory already exists.
+    """
+    # No-op: GCS is an object store and has no directories to create.
+    pass
+
+  def match(self, patterns, limits=None):
+    """Find all matching paths to the patterns provided.
+
+    Args:
+      patterns: list of string for the file path pattern to match against
+      limits: list of maximum number of responses that need to be fetched
+
+    Returns: list of ``MatchResult`` objects.
+
+    Raises:
+      ``BeamIOError`` if any of the pattern match operations fail
+    """
+    if limits is None:
+      limits = [None] * len(patterns)
+    else:
+      err_msg = "Patterns and limits should be equal in length"
+      assert len(patterns) == len(limits), err_msg
+
+    def _match(pattern, limit):
+      """Find all matching paths to the pattern provided.
http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
new file mode 100644
index 0000000..5aef0ab
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -0,0 +1,242 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""GCS file system implementation for accessing files on GCS."""
+
+from __future__ import absolute_import
+
+from apache_beam.io.filesystem import BeamIOError
+from apache_beam.io.filesystem import CompressedFile
+from apache_beam.io.filesystem import CompressionTypes
+from apache_beam.io.filesystem import FileMetadata
+from apache_beam.io.filesystem import FileSystem
+from apache_beam.io.filesystem import MatchResult
+from apache_beam.io.gcp import gcsio
+
+
+class GCSFileSystem(FileSystem):
+  """A GCS ``FileSystem`` implementation for accessing files on GCS.
+  """
+
+  def mkdirs(self, path):
+    """Recursively create directories for the provided path.
+
+    Args:
+      path: string path of the directory structure that should be created
+
+    Raises:
+      IOError if leaf directory already exists.
+    """
+    pass
+
+  def match(self, patterns, limits=None):
+    """Find all matching paths to the patterns provided.
+
+    Args:
+      patterns: list of string file path patterns to match against
+      limits: list of maximum number of responses that need to be fetched
+
+    Returns: list of ``MatchResult`` objects.
+
+    Raises:
+      ``BeamIOError`` if any of the pattern match operations fail
+    """
+    if limits is None:
+      limits = [None] * len(patterns)
+    else:
+      err_msg = "Patterns and limits should be equal in length"
+      assert len(patterns) == len(limits), err_msg
+
+    def _match(pattern, limit):
+      """Find all matching paths to the pattern provided.
+      """
+      if pattern.endswith('/'):
+        pattern += '*'
+      file_sizes = gcsio.GcsIO().size_of_files_in_glob(pattern)
+      metadata_list = [FileMetadata(path, size)
+                       for path, size in file_sizes.iteritems()]
+      return MatchResult(pattern, metadata_list)
+
+    exceptions = {}
+    result = []
+    for pattern, limit in zip(patterns, limits):
+      try:
+        result.append(_match(pattern, limit))
+      except Exception as e:  # pylint: disable=broad-except
+        exceptions[pattern] = e
+
+    if exceptions:
+      raise BeamIOError("Match operation failed", exceptions)
+    return result
+
+  def _path_open(self, path, mode, mime_type='application/octet-stream',
+                 compression_type=CompressionTypes.AUTO):
+    """Helper function to open a file in the provided mode.
+    """
+    compression_type = FileSystem._get_compression_type(path, compression_type)
+    mime_type = CompressionTypes.mime_type(compression_type, mime_type)
+    raw_file = gcsio.GcsIO().open(path, mode, mime_type=mime_type)
+    if compression_type == CompressionTypes.UNCOMPRESSED:
+      return raw_file
+    else:
+      return CompressedFile(raw_file, compression_type=compression_type)
+
+  def create(self, path, mime_type='application/octet-stream',
+             compression_type=CompressionTypes.AUTO):
+    """Returns a write channel for the given file path.
+
+    Args:
+      path: string path of the file object to be written to the system
+      mime_type: MIME type to specify the type of content in the file object
+      compression_type: Type of compression to be used for this object
+
+    Returns: file handle with a close function for the user to use
+    """
+    return self._path_open(path, 'wb', mime_type, compression_type)
+
+  def open(self, path, mime_type='application/octet-stream',
+           compression_type=CompressionTypes.AUTO):
+    """Returns a read channel for the given file path.
+
+    Args:
+      path: string path of the file object to be read from the system
+      mime_type: MIME type to specify the type of content in the file object
+      compression_type: Type of compression to be used for this object
+
+    Returns: file handle with a close function for the user to use
+    """
+    return self._path_open(path, 'rb', mime_type, compression_type)
+
+  def copy(self, source_file_names, destination_file_names):
+    """Recursively copy the file tree from the source to the destination.
+
+    Args:
+      source_file_names: list of source file paths that need to be copied
+      destination_file_names: list of destination paths for the new objects
+
+    Raises:
+      ``BeamIOError`` if any of the copy operations fail
+    """
+    err_msg = ("source_file_names and destination_file_names should "
+               "be equal in length")
+    assert len(source_file_names) == len(destination_file_names), err_msg
+
+    def _copy_path(source, destination):
+      """Recursively copy the file tree from the source to the destination.
+      """
+      if not destination.startswith('gs://'):
+        raise ValueError('Destination %r must be a GCS path.' % destination)
+      # Use copytree if the path ends with / as it is a directory
+      if source.endswith('/'):
+        gcsio.GcsIO().copytree(source, destination)
+      else:
+        gcsio.GcsIO().copy(source, destination)
+
+    exceptions = {}
+    for source, destination in zip(source_file_names, destination_file_names):
+      try:
+        _copy_path(source, destination)
+      except Exception as e:  # pylint: disable=broad-except
+        exceptions[(source, destination)] = e
+
+    if exceptions:
+      raise BeamIOError("Copy operation failed", exceptions)
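Both match() and copy() above use the same failure-aggregation idiom: each per-item exception is captured into a dict keyed by the failing input, and a single ``BeamIOError`` carrying that dict is raised at the end. A sketch of how calling code might consume it, assuming ``file_system``, ``sources`` and ``destinations`` are already in scope (the ``exception_details`` attribute is the one exercised by the tests later in this commit):

    import logging

    from apache_beam.io.filesystem import BeamIOError

    try:
      file_system.copy(sources, destinations)
    except BeamIOError as e:
      # exception_details maps each failing (source, destination) pair to
      # the underlying exception, so callers can retry or report per file.
      for (src, dest), err in e.exception_details.iteritems():
        logging.warning('Copy %s -> %s failed: %s', src, dest, err)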
+  def rename(self, source_file_names, destination_file_names):
+    """Rename the files at the source list to the destination list.
+    Source and destination lists should be of the same size.
+
+    Args:
+      source_file_names: List of file paths that need to be moved
+      destination_file_names: List of destination_file_names for the files
+
+    Raises:
+      ``BeamIOError`` if any of the rename operations fail
+    """
+    err_msg = ("source_file_names and destination_file_names should "
+               "be equal in length")
+    assert len(source_file_names) == len(destination_file_names), err_msg
+
+    gcs_batches = []
+    gcs_current_batch = []
+    for src, dest in zip(source_file_names, destination_file_names):
+      gcs_current_batch.append((src, dest))
+      if len(gcs_current_batch) == gcsio.MAX_BATCH_OPERATION_SIZE:
+        gcs_batches.append(gcs_current_batch)
+        gcs_current_batch = []
+    if gcs_current_batch:
+      gcs_batches.append(gcs_current_batch)
+
+    # Execute GCS renames if any and collect exceptions.
+    exceptions = {}
+    for batch in gcs_batches:
+      copy_statuses = gcsio.GcsIO().copy_batch(batch)
+      copy_succeeded = []
+      for src, dest, exception in copy_statuses:
+        if exception:
+          exceptions[(src, dest)] = exception
+        else:
+          copy_succeeded.append((src, dest))
+      delete_batch = [src for src, dest in copy_succeeded]
+      delete_statuses = gcsio.GcsIO().delete_batch(delete_batch)
+      for i, (src, exception) in enumerate(delete_statuses):
+        dest = copy_succeeded[i][1]
+        if exception:
+          exceptions[(src, dest)] = exception
+
+    if exceptions:
+      raise BeamIOError("Rename operation failed", exceptions)
+
+  def exists(self, path):
+    """Check if the provided path exists on the FileSystem.
+
+    Args:
+      path: string path that needs to be checked.
+
+    Returns: boolean flag indicating if path exists
+    """
+    return gcsio.GcsIO().exists(path)
+
+  def delete(self, paths):
+    """Deletes files or directories at the provided paths.
+    Directories will be deleted recursively.
+
+    Args:
+      paths: list of paths that give the file objects to be deleted
+
+    Raises:
+      ``BeamIOError`` if any of the delete operations fail
+    """
+    def _delete_path(path):
+      """Recursively delete the file or directory at the provided path.
+      """
+      if path.endswith('/'):
+        path_to_use = path + '*'
+      else:
+        path_to_use = path
+      match_result = self.match([path_to_use])[0]
+      statuses = gcsio.GcsIO().delete_batch(
+          [m.path for m in match_result.metadata_list])
+      failures = [e for (_, e) in statuses if e is not None]
+      if failures:
+        raise failures[0]
+
+    exceptions = {}
+    for path in paths:
+      try:
+        _delete_path(path)
+      except Exception as e:  # pylint: disable=broad-except
+        exceptions[path] = e
+
+    if exceptions:
+      raise BeamIOError("Delete operation failed", exceptions)
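rename() above implements a GCS rename as copy-then-delete, issued in batches of at most gcsio.MAX_BATCH_OPERATION_SIZE pairs, and only deletes sources whose copies succeeded. The chunking step in isolation amounts to the following sketch (the helper name and batch size are illustrative, not from the commit):

    def _batch(pairs, batch_size):
      # Split (src, dest) pairs into chunks of at most batch_size elements.
      return [pairs[i:i + batch_size]
              for i in range(0, len(pairs), batch_size)]

    # e.g. with batch_size=100, 250 pairs yield three batches: 100 + 100 + 50.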
http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
new file mode 100644
index 0000000..3fe5cce
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
@@ -0,0 +1,293 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for GCS File System."""
+
+import unittest
+
+import mock
+from apache_beam.io.filesystem import BeamIOError
+from apache_beam.io.filesystem import FileMetadata
+
+# Protect against environments where apitools library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apache_beam.io.gcp import gcsfilesystem
+except ImportError:
+  gcsfilesystem = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+
[email protected](gcsfilesystem is None, 'GCP dependencies are not installed')
+class GCSFileSystemTest(unittest.TestCase):
+
+  @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+  def test_match_multiples(self, mock_gcsio):
+    # Prepare mocks.
+    gcsio_mock = mock.MagicMock()
+    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    gcsio_mock.size_of_files_in_glob.return_value = {
+        'gs://bucket/file1': 1,
+        'gs://bucket/file2': 2
+    }
+    expected_results = set([
+        FileMetadata('gs://bucket/file1', 1),
+        FileMetadata('gs://bucket/file2', 2)
+    ])
+    file_system = gcsfilesystem.GCSFileSystem()
+    match_result = file_system.match(['gs://bucket/'])[0]
+    self.assertEqual(
+        set(match_result.metadata_list),
+        expected_results)
+    gcsio_mock.size_of_files_in_glob.assert_called_once_with('gs://bucket/*')
+
+  @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+  def test_match_multiples_error(self, mock_gcsio):
+    # Prepare mocks.
+    gcsio_mock = mock.MagicMock()
+    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    exception = IOError('Failed')
+    gcsio_mock.size_of_files_in_glob.side_effect = exception
+    expected_results = {'gs://bucket/': exception}
+
+    file_system = gcsfilesystem.GCSFileSystem()
+    with self.assertRaises(BeamIOError) as error:
+      file_system.match(['gs://bucket/'])
+    self.assertEqual(error.exception.message, 'Match operation failed')
+    self.assertEqual(error.exception.exception_details, expected_results)
+    gcsio_mock.size_of_files_in_glob.assert_called_once_with('gs://bucket/*')
+
+  @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+  def test_match_multiple_patterns(self, mock_gcsio):
+    # Prepare mocks.
+    gcsio_mock = mock.MagicMock()
+    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    gcsio_mock.size_of_files_in_glob.side_effect = [
+        {'gs://bucket/file1': 1},
+        {'gs://bucket/file2': 2},
+    ]
+    expected_results = [
+        [FileMetadata('gs://bucket/file1', 1)],
+        [FileMetadata('gs://bucket/file2', 2)]
+    ]
+    file_system = gcsfilesystem.GCSFileSystem()
+    result = file_system.match(['gs://bucket/file1*', 'gs://bucket/file2*'])
+    self.assertEqual(
+        [mr.metadata_list for mr in result],
+        expected_results)
+
+  @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+  def test_create(self, mock_gcsio):
+    # Prepare mocks.
+    gcsio_mock = mock.MagicMock()
+    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    # Issue file create
+    file_system = gcsfilesystem.GCSFileSystem()
+    _ = file_system.create('gs://bucket/from1', 'application/octet-stream')
+
+    gcsio_mock.open.assert_called_once_with(
+        'gs://bucket/from1', 'wb', mime_type='application/octet-stream')
+
+  @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+  def test_open(self, mock_gcsio):
+    # Prepare mocks.
+    gcsio_mock = mock.MagicMock()
+    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    # Issue file open
+    file_system = gcsfilesystem.GCSFileSystem()
+    _ = file_system.open('gs://bucket/from1', 'application/octet-stream')
+
+    gcsio_mock.open.assert_called_once_with(
+        'gs://bucket/from1', 'rb', mime_type='application/octet-stream')
+
+  @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+  def test_copy_file(self, mock_gcsio):
+    # Prepare mocks.
+    gcsio_mock = mock.MagicMock()
+    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    sources = ['gs://bucket/from1']
+    destinations = ['gs://bucket/to1']
+
+    # Issue file copy
+    file_system = gcsfilesystem.GCSFileSystem()
+    file_system.copy(sources, destinations)
+
+    gcsio_mock.copy.assert_called_once_with(
+        'gs://bucket/from1', 'gs://bucket/to1')
+
+  @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+  def test_copy_file_error(self, mock_gcsio):
+    # Prepare mocks.
+    gcsio_mock = mock.MagicMock()
+    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    sources = ['gs://bucket/from1']
+    destinations = ['gs://bucket/to1']
+
+    exception = IOError('Failed')
+    gcsio_mock.copy.side_effect = exception
+
+    # Expected results of the failed copy.
+    expected_results = {(s, d): exception
+                        for s, d in zip(sources, destinations)}
+
+    # Issue batch copy.
+    file_system = gcsfilesystem.GCSFileSystem()
+    with self.assertRaises(BeamIOError) as error:
+      file_system.copy(sources, destinations)
+    self.assertEqual(error.exception.message, 'Copy operation failed')
+    self.assertEqual(error.exception.exception_details, expected_results)
+
+    gcsio_mock.copy.assert_called_once_with(
+        'gs://bucket/from1', 'gs://bucket/to1')
+
+  @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+  def test_copy_tree(self, mock_gcsio):
+    # Prepare mocks.
+    gcsio_mock = mock.MagicMock()
+    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    sources = ['gs://bucket1/']
+    destinations = ['gs://bucket2/']
+
+    # Issue directory copy
+    file_system = gcsfilesystem.GCSFileSystem()
+    file_system.copy(sources, destinations)
+
+    gcsio_mock.copytree.assert_called_once_with(
+        'gs://bucket1/', 'gs://bucket2/')
+
+  @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+  def test_rename(self, mock_gcsio):
+    # Prepare mocks.
+    gcsio_mock = mock.MagicMock()
+    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    sources = [
+        'gs://bucket/from1',
+        'gs://bucket/from2',
+        'gs://bucket/from3',
+    ]
+    destinations = [
+        'gs://bucket/to1',
+        'gs://bucket/to2',
+        'gs://bucket/to3',
+    ]
+    gcsio_mock.copy_batch.side_effect = [[
+        ('gs://bucket/from1', 'gs://bucket/to1', None),
+        ('gs://bucket/from2', 'gs://bucket/to2', None),
+        ('gs://bucket/from3', 'gs://bucket/to3', None),
+    ]]
+    gcsio_mock.delete_batch.side_effect = [[
+        ('gs://bucket/from1', None),
+        ('gs://bucket/from2', None),
+        ('gs://bucket/from3', None),
+    ]]
+
+    # Issue batch rename.
+    file_system = gcsfilesystem.GCSFileSystem()
+    file_system.rename(sources, destinations)
+
+    gcsio_mock.copy_batch.assert_called_once_with([
+        ('gs://bucket/from1', 'gs://bucket/to1'),
+        ('gs://bucket/from2', 'gs://bucket/to2'),
+        ('gs://bucket/from3', 'gs://bucket/to3'),
+    ])
+    gcsio_mock.delete_batch.assert_called_once_with([
+        'gs://bucket/from1',
+        'gs://bucket/from2',
+        'gs://bucket/from3',
+    ])
+
+  @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+  def test_rename_error(self, mock_gcsio):
+    # Prepare mocks.
+    gcsio_mock = mock.MagicMock()
+    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    sources = [
+        'gs://bucket/from1',
+        'gs://bucket/from2',
+        'gs://bucket/from3',
+    ]
+    destinations = [
+        'gs://bucket/to1',
+        'gs://bucket/to2',
+        'gs://bucket/to3',
+    ]
+    exception = IOError('Failed')
+    gcsio_mock.delete_batch.side_effect = [[(f, exception) for f in sources]]
+    gcsio_mock.copy_batch.side_effect = [[
+        ('gs://bucket/from1', 'gs://bucket/to1', None),
+        ('gs://bucket/from2', 'gs://bucket/to2', None),
+        ('gs://bucket/from3', 'gs://bucket/to3', None),
+    ]]
+
+    # Expected results of the failed rename.
+    expected_results = {(s, d): exception
+                        for s, d in zip(sources, destinations)}
+
+    # Issue batch rename.
+    file_system = gcsfilesystem.GCSFileSystem()
+    with self.assertRaises(BeamIOError) as error:
+      file_system.rename(sources, destinations)
+    self.assertEqual(error.exception.message, 'Rename operation failed')
+    self.assertEqual(error.exception.exception_details, expected_results)
+
+    gcsio_mock.copy_batch.assert_called_once_with([
+        ('gs://bucket/from1', 'gs://bucket/to1'),
+        ('gs://bucket/from2', 'gs://bucket/to2'),
+        ('gs://bucket/from3', 'gs://bucket/to3'),
+    ])
+    gcsio_mock.delete_batch.assert_called_once_with([
+        'gs://bucket/from1',
+        'gs://bucket/from2',
+        'gs://bucket/from3',
+    ])
+
+  @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+  def test_delete(self, mock_gcsio):
+    # Prepare mocks.
+    gcsio_mock = mock.MagicMock()
+    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    files = [
+        'gs://bucket/from1',
+        'gs://bucket/from2',
+        'gs://bucket/from3',
+    ]
+
+    # Issue batch delete.
+    file_system = gcsfilesystem.GCSFileSystem()
+    file_system.delete(files)
+    gcsio_mock.delete_batch.assert_called()
+
+  @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+  def test_delete_error(self, mock_gcsio):
+    # Prepare mocks.
+    gcsio_mock = mock.MagicMock()
+    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    exception = IOError('Failed')
+    gcsio_mock.delete_batch.side_effect = exception
+    files = [
+        'gs://bucket/from1',
+        'gs://bucket/from2',
+        'gs://bucket/from3',
+    ]
+    expected_results = {f: exception for f in files}
+
+    # Issue batch delete.
+ file_system = gcsfilesystem.GCSFileSystem() + with self.assertRaises(BeamIOError) as error: + file_system.delete(files) + self.assertEqual(error.exception.message, 'Delete operation failed') + self.assertEqual(error.exception.exception_details, expected_results) + gcsio_mock.delete_batch.assert_called() http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/gcp/gcsio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index 020c38f..285e272 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -31,20 +31,20 @@ import re import threading import traceback -import apitools.base.py.transfer as transfer -from apitools.base.py.batch import BatchApiRequest -from apitools.base.py.exceptions import HttpError - -from apache_beam.internal.gcp import auth from apache_beam.utils import retry # Issue a friendlier error message if the storage library is not available. # TODO(silviuc): Remove this guard when storage is available everywhere. try: # pylint: disable=wrong-import-order, wrong-import-position + # pylint: disable=ungrouped-imports + import apitools.base.py.transfer as transfer + from apitools.base.py.batch import BatchApiRequest + from apitools.base.py.exceptions import HttpError + from apache_beam.internal.gcp import auth from apache_beam.io.gcp.internal.clients import storage except ImportError: - raise RuntimeError( + raise ImportError( 'Google Cloud Storage I/O not supported for this execution environment ' '(could not import storage API client).') http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/gcp/gcsio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py index a852689..c028f0d 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py @@ -28,12 +28,11 @@ import unittest import httplib2 import mock -from apache_beam.io import gcsio -from apache_beam.io.gcp.internal.clients import storage - # Protect against environments where apitools library is not available. 
# pylint: disable=wrong-import-order, wrong-import-position try: + from apache_beam.io.gcp import gcsio + from apache_beam.io.gcp.internal.clients import storage from apitools.base.py.exceptions import HttpError except ImportError: HttpError = None @@ -295,7 +294,7 @@ class TestGCSIO(unittest.TestCase): self.assertFalse( gcsio.parse_gcs_path(file_name) in self.client.objects.files) - @mock.patch('apache_beam.io.gcsio.BatchApiRequest') + @mock.patch('apache_beam.io.gcp.gcsio.BatchApiRequest') def test_delete_batch(self, *unused_args): gcsio.BatchApiRequest = FakeBatchApiRequest file_name_pattern = 'gs://gcsio-test/delete_me_%d' @@ -346,7 +345,7 @@ class TestGCSIO(unittest.TestCase): self.assertRaises(IOError, self.gcs.copy, 'gs://gcsio-test/non-existent', 'gs://gcsio-test/non-existent-destination') - @mock.patch('apache_beam.io.gcsio.BatchApiRequest') + @mock.patch('apache_beam.io.gcp.gcsio.BatchApiRequest') def test_copy_batch(self, *unused_args): gcsio.BatchApiRequest = FakeBatchApiRequest from_name_pattern = 'gs://gcsio-test/copy_me_%d' http://git-wip-us.apache.org/repos/asf/beam/blob/1bc240bc/sdks/python/apache_beam/io/iobase.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index 057f853..512824b 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -591,8 +591,8 @@ class Sink(HasDisplayData): single record from the bundle and ``close()`` which is called once at the end of writing a bundle. - See also ``beam.io.fileio.FileSink`` which provides a simpler API for writing - sinks that produce files. + See also ``apache_beam.io.fileio.FileSink`` which provides a simpler API + for writing sinks that produce files. **Execution of the Write transform**

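A closing note on the gcsio.py hunk above: raising ImportError instead of RuntimeError lets downstream code treat GCS support as an optional dependency with a plain import guard, exactly the pattern gcsfilesystem_test.py uses earlier in this commit:

    try:
      from apache_beam.io.gcp import gcsio
    except ImportError:
      gcsio = None  # GCP extras not installed; GCS paths are unsupported.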