Repository: beam Updated Branches: refs/heads/master 6fed1779d -> 1d2000d8c
Fix the staging directory path in copying from GCS Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/171a9930 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/171a9930 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/171a9930 Branch: refs/heads/master Commit: 171a993044d97c42f027e1ec44436a3b8af7c32f Parents: 6fed177 Author: Sourabh Bajaj <[email protected]> Authored: Tue Jun 6 12:55:42 2017 -0700 Committer: Ahmet Altay <[email protected]> Committed: Tue Jun 6 16:43:58 2017 -0700 ---------------------------------------------------------------------- .../runners/dataflow/internal/dependency.py | 7 ++++++- .../runners/dataflow/internal/dependency_test.py | 14 +++++++++++++- 2 files changed, 19 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/171a9930/sdks/python/apache_beam/runners/dataflow/internal/dependency.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py index 3a0ff46..e656600 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py @@ -181,7 +181,12 @@ def _stage_extra_packages(extra_packages, staging_location, temp_dir, staging_temp_dir = tempfile.mkdtemp(dir=temp_dir) logging.info('Downloading extra package: %s locally before staging', package) - _dependency_file_copy(package, staging_temp_dir) + if os.path.isfile(staging_temp_dir): + local_file_path = staging_temp_dir + else: + _, last_component = FileSystems.split(package) + local_file_path = FileSystems.join(staging_temp_dir, last_component) + _dependency_file_copy(package, local_file_path) else: raise RuntimeError( 'The file %s cannot be found. 
It was specified in the ' http://git-wip-us.apache.org/repos/asf/beam/blob/171a9930/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py index 5eac7d6..e555b69 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py @@ -31,6 +31,16 @@ from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import SetupOptions +# Protect against environments where GCS library is not available. +# pylint: disable=wrong-import-order, wrong-import-position +try: + from apitools.base.py.exceptions import HttpError +except ImportError: + HttpError = None +# pylint: enable=wrong-import-order, wrong-import-position + + +@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') class SetupTest(unittest.TestCase): def update_options(self, options): @@ -369,7 +379,9 @@ class SetupTest(unittest.TestCase): if from_path.startswith('gs://'): gcs_copied_files.append(from_path) _, from_name = os.path.split(from_path) - self.create_temp_file(os.path.join(to_path, from_name), 'nothing') + if os.path.isdir(to_path): + to_path = os.path.join(to_path, from_name) + self.create_temp_file(to_path, 'nothing') logging.info('Fake copied GCS file: %s to %s', from_path, to_path) elif to_path.startswith('gs://'): logging.info('Faking file_copy(%s, %s)', from_path, to_path)
