This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c807df39f9 [INLONG-11320][Audit] Add a metric monitoring system for
the Audit Proxy itself (#11359)
c807df39f9 is described below
commit c807df39f92ac57423f1f7a12cb86bd1a7c29e96
Author: doleyzi <[email protected]>
AuthorDate: Tue Oct 15 17:34:46 2024 +0800
[INLONG-11320][Audit] Add a metric monitoring system for the Audit Proxy
itself (#11359)
---
.../apache/inlong/audit/file/ConfigManager.java | 25 +++++-
.../apache/inlong/audit/metric/AbstractMetric.java | 23 +++++
.../inlong/audit/config/ConfigConstants.java | 31 +++++++
.../inlong/audit/metric/MetricDimension.java | 41 +++++++++
.../org/apache/inlong/audit/metric/MetricItem.java | 44 ++++++++++
.../apache/inlong/audit/metric/MetricsManager.java | 98 ++++++++++++++++++++++
.../metric/prometheus/ProxyPrometheusMetric.java | 83 ++++++++++++++++++
.../org/apache/inlong/audit/node/Application.java | 8 +-
.../org/apache/inlong/audit/sink/KafkaSink.java | 5 ++
.../org/apache/inlong/audit/sink/PulsarSink.java | 3 +
.../org/apache/inlong/audit/sink/TubeSink.java | 5 ++
.../inlong/audit/source/ServerMessageHandler.java | 14 ++++
inlong-audit/conf/application.properties | 8 +-
13 files changed, 382 insertions(+), 6 deletions(-)
diff --git
a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java
index 68745efd23..62a2f551ca 100644
---
a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java
+++
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/file/ConfigManager.java
@@ -41,6 +41,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
public class ConfigManager {
@@ -94,12 +95,28 @@ public class ConfigManager {
return null;
}
- public String getValue(String key) {
+ public <T> T getValue(String key, T defaultValue, Function<String, T>
parser) {
ConfigHolder holder = holderMap.get(DEFAULT_CONFIG_PROPERTIES);
- if (holder != null) {
- return holder.getHolder().get(key);
+ if (holder == null) {
+ return defaultValue;
}
- return null;
+ Object value = holder.getHolder().get(key);
+ if (value == null) {
+ return defaultValue;
+ }
+ try {
+ return parser.apply((String) value);
+ } catch (Exception e) {
+ return defaultValue;
+ }
+ }
+
+ public String getValue(String key, String defaultValue) {
+ return getValue(key, defaultValue, Function.identity());
+ }
+
+ public int getValue(String key, int defaultValue) {
+ return getValue(key, defaultValue, Integer::parseInt);
}
private boolean updatePropertiesHolder(Map<String, String> result,
diff --git
a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/metric/AbstractMetric.java
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/metric/AbstractMetric.java
new file mode 100644
index 0000000000..4c2f627916
--- /dev/null
+++
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/metric/AbstractMetric.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.metric;
+
+public interface AbstractMetric {
+
+ public void report();
+}
diff --git
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
new file mode 100644
index 0000000000..67d5183ce5
--- /dev/null
+++
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/config/ConfigConstants.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.config;
+
+/**
+ * Config constants
+ */
+public class ConfigConstants {
+
+ public static final String AUDIT_PROXY_SERVER_NAME = "audit-proxy";
+ public static final String KEY_PROMETHEUS_PORT =
"audit.proxy.prometheus.port";
+ public static final int DEFAULT_PROMETHEUS_PORT = 10082;
+ public static final String KEY_PROXY_METRIC_CLASSNAME =
"audit.proxy.metric.classname";
+ public static final String DEFAULT_PROXY_METRIC_CLASSNAME =
+ "org.apache.inlong.audit.metric.prometheus.ProxyPrometheusMetric";
+}
diff --git
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricDimension.java
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricDimension.java
new file mode 100644
index 0000000000..3f410330c0
--- /dev/null
+++
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricDimension.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.metric;
+
+public enum MetricDimension {
+
+ RECEIVE_COUNT_SUCCESS("receiveCountSuccess"),
+ RECEIVE_PACK_SUCCESS("receivePackSuccess"),
+ RECEIVE_SIZE_SUCCESS("receiveSizeSuccess"),
+ RECEIVE_COUNT_INVALID("receiveCountInvalid"),
+ RECEIVE_COUNT_EXPIRED("receiveCountExpired"),
+ SEND_COUNT_SUCCESS("sendCountSuccess"),
+ SEND_COUNT_FAILED("sendCountFailed"),
+ SEND_PACK_SUCCESS("sendPackSuccess"),
+ SEND_DURATION("sendDuration");
+
+ private final String key;
+
+ MetricDimension(String key) {
+ this.key = key;
+ }
+
+ public String getKey() {
+ return key;
+ }
+}
diff --git
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricItem.java
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricItem.java
new file mode 100644
index 0000000000..a95f4e3484
--- /dev/null
+++
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricItem.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.metric;
+
+import lombok.Data;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+@Data
+public class MetricItem {
+
+ public static final String K_DIMENSION_KEY = "dimensionName";
+ private AtomicLong receiveCountSuccess = new AtomicLong(0);
+ private AtomicLong receivePackSuccess = new AtomicLong(0);
+ private AtomicLong receiveSizeSuccess = new AtomicLong(0);
+ private AtomicLong receiveCountInvalid = new AtomicLong(0);
+ private AtomicLong receiveCountExpired = new AtomicLong(0);
+ private AtomicLong sendCountSuccess = new AtomicLong(0);
+ private AtomicLong sendCountFailed = new AtomicLong(0);
+ public void resetAllMetrics() {
+ receiveCountSuccess.set(0);
+ receivePackSuccess.set(0);
+ receiveSizeSuccess.set(0);
+ receiveCountInvalid.set(0);
+ receiveCountExpired.set(0);
+ sendCountSuccess.set(0);
+ sendCountFailed.set(0);
+ }
+}
diff --git
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java
new file mode 100644
index 0000000000..433fc71848
--- /dev/null
+++
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/MetricsManager.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.metric;
+
+import org.apache.inlong.audit.file.ConfigManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PROMETHEUS_PORT;
+import static
org.apache.inlong.audit.config.ConfigConstants.DEFAULT_PROXY_METRIC_CLASSNAME;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_PROMETHEUS_PORT;
+import static
org.apache.inlong.audit.config.ConfigConstants.KEY_PROXY_METRIC_CLASSNAME;
+
+public class MetricsManager {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MetricsManager.class);
+
+ private static class Holder {
+
+ private static final MetricsManager INSTANCE = new MetricsManager();
+ }
+
+ private AbstractMetric metric;
+
+ public void init(String metricName) {
+ try {
+ ConfigManager configManager = ConfigManager.getInstance();
+ String metricClassName =
configManager.getValue(KEY_PROXY_METRIC_CLASSNAME,
DEFAULT_PROXY_METRIC_CLASSNAME);
+ LOGGER.info("Metric class name: {}", metricClassName);
+ Constructor<?> constructor = Class.forName(metricClassName)
+ .getDeclaredConstructor(String.class, MetricItem.class,
int.class);
+ constructor.setAccessible(true);
+ metric = (AbstractMetric) constructor.newInstance(metricName,
metricItem,
+ configManager.getValue(KEY_PROMETHEUS_PORT,
DEFAULT_PROMETHEUS_PORT));
+
+ timer.scheduleWithFixedDelay(() -> {
+ metric.report();
+ metricItem.resetAllMetrics();
+ }, 0, 1, TimeUnit.MINUTES);
+ } catch (ClassNotFoundException | NoSuchMethodException |
InstantiationException | IllegalAccessException
+ | InvocationTargetException exception) {
+ LOGGER.error("Init metrics manager has exception: ", exception);
+ }
+ }
+
+ public static MetricsManager getInstance() {
+ return Holder.INSTANCE;
+ }
+
+ private final MetricItem metricItem = new MetricItem();
+ protected final ScheduledExecutorService timer =
Executors.newSingleThreadScheduledExecutor();
+
+ public void addReceiveCountInvalid(long count) {
+ metricItem.getReceiveCountInvalid().addAndGet(count);
+ }
+
+ public void addReceiveCountExpired(long count) {
+ metricItem.getReceiveCountExpired().addAndGet(count);
+ }
+
+ public void addReceiveSuccess(long count, long pack, long size) {
+ metricItem.getReceiveCountSuccess().addAndGet(count);
+ metricItem.getReceivePackSuccess().addAndGet(pack);
+ metricItem.getReceiveSizeSuccess().addAndGet(size);
+ }
+
+ public void addSendSuccess(long count) {
+ metricItem.getSendCountSuccess().addAndGet(count);
+ }
+ public void addSendFailed(long count) {
+ metricItem.getSendCountFailed().addAndGet(count);
+ }
+ public void shutdown() {
+ timer.shutdown();
+ }
+}
diff --git
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java
new file mode 100644
index 0000000000..07c2397743
--- /dev/null
+++
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/metric/prometheus/ProxyPrometheusMetric.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.metric.prometheus;
+
+import org.apache.inlong.audit.metric.AbstractMetric;
+import org.apache.inlong.audit.metric.MetricDimension;
+import org.apache.inlong.audit.metric.MetricItem;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.exporter.HTTPServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * PrometheusMetric
+ */
+public class ProxyPrometheusMetric extends Collector implements AbstractMetric
{
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ProxyPrometheusMetric.class);
+ private static final String HELP_DESCRIPTION = "help";
+
+ private final MetricItem metricItem;
+ private final String metricName;
+ private HTTPServer server;
+
+ public ProxyPrometheusMetric(String metricName, MetricItem metricItem, int
prometheusPort) {
+ this.metricName = metricName;
+ this.metricItem = metricItem;
+ try {
+ server = new HTTPServer(prometheusPort);
+ this.register();
+ } catch (IOException e) {
+ LOGGER.error("Construct proxy prometheus metric has IOException",
e);
+ }
+ }
+
+ @Override
+ public List<MetricFamilySamples> collect() {
+ List<MetricFamilySamples.Sample> samples = Arrays.asList(
+ createSample(MetricDimension.RECEIVE_COUNT_SUCCESS,
metricItem.getReceiveCountSuccess().doubleValue()),
+ createSample(MetricDimension.RECEIVE_PACK_SUCCESS,
metricItem.getReceivePackSuccess().doubleValue()),
+ createSample(MetricDimension.RECEIVE_SIZE_SUCCESS,
metricItem.getReceiveSizeSuccess().doubleValue()),
+ createSample(MetricDimension.RECEIVE_COUNT_INVALID,
metricItem.getReceiveCountInvalid().doubleValue()),
+ createSample(MetricDimension.RECEIVE_COUNT_EXPIRED,
metricItem.getReceiveCountExpired().doubleValue()),
+ createSample(MetricDimension.SEND_COUNT_SUCCESS,
metricItem.getSendCountSuccess().doubleValue()),
+ createSample(MetricDimension.SEND_COUNT_FAILED,
metricItem.getSendCountFailed().doubleValue()));
+
+ MetricFamilySamples metricFamilySamples =
+ new MetricFamilySamples(metricName, Type.GAUGE,
HELP_DESCRIPTION, samples);
+
+ return Collections.singletonList(metricFamilySamples);
+ }
+
+ private MetricFamilySamples.Sample createSample(MetricDimension key,
double value) {
+ return new MetricFamilySamples.Sample(metricName,
Collections.singletonList(MetricItem.K_DIMENSION_KEY),
+ Collections.singletonList(key.getKey()), value);
+ }
+
+ @Override
+ public void report() {
+ LOGGER.info("Report proxy prometheus metric: {} ",
metricItem.toString());
+ }
+}
\ No newline at end of file
diff --git
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java
index 4ab7f29e51..1aa7f6b7d0 100644
---
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java
+++
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/node/Application.java
@@ -18,6 +18,7 @@
package org.apache.inlong.audit.node;
import org.apache.inlong.audit.file.ConfigManager;
+import org.apache.inlong.audit.metric.MetricsManager;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
@@ -58,8 +59,9 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
+import static
org.apache.inlong.audit.config.ConfigConstants.AUDIT_PROXY_SERVER_NAME;
+
/**
- *
* Application
*/
public class Application {
@@ -259,6 +261,7 @@ public class Application {
/**
* main
+ *
* @param args
*/
public static void main(String[] args) {
@@ -344,9 +347,12 @@ public class Application {
@Override
public void run() {
appReference.stop();
+ MetricsManager.getInstance().shutdown();
}
});
+ MetricsManager.getInstance().init(AUDIT_PROXY_SERVER_NAME);
+
} catch (Exception e) {
logger.error("A fatal error occurred while running. Exception
follows.", e);
}
diff --git
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java
index dc2c7c154d..db2a63c46d 100644
---
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java
+++
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/KafkaSink.java
@@ -19,6 +19,7 @@ package org.apache.inlong.audit.sink;
import org.apache.inlong.audit.base.HighPriorityThreadFactory;
import org.apache.inlong.audit.file.ConfigManager;
+import org.apache.inlong.audit.metric.MetricsManager;
import org.apache.inlong.audit.utils.FailoverChannelProcessorHolder;
import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.common.pojo.audit.MQInfo;
@@ -385,6 +386,8 @@ public class KafkaSink extends AbstractSink implements
Configurable {
}
public void handleMessageSendSuccess(EventStat es) {
+ MetricsManager.getInstance().addSendSuccess(1);
+
// Statistics tube performance
totalKafkaSuccSendCnt.incrementAndGet();
totalKafkaSuccSendSize.addAndGet(es.getEvent().getBody().length);
@@ -494,6 +497,8 @@ public class KafkaSink extends AbstractSink implements
Configurable {
} else {
logger.warn("Send message failed, error message: {},
resendQueue size: {}, event:{}",
e.getMessage(), resendQueue.size(),
es.getEvent().hashCode());
+
+ MetricsManager.getInstance().addSendFailed(1);
}
es.incRetryCnt();
diff --git
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java
index a081c32cb8..c99cb9ce16 100644
---
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java
+++
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java
@@ -18,6 +18,7 @@
package org.apache.inlong.audit.sink;
import org.apache.inlong.audit.base.HighPriorityThreadFactory;
+import org.apache.inlong.audit.metric.MetricsManager;
import org.apache.inlong.audit.sink.pulsar.CreatePulsarClientCallBack;
import org.apache.inlong.audit.sink.pulsar.PulsarClientService;
import org.apache.inlong.audit.sink.pulsar.SendMessageCallBack;
@@ -319,6 +320,7 @@ public class PulsarSink extends AbstractSink
@Override
public void handleMessageSendSuccess(Object result, EventStat eventStat) {
+ MetricsManager.getInstance().addSendSuccess(1);
/*
* Statistics pulsar performance
*/
@@ -346,6 +348,7 @@ public class PulsarSink extends AbstractSink
@Override
public void handleMessageSendException(EventStat eventStat, Object e) {
+ MetricsManager.getInstance().addSendFailed(1);
if (e instanceof TooLongFrameException) {
PulsarSink.this.overflow = true;
} else if (e instanceof ProducerQueueIsFullError) {
diff --git
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/TubeSink.java
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/TubeSink.java
index c6cefcb088..f2eab52615 100644
---
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/TubeSink.java
+++
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/sink/TubeSink.java
@@ -20,6 +20,7 @@ package org.apache.inlong.audit.sink;
import org.apache.inlong.audit.base.HighPriorityThreadFactory;
import org.apache.inlong.audit.consts.ConfigConstants;
import org.apache.inlong.audit.file.ConfigManager;
+import org.apache.inlong.audit.metric.MetricsManager;
import org.apache.inlong.audit.utils.FailoverChannelProcessorHolder;
import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.common.pojo.audit.MQInfo;
@@ -313,6 +314,8 @@ public class TubeSink extends AbstractSink implements
Configurable {
* Send message of success.
*/
public void handleMessageSendSuccess(EventStat es) {
+ MetricsManager.getInstance().addSendSuccess(1);
+
// Statistics tube performance
totalTubeSuccSendCnt.incrementAndGet();
totalTubeSuccSendSize.addAndGet(es.getEvent().getBody().length);
@@ -630,6 +633,8 @@ public class TubeSink extends AbstractSink implements
Configurable {
return;
}
+ MetricsManager.getInstance().addSendFailed(1);
+
// handle sent error
if (result.getErrCode() == TErrCodeConstants.FORBIDDEN) {
logger.warn("Send message failed, error message: {},
resendQueue size: {}, event:{}",
diff --git
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
index 7768ef5e8c..7595586dd4 100644
---
a/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
+++
b/inlong-audit/audit-proxy/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
@@ -17,6 +17,7 @@
package org.apache.inlong.audit.source;
+import org.apache.inlong.audit.metric.MetricsManager;
import org.apache.inlong.audit.protocol.AuditApi.AuditMessageBody;
import org.apache.inlong.audit.protocol.AuditApi.AuditReply;
import org.apache.inlong.audit.protocol.AuditApi.AuditReply.RSP_CODE;
@@ -142,13 +143,20 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
private AuditReply handleRequest(AuditRequest auditRequest) throws
Exception {
if (auditRequest == null) {
+
+ MetricsManager.getInstance().addReceiveCountInvalid(1);
+
throw new Exception("audit request cannot be null");
}
+
AuditReply reply = AuditReply.newBuilder()
.setRequestId(auditRequest.getRequestId())
.setRspCode(RSP_CODE.SUCCESS)
.build();
List<AuditMessageBody> bodyList = auditRequest.getMsgBodyList();
+
+ MetricsManager.getInstance().addReceiveSuccess(bodyList.size(), 1,
auditRequest.getSerializedSize());
+
int errorMsgBody = 0;
LOGGER.debug("Receive message count: {}",
auditRequest.getMsgBodyCount());
for (AuditMessageBody auditMessageBody : bodyList) {
@@ -156,6 +164,9 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
if (msgDays >= this.msgValidThresholdDays) {
LOGGER.debug("Discard the data as it is from {} days ago, only
the data with a log timestamp"
+ " less than {} days is valid", msgDays,
this.msgValidThresholdDays);
+
+ MetricsManager.getInstance().addReceiveCountExpired(1);
+
continue;
}
AuditData auditData = new AuditData();
@@ -194,6 +205,9 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
}
if (errorMsgBody != 0) {
+
+ MetricsManager.getInstance().addReceiveCountInvalid(errorMsgBody);
+
reply = reply.toBuilder()
.setMessage("writing data error, discard it, error body
count=" + errorMsgBody)
.setRspCode(RSP_CODE.FAILED)
diff --git a/inlong-audit/conf/application.properties
b/inlong-audit/conf/application.properties
index 17fd3461f6..063b01aa8f 100644
--- a/inlong-audit/conf/application.properties
+++ b/inlong-audit/conf/application.properties
@@ -50,4 +50,10 @@ audit.kafka.group.id=audit-consumer-group
audit.store.jdbc.driver=com.mysql.cj.jdbc.Driver
audit.store.jdbc.url=jdbc:mysql://127.0.0.1:3306/apache_inlong_audit?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2b8&rewriteBatchedStatements=true&allowMultiQueries=true&zeroDateTimeBehavior=CONVERT_TO_NULL
audit.store.jdbc.username=root
-audit.store.jdbc.password=inlong
\ No newline at end of file
+audit.store.jdbc.password=inlong
+
+############################
+# metric config
+# org.apache.inlong.audit.metric.prometheus.ProxyPrometheusMetric is the
default monitoring
+###########################
+audit.proxy.metric.classname=org.apache.inlong.audit.metric.prometheus.ProxyPrometheusMetric
\ No newline at end of file