This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch branch-0.11.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit a6b15dc39e3abd3391772d5c2c92162031971fe4
Author: grayson <[email protected]>
AuthorDate: Wed Aug 30 15:26:38 2023 +0800

    fix(producer): fail all messages that are pending requests when closing (#1059)

    This aligns the behavior with the Java client.

    (cherry picked from commit 41093516af21cf2238e11532d877e1aab73f95bf)
---
 pulsar/producer_partition.go | 60 ++++++++++++++++++++++++++++++++++++++++++++
 pulsar/producer_test.go      | 30 ++++++++++++++++++++++
 2 files changed, 90 insertions(+)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 8af75bad..bc379afb 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -1286,6 +1286,8 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
 		return
 	}
 
+	defer close(p.dataChan)
+	defer close(p.cmdChan)
 	p.log.Info("Closing producer")
 
 	id := p.client.rpcClient.NewRequestID()
@@ -1299,6 +1301,7 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
 	} else {
 		p.log.Info("Closed producer")
 	}
+	p.failPendingMessages()
 
 	if p.batchBuilder != nil {
 		if err = p.batchBuilder.Close(); err != nil {
@@ -1311,6 +1314,63 @@ func (p *partitionProducer) internalClose(req *closeProducer) {
 	p.batchFlushTicker.Stop()
 }
 
+func (p *partitionProducer) failPendingMessages() {
+	curViewItems := p.pendingQueue.ReadableSlice()
+	viewSize := len(curViewItems)
+	if viewSize <= 0 {
+		return
+	}
+	p.log.Infof("Failing %d messages on closing producer", viewSize)
+	lastViewItem := curViewItems[viewSize-1].(*pendingItem)
+
+	// iterate over at most viewSize items
+	for i := 0; i < viewSize; i++ {
+		item := p.pendingQueue.CompareAndPoll(
+			func(m interface{}) bool {
+				return m != nil
+			})
+
+		if item == nil {
+			return
+		}
+
+		pi := item.(*pendingItem)
+		pi.Lock()
+
+		for _, i := range pi.sendRequests {
+			sr := i.(*sendRequest)
+			if sr.msg != nil {
+				size := len(sr.msg.Payload)
+				p.releaseSemaphoreAndMem(sr.reservedMem)
+				p.metrics.MessagesPending.Dec()
+				p.metrics.BytesPending.Sub(float64(size))
+				p.log.WithError(errProducerClosed).
+					WithField("size", size).
+					WithField("properties", sr.msg.Properties)
+			}
+
+			if sr.callback != nil {
+				sr.callbackOnce.Do(func() {
+					runCallback(sr.callback, nil, sr.msg, errProducerClosed)
+				})
+			}
+			if sr.transaction != nil {
+				sr.transaction.endSendOrAckOp(nil)
+			}
+		}
+
+		// mark the send as completed with an error; a later flush has no effect
+		pi.Complete(errProducerClosed)
+		pi.Unlock()
+
+		// reached the last view item; the iteration ends here
+		if pi == lastViewItem {
+			p.log.Infof("Failed %d messages on closing producer", viewSize)
+			return
+		}
+	}
+}
+
 func (p *partitionProducer) LastSequenceID() int64 {
 	return atomic.LoadInt64(&p.lastSequenceID)
 }
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index be9885fa..1c7bfd8f 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -2224,3 +2224,33 @@ func TestProducerSendWithContext(t *testing.T) {
 	// producer.Send should fail and return err context.Canceled
 	assert.True(t, errors.Is(err, context.Canceled))
 }
+
+func TestFailPendingMessageWithClose(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+	assert.NoError(t, err)
+	defer client.Close()
+	testProducer, err := client.CreateProducer(ProducerOptions{
+		Topic:                   newTopicName(),
+		DisableBlockIfQueueFull: false,
+		BatchingMaxPublishDelay: 100000,
+		BatchingMaxMessages:     1000,
+	})
+
+	assert.NoError(t, err)
+	assert.NotNil(t, testProducer)
+	for i := 0; i < 3; i++ {
+		testProducer.SendAsync(context.Background(), &ProducerMessage{
+			Payload: make([]byte, 1024),
+		}, func(id MessageID, message *ProducerMessage, e error) {
+			if e != nil {
+				assert.Equal(t, errProducerClosed, e)
+			}
+		})
+	}
+	partitionProducerImp := testProducer.(*producer).producers[0].(*partitionProducer)
+	partitionProducerImp.pendingQueue.Put(&pendingItem{})
+	testProducer.Close()
+	assert.Equal(t, 0, partitionProducerImp.pendingQueue.Size())
+}
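
For context (not part of the commit): a minimal caller-side sketch of the behavior this change guarantees. With the fix, SendAsync callbacks for messages still awaiting a broker acknowledgment when Close runs complete with the producer-closed error instead of being left unresolved. The broker URL and topic name below are illustrative assumptions; whether a given message is still pending at close time depends on timing.

    package main

    import (
    	"context"
    	"log"

    	"github.com/apache/pulsar-client-go/pulsar"
    )

    func main() {
    	// Assumed: a broker reachable at this URL; the topic name is illustrative.
    	client, err := pulsar.NewClient(pulsar.ClientOptions{
    		URL: "pulsar://localhost:6650",
    	})
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer client.Close()

    	producer, err := client.CreateProducer(pulsar.ProducerOptions{
    		Topic: "persistent://public/default/close-demo",
    	})
    	if err != nil {
    		log.Fatal(err)
    	}

    	done := make(chan struct{}, 3)
    	for i := 0; i < 3; i++ {
    		producer.SendAsync(context.Background(), &pulsar.ProducerMessage{
    			Payload: []byte("in-flight"),
    		}, func(id pulsar.MessageID, msg *pulsar.ProducerMessage, err error) {
    			// With this fix, any message still in the pending queue when
    			// Close runs is completed here with the producer-closed error.
    			if err != nil {
    				log.Printf("send failed on close: %v", err)
    			}
    			done <- struct{}{}
    		})
    	}

    	// Close fails whatever is still pending (sent but unacknowledged).
    	producer.Close()

    	// Every callback now fires exactly once, success or failure.
    	for i := 0; i < 3; i++ {
    		<-done
    	}
    }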
