This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-0.13.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit e44bd0481cc8b3b407c257a3411a97d48bb29844 Author: Gaylor Bosson <[email protected]> AuthorDate: Tue Jul 30 14:25:42 2024 +0200 [fix] Avoid a data race when flushing with load (#1261) Fixes #1258 ### Motivation While flushing, the data channel is switched to a newly allocated one, which can cause the loss of messages: the length of the old channel can be zero, which stops the draining procedure, while at the same time a new message can still be sent to that channel. ### Modifications Instead of allocating a new channel, it empties the existing one, bounded by the number of messages queued in the channel at that moment, before proceeding with the flush. (cherry picked from commit 8dd4ed19b98ecf9287cbcfc20e2de9a12b197e3f) --- pulsar/producer_partition.go | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 5c038aa5..f5fd493b 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -338,10 +338,6 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error { p.schemaCache.Put(p.schemaInfo, schemaVersion) } - if err != nil { - return err - } - if !p.options.DisableBatching && p.batchBuilder == nil { provider, err := GetBatcherBuilderProvider(p.options.BatcherBuilderType) if err != nil { @@ -1022,15 +1018,7 @@ func (p *partitionProducer) internalFlushCurrentBatches() { } func (p *partitionProducer) internalFlush(fr *flushRequest) { - // clear all the messages which have sent to dataChan before flush - if len(p.dataChan) != 0 { - oldDataChan := p.dataChan - p.dataChan = make(chan *sendRequest, p.options.MaxPendingMessages) - for len(oldDataChan) != 0 { - pendingData := <-oldDataChan - p.internalSend(pendingData) - } - } + p.clearPendingSendRequests() if !p.options.DisableBatching { p.internalFlushCurrentBatch() @@ -1061,6 +1049,25 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) { } } +// clearPendingSendRequests makes sure to push forward previous sending 
requests +// by emptying the data channel. +func (p *partitionProducer) clearPendingSendRequests() { + sizeBeforeFlushing := len(p.dataChan) + + // Bound the for loop to the current length of the channel to ensure that it + // will eventually stop as we only want to ensure that existing messages are + // flushed. + for i := 0; i < sizeBeforeFlushing; i++ { + select { + case pendingData := <-p.dataChan: + p.internalSend(pendingData) + + default: + return + } + } +} + func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) { var err error var msgID MessageID
