This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 96c62c6 Update documents for release 2.0.0-alpha1 (#61)
96c62c6 is described below
commit 96c62c6882412d27e23969207f76efb71627670e
Author: wenfeng <[email protected]>
AuthorDate: Tue May 14 20:46:13 2019 +0800
Update documents for release 2.0.0-alpha1 (#61)
---
README.md | 14 ++-
changelog | 6 ++
docs/Introduction.md | 261 +++++++++++++++-------------------------------
examples/consumer/main.go | 1 +
examples/producer/main.go | 5 +-
producer/producer.go | 2 +-
6 files changed, 102 insertions(+), 187 deletions(-)
diff --git a/README.md b/README.md
index 107d646..50c5bb0 100644
--- a/README.md
+++ b/README.md
@@ -2,21 +2,25 @@
[](https://www.apache.org/licenses/LICENSE-2.0.html)
[](https://travis-ci.org/apache/rocketmq-client-go)
-* The client is using cgo to call
[rocketmq-client-cpp](https://github.com/apache/rocketmq-client-cpp), which has
been proven robust and widely adopted within Alibaba Group by many business
units for more than three years.
+The RocketMQ Client in pure go, the project is developing, **there is no any
guarantee in production environment**. in next versions,
+we will do our best to improve reliability, stability, usability and
performance. the API may be changed, and more features will be added.
+there are many works need to continue in this project, like unit test,
integration test, stable test, new feature,
+optimization, documents, etc. so, any contribution is very welcome. if you
want do something, please browse issue list and select one,
+or create a new issue.
+- [2.0.0 Production Ready Road
map](https://github.com/apache/rocketmq-client-go/issues/57)
+- [The alpha1 feature
list](https://github.com/apache/rocketmq-client-go/issues/54)
----------
## Features
-At present, this SDK supports
+in 2.0.0-alpha1, support:
* sending message in synchronous mode
-* sending message in orderly mode
* sending message in oneway mode
* consuming message using push model
-* consuming message using pull model
----------
## How to use
-* Step-by-step instruction are provided in [RocketMQ Go Client
Introduction](./doc/Introduction.md)
+* Step-by-step instruction are provided in [RocketMQ Go Client
Introduction](docs/Introduction.md)
* Consult [RocketMQ Quick
Start](https://rocketmq.apache.org/docs/quick-start/) to setup rocketmq broker
and nameserver.
----------
diff --git a/changelog b/changelog
index 3ca5cb3..8d96e5f 100755
--- a/changelog
+++ b/changelog
@@ -1,3 +1,9 @@
+version 2.0.0-alpha1
+ * basic feature in pure go
+ * sending message in synchronous mode
+ * sending message in oneway mode
+ * consuming message using push model
+
version 1.2.0
* support reliable synchronous sending of messages;
* support reliable orderly sending of messages;
diff --git a/docs/Introduction.md b/docs/Introduction.md
index ecc6bff..9bc22c7 100644
--- a/docs/Introduction.md
+++ b/docs/Introduction.md
@@ -1,190 +1,95 @@
-## Prerequisites
-
-### Install `librocketmq`
-because the project top on
[rocketmq-client-cpp](https://github.com/apache/rocketmq-client-cpp), so you
need install
-`librocketmq` first.
-
-#### include file
-```bash
-git clone https://github.com/apache/rocketmq-client-cpp
-
-# By default, CFLAGS contains `/usr/local/include`
-sudo mkdir -p /usr/local/include/rocketmq/
-sudo cp rocketmq-client-cpp/include/* /usr/local/include/rocketmq
-```
-
-#### binary library
-your could download directly or build manually:
-
-- download specific release:
[rocketmq-client-cpp-1.2.0](http://opensource-rocketmq-client-us.oss-us-west-1.aliyuncs.com/cpp-client/libindex.log)
- and move files downloaded to your `LDFLAGS` directory(if you don't know what
is `LDFLAGS`, just google it), in linux, `/usr/local/lib/` works finely.
-
-- build from source: [Build and
Install](https://github.com/apache/rocketmq-client-cpp/tree/master#build-and-install)
-
-### install
-1. Go Version: 1.10 or later
-2. `go get github.com/apache/rocketmq-client-go`
-
## How to use
-- import package
- ```
- import rocketmq "github.com/apache/rocketmq-client-go/core"
- ```
-- Send message
- ```go
- func SendMessagge(){
- producer := rocketmq.NewProducer(config)
- producer.Start()
- defer producer.Shutdown()
- fmt.Printf("Producer: %s started... \n", producer)
- for i := 0; i < 100; i++ {
- msg := fmt.Sprintf("%s-*d", *body, i)
- result, err := producer.SendMessageSync(&rocketmq.Message{Topic:
"test", Body: msg})
- if err != nil {
- fmt.Println("Error:", err)
- }
- fmt.Printf("send message: %s result: %s\n", msg, result)
- }
- }
- ```
-- Send ordered message
- ```go
- type queueSelectorByOrderID struct{}
-
- func (s queueSelectorByOrderID) Select(size int, m *rocketmq.Message, arg
interface{}) int{
- return arg.(int) % size
- }
- type worker struct {
- p rocketmq.Producer
- leftMsgCount int64
- }
-
- func (w *worker) run() {
- selector := queueSelectorByOrderID{}
- for atomic.AddInt64(&w.leftMsgCount, -1) >= 0 {
- r, err := w.p.SendMessageOrderly(
- &rocketmq.Message{Topic: *topic, Body: *body},
selector, 7 /*orderID*/, 3,
- )
- if err != nil {
- println("Send Orderly Error:", err)
- }
- fmt.Printf("send orderly result:%+v\n", r)
- }
- }
-
- func sendMessageOrderly(config *rocketmq.ProducerConfig) {
- producer, err := rocketmq.NewProducer(config)
- if err != nil {
- fmt.Println("create Producer failed, error:", err)
- return
- }
-
- producer.Start()
- defer producer.Shutdown()
-
- wg := sync.WaitGroup{}
- wg.Add(*workerCount)
-
- workers := make([]worker, *workerCount)
- for i := range workers {
- workers[i].p = producer
- workers[i].leftMsgCount = (int64)(*amount)
- }
-
- for i := range workers {
- go func(w *worker) { w.run(); wg.Done() }(&workers[i])
- }
-
- wg.Wait()
- }
- ```
-- Push Consumer
- ```go
- func ConsumeWithPush(config *rocketmq.PushConsumerConfig) {
-
- consumer, err := rocketmq.NewPushConsumer(config)
- if err != nil {
- println("create Consumer failed, error:", err)
- return
- }
+### go mod
+```
+require (
+ github.com/apache/rocketmq-client-go v2.0.0-alpha1
+)
+```
- ch := make(chan interface{})
- var count = (int64)(*amount)
- // MUST subscribe topic before consumer started.
- consumer.Subscribe("test", "*", func(msg *rocketmq.MessageExt)
rocketmq.ConsumeStatus {
- fmt.Printf("A message received: \"%s\" \n", msg.Body)
- if atomic.AddInt64(&count, -1) <= 0 {
- ch <- "quit"
- }
- return rocketmq.ConsumeSuccess
- })
+### Set Logger
+Go Client define the `Logger` interface for log output, user can specify
implementation of private.
+in default, client use `logrus`.
+```go
+rlog.SetLogger(Logger)
+```
- err = consumer.Start()
- if err != nil {
- println("consumer start failed,", err)
- return
- }
+### Send message
+#### Interface
+```go
+Producer interface {
+ Start() error
+ Shutdown() error
+ SendSync(context.Context, *kernel.Message) (*kernel.SendResult, error)
+ SendOneWay(context.Context, *kernel.Message) error
+}
+```
- fmt.Printf("consumer: %s started...\n", consumer)
- <-ch
- err = consumer.Shutdown()
- if err != nil {
- println("consumer shutdown failed")
- return
- }
- println("consumer has shutdown.")
- }
- ```
-- Pull Consumer
- ```go
- func ConsumeWithPull(config *rocketmq.PullConsumerConfig, topic string) {
+#### Examples
+- create a new `Producer` instance
+```go
+opt := producer.ProducerOptions{
+ NameServerAddr: "127.0.0.1:9876",
+ RetryTimesWhenSendFailed: 2,
+}
+p := producer.NewProducer(opt)
+```
- consumer, err := rocketmq.NewPullConsumer(config)
- if err != nil {
- fmt.Printf("new pull consumer error:%s\n", err)
- return
- }
+- start the producer
+```go
+err := p.Start()
+```
- err = consumer.Start()
- if err != nil {
- fmt.Printf("start consumer error:%s\n", err)
- return
- }
- defer consumer.Shutdown()
+- send message with sync
+```go
+result, err := p.SendSync(context.Background(), &kernel.Message{
+ Topic: "test",
+ Body: []byte("Hello RocketMQ Go Client!"),
+})
- mqs := consumer.FetchSubscriptionMessageQueues(topic)
- fmt.Printf("fetch subscription mqs:%+v\n", mqs)
+// do something with result
+```
- total, offsets, now := 0, map[int]int64{}, time.Now()
+- or send message with oneway
+```go
+err := p.SendOneWay(context.Background(), &kernel.Message{
+ Topic: "test",
+ Body: []byte("Hello RocketMQ Go Client!"),
+})
+```
+Full examples: [producer](../examples/producer/main.go)
+
+### Consume Message
+alpha1 only support `PushConsumer`
+
+#### Interface
+```go
+PushConsumer interface {
+ Start() error
+ Shutdown()
+ Subscribe(topic string, selector MessageSelector,
+ f func(*ConsumeMessageContext, []*kernel.MessageExt)
(ConsumeResult, error)) error
+}
+```
- PULL:
- for {
- for _, mq := range mqs {
- pr := consumer.Pull(mq, "*", offsets[mq.ID], 32)
- total += len(pr.Messages)
- fmt.Printf("pull %s, result:%+v\n", mq.String(), pr)
+#### Usage
+- Create a `PushConsumer` instance
+```go
+c := consumer.NewPushConsumer("testGroup", consumer.ConsumerOption{
+ NameServerAddr: "127.0.0.1:9876",
+ ConsumerModel: consumer.Clustering,
+ FromWhere: consumer.ConsumeFromFirstOffset,
+})
+```
- switch pr.Status {
- case rocketmq.PullNoNewMsg:
- break PULL
- case rocketmq.PullFound:
- fallthrough
- case rocketmq.PullNoMatchedMsg:
- fallthrough
- case rocketmq.PullOffsetIllegal:
- offsets[mq.ID] = pr.NextBeginOffset
- case rocketmq.PullBrokerTimeout:
- fmt.Println("broker timeout occur")
- }
- }
- }
+- Subscribe a topic(only support one topic now), and define your consuming
function
+```go
+err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx
*consumer.ConsumeMessageContext,
+ msgs []*kernel.MessageExt) (consumer.ConsumeResult, error) {
+ fmt.Println(msgs)
+ return consumer.ConsumeSuccess, nil
+})
+```
+- start the consumer(**NOTE: MUST after subscribe**)
- var timePerMessage time.Duration
- if total > 0 {
- timePerMessage = time.Since(now) / time.Duration(total)
- }
- fmt.Printf("total message:%d, per message time:%d\n", total,
timePerMessage)
- }
- ```
-- [Full example](../examples)
\ No newline at end of file
+Full examples: [consumer](../examples/consumer/main.go)
\ No newline at end of file
diff --git a/examples/consumer/main.go b/examples/consumer/main.go
index 54410d6..53e9cb5 100644
--- a/examples/consumer/main.go
+++ b/examples/consumer/main.go
@@ -39,6 +39,7 @@ func main() {
if err != nil {
fmt.Println(err.Error())
}
+ // Note: start after subscribe
err = c.Start()
if err != nil {
fmt.Println(err.Error())
diff --git a/examples/producer/main.go b/examples/producer/main.go
index bc407f7..83bd127 100644
--- a/examples/producer/main.go
+++ b/examples/producer/main.go
@@ -37,7 +37,7 @@ func main() {
os.Exit(1)
}
for i := 0; i < 1000; i++ {
- err := p.SendOneWay(context.Background(), &kernel.Message{
+ res, err := p.SendSync(context.Background(), &kernel.Message{
Topic: "test",
Body: []byte("Hello RocketMQ Go Client!"),
})
@@ -45,8 +45,7 @@ func main() {
if err != nil {
fmt.Printf("send message error: %s\n", err)
} else {
- fmt.Printf("send success: %d\n", i)
- //fmt.Printf("send message success: result=%s\n",
res.String())
+ fmt.Printf("send message success: result=%s\n",
res.String())
}
}
err = p.Shutdown()
diff --git a/producer/producer.go b/producer/producer.go
index 28813f7..439c585 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -144,7 +144,7 @@ func (p *defaultProducer) SendOneWay(ctx context.Context,
msg *kernel.Message) e
return fmt.Errorf("topic=%s route info not found",
mq.Topic)
}
- _err := p.client.InvokeOneWay(addr, p.buildSendRequest(mq,
msg), 3*time.Second)
+ _err := p.client.InvokeOneWay(addr, p.buildSendRequest(mq,
msg), 3*time.Second)
if _err != nil {
err = _err
continue