Repository: beam Updated Branches: refs/heads/master 78a8d7d6d -> f138b3569
Replace gsutil with storage API calls This commit replaces the `gsutil cp` call with a GCS API call when staging resources. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/32f218db Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/32f218db Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/32f218db Branch: refs/heads/master Commit: 32f218dbddea5e904ab325ce9e4165b01e4d30d7 Parents: 78a8d7d Author: David Volquartz Lebech <[email protected]> Authored: Mon Mar 27 12:18:18 2017 +0200 Committer: Ahmet Altay <[email protected]> Committed: Tue Mar 28 15:14:01 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/gcp/gcsio.py | 1 - .../runners/dataflow/internal/dependency.py | 22 +++++++++++++++++--- 2 files changed, 19 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/32f218db/sdks/python/apache_beam/io/gcp/gcsio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index 285e272..0a10094 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -754,7 +754,6 @@ class GcsBufferedWriter(object): self.path = path self.mode = mode self.bucket, self.name = parse_gcs_path(path) - self.mode = mode self.closed = False self.position = 0 http://git-wip-us.apache.org/repos/asf/beam/blob/32f218db/sdks/python/apache_beam/runners/dataflow/internal/dependency.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py index 60630e9..22de5c6 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py +++ 
b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py @@ -52,6 +52,7 @@ TODO(silviuc): Should we allow several setup packages? TODO(silviuc): We should allow customizing the exact command for setup build. """ +import functools import glob import logging import os @@ -87,9 +88,24 @@ def _dependency_file_copy(from_path, to_path): """Copies a local file to a GCS file or vice versa.""" logging.info('file copy from %s to %s.', from_path, to_path) if from_path.startswith('gs://') or to_path.startswith('gs://'): - command_args = ['gsutil', '-m', '-q', 'cp', from_path, to_path] - logging.info('Executing command: %s', command_args) - processes.check_call(command_args) + from apache_beam.io.gcp import gcsio + if from_path.startswith('gs://') and to_path.startswith('gs://'): + # Both files are GCS files so copy. + gcsio.GcsIO().copy(from_path, to_path) + elif to_path.startswith('gs://'): + # Only target is a GCS file, read local file and upload. + with open(from_path, 'rb') as f: + with gcsio.GcsIO().open(to_path, mode='wb') as g: + pfun = functools.partial(f.read, gcsio.WRITE_CHUNK_SIZE) + for chunk in iter(pfun, ''): + g.write(chunk) + else: + # Source is a GCS file but target is local file. + with gcsio.GcsIO().open(from_path, mode='rb') as g: + with open(to_path, 'wb') as f: + pfun = functools.partial(g.read, gcsio.DEFAULT_READ_BUFFER_SIZE) + for chunk in iter(pfun, ''): + f.write(chunk) else: # Branch used only for unit tests and integration tests. # In such environments GCS support is not available.
