This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 3812c07  [Improve][Producer] simplify the flush logic (#1049)
3812c07 is described below

commit 3812c07a0b8f6c37a2803cd740f4bf917160fd22
Author: gunli <[email protected]>
AuthorDate: Tue Jul 18 18:57:47 2023 +0800

    [Improve][Producer] simplify the flush logic (#1049)
    
    ### Motivation
    
    Simplify the producer flush logic
    
    ### Modifications
    1. add a callback field to the pendingItem, default is nil;
    2. in partitionProducer.internalFlush() get the last pendingItem from 
pendingQueue;
    3. update the last pendingItem by setting up a new callback;
    4. in partitionProducer.ReceivedSendReceipt, there is no need to identify 
the sendRequest by checking whether the msg is nil;
    5. in pendingItem.Complete(), invoke its callback to notify that the flush 
is done.
    
    
    ---------
    
    Co-authored-by: gunli <[email protected]>
---
 pulsar/producer_partition.go | 59 ++++++++++++++++++--------------------------
 1 file changed, 24 insertions(+), 35 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 48411b4..e74fd98 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -848,11 +848,12 @@ func (p *partitionProducer) internalSingleSend(mm 
*pb.MessageMetadata,
 
 type pendingItem struct {
        sync.Mutex
-       buffer       internal.Buffer
-       sequenceID   uint64
-       sentAt       time.Time
-       sendRequests []interface{}
-       completed    bool
+       buffer        internal.Buffer
+       sequenceID    uint64
+       sentAt        time.Time
+       sendRequests  []interface{}
+       completed     bool
+       flushCallback func(err error)
 }
 
 func (p *partitionProducer) internalFlushCurrentBatch() {
@@ -990,7 +991,7 @@ func (p *partitionProducer) failTimeoutMessages() {
                        }
 
                        // flag the sending has completed with error, flush 
make no effect
-                       pi.Complete()
+                       pi.Complete(errSendTimeout)
                        pi.Unlock()
 
                        // finally reached the last view item, current 
iteration ends
@@ -1062,15 +1063,10 @@ func (p *partitionProducer) internalFlush(fr 
*flushRequest) {
                return
        }
 
-       sendReq := &sendRequest{
-               msg: nil,
-               callback: func(id MessageID, message *ProducerMessage, e error) 
{
-                       fr.err = e
-                       close(fr.doneCh)
-               },
+       pi.flushCallback = func(err error) {
+               fr.err = err
+               close(fr.doneCh)
        }
-
-       pi.sendRequests = append(pi.sendRequests, sendReq)
 }
 
 func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) 
(MessageID, error) {
@@ -1208,27 +1204,17 @@ func (p *partitionProducer) 
ReceivedSendReceipt(response *pb.CommandSendReceipt)
                pi.Lock()
                defer pi.Unlock()
                
p.metrics.PublishRPCLatency.Observe(float64(now-pi.sentAt.UnixNano()) / 1.0e9)
-               batchSize := int32(0)
-               for _, i := range pi.sendRequests {
-                       sr := i.(*sendRequest)
-                       if sr.msg != nil {
-                               batchSize = batchSize + 1
-                       } else { // Flush request
-                               break
-                       }
-               }
+               batchSize := int32(len(pi.sendRequests))
                for idx, i := range pi.sendRequests {
                        sr := i.(*sendRequest)
-                       if sr.msg != nil {
-                               atomic.StoreInt64(&p.lastSequenceID, 
int64(pi.sequenceID))
-                               
p.releaseSemaphoreAndMem(int64(len(sr.msg.Payload)))
-                               
p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
-                               p.metrics.MessagesPublished.Inc()
-                               p.metrics.MessagesPending.Dec()
-                               payloadSize := float64(len(sr.msg.Payload))
-                               p.metrics.BytesPublished.Add(payloadSize)
-                               p.metrics.BytesPending.Sub(payloadSize)
-                       }
+                       atomic.StoreInt64(&p.lastSequenceID, 
int64(pi.sequenceID))
+                       p.releaseSemaphoreAndMem(int64(len(sr.msg.Payload)))
+                       
p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
+                       p.metrics.MessagesPublished.Inc()
+                       p.metrics.MessagesPending.Dec()
+                       payloadSize := float64(len(sr.msg.Payload))
+                       p.metrics.BytesPublished.Add(payloadSize)
+                       p.metrics.BytesPending.Sub(payloadSize)
 
                        if sr.callback != nil || len(p.options.Interceptors) > 
0 {
                                msgID := newMessageID(
@@ -1274,7 +1260,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response 
*pb.CommandSendReceipt)
                }
 
                // Mark this pending item as done
-               pi.Complete()
+               pi.Complete(nil)
        }
 }
 
@@ -1384,12 +1370,15 @@ type flushRequest struct {
        err    error
 }
 
-func (i *pendingItem) Complete() {
+func (i *pendingItem) Complete(err error) {
        if i.completed {
                return
        }
        i.completed = true
        buffersPool.Put(i.buffer)
+       if i.flushCallback != nil {
+               i.flushCallback(err)
+       }
 }
 
 // _setConn sets the internal connection field of this partition producer 
atomically.

Reply via email to