This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 45cb340  add producer async method. resolve #98 (#100)
45cb340 is described below

commit 45cb34063443322ab550790eb15c5236bbfa335c
Author: xujianhai666 <[email protected]>
AuthorDate: Mon Jul 8 18:54:55 2019 +0800

    add producer async method. resolve #98 (#100)
---
 examples/producer/async/main.go       | 65 +++++++++++++++++++++++++++
 internal/kernel/client.go             |  7 +++
 internal/producer/producer.go         | 63 +++++++++++++++++++++++----
 internal/remote/future.go             | 82 +++++++++++++++++++++++++++++++++++
 internal/remote/remote_client.go      | 67 ++++------------------------
 internal/remote/remote_client_test.go | 49 +++++++++++++++++++++
 primitive/ctx.go                      |  1 +
 7 files changed, 268 insertions(+), 66 deletions(-)

diff --git a/examples/producer/async/main.go b/examples/producer/async/main.go
new file mode 100644
index 0000000..3888e32
--- /dev/null
+++ b/examples/producer/async/main.go
@@ -0,0 +1,65 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+       "context"
+       "fmt"
+       "os"
+       "strconv"
+       "sync"
+
+       "github.com/apache/rocketmq-client-go/internal/producer"
+       "github.com/apache/rocketmq-client-go/primitive"
+)
+
+// Package main implements a async producer to send message.
+func main() {
+       nameServerAddr := []string{"127.0.0.1:9876"}
+       p, _ := producer.NewProducer(nameServerAddr, primitive.WithRetry(2))
+       err := p.Start()
+       if err != nil {
+               fmt.Printf("start producer error: %s", err.Error())
+               os.Exit(1)
+       }
+       var wg sync.WaitGroup
+       for i := 0; i < 10; i++ {
+               wg.Add(1)
+               err := p.SendAsync(context.Background(), &primitive.Message{
+                       Topic:      "TopicTest",
+                       Body:       []byte("Hello RocketMQ Go Client!"),
+                       Properties: map[string]string{"id": strconv.Itoa(i)},
+               }, func(ctx context.Context, result *primitive.SendResult, e error) {
+                       if e != nil {
+                               fmt.Printf("receive message error: %s\n", err)
+                       } else {
+                               fmt.Printf("send message success: result=%s\n", result.String())
+                       }
+                       wg.Done()
+               })
+
+               if err != nil {
+                       fmt.Printf("send message error: %s\n", err)
+               }
+       }
+       wg.Wait()
+       err = p.Shutdown()
+       if err != nil {
+               fmt.Printf("shundown producer error: %s", err.Error())
+       }
+}
diff --git a/internal/kernel/client.go b/internal/kernel/client.go
index 94f45c9..5aa6849 100644
--- a/internal/kernel/client.go
+++ b/internal/kernel/client.go
@@ -178,6 +178,13 @@ func (c *RMQClient) InvokeSync(addr string, request *remote.RemotingCommand,
        return c.remoteClient.InvokeSync(addr, request, timeoutMillis)
 }
 
+func (c *RMQClient) InvokeAsync(addr string, request *remote.RemotingCommand,
+       timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) error {
+       return c.remoteClient.InvokeAsync(addr, request, timeoutMillis, func(future *remote.ResponseFuture) {
+               f(future.ResponseCommand, future.Err)
+       })
+}
+
 func (c *RMQClient) InvokeOneWay(addr string, request *remote.RemotingCommand,
        timeoutMillis time.Duration) error {
        return c.remoteClient.InvokeOneWay(addr, request, timeoutMillis)
diff --git a/internal/producer/producer.go b/internal/producer/producer.go
index 5be553e..f7c87d1 100644
--- a/internal/producer/producer.go
+++ b/internal/producer/producer.go
@@ -31,10 +31,16 @@ import (
        "github.com/pkg/errors"
 )
 
+var(
+       ErrTopicEmpty = errors.New("topic is nil")
+       ErrMessageEmpty = errors.New("message is nil")
+)
+
 type Producer interface {
        Start() error
        Shutdown() error
        SendSync(context.Context, *primitive.Message) (*primitive.SendResult, error)
+       SendAsync(context.Context, *primitive.Message, func(context.Context, *primitive.SendResult, error)) error
        SendOneWay(context.Context, *primitive.Message) error
 }
 
@@ -108,13 +114,20 @@ func (p *defaultProducer) Shutdown() error {
        return nil
 }
 
-func (p *defaultProducer) SendSync(ctx context.Context, msg *primitive.Message) (*primitive.SendResult, error) {
+func (p *defaultProducer) checkMsg(msg *primitive.Message) error {
        if msg == nil {
-               return nil, errors.New("message is nil")
+               return errors.New("message is nil")
        }
 
        if msg.Topic == "" {
-               return nil, errors.New("topic is nil")
+               return errors.New("topic is nil")
+       }
+       return nil
+}
+
+func (p *defaultProducer) SendSync(ctx context.Context, msg *primitive.Message) (*primitive.SendResult, error) {
+       if err := p.checkMsg(msg); err != nil {
+               return nil, err
        }
 
        resp := new(primitive.SendResult)
@@ -165,13 +178,47 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message,
        return err
 }
 
-func (p *defaultProducer) SendOneWay(ctx context.Context, msg *primitive.Message) error {
-       if msg == nil {
-               return errors.New("message is nil")
+func (p *defaultProducer) SendAsync(ctx context.Context, msg *primitive.Message, h func(context.Context, *primitive.SendResult, error)) error {
+       if err := p.checkMsg(msg); err != nil {
+               return err
        }
 
-       if msg.Topic == "" {
-               return errors.New("topic is nil")
+       if p.interceptor != nil {
+               primitive.WithMehod(ctx, primitive.SendAsync)
+
+               return p.interceptor(ctx, msg, nil, func(ctx context.Context, req, reply interface{}) error {
+                       return p.sendAsync(ctx, msg, h)
+               })
+       }
+       return p.sendAsync(ctx, msg, h)
+}
+
+func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message, h func(context.Context, *primitive.SendResult, error)) error {
+
+       mq := p.selectMessageQueue(msg.Topic)
+       if mq == nil {
+               return errors.Errorf("the topic=%s route info not found", msg.Topic)
+       }
+
+       addr := kernel.FindBrokerAddrByName(mq.BrokerName)
+       if addr == "" {
+               return errors.Errorf("topic=%s route info not found", mq.Topic)
+       }
+
+       return p.client.InvokeAsync(addr, p.buildSendRequest(mq, msg), 3*time.Second, func(command *remote.RemotingCommand, e error) {
+               if e != nil {
+                       h(ctx, nil, e)
+                       return
+               }
+               resp := new(primitive.SendResult)
+               p.client.ProcessSendResponse(mq.BrokerName, command, resp, msg)
+               h(ctx, resp, e)
+       })
+}
+
+func (p *defaultProducer) SendOneWay(ctx context.Context, msg *primitive.Message) error {
+       if err := p.checkMsg(msg); err != nil {
+               return err
        }
 
        if p.interceptor != nil {
diff --git a/internal/remote/future.go b/internal/remote/future.go
new file mode 100644
index 0000000..8f604cc
--- /dev/null
+++ b/internal/remote/future.go
@@ -0,0 +1,82 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package remote
+
+import (
+       "sync"
+       "time"
+)
+
+// ResponseFuture 
+type ResponseFuture struct {
+       ResponseCommand *RemotingCommand
+       SendRequestOK   bool
+       Err             error
+       Opaque          int32
+       TimeoutMillis   time.Duration
+       callback        func(*ResponseFuture)
+       BeginTimestamp  int64
+       Done            chan bool
+       callbackOnce    sync.Once
+}
+
+// NewResponseFuture create ResponseFuture with opaque, timeout and callback
+func NewResponseFuture(opaque int32, timeoutMillis time.Duration, callback func(*ResponseFuture)) *ResponseFuture {
+       return &ResponseFuture{
+               Opaque:         opaque,
+               Done:           make(chan bool),
+               TimeoutMillis:  timeoutMillis,
+               callback:       callback,
+               BeginTimestamp: time.Now().Unix() * 1000,
+       }
+}
+
+func (r *ResponseFuture) executeInvokeCallback() {
+       r.callbackOnce.Do(func() {
+               if r.callback != nil {
+                       r.callback(r)
+               }
+       })
+}
+
+func (r *ResponseFuture) isTimeout() bool {
+       diff := time.Now().Unix()*1000 - r.BeginTimestamp
+       return diff > int64(r.TimeoutMillis)
+}
+
+func (r *ResponseFuture) waitResponse() (*RemotingCommand, error) {
+       var (
+               cmd *RemotingCommand
+               err error
+       )
+       timer := time.NewTimer(r.TimeoutMillis * time.Millisecond)
+       for {
+               select {
+               case <-r.Done:
+                       cmd, err = r.ResponseCommand, r.Err
+                       goto done
+               case <-timer.C:
+                       err = ErrRequestTimeout
+                       r.Err = err
+                       goto done
+               }
+       }
+done:
+       timer.Stop()
+       return cmd, err
+}
diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index cf71134..fde17e4 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -35,64 +35,6 @@ var (
        connectionLocker  sync.Mutex
 )
 
-//ResponseFuture for
-type ResponseFuture struct {
-       ResponseCommand *RemotingCommand
-       SendRequestOK   bool
-       Err             error
-       Opaque          int32
-       TimeoutMillis   time.Duration
-       callback        func(*ResponseFuture)
-       BeginTimestamp  int64
-       Done            chan bool
-       callbackOnce    sync.Once
-}
-
-//NewResponseFuture create ResponseFuture with opaque, timeout and callback
-func NewResponseFuture(opaque int32, timeoutMillis time.Duration, callback func(*ResponseFuture)) *ResponseFuture {
-       return &ResponseFuture{
-               Opaque:         opaque,
-               Done:           make(chan bool),
-               TimeoutMillis:  timeoutMillis,
-               callback:       callback,
-               BeginTimestamp: time.Now().Unix() * 1000,
-       }
-}
-
-func (r *ResponseFuture) executeInvokeCallback() {
-       r.callbackOnce.Do(func() {
-               if r.callback != nil {
-                       r.callback(r)
-               }
-       })
-}
-
-func (r *ResponseFuture) isTimeout() bool {
-       diff := time.Now().Unix()*1000 - r.BeginTimestamp
-       return diff > int64(r.TimeoutMillis)
-}
-
-func (r *ResponseFuture) waitResponse() (*RemotingCommand, error) {
-       var (
-               cmd *RemotingCommand
-               err error
-       )
-       timer := time.NewTimer(r.TimeoutMillis * time.Millisecond)
-       for {
-               select {
-               case <-r.Done:
-                       cmd, err = r.ResponseCommand, r.Err
-                       goto done
-               case <-timer.C:
-                       err = ErrRequestTimeout
-                       goto done
-               }
-       }
-done:
-       timer.Stop()
-       return cmd, err
-}
-
 type ClientRequestFunc func(*RemotingCommand) *RemotingCommand
 
 type TcpOption struct {
@@ -116,6 +58,7 @@ func (c *RemotingClient) RegisterRequestFunc(code int16, f ClientRequestFunc) {
        c.processors[code] = f
 }
 
+// TODO: merge sync and async model. sync should run on async model by blocking on chan
 func (c *RemotingClient) InvokeSync(addr string, request *RemotingCommand, timeoutMillis time.Duration) (*RemotingCommand, error) {
        conn, err := c.connect(addr)
        if err != nil {
@@ -132,6 +75,7 @@ func (c *RemotingClient) InvokeSync(addr string, request *RemotingCommand, timeo
        return resp.waitResponse()
 }
 
+// InvokeAsync send request witout blocking, just return immediately.
 func (c *RemotingClient) InvokeAsync(addr string, request *RemotingCommand, timeoutMillis time.Duration, callback func(*ResponseFuture)) error {
        conn, err := c.connect(addr)
        if err != nil {
@@ -144,8 +88,15 @@ func (c *RemotingClient) InvokeAsync(addr string, request *RemotingCommand, time
                return err
        }
        resp.SendRequestOK = true
+       go c.receiveAsync(resp)
        return nil
+}
 
+func (c *RemotingClient) receiveAsync(f *ResponseFuture) {
+       _, err := f.waitResponse()
+       if err != nil {
+               f.executeInvokeCallback()
+       }
 }
 
 func (c *RemotingClient) InvokeOneWay(addr string, request *RemotingCommand, timeout time.Duration) error {
diff --git a/internal/remote/remote_client_test.go b/internal/remote/remote_client_test.go
index c999c0e..96e8f1b 100644
--- a/internal/remote/remote_client_test.go
+++ b/internal/remote/remote_client_test.go
@@ -24,6 +24,8 @@ import (
        "sync"
        "testing"
        "time"
+
+       "github.com/stretchr/testify/assert"
 )
 
 func TestNewResponseFuture(t *testing.T) {
@@ -278,6 +280,53 @@ done:
        wg.Wait()
 }
 
+func TestInvokeAsyncTimeout(t *testing.T) {
+       clientSendRemtingCommand := NewRemotingCommand(10, nil, []byte("Hello RocketMQ"))
+       serverSendRemotingCommand := NewRemotingCommand(20, nil, []byte("Welcome native"))
+       serverSendRemotingCommand.Opaque = clientSendRemtingCommand.Opaque
+       serverSendRemotingCommand.Flag = ResponseType
+
+       var wg sync.WaitGroup
+       wg.Add(1)
+       client := NewRemotingClient()
+
+       var clientSend sync.WaitGroup  // blocking client send message until the server listen success.
+       clientSend.Add(1)
+       go func() {
+               clientSend.Wait()
+               err := client.InvokeAsync(":3000", clientSendRemtingCommand,
+                       time.Duration(1000), func(r *ResponseFuture) {
+                               assert.NotNil(t, r.Err)
+                               assert.Equal(t, ErrRequestTimeout, r.Err)
+                               wg.Done()
+                       })
+               assert.Nil(t, err, "failed to invokeSync.")
+       }()
+
+       l, err := net.Listen("tcp", ":3000")
+       assert.Nil(t, err)
+       defer l.Close()
+       clientSend.Done()
+
+       for {
+               conn, err := l.Accept()
+               assert.Nil(t, err)
+               defer conn.Close()
+
+               scanner := client.createScanner(conn)
+               for scanner.Scan() {
+                       t.Logf("receive request.")
+                       _, err := decode(scanner.Bytes())
+                       assert.Nil(t, err, "failed to decode RemotingCommnad.")
+
+                       time.Sleep(5 * time.Second) // force client timeout
+                       goto done
+               }
+       }
+done:
+       wg.Wait()
+}
+
 func TestInvokeOneWay(t *testing.T) {
        clientSendRemtingCommand := NewRemotingCommand(10, nil, []byte("Hello RocketMQ"))
 
diff --git a/primitive/ctx.go b/primitive/ctx.go
index e74f91a..812ffd6 100644
--- a/primitive/ctx.go
+++ b/primitive/ctx.go
@@ -32,6 +32,7 @@ const (
        // method name in  producer
        SendSync = "SendSync"
        SendOneway = "SendOneway"
+       SendAsync = "SendAsync"
        // method name in consumer
        ConsumerPush = "ConsumerPush"
        ConsumerPull = "ConsumerPull"

Reply via email to