Repository: beam Updated Branches: refs/heads/master bebee2a72 -> a2047acdb
[BEAM-1988] Add join operation to the filesystem Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/82dcfc6f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/82dcfc6f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/82dcfc6f Branch: refs/heads/master Commit: 82dcfc6ffea46bc1dc5f12b3d8365af98caf7a94 Parents: bebee2a Author: Sourabh Bajaj <[email protected]> Authored: Tue Apr 18 16:18:38 2017 -0700 Committer: Chamikara Jayalath <[email protected]> Committed: Fri Apr 21 16:18:40 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/filesystem.py | 12 +++++++ sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 19 +++++++++++ .../apache_beam/io/gcp/gcsfilesystem_test.py | 9 ++++++ sdks/python/apache_beam/io/localfilesystem.py | 11 +++++++ .../apache_beam/io/localfilesystem_test.py | 33 ++++++++++++++++++-- 5 files changed, 82 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/82dcfc6f/sdks/python/apache_beam/io/filesystem.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py index 3a71ac1..591d0b0 100644 --- a/sdks/python/apache_beam/io/filesystem.py +++ b/sdks/python/apache_beam/io/filesystem.py @@ -426,6 +426,18 @@ class FileSystem(object): return compression_type @abc.abstractmethod + def join(self, basepath, *paths): + """Join two or more pathname components for the filesystem + + Args: + basepath: string path of the first component of the path + paths: path components to be added + + Returns: full path after combining all the passed components + """ + raise NotImplementedError + + @abc.abstractmethod def mkdirs(self, path): """Recursively create directories for the provided path. 
http://git-wip-us.apache.org/repos/asf/beam/blob/82dcfc6f/sdks/python/apache_beam/io/gcp/gcsfilesystem.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index a10a3d2..99f27f8 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -33,6 +33,25 @@ class GCSFileSystem(FileSystem): CHUNK_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE # Chuck size in batch operations + def join(self, basepath, *paths): + """Join two or more pathname components for the filesystem + + Args: + basepath: string path of the first component of the path + paths: path components to be added + + Returns: full path after combining all the passed components + """ + if not basepath.startswith('gs://'): + raise ValueError('Basepath %r must be GCS path.' % basepath) + path = basepath + for p in paths: + if path == '' or path.endswith('/'): + path += p + else: + path += '/' + p + return path + def mkdirs(self, path): """Recursively create directories for the provided path. 
http://git-wip-us.apache.org/repos/asf/beam/blob/82dcfc6f/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py index 5a1f10d..d6a8fd7 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py @@ -36,6 +36,15 @@ except ImportError: @unittest.skipIf(gcsfilesystem is None, 'GCP dependencies are not installed') class GCSFileSystemTest(unittest.TestCase): + def test_join(self): + file_system = gcsfilesystem.GCSFileSystem() + self.assertEqual('gs://bucket/path/to/file', + file_system.join('gs://bucket/path', 'to', 'file')) + self.assertEqual('gs://bucket/path/to/file', + file_system.join('gs://bucket/path', 'to/file')) + self.assertEqual('gs://bucket/path//to/file', + file_system.join('gs://bucket/path', '/to/file')) + @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') def test_match_multiples(self, mock_gcsio): # Prepare mocks. http://git-wip-us.apache.org/repos/asf/beam/blob/82dcfc6f/sdks/python/apache_beam/io/localfilesystem.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py index 7637f2a..fbb65bf 100644 --- a/sdks/python/apache_beam/io/localfilesystem.py +++ b/sdks/python/apache_beam/io/localfilesystem.py @@ -34,6 +34,17 @@ class LocalFileSystem(FileSystem): """A Local ``FileSystem`` implementation for accessing files on disk. 
""" + def join(self, basepath, *paths): + """Join two or more pathname components for the filesystem + + Args: + basepath: string path of the first component of the path + paths: path components to be added + + Returns: full path after combining all the passed components + """ + return os.path.join(basepath, *paths) + def mkdirs(self, path): """Recursively create directories for the provided path. http://git-wip-us.apache.org/repos/asf/beam/blob/82dcfc6f/sdks/python/apache_beam/io/localfilesystem_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/localfilesystem_test.py b/sdks/python/apache_beam/io/localfilesystem_test.py index 5eacbc2..3fe308d 100644 --- a/sdks/python/apache_beam/io/localfilesystem_test.py +++ b/sdks/python/apache_beam/io/localfilesystem_test.py @@ -24,19 +24,48 @@ import filecmp import os import shutil import tempfile +import mock + from apache_beam.io.filesystem import BeamIOError -from apache_beam.io.localfilesystem import LocalFileSystem +from apache_beam.io import localfilesystem + + +def _gen_fake_join(separator): + """Returns a callable that joins paths with the given separator.""" + + def _join(first_path, *paths): + return separator.join((first_path,) + paths) + + return _join class LocalFileSystemTest(unittest.TestCase): def setUp(self): self.tmpdir = tempfile.mkdtemp() - self.fs = LocalFileSystem() + self.fs = localfilesystem.LocalFileSystem() def tearDown(self): shutil.rmtree(self.tmpdir) + @mock.patch('apache_beam.io.localfilesystem.os') + def test_unix_path_join(self, *unused_mocks): + # Test joining of Unix paths. 
+ localfilesystem.os.path.join.side_effect = _gen_fake_join('/') + self.assertEqual('/tmp/path/to/file', + self.fs.join('/tmp/path', 'to', 'file')) + self.assertEqual('/tmp/path/to/file', + self.fs.join('/tmp/path', 'to/file')) + + @mock.patch('apache_beam.io.localfilesystem.os') + def test_windows_path_join(self, *unused_mocks): + # Test joining of Windows paths. + localfilesystem.os.path.join.side_effect = _gen_fake_join('\\') + self.assertEqual(r'C:\tmp\path\to\file', + self.fs.join(r'C:\tmp\path', 'to', 'file')) + self.assertEqual(r'C:\tmp\path\to\file', + self.fs.join(r'C:\tmp\path', r'to\file')) + def test_mkdirs(self): path = os.path.join(self.tmpdir, 't1/t2') self.fs.mkdirs(path)
