This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 41093516 fix(producer): fail all messages that are pending requests when closing (#1059)
41093516 is described below

commit 41093516af21cf2238e11532d877e1aab73f95bf
Author: grayson <[email protected]>
AuthorDate: Wed Aug 30 15:26:38 2023 +0800

    fix(producer): fail all messages that are pending requests when closing (#1059)
    
    This aligns the behavior with the Java client.
---
 pulsar/producer_partition.go | 60 ++++++++++++++++++++++++++++++++++++++++++++
 pulsar/producer_test.go      | 30 ++++++++++++++++++++++
 2 files changed, 90 insertions(+)
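
For illustration, here is a minimal application-side sketch of the behaviour this
change targets, assuming a broker at pulsar://localhost:6650 and a placeholder
topic name (both hypothetical). Messages still buffered when Close() is called
should now complete their SendAsync callbacks with an error instead of being
left hanging:

    package main

    import (
        "context"
        "log"

        "github.com/apache/pulsar-client-go/pulsar"
    )

    func main() {
        client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()

        producer, err := client.CreateProducer(pulsar.ProducerOptions{Topic: "my-topic"})
        if err != nil {
            log.Fatal(err)
        }

        // Queue a few messages without waiting for acknowledgements.
        for i := 0; i < 3; i++ {
            producer.SendAsync(context.Background(), &pulsar.ProducerMessage{
                Payload: []byte("payload"),
            }, func(id pulsar.MessageID, msg *pulsar.ProducerMessage, err error) {
                if err != nil {
                    // With this fix, sends still pending at close time fail here.
                    log.Printf("send failed: %v", err)
                }
            })
        }

        // Closing while messages are still pending now fails them via the callback.
        producer.Close()
    }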

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index d3f61ef1..720c7df4 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -1291,6 +1291,8 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
                return
        }
 
+       defer close(p.dataChan)
+       defer close(p.cmdChan)
        p.log.Info("Closing producer")
 
        id := p.client.rpcClient.NewRequestID()
@@ -1304,6 +1306,7 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
        } else {
                p.log.Info("Closed producer")
        }
+       p.failPendingMessages()
 
        if p.batchBuilder != nil {
                if err = p.batchBuilder.Close(); err != nil {
@@ -1316,6 +1319,63 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
        p.batchFlushTicker.Stop()
 }
 
+func (p *partitionProducer) failPendingMessages() {
+       curViewItems := p.pendingQueue.ReadableSlice()
+       viewSize := len(curViewItems)
+       if viewSize <= 0 {
+               return
+       }
+       p.log.Infof("Failing %d messages on closing producer", viewSize)
+       lastViewItem := curViewItems[viewSize-1].(*pendingItem)
+
+       // iterate at most viewSize items
+       for i := 0; i < viewSize; i++ {
+               item := p.pendingQueue.CompareAndPoll(
+                       func(m interface{}) bool {
+                               return m != nil
+                       })
+
+               if item == nil {
+                       return
+               }
+
+               pi := item.(*pendingItem)
+               pi.Lock()
+
+               for _, i := range pi.sendRequests {
+                       sr := i.(*sendRequest)
+                       if sr.msg != nil {
+                               size := len(sr.msg.Payload)
+                               p.releaseSemaphoreAndMem(sr.reservedMem)
+                               p.metrics.MessagesPending.Dec()
+                               p.metrics.BytesPending.Sub(float64(size))
+                               p.log.WithError(errProducerClosed).
+                                       WithField("size", size).
+                                       WithField("properties", sr.msg.Properties)
+                       }
+
+                       if sr.callback != nil {
+                               sr.callbackOnce.Do(func() {
+                                       runCallback(sr.callback, nil, sr.msg, errProducerClosed)
+                               })
+                       }
+                       if sr.transaction != nil {
+                               sr.transaction.endSendOrAckOp(nil)
+                       }
+               }
+
+               // mark the send as completed with an error; a later flush has no effect
+               pi.Complete(errProducerClosed)
+               pi.Unlock()
+
+               // reached the last item of the snapshot view; the drain is done
+               if pi == lastViewItem {
+                       p.log.Infof("Failed %d pending messages on closing producer", viewSize)
+                       return
+               }
+       }
+}
+
 func (p *partitionProducer) LastSequenceID() int64 {
        return atomic.LoadInt64(&p.lastSequenceID)
 }
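
The drain above can be read as a snapshot-then-poll loop: remember the last item
currently visible in the pending queue, then fail every polled item until that
boundary item is reached, so batches enqueued concurrently afterwards are left
untouched. A simplified, self-contained sketch of the pattern (stand-in types,
not the client's internal ones):

    package main

    import (
        "errors"
        "sync"
    )

    // item stands in for the internal pendingItem: one buffered batch plus the
    // callbacks of the messages it carries.
    type item struct {
        sync.Mutex
        callbacks []func(err error)
    }

    func failPending(queue []*item, errClosed error) {
        // Snapshot the readable items and remember the last one; anything added
        // after this point is not touched here.
        view := queue
        if len(view) == 0 {
            return
        }
        last := view[len(view)-1]

        for _, it := range view {
            // The real code polls items off the queue (CompareAndPoll); plain
            // iteration is enough for this sketch.
            it.Lock()
            for _, cb := range it.callbacks {
                cb(errClosed) // fail every buffered send with the close error
            }
            it.Unlock()
            if it == last { // stop at the snapshot boundary
                return
            }
        }
    }

    func main() {
        q := []*item{
            {callbacks: []func(error){func(err error) { println("msg 1:", err.Error()) }}},
            {callbacks: []func(error){func(err error) { println("msg 2:", err.Error()) }}},
        }
        failPending(q, errors.New("producer closed"))
    }
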
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index a9a7b819..38fec576 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -2268,3 +2268,33 @@ func TestProducerSendWithContext(t *testing.T) {
        //  producer.Send should fail and return err context.Canceled
        assert.True(t, errors.Is(err, context.Canceled))
 }
+
+func TestFailPendingMessageWithClose(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+       testProducer, err := client.CreateProducer(ProducerOptions{
+               Topic:                   newTopicName(),
+               DisableBlockIfQueueFull: false,
+               BatchingMaxPublishDelay: 100000,
+               BatchingMaxMessages:     1000,
+       })
+
+       assert.NoError(t, err)
+       assert.NotNil(t, testProducer)
+       for i := 0; i < 3; i++ {
+               testProducer.SendAsync(context.Background(), &ProducerMessage{
+                       Payload: make([]byte, 1024),
+               }, func(id MessageID, message *ProducerMessage, e error) {
+                       if e != nil {
+                               assert.Equal(t, errProducerClosed, e)
+                       }
+               })
+       }
+       partitionProducerImp := testProducer.(*producer).producers[0].(*partitionProducer)
+       partitionProducerImp.pendingQueue.Put(&pendingItem{})
+       testProducer.Close()
+       assert.Equal(t, 0, partitionProducerImp.pendingQueue.Size())
+}
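
A note on the new test's design: it queues a few messages asynchronously, puts one
extra empty pendingItem on the queue to exercise the drain loop, and then asserts
that Close() leaves the pending queue empty while any callback error equals
errProducerClosed. Assuming a local standalone broker reachable at the test
suite's lookupURL, the test can be run on its own with:

    go test ./pulsar -run TestFailPendingMessageWithClose -v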
