This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 2490435  Support sendTimeout (#394)
2490435 is described below

commit 2490435fa54c9a01f76e942612301a59f32382d0
Author: wuYin <[email protected]>
AuthorDate: Tue Dec 1 02:51:30 2020 +0800

    Support sendTimeout (#394)
    
    * Support sendTimeout
    
    * Substitute NonBlockIfQueueFull for BlockIfQueueFull
    
    * Rename NonBlockIfQueueFull to DisableBlockIfQueueFull
    
    * Replaced createdAt with sentAt, which was added in PR 397.
    
    * Check and fail timeout messages before flushing batch
    
    * Optimize code
    
    * Remove redundant checking
---
 integration-tests/standalone.conf |  2 +-
 pulsar/producer.go                |  9 +++++
 pulsar/producer_impl.go           |  8 ++++-
 pulsar/producer_partition.go      | 70 +++++++++++++++++++++++++++++++++++----
 pulsar/producer_test.go           | 48 +++++++++++++++++++++++++++
 5 files changed, 129 insertions(+), 8 deletions(-)

diff --git a/integration-tests/standalone.conf b/integration-tests/standalone.conf
index 3fdf53a..b426ff3 100644
--- a/integration-tests/standalone.conf
+++ b/integration-tests/standalone.conf
@@ -54,7 +54,7 @@ brokerShutdownTimeoutMs=3000
 backlogQuotaCheckEnabled=true
 
 # How often to check for topics that have reached the quota
-backlogQuotaCheckIntervalInSeconds=60
+backlogQuotaCheckIntervalInSeconds=5
 
 # Default per-topic backlog quota limit
 backlogQuotaDefaultLimitGB=10
diff --git a/pulsar/producer.go b/pulsar/producer.go
index a2b7526..1dc0775 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -76,6 +76,15 @@ type ProducerOptions struct {
        // These properties will be visible in the topic stats
        Properties map[string]string
 
+       // SendTimeout sets the timeout for a message that has not been acknowledged
+       // by the server since it was sent.
+       // Send and SendAsync return an error after the timeout.
+       // Default is 30 seconds; set a negative value such as -1 to disable it.
+       SendTimeout time.Duration
+
+       // DisableBlockIfQueueFull controls whether Send and SendAsync block when the
+       // producer's message queue is full.
+       // Default is false; if set to true, Send and SendAsync return an error when
+       // the queue is full.
+       DisableBlockIfQueueFull bool
+
        // MaxPendingMessages set the max size of the queue holding the messages pending to receive an
        // acknowledgment from the broker.
        MaxPendingMessages int
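
The two new options above can be exercised as follows; a minimal sketch assuming a locally running broker (the URL, topic name, and timeout values are placeholders, not part of this commit):

    package main

    import (
        "context"
        "log"
        "time"

        "github.com/apache/pulsar-client-go/pulsar"
    )

    func main() {
        client, err := pulsar.NewClient(pulsar.ClientOptions{
            URL: "pulsar://localhost:6650", // placeholder broker URL
        })
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()

        producer, err := client.CreateProducer(pulsar.ProducerOptions{
            Topic:                   "my-topic",       // placeholder topic
            SendTimeout:             10 * time.Second, // fail unacked messages after 10s
            DisableBlockIfQueueFull: true,             // error out instead of blocking when full
        })
        if err != nil {
            log.Fatal(err)
        }
        defer producer.Close()

        // With the options above, a Send that cannot be acknowledged in time,
        // or that finds the pending queue full, returns an error instead of
        // blocking indefinitely.
        if _, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
            Payload: []byte("hello"),
        }); err != nil {
            log.Println("send failed:", err)
        }
    }
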
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index c166f70..51ae69e 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -32,6 +32,9 @@ import (
 )
 
 const (
+       // defaultSendTimeout is the default timeout for a message to be acked after it is sent.
+       defaultSendTimeout = 30 * time.Second
+
        // defaultBatchingMaxPublishDelay init default for maximum delay to batch messages
        defaultBatchingMaxPublishDelay = 10 * time.Millisecond
 
@@ -91,13 +94,16 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
                return nil, newError(ResultInvalidTopicName, "Topic name is required for producer")
        }
 
+       if options.SendTimeout == 0 {
+               options.SendTimeout = defaultSendTimeout
+       }
        if options.BatchingMaxMessages == 0 {
                options.BatchingMaxMessages = defaultMaxMessagesPerBatch
        }
        if options.BatchingMaxSize == 0 {
                options.BatchingMaxSize = defaultMaxBatchSize
        }
-       if options.BatchingMaxPublishDelay == 0 {
+       if options.BatchingMaxPublishDelay <= 0 {
                options.BatchingMaxPublishDelay = defaultBatchingMaxPublishDelay
        }
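
To spell out the defaulting logic in the hunk above: a zero SendTimeout is replaced with the 30-second default, and only a negative value disables the timeout check entirely. A short illustration (the topic name and the 5-second value are examples, not recommendations):

    package main

    import (
        "time"

        "github.com/apache/pulsar-client-go/pulsar"
    )

    func sendTimeoutSemantics() pulsar.ProducerOptions {
        // Zero value: newProducer substitutes defaultSendTimeout (30s).
        opts := pulsar.ProducerOptions{Topic: "my-topic"} // placeholder topic

        // Negative: the timeout check before flushing a batch is skipped,
        // so sends wait indefinitely for an acknowledgment.
        opts.SendTimeout = -1

        // Positive: pending messages are failed once this duration elapses
        // without a broker ack.
        opts.SendTimeout = 5 * time.Second
        return opts
    }
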
 
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 3397b6e..2a9093f 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -46,6 +46,8 @@ const (
 
 var (
        errFailAddBatch    = errors.New("message send failed")
+       errSendTimeout     = errors.New("message send timeout")
+       errSendQueueIsFull = errors.New("producer send queue is full")
        errMessageTooLarge = errors.New("message size exceeds MaxMessageSize")
 
        buffersPool sync.Pool
@@ -430,26 +432,74 @@ type pendingItem struct {
        sync.Mutex
        batchData    internal.Buffer
        sequenceID   uint64
-       sentAt       int64
+       sentAt       time.Time
        sendRequests []interface{}
        completed    bool
 }
 
 func (p *partitionProducer) internalFlushCurrentBatch() {
+       if p.options.SendTimeout > 0 {
+               p.failTimeoutMessages()
+       }
+
        batchData, sequenceID, callbacks := p.batchBuilder.Flush()
        if batchData == nil {
                return
        }
 
        p.pendingQueue.Put(&pendingItem{
+               sentAt:       time.Now(),
                batchData:    batchData,
                sequenceID:   sequenceID,
-               sentAt:       time.Now().UnixNano(),
                sendRequests: callbacks,
        })
        p.cnx.WriteData(batchData)
 }
 
+func (p *partitionProducer) failTimeoutMessages() {
+       // since a Closing/Closed connection cannot be reopened, load-and-compare is safe
+       state := atomic.LoadInt32(&p.state)
+       if state == producerClosing || state == producerClosed {
+               return
+       }
+
+       item := p.pendingQueue.Peek()
+       if item == nil {
+               // pending queue is empty
+               return
+       }
+
+       pi := item.(*pendingItem)
+       if time.Since(pi.sentAt) < p.options.SendTimeout {
+               // pending messages have not timed out yet
+               return
+       }
+
+       p.log.Infof("Failing %d messages", p.pendingQueue.Size())
+       for p.pendingQueue.Size() > 0 {
+               pi = p.pendingQueue.Poll().(*pendingItem)
+               pi.Lock()
+               for _, i := range pi.sendRequests {
+                       sr := i.(*sendRequest)
+                       if sr.msg != nil {
+                               size := len(sr.msg.Payload)
+                               p.publishSemaphore.Release()
+                               messagesPending.Dec()
+                               bytesPending.Sub(float64(size))
+                               publishErrors.Inc()
+                               p.log.WithError(errSendTimeout).
+                                       WithField("size", size).
+                                       WithField("properties", 
sr.msg.Properties)
+                       }
+                       if sr.callback != nil {
+                               sr.callback(nil, sr.msg, errSendTimeout)
+                       }
+               }
+               buffersPool.Put(pi.batchData)
+               pi.Unlock()
+       }
+}
+
 func (p *partitionProducer) internalFlush(fr *flushRequest) {
        p.internalFlushCurrentBatch()
 
@@ -516,10 +566,20 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer
        }
        p.options.Interceptors.BeforeSend(p, msg)
 
+       if p.options.DisableBlockIfQueueFull {
+               if !p.publishSemaphore.TryAcquire() {
+                       if callback != nil {
+                               callback(nil, msg, errSendQueueIsFull)
+                       }
+                       return
+               }
+       } else {
+               p.publishSemaphore.Acquire()
+       }
+
        messagesPending.Inc()
        bytesPending.Add(float64(len(sr.msg.Payload)))
 
-       p.publishSemaphore.Acquire()
        p.eventsChan <- sr
 }
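
The hunk above implements DisableBlockIfQueueFull by switching between the semaphore's non-blocking TryAcquire and blocking Acquire. As a generic illustration of that pattern, with a buffered channel standing in for the library's internal semaphore type (an assumption for the sketch, not the actual implementation):

    package main

    // sem is a counting semaphore with capacity MaxPendingMessages.
    type sem chan struct{}

    // acquire blocks until a permit is available (BlockIfQueueFull behavior).
    func (s sem) acquire() { s <- struct{}{} }

    // tryAcquire returns false immediately when no permit is available
    // (DisableBlockIfQueueFull behavior).
    func (s sem) tryAcquire() bool {
        select {
        case s <- struct{}{}:
            return true
        default:
            return false
        }
    }

    // release returns a permit, e.g. on ack, timeout, or failure.
    func (s sem) release() { <-s }
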
 
@@ -553,9 +613,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
        // lock the pending item while sending the requests
        pi.Lock()
        defer pi.Unlock()
-       if pi.sentAt > 0 {
-               publishRPCLatency.Observe(float64(now-pi.sentAt) / 1.0e9)
-       }
+       publishRPCLatency.Observe(float64(now-pi.sentAt.UnixNano()) / 1.0e9)
        for idx, i := range pi.sendRequests {
                sr := i.(*sendRequest)
                if sr.msg != nil {
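
A note on failTimeoutMessages above: it only needs to Peek at the head of the pending queue, because items are appended in send order; if the oldest item is still within SendTimeout, every newer one is too, and once the head has expired the whole queue is drained and failed. A self-contained sketch of that pattern (the types here are illustrative stand-ins, not the library's internals):

    package main

    import (
        "container/list"
        "fmt"
        "time"
    )

    type pendingMsg struct {
        sentAt  time.Time
        payload string
    }

    // failTimedOut mirrors the fast path in the diff: check only the head,
    // then drain everything once the head has exceeded the timeout.
    func failTimedOut(q *list.List, timeout time.Duration) {
        head := q.Front()
        if head == nil {
            return // pending queue is empty
        }
        if time.Since(head.Value.(*pendingMsg).sentAt) < timeout {
            return // oldest entry not timed out, so nothing after it is either
        }
        for q.Len() > 0 {
            m := q.Remove(q.Front()).(*pendingMsg)
            fmt.Printf("failing %q: send timeout\n", m.payload)
        }
    }

    func main() {
        q := list.New()
        q.PushBack(&pendingMsg{sentAt: time.Now().Add(-3 * time.Second), payload: "old"})
        q.PushBack(&pendingMsg{sentAt: time.Now(), payload: "new"})
        failTimedOut(q, 2*time.Second) // head expired: both entries are failed
    }
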
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 3fe46c4..9475051 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -882,6 +882,54 @@ func TestMaxMessageSize(t *testing.T) {
        }
 }
 
+func TestSendTimeout(t *testing.T) {
+       quotaURL := adminURL + "/admin/v2/namespaces/public/default/backlogQuota"
+       quotaFmt := `{"limit": "%d", "policy": "producer_request_hold"}`
+       makeHTTPCall(t, http.MethodPost, quotaURL, fmt.Sprintf(quotaFmt, 10*1024))
+
+       client, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+
+       topicName := newTopicName()
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topicName,
+               SubscriptionName: "send_timeout_sub",
+       })
+       assert.Nil(t, err)
+       defer consumer.Close() // subscribe but do nothing
+
+       noRetry := uint(0)
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:                topicName,
+               SendTimeout:          2 * time.Second,
+               MaxReconnectToBroker: &noRetry,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       for i := 0; i < 10; i++ {
+               id, err := producer.Send(context.Background(), &ProducerMessage{
+                       Payload: make([]byte, 1024),
+               })
+               assert.Nil(t, err)
+               assert.NotNil(t, id)
+       }
+
+       // wait for the backlog quota check (its interval is lowered to 5s in standalone.conf above)
+       time.Sleep((5 + 1) * time.Second)
+
+       id, err := producer.Send(context.Background(), &ProducerMessage{
+               Payload: make([]byte, 1024),
+       })
+       assert.NotNil(t, err)
+       assert.Nil(t, id)
+
+       makeHTTPCall(t, http.MethodDelete, quotaURL, "")
+}
+
 type noopProduceInterceptor struct{}
 
 func (noopProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) {}
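
Finally, on the async path the new errors surface through the SendAsync callback rather than a return value. A minimal sketch, assuming a producer configured as shown earlier (note that errSendTimeout and errSendQueueIsFull are unexported, so callers should treat the error generically rather than compare against a sentinel):

    package main

    import (
        "context"
        "log"

        "github.com/apache/pulsar-client-go/pulsar"
    )

    func sendWithCallback(producer pulsar.Producer) {
        producer.SendAsync(context.Background(), &pulsar.ProducerMessage{
            Payload: make([]byte, 1024),
        }, func(id pulsar.MessageID, msg *pulsar.ProducerMessage, err error) {
            if err != nil {
                // With SendTimeout set, this may be "message send timeout";
                // with DisableBlockIfQueueFull, "producer send queue is full".
                log.Println("async send failed:", err)
                return
            }
            log.Println("acked:", id)
        })
    }
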
