This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 56263c334 [INLONG-7054][DataProxy] Add the processing of the rtms 
field (#7055)
56263c334 is described below

commit 56263c3348ed13bc48be9d1d2a49c5ed89d53991
Author: Goson Zhang <[email protected]>
AuthorDate: Sat Dec 24 21:31:53 2022 +0800

    [INLONG-7054][DataProxy] Add the processing of the rtms field (#7055)
---
 .../inlong/common/msg/AttributeConstants.java      | 13 ++++
 .../dataproxy/http/SimpleMessageHandler.java       |  4 +-
 .../dataproxy/source/DefaultServiceDecoder.java    | 76 ++++++++++++++++------
 3 files changed, 72 insertions(+), 21 deletions(-)

diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
index 0f80b8bfe..a17a59f29 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
@@ -96,4 +96,17 @@ public interface AttributeConstants {
     String MESSAGE_DP_IP = "dpIP";
 
     String MESSAGE_TOPIC = "topic";
+
+    // dataproxy IP, used in trace info
+    String DATAPROXY_NODE_IP = "node2ip";
+
+    // dataproxy received time, used in trace info
+    String DATAPROXY_RCVTIME = "rtime2";
+
+    // Message reporting time, in milliseconds
+    // Provided by the initial sender of the data, and passed to
+    // the downstream by the Bus without modification for the downstream to
+    // calculate the end-to-end message delay; if this field does not exist in 
the request,
+    // it will be added by the Bus with the current time
+    public static final String MSG_RPT_TIME = "rtms";
 }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
index a94a3b72c..c7c63bcae 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
@@ -139,7 +139,9 @@ public class SimpleMessageHandler implements MessageHandler 
{
                 .append("&dt=").append(strDataTime)
                 .append("&NodeIP=").append(strRemoteIP)
                 .append("&cnt=").append(strMsgCount)
-                .append("&rt=").append(msgRcvTime);
+                .append("&rt=").append(msgRcvTime)
+                
.append(AttributeConstants.SEPARATOR).append(AttributeConstants.MSG_RPT_TIME)
+                
.append(AttributeConstants.KEY_VALUE_SEPARATOR).append(msgRcvTime);
         try {
             inLongMsg.addMsg(strBuff.toString(), body.getBytes(charset));
             strBuff.delete(0, strBuff.length());
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
index a537e09c0..b164ad36a 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
@@ -150,17 +150,24 @@ public class DefaultServiceDecoder implements 
ServiceDecoder {
         return index;
     }
 
-    private ByteBuffer handleTrace(Channel channel, ByteBuf cb, int 
extendField,
+    private ByteBuffer handleExtraAppendAttrInfo(Map<String, String> 
commonAttrMap,
+            Channel channel, ByteBuf cb, int extendField,
             int msgHeadPos, int totalDataLen, int attrLen,
             String strAttr, int bodyLen, long msgRcvTime) {
-        // whether enable trace
-        ByteBuffer dataBuf;
+        // get and check report time from report node
+        boolean needRebuild = false;
+        String rtMs = "";
+        if 
(StringUtils.isBlank(commonAttrMap.get(AttributeConstants.MSG_RPT_TIME))) {
+            needRebuild = true;
+            rtMs = AttributeConstants.MSG_RPT_TIME
+                    + AttributeConstants.KEY_VALUE_SEPARATOR
+                    + System.currentTimeMillis();
+        }
+        // get trace requirement
+        String traceInfo = "";
         boolean enableTrace = (((extendField & 0x2) >> 1) == 0x1);
-        if (!enableTrace) {
-            dataBuf = ByteBuffer.allocate(totalDataLen + 
BIN_MSG_TOTALLEN_SIZE);
-            cb.getBytes(msgHeadPos, dataBuf.array(), 0,
-                    totalDataLen + BIN_MSG_TOTALLEN_SIZE);
-        } else {
+        if (enableTrace) {
+            needRebuild = true;
             // get local address
             String strNode2Ip = null;
             SocketAddress loacalSockAddr = channel.localAddress();
@@ -173,17 +180,37 @@ public class DefaultServiceDecoder implements 
ServiceDecoder {
                             strNode2Ip, loacalSockAddr);
                 }
             }
-            // build trace information
-            int newTotalLen = 0;
-            String traceInfo = "node2ip=" + strNode2Ip + "&rtime2=" + 
msgRcvTime;
-            if (attrLen != 0) {
-                newTotalLen = totalDataLen + traceInfo.length() + "&".length();
-                strAttr = strAttr + "&" + traceInfo;
-            } else {
-                newTotalLen = totalDataLen + traceInfo.length();
-                strAttr = traceInfo;
+            traceInfo = AttributeConstants.DATAPROXY_NODE_IP
+                    + AttributeConstants.KEY_VALUE_SEPARATOR + strNode2Ip
+                    + AttributeConstants.SEPARATOR
+                    + AttributeConstants.DATAPROXY_RCVTIME
+                    + AttributeConstants.KEY_VALUE_SEPARATOR + msgRcvTime;
+        }
+        // rebuild msg attribute
+        ByteBuffer dataBuf;
+        if (needRebuild) {
+            int newTotalLen = totalDataLen;
+            // add rtms attribute
+            if (StringUtils.isNotEmpty(rtMs)) {
+                if (StringUtils.isEmpty(strAttr)) {
+                    newTotalLen += rtMs.length();
+                    strAttr = rtMs;
+                } else {
+                    newTotalLen += AttributeConstants.SEPARATOR.length() + 
rtMs.length();
+                    strAttr = strAttr + AttributeConstants.SEPARATOR + rtMs;
+                }
+            }
+            // add trace attribute
+            if (StringUtils.isNotEmpty(traceInfo)) {
+                if (StringUtils.isEmpty(strAttr)) {
+                    newTotalLen += traceInfo.length();
+                    strAttr = traceInfo;
+                } else {
+                    newTotalLen += AttributeConstants.SEPARATOR.length() + 
traceInfo.length();
+                    strAttr = strAttr + AttributeConstants.SEPARATOR + 
traceInfo;
+                }
             }
-            // build trace information bytes
+            // build message buffer
             dataBuf = ByteBuffer.allocate(newTotalLen + BIN_MSG_TOTALLEN_SIZE);
             cb.getBytes(msgHeadPos, dataBuf.array(), 0,
                     bodyLen + (BIN_MSG_FORMAT_SIZE - BIN_MSG_ATTRLEN_SIZE
@@ -191,12 +218,16 @@ public class DefaultServiceDecoder implements 
ServiceDecoder {
             dataBuf.putShort(
                     bodyLen + (BIN_MSG_FORMAT_SIZE - BIN_MSG_ATTRLEN_SIZE - 
BIN_MSG_MAGIC_SIZE),
                     (short) strAttr.length());
+            // copy all attributes
             System.arraycopy(strAttr.getBytes(StandardCharsets.UTF_8), 0, 
dataBuf.array(),
                     bodyLen + (BIN_MSG_FORMAT_SIZE - BIN_MSG_MAGIC_SIZE),
                     strAttr.length());
             dataBuf.putInt(0, newTotalLen);
             dataBuf.putShort(newTotalLen + BIN_MSG_TOTALLEN_SIZE - 
BIN_MSG_MAGIC_SIZE,
                     (short) 0xee01);
+        } else {
+            dataBuf = ByteBuffer.allocate(totalDataLen + 
BIN_MSG_TOTALLEN_SIZE);
+            cb.getBytes(msgHeadPos, dataBuf.array(), 0, totalDataLen + 
BIN_MSG_TOTALLEN_SIZE);
         }
         return dataBuf;
     }
@@ -269,8 +300,8 @@ public class DefaultServiceDecoder implements 
ServiceDecoder {
             handleDateTime(commonAttrMap, uniq, dataTime, msgCount, 
strRemoteIP, msgRcvTime);
             final boolean isIndexMsg =
                     handleExtMap(commonAttrMap, cb, resultMap, extendField, 
msgHeadPos);
-            ByteBuffer dataBuf = handleTrace(channel, cb, extendField, 
msgHeadPos,
-                    totalDataLen, attrLen, strAttr, bodyLen, msgRcvTime);
+            ByteBuffer dataBuf = handleExtraAppendAttrInfo(commonAttrMap, 
channel, cb,
+                    extendField, msgHeadPos, totalDataLen, attrLen, strAttr, 
bodyLen, msgRcvTime);
             // Check if groupId and streamId are number-to-name
             String groupId = commonAttrMap.get(AttributeConstants.GROUP_ID);
             String streamId = commonAttrMap.get(AttributeConstants.STREAM_ID);
@@ -368,6 +399,11 @@ public class DefaultServiceDecoder implements 
ServiceDecoder {
         String strDataTime = commonAttrMap.get(AttributeConstants.DATA_TIME);
         long longDataTime = NumberUtils.toLong(strDataTime, msgRcvTime);
         commonAttrMap.put(AttributeConstants.DATA_TIME, 
String.valueOf(longDataTime));
+        // add message report time field
+        if 
(StringUtils.isBlank(commonAttrMap.get(AttributeConstants.MSG_RPT_TIME))) {
+            commonAttrMap.put(AttributeConstants.MSG_RPT_TIME,
+                    String.valueOf(msgRcvTime));
+        }
         commonAttrMap.put(AttributeConstants.RCV_TIME, 
String.valueOf(msgRcvTime));
         // check message count attr
         String strMsgCnt = commonAttrMap.get(AttributeConstants.MESSAGE_COUNT);

Reply via email to