This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new a42cc24f fix: failTimeoutMessages cannot delete outdated messages
(#1247)
a42cc24f is described below
commit a42cc24f8ff4c00f4d2f901bf9c823b5b2c653e6
Author: Zixuan Liu <[email protected]>
AuthorDate: Mon Jul 22 18:32:07 2024 +0800
fix: failTimeoutMessages cannot delete outdated messages (#1247)
* fix: failTimeoutMessages cannot delete outdated messages
* Fix slice pass
---
pulsar/producer_partition.go | 36 +++++++++++++++---------------------
1 file changed, 15 insertions(+), 21 deletions(-)
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 1b2fae72..78f1f3cc 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -372,8 +372,7 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL
string) error {
continue
}
pi := item.(*pendingItem)
- // when resending pending batches, we update the sendAt
timestamp and put to the back of queue
- // to avoid pending item been removed by
failTimeoutMessages and cause race condition
+ // when resending pending batches, we update the sendAt
timestamp to record the metric.
pi.Lock()
pi.sentAt = time.Now()
pi.Unlock()
@@ -814,19 +813,14 @@ func (p *partitionProducer) internalSingleSend(
return
}
- p.pendingQueue.Put(&pendingItem{
- sentAt: time.Now(),
- buffer: buffer,
- sequenceID: sid,
- sendRequests: []interface{}{sr},
- })
- p._getConn().WriteData(buffer)
+ p.writeData(buffer, sid, []interface{}{sr})
}
type pendingItem struct {
sync.Mutex
buffer internal.Buffer
sequenceID uint64
+ createdAt time.Time
sentAt time.Time
sendRequests []interface{}
isDone bool
@@ -868,13 +862,19 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
return
}
+ p.writeData(batchData, sequenceID, callbacks)
+}
+
+func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID
uint64, callbacks []interface{}) {
+ now := time.Now()
p.pendingQueue.Put(&pendingItem{
- sentAt: time.Now(),
- buffer: batchData,
+ createdAt: now,
+ sentAt: now,
+ buffer: buffer,
sequenceID: sequenceID,
sendRequests: callbacks,
})
- p._getConn().WriteData(batchData)
+ p._getConn().WriteData(buffer)
}
func (p *partitionProducer) failTimeoutMessages() {
@@ -898,7 +898,7 @@ func (p *partitionProducer) failTimeoutMessages() {
continue
}
oldestItem := item.(*pendingItem)
- if nextWaiting := diff(oldestItem.sentAt); nextWaiting > 0 {
+ if nextWaiting := diff(oldestItem.createdAt); nextWaiting > 0 {
// none of these pending messages have timed out, wait
and retry
t.Reset(nextWaiting)
continue
@@ -930,7 +930,7 @@ func (p *partitionProducer) failTimeoutMessages() {
pi := m.(*pendingItem)
pi.Lock()
defer pi.Unlock()
- if nextWaiting := diff(pi.sentAt);
nextWaiting > 0 {
+ if nextWaiting := diff(pi.createdAt);
nextWaiting > 0 {
// current and subsequent items
not timeout yet, stop iterating
tickerNeedWaiting = nextWaiting
return false
@@ -995,13 +995,7 @@ func (p *partitionProducer) internalFlushCurrentBatches() {
if b.BatchData == nil {
continue
}
- p.pendingQueue.Put(&pendingItem{
- sentAt: time.Now(),
- buffer: b.BatchData,
- sequenceID: b.SequenceID,
- sendRequests: b.Callbacks,
- })
- p._getConn().WriteData(b.BatchData)
+ p.writeData(b.BatchData, b.SequenceID, b.Callbacks)
}
}