githublaohu commented on code in PR #803:
URL: https://github.com/apache/rocketmq-client-go/pull/803#discussion_r851025919
##########
producer/producer.go:
##########
@@ -150,6 +151,113 @@ func MarshalMessageBatch(msgs ...*primitive.Message)
[]byte {
return buffer.Bytes()
}
+func (p *defaultProducer) prepareSendRequest(msg *primitive.Message, ttl
time.Duration) (string, error) {
+ correlationId := uuid.NewV4().String()
+ requestClientId := p.client.ClientID()
+ msg.WithProperty(primitive.PropertyCorrelationID, correlationId)
+ msg.WithProperty(primitive.PropertyMessageReplyToClient,
requestClientId)
+ msg.WithProperty(primitive.PropertyMessageTTL,
strconv.Itoa(int(ttl.Seconds())))
+
+ rlog.Debug("message info:", map[string]interface{}{
+ "clientId": requestClientId,
+ "correlationId": correlationId,
+ "ttl": ttl.Seconds(),
+ })
+
+ nameSrv, err := internal.GetNamesrv(requestClientId)
+ if err != nil {
+ return "", errors.Wrap(err, "GetNameServ err")
+ }
+
+ if !nameSrv.CheckTopicRouteHasTopic(msg.Topic) {
+ // todo
+ }
+
+ return correlationId, nil
+}
+
+// Request Send messages to consumer
+func (p *defaultProducer) Request(ctx context.Context, timeout time.Duration,
msgs ...*primitive.Message) (*primitive.Message, error) {
+ if err := p.checkMsg(msgs...); err != nil {
+ return nil, err
+ }
+
+ p.messagesWithNamespace(msgs...)
+ msg := p.encodeBatch(msgs...)
+
+ correlationId, err := p.prepareSendRequest(msg, timeout)
+ if err != nil {
+ return nil, err
+ }
+
+ requestResponseFuture :=
internal.NewRequestResponseFuture(correlationId, timeout, nil)
+
internal.RequestResponseFutureMap.SetRequestResponseFuture(requestResponseFuture)
+ defer
internal.RequestResponseFutureMap.RemoveRequestResponseFuture(correlationId)
+
+ f := func(ctx context.Context, result *primitive.SendResult, err error)
{
+ if err != nil {
+ requestResponseFuture.SendRequestOk = false
+ requestResponseFuture.ResponseMsg = nil
+ requestResponseFuture.CauseErr = err
+ return
+ }
+ requestResponseFuture.SendRequestOk = true
+ }
+
+ if p.interceptor != nil {
+ primitive.WithMethod(ctx, primitive.SendAsync)
+
+ return nil, p.interceptor(ctx, msg, nil, func(ctx
context.Context, req, reply interface{}) error {
+ return p.sendAsync(ctx, msg, f)
+ })
+ }
+ if err := p.sendAsync(ctx, msg, f); err != nil {
+ return nil, errors.Wrap(err, "sendAsync error")
+ }
+
+ return requestResponseFuture.WaitResponseMessage(msg)
+}
+
+// RequestAsync Async Send messages to consumer
+func (p *defaultProducer) RequestAsync(ctx context.Context, timeout
time.Duration, callback internal.RequestCallback, msgs ...*primitive.Message)
error {
Review Comment:
异步时，不需要返回的。
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]