Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk b83f12b9f -> 3a0f01c8e


Fixes a couple of issues in FileBasedSource.

(1) Updates the code so that a user-specified coder is properly propagated to
sub-sources.

(2) Currently each SingleFileSource holds a reference to its FileBasedSource,
the FileBasedSource holds a reference to its ConcatSource, and the ConcatSource
holds a reference to the list of all SingleFileSources. As a result, serializing
the splits of a FileBasedSource takes space quadratic in the number of splits.
This CL fixes the issue by cloning the FileBasedSource before it takes a
reference to the ConcatSource, and giving each SingleFileSource a reference to
that clone instead.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/93c5233a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/93c5233a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/93c5233a

Branch: refs/heads/python-sdk
Commit: 93c5233a1bf28e9b13412b909c2ee877bd6cf635
Parents: b83f12b
Author: Chamikara Jayalath <chamik...@google.com>
Authored: Thu Nov 17 19:18:26 2016 -0800
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Fri Nov 18 13:33:33 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/io/filebasedsource.py     | 14 ++++++++++----
 .../python/apache_beam/io/filebasedsource_test.py | 18 +++++++++++++++++-
 2 files changed, 27 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/93c5233a/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index c7bc27e..7d8f686 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -109,6 +109,12 @@ class FileBasedSource(iobase.BoundedSource):
       file_names = [f for f in fileio.ChannelFactory.glob(self._pattern)]
       sizes = FileBasedSource._estimate_sizes_in_parallel(file_names)
 
+      # We create a reference for FileBasedSource that will be serialized along
+      # with each _SingleFileSource. To prevent this FileBasedSource from having
+      # a reference to ConcatSource (resulting in quadratic space complexity)
+      # we clone it here.
+      file_based_source_ref = pickler.loads(pickler.dumps(self))
+
       for index, file_name in enumerate(file_names):
         if sizes[index] == 0:
           continue  # Ignoring empty file.
@@ -123,7 +129,7 @@ class FileBasedSource(iobase.BoundedSource):
             splittable = False
 
         single_file_source = _SingleFileSource(
-            self, file_name,
+            file_based_source_ref, file_name,
             0,
             sizes[index],
             min_bundle_size=self._min_bundle_size,
@@ -194,9 +200,6 @@ class FileBasedSource(iobase.BoundedSource):
     return self._get_concat_source().get_range_tracker(start_position,
                                                        stop_position)
 
-  def default_output_coder(self):
-    return self._get_concat_source().default_output_coder()
-
   def read_records(self, file_name, offset_range_tracker):
     """Returns a generator of records created by reading file 'file_name'.
 
@@ -315,3 +318,6 @@ class _SingleFileSource(iobase.BoundedSource):
 
   def read(self, range_tracker):
     return self._file_based_source.read_records(self._file_name, range_tracker)
+
+  def default_output_coder(self):
+    return self._file_based_source.default_output_coder()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/93c5233a/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index 7f4d8d3..a455cd3 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -533,6 +533,23 @@ class TestFileBasedSource(unittest.TestCase):
     assert_that(pcoll, equal_to(lines))
     pipeline.run()
 
+  def test_splits_get_coder_from_fbs(self):
+    class DummyCoder(object):
+      val = 12345
+
+    class FileBasedSourceWithCoder(LineSource):
+
+      def default_output_coder(self):
+        return DummyCoder()
+
+    pattern, expected_data = write_pattern([34, 66, 40, 24, 24, 12])
+    self.assertEqual(200, len(expected_data))
+    fbs = FileBasedSourceWithCoder(pattern)
+    splits = [split for split in fbs.split(desired_bundle_size=50)]
+    self.assertTrue(len(splits))
+    for split in splits:
+      self.assertEqual(DummyCoder.val, split.source.default_output_coder().val)
+
 
 class TestSingleFileSource(unittest.TestCase):
 
@@ -685,7 +702,6 @@ class TestSingleFileSource(unittest.TestCase):
       read_data.extend(data_from_split)
     self.assertItemsEqual(expected_data[2:9], read_data)
 
-
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()

Reply via email to