This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new f193975  Add send error logic for connection (#566)
f193975 is described below

commit f1939752ded912cc651e58e7b3c1bd12a2798724
Author: xiaolong ran <[email protected]>
AuthorDate: Fri Jul 16 17:58:10 2021 +0800

    Add send error logic for connection (#566)
    
    Fixes #564
    
    Motivation
    Add command of SendError logic for connection
    
    Signed-off-by: xiaolongran <[email protected]>
---
 pulsar/internal/commands.go                        |  2 +
 pulsar/internal/connection.go                      | 54 ++++++++++++++++------
 .../internal/pulsartracing/producer_interceptor.go |  4 +-
 .../pulsartracing/producer_interceptor_test.go     |  3 +-
 4 files changed, 48 insertions(+), 15 deletions(-)

diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 1adfba4..af6bac5 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -176,6 +176,8 @@ func baseCommand(cmdType pb.BaseCommand_Type, msg 
proto.Message) *pb.BaseCommand
                cmd.Pong = msg.(*pb.CommandPong)
        case pb.BaseCommand_SEND:
                cmd.Send = msg.(*pb.CommandSend)
+       case pb.BaseCommand_SEND_ERROR:
+               cmd.SendError = msg.(*pb.CommandSendError)
        case pb.BaseCommand_CLOSE_PRODUCER:
                cmd.CloseProducer = msg.(*pb.CommandCloseProducer)
        case pb.BaseCommand_CLOSE_CONSUMER:
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index a632816..9873ec8 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -535,6 +535,9 @@ func (c *connection) internalReceivedCommand(cmd 
*pb.BaseCommand, headersAndPayl
        case pb.BaseCommand_ERROR:
                c.handleResponseError(cmd.GetError())
 
+       case pb.BaseCommand_SEND_ERROR:
+               c.handleSendError(cmd.GetError())
+
        case pb.BaseCommand_CLOSE_PRODUCER:
                c.handleCloseProducer(cmd.GetCloseProducer())
 
@@ -547,8 +550,6 @@ func (c *connection) internalReceivedCommand(cmd 
*pb.BaseCommand, headersAndPayl
        case pb.BaseCommand_SEND_RECEIPT:
                c.handleSendReceipt(cmd.GetSendReceipt())
 
-       case pb.BaseCommand_SEND_ERROR:
-
        case pb.BaseCommand_MESSAGE:
                c.handleMessage(cmd.GetMessage(), headersAndPayload)
 
@@ -631,33 +632,25 @@ func (c *connection) internalSendRequest(req *request) {
 }
 
 func (c *connection) handleResponse(requestID uint64, response 
*pb.BaseCommand) {
-       c.pendingLock.Lock()
-       request, ok := c.pendingReqs[requestID]
+       request, ok := c.deletePendingRequest(requestID)
        if !ok {
                c.log.Warnf("Received unexpected response for request %d of 
type %s", requestID, response.Type)
-               c.pendingLock.Unlock()
                return
        }
 
-       delete(c.pendingReqs, requestID)
-       c.pendingLock.Unlock()
        request.callback(response, nil)
 }
 
 func (c *connection) handleResponseError(serverError *pb.CommandError) {
        requestID := serverError.GetRequestId()
-       c.pendingLock.Lock()
-       request, ok := c.pendingReqs[requestID]
+
+       request, ok := c.deletePendingRequest(requestID)
        if !ok {
                c.log.Warnf("Received unexpected error response for request %d 
of type %s",
                        requestID, serverError.GetError())
-               c.pendingLock.Unlock()
                return
        }
 
-       delete(c.pendingReqs, requestID)
-       c.pendingLock.Unlock()
-
        errMsg := fmt.Sprintf("server error: %s: %s", serverError.GetError(), 
serverError.GetMessage())
        request.callback(nil, errors.New(errMsg))
 }
@@ -694,6 +687,16 @@ func (c *connection) handleMessage(response 
*pb.CommandMessage, payload Buffer)
        }
 }
 
+func (c *connection) deletePendingRequest(requestID uint64) (*request, bool) {
+       c.pendingLock.Lock()
+       defer c.pendingLock.Unlock()
+       request, ok := c.pendingReqs[requestID]
+       if ok {
+               delete(c.pendingReqs, requestID)
+       }
+       return request, ok
+}
+
 func (c *connection) lastDataReceived() time.Time {
        c.lastDataReceivedLock.Lock()
        defer c.lastDataReceivedLock.Unlock()
@@ -744,6 +747,31 @@ func (c *connection) handleAuthChallenge(authChallenge 
*pb.CommandAuthChallenge)
        c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE, 
cmdAuthResponse))
 }
 
+func (c *connection) handleSendError(cmdError *pb.CommandError) {
+       c.log.Warnf("Received send error from server: [%v] : [%s]", 
cmdError.GetError(), cmdError.GetMessage())
+
+       requestID := cmdError.GetRequestId()
+
+       switch *cmdError.Error {
+       case pb.ServerError_NotAllowedError:
+               request, ok := c.deletePendingRequest(requestID)
+               if !ok {
+                       c.log.Warnf("Received unexpected error response for 
request %d of type %s",
+                               requestID, cmdError.GetError())
+                       return
+               }
+
+               errMsg := fmt.Sprintf("server error: %s: %s", 
cmdError.GetError(), cmdError.GetMessage())
+               request.callback(nil, errors.New(errMsg))
+       case pb.ServerError_TopicTerminatedError:
+               // TODO: no-op
+       default:
+               // By default, for transient error, let the reconnection logic
+               // to take place and re-establish the produce again
+               c.Close()
+       }
+}
+
 func (c *connection) handleCloseConsumer(closeConsumer 
*pb.CommandCloseConsumer) {
        consumerID := closeConsumer.GetConsumerId()
        c.log.Infof("Broker notification of Closed consumer: %d", consumerID)
diff --git a/pulsar/internal/pulsartracing/producer_interceptor.go 
b/pulsar/internal/pulsartracing/producer_interceptor.go
index b400e57..6c7728c 100644
--- a/pulsar/internal/pulsartracing/producer_interceptor.go
+++ b/pulsar/internal/pulsartracing/producer_interceptor.go
@@ -33,7 +33,9 @@ func (t *ProducerInterceptor) BeforeSend(producer 
pulsar.Producer, message *puls
        buildAndInjectSpan(message, producer).Finish()
 }
 
-func (t *ProducerInterceptor) OnSendAcknowledgement(producer pulsar.Producer, 
message *pulsar.ProducerMessage, msgID pulsar.MessageID) {
+func (t *ProducerInterceptor) OnSendAcknowledgement(producer pulsar.Producer,
+       message *pulsar.ProducerMessage,
+       msgID pulsar.MessageID) {
 }
 
 func buildAndInjectSpan(message *pulsar.ProducerMessage, producer 
pulsar.Producer) opentracing.Span {
diff --git a/pulsar/internal/pulsartracing/producer_interceptor_test.go 
b/pulsar/internal/pulsartracing/producer_interceptor_test.go
index b146e4e..8d8e696 100644
--- a/pulsar/internal/pulsartracing/producer_interceptor_test.go
+++ b/pulsar/internal/pulsartracing/producer_interceptor_test.go
@@ -55,7 +55,8 @@ func (p *mockProducer) Send(context.Context, 
*pulsar.ProducerMessage) (pulsar.Me
        return nil, nil
 }
 
-func (p *mockProducer) SendAsync(context.Context, *pulsar.ProducerMessage, 
func(pulsar.MessageID, *pulsar.ProducerMessage, error)) {
+func (p *mockProducer) SendAsync(context.Context, *pulsar.ProducerMessage,
+       func(pulsar.MessageID, *pulsar.ProducerMessage, error)) {
 }
 
 func (p *mockProducer) LastSequenceID() int64 {

Reply via email to