Repository: beam
Updated Branches:
  refs/heads/master bebee2a72 -> a2047acdb


[BEAM-1988] Add join operation to the filesystem


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/82dcfc6f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/82dcfc6f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/82dcfc6f

Branch: refs/heads/master
Commit: 82dcfc6ffea46bc1dc5f12b3d8365af98caf7a94
Parents: bebee2a
Author: Sourabh Bajaj <[email protected]>
Authored: Tue Apr 18 16:18:38 2017 -0700
Committer: Chamikara Jayalath <[email protected]>
Committed: Fri Apr 21 16:18:40 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/filesystem.py        | 12 +++++++
 sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 19 +++++++++++
 .../apache_beam/io/gcp/gcsfilesystem_test.py    |  9 ++++++
 sdks/python/apache_beam/io/localfilesystem.py   | 11 +++++++
 .../apache_beam/io/localfilesystem_test.py      | 33 ++++++++++++++++++--
 5 files changed, 82 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/82dcfc6f/sdks/python/apache_beam/io/filesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index 3a71ac1..591d0b0 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -426,6 +426,18 @@ class FileSystem(object):
     return compression_type
 
   @abc.abstractmethod
+  def join(self, basepath, *paths):
+    """Join two or more pathname components for the filesystem.
+
+    Args:
+      basepath: string path of the first component of the path
+      paths: path components to append after basepath, in order
+
+    Returns: full path after combining all the passed components.
+    """
+    raise NotImplementedError
+
+  @abc.abstractmethod
   def mkdirs(self, path):
     """Recursively create directories for the provided path.
 

http://git-wip-us.apache.org/repos/asf/beam/blob/82dcfc6f/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index a10a3d2..99f27f8 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -33,6 +33,25 @@ class GCSFileSystem(FileSystem):
 
   CHUNK_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE  # Chuck size in batch operations
 
+  def join(self, basepath, *paths):
+    """Join two or more pathname components for the filesystem.
+
+    Args:
+      basepath: string GCS path; must start with 'gs://' (else ValueError)
+      paths: components to append; not normalized, a leading '/' is kept
+
+    Returns: full path after combining all the passed components.
+    """
+    if not basepath.startswith('gs://'):
+      raise ValueError('Basepath %r must be GCS path.' % basepath)
+    path = basepath
+    for p in paths:
+      if path.endswith('/'):  # path is never empty: it starts with 'gs://'
+        path += p
+      else:
+        path += '/' + p
+    return path
+
   def mkdirs(self, path):
     """Recursively create directories for the provided path.
 

http://git-wip-us.apache.org/repos/asf/beam/blob/82dcfc6f/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
index 5a1f10d..d6a8fd7 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
@@ -36,6 +36,15 @@ except ImportError:
 @unittest.skipIf(gcsfilesystem is None, 'GCP dependencies are not installed')
 class GCSFileSystemTest(unittest.TestCase):
 
+  def test_join(self):
+    file_system = gcsfilesystem.GCSFileSystem()
+    self.assertEqual('gs://bucket/path/to/file',
+                     file_system.join('gs://bucket/path', 'to', 'file'))
+    self.assertEqual('gs://bucket/path/to/file',
+                     file_system.join('gs://bucket/path', 'to/file'))
+    self.assertEqual('gs://bucket/path//to/file',  # leading '/' is kept
+                     file_system.join('gs://bucket/path', '/to/file'))
+
   @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
   def test_match_multiples(self, mock_gcsio):
     # Prepare mocks.

http://git-wip-us.apache.org/repos/asf/beam/blob/82dcfc6f/sdks/python/apache_beam/io/localfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py
index 7637f2a..fbb65bf 100644
--- a/sdks/python/apache_beam/io/localfilesystem.py
+++ b/sdks/python/apache_beam/io/localfilesystem.py
@@ -34,6 +34,17 @@ class LocalFileSystem(FileSystem):
   """A Local ``FileSystem`` implementation for accessing files on disk.
   """
 
+  def join(self, basepath, *paths):
+    """Join two or more pathname components with ``os.path.join``.
+
+    Args:
+      basepath: string path of the first component of the path
+      paths: path components to append after basepath, in order
+
+    Returns: full path after combining all the passed components.
+    """
+    return os.path.join(basepath, *paths)
+
   def mkdirs(self, path):
     """Recursively create directories for the provided path.
 

http://git-wip-us.apache.org/repos/asf/beam/blob/82dcfc6f/sdks/python/apache_beam/io/localfilesystem_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/localfilesystem_test.py b/sdks/python/apache_beam/io/localfilesystem_test.py
index 5eacbc2..3fe308d 100644
--- a/sdks/python/apache_beam/io/localfilesystem_test.py
+++ b/sdks/python/apache_beam/io/localfilesystem_test.py
@@ -24,19 +24,48 @@ import filecmp
 import os
 import shutil
 import tempfile
+import mock
+
 from apache_beam.io.filesystem import BeamIOError
-from apache_beam.io.localfilesystem import LocalFileSystem
+from apache_beam.io import localfilesystem
+
+
+def _gen_fake_join(separator):
+  """Returns a callable that joins paths with the given separator."""
+
+  def _join(first_path, *paths):
+    return separator.join((first_path,) + paths)
+
+  return _join
 
 
 class LocalFileSystemTest(unittest.TestCase):
 
   def setUp(self):
     self.tmpdir = tempfile.mkdtemp()
-    self.fs = LocalFileSystem()
+    self.fs = localfilesystem.LocalFileSystem()
 
   def tearDown(self):
     shutil.rmtree(self.tmpdir)
 
+  @mock.patch('apache_beam.io.localfilesystem.os')
+  def test_unix_path_join(self, mock_os):
+    # Fake os.path.join with '/' so the result is independent of the host OS.
+    mock_os.path.join.side_effect = _gen_fake_join('/')
+    self.assertEqual('/tmp/path/to/file',
+                     self.fs.join('/tmp/path', 'to', 'file'))
+    self.assertEqual('/tmp/path/to/file',
+                     self.fs.join('/tmp/path', 'to/file'))
+
+  @mock.patch('apache_beam.io.localfilesystem.os')
+  def test_windows_path_join(self, mock_os):
+    # Fake os.path.join with '\' so the result is independent of the host OS.
+    mock_os.path.join.side_effect = _gen_fake_join('\\')
+    self.assertEqual(r'C:\tmp\path\to\file',
+                     self.fs.join(r'C:\tmp\path', 'to', 'file'))
+    self.assertEqual(r'C:\tmp\path\to\file',
+                     self.fs.join(r'C:\tmp\path', r'to\file'))
+
   def test_mkdirs(self):
     path = os.path.join(self.tmpdir, 't1/t2')
     self.fs.mkdirs(path)

Reply via email to