georgehao commented on code in PR #803:
URL: https://github.com/apache/rocketmq-client-go/pull/803#discussion_r851025176


##########
producer/producer.go:
##########
@@ -150,6 +151,113 @@ func MarshalMessageBatch(msgs ...*primitive.Message) 
[]byte {
        return buffer.Bytes()
 }
 
+func (p *defaultProducer) prepareSendRequest(msg *primitive.Message, ttl 
time.Duration) (string, error) {
+       correlationId := uuid.NewV4().String()
+       requestClientId := p.client.ClientID()
+       msg.WithProperty(primitive.PropertyCorrelationID, correlationId)
+       msg.WithProperty(primitive.PropertyMessageReplyToClient, 
requestClientId)
+       msg.WithProperty(primitive.PropertyMessageTTL, 
strconv.Itoa(int(ttl.Seconds())))
+
+       rlog.Debug("message info:", map[string]interface{}{
+               "clientId":      requestClientId,
+               "correlationId": correlationId,
+               "ttl":           ttl.Seconds(),
+       })
+
+       nameSrv, err := internal.GetNamesrv(requestClientId)
+       if err != nil {
+               return "", errors.Wrap(err, "GetNameServ err")
+       }
+
+       if !nameSrv.CheckTopicRouteHasTopic(msg.Topic) {
+               // todo
+       }
+
+       return correlationId, nil
+}
+
+// Request Send messages to consumer
+func (p *defaultProducer) Request(ctx context.Context, timeout time.Duration, 
msgs ...*primitive.Message) (*primitive.Message, error) {
+       if err := p.checkMsg(msgs...); err != nil {
+               return nil, err
+       }
+
+       p.messagesWithNamespace(msgs...)
+       msg := p.encodeBatch(msgs...)
+
+       correlationId, err := p.prepareSendRequest(msg, timeout)
+       if err != nil {
+               return nil, err
+       }
+
+       requestResponseFuture := 
internal.NewRequestResponseFuture(correlationId, timeout, nil)
+       
internal.RequestResponseFutureMap.SetRequestResponseFuture(requestResponseFuture)
+       defer 
internal.RequestResponseFutureMap.RemoveRequestResponseFuture(correlationId)
+
+       f := func(ctx context.Context, result *primitive.SendResult, err error) 
{
+               if err != nil {
+                       requestResponseFuture.SendRequestOk = false
+                       requestResponseFuture.ResponseMsg = nil
+                       requestResponseFuture.CauseErr = err
+                       return
+               }
+               requestResponseFuture.SendRequestOk = true
+       }
+
+       if p.interceptor != nil {
+               primitive.WithMethod(ctx, primitive.SendAsync)
+
+               return nil, p.interceptor(ctx, msg, nil, func(ctx 
context.Context, req, reply interface{}) error {
+                       return p.sendAsync(ctx, msg, f)
+               })
+       }
+       if err := p.sendAsync(ctx, msg, f); err != nil {
+               return nil, errors.Wrap(err, "sendAsync error")
+       }
+
+       return requestResponseFuture.WaitResponseMessage(msg)
+}
+
+// RequestAsync  Async Send messages to consumer
+func (p *defaultProducer) RequestAsync(ctx context.Context, timeout 
time.Duration, callback internal.RequestCallback, msgs ...*primitive.Message) 
error {

Review Comment:
   这个 error 跟 Java 的异常还是有区别的。这个 error 只是表示调用过程中发生了错误，让用户能够感知到而已，用户可以据此做相应的处理。



##########
producer/producer.go:
##########
@@ -150,6 +151,113 @@ func MarshalMessageBatch(msgs ...*primitive.Message) 
[]byte {
        return buffer.Bytes()
 }
 
+func (p *defaultProducer) prepareSendRequest(msg *primitive.Message, ttl 
time.Duration) (string, error) {
+       correlationId := uuid.NewV4().String()
+       requestClientId := p.client.ClientID()
+       msg.WithProperty(primitive.PropertyCorrelationID, correlationId)
+       msg.WithProperty(primitive.PropertyMessageReplyToClient, 
requestClientId)
+       msg.WithProperty(primitive.PropertyMessageTTL, 
strconv.Itoa(int(ttl.Seconds())))
+
+       rlog.Debug("message info:", map[string]interface{}{
+               "clientId":      requestClientId,
+               "correlationId": correlationId,
+               "ttl":           ttl.Seconds(),
+       })
+
+       nameSrv, err := internal.GetNamesrv(requestClientId)
+       if err != nil {
+               return "", errors.Wrap(err, "GetNameServ err")
+       }
+
+       if !nameSrv.CheckTopicRouteHasTopic(msg.Topic) {
+               // todo
+       }
+
+       return correlationId, nil
+}
+
+// Request Send messages to consumer
+func (p *defaultProducer) Request(ctx context.Context, timeout time.Duration, 
msgs ...*primitive.Message) (*primitive.Message, error) {
+       if err := p.checkMsg(msgs...); err != nil {

Review Comment:
   收到



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to