This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 9a58797  [ISSUE #462] fix the trace message was send failed. (#463)
9a58797 is described below

commit 9a587973e438ddb04edda6766fe494016c670064
Author: dinglei <[email protected]>
AuthorDate: Fri Mar 27 10:43:53 2020 +0800

    [ISSUE #462] fix the trace message was send failed. (#463)
    
    * fix(trace): fix the trace message was send failed.
    
    * fix(bug): remove namespace in the trace message body
    
    * fix the processid error in client id.
---
 consumer/push_consumer.go |  5 ++++
 internal/client.go        | 20 +++++++++++++-
 internal/trace.go         | 68 +++++++++++++++++++++++++++++++++++++----------
 primitive/message.go      | 13 ++++++---
 primitive/trace.go        |  1 +
 5 files changed, 89 insertions(+), 18 deletions(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 321c59f..9118ae2 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -831,6 +831,11 @@ func (pc *pushConsumer) consumeInner(ctx context.Context, 
subMsgs []*primitive.M
 
                        msgCtx, _ := primitive.GetConsumerCtx(ctx)
                        msgCtx.Success = realReply.ConsumeResult == 
ConsumeSuccess
+                       if realReply.ConsumeResult == ConsumeSuccess {
+                               msgCtx.Properties[primitive.PropCtxType] = 
string(primitive.SuccessReturn)
+                       } else {
+                               msgCtx.Properties[primitive.PropCtxType] = 
string(primitive.FailedReturn)
+                       }
                        return e
                })
                return container.ConsumeResult, err
diff --git a/internal/client.go b/internal/client.go
index 90c9b0c..6110dba 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -393,7 +393,12 @@ func (c *rmqClient) Shutdown() {
 }
 
 func (c *rmqClient) ClientID() string {
-       id := c.option.ClientIP + "@" + c.option.InstanceName
+       id := c.option.ClientIP + "@"
+       if c.option.InstanceName == "DEFAULT" {
+               id += strconv.Itoa(os.Getpid())
+       } else {
+               id += c.option.InstanceName
+       }
        if c.option.UnitName != "" {
                id += "@" + c.option.UnitName
        }
@@ -466,6 +471,19 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
                brokerName := key.(string)
                data := value.(*BrokerData)
                for id, addr := range data.BrokerAddresses {
+                       rlog.Debug("try to send heart beat to broker", 
map[string]interface{}{
+                               "brokerName": brokerName,
+                               "brokerId":   id,
+                               "brokerAddr": addr,
+                       })
+                       if hbData.ConsumerDatas.Len() == 0 && id != 0 {
+                               rlog.Debug("notice, will not send heart beat to 
broker", map[string]interface{}{
+                                       "brokerName": brokerName,
+                                       "brokerId":   id,
+                                       "brokerAddr": addr,
+                               })
+                               continue
+                       }
                        cmd := remote.NewRemotingCommand(ReqHeartBeat, nil, 
hbData.encode())
 
                        ctx, _ := context.WithTimeout(context.Background(), 
3*time.Second)
diff --git a/internal/trace.go b/internal/trace.go
index 473b80f..48b257f 100644
--- a/internal/trace.go
+++ b/internal/trace.go
@@ -89,9 +89,21 @@ func (ctx *TraceContext) marshal2Bean() *TraceTransferBean {
                buffer.WriteRune(contentSplitter)
                buffer.WriteString(ctx.RegionId)
                buffer.WriteRune(contentSplitter)
-               buffer.WriteString(ctx.GroupName)
+               ss := strings.Split(ctx.GroupName, "%")
+               if len(ss) == 2 {
+                       buffer.WriteString(ss[1])
+               } else {
+                       buffer.WriteString(ctx.GroupName)
+               }
+
                buffer.WriteRune(contentSplitter)
-               buffer.WriteString(bean.Topic)
+               ssTopic := strings.Split(bean.Topic, "%")
+               if len(ssTopic) == 2 {
+                       buffer.WriteString(ssTopic[1])
+               } else {
+                       buffer.WriteString(bean.Topic)
+               }
+               //buffer.WriteString(bean.Topic)
                buffer.WriteRune(contentSplitter)
                buffer.WriteString(bean.MsgId)
                buffer.WriteRune(contentSplitter)
@@ -119,7 +131,12 @@ func (ctx *TraceContext) marshal2Bean() *TraceTransferBean 
{
                        buffer.WriteRune(contentSplitter)
                        buffer.WriteString(ctx.RegionId)
                        buffer.WriteRune(contentSplitter)
-                       buffer.WriteString(ctx.GroupName)
+                       ss := strings.Split(ctx.GroupName, "%")
+                       if len(ss) == 2 {
+                               buffer.WriteString(ss[1])
+                       } else {
+                               buffer.WriteString(ctx.GroupName)
+                       }
                        buffer.WriteRune(contentSplitter)
                        buffer.WriteString(ctx.RequestId)
                        buffer.WriteRune(contentSplitter)
@@ -233,6 +250,9 @@ func NewTraceDispatcher(traceCfg *primitive.TraceConfig) 
*traceDispatcher {
        }
 
        cliOp := DefaultClientOptions()
+       cliOp.GroupName = traceCfg.GroupName
+       cliOp.NameServerAddrs = traceCfg.NamesrvAddrs
+       cliOp.InstanceName = "INNER_TRACE_CLIENT_DEFAULT"
        cliOp.RetryTimes = 0
        cliOp.Namesrv = srvs
        cliOp.Credentials = traceCfg.Credentials
@@ -301,8 +321,9 @@ func (td *traceDispatcher) process() {
                        batch = append(batch, ctx)
                        if count == batchSize {
                                count = 0
+                               batchSend := batch
                                go primitive.WithRecover(func() {
-                                       td.batchCommit(batch)
+                                       td.batchCommit(batchSend)
                                })
                                batch = make([]TraceContext, 0)
                        }
@@ -312,15 +333,17 @@ func (td *traceDispatcher) process() {
                                count++
                                lastput = time.Now()
                                if len(batch) > 0 {
+                                       batchSend := batch
                                        go primitive.WithRecover(func() {
-                                               td.batchCommit(batch)
+                                               td.batchCommit(batchSend)
                                        })
                                        batch = make([]TraceContext, 0)
                                }
                        }
                case <-td.ctx.Done():
+                       batchSend := batch
                        go primitive.WithRecover(func() {
-                               td.batchCommit(batch)
+                               td.batchCommit(batchSend)
                        })
                        batch = make([]TraceContext, 0)
 
@@ -403,10 +426,14 @@ func (td *traceDispatcher) flush(topic, regionID string, 
data []TraceTransferBea
 }
 
 func (td *traceDispatcher) sendTraceDataByMQ(keySet Keyset, regionID string, 
data string) {
-       msg := primitive.NewMessage(td.traceTopic, []byte(data))
+       traceTopic := td.traceTopic
+       if td.access == primitive.Cloud {
+               traceTopic = td.traceTopic + regionID
+       }
+       msg := primitive.NewMessage(traceTopic, []byte(data))
        msg.WithKeys(keySet.slice())
 
-       mq, addr := td.findMq()
+       mq, addr := td.findMq(regionID)
        if mq == nil {
                return
        }
@@ -414,19 +441,32 @@ func (td *traceDispatcher) sendTraceDataByMQ(keySet 
Keyset, regionID string, dat
        var req = td.buildSendRequest(mq, msg)
        ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
        err := td.cli.InvokeAsync(ctx, addr, req, func(command 
*remote.RemotingCommand, e error) {
+               resp := new(primitive.SendResult)
                if e != nil {
-                       rlog.Error("send trace data error", 
map[string]interface{}{
+                       rlog.Info("send trace data error.", 
map[string]interface{}{
                                "traceData": data,
                        })
+               } else {
+                       td.cli.ProcessSendResponse(mq.BrokerName, command, 
resp, msg)
+                       rlog.Debug("send trace data success:", 
map[string]interface{}{
+                               "SendResult": resp,
+                               "traceData":  data,
+                       })
                }
        })
-       rlog.Error("send trace data error when invoke", map[string]interface{}{
-               rlog.LogKeyUnderlayError: err,
-       })
+       if err != nil {
+               rlog.Info("send trace data error when invoke", 
map[string]interface{}{
+                       rlog.LogKeyUnderlayError: err,
+               })
+       }
 }
 
-func (td *traceDispatcher) findMq() (*primitive.MessageQueue, string) {
-       mqs, err := td.namesrvs.FetchPublishMessageQueues(td.traceTopic)
+func (td *traceDispatcher) findMq(regionID string) (*primitive.MessageQueue, 
string) {
+       traceTopic := td.traceTopic
+       if td.access == primitive.Cloud {
+               traceTopic = td.traceTopic + regionID
+       }
+       mqs, err := td.namesrvs.FetchPublishMessageQueues(traceTopic)
        if err != nil {
                rlog.Error("fetch publish message queues failed", 
map[string]interface{}{
                        rlog.LogKeyUnderlayError: err,
diff --git a/primitive/message.go b/primitive/message.go
index 7d12edb..6a84477 100644
--- a/primitive/message.go
+++ b/primitive/message.go
@@ -237,6 +237,7 @@ func (m *Message) Marshal() []byte {
 type MessageExt struct {
        Message
        MsgId                     string
+       OffsetMsgId               string
        StoreSize                 int32
        QueueOffset               int64
        SysFlag                   int32
@@ -263,9 +264,9 @@ func (msgExt *MessageExt) IsTraceOn() string {
 }
 
 func (msgExt *MessageExt) String() string {
-       return fmt.Sprintf("[Message=%s, MsgId=%s, QueueId=%d, StoreSize=%d, 
QueueOffset=%d, SysFlag=%d, "+
+       return fmt.Sprintf("[Message=%s, MsgId=%s, OffsetMsgId=%s,QueueId=%d, 
StoreSize=%d, QueueOffset=%d, SysFlag=%d, "+
                "BornTimestamp=%d, BornHost=%s, StoreTimestamp=%d, 
StoreHost=%s, CommitLogOffset=%d, BodyCRC=%d, "+
-               "ReconsumeTimes=%d, PreparedTransactionOffset=%d]", 
msgExt.Message.String(), msgExt.MsgId, msgExt.Queue.QueueId,
+               "ReconsumeTimes=%d, PreparedTransactionOffset=%d]", 
msgExt.Message.String(), msgExt.MsgId, msgExt.OffsetMsgId, msgExt.Queue.QueueId,
                msgExt.StoreSize, msgExt.QueueOffset, msgExt.SysFlag, 
msgExt.BornTimestamp, msgExt.BornHost,
                msgExt.StoreTimestamp, msgExt.StoreHost, 
msgExt.CommitLogOffset, msgExt.BodyCRC, msgExt.ReconsumeTimes,
                msgExt.PreparedTransactionOffset)
@@ -364,11 +365,17 @@ func DecodeMessage(data []byte) []*MessageExt {
                }
                count += 2 + int(propertiesLength)
 
-               msg.MsgId = CreateMessageId(hostBytes, port, 
msg.CommitLogOffset)
+               msg.OffsetMsgId = CreateMessageId(hostBytes, port, 
msg.CommitLogOffset)
                //count += 16
                if msg.properties == nil {
                        msg.properties = make(map[string]string, 0)
                }
+               msgID := msg.GetProperty(PropertyUniqueClientMessageIdKeyIndex)
+               if len(msgID) == 0 {
+                       msg.MsgId = msg.OffsetMsgId
+               } else {
+                       msg.MsgId = msgID
+               }
                msgs = append(msgs, msg)
        }
 
diff --git a/primitive/trace.go b/primitive/trace.go
index c898623..c0df5b3 100644
--- a/primitive/trace.go
+++ b/primitive/trace.go
@@ -20,6 +20,7 @@ package primitive
 // config for message trace.
 type TraceConfig struct {
        TraceTopic   string
+       GroupName    string
        Access       AccessChannel
        NamesrvAddrs []string
        Credentials  // acl config for trace. omit if acl is closed on broker.

Reply via email to