cckellogg commented on a change in pull request #566:
URL: https://github.com/apache/pulsar-client-go/pull/566#discussion_r669576203



##########
File path: pulsar/internal/pulsartracing/producer_interceptor.go
##########
@@ -33,7 +33,9 @@ func (t *ProducerInterceptor) BeforeSend(producer 
pulsar.Producer, message *puls
        buildAndInjectSpan(message, producer).Finish()
 }
 
-func (t *ProducerInterceptor) OnSendAcknowledgement(producer pulsar.Producer, 
message *pulsar.ProducerMessage, msgID pulsar.MessageID) {
+func (t *ProducerInterceptor) OnSendAcknowledgement(producer pulsar.Producer,

Review comment:
       Can we revert these formatting changes since they have nothing to do 
with this change?

##########
File path: pulsar/internal/connection.go
##########
@@ -744,6 +745,46 @@ func (c *connection) handleAuthChallenge(authChallenge 
*pb.CommandAuthChallenge)
        c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE, 
cmdAuthResponse))
 }
 
+func (c *connection) handleSendError(sendError *pb.CommandSendError, cmdError 
*pb.CommandError) {
+       c.log.Warnf("Received send error from server: [%v] : [%s]", 
sendError.GetError(), sendError.GetMessage())
+
+       requestID := cmdError.GetRequestId()
+       producerID := sendError.GetProducerId()
+
+       switch *sendError.Error {
+       case pb.ServerError_NotAllowedError:
+               c.pendingLock.Lock()

Review comment:
       This block of code is now used in a couple of places; maybe we should 
add a function? Something like this:
   
   ```
   func (c *connection) deletePendingRequest(requestID) (request, bool) {
      c.pendingLock.Lock()
      defer c.pendingLock.Unlock()
      request, ok := c.pendingReqs[requestID]
      if ok {
         delete(c.pendingReqs, requestID)
      }
      return request, ok
   }
   ```

##########
File path: pulsar/internal/connection.go
##########
@@ -744,6 +745,50 @@ func (c *connection) handleAuthChallenge(authChallenge 
*pb.CommandAuthChallenge)
        c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE, 
cmdAuthResponse))
 }
 
+func (c *connection) handleSendError(sendError *pb.CommandSendError, cmdError 
*pb.CommandError) {
+       c.log.Warnf("Received send error from server: [%v] : [%s]", 
sendError.GetError(), sendError.GetMessage())
+
+       requestID := cmdError.GetRequestId()
+       producerID := sendError.GetProducerId()
+
+SendError:
+       switch *sendError.Error {
+       case pb.ServerError_NotAllowedError:
+               c.pendingLock.Lock()
+               request, ok := c.pendingReqs[requestID]
+               if !ok {
+                       c.log.Warnf("Received unexpected error response for 
request %d of type %s",
+                               requestID, cmdError.GetError())
+                       c.pendingLock.Unlock()
+                       return
+               }
+
+               delete(c.pendingReqs, requestID)
+               c.pendingLock.Unlock()
+
+               errMsg := fmt.Sprintf("server error: %s: %s", 
cmdError.GetError(), cmdError.GetMessage())
+               request.callback(nil, errors.New(errMsg))
+               break SendError
+       case pb.ServerError_TopicTerminatedError:
+               c.listenersLock.RLock()
+               producer, ok := c.listeners[producerID]
+               c.listenersLock.RUnlock()
+
+               if ok {
+                       producer.ConnectionClosed()

Review comment:
       The Java client closes the connection in the default case.
   
   
https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java#L649
   
   Should we do the same? What should the expected client behavior be here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to