This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5b65155 [INLONG-2096] DataProxy add InlongGroupId+InlongStreamId
metric dimensions in TDSDKSource and TubeSink. (#2105)
5b65155 is described below
commit 5b651553cf38a8a8ff0b59e3e808e7ad3a23967a
Author: 卢春亮 <[email protected]>
AuthorDate: Wed Jan 5 17:42:31 2022 +0800
[INLONG-2096] DataProxy add InlongGroupId+InlongStreamId metric dimensions
in TDSDKSource and TubeSink. (#2105)
---
.../org/apache/inlong/dataproxy/sink/MetaSink.java | 50 +++--
.../dataproxy/source/ServerMessageHandler.java | 240 ++++++++++++---------
2 files changed, 165 insertions(+), 125 deletions(-)
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/MetaSink.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/MetaSink.java
index 159f860..1590b0e 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/MetaSink.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/MetaSink.java
@@ -386,16 +386,7 @@ public class MetaSink extends AbstractSink implements
Configurable {
if (clientIdCache && clientId != null && lastTime != null &&
lastTime > 0) {
logger.info("{} agent package {} existed,just discard.",
getName(), clientId);
} else {
- Message message = new Message(topic, event.getBody());
- message.setAttrKeyVal("dataproxyip",
NetworkUtils.getLocalIp());
- String streamId = "";
- if
(event.getHeaders().containsKey(AttributeConstants.INTERFACE_ID)) {
- streamId =
event.getHeaders().get(AttributeConstants.INTERFACE_ID);
- } else if
(event.getHeaders().containsKey(AttributeConstants.INAME)) {
- streamId =
event.getHeaders().get(AttributeConstants.INAME);
- }
- message.putSystemHeader(streamId,
event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
-
+ Message message = this.parseEvent2Message(topic, event);
producer.sendMessage(message, new MyCallback(es));
flag.set(true);
@@ -414,22 +405,39 @@ public class MetaSink extends AbstractSink implements
Configurable {
agentIdCache.put(clientId, System.currentTimeMillis());
}
- Message message = new Message(topic, event.getBody());
- message.setAttrKeyVal("dataproxyip",
NetworkUtils.getLocalIp());
- String streamId = "";
- if
(event.getHeaders().containsKey(AttributeConstants.INTERFACE_ID)) {
- streamId =
event.getHeaders().get(AttributeConstants.INTERFACE_ID);
- } else if
(event.getHeaders().containsKey(AttributeConstants.INAME)) {
- streamId =
event.getHeaders().get(AttributeConstants.INAME);
- }
- message.putSystemHeader(streamId,
event.getHeaders().get(ConfigConstants.PKG_TIME_KEY));
-
+ Message message = this.parseEvent2Message(topic, event);
producer.sendMessage(message, new MyCallback(es));
flag.set(true);
}
}
illegalTopicMap.remove(topic);
}
+
+    /**
+     * Converts a Flume event into the {@code Message} that is handed to the producer.
+     *
+     * <p>The message body is the event body and the local IP is attached as the
+     * {@code dataproxyip} attribute. The system header is filled with the stream id
+     * (taken from the {@code INTERFACE_ID} header, else from {@code INAME}, else the
+     * empty string) together with the package-time header. Finally the common InLong
+     * headers (group id, stream id, topic, message time, source IP) are copied from
+     * the event headers into message attributes under the same keys.
+     *
+     * @param topic topic the message is created for
+     * @param event event supplying the body and the headers
+     * @return the populated message
+     */
+    private Message parseEvent2Message(String topic, Event event) {
+        Message message = new Message(topic, event.getBody());
+        message.setAttrKeyVal("dataproxyip", NetworkUtils.getLocalIp());
+
+        Map<String, String> headers = event.getHeaders();
+
+        // INTERFACE_ID takes precedence over INAME; empty when neither header is present
+        String streamId;
+        if (headers.containsKey(AttributeConstants.INTERFACE_ID)) {
+            streamId = headers.get(AttributeConstants.INTERFACE_ID);
+        } else if (headers.containsKey(AttributeConstants.INAME)) {
+            streamId = headers.get(AttributeConstants.INAME);
+        } else {
+            streamId = "";
+        }
+        message.putSystemHeader(streamId, headers.get(ConfigConstants.PKG_TIME_KEY));
+
+        // common attributes: each one is copied under the key it has in the event headers
+        String[] commonKeys = {
+                Constants.INLONG_GROUP_ID,
+                Constants.INLONG_STREAM_ID,
+                Constants.TOPIC,
+                Constants.HEADER_KEY_MSG_TIME,
+                Constants.HEADER_KEY_SOURCE_IP
+        };
+        for (String key : commonKeys) {
+            message.setAttrKeyVal(key, headers.get(key));
+        }
+        return message;
+    }
private void handleException(Throwable t, String topic, boolean
decrementFlag, EventStat es) {
if (t instanceof TubeClientException) {
@@ -585,6 +593,7 @@ public class MetaSink extends AbstractSink implements
Configurable {
} else {
dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, "");
}
+ DataProxyMetricItem.fillInlongId(currentRecord, dimensions);
DataProxyMetricItem metricItem =
MetaSink.this.metricItemSet.findMetricItem(dimensions);
if (result) {
metricItem.sendSuccessCount.incrementAndGet();
@@ -678,6 +687,7 @@ public class MetaSink extends AbstractSink implements
Configurable {
} else {
dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID,
"");
}
+ DataProxyMetricItem.fillInlongId(event, dimensions);
DataProxyMetricItem metricItem =
this.metricItemSet.findMetricItem(dimensions);
metricItem.readFailCount.incrementAndGet();
metricItem.readFailSize.addAndGet(event.getBody().length);
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 1f6d87d..97373c9 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -34,6 +34,7 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.math.NumberUtils;
import org.apache.flume.ChannelException;
import org.apache.flume.Event;
import org.apache.flume.channel.ChannelProcessor;
@@ -45,10 +46,10 @@ import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
-import org.apache.inlong.dataproxy.exception.ErrorCode;
import org.apache.inlong.dataproxy.exception.MessageIDException;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
+import org.apache.inlong.dataproxy.utils.Constants;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
@@ -81,20 +82,20 @@ public class ServerMessageHandler extends
SimpleChannelHandler {
.on(AttributeConstants.SEPARATOR)
.trimResults().withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
- private static final ThreadLocal<SimpleDateFormat> dateFormator =
- new ThreadLocal<SimpleDateFormat>() {
- @Override
- protected SimpleDateFormat initialValue() {
- return new SimpleDateFormat("yyyyMMddHHmm");
- }
- };
- private static final ThreadLocal<SimpleDateFormat> dateFormator4Transfer =
- new ThreadLocal<SimpleDateFormat>() {
- @Override
- protected SimpleDateFormat initialValue() {
- return new SimpleDateFormat("yyyyMMddHHmmss");
- }
- };
+    /**
+     * Per-thread formatter for the {@code yyyyMMddHHmm} pattern. A separate instance is
+     * kept for each thread because {@link SimpleDateFormat} is not safe for concurrent use.
+     */
+    private static final ThreadLocal<SimpleDateFormat> dateFormator =
+            ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyyMMddHHmm"));
+
+    /**
+     * Per-thread formatter for the {@code yyyyMMddHHmmss} pattern. A separate instance is
+     * kept for each thread because {@link SimpleDateFormat} is not safe for concurrent use.
+     */
+    private static final ThreadLocal<SimpleDateFormat> dateFormator4Transfer =
+            ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyyMMddHHmmss"));
private AbstractSource source;
private final ChannelGroup allChannels;
private int maxConnections = Integer.MAX_VALUE;
@@ -110,10 +111,10 @@ public class ServerMessageHandler extends
SimpleChannelHandler {
private final DataProxyMetricItemSet metricItemSet;
public ServerMessageHandler(AbstractSource source, ServiceDecoder
serProcessor,
- ChannelGroup allChannels,
- String topic, String attr, Boolean
filterEmptyMsg, Integer maxMsgLength,
- Integer maxCons,
- Boolean isCompressed, String protocolType) {
+ ChannelGroup allChannels,
+ String topic, String attr, Boolean filterEmptyMsg, Integer
maxMsgLength,
+ Integer maxCons,
+ Boolean isCompressed, String protocolType) {
this.source = source;
this.processor = source.getChannelProcessor();
this.serviceProcessor = serProcessor;
@@ -226,7 +227,7 @@ public class ServerMessageHandler extends
SimpleChannelHandler {
}
private void checkGroupIdInfo(ProxyMessage message, Map<String, String>
commonAttrMap,
- Map<String, String> attrMap, AtomicReference<String> topicInfo) {
+ Map<String, String> attrMap, AtomicReference<String> topicInfo) {
String groupId = message.getGroupId();
String streamId = message.getStreamId();
if (null != groupId) {
@@ -234,10 +235,10 @@ public class ServerMessageHandler extends
SimpleChannelHandler {
if ("dc".equals(from)) {
String dcInterfaceId = message.getStreamId();
if (StringUtils.isNotEmpty(dcInterfaceId)
- && configManager.getDcMappingProperties()
- .containsKey(dcInterfaceId.trim())) {
+ && configManager.getDcMappingProperties()
+ .containsKey(dcInterfaceId.trim())) {
groupId = configManager.getDcMappingProperties()
- .get(dcInterfaceId.trim()).trim();
+ .get(dcInterfaceId.trim()).trim();
message.setGroupId(groupId);
}
}
@@ -259,16 +260,17 @@ public class ServerMessageHandler extends
SimpleChannelHandler {
String streamIdNum =
commonAttrMap.get(AttributeConstants.STREAMID_NUM);
if (configManager.getGroupIdMappingProperties() != null
- && configManager.getStreamIdMappingProperties() != null) {
+ && configManager.getStreamIdMappingProperties() != null) {
groupId =
configManager.getGroupIdMappingProperties().get(groupIdNum);
streamId =
(configManager.getStreamIdMappingProperties().get(groupIdNum) == null)
- ? null :
configManager.getStreamIdMappingProperties().get(groupIdNum).get(streamIdNum);
+ ? null
+ :
configManager.getStreamIdMappingProperties().get(groupIdNum).get(streamIdNum);
if (groupId != null && streamId != null) {
- String enableTrans =
- (configManager.getGroupIdEnableMappingProperties() ==
null)
- ? null :
configManager.getGroupIdEnableMappingProperties().get(groupIdNum);
+ String enableTrans =
(configManager.getGroupIdEnableMappingProperties() == null)
+ ? null
+ :
configManager.getGroupIdEnableMappingProperties().get(groupIdNum);
if (("TRUE".equalsIgnoreCase(enableTrans) && "TRUE"
- .equalsIgnoreCase(num2name))) {
+ .equalsIgnoreCase(num2name))) {
String extraAttr = "groupId=" + groupId + "&" +
"streamId=" + streamId;
message.setData(newBinMsg(message.getData(),
extraAttr));
}
@@ -288,8 +290,8 @@ public class ServerMessageHandler extends
SimpleChannelHandler {
}
private void updateMsgList(List<ProxyMessage> msgList, Map<String, String>
commonAttrMap,
- Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
- String strRemoteIP, MsgType msgType) {
+ Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
+ String strRemoteIP, MsgType msgType) {
for (ProxyMessage message : msgList) {
Map<String, String> attrMap = message.getAttributeMap();
@@ -316,15 +318,15 @@ public class ServerMessageHandler extends
SimpleChannelHandler {
if (groupId != null && streamId != null) {
String tubeSwtichKey = groupId + SEPARATOR + streamId;
if (configManager.getTubeSwitchProperties().get(tubeSwtichKey)
!= null
- && "false".equals(configManager.getTubeSwitchProperties()
- .get(tubeSwtichKey).trim())) {
+ &&
"false".equals(configManager.getTubeSwitchProperties()
+ .get(tubeSwtichKey).trim())) {
continue;
}
}
if (!"pb".equals(attrMap.get(AttributeConstants.MESSAGE_TYPE))
- && !MsgType.MSG_MULTI_BODY.equals(msgType)
- && !MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)) {
+ && !MsgType.MSG_MULTI_BODY.equals(msgType)
+ && !MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)) {
byte[] data = message.getData();
if (data[data.length - 1] == '\n') {
int tripDataLen = data.length - 1;
@@ -341,16 +343,25 @@ public class ServerMessageHandler extends
SimpleChannelHandler {
streamId = "";
}
HashMap<String, List<ProxyMessage>> streamIdMsgMap = messageMap
- .computeIfAbsent(topic, k -> new HashMap<>());
+ .computeIfAbsent(topic, k -> new HashMap<>());
List<ProxyMessage> streamIdMsgList = streamIdMsgMap
- .computeIfAbsent(streamId, k -> new ArrayList<>());
+ .computeIfAbsent(streamId, k -> new ArrayList<>());
streamIdMsgList.add(message);
}
}
+ /**
+ * formatMessagesAndSend
+ *
+ * @param commonAttrMap
+ * @param messageMap
+ * @param strRemoteIP
+ * @param msgType
+ * @throws MessageIDException
+ */
private void formatMessagesAndSend(Map<String, String> commonAttrMap,
- Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
- String strRemoteIP, MsgType msgType) throws MessageIDException {
+ Map<String, HashMap<String, List<ProxyMessage>>> messageMap,
+ String strRemoteIP, MsgType msgType) throws MessageIDException {
int tdMsgVer = 1;
if (MsgType.MSG_MULTI_BODY_ATTR.equals(msgType)) {
@@ -386,11 +397,9 @@ public class ServerMessageHandler extends
SimpleChannelHandler {
}
}
- if (commonAttrMap.get(AttributeConstants.DATA_TIME) != null) {
- headers.put(AttributeConstants.DATA_TIME,
commonAttrMap.get(AttributeConstants.DATA_TIME));
- } else {
- headers.put(AttributeConstants.DATA_TIME,
String.valueOf(System.currentTimeMillis()));
- }
+ long dtTime =
NumberUtils.toLong(commonAttrMap.get(AttributeConstants.DATA_TIME),
+ System.currentTimeMillis());
+ headers.put(AttributeConstants.DATA_TIME,
String.valueOf(dtTime));
headers.put(ConfigConstants.TOPIC_KEY, topicEntry.getKey());
headers.put(AttributeConstants.INTERFACE_ID,
streamIdEntry.getKey());
@@ -408,49 +417,72 @@ public class ServerMessageHandler extends
SimpleChannelHandler {
StringBuilder sidBuilder = new StringBuilder();
sidBuilder.append(topicEntry.getKey()).append(SEPARATOR).append(streamIdEntry.getKey())
- .append(SEPARATOR).append(sequenceId);
+ .append(SEPARATOR).append(sequenceId);
headers.put(ConfigConstants.SEQUENCE_ID,
sidBuilder.toString());
}
headers.put(ConfigConstants.PKG_TIME_KEY, pkgTimeStr);
- Event event = EventBuilder.withBody(data, headers);
- long dtten = 0;
- try {
- dtten =
Long.parseLong(headers.get(AttributeConstants.DATA_TIME));
- } catch (Exception e1) {
- long uniqVal =
Long.parseLong(commonAttrMap.get(AttributeConstants.UNIQ_ID));
- throw new MessageIDException(uniqVal,
- ErrorCode.DT_ERROR,
- new Throwable("attribute dt=" +
headers.get(AttributeConstants.DATA_TIME
- + " has error, detail is: topic=" +
topicEntry.getKey() + "&streamId="
- + streamIdEntry.getKey() + "&NodeIP=" +
strRemoteIP), e1));
- }
+ // process proxy message list
+ this.processProxyMessageList(headers,
streamIdEntry.getValue());
+ }
+ }
+ }
- dtten = dtten / 1000 / 60 / 10;
- dtten = dtten * 1000 * 60 * 10;
- try {
- processor.processEvent(event);
- this.addMetric(true, data.length);
- } catch (Throwable ex) {
- logger.error("Error writting to channel,data will
discard.", ex);
- this.addMetric(false, data.length);
- throw new ChannelException("ProcessEvent error can't write
event to channel.");
- }
+    /**
+     * Converts every proxy message into its own event and passes it to the channel
+     * processor, recording a success or failure metric per message.
+     *
+     * <p>The first failure aborts the loop: the error is logged, a failure metric is
+     * recorded for that message, and the remaining messages are not processed.
+     *
+     * @param commonHeaders headers applied to every event built from the list
+     * @param proxyMessages messages to forward
+     * @throws ChannelException if processing an event fails; the original failure is
+     *         attached as the cause
+     */
+    private void processProxyMessageList(Map<String, String> commonHeaders,
+            List<ProxyMessage> proxyMessages) {
+        for (ProxyMessage message : proxyMessages) {
+            Event event = this.parseProxyMessage2Event(commonHeaders, message);
+            try {
+                processor.processEvent(event);
+                this.addMetric(true, message);
+            } catch (Throwable ex) {
+                logger.error("Error writting to channel,data will discard.", ex);
+                this.addMetric(false, message);
+                // keep the root cause so callers can see why the channel write failed
+                throw new ChannelException("ProcessEvent error can't write event to channel.", ex);
             }
         }
     }
+    /**
+     * Builds a single-message event from one proxy message.
+     *
+     * <p>Headers are layered so that later entries override earlier ones: first the
+     * message's own attribute map (when it is not null), then {@code commonHeaders},
+     * then the per-message entries set here. These are a message count of {@code "1"},
+     * the message's group id, stream id and topic, the message time copied from the
+     * {@code DATA_TIME} common header, and the source IP copied from the
+     * {@code NODE_IP} common header.
+     *
+     * @param commonHeaders headers shared by the messages of one batch
+     * @param proxyMessage message providing the body, attributes, ids and topic
+     * @return event whose body is the message data and whose headers are built as described
+     */
+    private Event parseProxyMessage2Event(Map<String, String> commonHeaders,
+            ProxyMessage proxyMessage) {
+        Map<String, String> eventHeaders = new HashMap<>();
+
+        // lowest precedence: attributes carried by the message itself
+        Map<String, String> attributes = proxyMessage.getAttributeMap();
+        if (attributes != null) {
+            eventHeaders.putAll(attributes);
+        }
+        // batch-level headers override message attributes
+        eventHeaders.putAll(commonHeaders);
+
+        // per-message entries override everything above
+        eventHeaders.put(AttributeConstants.MESSAGE_COUNT, "1");
+        eventHeaders.put(Constants.INLONG_GROUP_ID, proxyMessage.getGroupId());
+        eventHeaders.put(Constants.INLONG_STREAM_ID, proxyMessage.getStreamId());
+        eventHeaders.put(Constants.TOPIC, proxyMessage.getTopic());
+        eventHeaders.put(Constants.HEADER_KEY_MSG_TIME,
+                commonHeaders.get(AttributeConstants.DATA_TIME));
+        eventHeaders.put(Constants.HEADER_KEY_SOURCE_IP,
+                commonHeaders.get(AttributeConstants.NODE_IP));
+
+        return EventBuilder.withBody(proxyMessage.getData(), eventHeaders);
+    }
+
private void responsePackage(Map<String, String> commonAttrMap,
- Map<String, Object> resultMap,
- Channel remoteChannel,
- SocketAddress remoteSocketAddress,
- MsgType msgType) throws Exception {
+ Map<String, Object> resultMap,
+ Channel remoteChannel,
+ SocketAddress remoteSocketAddress,
+ MsgType msgType) throws Exception {
if (!commonAttrMap.containsKey("isAck") ||
"true".equals(commonAttrMap.get("isAck"))) {
if (MsgType.MSG_ACK_SERVICE.equals(msgType) ||
MsgType.MSG_ORIGINAL_RETURN
- .equals(msgType)
- || MsgType.MSG_MULTI_BODY.equals(msgType) ||
MsgType.MSG_MULTI_BODY_ATTR
- .equals(msgType)) {
+ .equals(msgType)
+ || MsgType.MSG_MULTI_BODY.equals(msgType) ||
MsgType.MSG_MULTI_BODY_ATTR
+ .equals(msgType)) {
byte[] backAttr =
mapJoiner.join(commonAttrMap).getBytes(StandardCharsets.UTF_8);
byte[] backBody = null;
@@ -475,14 +507,14 @@ public class ServerMessageHandler extends
SimpleChannelHandler {
} else {
String backAttrStr = new String(backAttr,
StandardCharsets.UTF_8);
logger.warn(
- "the send buffer1 is full, so disconnect it!please
check remote client"
- + "; Connection info:"
- + remoteChannel + ";attr is " + backAttrStr);
+ "the send buffer1 is full, so disconnect
it!please check remote client"
+ + "; Connection info:"
+ + remoteChannel + ";attr is " +
backAttrStr);
throw new Exception(new Throwable(
- "the send buffer1 is full, so disconnect it!please
check remote client"
- +
- "; Connection info:" + remoteChannel + ";attr
is "
- + backAttrStr));
+ "the send buffer1 is full, so disconnect
it!please check remote client"
+ +
+ "; Connection info:" + remoteChannel +
";attr is "
+ + backAttrStr));
}
}
} else if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
@@ -520,12 +552,12 @@ public class ServerMessageHandler extends
SimpleChannelHandler {
remoteChannel.write(binBuffer, remoteSocketAddress);
} else {
logger.warn(
- "the send buffer2 is full, so disconnect it!please
check remote client"
- + "; Connection info:" + remoteChannel + ";attr is
"
- + backattrs);
+ "the send buffer2 is full, so disconnect it!please
check remote client"
+ + "; Connection info:" + remoteChannel +
";attr is "
+ + backattrs);
throw new Exception(new Throwable(
- "the send buffer2 is full,so disconnect it!please
check remote client, Connection info:"
- + remoteChannel + ";attr is " + backattrs));
+ "the send buffer2 is full,so disconnect it!please
check remote client, Connection info:"
+ + remoteChannel + ";attr is " +
backattrs));
}
}
}
@@ -536,7 +568,7 @@ public class ServerMessageHandler extends
SimpleChannelHandler {
logger.info("message received");
if (e == null) {
logger.error("get null messageevent, just skip");
- this.addMetric(false, 0);
+ this.addMetric(false, null);
return;
}
ChannelBuffer cb = ((ChannelBuffer) e.getMessage());
@@ -546,7 +578,7 @@ public class ServerMessageHandler extends
SimpleChannelHandler {
if (len == 0 && this.filterEmptyMsg) {
logger.warn("skip empty msg.");
cb.clear();
- this.addMetric(false, 0);
+ this.addMetric(false, null);
return;
}
@@ -555,32 +587,31 @@ public class ServerMessageHandler extends
SimpleChannelHandler {
try {
resultMap = serviceProcessor.extractData(cb, remoteChannel);
} catch (MessageIDException ex) {
- this.addMetric(false, 0);
+ this.addMetric(false, null);
throw new IOException(ex.getCause());
}
if (resultMap == null) {
logger.info("result is null");
- this.addMetric(false, 0);
+ this.addMetric(false, null);
return;
}
MsgType msgType = (MsgType) resultMap.get(ConfigConstants.MSG_TYPE);
if (MsgType.MSG_HEARTBEAT.equals(msgType)) {
remoteChannel.write(heartbeatBuffer, remoteSocketAddress);
- this.addMetric(false, 0);
+ this.addMetric(false, null);
return;
}
if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
// ChannelBuffer binBuffer = getBinHeart(resultMap,msgType);
// remoteChannel.write(binBuffer, remoteSocketAddress);
- this.addMetric(false, 0);
+ this.addMetric(false, null);
return;
}
- Map<String, String> commonAttrMap =
- (Map<String, String>)
resultMap.get(ConfigConstants.COMMON_ATTR_MAP);
+ Map<String, String> commonAttrMap = (Map<String, String>)
resultMap.get(ConfigConstants.COMMON_ATTR_MAP);
if (commonAttrMap == null) {
commonAttrMap = new HashMap<String, String>();
}
@@ -589,9 +620,7 @@ public class ServerMessageHandler extends
SimpleChannelHandler {
if (msgList != null
&& !commonAttrMap.containsKey(ConfigConstants.FILE_CHECK_DATA)
&&
!commonAttrMap.containsKey(ConfigConstants.MINUTE_CHECK_DATA)) {
- Map<String, HashMap<String, List<ProxyMessage>>> messageMap =
- new HashMap<String, HashMap<String, List<ProxyMessage>>>(
- msgList.size());
+ Map<String, HashMap<String, List<ProxyMessage>>> messageMap = new
HashMap<>(msgList.size());
updateMsgList(msgList, commonAttrMap, messageMap, strRemoteIP,
msgType);
@@ -609,10 +638,10 @@ public class ServerMessageHandler extends
SimpleChannelHandler {
Event event = EventBuilder.withBody(body, headers);
try {
processor.processEvent(event);
- this.addMetric(true, body.length);
+ this.addMetric(true, message);
} catch (Throwable ex) {
logger.error("Error writing to controller,data will
discard.", ex);
- this.addMetric(false, body.length);
+ this.addMetric(false, message);
throw new ChannelException(
"Process Controller Event error can't write event
to channel.");
}
@@ -630,10 +659,10 @@ public class ServerMessageHandler extends
SimpleChannelHandler {
Event event = EventBuilder.withBody(body, headers);
try {
processor.processEvent(event);
- this.addMetric(true, body.length);
+ this.addMetric(true, message);
} catch (Throwable ex) {
logger.error("Error writing to controller,data will
discard.", ex);
- this.addMetric(false, body.length);
+ this.addMetric(false, message);
throw new ChannelException(
"Process Controller Event error can't write event
to channel.");
}
@@ -680,15 +709,16 @@ public class ServerMessageHandler extends
SimpleChannelHandler {
* addMetric
*
* @param result
- * @param size
+ * @param msg
*/
- private void addMetric(boolean result, long size) {
+ private void addMetric(boolean result, ProxyMessage msg) {
+ int size = (msg == null) ? 0 : msg.getData().length;
Map<String, String> dimensions = new HashMap<>();
dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
dimensions.put(DataProxyMetricItem.KEY_SOURCE_ID, source.getName());
dimensions.put(DataProxyMetricItem.KEY_SOURCE_DATA_ID,
source.getName());
- dimensions.put(DataProxyMetricItem.KEY_INLONG_GROUP_ID, "");
- dimensions.put(DataProxyMetricItem.KEY_INLONG_STREAM_ID, "");
+ dimensions.put(DataProxyMetricItem.KEY_INLONG_GROUP_ID,
msg.getGroupId());
+ dimensions.put(DataProxyMetricItem.KEY_INLONG_STREAM_ID,
msg.getStreamId());
long msgTime = System.currentTimeMillis();
long auditFormatTime = msgTime - msgTime %
CommonPropertiesHolder.getAuditFormatInterval();
dimensions.put(DataProxyMetricItem.KEY_MESSAGE_TIME,
String.valueOf(auditFormatTime));