This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 364bdc5a24 [INLONG-10111][DataProxy] Add auditVersion field processing
(#10112)
364bdc5a24 is described below
commit 364bdc5a2417126a30e39d7de8dd38d7532e0c31
Author: Goson Zhang <[email protected]>
AuthorDate: Tue Apr 30 09:06:40 2024 +0800
[INLONG-10111][DataProxy] Add auditVersion field processing (#10112)
Co-authored-by: gosonzhang <[email protected]>
---
.../inlong/dataproxy/metrics/audit/AuditUtils.java | 32 +++++++++++++++++++---
.../source/httpMsg/HttpMessageHandler.java | 8 ++++++
.../dataproxy/source/v0msg/AbsV0MsgCodec.java | 2 ++
.../inlong/dataproxy/source/v0msg/CodecBinMsg.java | 3 ++
.../dataproxy/source/v0msg/CodecTextMsg.java | 3 ++
5 files changed, 44 insertions(+), 4 deletions(-)
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
index 371a4de0c9..899aca38d4 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
@@ -26,11 +26,14 @@ import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.utils.Constants;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.Event;
import java.util.Map;
+import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG;
+
/**
* Audit utils
*/
@@ -72,15 +75,17 @@ public class AuditUtils {
if
(event.getHeaders().containsKey(ConfigConstants.MSG_COUNTER_KEY)) {
msgCount =
Long.parseLong(event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
}
- AuditOperator.getInstance().add(auditID, inlongGroupId,
- inlongStreamId, logTime, msgCount, event.getBody().length);
+ long auditVersion = getAuditVersion(headers);
+ AuditOperator.getInstance().add(auditID, DEFAULT_AUDIT_TAG,
+ inlongGroupId, inlongStreamId, logTime, msgCount,
event.getBody().length, auditVersion);
} else {
String groupId = headers.get(AttributeConstants.GROUP_ID);
String streamId = headers.get(AttributeConstants.STREAM_ID);
long dataTime =
NumberUtils.toLong(headers.get(AttributeConstants.DATA_TIME));
long msgCount =
NumberUtils.toLong(headers.get(ConfigConstants.MSG_COUNTER_KEY));
- AuditOperator.getInstance().add(auditID, groupId,
- streamId, dataTime, msgCount, event.getBody().length);
+ long auditVersion = getAuditVersion(headers);
+ AuditOperator.getInstance().add(auditID, DEFAULT_AUDIT_TAG,
+ groupId, streamId, dataTime, msgCount,
event.getBody().length, auditVersion);
}
}
@@ -119,6 +124,25 @@ public class AuditUtils {
return msgTime - msgTime %
CommonConfigHolder.getInstance().getAuditFormatInvlMs();
}
+ /**
+ * Get Audit version
+ *
+ * @param headers the message headers
+ *
+ * @return audit version
+ */
+ public static long getAuditVersion(Map<String, String> headers) {
+ String strAuditVersion = headers.get(AttributeConstants.AUDIT_VERSION);
+ if (StringUtils.isNotBlank(strAuditVersion)) {
+ try {
+ return Long.parseLong(strAuditVersion);
+ } catch (Throwable ex) {
+ //
+ }
+ }
+ return -1L;
+ }
+
/**
* Send audit data
*/
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java
index 89c8e04857..3510b6df78 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java
@@ -26,6 +26,7 @@ import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.consts.HttpAttrConst;
import org.apache.inlong.dataproxy.consts.StatConstants;
+import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.source.BaseSource;
import org.apache.inlong.dataproxy.utils.AddressUtils;
import org.apache.inlong.sdk.commons.protocol.EventConstants;
@@ -335,6 +336,8 @@ public class HttpMessageHandler extends
SimpleChannelInboundHandler<FullHttpRequ
// get message count
int intMsgCnt =
NumberUtils.toInt(reqAttrs.get(HttpAttrConst.KEY_MESSAGE_COUNT), 1);
String strMsgCount = String.valueOf(intMsgCnt);
+ // get audit version
+ long auditVersion = AuditUtils.getAuditVersion(reqAttrs);
// build message attributes
InLongMsg inLongMsg = InLongMsg.newInLongMsg(source.isCompressed());
strBuff.append("groupId=").append(groupId)
@@ -345,6 +348,10 @@ public class HttpMessageHandler extends
SimpleChannelInboundHandler<FullHttpRequ
.append("&rt=").append(msgRcvTime)
.append(AttributeConstants.SEPARATOR).append(AttributeConstants.MSG_RPT_TIME)
.append(AttributeConstants.KEY_VALUE_SEPARATOR).append(msgRcvTime);
+ if (auditVersion != -1L) {
+
strBuff.append(AttributeConstants.SEPARATOR).append(AttributeConstants.AUDIT_VERSION)
+
.append(AttributeConstants.KEY_VALUE_SEPARATOR).append(auditVersion);
+ }
inLongMsg.addMsg(strBuff.toString(),
body.getBytes(HttpAttrConst.VAL_DEF_CHARSET));
byte[] inlongMsgData = inLongMsg.buildArray();
long pkgTime = inLongMsg.getCreatetime();
@@ -365,6 +372,7 @@ public class HttpMessageHandler extends
SimpleChannelInboundHandler<FullHttpRequ
MessageWrapType.INLONG_MSG_V0.getStrId());
eventHeaders.put(AttributeConstants.RCV_TIME,
String.valueOf(msgRcvTime));
eventHeaders.put(ConfigConstants.PKG_TIME_KEY,
String.valueOf(pkgTime));
+ eventHeaders.put(AttributeConstants.AUDIT_VERSION,
String.valueOf(auditVersion));
Event event = EventBuilder.withBody(inlongMsgData, eventHeaders);
try {
source.getCachedChProcessor().processEvent(event);
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/AbsV0MsgCodec.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/AbsV0MsgCodec.java
index 68e05d4dc2..b6b5d1fdf6 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/AbsV0MsgCodec.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/AbsV0MsgCodec.java
@@ -68,6 +68,7 @@ public abstract class AbsV0MsgCodec {
protected String msgProcType = "b2b";
protected boolean needResp = true;
protected long msgPkgTime;
+ protected long auditVersion = -1L;
public AbsV0MsgCodec(int totalDataLen, int msgTypeValue,
long msgRcvTime, String strRemoteIP) {
@@ -246,6 +247,7 @@ public abstract class AbsV0MsgCodec {
headers.put(AttributeConstants.RCV_TIME, String.valueOf(msgRcvTime));
headers.put(AttributeConstants.UNIQ_ID, String.valueOf(uniq));
headers.put(ConfigConstants.PKG_TIME_KEY, String.valueOf(msgPkgTime));
+ headers.put(AttributeConstants.AUDIT_VERSION,
String.valueOf(auditVersion));
// add extra key-value information
if (!needResp) {
headers.put(AttributeConstants.MESSAGE_IS_ACK, "false");
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecBinMsg.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecBinMsg.java
index 71014d3058..71ade87872 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecBinMsg.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecBinMsg.java
@@ -24,6 +24,7 @@ import org.apache.inlong.common.msg.MsgType;
import org.apache.inlong.dataproxy.base.SinkRspEvent;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.StatConstants;
+import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.source.BaseSource;
import io.netty.buffer.ByteBuf;
@@ -157,6 +158,8 @@ public class CodecBinMsg extends AbsV0MsgCodec {
.append(AttributeConstants.KEY_VALUE_SEPARATOR).append(msgRcvTime);
attrMap.put(AttributeConstants.MSG_RPT_TIME,
String.valueOf(msgRcvTime));
}
+ // get audit version
+ this.auditVersion = AuditUtils.getAuditVersion(this.attrMap);
// get trace requirement
if (this.needTraceMsg) {
if (strBuff.length() > 0) {
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java
index 1b06d7e2f0..fc83805900 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java
@@ -23,6 +23,7 @@ import org.apache.inlong.common.msg.InLongMsg;
import org.apache.inlong.common.msg.MsgType;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.StatConstants;
+import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.source.BaseSource;
import io.netty.buffer.ByteBuf;
@@ -188,6 +189,8 @@ public class CodecTextMsg extends AbsV0MsgCodec {
attrMap.put(AttributeConstants.DATA_TIME,
String.valueOf(this.dataTimeMs));
}
}
+ // get audit version
+ this.auditVersion = AuditUtils.getAuditVersion(this.attrMap);
// process sequence id
String sequenceId = attrMap.get(AttributeConstants.SEQUENCE_ID);
if (StringUtils.isNotBlank(sequenceId)) {