This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 7912594  Switched to DataDog zstd wrapper, reusing the compression ctx (#287)
7912594 is described below

commit 7912594f37e6805c4496b4c3d6d803de59917e66
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Jun 18 20:48:04 2020 -0700

    Switched to DataDog zstd wrapper, reusing the compression ctx (#287)
    
    --- Before
    
    BenchmarkCompression/zstd-cgo-level-fastest-16                  2272            513861 ns/op         195.78 MB/s
    BenchmarkCompression/zstd-cgo-level-default-16                  1477            772904 ns/op         130.17 MB/s
    BenchmarkCompression/zstd-cgo-level-best-16                      254           4670399 ns/op          21.54 MB/s
    BenchmarkDecompression/zstd-cgo-level-fastest-16                8382            154040 ns/op         653.11 MB/s
    BenchmarkDecompression/zstd-cgo-level-default-16                7524            156934 ns/op         641.07 MB/s
    BenchmarkDecompression/zstd-cgo-level-best-16                   7748            162531 ns/op         619.00 MB/s
    BenchmarkCompressionParallel/zstd-cgo-level-fastest-16         22719             54002 ns/op        1862.99 MB/s
    BenchmarkCompressionParallel/zstd-cgo-level-default-16         13485             88328 ns/op        1139.01 MB/s
    BenchmarkCompressionParallel/zstd-cgo-level-best-16             2161            561107 ns/op         179.30 MB/s
    
    --- After
    
    BenchmarkCompression/zstd-cgo-level-fastest-16                  2366            472138 ns/op         213.09 MB/s
    BenchmarkCompression/zstd-cgo-level-default-16                  1576            756111 ns/op         133.06 MB/s
    BenchmarkCompression/zstd-cgo-level-best-16                      271           4452809 ns/op          22.59 MB/s
    BenchmarkDecompression/zstd-cgo-level-fastest-16                9352            132880 ns/op         757.12 MB/s
    BenchmarkDecompression/zstd-cgo-level-default-16                8473            142471 ns/op         706.15 MB/s
    BenchmarkDecompression/zstd-cgo-level-best-16                   7413            147722 ns/op         681.05 MB/s
    BenchmarkCompressionParallel/zstd-cgo-level-fastest-16         23857             50493 ns/op        1992.49 MB/s
    BenchmarkCompressionParallel/zstd-cgo-level-default-16         13832             87510 ns/op        1149.66 MB/s
    BenchmarkCompressionParallel/zstd-cgo-level-best-16             2220            526170 ns/op         191.20 MB/s
---
 go.mod                                  |  2 +-
 go.sum                                  |  4 ++--
 pulsar/internal/compression/zstd_cgo.go | 16 ++++++++++++----
 3 files changed, 15 insertions(+), 7 deletions(-)

diff --git a/go.mod b/go.mod
index f807d50..21c774d 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,7 @@ module github.com/apache/pulsar-client-go
 go 1.12
 
 require (
+       github.com/DataDog/zstd v1.4.6-0.20200617134701-89f69fb7df32
        github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6
        github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b
        github.com/golang/protobuf v1.3.1
@@ -15,6 +16,5 @@ require (
        github.com/spf13/cobra v0.0.3
        github.com/spf13/pflag v1.0.3 // indirect
        github.com/stretchr/testify v1.4.0
-       github.com/valyala/gozstd v1.7.0
        github.com/yahoo/athenz v1.8.55
 )
diff --git a/go.sum b/go.sum
index 393865f..333286d 100644
--- a/go.sum
+++ b/go.sum
@@ -1,4 +1,6 @@
 github.com/BurntSushi/toml v0.3.1/go.mod 
h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/DataDog/zstd v1.4.6-0.20200617134701-89f69fb7df32 
h1:/gZKpgSMydtrih81nvUhlkXpZIUfthKShSCVbRzBt9Y=
+github.com/DataDog/zstd v1.4.6-0.20200617134701-89f69fb7df32/go.mod 
h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
 github.com/ardielle/ardielle-go v1.5.2 
h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4=
 github.com/ardielle/ardielle-go v1.5.2/go.mod 
h1:I4hy1n795cUhaVt/ojz83SNVCYIGsAFAONtv2Dr7HUI=
 github.com/ardielle/ardielle-tools v1.5.4/go.mod 
h1:oZN+JRMnqGiIhrzkRN9l26Cej9dEx4jeNG6A+AdkShk=
@@ -49,8 +51,6 @@ github.com/stretchr/testify v1.3.0 
h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0
 github.com/stretchr/testify v1.3.0/go.mod 
h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/stretchr/testify v1.4.0 
h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
 github.com/stretchr/testify v1.4.0/go.mod 
h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
-github.com/valyala/gozstd v1.7.0 
h1:Ljh5c9zboqLhwTI33al32R72iCZfn0mCbVGcFWbGwRQ=
-github.com/valyala/gozstd v1.7.0/go.mod 
h1:y5Ew47GLlP37EkTB+B4s7r6A5rdaeB7ftbl9zoYiIPQ=
 github.com/yahoo/athenz v1.8.55 h1:xGhxN3yLq334APyn0Zvcc+aqu78Q7BBhYJevM3EtTW0=
 github.com/yahoo/athenz v1.8.55/go.mod 
h1:G7LLFUH7Z/r4QAB7FfudfuA7Am/eCzO1GlzBhDL6Kv0=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod 
h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
diff --git a/pulsar/internal/compression/zstd_cgo.go 
b/pulsar/internal/compression/zstd_cgo.go
index bf396a5..f6f6c45 100644
--- a/pulsar/internal/compression/zstd_cgo.go
+++ b/pulsar/internal/compression/zstd_cgo.go
@@ -24,29 +24,37 @@
 package compression
 
 import (
-       zstd "github.com/valyala/gozstd"
+       "github.com/DataDog/zstd"
+       log "github.com/sirupsen/logrus"
 )
 
 type zstdCGoProvider struct {
+       ctx              zstd.Ctx
        compressionLevel int
 }
 
 func newCGoZStdProvider(compressionLevel int) Provider {
        return &zstdCGoProvider{
                compressionLevel: compressionLevel,
+               ctx:              zstd.NewCtx(),
        }
 }
 
 func NewZStdProvider() Provider {
-       return newCGoZStdProvider(zstd.DefaultCompressionLevel)
+       return newCGoZStdProvider(zstd.DefaultCompression)
 }
 
 func (z *zstdCGoProvider) Compress(data []byte) []byte {
-       return zstd.CompressLevel(nil, data, z.compressionLevel)
+       out, err := z.ctx.CompressLevel(nil, data, z.compressionLevel)
+       if err != nil {
+               log.WithError(err).Fatal("Failed to compress")
+       }
+
+       return out
 }
 
 func (z *zstdCGoProvider) Decompress(compressedData []byte, originalSize int) 
([]byte, error) {
-       return zstd.Decompress(nil, compressedData)
+       return z.ctx.Decompress(nil, compressedData)
 }
 
 func (z *zstdCGoProvider) Close() error {

Reply via email to