cckellogg commented on a change in pull request #566:
URL: https://github.com/apache/pulsar-client-go/pull/566#discussion_r668847719



##########
File path: pulsar/internal/connection.go
##########
@@ -744,6 +745,50 @@ func (c *connection) handleAuthChallenge(authChallenge 
*pb.CommandAuthChallenge)
        c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE, 
cmdAuthResponse))
 }
 
+func (c *connection) handleSendError(sendError *pb.CommandSendError, cmdError 
*pb.CommandError) {
+       c.log.Warnf("Received send error from server: [%v] : [%s]", 
sendError.GetError(), sendError.GetMessage())
+
+       requestID := cmdError.GetRequestId()
+       producerID := sendError.GetProducerId()
+
+SendError:
+       switch *sendError.Error {
+       case pb.ServerError_NotAllowedError:
+               c.pendingLock.Lock()
+               request, ok := c.pendingReqs[requestID]
+               if !ok {
+                       c.log.Warnf("Received unexpected error response for 
request %d of type %s",
+                               requestID, cmdError.GetError())
+                       c.pendingLock.Unlock()
+                       return
+               }
+
+               delete(c.pendingReqs, requestID)
+               c.pendingLock.Unlock()
+
+               errMsg := fmt.Sprintf("server error: %s: %s", 
cmdError.GetError(), cmdError.GetMessage())
+               request.callback(nil, errors.New(errMsg))
+               break SendError
+       case pb.ServerError_TopicTerminatedError:
+               c.listenersLock.RLock()
+               producer, ok := c.listeners[producerID]
+               c.listenersLock.RUnlock()
+
+               if ok {
+                       producer.ConnectionClosed()

Review comment:
       Why is this called when the connection is not being closed? What should 
the behavior be when this error happens, and what do the other clients do? 
Calling producer.ConnectionClosed() without actually closing the connection 
makes the code confusing, in my opinion. 

##########
File path: pulsar/internal/connection.go
##########
@@ -744,6 +745,50 @@ func (c *connection) handleAuthChallenge(authChallenge 
*pb.CommandAuthChallenge)
        c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE, 
cmdAuthResponse))
 }
 
+func (c *connection) handleSendError(sendError *pb.CommandSendError, cmdError 
*pb.CommandError) {
+       c.log.Warnf("Received send error from server: [%v] : [%s]", 
sendError.GetError(), sendError.GetMessage())
+
+       requestID := cmdError.GetRequestId()
+       producerID := sendError.GetProducerId()
+
+SendError:
+       switch *sendError.Error {
+       case pb.ServerError_NotAllowedError:
+               c.pendingLock.Lock()
+               request, ok := c.pendingReqs[requestID]
+               if !ok {
+                       c.log.Warnf("Received unexpected error response for 
request %d of type %s",
+                               requestID, cmdError.GetError())
+                       c.pendingLock.Unlock()
+                       return
+               }
+
+               delete(c.pendingReqs, requestID)
+               c.pendingLock.Unlock()
+
+               errMsg := fmt.Sprintf("server error: %s: %s", 
cmdError.GetError(), cmdError.GetMessage())
+               request.callback(nil, errors.New(errMsg))
+               break SendError

Review comment:
       I'm not following why a `break SendError` is needed here. Go switch cases do not fall through by default, so the labeled break looks redundant.

##########
File path: pulsar/internal/connection.go
##########
@@ -744,6 +745,50 @@ func (c *connection) handleAuthChallenge(authChallenge 
*pb.CommandAuthChallenge)
        c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE, 
cmdAuthResponse))
 }
 
+func (c *connection) handleSendError(sendError *pb.CommandSendError, cmdError 
*pb.CommandError) {
+       c.log.Warnf("Received send error from server: [%v] : [%s]", 
sendError.GetError(), sendError.GetMessage())
+
+       requestID := cmdError.GetRequestId()
+       producerID := sendError.GetProducerId()
+
+SendError:
+       switch *sendError.Error {
+       case pb.ServerError_NotAllowedError:
+               c.pendingLock.Lock()
+               request, ok := c.pendingReqs[requestID]
+               if !ok {
+                       c.log.Warnf("Received unexpected error response for 
request %d of type %s",
+                               requestID, cmdError.GetError())
+                       c.pendingLock.Unlock()
+                       return
+               }
+
+               delete(c.pendingReqs, requestID)
+               c.pendingLock.Unlock()
+
+               errMsg := fmt.Sprintf("server error: %s: %s", 
cmdError.GetError(), cmdError.GetMessage())
+               request.callback(nil, errors.New(errMsg))
+               break SendError
+       case pb.ServerError_TopicTerminatedError:
+               c.listenersLock.RLock()
+               producer, ok := c.listeners[producerID]
+               c.listenersLock.RUnlock()
+
+               if ok {
+                       producer.ConnectionClosed()
+               } else {
+                       c.log.
+                               WithField("producerID", producerID).
+                               Warn("[HandleSendError] connection closed")
+               }
+               break SendError
+       default:

Review comment:
       Is the pending request callback not needed in the default case?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to