This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 45cb340 add producer async method. resolve #98 (#100)
45cb340 is described below
commit 45cb34063443322ab550790eb15c5236bbfa335c
Author: xujianhai666 <[email protected]>
AuthorDate: Mon Jul 8 18:54:55 2019 +0800
add producer async method. resolve #98 (#100)
---
examples/producer/async/main.go | 65 +++++++++++++++++++++++++++
internal/kernel/client.go | 7 +++
internal/producer/producer.go | 63 +++++++++++++++++++++++----
internal/remote/future.go | 82 +++++++++++++++++++++++++++++++++++
internal/remote/remote_client.go | 67 ++++------------------------
internal/remote/remote_client_test.go | 49 +++++++++++++++++++++
primitive/ctx.go | 1 +
7 files changed, 268 insertions(+), 66 deletions(-)
diff --git a/examples/producer/async/main.go b/examples/producer/async/main.go
new file mode 100644
index 0000000..3888e32
--- /dev/null
+++ b/examples/producer/async/main.go
@@ -0,0 +1,65 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "strconv"
+ "sync"
+
+ "github.com/apache/rocketmq-client-go/internal/producer"
+ "github.com/apache/rocketmq-client-go/primitive"
+)
+
+// Package main implements a async producer to send message.
+func main() {
+ nameServerAddr := []string{"127.0.0.1:9876"}
+ p, _ := producer.NewProducer(nameServerAddr, primitive.WithRetry(2))
+ err := p.Start()
+ if err != nil {
+ fmt.Printf("start producer error: %s", err.Error())
+ os.Exit(1)
+ }
+ var wg sync.WaitGroup
+ for i := 0; i < 10; i++ {
+ wg.Add(1)
+ err := p.SendAsync(context.Background(), &primitive.Message{
+ Topic: "TopicTest",
+ Body: []byte("Hello RocketMQ Go Client!"),
+ Properties: map[string]string{"id": strconv.Itoa(i)},
+ }, func(ctx context.Context, result *primitive.SendResult, e
error) {
+ if e != nil {
+ fmt.Printf("receive message error: %s\n", err)
+ } else {
+ fmt.Printf("send message success: result=%s\n",
result.String())
+ }
+ wg.Done()
+ })
+
+ if err != nil {
+ fmt.Printf("send message error: %s\n", err)
+ }
+ }
+ wg.Wait()
+ err = p.Shutdown()
+ if err != nil {
+ fmt.Printf("shundown producer error: %s", err.Error())
+ }
+}
diff --git a/internal/kernel/client.go b/internal/kernel/client.go
index 94f45c9..5aa6849 100644
--- a/internal/kernel/client.go
+++ b/internal/kernel/client.go
@@ -178,6 +178,13 @@ func (c *RMQClient) InvokeSync(addr string, request
*remote.RemotingCommand,
return c.remoteClient.InvokeSync(addr, request, timeoutMillis)
}
+func (c *RMQClient) InvokeAsync(addr string, request *remote.RemotingCommand,
+ timeoutMillis time.Duration, f func(*remote.RemotingCommand, error))
error {
+ return c.remoteClient.InvokeAsync(addr, request, timeoutMillis,
func(future *remote.ResponseFuture) {
+ f(future.ResponseCommand, future.Err)
+ })
+}
+
func (c *RMQClient) InvokeOneWay(addr string, request *remote.RemotingCommand,
timeoutMillis time.Duration) error {
return c.remoteClient.InvokeOneWay(addr, request, timeoutMillis)
diff --git a/internal/producer/producer.go b/internal/producer/producer.go
index 5be553e..f7c87d1 100644
--- a/internal/producer/producer.go
+++ b/internal/producer/producer.go
@@ -31,10 +31,16 @@ import (
"github.com/pkg/errors"
)
+var(
+ ErrTopicEmpty = errors.New("topic is nil")
+ ErrMessageEmpty = errors.New("message is nil")
+)
+
type Producer interface {
Start() error
Shutdown() error
SendSync(context.Context, *primitive.Message) (*primitive.SendResult,
error)
+ SendAsync(context.Context, *primitive.Message, func(context.Context,
*primitive.SendResult, error)) error
SendOneWay(context.Context, *primitive.Message) error
}
@@ -108,13 +114,20 @@ func (p *defaultProducer) Shutdown() error {
return nil
}
-func (p *defaultProducer) SendSync(ctx context.Context, msg
*primitive.Message) (*primitive.SendResult, error) {
+func (p *defaultProducer) checkMsg(msg *primitive.Message) error {
if msg == nil {
- return nil, errors.New("message is nil")
+ return errors.New("message is nil")
}
if msg.Topic == "" {
- return nil, errors.New("topic is nil")
+ return errors.New("topic is nil")
+ }
+ return nil
+}
+
+func (p *defaultProducer) SendSync(ctx context.Context, msg
*primitive.Message) (*primitive.SendResult, error) {
+ if err := p.checkMsg(msg); err != nil {
+ return nil, err
}
resp := new(primitive.SendResult)
@@ -165,13 +178,47 @@ func (p *defaultProducer) sendSync(ctx context.Context,
msg *primitive.Message,
return err
}
-func (p *defaultProducer) SendOneWay(ctx context.Context, msg
*primitive.Message) error {
- if msg == nil {
- return errors.New("message is nil")
+func (p *defaultProducer) SendAsync(ctx context.Context, msg
*primitive.Message, h func(context.Context, *primitive.SendResult, error))
error {
+ if err := p.checkMsg(msg); err != nil {
+ return err
}
- if msg.Topic == "" {
- return errors.New("topic is nil")
+ if p.interceptor != nil {
+ primitive.WithMehod(ctx, primitive.SendAsync)
+
+ return p.interceptor(ctx, msg, nil, func(ctx context.Context,
req, reply interface{}) error {
+ return p.sendAsync(ctx, msg, h)
+ })
+ }
+ return p.sendAsync(ctx, msg, h)
+}
+
+func (p *defaultProducer) sendAsync(ctx context.Context, msg
*primitive.Message, h func(context.Context, *primitive.SendResult, error))
error {
+
+ mq := p.selectMessageQueue(msg.Topic)
+ if mq == nil {
+ return errors.Errorf("the topic=%s route info not found",
msg.Topic)
+ }
+
+ addr := kernel.FindBrokerAddrByName(mq.BrokerName)
+ if addr == "" {
+ return errors.Errorf("topic=%s route info not found", mq.Topic)
+ }
+
+ return p.client.InvokeAsync(addr, p.buildSendRequest(mq, msg),
3*time.Second, func(command *remote.RemotingCommand, e error) {
+ if e != nil {
+ h(ctx, nil, e)
+ return
+ }
+ resp := new(primitive.SendResult)
+ p.client.ProcessSendResponse(mq.BrokerName, command, resp, msg)
+ h(ctx, resp, e)
+ })
+}
+
+func (p *defaultProducer) SendOneWay(ctx context.Context, msg
*primitive.Message) error {
+ if err := p.checkMsg(msg); err != nil {
+ return err
}
if p.interceptor != nil {
diff --git a/internal/remote/future.go b/internal/remote/future.go
new file mode 100644
index 0000000..8f604cc
--- /dev/null
+++ b/internal/remote/future.go
@@ -0,0 +1,82 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package remote
+
+import (
+ "sync"
+ "time"
+)
+
+// ResponseFuture
+type ResponseFuture struct {
+ ResponseCommand *RemotingCommand
+ SendRequestOK bool
+ Err error
+ Opaque int32
+ TimeoutMillis time.Duration
+ callback func(*ResponseFuture)
+ BeginTimestamp int64
+ Done chan bool
+ callbackOnce sync.Once
+}
+
+// NewResponseFuture create ResponseFuture with opaque, timeout and callback
+func NewResponseFuture(opaque int32, timeoutMillis time.Duration, callback
func(*ResponseFuture)) *ResponseFuture {
+ return &ResponseFuture{
+ Opaque: opaque,
+ Done: make(chan bool),
+ TimeoutMillis: timeoutMillis,
+ callback: callback,
+ BeginTimestamp: time.Now().Unix() * 1000,
+ }
+}
+
+func (r *ResponseFuture) executeInvokeCallback() {
+ r.callbackOnce.Do(func() {
+ if r.callback != nil {
+ r.callback(r)
+ }
+ })
+}
+
+func (r *ResponseFuture) isTimeout() bool {
+ diff := time.Now().Unix()*1000 - r.BeginTimestamp
+ return diff > int64(r.TimeoutMillis)
+}
+
+func (r *ResponseFuture) waitResponse() (*RemotingCommand, error) {
+ var (
+ cmd *RemotingCommand
+ err error
+ )
+ timer := time.NewTimer(r.TimeoutMillis * time.Millisecond)
+ for {
+ select {
+ case <-r.Done:
+ cmd, err = r.ResponseCommand, r.Err
+ goto done
+ case <-timer.C:
+ err = ErrRequestTimeout
+ r.Err = err
+ goto done
+ }
+ }
+done:
+ timer.Stop()
+ return cmd, err
+}
diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index cf71134..fde17e4 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -35,64 +35,6 @@ var (
connectionLocker sync.Mutex
)
-//ResponseFuture for
-type ResponseFuture struct {
- ResponseCommand *RemotingCommand
- SendRequestOK bool
- Err error
- Opaque int32
- TimeoutMillis time.Duration
- callback func(*ResponseFuture)
- BeginTimestamp int64
- Done chan bool
- callbackOnce sync.Once
-}
-
-//NewResponseFuture create ResponseFuture with opaque, timeout and callback
-func NewResponseFuture(opaque int32, timeoutMillis time.Duration, callback
func(*ResponseFuture)) *ResponseFuture {
- return &ResponseFuture{
- Opaque: opaque,
- Done: make(chan bool),
- TimeoutMillis: timeoutMillis,
- callback: callback,
- BeginTimestamp: time.Now().Unix() * 1000,
- }
-}
-
-func (r *ResponseFuture) executeInvokeCallback() {
- r.callbackOnce.Do(func() {
- if r.callback != nil {
- r.callback(r)
- }
- })
-}
-
-func (r *ResponseFuture) isTimeout() bool {
- diff := time.Now().Unix()*1000 - r.BeginTimestamp
- return diff > int64(r.TimeoutMillis)
-}
-
-func (r *ResponseFuture) waitResponse() (*RemotingCommand, error) {
- var (
- cmd *RemotingCommand
- err error
- )
- timer := time.NewTimer(r.TimeoutMillis * time.Millisecond)
- for {
- select {
- case <-r.Done:
- cmd, err = r.ResponseCommand, r.Err
- goto done
- case <-timer.C:
- err = ErrRequestTimeout
- goto done
- }
- }
-done:
- timer.Stop()
- return cmd, err
-}
-
type ClientRequestFunc func(*RemotingCommand) *RemotingCommand
type TcpOption struct {
@@ -116,6 +58,7 @@ func (c *RemotingClient) RegisterRequestFunc(code int16, f
ClientRequestFunc) {
c.processors[code] = f
}
+// TODO: merge sync and async model. sync should run on async model by
blocking on chan
func (c *RemotingClient) InvokeSync(addr string, request *RemotingCommand,
timeoutMillis time.Duration) (*RemotingCommand, error) {
conn, err := c.connect(addr)
if err != nil {
@@ -132,6 +75,7 @@ func (c *RemotingClient) InvokeSync(addr string, request
*RemotingCommand, timeo
return resp.waitResponse()
}
+// InvokeAsync send request witout blocking, just return immediately.
func (c *RemotingClient) InvokeAsync(addr string, request *RemotingCommand,
timeoutMillis time.Duration, callback func(*ResponseFuture)) error {
conn, err := c.connect(addr)
if err != nil {
@@ -144,8 +88,15 @@ func (c *RemotingClient) InvokeAsync(addr string, request
*RemotingCommand, time
return err
}
resp.SendRequestOK = true
+ go c.receiveAsync(resp)
return nil
+}
+func (c *RemotingClient) receiveAsync(f *ResponseFuture) {
+ _, err := f.waitResponse()
+ if err != nil {
+ f.executeInvokeCallback()
+ }
}
func (c *RemotingClient) InvokeOneWay(addr string, request *RemotingCommand,
timeout time.Duration) error {
diff --git a/internal/remote/remote_client_test.go
b/internal/remote/remote_client_test.go
index c999c0e..96e8f1b 100644
--- a/internal/remote/remote_client_test.go
+++ b/internal/remote/remote_client_test.go
@@ -24,6 +24,8 @@ import (
"sync"
"testing"
"time"
+
+ "github.com/stretchr/testify/assert"
)
func TestNewResponseFuture(t *testing.T) {
@@ -278,6 +280,53 @@ done:
wg.Wait()
}
+func TestInvokeAsyncTimeout(t *testing.T) {
+ clientSendRemtingCommand := NewRemotingCommand(10, nil, []byte("Hello
RocketMQ"))
+ serverSendRemotingCommand := NewRemotingCommand(20, nil,
[]byte("Welcome native"))
+ serverSendRemotingCommand.Opaque = clientSendRemtingCommand.Opaque
+ serverSendRemotingCommand.Flag = ResponseType
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ client := NewRemotingClient()
+
+ var clientSend sync.WaitGroup // blocking client send message until
the server listen success.
+ clientSend.Add(1)
+ go func() {
+ clientSend.Wait()
+ err := client.InvokeAsync(":3000", clientSendRemtingCommand,
+ time.Duration(1000), func(r *ResponseFuture) {
+ assert.NotNil(t, r.Err)
+ assert.Equal(t, ErrRequestTimeout, r.Err)
+ wg.Done()
+ })
+ assert.Nil(t, err, "failed to invokeSync.")
+ }()
+
+ l, err := net.Listen("tcp", ":3000")
+ assert.Nil(t, err)
+ defer l.Close()
+ clientSend.Done()
+
+ for {
+ conn, err := l.Accept()
+ assert.Nil(t, err)
+ defer conn.Close()
+
+ scanner := client.createScanner(conn)
+ for scanner.Scan() {
+ t.Logf("receive request.")
+ _, err := decode(scanner.Bytes())
+ assert.Nil(t, err, "failed to decode RemotingCommnad.")
+
+ time.Sleep(5 * time.Second) // force client timeout
+ goto done
+ }
+ }
+done:
+ wg.Wait()
+}
+
func TestInvokeOneWay(t *testing.T) {
clientSendRemtingCommand := NewRemotingCommand(10, nil, []byte("Hello
RocketMQ"))
diff --git a/primitive/ctx.go b/primitive/ctx.go
index e74f91a..812ffd6 100644
--- a/primitive/ctx.go
+++ b/primitive/ctx.go
@@ -32,6 +32,7 @@ const (
// method name in producer
SendSync = "SendSync"
SendOneway = "SendOneway"
+ SendAsync = "SendAsync"
// method name in consumer
ConsumerPush = "ConsumerPush"
ConsumerPull = "ConsumerPull"