This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d903dcf  [BEAM-4747] mkdirs if they don't exist in localfilesystem 
(#5903)
d903dcf is described below

commit d903dcfc7ff7300355688b08b779da880f15fe9d
Author: Ryan Williams <[email protected]>
AuthorDate: Fri Jul 27 19:46:19 2018 -0400

    [BEAM-4747] mkdirs if they don't exist in localfilesystem (#5903)
    
    * mkdirs if they don't exist in localfilesystem
    * make localfilesystem create ancestor directories for output paths
---
 sdks/python/apache_beam/io/localfilesystem.py      | 11 ++++++++
 sdks/python/apache_beam/io/localfilesystem_test.py | 33 ++++++++++++++++++++--
 2 files changed, 42 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/io/localfilesystem.py 
b/sdks/python/apache_beam/io/localfilesystem.py
index 23a1e8a..e373ad0 100644
--- a/sdks/python/apache_beam/io/localfilesystem.py
+++ b/sdks/python/apache_beam/io/localfilesystem.py
@@ -144,6 +144,9 @@ class LocalFileSystem(FileSystem):
 
     Returns: file handle with a close function for the user to use
     """
+    parent = os.path.dirname(path)
+    if not os.path.exists(parent):
+      os.makedirs(parent)
     return self._path_open(path, 'wb', mime_type, compression_type)
 
   def open(self, path, mime_type='application/octet-stream',
@@ -185,6 +188,10 @@ class LocalFileSystem(FileSystem):
         if os.path.isdir(source):
           shutil.copytree(source, destination)
         else:
+          parent = os.path.dirname(destination)
+          if not os.path.exists(parent):
+            os.makedirs(parent)
+
           shutil.copy2(source, destination)
       except OSError as err:
         raise IOError(err)
@@ -217,6 +224,10 @@ class LocalFileSystem(FileSystem):
     def _rename_file(source, destination):
       """Rename a single file object"""
       try:
+        parent = os.path.dirname(destination)
+        if not os.path.exists(parent):
+          os.makedirs(parent)
+
         os.rename(source, destination)
       except OSError as err:
         raise IOError(err)
diff --git a/sdks/python/apache_beam/io/localfilesystem_test.py 
b/sdks/python/apache_beam/io/localfilesystem_test.py
index d6d8eb4..5d032db 100644
--- a/sdks/python/apache_beam/io/localfilesystem_test.py
+++ b/sdks/python/apache_beam/io/localfilesystem_test.py
@@ -196,8 +196,8 @@ class LocalFileSystemTest(unittest.TestCase):
                      [(path1, path2)])
 
   def test_copy_directory(self):
-    path_t1 = os.path.join(self.tmpdir, 't1')
-    path_t2 = os.path.join(self.tmpdir, 't2')
+    path_t1 = os.path.join(self.tmpdir, 't1/11')
+    path_t2 = os.path.join(self.tmpdir, 't2/22')
     self.fs.mkdirs(path_t1)
     self.fs.mkdirs(path_t2)
 
@@ -209,6 +209,19 @@ class LocalFileSystemTest(unittest.TestCase):
     self.fs.copy([path_t1], [path_t2])
     self.assertTrue(filecmp.cmp(path1, path2))
 
+  def test_create_mkdirs_open(self):
+    path = os.path.join(self.tmpdir, 't1/t2/t3')
+    with self.fs.create(path) as f:
+      f.write("yay")
+
+    with self.fs.open(path) as f:
+      self.assertEqual(f.read(), "yay")
+
+  def test_open_error(self):
+    path = os.path.join(self.tmpdir, 't1')
+    with self.assertRaisesRegexp(IOError, r'No such file or directory'):
+      self.fs.open(path)
+
   def test_rename(self):
     path1 = os.path.join(self.tmpdir, 'f1')
     path2 = os.path.join(self.tmpdir, 'f2')
@@ -244,6 +257,22 @@ class LocalFileSystemTest(unittest.TestCase):
     self.assertTrue(self.fs.exists(path2))
     self.assertFalse(self.fs.exists(path1))
 
+  def test_rename_mkdirs(self):
+    path_t1 = os.path.join(self.tmpdir, 't1')
+    path_t2 = os.path.join(self.tmpdir, 't2/t3/t4')
+    self.fs.mkdirs(path_t1)
+
+    path1 = os.path.join(path_t1, 'f1')
+    path2 = os.path.join(path_t2, 'f1')
+    with open(path1, 'a') as f:
+      f.write('Hello')
+
+    self.fs.rename([path_t1], [path_t2])
+    self.assertTrue(self.fs.exists(path_t2))
+    self.assertFalse(self.fs.exists(path_t1))
+    self.assertTrue(self.fs.exists(path2))
+    self.assertFalse(self.fs.exists(path1))
+
   def test_exists(self):
     path1 = os.path.join(self.tmpdir, 'f1')
     path2 = os.path.join(self.tmpdir, 'f2')

Reply via email to