This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new d4e08c6  [fix] Send Close Command on Producer/Consumer create timeout 
(#1061)
d4e08c6 is described below

commit d4e08c699632b19aef70011cf6c3671e739bc101
Author: Michael Marshall <[email protected]>
AuthorDate: Thu Jul 20 05:20:05 2023 -0500

    [fix] Send Close Command on Producer/Consumer create timeout (#1061)
    
    ### Motivation
    
    This change is the same as https://github.com/apache/pulsar/pull/13161 and 
https://github.com/apache/pulsar/pull/16616, and is justified by these lines of 
our binary protocol spec:
    
    * 
https://github.com/apache/pulsar-site/blob/9b4b3d39014bd47c0bb9f66742b89bcb40ed7f07/docs/developing-binary-protocol.md?plain=1#L301-L304
    * 
https://github.com/apache/pulsar-site/blob/9b4b3d39014bd47c0bb9f66742b89bcb40ed7f07/docs/developing-binary-protocol.md?plain=1#L468-L471
    
    ### Modifications
    
    * When a producer or a consumer times out during creation, make an attempt 
to close the producer or consumer by sending the appropriate close command. 
Failures can safely be ignored because the only time that the close will 
actually matter is when the TCP connection is open for other protocol messages. 
The one nuance is that we send the close command to the same address pair to 
which we sent the create command.
---
 pulsar/consumer_partition.go | 9 +++++++++
 pulsar/producer_partition.go | 8 ++++++++
 2 files changed, 17 insertions(+)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index cd7aa50..28cb9c5 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -1734,6 +1734,15 @@ func (pc *partitionConsumer) grabConn() error {
 
        if err != nil {
                pc.log.WithError(err).Error("Failed to create consumer")
+               if err == internal.ErrRequestTimeOut {
+                       requestID := pc.client.rpcClient.NewRequestID()
+                       cmdClose := &pb.CommandCloseConsumer{
+                               ConsumerId: proto.Uint64(pc.consumerID),
+                               RequestId:  proto.Uint64(requestID),
+                       }
+                       _, _ = pc.client.rpcClient.Request(lr.LogicalAddr, 
lr.PhysicalAddr, requestID,
+                               pb.BaseCommand_CLOSE_CONSUMER, cmdClose)
+               }
                return err
        }
 
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 5daf54c..7d514d5 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -251,6 +251,14 @@ func (p *partitionProducer) grabCnx() error {
        res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, 
id, pb.BaseCommand_PRODUCER, cmdProducer)
        if err != nil {
                p.log.WithError(err).Error("Failed to create producer at send 
PRODUCER request")
+               if err == internal.ErrRequestTimeOut {
+                       id := p.client.rpcClient.NewRequestID()
+                       _, _ = p.client.rpcClient.Request(lr.LogicalAddr, 
lr.PhysicalAddr, id, pb.BaseCommand_CLOSE_PRODUCER,
+                               &pb.CommandCloseProducer{
+                                       ProducerId: &p.producerID,
+                                       RequestId:  &id,
+                               })
+               }
                return err
        }
 

Reply via email to