This is an automated email from the ASF dual-hosted git repository.

crossoverJie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 6db892c3 [Issue 1473][Consumer] Fix race in grabConn dropping messages before handler registration (#1476)
6db892c3 is described below

commit 6db892c38502ad3de9083f34b74b40a103e69fb5
Author: aleks-lazic <[email protected]>
AuthorDate: Wed May 6 02:57:53 2026 +0200

    [Issue 1473][Consumer] Fix race in grabConn dropping messages before handler registration (#1476)
---
 pulsar/consumer_partition.go      |  59 ++++--
 pulsar/consumer_partition_test.go | 393 ++++++++++++++++++++++++++++++++++++++
 2 files changed, 440 insertions(+), 12 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index a0190f70..e63d4b39 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -2201,10 +2201,49 @@ func (pc *partitionConsumer) grabConn(assignedBrokerURL string) error {
                cmdSubscribe.ForceTopicCreation = proto.Bool(false)
        }
 
-       res, err := pc.client.rpcClient.RequestWithCnxKeySuffix(lr.LogicalAddr, lr.PhysicalAddr, pc.cnxKeySuffix, requestID,
-               pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
+       // Obtain the connection before sending the subscribe RPC so we can register
+       // the consumer handler before the broker starts delivering frames.
+       // This closes a race where MESSAGE and ACTIVE_CONSUMER_CHANGE commands
+       // arriving immediately after the subscribe response were silently dropped
+       // because AddConsumeHandler had not been called yet.
+       cnx, err := pc.client.cnxPool.GetConnection(lr.LogicalAddr, lr.PhysicalAddr, pc.cnxKeySuffix)
+       if err != nil {
+               pc.log.WithError(err).Error("Failed to get connection")
+               return err
+       }
+
+       // Set the connection BEFORE registering the handler so that handler
+       // callbacks (e.g. MessageReceived → discardCorruptedMessage) can safely
+       // call pc._getConn() without hitting a nil pointer.
+       var prevConn internal.Connection
+       if v := pc.conn.Load(); v != nil {
+               prevConn = *v
+       }
+       pc._setConn(cnx)
+
+       // restoreConn rolls back pc.conn to the previous connection (or nil on
+       // the very first call) so a failed subscribe attempt doesn't leave
+       // pc.conn pointing at a stale connection.
+       restoreConn := func() {
+               if prevConn != nil {
+                       pc._setConn(prevConn)
+               } else {
+                       pc.conn.Store(nil)
+               }
+       }
 
+       // Register handler BEFORE the subscribe RPC so no frames are missed
+       err = cnx.AddConsumeHandler(pc.consumerID, pc)
        if err != nil {
+               restoreConn()
+               pc.log.WithError(err).Error("Failed to add consumer handler")
+               return err
+       }
+
+       res, err := pc.client.rpcClient.RequestOnCnx(cnx, requestID, pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
+       if err != nil {
+               cnx.DeleteConsumeHandler(pc.consumerID)
+               restoreConn()
                pc.log.WithError(err).Error("Failed to create consumer")
                if err == internal.ErrRequestTimeOut {
                        requestID := pc.client.rpcClient.NewRequestID()
@@ -2212,7 +2251,7 @@ func (pc *partitionConsumer) grabConn(assignedBrokerURL string) error {
                                ConsumerId: proto.Uint64(pc.consumerID),
                                RequestId:  proto.Uint64(requestID),
                        }
-                       _, _ = pc.client.rpcClient.RequestWithCnxKeySuffix(lr.LogicalAddr, lr.PhysicalAddr, pc.cnxKeySuffix, requestID,
+                       _, _ = pc.client.rpcClient.RequestOnCnx(cnx, requestID,
                                pb.BaseCommand_CLOSE_CONSUMER, cmdClose)
                }
                return err
@@ -2222,13 +2261,7 @@ func (pc *partitionConsumer) grabConn(assignedBrokerURL string) error {
                pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
        }
 
-       pc._setConn(res.Cnx)
        pc.log.Info("Connected consumer")
-       err = pc._getConn().AddConsumeHandler(pc.consumerID, pc)
-       if err != nil {
-               pc.log.WithError(err).Error("Failed to add consumer handler")
-               return err
-       }
 
        msgType := res.Response.GetType()
 
@@ -2493,9 +2526,11 @@ func (pc *partitionConsumer) _setConn(conn internal.Connection) {
 // _getConn returns internal connection field of this partition consumer atomically.
 // Note: should only be called by this partition consumer before attempting to use the connection
 func (pc *partitionConsumer) _getConn() internal.Connection {
-       // Invariant: The conn must be non-nill for the lifetime of the partitionConsumer.
-       //            For this reason we leave this cast unchecked and panic() if the
-       //            invariant is broken
+       // Invariant: conn is non-nil after the first successful grabConn (i.e. after
+       //            a subscribe RPC succeeds). During grabConn itself, conn is set
+       //            before AddConsumeHandler so that handler callbacks can use it.
+       //            Before the first successful subscribe, conn may be nil.
+       //            We leave this cast unchecked and panic() if the invariant is broken.
        return *pc.conn.Load()
 }
 
diff --git a/pulsar/consumer_partition_test.go b/pulsar/consumer_partition_test.go
index cfaf7bf6..a50d02ef 100644
--- a/pulsar/consumer_partition_test.go
+++ b/pulsar/consumer_partition_test.go
@@ -18,15 +18,20 @@
 package pulsar
 
 import (
+       "fmt"
+       "net/url"
        "sync"
+       "sync/atomic"
        "testing"
        "time"
 
        "github.com/apache/pulsar-client-go/pulsar/internal"
        "github.com/apache/pulsar-client-go/pulsar/internal/crypto"
+       pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
        "github.com/apache/pulsar-client-go/pulsar/log"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/stretchr/testify/assert"
+       "google.golang.org/protobuf/proto"
 )
 
 func TestSingleMessageIDNoAckTracker(t *testing.T) {
@@ -347,3 +352,391 @@ func TestMessageReceivedAllMessagesDuplicate(t 
*testing.T) {
        default:
        }
 }
+
+// TestGrabConn_HandlerRegisteredBeforeSubscribe verifies that the consumer
+// handler is registered on the connection BEFORE the subscribe RPC is sent.
+//
+// Without this ordering, the broker can send MESSAGE and 
ACTIVE_CONSUMER_CHANGE
+// frames immediately after the subscribe succeeds, but the client's read
+// goroutine cannot route them because the handler isn't in the map yet.
+// Those frames are silently dropped.
+func TestGrabConn_HandlerRegisteredBeforeSubscribe(t *testing.T) {
+       cnx := newSpyConnection()
+       rpc := &grabConnSpyRPCClient{cnx: cnx}
+       pc := newGrabConnTestConsumer(cnx, rpc)
+
+       err := pc.grabConn("")
+       assert.NoError(t, err)
+
+       // Drain the connectedCh goroutine spawned on success to assert it fires
+       // and avoid relying solely on the channel's buffer capacity.
+       <-pc.connectedCh
+
+       assert.True(t, rpc.handlerRegisteredDuringRPC.Load(),
+               "AddConsumeHandler must be called before the subscribe RPC is 
sent")
+}
+
+// TestGrabConn_HandlerRemovedOnSubscribeFailure verifies that when the
+// subscribe RPC fails, the pre-registered consumer handler is removed from
+// the connection so it does not leak.
+func TestGrabConn_HandlerRemovedOnSubscribeFailure(t *testing.T) {
+       cnx := newSpyConnection()
+       rpc := &grabConnSpyRPCClient{
+               cnx:          cnx,
+               subscribeErr: fmt.Errorf("broker rejected subscribe"),
+       }
+       pc := newGrabConnTestConsumer(cnx, rpc)
+
+       err := pc.grabConn("")
+       assert.Error(t, err)
+
+       assert.True(t, cnx.handlerRemoved.Load(),
+               "DeleteConsumeHandler must be called when subscribe fails")
+}
+
+// TestGrabConn_HandlerRemovedOnSubscribeTimeout verifies cleanup on timeout
+// and that the close command is sent on the same connection (not a potentially
+// different one from the pool).
+func TestGrabConn_HandlerRemovedOnSubscribeTimeout(t *testing.T) {
+       cnx := newSpyConnection()
+       rpc := &grabConnSpyRPCClient{
+               cnx:          cnx,
+               subscribeErr: internal.ErrRequestTimeOut,
+       }
+       pc := newGrabConnTestConsumer(cnx, rpc)
+
+       err := pc.grabConn("")
+       assert.ErrorIs(t, err, internal.ErrRequestTimeOut)
+
+       assert.True(t, cnx.handlerRemoved.Load(),
+               "DeleteConsumeHandler must be called on timeout")
+       assert.True(t, rpc.closeSentOnCnx.Load(),
+               "CloseConsumer must be sent via RequestOnCnx on the same connection")
+}
+
+// TestGrabConn_ConnResetOnFirstCallFailure verifies that when grabConn fails
+// on the very first call (no prior connection), pc.conn is reset to nil rather
+// than left pointing at the stale connection on which subscribe failed.
+func TestGrabConn_ConnResetOnFirstCallFailure(t *testing.T) {
+       cnx := newSpyConnection()
+       rpc := &grabConnSpyRPCClient{
+               cnx:          cnx,
+               subscribeErr: fmt.Errorf("broker rejected subscribe"),
+       }
+       pc := newGrabConnTestConsumerNoConn(cnx, rpc)
+
+       err := pc.grabConn("")
+       assert.Error(t, err)
+
+       assert.Nil(t, pc.conn.Load(),
+               "pc.conn must be reset to nil on first-call failure, not left pointing at the stale connection")
+}
+
+// TestGrabConn_BrokerFrameDuringSubscribe simulates the exact race: the broker
+// sends a frame (e.g. ActiveConsumerChange) while the subscribe RPC is still
+// in flight. Because the handler is registered before the RPC, the frame
+// must be delivered to the consumer — not dropped.
+func TestGrabConn_BrokerFrameDuringSubscribe(t *testing.T) {
+       cnx := newSpyConnection()
+       var consumerReceivedChange atomic.Bool
+
+       rpc := &grabConnSpyRPCClient{
+               cnx: cnx,
+               duringSubscribe: func() {
+                       // Simulate the broker's read goroutine delivering a frame
+                       // while the subscribe RPC is in flight.
+                       if handler, ok := cnx.handler.Load().(*partitionConsumer); ok && handler != nil {
+                               handler.ActiveConsumerChanged(true)
+                               consumerReceivedChange.Store(true)
+                       }
+               },
+       }
+       pc := newGrabConnTestConsumer(cnx, rpc)
+
+       err := pc.grabConn("")
+       assert.NoError(t, err)
+
+       // Drain the connectedCh goroutine spawned on success.
+       <-pc.connectedCh
+
+       assert.True(t, consumerReceivedChange.Load(),
+               "Frames sent by the broker during the subscribe RPC must reach the consumer handler")
+}
+
+// TestGrabConn_MessageReceivedDuringSubscribe_NilConn verifies that when the
+// broker delivers a MESSAGE frame while the subscribe RPC is in flight, the
+// consumer does not panic due to a nil connection. This is a regression test:
+// registering the handler before calling _setConn means MessageReceived ->
+// discardCorruptedMessage -> _getConn() dereferences nil.
+func TestGrabConn_MessageReceivedDuringSubscribe_NilConn(t *testing.T) {
+       cnx := newSpyConnection()
+
+       rpc := &grabConnSpyRPCClient{
+               cnx: cnx,
+               duringSubscribe: func() {
+                       handler, ok := cnx.handler.Load().(*partitionConsumer)
+                       if !ok || handler == nil {
+                               return
+                       }
+                       // Deliver an empty (invalid) message frame. ReadBrokerMetadata
+                       // will fail, causing discardCorruptedMessage which calls
+                       // pc._getConn(). If conn is nil this panics.
+                       msgID := &pb.MessageIdData{
+                               LedgerId: proto.Uint64(1),
+                               EntryId:  proto.Uint64(1),
+                       }
+                       cmd := &pb.CommandMessage{MessageId: msgID}
+                       // 4 bytes: ReadBrokerMetadata reads 2 bytes for magic number (won't
+                       // match broker metadata magic → returns nil, nil), then
+                       // ReadMessageMetadata fails → discardCorruptedMessage → _getConn()
+                       // panics if conn is nil.
+                       buf := internal.NewBuffer(4)
+                       buf.Write([]byte{0x00, 0x00, 0x00, 0x00})
+                       _ = handler.MessageReceived(cmd, buf)
+               },
+       }
+       // Build consumer WITHOUT pre-setting _setConn to simulate the real
+       // first-call path where conn is nil before grabConn completes.
+       pc := newGrabConnTestConsumerNoConn(cnx, rpc)
+
+       // This must not panic.
+       assert.NotPanics(t, func() {
+               _ = pc.grabConn("")
+       })
+}
+
+// TestGrabConn_GetConnectionFailure verifies that grabConn returns the error
+// from GetConnection without registering a handler or sending an RPC.
+func TestGrabConn_GetConnectionFailure(t *testing.T) {
+       cnx := newSpyConnection()
+       rpc := &grabConnSpyRPCClient{cnx: cnx}
+       pc := newGrabConnTestConsumer(cnx, rpc)
+
+       // Override the pool to return an error
+       pc.client.cnxPool = &grabConnMockPool{err: fmt.Errorf("connection refused")}
+
+       err := pc.grabConn("")
+       assert.ErrorContains(t, err, "connection refused")
+
+       assert.False(t, cnx.handlerRegistered.Load(),
+               "AddConsumeHandler must not be called when GetConnection fails")
+}
+
+// TestGrabConn_AddConsumeHandlerFailure verifies that grabConn returns the
+// error from AddConsumeHandler without sending a subscribe RPC.
+func TestGrabConn_AddConsumeHandlerFailure(t *testing.T) {
+       cnx := newSpyConnection()
+       cnx.addHandlerErr = fmt.Errorf("connection closed")
+       rpc := &grabConnSpyRPCClient{cnx: cnx}
+       pc := newGrabConnTestConsumer(cnx, rpc)
+
+       err := pc.grabConn("")
+       assert.ErrorContains(t, err, "connection closed")
+
+       assert.False(t, rpc.handlerRegisteredDuringRPC.Load(),
+               "Subscribe RPC must not be sent when AddConsumeHandler fails")
+}
+
+// TestGrabConn_ConnResetOnAddHandlerFailure verifies that when AddConsumeHandler
+// fails on the first call (no prior connection), pc.conn is reset to nil.
+func TestGrabConn_ConnResetOnAddHandlerFailure(t *testing.T) {
+       cnx := newSpyConnection()
+       cnx.addHandlerErr = fmt.Errorf("connection closed")
+       rpc := &grabConnSpyRPCClient{cnx: cnx}
+       pc := newGrabConnTestConsumerNoConn(cnx, rpc)
+
+       err := pc.grabConn("")
+       assert.ErrorContains(t, err, "connection closed")
+
+       assert.Nil(t, pc.conn.Load(),
+               "pc.conn must be reset to nil on first-call AddConsumeHandler failure")
+}
+
+// --- Helpers
+
+// newGrabConnTestConsumer builds a minimal partitionConsumer wired to the
+// given spy connection and RPC client, suitable for testing grabConn.
+func newGrabConnTestConsumer(cnx *spyConnection, rpc *grabConnSpyRPCClient) *partitionConsumer {
+       brokerURL, _ := url.Parse("pulsar://localhost:6650")
+       if rpc.lookupResult == nil {
+               rpc.lookupResult = &internal.LookupResult{
+                       LogicalAddr:  brokerURL,
+                       PhysicalAddr: brokerURL,
+               }
+       }
+       pool := &grabConnMockPool{cnx: cnx}
+
+       c := &client{
+               cnxPool:   pool,
+               rpcClient: rpc,
+               log:       log.DefaultNopLogger(),
+       }
+
+       pc := &partitionConsumer{
+               client:               c,
+               topic:                "persistent://public/default/test",
+               options:              &partitionConsumerOpts{subscription: "sub"},
+               log:                  log.DefaultNopLogger(),
+               compressionProviders: sync.Map{},
+               connectedCh:          make(chan struct{}, 1),
+               metrics:              newTestMetrics(),
+       }
+       // Required: lookupTopic calls _getConn().IsProxied() when assignedBrokerURL != "".
+       // grabConn will overwrite this with the same connection after a successful subscribe.
+       pc._setConn(cnx)
+       // availablePermits is NOT initialised here because the success path and
+       // simple error paths don't touch it. Only newGrabConnTestConsumerNoConn
+       // sets it — the MessageReceived path (discardCorruptedMessage) calls
+       // availablePermits.inc() which would panic on a nil receiver.
+       return pc
+}
+
+// newGrabConnTestConsumerNoConn is like newGrabConnTestConsumer but does NOT
+// pre-set _setConn, simulating the real first-call path where conn is nil.
+// Only use with grabConn("") since a non-empty assignedBrokerURL would call
+// _getConn().IsProxied() in lookupTopic and panic.
+func newGrabConnTestConsumerNoConn(cnx *spyConnection, rpc *grabConnSpyRPCClient) *partitionConsumer {
+       brokerURL, _ := url.Parse("pulsar://localhost:6650")
+       if rpc.lookupResult == nil {
+               rpc.lookupResult = &internal.LookupResult{
+                       LogicalAddr:  brokerURL,
+                       PhysicalAddr: brokerURL,
+               }
+       }
+       pool := &grabConnMockPool{cnx: cnx}
+
+       c := &client{
+               cnxPool:   pool,
+               rpcClient: rpc,
+               log:       log.DefaultNopLogger(),
+       }
+
+       pc := &partitionConsumer{
+               client:               c,
+               topic:                "persistent://public/default/test",
+               options:              &partitionConsumerOpts{subscription: "sub"},
+               log:                  log.DefaultNopLogger(),
+               compressionProviders: sync.Map{},
+               connectedCh:          make(chan struct{}, 1),
+               metrics:              newTestMetrics(),
+       }
+       pc.availablePermits = &availablePermits{pc: pc}
+       return pc
+}
+
+// spyConnection tracks AddConsumeHandler / DeleteConsumeHandler calls and
+// stores the registered handler so tests can deliver frames through it.
+type spyConnection struct {
+       dummyConnection
+       handlerRegistered atomic.Bool
+       handlerRemoved    atomic.Bool
+       handler           atomic.Value // stores *partitionConsumer
+       addHandlerErr     error        // when set, AddConsumeHandler returns this error
+}
+
+func newSpyConnection() *spyConnection {
+       return &spyConnection{}
+}
+
+func (s *spyConnection) AddConsumeHandler(_ uint64, h internal.ConsumerHandler) error {
+       if s.addHandlerErr != nil {
+               return s.addHandlerErr
+       }
+       s.handlerRegistered.Store(true)
+       // Store as *partitionConsumer so all stores use the same concrete type
+       // (atomic.Value panics if you mix concrete types across stores).
+       s.handler.Store(h.(*partitionConsumer))
+       return nil
+}
+
+func (s *spyConnection) DeleteConsumeHandler(_ uint64) {
+       s.handlerRemoved.Store(true)
+       var h *partitionConsumer
+       s.handler.Store(h)
+}
+
+// grabConnSpyRPCClient records the ordering of AddConsumeHandler relative to
+// the subscribe RPC, and optionally injects errors or mid-RPC callbacks.
+type grabConnSpyRPCClient struct {
+       internal.RPCClient
+       cnx          *spyConnection
+       lookupResult *internal.LookupResult
+
+       // handlerRegisteredDuringRPC is true if AddConsumeHandler was called
+       // before the subscribe RPC executed.
+       handlerRegisteredDuringRPC atomic.Bool
+
+       // subscribeErr, when set, makes the subscribe RPC return this error.
+       subscribeErr error
+
+       // duringSubscribe, when set, is called inside the subscribe RPC to
+       // simulate broker frames arriving while the RPC is in flight.
+       duringSubscribe func()
+
+       // closeSentOnCnx is true if a CLOSE_CONSUMER was sent via RequestOnCnx
+       // (as opposed to RequestWithCnxKeySuffix which could pick a different connection).
+       closeSentOnCnx atomic.Bool
+}
+
+func (r *grabConnSpyRPCClient) NewRequestID() uint64 { return 1 }
+
+func (r *grabConnSpyRPCClient) RequestOnCnxNoWait(_ internal.Connection, _ pb.BaseCommand_Type,
+       _ proto.Message) error {
+       return nil
+}
+
+func (r *grabConnSpyRPCClient) RequestOnCnx(_ internal.Connection, _ uint64,
+       cmdType pb.BaseCommand_Type, _ proto.Message) (*internal.RPCResult, error) {
+
+       switch cmdType {
+       case pb.BaseCommand_CLOSE_CONSUMER:
+               r.closeSentOnCnx.Store(true)
+               return nil, nil
+       case pb.BaseCommand_SUBSCRIBE:
+               // handled below
+       default:
+               panic(fmt.Sprintf("grabConnSpyRPCClient: unexpected command type %v", cmdType))
+       }
+
+       r.handlerRegisteredDuringRPC.Store(r.cnx.handlerRegistered.Load())
+
+       if r.duringSubscribe != nil {
+               r.duringSubscribe()
+       }
+
+       if r.subscribeErr != nil {
+               return nil, r.subscribeErr
+       }
+
+       successType := pb.BaseCommand_SUCCESS
+       return &internal.RPCResult{
+               Response: &pb.BaseCommand{Type: &successType},
+               Cnx:      r.cnx,
+       }, nil
+}
+
+func (r *grabConnSpyRPCClient) LookupService(_ string) (internal.LookupService, error) {
+       return &grabConnMockLookup{result: r.lookupResult}, nil
+}
+
+type grabConnMockLookup struct {
+       internal.LookupService
+       result *internal.LookupResult
+}
+
+func (m *grabConnMockLookup) Lookup(_ string) (*internal.LookupResult, error) {
+       return m.result, nil
+}
+
+type grabConnMockPool struct {
+       internal.ConnectionPool
+       cnx internal.Connection
+       err error
+}
+
+func (m *grabConnMockPool) GetConnection(_ *url.URL, _ *url.URL, _ int32) (internal.Connection, error) {
+       if m.err != nil {
+               return nil, m.err
+       }
+       return m.cnx, nil
+}

Reply via email to