wuYin commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r528022439



##########
File path: pulsar/producer_partition.go
##########
@@ -434,17 +445,79 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
                return
        }
 
+       p.queueLock.Lock()
        p.pendingQueue.Put(&pendingItem{
+               createdAt:    time.Now(),
                batchData:    batchData,
                sequenceID:   sequenceID,
                sendRequests: callbacks,
        })
+       p.queueLock.Unlock()
+
        p.cnx.WriteData(batchData)
 }
 
+func (p *partitionProducer) failTimeoutMessages(sendTimeout time.Duration) {
+       t := time.NewTimer(sendTimeout)
+       var state int32
+       for range t.C {
+       // since Closing/Closed state cannot be reopened, load and compare is safe
+               state = atomic.LoadInt32(&p.state)
+               if state == producerClosing || state == producerClosed {
+                       t.Stop()
+                       return
+               }
+
+               p.queueLock.Lock()
+               item := p.pendingQueue.Peek()
+               if item == nil {
+                       t.Reset(p.options.SendTimeout)
+                       p.queueLock.Unlock()
+                       continue
+               }
+
+               pi := item.(*pendingItem)
+               diff := p.options.SendTimeout - time.Since(pi.createdAt)
+               if diff > 0 {
+                       // pending messages not timeout yet
+                       t.Reset(diff)
+                       p.queueLock.Unlock()
+                       continue
+               }
+
+               p.log.Infof("Failing %d messages", p.pendingQueue.Size())
+               for p.pendingQueue.Size() > 0 {
+                       pi = p.pendingQueue.Poll().(*pendingItem)
+                       pi.Lock()
+                       for _, i := range pi.sendRequests {
+                               sr := i.(*sendRequest)
+                               if sr.msg != nil {
+                                       size := len(sr.msg.Payload)
+                                       p.publishSemaphore.Release()
+                                       messagesPending.Dec()
+                                       bytesPending.Sub(float64(size))
+                                       publishErrors.Inc()
+                                       p.log.WithError(errSendTimeout).
+                                               WithField("size", size).
+                                               WithField("properties", sr.msg.Properties)
+                               }
+                               if sr.callback != nil {
+                                       sr.callback(nil, sr.msg, errSendTimeout)
+                               }
+                       }
+                       buffersPool.Put(pi.batchData)
+                       pi.Unlock()
+               }
+               t.Reset(p.options.SendTimeout)
+               p.queueLock.Unlock()
+       }
+}
+
 func (p *partitionProducer) internalFlush(fr *flushRequest) {
        p.internalFlushCurrentBatch()
 
+       p.queueLock.Lock()

Review comment:
       Thanks for the review, good idea.
   In some cases `producer.eventLoop()` will get stuck, such as when the producer is
reconnecting to the broker. There are 2 ways to handle this:
   - Check in an independent goroutine: `sendTimeout` is still effective; periodically
check and fail pending messages.
   - Check before flushing: `sendTimeout` not effective in this case.
   
   If there is no need to check message timeouts precisely, the above case can be
ignored.
   This is reasonable, since `sendTimeout` is now implemented at the **producer
level**, so it does not need to be that precise, and this also avoids the cost of the lock.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to