This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a496cc  Set delay delivery when publishing a message (#160)
0a496cc is described below

commit 0a496ccc2001c80cc437ed615d7c946b3d2896dc
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Jan 6 00:40:12 2020 -0800

    Set delay delivery when publishing a message (#160)
    
    * Set delay delivery when publishing a message
    
    * Fixed indentations
---
 pulsar/internal/batch_builder.go |  6 ++-
 pulsar/message.go                | 12 ++++++
 pulsar/producer_partition.go     | 19 +++++++--
 pulsar/producer_test.go          | 86 ++++++++++++++++++++++++++++++++++++++++
 4 files changed, 118 insertions(+), 5 deletions(-)

diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 33a1655..9bbf8bd 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -105,7 +105,7 @@ func (bb *BatchBuilder) hasSpace(payload []byte) bool {
 
 // Add will add single message to batch.
 func (bb *BatchBuilder) Add(metadata proto.Message, sequenceID uint64, payload 
[]byte,
-       callback interface{}, replicateTo []string) bool {
+       callback interface{}, replicateTo []string, deliverAt time.Time) bool {
        if replicateTo != nil && bb.numMessages != 0 {
                // If the current batch is not empty and we're trying to set 
the replication clusters,
                // then we need to force the current batch to flush and send 
the message individually
@@ -126,6 +126,10 @@ func (bb *BatchBuilder) Add(metadata proto.Message, 
sequenceID uint64, payload [
                bb.msgMetadata.ProducerName = &bb.producerName
                bb.msgMetadata.ReplicateTo = replicateTo
 
+               if deliverAt.UnixNano() > 0 {
+                       bb.msgMetadata.DeliverAtTime = 
proto.Int64(int64(TimestampMillis(deliverAt)))
+               }
+
                bb.cmdSend.Send.SequenceId = proto.Uint64(sequenceID)
        }
        addSingleMessageToBatch(bb.buffer, metadata, payload)
diff --git a/pulsar/message.go b/pulsar/message.go
index cbb3d63..735ca41 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -41,6 +41,18 @@ type ProducerMessage struct {
 
        // SequenceID set the sequence id to assign to the current message
        SequenceID *int64
+
+       // Request to deliver the message only after the specified relative 
delay.
+       // Note: messages are only delivered with delay when a consumer is 
consuming
+       //     through a `SubscriptionType=Shared` subscription. With other 
subscription
+       //     types, the messages will still be delivered immediately.
+       DeliverAfter time.Duration
+
+       // Deliver the message only at or after the specified absolute 
timestamp.
+       // Note: messages are only delivered with delay when a consumer is 
consuming
+       //     through a `SubscriptionType=Shared` subscription. With other 
subscription
+       //     types, the messages will still be delivered immediately.
+       DeliverAt time.Time
 }
 
 // Message abstraction used in Pulsar
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index d0ea449..db6c5d9 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -236,7 +236,15 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
 
        msg := request.msg
 
-       sendAsBatch := !p.options.DisableBatching && msg.ReplicationClusters == 
nil
+       deliverAt := msg.DeliverAt
+       if msg.DeliverAfter.Nanoseconds() > 0 {
+               deliverAt = time.Now().Add(msg.DeliverAfter)
+       }
+
+       sendAsBatch := !p.options.DisableBatching &&
+               msg.ReplicationClusters == nil &&
+               deliverAt.UnixNano() == 0
+
        smm := &pb.SingleMessageMetadata{
                PayloadSize: proto.Int(len(msg.Payload)),
        }
@@ -261,13 +269,15 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
        }
 
        if sendAsBatch {
-               added := p.batchBuilder.Add(smm, sequenceID, msg.Payload, 
request, msg.ReplicationClusters)
+               added := p.batchBuilder.Add(smm, sequenceID, msg.Payload, 
request,
+                       msg.ReplicationClusters, deliverAt)
                if !added {
                        // The current batch is full.. flush it and retry
                        p.internalFlushCurrentBatch()
 
                        // after flushing try again to add the current payload
-                       if ok := p.batchBuilder.Add(smm, sequenceID, 
msg.Payload, request, msg.ReplicationClusters); !ok {
+                       if ok := p.batchBuilder.Add(smm, sequenceID, 
msg.Payload, request,
+                               msg.ReplicationClusters, deliverAt); !ok {
                                p.log.WithField("size", len(msg.Payload)).
                                        WithField("sequenceID", sequenceID).
                                        WithField("properties", msg.Properties).
@@ -276,7 +286,8 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
                }
        } else {
                // Send individually
-               if added := p.batchBuilder.Add(smm, sequenceID, msg.Payload, 
request, msg.ReplicationClusters); !added {
+               if added := p.batchBuilder.Add(smm, sequenceID, msg.Payload, 
request,
+                       msg.ReplicationClusters, deliverAt); !added {
                        p.log.WithField("size", len(msg.Payload)).
                                WithField("sequenceID", sequenceID).
                                WithField("properties", msg.Properties).
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index a387d3d..5e40ec2 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -625,3 +625,89 @@ func TestBatchMessageFlushing(t *testing.T) {
 
        assert.Equal(t, 2, published, "expected to publish two messages")
 }
+
+func TestDelayRelative(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+
+       topicName := newTopicName()
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topicName,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topicName,
+               SubscriptionName: "subName",
+               Type:             Shared,
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       ID, err := producer.Send(context.Background(), &ProducerMessage{
+               Payload:      []byte(fmt.Sprintf("test")),
+               DeliverAfter: 3 * time.Second,
+       })
+       assert.Nil(t, err)
+       assert.NotNil(t, ID)
+
+       ctx, canc := context.WithTimeout(context.Background(), 1*time.Second)
+
+       msg, err := consumer.Receive(ctx)
+       assert.Error(t, err)
+       assert.Nil(t, msg)
+       canc()
+
+       ctx, canc = context.WithTimeout(context.Background(), 5*time.Second)
+       msg, err = consumer.Receive(ctx)
+       assert.Nil(t, err)
+       assert.NotNil(t, msg)
+       canc()
+}
+
+func TestDelayAbsolute(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+
+       topicName := newTopicName()
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topicName,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topicName,
+               SubscriptionName: "subName",
+               Type:             Shared,
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       ID, err := producer.Send(context.Background(), &ProducerMessage{
+               Payload:   []byte(fmt.Sprintf("test")),
+               DeliverAt: time.Now().Add(3 * time.Second),
+       })
+       assert.Nil(t, err)
+       assert.NotNil(t, ID)
+
+       ctx, canc := context.WithTimeout(context.Background(), 1*time.Second)
+
+       msg, err := consumer.Receive(ctx)
+       assert.Error(t, err)
+       assert.Nil(t, msg)
+       canc()
+
+       ctx, canc = context.WithTimeout(context.Background(), 5*time.Second)
+       msg, err = consumer.Receive(ctx)
+       assert.Nil(t, err)
+       assert.NotNil(t, msg)
+       canc()
+}

Reply via email to