This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new afc040c  [Issue 367][producer] Send delay message individually even 
batching is enabled (#372)
afc040c is described below

commit afc040c67d9fade03e5582c4be206e2f538b48db
Author: 程飞 <[email protected]>
AuthorDate: Fri Oct 9 09:33:08 2020 +0800

    [Issue 367][producer] Send delay message individually even batching is 
enabled (#372)
    
    Fixes #367
    
    
    ### Motivation
    
    Send delay message individually even batching is enabled.
    
    
    ### Modifications
    
    1. flush batching messages immediately when a new delay message is received
    2. reset deliverAtTime metadata of `BatchBuilder`
---
 pulsar/internal/batch_builder.go |  1 +
 pulsar/producer_partition.go     |  4 ++-
 pulsar/producer_test.go          | 70 ++++++++++++++++++++++++++++++++++++++--
 3 files changed, 71 insertions(+), 4 deletions(-)

diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 18ec3c4..fe34f0c 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -153,6 +153,7 @@ func (bb *BatchBuilder) reset() {
        bb.buffer.Clear()
        bb.callbacks = []interface{}{}
        bb.msgMetadata.ReplicateTo = nil
+       bb.msgMetadata.DeliverAtTime = nil
 }
 
 // Flush all the messages buffered in the client and wait until all messages 
have been successfully persisted.
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index a16193f..870da2e 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -334,6 +334,9 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
                sequenceID = internal.GetAndAdd(p.sequenceIDGenerator, 1)
        }
 
+       if !sendAsBatch {
+               p.internalFlushCurrentBatch()
+       }
        added := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request,
                msg.ReplicationClusters, deliverAt)
        if !added {
@@ -437,7 +440,6 @@ func (p *partitionProducer) SendAsync(ctx context.Context, 
msg *ProducerMessage,
 
 func (p *partitionProducer) internalSendAsync(ctx context.Context, msg 
*ProducerMessage,
        callback func(MessageID, *ProducerMessage, error), flushImmediately 
bool) {
-
        sr := &sendRequest{
                ctx:              ctx,
                msg:              msg,
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 20cfd91..2c1ff51 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -23,6 +23,7 @@ import (
        "net/http"
        "strconv"
        "sync"
+       "sync/atomic"
        "testing"
        "time"
 
@@ -64,7 +65,6 @@ func TestProducerNoTopic(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: "pulsar://localhost:6650",
        })
-
        if err != nil {
                t.Fatal(err)
                return
@@ -151,13 +151,12 @@ func TestProducerAsyncSend(t *testing.T) {
 }
 
 func TestProducerCompression(t *testing.T) {
-
        type testProvider struct {
                name            string
                compressionType CompressionType
        }
 
-       var providers = []testProvider{
+       providers := []testProvider{
                {"zlib", ZLib},
                {"lz4", LZ4},
                {"zstd", ZSTD},
@@ -705,6 +704,71 @@ func TestBatchMessageFlushing(t *testing.T) {
        assert.Equal(t, 2, published, "expected to publish two messages")
 }
 
+// test for issue #367
+func TestBatchDelayMessage(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+       batchingDelay := time.Second
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:                   topic,
+               BatchingMaxPublishDelay: batchingDelay,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: "subName",
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       ctx := context.Background()
+       delayMsg := &ProducerMessage{
+               Payload:      []byte("delay: 3s"),
+               DeliverAfter: 3 * time.Second,
+       }
+       var delayMsgID int64
+       ch := make(chan struct{}, 2)
+       producer.SendAsync(ctx, delayMsg, func(id MessageID, producerMessage 
*ProducerMessage, err error) {
+               atomic.StoreInt64(&delayMsgID, id.(messageID).entryID)
+               ch <- struct{}{}
+       })
+       delayMsgPublished := false
+       select {
+       case <-ch:
+               delayMsgPublished = true
+       case <-time.After(batchingDelay):
+       }
+       assert.Equal(t, delayMsgPublished, true, "delay message is not 
published individually when batching is enabled")
+
+       noDelayMsg := &ProducerMessage{
+               Payload: []byte("no delay"),
+       }
+       var noDelayMsgID int64
+       producer.SendAsync(ctx, noDelayMsg, func(id MessageID, producerMessage 
*ProducerMessage, err error) {
+               atomic.StoreInt64(&noDelayMsgID, id.(messageID).entryID)
+       })
+       for i := 0; i < 2; i++ {
+               msg, err := consumer.Receive(context.Background())
+               assert.Nil(t, err, "unexpected error occurred when recving 
message from topic")
+
+               switch msg.ID().(trackingMessageID).entryID {
+               case atomic.LoadInt64(&noDelayMsgID):
+                       assert.LessOrEqual(t, 
time.Since(msg.PublishTime()).Nanoseconds(), int64(batchingDelay*2))
+               case atomic.LoadInt64(&delayMsgID):
+                       assert.GreaterOrEqual(t, 
time.Since(msg.PublishTime()).Nanoseconds(), int64(time.Second*3))
+               default:
+                       t.Fatalf("got an unexpected message from topic, id:%v", 
msg.ID().Serialize())
+               }
+       }
+}
+
 func TestDelayRelative(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: serviceURL,

Reply via email to