This is an automated email from the ASF dual-hosted git repository.

yuzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 0422997  [ISSUE #1152] producer support message delay any time (#1158)
0422997 is described below

commit 04229975b79781ac4ca01756178ee91c28df890a
Author: comman <[email protected]>
AuthorDate: Sat Aug 31 15:19:25 2024 +0800

    [ISSUE #1152] producer support message delay any time (#1158)
---
 examples/producer/delayms/main.go | 75 +++++++++++++++++++++++++++++++++++++++
 primitive/message.go              |  6 ++++
 2 files changed, 81 insertions(+)

diff --git a/examples/producer/delayms/main.go 
b/examples/producer/delayms/main.go
new file mode 100644
index 0000000..deb6ed6
--- /dev/null
+++ b/examples/producer/delayms/main.go
@@ -0,0 +1,75 @@
+package main
+
+import (
+       "context"
+       "flag"
+       "fmt"
+       "os"
+       "strconv"
+       "time"
+
+       "github.com/apache/rocketmq-client-go/v2"
+       "github.com/apache/rocketmq-client-go/v2/primitive"
+       "github.com/apache/rocketmq-client-go/v2/producer"
+)
+
+var (
+       topic         string
+       endpoint      string
+       accessKey     string
+       secretKey     string
+       messageCount  int
+       delayInterval int
+)
+
+func init() {
+       flag.StringVar(&topic, "topic", "benchmark-queue-1", "topic name")
+       flag.StringVar(&endpoint, "endpoint", "127.0.0.1:9876", "endpoint")
+       flag.StringVar(&accessKey, "access_key", "******", "access key")
+       flag.StringVar(&secretKey, "secret_key", "******", "secret key")
+       flag.IntVar(&messageCount, "count", 10, "message count")
+       flag.IntVar(&delayInterval, "delay", 0, "delay interval unit: sec")
+}
+
+// Package main implements a simple producer to send message.
+func main() {
+       flag.Parse()
+
+       p, err := rocketmq.NewProducer(
+               
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{endpoint})),
+               producer.WithRetry(2),
+       )
+       if err != nil {
+               fmt.Println(err)
+               os.Exit(1)
+       }
+       err = p.Start()
+       if err != nil {
+               fmt.Printf("start producer error: %s", err.Error())
+               os.Exit(1)
+       }
+
+       for i := 0; i < messageCount; i++ {
+               msg := &primitive.Message{
+                       Topic: topic,
+                       Body: []byte("Hello RocketMQ Go Client! " + 
strconv.Itoa(i) +
+                               " timestamp:" + 
strconv.FormatInt(time.Now().Unix(), 10)),
+               }
+               if delayInterval > 0 {
+                       
msg.WithDelayTimestamp(time.Now().Add(time.Duration(delayInterval) * 
time.Second))
+               }
+               res, err := p.SendSync(context.Background(), msg)
+
+               if err != nil {
+                       fmt.Printf("index: %v, send message error: %s\n", i+1, 
err)
+                       break
+               } else {
+                       fmt.Printf("index: %v, send message success: 
result=%s\n", i+1, res.String())
+               }
+               time.Sleep(100 * time.Millisecond)
+       }
+       err = p.Shutdown()
+       if err != nil {
+               fmt.Printf("shutdown producer error: %s", err.Error())
+       }
+}
diff --git a/primitive/message.go b/primitive/message.go
index 9542413..2d8c504 100644
--- a/primitive/message.go
+++ b/primitive/message.go
@@ -38,6 +38,7 @@ const (
        PropertyWaitStoreMsgOk                 = "WAIT"
        PropertyDelayTimeLevel                 = "DELAY"
        PropertyRetryTopic                     = "RETRY_TOPIC"
+       PropertyTimerDeliverMS                 = "TIMER_DELIVER_MS"
        PropertyRealTopic                      = "REAL_TOPIC"
        PropertyRealQueueId                    = "REAL_QID"
        PropertyTransactionPrepared            = "TRAN_MSG"
@@ -176,6 +177,11 @@ func (m *Message) WithDelayTimeLevel(level int) *Message {
        return m
 }
 
+func (m *Message) WithDelayTimestamp(deliveryTimestamp time.Time) *Message {
+       timeMs := deliveryTimestamp.Unix()*1000 + 
int64(deliveryTimestamp.Nanosecond()/1000000)
+       m.WithProperty(PropertyTimerDeliverMS, strconv.FormatInt(timeMs, 10))
+       return m
+}
 func (m *Message) WithTag(tags string) *Message {
        m.WithProperty(PropertyTags, tags)
        return m

Reply via email to