This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b65155  [INLONG-2096] DataProxy add InlongGroupId+InlongStreamId metric dimensions in TDSDKSource and TubeSink. (#2105)
5b65155 is described below

commit 5b651553cf38a8a8ff0b59e3e808e7ad3a23967a
Author: 卢春亮 <[email protected]>
AuthorDate: Wed Jan 5 17:42:31 2022 +0800

    [INLONG-2096] DataProxy add InlongGroupId+InlongStreamId metric dimensions in TDSDKSource and TubeSink. (#2105)
---
 .../org/apache/inlong/dataproxy/sink/MetaSink.java |  50 +++--
 .../dataproxy/source/ServerMessageHandler.java     | 240 ++++++++++++---------
 2 files changed, 165 insertions(+), 125 deletions(-)

diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/MetaSink.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/MetaSink.java
index 159f860..1590b0e 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/MetaSink.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/MetaSink.java
@@ -386,16 +386,7 @@ public class MetaSink extends AbstractSink implements 
Configurable {
                 if (clientIdCache && clientId != null && lastTime != null && 
lastTime > 0) {
                     logger.info("{} agent package {} existed,just discard.", 
getName(), clientId);
                 } else {
-                    Message message = new Message(topic, event.getBody());
-                    message.setAttrKeyVal("dataproxyip", 
NetworkUtils.getLocalIp());
-                    String streamId = "";
-                    if 
(event.getHeaders().containsKey(AttributeConstants.INTERFACE_ID)) {
-                        streamId = 
event.getHeaders().get(AttributeConstants.INTERFACE_ID);
-                    } else if 
(event.getHeaders().containsKey(AttributeConstants.INAME)) {
-                        streamId = 
event.getHeaders().get(AttributeConstants.INAME);
-                    }
-                    message.putSystemHeader(streamId, 
event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
-
+                    Message message = this.parseEvent2Message(topic, event);
                     producer.sendMessage(message, new MyCallback(es));
                     flag.set(true);
 
@@ -414,22 +405,39 @@ public class MetaSink extends AbstractSink implements 
Configurable {
                         agentIdCache.put(clientId, System.currentTimeMillis());
                     }
 
-                    Message message = new Message(topic, event.getBody());
-                    message.setAttrKeyVal("dataproxyip", 
NetworkUtils.getLocalIp());
-                    String streamId = "";
-                    if 
(event.getHeaders().containsKey(AttributeConstants.INTERFACE_ID)) {
-                        streamId = 
event.getHeaders().get(AttributeConstants.INTERFACE_ID);
-                    } else if 
(event.getHeaders().containsKey(AttributeConstants.INAME)) {
-                        streamId = 
event.getHeaders().get(AttributeConstants.INAME);
-                    }
-                    message.putSystemHeader(streamId, 
event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
-
+                    Message message = this.parseEvent2Message(topic, event);
                     producer.sendMessage(message, new MyCallback(es));
                     flag.set(true);
                 }
             }
             illegalTopicMap.remove(topic);
         }
+        
+        /**
+         * Builds the Message handed to the producer from a Flume event: body, system header and attributes.
+         * @param topic topic passed to the Message constructor
+         * @param event source event; its body becomes the payload and its headers supply the attributes
+         * @return the populated Message
+         */
+        private Message parseEvent2Message(String topic, Event event) {
+            Message message = new Message(topic, event.getBody());
+            message.setAttrKeyVal("dataproxyip", NetworkUtils.getLocalIp());
+            String streamId = ""; // stays empty when neither INTERFACE_ID nor INAME header is present
+            if 
(event.getHeaders().containsKey(AttributeConstants.INTERFACE_ID)) {
+                streamId = 
event.getHeaders().get(AttributeConstants.INTERFACE_ID);
+            } else if 
(event.getHeaders().containsKey(AttributeConstants.INAME)) {
+                streamId = event.getHeaders().get(AttributeConstants.INAME);
+            }
+            message.putSystemHeader(streamId, 
event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
+            // copy group id, stream id, topic, message time and source IP headers onto the message as attributes
+            Map<String, String> headers = event.getHeaders();
+            message.setAttrKeyVal(Constants.INLONG_GROUP_ID, 
headers.get(Constants.INLONG_GROUP_ID));
+            message.setAttrKeyVal(Constants.INLONG_STREAM_ID, 
headers.get(Constants.INLONG_STREAM_ID));
+            message.setAttrKeyVal(Constants.TOPIC, 
headers.get(Constants.TOPIC));
+            message.setAttrKeyVal(Constants.HEADER_KEY_MSG_TIME, 
headers.get(Constants.HEADER_KEY_MSG_TIME));
+            message.setAttrKeyVal(Constants.HEADER_KEY_SOURCE_IP, 
headers.get(Constants.HEADER_KEY_SOURCE_IP));
+            return message;
+        }
 
         private void handleException(Throwable t, String topic, boolean 
decrementFlag, EventStat es) {
             if (t instanceof TubeClientException) {
@@ -585,6 +593,7 @@ public class MetaSink extends AbstractSink implements 
Configurable {
             } else {
                 dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, "");
             }
+            DataProxyMetricItem.fillInlongId(currentRecord, dimensions);
             DataProxyMetricItem metricItem = 
MetaSink.this.metricItemSet.findMetricItem(dimensions);
             if (result) {
                 metricItem.sendSuccessCount.incrementAndGet();
@@ -678,6 +687,7 @@ public class MetaSink extends AbstractSink implements 
Configurable {
                     } else {
                         dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, 
"");
                     }
+                    DataProxyMetricItem.fillInlongId(event, dimensions);
                     DataProxyMetricItem metricItem = 
this.metricItemSet.findMetricItem(dimensions);
                     metricItem.readFailCount.incrementAndGet();
                     metricItem.readFailSize.addAndGet(event.getBody().length);
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 1f6d87d..97373c9 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -34,6 +34,7 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.math.NumberUtils;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Event;
 import org.apache.flume.channel.ChannelProcessor;
@@ -45,10 +46,10 @@ import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.dataproxy.consts.AttributeConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
-import org.apache.inlong.dataproxy.exception.ErrorCode;
 import org.apache.inlong.dataproxy.exception.MessageIDException;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
+import org.apache.inlong.dataproxy.utils.Constants;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
@@ -81,20 +82,20 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
             .on(AttributeConstants.SEPARATOR)
             
.trimResults().withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
 
-    private static final ThreadLocal<SimpleDateFormat> dateFormator =
-            new ThreadLocal<SimpleDateFormat>() {
-                @Override
-                protected SimpleDateFormat initialValue() {
-                    return new SimpleDateFormat("yyyyMMddHHmm");
-                }
-            };
-    private static final ThreadLocal<SimpleDateFormat> dateFormator4Transfer =
-            new ThreadLocal<SimpleDateFormat>() {
-                @Override
-                protected SimpleDateFormat initialValue() {
-                    return new SimpleDateFormat("yyyyMMddHHmmss");
-                }
-            };
+    private static final ThreadLocal<SimpleDateFormat> dateFormator = new 
ThreadLocal<SimpleDateFormat>() {
+
+        @Override
+        protected SimpleDateFormat initialValue() {
+            return new SimpleDateFormat("yyyyMMddHHmm");
+        }
+    };
+    private static final ThreadLocal<SimpleDateFormat> dateFormator4Transfer = 
new ThreadLocal<SimpleDateFormat>() {
+
+        @Override
+        protected SimpleDateFormat initialValue() {
+            return new SimpleDateFormat("yyyyMMddHHmmss");
+        }
+    };
     private AbstractSource source;
     private final ChannelGroup allChannels;
     private int maxConnections = Integer.MAX_VALUE;
@@ -110,10 +111,10 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
     private final DataProxyMetricItemSet metricItemSet;
 
     public ServerMessageHandler(AbstractSource source, ServiceDecoder 
serProcessor,
-                                ChannelGroup allChannels,
-                                String topic, String attr, Boolean 
filterEmptyMsg, Integer maxMsgLength,
-                                Integer maxCons,
-                                Boolean isCompressed, String protocolType) {
+            ChannelGroup allChannels,
+            String topic, String attr, Boolean filterEmptyMsg, Integer 
maxMsgLength,
+            Integer maxCons,
+            Boolean isCompressed, String protocolType) {
         this.source = source;
         this.processor = source.getChannelProcessor();
         this.serviceProcessor = serProcessor;
@@ -226,7 +227,7 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
     }
 
     private void checkGroupIdInfo(ProxyMessage message, Map<String, String> 
commonAttrMap,
-        Map<String, String> attrMap, AtomicReference<String> topicInfo) {
+            Map<String, String> attrMap, AtomicReference<String> topicInfo) {
         String groupId = message.getGroupId();
         String streamId = message.getStreamId();
         if (null != groupId) {
@@ -234,10 +235,10 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
             if ("dc".equals(from)) {
                 String dcInterfaceId = message.getStreamId();
                 if (StringUtils.isNotEmpty(dcInterfaceId)
-                    && configManager.getDcMappingProperties()
-                    .containsKey(dcInterfaceId.trim())) {
+                        && configManager.getDcMappingProperties()
+                                .containsKey(dcInterfaceId.trim())) {
                     groupId = configManager.getDcMappingProperties()
-                        .get(dcInterfaceId.trim()).trim();
+                            .get(dcInterfaceId.trim()).trim();
                     message.setGroupId(groupId);
                 }
             }
@@ -259,16 +260,17 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
             String streamIdNum = 
commonAttrMap.get(AttributeConstants.STREAMID_NUM);
 
             if (configManager.getGroupIdMappingProperties() != null
-                && configManager.getStreamIdMappingProperties() != null) {
+                    && configManager.getStreamIdMappingProperties() != null) {
                 groupId = 
configManager.getGroupIdMappingProperties().get(groupIdNum);
                 streamId = 
(configManager.getStreamIdMappingProperties().get(groupIdNum) == null)
-                    ? null : 
configManager.getStreamIdMappingProperties().get(groupIdNum).get(streamIdNum);
+                        ? null
+                        : 
configManager.getStreamIdMappingProperties().get(groupIdNum).get(streamIdNum);
                 if (groupId != null && streamId != null) {
-                    String enableTrans =
-                        (configManager.getGroupIdEnableMappingProperties() == 
null)
-                            ? null : 
configManager.getGroupIdEnableMappingProperties().get(groupIdNum);
+                    String enableTrans = 
(configManager.getGroupIdEnableMappingProperties() == null)
+                            ? null
+                            : 
configManager.getGroupIdEnableMappingProperties().get(groupIdNum);
                     if (("TRUE".equalsIgnoreCase(enableTrans) && "TRUE"
-                        .equalsIgnoreCase(num2name))) {
+                            .equalsIgnoreCase(num2name))) {
                         String extraAttr = "groupId=" + groupId + "&" + 
"streamId=" + streamId;
                         message.setData(newBinMsg(message.getData(), 
extraAttr));
                     }
@@ -288,8 +290,8 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
     }
 
     private void updateMsgList(List<ProxyMessage> msgList, Map<String, String> 
commonAttrMap,
-        Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
-        String strRemoteIP, MsgType msgType) {
+            Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
+            String strRemoteIP, MsgType msgType) {
         for (ProxyMessage message : msgList) {
             Map<String, String> attrMap = message.getAttributeMap();
 
@@ -316,15 +318,15 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
             if (groupId != null && streamId != null) {
                 String tubeSwtichKey = groupId + SEPARATOR + streamId;
                 if (configManager.getTubeSwitchProperties().get(tubeSwtichKey) 
!= null
-                    && "false".equals(configManager.getTubeSwitchProperties()
-                    .get(tubeSwtichKey).trim())) {
+                        && 
"false".equals(configManager.getTubeSwitchProperties()
+                                .get(tubeSwtichKey).trim())) {
                     continue;
                 }
             }
 
             if (!"pb".equals(attrMap.get(AttributeConstants.MESSAGE_TYPE))
-                && !MsgType.MSG_MULTI_BODY.equals(msgType)
-                && !MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)) {
+                    && !MsgType.MSG_MULTI_BODY.equals(msgType)
+                    && !MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)) {
                 byte[] data = message.getData();
                 if (data[data.length - 1] == '\n') {
                     int tripDataLen = data.length - 1;
@@ -341,16 +343,25 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
                 streamId = "";
             }
             HashMap<String, List<ProxyMessage>> streamIdMsgMap = messageMap
-                .computeIfAbsent(topic, k -> new HashMap<>());
+                    .computeIfAbsent(topic, k -> new HashMap<>());
             List<ProxyMessage> streamIdMsgList = streamIdMsgMap
-                .computeIfAbsent(streamId, k -> new ArrayList<>());
+                    .computeIfAbsent(streamId, k -> new ArrayList<>());
             streamIdMsgList.add(message);
         }
     }
 
+    /**
+     * formatMessagesAndSend
+     * 
+     * @param  commonAttrMap
+     * @param  messageMap
+     * @param  strRemoteIP
+     * @param  msgType
+     * @throws MessageIDException
+     */
     private void formatMessagesAndSend(Map<String, String> commonAttrMap,
-        Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
-        String strRemoteIP, MsgType msgType) throws MessageIDException {
+            Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
+            String strRemoteIP, MsgType msgType) throws MessageIDException {
 
         int tdMsgVer = 1;
         if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)) {
@@ -386,11 +397,9 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
                     }
                 }
 
-                if (commonAttrMap.get(AttributeConstants.DATA_TIME) != null) {
-                    headers.put(AttributeConstants.DATA_TIME, 
commonAttrMap.get(AttributeConstants.DATA_TIME));
-                } else {
-                    headers.put(AttributeConstants.DATA_TIME, 
String.valueOf(System.currentTimeMillis()));
-                }
+                long dtTime = 
NumberUtils.toLong(commonAttrMap.get(AttributeConstants.DATA_TIME),
+                        System.currentTimeMillis());
+                headers.put(AttributeConstants.DATA_TIME, 
String.valueOf(dtTime));
 
                 headers.put(ConfigConstants.TOPIC_KEY, topicEntry.getKey());
                 headers.put(AttributeConstants.INTERFACE_ID, 
streamIdEntry.getKey());
@@ -408,49 +417,72 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
 
                     StringBuilder sidBuilder = new StringBuilder();
                     
sidBuilder.append(topicEntry.getKey()).append(SEPARATOR).append(streamIdEntry.getKey())
-                        .append(SEPARATOR).append(sequenceId);
+                            .append(SEPARATOR).append(sequenceId);
                     headers.put(ConfigConstants.SEQUENCE_ID, 
sidBuilder.toString());
                 }
 
                 headers.put(ConfigConstants.PKG_TIME_KEY, pkgTimeStr);
-                Event event = EventBuilder.withBody(data, headers);
 
-                long dtten = 0;
-                try {
-                    dtten = 
Long.parseLong(headers.get(AttributeConstants.DATA_TIME));
-                } catch (Exception e1) {
-                    long uniqVal = 
Long.parseLong(commonAttrMap.get(AttributeConstants.UNIQ_ID));
-                    throw new MessageIDException(uniqVal,
-                        ErrorCode.DT_ERROR,
-                        new Throwable("attribute dt=" + 
headers.get(AttributeConstants.DATA_TIME
-                            + " has error, detail is: topic=" + 
topicEntry.getKey() + "&streamId="
-                            + streamIdEntry.getKey() + "&NodeIP=" + 
strRemoteIP), e1));
-                }
+                // process proxy message list
+                this.processProxyMessageList(headers, 
streamIdEntry.getValue());
+            }
+        }
+    }
 
-                dtten = dtten / 1000 / 60 / 10;
-                dtten = dtten * 1000 * 60 * 10;
-                try {
-                    processor.processEvent(event);
-                    this.addMetric(true, data.length);
-                } catch (Throwable ex) {
-                    logger.error("Error writting to channel,data will 
discard.", ex);
-                    this.addMetric(false, data.length);
-                    throw new ChannelException("ProcessEvent error can't write 
event to channel.");
-                }
+    /**
+     * Converts each proxy message to an event, passes it to the channel processor and records a metric for it.
+     * Stops at the first failure: the error is logged, a failure metric is added and a ChannelException is thrown.
+     * @param commonHeaders headers shared by every message in the list; forwarded to parseProxyMessage2Event
+     * @param proxyMessages messages to process, one event per message
+     */
+    private void processProxyMessageList(Map<String, String> commonHeaders,
+            List<ProxyMessage> proxyMessages) {
+        for (ProxyMessage message : proxyMessages) {
+            Event event = this.parseProxyMessage2Event(commonHeaders, message);
+            try {
+                processor.processEvent(event);
+                this.addMetric(true, message);
+            } catch (Throwable ex) {
+                logger.error("Error writting to channel,data will discard.", 
ex);
+                this.addMetric(false, message);
+                throw new ChannelException("ProcessEvent error can't write 
event to channel.");
             }
         }
     }
 
+    /**
+     * Builds a single-message Flume event from a proxy message and the headers common to its batch.
+     * Headers are layered: the message's own attribute map, then commonHeaders, then the fields set explicitly below.
+     * @param  commonHeaders shared headers; also the source of the DATA_TIME and NODE_IP values copied below
+     * @param  proxyMessage message supplying the body, attribute map, group id, stream id and topic
+     * @return event whose body is the message data and whose headers are the merged map
+     */
+    private Event parseProxyMessage2Event(Map<String, String> commonHeaders, 
ProxyMessage proxyMessage) {
+        Map<String, String> headers = new HashMap<>();
+        if (proxyMessage.getAttributeMap() != null) {
+            headers.putAll(proxyMessage.getAttributeMap());
+        }
+        headers.putAll(commonHeaders); // applied second, so common headers win on key clashes
+        headers.put(AttributeConstants.MESSAGE_COUNT, "1");
+        headers.put(Constants.INLONG_GROUP_ID, proxyMessage.getGroupId());
+        headers.put(Constants.INLONG_STREAM_ID, proxyMessage.getStreamId());
+        headers.put(Constants.TOPIC, proxyMessage.getTopic());
+        headers.put(Constants.HEADER_KEY_MSG_TIME, 
commonHeaders.get(AttributeConstants.DATA_TIME));
+        headers.put(Constants.HEADER_KEY_SOURCE_IP, 
commonHeaders.get(AttributeConstants.NODE_IP));
+        Event event = EventBuilder.withBody(proxyMessage.getData(), headers);
+        return event;
+    }
+
     private void responsePackage(Map<String, String> commonAttrMap,
-        Map<String, Object> resultMap,
-        Channel remoteChannel,
-        SocketAddress remoteSocketAddress,
-        MsgType msgType) throws Exception {
+            Map<String, Object> resultMap,
+            Channel remoteChannel,
+            SocketAddress remoteSocketAddress,
+            MsgType msgType) throws Exception {
         if (!commonAttrMap.containsKey("isAck") || 
"true".equals(commonAttrMap.get("isAck"))) {
             if (MsgType.MSG_ACK_SERVICE.equals(msgType) || 
MsgType.MSG_ORIGINAL_RETURN
-                .equals(msgType)
-                || MsgType.MSG_MULTI_BODY.equals(msgType) || 
MsgType.MSG_MULTI_BODY_ATTR
-                .equals(msgType)) {
+                    .equals(msgType)
+                    || MsgType.MSG_MULTI_BODY.equals(msgType) || 
MsgType.MSG_MULTI_BODY_ATTR
+                            .equals(msgType)) {
                 byte[] backAttr = 
mapJoiner.join(commonAttrMap).getBytes(StandardCharsets.UTF_8);
                 byte[] backBody = null;
 
@@ -475,14 +507,14 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
                     } else {
                         String backAttrStr = new String(backAttr, 
StandardCharsets.UTF_8);
                         logger.warn(
-                            "the send buffer1 is full, so disconnect it!please 
check remote client"
-                                + "; Connection info:"
-                                + remoteChannel + ";attr is " + backAttrStr);
+                                "the send buffer1 is full, so disconnect 
it!please check remote client"
+                                        + "; Connection info:"
+                                        + remoteChannel + ";attr is " + 
backAttrStr);
                         throw new Exception(new Throwable(
-                            "the send buffer1 is full, so disconnect it!please 
check remote client"
-                                +
-                                "; Connection info:" + remoteChannel + ";attr 
is "
-                                + backAttrStr));
+                                "the send buffer1 is full, so disconnect 
it!please check remote client"
+                                        +
+                                        "; Connection info:" + remoteChannel + 
";attr is "
+                                        + backAttrStr));
                     }
                 }
             } else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
@@ -520,12 +552,12 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
                     remoteChannel.write(binBuffer, remoteSocketAddress);
                 } else {
                     logger.warn(
-                        "the send buffer2 is full, so disconnect it!please 
check remote client"
-                            + "; Connection info:" + remoteChannel + ";attr is 
"
-                            + backattrs);
+                            "the send buffer2 is full, so disconnect it!please 
check remote client"
+                                    + "; Connection info:" + remoteChannel + 
";attr is "
+                                    + backattrs);
                     throw new Exception(new Throwable(
-                        "the send buffer2 is full,so disconnect it!please 
check remote client, Connection info:"
-                            + remoteChannel + ";attr is " + backattrs));
+                            "the send buffer2 is full,so disconnect it!please 
check remote client, Connection info:"
+                                    + remoteChannel + ";attr is " + 
backattrs));
                 }
             }
         }
@@ -536,7 +568,7 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
         logger.info("message received");
         if (e == null) {
             logger.error("get null messageevent, just skip");
-            this.addMetric(false, 0);
+            this.addMetric(false, null);
             return;
         }
         ChannelBuffer cb = ((ChannelBuffer) e.getMessage());
@@ -546,7 +578,7 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
         if (len == 0 && this.filterEmptyMsg) {
             logger.warn("skip empty msg.");
             cb.clear();
-            this.addMetric(false, 0);
+            this.addMetric(false, null);
             return;
         }
 
@@ -555,32 +587,31 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
         try {
             resultMap = serviceProcessor.extractData(cb, remoteChannel);
         } catch (MessageIDException ex) {
-            this.addMetric(false, 0);
+            this.addMetric(false, null);
             throw new IOException(ex.getCause());
         }
 
         if (resultMap == null) {
             logger.info("result is null");
-            this.addMetric(false, 0);
+            this.addMetric(false, null);
             return;
         }
 
         MsgType msgType = (MsgType) resultMap.get(ConfigConstants.MSG_TYPE);
         if (MsgType.MSG_HEARTBEAT.equals(msgType)) {
             remoteChannel.write(heartbeatBuffer, remoteSocketAddress);
-            this.addMetric(false, 0);
+            this.addMetric(false, null);
             return;
         }
 
         if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
 //            ChannelBuffer binBuffer = getBinHeart(resultMap,msgType);
 //            remoteChannel.write(binBuffer, remoteSocketAddress);
-            this.addMetric(false, 0);
+            this.addMetric(false, null);
             return;
         }
 
-        Map<String, String> commonAttrMap =
-                (Map<String, String>) 
resultMap.get(ConfigConstants.COMMON_ATTR_MAP);
+        Map<String, String> commonAttrMap = (Map<String, String>) 
resultMap.get(ConfigConstants.COMMON_ATTR_MAP);
         if (commonAttrMap == null) {
             commonAttrMap = new HashMap<String, String>();
         }
@@ -589,9 +620,7 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
         if (msgList != null
                 && !commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)
                 && 
!commonAttrMap.containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
-            Map<String, HashMap<String, List<ProxyMessage>>> messageMap =
-                    new HashMap<String, HashMap<String, List<ProxyMessage>>>(
-                    msgList.size());
+            Map<String, HashMap<String, List<ProxyMessage>>> messageMap = new 
HashMap<>(msgList.size());
 
             updateMsgList(msgList, commonAttrMap, messageMap, strRemoteIP, 
msgType);
 
@@ -609,10 +638,10 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
                 Event event = EventBuilder.withBody(body, headers);
                 try {
                     processor.processEvent(event);
-                    this.addMetric(true, body.length);
+                    this.addMetric(true, message);
                 } catch (Throwable ex) {
                     logger.error("Error writing to controller,data will 
discard.", ex);
-                    this.addMetric(false, body.length);
+                    this.addMetric(false, message);
                     throw new ChannelException(
                             "Process Controller Event error can't write event 
to channel.");
                 }
@@ -630,10 +659,10 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
                 Event event = EventBuilder.withBody(body, headers);
                 try {
                     processor.processEvent(event);
-                    this.addMetric(true, body.length);
+                    this.addMetric(true, message);
                 } catch (Throwable ex) {
                     logger.error("Error writing to controller,data will 
discard.", ex);
-                    this.addMetric(false, body.length);
+                    this.addMetric(false, message);
                     throw new ChannelException(
                             "Process Controller Event error can't write event 
to channel.");
                 }
@@ -680,15 +709,16 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
      * addMetric
      * 
      * @param result
-     * @param size
+     * @param msg
      */
-    private void addMetric(boolean result, long size) {
+    private void addMetric(boolean result, ProxyMessage msg) {
+        int size = (msg == null) ? 0 : msg.getData().length;
         Map<String, String> dimensions = new HashMap<>();
         dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
         dimensions.put(DataProxyMetricItem.KEY_SOURCE_ID, source.getName());
         dimensions.put(DataProxyMetricItem.KEY_SOURCE_DATA_ID, 
source.getName());
-        dimensions.put(DataProxyMetricItem.KEY_INLONG_GROUP_ID, "");
-        dimensions.put(DataProxyMetricItem.KEY_INLONG_STREAM_ID, "");
+        dimensions.put(DataProxyMetricItem.KEY_INLONG_GROUP_ID, 
msg.getGroupId());
+        dimensions.put(DataProxyMetricItem.KEY_INLONG_STREAM_ID, 
msg.getStreamId());
         long msgTime = System.currentTimeMillis();
         long auditFormatTime = msgTime - msgTime % 
CommonPropertiesHolder.getAuditFormatInterval();
         dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, 
String.valueOf(auditFormatTime));

Reply via email to