This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new 96ce2de Fix data race while accessing connection in partitionConsumer (#535) 96ce2de is described below commit 96ce2de573ea8ad14173f41d6e48b6b48a8b0aa0 Author: dferstay <dfers...@users.noreply.github.com> AuthorDate: Thu Jun 24 02:27:12 2021 -0700 Fix data race while accessing connection in partitionConsumer (#535) The partitionConsumer maintains a few internal go-routines, two of which access the underlying internal.Connection. The main runEventsLoop() go-routine reads the connection field while a separate go-routine is used to detect connection loss, initiate reconnection, and sets the connection. Previously, access to the conn field was not synchronized. Now, the conn field is read and written atomically; resolving the data race. Signed-off-by: Daniel Ferstay <dfers...@splunk.com> Co-authored-by: Daniel Ferstay <dfers...@splunk.com> --- pulsar/consumer_partition.go | 43 +++++++++++++++++++++++++++++-------------- 1 file changed, 29 insertions(+), 14 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 94a914d..daaf759 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -107,7 +107,7 @@ type partitionConsumer struct { state atomic.Int32 options *partitionConsumerOpts - conn internal.Connection + conn atomic.Value topic string name string @@ -238,7 +238,7 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) { RequestId: proto.Uint64(requestID), ConsumerId: proto.Uint64(pc.consumerID), } - _, err := pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, pb.BaseCommand_UNSUBSCRIBE, cmdUnsubscribe) + _, err := pc.client.rpcClient.RequestOnCnx(pc._getConn(), requestID, pb.BaseCommand_UNSUBSCRIBE, cmdUnsubscribe) if err != nil { pc.log.WithError(err).Error("Failed to unsubscribe consumer") unsub.err = err @@ -248,7 +248,7 @@ 
return } - pc.conn.DeleteConsumeHandler(pc.consumerID) + pc._getConn().DeleteConsumeHandler(pc.consumerID) if pc.nackTracker != nil { pc.nackTracker.Close() } @@ -276,7 +276,7 @@ func (pc *partitionConsumer) requestGetLastMessageID() (trackingMessageID, error RequestId: proto.Uint64(requestID), ConsumerId: proto.Uint64(pc.consumerID), } - res, err := pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, + res, err := pc.client.rpcClient.RequestOnCnx(pc._getConn(), requestID, pb.BaseCommand_GET_LAST_MESSAGE_ID, cmdGetLastMessageID) if err != nil { pc.log.WithError(err).Error("Failed to get last message id") @@ -326,7 +326,7 @@ func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) { } } - pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, + pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, &pb.CommandRedeliverUnacknowledgedMessages{ ConsumerId: proto.Uint64(pc.consumerID), MessageIds: msgIDDataList, @@ -399,7 +399,7 @@ func (pc *partitionConsumer) requestSeekWithoutClear(msgID messageID) error { MessageId: id, } - _, err = pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, pb.BaseCommand_SEEK, cmdSeek) + _, err = pc.client.rpcClient.RequestOnCnx(pc._getConn(), requestID, pb.BaseCommand_SEEK, cmdSeek) if err != nil { pc.log.WithError(err).Error("Failed to reset to message id") return err @@ -435,7 +435,7 @@ func (pc *partitionConsumer) internalSeekByTime(seek *seekByTimeRequest) { MessagePublishTime: proto.Uint64(uint64(seek.publishTime.UnixNano() / int64(time.Millisecond))), } - _, err := pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, pb.BaseCommand_SEEK, cmdSeek) + _, err := pc.client.rpcClient.RequestOnCnx(pc._getConn(), requestID, pb.BaseCommand_SEEK, cmdSeek) if err != nil { pc.log.WithError(err).Error("Failed to reset to message publish time") seek.err = err @@ -465,7 +465,7 @@ func (pc *partitionConsumer) internalAck(req *ackRequest) { AckType: pb.CommandAck_Individual.Enum(), } - 
pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, pb.BaseCommand_ACK, cmdAck) + pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), pb.BaseCommand_ACK, cmdAck) } func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error { @@ -607,7 +607,7 @@ func (pc *partitionConsumer) internalFlow(permits uint32) error { ConsumerId: proto.Uint64(pc.consumerID), MessagePermits: proto.Uint32(permits), } - pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, pb.BaseCommand_FLOW, cmdFlow) + pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), pb.BaseCommand_FLOW, cmdFlow) return nil } @@ -843,7 +843,7 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) { ConsumerId: proto.Uint64(pc.consumerID), RequestId: proto.Uint64(requestID), } - _, err := pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, pb.BaseCommand_CLOSE_CONSUMER, cmdClose) + _, err := pc.client.rpcClient.RequestOnCnx(pc._getConn(), requestID, pb.BaseCommand_CLOSE_CONSUMER, cmdClose) if err != nil { pc.log.WithError(err).Warn("Failed to close consumer") } else { @@ -855,7 +855,7 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) { } pc.setConsumerState(consumerClosed) - pc.conn.DeleteConsumeHandler(pc.consumerID) + pc._getConn().DeleteConsumeHandler(pc.consumerID) if pc.nackTracker != nil { pc.nackTracker.Close() } @@ -971,9 +971,9 @@ func (pc *partitionConsumer) grabConn() error { pc.name = res.Response.ConsumerStatsResponse.GetConsumerName() } - pc.conn = res.Cnx + pc._setConn(res.Cnx) pc.log.Info("Connected consumer") - pc.conn.AddConsumeHandler(pc.consumerID, pc) + pc._getConn().AddConsumeHandler(pc.consumerID, pc) msgType := res.Response.GetType() @@ -1104,7 +1104,7 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData, "validationError": validationError, }).Error("Discarding corrupted message") - pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, + pc.client.rpcClient.RequestOnCnxNoWait(pc._getConn(), 
pb.BaseCommand_ACK, &pb.CommandAck{ ConsumerId: proto.Uint64(pc.consumerID), MessageId: []*pb.MessageIdData{msgID}, @@ -1113,6 +1113,21 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData, }) } +// _setConn sets the internal connection field of this partition consumer atomically. +// Note: should only be called by this partition consumer when a new connection is available. +func (pc *partitionConsumer) _setConn(conn internal.Connection) { + pc.conn.Store(conn) +} + +// _getConn returns internal connection field of this partition consumer atomically. +// Note: should only be called by this partition consumer before attempting to use the connection +func (pc *partitionConsumer) _getConn() internal.Connection { + // Invariant: The conn must be non-nill for the lifetime of the partitionConsumer. + // For this reason we leave this cast unchecked and panic() if the + // invariant is broken + return pc.conn.Load().(internal.Connection) +} + func convertToMessageIDData(msgID trackingMessageID) *pb.MessageIdData { if msgID.Undefined() { return nil