[ 
https://issues.apache.org/jira/browse/BEAM-4011?focusedWorklogId=91574&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91574
 ]

ASF GitHub Bot logged work on BEAM-4011:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 17/Apr/18 02:20
            Start Date: 17/Apr/18 02:20
    Worklog Time Spent: 10m 
      Work Description: chamikaramj closed pull request #5024: [BEAM-4011] Unify Python IO glob implementation.
URL: https://github.com/apache/beam/pull/5024
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index 3f7e9aba847..929b7ef5607 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -21,8 +21,11 @@
 import abc
 import bz2
 import cStringIO
+import fnmatch
 import logging
 import os
+import posixpath
+import re
 import time
 import zlib
 
@@ -498,9 +501,49 @@ def mkdirs(self, path):
     raise NotImplementedError
 
   @abc.abstractmethod
+  def has_dirs(self):
+    """Whether this FileSystem supports directories."""
+    raise NotImplementedError
+
+  @abc.abstractmethod
+  def _list(self, dir_or_prefix):
+    """List files in a location.
+
+    Listing is non-recursive (for filesystems that support directories).
+
+    Args:
+      dir_or_prefix: (string) A directory or location prefix (for filesystems
+        that don't have directories).
+
+    Returns:
+      Generator of ``FileMetadata`` objects.
+
+    Raises:
+      ``BeamIOError`` if listing fails, but not if no files were found.
+    """
+    raise NotImplementedError
+
+  @staticmethod
+  def _url_dirname(url_or_path):
+    """Like posixpath.dirname, but preserves scheme:// prefix.
+
+    Args:
+      url_or_path: A string in the form of scheme://some/path OR /some/path.
+    """
+    match = re.match(r'([a-z]+://)(.*)', url_or_path)
+    if match is None:
+      return posixpath.dirname(url_or_path)
+    url_prefix, path = match.groups()
+    return url_prefix + posixpath.dirname(path)
+
   def match(self, patterns, limits=None):
     """Find all matching paths to the patterns provided.
 
+    Pattern matching is done using fnmatch.fnmatch.
+    For filesystems that have directories, matching is not recursive. Patterns
+    like scheme://path/*/foo will not match anything.
+    Patterns ending with '/' will be appended with '*'.
+
     Args:
       patterns: list of string for the file path pattern to match against
       limits: list of maximum number of responses that need to be fetched
@@ -510,7 +553,52 @@ def match(self, patterns, limits=None):
     Raises:
       ``BeamIOError`` if any of the pattern match operations fail
     """
-    raise NotImplementedError
+    if limits is None:
+      limits = [None] * len(patterns)
+    else:
+      err_msg = "Patterns and limits should be equal in length"
+      assert len(patterns) == len(limits), err_msg
+
+    def _match(pattern, limit):
+      """Find all matching paths to the pattern provided."""
+      if pattern.endswith('/'):
+        pattern += '*'
+      # Get the part of the pattern before the first globbing character.
+      # For example scheme://path/foo* will become scheme://path/foo for
+      # filesystems like GCS, or converted to scheme://path for filesystems with
+      # directories.
+      prefix_or_dir = re.match('^[^[*?]*', pattern).group(0)
+
+      file_metadatas = []
+      if prefix_or_dir == pattern:
+        # Short-circuit calling self.list() if there's no glob pattern to match.
+        if self.exists(pattern):
+          file_metadatas = [FileMetadata(pattern, self.size(pattern))]
+      else:
+        if self.has_dirs():
+          prefix_or_dir = self._url_dirname(prefix_or_dir)
+        file_metadatas = self._list(prefix_or_dir)
+
+      metadata_list = []
+      for file_metadata in file_metadatas:
+        if limit is not None and len(metadata_list) >= limit:
+          break
+        if fnmatch.fnmatch(file_metadata.path, pattern):
+          metadata_list.append(file_metadata)
+
+      return MatchResult(pattern, metadata_list)
+
+    exceptions = {}
+    result = []
+    for pattern, limit in zip(patterns, limits):
+      try:
+        result.append(_match(pattern, limit))
+      except Exception as e:  # pylint: disable=broad-except
+        exceptions[pattern] = e
+
+    if exceptions:
+      raise BeamIOError("Match operation failed", exceptions)
+    return result
 
   @abc.abstractmethod
   def create(self, path, mime_type='application/octet-stream',
@@ -579,6 +667,19 @@ def exists(self, path):
     raise NotImplementedError
 
   @abc.abstractmethod
+  def size(self, path):
+    """Get size of path on the FileSystem.
+
+    Args:
+      path: string path in question.
+
+    Returns: int size of path according to the FileSystem.
+
+    Raises:
+      ``BeamIOError`` if path doesn't exist.
+    """
+    raise NotImplementedError
+
   def checksum(self, path):
     """Fetch checksum metadata of a file on the
     :class:`~apache_beam.io.filesystem.FileSystem`.
diff --git a/sdks/python/apache_beam/io/filesystem_test.py b/sdks/python/apache_beam/io/filesystem_test.py
index 68e9df59a7b..f50f25e79e5 100644
--- a/sdks/python/apache_beam/io/filesystem_test.py
+++ b/sdks/python/apache_beam/io/filesystem_test.py
@@ -27,6 +27,162 @@
 
 from apache_beam.io.filesystem import CompressedFile
 from apache_beam.io.filesystem import CompressionTypes
+from apache_beam.io.filesystem import FileMetadata
+from apache_beam.io.filesystem import FileSystem
+
+
+class TestingFileSystem(FileSystem):
+
+  def __init__(self, pipeline_options, has_dirs=False):
+    super(TestingFileSystem, self).__init__(pipeline_options)
+    self._has_dirs = has_dirs
+    self._files = {}
+
+  @classmethod
+  def scheme(cls):
+    # Required for FileSystems.get_filesystem().
+    return 'test'
+
+  def join(self, basepath, *paths):
+    raise NotImplementedError
+
+  def split(self, path):
+    raise NotImplementedError
+
+  def mkdirs(self, path):
+    raise NotImplementedError
+
+  def has_dirs(self):
+    return self._has_dirs
+
+  def _insert_random_file(self, path, size):
+    self._files[path] = size
+
+  def _list(self, dir_or_prefix):
+    for path, size in self._files.iteritems():
+      if path.startswith(dir_or_prefix):
+        yield FileMetadata(path, size)
+
+  def create(self, path, mime_type='application/octet-stream',
+             compression_type=CompressionTypes.AUTO):
+    raise NotImplementedError
+
+  def open(self, path, mime_type='application/octet-stream',
+           compression_type=CompressionTypes.AUTO):
+    raise NotImplementedError
+
+  def copy(self, source_file_names, destination_file_names):
+    raise NotImplementedError
+
+  def rename(self, source_file_names, destination_file_names):
+    raise NotImplementedError
+
+  def exists(self, path):
+    raise NotImplementedError
+
+  def size(self, path):
+    raise NotImplementedError
+
+  def checksum(self, path):
+    raise NotImplementedError
+
+  def delete(self, paths):
+    raise NotImplementedError
+
+
+class TestFileSystem(unittest.TestCase):
+
+  def setUp(self):
+    self.fs = TestingFileSystem(pipeline_options=None)
+
+  def _flatten_match(self, match_results):
+    return [file_metadata
+            for match_result in match_results
+            for file_metadata in match_result.metadata_list]
+
+  def test_match_glob(self):
+    bucket_name = 'gcsio-test'
+    objects = [
+        ('cow/cat/fish', 2),
+        ('cow/cat/blubber', 3),
+        ('cow/dog/blubber', 4),
+        ('apple/dog/blubber', 5),
+        ('apple/fish/blubber', 6),
+        ('apple/fish/blowfish', 7),
+        ('apple/fish/bambi', 8),
+        ('apple/fish/balloon', 9),
+        ('apple/fish/cat', 10),
+        ('apple/fish/cart', 11),
+        ('apple/fish/carl', 12),
+        ('apple/dish/bat', 13),
+        ('apple/dish/cat', 14),
+        ('apple/dish/carl', 15),
+    ]
+    for (object_name, size) in objects:
+      file_name = 'gs://%s/%s' % (bucket_name, object_name)
+      self.fs._insert_random_file(file_name, size)
+    test_cases = [
+        ('gs://*', objects),
+        ('gs://gcsio-test/*', objects),
+        ('gs://gcsio-test/cow/*', [
+            ('cow/cat/fish', 2),
+            ('cow/cat/blubber', 3),
+            ('cow/dog/blubber', 4),
+        ]),
+        ('gs://gcsio-test/cow/ca*', [
+            ('cow/cat/fish', 2),
+            ('cow/cat/blubber', 3),
+        ]),
+        ('gs://gcsio-test/apple/[df]ish/ca*', [
+            ('apple/fish/cat', 10),
+            ('apple/fish/cart', 11),
+            ('apple/fish/carl', 12),
+            ('apple/dish/cat', 14),
+            ('apple/dish/carl', 15),
+        ]),
+        ('gs://gcsio-test/apple/fish/car?', [
+            ('apple/fish/cart', 11),
+            ('apple/fish/carl', 12),
+        ]),
+        ('gs://gcsio-test/apple/fish/b*', [
+            ('apple/fish/blubber', 6),
+            ('apple/fish/blowfish', 7),
+            ('apple/fish/bambi', 8),
+            ('apple/fish/balloon', 9),
+        ]),
+        ('gs://gcsio-test/apple/f*/b*', [
+            ('apple/fish/blubber', 6),
+            ('apple/fish/blowfish', 7),
+            ('apple/fish/bambi', 8),
+            ('apple/fish/balloon', 9),
+        ]),
+        ('gs://gcsio-test/apple/dish/[cb]at', [
+            ('apple/dish/bat', 13),
+            ('apple/dish/cat', 14),
+        ]),
+    ]
+    for file_pattern, expected_object_names in test_cases:
+      expected_file_names = [('gs://%s/%s' % (bucket_name, object_name), size)
+                             for (object_name, size) in expected_object_names]
+      self.assertEqual(
+          set([(file_metadata.path, file_metadata.size_in_bytes)
+               for file_metadata in
+               self._flatten_match(self.fs.match([file_pattern]))]),
+          set(expected_file_names))
+
+    # Check if limits are followed correctly
+    limit = 3
+    for file_pattern, expected_object_names in test_cases:
+      expected_num_items = min(len(expected_object_names), limit)
+      self.assertEqual(
+          len(self._flatten_match(self.fs.match([file_pattern], [limit]))),
+          expected_num_items)
+
+
+class TestFileSystemWithDirs(TestFileSystem):
+
+  def setUp(self):
+    self.fs = TestingFileSystem(pipeline_options=None, has_dirs=True)
 
 
 class TestCompressedFile(unittest.TestCase):
diff --git a/sdks/python/apache_beam/io/filesystems.py b/sdks/python/apache_beam/io/filesystems.py
index 5bc195bb0d9..66eff061fb0 100644
--- a/sdks/python/apache_beam/io/filesystems.py
+++ b/sdks/python/apache_beam/io/filesystems.py
@@ -146,6 +146,11 @@ def mkdirs(path):
   def match(patterns, limits=None):
     """Find all matching paths to the patterns provided.
 
+    Pattern matching is done using fnmatch.fnmatch.
+    For filesystems that have directories, matching is not recursive. Patterns
+    like scheme://path/*/foo will not match anything.
+    Patterns ending with '/' will be appended with '*'.
+
     Args:
       patterns: list of string for the file path pattern to match against
       limits: list of maximum number of responses that need to be fetched
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index 8a2cff5e5e3..60a0420507d 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -23,7 +23,6 @@
 from apache_beam.io.filesystem import CompressionTypes
 from apache_beam.io.filesystem import FileMetadata
 from apache_beam.io.filesystem import FileSystem
-from apache_beam.io.filesystem import MatchResult
 from apache_beam.io.gcp import gcsio
 
 __all__ = ['GCSFileSystem']
@@ -98,45 +97,30 @@ def mkdirs(self, path):
     """
     pass
 
-  def match(self, patterns, limits=None):
-    """Find all matching paths to the pattern provided.
+  def has_dirs(self):
+    """Whether this FileSystem supports directories."""
+    return False
+
+  def _list(self, dir_or_prefix):
+    """List files in a location.
+
+    Listing is non-recursive, for filesystems that support directories.
 
     Args:
-      pattern: string for the file path pattern to match against
-      limit: Maximum number of responses that need to be fetched
+      dir_or_prefix: (string) A directory or location prefix (for filesystems
+        that don't have directories).
 
-    Returns: list of ``MatchResult`` objects.
+    Returns:
+      Generator of ``FileMetadata`` objects.
 
     Raises:
-      ``BeamIOError`` if any of the pattern match operations fail
+      ``BeamIOError`` if listing fails, but not if no files were found.
     """
-    if limits is None:
-      limits = [None] * len(patterns)
-    else:
-      err_msg = "Patterns and limits should be equal in length"
-      assert len(patterns) == len(limits), err_msg
-
-    def _match(pattern, limit):
-      """Find all matching paths to the pattern provided.
-      """
-      if pattern.endswith('/'):
-        pattern += '*'
-      file_sizes = gcsio.GcsIO().size_of_files_in_glob(pattern, limit)
-      metadata_list = [FileMetadata(path, size)
-                       for path, size in file_sizes.iteritems()]
-      return MatchResult(pattern, metadata_list)
-
-    exceptions = {}
-    result = []
-    for pattern, limit in zip(patterns, limits):
-      try:
-        result.append(_match(pattern, limit))
-      except Exception as e:  # pylint: disable=broad-except
-        exceptions[pattern] = e
-
-    if exceptions:
-      raise BeamIOError("Match operation failed", exceptions)
-    return result
+    try:
+      for path, size in gcsio.GcsIO().list_prefix(dir_or_prefix).iteritems():
+        yield FileMetadata(path, size)
+    except Exception as e:  # pylint: disable=broad-except
+      raise BeamIOError("List operation failed", {dir_or_prefix: e})
 
   def _path_open(self, path, mode, mime_type='application/octet-stream',
                  compression_type=CompressionTypes.AUTO):
@@ -265,6 +249,19 @@ def exists(self, path):
     """
     return gcsio.GcsIO().exists(path)
 
+  def size(self, path):
+    """Get size of path on the FileSystem.
+
+    Args:
+      path: string path in question.
+
+    Returns: int size of path according to the FileSystem.
+
+    Raises:
+      ``BeamIOError`` if path doesn't exist.
+    """
+    return gcsio.GcsIO().size(path)
+
   def checksum(self, path):
     """Fetch checksum metadata of a file on the
     :class:`~apache_beam.io.filesystem.FileSystem`.
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
index e8a827e618a..f0d46f9620b 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
@@ -79,7 +79,7 @@ def test_match_multiples(self, mock_gcsio):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
     gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
-    gcsio_mock.size_of_files_in_glob.return_value = {
+    gcsio_mock.list_prefix.return_value = {
         'gs://bucket/file1': 1,
         'gs://bucket/file2': 2
     }
@@ -91,8 +91,7 @@ def test_match_multiples(self, mock_gcsio):
     self.assertEqual(
         set(match_result.metadata_list),
         expected_results)
-    gcsio_mock.size_of_files_in_glob.assert_called_once_with(
-        'gs://bucket/*', None)
+    gcsio_mock.list_prefix.assert_called_once_with('gs://bucket/')
 
   @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
   def test_match_multiples_limit(self, mock_gcsio):
@@ -100,7 +99,7 @@ def test_match_multiples_limit(self, mock_gcsio):
     gcsio_mock = mock.MagicMock()
     limit = 1
     gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
-    gcsio_mock.size_of_files_in_glob.return_value = {
+    gcsio_mock.list_prefix.return_value = {
         'gs://bucket/file1': 1
     }
     expected_results = set([
@@ -113,8 +112,7 @@ def test_match_multiples_limit(self, mock_gcsio):
     self.assertEqual(
         len(match_result.metadata_list),
         limit)
-    gcsio_mock.size_of_files_in_glob.assert_called_once_with(
-        'gs://bucket/*', 1)
+    gcsio_mock.list_prefix.assert_called_once_with('gs://bucket/')
 
   @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
   def test_match_multiples_error(self, mock_gcsio):
@@ -122,22 +120,21 @@ def test_match_multiples_error(self, mock_gcsio):
     gcsio_mock = mock.MagicMock()
     gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
     exception = IOError('Failed')
-    gcsio_mock.size_of_files_in_glob.side_effect = exception
-    expected_results = {'gs://bucket/': exception}
+    gcsio_mock.list_prefix.side_effect = exception
 
     with self.assertRaisesRegexp(BeamIOError,
                                  r'^Match operation failed') as error:
       self.fs.match(['gs://bucket/'])
-    self.assertEqual(error.exception.exception_details, expected_results)
-    gcsio_mock.size_of_files_in_glob.assert_called_once_with(
-        'gs://bucket/*', None)
+    self.assertRegexpMatches(str(error.exception.exception_details),
+                             r'gs://bucket/.*%s' % exception)
+    gcsio_mock.list_prefix.assert_called_once_with('gs://bucket/')
 
   @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
   def test_match_multiple_patterns(self, mock_gcsio):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
     gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
-    gcsio_mock.size_of_files_in_glob.side_effect = [
+    gcsio_mock.list_prefix.side_effect = [
         {'gs://bucket/file1': 1},
         {'gs://bucket/file2': 2},
     ]
@@ -311,6 +308,7 @@ def test_delete(self, mock_gcsio):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
     gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    gcsio_mock.size.return_value = 0
     files = [
         'gs://bucket/from1',
         'gs://bucket/from2',
@@ -328,6 +326,7 @@ def test_delete_error(self, mock_gcsio):
     gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
     exception = IOError('Failed')
     gcsio_mock.delete_batch.side_effect = exception
+    gcsio_mock.size.return_value = 0
     files = [
         'gs://bucket/from1',
         'gs://bucket/from2',
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index f687686fd64..f953f5a46ac 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -22,7 +22,6 @@
 
 import cStringIO
 import errno
-import fnmatch
 import io
 import logging
 import multiprocessing
@@ -205,40 +204,6 @@ def open(self,
     else:
       raise ValueError('Invalid file open mode: %s.' % mode)
 
-  @retry.with_exponential_backoff(
-      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def glob(self, pattern, limit=None):
-    """Return the GCS path names matching a given path name pattern.
-
-    Path name patterns are those recognized by fnmatch.fnmatch().  The path
-    can contain glob characters (*, ?, and [...] sets).
-
-    Args:
-      pattern: GCS file path pattern in the form gs://<bucket>/<name_pattern>.
-      limit: Maximal number of path names to return.
-        All matching paths are returned if set to None.
-
-    Returns:
-      list of GCS file paths matching the given pattern.
-    """
-    bucket, name_pattern = parse_gcs_path(pattern)
-    # Get the prefix with which we can list objects in the given bucket.
-    prefix = re.match('^[^[*?]*', name_pattern).group(0)
-    request = storage.StorageObjectsListRequest(bucket=bucket, prefix=prefix)
-    object_paths = []
-    while True:
-      response = self.client.objects.List(request)
-      for item in response.items:
-        if fnmatch.fnmatch(item.name, name_pattern):
-          object_paths.append('gs://%s/%s' % (item.bucket, item.name))
-      if response.nextPageToken:
-        request.pageToken = response.nextPageToken
-        if limit is not None and len(object_paths) >= limit:
-          break
-      else:
-        break
-    return object_paths[:limit]
-
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
   def delete(self, path):
@@ -364,7 +329,7 @@ def copytree(self, src, dest):
     """
     assert src.endswith('/')
     assert dest.endswith('/')
-    for entry in self.glob(src + '*'):
+    for entry in self.list_prefix(src):
       rel_path = entry[len(src):]
       self.copy(entry, dest + rel_path)
 
@@ -433,15 +398,16 @@ def size(self, path):
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def size_of_files_in_glob(self, pattern, limit=None):
-    """Returns the size of all the files in the glob as a dictionary
+  def list_prefix(self, path):
+    """Lists files matching the prefix.
 
     Args:
-      pattern: a file path pattern that reads the size of all the files
+      path: GCS file path pattern in the form gs://<bucket>/<name>.
+
+    Returns:
+      Dictionary of file name -> size.
     """
-    bucket, name_pattern = parse_gcs_path(pattern)
-    # Get the prefix with which we can list objects in the given bucket.
-    prefix = re.match('^[^[*?]*', name_pattern).group(0)
+    bucket, prefix = parse_gcs_path(path)
     request = storage.StorageObjectsListRequest(bucket=bucket, prefix=prefix)
     file_sizes = {}
     counter = 0
@@ -450,23 +416,17 @@ def size_of_files_in_glob(self, pattern, limit=None):
     while True:
       response = self.client.objects.List(request)
       for item in response.items:
-        if fnmatch.fnmatch(item.name, name_pattern):
-          file_name = 'gs://%s/%s' % (item.bucket, item.name)
-          file_sizes[file_name] = item.size
-          counter += 1
-        if limit is not None and counter >= limit:
-          break
+        file_name = 'gs://%s/%s' % (item.bucket, item.name)
+        file_sizes[file_name] = item.size
+        counter += 1
         if counter % 10000 == 0:
           logging.info("Finished computing size of: %s files", len(file_sizes))
       if response.nextPageToken:
         request.pageToken = response.nextPageToken
-        if limit is not None and len(file_sizes) >= limit:
-          break
       else:
         break
-    logging.info(
-        "Finished the size estimation of the input at %s files. " +\
-        "Estimation took %s seconds", counter, time.time() - start_time)
+    logging.info("Finished listing %s files in %s seconds.",
+                 counter, time.time() - start_time)
     return file_sizes
 
 
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index 8404d979b52..b2bb43e30e8 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -241,6 +241,14 @@ def test_exists(self):
     self.assertFalse(self.gcs.exists(file_name + 'xyz'))
     self.assertTrue(self.gcs.exists(file_name))
 
+  def test_checksum(self):
+    file_name = 'gs://gcsio-test/dummy_file'
+    file_size = 1234
+    checksum = 'deadbeef'
+    self._insert_random_file(self.client, file_name, file_size, crc32c=checksum)
+    self.assertTrue(self.gcs.exists(file_name))
+    self.assertEqual(checksum, self.gcs.checksum(file_name))
+
   @mock.patch.object(FakeGcsObjects, 'Get')
   def test_exists_failure(self, mock_get):
     # Raising an error other than 404. Raising 404 is a valid failure for
@@ -600,184 +608,37 @@ def test_context_manager(self):
       with self.gcs.open(file_name) as f:
         f.read(0 / 0)
 
-  def test_glob(self):
-    bucket_name = 'gcsio-test'
-    object_names = [
-        'cow/cat/fish',
-        'cow/cat/blubber',
-        'cow/dog/blubber',
-        'apple/dog/blubber',
-        'apple/fish/blubber',
-        'apple/fish/blowfish',
-        'apple/fish/bambi',
-        'apple/fish/balloon',
-        'apple/fish/cat',
-        'apple/fish/cart',
-        'apple/fish/carl',
-        'apple/fish/handle',
-        'apple/dish/bat',
-        'apple/dish/cat',
-        'apple/dish/carl',
-    ]
-    for object_name in object_names:
-      file_name = 'gs://%s/%s' % (bucket_name, object_name)
-      self._insert_random_file(self.client, file_name, 0)
-    test_cases = [
-        ('gs://gcsio-test/*', [
-            'cow/cat/fish',
-            'cow/cat/blubber',
-            'cow/dog/blubber',
-            'apple/dog/blubber',
-            'apple/fish/blubber',
-            'apple/fish/blowfish',
-            'apple/fish/bambi',
-            'apple/fish/balloon',
-            'apple/fish/cat',
-            'apple/fish/cart',
-            'apple/fish/carl',
-            'apple/fish/handle',
-            'apple/dish/bat',
-            'apple/dish/cat',
-            'apple/dish/carl',
-        ]),
-        ('gs://gcsio-test/cow/*', [
-            'cow/cat/fish',
-            'cow/cat/blubber',
-            'cow/dog/blubber',
-        ]),
-        ('gs://gcsio-test/cow/ca*', [
-            'cow/cat/fish',
-            'cow/cat/blubber',
-        ]),
-        ('gs://gcsio-test/apple/[df]ish/ca*', [
-            'apple/fish/cat',
-            'apple/fish/cart',
-            'apple/fish/carl',
-            'apple/dish/cat',
-            'apple/dish/carl',
-        ]),
-        ('gs://gcsio-test/apple/fish/car?', [
-            'apple/fish/cart',
-            'apple/fish/carl',
-        ]),
-        ('gs://gcsio-test/apple/fish/b*', [
-            'apple/fish/blubber',
-            'apple/fish/blowfish',
-            'apple/fish/bambi',
-            'apple/fish/balloon',
-        ]),
-        ('gs://gcsio-test/apple/f*/b*', [
-            'apple/fish/blubber',
-            'apple/fish/blowfish',
-            'apple/fish/bambi',
-            'apple/fish/balloon',
-        ]),
-        ('gs://gcsio-test/apple/dish/[cb]at', [
-            'apple/dish/bat',
-            'apple/dish/cat',
-        ]),
-    ]
-    for file_pattern, expected_object_names in test_cases:
-      expected_file_names = ['gs://%s/%s' % (bucket_name, o)
-                             for o in expected_object_names]
-      self.assertEqual(
-          set(self.gcs.glob(file_pattern)), set(expected_file_names))
-
-    # Check if limits are followed correctly
-    limit = 3
-    for file_pattern, expected_object_names in test_cases:
-      expected_num_items = min(len(expected_object_names), limit)
-      self.assertEqual(
-          len(self.gcs.glob(file_pattern, limit)), expected_num_items)
-
-  def test_size_of_files_in_glob(self):
+  def test_list_prefix(self):
     bucket_name = 'gcsio-test'
-    object_names = [
+    objects = [
         ('cow/cat/fish', 2),
         ('cow/cat/blubber', 3),
         ('cow/dog/blubber', 4),
-        ('apple/dog/blubber', 5),
-        ('apple/fish/blubber', 6),
-        ('apple/fish/blowfish', 7),
-        ('apple/fish/bambi', 8),
-        ('apple/fish/balloon', 9),
-        ('apple/fish/cat', 10),
-        ('apple/fish/cart', 11),
-        ('apple/fish/carl', 12),
-        ('apple/dish/bat', 13),
-        ('apple/dish/cat', 14),
-        ('apple/dish/carl', 15),
-        ('apple/fish/handle', 16),
     ]
-    for (object_name, size) in object_names:
+    for (object_name, size) in objects:
       file_name = 'gs://%s/%s' % (bucket_name, object_name)
       self._insert_random_file(self.client, file_name, size)
     test_cases = [
-        ('gs://gcsio-test/cow/*', [
+        ('gs://gcsio-test/c', [
             ('cow/cat/fish', 2),
             ('cow/cat/blubber', 3),
             ('cow/dog/blubber', 4),
         ]),
-        ('gs://gcsio-test/apple/fish/car?', [
-            ('apple/fish/cart', 11),
-            ('apple/fish/carl', 12),
-        ]),
-        ('gs://gcsio-test/*/f*/car?', [
-            ('apple/fish/cart', 11),
-            ('apple/fish/carl', 12),
-        ]),
-    ]
-    for file_pattern, expected_object_names in test_cases:
-      expected_file_sizes = {'gs://%s/%s' % (bucket_name, o): s
-                             for (o, s) in expected_object_names}
-      self.assertEqual(
-          self.gcs.size_of_files_in_glob(file_pattern), expected_file_sizes)
-
-    # Check if limits are followed correctly
-    limit = 1
-    for file_pattern, expected_object_names in test_cases:
-      expected_num_items = min(len(expected_object_names), limit)
-      self.assertEqual(
-          len(self.gcs.glob(file_pattern, limit)), expected_num_items)
-
-  def test_size_of_files_in_glob_limited(self):
-    bucket_name = 'gcsio-test'
-    object_names = [
-        ('cow/cat/fish', 2),
-        ('cow/cat/blubber', 3),
-        ('cow/dog/blubber', 4),
-        ('apple/dog/blubber', 5),
-        ('apple/fish/blubber', 6),
-        ('apple/fish/blowfish', 7),
-        ('apple/fish/bambi', 8),
-        ('apple/fish/balloon', 9),
-        ('apple/fish/cat', 10),
-        ('apple/fish/cart', 11),
-        ('apple/fish/carl', 12),
-        ('apple/dish/bat', 13),
-        ('apple/dish/cat', 14),
-        ('apple/dish/carl', 15),
-    ]
-    for (object_name, size) in object_names:
-      file_name = 'gs://%s/%s' % (bucket_name, object_name)
-      self._insert_random_file(self.client, file_name, size)
-    test_cases = [
-        ('gs://gcsio-test/cow/*', [
+        ('gs://gcsio-test/cow/', [
             ('cow/cat/fish', 2),
             ('cow/cat/blubber', 3),
             ('cow/dog/blubber', 4),
         ]),
-        ('gs://gcsio-test/apple/fish/car?', [
-            ('apple/fish/cart', 11),
-            ('apple/fish/carl', 12),
-        ])
+        ('gs://gcsio-test/cow/cat/fish', [
+            ('cow/cat/fish', 2),
+        ]),
     ]
-    # Check if limits are followed correctly
-    limit = 1
     for file_pattern, expected_object_names in test_cases:
-      expected_num_items = min(len(expected_object_names), limit)
+      expected_file_names = [('gs://%s/%s' % (bucket_name, object_name), size)
+                             for (object_name, size) in expected_object_names]
       self.assertEqual(
-          len(self.gcs.glob(file_pattern, limit)), expected_num_items)
+          set(self.gcs.list_prefix(file_pattern).iteritems()),
+          set(expected_file_names))
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/io/hadoopfilesystem.py b/sdks/python/apache_beam/io/hadoopfilesystem.py
index 7382c3c8ade..845a12d3557 100644
--- a/sdks/python/apache_beam/io/hadoopfilesystem.py
+++ b/sdks/python/apache_beam/io/hadoopfilesystem.py
@@ -33,7 +33,6 @@
 from apache_beam.io.filesystem import CompressionTypes
 from apache_beam.io.filesystem import FileMetadata
 from apache_beam.io.filesystem import FileSystem
-from apache_beam.io.filesystem import MatchResult
 from apache_beam.options.pipeline_options import HadoopFileSystemOptions
 from apache_beam.options.pipeline_options import PipelineOptions
 
@@ -181,42 +180,17 @@ def mkdirs(self, url):
   def _mkdirs(self, path):
     self._hdfs_client.makedirs(path)
 
-  def match(self, url_patterns, limits=None):
-    if limits is None:
-      limits = [None] * len(url_patterns)
+  def has_dirs(self):
+    return True
 
-    if len(url_patterns) != len(limits):
-      raise BeamIOError(
-          'Patterns and limits should be equal in length: %d != %d' % (
-              len(url_patterns), len(limits)))
-
-    def _match(path_pattern, limit):
-      """Find all matching paths to the pattern provided."""
-      fs = self._hdfs_client.status(path_pattern, strict=False)
-      if fs and fs[_FILE_STATUS_TYPE] == _FILE_STATUS_TYPE_FILE:
-        file_statuses = [(path_pattern, fs)][:limit]
-      else:
-        file_statuses = [(self._join(path_pattern, fs[0]), fs[1])
-                         for fs in self._hdfs_client.list(path_pattern,
-                                                          status=True)[:limit]]
-      metadata_list = [
-          FileMetadata(_HDFS_PREFIX + file_status[0],
-                       file_status[1][_FILE_STATUS_LENGTH])
-          for file_status in file_statuses]
-      return MatchResult(path_pattern, metadata_list)
-
-    exceptions = {}
-    result = []
-    for url_pattern, limit in zip(url_patterns, limits):
-      try:
-        path_pattern = self._parse_url(url_pattern)
-        result.append(_match(path_pattern, limit))
-      except Exception as e:  # pylint: disable=broad-except
-        exceptions[url_pattern] = e
-
-    if exceptions:
-      raise BeamIOError('Match operation failed', exceptions)
-    return result
+  def _list(self, url):
+    try:
+      path = self._parse_url(url)
+      for res in self._hdfs_client.list(path, status=True):
+        yield FileMetadata(_HDFS_PREFIX + self._join(path, res[0]),
+                           res[1][_FILE_STATUS_LENGTH])
+    except Exception as e:  # pylint: disable=broad-except
+      raise BeamIOError('Match operation failed', {url: e})
 
   @staticmethod
   def _add_compression(stream, path, mime_type, compression_type):
@@ -358,6 +332,13 @@ def _exists(self, path):
     """
     return self._hdfs_client.status(path, strict=False) is not None
 
+  def size(self, url):
+    path = self._parse_url(url)
+    status = self._hdfs_client.status(path, strict=False)
+    if status is None:
+      raise BeamIOError('File not found: %s' % url)
+    return status[_FILE_STATUS_LENGTH]
+
   def checksum(self, url):
     """Fetches a checksum description for a URL.
 
diff --git a/sdks/python/apache_beam/io/hadoopfilesystem_test.py b/sdks/python/apache_beam/io/hadoopfilesystem_test.py
index 6302831ebca..59cd0b19000 100644
--- a/sdks/python/apache_beam/io/hadoopfilesystem_test.py
+++ b/sdks/python/apache_beam/io/hadoopfilesystem_test.py
@@ -258,19 +258,15 @@ def test_match_file(self):
   def test_match_file_with_limits(self):
     expected_files = [self.fs.join(self.tmpdir, filename)
                       for filename in ['old_file1', 'old_file2']]
-    result = self.fs.match([self.tmpdir], [1])[0]
+    result = self.fs.match([self.tmpdir + '/'], [1])[0]
     files = [f.path for f in result.metadata_list]
     self.assertEquals(len(files), 1)
     self.assertIn(files[0], expected_files)
 
   def test_match_file_with_zero_limit(self):
-    result = self.fs.match([self.tmpdir], [0])[0]
+    result = self.fs.match([self.tmpdir + '/'], [0])[0]
     self.assertEquals(len(result.metadata_list), 0)
 
-  def test_match_file_with_bad_limit(self):
-    with self.assertRaisesRegexp(BeamIOError, r'TypeError'):
-      _ = self.fs.match([self.tmpdir], ['a'])[0]
-
   def test_match_file_empty(self):
     url = self.fs.join(self.tmpdir, 'nonexistent_file')
     result = self.fs.match([url])[0]
@@ -290,7 +286,10 @@ def test_match_directory(self):
     expected_files = [self.fs.join(self.tmpdir, filename)
                       for filename in ['old_file1', 'old_file2']]
 
-    result = self.fs.match([self.tmpdir])[0]
+    # Listing without a trailing '/' should return the directory itself and not
+    # its contents. The fake HDFS client here has a "sparse" directory
+    # structure, so listing without a '/' will return no results.
+    result = self.fs.match([self.tmpdir + '/'])[0]
     files = [f.path for f in result.metadata_list]
     self.assertItemsEqual(files, expected_files)
 
@@ -467,6 +466,12 @@ def test_exists(self):
     self.assertTrue(self.fs.exists(url1))
     self.assertFalse(self.fs.exists(url2))
 
+  def test_size(self):
+    url = self.fs.join(self.tmpdir, 'f1')
+    with self.fs.create(url) as f:
+      f.write('Hello')
+    self.assertEqual(5, self.fs.size(url))
+
   def test_checksum(self):
     url = self.fs.join(self.tmpdir, 'f1')
     with self.fs.create(url) as f:
diff --git a/sdks/python/apache_beam/io/hdfs_integration_test/Dockerfile b/sdks/python/apache_beam/io/hdfs_integration_test/Dockerfile
index b7955e4c0f9..6a4f30737cb 100644
--- a/sdks/python/apache_beam/io/hdfs_integration_test/Dockerfile
+++ b/sdks/python/apache_beam/io/hdfs_integration_test/Dockerfile
@@ -39,6 +39,6 @@ CMD holdup -t 45 http://namenode:50070 http://datanode:50075 && \
     sleep 45 && \
     hdfscli -v -v -v upload -f kinglear.txt / && \
     python -m apache_beam.examples.wordcount \
-        --input hdfs://kinglear.txt \
+        --input hdfs://kinglear* \
         --output hdfs://py-wordcount-integration \
         --hdfs_host namenode --hdfs_port 50070 --hdfs_user root
diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py
index 34eb60e7f17..fdbedc43c29 100644
--- a/sdks/python/apache_beam/io/localfilesystem.py
+++ b/sdks/python/apache_beam/io/localfilesystem.py
@@ -18,7 +18,6 @@
 
 from __future__ import absolute_import
 
-import glob
 import os
 import shutil
 
@@ -27,7 +26,6 @@
 from apache_beam.io.filesystem import CompressionTypes
 from apache_beam.io.filesystem import FileMetadata
 from apache_beam.io.filesystem import FileSystem
-from apache_beam.io.filesystem import MatchResult
 
 __all__ = ['LocalFileSystem']
 
@@ -79,42 +77,34 @@ def mkdirs(self, path):
     except OSError as err:
       raise IOError(err)
 
-  def match(self, patterns, limits=None):
-    """Find all matching paths to the pattern provided.
+  def has_dirs(self):
+    """Whether this FileSystem supports directories."""
+    return True
+
+  def _list(self, dir_or_prefix):
+    """List files in a location.
+
+    Listing is non-recursive, for filesystems that support directories.
 
     Args:
-      patterns: list of string for the file path pattern to match against
-      limits: list of maximum number of responses that need to be fetched
+      dir_or_prefix: (string) A directory or location prefix (for filesystems
+        that don't have directories).
 
-    Returns: list of ``MatchResult`` objects.
+    Returns:
+      Generator of ``FileMetadata`` objects.
 
     Raises:
-      ``BeamIOError`` if any of the pattern match operations fail
+      ``BeamIOError`` if listing fails, but not if no files were found.
     """
-    if limits is None:
-      limits = [None] * len(patterns)
-    else:
-      err_msg = "Patterns and limits should be equal in length"
-      assert len(patterns) == len(limits), err_msg
-
-    def _match(pattern, limit):
-      """Find all matching paths to the pattern provided.
-      """
-      files = glob.glob(pattern)
-      metadata = [FileMetadata(f, os.path.getsize(f)) for f in files[:limit]]
-      return MatchResult(pattern, metadata)
-
-    exceptions = {}
-    result = []
-    for pattern, limit in zip(patterns, limits):
-      try:
-        result.append(_match(pattern, limit))
-      except Exception as e:  # pylint: disable=broad-except
-        exceptions[pattern] = e
+    if not self.exists(dir_or_prefix):
+      return
 
-    if exceptions:
-      raise BeamIOError("Match operation failed", exceptions)
-    return result
+    try:
+      for f in os.listdir(dir_or_prefix):
+        f = self.join(dir_or_prefix, f)
+        yield FileMetadata(f, os.path.getsize(f))
+    except Exception as e:  # pylint: disable=broad-except
+      raise BeamIOError("List operation failed", {dir_or_prefix: e})
 
   def _path_open(self, path, mode, mime_type='application/octet-stream',
                  compression_type=CompressionTypes.AUTO):
@@ -235,6 +225,22 @@ def exists(self, path):
     """
     return os.path.exists(path)
 
+  def size(self, path):
+    """Get size of path on the FileSystem.
+
+    Args:
+      path: string path in question.
+
+    Returns: int size of path according to the FileSystem.
+
+    Raises:
+      ``BeamIOError`` if path doesn't exist.
+    """
+    try:
+      return os.path.getsize(path)
+    except Exception as e:  # pylint: disable=broad-except
+      raise BeamIOError("Size operation failed", {path: e})
+
   def checksum(self, path):
     """Fetch checksum metadata of a file on the
     :class:`~apache_beam.io.filesystem.FileSystem`.
diff --git a/sdks/python/apache_beam/io/localfilesystem_test.py b/sdks/python/apache_beam/io/localfilesystem_test.py
index 30216a404f4..faa47d083ef 100644
--- a/sdks/python/apache_beam/io/localfilesystem_test.py
+++ b/sdks/python/apache_beam/io/localfilesystem_test.py
@@ -165,6 +165,16 @@ def test_match_directory(self):
     files = [f.path for f in result.metadata_list]
     self.assertEqual(files, [self.tmpdir])
 
+  def test_match_directory_contents(self):
+    path1 = os.path.join(self.tmpdir, 'f1')
+    path2 = os.path.join(self.tmpdir, 'f2')
+    open(path1, 'a').close()
+    open(path2, 'a').close()
+
+    result = self.fs.match([self.tmpdir + '/'])[0]
+    files = [f.path for f in result.metadata_list]
+    self.assertEqual(files, [path1, path2])
+
   def test_copy(self):
     path1 = os.path.join(self.tmpdir, 'f1')
     path2 = os.path.join(self.tmpdir, 'f2')


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 91574)
    Time Spent: 2.5h  (was: 2h 20m)

> Python SDK: add glob support for HDFS
> -------------------------------------
>
>                 Key: BEAM-4011
>                 URL: https://issues.apache.org/jira/browse/BEAM-4011
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Udi Meiri
>            Assignee: Udi Meiri
>            Priority: Major
>          Time Spent: 2.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to