This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new bd115818 [Fix][Producer] handle 
TopicNotFound/TopicTerminated/ProducerBlockedQuotaExceededException/ProducerFenced
 when reconnecting (#1134)
bd115818 is described below

commit bd11581867c88f93e4b6f247d82914f9eb4ee476
Author: gunli <[email protected]>
AuthorDate: Fri Dec 8 17:54:35 2023 +0800

    [Fix][Producer] handle 
TopicNotFound/TopicTerminated/ProducerBlockedQuotaExceededException/ProducerFenced
 when reconnecting (#1134)
    
    Master Issue: #1128
    
    ### Motivation
    
    In Java client, when we get 
TopicNotFound/TopicTerminated/ProducerBlockedQuotaExceededException/ProducerFenced,
 we should failPendingMessages, and close producer. But in Go client, we forgot 
to handle ProducerBlockedQuotaExceededException/ProducerFenced, and in #1128, 
we just call sr.done(), actually we should call failPendingMessages().
    
    https://github.com/apache/pulsar-client-go/pull/1128/files
    
https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1663
    
    ### Modifications
    1. rename `errMsgTopicNotFount` to `errMsgTopicNotFound`
    2. handle 
TopicTerminated/ProducerBlockedQuotaExceededException/ProducerFenced, call 
`failPendingMessages()`;
    
    ---------
    
    Co-authored-by: gunli <[email protected]>
---
 pulsar/consumer_partition.go |  2 +-
 pulsar/error.go              |  6 ++++
 pulsar/producer_partition.go | 74 +++++++++++++++++++++++++++-----------------
 pulsar/producer_test.go      | 13 +++++---
 4 files changed, 60 insertions(+), 35 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 2ba84274..fd6441c1 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -1659,7 +1659,7 @@ func (pc *partitionConsumer) reconnectToBroker() {
                }
                pc.log.WithError(err).Error("Failed to create consumer at 
reconnect")
                errMsg := err.Error()
-               if strings.Contains(errMsg, errTopicNotFount) {
+               if strings.Contains(errMsg, errMsgTopicNotFound) {
                        // when topic is deleted, we should give up 
reconnection.
                        pc.log.Warn("Topic Not Found.")
                        break
diff --git a/pulsar/error.go b/pulsar/error.go
index 73a0b606..25498cfb 100644
--- a/pulsar/error.go
+++ b/pulsar/error.go
@@ -114,6 +114,12 @@ const (
        TransactionNoFoundError
        // ClientMemoryBufferIsFull client limit buffer is full
        ClientMemoryBufferIsFull
+       // ProducerFenced When a producer asks and fail to get exclusive 
producer access,
+       // or loses the exclusive status after a reconnection, the broker will
+       // use this error to indicate that this producer is now permanently
+       // fenced. Applications are now supposed to close it and create a
+       // new producer
+       ProducerFenced
 )
 
 // Error implement error interface, composed of two parts: msg and result.
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 65eef5b6..46167d0c 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -64,7 +64,12 @@ var (
        sendRequestPool *sync.Pool
 )
 
-var errTopicNotFount = "TopicNotFound"
+const (
+       errMsgTopicNotFound                         = "TopicNotFound"
+       errMsgTopicTerminated                       = "TopicTerminatedError"
+       errMsgProducerBlockedQuotaExceededException = 
"ProducerBlockedQuotaExceededException"
+       errMsgProducerFenced                        = "ProducerFenced"
+)
 
 func init() {
        sendRequestPool = &sync.Pool{
@@ -441,30 +446,28 @@ func (p *partitionProducer) reconnectToBroker() {
                }
                p.log.WithError(err).Error("Failed to create producer at 
reconnect")
                errMsg := err.Error()
-               if strings.Contains(errMsg, errTopicNotFount) {
+               if strings.Contains(errMsg, errMsgTopicNotFound) {
                        // when topic is deleted, we should give up 
reconnection.
-                       p.log.Warn("Topic Not Found.")
+                       p.log.Warn("Topic not found, stop reconnecting, close 
the producer")
+                       p.doClose(newError(TopicNotFound, err.Error()))
                        break
                }
 
-               if strings.Contains(errMsg, "TopicTerminatedError") {
-                       p.log.Info("Topic was terminated, failing pending 
messages, will not reconnect")
-                       pendingItems := p.pendingQueue.ReadableSlice()
-                       for _, item := range pendingItems {
-                               pi := item.(*pendingItem)
-                               if pi != nil {
-                                       pi.Lock()
-                                       requests := pi.sendRequests
-                                       for _, req := range requests {
-                                               sr := req.(*sendRequest)
-                                               if sr != nil {
-                                                       sr.done(nil, 
newError(TopicTerminated, err.Error()))
-                                               }
-                                       }
-                                       pi.Unlock()
-                               }
-                       }
-                       p.setProducerState(producerClosing)
+               if strings.Contains(errMsg, errMsgTopicTerminated) {
+                       p.log.Warn("Topic was terminated, failing pending 
messages, stop reconnecting, close the producer")
+                       p.doClose(newError(TopicTerminated, err.Error()))
+                       break
+               }
+
+               if strings.Contains(errMsg, 
errMsgProducerBlockedQuotaExceededException) {
+                       p.log.Warn("Producer was blocked by quota exceed 
exception, failing pending messages, stop reconnecting")
+                       
p.failPendingMessages(newError(ProducerBlockedQuotaExceededException, 
err.Error()))
+                       break
+               }
+
+               if strings.Contains(errMsg, errMsgProducerFenced) {
+                       p.log.Warn("Producer was fenced, failing pending 
messages, stop reconnecting")
+                       p.doClose(newError(ProducerFenced, err.Error()))
                        break
                }
 
@@ -481,10 +484,18 @@ func (p *partitionProducer) reconnectToBroker() {
 func (p *partitionProducer) runEventsLoop() {
        for {
                select {
-               case data := <-p.dataChan:
+               case data, ok := <-p.dataChan:
+                       // when doClose() is called, p.dataChan will be closed 
and data will be nil
+                       if !ok {
+                               return
+                       }
                        p.internalSend(data)
-               case i := <-p.cmdChan:
-                       switch v := i.(type) {
+               case cmd, ok := <-p.cmdChan:
+                       // when doClose() is called, p.cmdChan will be closed 
and cmd will be nil
+                       if !ok {
+                               return
+                       }
+                       switch v := cmd.(type) {
                        case *flushRequest:
                                p.internalFlush(v)
                        case *closeProducer:
@@ -1321,13 +1332,18 @@ func (p *partitionProducer) 
ReceivedSendReceipt(response *pb.CommandSendReceipt)
 
 func (p *partitionProducer) internalClose(req *closeProducer) {
        defer close(req.doneCh)
+
+       p.doClose(errProducerClosed)
+}
+
+func (p *partitionProducer) doClose(reason error) {
        if !p.casProducerState(producerReady, producerClosing) {
                return
        }
 
+       p.log.Info("Closing producer")
        defer close(p.dataChan)
        defer close(p.cmdChan)
-       p.log.Info("Closing producer")
 
        id := p.client.rpcClient.NewRequestID()
        _, err := p.client.rpcClient.RequestOnCnx(p._getConn(), id, 
pb.BaseCommand_CLOSE_PRODUCER, &pb.CommandCloseProducer{
@@ -1340,7 +1356,7 @@ func (p *partitionProducer) internalClose(req 
*closeProducer) {
        } else {
                p.log.Info("Closed producer")
        }
-       p.failPendingMessages()
+       p.failPendingMessages(reason)
 
        if p.batchBuilder != nil {
                if err = p.batchBuilder.Close(); err != nil {
@@ -1353,7 +1369,7 @@ func (p *partitionProducer) internalClose(req 
*closeProducer) {
        p.batchFlushTicker.Stop()
 }
 
-func (p *partitionProducer) failPendingMessages() {
+func (p *partitionProducer) failPendingMessages(err error) {
        curViewItems := p.pendingQueue.ReadableSlice()
        viewSize := len(curViewItems)
        if viewSize <= 0 {
@@ -1378,11 +1394,11 @@ func (p *partitionProducer) failPendingMessages() {
 
                for _, i := range pi.sendRequests {
                        sr := i.(*sendRequest)
-                       sr.done(nil, errProducerClosed)
+                       sr.done(nil, err)
                }
 
                // flag the sending has completed with error, flush make no 
effect
-               pi.done(errProducerClosed)
+               pi.done(err)
                pi.Unlock()
 
                // finally reached the last view item, current iteration ends
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index f30ae65f..0f890692 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -29,14 +29,16 @@ import (
        "testing"
        "time"
 
-       "github.com/apache/pulsar-client-go/pulsar/internal"
-       pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
        "github.com/stretchr/testify/assert"
        "google.golang.org/protobuf/proto"
 
+       "github.com/apache/pulsar-client-go/pulsar/internal"
+       pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+
+       log "github.com/sirupsen/logrus"
+
        "github.com/apache/pulsar-client-go/pulsar/crypto"
        plog "github.com/apache/pulsar-client-go/pulsar/log"
-       log "github.com/sirupsen/logrus"
 )
 
 func TestInvalidURL(t *testing.T) {
@@ -1168,7 +1170,7 @@ func TestTopicTermination(t *testing.T) {
        topicName := newTopicName()
        consumer, err := client.Subscribe(ConsumerOptions{
                Topic:            topicName,
-               SubscriptionName: "send_timeout_sub",
+               SubscriptionName: "topic_terminated_sub",
        })
        assert.Nil(t, err)
        defer consumer.Close() // subscribe but do nothing
@@ -1189,7 +1191,7 @@ func TestTopicTermination(t *testing.T) {
                        })
                        if err != nil {
                                e := err.(*Error)
-                               if e.result == TopicTerminated {
+                               if e.result == TopicTerminated || err == 
errProducerClosed {
                                        terminatedChan <- true
                                } else {
                                        terminatedChan <- false
@@ -1210,6 +1212,7 @@ func TestTopicTermination(t *testing.T) {
                        return
                case <-afterCh:
                        assert.Fail(t, "Time is up. Topic should have been 
terminated by now")
+                       return
                }
        }
 }

Reply via email to