[ https://issues.apache.org/jira/browse/ARROW-1782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264422#comment-16264422 ]

ASF GitHub Bot commented on ARROW-1782:
---------------------------------------

wesm closed pull request #1345: ARROW-1782: [Python] Add pyarrow.compress, decompress APIs
URL: https://github.com/apache/arrow/pull/1345
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it after the merge:

diff --git a/cpp/src/arrow/util/compression.h b/cpp/src/arrow/util/compression.h
index ae187a7fc..de3837ec7 100644
--- a/cpp/src/arrow/util/compression.h
+++ b/cpp/src/arrow/util/compression.h
@@ -27,7 +27,7 @@
 namespace arrow {
 
 struct Compression {
-  enum type { UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, ZSTD, LZ4 };
+  enum type { UNCOMPRESSED, SNAPPY, GZIP, BROTLI, ZSTD, LZ4, LZO };
 };
 
 class ARROW_EXPORT Codec {
diff --git a/python/doc/source/api.rst b/python/doc/source/api.rst
index fb2a28677..bb2a0420b 100644
--- a/python/doc/source/api.rst
+++ b/python/doc/source/api.rst
@@ -195,7 +195,11 @@ Input / Output and Shared Memory
    :toctree: generated/
 
    allocate_buffer
+   compress
+   decompress
+   frombuffer
    Buffer
+   ResizableBuffer
    BufferReader
    BufferOutputStream
    NativeFile
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index c4db36e55..0456a658f 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -71,18 +71,21 @@
 # ARROW-1683: Remove after 0.8.0?
 from pyarrow.lib import TimestampType
 
-from pyarrow.lib import (HdfsFile, NativeFile, PythonFile,
-                         FixedSizeBufferWriter,
-                         Buffer, BufferReader, BufferOutputStream,
-                         OSFile, MemoryMappedFile, memory_map,
-                         allocate_buffer, frombuffer,
-                         memory_map, create_memory_map,
-                         have_libhdfs, have_libhdfs3, MockOutputStream)
+# Buffers, allocation
+from pyarrow.lib import (Buffer, ResizableBuffer, compress, decompress,
+                         allocate_buffer, frombuffer)
 
 from pyarrow.lib import (MemoryPool, total_allocated_bytes,
                          set_memory_pool, default_memory_pool,
                          log_memory_allocations)
 
+from pyarrow.lib import (HdfsFile, NativeFile, PythonFile,
+                         FixedSizeBufferWriter,
+                         BufferReader, BufferOutputStream,
+                         OSFile, MemoryMappedFile, memory_map,
+                         create_memory_map, have_libhdfs, have_libhdfs3,
+                         MockOutputStream)
+
 from pyarrow.lib import (ChunkedArray, Column, RecordBatch, Table,
                          concat_tables)
 
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 324648178..5d68607ef 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -169,26 +169,27 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
 
     cdef cppclass CBuffer" arrow::Buffer":
         CBuffer(const uint8_t* data, int64_t size)
-        uint8_t* data()
+        const uint8_t* data()
+        uint8_t* mutable_data()
         int64_t size()
         shared_ptr[CBuffer] parent()
         c_bool is_mutable() const
+        c_bool Equals(const CBuffer& other)
 
     cdef cppclass CMutableBuffer" arrow::MutableBuffer"(CBuffer):
         CMutableBuffer(const uint8_t* data, int64_t size)
-        uint8_t* mutable_data()
+
+    cdef cppclass CResizableBuffer" arrow::ResizableBuffer"(CMutableBuffer):
+        CStatus Resize(const int64_t new_size, c_bool shrink_to_fit)
+        CStatus Reserve(const int64_t new_size)
 
     CStatus AllocateBuffer(CMemoryPool* pool, const int64_t size,
                            shared_ptr[CBuffer]* out)
 
     CStatus AllocateResizableBuffer(CMemoryPool* pool, const int64_t size,
-                                    shared_ptr[ResizableBuffer]* out)
+                                    shared_ptr[CResizableBuffer]* out)
 
-    cdef cppclass ResizableBuffer(CBuffer):
-        CStatus Resize(int64_t nbytes)
-        CStatus Reserve(int64_t nbytes)
-
-    cdef cppclass PoolBuffer(ResizableBuffer):
+    cdef cppclass PoolBuffer(CResizableBuffer):
         PoolBuffer()
         PoolBuffer(CMemoryPool*)
 
@@ -635,7 +636,7 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil:
 
     cdef cppclass CBufferOutputStream \
             " arrow::io::BufferOutputStream"(OutputStream):
-        CBufferOutputStream(const shared_ptr[ResizableBuffer]& buffer)
+        CBufferOutputStream(const shared_ptr[CResizableBuffer]& buffer)
 
     cdef cppclass CMockOutputStream \
             " arrow::io::MockOutputStream"(OutputStream):
@@ -661,6 +662,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
         MessageType_V1" arrow::ipc::MetadataVersion::V1"
         MessageType_V2" arrow::ipc::MetadataVersion::V2"
         MessageType_V3" arrow::ipc::MetadataVersion::V3"
+        MessageType_V4" arrow::ipc::MetadataVersion::V4"
 
     cdef cppclass CMessage" arrow::ipc::Message":
         CStatus Open(const shared_ptr[CBuffer]& metadata,
@@ -926,3 +928,26 @@ cdef extern from 'arrow/python/init.h':
 
 cdef extern from 'arrow/python/config.h' namespace 'arrow::py':
     void set_numpy_nan(object o)
+
+
+cdef extern from 'arrow/util/compression.h' namespace 'arrow' nogil:
+    enum CompressionType" arrow::Compression::type":
+        CompressionType_UNCOMPRESSED" arrow::Compression::UNCOMPRESSED"
+        CompressionType_SNAPPY" arrow::Compression::SNAPPY"
+        CompressionType_GZIP" arrow::Compression::GZIP"
+        CompressionType_BROTLI" arrow::Compression::BROTLI"
+        CompressionType_ZSTD" arrow::Compression::ZSTD"
+        CompressionType_LZ4" arrow::Compression::LZ4"
+
+    cdef cppclass CCodec" arrow::Codec":
+        @staticmethod
+        CStatus Create(CompressionType codec, unique_ptr[CCodec]* out)
+
+        CStatus Decompress(int64_t input_len, const uint8_t* input,
+                           int64_t output_len, uint8_t* output_buffer)
+
+        CStatus Compress(int64_t input_len, const uint8_t* input,
+                         int64_t output_buffer_len, uint8_t* output_buffer,
+                         int64_t* output_length)
+
+        int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input)
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index 495e31b5a..619ba365c 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -600,6 +600,23 @@ cdef class Buffer:
         # TODO(wesm): buffer slicing
         raise NotImplementedError
 
+    def equals(self, Buffer other):
+        """
+        Determine if two buffers contain exactly the same data
+
+        Parameters
+        ----------
+        other : Buffer
+
+        Returns
+        -------
+        are_equal : True if buffer contents and size are equal
+        """
+        cdef c_bool result = False
+        with nogil:
+            result = self.buffer.get().Equals(deref(other.buffer.get()))
+        return result
+
     def to_pybytes(self):
         return cp.PyBytes_FromStringAndSize(
             <const char*>self.buffer.get().data(),
@@ -644,13 +661,37 @@ cdef class Buffer:
         return self.size
 
 
+cdef class ResizableBuffer(Buffer):
+
+    cdef void init_rz(self, const shared_ptr[CResizableBuffer]& buffer):
+        self.init(<shared_ptr[CBuffer]> buffer)
+
+    def resize(self, int64_t new_size, shrink_to_fit=False):
+        """
+        Resize buffer to indicated size
+
+        Parameters
+        ----------
+        new_size : int64_t
+            New size of buffer (padding may be added internally)
+        shrink_to_fit : boolean, default False
+            If new_size is less than the current size, shrink internal
+            capacity, otherwise leave at current capacity
+        """
+        cdef c_bool c_shrink_to_fit = shrink_to_fit
+        with nogil:
+            check_status((<CResizableBuffer*> self.buffer.get())
+                         .Resize(new_size, c_shrink_to_fit))
+
+
 cdef shared_ptr[PoolBuffer] _allocate_buffer(CMemoryPool* pool):
     cdef shared_ptr[PoolBuffer] result
     result.reset(new PoolBuffer(pool))
     return result
 
 
-def allocate_buffer(int64_t size, MemoryPool pool=None):
+def allocate_buffer(int64_t size, MemoryPool memory_pool=None,
+                    resizable=False):
     """
     Allocate mutable fixed-size buffer
 
@@ -658,17 +699,27 @@ def allocate_buffer(int64_t size, MemoryPool pool=None):
     ----------
     size : int
         Number of bytes to allocate (plus internal padding)
-    pool : MemoryPool, optional
+    memory_pool : MemoryPool, optional
         Uses default memory pool if not provided
+    resizable : boolean, default False
+
+    Returns
+    -------
+    buffer : Buffer or ResizableBuffer
     """
     cdef:
         shared_ptr[CBuffer] buffer
-        CMemoryPool* cpool = maybe_unbox_memory_pool(pool)
+        shared_ptr[CResizableBuffer] rz_buffer
+        CMemoryPool* cpool = maybe_unbox_memory_pool(memory_pool)
 
-    with nogil:
-        check_status(AllocateBuffer(cpool, size, &buffer))
-
-    return pyarrow_wrap_buffer(buffer)
+    if resizable:
+        with nogil:
+            check_status(AllocateResizableBuffer(cpool, size, &rz_buffer))
+        return pyarrow_wrap_resizable_buffer(rz_buffer)
+    else:
+        with nogil:
+            check_status(AllocateBuffer(cpool, size, &buffer))
+        return pyarrow_wrap_buffer(buffer)
 
 
 cdef class BufferOutputStream(NativeFile):
@@ -679,7 +730,7 @@ cdef class BufferOutputStream(NativeFile):
     def __cinit__(self, MemoryPool memory_pool=None):
         self.buffer = _allocate_buffer(maybe_unbox_memory_pool(memory_pool))
         self.wr_file.reset(new CBufferOutputStream(
-            <shared_ptr[ResizableBuffer]> self.buffer))
+            <shared_ptr[CResizableBuffer]> self.buffer))
         self.is_readable = 0
         self.is_writeable = 1
         self.is_open = True
@@ -783,3 +834,145 @@ cdef get_writer(object source, shared_ptr[OutputStream]* writer):
     else:
         raise TypeError('Unable to read from object of type: {0}'
                         .format(type(source)))
+
+
+# ---------------------------------------------------------------------
+
+cdef CompressionType _get_compression_type(object name):
+    if name is None or name == 'uncompressed':
+        return CompressionType_UNCOMPRESSED
+    elif name == 'snappy':
+        return CompressionType_SNAPPY
+    elif name == 'gzip':
+        return CompressionType_GZIP
+    elif name == 'brotli':
+        return CompressionType_BROTLI
+    elif name == 'zstd':
+        return CompressionType_ZSTD
+    elif name == 'lz4':
+        return CompressionType_LZ4
+    else:
+        raise ValueError("Unrecognized compression type: {0}"
+                         .format(str(name)))
+
+
+def compress(object buf, codec='lz4', asbytes=False, memory_pool=None):
+    """
+    Compress pyarrow.Buffer or Python object supporting the buffer (memoryview)
+    protocol
+
+    Parameters
+    ----------
+    buf : pyarrow.Buffer, bytes, or other object supporting buffer protocol
+    codec : string, default 'lz4'
+        Compression codec.
+        Supported types: {'brotli, 'gzip', 'lz4', 'snappy', 'zstd'}
+    asbytes : boolean, default False
+        Return result as Python bytes object, otherwise Buffer
+    memory_pool : MemoryPool, default None
+        Memory pool to use for buffer allocations, if any
+
+    Returns
+    -------
+    compressed : pyarrow.Buffer or bytes (if asbytes=True)
+    """
+    cdef:
+        CompressionType c_codec = _get_compression_type(codec)
+        unique_ptr[CCodec] compressor
+        cdef CBuffer* c_buf
+        cdef PyObject* pyobj
+        cdef ResizableBuffer out_buf
+
+    with nogil:
+        check_status(CCodec.Create(c_codec, &compressor))
+
+    if not isinstance(buf, Buffer):
+        buf = frombuffer(buf)
+
+    c_buf = (<Buffer> buf).buffer.get()
+
+    cdef int64_t max_output_size = (compressor.get()
+                                    .MaxCompressedLen(c_buf.size(),
+                                                      c_buf.data()))
+    cdef uint8_t* output_buffer = NULL
+
+    if asbytes:
+        pyobj = PyBytes_FromStringAndSizeNative(NULL, max_output_size)
+        output_buffer = <uint8_t*> cp.PyBytes_AS_STRING(<object> pyobj)
+    else:
+        out_buf = allocate_buffer(max_output_size, memory_pool=memory_pool,
+                                  resizable=True)
+        output_buffer = out_buf.buffer.get().mutable_data()
+
+    cdef int64_t output_length = 0
+    with nogil:
+        check_status(compressor.get()
+                     .Compress(c_buf.size(), c_buf.data(),
+                               max_output_size, output_buffer,
+                               &output_length))
+
+    if asbytes:
+        cp._PyBytes_Resize(&pyobj, <Py_ssize_t> output_length)
+        return PyObject_to_object(pyobj)
+    else:
+        out_buf.resize(output_length)
+        return out_buf
+
+
+def decompress(object buf, decompressed_size=None, codec='lz4',
+               asbytes=False, memory_pool=None):
+    """
+    Decompress data from buffer-like object
+
+    Parameters
+    ----------
+    buf : pyarrow.Buffer, bytes, or memoryview-compatible object
+    decompressed_size : int64_t, default None
+        If not specified, will be computed if the codec is able to determine
+        the uncompressed buffer size
+    codec : string, default 'lz4'
+        Compression codec.
+        Supported types: {'brotli, 'gzip', 'lz4', 'snappy', 'zstd'}
+    asbytes : boolean, default False
+        Return result as Python bytes object, otherwise Buffer
+    memory_pool : MemoryPool, default None
+        Memory pool to use for buffer allocations, if any
+
+    Returns
+    -------
+    uncompressed : pyarrow.Buffer or bytes (if asbytes=True)
+    """
+    cdef:
+        CompressionType c_codec = _get_compression_type(codec)
+        unique_ptr[CCodec] compressor
+        cdef CBuffer* c_buf
+        cdef Buffer out_buf
+
+    with nogil:
+        check_status(CCodec.Create(c_codec, &compressor))
+
+    if not isinstance(buf, Buffer):
+        buf = frombuffer(buf)
+
+    c_buf = (<Buffer> buf).buffer.get()
+
+    if decompressed_size is None:
+        raise ValueError("Must pass decompressed_size for {0} codec"
+                         .format(codec))
+
+    cdef int64_t output_size = decompressed_size
+    cdef uint8_t* output_buffer = NULL
+
+    if asbytes:
+        pybuf = cp.PyBytes_FromStringAndSize(NULL, output_size)
+        output_buffer = <uint8_t*> cp.PyBytes_AS_STRING(pybuf)
+    else:
+        out_buf = allocate_buffer(output_size, memory_pool=memory_pool)
+        output_buffer = out_buf.buffer.get().mutable_data()
+
+    with nogil:
+        check_status(compressor.get()
+                     .Decompress(c_buf.size(), c_buf.data(),
+                                 output_size, output_buffer))
+
+    return pybuf if asbytes else out_buf
diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd
index 5abb72ba4..90f749d6d 100644
--- a/python/pyarrow/lib.pxd
+++ b/python/pyarrow/lib.pxd
@@ -323,6 +323,11 @@ cdef class Buffer:
     cdef void init(self, const shared_ptr[CBuffer]& buffer)
 
 
+cdef class ResizableBuffer(Buffer):
+
+    cdef void init_rz(self, const shared_ptr[CResizableBuffer]& buffer)
+
+
 cdef class NativeFile:
     cdef:
         shared_ptr[RandomAccessFile] rd_file
@@ -343,6 +348,8 @@ cdef get_reader(object source, shared_ptr[RandomAccessFile]* reader)
 cdef get_writer(object source, shared_ptr[OutputStream]* writer)
 
 cdef public object pyarrow_wrap_buffer(const shared_ptr[CBuffer]& buf)
+cdef public object pyarrow_wrap_resizable_buffer(
+    const shared_ptr[CResizableBuffer]& buf)
 cdef public object pyarrow_wrap_data_type(const shared_ptr[CDataType]& type)
 cdef public object pyarrow_wrap_field(const shared_ptr[CField]& field)
 cdef public object pyarrow_wrap_schema(const shared_ptr[CSchema]& type)
diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx
index bc0e94e64..f2e8653d8 100644
--- a/python/pyarrow/plasma.pyx
+++ b/python/pyarrow/plasma.pyx
@@ -30,7 +30,7 @@ import collections
 import pyarrow
 
 from pyarrow.lib cimport Buffer, NativeFile, check_status
-from pyarrow.includes.libarrow cimport (CMutableBuffer, CBuffer,
+from pyarrow.includes.libarrow cimport (CBuffer, CMutableBuffer,
                                         CFixedSizeBufferWriter, CStatus)
 
 
diff --git a/python/pyarrow/public-api.pxi b/python/pyarrow/public-api.pxi
index bf670c5c4..9776f2ad7 100644
--- a/python/pyarrow/public-api.pxi
+++ b/python/pyarrow/public-api.pxi
@@ -43,6 +43,13 @@ cdef public api object pyarrow_wrap_buffer(const shared_ptr[CBuffer]& buf):
     return result
 
 
+cdef public api object pyarrow_wrap_resizable_buffer(
+    const shared_ptr[CResizableBuffer]& buf):
+    cdef ResizableBuffer result = ResizableBuffer()
+    result.init_rz(buf)
+    return result
+
+
 cdef public api bint pyarrow_is_data_type(object type_):
     return isinstance(type_, DataType)
 
diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py
index 98c465adc..e60dd35de 100644
--- a/python/pyarrow/tests/test_io.py
+++ b/python/pyarrow/tests/test_io.py
@@ -182,6 +182,42 @@ def test_allocate_buffer():
     assert buf.to_pybytes()[:5] == bit
 
 
+def test_allocate_buffer_resizable():
+    buf = pa.allocate_buffer(100, resizable=True)
+    assert isinstance(buf, pa.ResizableBuffer)
+
+    buf.resize(200)
+    assert buf.size == 200
+
+
+def test_compress_decompress():
+    INPUT_SIZE = 10000
+    test_data = (np.random.randint(0, 255, size=INPUT_SIZE)
+                 .astype(np.uint8)
+                 .tostring())
+    test_buf = pa.frombuffer(test_data)
+
+    codecs = ['lz4', 'snappy', 'gzip', 'zstd', 'brotli']
+    for codec in codecs:
+        compressed_buf = pa.compress(test_buf, codec=codec)
+        compressed_bytes = pa.compress(test_data, codec=codec, asbytes=True)
+
+        assert isinstance(compressed_bytes, bytes)
+
+        decompressed_buf = pa.decompress(compressed_buf, INPUT_SIZE,
+                                         codec=codec)
+        decompressed_bytes = pa.decompress(compressed_bytes, INPUT_SIZE,
+                                           codec=codec, asbytes=True)
+
+        assert isinstance(decompressed_bytes, bytes)
+
+        assert decompressed_buf.equals(test_buf)
+        assert decompressed_bytes == test_data
+
+        with pytest.raises(ValueError):
+            pa.decompress(compressed_bytes, codec=codec)
+
+
 def test_buffer_memoryview_is_immutable():
     val = b'some data'
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> [Python] Expose compressors as pyarrow.compress, pyarrow.decompress
> -------------------------------------------------------------------
>
>                 Key: ARROW-1782
>                 URL: https://issues.apache.org/jira/browse/ARROW-1782
>             Project: Apache Arrow
>          Issue Type: New Feature
>          Components: Python
>            Reporter: Wes McKinney
>            Assignee: Wes McKinney
>              Labels: pull-request-available
>             Fix For: 0.8.0
>
>
> These should release the GIL and serve as an alternative to the various
> compressor wrapper libraries out there. They should be able to work
> with either {{pyarrow.Buffer}} or {{PyBytes}} objects, as the user prefers.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to