cckellogg commented on a change in pull request #566:
URL: https://github.com/apache/pulsar-client-go/pull/566#discussion_r668847719
##########
File path: pulsar/internal/connection.go
##########
@@ -744,6 +745,50 @@ func (c *connection) handleAuthChallenge(authChallenge
*pb.CommandAuthChallenge)
c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE,
cmdAuthResponse))
}
+func (c *connection) handleSendError(sendError *pb.CommandSendError, cmdError
*pb.CommandError) {
+ c.log.Warnf("Received send error from server: [%v] : [%s]",
sendError.GetError(), sendError.GetMessage())
+
+ requestID := cmdError.GetRequestId()
+ producerID := sendError.GetProducerId()
+
+SendError:
+ switch *sendError.Error {
+ case pb.ServerError_NotAllowedError:
+ c.pendingLock.Lock()
+ request, ok := c.pendingReqs[requestID]
+ if !ok {
+ c.log.Warnf("Received unexpected error response for
request %d of type %s",
+ requestID, cmdError.GetError())
+ c.pendingLock.Unlock()
+ return
+ }
+
+ delete(c.pendingReqs, requestID)
+ c.pendingLock.Unlock()
+
+ errMsg := fmt.Sprintf("server error: %s: %s",
cmdError.GetError(), cmdError.GetMessage())
+ request.callback(nil, errors.New(errMsg))
+ break SendError
+ case pb.ServerError_TopicTerminatedError:
+ c.listenersLock.RLock()
+ producer, ok := c.listeners[producerID]
+ c.listenersLock.RUnlock()
+
+ if ok {
+ producer.ConnectionClosed()
Review comment:
Why is this called when the connection is not closed? What should the
behavior be when this error happens? What do the other clients do? Calling
producer.ConnectionClosed() when we don't close the connection makes the code
confusing, in my opinion.
##########
File path: pulsar/internal/connection.go
##########
@@ -744,6 +745,50 @@ func (c *connection) handleAuthChallenge(authChallenge
*pb.CommandAuthChallenge)
c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE,
cmdAuthResponse))
}
+func (c *connection) handleSendError(sendError *pb.CommandSendError, cmdError
*pb.CommandError) {
+ c.log.Warnf("Received send error from server: [%v] : [%s]",
sendError.GetError(), sendError.GetMessage())
+
+ requestID := cmdError.GetRequestId()
+ producerID := sendError.GetProducerId()
+
+SendError:
+ switch *sendError.Error {
+ case pb.ServerError_NotAllowedError:
+ c.pendingLock.Lock()
+ request, ok := c.pendingReqs[requestID]
+ if !ok {
+ c.log.Warnf("Received unexpected error response for
request %d of type %s",
+ requestID, cmdError.GetError())
+ c.pendingLock.Unlock()
+ return
+ }
+
+ delete(c.pendingReqs, requestID)
+ c.pendingLock.Unlock()
+
+ errMsg := fmt.Sprintf("server error: %s: %s",
cmdError.GetError(), cmdError.GetMessage())
+ request.callback(nil, errors.New(errMsg))
+ break SendError
Review comment:
I'm not following why there needs to be a labeled break to SendError here.
##########
File path: pulsar/internal/connection.go
##########
@@ -744,6 +745,50 @@ func (c *connection) handleAuthChallenge(authChallenge
*pb.CommandAuthChallenge)
c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE,
cmdAuthResponse))
}
+func (c *connection) handleSendError(sendError *pb.CommandSendError, cmdError
*pb.CommandError) {
+ c.log.Warnf("Received send error from server: [%v] : [%s]",
sendError.GetError(), sendError.GetMessage())
+
+ requestID := cmdError.GetRequestId()
+ producerID := sendError.GetProducerId()
+
+SendError:
+ switch *sendError.Error {
+ case pb.ServerError_NotAllowedError:
+ c.pendingLock.Lock()
+ request, ok := c.pendingReqs[requestID]
+ if !ok {
+ c.log.Warnf("Received unexpected error response for
request %d of type %s",
+ requestID, cmdError.GetError())
+ c.pendingLock.Unlock()
+ return
+ }
+
+ delete(c.pendingReqs, requestID)
+ c.pendingLock.Unlock()
+
+ errMsg := fmt.Sprintf("server error: %s: %s",
cmdError.GetError(), cmdError.GetMessage())
+ request.callback(nil, errors.New(errMsg))
+ break SendError
+ case pb.ServerError_TopicTerminatedError:
+ c.listenersLock.RLock()
+ producer, ok := c.listeners[producerID]
+ c.listenersLock.RUnlock()
+
+ if ok {
+ producer.ConnectionClosed()
+ } else {
+ c.log.
+ WithField("producerID", producerID).
+ Warn("[HandleSendError] connection closed")
+ }
+ break SendError
+ default:
Review comment:
Is the pending request callback not needed in the default case?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]