This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 791d342 Fix logic of command for sendError (#622)
791d342 is described below
commit 791d342a98d0f0b4189913a5ea61547964d095c8
Author: xiaolong ran <[email protected]>
AuthorDate: Mon Oct 11 11:20:33 2021 +0800
Fix logic of command for sendError (#622)
### Motivation

As shown in the figure above, the `ServerError` returned by the broker is
`UnknownError` when the client receives it. In fact, we handled the wrong
command here. Here we should deal with `CommandSendError` instead of
`CommandError`. Correspondingly, we should deal with the `listener` map used to
cache the producer instead of the corresponding `pendingRequest` map.
---
pulsar/internal/connection.go | 45 +++++++++++++++++++++++--------------------
1 file changed, 24 insertions(+), 21 deletions(-)
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 0313e4e..163dcac 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -531,7 +531,7 @@ func (c *connection) internalReceivedCommand(cmd
*pb.BaseCommand, headersAndPayl
c.handleResponseError(cmd.GetError())
case pb.BaseCommand_SEND_ERROR:
- c.handleSendError(cmd.GetError())
+ c.handleSendError(cmd.GetSendError())
case pb.BaseCommand_CLOSE_PRODUCER:
c.handleCloseProducer(cmd.GetCloseProducer())
@@ -752,31 +752,29 @@ func (c *connection) handleAuthChallenge(authChallenge
*pb.CommandAuthChallenge)
c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE,
cmdAuthResponse))
}
-func (c *connection) handleSendError(cmdError *pb.CommandError) {
- c.log.Warnf("Received send error from server: [%v] : [%s]",
cmdError.GetError(), cmdError.GetMessage())
+func (c *connection) handleSendError(sendError *pb.CommandSendError) {
+ c.log.Warnf("Received send error from server: [%v] : [%s]",
sendError.GetError(), sendError.GetMessage())
- requestID := cmdError.GetRequestId()
+ producerID := sendError.GetProducerId()
- switch cmdError.GetError() {
+ switch sendError.GetError() {
case pb.ServerError_NotAllowedError:
- request, ok := c.deletePendingRequest(requestID)
+ _, ok := c.deletePendingProducers(producerID)
if !ok {
c.log.Warnf("Received unexpected error response for
request %d of type %s",
- requestID, cmdError.GetError())
+ producerID, sendError.GetError())
return
}
- errMsg := fmt.Sprintf("server error: %s: %s",
cmdError.GetError(), cmdError.GetMessage())
- request.callback(nil, errors.New(errMsg))
+ c.log.Warnf("server error: %s: %s", sendError.GetError(),
sendError.GetMessage())
case pb.ServerError_TopicTerminatedError:
- request, ok := c.deletePendingRequest(requestID)
+ _, ok := c.deletePendingProducers(producerID)
if !ok {
- c.log.Warnf("Received unexpected error response for
request %d of type %s",
- requestID, cmdError.GetError())
+ c.log.Warnf("Received unexpected error response for
producer %d of type %s",
+ producerID, sendError.GetError())
return
}
- errMsg := fmt.Sprintf("server error: %s: %s",
cmdError.GetError(), cmdError.GetMessage())
- request.callback(nil, errors.New(errMsg))
+ c.log.Warnf("server error: %s: %s", sendError.GetError(),
sendError.GetMessage())
default:
// By default, for transient error, let the reconnection logic
// to take place and re-establish the produce again
@@ -784,6 +782,17 @@ func (c *connection) handleSendError(cmdError
*pb.CommandError) {
}
}
+func (c *connection) deletePendingProducers(producerID uint64)
(ConnectionListener, bool) {
+ c.listenersLock.Lock()
+ producer, ok := c.listeners[producerID]
+ if ok {
+ delete(c.listeners, producerID)
+ }
+ c.listenersLock.Unlock()
+
+ return producer, ok
+}
+
func (c *connection) handleCloseConsumer(closeConsumer
*pb.CommandCloseConsumer) {
consumerID := closeConsumer.GetConsumerId()
c.log.Infof("Broker notification of Closed consumer: %d", consumerID)
@@ -800,13 +809,7 @@ func (c *connection) handleCloseProducer(closeProducer
*pb.CommandCloseProducer)
c.log.Infof("Broker notification of Closed producer: %d",
closeProducer.GetProducerId())
producerID := closeProducer.GetProducerId()
- c.listenersLock.Lock()
- producer, ok := c.listeners[producerID]
- if ok {
- delete(c.listeners, producerID)
- }
- c.listenersLock.Unlock()
-
+ producer, ok := c.deletePendingProducers(producerID)
// did we find a producer?
if ok {
producer.ConnectionClosed()