cckellogg commented on a change in pull request #566:
URL: https://github.com/apache/pulsar-client-go/pull/566#discussion_r669576203
##########
File path: pulsar/internal/pulsartracing/producer_interceptor.go
##########
@@ -33,7 +33,9 @@ func (t *ProducerInterceptor) BeforeSend(producer pulsar.Producer, message *puls
buildAndInjectSpan(message, producer).Finish()
}
-func (t *ProducerInterceptor) OnSendAcknowledgement(producer pulsar.Producer, message *pulsar.ProducerMessage, msgID pulsar.MessageID) {
+func (t *ProducerInterceptor) OnSendAcknowledgement(producer pulsar.Producer,
Review comment:
Can we revert these formatting changes since they have nothing to do
with this change?
##########
File path: pulsar/internal/connection.go
##########
@@ -744,6 +745,46 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge)
c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE, cmdAuthResponse))
}
+func (c *connection) handleSendError(sendError *pb.CommandSendError, cmdError *pb.CommandError) {
+ c.log.Warnf("Received send error from server: [%v] : [%s]", sendError.GetError(), sendError.GetMessage())
+
+ requestID := cmdError.GetRequestId()
+ producerID := sendError.GetProducerId()
+
+ switch *sendError.Error {
+ case pb.ServerError_NotAllowedError:
+ c.pendingLock.Lock()
Review comment:
This lookup-and-delete block is now used in a couple of places. Maybe we
should pull it into a helper function? Something like this:
```
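// deletePendingRequest removes the pending request for requestID, if one
// exists, and reports whether it was found. It acquires pendingLock itself.
// The uint64 and *request types are assumed from the surrounding code.
func (c *connection) deletePendingRequest(requestID uint64) (*request, bool) {
	c.pendingLock.Lock()
	defer c.pendingLock.Unlock()
	req, ok := c.pendingReqs[requestID]
	if ok {
		delete(c.pendingReqs, requestID)
	}
	return req, ok
}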
```
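To show how a call site might shrink with such a helper, here is a minimal, self-contained sketch. The `request` and `connection` types below are trimmed-down stand-ins, and `failPendingRequest` and `errNotAllowed` are hypothetical names used only for illustration; none of this is taken from the PR.

```go
package main

import (
	"errors"
	"sync"
)

// request is a hypothetical stand-in for the client's pending request type.
type request struct {
	callback func(response interface{}, err error)
}

// connection is a hypothetical stand-in with only the fields the helper touches.
type connection struct {
	pendingLock sync.Mutex
	pendingReqs map[uint64]*request
}

// errNotAllowed is an illustrative error value, not one defined by the client.
var errNotAllowed = errors.New("server error: NotAllowedError")

// deletePendingRequest removes and returns the pending request for requestID,
// if any, while holding pendingLock.
func (c *connection) deletePendingRequest(requestID uint64) (*request, bool) {
	c.pendingLock.Lock()
	defer c.pendingLock.Unlock()
	req, ok := c.pendingReqs[requestID]
	if ok {
		delete(c.pendingReqs, requestID)
	}
	return req, ok
}

// failPendingRequest is a possible call site: it removes the request and then
// invokes its callback with err. It reports whether the request was found.
func (c *connection) failPendingRequest(requestID uint64, err error) bool {
	req, ok := c.deletePendingRequest(requestID)
	if !ok {
		return false
	}
	// The lock has already been released here, so the callback cannot
	// deadlock by re-entering code that takes pendingLock.
	req.callback(nil, err)
	return true
}
```

With this shape the caller no longer needs a manual `Unlock` on each early-return path, which is where the duplicated block is easiest to get wrong.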
##########
File path: pulsar/internal/connection.go
##########
@@ -744,6 +745,50 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge)
c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE, cmdAuthResponse))
}
+func (c *connection) handleSendError(sendError *pb.CommandSendError, cmdError *pb.CommandError) {
+ c.log.Warnf("Received send error from server: [%v] : [%s]", sendError.GetError(), sendError.GetMessage())
+
+ requestID := cmdError.GetRequestId()
+ producerID := sendError.GetProducerId()
+
+SendError:
+ switch *sendError.Error {
+ case pb.ServerError_NotAllowedError:
+ c.pendingLock.Lock()
+ request, ok := c.pendingReqs[requestID]
+ if !ok {
+ c.log.Warnf("Received unexpected error response for request %d of type %s",
+ requestID, cmdError.GetError())
+ c.pendingLock.Unlock()
+ return
+ }
+
+ delete(c.pendingReqs, requestID)
+ c.pendingLock.Unlock()
+
+ errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage())
+ request.callback(nil, errors.New(errMsg))
+ break SendError
+ case pb.ServerError_TopicTerminatedError:
+ c.listenersLock.RLock()
+ producer, ok := c.listeners[producerID]
+ c.listenersLock.RUnlock()
+
+ if ok {
+ producer.ConnectionClosed()
Review comment:
The Java client closes the connection in the default case:
https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java#L649
Should we do the same? What should the expected client behavior be here?
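To make the question concrete, here is a rough sketch of the dispatch if the Go client were to mirror that default. Every name in it (`serverError`, `sendErrorAction`, `actionFor`, and the constants) is hypothetical and stands in for the real protobuf enum and handlers. It only illustrates the decision and is not a proposal for the final code.

```go
package main

// serverError is a hypothetical stand-in for the protobuf server error enum.
type serverError int

const (
	notAllowedError serverError = iota
	topicTerminatedError
	otherError // any error code without a dedicated case
)

// sendErrorAction names what the client would do in response to a send error.
type sendErrorAction int

const (
	failRequest     sendErrorAction = iota // fail the pending request via its callback
	closeProducer                          // notify the producer listener that it is closed
	closeConnection                        // close the connection so reconnect logic takes over
)

// actionFor maps a send error to an action, falling back to closing the
// connection for anything without a dedicated case.
func actionFor(e serverError) sendErrorAction {
	switch e {
	case notAllowedError:
		return failRequest
	case topicTerminatedError:
		return closeProducer
	default:
		return closeConnection
	}
}
```

Keeping the mapping in one small function like this would also make the chosen default easy to cover with a unit test, whichever behavior we settle on.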