georgehao commented on code in PR #803:
URL: https://github.com/apache/rocketmq-client-go/pull/803#discussion_r851025302
##########
producer/producer.go:
##########
@@ -150,6 +151,113 @@ func MarshalMessageBatch(msgs ...*primitive.Message)
[]byte {
return buffer.Bytes()
}
+// prepareSendRequest stamps msg with the request/reply properties and returns
+// the correlation ID generated for it.
+//
+// A fresh UUID is stored under PropertyCorrelationID, this client's ID under
+// PropertyMessageReplyToClient, and ttl (truncated to whole seconds) under
+// PropertyMessageTTL. The name server registered for this client is then
+// looked up; a lookup failure is returned wrapped, with an empty ID.
+func (p *defaultProducer) prepareSendRequest(msg *primitive.Message, ttl time.Duration) (string, error) {
+	correlationID := uuid.NewV4().String()
+	clientID := p.client.ClientID()
+	ttlSeconds := ttl.Seconds()
+
+	msg.WithProperty(primitive.PropertyCorrelationID, correlationID)
+	msg.WithProperty(primitive.PropertyMessageReplyToClient, clientID)
+	msg.WithProperty(primitive.PropertyMessageTTL, strconv.Itoa(int(ttlSeconds)))
+
+	logFields := map[string]interface{}{
+		"clientId":      clientID,
+		"correlationId": correlationID,
+		"ttl":           ttlSeconds,
+	}
+	rlog.Debug("message info:", logFields)
+
+	nameSrv, err := internal.GetNamesrv(clientID)
+	if err != nil {
+		return "", errors.Wrap(err, "GetNameServ err")
+	}
+
+	// The route check is performed but its result is not acted on yet.
+	if !nameSrv.CheckTopicRouteHasTopic(msg.Topic) {
+		// todo
+	}
+
+	return correlationID, nil
+}
+
+// Request sends msgs as one request message and returns the reply obtained
+// from the request/response future.
+//
+// The messages are validated, namespaced and encoded into a single message,
+// which prepareSendRequest stamps with a correlation ID. A future keyed by
+// that ID is registered in internal.RequestResponseFutureMap for the duration
+// of the call and removed again on return. The message is dispatched with
+// sendAsync, through the interceptor when one is configured, and the result
+// of WaitResponseMessage is returned.
+//
+// timeout is used both as the message TTL and as the future's timeout.
+// NOTE(review): WaitResponseMessage presumably blocks until the reply arrives
+// or timeout elapses; confirm against the internal package.
+func (p *defaultProducer) Request(ctx context.Context, timeout time.Duration, msgs ...*primitive.Message) (*primitive.Message, error) {
+	if err := p.checkMsg(msgs...); err != nil {
+		return nil, err
+	}
+
+	p.messagesWithNamespace(msgs...)
+	msg := p.encodeBatch(msgs...)
+
+	correlationID, err := p.prepareSendRequest(msg, timeout)
+	if err != nil {
+		return nil, err
+	}
+
+	future := internal.NewRequestResponseFuture(correlationID, timeout, nil)
+	internal.RequestResponseFutureMap.SetRequestResponseFuture(future)
+	defer internal.RequestResponseFutureMap.RemoveRequestResponseFuture(correlationID)
+
+	// onSent records the outcome of the send itself on the future.
+	onSent := func(ctx context.Context, result *primitive.SendResult, err error) {
+		if err != nil {
+			future.SendRequestOk = false
+			future.ResponseMsg = nil
+			future.CauseErr = err
+			return
+		}
+		future.SendRequestOk = true
+	}
+
+	if p.interceptor != nil {
+		// Keep the derived context; discarding it would lose the method tag.
+		ctx = primitive.WithMethod(ctx, primitive.SendAsync)
+
+		err := p.interceptor(ctx, msg, nil, func(ctx context.Context, req, reply interface{}) error {
+			return p.sendAsync(ctx, msg, onSent)
+		})
+		if err != nil {
+			return nil, err
+		}
+		// Fall through: an intercepted send must still wait for the reply
+		// instead of returning a nil message.
+	} else if err := p.sendAsync(ctx, msg, onSent); err != nil {
+		return nil, errors.Wrap(err, "sendAsync error")
+	}
+
+	return future.WaitResponseMessage(msg)
+}
+
+// RequestAsync Async Send messages to consumer
+func (p *defaultProducer) RequestAsync(ctx context.Context, timeout
time.Duration, callback internal.RequestCallback, msgs ...*primitive.Message)
error {
+ if err := p.checkMsg(msgs...); err != nil {
+ return err
+ }
+
+ p.messagesWithNamespace(msgs...)
+ msg := p.encodeBatch(msgs...)
+
+ correlationId, err := p.prepareSendRequest(msg, timeout)
+ if err != nil {
+ return err
Review Comment:
没太理解。。 (English: "I don't quite understand this.")
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]