This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 045f9ea  [BEAM-6211] Support deflate (zlib) in CompressedFile
     new 32ca551a Merge pull request #7253: [BEAM-6211] Support deflate (zlib) 
in CompressedFile
045f9ea is described below

commit 045f9ea0564fd67b2c038d720de440640fdb1677
Author: Brian Martin <brianmar...@gmail.com>
AuthorDate: Tue Dec 11 11:39:14 2018 -0500

    [BEAM-6211] Support deflate (zlib) in CompressedFile
    
    `.deflate` files are quite common in Hadoop and also supported by
    TensorFlow in TFRecord file format. Moreover, `.deflate` is already
    supported since 0.6.0 by the Java SDK (see BEAM-1518).
---
 sdks/python/apache_beam/io/filesystem.py      |  32 ++++++--
 sdks/python/apache_beam/io/filesystem_test.py |  33 +++++---
 sdks/python/apache_beam/io/textio_test.py     | 113 ++++++++++++++++++++++++++
 sdks/python/apache_beam/io/tfrecordio_test.py |  32 ++++++++
 4 files changed, 191 insertions(+), 19 deletions(-)

diff --git a/sdks/python/apache_beam/io/filesystem.py 
b/sdks/python/apache_beam/io/filesystem.py
index 7dd3a5b..015af8b 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -58,12 +58,16 @@ class CompressionTypes(object):
   # The following extensions are currently recognized by auto-detection:
   #   .bz2 (implies BZIP2 as described below).
   #   .gz  (implies GZIP as described below)
+  #   .deflate (implies DEFLATE as described below)
   # Any non-recognized extension implies UNCOMPRESSED as described below.
   AUTO = 'auto'
 
   # BZIP2 compression.
   BZIP2 = 'bzip2'
 
+  # DEFLATE compression
+  DEFLATE = 'deflate'
+
   # GZIP compression (deflate with GZIP headers).
   GZIP = 'gzip'
 
@@ -76,6 +80,7 @@ class CompressionTypes(object):
     types = set([
         CompressionTypes.AUTO,
         CompressionTypes.BZIP2,
+        CompressionTypes.DEFLATE,
         CompressionTypes.GZIP,
         CompressionTypes.UNCOMPRESSED
     ])
@@ -85,6 +90,7 @@ class CompressionTypes(object):
   def mime_type(cls, compression_type, default='application/octet-stream'):
     mime_types_by_compression_type = {
         cls.BZIP2: 'application/x-bz2',
+        cls.DEFLATE: 'application/x-deflate',
         cls.GZIP: 'application/x-gzip',
     }
     return mime_types_by_compression_type.get(compression_type, default)
@@ -92,7 +98,8 @@ class CompressionTypes(object):
   @classmethod
   def detect_compression_type(cls, file_path):
     """Returns the compression type of a file (based on its suffix)."""
-    compression_types_by_suffix = {'.bz2': cls.BZIP2, '.gz': cls.GZIP}
+    compression_types_by_suffix = {'.bz2': cls.BZIP2, '.deflate': cls.DEFLATE,
+                                   '.gz': cls.GZIP}
     lowercased_path = file_path.lower()
     for suffix, compression_type in compression_types_by_suffix.items():
       if lowercased_path.endswith(suffix):
@@ -150,6 +157,8 @@ class CompressedFile(object):
   def _initialize_decompressor(self):
     if self._compression_type == CompressionTypes.BZIP2:
       self._decompressor = bz2.BZ2Decompressor()
+    elif self._compression_type == CompressionTypes.DEFLATE:
+      self._decompressor = zlib.decompressobj()
     else:
       assert self._compression_type == CompressionTypes.GZIP
       self._decompressor = zlib.decompressobj(self._gzip_mask)
@@ -157,6 +166,9 @@ class CompressedFile(object):
   def _initialize_compressor(self):
     if self._compression_type == CompressionTypes.BZIP2:
       self._compressor = bz2.BZ2Compressor()
+    elif self._compression_type == CompressionTypes.DEFLATE:
+      self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
+                                          zlib.DEFLATED)
     else:
       assert self._compression_type == CompressionTypes.GZIP
       self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
@@ -208,19 +220,25 @@ class CompressedFile(object):
         # file. We read concatenated files by recursively creating decompressor
         # objects for the unused compressed data.
         if (self._compression_type == CompressionTypes.BZIP2 or
+            self._compression_type == CompressionTypes.DEFLATE or
             self._compression_type == CompressionTypes.GZIP):
           if self._decompressor.unused_data != b'':
             buf = self._decompressor.unused_data
-            self._decompressor = (
-                bz2.BZ2Decompressor()
-                if self._compression_type == CompressionTypes.BZIP2
-                else zlib.decompressobj(self._gzip_mask))
+
+            if self._compression_type == CompressionTypes.BZIP2:
+              self._decompressor = bz2.BZ2Decompressor()
+            elif self._compression_type == CompressionTypes.DEFLATE:
+              self._decompressor = zlib.decompressobj()
+            else:
+              self._decompressor = zlib.decompressobj(self._gzip_mask)
+
             decompressed = self._decompressor.decompress(buf)
             self._read_buffer.write(decompressed)
             continue
         else:
-          # Gzip and bzip2 formats do not require flushing remaining data in 
the
-          # decompressor into the read buffer when fully decompressing files.
+          # Deflate, Gzip and bzip2 formats do not require flushing
+          # remaining data in the decompressor into the read buffer when
+          # fully decompressing files.
           self._read_buffer.write(self._decompressor.flush())
 
         # Record that we have hit the end of file, so we won't unnecessarily
diff --git a/sdks/python/apache_beam/io/filesystem_test.py 
b/sdks/python/apache_beam/io/filesystem_test.py
index abbadde..b26d79d 100644
--- a/sdks/python/apache_beam/io/filesystem_test.py
+++ b/sdks/python/apache_beam/io/filesystem_test.py
@@ -29,6 +29,7 @@ import posixpath
 import sys
 import tempfile
 import unittest
+import zlib
 from builtins import range
 from io import BytesIO
 
@@ -294,16 +295,19 @@ atomized in instants hammered around the
   def _create_compressed_file(self, compression_type, content):
     file_name = self._create_temp_file()
 
-    if compression_type == CompressionTypes.BZIP2:
-      compress_factory = bz2.BZ2File
-    elif compression_type == CompressionTypes.GZIP:
-      compress_factory = gzip.open
+    if compression_type == CompressionTypes.DEFLATE:
+      with open(file_name, 'wb') as f:
+        f.write(zlib.compress(content))
+    elif compression_type == CompressionTypes.BZIP2 or \
+            compression_type == CompressionTypes.GZIP:
+      compress_open = bz2.BZ2File \
+          if compression_type == CompressionTypes.BZIP2 \
+          else gzip.open
+      with compress_open(file_name, 'wb') as f:
+        f.write(content)
     else:
       assert False, "Invalid compression type: %s" % compression_type
 
-    with compress_factory(file_name, 'wb') as f:
-      f.write(content)
-
     return file_name
 
   def test_seekable_enabled_on_read(self):
@@ -322,7 +326,8 @@ atomized in instants hammered around the
       self.assertFalse(writeable.seekable)
 
   def test_seek_set(self):
-    for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]:
+    for compression_type in [CompressionTypes.BZIP2, CompressionTypes.DEFLATE,
+                             CompressionTypes.GZIP]:
       file_name = self._create_compressed_file(compression_type, self.content)
       with open(file_name, 'rb') as f:
         compressed_fd = CompressedFile(f, compression_type,
@@ -352,7 +357,8 @@ atomized in instants hammered around the
           self.assertEqual(uncompressed_position, reference_position)
 
   def test_seek_cur(self):
-    for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]:
+    for compression_type in [CompressionTypes.BZIP2, CompressionTypes.DEFLATE,
+                             CompressionTypes.GZIP]:
       file_name = self._create_compressed_file(compression_type, self.content)
       with open(file_name, 'rb') as f:
         compressed_fd = CompressedFile(f, compression_type,
@@ -382,7 +388,8 @@ atomized in instants hammered around the
           self.assertEqual(uncompressed_position, reference_position)
 
   def test_read_from_end_returns_no_data(self):
-    for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]:
+    for compression_type in [CompressionTypes.BZIP2, CompressionTypes.DEFLATE,
+                             CompressionTypes.GZIP]:
       file_name = self._create_compressed_file(compression_type, self.content)
       with open(file_name, 'rb') as f:
         compressed_fd = CompressedFile(f, compression_type,
@@ -397,7 +404,8 @@ atomized in instants hammered around the
         self.assertEqual(uncompressed_data, expected_data)
 
   def test_seek_outside(self):
-    for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]:
+    for compression_type in [CompressionTypes.BZIP2, CompressionTypes.DEFLATE,
+                             CompressionTypes.GZIP]:
       file_name = self._create_compressed_file(compression_type, self.content)
       with open(file_name, 'rb') as f:
         compressed_fd = CompressedFile(f, compression_type,
@@ -419,7 +427,8 @@ atomized in instants hammered around the
           self.assertEqual(uncompressed_position, expected_position)
 
   def test_read_and_seek_back_to_beginning(self):
-    for compression_type in [CompressionTypes.BZIP2, CompressionTypes.GZIP]:
+    for compression_type in [CompressionTypes.BZIP2, CompressionTypes.DEFLATE,
+                             CompressionTypes.GZIP]:
       file_name = self._create_compressed_file(compression_type, self.content)
       with open(file_name, 'rb') as f:
         compressed_fd = CompressedFile(f, compression_type,
diff --git a/sdks/python/apache_beam/io/textio_test.py 
b/sdks/python/apache_beam/io/textio_test.py
index d3e32c8..780107a 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -29,6 +29,7 @@ import shutil
 import sys
 import tempfile
 import unittest
+import zlib
 from builtins import range
 
 import apache_beam as beam
@@ -554,6 +555,18 @@ class TextSourceTest(unittest.TestCase):
       assert_that(pcoll, equal_to(lines))
       pipeline.run()
 
+  def test_read_auto_deflate(self):
+    _, lines = write_data(15)
+    with TempDir() as tempdir:
+      file_name = tempdir.create_temp_file(suffix='.deflate')
+      with open(file_name, 'wb') as f:
+        f.write(zlib.compress('\n'.join(lines).encode('utf-8')))
+
+      pipeline = TestPipeline()
+      pcoll = pipeline | 'Read' >> ReadFromText(file_name)
+      assert_that(pcoll, equal_to(lines))
+      pipeline.run()
+
   def test_read_auto_gzip(self):
     _, lines = write_data(15)
     with TempDir() as tempdir:
@@ -641,6 +654,82 @@ class TextSourceTest(unittest.TestCase):
       assert_that(lines, equal_to(expected))
       pipeline.run()
 
+  def test_read_deflate(self):
+    _, lines = write_data(15)
+    with TempDir() as tempdir:
+      file_name = tempdir.create_temp_file()
+      with open(file_name, 'wb') as f:
+        f.write(zlib.compress('\n'.join(lines).encode('utf-8')))
+
+      pipeline = TestPipeline()
+      pcoll = pipeline | 'Read' >> ReadFromText(
+          file_name,
+          0, CompressionTypes.DEFLATE,
+          True, coders.StrUtf8Coder())
+      assert_that(pcoll, equal_to(lines))
+      pipeline.run()
+
+  def test_read_corrupted_deflate_fails(self):
+    _, lines = write_data(15)
+    with TempDir() as tempdir:
+      file_name = tempdir.create_temp_file()
+      with open(file_name, 'wb') as f:
+        f.write(zlib.compress('\n'.join(lines).encode('utf-8')))
+
+      with open(file_name, 'wb') as f:
+        f.write(b'corrupt')
+
+      pipeline = TestPipeline()
+      pcoll = pipeline | 'Read' >> ReadFromText(
+          file_name,
+          0, CompressionTypes.DEFLATE,
+          True, coders.StrUtf8Coder())
+      assert_that(pcoll, equal_to(lines))
+
+      with self.assertRaises(Exception):
+        pipeline.run()
+
+  def test_read_deflate_concat(self):
+    with TempDir() as tempdir:
+      deflate_file_name1 = tempdir.create_temp_file()
+      lines = ['a', 'b', 'c']
+      with open(deflate_file_name1, 'wb') as dst:
+        data = '\n'.join(lines) + '\n'
+        dst.write(zlib.compress(data.encode('utf-8')))
+
+      deflate_file_name2 = tempdir.create_temp_file()
+      lines = ['p', 'q', 'r']
+      with open(deflate_file_name2, 'wb') as dst:
+        data = '\n'.join(lines) + '\n'
+        dst.write(zlib.compress(data.encode('utf-8')))
+
+      deflate_file_name3 = tempdir.create_temp_file()
+      lines = ['x', 'y', 'z']
+      with open(deflate_file_name3, 'wb') as dst:
+        data = '\n'.join(lines) + '\n'
+        dst.write(zlib.compress(data.encode('utf-8')))
+
+      final_deflate_file = tempdir.create_temp_file()
+      with open(deflate_file_name1, 'rb') as src, \
+              open(final_deflate_file, 'wb') as dst:
+        dst.writelines(src.readlines())
+
+      with open(deflate_file_name2, 'rb') as src, \
+              open(final_deflate_file, 'ab') as dst:
+        dst.writelines(src.readlines())
+
+      with open(deflate_file_name3, 'rb') as src, \
+              open(final_deflate_file, 'ab') as dst:
+        dst.writelines(src.readlines())
+
+      pipeline = TestPipeline()
+      lines = pipeline | 'ReadFromText' >> beam.io.ReadFromText(
+          final_deflate_file,
+          compression_type=beam.io.filesystem.CompressionTypes.DEFLATE)
+
+      expected = ['a', 'b', 'c', 'p', 'q', 'r', 'x', 'y', 'z']
+      assert_that(lines, equal_to(expected))
+
   def test_read_gzip(self):
     _, lines = write_data(15)
     with TempDir() as tempdir:
@@ -975,6 +1064,30 @@ class TextSinkTest(unittest.TestCase):
     with gzip.GzipFile(self.path, 'rb') as f:
       self.assertEqual(f.read().splitlines(), [])
 
+  def test_write_deflate_file(self):
+    sink = TextSink(
+        self.path, compression_type=CompressionTypes.DEFLATE)
+    self._write_lines(sink, self.lines)
+
+    with open(self.path, 'rb') as f:
+      self.assertEqual(zlib.decompress(f.read()).splitlines(), self.lines)
+
+  def test_write_deflate_file_auto(self):
+    self.path = self._create_temp_file(suffix='.deflate')
+    sink = TextSink(self.path)
+    self._write_lines(sink, self.lines)
+
+    with open(self.path, 'rb') as f:
+      self.assertEqual(zlib.decompress(f.read()).splitlines(), self.lines)
+
+  def test_write_deflate_file_empty(self):
+    sink = TextSink(
+        self.path, compression_type=CompressionTypes.DEFLATE)
+    self._write_lines(sink, [])
+
+    with open(self.path, 'rb') as f:
+      self.assertEqual(zlib.decompress(f.read()).splitlines(), [])
+
   def test_write_text_file_with_header(self):
     header = b'header1\nheader2'
     sink = TextSink(self.path, header=header)
diff --git a/sdks/python/apache_beam/io/tfrecordio_test.py 
b/sdks/python/apache_beam/io/tfrecordio_test.py
index 49956ea..f003c34 100644
--- a/sdks/python/apache_beam/io/tfrecordio_test.py
+++ b/sdks/python/apache_beam/io/tfrecordio_test.py
@@ -28,6 +28,7 @@ import random
 import re
 import sys
 import unittest
+import zlib
 from builtins import range
 
 import crcmod
@@ -76,6 +77,12 @@ def _write_file(path, base64_records):
     f.write(record)
 
 
+def _write_file_deflate(path, base64_records):
+  record = binascii.a2b_base64(base64_records)
+  with open(path, 'wb') as f:
+    f.write(zlib.compress(record))
+
+
 def _write_file_gzip(path, base64_records):
   record = binascii.a2b_base64(base64_records)
   with gzip.GzipFile(path, 'wb') as f:
@@ -266,6 +273,19 @@ class TestReadFromTFRecord(unittest.TestCase):
                       validate=True))
         assert_that(result, equal_to([b'foo', b'bar']))
 
+  def test_process_deflate(self):
+    with TempDir() as temp_dir:
+      path = temp_dir.create_temp_file('result')
+      _write_file_deflate(path, FOO_BAR_RECORD_BASE64)
+      with TestPipeline() as p:
+        result = (p
+                  | ReadFromTFRecord(
+                      path,
+                      coder=coders.BytesCoder(),
+                      compression_type=CompressionTypes.DEFLATE,
+                      validate=True))
+        assert_that(result, equal_to([b'foo', b'bar']))
+
   def test_process_gzip(self):
     with TempDir() as temp_dir:
       path = temp_dir.create_temp_file('result')
@@ -372,6 +392,18 @@ class TestReadAllFromTFRecord(unittest.TestCase):
                       compression_type=CompressionTypes.AUTO))
         assert_that(result, equal_to([b'foo', b'bar'] * 9))
 
+  def test_process_deflate(self):
+    with TempDir() as temp_dir:
+      path = temp_dir.create_temp_file('result')
+      _write_file_deflate(path, FOO_BAR_RECORD_BASE64)
+      with TestPipeline() as p:
+        result = (p
+                  | Create([path])
+                  | ReadAllFromTFRecord(
+                      coder=coders.BytesCoder(),
+                      compression_type=CompressionTypes.DEFLATE))
+        assert_that(result, equal_to([b'foo', b'bar']))
+
   def test_process_gzip(self):
     with TempDir() as temp_dir:
       path = temp_dir.create_temp_file('result')

Reply via email to