This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new a913fc6  Use pure Go LZ4 compression
     new b7700c0  Merge pull request #2 from merlimat/lz4-go
a913fc6 is described below

commit a913fc686980249ffac5ab92a2f02cbdd1039bf8
Author: Matteo Merli <[email protected]>
AuthorDate: Sun May 12 09:09:10 2019 -0700

    Use pure Go LZ4 compression
---
 pulsar/internal/compression/lz4.go | 40 +++++++++++++++++++++++++++++++++-----
 1 file changed, 35 insertions(+), 5 deletions(-)

diff --git a/pulsar/internal/compression/lz4.go 
b/pulsar/internal/compression/lz4.go
index da1868c..f5a6d06 100644
--- a/pulsar/internal/compression/lz4.go
+++ b/pulsar/internal/compression/lz4.go
@@ -20,7 +20,7 @@
 package compression
 
 import (
-       "github.com/cloudflare/golz4"
+       "github.com/pierrec/lz4"
 )
 
 type lz4Provider struct {
@@ -31,17 +31,47 @@ func NewLz4Provider() Provider {
 }
 
 func (lz4Provider) Compress(data []byte) []byte {
-       maxSize := lz4.CompressBound(data)
+       const tableSize = 1 << 16
+       hashTable := make([]int, tableSize)
+
+       maxSize := lz4.CompressBlockBound(len(data))
        compressed := make([]byte, maxSize)
-       size, err := lz4.Compress(data, compressed)
+       size, err := lz4.CompressBlock(data, compressed, hashTable)
        if err != nil {
                panic("Failed to compress")
        }
-       return compressed[:size]
+
+       if size == 0 {
+               // The data block was not compressed. Just repeat it with
+               // the block header flag to signal it's not compressed
+               headerSize := writeSize(len(data), compressed)
+               copy(compressed[headerSize:], data)
+               return compressed[:len(data)+headerSize]
+       } else {
+               return compressed[:size]
+       }
+}
+
+// Write the encoded size for the uncompressed payload
+func writeSize(size int, dst []byte) int {
+       if size < 0xF {
+               dst[0] |= byte(size << 4)
+               return 1
+       } else {
+               dst[0] |= 0xF0
+               l := size - 0xF
+               i := 1
+               for ; l >= 0xFF; l -= 0xFF {
+                       dst[i] = 0xFF
+                       i++
+               }
+               dst[i] = byte(l)
+               return i + 1
+       }
 }
 
 func (lz4Provider) Decompress(compressedData []byte, originalSize int) 
([]byte, error) {
        uncompressed := make([]byte, originalSize)
-       err := lz4.Uncompress(compressedData, uncompressed)
+       _, err := lz4.UncompressBlock(compressedData, uncompressed)
        return uncompressed, err
 }

Reply via email to