Repository: beam
Updated Branches:
  refs/heads/master 661c06652 -> 0a8ac3528


[BEAM-778] Fix the Compressed file seek tests on windows


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aaae9d77
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aaae9d77
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aaae9d77

Branch: refs/heads/master
Commit: aaae9d776e958d6d891c2d2c635d0164f07132a1
Parents: 661c066
Author: Sourabh Bajaj <[email protected]>
Authored: Thu Apr 6 16:23:33 2017 -0700
Committer: [email protected] <[email protected]>
Committed: Fri Apr 7 11:10:41 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/filesystem.py      |   2 +-
 sdks/python/apache_beam/io/filesystem_test.py | 242 ++++++++++-----------
 2 files changed, 118 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/aaae9d77/sdks/python/apache_beam/io/filesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index e6c3c29..85c7f06 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -259,7 +259,7 @@ class CompressedFile(object):
 
   @property
   def seekable(self):
-    return self._file.mode == 'r'
+    return 'r' in self._file.mode
 
   def _clear_read_buffer(self):
     """Clears the read buffer by removing all the contents and

http://git-wip-us.apache.org/repos/asf/beam/blob/aaae9d77/sdks/python/apache_beam/io/filesystem_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystem_test.py b/sdks/python/apache_beam/io/filesystem_test.py
index 168925d..607393d 100644
--- a/sdks/python/apache_beam/io/filesystem_test.py
+++ b/sdks/python/apache_beam/io/filesystem_test.py
@@ -17,48 +17,23 @@
 #
 
 """Unit tests for filesystem module."""
-import shutil
+import bz2
+import gzip
 import os
 import unittest
 import tempfile
-import bz2
-import gzip
 from StringIO import StringIO
 
 from apache_beam.io.filesystem import CompressedFile, CompressionTypes
 
 
-class _TestCaseWithTempDirCleanUp(unittest.TestCase):
+class TestCompressedFile(unittest.TestCase):
   """Base class for TestCases that deals with TempDir clean-up.
 
   Inherited test cases will call self._new_tempdir() to start a temporary dir
   which will be deleted at the end of the tests (when tearDown() is called).
   """
 
-  def setUp(self):
-    self._tempdirs = []
-
-  def tearDown(self):
-    for path in self._tempdirs:
-      if os.path.exists(path):
-        shutil.rmtree(path)
-    self._tempdirs = []
-
-  def _new_tempdir(self):
-    result = tempfile.mkdtemp()
-    self._tempdirs.append(result)
-    return result
-
-  def _create_temp_file(self, name='', suffix=''):
-    if not name:
-      name = tempfile.template
-    file_name = tempfile.NamedTemporaryFile(
-        delete=False, prefix=name,
-        dir=self._new_tempdir(), suffix=suffix).name
-    return file_name
-
-
-class TestCompressedFile(_TestCaseWithTempDirCleanUp):
   content = """- the BEAM -
 How things really are we would like to know.
 Does
@@ -72,9 +47,21 @@ atomized in instants hammered around the
   # in compressed file and not just in the internal buffer
   read_block_size = 4
 
-  def _create_compressed_file(self, compression_type, content,
-                              name='', suffix=''):
-    file_name = self._create_temp_file(name, suffix)
+  def setUp(self):
+    self._tempfiles = []
+
+  def tearDown(self):
+    for path in self._tempfiles:
+      if os.path.exists(path):
+        os.remove(path)
+
+  def _create_temp_file(self):
+    path = tempfile.NamedTemporaryFile(delete=False).name
+    self._tempfiles.append(path)
+    return path
+
+  def _create_compressed_file(self, compression_type, content):
+    file_name = self._create_temp_file()
 
     if compression_type == CompressionTypes.BZIP2:
       compress_factory = bz2.BZ2File
@@ -83,139 +70,144 @@ atomized in instants hammered around the
     else:
       assert False, "Invalid compression type: %s" % compression_type
 
-    with compress_factory(file_name, 'w') as f:
+    with compress_factory(file_name, 'wb') as f:
       f.write(content)
 
     return file_name
 
   def test_seekable_enabled_on_read(self):
-    readable = CompressedFile(open(self._create_temp_file(), 'r'))
-    self.assertTrue(readable.seekable)
+    with open(self._create_temp_file(), 'rb') as f:
+      readable = CompressedFile(f)
+      self.assertTrue(readable.seekable)
 
   def test_seekable_disabled_on_write(self):
-    writeable = CompressedFile(open(self._create_temp_file(), 'w'))
-    self.assertFalse(writeable.seekable)
+    with open(self._create_temp_file(), 'wb') as f:
+      writeable = CompressedFile(f)
+      self.assertFalse(writeable.seekable)
 
   def test_seekable_disabled_on_append(self):
-    writeable = CompressedFile(open(self._create_temp_file(), 'a'))
-    self.assertFalse(writeable.seekable)
+    with open(self._create_temp_file(), 'ab') as f:
+      writeable = CompressedFile(f)
+      self.assertFalse(writeable.seekable)
 
   def test_seek_set(self):
     for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]:
       file_name = self._create_compressed_file(compression_type, self.content)
-
-      compressed_fd = CompressedFile(open(file_name, 'r'), compression_type,
-                                     read_size=self.read_block_size)
-      reference_fd = StringIO(self.content)
-
-      # Note: content (readline) check must come before position (tell) check
-      # because cStringIO's tell() reports out of bound positions (if we seek
-      # beyond the file) up until a real read occurs.
-      # _CompressedFile.tell() always stays within the bounds of the
-      # uncompressed content.
-      for seek_position in (-1, 0, 1,
-                            len(self.content)-1, len(self.content),
-                            len(self.content) + 1):
-        compressed_fd.seek(seek_position, os.SEEK_SET)
-        reference_fd.seek(seek_position, os.SEEK_SET)
-
-        uncompressed_line = compressed_fd.readline()
-        reference_line = reference_fd.readline()
-        self.assertEqual(uncompressed_line, reference_line)
-
-        uncompressed_position = compressed_fd.tell()
-        reference_position = reference_fd.tell()
-        self.assertEqual(uncompressed_position, reference_position)
+      with open(file_name, 'rb') as f:
+        compressed_fd = CompressedFile(f, compression_type,
+                                       read_size=self.read_block_size)
+        reference_fd = StringIO(self.content)
+
+        # Note: content (readline) check must come before position (tell) check
+        # because cStringIO's tell() reports out of bound positions (if we seek
+        # beyond the file) up until a real read occurs.
+        # _CompressedFile.tell() always stays within the bounds of the
+        # uncompressed content.
+        for seek_position in (-1, 0, 1,
+                              len(self.content)-1, len(self.content),
+                              len(self.content) + 1):
+          compressed_fd.seek(seek_position, os.SEEK_SET)
+          reference_fd.seek(seek_position, os.SEEK_SET)
+
+          uncompressed_line = compressed_fd.readline()
+          reference_line = reference_fd.readline()
+          self.assertEqual(uncompressed_line, reference_line)
+
+          uncompressed_position = compressed_fd.tell()
+          reference_position = reference_fd.tell()
+          self.assertEqual(uncompressed_position, reference_position)
 
   def test_seek_cur(self):
     for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]:
       file_name = self._create_compressed_file(compression_type, self.content)
-
-      compressed_fd = CompressedFile(open(file_name, 'r'), compression_type,
-                                     read_size=self.read_block_size)
-      reference_fd = StringIO(self.content)
-
-      # Test out of bound, inbound seeking in both directions
-      for seek_position in (-1, 0, 1,
-                            len(self.content) / 2,
-                            len(self.content) / 2,
-                            -1 * len(self.content) / 2):
-        compressed_fd.seek(seek_position, os.SEEK_CUR)
-        reference_fd.seek(seek_position, os.SEEK_CUR)
-
-        uncompressed_line = compressed_fd.readline()
-        expected_line = reference_fd.readline()
-        self.assertEqual(uncompressed_line, expected_line)
-
-        reference_position = reference_fd.tell()
-        uncompressed_position = compressed_fd.tell()
-        self.assertEqual(uncompressed_position, reference_position)
+      with open(file_name, 'rb') as f:
+        compressed_fd = CompressedFile(f, compression_type,
+                                       read_size=self.read_block_size)
+        reference_fd = StringIO(self.content)
+
+        # Test out of bound, inbound seeking in both directions
+        for seek_position in (-1, 0, 1,
+                              len(self.content) / 2,
+                              len(self.content) / 2,
+                              -1 * len(self.content) / 2):
+          compressed_fd.seek(seek_position, os.SEEK_CUR)
+          reference_fd.seek(seek_position, os.SEEK_CUR)
+
+          uncompressed_line = compressed_fd.readline()
+          expected_line = reference_fd.readline()
+          self.assertEqual(uncompressed_line, expected_line)
+
+          reference_position = reference_fd.tell()
+          uncompressed_position = compressed_fd.tell()
+          self.assertEqual(uncompressed_position, reference_position)
 
   def test_read_from_end_returns_no_data(self):
     for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]:
       file_name = self._create_compressed_file(compression_type, self.content)
+      with open(file_name, 'rb') as f:
+        compressed_fd = CompressedFile(f, compression_type,
+                                       read_size=self.read_block_size)
 
-      compressed_fd = CompressedFile(open(file_name, 'r'), compression_type,
-                                     read_size=self.read_block_size)
-
-      seek_position = 0
-      compressed_fd.seek(seek_position, os.SEEK_END)
+        seek_position = 0
+        compressed_fd.seek(seek_position, os.SEEK_END)
 
-      expected_data = ''
-      uncompressed_data = compressed_fd.read(10)
+        expected_data = ''
+        uncompressed_data = compressed_fd.read(10)
 
-      self.assertEqual(uncompressed_data, expected_data)
+        self.assertEqual(uncompressed_data, expected_data)
 
   def test_seek_outside(self):
     for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]:
       file_name = self._create_compressed_file(compression_type, self.content)
+      with open(file_name, 'rb') as f:
+        compressed_fd = CompressedFile(f, compression_type,
+                                       read_size=self.read_block_size)
 
-      compressed_fd = CompressedFile(open(file_name, 'r'), compression_type,
-                                     read_size=self.read_block_size)
-
-      for whence in (os.SEEK_CUR, os.SEEK_SET, os.SEEK_END):
-        seek_position = -1 * len(self.content) - 10
-        compressed_fd.seek(seek_position, whence)
+        for whence in (os.SEEK_CUR, os.SEEK_SET, os.SEEK_END):
+          seek_position = -1 * len(self.content) - 10
+          compressed_fd.seek(seek_position, whence)
 
-        expected_position = 0
-        uncompressed_position = compressed_fd.tell()
-        self.assertEqual(uncompressed_position, expected_position)
+          expected_position = 0
+          uncompressed_position = compressed_fd.tell()
+          self.assertEqual(uncompressed_position, expected_position)
 
-        seek_position = len(self.content) + 20
-        compressed_fd.seek(seek_position, whence)
+          seek_position = len(self.content) + 20
+          compressed_fd.seek(seek_position, whence)
 
-        expected_position = len(self.content)
-        uncompressed_position = compressed_fd.tell()
-        self.assertEqual(uncompressed_position, expected_position)
+          expected_position = len(self.content)
+          uncompressed_position = compressed_fd.tell()
+          self.assertEqual(uncompressed_position, expected_position)
 
   def test_read_and_seek_back_to_beginning(self):
     for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]:
       file_name = self._create_compressed_file(compression_type, self.content)
-      compressed_fd = CompressedFile(open(file_name, 'r'), compression_type,
-                                     read_size=self.read_block_size)
+      with open(file_name, 'rb') as f:
+        compressed_fd = CompressedFile(f, compression_type,
+                                       read_size=self.read_block_size)
 
-      first_pass = compressed_fd.readline()
-      compressed_fd.seek(0, os.SEEK_SET)
-      second_pass = compressed_fd.readline()
+        first_pass = compressed_fd.readline()
+        compressed_fd.seek(0, os.SEEK_SET)
+        second_pass = compressed_fd.readline()
 
-      self.assertEqual(first_pass, second_pass)
+        self.assertEqual(first_pass, second_pass)
 
   def test_tell(self):
     lines = ['line%d\n' % i for i in range(10)]
     tmpfile = self._create_temp_file()
-    writeable = CompressedFile(open(tmpfile, 'w'))
-    current_offset = 0
-    for line in lines:
-      writeable.write(line)
-      current_offset += len(line)
-      self.assertEqual(current_offset, writeable.tell())
-
-    writeable.close()
-    readable = CompressedFile(open(tmpfile))
-    current_offset = 0
-    while True:
-      line = readable.readline()
-      current_offset += len(line)
-      self.assertEqual(current_offset, readable.tell())
-      if not line:
-        break
+    with open(tmpfile, 'w') as f:
+      writeable = CompressedFile(f)
+      current_offset = 0
+      for line in lines:
+        writeable.write(line)
+        current_offset += len(line)
+        self.assertEqual(current_offset, writeable.tell())
+
+    with open(tmpfile) as f:
+      readable = CompressedFile(f)
+      current_offset = 0
+      while True:
+        line = readable.readline()
+        current_offset += len(line)
+        self.assertEqual(current_offset, readable.tell())
+        if not line:
+          break

Reply via email to