This is an automated email from the ASF dual-hosted git repository.
yuzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 0422997 [ISSUE #1152] producer support message delay any time (#1158)
0422997 is described below
commit 04229975b79781ac4ca01756178ee91c28df890a
Author: comman <[email protected]>
AuthorDate: Sat Aug 31 15:19:25 2024 +0800
[ISSUE #1152] producer support message delay any time (#1158)
---
examples/producer/delayms/main.go | 75 +++++++++++++++++++++++++++++++++++++++
primitive/message.go | 6 ++++
2 files changed, 81 insertions(+)
diff --git a/examples/producer/delayms/main.go b/examples/producer/delayms/main.go
new file mode 100644
index 0000000..deb6ed6
--- /dev/null
+++ b/examples/producer/delayms/main.go
@@ -0,0 +1,75 @@
+package main
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "os"
+ "strconv"
+ "time"
+
+ "github.com/apache/rocketmq-client-go/v2"
+ "github.com/apache/rocketmq-client-go/v2/primitive"
+ "github.com/apache/rocketmq-client-go/v2/producer"
+)
+
+var (
+ topic string
+ endpoint string
+ accessKey string
+ secretKey string
+ messageCount int
+ delayInterval int
+)
+
+func init() {
+ flag.StringVar(&topic, "topic", "benchmark-queue-1", "topic name")
+ flag.StringVar(&endpoint, "endpoint", "127.0.0.1:9876", "endpoint")
+ flag.StringVar(&accessKey, "access_key", "******", "access key")
+ flag.StringVar(&secretKey, "secret_key", "******", "secret key")
+ flag.IntVar(&messageCount, "count", 10, "message count")
+ flag.IntVar(&delayInterval, "delay", 0, "delay interval unit: sec")
+}
+
+// Package main implements a simple producer to send message.
+func main() {
+ flag.Parse()
+
+ p, err := rocketmq.NewProducer(
+ producer.WithNsResolver(primitive.NewPassthroughResolver([]string{endpoint})),
+ producer.WithRetry(2),
+ )
+ if err != nil {
+ fmt.Println(err)
+ os.Exit(1)
+ }
+ err = p.Start()
+ if err != nil {
+ fmt.Printf("start producer error: %s", err.Error())
+ os.Exit(1)
+ }
+
+ for i := 0; i < messageCount; i++ {
+ msg := &primitive.Message{
+ Topic: topic,
+ Body: []byte("Hello RocketMQ Go Client! " + strconv.Itoa(i) +
+ " timestamp:" + strconv.FormatInt(time.Now().Unix(), 10)),
+ }
+ if delayInterval > 0 {
+ msg.WithDelayTimestamp(time.Now().Add(time.Duration(delayInterval) * time.Second))
+ }
+ res, err := p.SendSync(context.Background(), msg)
+
+ if err != nil {
+ fmt.Printf("index: %v, send message error: %s\n", i+1, err)
+ break
+ } else {
+ fmt.Printf("index: %v, send message success: result=%s\n", i+1, res.String())
+ }
+ time.Sleep(100 * time.Millisecond)
+ }
+ err = p.Shutdown()
+ if err != nil {
+ fmt.Printf("shutdown producer error: %s", err.Error())
+ }
+}
diff --git a/primitive/message.go b/primitive/message.go
index 9542413..2d8c504 100644
--- a/primitive/message.go
+++ b/primitive/message.go
@@ -38,6 +38,7 @@ const (
PropertyWaitStoreMsgOk = "WAIT"
PropertyDelayTimeLevel = "DELAY"
PropertyRetryTopic = "RETRY_TOPIC"
+ PropertyTimerDeliverMS = "TIMER_DELIVER_MS"
PropertyRealTopic = "REAL_TOPIC"
PropertyRealQueueId = "REAL_QID"
PropertyTransactionPrepared = "TRAN_MSG"
@@ -176,6 +177,11 @@ func (m *Message) WithDelayTimeLevel(level int) *Message {
return m
}
+func (m *Message) WithDelayTimestamp(deliveryTimestamp time.Time) *Message {
+ timeMs := deliveryTimestamp.Unix()*1000 + int64(deliveryTimestamp.Nanosecond()/1000000)
+ m.WithProperty(PropertyTimerDeliverMS, strconv.FormatInt(timeMs, 10))
+ return m
+}
func (m *Message) WithTag(tags string) *Message {
m.WithProperty(PropertyTags, tags)
return m