This is an automated email from the ASF dual-hosted git repository.
huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 8a49505 [ISSUE #122]add msg delay to send. resolve #122 (#123)
8a49505 is described below
commit 8a49505ef050293ca3f8a572bbe114b9c48b0116
Author: xujianhai666 <[email protected]>
AuthorDate: Mon Jul 22 18:58:52 2019 +0800
[ISSUE #122]add msg delay to send. resolve #122 (#123)
* add msg delay to send. resolve #122
* add log
---
examples/consumer/delay/main.go | 56 +++++++++++++++++++++++++++++++++++++++
examples/producer/delay/main.go | 58 +++++++++++++++++++++++++++++++++++++++++
primitive/message.go | 11 ++++++++
3 files changed, 125 insertions(+)
diff --git a/examples/consumer/delay/main.go b/examples/consumer/delay/main.go
new file mode 100644
index 0000000..2a639f9
--- /dev/null
+++ b/examples/consumer/delay/main.go
@@ -0,0 +1,56 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/apache/rocketmq-client-go"
+ "github.com/apache/rocketmq-client-go/consumer"
+ "github.com/apache/rocketmq-client-go/primitive"
+)
+
+// main runs a push consumer in group "testGroup" against the name server at
+// 127.0.0.1:9876. It subscribes to "TopicTest" and, for every message
+// delivered, prints the difference between the current time in milliseconds
+// and the message's BornTimestamp, which makes the effect of delayed
+// delivery visible.
+func main() {
+	c, err := rocketmq.NewPushConsumer(
+		consumer.WithGroupName("testGroup"),
+		consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+	)
+	if err != nil {
+		// Without a consumer there is nothing to subscribe or start.
+		fmt.Println(err.Error())
+		os.Exit(-1)
+	}
+	err = c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context,
+		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
+
+		for _, msg := range msgs {
+			// Current time in ms minus BornTimestamp (treated here as Unix milliseconds).
+			t := time.Now().UnixNano()/int64(time.Millisecond) - msg.BornTimestamp
+			fmt.Printf("Receive message[msgId=%s] %d ms later\n", msg.MsgId, t)
+		}
+
+		return consumer.ConsumeSuccess, nil
+	})
+	if err != nil {
+		fmt.Println(err.Error())
+	}
+	// Note: start after subscribe
+	err = c.Start()
+	if err != nil {
+		fmt.Println(err.Error())
+		os.Exit(-1)
+	}
+	// Keep the process alive so the subscription callback can keep running.
+	time.Sleep(time.Hour)
+}
diff --git a/examples/producer/delay/main.go b/examples/producer/delay/main.go
new file mode 100644
index 0000000..059507f
--- /dev/null
+++ b/examples/producer/delay/main.go
@@ -0,0 +1,58 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+ "context"
+ "fmt"
+ "os"
+
+ "github.com/apache/rocketmq-client-go"
+ "github.com/apache/rocketmq-client-go/primitive"
+ "github.com/apache/rocketmq-client-go/producer"
+)
+
+// main creates a producer that talks to the name server at 127.0.0.1:9876
+// (configured with WithRetry(2)), starts it, synchronously sends ten messages
+// to "TopicTest" with delay time level 3, prints the outcome of each send,
+// and finally shuts the producer down.
+func main() {
+	p, err := rocketmq.NewProducer(
+		producer.WithNameServer([]string{"127.0.0.1:9876"}),
+		producer.WithRetry(2),
+	)
+	if err != nil {
+		// Without a producer nothing below can work.
+		fmt.Printf("create producer error: %s\n", err.Error())
+		os.Exit(1)
+	}
+	err = p.Start()
+	if err != nil {
+		fmt.Printf("start producer error: %s\n", err.Error())
+		os.Exit(1)
+	}
+	for i := 0; i < 10; i++ {
+		msg := &primitive.Message{
+			Topic: "TopicTest",
+			Body:  []byte("Hello RocketMQ Go Client!"),
+		}
+		// Mark the message as delayed before sending it.
+		msg.SetDelayTimeLevel(3)
+		res, err := p.SendSync(context.Background(), msg)
+
+		if err != nil {
+			// A failed send is reported but does not stop the remaining sends.
+			fmt.Printf("send message error: %s\n", err)
+		} else {
+			fmt.Printf("send message success: result=%s\n", res.String())
+		}
+	}
+	err = p.Shutdown()
+	if err != nil {
+		fmt.Printf("shutdown producer error: %s\n", err.Error())
+	}
+}
diff --git a/primitive/message.go b/primitive/message.go
index f191c92..1da6d5a 100644
--- a/primitive/message.go
+++ b/primitive/message.go
@@ -19,6 +19,7 @@ package primitive
import (
"fmt"
+ "strconv"
"github.com/apache/rocketmq-client-go/internal/utils"
)
@@ -73,6 +74,16 @@ func NewMessage(topic string, body []byte) *Message {
}
}
+// SetDelayTimeLevel sets the delay level that controls when the message may be consumed.
+// Reference delay level definition: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h.
+// Delay levels start from 1; for example, level=1 means a delay of 1s.
+//
+// The level is stored as a decimal string in msg.Properties under the
+// PropertyDelayTimeLevel key. The level is not range-checked here.
+func (msg *Message) SetDelayTimeLevel(level int) {
+	// Properties may be nil on a Message built as a struct literal; allocate
+	// it so the write below cannot panic on a nil map.
+	if msg.Properties == nil {
+		msg.Properties = make(map[string]string)
+	}
+	msg.Properties[PropertyDelayTimeLevel] = strconv.Itoa(level)
+}
+
func (msg *Message) String() string {
return fmt.Sprintf("[topic=%s, body=%s, Flag=%d, Properties=%v,
TransactionId=%s]",
msg.Topic, string(msg.Body), msg.Flag, msg.Properties,
msg.TransactionId)