Improve size estimation speed for file samples

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9caaea0f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9caaea0f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9caaea0f

Branch: refs/heads/python-sdk
Commit: 9caaea0f15131ecc64c56ce579361094edc50ae5
Parents: 739a431
Author: Sourabh Bajaj <sourabhba...@google.com>
Authored: Wed Nov 30 12:18:04 2016 -0800
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Thu Dec 1 09:10:06 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/io/filebasedsource.py | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9caaea0f/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 14eaf27..14c2b06 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -188,11 +188,12 @@ class FileBasedSource(iobase.BoundedSource):
 
   def estimate_size(self):
     file_names = [f for f in fileio.ChannelFactory.glob(self._pattern)]
+    # We're reading very few files so we can pass file names to
+    # _estimate_sizes_of_files without pattern as otherwise we'll try to do
+    # optimization based on the pattern and might end up reading much more
+    # data than needed for a few files.
     if (len(file_names) <=
         FileBasedSource.MIN_NUMBER_OF_FILES_TO_STAT):
-      # We're reading very few files so we can pass names without pattern
-      # as otherwise we'll try to do optimization based on the pattern and
-      # might end up reading much more data than needed for a few files.
       return sum(self._estimate_sizes_of_files(file_names))
     else:
       # Estimating size of a random sample.
@@ -202,10 +203,8 @@ class FileBasedSource(iobase.BoundedSource):
                         int(len(file_names) *
                             FileBasedSource.MIN_FRACTION_OF_FILES_TO_STAT))
       sample = random.sample(file_names, sample_size)
-      estimate = self._estimate_sizes_of_files(sample, self._pattern)
-      return int(
-          sum(estimate) *
-          (float(len(file_names)) / len(sample)))
+      estimate = self._estimate_sizes_of_files(sample)
+      return int(sum(estimate) * (float(len(file_names)) / len(sample)))
 
   def read(self, range_tracker):
     return self._get_concat_source().read(range_tracker)

Reply via email to