This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5238a39  [INLONG-2144] InlongDataProxy recover TDMsg1 protocol of TDSDKSource and MetaSink (#2145)
5238a39 is described below

commit 5238a393976cc5126dae5fc8dbfdf4d40a2847e8
Author: 卢春亮 <[email protected]>
AuthorDate: Wed Jan 12 12:47:45 2022 +0800

    [INLONG-2144] InlongDataProxy recover TDMsg1 protocol of TDSDKSource and MetaSink (#2145)
    
    * [INLONG-2144] InlongDataProxy recover TDMsg1 protocol of TDSDKSource and MetaSink.
---
 .../org/apache/inlong/dataproxy/sink/MetaSink.java |  50 ++---
 .../dataproxy/source/ServerMessageHandler.java     | 245 +++++++++------------
 2 files changed, 125 insertions(+), 170 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/MetaSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/MetaSink.java
index 1590b0e..159f860 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/MetaSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/MetaSink.java
@@ -386,7 +386,16 @@ public class MetaSink extends AbstractSink implements 
Configurable {
                 if (clientIdCache && clientId != null && lastTime != null && 
lastTime > 0) {
                     logger.info("{} agent package {} existed,just discard.", 
getName(), clientId);
                 } else {
-                    Message message = this.parseEvent2Message(topic, event);
+                    Message message = new Message(topic, event.getBody());
+                    message.setAttrKeyVal("dataproxyip", 
NetworkUtils.getLocalIp());
+                    String streamId = "";
+                    if 
(event.getHeaders().containsKey(AttributeConstants.INTERFACE_ID)) {
+                        streamId = 
event.getHeaders().get(AttributeConstants.INTERFACE_ID);
+                    } else if 
(event.getHeaders().containsKey(AttributeConstants.INAME)) {
+                        streamId = 
event.getHeaders().get(AttributeConstants.INAME);
+                    }
+                    message.putSystemHeader(streamId, 
event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
+
                     producer.sendMessage(message, new MyCallback(es));
                     flag.set(true);
 
@@ -405,39 +414,22 @@ public class MetaSink extends AbstractSink implements 
Configurable {
                         agentIdCache.put(clientId, System.currentTimeMillis());
                     }
 
-                    Message message = this.parseEvent2Message(topic, event);
+                    Message message = new Message(topic, event.getBody());
+                    message.setAttrKeyVal("dataproxyip", 
NetworkUtils.getLocalIp());
+                    String streamId = "";
+                    if 
(event.getHeaders().containsKey(AttributeConstants.INTERFACE_ID)) {
+                        streamId = 
event.getHeaders().get(AttributeConstants.INTERFACE_ID);
+                    } else if 
(event.getHeaders().containsKey(AttributeConstants.INAME)) {
+                        streamId = 
event.getHeaders().get(AttributeConstants.INAME);
+                    }
+                    message.putSystemHeader(streamId, 
event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
+
                     producer.sendMessage(message, new MyCallback(es));
                     flag.set(true);
                 }
             }
             illegalTopicMap.remove(topic);
         }
-        
-        /**
-         * parseEvent2Message
-         * @param topic
-         * @param event
-         * @return
-         */
-        private Message parseEvent2Message(String topic, Event event) {
-            Message message = new Message(topic, event.getBody());
-            message.setAttrKeyVal("dataproxyip", NetworkUtils.getLocalIp());
-            String streamId = "";
-            if 
(event.getHeaders().containsKey(AttributeConstants.INTERFACE_ID)) {
-                streamId = 
event.getHeaders().get(AttributeConstants.INTERFACE_ID);
-            } else if 
(event.getHeaders().containsKey(AttributeConstants.INAME)) {
-                streamId = event.getHeaders().get(AttributeConstants.INAME);
-            }
-            message.putSystemHeader(streamId, 
event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
-            // common attributes
-            Map<String, String> headers = event.getHeaders();
-            message.setAttrKeyVal(Constants.INLONG_GROUP_ID, 
headers.get(Constants.INLONG_GROUP_ID));
-            message.setAttrKeyVal(Constants.INLONG_STREAM_ID, 
headers.get(Constants.INLONG_STREAM_ID));
-            message.setAttrKeyVal(Constants.TOPIC, 
headers.get(Constants.TOPIC));
-            message.setAttrKeyVal(Constants.HEADER_KEY_MSG_TIME, 
headers.get(Constants.HEADER_KEY_MSG_TIME));
-            message.setAttrKeyVal(Constants.HEADER_KEY_SOURCE_IP, 
headers.get(Constants.HEADER_KEY_SOURCE_IP));
-            return message;
-        }
 
         private void handleException(Throwable t, String topic, boolean 
decrementFlag, EventStat es) {
             if (t instanceof TubeClientException) {
@@ -593,7 +585,6 @@ public class MetaSink extends AbstractSink implements 
Configurable {
             } else {
                 dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, "");
             }
-            DataProxyMetricItem.fillInlongId(currentRecord, dimensions);
             DataProxyMetricItem metricItem = 
MetaSink.this.metricItemSet.findMetricItem(dimensions);
             if (result) {
                 metricItem.sendSuccessCount.incrementAndGet();
@@ -687,7 +678,6 @@ public class MetaSink extends AbstractSink implements 
Configurable {
                     } else {
                         dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, 
"");
                     }
-                    DataProxyMetricItem.fillInlongId(event, dimensions);
                     DataProxyMetricItem metricItem = 
this.metricItemSet.findMetricItem(dimensions);
                     metricItem.readFailCount.incrementAndGet();
                     metricItem.readFailSize.addAndGet(event.getBody().length);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 97373c9..eada0fe 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -34,7 +34,6 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.math.NumberUtils;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Event;
 import org.apache.flume.channel.ChannelProcessor;
@@ -43,13 +42,12 @@ import org.apache.flume.source.AbstractSource;
 import org.apache.inlong.commons.msg.TDMsg1;
 import org.apache.inlong.dataproxy.base.ProxyMessage;
 import org.apache.inlong.dataproxy.config.ConfigManager;
-import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.dataproxy.consts.AttributeConstants;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.apache.inlong.dataproxy.exception.ErrorCode;
 import org.apache.inlong.dataproxy.exception.MessageIDException;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
-import org.apache.inlong.dataproxy.utils.Constants;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
@@ -70,7 +68,6 @@ import com.google.common.base.Splitter;
  *
  */
 public class ServerMessageHandler extends SimpleChannelHandler {
-
     private static final Logger logger = 
LoggerFactory.getLogger(ServerMessageHandler.class);
 
     private static final String DEFAULT_REMOTE_IP_VALUE = "0.0.0.0";
@@ -82,20 +79,20 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
             .on(AttributeConstants.SEPARATOR)
             
.trimResults().withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
 
-    private static final ThreadLocal<SimpleDateFormat> dateFormator = new 
ThreadLocal<SimpleDateFormat>() {
-
-        @Override
-        protected SimpleDateFormat initialValue() {
-            return new SimpleDateFormat("yyyyMMddHHmm");
-        }
-    };
-    private static final ThreadLocal<SimpleDateFormat> dateFormator4Transfer = 
new ThreadLocal<SimpleDateFormat>() {
-
-        @Override
-        protected SimpleDateFormat initialValue() {
-            return new SimpleDateFormat("yyyyMMddHHmmss");
-        }
-    };
+    private static final ThreadLocal<SimpleDateFormat> dateFormator =
+            new ThreadLocal<SimpleDateFormat>() {
+                @Override
+                protected SimpleDateFormat initialValue() {
+                    return new SimpleDateFormat("yyyyMMddHHmm");
+                }
+            };
+    private static final ThreadLocal<SimpleDateFormat> dateFormator4Transfer =
+            new ThreadLocal<SimpleDateFormat>() {
+                @Override
+                protected SimpleDateFormat initialValue() {
+                    return new SimpleDateFormat("yyyyMMddHHmmss");
+                }
+            };
     private AbstractSource source;
     private final ChannelGroup allChannels;
     private int maxConnections = Integer.MAX_VALUE;
@@ -111,10 +108,10 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
     private final DataProxyMetricItemSet metricItemSet;
 
     public ServerMessageHandler(AbstractSource source, ServiceDecoder 
serProcessor,
-            ChannelGroup allChannels,
-            String topic, String attr, Boolean filterEmptyMsg, Integer 
maxMsgLength,
-            Integer maxCons,
-            Boolean isCompressed, String protocolType) {
+                                ChannelGroup allChannels,
+                                String topic, String attr, Boolean 
filterEmptyMsg, Integer maxMsgLength,
+                                Integer maxCons,
+                                Boolean isCompressed, String protocolType) {
         this.source = source;
         this.processor = source.getChannelProcessor();
         this.serviceProcessor = serProcessor;
@@ -227,7 +224,7 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
     }
 
     private void checkGroupIdInfo(ProxyMessage message, Map<String, String> 
commonAttrMap,
-            Map<String, String> attrMap, AtomicReference<String> topicInfo) {
+        Map<String, String> attrMap, AtomicReference<String> topicInfo) {
         String groupId = message.getGroupId();
         String streamId = message.getStreamId();
         if (null != groupId) {
@@ -235,10 +232,10 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
             if ("dc".equals(from)) {
                 String dcInterfaceId = message.getStreamId();
                 if (StringUtils.isNotEmpty(dcInterfaceId)
-                        && configManager.getDcMappingProperties()
-                                .containsKey(dcInterfaceId.trim())) {
+                    && configManager.getDcMappingProperties()
+                    .containsKey(dcInterfaceId.trim())) {
                     groupId = configManager.getDcMappingProperties()
-                            .get(dcInterfaceId.trim()).trim();
+                        .get(dcInterfaceId.trim()).trim();
                     message.setGroupId(groupId);
                 }
             }
@@ -260,17 +257,16 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
             String streamIdNum = 
commonAttrMap.get(AttributeConstants.STREAMID_NUM);
 
             if (configManager.getGroupIdMappingProperties() != null
-                    && configManager.getStreamIdMappingProperties() != null) {
+                && configManager.getStreamIdMappingProperties() != null) {
                 groupId = 
configManager.getGroupIdMappingProperties().get(groupIdNum);
                 streamId = 
(configManager.getStreamIdMappingProperties().get(groupIdNum) == null)
-                        ? null
-                        : 
configManager.getStreamIdMappingProperties().get(groupIdNum).get(streamIdNum);
+                    ? null : 
configManager.getStreamIdMappingProperties().get(groupIdNum).get(streamIdNum);
                 if (groupId != null && streamId != null) {
-                    String enableTrans = 
(configManager.getGroupIdEnableMappingProperties() == null)
-                            ? null
-                            : 
configManager.getGroupIdEnableMappingProperties().get(groupIdNum);
+                    String enableTrans =
+                        (configManager.getGroupIdEnableMappingProperties() == 
null)
+                            ? null : 
configManager.getGroupIdEnableMappingProperties().get(groupIdNum);
                     if (("TRUE".equalsIgnoreCase(enableTrans) && "TRUE"
-                            .equalsIgnoreCase(num2name))) {
+                        .equalsIgnoreCase(num2name))) {
                         String extraAttr = "groupId=" + groupId + "&" + 
"streamId=" + streamId;
                         message.setData(newBinMsg(message.getData(), 
extraAttr));
                     }
@@ -290,8 +286,8 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
     }
 
     private void updateMsgList(List<ProxyMessage> msgList, Map<String, String> 
commonAttrMap,
-            Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
-            String strRemoteIP, MsgType msgType) {
+        Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
+        String strRemoteIP, MsgType msgType) {
         for (ProxyMessage message : msgList) {
             Map<String, String> attrMap = message.getAttributeMap();
 
@@ -318,15 +314,15 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
             if (groupId != null && streamId != null) {
                 String tubeSwtichKey = groupId + SEPARATOR + streamId;
                 if (configManager.getTubeSwitchProperties().get(tubeSwtichKey) 
!= null
-                        && 
"false".equals(configManager.getTubeSwitchProperties()
-                                .get(tubeSwtichKey).trim())) {
+                    && "false".equals(configManager.getTubeSwitchProperties()
+                    .get(tubeSwtichKey).trim())) {
                     continue;
                 }
             }
 
             if (!"pb".equals(attrMap.get(AttributeConstants.MESSAGE_TYPE))
-                    && !MsgType.MSG_MULTI_BODY.equals(msgType)
-                    && !MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)) {
+                && !MsgType.MSG_MULTI_BODY.equals(msgType)
+                && !MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)) {
                 byte[] data = message.getData();
                 if (data[data.length - 1] == '\n') {
                     int tripDataLen = data.length - 1;
@@ -343,25 +339,16 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
                 streamId = "";
             }
             HashMap<String, List<ProxyMessage>> streamIdMsgMap = messageMap
-                    .computeIfAbsent(topic, k -> new HashMap<>());
+                .computeIfAbsent(topic, k -> new HashMap<>());
             List<ProxyMessage> streamIdMsgList = streamIdMsgMap
-                    .computeIfAbsent(streamId, k -> new ArrayList<>());
+                .computeIfAbsent(streamId, k -> new ArrayList<>());
             streamIdMsgList.add(message);
         }
     }
 
-    /**
-     * formatMessagesAndSend
-     * 
-     * @param  commonAttrMap
-     * @param  messageMap
-     * @param  strRemoteIP
-     * @param  msgType
-     * @throws MessageIDException
-     */
     private void formatMessagesAndSend(Map<String, String> commonAttrMap,
-            Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
-            String strRemoteIP, MsgType msgType) throws MessageIDException {
+        Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
+        String strRemoteIP, MsgType msgType) throws MessageIDException {
 
         int tdMsgVer = 1;
         if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)) {
@@ -397,9 +384,11 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
                     }
                 }
 
-                long dtTime = 
NumberUtils.toLong(commonAttrMap.get(AttributeConstants.DATA_TIME),
-                        System.currentTimeMillis());
-                headers.put(AttributeConstants.DATA_TIME, 
String.valueOf(dtTime));
+                if (commonAttrMap.get(AttributeConstants.DATA_TIME) != null) {
+                    headers.put(AttributeConstants.DATA_TIME, 
commonAttrMap.get(AttributeConstants.DATA_TIME));
+                } else {
+                    headers.put(AttributeConstants.DATA_TIME, 
String.valueOf(System.currentTimeMillis()));
+                }
 
                 headers.put(ConfigConstants.TOPIC_KEY, topicEntry.getKey());
                 headers.put(AttributeConstants.INTERFACE_ID, 
streamIdEntry.getKey());
@@ -417,72 +406,49 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
 
                     StringBuilder sidBuilder = new StringBuilder();
                     
sidBuilder.append(topicEntry.getKey()).append(SEPARATOR).append(streamIdEntry.getKey())
-                            .append(SEPARATOR).append(sequenceId);
+                        .append(SEPARATOR).append(sequenceId);
                     headers.put(ConfigConstants.SEQUENCE_ID, 
sidBuilder.toString());
                 }
 
                 headers.put(ConfigConstants.PKG_TIME_KEY, pkgTimeStr);
+                Event event = EventBuilder.withBody(data, headers);
 
-                // process proxy message list
-                this.processProxyMessageList(headers, 
streamIdEntry.getValue());
-            }
-        }
-    }
+                long dtten = 0;
+                try {
+                    dtten = 
Long.parseLong(headers.get(AttributeConstants.DATA_TIME));
+                } catch (Exception e1) {
+                    long uniqVal = 
Long.parseLong(commonAttrMap.get(AttributeConstants.UNIQ_ID));
+                    throw new MessageIDException(uniqVal,
+                        ErrorCode.DT_ERROR,
+                        new Throwable("attribute dt=" + 
headers.get(AttributeConstants.DATA_TIME
+                            + " has error, detail is: topic=" + 
topicEntry.getKey() + "&streamId="
+                            + streamIdEntry.getKey() + "&NodeIP=" + 
strRemoteIP), e1));
+                }
 
-    /**
-     * processProxyMessageList
-     * 
-     * @param commonHeaders
-     * @param proxyMessages
-     */
-    private void processProxyMessageList(Map<String, String> commonHeaders,
-            List<ProxyMessage> proxyMessages) {
-        for (ProxyMessage message : proxyMessages) {
-            Event event = this.parseProxyMessage2Event(commonHeaders, message);
-            try {
-                processor.processEvent(event);
-                this.addMetric(true, message);
-            } catch (Throwable ex) {
-                logger.error("Error writting to channel,data will discard.", 
ex);
-                this.addMetric(false, message);
-                throw new ChannelException("ProcessEvent error can't write 
event to channel.");
+                dtten = dtten / 1000 / 60 / 10;
+                dtten = dtten * 1000 * 60 * 10;
+                try {
+                    processor.processEvent(event);
+                    this.addMetric(true, data.length);
+                } catch (Throwable ex) {
+                    logger.error("Error writting to channel,data will 
discard.", ex);
+                    this.addMetric(false, data.length);
+                    throw new ChannelException("ProcessEvent error can't write 
event to channel.");
+                }
             }
         }
     }
 
-    /**
-     * parseProxyMessage2Event
-     * 
-     * @param  commonHeaders
-     * @param  proxyMessage
-     * @return
-     */
-    private Event parseProxyMessage2Event(Map<String, String> commonHeaders, 
ProxyMessage proxyMessage) {
-        Map<String, String> headers = new HashMap<>();
-        if (proxyMessage.getAttributeMap() != null) {
-            headers.putAll(proxyMessage.getAttributeMap());
-        }
-        headers.putAll(commonHeaders);
-        headers.put(AttributeConstants.MESSAGE_COUNT, "1");
-        headers.put(Constants.INLONG_GROUP_ID, proxyMessage.getGroupId());
-        headers.put(Constants.INLONG_STREAM_ID, proxyMessage.getStreamId());
-        headers.put(Constants.TOPIC, proxyMessage.getTopic());
-        headers.put(Constants.HEADER_KEY_MSG_TIME, 
commonHeaders.get(AttributeConstants.DATA_TIME));
-        headers.put(Constants.HEADER_KEY_SOURCE_IP, 
commonHeaders.get(AttributeConstants.NODE_IP));
-        Event event = EventBuilder.withBody(proxyMessage.getData(), headers);
-        return event;
-    }
-
     private void responsePackage(Map<String, String> commonAttrMap,
-            Map<String, Object> resultMap,
-            Channel remoteChannel,
-            SocketAddress remoteSocketAddress,
-            MsgType msgType) throws Exception {
+        Map<String, Object> resultMap,
+        Channel remoteChannel,
+        SocketAddress remoteSocketAddress,
+        MsgType msgType) throws Exception {
         if (!commonAttrMap.containsKey("isAck") || 
"true".equals(commonAttrMap.get("isAck"))) {
             if (MsgType.MSG_ACK_SERVICE.equals(msgType) || 
MsgType.MSG_ORIGINAL_RETURN
-                    .equals(msgType)
-                    || MsgType.MSG_MULTI_BODY.equals(msgType) || 
MsgType.MSG_MULTI_BODY_ATTR
-                            .equals(msgType)) {
+                .equals(msgType)
+                || MsgType.MSG_MULTI_BODY.equals(msgType) || 
MsgType.MSG_MULTI_BODY_ATTR
+                .equals(msgType)) {
                 byte[] backAttr = 
mapJoiner.join(commonAttrMap).getBytes(StandardCharsets.UTF_8);
                 byte[] backBody = null;
 
@@ -507,14 +473,14 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
                     } else {
                         String backAttrStr = new String(backAttr, 
StandardCharsets.UTF_8);
                         logger.warn(
-                                "the send buffer1 is full, so disconnect 
it!please check remote client"
-                                        + "; Connection info:"
-                                        + remoteChannel + ";attr is " + 
backAttrStr);
+                            "the send buffer1 is full, so disconnect it!please 
check remote client"
+                                + "; Connection info:"
+                                + remoteChannel + ";attr is " + backAttrStr);
                         throw new Exception(new Throwable(
-                                "the send buffer1 is full, so disconnect 
it!please check remote client"
-                                        +
-                                        "; Connection info:" + remoteChannel + 
";attr is "
-                                        + backAttrStr));
+                            "the send buffer1 is full, so disconnect it!please 
check remote client"
+                                +
+                                "; Connection info:" + remoteChannel + ";attr 
is "
+                                + backAttrStr));
                     }
                 }
             } else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
@@ -552,12 +518,12 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
                     remoteChannel.write(binBuffer, remoteSocketAddress);
                 } else {
                     logger.warn(
-                            "the send buffer2 is full, so disconnect it!please 
check remote client"
-                                    + "; Connection info:" + remoteChannel + 
";attr is "
-                                    + backattrs);
+                        "the send buffer2 is full, so disconnect it!please 
check remote client"
+                            + "; Connection info:" + remoteChannel + ";attr is 
"
+                            + backattrs);
                     throw new Exception(new Throwable(
-                            "the send buffer2 is full,so disconnect it!please 
check remote client, Connection info:"
-                                    + remoteChannel + ";attr is " + 
backattrs));
+                        "the send buffer2 is full,so disconnect it!please 
check remote client, Connection info:"
+                            + remoteChannel + ";attr is " + backattrs));
                 }
             }
         }
@@ -568,7 +534,7 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
         logger.info("message received");
         if (e == null) {
             logger.error("get null messageevent, just skip");
-            this.addMetric(false, null);
+            this.addMetric(false, 0);
             return;
         }
         ChannelBuffer cb = ((ChannelBuffer) e.getMessage());
@@ -578,7 +544,7 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
         if (len == 0 && this.filterEmptyMsg) {
             logger.warn("skip empty msg.");
             cb.clear();
-            this.addMetric(false, null);
+            this.addMetric(false, 0);
             return;
         }
 
@@ -587,31 +553,32 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
         try {
             resultMap = serviceProcessor.extractData(cb, remoteChannel);
         } catch (MessageIDException ex) {
-            this.addMetric(false, null);
+            this.addMetric(false, 0);
             throw new IOException(ex.getCause());
         }
 
         if (resultMap == null) {
             logger.info("result is null");
-            this.addMetric(false, null);
+            this.addMetric(false, 0);
             return;
         }
 
         MsgType msgType = (MsgType) resultMap.get(ConfigConstants.MSG_TYPE);
         if (MsgType.MSG_HEARTBEAT.equals(msgType)) {
             remoteChannel.write(heartbeatBuffer, remoteSocketAddress);
-            this.addMetric(false, null);
+            this.addMetric(false, 0);
             return;
         }
 
         if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
 //            ChannelBuffer binBuffer = getBinHeart(resultMap,msgType);
 //            remoteChannel.write(binBuffer, remoteSocketAddress);
-            this.addMetric(false, null);
+            this.addMetric(false, 0);
             return;
         }
 
-        Map<String, String> commonAttrMap = (Map<String, String>) 
resultMap.get(ConfigConstants.COMMON_ATTR_MAP);
+        Map<String, String> commonAttrMap =
+                (Map<String, String>) 
resultMap.get(ConfigConstants.COMMON_ATTR_MAP);
         if (commonAttrMap == null) {
             commonAttrMap = new HashMap<String, String>();
         }
@@ -620,7 +587,9 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
         if (msgList != null
                 && !commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)
                 && 
!commonAttrMap.containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
-            Map<String, HashMap<String, List<ProxyMessage>>> messageMap = new 
HashMap<>(msgList.size());
+            Map<String, HashMap<String, List<ProxyMessage>>> messageMap =
+                    new HashMap<String, HashMap<String, List<ProxyMessage>>>(
+                    msgList.size());
 
             updateMsgList(msgList, commonAttrMap, messageMap, strRemoteIP, 
msgType);
 
@@ -638,10 +607,10 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
                 Event event = EventBuilder.withBody(body, headers);
                 try {
                     processor.processEvent(event);
-                    this.addMetric(true, message);
+                    this.addMetric(true, body.length);
                 } catch (Throwable ex) {
                     logger.error("Error writing to controller,data will 
discard.", ex);
-                    this.addMetric(false, message);
+                    this.addMetric(false, body.length);
                     throw new ChannelException(
                             "Process Controller Event error can't write event 
to channel.");
                 }
@@ -659,10 +628,10 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
                 Event event = EventBuilder.withBody(body, headers);
                 try {
                     processor.processEvent(event);
-                    this.addMetric(true, message);
+                    this.addMetric(true, body.length);
                 } catch (Throwable ex) {
                     logger.error("Error writing to controller,data will 
discard.", ex);
-                    this.addMetric(false, message);
+                    this.addMetric(false, body.length);
                     throw new ChannelException(
                             "Process Controller Event error can't write event 
to channel.");
                 }
@@ -709,19 +678,15 @@ public class ServerMessageHandler extends 
SimpleChannelHandler {
      * addMetric
      * 
      * @param result
-     * @param msg
+     * @param size
      */
-    private void addMetric(boolean result, ProxyMessage msg) {
-        int size = (msg == null) ? 0 : msg.getData().length;
+    private void addMetric(boolean result, long size) {
         Map<String, String> dimensions = new HashMap<>();
         dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
         dimensions.put(DataProxyMetricItem.KEY_SOURCE_ID, source.getName());
         dimensions.put(DataProxyMetricItem.KEY_SOURCE_DATA_ID, 
source.getName());
-        dimensions.put(DataProxyMetricItem.KEY_INLONG_GROUP_ID, 
msg.getGroupId());
-        dimensions.put(DataProxyMetricItem.KEY_INLONG_STREAM_ID, 
msg.getStreamId());
-        long msgTime = System.currentTimeMillis();
-        long auditFormatTime = msgTime - msgTime % 
CommonPropertiesHolder.getAuditFormatInterval();
-        dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME, 
String.valueOf(auditFormatTime));
+        dimensions.put(DataProxyMetricItem.KEY_INLONG_GROUP_ID, "");
+        dimensions.put(DataProxyMetricItem.KEY_INLONG_STREAM_ID, "");
         DataProxyMetricItem metricItem = 
this.metricItemSet.findMetricItem(dimensions);
         if (result) {
             metricItem.readSuccessCount.incrementAndGet();

Reply via email to