This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch branch-0.12.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/branch-0.12.0 by this push:
     new 0f0d5a86 Fix SIGSEGV with zstd compression enabled (#1164)
0f0d5a86 is described below

commit 0f0d5a86919495b846f6201d9df7b157c69c15b5
Author: Zike Yang <[email protected]>
AuthorDate: Fri Feb 2 01:24:50 2024 +0800

    Fix SIGSEGV with zstd compression enabled (#1164)
    
    * Fix SIGSEGV with zstd compression enabled
    
    * Use sync.Pool to cache zstd ctx
    
    * Fix race in sequenceID assignment
    
    * Fix GetAndAdd
    
    (cherry picked from commit 877613503b70c4ee67a8408f31881d88fc086456)
---
 pulsar/internal/compression/zstd_cgo.go | 16 ++++++++++++----
 pulsar/internal/utils.go                |  2 +-
 pulsar/producer_test.go                 | 28 ++++++++++++++++++++++++++++
 3 files changed, 41 insertions(+), 5 deletions(-)
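
The patch replaces the provider's single cgo-backed zstd.Ctx, which every Send call shared, with a sync.Pool so each Compress/Decompress checks out its own context; sharing one context across concurrent sends is the likely source of the SIGSEGV. Below is a minimal sketch of that pattern, using only the zstd.NewCtx/CompressLevel API already imported by this file (compressWithPool and the surrounding main are illustrative stand-ins, not code from the patch):

    package main

    import (
        "fmt"
        "sync"

        "github.com/DataDog/zstd"
    )

    // ctxPool hands each caller its own zstd context; the cgo-backed
    // context is not safe to share between concurrent compression calls.
    var ctxPool = sync.Pool{New: func() any {
        return zstd.NewCtx()
    }}

    func compressWithPool(dst, src []byte, level int) ([]byte, error) {
        ctx := ctxPool.Get().(zstd.Ctx) // take a context from the pool (or create one)
        defer ctxPool.Put(ctx)          // return it once the call completes
        return ctx.CompressLevel(dst, src, level)
    }

    func main() {
        var wg sync.WaitGroup
        for i := 0; i < 100; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                out, err := compressWithPool(nil, make([]byte, 100), 3)
                fmt.Println(len(out), err)
            }()
        }
        wg.Wait()
    }

sync.Pool keeps idle contexts around for reuse, so steady-state traffic avoids paying for zstd.NewCtx on every message while concurrent bursts each get a context of their own.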

diff --git a/pulsar/internal/compression/zstd_cgo.go b/pulsar/internal/compression/zstd_cgo.go
index 25429e25..dde54ae2 100644
--- a/pulsar/internal/compression/zstd_cgo.go
+++ b/pulsar/internal/compression/zstd_cgo.go
@@ -25,19 +25,23 @@
 package compression
 
 import (
+       "sync"
+
        "github.com/DataDog/zstd"
        log "github.com/sirupsen/logrus"
 )
 
 type zstdCGoProvider struct {
-       ctx       zstd.Ctx
+       ctxPool   sync.Pool
        level     Level
        zstdLevel int
 }
 
 func newCGoZStdProvider(level Level) Provider {
        z := &zstdCGoProvider{
-               ctx: zstd.NewCtx(),
+               ctxPool: sync.Pool{New: func() any {
+                       return zstd.NewCtx()
+               }},
        }
 
        switch level {
@@ -61,7 +65,9 @@ func (z *zstdCGoProvider) CompressMaxSize(originalSize int) int {
 }
 
 func (z *zstdCGoProvider) Compress(dst, src []byte) []byte {
-       out, err := z.ctx.CompressLevel(dst, src, z.zstdLevel)
+       ctx := z.ctxPool.Get().(zstd.Ctx)
+       defer z.ctxPool.Put(ctx)
+       out, err := ctx.CompressLevel(dst, src, z.zstdLevel)
        if err != nil {
                log.WithError(err).Fatal("Failed to compress")
        }
@@ -70,7 +76,9 @@ func (z *zstdCGoProvider) Compress(dst, src []byte) []byte {
 }
 
 func (z *zstdCGoProvider) Decompress(dst, src []byte, originalSize int) ([]byte, error) {
-       return z.ctx.Decompress(dst, src)
+       ctx := z.ctxPool.Get().(zstd.Ctx)
+       defer z.ctxPool.Put(ctx)
+       return ctx.Decompress(dst, src)
 }
 
 func (z *zstdCGoProvider) Close() error {
diff --git a/pulsar/internal/utils.go b/pulsar/internal/utils.go
index 9378d9dc..2dc82101 100644
--- a/pulsar/internal/utils.go
+++ b/pulsar/internal/utils.go
@@ -40,7 +40,7 @@ func TimestampMillis(t time.Time) uint64 {
 // GetAndAdd perform atomic read and update
 func GetAndAdd(n *uint64, diff uint64) uint64 {
        for {
-               v := *n
+               v := atomic.LoadUint64(n)
                if atomic.CompareAndSwapUint64(n, v, v+diff) {
                        return v
                }
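
The old GetAndAdd read *n with a plain load while other goroutines were updating it through CompareAndSwapUint64, which is a data race under the Go memory model; loading the value atomically closes it. For what it's worth, the same fetch-and-add can be written without the retry loop, since atomic.AddUint64 returns the updated value. A sketch (getAndAdd here is a hypothetical local variant, not the project's helper):

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    // getAndAdd: AddUint64 performs the atomic add and returns the new
    // value, so subtracting diff recovers the previous value.
    func getAndAdd(n *uint64, diff uint64) uint64 {
        return atomic.AddUint64(n, diff) - diff
    }

    func main() {
        var seq uint64 = 41
        fmt.Println(getAndAdd(&seq, 1), seq) // prints: 41 42
    }
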
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 0d74cdee..3b9ea7e8 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -2357,6 +2357,34 @@ func TestFailPendingMessageWithClose(t *testing.T) {
        assert.Equal(t, 0, partitionProducerImp.pendingQueue.Size())
 }
 
+func TestSendConcurrently(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+       testProducer, err := client.CreateProducer(ProducerOptions{
+               Topic:            newTopicName(),
+               CompressionType:  ZSTD,
+               CompressionLevel: Better,
+               DisableBatching:  true,
+       })
+       assert.NoError(t, err)
+
+       var wg sync.WaitGroup
+       for i := 0; i < 100; i++ {
+               wg.Add(1)
+               go func() {
+                       _, err := testProducer.Send(context.Background(), &ProducerMessage{
+                               Payload: make([]byte, 100),
+                       })
+                       assert.NoError(t, err)
+                       wg.Done()
+               }()
+       }
+       wg.Wait()
+}
+
 type pendingQueueWrapper struct {
        pendingQueue   internal.BlockingQueue
        writtenBuffers *[]internal.Buffer
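
The new TestSendConcurrently drives 100 parallel Send calls on a non-batching producer with ZSTD compression, the workload that previously triggered the SIGSEGV. Like the other producer tests here it needs a broker reachable at lookupURL, and running it under the race detector (for example: go test -race -run TestSendConcurrently ./pulsar) should also exercise the GetAndAdd fix on the sequence-ID assignment path.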
