This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 3812c07 [Improve][Producer] simplify the flush logic (#1049)
3812c07 is described below
commit 3812c07a0b8f6c37a2803cd740f4bf917160fd22
Author: gunli <[email protected]>
AuthorDate: Tue Jul 18 18:57:47 2023 +0800
[Improve][Producer] simplify the flush logic (#1049)
### Motivation
Simplify the producer flush logic
### Modifications
1. add a flushCallback field to pendingItem; it defaults to nil;
2. in partitionProducer.internalFlush(), get the last pendingItem from
the pendingQueue;
3. update the last pendingItem by setting a new callback on it;
4. in partitionProducer.ReceivedSendReceipt, there is no longer any need to
identify the flush sendRequest by checking whether its msg is nil;
5. in pendingItem.Complete(), invoke its callback to signal that the flush is
done.
---------
Co-authored-by: gunli <[email protected]>
---
pulsar/producer_partition.go | 59 ++++++++++++++++++--------------------------
1 file changed, 24 insertions(+), 35 deletions(-)
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 48411b4..e74fd98 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -848,11 +848,12 @@ func (p *partitionProducer) internalSingleSend(mm
*pb.MessageMetadata,
type pendingItem struct {
sync.Mutex
- buffer internal.Buffer
- sequenceID uint64
- sentAt time.Time
- sendRequests []interface{}
- completed bool
+ buffer internal.Buffer
+ sequenceID uint64
+ sentAt time.Time
+ sendRequests []interface{}
+ completed bool
+ flushCallback func(err error)
}
func (p *partitionProducer) internalFlushCurrentBatch() {
@@ -990,7 +991,7 @@ func (p *partitionProducer) failTimeoutMessages() {
}
// flag the sending has completed with error, flush
make no effect
- pi.Complete()
+ pi.Complete(errSendTimeout)
pi.Unlock()
// finally reached the last view item, current
iteration ends
@@ -1062,15 +1063,10 @@ func (p *partitionProducer) internalFlush(fr
*flushRequest) {
return
}
- sendReq := &sendRequest{
- msg: nil,
- callback: func(id MessageID, message *ProducerMessage, e error)
{
- fr.err = e
- close(fr.doneCh)
- },
+ pi.flushCallback = func(err error) {
+ fr.err = err
+ close(fr.doneCh)
}
-
- pi.sendRequests = append(pi.sendRequests, sendReq)
}
func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage)
(MessageID, error) {
@@ -1208,27 +1204,17 @@ func (p *partitionProducer)
ReceivedSendReceipt(response *pb.CommandSendReceipt)
pi.Lock()
defer pi.Unlock()
p.metrics.PublishRPCLatency.Observe(float64(now-pi.sentAt.UnixNano()) / 1.0e9)
- batchSize := int32(0)
- for _, i := range pi.sendRequests {
- sr := i.(*sendRequest)
- if sr.msg != nil {
- batchSize = batchSize + 1
- } else { // Flush request
- break
- }
- }
+ batchSize := int32(len(pi.sendRequests))
for idx, i := range pi.sendRequests {
sr := i.(*sendRequest)
- if sr.msg != nil {
- atomic.StoreInt64(&p.lastSequenceID,
int64(pi.sequenceID))
-
p.releaseSemaphoreAndMem(int64(len(sr.msg.Payload)))
-
p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
- p.metrics.MessagesPublished.Inc()
- p.metrics.MessagesPending.Dec()
- payloadSize := float64(len(sr.msg.Payload))
- p.metrics.BytesPublished.Add(payloadSize)
- p.metrics.BytesPending.Sub(payloadSize)
- }
+ atomic.StoreInt64(&p.lastSequenceID,
int64(pi.sequenceID))
+ p.releaseSemaphoreAndMem(int64(len(sr.msg.Payload)))
+
p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
+ p.metrics.MessagesPublished.Inc()
+ p.metrics.MessagesPending.Dec()
+ payloadSize := float64(len(sr.msg.Payload))
+ p.metrics.BytesPublished.Add(payloadSize)
+ p.metrics.BytesPending.Sub(payloadSize)
if sr.callback != nil || len(p.options.Interceptors) >
0 {
msgID := newMessageID(
@@ -1274,7 +1260,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response
*pb.CommandSendReceipt)
}
// Mark this pending item as done
- pi.Complete()
+ pi.Complete(nil)
}
}
@@ -1384,12 +1370,15 @@ type flushRequest struct {
err error
}
-func (i *pendingItem) Complete() {
+func (i *pendingItem) Complete(err error) {
if i.completed {
return
}
i.completed = true
buffersPool.Put(i.buffer)
+ if i.flushCallback != nil {
+ i.flushCallback(err)
+ }
}
// _setConn sets the internal connection field of this partition producer
atomically.