This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 56263c334 [INLONG-7054][DataProxy] Add the processing of the rtms
field (#7055)
56263c334 is described below
commit 56263c3348ed13bc48be9d1d2a49c5ed89d53991
Author: Goson Zhang <[email protected]>
AuthorDate: Sat Dec 24 21:31:53 2022 +0800
[INLONG-7054][DataProxy] Add the processing of the rtms field (#7055)
---
.../inlong/common/msg/AttributeConstants.java | 13 ++++
.../dataproxy/http/SimpleMessageHandler.java | 4 +-
.../dataproxy/source/DefaultServiceDecoder.java | 76 ++++++++++++++++------
3 files changed, 72 insertions(+), 21 deletions(-)
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
b/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
index 0f80b8bfe..a17a59f29 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
@@ -96,4 +96,17 @@ public interface AttributeConstants {
String MESSAGE_DP_IP = "dpIP";
String MESSAGE_TOPIC = "topic";
+
+ // dataproxy IP, used in trace info
+ String DATAPROXY_NODE_IP = "node2ip";
+
+ // dataproxy received time, used in trace info
+ String DATAPROXY_RCVTIME = "rtime2";
+
+ // Message reporting time, in milliseconds
+ // Provided by the initial sender of the data, and passed to
+ // the downstream by the Bus without modification for the downstream to
+ // calculate the end-to-end message delay; if this field does not exist in
the request,
+ // it will be added by the Bus with the current time
+ public static final String MSG_RPT_TIME = "rtms";
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
index a94a3b72c..c7c63bcae 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
@@ -139,7 +139,9 @@ public class SimpleMessageHandler implements MessageHandler
{
.append("&dt=").append(strDataTime)
.append("&NodeIP=").append(strRemoteIP)
.append("&cnt=").append(strMsgCount)
- .append("&rt=").append(msgRcvTime);
+ .append("&rt=").append(msgRcvTime)
+
.append(AttributeConstants.SEPARATOR).append(AttributeConstants.MSG_RPT_TIME)
+
.append(AttributeConstants.KEY_VALUE_SEPARATOR).append(msgRcvTime);
try {
inLongMsg.addMsg(strBuff.toString(), body.getBytes(charset));
strBuff.delete(0, strBuff.length());
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
index a537e09c0..b164ad36a 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/DefaultServiceDecoder.java
@@ -150,17 +150,24 @@ public class DefaultServiceDecoder implements
ServiceDecoder {
return index;
}
- private ByteBuffer handleTrace(Channel channel, ByteBuf cb, int
extendField,
+ private ByteBuffer handleExtraAppendAttrInfo(Map<String, String>
commonAttrMap,
+ Channel channel, ByteBuf cb, int extendField,
int msgHeadPos, int totalDataLen, int attrLen,
String strAttr, int bodyLen, long msgRcvTime) {
- // whether enable trace
- ByteBuffer dataBuf;
+ // get and check report time from report node
+ boolean needRebuild = false;
+ String rtMs = "";
+ if
(StringUtils.isBlank(commonAttrMap.get(AttributeConstants.MSG_RPT_TIME))) {
+ needRebuild = true;
+ rtMs = AttributeConstants.MSG_RPT_TIME
+ + AttributeConstants.KEY_VALUE_SEPARATOR
+ + System.currentTimeMillis();
+ }
+ // get trace requirement
+ String traceInfo = "";
boolean enableTrace = (((extendField & 0x2) >> 1) == 0x1);
- if (!enableTrace) {
- dataBuf = ByteBuffer.allocate(totalDataLen +
BIN_MSG_TOTALLEN_SIZE);
- cb.getBytes(msgHeadPos, dataBuf.array(), 0,
- totalDataLen + BIN_MSG_TOTALLEN_SIZE);
- } else {
+ if (enableTrace) {
+ needRebuild = true;
// get local address
String strNode2Ip = null;
SocketAddress loacalSockAddr = channel.localAddress();
@@ -173,17 +180,37 @@ public class DefaultServiceDecoder implements
ServiceDecoder {
strNode2Ip, loacalSockAddr);
}
}
- // build trace information
- int newTotalLen = 0;
- String traceInfo = "node2ip=" + strNode2Ip + "&rtime2=" +
msgRcvTime;
- if (attrLen != 0) {
- newTotalLen = totalDataLen + traceInfo.length() + "&".length();
- strAttr = strAttr + "&" + traceInfo;
- } else {
- newTotalLen = totalDataLen + traceInfo.length();
- strAttr = traceInfo;
+ traceInfo = AttributeConstants.DATAPROXY_NODE_IP
+ + AttributeConstants.KEY_VALUE_SEPARATOR + strNode2Ip
+ + AttributeConstants.SEPARATOR
+ + AttributeConstants.DATAPROXY_RCVTIME
+ + AttributeConstants.KEY_VALUE_SEPARATOR + msgRcvTime;
+ }
+ // rebuild msg attribute
+ ByteBuffer dataBuf;
+ if (needRebuild) {
+ int newTotalLen = totalDataLen;
+ // add rtms attribute
+ if (StringUtils.isNotEmpty(rtMs)) {
+ if (StringUtils.isEmpty(strAttr)) {
+ newTotalLen += rtMs.length();
+ strAttr = rtMs;
+ } else {
+ newTotalLen += AttributeConstants.SEPARATOR.length() +
rtMs.length();
+ strAttr = strAttr + AttributeConstants.SEPARATOR + rtMs;
+ }
+ }
+ // add trace attribute
+ if (StringUtils.isNotEmpty(traceInfo)) {
+ if (StringUtils.isEmpty(strAttr)) {
+ newTotalLen += traceInfo.length();
+ strAttr = traceInfo;
+ } else {
+ newTotalLen += AttributeConstants.SEPARATOR.length() +
traceInfo.length();
+ strAttr = strAttr + AttributeConstants.SEPARATOR +
traceInfo;
+ }
}
- // build trace information bytes
+ // build message buffer
dataBuf = ByteBuffer.allocate(newTotalLen + BIN_MSG_TOTALLEN_SIZE);
cb.getBytes(msgHeadPos, dataBuf.array(), 0,
bodyLen + (BIN_MSG_FORMAT_SIZE - BIN_MSG_ATTRLEN_SIZE
@@ -191,12 +218,16 @@ public class DefaultServiceDecoder implements
ServiceDecoder {
dataBuf.putShort(
bodyLen + (BIN_MSG_FORMAT_SIZE - BIN_MSG_ATTRLEN_SIZE -
BIN_MSG_MAGIC_SIZE),
(short) strAttr.length());
+ // copy all attributes
System.arraycopy(strAttr.getBytes(StandardCharsets.UTF_8), 0,
dataBuf.array(),
bodyLen + (BIN_MSG_FORMAT_SIZE - BIN_MSG_MAGIC_SIZE),
strAttr.length());
dataBuf.putInt(0, newTotalLen);
dataBuf.putShort(newTotalLen + BIN_MSG_TOTALLEN_SIZE -
BIN_MSG_MAGIC_SIZE,
(short) 0xee01);
+ } else {
+ dataBuf = ByteBuffer.allocate(totalDataLen +
BIN_MSG_TOTALLEN_SIZE);
+ cb.getBytes(msgHeadPos, dataBuf.array(), 0, totalDataLen +
BIN_MSG_TOTALLEN_SIZE);
}
return dataBuf;
}
@@ -269,8 +300,8 @@ public class DefaultServiceDecoder implements
ServiceDecoder {
handleDateTime(commonAttrMap, uniq, dataTime, msgCount,
strRemoteIP, msgRcvTime);
final boolean isIndexMsg =
handleExtMap(commonAttrMap, cb, resultMap, extendField,
msgHeadPos);
- ByteBuffer dataBuf = handleTrace(channel, cb, extendField,
msgHeadPos,
- totalDataLen, attrLen, strAttr, bodyLen, msgRcvTime);
+ ByteBuffer dataBuf = handleExtraAppendAttrInfo(commonAttrMap,
channel, cb,
+ extendField, msgHeadPos, totalDataLen, attrLen, strAttr,
bodyLen, msgRcvTime);
// Check if groupId and streamId are number-to-name
String groupId = commonAttrMap.get(AttributeConstants.GROUP_ID);
String streamId = commonAttrMap.get(AttributeConstants.STREAM_ID);
@@ -368,6 +399,11 @@ public class DefaultServiceDecoder implements
ServiceDecoder {
String strDataTime = commonAttrMap.get(AttributeConstants.DATA_TIME);
long longDataTime = NumberUtils.toLong(strDataTime, msgRcvTime);
commonAttrMap.put(AttributeConstants.DATA_TIME,
String.valueOf(longDataTime));
+ // add message report time field
+ if
(StringUtils.isBlank(commonAttrMap.get(AttributeConstants.MSG_RPT_TIME))) {
+ commonAttrMap.put(AttributeConstants.MSG_RPT_TIME,
+ String.valueOf(msgRcvTime));
+ }
commonAttrMap.put(AttributeConstants.RCV_TIME,
String.valueOf(msgRcvTime));
// check message count attr
String strMsgCnt = commonAttrMap.get(AttributeConstants.MESSAGE_COUNT);