Repository: beam Updated Branches: refs/heads/master 1dce98f07 -> a67019739
[BEAM-1988] Migrate from utils.path to BFS Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9b7bbc2d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9b7bbc2d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9b7bbc2d Branch: refs/heads/master Commit: 9b7bbc2dbce01605fa4a4e8ddebaf2bc648d6f9b Parents: 1dce98f Author: Sourabh Bajaj <[email protected]> Authored: Fri Apr 21 16:39:34 2017 -0700 Committer: Ahmet Altay <[email protected]> Committed: Sun Apr 23 12:54:46 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 5 +- .../apache_beam/io/gcp/gcsfilesystem_test.py | 10 ++- .../runners/dataflow/internal/apiclient.py | 8 ++- .../runners/dataflow/internal/dependency.py | 23 ++++--- .../dataflow/internal/dependency_test.py | 5 +- sdks/python/apache_beam/utils/__init__.py | 4 -- sdks/python/apache_beam/utils/path.py | 46 ------------- sdks/python/apache_beam/utils/path_test.py | 70 -------------------- 8 files changed, 31 insertions(+), 140 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9b7bbc2d/sdks/python/apache_beam/io/gcp/gcsfilesystem.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index 99f27f8..fdc4757 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -46,10 +46,7 @@ class GCSFileSystem(FileSystem): raise ValueError('Basepath %r must be GCS path.', basepath) path = basepath for p in paths: - if path == '' or path.endswith('/'): - path += p - else: - path += '/' + p + path = path.rstrip('/') + '/' + p.lstrip('/') return path def mkdirs(self, path): http://git-wip-us.apache.org/repos/asf/beam/blob/9b7bbc2d/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py index d6a8fd7..0669bf2 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py @@ -42,8 +42,16 @@ class GCSFileSystemTest(unittest.TestCase): file_system.join('gs://bucket/path', 'to', 'file')) self.assertEqual('gs://bucket/path/to/file', file_system.join('gs://bucket/path', 'to/file')) - self.assertEqual('gs://bucket/path//to/file', + self.assertEqual('gs://bucket/path/to/file', file_system.join('gs://bucket/path', '/to/file')) + self.assertEqual('gs://bucket/path/to/file', + file_system.join('gs://bucket/path/', 'to', 'file')) + self.assertEqual('gs://bucket/path/to/file', + file_system.join('gs://bucket/path/', 'to/file')) + self.assertEqual('gs://bucket/path/to/file', + file_system.join('gs://bucket/path/', '/to/file')) + with self.assertRaises(ValueError): + file_system.join('/bucket/path/', '/to/file') @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') def test_match_multiples(self, mock_gcsio): http://git-wip-us.apache.org/repos/asf/beam/blob/9b7bbc2d/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 24e1129..d95b33f 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -30,9 +30,9 @@ from datetime import datetime from apitools.base.py import encoding from apitools.base.py import exceptions -from apache_beam import utils from apache_beam.internal.gcp.auth import get_service_credentials from apache_beam.internal.gcp.json_value import to_json_value +from apache_beam.io.filesystems_util import get_filesystem from apache_beam.io.gcp.internal.clients import storage from apache_beam.runners.dataflow.internal import dependency from apache_beam.runners.dataflow.internal.clients import dataflow @@ -336,10 +336,12 @@ class Job(object): # for GCS staging locations where the potential for such clashes is high. if self.google_cloud_options.staging_location.startswith('gs://'): path_suffix = '%s.%f' % (self.google_cloud_options.job_name, time.time()) - self.google_cloud_options.staging_location = utils.path.join( + filesystem = get_filesystem(self.google_cloud_options.staging_location) + self.google_cloud_options.staging_location = filesystem.join( self.google_cloud_options.staging_location, path_suffix) - self.google_cloud_options.temp_location = utils.path.join( + self.google_cloud_options.temp_location = filesystem.join( self.google_cloud_options.temp_location, path_suffix) + self.proto = dataflow.Job(name=self.google_cloud_options.job_name) if self.options.view_as(StandardOptions).streaming: self.proto.type = dataflow.Job.TypeValueValuesEnum.JOB_TYPE_STREAMING http://git-wip-us.apache.org/repos/asf/beam/blob/9b7bbc2d/sdks/python/apache_beam/runners/dataflow/internal/dependency.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py index 1f28b26..bb490f3 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py @@ -61,10 +61,9 @@ import shutil import sys import tempfile - -from apache_beam import utils from apache_beam import version as beam_version from apache_beam.internal import pickler +from apache_beam.io.filesystems_util import get_filesystem from apache_beam.runners.dataflow.internal import names from apache_beam.utils import processes from apache_beam.utils.pipeline_options import GoogleCloudOptions @@ -158,6 +157,7 @@ def _stage_extra_packages(extra_packages, staging_location, temp_dir, name patterns. """ resources = [] + staging_filesystem = get_filesystem(staging_location) staging_temp_dir = None local_packages = [] for package in extra_packages: @@ -190,13 +190,14 @@ def _stage_extra_packages(extra_packages, staging_location, temp_dir, local_packages.append(package) if staging_temp_dir: + temp_fs = get_filesystem(staging_temp_dir) local_packages.extend( - [utils.path.join(staging_temp_dir, f) for f in os.listdir( + [temp_fs.join(staging_temp_dir, f) for f in os.listdir( staging_temp_dir)]) for package in local_packages: basename = os.path.basename(package) - staged_path = utils.path.join(staging_location, basename) + staged_path = staging_filesystem.join(staging_location, basename) file_copy(package, staged_path) resources.append(basename) # Create a file containing the list of extra packages and stage it. @@ -209,7 +210,7 @@ def _stage_extra_packages(extra_packages, staging_location, temp_dir, with open(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), 'wt') as f: for package in local_packages: f.write('%s\n' % os.path.basename(package)) - staged_path = utils.path.join(staging_location, EXTRA_PACKAGES_FILE) + staged_path = staging_filesystem.join(staging_location, EXTRA_PACKAGES_FILE) # Note that the caller of this function is responsible for deleting the # temporary folder where all temp files are created, including this one. file_copy(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), staged_path) @@ -284,13 +285,15 @@ def stage_job_resources( raise RuntimeError( 'The --temp_location option must be specified.') + filesystem = get_filesystem(google_cloud_options.staging_location) + # Stage a requirements file if present. if setup_options.requirements_file is not None: if not os.path.isfile(setup_options.requirements_file): raise RuntimeError('The file %s cannot be found. It was specified in the ' '--requirements_file command line option.' % setup_options.requirements_file) - staged_path = utils.path.join(google_cloud_options.staging_location, + staged_path = filesystem.join(google_cloud_options.staging_location, REQUIREMENTS_FILE) file_copy(setup_options.requirements_file, staged_path) resources.append(REQUIREMENTS_FILE) @@ -305,7 +308,7 @@ def stage_job_resources( populate_requirements_cache( setup_options.requirements_file, requirements_cache_path) for pkg in glob.glob(os.path.join(requirements_cache_path, '*')): - file_copy(pkg, utils.path.join(google_cloud_options.staging_location, + file_copy(pkg, filesystem.join(google_cloud_options.staging_location, os.path.basename(pkg))) resources.append(os.path.basename(pkg)) @@ -324,7 +327,7 @@ def stage_job_resources( 'setup.py instead of %s' % setup_options.setup_file) tarball_file = _build_setup_package(setup_options.setup_file, temp_dir, build_setup_args) - staged_path = utils.path.join(google_cloud_options.staging_location, + staged_path = filesystem.join(google_cloud_options.staging_location, WORKFLOW_TARBALL_FILE) file_copy(tarball_file, staged_path) resources.append(WORKFLOW_TARBALL_FILE) @@ -344,7 +347,7 @@ def stage_job_resources( pickled_session_file = os.path.join(temp_dir, names.PICKLED_MAIN_SESSION_FILE) pickler.dump_session(pickled_session_file) - staged_path = utils.path.join(google_cloud_options.staging_location, + staged_path = filesystem.join(google_cloud_options.staging_location, names.PICKLED_MAIN_SESSION_FILE) file_copy(pickled_session_file, staged_path) resources.append(names.PICKLED_MAIN_SESSION_FILE) @@ -359,7 +362,7 @@ def stage_job_resources( else: stage_tarball_from_remote_location = False - staged_path = utils.path.join(google_cloud_options.staging_location, + staged_path = filesystem.join(google_cloud_options.staging_location, names.DATAFLOW_SDK_TARBALL_FILE) if stage_tarball_from_remote_location: # If --sdk_location is not specified then the appropriate package http://git-wip-us.apache.org/repos/asf/beam/blob/9b7bbc2d/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py index 545bcd6..24f65d0 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py @@ -23,7 +23,7 @@ import shutil import tempfile import unittest -from apache_beam import utils +from apache_beam.io.filesystems_util import get_filesystem from apache_beam.runners.dataflow.internal import dependency from apache_beam.runners.dataflow.internal import names from apache_beam.utils.pipeline_options import GoogleCloudOptions @@ -241,7 +241,8 @@ class SetupTest(unittest.TestCase): def file_copy(from_path, to_path): if not from_path.endswith(names.PICKLED_MAIN_SESSION_FILE): self.assertEqual(expected_from_path, from_path) - self.assertEqual(utils.path.join(expected_to_dir, + filesystem = get_filesystem(expected_to_dir) + self.assertEqual(filesystem.join(expected_to_dir, names.DATAFLOW_SDK_TARBALL_FILE), to_path) if from_path.startswith('gs://') or to_path.startswith('gs://'): http://git-wip-us.apache.org/repos/asf/beam/blob/9b7bbc2d/sdks/python/apache_beam/utils/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/__init__.py b/sdks/python/apache_beam/utils/__init__.py index 20ff35d..74cf45d 100644 --- a/sdks/python/apache_beam/utils/__init__.py +++ b/sdks/python/apache_beam/utils/__init__.py @@ -16,7 +16,3 @@ # """A package containing utilities.""" - -# We must import path here to support the pattern of referencing utils.path -# without needing to explicitly import apache_beam.utils.path. -import path http://git-wip-us.apache.org/repos/asf/beam/blob/9b7bbc2d/sdks/python/apache_beam/utils/path.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/path.py b/sdks/python/apache_beam/utils/path.py deleted file mode 100644 index 86dc4db..0000000 --- a/sdks/python/apache_beam/utils/path.py +++ /dev/null @@ -1,46 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -"""Utilities for dealing with file paths.""" - -import os - - -def join(path, *paths): - """Joins given path pieces with the appropriate separator. - - This function is useful for joining parts of a path that could at times refer - to either a GCS path or a local path. In particular, this is useful for - ensuring Windows compatibility as on Windows, the GCS path separator is - different from the separator for local paths. - - Use os.path.join instead if a path always refers to a local path. - - Args: - path: First part of path to join. If this part starts with 'gs:/', the GCS - separator will be used in joining this path. - *paths: Remaining part(s) of path to join. - - Returns: - Pieces joined by the appropriate path separator. - """ - if path.startswith('gs:/'): - # Note that we explicitly choose not to use posixpath.join() here, since - # that function has the undesirable behavior of having, for example, - # posixpath.join('gs://bucket/path', '/to/file') return '/to/file' instead - # of the slightly less surprising result 'gs://bucket/path//to/file'. - return '/'.join((path,) + paths) - return os.path.join(path, *paths) http://git-wip-us.apache.org/repos/asf/beam/blob/9b7bbc2d/sdks/python/apache_beam/utils/path_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/path_test.py b/sdks/python/apache_beam/utils/path_test.py deleted file mode 100644 index e59eca0..0000000 --- a/sdks/python/apache_beam/utils/path_test.py +++ /dev/null @@ -1,70 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -"""Unit tests for the path module.""" - -import unittest - - -import mock - -from apache_beam.utils import path - - -def _gen_fake_join(separator): - """Returns a callable that joins paths with the given separator.""" - - def _join(first_path, *paths): - return separator.join((first_path,) + paths) - - return _join - - -class Path(unittest.TestCase): - - def setUp(self): - pass - - @mock.patch('apache_beam.utils.path.os') - def test_gcs_path(self, *unused_mocks): - # Test joining of GCS paths when os.path.join uses Windows-style separator. - path.os.path.join.side_effect = _gen_fake_join('\\') - self.assertEqual('gs://bucket/path/to/file', - path.join('gs://bucket/path', 'to', 'file')) - self.assertEqual('gs://bucket/path/to/file', - path.join('gs://bucket/path', 'to/file')) - self.assertEqual('gs://bucket/path//to/file', - path.join('gs://bucket/path', '/to/file')) - - @mock.patch('apache_beam.utils.path.os') - def test_unix_path(self, *unused_mocks): - # Test joining of Unix paths. - path.os.path.join.side_effect = _gen_fake_join('/') - self.assertEqual('/tmp/path/to/file', path.join('/tmp/path', 'to', 'file')) - self.assertEqual('/tmp/path/to/file', path.join('/tmp/path', 'to/file')) - - @mock.patch('apache_beam.utils.path.os') - def test_windows_path(self, *unused_mocks): - # Test joining of Windows paths. - path.os.path.join.side_effect = _gen_fake_join('\\') - self.assertEqual(r'C:\tmp\path\to\file', - path.join(r'C:\tmp\path', 'to', 'file')) - self.assertEqual(r'C:\tmp\path\to\file', - path.join(r'C:\tmp\path', r'to\file')) - - -if __name__ == '__main__': - unittest.main()
