georgehao commented on code in PR #803:
URL: https://github.com/apache/rocketmq-client-go/pull/803#discussion_r851025302


##########
producer/producer.go:
##########
@@ -150,6 +151,113 @@ func MarshalMessageBatch(msgs ...*primitive.Message) []byte {
        return buffer.Bytes()
 }
 
+func (p *defaultProducer) prepareSendRequest(msg *primitive.Message, ttl time.Duration) (string, error) {
+       correlationId := uuid.NewV4().String()
+       requestClientId := p.client.ClientID()
+       msg.WithProperty(primitive.PropertyCorrelationID, correlationId)
+       msg.WithProperty(primitive.PropertyMessageReplyToClient, requestClientId)
+       msg.WithProperty(primitive.PropertyMessageTTL, strconv.Itoa(int(ttl.Seconds())))
+
+       rlog.Debug("message info:", map[string]interface{}{
+               "clientId":      requestClientId,
+               "correlationId": correlationId,
+               "ttl":           ttl.Seconds(),
+       })
+
+       nameSrv, err := internal.GetNamesrv(requestClientId)
+       if err != nil {
+               return "", errors.Wrap(err, "GetNameServ err")
+       }
+
+       if !nameSrv.CheckTopicRouteHasTopic(msg.Topic) {
+               // todo
+       }
+
+       return correlationId, nil
+}
+
+// Request Send messages to consumer
+func (p *defaultProducer) Request(ctx context.Context, timeout time.Duration, msgs ...*primitive.Message) (*primitive.Message, error) {
+       if err := p.checkMsg(msgs...); err != nil {
+               return nil, err
+       }
+
+       p.messagesWithNamespace(msgs...)
+       msg := p.encodeBatch(msgs...)
+
+       correlationId, err := p.prepareSendRequest(msg, timeout)
+       if err != nil {
+               return nil, err
+       }
+
+       requestResponseFuture := internal.NewRequestResponseFuture(correlationId, timeout, nil)
+       internal.RequestResponseFutureMap.SetRequestResponseFuture(requestResponseFuture)
+       defer internal.RequestResponseFutureMap.RemoveRequestResponseFuture(correlationId)
+
+       f := func(ctx context.Context, result *primitive.SendResult, err error) {
+               if err != nil {
+                       requestResponseFuture.SendRequestOk = false
+                       requestResponseFuture.ResponseMsg = nil
+                       requestResponseFuture.CauseErr = err
+                       return
+               }
+               requestResponseFuture.SendRequestOk = true
+       }
+
+       if p.interceptor != nil {
+               primitive.WithMethod(ctx, primitive.SendAsync)
+
+               return nil, p.interceptor(ctx, msg, nil, func(ctx context.Context, req, reply interface{}) error {
+                       return p.sendAsync(ctx, msg, f)
+               })
+       }
+       if err := p.sendAsync(ctx, msg, f); err != nil {
+               return nil, errors.Wrap(err, "sendAsync error")
+       }
+
+       return requestResponseFuture.WaitResponseMessage(msg)
+}
+
+// RequestAsync  Async Send messages to consumer
+func (p *defaultProducer) RequestAsync(ctx context.Context, timeout time.Duration, callback internal.RequestCallback, msgs ...*primitive.Message) error {
+       if err := p.checkMsg(msgs...); err != nil {
+               return err
+       }
+
+       p.messagesWithNamespace(msgs...)
+       msg := p.encodeBatch(msgs...)
+
+       correlationId, err := p.prepareSendRequest(msg, timeout)
+       if err != nil {
+               return err

Review Comment:
   没太理解。。 (English: "I don't quite understand this.")



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to