This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch branch-0.13.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit 13a78684ee4c6df78c852bc65539581a2caf1e36
Author: Zixuan Liu <[email protected]>
AuthorDate: Mon Jul 22 18:32:07 2024 +0800

    fix: failTimeoutMessages cannot delete outdated messages (#1247)
    
    * fix: failTimeoutMessages cannot delete outdated messages
    
    * Fix slice pass
    
    (cherry picked from commit a42cc24f8ff4c00f4d2f901bf9c823b5b2c653e6)
---
 pulsar/producer_partition.go | 36 +++++++++++++++---------------------
 1 file changed, 15 insertions(+), 21 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 1b2fae72..78f1f3cc 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -372,8 +372,7 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL 
string) error {
                                continue
                        }
                        pi := item.(*pendingItem)
-                       // when resending pending batches, we update the sendAt 
timestamp and put to the back of queue
-                       // to avoid pending item been removed by 
failTimeoutMessages and cause race condition
+                       // when resending pending batches, we update the sendAt 
timestamp to record the metric.
                        pi.Lock()
                        pi.sentAt = time.Now()
                        pi.Unlock()
@@ -814,19 +813,14 @@ func (p *partitionProducer) internalSingleSend(
                return
        }
 
-       p.pendingQueue.Put(&pendingItem{
-               sentAt:       time.Now(),
-               buffer:       buffer,
-               sequenceID:   sid,
-               sendRequests: []interface{}{sr},
-       })
-       p._getConn().WriteData(buffer)
+       p.writeData(buffer, sid, []interface{}{sr})
 }
 
 type pendingItem struct {
        sync.Mutex
        buffer        internal.Buffer
        sequenceID    uint64
+       createdAt     time.Time
        sentAt        time.Time
        sendRequests  []interface{}
        isDone        bool
@@ -868,13 +862,19 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
                return
        }
 
+       p.writeData(batchData, sequenceID, callbacks)
+}
+
+func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID 
uint64, callbacks []interface{}) {
+       now := time.Now()
        p.pendingQueue.Put(&pendingItem{
-               sentAt:       time.Now(),
-               buffer:       batchData,
+               createdAt:    now,
+               sentAt:       now,
+               buffer:       buffer,
                sequenceID:   sequenceID,
                sendRequests: callbacks,
        })
-       p._getConn().WriteData(batchData)
+       p._getConn().WriteData(buffer)
 }
 
 func (p *partitionProducer) failTimeoutMessages() {
@@ -898,7 +898,7 @@ func (p *partitionProducer) failTimeoutMessages() {
                        continue
                }
                oldestItem := item.(*pendingItem)
-               if nextWaiting := diff(oldestItem.sentAt); nextWaiting > 0 {
+               if nextWaiting := diff(oldestItem.createdAt); nextWaiting > 0 {
                        // none of these pending messages have timed out, wait 
and retry
                        t.Reset(nextWaiting)
                        continue
@@ -930,7 +930,7 @@ func (p *partitionProducer) failTimeoutMessages() {
                                        pi := m.(*pendingItem)
                                        pi.Lock()
                                        defer pi.Unlock()
-                                       if nextWaiting := diff(pi.sentAt); 
nextWaiting > 0 {
+                                       if nextWaiting := diff(pi.createdAt); 
nextWaiting > 0 {
                                                // current and subsequent items 
not timeout yet, stop iterating
                                                tickerNeedWaiting = nextWaiting
                                                return false
@@ -995,13 +995,7 @@ func (p *partitionProducer) internalFlushCurrentBatches() {
                if b.BatchData == nil {
                        continue
                }
-               p.pendingQueue.Put(&pendingItem{
-                       sentAt:       time.Now(),
-                       buffer:       b.BatchData,
-                       sequenceID:   b.SequenceID,
-                       sendRequests: b.Callbacks,
-               })
-               p._getConn().WriteData(b.BatchData)
+               p.writeData(b.BatchData, b.SequenceID, b.Callbacks)
        }
 
 }

Reply via email to