This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 41093516 fix(producer): fail all messages that are pending requests
when closing (#1059)
41093516 is described below
commit 41093516af21cf2238e11532d877e1aab73f95bf
Author: grayson <[email protected]>
AuthorDate: Wed Aug 30 15:26:38 2023 +0800
fix(producer): fail all messages that are pending requests when closing
(#1059)
This aligns the close behavior with the Java client.
---
pulsar/producer_partition.go | 60 ++++++++++++++++++++++++++++++++++++++++++++
pulsar/producer_test.go | 30 ++++++++++++++++++++++
2 files changed, 90 insertions(+)
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index d3f61ef1..720c7df4 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -1291,6 +1291,8 @@ func (p *partitionProducer) internalClose(req
*closeProducer) {
return
}
+ defer close(p.dataChan)
+ defer close(p.cmdChan)
p.log.Info("Closing producer")
id := p.client.rpcClient.NewRequestID()
@@ -1304,6 +1306,7 @@ func (p *partitionProducer) internalClose(req
*closeProducer) {
} else {
p.log.Info("Closed producer")
}
+ p.failPendingMessages()
if p.batchBuilder != nil {
if err = p.batchBuilder.Close(); err != nil {
@@ -1316,6 +1319,63 @@ func (p *partitionProducer) internalClose(req
*closeProducer) {
p.batchFlushTicker.Stop()
}
+// failPendingMessages fails the items sitting in the pending queue while the
+// producer is being closed.
+//
+// A snapshot of the queue is taken on entry; the loop polls at most that many
+// items and stops early when the queue yields nil or when the last item of the
+// snapshot has been handled. For every send request carried by a polled item
+// the reserved semaphore/memory is released, the pending metrics are
+// decremented, the user callback is invoked (at most once) with
+// errProducerClosed, and any attached transaction operation is ended.
+func (p *partitionProducer) failPendingMessages() {
+	curViewItems := p.pendingQueue.ReadableSlice()
+	viewSize := len(curViewItems)
+	if viewSize <= 0 {
+		return
+	}
+	p.log.Infof("Failing %d messages on closing producer", viewSize)
+	lastViewItem := curViewItems[viewSize-1].(*pendingItem)
+
+	// iterate at most viewSize items
+	for i := 0; i < viewSize; i++ {
+		item := p.pendingQueue.CompareAndPoll(
+			func(m interface{}) bool {
+				return m != nil
+			})
+
+		if item == nil {
+			return
+		}
+
+		pi := item.(*pendingItem)
+		pi.Lock()
+
+		// req is named distinctly so it does not shadow the outer counter i.
+		for _, req := range pi.sendRequests {
+			sr := req.(*sendRequest)
+			if sr.msg != nil {
+				size := len(sr.msg.Payload)
+				p.releaseSemaphoreAndMem(sr.reservedMem)
+				p.metrics.MessagesPending.Dec()
+				p.metrics.BytesPending.Sub(float64(size))
+				// The entry must be emitted explicitly; building it with
+				// WithError/WithField alone logs nothing. Debug level is used
+				// because the error is already handed to the callback below.
+				p.log.WithError(errProducerClosed).
+					WithField("size", size).
+					WithField("properties", sr.msg.Properties).
+					Debug("Failing pending message on closing producer")
+			}
+
+			if sr.callback != nil {
+				sr.callbackOnce.Do(func() {
+					runCallback(sr.callback, nil, sr.msg, errProducerClosed)
+				})
+			}
+			if sr.transaction != nil {
+				sr.transaction.endSendOrAckOp(nil)
+			}
+		}
+
+		// mark the item as completed with errProducerClosed; as the original
+		// note put it, a flush has no effect on it afterwards
+		pi.Complete(errProducerClosed)
+		pi.Unlock()
+
+		// finally reached the last view item, current iteration ends
+		if pi == lastViewItem {
+			p.log.Infof("%d messages complete failed", viewSize)
+			return
+		}
+	}
+}
+
// LastSequenceID returns the current value of the producer's lastSequenceID
// field, read with an atomic load.
func (p *partitionProducer) LastSequenceID() int64 {
	seq := atomic.LoadInt64(&p.lastSequenceID)
	return seq
}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index a9a7b819..38fec576 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -2268,3 +2268,33 @@ func TestProducerSendWithContext(t *testing.T) {
// producer.Send should fail and return err context.Canceled
assert.True(t, errors.Is(err, context.Canceled))
}
+
+// TestFailPendingMessageWithClose sends a few messages asynchronously, puts an
+// extra empty pendingItem straight into the partition producer's pending
+// queue, closes the producer and then expects the pending queue to be empty.
+// Any error reported to a send callback must be errProducerClosed.
+func TestFailPendingMessageWithClose(t *testing.T) {
+	client, err := NewClient(ClientOptions{URL: lookupURL})
+	assert.NoError(t, err)
+	defer client.Close()
+
+	opts := ProducerOptions{
+		Topic:                   newTopicName(),
+		DisableBlockIfQueueFull: false,
+		BatchingMaxPublishDelay: 100000,
+		BatchingMaxMessages:     1000,
+	}
+	testProducer, err := client.CreateProducer(opts)
+	assert.NoError(t, err)
+	assert.NotNil(t, testProducer)
+
+	// A successful send is acceptable; a failed one must carry errProducerClosed.
+	onSent := func(_ MessageID, _ *ProducerMessage, sendErr error) {
+		if sendErr == nil {
+			return
+		}
+		assert.Equal(t, errProducerClosed, sendErr)
+	}
+	for sent := 0; sent < 3; sent++ {
+		msg := &ProducerMessage{Payload: make([]byte, 1024)}
+		testProducer.SendAsync(context.Background(), msg, onSent)
+	}
+
+	// Reach into the first partition producer to plant one more pending item.
+	pp := testProducer.(*producer).producers[0].(*partitionProducer)
+	pp.pendingQueue.Put(&pendingItem{})
+
+	testProducer.Close()
+	assert.Equal(t, 0, pp.pendingQueue.Size())
+}