This is an automated email from the git hooks/post-receive script. misterc-guest pushed a commit to branch master in repository python-bz2file.
commit f8610188fc2c01b03192544bd5740b5c5e5ab5b4 Author: Michael R. Crusoe <[email protected]> Date: Wed May 27 21:43:01 2015 -0400 Imported Upstream version 0.98 --- PKG-INFO | 85 +++++++ README.rst | 59 +++++ bz2file.py | 499 ++++++++++++++++++++++++++++++++++++++ setup.py | 34 +++ test_bz2file.py | 732 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 1409 insertions(+) diff --git a/PKG-INFO b/PKG-INFO new file mode 100644 index 0000000..90b8d79 --- /dev/null +++ b/PKG-INFO @@ -0,0 +1,85 @@ +Metadata-Version: 1.1 +Name: bz2file +Version: 0.98 +Summary: Read and write bzip2-compressed files. +Home-page: https://github.com/nvawda/bz2file +Author: Nadeem Vawda +Author-email: [email protected] +License: Apache License, Version 2.0 +Description: Bz2file is a Python library for reading and writing bzip2-compressed files. + + It contains a drop-in replacement for the file interface in the standard + library's ``bz2`` module, including features from the latest development + version of CPython that are not available in older releases. + + Bz2file is compatible with CPython 2.6, 2.7, and 3.0 through 3.4, as well as + PyPy 2.0. + + + Features + -------- + + - Supports multi-stream files. + + - Can read from or write to any file-like object. + + - Can open files in either text or binary mode. + + - Added methods: ``peek()``, ``read1()``, ``readinto()``, ``fileno()``, + ``readable()``, ``writable()``, ``seekable()``. + + + Installation + ------------ + + To install bz2file, run: :: + + $ pip install bz2file + + + Documentation + ------------- + + The ``open()`` function and ``BZ2File`` class in this module provide the same + features and interface as the ones in the standard library's ``bz2`` module in + the current development version of CPython, `documented here + <http://docs.python.org/dev/library/bz2.html>`_. + + + Version History + --------------- + + 0.98: 19 January 2014 + + - Added support for the 'x' family of modes. + - Ignore non-bz2 data at the end of a file, rather than raising an exception. + - Tests now pass on PyPy. + + 0.95: 08 October 2012 + + - Added the ``open()`` function. + - Improved performance when reading in small chunks. + - Removed the ``fileobj`` argument to ``BZ2File()``. To wrap an existing file + object, pass it as the first argument (``filename``). + + 0.9: 04 February 2012 + + - Initial release. + +Platform: UNKNOWN +Classifier: Development Status :: 4 - Beta +Classifier: Intended Audience :: Developers +Classifier: License :: OSI Approved :: Apache Software License +Classifier: Operating System :: OS Independent +Classifier: Programming Language :: Python +Classifier: Programming Language :: Python :: 2 +Classifier: Programming Language :: Python :: 2.6 +Classifier: Programming Language :: Python :: 2.7 +Classifier: Programming Language :: Python :: 3 +Classifier: Programming Language :: Python :: 3.0 +Classifier: Programming Language :: Python :: 3.1 +Classifier: Programming Language :: Python :: 3.2 +Classifier: Programming Language :: Python :: 3.3 +Classifier: Programming Language :: Python :: 3.4 +Classifier: Topic :: Software Development :: Libraries :: Python Modules +Classifier: Topic :: System :: Archiving :: Compression diff --git a/README.rst b/README.rst new file mode 100644 index 0000000..2e848ef --- /dev/null +++ b/README.rst @@ -0,0 +1,59 @@ +Bz2file is a Python library for reading and writing bzip2-compressed files. + +It contains a drop-in replacement for the file interface in the standard +library's ``bz2`` module, including features from the latest development +version of CPython that are not available in older releases. + +Bz2file is compatible with CPython 2.6, 2.7, and 3.0 through 3.4, as well as +PyPy 2.0. + + +Features +-------- + +- Supports multi-stream files. + +- Can read from or write to any file-like object. + +- Can open files in either text or binary mode. + +- Added methods: ``peek()``, ``read1()``, ``readinto()``, ``fileno()``, + ``readable()``, ``writable()``, ``seekable()``. + + +Installation +------------ + +To install bz2file, run: :: + + $ pip install bz2file + + +Documentation +------------- + +The ``open()`` function and ``BZ2File`` class in this module provide the same +features and interface as the ones in the standard library's ``bz2`` module in +the current development version of CPython, `documented here +<http://docs.python.org/dev/library/bz2.html>`_. + + +Version History +--------------- + +0.98: 19 January 2014 + +- Added support for the 'x' family of modes. +- Ignore non-bz2 data at the end of a file, rather than raising an exception. +- Tests now pass on PyPy. + +0.95: 08 October 2012 + +- Added the ``open()`` function. +- Improved performance when reading in small chunks. +- Removed the ``fileobj`` argument to ``BZ2File()``. To wrap an existing file + object, pass it as the first argument (``filename``). + +0.9: 04 February 2012 + +- Initial release. diff --git a/bz2file.py b/bz2file.py new file mode 100644 index 0000000..1d91195 --- /dev/null +++ b/bz2file.py @@ -0,0 +1,499 @@ +"""Module for reading and writing bzip2-compressed files. + +This module contains a backport of Python 3.4's bz2.open() function and +BZ2File class, adapted to work with earlier versions of Python. +""" + +__all__ = ["BZ2File", "open"] + +__author__ = "Nadeem Vawda <[email protected]>" + +import io +import sys +import warnings + +try: + from threading import RLock +except ImportError: + from dummy_threading import RLock + +from bz2 import BZ2Compressor, BZ2Decompressor + + +_MODE_CLOSED = 0 +_MODE_READ = 1 +_MODE_READ_EOF = 2 +_MODE_WRITE = 3 + +_BUFFER_SIZE = 8192 + +_STR_TYPES = (str, unicode) if (str is bytes) else (str, bytes) + +# The 'x' mode for open() was introduced in Python 3.3. +_HAS_OPEN_X_MODE = sys.version_info[:2] >= (3, 3) + +_builtin_open = open + + +class BZ2File(io.BufferedIOBase): + + """A file object providing transparent bzip2 (de)compression. + + A BZ2File can act as a wrapper for an existing file object, or refer + directly to a named file on disk. + + Note that BZ2File provides a *binary* file interface - data read is + returned as bytes, and data to be written should be given as bytes. + """ + + def __init__(self, filename, mode="r", buffering=None, compresslevel=9): + """Open a bzip2-compressed file. + + If filename is a str, bytes or unicode object, it gives the name + of the file to be opened. Otherwise, it should be a file object, + which will be used to read or write the compressed data. + + mode can be 'r' for reading (default), 'w' for (over)writing, + 'x' for creating exclusively, or 'a' for appending. These can + equivalently be given as 'rb', 'wb', 'xb', and 'ab'. + + buffering is ignored. Its use is deprecated. + + If mode is 'w', 'x' or 'a', compresslevel can be a number between 1 + and 9 specifying the level of compression: 1 produces the least + compression, and 9 (default) produces the most compression. + + If mode is 'r', the input file may be the concatenation of + multiple compressed streams. + """ + # This lock must be recursive, so that BufferedIOBase's + # readline(), readlines() and writelines() don't deadlock. + self._lock = RLock() + self._fp = None + self._closefp = False + self._mode = _MODE_CLOSED + self._pos = 0 + self._size = -1 + + if buffering is not None: + warnings.warn("Use of 'buffering' argument is deprecated", + DeprecationWarning) + + if not (1 <= compresslevel <= 9): + raise ValueError("compresslevel must be between 1 and 9") + + if mode in ("", "r", "rb"): + mode = "rb" + mode_code = _MODE_READ + self._decompressor = BZ2Decompressor() + self._buffer = b"" + self._buffer_offset = 0 + elif mode in ("w", "wb"): + mode = "wb" + mode_code = _MODE_WRITE + self._compressor = BZ2Compressor(compresslevel) + elif mode in ("x", "xb") and _HAS_OPEN_X_MODE: + mode = "xb" + mode_code = _MODE_WRITE + self._compressor = BZ2Compressor(compresslevel) + elif mode in ("a", "ab"): + mode = "ab" + mode_code = _MODE_WRITE + self._compressor = BZ2Compressor(compresslevel) + else: + raise ValueError("Invalid mode: %r" % (mode,)) + + if isinstance(filename, _STR_TYPES): + self._fp = _builtin_open(filename, mode) + self._closefp = True + self._mode = mode_code + elif hasattr(filename, "read") or hasattr(filename, "write"): + self._fp = filename + self._mode = mode_code + else: + raise TypeError("filename must be a %s or %s object, or a file" % + (_STR_TYPES[0].__name__, _STR_TYPES[1].__name__)) + + def close(self): + """Flush and close the file. + + May be called more than once without error. Once the file is + closed, any other operation on it will raise a ValueError. + """ + with self._lock: + if self._mode == _MODE_CLOSED: + return + try: + if self._mode in (_MODE_READ, _MODE_READ_EOF): + self._decompressor = None + elif self._mode == _MODE_WRITE: + self._fp.write(self._compressor.flush()) + self._compressor = None + finally: + try: + if self._closefp: + self._fp.close() + finally: + self._fp = None + self._closefp = False + self._mode = _MODE_CLOSED + self._buffer = b"" + self._buffer_offset = 0 + + @property + def closed(self): + """True if this file is closed.""" + return self._mode == _MODE_CLOSED + + def fileno(self): + """Return the file descriptor for the underlying file.""" + self._check_not_closed() + return self._fp.fileno() + + def seekable(self): + """Return whether the file supports seeking.""" + return self.readable() and (self._fp.seekable() + if hasattr(self._fp, "seekable") + else hasattr(self._fp, "seek")) + + def readable(self): + """Return whether the file was opened for reading.""" + self._check_not_closed() + return self._mode in (_MODE_READ, _MODE_READ_EOF) + + def writable(self): + """Return whether the file was opened for writing.""" + self._check_not_closed() + return self._mode == _MODE_WRITE + + # Mode-checking helper functions. + + def _check_not_closed(self): + if self.closed: + raise ValueError("I/O operation on closed file") + + def _check_can_read(self): + if self._mode not in (_MODE_READ, _MODE_READ_EOF): + self._check_not_closed() + raise io.UnsupportedOperation("File not open for reading") + + def _check_can_write(self): + if self._mode != _MODE_WRITE: + self._check_not_closed() + raise io.UnsupportedOperation("File not open for writing") + + def _check_can_seek(self): + if self._mode not in (_MODE_READ, _MODE_READ_EOF): + self._check_not_closed() + raise io.UnsupportedOperation("Seeking is only supported " + "on files open for reading") + if hasattr(self._fp, "seekable") and not self._fp.seekable(): + raise io.UnsupportedOperation("The underlying file object " + "does not support seeking") + + # Fill the readahead buffer if it is empty. Returns False on EOF. + def _fill_buffer(self): + if self._mode == _MODE_READ_EOF: + return False + # Depending on the input data, our call to the decompressor may not + # return any data. In this case, try again after reading another block. + while self._buffer_offset == len(self._buffer): + rawblock = (self._decompressor.unused_data or + self._fp.read(_BUFFER_SIZE)) + + if not rawblock: + try: + self._decompressor.decompress(b"") + except EOFError: + # End-of-stream marker and end of file. We're good. + self._mode = _MODE_READ_EOF + self._size = self._pos + return False + else: + # Problem - we were expecting more compressed data. + raise EOFError("Compressed file ended before the " + "end-of-stream marker was reached") + + try: + self._buffer = self._decompressor.decompress(rawblock) + except EOFError: + # Continue to next stream. + self._decompressor = BZ2Decompressor() + try: + self._buffer = self._decompressor.decompress(rawblock) + except IOError: + # Trailing data isn't a valid bzip2 stream. We're done here. + self._mode = _MODE_READ_EOF + self._size = self._pos + return False + self._buffer_offset = 0 + return True + + # Read data until EOF. + # If return_data is false, consume the data without returning it. + def _read_all(self, return_data=True): + # The loop assumes that _buffer_offset is 0. Ensure that this is true. + self._buffer = self._buffer[self._buffer_offset:] + self._buffer_offset = 0 + + blocks = [] + while self._fill_buffer(): + if return_data: + blocks.append(self._buffer) + self._pos += len(self._buffer) + self._buffer = b"" + if return_data: + return b"".join(blocks) + + # Read a block of up to n bytes. + # If return_data is false, consume the data without returning it. + def _read_block(self, n, return_data=True): + # If we have enough data buffered, return immediately. + end = self._buffer_offset + n + if end <= len(self._buffer): + data = self._buffer[self._buffer_offset : end] + self._buffer_offset = end + self._pos += len(data) + return data if return_data else None + + # The loop assumes that _buffer_offset is 0. Ensure that this is true. + self._buffer = self._buffer[self._buffer_offset:] + self._buffer_offset = 0 + + blocks = [] + while n > 0 and self._fill_buffer(): + if n < len(self._buffer): + data = self._buffer[:n] + self._buffer_offset = n + else: + data = self._buffer + self._buffer = b"" + if return_data: + blocks.append(data) + self._pos += len(data) + n -= len(data) + if return_data: + return b"".join(blocks) + + def peek(self, n=0): + """Return buffered data without advancing the file position. + + Always returns at least one byte of data, unless at EOF. + The exact number of bytes returned is unspecified. + """ + with self._lock: + self._check_can_read() + if not self._fill_buffer(): + return b"" + return self._buffer[self._buffer_offset:] + + def read(self, size=-1): + """Read up to size uncompressed bytes from the file. + + If size is negative or omitted, read until EOF is reached. + Returns b'' if the file is already at EOF. + """ + if size is None: + raise TypeError() + with self._lock: + self._check_can_read() + if size == 0: + return b"" + elif size < 0: + return self._read_all() + else: + return self._read_block(size) + + def read1(self, size=-1): + """Read up to size uncompressed bytes, while trying to avoid + making multiple reads from the underlying stream. + + Returns b'' if the file is at EOF. + """ + # Usually, read1() calls _fp.read() at most once. However, sometimes + # this does not give enough data for the decompressor to make progress. + # In this case we make multiple reads, to avoid returning b"". + with self._lock: + self._check_can_read() + if (size == 0 or + # Only call _fill_buffer() if the buffer is actually empty. + # This gives a significant speedup if *size* is small. + (self._buffer_offset == len(self._buffer) and not self._fill_buffer())): + return b"" + if size > 0: + data = self._buffer[self._buffer_offset : + self._buffer_offset + size] + self._buffer_offset += len(data) + else: + data = self._buffer[self._buffer_offset:] + self._buffer = b"" + self._buffer_offset = 0 + self._pos += len(data) + return data + + def readinto(self, b): + """Read up to len(b) bytes into b. + + Returns the number of bytes read (0 for EOF). + """ + with self._lock: + return io.BufferedIOBase.readinto(self, b) + + def readline(self, size=-1): + """Read a line of uncompressed bytes from the file. + + The terminating newline (if present) is retained. If size is + non-negative, no more than size bytes will be read (in which + case the line may be incomplete). Returns b'' if already at EOF. + """ + if not isinstance(size, int): + if not hasattr(size, "__index__"): + raise TypeError("Integer argument expected") + size = size.__index__() + with self._lock: + self._check_can_read() + # Shortcut for the common case - the whole line is in the buffer. + if size < 0: + end = self._buffer.find(b"\n", self._buffer_offset) + 1 + if end > 0: + line = self._buffer[self._buffer_offset : end] + self._buffer_offset = end + self._pos += len(line) + return line + return io.BufferedIOBase.readline(self, size) + + def readlines(self, size=-1): + """Read a list of lines of uncompressed bytes from the file. + + size can be specified to control the number of lines read: no + further lines will be read once the total size of the lines read + so far equals or exceeds size. + """ + if not isinstance(size, int): + if not hasattr(size, "__index__"): + raise TypeError("Integer argument expected") + size = size.__index__() + with self._lock: + return io.BufferedIOBase.readlines(self, size) + + def write(self, data): + """Write a byte string to the file. + + Returns the number of uncompressed bytes written, which is + always len(data). Note that due to buffering, the file on disk + may not reflect the data written until close() is called. + """ + with self._lock: + self._check_can_write() + compressed = self._compressor.compress(data) + self._fp.write(compressed) + self._pos += len(data) + return len(data) + + def writelines(self, seq): + """Write a sequence of byte strings to the file. + + Returns the number of uncompressed bytes written. + seq can be any iterable yielding byte strings. + + Line separators are not added between the written byte strings. + """ + with self._lock: + return io.BufferedIOBase.writelines(self, seq) + + # Rewind the file to the beginning of the data stream. + def _rewind(self): + self._fp.seek(0, 0) + self._mode = _MODE_READ + self._pos = 0 + self._decompressor = BZ2Decompressor() + self._buffer = b"" + self._buffer_offset = 0 + + def seek(self, offset, whence=0): + """Change the file position. + + The new position is specified by offset, relative to the + position indicated by whence. Values for whence are: + + 0: start of stream (default); offset must not be negative + 1: current stream position + 2: end of stream; offset must not be positive + + Returns the new file position. + + Note that seeking is emulated, so depending on the parameters, + this operation may be extremely slow. + """ + with self._lock: + self._check_can_seek() + + # Recalculate offset as an absolute file position. + if whence == 0: + pass + elif whence == 1: + offset = self._pos + offset + elif whence == 2: + # Seeking relative to EOF - we need to know the file's size. + if self._size < 0: + self._read_all(return_data=False) + offset = self._size + offset + else: + raise ValueError("Invalid value for whence: %s" % (whence,)) + + # Make it so that offset is the number of bytes to skip forward. + if offset < self._pos: + self._rewind() + else: + offset -= self._pos + + # Read and discard data until we reach the desired position. + self._read_block(offset, return_data=False) + + return self._pos + + def tell(self): + """Return the current file position.""" + with self._lock: + self._check_not_closed() + return self._pos + + +def open(filename, mode="rb", compresslevel=9, + encoding=None, errors=None, newline=None): + """Open a bzip2-compressed file in binary or text mode. + + The filename argument can be an actual filename (a str, bytes or unicode + object), or an existing file object to read from or write to. + + The mode argument can be "r", "rb", "w", "wb", "x", "xb", "a" or + "ab" for binary mode, or "rt", "wt", "xt" or "at" for text mode. + The default mode is "rb", and the default compresslevel is 9. + + For binary mode, this function is equivalent to the BZ2File + constructor: BZ2File(filename, mode, compresslevel). In this case, + the encoding, errors and newline arguments must not be provided. + + For text mode, a BZ2File object is created, and wrapped in an + io.TextIOWrapper instance with the specified encoding, error + handling behavior, and line ending(s). + + """ + if "t" in mode: + if "b" in mode: + raise ValueError("Invalid mode: %r" % (mode,)) + else: + if encoding is not None: + raise ValueError("Argument 'encoding' not supported in binary mode") + if errors is not None: + raise ValueError("Argument 'errors' not supported in binary mode") + if newline is not None: + raise ValueError("Argument 'newline' not supported in binary mode") + + bz_mode = mode.replace("t", "") + binary_file = BZ2File(filename, bz_mode, compresslevel=compresslevel) + + if "t" in mode: + return io.TextIOWrapper(binary_file, encoding, errors, newline) + else: + return binary_file diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..d31f317 --- /dev/null +++ b/setup.py @@ -0,0 +1,34 @@ +from distutils.core import setup + +with open("README.rst") as f: + readme = f.read() + +setup( + name="bz2file", + version="0.98", + description="Read and write bzip2-compressed files.", + long_description=readme, + author="Nadeem Vawda", + author_email="[email protected]", + url="https://github.com/nvawda/bz2file", + py_modules=["bz2file"], + license="Apache License, Version 2.0", + classifiers=[ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Operating System :: OS Independent", + "Programming Language :: Python", + "Programming Language :: Python :: 2", + "Programming Language :: Python :: 2.6", + "Programming Language :: Python :: 2.7", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.0", + "Programming Language :: Python :: 3.1", + "Programming Language :: Python :: 3.2", + "Programming Language :: Python :: 3.3", + "Programming Language :: Python :: 3.4", + "Topic :: Software Development :: Libraries :: Python Modules", + "Topic :: System :: Archiving :: Compression", + ], +) diff --git a/test_bz2file.py b/test_bz2file.py new file mode 100644 index 0000000..7ad58ff --- /dev/null +++ b/test_bz2file.py @@ -0,0 +1,732 @@ +import unittest +import bz2 +import bz2file +from bz2file import BZ2File +from io import BytesIO +import os +import platform + +try: + import threading +except ImportError: + threading = None + +try: + from test import support +except ImportError: + from test import test_support as support + + +class BaseTest(unittest.TestCase): + "Base for other testcases." + + TEXT_LINES = [ + b'root:x:0:0:root:/root:/bin/bash\n', + b'bin:x:1:1:bin:/bin:\n', + b'daemon:x:2:2:daemon:/sbin:\n', + b'adm:x:3:4:adm:/var/adm:\n', + b'lp:x:4:7:lp:/var/spool/lpd:\n', + b'sync:x:5:0:sync:/sbin:/bin/sync\n', + b'shutdown:x:6:0:shutdown:/sbin:/sbin/shutdown\n', + b'halt:x:7:0:halt:/sbin:/sbin/halt\n', + b'mail:x:8:12:mail:/var/spool/mail:\n', + b'news:x:9:13:news:/var/spool/news:\n', + b'uucp:x:10:14:uucp:/var/spool/uucp:\n', + b'operator:x:11:0:operator:/root:\n', + b'games:x:12:100:games:/usr/games:\n', + b'gopher:x:13:30:gopher:/usr/lib/gopher-data:\n', + b'ftp:x:14:50:FTP User:/var/ftp:/bin/bash\n', + b'nobody:x:65534:65534:Nobody:/home:\n', + b'postfix:x:100:101:postfix:/var/spool/postfix:\n', + b'niemeyer:x:500:500::/home/niemeyer:/bin/bash\n', + b'postgres:x:101:102:PostgreSQL Server:/var/lib/pgsql:/bin/bash\n', + b'mysql:x:102:103:MySQL server:/var/lib/mysql:/bin/bash\n', + b'www:x:103:104::/var/www:/bin/false\n', + ] + TEXT = b''.join(TEXT_LINES) + DATA = b'BZh91AY&SY.\xc8N\x18\x00\x01>_\x80\x00\x10@\x02\xff\xf0\x01\x07n\x00?\xe7\xff\xe00\x01\x99\xaa\x00\xc0\x03F\x86\x8c#&\x83F\x9a\x03\x06\xa6\xd0\xa6\x93M\x0fQ\xa7\xa8\x06\x804hh\x12$\x11\xa4i4\xf14S\xd2<Q\xb5\x0fH\xd3\xd4\xdd\xd5\x87\xbb\xf8\x94\r\x8f\xafI\x12\xe1\xc9\xf8/E\x00pu\x89\x12]\xc9\xbbDL\nQ\x0e\t1\x12\xdf\xa0\xc0\x97\xac2O9\x89\x13\x94\x0e\x1c7\x0ed\x95I\x0c\xaaJ\xa4\x18L\x10\x05#\x9c\xaf\xba\xbc/\x97\x8a#C\xc8\xe1\x8cW\xf9\xe2\xd0\xd6M\xa7\x8bXa<e\x84t\xcbL\xb3\xa7 [...] + BAD_DATA = b'this is not a valid bzip2 file' + + def setUp(self): + self.filename = support.TESTFN + + def tearDown(self): + if os.path.isfile(self.filename): + os.unlink(self.filename) + + # In some of the tests, we need to verify the contents of multi-stream + # files, but bz2.decompress() could not handle this case prior to 3.3. + def decompress(self, data): + results = [] + while True: + decomp = bz2.BZ2Decompressor() + results.append(decomp.decompress(data)) + try: + decomp.decompress(b"") + except EOFError: + if not decomp.unused_data: + return b"".join(results) + data = decomp.unused_data + else: + raise ValueError("Compressed data ended before the " + "end-of-stream marker was reached") + + +class BZ2FileTest(BaseTest): + "Test the BZ2File class." + + def createTempFile(self, streams=1, suffix=b""): + with open(self.filename, "wb") as f: + f.write(self.DATA * streams) + f.write(suffix) + + def testBadArgs(self): + self.assertRaises(TypeError, BZ2File, 123.456) + self.assertRaises(ValueError, BZ2File, "/dev/null", "z") + self.assertRaises(ValueError, BZ2File, "/dev/null", "rx") + self.assertRaises(ValueError, BZ2File, "/dev/null", "rbt") + self.assertRaises(ValueError, BZ2File, "/dev/null", compresslevel=0) + self.assertRaises(ValueError, BZ2File, "/dev/null", compresslevel=10) + + def testRead(self): + self.createTempFile() + with BZ2File(self.filename) as bz2f: + self.assertRaises(TypeError, bz2f.read, None) + self.assertEqual(bz2f.read(), self.TEXT) + + def testReadBadFile(self): + self.createTempFile(streams=0, suffix=self.BAD_DATA) + with BZ2File(self.filename) as bz2f: + self.assertRaises(IOError, bz2f.read) + + def testReadMultiStream(self): + self.createTempFile(streams=5) + with BZ2File(self.filename) as bz2f: + self.assertRaises(TypeError, bz2f.read, None) + self.assertEqual(bz2f.read(), self.TEXT * 5) + + def testReadMonkeyMultiStream(self): + # Test BZ2File.read() on a multi-stream archive where a stream + # boundary coincides with the end of the raw read buffer. + buffer_size = bz2file._BUFFER_SIZE + bz2file._BUFFER_SIZE = len(self.DATA) + try: + self.createTempFile(streams=5) + with BZ2File(self.filename) as bz2f: + self.assertRaises(TypeError, bz2f.read, None) + self.assertEqual(bz2f.read(), self.TEXT * 5) + finally: + bz2file._BUFFER_SIZE = buffer_size + + def testReadTrailingJunk(self): + self.createTempFile(suffix=self.BAD_DATA) + with BZ2File(self.filename) as bz2f: + self.assertEqual(bz2f.read(), self.TEXT) + + def testReadMultiStreamTrailingJunk(self): + self.createTempFile(streams=5, suffix=self.BAD_DATA) + with BZ2File(self.filename) as bz2f: + self.assertEqual(bz2f.read(), self.TEXT * 5) + + def testRead0(self): + self.createTempFile() + with BZ2File(self.filename) as bz2f: + self.assertRaises(TypeError, bz2f.read, None) + self.assertEqual(bz2f.read(0), b"") + + def testReadChunk10(self): + self.createTempFile() + with BZ2File(self.filename) as bz2f: + text = b'' + while True: + str = bz2f.read(10) + if not str: + break + text += str + self.assertEqual(text, self.TEXT) + + def testReadChunk10MultiStream(self): + self.createTempFile(streams=5) + with BZ2File(self.filename) as bz2f: + text = b'' + while True: + str = bz2f.read(10) + if not str: + break + text += str + self.assertEqual(text, self.TEXT * 5) + + def testRead100(self): + self.createTempFile() + with BZ2File(self.filename) as bz2f: + self.assertEqual(bz2f.read(100), self.TEXT[:100]) + + def testPeek(self): + self.createTempFile() + with BZ2File(self.filename) as bz2f: + pdata = bz2f.peek() + self.assertNotEqual(len(pdata), 0) + self.assertTrue(self.TEXT.startswith(pdata)) + self.assertEqual(bz2f.read(), self.TEXT) + + def testReadInto(self): + self.createTempFile() + with BZ2File(self.filename) as bz2f: + n = 128 + b = bytearray(n) + self.assertEqual(bz2f.readinto(b), n) + self.assertEqual(b, self.TEXT[:n]) + n = len(self.TEXT) - n + b = bytearray(len(self.TEXT)) + self.assertEqual(bz2f.readinto(b), n) + self.assertEqual(b[:n], self.TEXT[-n:]) + + def testReadLine(self): + self.createTempFile() + with BZ2File(self.filename) as bz2f: + self.assertRaises(TypeError, bz2f.readline, None) + for line in self.TEXT_LINES: + self.assertEqual(bz2f.readline(), line) + + def testReadLineMultiStream(self): + self.createTempFile(streams=5) + with BZ2File(self.filename) as bz2f: + self.assertRaises(TypeError, bz2f.readline, None) + for line in self.TEXT_LINES * 5: + self.assertEqual(bz2f.readline(), line) + + def testReadLines(self): + self.createTempFile() + with BZ2File(self.filename) as bz2f: + self.assertRaises(TypeError, bz2f.readlines, None) + self.assertEqual(bz2f.readlines(), self.TEXT_LINES) + + def testReadLinesMultiStream(self): + self.createTempFile(streams=5) + with BZ2File(self.filename) as bz2f: + self.assertRaises(TypeError, bz2f.readlines, None) + self.assertEqual(bz2f.readlines(), self.TEXT_LINES * 5) + + def testIterator(self): + self.createTempFile() + with BZ2File(self.filename) as bz2f: + self.assertEqual(list(iter(bz2f)), self.TEXT_LINES) + + def testIteratorMultiStream(self): + self.createTempFile(streams=5) + with BZ2File(self.filename) as bz2f: + self.assertEqual(list(iter(bz2f)), self.TEXT_LINES * 5) + + def testClosedIteratorDeadlock(self): + # Issue #3309: Iteration on a closed BZ2File should release the lock. + self.createTempFile() + bz2f = BZ2File(self.filename) + bz2f.close() + self.assertRaises(ValueError, next, bz2f) + # This call will deadlock if the above call failed to release the lock. + self.assertRaises(ValueError, bz2f.readlines) + + def testWrite(self): + with BZ2File(self.filename, "w") as bz2f: + self.assertRaises(TypeError, bz2f.write) + bz2f.write(self.TEXT) + with open(self.filename, 'rb') as f: + self.assertEqual(self.decompress(f.read()), self.TEXT) + + def testWriteChunks10(self): + with BZ2File(self.filename, "w") as bz2f: + n = 0 + while True: + str = self.TEXT[n*10:(n+1)*10] + if not str: + break + bz2f.write(str) + n += 1 + with open(self.filename, 'rb') as f: + self.assertEqual(self.decompress(f.read()), self.TEXT) + + def testWriteNonDefaultCompressLevel(self): + expected = bz2.compress(self.TEXT, compresslevel=5) + with BZ2File(self.filename, "w", compresslevel=5) as bz2f: + bz2f.write(self.TEXT) + with open(self.filename, "rb") as f: + self.assertEqual(f.read(), expected) + + def testWriteLines(self): + with BZ2File(self.filename, "w") as bz2f: + self.assertRaises(TypeError, bz2f.writelines) + bz2f.writelines(self.TEXT_LINES) + # Issue #1535500: Calling writelines() on a closed BZ2File + # should raise an exception. + self.assertRaises(ValueError, bz2f.writelines, ["a"]) + with open(self.filename, 'rb') as f: + self.assertEqual(self.decompress(f.read()), self.TEXT) + + def testWriteMethodsOnReadOnlyFile(self): + with BZ2File(self.filename, "w") as bz2f: + bz2f.write(b"abc") + + with BZ2File(self.filename, "r") as bz2f: + self.assertRaises(IOError, bz2f.write, b"a") + self.assertRaises(IOError, bz2f.writelines, [b"a"]) + + def testAppend(self): + with BZ2File(self.filename, "w") as bz2f: + self.assertRaises(TypeError, bz2f.write) + bz2f.write(self.TEXT) + with BZ2File(self.filename, "a") as bz2f: + self.assertRaises(TypeError, bz2f.write) + bz2f.write(self.TEXT) + with open(self.filename, 'rb') as f: + self.assertEqual(self.decompress(f.read()), self.TEXT * 2) + + def testSeekForward(self): + self.createTempFile() + with BZ2File(self.filename) as bz2f: + self.assertRaises(TypeError, bz2f.seek) + bz2f.seek(150) + self.assertEqual(bz2f.read(), self.TEXT[150:]) + + def testSeekForwardAcrossStreams(self): + self.createTempFile(streams=2) + with BZ2File(self.filename) as bz2f: + self.assertRaises(TypeError, bz2f.seek) + bz2f.seek(len(self.TEXT) + 150) + self.assertEqual(bz2f.read(), self.TEXT[150:]) + + def testSeekBackwards(self): + self.createTempFile() + with BZ2File(self.filename) as bz2f: + bz2f.read(500) + bz2f.seek(-150, 1) + self.assertEqual(bz2f.read(), self.TEXT[500-150:]) + + def testSeekBackwardsAcrossStreams(self): + self.createTempFile(streams=2) + with BZ2File(self.filename) as bz2f: + readto = len(self.TEXT) + 100 + while readto > 0: + readto -= len(bz2f.read(readto)) + bz2f.seek(-150, 1) + self.assertEqual(bz2f.read(), self.TEXT[100-150:] + self.TEXT) + + def testSeekBackwardsFromEnd(self): + self.createTempFile() + with BZ2File(self.filename) as bz2f: + bz2f.seek(-150, 2) + self.assertEqual(bz2f.read(), self.TEXT[len(self.TEXT)-150:]) + + def testSeekBackwardsFromEndAcrossStreams(self): + self.createTempFile(streams=2) + with BZ2File(self.filename) as bz2f: + bz2f.seek(-1000, 2) + self.assertEqual(bz2f.read(), (self.TEXT * 2)[-1000:]) + + def testSeekPostEnd(self): + self.createTempFile() + with BZ2File(self.filename) as bz2f: + bz2f.seek(150000) + self.assertEqual(bz2f.tell(), len(self.TEXT)) + self.assertEqual(bz2f.read(), b"") + + def testSeekPostEndMultiStream(self): + self.createTempFile(streams=5) + with BZ2File(self.filename) as bz2f: + bz2f.seek(150000) + self.assertEqual(bz2f.tell(), len(self.TEXT) * 5) + self.assertEqual(bz2f.read(), b"") + + def testSeekPostEndTwice(self): + self.createTempFile() + with BZ2File(self.filename) as bz2f: + bz2f.seek(150000) + bz2f.seek(150000) + self.assertEqual(bz2f.tell(), len(self.TEXT)) + self.assertEqual(bz2f.read(), b"") + + def testSeekPostEndTwiceMultiStream(self): + self.createTempFile(streams=5) + with BZ2File(self.filename) as bz2f: + bz2f.seek(150000) + bz2f.seek(150000) + self.assertEqual(bz2f.tell(), len(self.TEXT) * 5) + self.assertEqual(bz2f.read(), b"") + + def testSeekPreStart(self): + self.createTempFile() + with BZ2File(self.filename) as bz2f: + bz2f.seek(-150) + self.assertEqual(bz2f.tell(), 0) + self.assertEqual(bz2f.read(), self.TEXT) + + def testSeekPreStartMultiStream(self): + self.createTempFile(streams=2) + with BZ2File(self.filename) as bz2f: + bz2f.seek(-150) + self.assertEqual(bz2f.tell(), 0) + self.assertEqual(bz2f.read(), self.TEXT * 2) + + def testFileno(self): + self.createTempFile() + with open(self.filename, 'rb') as rawf: + bz2f = BZ2File(rawf) + try: + self.assertEqual(bz2f.fileno(), rawf.fileno()) + finally: + bz2f.close() + self.assertRaises(ValueError, bz2f.fileno) + + def testSeekable(self): + bz2f = BZ2File(BytesIO(self.DATA)) + try: + self.assertTrue(bz2f.seekable()) + bz2f.read() + self.assertTrue(bz2f.seekable()) + finally: + bz2f.close() + self.assertRaises(ValueError, bz2f.seekable) + + bz2f = BZ2File(BytesIO(), "w") + try: + self.assertFalse(bz2f.seekable()) + finally: + bz2f.close() + self.assertRaises(ValueError, bz2f.seekable) + + src = BytesIO(self.DATA) + src.seekable = lambda: False + bz2f = BZ2File(src) + try: + self.assertFalse(bz2f.seekable()) + finally: + bz2f.close() + self.assertRaises(ValueError, bz2f.seekable) + + def testReadable(self): + bz2f = BZ2File(BytesIO(self.DATA)) + try: + self.assertTrue(bz2f.readable()) + bz2f.read() + self.assertTrue(bz2f.readable()) + finally: + bz2f.close() + self.assertRaises(ValueError, bz2f.readable) + + bz2f = BZ2File(BytesIO(), "w") + try: + self.assertFalse(bz2f.readable()) + finally: + bz2f.close() + self.assertRaises(ValueError, bz2f.readable) + + def testWritable(self): + bz2f = BZ2File(BytesIO(self.DATA)) + try: + self.assertFalse(bz2f.writable()) + bz2f.read() + self.assertFalse(bz2f.writable()) + finally: + bz2f.close() + self.assertRaises(ValueError, bz2f.writable) + + bz2f = BZ2File(BytesIO(), "w") + try: + self.assertTrue(bz2f.writable()) + finally: + bz2f.close() + self.assertRaises(ValueError, bz2f.writable) + + def testOpenDel(self): + if platform.python_implementation() != "CPython": + self.skipTest("Test depends on CPython refcounting semantics") + self.createTempFile() + for i in range(10000): + o = BZ2File(self.filename) + del o + + def testOpenNonexistent(self): + self.assertRaises(IOError, BZ2File, "/non/existent") + + def testReadlinesNoNewline(self): + # Issue #1191043: readlines() fails on a file containing no newline. + data = b'BZh91AY&SY\xd9b\x89]\x00\x00\x00\x03\x80\x04\x00\x02\x00\x0c\x00 \x00!\x9ah3M\x13<]\xc9\x14\xe1BCe\x8a%t' + with open(self.filename, "wb") as f: + f.write(data) + with BZ2File(self.filename) as bz2f: + lines = bz2f.readlines() + self.assertEqual(lines, [b'Test']) + with BZ2File(self.filename) as bz2f: + xlines = list(bz2f.readlines()) + self.assertEqual(xlines, [b'Test']) + + def testContextProtocol(self): + f = None + with BZ2File(self.filename, "wb") as f: + f.write(b"xxx") + f = BZ2File(self.filename, "rb") + f.close() + try: + with f: + pass + except ValueError: + pass + else: + self.fail("__enter__ on a closed file didn't raise an exception") + try: + with BZ2File(self.filename, "wb") as f: + 1/0 + except ZeroDivisionError: + pass + else: + self.fail("1/0 didn't raise an exception") + + def testThreading(self): + if not threading: + return + # Issue #7205: Using a BZ2File from several threads shouldn't deadlock. + data = b"1" * 2**20 + nthreads = 10 + with BZ2File(self.filename, 'wb') as f: + def comp(): + for i in range(5): + f.write(data) + threads = [threading.Thread(target=comp) for i in range(nthreads)] + for t in threads: + t.start() + for t in threads: + t.join() + + def testWithoutThreading(self): + if not hasattr(support, "import_fresh_module"): + return + module = support.import_fresh_module("bz2file", blocked=("threading",)) + with module.BZ2File(self.filename, "wb") as f: + f.write(b"abc") + with module.BZ2File(self.filename, "rb") as f: + self.assertEqual(f.read(), b"abc") + + def testMixedIterationAndReads(self): + self.createTempFile() + linelen = len(self.TEXT_LINES[0]) + halflen = linelen // 2 + with BZ2File(self.filename) as bz2f: + bz2f.read(halflen) + self.assertEqual(next(bz2f), self.TEXT_LINES[0][halflen:]) + self.assertEqual(bz2f.read(), self.TEXT[linelen:]) + with BZ2File(self.filename) as bz2f: + bz2f.readline() + self.assertEqual(next(bz2f), self.TEXT_LINES[1]) + self.assertEqual(bz2f.readline(), self.TEXT_LINES[2]) + with BZ2File(self.filename) as bz2f: + bz2f.readlines() + self.assertRaises(StopIteration, next, bz2f) + self.assertEqual(bz2f.readlines(), []) + + def testMultiStreamOrdering(self): + # Test the ordering of streams when reading a multi-stream archive. + data1 = b"foo" * 1000 + data2 = b"bar" * 1000 + with BZ2File(self.filename, "w") as bz2f: + bz2f.write(data1) + with BZ2File(self.filename, "a") as bz2f: + bz2f.write(data2) + with BZ2File(self.filename) as bz2f: + self.assertEqual(bz2f.read(), data1 + data2) + + def testOpenBytesFilename(self): + str_filename = self.filename + try: + bytes_filename = str_filename.encode("ascii") + except UnicodeEncodeError: + self.skipTest("Temporary file name needs to be ASCII") + with BZ2File(bytes_filename, "wb") as f: + f.write(self.DATA) + with BZ2File(bytes_filename, "rb") as f: + self.assertEqual(f.read(), self.DATA) + # Sanity check that we are actually operating on the right file. + with BZ2File(str_filename, "rb") as f: + self.assertEqual(f.read(), self.DATA) + + + # Tests for a BZ2File wrapping another file object: + + def testReadBytesIO(self): + with BytesIO(self.DATA) as bio: + with BZ2File(bio) as bz2f: + self.assertRaises(TypeError, bz2f.read, None) + self.assertEqual(bz2f.read(), self.TEXT) + self.assertFalse(bio.closed) + + def testPeekBytesIO(self): + with BytesIO(self.DATA) as bio: + with BZ2File(bio) as bz2f: + pdata = bz2f.peek() + self.assertNotEqual(len(pdata), 0) + self.assertTrue(self.TEXT.startswith(pdata)) + self.assertEqual(bz2f.read(), self.TEXT) + + def testWriteBytesIO(self): + with BytesIO() as bio: + with BZ2File(bio, "w") as bz2f: + self.assertRaises(TypeError, bz2f.write) + bz2f.write(self.TEXT) + self.assertEqual(self.decompress(bio.getvalue()), self.TEXT) + self.assertFalse(bio.closed) + + def testSeekForwardBytesIO(self): + with BytesIO(self.DATA) as bio: + with BZ2File(bio) as bz2f: + self.assertRaises(TypeError, bz2f.seek) + bz2f.seek(150) + self.assertEqual(bz2f.read(), self.TEXT[150:]) + + def testSeekBackwardsBytesIO(self): + with BytesIO(self.DATA) as bio: + with BZ2File(bio) as bz2f: + bz2f.read(500) + bz2f.seek(-150, 1) + self.assertEqual(bz2f.read(), self.TEXT[500-150:]) + + def test_read_truncated(self): + # Drop the eos_magic field (6 bytes) and CRC (4 bytes). + truncated = self.DATA[:-10] + with BZ2File(BytesIO(truncated)) as f: + self.assertRaises(EOFError, f.read) + with BZ2File(BytesIO(truncated)) as f: + self.assertEqual(f.read(len(self.TEXT)), self.TEXT) + self.assertRaises(EOFError, f.read, 1) + # Incomplete 4-byte file header, and block header of at least 146 bits. + for i in range(22): + with BZ2File(BytesIO(truncated[:i])) as f: + self.assertRaises(EOFError, f.read, 1) + + +class OpenTest(BaseTest): + "Test the open function." + + def open(self, *args, **kwargs): + return bz2file.open(*args, **kwargs) + + def test_binary_modes(self): + modes = ["wb", "xb"] if bz2file._HAS_OPEN_X_MODE else ["wb"] + for mode in modes: + if mode == "xb": + support.unlink(self.filename) + with self.open(self.filename, mode) as f: + f.write(self.TEXT) + with open(self.filename, "rb") as f: + file_data = self.decompress(f.read()) + self.assertEqual(file_data, self.TEXT) + with self.open(self.filename, "rb") as f: + self.assertEqual(f.read(), self.TEXT) + with self.open(self.filename, "ab") as f: + f.write(self.TEXT) + with open(self.filename, "rb") as f: + file_data = self.decompress(f.read()) + self.assertEqual(file_data, self.TEXT * 2) + + def test_implicit_binary_modes(self): + # Test implicit binary modes (no "b" or "t" in mode string). + modes = ["w", "x"] if bz2file._HAS_OPEN_X_MODE else ["w"] + for mode in modes: + if mode == "x": + support.unlink(self.filename) + with self.open(self.filename, mode) as f: + f.write(self.TEXT) + with open(self.filename, "rb") as f: + file_data = self.decompress(f.read()) + self.assertEqual(file_data, self.TEXT) + with self.open(self.filename, "r") as f: + self.assertEqual(f.read(), self.TEXT) + with self.open(self.filename, "a") as f: + f.write(self.TEXT) + with open(self.filename, "rb") as f: + file_data = self.decompress(f.read()) + self.assertEqual(file_data, self.TEXT * 2) + + def test_text_modes(self): + text = self.TEXT.decode("ascii") + text_native_eol = text.replace("\n", os.linesep) + modes = ["wt", "xt"] if bz2file._HAS_OPEN_X_MODE else ["wt"] + for mode in modes: + if mode == "xt": + support.unlink(self.filename) + if not bz2file._HAS_OPEN_X_MODE: + continue + with self.open(self.filename, mode) as f: + f.write(text) + with open(self.filename, "rb") as f: + file_data = self.decompress(f.read()).decode("ascii") + self.assertEqual(file_data, text_native_eol) + with self.open(self.filename, "rt") as f: + self.assertEqual(f.read(), text) + with self.open(self.filename, "at") as f: + f.write(text) + with open(self.filename, "rb") as f: + file_data = self.decompress(f.read()).decode("ascii") + self.assertEqual(file_data, text_native_eol * 2) + + def test_x_mode(self): + if not bz2file._HAS_OPEN_X_MODE: + return + for mode in ("x", "xb", "xt"): + support.unlink(self.filename) + with self.open(self.filename, mode) as f: + pass + with self.assertRaises(FileExistsError): + with self.open(self.filename, mode) as f: + pass + + def test_fileobj(self): + with self.open(BytesIO(self.DATA), "r") as f: + self.assertEqual(f.read(), self.TEXT) + with self.open(BytesIO(self.DATA), "rb") as f: + self.assertEqual(f.read(), self.TEXT) + text = self.TEXT.decode("ascii") + with self.open(BytesIO(self.DATA), "rt") as f: + self.assertEqual(f.read(), text) + + def test_bad_params(self): + # Test invalid parameter combinations. + self.assertRaises(ValueError, + self.open, self.filename, "wbt") + self.assertRaises(ValueError, + self.open, self.filename, "xbt") + self.assertRaises(ValueError, + self.open, self.filename, "rb", encoding="utf-8") + self.assertRaises(ValueError, + self.open, self.filename, "rb", errors="ignore") + self.assertRaises(ValueError, + self.open, self.filename, "rb", newline="\n") + + def test_encoding(self): + # Test non-default encoding. + text = self.TEXT.decode("ascii") + text_native_eol = text.replace("\n", os.linesep) + with self.open(self.filename, "wt", encoding="utf-16-le") as f: + f.write(text) + with open(self.filename, "rb") as f: + file_data = self.decompress(f.read()).decode("utf-16-le") + self.assertEqual(file_data, text_native_eol) + with self.open(self.filename, "rt", encoding="utf-16-le") as f: + self.assertEqual(f.read(), text) + + def test_encoding_error_handler(self): + # Test with non-default encoding error handler. + with self.open(self.filename, "wb") as f: + f.write(b"foo\xffbar") + with self.open(self.filename, "rt", encoding="ascii", errors="ignore") \ + as f: + self.assertEqual(f.read(), "foobar") + + def test_newline(self): + # Test with explicit newline (universal newline mode disabled). + text = self.TEXT.decode("ascii") + with self.open(self.filename, "wt", newline="\n") as f: + f.write(text) + with self.open(self.filename, "rt", newline="\r") as f: + self.assertEqual(f.readlines(), [text]) + + +if __name__ == '__main__': + unittest.main() -- Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-med/python-bz2file.git _______________________________________________ debian-med-commit mailing list [email protected] http://lists.alioth.debian.org/cgi-bin/mailman/listinfo/debian-med-commit
