This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 0a496cc Set delay delivery when publishing a message (#160)
0a496cc is described below
commit 0a496ccc2001c80cc437ed615d7c946b3d2896dc
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Jan 6 00:40:12 2020 -0800
Set delay delivery when publishing a message (#160)
* Set delay delivery when publishing a message
* Fixed indentations
---
pulsar/internal/batch_builder.go | 6 ++-
pulsar/message.go | 12 ++++++
pulsar/producer_partition.go | 19 +++++++--
pulsar/producer_test.go | 86 ++++++++++++++++++++++++++++++++++++++++
4 files changed, 118 insertions(+), 5 deletions(-)
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 33a1655..9bbf8bd 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -105,7 +105,7 @@ func (bb *BatchBuilder) hasSpace(payload []byte) bool {
// Add will add single message to batch.
func (bb *BatchBuilder) Add(metadata proto.Message, sequenceID uint64, payload
[]byte,
- callback interface{}, replicateTo []string) bool {
+ callback interface{}, replicateTo []string, deliverAt time.Time) bool {
if replicateTo != nil && bb.numMessages != 0 {
// If the current batch is not empty and we're trying to set the replication clusters,
// then we need to force the current batch to flush and send the message individually
@@ -126,6 +126,10 @@ func (bb *BatchBuilder) Add(metadata proto.Message,
sequenceID uint64, payload [
bb.msgMetadata.ProducerName = &bb.producerName
bb.msgMetadata.ReplicateTo = replicateTo
+ if deliverAt.UnixNano() > 0 {
+ bb.msgMetadata.DeliverAtTime =
proto.Int64(int64(TimestampMillis(deliverAt)))
+ }
+
bb.cmdSend.Send.SequenceId = proto.Uint64(sequenceID)
}
addSingleMessageToBatch(bb.buffer, metadata, payload)
diff --git a/pulsar/message.go b/pulsar/message.go
index cbb3d63..735ca41 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -41,6 +41,18 @@ type ProducerMessage struct {
// SequenceID set the sequence id to assign to the current message
SequenceID *int64
+
+ // Request to deliver the message only after the specified relative delay.
+ // Note: messages are only delivered with delay when a consumer is consuming
+ // through a `SubscriptionType=Shared` subscription. With other subscription
+ // types, the messages will still be delivered immediately.
+ DeliverAfter time.Duration
+
+ // Deliver the message only at or after the specified absolute timestamp.
+ // Note: messages are only delivered with delay when a consumer is consuming
+ // through a `SubscriptionType=Shared` subscription. With other subscription
+ // types, the messages will still be delivered immediately.
+ DeliverAt time.Time
}
// Message abstraction used in Pulsar
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index d0ea449..db6c5d9 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -236,7 +236,15 @@ func (p *partitionProducer) internalSend(request
*sendRequest) {
msg := request.msg
- sendAsBatch := !p.options.DisableBatching && msg.ReplicationClusters ==
nil
+ deliverAt := msg.DeliverAt
+ if msg.DeliverAfter.Nanoseconds() > 0 {
+ deliverAt = time.Now().Add(msg.DeliverAfter)
+ }
+
+ sendAsBatch := !p.options.DisableBatching &&
+ msg.ReplicationClusters == nil &&
+ deliverAt.UnixNano() == 0
+
smm := &pb.SingleMessageMetadata{
PayloadSize: proto.Int(len(msg.Payload)),
}
@@ -261,13 +269,15 @@ func (p *partitionProducer) internalSend(request
*sendRequest) {
}
if sendAsBatch {
- added := p.batchBuilder.Add(smm, sequenceID, msg.Payload,
request, msg.ReplicationClusters)
+ added := p.batchBuilder.Add(smm, sequenceID, msg.Payload,
request,
+ msg.ReplicationClusters, deliverAt)
if !added {
// The current batch is full.. flush it and retry
p.internalFlushCurrentBatch()
// after flushing try again to add the current payload
- if ok := p.batchBuilder.Add(smm, sequenceID,
msg.Payload, request, msg.ReplicationClusters); !ok {
+ if ok := p.batchBuilder.Add(smm, sequenceID,
msg.Payload, request,
+ msg.ReplicationClusters, deliverAt); !ok {
p.log.WithField("size", len(msg.Payload)).
WithField("sequenceID", sequenceID).
WithField("properties", msg.Properties).
@@ -276,7 +286,8 @@ func (p *partitionProducer) internalSend(request
*sendRequest) {
}
} else {
// Send individually
- if added := p.batchBuilder.Add(smm, sequenceID, msg.Payload,
request, msg.ReplicationClusters); !added {
+ if added := p.batchBuilder.Add(smm, sequenceID, msg.Payload,
request,
+ msg.ReplicationClusters, deliverAt); !added {
p.log.WithField("size", len(msg.Payload)).
WithField("sequenceID", sequenceID).
WithField("properties", msg.Properties).
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index a387d3d..5e40ec2 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -625,3 +625,89 @@ func TestBatchMessageFlushing(t *testing.T) {
assert.Equal(t, 2, published, "expected to publish two messages")
}
+
+func TestDelayRelative(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+
+ topicName := newTopicName()
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "subName",
+ Type: Shared,
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ ID, err := producer.Send(context.Background(), &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("test")),
+ DeliverAfter: 3 * time.Second,
+ })
+ assert.Nil(t, err)
+ assert.NotNil(t, ID)
+
+ ctx, canc := context.WithTimeout(context.Background(), 1*time.Second)
+
+ msg, err := consumer.Receive(ctx)
+ assert.Error(t, err)
+ assert.Nil(t, msg)
+ canc()
+
+ ctx, canc = context.WithTimeout(context.Background(), 5*time.Second)
+ msg, err = consumer.Receive(ctx)
+ assert.Nil(t, err)
+ assert.NotNil(t, msg)
+ canc()
+}
+
+func TestDelayAbsolute(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+ assert.NoError(t, err)
+ defer client.Close()
+
+ topicName := newTopicName()
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "subName",
+ Type: Shared,
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ ID, err := producer.Send(context.Background(), &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("test")),
+ DeliverAt: time.Now().Add(3 * time.Second),
+ })
+ assert.Nil(t, err)
+ assert.NotNil(t, ID)
+
+ ctx, canc := context.WithTimeout(context.Background(), 1*time.Second)
+
+ msg, err := consumer.Receive(ctx)
+ assert.Error(t, err)
+ assert.Nil(t, msg)
+ canc()
+
+ ctx, canc = context.WithTimeout(context.Background(), 5*time.Second)
+ msg, err = consumer.Receive(ctx)
+ assert.Nil(t, err)
+ assert.NotNil(t, msg)
+ canc()
+}