Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 56ab1a4d5 -> 24b7bcc26


Enhancements and deprecation cleanups related to file compression.

- Getting rid of CompressionTypes.ZLIB and
  CompressionTypes.NO_COMPRESSION.
- Introducing BZIP2 compression in analogy to Dataflow Java's BZIP2,
  towards resolution of https://issues.apache.org/jira/browse/BEAM-570.
- Introducing SNAPPY codec support for AVRO conciseness and in order
  to fully resolve https://issues.apache.org/jira/browse/BEAM-570.
- Moving avroio from compression_type to codec as per various
  discussions.
- A few cleanups in avroio.
- Making textio more DRY and doing a few cleanups.
- Raising exceptions when splitting is requested for
  compressed source since that should never happen
  (guaranteed by the service for the supported compression types).
- Using cStringIO instead of StringIO in various places as
  decided in some other discussions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/88461ab8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/88461ab8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/88461ab8

Branch: refs/heads/python-sdk
Commit: 88461ab8161839f2fd8bd47a548201ed7bffb2a3
Parents: 56ab1a4
Author: Gus Katsiapis <katsia...@katsiapis-linux.mtv.corp.google.com>
Authored: Tue Oct 4 19:41:07 2016 -0700
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Sat Oct 15 13:28:27 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/avroio.py            |  54 ++---
 sdks/python/apache_beam/io/avroio_test.py       |  20 ++
 .../apache_beam/io/filebasedsource_test.py      |  47 ++--
 sdks/python/apache_beam/io/fileio.py            | 123 +++++-----
 sdks/python/apache_beam/io/fileio_test.py       | 238 ++-----------------
 sdks/python/apache_beam/io/textio.py            |  71 +++---
 sdks/python/apache_beam/io/textio_test.py       |  86 ++++---
 7 files changed, 233 insertions(+), 406 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/88461ab8/sdks/python/apache_beam/io/avroio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio.py 
b/sdks/python/apache_beam/io/avroio.py
index fdf8dae..7de00df 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -16,8 +16,8 @@
 #
 """Implements a source for reading Avro files."""
 
+import cStringIO as StringIO
 import os
-import StringIO
 import zlib
 
 import avro
@@ -73,15 +73,10 @@ class ReadFromAvro(PTransform):
       **kwargs: Additional keyword arguments to be passed to the base class.
     """
     super(ReadFromAvro, self).__init__()
+    self._args = (file_pattern, min_bundle_size)
 
-    self._file_pattern = file_pattern
-    self._min_bundle_size = min_bundle_size
-
-  def apply(self, pcoll):
-    return pcoll.pipeline | Read(
-        _AvroSource(
-            file_pattern=self._file_pattern,
-            min_bundle_size=self._min_bundle_size))
+  def apply(self, pvalue):
+    return pvalue.pipeline | Read(_AvroSource(*self._args))
 
 
 class _AvroUtils(object):
@@ -247,24 +242,17 @@ class _AvroSource(filebasedsource.FileBasedSource):
           yield record
 
 
-_avro_codecs = {
-    fileio.CompressionTypes.UNCOMPRESSED: 'null',
-    fileio.CompressionTypes.ZLIB: 'deflate',
-    # fileio.CompressionTypes.SNAPPY: 'snappy',
-}
-
-
 class WriteToAvro(beam.transforms.PTransform):
   """A ``PTransform`` for writing avro files."""
 
   def __init__(self,
                file_path_prefix,
                schema,
+               codec='deflate',
                file_name_suffix='',
                num_shards=0,
                shard_name_template=None,
-               mime_type='application/x-avro',
-               compression_type=fileio.CompressionTypes.ZLIB):
+               mime_type='application/x-avro'):
     """Initialize a WriteToAvro transform.
 
     Args:
@@ -274,6 +262,8 @@ class WriteToAvro(beam.transforms.PTransform):
         only this argument is specified and num_shards, shard_name_template, 
and
         file_name_suffix use default values.
       schema: The schema to use, as returned by avro.schema.parse
+      codec: The codec to use for block-level compression. Any string supported
+        by the Avro specification is accepted (for example 'null').
       file_name_suffix: Suffix for the files written.
       append_trailing_newlines: indicate whether this sink should write an
         additional newline char after writing each element.
@@ -292,21 +282,15 @@ class WriteToAvro(beam.transforms.PTransform):
         generated. The default pattern used is '-SSSSS-of-NNNNN'.
       mime_type: The MIME type to use for the produced files, if the filesystem
         supports specifying MIME types.
-      compression_type: Used to handle compressed output files. Defaults to
-        CompressionTypes.ZLIB
 
     Returns:
       A WriteToAvro transform usable for writing.
     """
-    if compression_type not in _avro_codecs:
-      raise ValueError(
-          'Compression type %s not supported by avro.' % compression_type)
-    self.args = (file_path_prefix, schema, file_name_suffix, num_shards,
-                 shard_name_template, mime_type, compression_type)
+    self._args = (file_path_prefix, schema, codec, file_name_suffix, 
num_shards,
+                 shard_name_template, mime_type)
 
   def apply(self, pcoll):
-    # pylint: disable=expression-not-assigned
-    pcoll | beam.io.iobase.Write(_AvroSink(*self.args))
+    return pcoll | beam.io.iobase.Write(_AvroSink(*self._args))
 
 
 class _AvroSink(fileio.FileSink):
@@ -315,11 +299,11 @@ class _AvroSink(fileio.FileSink):
   def __init__(self,
                file_path_prefix,
                schema,
+               codec,
                file_name_suffix,
                num_shards,
                shard_name_template,
-               mime_type,
-               compression_type):
+               mime_type):
     super(_AvroSink, self).__init__(
         file_path_prefix,
         file_name_suffix=file_name_suffix,
@@ -327,16 +311,16 @@ class _AvroSink(fileio.FileSink):
         shard_name_template=shard_name_template,
         coder=None,
         mime_type=mime_type,
-        # Compression happens at the block level, not the file level.
+        # Compression happens at the block level using the supplied codec, and
+        # not at the file level.
         compression_type=fileio.CompressionTypes.UNCOMPRESSED)
-    self.schema = schema
-    self.avro_compression_type = compression_type
+    self._schema = schema
+    self._codec = codec
 
   def open(self, temp_path):
     file_handle = super(_AvroSink, self).open(temp_path)
     return avro.datafile.DataFileWriter(
-        file_handle, avro.io.DatumWriter(), self.schema,
-        _avro_codecs[self.avro_compression_type])
+        file_handle, avro.io.DatumWriter(), self._schema, self._codec)
 
   def write_record(self, writer, value):
-    writer.append(value)
+    writer.append(value)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/88461ab8/sdks/python/apache_beam/io/avroio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio_test.py 
b/sdks/python/apache_beam/io/avroio_test.py
index 697bd2c..1c96d72 100644
--- a/sdks/python/apache_beam/io/avroio_test.py
+++ b/sdks/python/apache_beam/io/avroio_test.py
@@ -246,6 +246,26 @@ class TestAvro(unittest.TestCase):
         readback = p | avroio.ReadFromAvro(path + '*') | beam.Map(json.dumps)
         assert_that(readback, equal_to([json.dumps(r) for r in self.RECORDS]))
 
+  def test_sink_transform_snappy(self):
+    try:
+      import snappy  # pylint: disable=unused-variable
+      with tempfile.NamedTemporaryFile() as dst:
+        path = dst.name
+        with beam.Pipeline('DirectPipelineRunner') as p:
+          # pylint: disable=expression-not-assigned
+          p | beam.Create(self.RECORDS) | avroio.WriteToAvro(
+              path,
+              self.SCHEMA,
+              codec='snappy')
+        with beam.Pipeline('DirectPipelineRunner') as p:
+          # json used for stable sortability
+          readback = p | avroio.ReadFromAvro(path + '*') | beam.Map(json.dumps)
+          assert_that(readback, equal_to([json.dumps(r) for r in 
self.RECORDS]))
+    except ImportError:
+      logging.warning(
+          'Skipped test_sink_transform_snappy since snappy appears to not be '
+          'installed.')
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/88461ab8/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py 
b/sdks/python/apache_beam/io/filebasedsource_test.py
index 91b1ffb..f1ac482 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -15,14 +15,14 @@
 # limitations under the License.
 #
 
+import bz2
+import cStringIO as StringIO
 import gzip
 import logging
 import math
 import os
-import StringIO
 import tempfile
 import unittest
-import zlib
 
 import apache_beam as beam
 from apache_beam.io import filebasedsource
@@ -335,44 +335,43 @@ class TestFileBasedSource(unittest.TestCase):
     assert len(expected_data) == 200
     self._run_dataflow_test(pattern, expected_data, False)
 
-  def test_read_gzip_file(self):
+  def test_read_file_bzip2(self):
     _, lines = write_data(10)
     filename = tempfile.NamedTemporaryFile(
         delete=False, prefix=tempfile.template).name
-    with gzip.GzipFile(filename, 'wb') as f:
+    with bz2.BZ2File(filename, 'wb') as f:
       f.write('\n'.join(lines))
 
     pipeline = beam.Pipeline('DirectPipelineRunner')
     pcoll = pipeline | 'Read' >> beam.Read(LineSource(
         filename,
         splittable=False,
-        compression_type=fileio.CompressionTypes.GZIP))
+        compression_type=fileio.CompressionTypes.BZIP2))
     assert_that(pcoll, equal_to(lines))
     pipeline.run()
 
-  def test_read_zlib_file(self):
+  def test_read_file_gzip(self):
     _, lines = write_data(10)
-    compressobj = zlib.compressobj(
-        zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, zlib.MAX_WBITS)
-    compressed = compressobj.compress('\n'.join(lines)) + compressobj.flush()
-    filename = _write_prepared_data(compressed)
+    filename = tempfile.NamedTemporaryFile(
+        delete=False, prefix=tempfile.template).name
+    with gzip.GzipFile(filename, 'wb') as f:
+      f.write('\n'.join(lines))
 
     pipeline = beam.Pipeline('DirectPipelineRunner')
     pcoll = pipeline | 'Read' >> beam.Read(LineSource(
         filename,
         splittable=False,
-        compression_type=fileio.CompressionTypes.ZLIB))
+        compression_type=fileio.CompressionTypes.GZIP))
     assert_that(pcoll, equal_to(lines))
     pipeline.run()
 
-  def test_read_zlib_pattern(self):
+  def test_read_pattern_bzip2(self):
     _, lines = write_data(200)
     splits = [0, 34, 100, 140, 164, 188, 200]
     chunks = [lines[splits[i-1]:splits[i]] for i in xrange(1, len(splits))]
     compressed_chunks = []
     for c in chunks:
-      compressobj = zlib.compressobj(
-          zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, zlib.MAX_WBITS)
+      compressobj = bz2.BZ2Compressor()
       compressed_chunks.append(
           compressobj.compress('\n'.join(c)) + compressobj.flush())
     file_pattern = write_prepared_pattern(compressed_chunks)
@@ -380,11 +379,11 @@ class TestFileBasedSource(unittest.TestCase):
     pcoll = pipeline | 'Read' >> beam.Read(LineSource(
         file_pattern,
         splittable=False,
-        compression_type=fileio.CompressionTypes.ZLIB))
+        compression_type=fileio.CompressionTypes.BZIP2))
     assert_that(pcoll, equal_to(lines))
     pipeline.run()
 
-  def test_read_gzip_pattern(self):
+  def test_read_pattern_gzip(self):
     _, lines = write_data(200)
     splits = [0, 34, 100, 140, 164, 188, 200]
     chunks = [lines[splits[i-1]:splits[i]] for i in xrange(1, len(splits))]
@@ -403,7 +402,21 @@ class TestFileBasedSource(unittest.TestCase):
     assert_that(pcoll, equal_to(lines))
     pipeline.run()
 
-  def test_read_auto_single_file(self):
+  def test_read_auto_single_file_bzip2(self):
+    _, lines = write_data(10)
+    filename = tempfile.NamedTemporaryFile(
+        delete=False, prefix=tempfile.template, suffix='.bz2').name
+    with gzip.GzipFile(filename, 'wb') as f:
+      f.write('\n'.join(lines))
+
+    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | 'Read' >> beam.Read(LineSource(
+        filename,
+        compression_type=fileio.CompressionTypes.AUTO))
+    assert_that(pcoll, equal_to(lines))
+    pipeline.run()
+
+  def test_read_auto_single_file_gzip(self):
     _, lines = write_data(10)
     filename = tempfile.NamedTemporaryFile(
         delete=False, prefix=tempfile.template, suffix='.gz').name

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/88461ab8/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py 
b/sdks/python/apache_beam/io/fileio.py
index 2670470..e6575b0 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -62,31 +62,23 @@ class _CompressionType(object):
 
 class CompressionTypes(object):
   """Enum-like class representing known compression types."""
-
   # Detect compression based on filename extension.
   #
   # The following extensions are currently recognized by auto-detection:
-  #   .gz (implies GZIP as described below)
-  #   .z  (implies ZLIB as described below).
+  #   .bz2 (implies BZIP2 as described below).
+  #   .gz  (implies GZIP as described below)
   # Any non-recognized extension implies UNCOMPRESSED as described below.
   AUTO = _CompressionType('auto')
 
+  # BZIP2 compression.
+  BZIP2 = _CompressionType('bzip2')
+
   # GZIP compression (deflate with GZIP headers).
   GZIP = _CompressionType('gzip')
 
-  # ZLIB compression (deflate with ZLIB headers).
-  ZLIB = _CompressionType('zlib')
-
-  # BZIP2 compression  (deflate with BZIP2 headers)
-  BZIP2 = _CompressionType('bz2')
-
   # Uncompressed (i.e., may be split).
   UNCOMPRESSED = _CompressionType('uncompressed')
 
-  # TODO: Remove this backwards-compatibility soon.
-  # Deprecated. Use UNCOMPRESSED instead.
-  NO_COMPRESSION = UNCOMPRESSED
-
   @classmethod
   def is_valid_compression_type(cls, compression_type):
     """Returns true for valid compression types, false otherwise."""
@@ -95,18 +87,15 @@ class CompressionTypes(object):
   @classmethod
   def mime_type(cls, compression_type, default='application/octet-stream'):
     mime_types_by_compression_type = {
-        cls.GZIP: 'application/x-gzip',
-        cls.ZLIB: 'application/octet-stream',
         cls.BZIP2: 'application/x-bz2',
+        cls.GZIP: 'application/x-gzip',
     }
     return mime_types_by_compression_type.get(compression_type, default)
 
   @classmethod
   def detect_compression_type(cls, file_path):
     """Returns the compression type of a file (based on its suffix)"""
-    compression_types_by_suffix = {'.gz': cls.GZIP,
-                                   '.z': cls.ZLIB,
-                                   '.bz2': cls.BZIP2}
+    compression_types_by_suffix = {'.bz2': cls.BZIP2, '.gz': cls.GZIP}
     lowercased_path = file_path.lower()
     for suffix, compression_type in compression_types_by_suffix.iteritems():
       if lowercased_path.endswith(suffix):
@@ -173,16 +162,10 @@ class NativeFileSource(dataflow_io.NativeSource):
     self.mime_type = mime_type
 
   def __eq__(self, other):
-    # TODO: Remove this backwards-compatibility soon.
-    def equiv_autos(lhs, rhs):
-      return ((lhs == 'AUTO' and rhs == CompressionTypes.AUTO) or
-              (lhs == CompressionTypes.AUTO and rhs == 'AUTO'))
-
     return (self.file_path == other.file_path and
             self.start_offset == other.start_offset and
             self.end_offset == other.end_offset and
-            (self.compression_type == other.compression_type or
-             equiv_autos(self.compression_type, other.compression_type)) and
+            self.compression_type == other.compression_type and
             self.coder == other.coder and self.mime_type == other.mime_type)
 
   @property
@@ -234,12 +217,10 @@ class 
NativeFileSourceReader(dataflow_io.NativeSourceReader,
                                                            self.end_offset)
 
     # Position to the appropriate start_offset.
-    if self.start_offset > 0:
-      if ChannelFactory.is_compressed(self.file):
-        # TODO: Turns this warning into an exception soon.
-        logging.warning(
-            'Encountered initial split starting at (%s) for compressed 
source.',
-            self.start_offset)
+    if self.start_offset > 0 and ChannelFactory.is_compressed(self.file):
+      raise ValueError(
+          'Unexpected positive start_offset (%s) for a compressed source: %s',
+          self.start_offset, self.source)
     self.seek_to_true_start_offset()
 
     return self
@@ -249,14 +230,11 @@ class 
NativeFileSourceReader(dataflow_io.NativeSourceReader,
 
   def __iter__(self):
     if self.current_offset > 0 and ChannelFactory.is_compressed(self.file):
-      # When compression is enabled both initial and dynamic splitting should 
be
-      # prevented. Here we prevent initial splitting by ignoring all splits
-      # other than the split that starts at byte 0.
-      #
-      # TODO: Turns this warning into an exception soon.
-      logging.warning('Ignoring split starting at (%s) for compressed source.',
-                      self.current_offset)
-      return
+      # When compression is enabled both initial and dynamic splitting should
+      # not be allowed.
+      raise ValueError(
+          'Unexpected split starting at (%s) for compressed source: %s',
+          self.current_offset, self.source)
 
     while True:
       if not self.range_tracker.try_claim(record_start=self.current_offset):
@@ -318,10 +296,6 @@ class 
NativeFileSourceReader(dataflow_io.NativeSourceReader,
       # When compression is enabled both initial and dynamic splitting should 
be
       # prevented. Here we prevent dynamic splitting by ignoring all dynamic
       # split requests at the reader.
-      #
-      # TODO: Turns this warning into an exception soon.
-      logging.warning('FileBasedReader cannot be split since it is compressed. 
'
-                      'Requested: %r', dynamic_split_request)
       return
 
     assert dynamic_split_request is not None
@@ -463,13 +437,9 @@ class ChannelFactory(object):
            compression_type=CompressionTypes.AUTO):
     if compression_type == CompressionTypes.AUTO:
       compression_type = CompressionTypes.detect_compression_type(path)
-    elif compression_type == 'AUTO':
-      # TODO: Remove this backwards-compatibility soon.
-      compression_type = CompressionTypes.detect_compression_type(path)
-    else:
-      if not CompressionTypes.is_valid_compression_type(compression_type):
-        raise TypeError('compression_type must be CompressionType object but '
-                        'was %s' % type(compression_type))
+    elif not CompressionTypes.is_valid_compression_type(compression_type):
+      raise TypeError('compression_type must be CompressionType object but '
+                      'was %s' % type(compression_type))
 
     if path.startswith('gs://'):
       # pylint: disable=wrong-import-order, wrong-import-position
@@ -584,10 +554,10 @@ class ChannelFactory(object):
 
 class _CompressedFile(object):
   """Somewhat limited file wrapper for easier handling of compressed files."""
-  _type_mask = {
-      CompressionTypes.GZIP: zlib.MAX_WBITS | 16,
-      CompressionTypes.ZLIB: zlib.MAX_WBITS,
-  }
+
+  # The bit mask to use for the wbits parameters of the GZIP compressor and
+  # decompressor objects.
+  _gzip_mask = zlib.MAX_WBITS | 16
 
   def __init__(self,
                fileobj,
@@ -606,8 +576,7 @@ class _CompressedFile(object):
       if self._compression_type == CompressionTypes.BZIP2:
         self._decompressor = bz2.BZ2Decompressor()
       else:
-        self._decompressor = zlib.decompressobj(
-            self._type_mask[compression_type])
+        self._decompressor = zlib.decompressobj(self._gzip_mask)
     else:
       self._decompressor = None
 
@@ -616,8 +585,7 @@ class _CompressedFile(object):
         self._compressor = bz2.BZ2Compressor()
       else:
         self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
-                                            zlib.DEFLATED,
-                                            self._type_mask[compression_type])
+                                            zlib.DEFLATED, self._gzip_mask)
     else:
       self._compressor = None
 
@@ -625,10 +593,14 @@ class _CompressedFile(object):
     if not CompressionTypes.is_valid_compression_type(compression_type):
       raise TypeError('compression_type must be CompressionType object but '
                       'was %s' % type(compression_type))
-    if (compression_type == CompressionTypes.AUTO or
-        compression_type == CompressionTypes.UNCOMPRESSED):
+    if compression_type in (CompressionTypes.AUTO, 
CompressionTypes.UNCOMPRESSED
+                           ):
+      raise ValueError(
+          'Cannot create object with unspecified or no compression')
+    if compression_type not in (CompressionTypes.BZIP2, CompressionTypes.GZIP):
       raise ValueError(
-          'cannot create object with unspecified or no compression')
+          'compression_type %s not supported for whole-file compression',
+          compression_type)
 
   def _readable(self):
     mode = self._file.mode
@@ -652,12 +624,23 @@ class _CompressedFile(object):
       buf = self._file.read(self._read_size)
       if buf:
         self._data += self._decompressor.decompress(buf)
-      elif self._compression_type != CompressionTypes.BZIP2:
-        # EOF reached, flush. BZIP2 does not have this flush method,
-        # because of it's block nature
-        self._data += self._decompressor.flush()
-        return
       else:
+        # EOF reached.
+        # Verify completeness and no corruption and flush (if needed by
+        # the underlying algorithm).
+        if self._compression_type == CompressionTypes.BZIP2:
+          # Having unused_data past end of stream would imply file corruption.
+          assert not self._decompressor.unused_data, 'Possible file 
corruption.'
+          try:
+            # EOF implies that the underlying BZIP2 stream must also have
+            # reached EOF. We expect this to raise an EOFError and we catch it
+            # below. Any other kind of error though would be problematic.
+            self._decompressor.decompress('dummy')
+            assert False, 'Possible file corruption.'
+          except EOFError:
+            pass  # All is as expected!
+        else:
+          self._data += self._decompressor.flush()
         return
 
   def _read_from_internal_buffer(self, num_bytes):
@@ -697,8 +680,7 @@ class _CompressedFile(object):
     if self._file is None:
       return
 
-    if self._writeable():
-      self._file.write(self._compressor.flush())
+    self.flush()
     self._file.close()
 
   def flush(self):
@@ -1014,8 +996,9 @@ class NativeFileSink(dataflow_io.NativeSink):
     self.file_name_suffix = file_name_suffix
     self.num_shards = num_shards
     # TODO: Update this when the service supports more patterns.
-    self.shard_name_template = ('-SSSSS-of-NNNNN' if shard_name_template is 
None
-                                else shard_name_template)
+    self.shard_name_template = (DEFAULT_SHARD_NAME_TEMPLATE if
+                                shard_name_template is None else
+                                shard_name_template)
     # TODO: Implement sink validation.
     self.validate = validate
     self.mime_type = mime_type

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/88461ab8/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py 
b/sdks/python/apache_beam/io/fileio_test.py
index 45435e6..15daf04 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -138,7 +138,7 @@ class TestTextFileSource(unittest.TestCase):
     self.assertEqual(read_lines, [])
 
   def test_read_entire_file_gzip_large(self):
-    lines = ['Line %d' % d for d in range(10 * 1000)]
+    lines = ['Line %d' % d for d in range(100 * 1000)]
     compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS | 16)
     data = compressor.compress('\n'.join(lines)) + compressor.flush()
     source = fileio.TextFileSource(
@@ -188,7 +188,7 @@ class TestTextFileSource(unittest.TestCase):
     self.assertEqual(read_lines, [])
 
   def test_read_entire_file_bzip2_large(self):
-    lines = ['Line %d' % d for d in range(10 * 1000)]
+    lines = ['Line %d' % d for d in range(100 * 1000)]
     compressor = bz2.BZ2Compressor()
     data = compressor.compress('\n'.join(lines)) + compressor.flush()
     source = fileio.TextFileSource(
@@ -200,140 +200,6 @@ class TestTextFileSource(unittest.TestCase):
         read_lines.append(line)
     self.assertEqual(read_lines, lines)
 
-  def test_read_entire_file_zlib(self):
-    lines = ['First', 'Second', 'Third']
-    compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
-    data = compressor.compress('\n'.join(lines)) + compressor.flush()
-    source = fileio.TextFileSource(
-        file_path=self.create_temp_file(data),
-        compression_type=fileio.CompressionTypes.ZLIB)
-    read_lines = []
-    with source.reader() as reader:
-      for line in reader:
-        read_lines.append(line)
-    self.assertEqual(read_lines, lines)
-
-  def test_read_entire_file_zlib_auto(self):
-    lines = ['First', 'Second', 'Third']
-    compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
-    data = compressor.compress('\n'.join(lines)) + compressor.flush()
-    source = fileio.TextFileSource(file_path=self.create_temp_file(
-        data, suffix='.Z'))
-    read_lines = []
-    with source.reader() as reader:
-      for line in reader:
-        read_lines.append(line)
-    self.assertEqual(read_lines, lines)
-
-  def test_read_entire_file_zlib_empty(self):
-    compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
-    data = compressor.compress('') + compressor.flush()
-    source = fileio.TextFileSource(
-        file_path=self.create_temp_file(data),
-        compression_type=fileio.CompressionTypes.ZLIB)
-    read_lines = []
-    with source.reader() as reader:
-      for line in reader:
-        read_lines.append(line)
-    self.assertEqual(read_lines, [])
-
-  def test_read_entire_file_zlib_large(self):
-    lines = ['Line %d' % d for d in range(10 * 1000)]
-    compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
-    data = compressor.compress('\n'.join(lines)) + compressor.flush()
-    source = fileio.TextFileSource(
-        file_path=self.create_temp_file(data),
-        compression_type=fileio.CompressionTypes.ZLIB)
-    read_lines = []
-    with source.reader() as reader:
-      for line in reader:
-        read_lines.append(line)
-    self.assertEqual(read_lines, lines)
-
-  def test_skip_entire_file_zlib(self):
-    lines = ['First', 'Second', 'Third']
-    compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS | 16)
-    data = compressor.compress('\n'.join(lines)) + compressor.flush()
-    source = fileio.TextFileSource(
-        file_path=self.create_temp_file(data),
-        start_offset=1,  # Anything other than 0 should lead to a null-read.
-        compression_type=fileio.CompressionTypes.ZLIB)
-    read_lines = []
-    with source.reader() as reader:
-      for line in reader:
-        read_lines.append(line)
-    self.assertEqual(read_lines, [])
-
-  def test_skip_entire_file_bzip2(self):
-    lines = ['First', 'Second', 'Third']
-    compressor = bz2.BZ2Compressor()
-    data = compressor.compress('\n'.join(lines)) + compressor.flush()
-    source = fileio.TextFileSource(
-        file_path=self.create_temp_file(data),
-        start_offset=1,  # Anything other than 0 should lead to a null-read.
-        compression_type=fileio.CompressionTypes.BZIP2)
-    read_lines = []
-    with source.reader() as reader:
-      for line in reader:
-        read_lines.append(line)
-    self.assertEqual(read_lines, [])
-
-  def test_skip_entire_file_gzip(self):
-    lines = ['First', 'Second', 'Third']
-    compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
-    data = compressor.compress('\n'.join(lines)) + compressor.flush()
-    source = fileio.TextFileSource(
-        file_path=self.create_temp_file(data),
-        start_offset=1,  # Anything other than 0 should lead to a null-read.
-        compression_type=fileio.CompressionTypes.GZIP)
-    read_lines = []
-    with source.reader() as reader:
-      for line in reader:
-        read_lines.append(line)
-    self.assertEqual(read_lines, [])
-
-  def test_consume_entire_file_gzip(self):
-    lines = ['First', 'Second', 'Third']
-    compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS | 16)
-    data = compressor.compress('\n'.join(lines)) + compressor.flush()
-    source = fileio.TextFileSource(
-        file_path=self.create_temp_file(data),
-        end_offset=1,  # Any end_offset should effectively be ignored.
-        compression_type=fileio.CompressionTypes.GZIP)
-    read_lines = []
-    with source.reader() as reader:
-      for line in reader:
-        read_lines.append(line)
-    self.assertEqual(read_lines, lines)
-
-  def test_consume_entire_file_bzip2(self):
-    lines = ['First', 'Second', 'Third']
-    compressor = bz2.BZ2Compressor()
-    data = compressor.compress('\n'.join(lines)) + compressor.flush()
-    source = fileio.TextFileSource(
-        file_path=self.create_temp_file(data),
-        end_offset=1,  # Any end_offset should effectively be ignored.
-        compression_type=fileio.CompressionTypes.BZIP2)
-    read_lines = []
-    with source.reader() as reader:
-      for line in reader:
-        read_lines.append(line)
-    self.assertEqual(read_lines, lines)
-
-  def test_consume_entire_file_zlib(self):
-    lines = ['First', 'Second', 'Third']
-    compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
-    data = compressor.compress('\n'.join(lines)) + compressor.flush()
-    source = fileio.TextFileSource(
-        file_path=self.create_temp_file(data),
-        end_offset=1,  # Any end_offset should effectively be ignored.
-        compression_type=fileio.CompressionTypes.ZLIB)
-    read_lines = []
-    with source.reader() as reader:
-      for line in reader:
-        read_lines.append(line)
-    self.assertEqual(read_lines, lines)
-
   def test_progress_entire_file(self):
     lines = ['First', 'Second', 'Third']
     source = fileio.TextFileSource(
@@ -387,25 +253,6 @@ class TestTextFileSource(unittest.TestCase):
     self.assertEqual(len(progress_record), 3)
     self.assertEqual(progress_record, [0, 6, 13])
 
-  def test_progress_entire_file_zlib(self):
-    lines = ['First', 'Second', 'Third']
-    compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
-    data = compressor.compress('\n'.join(lines)) + compressor.flush()
-    source = fileio.TextFileSource(
-        file_path=self.create_temp_file(data),
-        compression_type=fileio.CompressionTypes.ZLIB)
-    progress_record = []
-    with source.reader() as reader:
-      self.assertEqual(-1, reader.get_progress().position.byte_offset)
-      for line in reader:
-        self.assertIsNotNone(line)
-        progress_record.append(reader.get_progress().position.byte_offset)
-      self.assertEqual(18,  # Reading the entire contents before we decide EOF.
-                       reader.get_progress().position.byte_offset)
-
-    self.assertEqual(len(progress_record), 3)
-    self.assertEqual(progress_record, [0, 6, 13])
-
   def try_splitting_reader_at(self, reader, split_request, expected_response):
     actual_response = reader.request_dynamic_split(split_request)
 
@@ -421,10 +268,20 @@ class TestTextFileSource(unittest.TestCase):
 
       return actual_response
 
-  def test_gzip_file_unsplittable(self):
+  def test_file_unsplittable_gzip(self):
     lines = ['aaaa', 'bbbb', 'cccc', 'dddd', 'eeee']
     compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS | 16)
     data = compressor.compress('\n'.join(lines)) + compressor.flush()
+
+    with self.assertRaises(ValueError):  # Unsplittable initially.
+      source = fileio.TextFileSource(
+          file_path=self.create_temp_file(data),
+          compression_type=fileio.CompressionTypes.GZIP,
+          start_offset=1)  # Anything other than 0 will do.
+      with source.reader():
+        pass
+
+    # Unsplittable dynamically.
     source = fileio.TextFileSource(
         file_path=self.create_temp_file(data),
         compression_type=fileio.CompressionTypes.GZIP)
@@ -451,44 +308,23 @@ class TestTextFileSource(unittest.TestCase):
                 dataflow_io.ReaderProgress(percent_complete=percent_complete)),
             None)
 
-  def test_bzip2_file_unsplittable(self):
+  def test_file_unsplittable_bzip2(self):
     lines = ['aaaa', 'bbbb', 'cccc', 'dddd', 'eeee']
     compressor = bz2.BZ2Compressor()
     data = compressor.compress('\n'.join(lines)) + compressor.flush()
-    source = fileio.TextFileSource(
-        file_path=self.create_temp_file(data),
-        compression_type=fileio.CompressionTypes.BZIP2)
 
-    with source.reader() as reader:
-      percents_complete = [x / 100.0 for x in range(101)]
+    with self.assertRaises(ValueError):  # Unsplittable initially.
+      source = fileio.TextFileSource(
+          file_path=self.create_temp_file(data),
+          compression_type=fileio.CompressionTypes.BZIP2,
+          start_offset=1)  # Anything other than 0 will do.
+      with source.reader():
+        pass
 
-      # Cursor at beginning of file.
-      for percent_complete in percents_complete:
-        self.try_splitting_reader_at(
-            reader,
-            dataflow_io.DynamicSplitRequest(
-                dataflow_io.ReaderProgress(percent_complete=percent_complete)),
-            None)
-
-      # Cursor passed beginning of file.
-      reader_iter = iter(reader)
-      next(reader_iter)
-      next(reader_iter)
-      for percent_complete in percents_complete:
-        self.try_splitting_reader_at(
-            reader,
-            dataflow_io.DynamicSplitRequest(
-                dataflow_io.ReaderProgress(percent_complete=percent_complete)),
-            None)
-
-  def test_zlib_file_unsplittable(self):
-    lines = ['aaaa', 'bbbb', 'cccc', 'dddd', 'eeee']
-    compressor = zlib.compressobj(-1, zlib.DEFLATED, zlib.MAX_WBITS)
-    data = compressor.compress('\n'.join(lines)) + compressor.flush()
+    # Unsplittable dynamically.
     source = fileio.TextFileSource(
         file_path=self.create_temp_file(data),
-        compression_type=fileio.CompressionTypes.ZLIB)
-
+        compression_type=fileio.CompressionTypes.BZIP2)
     with source.reader() as reader:
       percents_complete = [x / 100.0 for x in range(101)]
 
@@ -811,33 +647,6 @@ class TestNativeTextFileSink(unittest.TestCase):
     with bz2.BZ2File(self.path, 'r') as f:
       self.assertEqual(f.read().splitlines(), [])
 
-  def test_write_text_zlib_file(self):
-    sink = fileio.NativeTextFileSink(
-        self.path, compression_type=fileio.CompressionTypes.ZLIB)
-    self._write_lines(sink, self.lines)
-
-    with open(self.path, 'r') as f:
-      self.assertEqual(
-          zlib.decompress(f.read(), zlib.MAX_WBITS).splitlines(), self.lines)
-
-  def test_write_text_zlib_file_auto(self):
-    self.path = tempfile.NamedTemporaryFile(suffix='.Z').name
-    sink = fileio.NativeTextFileSink(self.path)
-    self._write_lines(sink, self.lines)
-
-    with open(self.path, 'r') as f:
-      self.assertEqual(
-          zlib.decompress(f.read(), zlib.MAX_WBITS).splitlines(), self.lines)
-
-  def test_write_text_zlib_file_empty(self):
-    sink = fileio.NativeTextFileSink(
-        self.path, compression_type=fileio.CompressionTypes.ZLIB)
-    self._write_lines(sink, [])
-
-    with open(self.path, 'r') as f:
-      self.assertEqual(
-          zlib.decompress(f.read(), zlib.MAX_WBITS).splitlines(), [])
-
 
 class MyFileSink(fileio.FileSink):
 
@@ -982,6 +791,7 @@ class TestFileSink(unittest.TestCase):
     with self.assertRaises(IOError):
       list(sink.finalize_write(init_token, [res1, res2]))
 
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/88461ab8/sdks/python/apache_beam/io/textio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio.py 
b/sdks/python/apache_beam/io/textio.py
index 6042576..f1f5a25 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -174,10 +174,14 @@ class ReadFromText(PTransform):
   This implementation only supports reading text encoded using UTF-8 or ASCII.
   This does not support other encodings such as UTF-16 or UTF-32."""
 
-  def __init__(self, file_pattern=None, min_bundle_size=0,
-               compression_type=fileio.CompressionTypes.UNCOMPRESSED,
-               strip_trailing_newlines=True,
-               coder=coders.StrUtf8Coder(), **kwargs):
+  def __init__(
+      self,
+      file_pattern=None,
+      min_bundle_size=0,
+      compression_type=fileio.CompressionTypes.AUTO,
+      strip_trailing_newlines=True,
+      coder=coders.StrUtf8Coder(),
+      **kwargs):
     """Initialize the ReadFromText transform.
 
     Args:
@@ -187,8 +191,9 @@ class ReadFromText(PTransform):
       min_bundle_size: Minimum size of bundles that should be generated when
                        splitting this source into bundles. See
                        ``FileBasedSource`` for more details.
-      compression_type: Used to handle compressed input files. Should be an
-                        object of type fileio.CompressionTypes.
+      compression_type: Used to handle compressed input files. Typical value
+          is CompressionTypes.AUTO, in which case the underlying file_path's
+          extension will be used to detect the compression.
       strip_trailing_newlines: Indicates whether this source should remove
                                the newline char in each line it reads before
                                decoding that line.
@@ -196,33 +201,25 @@ class ReadFromText(PTransform):
     """
 
     super(ReadFromText, self).__init__(**kwargs)
-    self._file_pattern = file_pattern
-    self._min_bundle_size = min_bundle_size
-    self._compression_type = compression_type
-    self._strip_trailing_newlines = strip_trailing_newlines
-    self._coder = coder
+    self._args = (file_pattern, min_bundle_size, compression_type,
+                  strip_trailing_newlines, coder)
 
-  def apply(self, pcoll):
-    return pcoll | Read(_TextSource(
-        self._file_pattern,
-        self._min_bundle_size,
-        self._compression_type,
-        self._strip_trailing_newlines,
-        self._coder))
+  def apply(self, pvalue):
+    return pvalue.pipeline | Read(_TextSource(*self._args))
 
 
 class WriteToText(PTransform):
   """A PTransform for writing to text files."""
 
-  def __init__(self,
-               file_path_prefix,
-               file_name_suffix='',
-               append_trailing_newlines=True,
-               num_shards=0,
-               shard_name_template=None,
-               coder=coders.ToStringCoder(),
-               compression_type=fileio.CompressionTypes.NO_COMPRESSION,
-              ):
+  def __init__(
+      self,
+      file_path_prefix,
+      file_name_suffix='',
+      append_trailing_newlines=True,
+      num_shards=0,
+      shard_name_template=None,
+      coder=coders.ToStringCoder(),
+      compression_type=fileio.CompressionTypes.AUTO):
     """Initialize a WriteToText PTransform.
 
     Args:
@@ -248,19 +245,15 @@ class WriteToText(PTransform):
         case it behaves as if num_shards was set to 1 and only one file will be
         generated. The default pattern used is '-SSSSS-of-NNNNN'.
       coder: Coder used to encode each line.
-      compression_type: Type of compression to use for this sink.
+      compression_type: Used to handle compressed output files. Typical value
+          is CompressionTypes.AUTO, in which case the final file path's
+          extension (as determined by file_path_prefix, file_name_suffix,
+          num_shards and shard_name_template) will be used to detect the
+          compression.
     """
 
-    self._file_path_prefix = file_path_prefix
-    self._file_name_suffix = file_name_suffix
-    self._append_trailing_newlines = append_trailing_newlines
-    self._num_shards = num_shards
-    self._shard_name_template = shard_name_template
-    self._coder = coder
-    self._compression_type = compression_type
+    self._args = (file_path_prefix, file_name_suffix, append_trailing_newlines,
+                  num_shards, shard_name_template, coder, compression_type)
 
   def apply(self, pcoll):
-    return pcoll | Write(_TextSink(
-        self._file_path_prefix, self._file_name_suffix,
-        self._append_trailing_newlines, self._num_shards,
-        self._shard_name_template, self._coder, self._compression_type))
+    return pcoll | Write(_TextSink(*self._args))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/88461ab8/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py 
b/sdks/python/apache_beam/io/textio_test.py
index fef2b79..d42de2b 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -17,13 +17,13 @@
 
 """Tests for textio module."""
 
+import bz2
 import glob
 import gzip
 import logging
 import os
 import tempfile
 import unittest
-import zlib
 
 import apache_beam as beam
 import apache_beam.io.source_test_utils as source_test_utils
@@ -261,6 +261,44 @@ class TextSourceTest(unittest.TestCase):
     assert_that(pcoll, equal_to(expected_data))
     pipeline.run()
 
+  def test_read_auto_bzip2(self):
+    _, lines = write_data(15)
+    file_name = tempfile.NamedTemporaryFile(
+        delete=False, prefix=tempfile.template, suffix='.bz2').name
+    with bz2.BZ2File(file_name, 'wb') as f:
+      f.write('\n'.join(lines))
+
+    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | 'Read' >> ReadFromText(file_name)
+    assert_that(pcoll, equal_to(lines))
+    pipeline.run()
+
+  def test_read_auto_gzip(self):
+    _, lines = write_data(15)
+    file_name = tempfile.NamedTemporaryFile(
+        delete=False, prefix=tempfile.template, suffix='.gz').name
+    with gzip.GzipFile(file_name, 'wb') as f:
+      f.write('\n'.join(lines))
+
+    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | 'Read' >> ReadFromText(file_name)
+    assert_that(pcoll, equal_to(lines))
+    pipeline.run()
+
+  def test_read_bzip2(self):
+    _, lines = write_data(15)
+    file_name = tempfile.NamedTemporaryFile(
+        delete=False, prefix=tempfile.template).name
+    with bz2.BZ2File(file_name, 'wb') as f:
+      f.write('\n'.join(lines))
+
+    pipeline = beam.Pipeline('DirectPipelineRunner')
+    pcoll = pipeline | 'Read' >> ReadFromText(
+        file_name,
+        compression_type=CompressionTypes.BZIP2)
+    assert_that(pcoll, equal_to(lines))
+    pipeline.run()
+
   def test_read_gzip(self):
     _, lines = write_data(15)
     file_name = tempfile.NamedTemporaryFile(
@@ -355,6 +393,22 @@ class TextSinkTest(unittest.TestCase):
     with open(self.path, 'r') as f:
       self.assertEqual(f.read().splitlines(), [])
 
+  def test_write_bzip2_file(self):
+    sink = TextSink(
+        self.path, compression_type=CompressionTypes.BZIP2)
+    self._write_lines(sink, self.lines)
+
+    with bz2.BZ2File(self.path, 'r') as f:
+      self.assertEqual(f.read().splitlines(), self.lines)
+
+  def test_write_bzip2_file_auto(self):
+    self.path = tempfile.NamedTemporaryFile(suffix='.bz2').name
+    sink = TextSink(self.path)
+    self._write_lines(sink, self.lines)
+
+    with bz2.BZ2File(self.path, 'r') as f:
+      self.assertEqual(f.read().splitlines(), self.lines)
+
   def test_write_gzip_file(self):
     sink = TextSink(
         self.path, compression_type=CompressionTypes.GZIP)
@@ -379,36 +433,6 @@ class TextSinkTest(unittest.TestCase):
     with gzip.GzipFile(self.path, 'r') as f:
       self.assertEqual(f.read().splitlines(), [])
 
-  def test_write_zlib_file(self):
-    sink = TextSink(
-        self.path, compression_type=CompressionTypes.ZLIB)
-    self._write_lines(sink, self.lines)
-
-    with open(self.path, 'r') as f:
-      content = f.read()
-      self.assertEqual(
-          zlib.decompress(content, zlib.MAX_WBITS).splitlines(), self.lines)
-
-  def test_write_zlib_file_auto(self):
-    self.path = tempfile.NamedTemporaryFile(suffix='.Z').name
-    sink = TextSink(self.path)
-    self._write_lines(sink, self.lines)
-
-    with open(self.path, 'r') as f:
-      content = f.read()
-      self.assertEqual(
-          zlib.decompress(content, zlib.MAX_WBITS).splitlines(), self.lines)
-
-  def test_write_zlib_file_empty(self):
-    sink = TextSink(
-        self.path, compression_type=CompressionTypes.ZLIB)
-    self._write_lines(sink, [])
-
-    with open(self.path, 'r') as f:
-      content = f.read()
-      self.assertEqual(
-          zlib.decompress(content, zlib.MAX_WBITS).splitlines(), [])
-
   def test_write_dataflow(self):
     pipeline = beam.Pipeline('DirectPipelineRunner')
     pcoll = pipeline | beam.core.Create('Create', self.lines)


Reply via email to