This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new bd115818 [Fix][Producer] handle
TopicNotFound/TopicTerminated/ProducerBlockedQuotaExceededException/ProducerFenced
when reconnecting (#1134)
bd115818 is described below
commit bd11581867c88f93e4b6f247d82914f9eb4ee476
Author: gunli <[email protected]>
AuthorDate: Fri Dec 8 17:54:35 2023 +0800
[Fix][Producer] handle
TopicNotFound/TopicTerminated/ProducerBlockedQuotaExceededException/ProducerFenced
when reconnecting (#1134)
Master Issue: #1128
### Motivation
In the Java client, when we get
TopicNotFound/TopicTerminated/ProducerBlockedQuotaExceededException/ProducerFenced,
we fail the pending messages and close the producer. But in the Go client, we
forgot to handle ProducerBlockedQuotaExceededException/ProducerFenced, and in
#1128 we only call sr.done(); we should actually call failPendingMessages().
https://github.com/apache/pulsar-client-go/pull/1128/files
https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L1663
### Modifications
1. rename `errTopicNotFount` to `errMsgTopicNotFound`
2. handle
TopicTerminated/ProducerBlockedQuotaExceededException/ProducerFenced by calling
`failPendingMessages()`, either directly or via the new `doClose()` helper.
---------
Co-authored-by: gunli <[email protected]>
---
pulsar/consumer_partition.go | 2 +-
pulsar/error.go | 6 ++++
pulsar/producer_partition.go | 74 +++++++++++++++++++++++++++-----------------
pulsar/producer_test.go | 13 +++++---
4 files changed, 60 insertions(+), 35 deletions(-)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 2ba84274..fd6441c1 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -1659,7 +1659,7 @@ func (pc *partitionConsumer) reconnectToBroker() {
}
pc.log.WithError(err).Error("Failed to create consumer at
reconnect")
errMsg := err.Error()
- if strings.Contains(errMsg, errTopicNotFount) {
+ if strings.Contains(errMsg, errMsgTopicNotFound) {
// when topic is deleted, we should give up
reconnection.
pc.log.Warn("Topic Not Found.")
break
diff --git a/pulsar/error.go b/pulsar/error.go
index 73a0b606..25498cfb 100644
--- a/pulsar/error.go
+++ b/pulsar/error.go
@@ -114,6 +114,12 @@ const (
TransactionNoFoundError
// ClientMemoryBufferIsFull client limit buffer is full
ClientMemoryBufferIsFull
+ // ProducerFenced When a producer asks and fail to get exclusive
producer access,
+ // or loses the exclusive status after a reconnection, the broker will
+ // use this error to indicate that this producer is now permanently
+ // fenced. Applications are now supposed to close it and create a
+ // new producer
+ ProducerFenced
)
// Error implement error interface, composed of two parts: msg and result.
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 65eef5b6..46167d0c 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -64,7 +64,12 @@ var (
sendRequestPool *sync.Pool
)
-var errTopicNotFount = "TopicNotFound"
+const (
+ errMsgTopicNotFound = "TopicNotFound"
+ errMsgTopicTerminated = "TopicTerminatedError"
+ errMsgProducerBlockedQuotaExceededException =
"ProducerBlockedQuotaExceededException"
+ errMsgProducerFenced = "ProducerFenced"
+)
func init() {
sendRequestPool = &sync.Pool{
@@ -441,30 +446,28 @@ func (p *partitionProducer) reconnectToBroker() {
}
p.log.WithError(err).Error("Failed to create producer at
reconnect")
errMsg := err.Error()
- if strings.Contains(errMsg, errTopicNotFount) {
+ if strings.Contains(errMsg, errMsgTopicNotFound) {
// when topic is deleted, we should give up
reconnection.
- p.log.Warn("Topic Not Found.")
+ p.log.Warn("Topic not found, stop reconnecting, close
the producer")
+ p.doClose(newError(TopicNotFound, err.Error()))
break
}
- if strings.Contains(errMsg, "TopicTerminatedError") {
- p.log.Info("Topic was terminated, failing pending
messages, will not reconnect")
- pendingItems := p.pendingQueue.ReadableSlice()
- for _, item := range pendingItems {
- pi := item.(*pendingItem)
- if pi != nil {
- pi.Lock()
- requests := pi.sendRequests
- for _, req := range requests {
- sr := req.(*sendRequest)
- if sr != nil {
- sr.done(nil,
newError(TopicTerminated, err.Error()))
- }
- }
- pi.Unlock()
- }
- }
- p.setProducerState(producerClosing)
+ if strings.Contains(errMsg, errMsgTopicTerminated) {
+ p.log.Warn("Topic was terminated, failing pending
messages, stop reconnecting, close the producer")
+ p.doClose(newError(TopicTerminated, err.Error()))
+ break
+ }
+
+ if strings.Contains(errMsg,
errMsgProducerBlockedQuotaExceededException) {
+ p.log.Warn("Producer was blocked by quota exceed
exception, failing pending messages, stop reconnecting")
+
p.failPendingMessages(newError(ProducerBlockedQuotaExceededException,
err.Error()))
+ break
+ }
+
+ if strings.Contains(errMsg, errMsgProducerFenced) {
+ p.log.Warn("Producer was fenced, failing pending
messages, stop reconnecting")
+ p.doClose(newError(ProducerFenced, err.Error()))
break
}
@@ -481,10 +484,18 @@ func (p *partitionProducer) reconnectToBroker() {
func (p *partitionProducer) runEventsLoop() {
for {
select {
- case data := <-p.dataChan:
+ case data, ok := <-p.dataChan:
+ // when doClose() is call, p.dataChan will be closed,
data will be nil
+ if !ok {
+ return
+ }
p.internalSend(data)
- case i := <-p.cmdChan:
- switch v := i.(type) {
+ case cmd, ok := <-p.cmdChan:
+ // when doClose() is call, p.dataChan will be closed,
cmd will be nil
+ if !ok {
+ return
+ }
+ switch v := cmd.(type) {
case *flushRequest:
p.internalFlush(v)
case *closeProducer:
@@ -1321,13 +1332,18 @@ func (p *partitionProducer)
ReceivedSendReceipt(response *pb.CommandSendReceipt)
func (p *partitionProducer) internalClose(req *closeProducer) {
defer close(req.doneCh)
+
+ p.doClose(errProducerClosed)
+}
+
+func (p *partitionProducer) doClose(reason error) {
if !p.casProducerState(producerReady, producerClosing) {
return
}
+ p.log.Info("Closing producer")
defer close(p.dataChan)
defer close(p.cmdChan)
- p.log.Info("Closing producer")
id := p.client.rpcClient.NewRequestID()
_, err := p.client.rpcClient.RequestOnCnx(p._getConn(), id,
pb.BaseCommand_CLOSE_PRODUCER, &pb.CommandCloseProducer{
@@ -1340,7 +1356,7 @@ func (p *partitionProducer) internalClose(req
*closeProducer) {
} else {
p.log.Info("Closed producer")
}
- p.failPendingMessages()
+ p.failPendingMessages(reason)
if p.batchBuilder != nil {
if err = p.batchBuilder.Close(); err != nil {
@@ -1353,7 +1369,7 @@ func (p *partitionProducer) internalClose(req
*closeProducer) {
p.batchFlushTicker.Stop()
}
-func (p *partitionProducer) failPendingMessages() {
+func (p *partitionProducer) failPendingMessages(err error) {
curViewItems := p.pendingQueue.ReadableSlice()
viewSize := len(curViewItems)
if viewSize <= 0 {
@@ -1378,11 +1394,11 @@ func (p *partitionProducer) failPendingMessages() {
for _, i := range pi.sendRequests {
sr := i.(*sendRequest)
- sr.done(nil, errProducerClosed)
+ sr.done(nil, err)
}
// flag the sending has completed with error, flush make no
effect
- pi.done(errProducerClosed)
+ pi.done(err)
pi.Unlock()
// finally reached the last view item, current iteration ends
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index f30ae65f..0f890692 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -29,14 +29,16 @@ import (
"testing"
"time"
- "github.com/apache/pulsar-client-go/pulsar/internal"
- pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"
+ "github.com/apache/pulsar-client-go/pulsar/internal"
+ pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+
+ log "github.com/sirupsen/logrus"
+
"github.com/apache/pulsar-client-go/pulsar/crypto"
plog "github.com/apache/pulsar-client-go/pulsar/log"
- log "github.com/sirupsen/logrus"
)
func TestInvalidURL(t *testing.T) {
@@ -1168,7 +1170,7 @@ func TestTopicTermination(t *testing.T) {
topicName := newTopicName()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
- SubscriptionName: "send_timeout_sub",
+ SubscriptionName: "topic_terminated_sub",
})
assert.Nil(t, err)
defer consumer.Close() // subscribe but do nothing
@@ -1189,7 +1191,7 @@ func TestTopicTermination(t *testing.T) {
})
if err != nil {
e := err.(*Error)
- if e.result == TopicTerminated {
+ if e.result == TopicTerminated || err ==
errProducerClosed {
terminatedChan <- true
} else {
terminatedChan <- false
@@ -1210,6 +1212,7 @@ func TestTopicTermination(t *testing.T) {
return
case <-afterCh:
assert.Fail(t, "Time is up. Topic should have been
terminated by now")
+ return
}
}
}