This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d903dcf [BEAM-4747] mkdirs if they don't exist in localfilesystem (#5903)
d903dcf is described below
commit d903dcfc7ff7300355688b08b779da880f15fe9d
Author: Ryan Williams <[email protected]>
AuthorDate: Fri Jul 27 19:46:19 2018 -0400
[BEAM-4747] mkdirs if they don't exist in localfilesystem (#5903)
* mkdirs if they don't exist in localfilesystem
* make localfilesystem create ancestor directories for output paths
---
sdks/python/apache_beam/io/localfilesystem.py | 11 ++++++++
sdks/python/apache_beam/io/localfilesystem_test.py | 33 ++++++++++++++++++++--
2 files changed, 42 insertions(+), 2 deletions(-)
diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py
index 23a1e8a..e373ad0 100644
--- a/sdks/python/apache_beam/io/localfilesystem.py
+++ b/sdks/python/apache_beam/io/localfilesystem.py
@@ -144,6 +144,9 @@ class LocalFileSystem(FileSystem):
Returns: file handle with a close function for the user to use
"""
+ parent = os.path.dirname(path)
+ if not os.path.exists(parent):
+ os.makedirs(parent)
return self._path_open(path, 'wb', mime_type, compression_type)
def open(self, path, mime_type='application/octet-stream',
@@ -185,6 +188,10 @@ class LocalFileSystem(FileSystem):
if os.path.isdir(source):
shutil.copytree(source, destination)
else:
+ parent = os.path.dirname(destination)
+ if not os.path.exists(parent):
+ os.makedirs(parent)
+
shutil.copy2(source, destination)
except OSError as err:
raise IOError(err)
@@ -217,6 +224,10 @@ class LocalFileSystem(FileSystem):
def _rename_file(source, destination):
"""Rename a single file object"""
try:
+ parent = os.path.dirname(destination)
+ if not os.path.exists(parent):
+ os.makedirs(parent)
+
os.rename(source, destination)
except OSError as err:
raise IOError(err)
diff --git a/sdks/python/apache_beam/io/localfilesystem_test.py b/sdks/python/apache_beam/io/localfilesystem_test.py
index d6d8eb4..5d032db 100644
--- a/sdks/python/apache_beam/io/localfilesystem_test.py
+++ b/sdks/python/apache_beam/io/localfilesystem_test.py
@@ -196,8 +196,8 @@ class LocalFileSystemTest(unittest.TestCase):
[(path1, path2)])
def test_copy_directory(self):
- path_t1 = os.path.join(self.tmpdir, 't1')
- path_t2 = os.path.join(self.tmpdir, 't2')
+ path_t1 = os.path.join(self.tmpdir, 't1/11')
+ path_t2 = os.path.join(self.tmpdir, 't2/22')
self.fs.mkdirs(path_t1)
self.fs.mkdirs(path_t2)
@@ -209,6 +209,19 @@ class LocalFileSystemTest(unittest.TestCase):
self.fs.copy([path_t1], [path_t2])
self.assertTrue(filecmp.cmp(path1, path2))
+ def test_create_mkdirs_open(self):
+ path = os.path.join(self.tmpdir, 't1/t2/t3')
+ with self.fs.create(path) as f:
+ f.write("yay")
+
+ with self.fs.open(path) as f:
+ self.assertEqual(f.read(), "yay")
+
+ def test_open_error(self):
+ path = os.path.join(self.tmpdir, 't1')
+ with self.assertRaisesRegexp(IOError, r'No such file or directory'):
+ self.fs.open(path)
+
def test_rename(self):
path1 = os.path.join(self.tmpdir, 'f1')
path2 = os.path.join(self.tmpdir, 'f2')
@@ -244,6 +257,22 @@ class LocalFileSystemTest(unittest.TestCase):
self.assertTrue(self.fs.exists(path2))
self.assertFalse(self.fs.exists(path1))
+ def test_rename_mkdirs(self):
+ path_t1 = os.path.join(self.tmpdir, 't1')
+ path_t2 = os.path.join(self.tmpdir, 't2/t3/t4')
+ self.fs.mkdirs(path_t1)
+
+ path1 = os.path.join(path_t1, 'f1')
+ path2 = os.path.join(path_t2, 'f1')
+ with open(path1, 'a') as f:
+ f.write('Hello')
+
+ self.fs.rename([path_t1], [path_t2])
+ self.assertTrue(self.fs.exists(path_t2))
+ self.assertFalse(self.fs.exists(path_t1))
+ self.assertTrue(self.fs.exists(path2))
+ self.assertFalse(self.fs.exists(path1))
+
def test_exists(self):
path1 = os.path.join(self.tmpdir, 'f1')
path2 = os.path.join(self.tmpdir, 'f2')