cckellogg commented on a change in pull request #394:
URL: https://github.com/apache/pulsar-client-go/pull/394#discussion_r527941803
##########
File path: pulsar/producer_partition.go
##########
@@ -434,17 +445,79 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
return
}
+ p.queueLock.Lock()
p.pendingQueue.Put(&pendingItem{
+ createdAt: time.Now(),
batchData: batchData,
sequenceID: sequenceID,
sendRequests: callbacks,
})
+ p.queueLock.Unlock()
+
p.cnx.WriteData(batchData)
}
+func (p *partitionProducer) failTimeoutMessages(sendTimeout time.Duration) {
+ t := time.NewTimer(sendTimeout)
+ var state int32
+ for range t.C {
+ // since Closing/Closed state could not be reopen, load and
compare is safe
+ state = atomic.LoadInt32(&p.state)
+ if state == producerClosing || state == producerClosed {
+ t.Stop()
+ return
+ }
+
+ p.queueLock.Lock()
+ item := p.pendingQueue.Peek()
+ if item == nil {
+ t.Reset(p.options.SendTimeout)
+ p.queueLock.Unlock()
+ continue
+ }
+
+ pi := item.(*pendingItem)
+ diff := p.options.SendTimeout - time.Since(pi.createdAt)
+ if diff > 0 {
+ // pending messages not timeout yet
+ t.Reset(diff)
+ p.queueLock.Unlock()
+ continue
+ }
+
+ p.log.Infof("Failing %d messages", p.pendingQueue.Size())
+ for p.pendingQueue.Size() > 0 {
+ pi = p.pendingQueue.Poll().(*pendingItem)
+ pi.Lock()
+ for _, i := range pi.sendRequests {
+ sr := i.(*sendRequest)
+ if sr.msg != nil {
+ size := len(sr.msg.Payload)
+ p.publishSemaphore.Release()
+ messagesPending.Dec()
+ bytesPending.Sub(float64(size))
+ publishErrors.Inc()
+ p.log.WithError(errSendTimeout).
+ WithField("size", size).
+ WithField("properties",
sr.msg.Properties)
+ }
+ if sr.callback != nil {
+ sr.callback(nil, sr.msg, errSendTimeout)
+ }
+ }
+ buffersPool.Put(pi.batchData)
+ pi.Unlock()
+ }
+ t.Reset(p.options.SendTimeout)
+ p.queueLock.Unlock()
+ }
+}
+
func (p *partitionProducer) internalFlush(fr *flushRequest) {
p.internalFlushCurrentBatch()
+ p.queueLock.Lock()
Review comment:
Instead of having a separate goroutine checking for timeouts, can the
message expiry check be performed here?
I don't think the timeout check needs to be exact; we just need to expire
old messages at some point. This could simplify the code and reduce the need
for another lock. We can just call this method `failTimeoutMessages` and expire
messages if needed.
##########
File path: pulsar/producer.go
##########
@@ -76,6 +76,15 @@ type ProducerOptions struct {
// This properties will be visible in the topic stats
Properties map[string]string
+ // SendTimeout set the timeout for a message that not be acknowledged
by server since sent.
+ // Send and SendAsync returns an error after timeout.
+ // Default is 30 seconds, -1 to disable.
+ SendTimeout time.Duration
+
+ // NonBlockIfQueueFull control whether Send and SendAsync block if
producer's message queue is full.
+ // Default is false, if set to true then Send and SendAsync return
error when queue is full.
+ NonBlockIfQueueFull bool
Review comment:
`DisableBlockIfQueueFull` would be clearer to me.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]