This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch branch-0.11.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit 1724dc98ee14f8b33b30aa4d421af52c1d2851de
Author: Michael Marshall <[email protected]>
AuthorDate: Thu Jul 20 05:20:05 2023 -0500

    [fix] Send Close Command on Producer/Consumer create timeout (#1061)
    
    ### Motivation
    
    This change is the same as https://github.com/apache/pulsar/pull/13161 and 
https://github.com/apache/pulsar/pull/16616, and is justified by these lines of 
our binary protocol spec:
    
    * 
https://github.com/apache/pulsar-site/blob/9b4b3d39014bd47c0bb9f66742b89bcb40ed7f07/docs/developing-binary-protocol.md?plain=1#L301-L304
    * 
https://github.com/apache/pulsar-site/blob/9b4b3d39014bd47c0bb9f66742b89bcb40ed7f07/docs/developing-binary-protocol.md?plain=1#L468-L471
    
    ### Modifications
    
    * When a producer or a consumer times out during creation, make an attempt 
to close the producer or consumer by sending the appropriate close command. 
Failures of this close attempt can safely be ignored, because the close only 
matters when the TCP connection is still open for other protocol messages. 
The one nuance is that we send the close command to the same address pair to 
which we sent the create command.
    
    (cherry picked from commit d4e08c699632b19aef70011cf6c3671e739bc101)
---
 pulsar/consumer_partition.go | 9 +++++++++
 pulsar/producer_partition.go | 8 ++++++++
 2 files changed, 17 insertions(+)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index f5e9febe..b50c0a00 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -1737,6 +1737,15 @@ func (pc *partitionConsumer) grabConn() error {
 
        if err != nil {
                pc.log.WithError(err).Error("Failed to create consumer")
+               if err == internal.ErrRequestTimeOut {
+                       requestID := pc.client.rpcClient.NewRequestID()
+                       cmdClose := &pb.CommandCloseConsumer{
+                               ConsumerId: proto.Uint64(pc.consumerID),
+                               RequestId:  proto.Uint64(requestID),
+                       }
+                       _, _ = pc.client.rpcClient.Request(lr.LogicalAddr, 
lr.PhysicalAddr, requestID,
+                               pb.BaseCommand_CLOSE_CONSUMER, cmdClose)
+               }
                return err
        }
 
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index ca813762..9f89d7b7 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -256,6 +256,14 @@ func (p *partitionProducer) grabCnx() error {
        res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, 
id, pb.BaseCommand_PRODUCER, cmdProducer)
        if err != nil {
                p.log.WithError(err).Error("Failed to create producer at send 
PRODUCER request")
+               if err == internal.ErrRequestTimeOut {
+                       id := p.client.rpcClient.NewRequestID()
+                       _, _ = p.client.rpcClient.Request(lr.LogicalAddr, 
lr.PhysicalAddr, id, pb.BaseCommand_CLOSE_PRODUCER,
+                               &pb.CommandCloseProducer{
+                                       ProducerId: &p.producerID,
+                                       RequestId:  &id,
+                               })
+               }
                return err
        }
 

Reply via email to