Repository: beam Updated Branches: refs/heads/master 94c9e3817 -> 8479094c2
[BEAM-1222] Chunk size should be FS dependent Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6f3fd354 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6f3fd354 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6f3fd354 Branch: refs/heads/master Commit: 6f3fd3545c793eb9cdb2dbd75a4a6e680f1cb7ec Parents: 94c9e38 Author: Sourabh Bajaj <[email protected]> Authored: Mon Apr 10 14:09:48 2017 -0700 Committer: Chamikara Jayalath <[email protected]> Committed: Wed Apr 12 15:11:55 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/fileio.py | 10 +++++----- sdks/python/apache_beam/io/filesystem.py | 1 + sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 4 +++- 3 files changed, 9 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/6f3fd354/sdks/python/apache_beam/io/fileio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index f33942a..b128dc5 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -31,7 +31,6 @@ from apache_beam.io.filesystem import CompressionTypes from apache_beam.io.filesystems_util import get_filesystem from apache_beam.transforms.display import DisplayDataItem -MAX_BATCH_OPERATION_SIZE = 100 DEFAULT_SHARD_NAME_TEMPLATE = '-SSSSS-of-NNNNN' @@ -244,6 +243,7 @@ class FileSink(iobase.Sink): source_files = [] destination_files = [] + chunk_size = self._file_system.CHUNK_SIZE for shard_num, shard in enumerate(writer_results): final_name = ''.join([ self.file_path_prefix, self.shard_name_format % dict( @@ -252,12 +252,12 @@ class FileSink(iobase.Sink): source_files.append(shard) destination_files.append(final_name) - source_file_batch = [source_files[i:i + 
MAX_BATCH_OPERATION_SIZE] + source_file_batch = [source_files[i:i + chunk_size] for i in xrange(0, len(source_files), - MAX_BATCH_OPERATION_SIZE)] + chunk_size)] + destination_file_batch = [destination_files[i:i + chunk_size] for i in xrange(0, len(destination_files), - MAX_BATCH_OPERATION_SIZE)] + chunk_size)] logging.info( 'Starting finalize_write threads with num_shards: %d, ' http://git-wip-us.apache.org/repos/asf/beam/blob/6f3fd354/sdks/python/apache_beam/io/filesystem.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py index 85c7f06..3a71ac1 100644 --- a/sdks/python/apache_beam/io/filesystem.py +++ b/sdks/python/apache_beam/io/filesystem.py @@ -414,6 +414,7 @@ class FileSystem(object): the correct file system based on the provided file pattern scheme. """ __metaclass__ = abc.ABCMeta + CHUNK_SIZE = 1 # Chunk size in the batch operations @staticmethod def _get_compression_type(path, compression_type): http://git-wip-us.apache.org/repos/asf/beam/blob/6f3fd354/sdks/python/apache_beam/io/gcp/gcsfilesystem.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index d79630f..b2bc809 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -31,6 +31,8 @@ class GCSFileSystem(FileSystem): """A GCS ``FileSystem`` implementation for accessing files on GCS. """ + CHUNK_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE # Chunk size in batch operations + def mkdirs(self, path): """Recursively create directories for the provided path.
@@ -174,7 +176,7 @@ class GCSFileSystem(FileSystem): gcs_current_batch = [] for src, dest in zip(source_file_names, destination_file_names): gcs_current_batch.append((src, dest)) - if len(gcs_current_batch) == gcsio.MAX_BATCH_OPERATION_SIZE: + if len(gcs_current_batch) == self.CHUNK_SIZE: gcs_batches.append(gcs_current_batch) gcs_current_batch = [] if gcs_current_batch:
