cckellogg commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r527941803



##########
File path: pulsar/producer_partition.go
##########
@@ -434,17 +445,79 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
                return
        }
 
+       p.queueLock.Lock()
        p.pendingQueue.Put(&pendingItem{
+               createdAt:    time.Now(),
                batchData:    batchData,
                sequenceID:   sequenceID,
                sendRequests: callbacks,
        })
+       p.queueLock.Unlock()
+
        p.cnx.WriteData(batchData)
 }
 
+func (p *partitionProducer) failTimeoutMessages(sendTimeout time.Duration) {
+       t := time.NewTimer(sendTimeout)
+       var state int32
+       for range t.C {
+               // since Closing/Closed state could not be reopen, load and 
compare is safe
+               state = atomic.LoadInt32(&p.state)
+               if state == producerClosing || state == producerClosed {
+                       t.Stop()
+                       return
+               }
+
+               p.queueLock.Lock()
+               item := p.pendingQueue.Peek()
+               if item == nil {
+                       t.Reset(p.options.SendTimeout)
+                       p.queueLock.Unlock()
+                       continue
+               }
+
+               pi := item.(*pendingItem)
+               diff := p.options.SendTimeout - time.Since(pi.createdAt)
+               if diff > 0 {
+                       // pending messages not timeout yet
+                       t.Reset(diff)
+                       p.queueLock.Unlock()
+                       continue
+               }
+
+               p.log.Infof("Failing %d messages", p.pendingQueue.Size())
+               for p.pendingQueue.Size() > 0 {
+                       pi = p.pendingQueue.Poll().(*pendingItem)
+                       pi.Lock()
+                       for _, i := range pi.sendRequests {
+                               sr := i.(*sendRequest)
+                               if sr.msg != nil {
+                                       size := len(sr.msg.Payload)
+                                       p.publishSemaphore.Release()
+                                       messagesPending.Dec()
+                                       bytesPending.Sub(float64(size))
+                                       publishErrors.Inc()
+                                       p.log.WithError(errSendTimeout).
+                                               WithField("size", size).
+                                               WithField("properties", 
sr.msg.Properties)
+                               }
+                               if sr.callback != nil {
+                                       sr.callback(nil, sr.msg, errSendTimeout)
+                               }
+                       }
+                       buffersPool.Put(pi.batchData)
+                       pi.Unlock()
+               }
+               t.Reset(p.options.SendTimeout)
+               p.queueLock.Unlock()
+       }
+}
+
 func (p *partitionProducer) internalFlush(fr *flushRequest) {
        p.internalFlushCurrentBatch()
 
+       p.queueLock.Lock()

Review comment:
       Instead of having a separate goroutine checking for timeouts, can the 
message expiry check be performed here?
   
   I don't think the timeout check needs to be exact; we just need to expire 
old messages at some point. This could simplify the code and reduce the need 
for another lock. We can just call this method `failTimeoutMessages` and expire 
messages if needed. 
    

##########
File path: pulsar/producer.go
##########
@@ -76,6 +76,15 @@ type ProducerOptions struct {
        // This properties will be visible in the topic stats
        Properties map[string]string
 
+       // SendTimeout set the timeout for a message that not be acknowledged 
by server since sent.
+       // Send and SendAsync returns an error after timeout.
+       // Default is 30 seconds, -1 to disable.
+       SendTimeout time.Duration
+
+       // NonBlockIfQueueFull control whether Send and SendAsync block if 
producer's message queue is full.
+       // Default is false, if set to true then Send and SendAsync return 
error when queue is full.
+       NonBlockIfQueueFull bool

Review comment:
       `DisableBlockIfQueueFull` is clearer to me.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to