This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 8dd4ed19 [fix] Avoid a data race when flushing with load (#1261)
8dd4ed19 is described below

commit 8dd4ed19b98ecf9287cbcfc20e2de9a12b197e3f
Author: Gaylor Bosson <[email protected]>
AuthorDate: Tue Jul 30 14:25:42 2024 +0200

    [fix] Avoid a data race when flushing with load (#1261)
    
    Fixes #1258
    
    ### Motivation
    
    While flushing, the data channel is swapped for a newly allocated one, 
which can cause messages to be lost: the length check can read zero, which 
skips the drain, at the same moment a new message is sent to the channel.
    
    ### Modifications
    
    Instead of allocating a new channel, it drains the existing one, up to the 
number of requests queued when the flush starts, before proceeding with the flush.
---
 pulsar/producer_partition.go | 33 ++++++++++++++++++++-------------
 1 file changed, 20 insertions(+), 13 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 5c038aa5..f5fd493b 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -338,10 +338,6 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL 
string) error {
                p.schemaCache.Put(p.schemaInfo, schemaVersion)
        }
 
-       if err != nil {
-               return err
-       }
-
        if !p.options.DisableBatching && p.batchBuilder == nil {
                provider, err := 
GetBatcherBuilderProvider(p.options.BatcherBuilderType)
                if err != nil {
@@ -1022,15 +1018,7 @@ func (p *partitionProducer) 
internalFlushCurrentBatches() {
 }
 
 func (p *partitionProducer) internalFlush(fr *flushRequest) {
-       // clear all the messages which have sent to dataChan before flush
-       if len(p.dataChan) != 0 {
-               oldDataChan := p.dataChan
-               p.dataChan = make(chan *sendRequest, 
p.options.MaxPendingMessages)
-               for len(oldDataChan) != 0 {
-                       pendingData := <-oldDataChan
-                       p.internalSend(pendingData)
-               }
-       }
+       p.clearPendingSendRequests()
 
        if !p.options.DisableBatching {
                p.internalFlushCurrentBatch()
@@ -1061,6 +1049,25 @@ func (p *partitionProducer) internalFlush(fr 
*flushRequest) {
        }
 }
 
+// clearPendingSendRequests makes sure to push forward previous sending 
requests
+// by emptying the data channel.
+func (p *partitionProducer) clearPendingSendRequests() {
+       sizeBeforeFlushing := len(p.dataChan)
+
+       // Bound the for loop to the current length of the channel to ensure 
that it
+       // will eventually stop as we only want to ensure that existing 
messages are
+       // flushed.
+       for i := 0; i < sizeBeforeFlushing; i++ {
+               select {
+               case pendingData := <-p.dataChan:
+                       p.internalSend(pendingData)
+
+               default:
+                       return
+               }
+       }
+}
+
 func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) 
(MessageID, error) {
        var err error
        var msgID MessageID

Reply via email to