This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 073f77d [BEAM-4711] fix globbing in LocalFileSystem.delete (#5863)
073f77d is described below
commit 073f77df021c452bea03ee21eb61fc5331b61c14
Author: Ryan Williams <[email protected]>
AuthorDate: Tue Sep 18 17:52:39 2018 -0400
[BEAM-4711] fix globbing in LocalFileSystem.delete (#5863)
---
sdks/python/apache_beam/io/localfilesystem.py | 13 +-
sdks/python/apache_beam/io/localfilesystem_test.py | 189 +++++++++++++++++++++
2 files changed, 201 insertions(+), 1 deletion(-)
diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py
index 73d9a4d..ddd5022 100644
--- a/sdks/python/apache_beam/io/localfilesystem.py
+++ b/sdks/python/apache_beam/io/localfilesystem.py
@@ -321,11 +321,22 @@ class LocalFileSystem(FileSystem):
raise IOError(err)
exceptions = {}
- for path in paths:
+
+ def try_delete(path):
try:
_delete_path(path)
except Exception as e: # pylint: disable=broad-except
exceptions[path] = e
+ for match_result in self.match(paths):
+ metadata_list = match_result.metadata_list
+
+ if not metadata_list:
+ exceptions[match_result.pattern] = \
+ IOError('No files found to delete under: %s' % match_result.pattern)
+
+ for metadata in match_result.metadata_list:
+ try_delete(metadata.path)
+
if exceptions:
raise BeamIOError("Delete operation failed", exceptions)
diff --git a/sdks/python/apache_beam/io/localfilesystem_test.py b/sdks/python/apache_beam/io/localfilesystem_test.py
index 5d032db..bc45e42 100644
--- a/sdks/python/apache_beam/io/localfilesystem_test.py
+++ b/sdks/python/apache_beam/io/localfilesystem_test.py
@@ -291,6 +291,195 @@ class LocalFileSystemTest(unittest.TestCase):
self.assertEquals(self.fs.checksum(path1), str(5))
self.assertEquals(self.fs.checksum(path2), str(3))
def make_tree(self, path, value, expected_leaf_count=None):
    """Create a file+directory structure from a simple dict-based DSL.

    :param path: root path to create directories+files under
    :param value: a specification of what ``path`` should contain: ``None`` to
      make it an empty directory, a string literal to make it a file with
      exactly those contents, and a non-empty ``dict`` (basename -> nested
      specification) to make it a directory and recurse
    :param expected_leaf_count: should only be set at the top of a recursive
      call stack; after the whole tree has been created, verify it with
      ``check_tree`` and assert it holds exactly this many leaf
      files/empty directories, as a sanity check
    :raises ValueError: if ``value`` is not ``None``, a ``str`` or a ``dict``
    """
    if value is None:
        # Empty directory.
        os.makedirs(path)
    elif isinstance(value, str):
        # File with string-literal contents. Create missing parent
        # directories first; ``dirname`` is '' for a bare relative filename,
        # and os.makedirs('') would fail.
        parent = os.path.dirname(path)
        if parent and not os.path.exists(parent):
            os.makedirs(parent)
        # 'w' rather than 'a': the file must end up holding exactly ``value``
        # even if it already existed.
        with open(path, 'w') as f:
            f.write(value)
    elif isinstance(value, dict):
        # Recurse to create a subdirectory tree.
        for basename, child in value.items():
            self.make_tree(os.path.join(path, basename), child)
    else:
        # Wrap in a 1-tuple so a tuple ``value`` cannot break the formatting.
        raise ValueError('Unexpected value in tempdir tree: %s' % (value,))

    if expected_leaf_count is not None:
        # check_tree asserts the leaf count itself when given the expectation.
        self.check_tree(path, value, expected_leaf_count=expected_leaf_count)
+
def check_tree(self, path, value, expected_leaf_count=None):
    """Verify a directory+file structure according to the rules described in
    ``make_tree``.

    A directory described by a ``dict`` must contain exactly the listed
    entries. Anything extra on disk (e.g. a path that should have been
    deleted) fails the check.

    :param path: path to check under
    :param value: DSL-representation of expected files+directories under
      ``path``; ``dict`` keys must be single path components, except that the
      key ``''`` refers to ``path`` itself
    :param expected_leaf_count: if not ``None``, additionally assert that
      exactly this many leaves were verified
    :return: number of leaf files/directories that were verified
    :raises ValueError: if ``value`` is not ``None``, a ``str`` or a ``dict``
    """
    if value is None:
        # Empty directory.
        self.assertTrue(os.path.isdir(path), msg=path)
        self.assertEqual(os.listdir(path), [], msg=path)
        actual_leaf_count = 1
    elif isinstance(value, str):
        # File with string-literal contents.
        with open(path, 'r') as f:
            self.assertEqual(f.read(), value, msg=path)
        actual_leaf_count = 1
    elif isinstance(value, dict):
        # The directory must hold exactly the expected entries; without this,
        # leftover files/directories would be silently ignored. The '' key
        # names ``path`` itself, so it is not a listed child.
        expected_names = sorted(name for name in value if name)
        self.assertEqual(sorted(os.listdir(path)), expected_names, msg=path)
        # Recurse to check each subtree.
        actual_leaf_count = sum(
            self.check_tree(os.path.join(path, basename), child)
            for basename, child in value.items())
    else:
        # Wrap in a 1-tuple so a tuple ``value`` cannot break the formatting.
        raise ValueError('Unexpected value in tempdir tree: %s' % (value,))

    if expected_leaf_count is not None:
        self.assertEqual(actual_leaf_count, expected_leaf_count)

    return actual_leaf_count
+
# Shared fixture for the delete tests: 7 leaves in total
# (4 files and 3 empty directories).
_test_tree = {
    # Both top-level 'path*' entries are matched by the 'path*' glob.
    'path1': '111',
    'path2': {'2': '222', 'emptydir': None},
    'aaa': {
        # 'b1', 'b2' and the nested 'bbb' tree are matched by 'aaa/b*'.
        'b1': 'b1',
        'b2': None,
        'bbb': {'ccc': {'ddd': 'DDD'}},
        # Matched by neither glob: the empty directory that survives.
        'c': None,
    },
}
+
def test_delete_globs(self):
    """Glob patterns passed to ``delete`` remove every matching path."""
    root = os.path.join(self.tmpdir, 'dir')
    self.make_tree(root, self._test_tree, expected_leaf_count=7)

    self.fs.delete([
        os.path.join(root, 'path*'),
        os.path.join(root, 'aaa', 'b*'),
    ])

    # Assert absence explicitly: check_tree on its own only proves that the
    # expected entries are present.
    for deleted in ('path1', 'path2',
                    os.path.join('aaa', 'b1'),
                    os.path.join('aaa', 'b2'),
                    os.path.join('aaa', 'bbb')):
        self.assertFalse(os.path.exists(os.path.join(root, deleted)),
                         msg=deleted)

    # One empty nested directory is left.
    self.check_tree(root, {'aaa': {'c': None}}, expected_leaf_count=1)
+
def test_recursive_delete(self):
    """Deleting a directory path removes it together with all its contents."""
    root = os.path.join(self.tmpdir, 'dir')
    self.make_tree(root, self._test_tree, expected_leaf_count=7)

    self.fs.delete([root])

    self.assertFalse(os.path.exists(root), msg=root)
    # self.tmpdir itself remains, as a single empty directory (the ``None``
    # spec), instead of relying on os.path.join(tmpdir, '') via {'': None}.
    self.check_tree(self.tmpdir, None, expected_leaf_count=1)
+
def test_delete_glob_errors(self):
    """A pattern matching nothing is reported, while the others still apply."""
    root = os.path.join(self.tmpdir, 'dir')
    self.make_tree(root, self._test_tree, expected_leaf_count=7)
    # Tree expected on disk once the 'path*' and 'aaa/b*' globs have run.
    remaining_tree = {'aaa': {'c': None}}

    unmatched = os.path.join(root, 'aaa', 'd*')
    with self.assertRaisesRegexp(BeamIOError,
                                 r'^Delete operation failed') as error:
        self.fs.delete([
            os.path.join(root, 'path*'),
            os.path.join(root, 'aaa', 'b*'),
            unmatched,  # doesn't match anything, will raise
        ])

    # The matching globs were still applied despite the failing pattern.
    # Check absence explicitly, since check_tree alone only proves presence.
    for deleted in ('path1', 'path2',
                    os.path.join('aaa', 'b1'),
                    os.path.join('aaa', 'b2'),
                    os.path.join('aaa', 'bbb')):
        self.assertFalse(os.path.exists(os.path.join(root, deleted)),
                         msg=deleted)
    self.check_tree(root, remaining_tree, expected_leaf_count=1)
    # Only the pattern that matched nothing is reported.
    self.assertEqual(
        list(error.exception.exception_details.keys()), [unmatched])

    # Re-running an already-applied glob now matches nothing, so it raises.
    rerun = os.path.join(root, 'path*')
    with self.assertRaisesRegexp(BeamIOError,
                                 r'^Delete operation failed') as error:
        self.fs.delete([rerun])  # doesn't match anything, will raise

    self.check_tree(root, remaining_tree, expected_leaf_count=1)
    self.assertEqual(
        list(error.exception.exception_details.keys()), [rerun])
+
def test_delete(self):
path1 = os.path.join(self.tmpdir, 'f1')