This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new f193975 Add send error logic for connection (#566)
f193975 is described below
commit f1939752ded912cc651e58e7b3c1bd12a2798724
Author: xiaolong ran <[email protected]>
AuthorDate: Fri Jul 16 17:58:10 2021 +0800
Add send error logic for connection (#566)
Fixes #564
Motivation
Add command of SendError logic for connection
Signed-off-by: xiaolongran <[email protected]>
---
pulsar/internal/commands.go | 2 +
pulsar/internal/connection.go | 54 ++++++++++++++++------
.../internal/pulsartracing/producer_interceptor.go | 4 +-
.../pulsartracing/producer_interceptor_test.go | 3 +-
4 files changed, 48 insertions(+), 15 deletions(-)
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 1adfba4..af6bac5 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -176,6 +176,8 @@ func baseCommand(cmdType pb.BaseCommand_Type, msg
proto.Message) *pb.BaseCommand
cmd.Pong = msg.(*pb.CommandPong)
case pb.BaseCommand_SEND:
cmd.Send = msg.(*pb.CommandSend)
+ case pb.BaseCommand_SEND_ERROR:
+ cmd.SendError = msg.(*pb.CommandSendError)
case pb.BaseCommand_CLOSE_PRODUCER:
cmd.CloseProducer = msg.(*pb.CommandCloseProducer)
case pb.BaseCommand_CLOSE_CONSUMER:
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index a632816..9873ec8 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -535,6 +535,9 @@ func (c *connection) internalReceivedCommand(cmd
*pb.BaseCommand, headersAndPayl
case pb.BaseCommand_ERROR:
c.handleResponseError(cmd.GetError())
+ case pb.BaseCommand_SEND_ERROR:
+ c.handleSendError(cmd.GetError())
+
case pb.BaseCommand_CLOSE_PRODUCER:
c.handleCloseProducer(cmd.GetCloseProducer())
@@ -547,8 +550,6 @@ func (c *connection) internalReceivedCommand(cmd
*pb.BaseCommand, headersAndPayl
case pb.BaseCommand_SEND_RECEIPT:
c.handleSendReceipt(cmd.GetSendReceipt())
- case pb.BaseCommand_SEND_ERROR:
-
case pb.BaseCommand_MESSAGE:
c.handleMessage(cmd.GetMessage(), headersAndPayload)
@@ -631,33 +632,25 @@ func (c *connection) internalSendRequest(req *request) {
}
func (c *connection) handleResponse(requestID uint64, response
*pb.BaseCommand) {
- c.pendingLock.Lock()
- request, ok := c.pendingReqs[requestID]
+ request, ok := c.deletePendingRequest(requestID)
if !ok {
c.log.Warnf("Received unexpected response for request %d of
type %s", requestID, response.Type)
- c.pendingLock.Unlock()
return
}
- delete(c.pendingReqs, requestID)
- c.pendingLock.Unlock()
request.callback(response, nil)
}
func (c *connection) handleResponseError(serverError *pb.CommandError) {
requestID := serverError.GetRequestId()
- c.pendingLock.Lock()
- request, ok := c.pendingReqs[requestID]
+
+ request, ok := c.deletePendingRequest(requestID)
if !ok {
c.log.Warnf("Received unexpected error response for request %d
of type %s",
requestID, serverError.GetError())
- c.pendingLock.Unlock()
return
}
- delete(c.pendingReqs, requestID)
- c.pendingLock.Unlock()
-
errMsg := fmt.Sprintf("server error: %s: %s", serverError.GetError(),
serverError.GetMessage())
request.callback(nil, errors.New(errMsg))
}
@@ -694,6 +687,16 @@ func (c *connection) handleMessage(response
*pb.CommandMessage, payload Buffer)
}
}
+func (c *connection) deletePendingRequest(requestID uint64) (*request, bool) {
+ c.pendingLock.Lock()
+ defer c.pendingLock.Unlock()
+ request, ok := c.pendingReqs[requestID]
+ if ok {
+ delete(c.pendingReqs, requestID)
+ }
+ return request, ok
+}
+
func (c *connection) lastDataReceived() time.Time {
c.lastDataReceivedLock.Lock()
defer c.lastDataReceivedLock.Unlock()
@@ -744,6 +747,31 @@ func (c *connection) handleAuthChallenge(authChallenge
*pb.CommandAuthChallenge)
c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE,
cmdAuthResponse))
}
+func (c *connection) handleSendError(cmdError *pb.CommandError) {
+ c.log.Warnf("Received send error from server: [%v] : [%s]",
cmdError.GetError(), cmdError.GetMessage())
+
+ requestID := cmdError.GetRequestId()
+
+ switch *cmdError.Error {
+ case pb.ServerError_NotAllowedError:
+ request, ok := c.deletePendingRequest(requestID)
+ if !ok {
+ c.log.Warnf("Received unexpected error response for
request %d of type %s",
+ requestID, cmdError.GetError())
+ return
+ }
+
+ errMsg := fmt.Sprintf("server error: %s: %s",
cmdError.GetError(), cmdError.GetMessage())
+ request.callback(nil, errors.New(errMsg))
+ case pb.ServerError_TopicTerminatedError:
+ // TODO: no-op
+ default:
+ // By default, for transient error, let the reconnection logic
+ // to take place and re-establish the produce again
+ c.Close()
+ }
+}
+
func (c *connection) handleCloseConsumer(closeConsumer
*pb.CommandCloseConsumer) {
consumerID := closeConsumer.GetConsumerId()
c.log.Infof("Broker notification of Closed consumer: %d", consumerID)
diff --git a/pulsar/internal/pulsartracing/producer_interceptor.go
b/pulsar/internal/pulsartracing/producer_interceptor.go
index b400e57..6c7728c 100644
--- a/pulsar/internal/pulsartracing/producer_interceptor.go
+++ b/pulsar/internal/pulsartracing/producer_interceptor.go
@@ -33,7 +33,9 @@ func (t *ProducerInterceptor) BeforeSend(producer
pulsar.Producer, message *puls
buildAndInjectSpan(message, producer).Finish()
}
-func (t *ProducerInterceptor) OnSendAcknowledgement(producer pulsar.Producer,
message *pulsar.ProducerMessage, msgID pulsar.MessageID) {
+func (t *ProducerInterceptor) OnSendAcknowledgement(producer pulsar.Producer,
+ message *pulsar.ProducerMessage,
+ msgID pulsar.MessageID) {
}
func buildAndInjectSpan(message *pulsar.ProducerMessage, producer
pulsar.Producer) opentracing.Span {
diff --git a/pulsar/internal/pulsartracing/producer_interceptor_test.go
b/pulsar/internal/pulsartracing/producer_interceptor_test.go
index b146e4e..8d8e696 100644
--- a/pulsar/internal/pulsartracing/producer_interceptor_test.go
+++ b/pulsar/internal/pulsartracing/producer_interceptor_test.go
@@ -55,7 +55,8 @@ func (p *mockProducer) Send(context.Context,
*pulsar.ProducerMessage) (pulsar.Me
return nil, nil
}
-func (p *mockProducer) SendAsync(context.Context, *pulsar.ProducerMessage,
func(pulsar.MessageID, *pulsar.ProducerMessage, error)) {
+func (p *mockProducer) SendAsync(context.Context, *pulsar.ProducerMessage,
+ func(pulsar.MessageID, *pulsar.ProducerMessage, error)) {
}
func (p *mockProducer) LastSequenceID() int64 {