This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 370815119 [INLONG-8084][DataProxy] Add abnormal event statistics in sink (#8085)
370815119 is described below
commit 370815119d7d8898bbf13febc5b70628893ef43f
Author: Goson Zhang <[email protected]>
AuthorDate: Thu May 25 14:05:29 2023 +0800
[INLONG-8084][DataProxy] Add abnormal event statistics in sink (#8085)
---
.../dataproxy/config/CommonConfigHolder.java | 86 ++++++++++++++++++++++
.../inlong/dataproxy/consts/StatConstants.java | 27 ++++---
.../inlong/dataproxy/sink/common/SinkContext.java | 38 ++++++++++
.../sink/mq/MessageQueueZoneSinkContext.java | 3 +
.../dataproxy/sink/mq/kafka/KafkaHandler.java | 43 +++++------
.../dataproxy/sink/mq/pulsar/PulsarHandler.java | 28 +++++--
.../inlong/dataproxy/sink/mq/tube/TubeHandler.java | 36 +++++++--
.../inlong/dataproxy/source2/BaseSource.java | 38 ++++------
.../inlong/dataproxy/source2/SourceConstants.java | 11 ---
.../source2/httpMsg/InLongHttpMsgHandler.java | 3 +-
10 files changed, 227 insertions(+), 86 deletions(-)
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
index d6f3aec76..71a514444 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
@@ -75,6 +75,26 @@ public class CommonConfigHolder {
// whether enable whitelist, optional field.
public static final String KEY_ENABLE_WHITELIST = "proxy.enable.whitelist";
public static final boolean VAL_DEF_ENABLE_WHITELIST = false;
+ // whether enable file metric, optional field.
+ public static final String KEY_ENABLE_FILE_METRIC = "file.metric.enable";
+ public static final boolean VAL_DEF_ENABLE_FILE_METRIC = true;
+ // file metric statistic interval (second)
+ public static final String KEY_FILE_METRIC_STAT_INTERVAL_SEC =
"file.metric.stat.interval.sec";
+ public static final int VAL_DEF_FILE_METRIC_STAT_INVL_SEC = 60;
+ public static final int VAL_MIN_FILE_METRIC_STAT_INVL_SEC = 0;
+ // file metric max statistic key count
+ public static final String KEY_FILE_METRIC_MAX_CACHE_CNT =
"file.metric.max.cache.cnt";
+ public static final int VAL_DEF_FILE_METRIC_MAX_CACHE_CNT = 1000000;
+ public static final int VAL_MIN_FILE_METRIC_MAX_CACHE_CNT = 0;
+ // source metric statistic name
+ public static final String KEY_FILE_METRIC_SOURCE_OUTPUT_NAME =
"file.metric.source.output.name";
+ public static final String VAL_DEF_FILE_METRIC_SOURCE_OUTPUT_NAME =
"Source";
+ // sink metric statistic name
+ public static final String KEY_FILE_METRIC_SINK_OUTPUT_NAME =
"file.metric.sink.output.name";
+ public static final String VAL_DEF_FILE_METRIC_SINK_OUTPUT_NAME = "Sink";
+ // event metric statistic name
+ public static final String KEY_FILE_METRIC_EVENT_OUTPUT_NAME =
"file.metric.event.output.name";
+ public static final String VAL_DEF_FILE_METRIC_EVENT_OUTPUT_NAME =
"DataProxy_monitors";
// Audit fields
public static final String KEY_ENABLE_AUDIT = "audit.enable";
public static final boolean VAL_DEF_ENABLE_AUDIT = true;
@@ -140,6 +160,12 @@ public class CommonConfigHolder {
private String proxyNodeId = VAL_DEF_PROXY_NODE_ID;
private String msgCompressType = VAL_DEF_MSG_COMPRESS_TYPE;
private int prometheusHttpPort = VAL_DEF_PROMETHEUS_HTTP_PORT;
+ private boolean enableFileMetric = VAL_DEF_ENABLE_FILE_METRIC;
+ private int fileMetricStatInvlSec = VAL_DEF_FILE_METRIC_STAT_INVL_SEC;
+ private int fileMetricStatCacheCnt = VAL_DEF_FILE_METRIC_MAX_CACHE_CNT;
+ private String fileMetricSourceOutName =
VAL_DEF_FILE_METRIC_SOURCE_OUTPUT_NAME;
+ private String fileMetricSinkOutName =
VAL_DEF_FILE_METRIC_SINK_OUTPUT_NAME;
+ private String fileMetricEventOutName =
VAL_DEF_FILE_METRIC_EVENT_OUTPUT_NAME;
/**
* get instance for common.properties config manager
@@ -232,6 +258,18 @@ public class CommonConfigHolder {
return enableAudit;
}
+ public boolean isEnableFileMetric() {
+ return enableFileMetric;
+ }
+
+ public int getFileMetricStatInvlSec() {
+ return fileMetricStatInvlSec;
+ }
+
+ public int getFileMetricStatCacheCnt() {
+ return fileMetricStatCacheCnt;
+ }
+
public HashSet<String> getAuditProxys() {
return auditProxys;
}
@@ -280,6 +318,18 @@ public class CommonConfigHolder {
return msgCompressType;
}
+ public String getFileMetricSourceOutName() {
+ return fileMetricSourceOutName;
+ }
+
+ public String getFileMetricSinkOutName() {
+ return fileMetricSinkOutName;
+ }
+
+ public String getFileMetricEventOutName() {
+ return fileMetricEventOutName;
+ }
+
private void preReadFields() {
String tmpValue;
// read cluster tag
@@ -331,6 +381,42 @@ public class CommonConfigHolder {
if (StringUtils.isNotBlank(tmpValue)) {
this.managerAuthSecretKey = tmpValue.trim();
}
+ // read whether enable file metric
+ tmpValue = this.props.get(KEY_ENABLE_FILE_METRIC);
+ if (StringUtils.isNotEmpty(tmpValue)) {
+ this.enableFileMetric = "TRUE".equalsIgnoreCase(tmpValue.trim());
+ }
+ // read file metric statistic interval
+ tmpValue = this.props.get(KEY_FILE_METRIC_STAT_INTERVAL_SEC);
+ if (StringUtils.isNotEmpty(tmpValue)) {
+ int statInvl = NumberUtils.toInt(tmpValue.trim(),
VAL_DEF_FILE_METRIC_STAT_INVL_SEC);
+ if (statInvl >= VAL_MIN_FILE_METRIC_MAX_CACHE_CNT) {
+ this.fileMetricStatInvlSec = statInvl;
+ }
+ }
+ // read file metric statistic max cache count
+ tmpValue = this.props.get(KEY_FILE_METRIC_MAX_CACHE_CNT);
+ if (StringUtils.isNotEmpty(tmpValue)) {
+ int maxCacheCnt = NumberUtils.toInt(tmpValue.trim(),
VAL_DEF_FILE_METRIC_MAX_CACHE_CNT);
+ if (maxCacheCnt >= VAL_MIN_FILE_METRIC_STAT_INVL_SEC) {
+ this.fileMetricStatCacheCnt = maxCacheCnt;
+ }
+ }
+ // read source file statistic output name
+ tmpValue = this.props.get(KEY_FILE_METRIC_SOURCE_OUTPUT_NAME);
+ if (StringUtils.isNotBlank(tmpValue)) {
+ this.fileMetricSourceOutName = tmpValue.trim();
+ }
+ // read sink file statistic output name
+ tmpValue = this.props.get(KEY_FILE_METRIC_SINK_OUTPUT_NAME);
+ if (StringUtils.isNotBlank(tmpValue)) {
+ this.fileMetricSinkOutName = tmpValue.trim();
+ }
+ // read event file statistic output name
+ tmpValue = this.props.get(KEY_FILE_METRIC_EVENT_OUTPUT_NAME);
+ if (StringUtils.isNotBlank(tmpValue)) {
+ this.fileMetricEventOutName = tmpValue.trim();
+ }
// read whether enable audit
tmpValue = this.props.get(KEY_ENABLE_AUDIT);
if (StringUtils.isNotEmpty(tmpValue)) {
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
index 6dbc4f14b..c445cadd3 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
@@ -23,17 +23,6 @@ public class StatConstants {
public static final java.lang.String EVENT_SERVICE_UNREADY =
"sink.unready";
public static final java.lang.String EVENT_VISITIP_ILLEGAL =
"links.illegal";
public static final java.lang.String EVENT_NOTOPIC = "config.notopic";
-
- public static final java.lang.String METASINK_SUCCESS = "metasink.success";
- public static final java.lang.String METASINK_DROPPED = "metasink.dropped";
- public static final java.lang.String METASINK_RETRY = "metasink.retry";
- public static final java.lang.String METASINK_OTHEREXP =
"metasink.otherexp";
- public static final java.lang.String METASINK_NOTOPIC = "metasink.notopic";
- public static final java.lang.String METASINK_NOSLAVE = "metasink.noslave";
- public static final java.lang.String METASINK_MSG_NOTOPIC =
"metasink.msgnotopic";
- public static final java.lang.String METASINK_PROCESS_SPEED =
"metasink.process.speed";
- public static final java.lang.String EVENT_OTHEREXP = "socketmsg.otherexp";
- public static final java.lang.String EVENT_INVALID = "socketmsg.invalid";
// source
public static final java.lang.String EVENT_LINKS_OVERMAX = "links.overmax";
public static final java.lang.String EVENT_LINKS_IN = "links.linkin";
@@ -72,6 +61,22 @@ public class StatConstants {
public static final java.lang.String EVENT_HTTP_POST_SUCCESS =
"httpmsg.success";
public static final java.lang.String EVENT_HTTP_POST_DROPPED =
"httpmsg.dropped";
+ public static final java.lang.String EVENT_SINK_NOUID = "sink.nouid";
+ public static final java.lang.String EVENT_SINK_NOTOPIC = "sink.notopic";
+ public static final java.lang.String EVENT_SINK_NOPRODUCER =
"sink.noproducer";
+ public static final java.lang.String EVENT_SINK_SENDEXCEPT =
"sink.sendexcept";
+ public static final java.lang.String EVENT_SINK_FAILRETRY = "sink.retry";
+ public static final java.lang.String EVENT_SINK_FAILDROPPED =
"sink.dropped";
+ public static final java.lang.String EVENT_SINK_SUCCESS = "sink.success";
+ public static final java.lang.String EVENT_SINK_RECEIVEEXCEPT =
"sink.rcvexcept";
+
+ public static final java.lang.String METASINK_OTHEREXP =
"metasink.otherexp";
+ public static final java.lang.String METASINK_NOSLAVE = "metasink.noslave";
+ public static final java.lang.String METASINK_MSG_NOTOPIC =
"metasink.msgnotopic";
+ public static final java.lang.String METASINK_PROCESS_SPEED =
"metasink.process.speed";
+ public static final java.lang.String EVENT_OTHEREXP = "socketmsg.otherexp";
+ public static final java.lang.String EVENT_INVALID = "socketmsg.invalid";
+
public static final java.lang.String AGENT_MESSAGES_SENT_SUCCESS =
"agent.messages.success";
public static final java.lang.String AGENT_PACKAGES_SENT_SUCCESS =
"agent.packages.success";
public static final java.lang.String MSG_COUNTER_KEY = "msgcnt";
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
index 8641d0606..95e00d382 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
@@ -21,8 +21,11 @@ import org.apache.commons.lang.ClassUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.inlong.common.metric.MetricRegister;
+import org.apache.inlong.common.monitor.MonitorIndex;
+import org.apache.inlong.common.monitor.MonitorIndexExt;
import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
+import org.apache.inlong.dataproxy.consts.AttrConstants;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.sink.mq.BatchPackProfile;
import org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler;
@@ -59,6 +62,9 @@ public class SinkContext {
//
protected final DataProxyMetricItemSet metricItemSet;
protected Timer reloadTimer;
+ // file metric statistic
+ protected MonitorIndex monitorIndex = null;
+ private MonitorIndexExt monitorIndexExt = null;
/**
* Constructor
@@ -80,6 +86,17 @@ public class SinkContext {
* start
*/
public void start() {
+ // init monitor logic
+ if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+ this.monitorIndex = new
MonitorIndex(CommonConfigHolder.getInstance().getFileMetricSinkOutName(),
+
CommonConfigHolder.getInstance().getFileMetricStatInvlSec(),
+
CommonConfigHolder.getInstance().getFileMetricStatCacheCnt());
+ this.monitorIndexExt = new MonitorIndexExt(
+
CommonConfigHolder.getInstance().getFileMetricEventOutName()
+ + AttrConstants.SEP_HASHTAG + this.getSinkName(),
+
CommonConfigHolder.getInstance().getFileMetricStatInvlSec(),
+
CommonConfigHolder.getInstance().getFileMetricStatCacheCnt());
+ }
try {
this.reload();
this.setReloadTimer();
@@ -97,6 +114,27 @@ public class SinkContext {
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
+ // stop file statistic index
+ if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+ if (monitorIndex != null) {
+ monitorIndex.shutDown();
+ }
+ if (monitorIndexExt != null) {
+ monitorIndexExt.shutDown();
+ }
+ }
+ }
+
+ public void fileMetricEventInc(String eventKey) {
+ if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+ monitorIndexExt.incrementAndGet(eventKey);
+ }
+ }
+
+ public void fileMetricRecordAdd(String key, int cnt, int packCnt, long
packSize, int failCnt) {
+ if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+ monitorIndex.addAndGet(key, cnt, packCnt, packSize, failCnt);
+ }
}
/**
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
index 8308af8ec..2f9d54375 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
@@ -25,6 +25,7 @@ import org.apache.flume.conf.Configurable;
import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.config.holder.CacheClusterConfigHolder;
import org.apache.inlong.dataproxy.config.holder.IdTopicConfigHolder;
+import org.apache.inlong.dataproxy.consts.StatConstants;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.sink.common.SinkContext;
@@ -271,9 +272,11 @@ public class MessageQueueZoneSinkContext extends
SinkContext {
public void processSendFail(BatchPackProfile currentRecord, String mqName,
String topic, long sendTime) {
if (currentRecord.isResend()) {
dispatchQueue.offer(currentRecord);
+ fileMetricEventInc(StatConstants.EVENT_SINK_FAILRETRY);
this.addSendResultMetric(currentRecord, mqName, topic, false,
sendTime);
} else {
currentRecord.fail();
+ fileMetricEventInc(StatConstants.EVENT_SINK_FAILDROPPED);
}
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
index bce84aba9..dc3bc94a9 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
@@ -23,6 +23,7 @@ import org.apache.flume.Context;
import org.apache.inlong.common.constant.Constants;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
+import org.apache.inlong.dataproxy.consts.StatConstants;
import org.apache.inlong.dataproxy.sink.common.EventHandler;
import org.apache.inlong.dataproxy.sink.mq.BatchPackProfile;
import org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler;
@@ -111,20 +112,24 @@ public class KafkaHandler implements MessageQueueHandler {
// idConfig
IdTopicConfig idConfig =
sinkContext.getIdTopicHolder().getIdConfig(event.getUid());
if (idConfig == null) {
+ sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOUID);
sinkContext.addSendResultMetric(event, clusterName,
event.getUid(), false, 0);
sinkContext.getDispatchQueue().release(event.getSize());
+ event.fail();
return false;
}
String baseTopic = idConfig.getTopicName();
if (baseTopic == null) {
+
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOTOPIC);
sinkContext.addSendResultMetric(event, clusterName,
event.getUid(), false, 0);
sinkContext.getDispatchQueue().release(event.getSize());
+ event.fail();
return false;
}
String topic = getProducerTopic(baseTopic, idConfig);
-
// create producer failed
if (producer == null) {
+
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOPRODUCER);
sinkContext.processSendFail(event, clusterName, topic, 0);
return false;
}
@@ -138,8 +143,9 @@ public class KafkaHandler implements MessageQueueHandler {
}
return true;
} catch (Exception e) {
- LOG.error(e.getMessage(), e);
+
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SENDEXCEPT);
sinkContext.processSendFail(event, clusterName, event.getUid(), 0);
+ LOG.error(e.getMessage(), e);
return false;
}
}
@@ -188,14 +194,11 @@ public class KafkaHandler implements MessageQueueHandler {
@Override
public void onCompletion(RecordMetadata arg0, Exception ex) {
if (ex != null) {
- LOG.error("Send fail:{}", ex.getMessage());
- LOG.error(ex.getMessage(), ex);
- if (event.isResend()) {
- sinkContext.processSendFail(event, clusterName, topic,
sendTime);
- } else {
- event.fail();
- }
+
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
+ sinkContext.processSendFail(event, clusterName, topic,
sendTime);
+ LOG.error("Send ProfileV1 to Kafka failure", ex);
} else {
+
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SUCCESS);
sinkContext.addSendResultMetric(event, clusterName, topic,
true, sendTime);
sinkContext.getDispatchQueue().release(event.getSize());
event.ack();
@@ -235,14 +238,11 @@ public class KafkaHandler implements MessageQueueHandler {
@Override
public void onCompletion(RecordMetadata arg0, Exception ex) {
if (ex != null) {
- LOG.error("Send fail:{}", ex.getMessage());
- LOG.error(ex.getMessage(), ex);
- if (event.isResend()) {
- sinkContext.processSendFail(event, clusterName, topic,
sendTime);
- } else {
- event.fail();
- }
+
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
+ sinkContext.processSendFail(event, clusterName, topic,
sendTime);
+ LOG.error("Send SimpleProfileV0 to Kafka failure", ex);
} else {
+
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SUCCESS);
sinkContext.addSendResultMetric(event, clusterName, topic,
true, sendTime);
sinkContext.getDispatchQueue().release(event.getSize());
event.ack();
@@ -279,14 +279,11 @@ public class KafkaHandler implements MessageQueueHandler {
@Override
public void onCompletion(RecordMetadata arg0, Exception ex) {
if (ex != null) {
- LOG.error("Send fail:{}", ex.getMessage());
- LOG.error(ex.getMessage(), ex);
- if (event.isResend()) {
- sinkContext.processSendFail(event, clusterName, topic,
sendTime);
- } else {
- event.fail();
- }
+
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
+ sinkContext.processSendFail(event, clusterName, topic,
sendTime);
+ LOG.error("Send OrderProfileV0 to Kafka failure", ex);
} else {
+
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SUCCESS);
sinkContext.addSendResultMetric(event, clusterName, topic,
true, sendTime);
sinkContext.getDispatchQueue().release(event.getSize());
event.ack();
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
index b500485d7..35205ee62 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
@@ -21,8 +21,10 @@ import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Context;
+import org.apache.inlong.common.monitor.LogCounter;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
+import org.apache.inlong.dataproxy.consts.StatConstants;
import org.apache.inlong.dataproxy.sink.common.EventHandler;
import org.apache.inlong.dataproxy.sink.mq.BatchPackProfile;
import org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler;
@@ -58,6 +60,8 @@ import static
org.apache.inlong.dataproxy.consts.ConfigConstants.KEY_STATS_INTER
public class PulsarHandler implements MessageQueueHandler {
public static final Logger LOG =
LoggerFactory.getLogger(PulsarHandler.class);
+ // log print count
+ private static final LogCounter logCounter = new LogCounter(10, 100000, 30
* 1000);
public static final String KEY_TENANT = "tenant";
public static final String KEY_NAMESPACE = "namespace";
@@ -194,19 +198,24 @@ public class PulsarHandler implements MessageQueueHandler
{
// idConfig
IdTopicConfig idConfig =
sinkContext.getIdTopicHolder().getIdConfig(event.getUid());
if (idConfig == null) {
+ sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOUID);
sinkContext.addSendResultMetric(event, clusterName,
event.getUid(), false, 0);
sinkContext.getDispatchQueue().release(event.getSize());
+ event.fail();
return false;
}
String baseTopic = idConfig.getTopicName();
if (baseTopic == null) {
+
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOTOPIC);
sinkContext.addSendResultMetric(event, clusterName,
event.getUid(), false, 0);
sinkContext.getDispatchQueue().release(event.getSize());
+ event.fail();
return false;
}
// topic
String producerTopic = this.getProducerTopic(baseTopic, idConfig);
if (producerTopic == null) {
+
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOTOPIC);
sinkContext.addSendResultMetric(event, clusterName,
event.getUid(), false, 0);
sinkContext.getDispatchQueue().release(event.getSize());
event.fail();
@@ -236,6 +245,7 @@ public class PulsarHandler implements MessageQueueHandler {
}
// create producer failed
if (producer == null) {
+
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOPRODUCER);
sinkContext.processSendFail(event, clusterName, producerTopic,
0);
return false;
}
@@ -249,8 +259,9 @@ public class PulsarHandler implements MessageQueueHandler {
}
return true;
} catch (Exception e) {
- LOG.error(e.getMessage(), e);
+
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SENDEXCEPT);
sinkContext.processSendFail(event, clusterName, event.getUid(), 0);
+ LOG.error(e.getMessage(), e);
return false;
}
}
@@ -321,10 +332,11 @@ public class PulsarHandler implements MessageQueueHandler
{
// callback
future.whenCompleteAsync((msgId, ex) -> {
if (ex != null) {
- LOG.error("Send fail:{}", ex.getMessage());
- LOG.error(ex.getMessage(), ex);
+
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
sinkContext.processSendFail(event, clusterName, producerTopic,
sendTime);
+ LOG.error("Send ProfileV1 to Pulsar failure", ex);
} else {
+
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SUCCESS);
sinkContext.addSendResultMetric(event, clusterName,
producerTopic, true, sendTime);
sinkContext.getDispatchQueue().release(event.getSize());
event.ack();
@@ -354,10 +366,11 @@ public class PulsarHandler implements MessageQueueHandler
{
// callback
future.whenCompleteAsync((msgId, ex) -> {
if (ex != null) {
- LOG.error("Send fail:{}", ex.getMessage());
- LOG.error(ex.getMessage(), ex);
+
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
sinkContext.processSendFail(event, clusterName, producerTopic,
sendTime);
+ LOG.error("Send SimpleProfileV0 to Pulsar failure", ex);
} else {
+
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SUCCESS);
sinkContext.addSendResultMetric(event, clusterName,
producerTopic, true, sendTime);
sinkContext.getDispatchQueue().release(event.getSize());
event.ack();
@@ -383,10 +396,11 @@ public class PulsarHandler implements MessageQueueHandler
{
// callback
future.whenCompleteAsync((msgId, ex) -> {
if (ex != null) {
- LOG.error("Send fail:{}", ex.getMessage());
- LOG.error(ex.getMessage(), ex);
+
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
sinkContext.processSendFail(event, clusterName, producerTopic,
sendTime);
+ LOG.error("Send OrderProfileV0 to Pulsar failure", ex);
} else {
+
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SUCCESS);
sinkContext.addSendResultMetric(event, clusterName,
producerTopic, true, sendTime);
sinkContext.getDispatchQueue().release(event.getSize());
event.ack();
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
index ecd1ddb36..cad19c2ee 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
@@ -19,9 +19,11 @@ package org.apache.inlong.dataproxy.sink.mq.tube;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Context;
+import org.apache.inlong.common.monitor.LogCounter;
import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.apache.inlong.dataproxy.consts.StatConstants;
import org.apache.inlong.dataproxy.sink.common.EventHandler;
import org.apache.inlong.dataproxy.sink.common.TubeUtils;
import org.apache.inlong.dataproxy.sink.mq.BatchPackProfile;
@@ -49,6 +51,9 @@ import java.util.Set;
public class TubeHandler implements MessageQueueHandler {
public static final Logger LOG =
LoggerFactory.getLogger(TubeHandler.class);
+ // log print count
+ private static final LogCounter logCounter = new LogCounter(10, 100000, 30
* 1000);
+
private static String MASTER_HOST_PORT_LIST = "master-host-port-list";
public static final String KEY_NAMESPACE = "namespace";
@@ -172,20 +177,25 @@ public class TubeHandler implements MessageQueueHandler {
// idConfig
IdTopicConfig idConfig =
sinkContext.getIdTopicHolder().getIdConfig(event.getUid());
if (idConfig == null) {
+ sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOUID);
sinkContext.addSendResultMetric(event, clusterName,
event.getUid(), false, 0);
sinkContext.getDispatchQueue().release(event.getSize());
+ event.fail();
return false;
}
String topic = getTubeTopic(idConfig);
if (topic == null) {
+
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOTOPIC);
sinkContext.addSendResultMetric(event, clusterName,
event.getUid(), false, 0);
sinkContext.getDispatchQueue().release(event.getSize());
+ event.fail();
return false;
}
// create producer failed
if (producer == null) {
- LOG.error("producer is null");
+
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_NOPRODUCER);
sinkContext.processSendFail(event, clusterName, topic, 0);
+ LOG.error("producer is null");
return false;
}
// publish
@@ -203,8 +213,9 @@ public class TubeHandler implements MessageQueueHandler {
}
return true;
} catch (Exception e) {
- LOG.error(e.getMessage(), e);
+
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SENDEXCEPT);
sinkContext.processSendFail(event, clusterName, event.getUid(), 0);
+ LOG.error(e.getMessage(), e);
return false;
}
}
@@ -238,6 +249,7 @@ public class TubeHandler implements MessageQueueHandler {
@Override
public void onMessageSent(MessageSentResult result) {
+
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SUCCESS);
sinkContext.addSendResultMetric(event, clusterName, topic,
true, sendTime);
sinkContext.getDispatchQueue().release(event.getSize());
event.ack();
@@ -245,9 +257,11 @@ public class TubeHandler implements MessageQueueHandler {
@Override
public void onException(Throwable ex) {
- LOG.error("Send fail:{}", ex.getMessage());
- LOG.error(ex.getMessage(), ex);
+
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
sinkContext.processSendFail(event, clusterName, topic,
sendTime);
+ if (logCounter.shouldPrint()) {
+ LOG.error("Send ProfileV1 to tube failure", ex);
+ }
}
};
producer.sendMessage(message, callback);
@@ -268,6 +282,7 @@ public class TubeHandler implements MessageQueueHandler {
@Override
public void onMessageSent(MessageSentResult result) {
+
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SUCCESS);
sinkContext.addSendResultMetric(event, clusterName, topic,
true, sendTime);
sinkContext.getDispatchQueue().release(event.getSize());
event.ack();
@@ -275,9 +290,11 @@ public class TubeHandler implements MessageQueueHandler {
@Override
public void onException(Throwable ex) {
- LOG.error("Send fail:{}", ex.getMessage());
- LOG.error(ex.getMessage(), ex);
+
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
sinkContext.processSendFail(event, clusterName, topic,
sendTime);
+ if (logCounter.shouldPrint()) {
+ LOG.error("Send SimpleProfileV0 to tube failure", ex);
+ }
}
};
producer.sendMessage(message, callback);
@@ -304,6 +321,7 @@ public class TubeHandler implements MessageQueueHandler {
@Override
public void onMessageSent(MessageSentResult result) {
+
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_SUCCESS);
sinkContext.addSendResultMetric(event, clusterName, topic,
true, sendTime);
sinkContext.getDispatchQueue().release(event.getSize());
event.ack();
@@ -312,9 +330,11 @@ public class TubeHandler implements MessageQueueHandler {
@Override
public void onException(Throwable ex) {
- LOG.error("Send fail:{}", ex.getMessage());
- LOG.error(ex.getMessage(), ex);
+
sinkContext.fileMetricEventInc(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
sinkContext.processSendFail(event, clusterName, topic,
sendTime);
+ if (logCounter.shouldPrint()) {
+ LOG.error("Send OrderProfileV0 to tube failure", ex);
+ }
}
};
producer.sendMessage(message, callback);
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
index d843f8b1e..7a86784c6 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/BaseSource.java
@@ -33,6 +33,7 @@ import org.apache.inlong.common.monitor.MonitorIndexExt;
import org.apache.inlong.dataproxy.admin.ProxyServiceMBean;
import org.apache.inlong.dataproxy.channel.FailoverChannelProcessor;
import org.apache.inlong.dataproxy.config.CommonConfigHolder;
+import org.apache.inlong.dataproxy.consts.AttrConstants;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
@@ -109,13 +110,10 @@ public abstract class BaseSource
// send buffer size
protected int maxSendBufferSize;
// file metric statistic
- protected boolean fileMetricOn;
- protected int monitorStatInvlSec;
- protected int maxMonitorStatCnt;
protected MonitorIndex monitorIndex = null;
+ private MonitorIndexExt monitorIndexExt = null;
// metric set
protected DataProxyMetricItemSet metricItemSet;
- private MonitorIndexExt monitorIndexExt = null;
public BaseSource() {
super();
@@ -195,27 +193,12 @@ public abstract class BaseSource
SourceConstants.SRCCXT_MAX_READ_IDLE_TIME_MS + " must be in ["
+ SourceConstants.VAL_MIN_READ_IDLE_TIME_MS + ", "
+ SourceConstants.VAL_MAX_READ_IDLE_TIME_MS + "]");
- // get file metric statistic
- this.monitorStatInvlSec = ConfStringUtils.getIntValue(context,
- SourceConstants.SRCCXT_STAT_INTERVAL_SEC,
SourceConstants.VAL_DEF_STAT_INVL_SEC);
- Preconditions.checkArgument((this.monitorStatInvlSec >=
SourceConstants.VAL_MIN_STAT_INVL_SEC),
- SourceConstants.SRCCXT_STAT_INTERVAL_SEC + " must be >= "
- + SourceConstants.VAL_MIN_STAT_INVL_SEC);
- // get max monitor key count
- this.maxMonitorStatCnt = ConfStringUtils.getIntValue(context,
- SourceConstants.SRCCXT_MAX_MONITOR_STAT_CNT,
SourceConstants.VAL_DEF_MON_STAT_CNT);
- Preconditions.checkArgument(this.maxMonitorStatCnt >=
SourceConstants.VAL_MIN_MON_STAT_CNT,
- SourceConstants.SRCCXT_MAX_MONITOR_STAT_CNT + " must be >= "
- + SourceConstants.VAL_MIN_MON_STAT_CNT);
// get max connect count
this.maxConnections = ConfStringUtils.getIntValue(context,
SourceConstants.SRCCXT_MAX_CONNECTION_CNT,
SourceConstants.VAL_DEF_MAX_CONNECTION_CNT);
Preconditions.checkArgument(this.maxConnections >=
SourceConstants.VAL_MIN_CONNECTION_CNT,
SourceConstants.SRCCXT_MAX_CONNECTION_CNT + " must be >= "
+ SourceConstants.VAL_MIN_CONNECTION_CNT);
- // get whether enable file metric
- this.fileMetricOn =
context.getBoolean(SourceConstants.SRCCXT_FILE_METRIC_ON,
- SourceConstants.VAL_DEF_FILE_METRIC_ON);
// get max receive buffer size
this.maxRcvBufferSize = ConfStringUtils.getIntValue(context,
SourceConstants.SRCCXT_RECEIVE_BUFFER_SIZE,
SourceConstants.VAL_DEF_RECEIVE_BUFFER_SIZE);
@@ -251,10 +234,15 @@ public abstract class BaseSource
CommonConfigHolder.getInstance().getClusterName(), getName(),
String.valueOf(srcPort));
MetricRegister.register(metricItemSet);
// init monitor logic
- if (fileMetricOn) {
- this.monitorIndex = new MonitorIndex("Source", monitorStatInvlSec,
maxMonitorStatCnt);
+ if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+ this.monitorIndex = new
MonitorIndex(CommonConfigHolder.getInstance().getFileMetricSourceOutName(),
+
CommonConfigHolder.getInstance().getFileMetricStatInvlSec(),
+
CommonConfigHolder.getInstance().getFileMetricStatCacheCnt());
this.monitorIndexExt = new MonitorIndexExt(
- "DataProxy_monitors#" + this.getProtocolName(),
monitorStatInvlSec, maxMonitorStatCnt);
+
CommonConfigHolder.getInstance().getFileMetricEventOutName()
+ + AttrConstants.SEP_HASHTAG +
this.getProtocolName(),
+
CommonConfigHolder.getInstance().getFileMetricStatInvlSec(),
+
CommonConfigHolder.getInstance().getFileMetricStatCacheCnt());
}
startSource();
// register
@@ -285,7 +273,7 @@ public abstract class BaseSource
// stop super class
super.stop();
// stop file statistic index
- if (fileMetricOn) {
+ if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
if (monitorIndex != null) {
monitorIndex.shutDown();
}
@@ -373,13 +361,13 @@ public abstract class BaseSource
}
public void fileMetricEventInc(String eventKey) {
- if (fileMetricOn) {
+ if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
monitorIndexExt.incrementAndGet(eventKey);
}
}
public void fileMetricRecordAdd(String key, int cnt, int packCnt, long
packSize, int failCnt) {
- if (fileMetricOn) {
+ if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
monitorIndex.addAndGet(key, cnt, packCnt, packSize, failCnt);
}
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SourceConstants.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SourceConstants.java
index 1f556c895..64bcf55eb 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SourceConstants.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/SourceConstants.java
@@ -62,17 +62,6 @@ public class SourceConstants {
public static final int VAL_DEF_WORKER_THREADS =
Runtime.getRuntime().availableProcessors();
public static final int VAL_MIN_WORKER_THREADS = 1;
public static final int VAL_MAX_WORKER_THREADS =
Runtime.getRuntime().availableProcessors() * 2;
- // file metric statistic interval(second)
- public static final String SRCCXT_STAT_INTERVAL_SEC = "stat-interval-sec";
- public static final int VAL_DEF_STAT_INVL_SEC = 60;
- public static final int VAL_MIN_STAT_INVL_SEC = 0;
- // max file statistic key count
- public static final String SRCCXT_MAX_MONITOR_STAT_CNT = "max-monitor-cnt";
- public static final int VAL_DEF_MON_STAT_CNT = 1000000;
- public static final int VAL_MIN_MON_STAT_CNT = 0;
- // max file statistic key count
- public static final String SRCCXT_FILE_METRIC_ON = "file-metric-on";
- public static final boolean VAL_DEF_FILE_METRIC_ON = true;
// max connection count
public static final String SRCCXT_MAX_CONNECTION_CNT = "connections";
public static final int VAL_DEF_MAX_CONNECTION_CNT = 5000;
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
index 924760a7e..8de181843 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
@@ -274,7 +274,8 @@ public class InLongHttpMsgHandler extends
SimpleChannelInboundHandler<FullHttpRe
// build metric data item
dataTime = dataTime / 1000 / 60 / 10;
dataTime = dataTime * 1000 * 60 * 10;
-
strBuff.append("http").append(AttrConstants.SEP_HASHTAG).append(topicName)
+ strBuff.append(source.getProtocolName())
+ .append(AttrConstants.SEP_HASHTAG).append(topicName)
.append(AttrConstants.SEP_HASHTAG).append(streamId)
.append(AttrConstants.SEP_HASHTAG).append(clientIp)
.append(AttrConstants.SEP_HASHTAG).append(NetworkUtils.getLocalIp())