This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 364bdc5a24 [INLONG-10111][DataProxy] Add auditVersion field processing (#10112)
364bdc5a24 is described below

commit 364bdc5a2417126a30e39d7de8dd38d7532e0c31
Author: Goson Zhang <[email protected]>
AuthorDate: Tue Apr 30 09:06:40 2024 +0800

    [INLONG-10111][DataProxy] Add auditVersion field processing (#10112)
    
    Co-authored-by: gosonzhang <[email protected]>
---
 .../inlong/dataproxy/metrics/audit/AuditUtils.java | 32 +++++++++++++++++++---
 .../source/httpMsg/HttpMessageHandler.java         |  8 ++++++
 .../dataproxy/source/v0msg/AbsV0MsgCodec.java      |  2 ++
 .../inlong/dataproxy/source/v0msg/CodecBinMsg.java |  3 ++
 .../dataproxy/source/v0msg/CodecTextMsg.java       |  3 ++
 5 files changed, 44 insertions(+), 4 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
index 371a4de0c9..899aca38d4 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
@@ -26,11 +26,14 @@ import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.utils.Constants;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.flume.Event;
 
 import java.util.Map;
 
+import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG;
+
 /**
  * Audit utils
  */
@@ -72,15 +75,17 @@ public class AuditUtils {
             if 
(event.getHeaders().containsKey(ConfigConstants.MSG_COUNTER_KEY)) {
                 msgCount = 
Long.parseLong(event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
             }
-            AuditOperator.getInstance().add(auditID, inlongGroupId,
-                    inlongStreamId, logTime, msgCount, event.getBody().length);
+            long auditVersion = getAuditVersion(headers);
+            AuditOperator.getInstance().add(auditID, DEFAULT_AUDIT_TAG,
+                    inlongGroupId, inlongStreamId, logTime, msgCount, 
event.getBody().length, auditVersion);
         } else {
             String groupId = headers.get(AttributeConstants.GROUP_ID);
             String streamId = headers.get(AttributeConstants.STREAM_ID);
             long dataTime = 
NumberUtils.toLong(headers.get(AttributeConstants.DATA_TIME));
             long msgCount = 
NumberUtils.toLong(headers.get(ConfigConstants.MSG_COUNTER_KEY));
-            AuditOperator.getInstance().add(auditID, groupId,
-                    streamId, dataTime, msgCount, event.getBody().length);
+            long auditVersion = getAuditVersion(headers);
+            AuditOperator.getInstance().add(auditID, DEFAULT_AUDIT_TAG,
+                    groupId, streamId, dataTime, msgCount, 
event.getBody().length, auditVersion);
         }
     }
 
@@ -119,6 +124,25 @@ public class AuditUtils {
         return msgTime - msgTime % 
CommonConfigHolder.getInstance().getAuditFormatInvlMs();
     }
 
+    /**
+     * Reads the audit version from the AttributeConstants.AUDIT_VERSION entry of the given headers.
+     *
+     * @param headers  the message headers; must be non-null because it is dereferenced directly
+     *
+     * @return the parsed version, or -1L when the entry is missing, blank, or not parseable as a long
+     */
+    public static long getAuditVersion(Map<String, String> headers) {
+        String strAuditVersion = headers.get(AttributeConstants.AUDIT_VERSION);
+        if (StringUtils.isNotBlank(strAuditVersion)) {
+            try {
+                return Long.parseLong(strAuditVersion);
+            } catch (Throwable ex) {
+                // deliberately ignored: a malformed value falls through to the -1L default below
+            }
+        }
+        return -1L;
+    }
+
     /**
      * Send audit data
      */
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java
index 89c8e04857..3510b6df78 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/httpMsg/HttpMessageHandler.java
@@ -26,6 +26,7 @@ import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.consts.HttpAttrConst;
 import org.apache.inlong.dataproxy.consts.StatConstants;
+import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.source.BaseSource;
 import org.apache.inlong.dataproxy.utils.AddressUtils;
 import org.apache.inlong.sdk.commons.protocol.EventConstants;
@@ -335,6 +336,8 @@ public class HttpMessageHandler extends SimpleChannelInboundHandler<FullHttpRequ
         // get message count
         int intMsgCnt = 
NumberUtils.toInt(reqAttrs.get(HttpAttrConst.KEY_MESSAGE_COUNT), 1);
         String strMsgCount = String.valueOf(intMsgCnt);
+        // get audit version
+        long auditVersion = AuditUtils.getAuditVersion(reqAttrs);
         // build message attributes
         InLongMsg inLongMsg = InLongMsg.newInLongMsg(source.isCompressed());
         strBuff.append("groupId=").append(groupId)
@@ -345,6 +348,10 @@ public class HttpMessageHandler extends SimpleChannelInboundHandler<FullHttpRequ
                 .append("&rt=").append(msgRcvTime)
                 
.append(AttributeConstants.SEPARATOR).append(AttributeConstants.MSG_RPT_TIME)
                 
.append(AttributeConstants.KEY_VALUE_SEPARATOR).append(msgRcvTime);
+        if (auditVersion != -1L) {
+            
strBuff.append(AttributeConstants.SEPARATOR).append(AttributeConstants.AUDIT_VERSION)
+                    
.append(AttributeConstants.KEY_VALUE_SEPARATOR).append(auditVersion);
+        }
         inLongMsg.addMsg(strBuff.toString(), 
body.getBytes(HttpAttrConst.VAL_DEF_CHARSET));
         byte[] inlongMsgData = inLongMsg.buildArray();
         long pkgTime = inLongMsg.getCreatetime();
@@ -365,6 +372,7 @@ public class HttpMessageHandler extends SimpleChannelInboundHandler<FullHttpRequ
                 MessageWrapType.INLONG_MSG_V0.getStrId());
         eventHeaders.put(AttributeConstants.RCV_TIME, 
String.valueOf(msgRcvTime));
         eventHeaders.put(ConfigConstants.PKG_TIME_KEY, 
String.valueOf(pkgTime));
+        eventHeaders.put(AttributeConstants.AUDIT_VERSION, 
String.valueOf(auditVersion));
         Event event = EventBuilder.withBody(inlongMsgData, eventHeaders);
         try {
             source.getCachedChProcessor().processEvent(event);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/AbsV0MsgCodec.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/AbsV0MsgCodec.java
index 68e05d4dc2..b6b5d1fdf6 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/AbsV0MsgCodec.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/AbsV0MsgCodec.java
@@ -68,6 +68,7 @@ public abstract class AbsV0MsgCodec {
     protected String msgProcType = "b2b";
     protected boolean needResp = true;
     protected long msgPkgTime;
+    protected long auditVersion = -1L;
 
     public AbsV0MsgCodec(int totalDataLen, int msgTypeValue,
             long msgRcvTime, String strRemoteIP) {
@@ -246,6 +247,7 @@ public abstract class AbsV0MsgCodec {
         headers.put(AttributeConstants.RCV_TIME, String.valueOf(msgRcvTime));
         headers.put(AttributeConstants.UNIQ_ID, String.valueOf(uniq));
         headers.put(ConfigConstants.PKG_TIME_KEY, String.valueOf(msgPkgTime));
+        headers.put(AttributeConstants.AUDIT_VERSION, 
String.valueOf(auditVersion));
         // add extra key-value information
         if (!needResp) {
             headers.put(AttributeConstants.MESSAGE_IS_ACK, "false");
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecBinMsg.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecBinMsg.java
index 71014d3058..71ade87872 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecBinMsg.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecBinMsg.java
@@ -24,6 +24,7 @@ import org.apache.inlong.common.msg.MsgType;
 import org.apache.inlong.dataproxy.base.SinkRspEvent;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.consts.StatConstants;
+import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.source.BaseSource;
 
 import io.netty.buffer.ByteBuf;
@@ -157,6 +158,8 @@ public class CodecBinMsg extends AbsV0MsgCodec {
                     
.append(AttributeConstants.KEY_VALUE_SEPARATOR).append(msgRcvTime);
             attrMap.put(AttributeConstants.MSG_RPT_TIME, 
String.valueOf(msgRcvTime));
         }
+        // get audit version
+        this.auditVersion = AuditUtils.getAuditVersion(this.attrMap);
         // get trace requirement
         if (this.needTraceMsg) {
             if (strBuff.length() > 0) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java
index 1b06d7e2f0..fc83805900 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/v0msg/CodecTextMsg.java
@@ -23,6 +23,7 @@ import org.apache.inlong.common.msg.InLongMsg;
 import org.apache.inlong.common.msg.MsgType;
 import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.consts.StatConstants;
+import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
 import org.apache.inlong.dataproxy.source.BaseSource;
 
 import io.netty.buffer.ByteBuf;
@@ -188,6 +189,8 @@ public class CodecTextMsg extends AbsV0MsgCodec {
                 attrMap.put(AttributeConstants.DATA_TIME, 
String.valueOf(this.dataTimeMs));
             }
         }
+        // get audit version
+        this.auditVersion = AuditUtils.getAuditVersion(this.attrMap);
         // process sequence id
         String sequenceId = attrMap.get(AttributeConstants.SEQUENCE_ID);
         if (StringUtils.isNotBlank(sequenceId)) {

Reply via email to