This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 8a49505  [ISSUE #122]add msg delay to send. resolve #122 (#123)
8a49505 is described below

commit 8a49505ef050293ca3f8a572bbe114b9c48b0116
Author: xujianhai666 <[email protected]>
AuthorDate: Mon Jul 22 18:58:52 2019 +0800

    [ISSUE #122]add msg delay to send. resolve #122 (#123)
    
    * add msg delay to send. resolve #122
    
    * add log
---
 examples/consumer/delay/main.go | 56 +++++++++++++++++++++++++++++++++++++++
 examples/producer/delay/main.go | 58 +++++++++++++++++++++++++++++++++++++++++
 primitive/message.go            | 11 ++++++++
 3 files changed, 125 insertions(+)

diff --git a/examples/consumer/delay/main.go b/examples/consumer/delay/main.go
new file mode 100644
index 0000000..2a639f9
--- /dev/null
+++ b/examples/consumer/delay/main.go
@@ -0,0 +1,56 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+       "context"
+       "fmt"
+       "os"
+       "time"
+
+       "github.com/apache/rocketmq-client-go"
+       "github.com/apache/rocketmq-client-go/consumer"
+       "github.com/apache/rocketmq-client-go/primitive"
+)
+
+func main() {
+       c, _ := rocketmq.NewPushConsumer(
+               consumer.WithGroupName("testGroup"),
+               consumer.WithNameServer([]string{"127.0.0.1:9876"}),
+       )
+       err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context,
+               msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
+
+               for _, msg := range msgs {
+                       t := time.Now().UnixNano()/int64(time.Millisecond) - msg.BornTimestamp
+                       fmt.Printf("Receive message[msgId=%s] %d ms later\n", msg.MsgId, t)
+               }
+
+               return consumer.ConsumeSuccess, nil
+       })
+       if err != nil {
+               fmt.Println(err.Error())
+       }
+       // Note: start after subscribe
+       err = c.Start()
+       if err != nil {
+               fmt.Println(err.Error())
+               os.Exit(-1)
+       }
+       time.Sleep(time.Hour)
+}
diff --git a/examples/producer/delay/main.go b/examples/producer/delay/main.go
new file mode 100644
index 0000000..059507f
--- /dev/null
+++ b/examples/producer/delay/main.go
@@ -0,0 +1,58 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+       "context"
+       "fmt"
+       "os"
+
+       "github.com/apache/rocketmq-client-go"
+       "github.com/apache/rocketmq-client-go/primitive"
+       "github.com/apache/rocketmq-client-go/producer"
+)
+
+func main() {
+       p, _ := rocketmq.NewProducer(
+               producer.WithNameServer([]string{"127.0.0.1:9876"}),
+               producer.WithRetry(2),
+       )
+       err := p.Start()
+       if err != nil {
+               fmt.Printf("start producer error: %s", err.Error())
+               os.Exit(1)
+       }
+       for i := 0; i < 10; i++ {
+               msg := &primitive.Message{
+                       Topic: "TopicTest",
+                       Body:  []byte("Hello RocketMQ Go Client!"),
+               }
+               msg.SetDelayTimeLevel(3)
+               res, err := p.SendSync(context.Background(), msg)
+
+               if err != nil {
+                       fmt.Printf("send message error: %s\n", err)
+               } else {
+                       fmt.Printf("send message success: result=%s\n", res.String())
+               }
+       }
+       err = p.Shutdown()
+       if err != nil {
+               fmt.Printf("shundown producer error: %s", err.Error())
+       }
+}
diff --git a/primitive/message.go b/primitive/message.go
index f191c92..1da6d5a 100644
--- a/primitive/message.go
+++ b/primitive/message.go
@@ -19,6 +19,7 @@ package primitive
 
 import (
        "fmt"
+       "strconv"
 
        "github.com/apache/rocketmq-client-go/internal/utils"
 )
@@ -73,6 +74,16 @@ func NewMessage(topic string, body []byte) *Message {
        }
 }
 
+// SetDelayTimeLevel set message delay time to consume.
+// reference delay level definition: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
+// delay level starts from 1. for example, if we set param level=1, then the delay time is 1s.
+func (msg *Message) SetDelayTimeLevel(level int) {
+       if msg.Properties == nil {
+               msg.Properties = make(map[string]string)
+       }
+       msg.Properties[PropertyDelayTimeLevel] = strconv.Itoa(level)
+}
+
 func (msg *Message) String() string {
        return fmt.Sprintf("[topic=%s, body=%s, Flag=%d, Properties=%v, TransactionId=%s]",
                msg.Topic, string(msg.Body), msg.Flag, msg.Properties, msg.TransactionId)

Reply via email to