This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-0.11.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit 1724dc98ee14f8b33b30aa4d421af52c1d2851de Author: Michael Marshall <[email protected]> AuthorDate: Thu Jul 20 05:20:05 2023 -0500 [fix] Send Close Command on Producer/Consumer create timeout (#1061) ### Motivation This change is the same as https://github.com/apache/pulsar/pull/13161 and https://github.com/apache/pulsar/pull/16616, and is justified by these lines of our binary protocol spec: * https://github.com/apache/pulsar-site/blob/9b4b3d39014bd47c0bb9f66742b89bcb40ed7f07/docs/developing-binary-protocol.md?plain=1#L301-L304 * https://github.com/apache/pulsar-site/blob/9b4b3d39014bd47c0bb9f66742b89bcb40ed7f07/docs/developing-binary-protocol.md?plain=1#L468-L471 ### Modifications * When a producer or a consumer times out during creation, make an attempt to close the producer or consumer by sending the appropriate close command. Failures can safely be ignored because the only time that the close will actually matter is when the TCP connection is open for other protocol messages. The one nuance is that we send the close command to the same address pair that we send the create command. (cherry picked from commit d4e08c699632b19aef70011cf6c3671e739bc101) --- pulsar/consumer_partition.go | 9 +++++++++ pulsar/producer_partition.go | 8 ++++++++ 2 files changed, 17 insertions(+) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index f5e9febe..b50c0a00 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -1737,6 +1737,15 @@ func (pc *partitionConsumer) grabConn() error { if err != nil { pc.log.WithError(err).Error("Failed to create consumer") + if err == internal.ErrRequestTimeOut { + requestID := pc.client.rpcClient.NewRequestID() + cmdClose := &pb.CommandCloseConsumer{ + ConsumerId: proto.Uint64(pc.consumerID), + RequestId: proto.Uint64(requestID), + } + _, _ = pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID, + pb.BaseCommand_CLOSE_CONSUMER, cmdClose) + } return err } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index ca813762..9f89d7b7 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -256,6 +256,14 @@ func (p *partitionProducer) grabCnx() error { res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer) if err != nil { p.log.WithError(err).Error("Failed to create producer at send PRODUCER request") + if err == internal.ErrRequestTimeOut { + id := p.client.rpcClient.NewRequestID() + _, _ = p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_CLOSE_PRODUCER, + &pb.CommandCloseProducer{ + ProducerId: &p.producerID, + RequestId: &id, + }) + } return err }
