This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch branch-0.12.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/branch-0.12.0 by this push:
new 0f0d5a86 Fix SIGSEGV with zstd compression enabled (#1164)
0f0d5a86 is described below
commit 0f0d5a86919495b846f6201d9df7b157c69c15b5
Author: Zike Yang <[email protected]>
AuthorDate: Fri Feb 2 01:24:50 2024 +0800
Fix SIGSEGV with zstd compression enabled (#1164)
* Fix SIGSEGV with zstd compression enabled
* Use sync.Pool to cache zstd ctx
* Fix race in sequenceID assignment
* Fix GetAndAdd
(cherry picked from commit 877613503b70c4ee67a8408f31881d88fc086456)
---
pulsar/internal/compression/zstd_cgo.go | 16 ++++++++++++----
pulsar/internal/utils.go | 2 +-
pulsar/producer_test.go | 28 ++++++++++++++++++++++++++++
3 files changed, 41 insertions(+), 5 deletions(-)
diff --git a/pulsar/internal/compression/zstd_cgo.go
b/pulsar/internal/compression/zstd_cgo.go
index 25429e25..dde54ae2 100644
--- a/pulsar/internal/compression/zstd_cgo.go
+++ b/pulsar/internal/compression/zstd_cgo.go
@@ -25,19 +25,23 @@
package compression
import (
+ "sync"
+
"github.com/DataDog/zstd"
log "github.com/sirupsen/logrus"
)
type zstdCGoProvider struct {
- ctx zstd.Ctx
+ ctxPool sync.Pool
level Level
zstdLevel int
}
func newCGoZStdProvider(level Level) Provider {
z := &zstdCGoProvider{
- ctx: zstd.NewCtx(),
+ ctxPool: sync.Pool{New: func() any {
+ return zstd.NewCtx()
+ }},
}
switch level {
@@ -61,7 +65,9 @@ func (z *zstdCGoProvider) CompressMaxSize(originalSize int)
int {
}
func (z *zstdCGoProvider) Compress(dst, src []byte) []byte {
- out, err := z.ctx.CompressLevel(dst, src, z.zstdLevel)
+ ctx := z.ctxPool.Get().(zstd.Ctx)
+ defer z.ctxPool.Put(ctx)
+ out, err := ctx.CompressLevel(dst, src, z.zstdLevel)
if err != nil {
log.WithError(err).Fatal("Failed to compress")
}
@@ -70,7 +76,9 @@ func (z *zstdCGoProvider) Compress(dst, src []byte) []byte {
}
func (z *zstdCGoProvider) Decompress(dst, src []byte, originalSize int)
([]byte, error) {
- return z.ctx.Decompress(dst, src)
+ ctx := z.ctxPool.Get().(zstd.Ctx)
+ defer z.ctxPool.Put(ctx)
+ return ctx.Decompress(dst, src)
}
func (z *zstdCGoProvider) Close() error {
diff --git a/pulsar/internal/utils.go b/pulsar/internal/utils.go
index 9378d9dc..2dc82101 100644
--- a/pulsar/internal/utils.go
+++ b/pulsar/internal/utils.go
@@ -40,7 +40,7 @@ func TimestampMillis(t time.Time) uint64 {
// GetAndAdd perform atomic read and update
func GetAndAdd(n *uint64, diff uint64) uint64 {
for {
- v := *n
+ v := atomic.LoadUint64(n)
if atomic.CompareAndSwapUint64(n, v, v+diff) {
return v
}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 0d74cdee..3b9ea7e8 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -2357,6 +2357,34 @@ func TestFailPendingMessageWithClose(t *testing.T) {
assert.Equal(t, 0, partitionProducerImp.pendingQueue.Size())
}
+func TestSendConcurrently(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+ testProducer, err := client.CreateProducer(ProducerOptions{
+ Topic: newTopicName(),
+ CompressionType: ZSTD,
+ CompressionLevel: Better,
+ DisableBatching: true,
+ })
+ assert.NoError(t, err)
+
+ var wg sync.WaitGroup
+ for i := 0; i < 100; i++ {
+ wg.Add(1)
+ go func() {
+ _, err := testProducer.Send(context.Background(),
&ProducerMessage{
+ Payload: make([]byte, 100),
+ })
+ assert.NoError(t, err)
+ wg.Done()
+ }()
+ }
+ wg.Wait()
+}
+
type pendingQueueWrapper struct {
pendingQueue internal.BlockingQueue
writtenBuffers *[]internal.Buffer