This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new db46545  Modify examples
db46545 is described below

commit db465456094affddc5fb862eb7afbaf85b46d53b
Author: ShannonDing <[email protected]>
AuthorDate: Fri Nov 1 11:34:16 2019 +0800

    Modify examples
---
 core/{cfuns.go => consumer_callback.go}            |  0
 core/error.go                                      |  8 ++-
 ...ransaction_funcs.go => transaction_callback.go} |  0
 demos/main.go                                      | 29 --------
 demos/producer.go                                  | 67 -------------------
 demos/producer_orderly.go                          | 62 -----------------
 demos/push_consumer.go                             | 78 ----------------------
 examples/main.go                                   | 65 +++---------------
 {demos => examples}/orderly_push_consumer.go       |  0
 examples/producer.go                               | 31 +++++++--
 examples/producer_orderly.go                       | 64 +++++++-----------
 examples/pull_consumer.go                          | 74 --------------------
 examples/push_consumer.go                          | 25 ++++++-
 {demos => examples}/transaction_producer.go        |  0
 14 files changed, 86 insertions(+), 417 deletions(-)

diff --git a/core/cfuns.go b/core/consumer_callback.go
similarity index 100%
rename from core/cfuns.go
rename to core/consumer_callback.go
diff --git a/core/error.go b/core/error.go
index 8986660..f5871f7 100644
--- a/core/error.go
+++ b/core/error.go
@@ -34,10 +34,12 @@ const (
        ErrSendSyncFailed          = rmqError(C.PRODUCER_SEND_SYNC_FAILED)
        ErrSendOnewayFailed        = rmqError(C.PRODUCER_SEND_ONEWAY_FAILED)
        ErrSendOrderlyFailed       = rmqError(C.PRODUCER_SEND_ORDERLY_FAILED)
-       ErrPushConsumerStartFailed = rmqError(C.PUSHCONSUMER_ERROR_CODE_START)
+       ErrSendTransactionFailed   = rmqError(C.PRODUCER_SEND_TRANSACTION_FAILED)
+       ErrPushConsumerStartFailed = rmqError(C.PUSHCONSUMER_START_FAILED)
        ErrPullConsumerStartFailed = rmqError(C.PULLCONSUMER_START_FAILED)
        ErrFetchMQFailed           = rmqError(C.PULLCONSUMER_FETCH_MQ_FAILED)
        ErrFetchMessageFailed      = rmqError(C.PULLCONSUMER_FETCH_MESSAGE_FAILED)
+       ErrNotSupportNow      = rmqError(C.NOT_SUPPORT_NOW)
 )
 
 func (e rmqError) Error() string {
@@ -54,6 +56,8 @@ func (e rmqError) Error() string {
                return "send message with orderly failed"
        case ErrSendOnewayFailed:
                return "send message with oneway failed"
+       case ErrSendTransactionFailed:
+               return "send transaction message failed"
        case ErrPushConsumerStartFailed:
                return "start push-consumer failed"
        case ErrPullConsumerStartFailed:
@@ -62,6 +66,8 @@ func (e rmqError) Error() string {
                return "fetch MessageQueue failed"
        case ErrFetchMessageFailed:
                return "fetch Message failed"
+       case ErrNotSupportNow:
+               return "this function is not support"
        default:
                return fmt.Sprintf("unknow error: %v", int(e))
        }
diff --git a/core/transaction_funcs.go b/core/transaction_callback.go
similarity index 100%
rename from core/transaction_funcs.go
rename to core/transaction_callback.go
diff --git a/demos/main.go b/demos/main.go
deleted file mode 100644
index 1e325e9..0000000
--- a/demos/main.go
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package main
-
-func main() {
-       //run producer
-       main0()
-       //run consumer
-       main1()
-       //run orderly producer
-       main2()
-       //run orderly consumer
-       main3()
-}
diff --git a/demos/producer.go b/demos/producer.go
deleted file mode 100644
index ad08c12..0000000
--- a/demos/producer.go
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package main
-
-import (
-       "fmt"
-       "github.com/apache/rocketmq-client-go/core"
-)
-
-// Change to main if you want to run it directly
-func main0() {
-       pConfig := &rocketmq.ProducerConfig{
-               ClientConfig: rocketmq.ClientConfig{
-                       GroupID:    "GID_XXXXXXXXXXXX",
-                       NameServer: "http://XXXXXXXXXXXXXXXXXX:80",
-                       Credentials: &rocketmq.SessionCredentials{
-                               AccessKey: "Your Access Key",
-                               SecretKey: "Your Secret Key",
-                               Channel:   "ALIYUN/OtherChannel",
-                       },
-               },
-               //Set to Common Producer as default.
-               ProducerModel: rocketmq.CommonProducer,
-       }
-       sendMessage(pConfig)
-}
-func sendMessage(config *rocketmq.ProducerConfig) {
-       producer, err := rocketmq.NewProducer(config)
-
-       if err != nil {
-               fmt.Println("create common producer failed, error:", err)
-               return
-       }
-
-       err = producer.Start()
-       if err != nil {
-               fmt.Println("start common producer error", err)
-               return
-       }
-       defer producer.Shutdown()
-
-       fmt.Printf("Common producer: %s started... \n", producer)
-       for i := 0; i < 10; i++ {
-               msg := fmt.Sprintf("%s-%d", "Hello,Common MQ Message-", i)
-               result, err := producer.SendMessageSync(&rocketmq.Message{Topic: "YourTopicXXXXXXXX", Body: msg})
-               if err != nil {
-                       fmt.Println("Error:", err)
-               }
-               fmt.Printf("send message: %s result: %s\n", msg, result)
-       }
-       fmt.Println("shutdown common producer.")
-}
diff --git a/demos/producer_orderly.go b/demos/producer_orderly.go
deleted file mode 100644
index e379a93..0000000
--- a/demos/producer_orderly.go
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package main
-
-import (
-       "fmt"
-       "github.com/apache/rocketmq-client-go/core"
-       "time"
-)
-
-// Change to main if you want to run it directly
-func main2() {
-       pConfig := &rocketmq.ProducerConfig{
-               ClientConfig: rocketmq.ClientConfig{
-                       GroupID:    "GID_XXXXXXXXXXXX",
-                       NameServer: "http://XXXXXXXXXXXXXXXXXX:80",
-                       Credentials: &rocketmq.SessionCredentials{
-                               AccessKey: "Your Access Key",
-                               SecretKey: "Your Secret Key",
-                               Channel:   "ALIYUN/OtherChannel",
-                       },
-               },
-               ProducerModel: rocketmq.OrderlyProducer,
-       }
-       sendMessageOrderlyByShardingKey(pConfig)
-}
-func sendMessageOrderlyByShardingKey(config *rocketmq.ProducerConfig) {
-       producer, err := rocketmq.NewProducer(config)
-       if err != nil {
-               fmt.Println("create Producer failed, error:", err)
-               return
-       }
-
-       producer.Start()
-       defer producer.Shutdown()
-       for i := 0; i < 1000; i++ {
-               msg := fmt.Sprintf("%s-%d", "Hello Lite Orderly Message", i)
-               r, err := producer.SendMessageOrderlyByShardingKey(
-                       &rocketmq.Message{Topic: "YourOrderLyTopicXXXXXXXX", Body: msg}, "ShardingKey" /*orderID*/)
-               if err != nil {
-                       println("Send Orderly Message Error:", err)
-               }
-               fmt.Printf("send orderly message result:%+v\n", r)
-               time.Sleep(time.Duration(1) * time.Second)
-       }
-
-}
diff --git a/demos/push_consumer.go b/demos/push_consumer.go
deleted file mode 100644
index 98a1ddf..0000000
--- a/demos/push_consumer.go
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package main
-
-import (
-       "fmt"
-       "github.com/apache/rocketmq-client-go/core"
-       "sync/atomic"
-)
-
-// Change to main if you want to run it directly
-func main1() {
-       pConfig := &rocketmq.PushConsumerConfig{
-               ClientConfig: rocketmq.ClientConfig{
-                       GroupID:    "GID_XXXXXXXXXXXX",
-                       NameServer: "http://XXXXXXXXXXXXXXXXXX:80",
-                       Credentials: &rocketmq.SessionCredentials{
-                               AccessKey: "Your Access Key",
-                               SecretKey: "Your Secret Key",
-                               Channel:   "ALIYUN/OtherChannel",
-                       },
-               },
-               Model:         rocketmq.Clustering,
-               ConsumerModel: rocketmq.CoCurrently,
-       }
-       consumeWithPush(pConfig)
-}
-func consumeWithPush(config *rocketmq.PushConsumerConfig) {
-
-       consumer, err := rocketmq.NewPushConsumer(config)
-       if err != nil {
-               println("create Consumer failed, error:", err)
-               return
-       }
-
-       ch := make(chan interface{})
-       var count = (int64)(1000000)
-       // ********************************************
-       // MUST subscribe topic before consumer started.
-       // *********************************************
-       consumer.Subscribe("YourTopicXXXXXXXX", "*", func(msg *rocketmq.MessageExt) rocketmq.ConsumeStatus {
-               fmt.Printf("A message received, MessageID:%s, Body:%s \n", msg.MessageID, msg.Body)
-               if atomic.AddInt64(&count, -1) <= 0 {
-                       ch <- "quit"
-               }
-               return rocketmq.ConsumeSuccess
-       })
-
-       err = consumer.Start()
-       if err != nil {
-               println("consumer start failed,", err)
-               return
-       }
-
-       fmt.Printf("consumer: %s started...\n", consumer)
-       <-ch
-       err = consumer.Shutdown()
-       if err != nil {
-               println("consumer shutdown failed")
-               return
-       }
-       println("consumer has shutdown.")
-}
diff --git a/examples/main.go b/examples/main.go
index b3018f0..1e325e9 100644
--- a/examples/main.go
+++ b/examples/main.go
@@ -17,62 +17,13 @@
 
 package main
 
-import (
-       "github.com/apache/rocketmq-client-go/core"
-       "gopkg.in/alecthomas/kingpin.v2"
-       "os"
-)
-
-var (
-       rmq     = kingpin.New("rocketmq", "RocketMQ cmd tools")
-       namesrv = rmq.Flag("namesrv", "NameServer address.").Default("localhost:9876").Short('n').String()
-       topic   = rmq.Flag("topic", "topic name.").Short('t').Required().String()
-       gid     = rmq.Flag("groupId", "group Id").Short('g').Default("testGroup").String()
-       amount  = rmq.Flag("amount", "how many message to produce or consume").Default("64").Short('a').Int()
-
-       produce     = rmq.Command("produce", "send messages to RocketMQ")
-       body        = produce.Flag("body", "message body").Short('b').Required().String()
-       workerCount = produce.Flag("workerCount", "works of send message with orderly").Default("1").Short('w').Int()
-       orderly     = produce.Flag("orderly", "send msg orderly").Short('o').Bool()
-
-       consume = rmq.Command("consume", "consumes message from RocketMQ")
-)
-
 func main() {
-       switch kingpin.MustParse(rmq.Parse(os.Args[1:])) {
-       case produce.FullCommand():
-               pConfig := &rocketmq.ProducerConfig{ClientConfig: rocketmq.ClientConfig{
-                       GroupID:    "MQ_INST_xxxxxxx%GID",
-                       NameServer: "http://xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx:80",
-                       Credentials: &rocketmq.SessionCredentials{
-                               AccessKey: "xxxxxx",
-                               SecretKey: "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
-                               Channel:   "mq-channel",
-                       },
-                       LogC: &rocketmq.LogConfig{
-                               Path:     "example",
-                               FileSize: 64 * 1 << 10,
-                               FileNum:  1,
-                               Level:    rocketmq.LogLevelDebug,
-                       },
-               }}
-               if *orderly {
-                       sendMessageOrderly(pConfig)
-               } else {
-                       sendMessage(pConfig)
-               }
-       case consume.FullCommand():
-               cConfig := &rocketmq.PushConsumerConfig{ClientConfig: rocketmq.ClientConfig{
-                       GroupID:    *gid,
-                       NameServer: *namesrv,
-                       LogC: &rocketmq.LogConfig{
-                               Path:     "example",
-                               FileSize: 64 * 1 << 10,
-                               FileNum:  1,
-                               Level:    rocketmq.LogLevelInfo,
-                       },
-               }, Model: rocketmq.Clustering}
-
-               consumeWithPush(cConfig)
-       }
+       //run producer
+       main0()
+       //run consumer
+       main1()
+       //run orderly producer
+       main2()
+       //run orderly consumer
+       main3()
 }
diff --git a/demos/orderly_push_consumer.go b/examples/orderly_push_consumer.go
similarity index 100%
rename from demos/orderly_push_consumer.go
rename to examples/orderly_push_consumer.go
diff --git a/examples/producer.go b/examples/producer.go
index e1c4d2f..ad08c12 100644
--- a/examples/producer.go
+++ b/examples/producer.go
@@ -22,29 +22,46 @@ import (
        "github.com/apache/rocketmq-client-go/core"
 )
 
+// Change to main if you want to run it directly
+func main0() {
+       pConfig := &rocketmq.ProducerConfig{
+               ClientConfig: rocketmq.ClientConfig{
+                       GroupID:    "GID_XXXXXXXXXXXX",
+                       NameServer: "http://XXXXXXXXXXXXXXXXXX:80",
+                       Credentials: &rocketmq.SessionCredentials{
+                               AccessKey: "Your Access Key",
+                               SecretKey: "Your Secret Key",
+                               Channel:   "ALIYUN/OtherChannel",
+                       },
+               },
+               //Set to Common Producer as default.
+               ProducerModel: rocketmq.CommonProducer,
+       }
+       sendMessage(pConfig)
+}
 func sendMessage(config *rocketmq.ProducerConfig) {
        producer, err := rocketmq.NewProducer(config)
 
        if err != nil {
-               fmt.Println("create Producer failed, error:", err)
+               fmt.Println("create common producer failed, error:", err)
                return
        }
 
        err = producer.Start()
        if err != nil {
-               fmt.Println("start producer error", err)
+               fmt.Println("start common producer error", err)
                return
        }
        defer producer.Shutdown()
 
-       fmt.Printf("Producer: %s started... \n", producer)
-       for i := 0; i < *amount; i++ {
-               msg := fmt.Sprintf("%s-%d", *body, i)
-               result, err := producer.SendMessageSync(&rocketmq.Message{Topic: *topic, Body: msg})
+       fmt.Printf("Common producer: %s started... \n", producer)
+       for i := 0; i < 10; i++ {
+               msg := fmt.Sprintf("%s-%d", "Hello,Common MQ Message-", i)
+               result, err := producer.SendMessageSync(&rocketmq.Message{Topic: "YourTopicXXXXXXXX", Body: msg})
                if err != nil {
                        fmt.Println("Error:", err)
                }
                fmt.Printf("send message: %s result: %s\n", msg, result)
        }
-       fmt.Println("shutdown producer.")
+       fmt.Println("shutdown common producer.")
 }
diff --git a/examples/producer_orderly.go b/examples/producer_orderly.go
index f88c3d5..e379a93 100644
--- a/examples/producer_orderly.go
+++ b/examples/producer_orderly.go
@@ -19,37 +19,27 @@ package main
 
 import (
        "fmt"
-       "sync"
-       "sync/atomic"
-
        "github.com/apache/rocketmq-client-go/core"
+       "time"
 )
 
-type queueSelectorByOrderID struct{}
-
-func (s queueSelectorByOrderID) Select(size int, m *rocketmq.Message, arg interface{}) int {
-       return arg.(int) % size
-}
-
-type worker struct {
-       p            rocketmq.Producer
-       leftMsgCount int64
-}
-
-func (w *worker) run() {
-       selector := queueSelectorByOrderID{}
-       for atomic.AddInt64(&w.leftMsgCount, -1) >= 0 {
-               r, err := w.p.SendMessageOrderly(
-                       &rocketmq.Message{Topic: *topic, Body: *body}, selector, 7 /*orderID*/, 3,
-               )
-               if err != nil {
-                       println("Send Orderly Error:", err)
-               }
-               fmt.Printf("send orderly result:%+v\n", r)
+// Change to main if you want to run it directly
+func main2() {
+       pConfig := &rocketmq.ProducerConfig{
+               ClientConfig: rocketmq.ClientConfig{
+                       GroupID:    "GID_XXXXXXXXXXXX",
+                       NameServer: "http://XXXXXXXXXXXXXXXXXX:80",
+                       Credentials: &rocketmq.SessionCredentials{
+                               AccessKey: "Your Access Key",
+                               SecretKey: "Your Secret Key",
+                               Channel:   "ALIYUN/OtherChannel",
+                       },
+               },
+               ProducerModel: rocketmq.OrderlyProducer,
        }
+       sendMessageOrderlyByShardingKey(pConfig)
 }
-
-func sendMessageOrderly(config *rocketmq.ProducerConfig) {
+func sendMessageOrderlyByShardingKey(config *rocketmq.ProducerConfig) {
        producer, err := rocketmq.NewProducer(config)
        if err != nil {
                fmt.Println("create Producer failed, error:", err)
@@ -58,19 +48,15 @@ func sendMessageOrderly(config *rocketmq.ProducerConfig) {
 
        producer.Start()
        defer producer.Shutdown()
-
-       wg := sync.WaitGroup{}
-       wg.Add(*workerCount)
-
-       workers := make([]worker, *workerCount)
-       for i := range workers {
-               workers[i].p = producer
-               workers[i].leftMsgCount = (int64)(*amount)
-       }
-
-       for i := range workers {
-               go func(w *worker) { w.run(); wg.Done() }(&workers[i])
+       for i := 0; i < 1000; i++ {
+               msg := fmt.Sprintf("%s-%d", "Hello Lite Orderly Message", i)
+               r, err := producer.SendMessageOrderlyByShardingKey(
+                       &rocketmq.Message{Topic: "YourOrderLyTopicXXXXXXXX", Body: msg}, "ShardingKey" /*orderID*/)
+               if err != nil {
+                       println("Send Orderly Message Error:", err)
+               }
+               fmt.Printf("send orderly message result:%+v\n", r)
+               time.Sleep(time.Duration(1) * time.Second)
        }
 
-       wg.Wait()
 }
diff --git a/examples/pull_consumer.go b/examples/pull_consumer.go
deleted file mode 100644
index de38048..0000000
--- a/examples/pull_consumer.go
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package main
-
-import (
-       "fmt"
-       "time"
-
-       "github.com/apache/rocketmq-client-go/core"
-)
-
-func consumeWithPull(config *rocketmq.PullConsumerConfig, topic string) {
-
-       consumer, err := rocketmq.NewPullConsumer(config)
-       if err != nil {
-               fmt.Printf("new pull consumer error:%s\n", err)
-               return
-       }
-
-       err = consumer.Start()
-       if err != nil {
-               fmt.Printf("start consumer error:%s\n", err)
-               return
-       }
-       defer consumer.Shutdown()
-
-       mqs := consumer.FetchSubscriptionMessageQueues(topic)
-       fmt.Printf("fetch subscription mqs:%+v\n", mqs)
-
-       total, offsets, now := 0, map[int]int64{}, time.Now()
-
-PULL:
-       for {
-               for _, mq := range mqs {
-                       pr := consumer.Pull(mq, "*", offsets[mq.ID], 32)
-                       total += len(pr.Messages)
-                       fmt.Printf("pull %s, result:%+v\n", mq.String(), pr)
-
-                       switch pr.Status {
-                       case rocketmq.PullNoNewMsg:
-                               break PULL
-                       case rocketmq.PullFound:
-                               fallthrough
-                       case rocketmq.PullNoMatchedMsg:
-                               fallthrough
-                       case rocketmq.PullOffsetIllegal:
-                               offsets[mq.ID] = pr.NextBeginOffset
-                       case rocketmq.PullBrokerTimeout:
-                               fmt.Println("broker timeout occur")
-                       }
-               }
-       }
-
-       var timePerMessage time.Duration
-       if total > 0 {
-               timePerMessage = time.Since(now) / time.Duration(total)
-       }
-       fmt.Printf("total message:%d, per message time:%d\n", total, timePerMessage)
-}
diff --git a/examples/push_consumer.go b/examples/push_consumer.go
index 3f0e34a..98a1ddf 100644
--- a/examples/push_consumer.go
+++ b/examples/push_consumer.go
@@ -23,6 +23,23 @@ import (
        "sync/atomic"
 )
 
+// Change to main if you want to run it directly
+func main1() {
+       pConfig := &rocketmq.PushConsumerConfig{
+               ClientConfig: rocketmq.ClientConfig{
+                       GroupID:    "GID_XXXXXXXXXXXX",
+                       NameServer: "http://XXXXXXXXXXXXXXXXXX:80",
+                       Credentials: &rocketmq.SessionCredentials{
+                               AccessKey: "Your Access Key",
+                               SecretKey: "Your Secret Key",
+                               Channel:   "ALIYUN/OtherChannel",
+                       },
+               },
+               Model:         rocketmq.Clustering,
+               ConsumerModel: rocketmq.CoCurrently,
+       }
+       consumeWithPush(pConfig)
+}
 func consumeWithPush(config *rocketmq.PushConsumerConfig) {
 
        consumer, err := rocketmq.NewPushConsumer(config)
@@ -32,10 +49,12 @@ func consumeWithPush(config *rocketmq.PushConsumerConfig) {
        }
 
        ch := make(chan interface{})
-       var count = (int64)(*amount)
+       var count = (int64)(1000000)
+       // ********************************************
        // MUST subscribe topic before consumer started.
-       consumer.Subscribe(*topic, "*", func(msg *rocketmq.MessageExt) rocketmq.ConsumeStatus {
-               fmt.Printf("A message received: \"%s\" \n", msg.Body)
+       // *********************************************
+       consumer.Subscribe("YourTopicXXXXXXXX", "*", func(msg *rocketmq.MessageExt) rocketmq.ConsumeStatus {
+               fmt.Printf("A message received, MessageID:%s, Body:%s \n", msg.MessageID, msg.Body)
                if atomic.AddInt64(&count, -1) <= 0 {
                        ch <- "quit"
                }
diff --git a/demos/transaction_producer.go b/examples/transaction_producer.go
similarity index 100%
rename from demos/transaction_producer.go
rename to examples/transaction_producer.go

Reply via email to