githublaohu commented on code in PR #803:
URL: https://github.com/apache/rocketmq-client-go/pull/803#discussion_r851007256


##########
internal/client.go:
##########
@@ -331,6 +331,46 @@ func GetOrNewRocketMQClient(option ClientOptions, 
callbackCh chan interface{}) R
                        client.resetOffset(header.topic, header.group, 
body.OffsetTable)
                        return nil
                })
+
+               
client.remoteClient.RegisterRequestFunc(ReqPushReplyMessageToClient, func(req 
*remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
+                       receiveTime := time.Now().UnixNano() / 
int64(time.Millisecond)
+                       rlog.Info("receive push reply to client request...", 
map[string]interface{}{
+                               rlog.LogKeyBroker:        addr.String(),
+                               rlog.LogKeyTopic:         
req.ExtFields["topic"],

Review Comment:
   Use static variable proxy string
   
   请使用静态变量代理字符串



##########
examples/consumer/rpc/main.go:
##########
@@ -0,0 +1,89 @@
+package main
+
+import (
+       "context"
+       "fmt"
+       "time"
+
+       "github.com/apache/rocketmq-client-go/v2/consumer"
+       "github.com/apache/rocketmq-client-go/v2/internal"
+       "github.com/apache/rocketmq-client-go/v2/primitive"
+       "github.com/apache/rocketmq-client-go/v2/producer"
+)
+
+const (
+       producerGroup = "please_rename_unique_group_name"
+       consumerGroup = "please_rename_unique_group_name"
+       topic         = "RequestTopic"
+)
+
+func main() {

Review Comment:
   Trouble, use the new API to provide a complete RPC response. I feel that the 
current solution is not elegant enough and users have trouble using it
   
   麻烦,使用新的API提供完整的rpc响应。感觉目前的解决方案不够优雅,用户使用麻烦



##########
producer/producer.go:
##########
@@ -150,6 +151,113 @@ func MarshalMessageBatch(msgs ...*primitive.Message) 
[]byte {
        return buffer.Bytes()
 }
 
+func (p *defaultProducer) prepareSendRequest(msg *primitive.Message, ttl 
time.Duration) (string, error) {
+       correlationId := uuid.NewV4().String()
+       requestClientId := p.client.ClientID()
+       msg.WithProperty(primitive.PropertyCorrelationID, correlationId)
+       msg.WithProperty(primitive.PropertyMessageReplyToClient, 
requestClientId)
+       msg.WithProperty(primitive.PropertyMessageTTL, 
strconv.Itoa(int(ttl.Seconds())))
+
+       rlog.Debug("message info:", map[string]interface{}{
+               "clientId":      requestClientId,
+               "correlationId": correlationId,
+               "ttl":           ttl.Seconds(),
+       })
+
+       nameSrv, err := internal.GetNamesrv(requestClientId)
+       if err != nil {
+               return "", errors.Wrap(err, "GetNameServ err")
+       }
+
+       if !nameSrv.CheckTopicRouteHasTopic(msg.Topic) {

Review Comment:
   No query is required. Let users configure the ability of broker to 
automatically create topics. It is specially noted and explained in the 
function publicity
   
   不需要查询。让用户配置broker自动创建topic的能力。在功能宣讲的时候特地注明,解释



##########
internal/request_response_future.go:
##########
@@ -0,0 +1,135 @@
+package internal
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       "github.com/patrickmn/go-cache"
+       "github.com/pkg/errors"
+
+       "github.com/apache/rocketmq-client-go/v2/primitive"
+       "github.com/apache/rocketmq-client-go/v2/rlog"
+)
+
+var RequestResponseFutureMap = NewRequestResponseFutureMap()
+
+type requestResponseFutureCache struct {
+       cache *cache.Cache
+}
+
+func NewRequestResponseFutureMap() *requestResponseFutureCache {
+       tmpRrfCache := requestResponseFutureCache{
+               cache: cache.New(5*time.Minute, 10*time.Minute),
+       }
+
+       // OnEvicted delete the timeout RequestResponseFuture, trigger set the 
failure cause.
+       tmpRrfCache.cache.OnEvicted(func(s string, i interface{}) {
+               rrf, ok := i.(*RequestResponseFuture)
+               if !ok {
+                       rlog.Error("convert i to RequestResponseFuture err", 
map[string]interface{}{
+                               "correlationId": s,
+                       })
+                       return
+               }
+               if !rrf.IsTimeout() {
+                       return
+               }
+
+               err := fmt.Errorf("correlationId:%s request timeout, no reply 
message", s)
+               rrf.CauseErr = err
+               rrf.ExecuteRequestCallback()

Review Comment:
   Considering the asynchronous callback, the synchronization timeout problem 
is not handled
   
   考虑的异步回调,同步超时问题没有处理



##########
producer/producer.go:
##########
@@ -150,6 +151,113 @@ func MarshalMessageBatch(msgs ...*primitive.Message) 
[]byte {
        return buffer.Bytes()
 }
 
+func (p *defaultProducer) prepareSendRequest(msg *primitive.Message, ttl 
time.Duration) (string, error) {
+       correlationId := uuid.NewV4().String()
+       requestClientId := p.client.ClientID()
+       msg.WithProperty(primitive.PropertyCorrelationID, correlationId)
+       msg.WithProperty(primitive.PropertyMessageReplyToClient, 
requestClientId)
+       msg.WithProperty(primitive.PropertyMessageTTL, 
strconv.Itoa(int(ttl.Seconds())))
+
+       rlog.Debug("message info:", map[string]interface{}{
+               "clientId":      requestClientId,
+               "correlationId": correlationId,
+               "ttl":           ttl.Seconds(),
+       })
+
+       nameSrv, err := internal.GetNamesrv(requestClientId)
+       if err != nil {
+               return "", errors.Wrap(err, "GetNameServ err")
+       }
+
+       if !nameSrv.CheckTopicRouteHasTopic(msg.Topic) {
+               // todo
+       }
+
+       return correlationId, nil
+}
+
+// Request Send messages to consumer
+func (p *defaultProducer) Request(ctx context.Context, timeout time.Duration, 
msgs ...*primitive.Message) (*primitive.Message, error) {
+       if err := p.checkMsg(msgs...); err != nil {
+               return nil, err
+       }
+
+       p.messagesWithNamespace(msgs...)
+       msg := p.encodeBatch(msgs...)
+
+       correlationId, err := p.prepareSendRequest(msg, timeout)
+       if err != nil {
+               return nil, err
+       }
+
+       requestResponseFuture := 
internal.NewRequestResponseFuture(correlationId, timeout, nil)
+       
internal.RequestResponseFutureMap.SetRequestResponseFuture(requestResponseFuture)
+       defer 
internal.RequestResponseFutureMap.RemoveRequestResponseFuture(correlationId)
+
+       f := func(ctx context.Context, result *primitive.SendResult, err error) 
{
+               if err != nil {
+                       requestResponseFuture.SendRequestOk = false
+                       requestResponseFuture.ResponseMsg = nil
+                       requestResponseFuture.CauseErr = err
+                       return
+               }
+               requestResponseFuture.SendRequestOk = true
+       }
+
+       if p.interceptor != nil {
+               primitive.WithMethod(ctx, primitive.SendAsync)
+
+               return nil, p.interceptor(ctx, msg, nil, func(ctx 
context.Context, req, reply interface{}) error {
+                       return p.sendAsync(ctx, msg, f)
+               })
+       }
+       if err := p.sendAsync(ctx, msg, f); err != nil {
+               return nil, errors.Wrap(err, "sendAsync error")
+       }
+
+       return requestResponseFuture.WaitResponseMessage(msg)
+}
+
+// RequestAsync  Async Send messages to consumer
+func (p *defaultProducer) RequestAsync(ctx context.Context, timeout 
time.Duration, callback internal.RequestCallback, msgs ...*primitive.Message) 
error {

Review Comment:
   Do not define the return of a method as an exception
   
   不要定义方法的返回为异常



##########
internal/request_response_future.go:
##########
@@ -0,0 +1,135 @@
+package internal
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       "github.com/patrickmn/go-cache"
+       "github.com/pkg/errors"
+
+       "github.com/apache/rocketmq-client-go/v2/primitive"
+       "github.com/apache/rocketmq-client-go/v2/rlog"
+)
+
+var RequestResponseFutureMap = NewRequestResponseFutureMap()
+
+type requestResponseFutureCache struct {
+       cache *cache.Cache
+}
+
+func NewRequestResponseFutureMap() *requestResponseFutureCache {
+       tmpRrfCache := requestResponseFutureCache{
+               cache: cache.New(5*time.Minute, 10*time.Minute),
+       }
+
+       // OnEvicted delete the timeout RequestResponseFuture, trigger set the 
failure cause.
+       tmpRrfCache.cache.OnEvicted(func(s string, i interface{}) {
+               rrf, ok := i.(*RequestResponseFuture)
+               if !ok {
+                       rlog.Error("convert i to RequestResponseFuture err", 
map[string]interface{}{
+                               "correlationId": s,
+                       })
+                       return
+               }
+               if !rrf.IsTimeout() {
+                       return
+               }
+
+               err := fmt.Errorf("correlationId:%s request timeout, no reply 
message", s)
+               rrf.CauseErr = err
+               rrf.ExecuteRequestCallback()
+       })
+       return &tmpRrfCache
+}
+
+// SetRequestResponseFuture set rrf to map
+func (fm *requestResponseFutureCache) SetRequestResponseFuture(rrf 
*RequestResponseFuture) {
+       fm.cache.Set(rrf.CorrelationId, rrf, rrf.Timeout)
+}
+
+// SetResponseToRequestResponseFuture set reply to rrf
+func (fm *requestResponseFutureCache) 
SetResponseToRequestResponseFuture(correlationId string, reply 
*primitive.Message) error {
+       rrf, exist := fm.RequestResponseFuture(correlationId)
+       if !exist {
+               return errors.Wrapf(nil, "correlationId:%s not exist in map", 
correlationId)
+       }
+       rrf.PutResponseMessage(reply)
+       if rrf.RequestCallback != nil {
+               rrf.ExecuteRequestCallback()
+       }
+       return nil
+}
+
+// RequestResponseFuture get rrf from map by the CorrelationId
+func (fm *requestResponseFutureCache) RequestResponseFuture(correlationId 
string) (*RequestResponseFuture, bool) {
+       res, exists := fm.cache.Get(correlationId)
+       if exists {
+               return res.(*RequestResponseFuture), exists
+       }
+       return nil, exists
+}
+
+// RemoveRequestResponseFuture remove the rrf from map
+func (fm *requestResponseFutureCache) 
RemoveRequestResponseFuture(correlationId string) {
+       fm.cache.Delete(correlationId)

Review Comment:
   When deleting, please trigger wake-up and execute asynchronous callback 
operation. At the same time, please pass in the exception
   
   删除的时候,请触发唤醒与执行异步回调操作。同时请传递异常进来



##########
internal/request_response_future.go:
##########
@@ -0,0 +1,135 @@
+package internal
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       "github.com/patrickmn/go-cache"
+       "github.com/pkg/errors"
+
+       "github.com/apache/rocketmq-client-go/v2/primitive"
+       "github.com/apache/rocketmq-client-go/v2/rlog"
+)
+
+var RequestResponseFutureMap = NewRequestResponseFutureMap()

Review Comment:
   The naming definition of requestresponsefuture in the file is not readable 
and understandable
   
   文件内RequestResponseFuture的命名定义可读性与可理解性比较差



##########
producer/producer.go:
##########
@@ -150,6 +151,113 @@ func MarshalMessageBatch(msgs ...*primitive.Message) 
[]byte {
        return buffer.Bytes()
 }
 
+func (p *defaultProducer) prepareSendRequest(msg *primitive.Message, ttl 
time.Duration) (string, error) {
+       correlationId := uuid.NewV4().String()
+       requestClientId := p.client.ClientID()
+       msg.WithProperty(primitive.PropertyCorrelationID, correlationId)
+       msg.WithProperty(primitive.PropertyMessageReplyToClient, 
requestClientId)
+       msg.WithProperty(primitive.PropertyMessageTTL, 
strconv.Itoa(int(ttl.Seconds())))
+
+       rlog.Debug("message info:", map[string]interface{}{
+               "clientId":      requestClientId,
+               "correlationId": correlationId,
+               "ttl":           ttl.Seconds(),
+       })
+
+       nameSrv, err := internal.GetNamesrv(requestClientId)
+       if err != nil {
+               return "", errors.Wrap(err, "GetNameServ err")
+       }
+
+       if !nameSrv.CheckTopicRouteHasTopic(msg.Topic) {
+               // todo
+       }
+
+       return correlationId, nil
+}
+
+// Request Send messages to consumer
+func (p *defaultProducer) Request(ctx context.Context, timeout time.Duration, 
msgs ...*primitive.Message) (*primitive.Message, error) {
+       if err := p.checkMsg(msgs...); err != nil {

Review Comment:
   The RPC function does not require batch capacity. Just one message at a 
time. And related codes also need to be corrected
   
   rpc功能不需要批量能力。每次只需要一个消息就行了。以及相关代码也需要修正



##########
internal/request_response_future.go:
##########
@@ -0,0 +1,135 @@
+package internal
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       "github.com/patrickmn/go-cache"
+       "github.com/pkg/errors"
+
+       "github.com/apache/rocketmq-client-go/v2/primitive"
+       "github.com/apache/rocketmq-client-go/v2/rlog"
+)
+
+var RequestResponseFutureMap = NewRequestResponseFutureMap()
+
+type requestResponseFutureCache struct {
+       cache *cache.Cache
+}
+
+func NewRequestResponseFutureMap() *requestResponseFutureCache {
+       tmpRrfCache := requestResponseFutureCache{
+               cache: cache.New(5*time.Minute, 10*time.Minute),
+       }
+
+       // OnEvicted delete the timeout RequestResponseFuture, trigger set the 
failure cause.
+       tmpRrfCache.cache.OnEvicted(func(s string, i interface{}) {
+               rrf, ok := i.(*RequestResponseFuture)
+               if !ok {
+                       rlog.Error("convert i to RequestResponseFuture err", 
map[string]interface{}{
+                               "correlationId": s,
+                       })
+                       return
+               }
+               if !rrf.IsTimeout() {
+                       return
+               }
+
+               err := fmt.Errorf("correlationId:%s request timeout, no reply 
message", s)
+               rrf.CauseErr = err
+               rrf.ExecuteRequestCallback()

Review Comment:
   Automatically delete cache when it times out
   If there are exceptions and timeouts, do you want to delete them from the 
cache
   
   cache 超时时候自动删除
   如果异常与超时,是否要从cache中删除



##########
producer/producer.go:
##########
@@ -150,6 +151,113 @@ func MarshalMessageBatch(msgs ...*primitive.Message) 
[]byte {
        return buffer.Bytes()
 }
 
+func (p *defaultProducer) prepareSendRequest(msg *primitive.Message, ttl 
time.Duration) (string, error) {
+       correlationId := uuid.NewV4().String()
+       requestClientId := p.client.ClientID()
+       msg.WithProperty(primitive.PropertyCorrelationID, correlationId)
+       msg.WithProperty(primitive.PropertyMessageReplyToClient, 
requestClientId)
+       msg.WithProperty(primitive.PropertyMessageTTL, 
strconv.Itoa(int(ttl.Seconds())))
+
+       rlog.Debug("message info:", map[string]interface{}{
+               "clientId":      requestClientId,
+               "correlationId": correlationId,
+               "ttl":           ttl.Seconds(),
+       })
+
+       nameSrv, err := internal.GetNamesrv(requestClientId)
+       if err != nil {
+               return "", errors.Wrap(err, "GetNameServ err")
+       }
+
+       if !nameSrv.CheckTopicRouteHasTopic(msg.Topic) {
+               // todo
+       }
+
+       return correlationId, nil
+}
+
+// Request Send messages to consumer
+func (p *defaultProducer) Request(ctx context.Context, timeout time.Duration, 
msgs ...*primitive.Message) (*primitive.Message, error) {
+       if err := p.checkMsg(msgs...); err != nil {
+               return nil, err
+       }
+
+       p.messagesWithNamespace(msgs...)
+       msg := p.encodeBatch(msgs...)
+
+       correlationId, err := p.prepareSendRequest(msg, timeout)
+       if err != nil {
+               return nil, err
+       }
+
+       requestResponseFuture := 
internal.NewRequestResponseFuture(correlationId, timeout, nil)
+       
internal.RequestResponseFutureMap.SetRequestResponseFuture(requestResponseFuture)
+       defer 
internal.RequestResponseFutureMap.RemoveRequestResponseFuture(correlationId)
+
+       f := func(ctx context.Context, result *primitive.SendResult, err error) 
{
+               if err != nil {
+                       requestResponseFuture.SendRequestOk = false
+                       requestResponseFuture.ResponseMsg = nil
+                       requestResponseFuture.CauseErr = err
+                       return
+               }
+               requestResponseFuture.SendRequestOk = true
+       }
+
+       if p.interceptor != nil {
+               primitive.WithMethod(ctx, primitive.SendAsync)
+
+               return nil, p.interceptor(ctx, msg, nil, func(ctx 
context.Context, req, reply interface{}) error {
+                       return p.sendAsync(ctx, msg, f)
+               })
+       }
+       if err := p.sendAsync(ctx, msg, f); err != nil {
+               return nil, errors.Wrap(err, "sendAsync error")
+       }
+
+       return requestResponseFuture.WaitResponseMessage(msg)
+}
+
+// RequestAsync  Async Send messages to consumer
+func (p *defaultProducer) RequestAsync(ctx context.Context, timeout 
time.Duration, callback internal.RequestCallback, msgs ...*primitive.Message) 
error {
+       if err := p.checkMsg(msgs...); err != nil {
+               return err
+       }
+
+       p.messagesWithNamespace(msgs...)
+       msg := p.encodeBatch(msgs...)
+
+       correlationId, err := p.prepareSendRequest(msg, timeout)
+       if err != nil {
+               return err

Review Comment:
   Please call the asynchronous return method
   
   请调用异步回到方法



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to